Re: [PATCH net-next] tuntap: introduce tx skb ring
From: Michael S. Tsirkin
Date: Mon May 16 2016 - 00:23:37 EST
On Mon, May 16, 2016 at 09:17:01AM +0800, Jason Wang wrote:
> We used to queue tx packets in sk_receive_queue, this is less
> efficient since it requires spinlocks to synchronize between producer
> and consumer.
>
> This patch tries to address this by using circular buffer which allows
> lockless synchronization. This is done by switching from
> sk_receive_queue to a tx skb ring with a new flag IFF_TX_RING and when
> this is set:
Why do we need a new flag? Is there a userspace-visible
behaviour change?
>
> - store pointer to skb in circular buffer in tun_net_xmit(), and read
> it from the circular buffer in tun_do_read().
> - introduce a new proto_ops peek which could be implemented by
> specific socket which does not use sk_receive_queue.
> - store skb length in circular buffer too, and implement a lockless
> peek for tuntap.
> - change vhost_net to use proto_ops->peek() instead
> - introduce new spinlocks to synchronize among producers (and likewise
> among consumers).
>
> Pktgen test shows about 9% improvement on guest receiving pps:
>
> Before: ~1480000pps
> After : ~1610000pps
>
> (I'm not sure non-blocking read is still needed, so it was not included
> in this patch)
What do you mean? Of course we must support both blocking and
non-blocking reads - userspace uses them.
> Signed-off-by: Jason Wang <jasowang@xxxxxxxxxx>
> ---
> ---
> drivers/net/tun.c | 157 +++++++++++++++++++++++++++++++++++++++++---
> drivers/vhost/net.c | 16 ++++-
> include/linux/net.h | 1 +
> include/uapi/linux/if_tun.h | 1 +
> 4 files changed, 165 insertions(+), 10 deletions(-)
>
> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
> index 425e983..6001ece 100644
> --- a/drivers/net/tun.c
> +++ b/drivers/net/tun.c
> @@ -71,6 +71,7 @@
> #include <net/sock.h>
> #include <linux/seq_file.h>
> #include <linux/uio.h>
> +#include <linux/circ_buf.h>
>
> #include <asm/uaccess.h>
>
> @@ -130,6 +131,8 @@ struct tap_filter {
> #define MAX_TAP_FLOWS 4096
>
> #define TUN_FLOW_EXPIRE (3 * HZ)
> +#define TUN_RING_SIZE 256
Can we size this according to the tx_queue_len set by the user?
> +#define TUN_RING_MASK (TUN_RING_SIZE - 1)
>
> struct tun_pcpu_stats {
> u64 rx_packets;
> @@ -142,6 +145,11 @@ struct tun_pcpu_stats {
> u32 rx_frame_errors;
> };
>
> +struct tun_desc {
> + struct sk_buff *skb;
> + int len; /* Cached skb len for peeking */
> +};
> +
> /* A tun_file connects an open character device to a tuntap netdevice. It
> * also contains all socket related structures (except sock_fprog and tap_filter)
> * to serve as one transmit queue for tuntap device. The sock_fprog and
> @@ -167,6 +175,13 @@ struct tun_file {
> };
> struct list_head next;
> struct tun_struct *detached;
> + /* reader lock */
> + spinlock_t rlock;
> + unsigned long tail;
> + struct tun_desc tx_descs[TUN_RING_SIZE];
> + /* writer lock */
> + spinlock_t wlock;
> + unsigned long head;
> };
>
> struct tun_flow_entry {
> @@ -515,7 +530,27 @@ static struct tun_struct *tun_enable_queue(struct tun_file *tfile)
>
> static void tun_queue_purge(struct tun_file *tfile)
> {
> + unsigned long head, tail;
> + struct tun_desc *desc;
> + struct sk_buff *skb;
> skb_queue_purge(&tfile->sk.sk_receive_queue);
> + spin_lock(&tfile->rlock);
> +
> + head = ACCESS_ONCE(tfile->head);
> + tail = tfile->tail;
> +
> + /* read tail before reading descriptor at tail */
> + smp_rmb();
I think you mean read *head* here
> +
> + while (CIRC_CNT(head, tail, TUN_RING_SIZE) >= 1) {
> + desc = &tfile->tx_descs[tail];
> + skb = desc->skb;
> + kfree_skb(skb);
> + tail = (tail + 1) & TUN_RING_MASK;
> + /* read descriptor before incrementing tail. */
> + smp_store_release(&tfile->tail, tail & TUN_RING_MASK);
> + }
> + spin_unlock(&tfile->rlock);
> skb_queue_purge(&tfile->sk.sk_error_queue);
> }
>
Barrier pairing seems messed up. Could you please tag
each barrier with its pair?
E.g. add /* Barrier A for pairing */ before each barrier and
before its pair.
> @@ -824,6 +859,7 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
> int txq = skb->queue_mapping;
> struct tun_file *tfile;
> u32 numqueues = 0;
> + unsigned long flags;
>
> rcu_read_lock();
> tfile = rcu_dereference(tun->tfiles[txq]);
> @@ -888,8 +924,35 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>
> nf_reset(skb);
>
> - /* Enqueue packet */
> - skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
> + if (tun->flags & IFF_TX_RING) {
> + unsigned long head, tail;
> +
> + spin_lock_irqsave(&tfile->wlock, flags);
> +
> + head = tfile->head;
> + tail = ACCESS_ONCE(tfile->tail);
This should be an acquire load (smp_load_acquire).
> +
> + if (CIRC_SPACE(head, tail, TUN_RING_SIZE) >= 1) {
> + struct tun_desc *desc = &tfile->tx_descs[head];
> +
> + desc->skb = skb;
> + desc->len = skb->len;
> + if (skb_vlan_tag_present(skb))
> + desc->len += VLAN_HLEN;
> +
> + /* read descriptor before incrementing head. */
The comment should say "write descriptor", so smp_wmb() is enough.
> + smp_store_release(&tfile->head,
> + (head + 1) & TUN_RING_MASK);
> + } else {
> + spin_unlock_irqrestore(&tfile->wlock, flags);
> + goto drop;
> + }
> +
> + spin_unlock_irqrestore(&tfile->wlock, flags);
> + } else {
> + /* Enqueue packet */
> + skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
> + }
>
> /* Notify and wake up reader process */
> if (tfile->flags & TUN_FASYNC)
> @@ -1085,6 +1148,13 @@ static void tun_net_init(struct net_device *dev)
> }
> }
>
> +static bool tun_queue_not_empty(struct tun_file *tfile)
> +{
> + struct sock *sk = tfile->socket.sk;
> +
> + return (!skb_queue_empty(&sk->sk_receive_queue) ||
> + ACCESS_ONCE(tfile->head) != ACCESS_ONCE(tfile->tail));
> +}
> /* Character device part */
>
> /* Poll */
> @@ -1104,7 +1174,7 @@ static unsigned int tun_chr_poll(struct file *file, poll_table *wait)
>
> poll_wait(file, sk_sleep(sk), wait);
>
> - if (!skb_queue_empty(&sk->sk_receive_queue))
> + if (tun_queue_not_empty(tfile))
> mask |= POLLIN | POLLRDNORM;
>
> if (sock_writeable(sk) ||
> @@ -1494,11 +1564,36 @@ static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile,
> if (tun->dev->reg_state != NETREG_REGISTERED)
> return -EIO;
>
> - /* Read frames from queue */
> - skb = __skb_recv_datagram(tfile->socket.sk, noblock ? MSG_DONTWAIT : 0,
> - &peeked, &off, &err);
> - if (!skb)
> - return err;
> + if (tun->flags & IFF_TX_RING) {
> + unsigned long head, tail;
> + struct tun_desc *desc;
> +
> + spin_lock(&tfile->rlock);
> + head = ACCESS_ONCE(tfile->head);
> + tail = tfile->tail;
> +
> + if (CIRC_CNT(head, tail, TUN_RING_SIZE) >= 1) {
> + /* read tail before reading descriptor at tail */
> + smp_rmb();
> + desc = &tfile->tx_descs[tail];
> + skb = desc->skb;
> + /* read descriptor before incrementing tail. */
> + smp_store_release(&tfile->tail,
> + (tail + 1) & TUN_RING_MASK);
Same comments as for tun_queue_purge(). Also, please order the code consistently.
> + } else {
> + spin_unlock(&tfile->rlock);
> + return -EAGAIN;
> + }
> +
> + spin_unlock(&tfile->rlock);
> + } else {
> + /* Read frames from queue */
> + skb = __skb_recv_datagram(tfile->socket.sk,
> + noblock ? MSG_DONTWAIT : 0,
> + &peeked, &off, &err);
> + if (!skb)
> + return err;
> + }
>
> ret = tun_put_user(tun, tfile, skb, to);
> if (unlikely(ret < 0))
> @@ -1629,8 +1724,47 @@ out:
> return ret;
> }
>
> +static int tun_peek(struct socket *sock, bool exact)
> +{
> + struct tun_file *tfile = container_of(sock, struct tun_file, socket);
> + struct sock *sk = sock->sk;
> + struct tun_struct *tun;
> + int ret = 0;
> +
> + if (!exact)
> + return tun_queue_not_empty(tfile);
> +
> + tun = __tun_get(tfile);
> + if (!tun)
> + return 0;
> +
> + if (tun->flags & IFF_TX_RING) {
> + unsigned long head = ACCESS_ONCE(tfile->head);
> + unsigned long tail = ACCESS_ONCE(tfile->tail);
> +
> + if (head != tail)
> + ret = tfile->tx_descs[tail].len;
no ordering at all here?
> + } else {
> + struct sk_buff *head;
> + unsigned long flags;
> +
> + spin_lock_irqsave(&sk->sk_receive_queue.lock, flags);
> + head = skb_peek(&sk->sk_receive_queue);
> + if (likely(head)) {
> + ret = head->len;
> + if (skb_vlan_tag_present(head))
> + ret += VLAN_HLEN;
> + }
> + spin_unlock_irqrestore(&sk->sk_receive_queue.lock, flags);
> + }
> +
> + tun_put(tun);
> + return ret;
> +}
> +
> /* Ops structure to mimic raw sockets with tun */
> static const struct proto_ops tun_socket_ops = {
> + .peek = tun_peek,
> .sendmsg = tun_sendmsg,
> .recvmsg = tun_recvmsg,
> };
> @@ -2306,13 +2440,13 @@ static int tun_chr_open(struct inode *inode, struct file * file)
> {
> struct net *net = current->nsproxy->net_ns;
> struct tun_file *tfile;
> -
> DBG1(KERN_INFO, "tunX: tun_chr_open\n");
>
> tfile = (struct tun_file *)sk_alloc(net, AF_UNSPEC, GFP_KERNEL,
> &tun_proto, 0);
> if (!tfile)
> return -ENOMEM;
> +
> RCU_INIT_POINTER(tfile->tun, NULL);
> tfile->flags = 0;
> tfile->ifindex = 0;
> @@ -2332,6 +2466,11 @@ static int tun_chr_open(struct inode *inode, struct file * file)
> INIT_LIST_HEAD(&tfile->next);
>
> sock_set_flag(&tfile->sk, SOCK_ZEROCOPY);
> + tfile->head = 0;
> + tfile->tail = 0;
> +
> + spin_lock_init(&tfile->rlock);
> + spin_lock_init(&tfile->wlock);
>
> return 0;
> }
> diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
> index f744eeb..10ff494 100644
> --- a/drivers/vhost/net.c
> +++ b/drivers/vhost/net.c
> @@ -455,10 +455,14 @@ out:
>
> static int peek_head_len(struct sock *sk)
> {
> + struct socket *sock = sk->sk_socket;
> struct sk_buff *head;
> int len = 0;
> unsigned long flags;
>
> + if (sock->ops->peek)
> + return sock->ops->peek(sock, true);
> +
> spin_lock_irqsave(&sk->sk_receive_queue.lock, flags);
> head = skb_peek(&sk->sk_receive_queue);
> if (likely(head)) {
> @@ -471,6 +475,16 @@ static int peek_head_len(struct sock *sk)
> return len;
> }
>
> +static int sk_has_rx_data(struct sock *sk)
> +{
> + struct socket *sock = sk->sk_socket;
> +
> + if (sock->ops->peek)
> + return sock->ops->peek(sock, false);
> +
> + return skb_queue_empty(&sk->sk_receive_queue);
> +}
> +
> static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
> {
> struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX];
> @@ -487,7 +501,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
> endtime = busy_clock() + vq->busyloop_timeout;
>
> while (vhost_can_busy_poll(&net->dev, endtime) &&
> - skb_queue_empty(&sk->sk_receive_queue) &&
> + !sk_has_rx_data(sk) &&
> vhost_vq_avail_empty(&net->dev, vq))
> cpu_relax_lowlatency();
>
> diff --git a/include/linux/net.h b/include/linux/net.h
> index 72c1e06..3c4ecd5 100644
> --- a/include/linux/net.h
> +++ b/include/linux/net.h
> @@ -132,6 +132,7 @@ struct module;
> struct proto_ops {
> int family;
> struct module *owner;
> + int (*peek) (struct socket *sock, bool exact);
> int (*release) (struct socket *sock);
> int (*bind) (struct socket *sock,
> struct sockaddr *myaddr,
> diff --git a/include/uapi/linux/if_tun.h b/include/uapi/linux/if_tun.h
> index 3cb5e1d..d64ddc1 100644
> --- a/include/uapi/linux/if_tun.h
> +++ b/include/uapi/linux/if_tun.h
> @@ -61,6 +61,7 @@
> #define IFF_TUN 0x0001
> #define IFF_TAP 0x0002
> #define IFF_NO_PI 0x1000
> +#define IFF_TX_RING 0x0010
> /* This flag has no real effect */
> #define IFF_ONE_QUEUE 0x2000
> #define IFF_VNET_HDR 0x4000
> --
> 2.7.4