Re: [PATCH net-next] tuntap: introduce tx skb ring
From: Michael S. Tsirkin
Date: Wed May 18 2016 - 12:46:27 EST
On Wed, May 18, 2016 at 09:41:03AM -0700, Eric Dumazet wrote:
> On Wed, 2016-05-18 at 19:26 +0300, Michael S. Tsirkin wrote:
> > On Wed, May 18, 2016 at 10:13:59AM +0200, Jesper Dangaard Brouer wrote:
> > > I agree. It is sad to see everybody is implementing the same thing,
> > > open coding an array/circular based ring buffer. This kind of code is
> > > hard to maintain and get right with barriers etc. We can achieve the
> > > same performance with a generic implementation, by inlining the help
> > > function calls.
> >
> > So my testing seems to show that at least for the common usecase
> > in networking, which isn't lockless, circular buffer
> > with indices does not perform that well, because
> > each index access causes a cache line to bounce between
> > CPUs, and index access causes stalls due to the dependency.
>
>
> Yes.
>
> >
> > By comparison, an array of pointers where NULL means invalid
> > and !NULL means valid, can be updated without messing up barriers
> > at all and does not have this issue.
>
> Right but then you need appropriate barriers.
>
> >
> > You also mentioned cache pressure caused by using large queues, and I
> > think it's a significant issue. tun has a queue of 1000 entries by
> > default and that's 8K already.
> >
> > So, I had an idea: with an array of pointers we could actually use
> > only part of the ring as long as it stays mostly empty.
> > We do want to fill at least two cache lines to prevent producer
> > and consumer from writing over the same cache line all the time.
> > This is SKB_ARRAY_MIN_SIZE logic below.
> >
> > Pls take a look at the implementation below. It's a straight port from virtio
> > unit test, so should work fine, except the SKB_ARRAY_MIN_SIZE hack that
> > I added. Today I run out of time for testing this. Posting for early
> > flames/feedback.
> >
> > It's using skb pointers but we switching to void * would be easy at cost
> > of type safety, though it appears that people want lockless push
> > etc so I'm not sure of the value.
> >
> > --->
> > skb_array: array based FIFO for skbs
> >
> > A simple array based FIFO of pointers.
> > Intended for net stack so uses skbs for type
> > safety, but we can replace with with void *
> > if others find it useful outside of net stack.
> >
> > Signed-off-by: Michael S. Tsirkin <mst@xxxxxxxxxx>
> >
> > ---
> >
> > diff --git a/include/linux/skb_array.h b/include/linux/skb_array.h
> > new file mode 100644
> > index 0000000..a67cc8b
> > --- /dev/null
> > +++ b/include/linux/skb_array.h
> > @@ -0,0 +1,116 @@
> > +/*
> > + * See Documentation/skbular-buffers.txt for more information.
> > + */
> > +
> > +#ifndef _LINUX_SKB_ARRAY_H
> > +#define _LINUX_SKB_ARRAY_H 1
> > +
> > +#include <linux/spinlock.h>
> > +#include <linux/cache.h>
> > +#include <linux/types.h>
> > +#include <linux/compiler.h>
> > +#include <linux/cache.h>
> > +#include <linux/slab.h>
> > +#include <asm/errno.h>
> > +
> > +struct sk_buff;
> > +
> > +struct skb_array {
> > + int producer ____cacheline_aligned_in_smp;
> > + spinlock_t producer_lock;
> > + int consumer ____cacheline_aligned_in_smp;
> > + spinlock_t consumer_lock;
> > + /* Shared consumer/producer data */
> > + int size ____cacheline_aligned_in_smp; /* max entries in queue */
> > + struct sk_buff **queue;
> > +};
> > +
> > +#define SKB_ARRAY_MIN_SIZE (2 * (0x1 << cache_line_size()) / \
> > + sizeof (struct sk_buff *))
> > +
> > +static inline int __skb_array_produce(struct skb_array *a,
> > + struct sk_buff *skb)
> > +{
> > + /* Try to start from beginning: good for cache utilization as we'll
> > + * keep reusing the same cache line.
> > + * Produce at least SKB_ARRAY_MIN_SIZE entries before trying to do this,
> > + * to reduce bouncing cache lines between them.
> > + */
> > + if (a->producer >= SKB_ARRAY_MIN_SIZE && !a->queue[0])
>
> a->queue[0] might be set by consumer, you probably need a barrier.
I think not - we write to the same place below and two accesses to
same address are never reordered.
> > + a->producer = 0;
> > + if (a->queue[a->producer])
> > + return -ENOSPC;
> > + a->queue[a->producer] = skb;
> > + if (unlikely(++a->producer > a->size))
> > + a->producer = 0;
> > + return 0;
> > +}
> > +
> > +static inline int skb_array_produce_bh(struct skb_array *a,
> > + struct sk_buff *skb)
> > +{
> > + int ret;
> > +
> > + spin_lock_bh(&a->producer_lock);
> > + ret = __skb_array_produce(a, skb);
> > + spin_unlock_bh(&a->producer_lock);
> > +
> > + return ret;
> > +}
> > +
> > +static inline struct sk_buff *__skb_array_peek(struct skb_array *a)
> > +{
> > + if (a->queue[a->consumer])
> > + return a->queue[a->consumer];
> > +
> > + /* Check whether producer started at the beginning. */
> > + if (unlikely(a->consumer >= SKB_ARRAY_MIN_SIZE && a->queue[0])) {
> > + a->consumer = 0;
> > + return a->queue[0];
> > + }
> > +
> > + return NULL;
> > +}
> > +
> > +static inline void __skb_array_consume(struct skb_array *a)
> > +{
> > + a->queue[a->consumer++] = NULL;
> > + if (unlikely(++a->consumer > a->size))
>
> a->consumer is incremented twice ?
Oops.
> > + a->consumer = 0;
> > +}
> > +
>
>
>