[RFC PATCH net-next 12/12] vhost_net: batch submitting XDP buffers to underlayer sockets

From: Jason Wang
Date: Mon May 21 2018 - 04:10:45 EST


This patch implements XDP batching for vhost_net with tun. XDP buffs
are batched in vhost and submitted to tun when:

- vhost_net cannot build an XDP buff (mostly because of the packet size)
- the number of batched buffs reaches the limit (VHOST_RX_BATCH)

tun accepts a batch of XDP buffs through msg_control and processes them
in a batch.

With this, tun XDP can benefit from e.g. batched transmission during
XDP_REDIRECT or XDP_TX.

Tests show a 21% improvement in TX pps (from ~3.2Mpps to ~3.9Mpps)
while transmitting through testpmd from guest to host by
xdp_redirect_map between tap0 and ixgbe.

Signed-off-by: Jason Wang <jasowang@xxxxxxxxxx>
---
drivers/net/tun.c | 36 +++++++++++++++++----------
drivers/vhost/net.c | 71 ++++++++++++++++++++++++++++++++++++-----------------
2 files changed, 71 insertions(+), 36 deletions(-)

diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index b586b3f..5d16d18 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -1616,7 +1616,6 @@ static u32 tun_do_xdp(struct tun_struct *tun,
switch (act) {
case XDP_REDIRECT:
*err = xdp_do_redirect(tun->dev, xdp, xdp_prog);
- xdp_do_flush_map();
if (*err)
break;
goto out;
@@ -1624,7 +1623,6 @@ static u32 tun_do_xdp(struct tun_struct *tun,
*err = tun_xdp_tx(tun->dev, xdp);
if (*err)
break;
- tun_xdp_flush(tun->dev);
goto out;
case XDP_PASS:
goto out;
@@ -2400,9 +2398,6 @@ static int tun_xdp_one(struct tun_struct *tun,
int err = 0;
bool skb_xdp = false;

- preempt_disable();
- rcu_read_lock();
-
xdp_prog = rcu_dereference(tun->xdp_prog);
if (xdp_prog) {
if (gso->gso_type) {
@@ -2461,15 +2456,12 @@ static int tun_xdp_one(struct tun_struct *tun,
tun_flow_update(tun, rxhash, tfile);

out:
- rcu_read_unlock();
- preempt_enable();
-
return err;
}

static int tun_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len)
{
- int ret;
+ int ret, i;
struct tun_file *tfile = container_of(sock, struct tun_file, socket);
struct tun_struct *tun = tun_get(tfile);
struct tun_msg_ctl *ctl = m->msg_control;
@@ -2477,10 +2469,28 @@ static int tun_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len)
if (!tun)
return -EBADFD;

- if (ctl && ctl->type == TUN_MSG_PTR) {
- ret = tun_xdp_one(tun, tfile, ctl->ptr);
- if (!ret)
- ret = total_len;
+ if (ctl && ((ctl->type & 0xF) == TUN_MSG_PTR)) {
+ int n = ctl->type >> 16;
+
+ preempt_disable();
+ rcu_read_lock();
+
+ for (i = 0; i < n; i++) {
+ struct xdp_buff *x = (struct xdp_buff *)ctl->ptr;
+ struct xdp_buff *xdp = &x[i];
+
+ xdp_set_data_meta_invalid(xdp);
+ xdp->rxq = &tfile->xdp_rxq;
+ tun_xdp_one(tun, tfile, xdp);
+ }
+
+ xdp_do_flush_map();
+ tun_xdp_flush(tun->dev);
+
+ rcu_read_unlock();
+ preempt_enable();
+
+ ret = total_len;
goto out;
}

diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
index 0d84de6..bec4109 100644
--- a/drivers/vhost/net.c
+++ b/drivers/vhost/net.c
@@ -118,6 +118,7 @@ struct vhost_net_virtqueue {
struct ptr_ring *rx_ring;
struct vhost_net_buf rxq;
struct xdp_buff xdp[VHOST_RX_BATCH];
+ struct vring_used_elem heads[VHOST_RX_BATCH];
};

struct vhost_net {
@@ -511,7 +512,7 @@ static int vhost_net_build_xdp(struct vhost_net_virtqueue *nvq,
void *buf;
int copied;

- if (len < nvq->sock_hlen)
+ if (unlikely(len < nvq->sock_hlen))
return -EFAULT;

if (SKB_DATA_ALIGN(len + pad) +
@@ -567,11 +568,37 @@ static int vhost_net_build_xdp(struct vhost_net_virtqueue *nvq,
return 0;
}

+static void vhost_tx_batch(struct vhost_net *net,
+ struct vhost_net_virtqueue *nvq,
+ struct socket *sock,
+ struct msghdr *msghdr, int n)
+{
+ struct tun_msg_ctl ctl = {
+ .type = n << 16 | TUN_MSG_PTR,
+ .ptr = nvq->xdp,
+ };
+ int err;
+
+ if (n == 0)
+ return;
+
+ msghdr->msg_control = &ctl;
+ err = sock->ops->sendmsg(sock, msghdr, 0);
+
+ if (unlikely(err < 0)) {
+ /* FIXME vq_err() */
+ vq_err(&nvq->vq, "sendmsg err!\n");
+ return;
+ }
+ vhost_add_used_and_signal_n(&net->dev, &nvq->vq, nvq->vq.heads, n);
+}
+
+/* Expects to be always run from workqueue - which acts as
+ * read-side critical section for our kind of RCU. */
static void handle_tx_copy(struct vhost_net *net)
{
struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX];
struct vhost_virtqueue *vq = &nvq->vq;
- struct xdp_buff xdp;
unsigned out, in;
int head;
struct msghdr msg = {
@@ -586,7 +613,6 @@ static void handle_tx_copy(struct vhost_net *net)
size_t hdr_size;
struct socket *sock;
struct vhost_net_ubuf_ref *uninitialized_var(ubufs);
- struct tun_msg_ctl ctl;
int sent_pkts = 0;
s16 nheads = 0;

@@ -631,22 +657,24 @@ static void handle_tx_copy(struct vhost_net *net)
vq->heads[nheads].id = cpu_to_vhost32(vq, head);
vq->heads[nheads].len = 0;

- err = vhost_net_build_xdp(nvq, &msg.msg_iter, &xdp);
- if (!err) {
- ctl.type = TUN_MSG_PTR;
- ctl.ptr = &xdp;
- msg.msg_control = &ctl;
- } else
- msg.msg_control = NULL;
-
total_len += len;
- if (total_len < VHOST_NET_WEIGHT &&
- vhost_has_more_pkts(net, vq)) {
- msg.msg_flags |= MSG_MORE;
- } else {
- msg.msg_flags &= ~MSG_MORE;
+ err = vhost_net_build_xdp(nvq, &msg.msg_iter,
+ &nvq->xdp[nheads]);
+ if (!err) {
+ if (++nheads == VHOST_RX_BATCH) {
+ vhost_tx_batch(net, nvq, sock, &msg, nheads);
+ nheads = 0;
+ }
+ goto done;
+ } else if (unlikely(err != -ENOSPC)) {
+ vq_err(vq, "Fail to build XDP buffer\n");
+ break;
}

+ vhost_tx_batch(net, nvq, sock, &msg, nheads);
+ msg.msg_control = NULL;
+ nheads = 0;
+
/* TODO: Check specific error and bomb out unless ENOBUFS? */
err = sock->ops->sendmsg(sock, &msg, len);
if (unlikely(err < 0)) {
@@ -657,11 +685,9 @@ static void handle_tx_copy(struct vhost_net *net)
if (err != len)
pr_debug("Truncated TX packet: "
" len %d != %zd\n", err, len);
- if (++nheads == VHOST_RX_BATCH) {
- vhost_add_used_and_signal_n(&net->dev, vq, vq->heads,
- nheads);
- nheads = 0;
- }
+
+ vhost_add_used_and_signal(&net->dev, vq, head, 0);
+done:
if (vhost_exceeds_weight(++sent_pkts, total_len)) {
vhost_poll_queue(&vq->poll);
break;
@@ -669,8 +695,7 @@ static void handle_tx_copy(struct vhost_net *net)
}
out:
if (nheads)
- vhost_add_used_and_signal_n(&net->dev, vq, vq->heads,
- nheads);
+ vhost_tx_batch(net, nvq, sock, &msg, nheads);
mutex_unlock(&vq->mutex);
}

--
2.7.4