Re: [PATCH net-next] tuntap: introduce tx skb ring

From: Eric Dumazet
Date: Wed May 18 2016 - 12:41:11 EST


On Wed, 2016-05-18 at 19:26 +0300, Michael S. Tsirkin wrote:
> On Wed, May 18, 2016 at 10:13:59AM +0200, Jesper Dangaard Brouer wrote:
> > I agree. It is sad to see everybody is implementing the same thing,
> > open coding an array/circular based ring buffer. This kind of code is
> > hard to maintain and get right with barriers etc. We can achieve the
> > same performance with a generic implementation, by inlining the help
> > function calls.
>
> So my testing seems to show that at least for the common usecase
> in networking, which isn't lockless, circular buffer
> with indices does not perform that well, because
> each index access causes a cache line to bounce between
> CPUs, and index access causes stalls due to the dependency.


Yes.

>
> By comparison, an array of pointers where NULL means invalid
> and !NULL means valid, can be updated without messing up barriers
> at all and does not have this issue.

Right, but then you still need appropriate barriers.
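To make the barrier point concrete, here is a minimal userspace sketch of one ring slot where NULL means empty and non-NULL means valid. It uses C11 atomics instead of kernel primitives; the in-kernel counterparts would be smp_store_release() and smp_load_acquire(). The names struct item, struct slot, slot_publish() and slot_take() are made up for illustration and are not part of the patch. It assumes a single producer and a single consumer, or callers serialized by the producer and consumer locks.

```c
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical payload standing in for struct sk_buff. */
struct item {
	int len;
};

/* One ring slot: NULL means empty, non-NULL means valid. */
struct slot {
	_Atomic(struct item *) ptr;
};

/* Producer side: returns 0 on success, -1 if the slot is still occupied. */
static inline int slot_publish(struct slot *s, struct item *it)
{
	/* Acquire pairs with the consumer's release store of NULL. */
	if (atomic_load_explicit(&s->ptr, memory_order_acquire) != NULL)
		return -1;

	/*
	 * Release: every write to *it made before this store is visible
	 * to a consumer that observes the non-NULL pointer.
	 */
	atomic_store_explicit(&s->ptr, it, memory_order_release);
	return 0;
}

/* Consumer side: returns the item, or NULL if the slot is empty. */
static inline struct item *slot_take(struct slot *s)
{
	/* Acquire pairs with the producer's release store above. */
	struct item *it = atomic_load_explicit(&s->ptr, memory_order_acquire);

	if (it != NULL)
		/* Release the slot back to the producer. */
		atomic_store_explicit(&s->ptr, NULL, memory_order_release);
	return it;
}
```

Without the release/acquire pairing, the consumer could observe the non-NULL pointer before the contents it points to.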

>
> You also mentioned cache pressure caused by using large queues, and I
> think it's a significant issue. tun has a queue of 1000 entries by
> default and that's 8K already.
>
> So, I had an idea: with an array of pointers we could actually use
> only part of the ring as long as it stays mostly empty.
> We do want to fill at least two cache lines to prevent producer
> and consumer from writing over the same cache line all the time.
> This is SKB_ARRAY_MIN_SIZE logic below.
>
> Pls take a look at the implementation below. It's a straight port from virtio
> unit test, so should work fine, except the SKB_ARRAY_MIN_SIZE hack that
> I added. Today I ran out of time for testing this. Posting for early
> flames/feedback.
>
> It's using skb pointers, but switching to void * would be easy at the cost
> of type safety, though it appears that people want lockless push
> etc so I'm not sure of the value.
>
> --->
> skb_array: array based FIFO for skbs
>
> A simple array based FIFO of pointers.
> Intended for net stack so uses skbs for type
> safety, but we can replace them with void *
> if others find it useful outside of net stack.
>
> Signed-off-by: Michael S. Tsirkin <mst@xxxxxxxxxx>
>
> ---
>
> diff --git a/include/linux/skb_array.h b/include/linux/skb_array.h
> new file mode 100644
> index 0000000..a67cc8b
> --- /dev/null
> +++ b/include/linux/skb_array.h
> @@ -0,0 +1,116 @@
> +/*
> + * See Documentation/skbular-buffers.txt for more information.
> + */
> +
> +#ifndef _LINUX_SKB_ARRAY_H
> +#define _LINUX_SKB_ARRAY_H 1
> +
> +#include <linux/spinlock.h>
> +#include <linux/cache.h>
> +#include <linux/types.h>
> +#include <linux/compiler.h>
> +#include <linux/cache.h>
> +#include <linux/slab.h>
> +#include <asm/errno.h>
> +
> +struct sk_buff;
> +
> +struct skb_array {
> + int producer ____cacheline_aligned_in_smp;
> + spinlock_t producer_lock;
> + int consumer ____cacheline_aligned_in_smp;
> + spinlock_t consumer_lock;
> + /* Shared consumer/producer data */
> + int size ____cacheline_aligned_in_smp; /* max entries in queue */
> + struct sk_buff **queue;
> +};
> +
> +#define SKB_ARRAY_MIN_SIZE (2 * (0x1 << cache_line_size()) / \
> + sizeof (struct sk_buff *))
> +
> +static inline int __skb_array_produce(struct skb_array *a,
> + struct sk_buff *skb)
> +{
> + /* Try to start from beginning: good for cache utilization as we'll
> + * keep reusing the same cache line.
> + * Produce at least SKB_ARRAY_MIN_SIZE entries before trying to do this,
> + * to reduce bouncing cache lines between them.
> + */
> + if (a->producer >= SKB_ARRAY_MIN_SIZE && !a->queue[0])

a->queue[0] might be written by the consumer concurrently, so you probably need a barrier here.

> + a->producer = 0;
> + if (a->queue[a->producer])
> + return -ENOSPC;
> + a->queue[a->producer] = skb;
> + if (unlikely(++a->producer > a->size))
> + a->producer = 0;
> + return 0;
> +}
> +
> +static inline int skb_array_produce_bh(struct skb_array *a,
> + struct sk_buff *skb)
> +{
> + int ret;
> +
> + spin_lock_bh(&a->producer_lock);
> + ret = __skb_array_produce(a, skb);
> + spin_unlock_bh(&a->producer_lock);
> +
> + return ret;
> +}
> +
> +static inline struct sk_buff *__skb_array_peek(struct skb_array *a)
> +{
> + if (a->queue[a->consumer])
> + return a->queue[a->consumer];
> +
> + /* Check whether producer started at the beginning. */
> + if (unlikely(a->consumer >= SKB_ARRAY_MIN_SIZE && a->queue[0])) {
> + a->consumer = 0;
> + return a->queue[0];
> + }
> +
> + return NULL;
> +}
> +
> +static inline void __skb_array_consume(struct skb_array *a)
> +{
> + a->queue[a->consumer++] = NULL;
> + if (unlikely(++a->consumer > a->size))

a->consumer is incremented twice? Once by the post-increment on the previous line, and again by the pre-increment in this test.

> + a->consumer = 0;
> +}
> +
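
On the double increment in __skb_array_consume() above: a minimal sketch of a consume step that advances the index exactly once. struct ring, RING_SIZE, ring_peek() and ring_consume() are hypothetical stand-ins for illustration, not the patch's API. The sketch is single-threaded and leaves out barriers and the SKB_ARRAY_MIN_SIZE rewind, so that only the index arithmetic is shown. The wrap test uses >= on the assumption that the queue holds exactly size entries; the quoted patch uses > and its allocation code is not visible here, so that may not match the author's intent.

```c
#include <stddef.h>

#define RING_SIZE 4	/* hypothetical size, for illustration only */

struct ring {
	int consumer;		/* index of the next slot to consume */
	int size;		/* number of entries in queue[] */
	void *queue[RING_SIZE];
};

/* Return the entry at the consumer index, or NULL if the slot is empty. */
static inline void *ring_peek(const struct ring *r)
{
	return r->queue[r->consumer];
}

/* Clear the current slot, then advance the index exactly once. */
static inline void ring_consume(struct ring *r)
{
	r->queue[r->consumer] = NULL;
	if (++r->consumer >= r->size)
		r->consumer = 0;
}
```

With the post-increment and pre-increment combined as in the patch, every second slot would be skipped and never cleared.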