[RFC v2 3/5] vhost/vsock: add support for vhost dgram.

From: Jiang Wang
Date: Tue Sep 14 2021 - 01:55:51 EST


This patch supports dgram on vhost side, including
tx and rx. The vhost send packets asynchronously.

Also, ignore vq errors when vq number is larger than 2,
so it will be comptaible with old versions.

Signed-off-by: Jiang Wang <jiang.wang@xxxxxxxxxxxxx>
---
drivers/vhost/vsock.c | 217 ++++++++++++++++++++++++++++++++++++------
1 file changed, 189 insertions(+), 28 deletions(-)

diff --git a/drivers/vhost/vsock.c b/drivers/vhost/vsock.c
index c79789af0365..a8755cbebd40 100644
--- a/drivers/vhost/vsock.c
+++ b/drivers/vhost/vsock.c
@@ -28,7 +28,10 @@
* small pkts.
*/
#define VHOST_VSOCK_PKT_WEIGHT 256
+#define VHOST_VSOCK_DGRM_MAX_PENDING_PKT 128

+/* Max wait time in busy poll in microseconds */
+#define VHOST_VSOCK_BUSY_POLL_TIMEOUT 20
enum {
VHOST_VSOCK_FEATURES = VHOST_FEATURES |
(1ULL << VIRTIO_F_ACCESS_PLATFORM) |
@@ -46,7 +49,7 @@ static DEFINE_READ_MOSTLY_HASHTABLE(vhost_vsock_hash, 8);

struct vhost_vsock {
struct vhost_dev dev;
- struct vhost_virtqueue vqs[2];
+ struct vhost_virtqueue vqs[4];

/* Link to global vhost_vsock_hash, writes use vhost_vsock_mutex */
struct hlist_node hash;
@@ -55,6 +58,11 @@ struct vhost_vsock {
spinlock_t send_pkt_list_lock;
struct list_head send_pkt_list; /* host->guest pending packets */

+ spinlock_t dgram_send_pkt_list_lock;
+ struct list_head dgram_send_pkt_list; /* host->guest pending packets */
+ struct vhost_work dgram_send_pkt_work;
+ int dgram_used; /*pending packets to be send */
+
atomic_t queued_replies;

u32 guest_cid;
@@ -92,10 +100,22 @@ static void
vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
struct vhost_virtqueue *vq)
{
- struct vhost_virtqueue *tx_vq = &vsock->vqs[VSOCK_VQ_TX];
+ struct vhost_virtqueue *tx_vq;
int pkts = 0, total_len = 0;
bool added = false;
bool restart_tx = false;
+ spinlock_t *lock;
+ struct list_head *send_pkt_list;
+
+ if (vq == &vsock->vqs[VSOCK_VQ_RX]) {
+ tx_vq = &vsock->vqs[VSOCK_VQ_TX];
+ lock = &vsock->send_pkt_list_lock;
+ send_pkt_list = &vsock->send_pkt_list;
+ } else {
+ tx_vq = &vsock->vqs[VSOCK_VQ_DGRAM_TX];
+ lock = &vsock->dgram_send_pkt_list_lock;
+ send_pkt_list = &vsock->dgram_send_pkt_list;
+ }

mutex_lock(&vq->mutex);

@@ -116,36 +136,48 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
size_t iov_len, payload_len;
int head;
bool restore_flag = false;
+ bool is_dgram = false;

- spin_lock_bh(&vsock->send_pkt_list_lock);
- if (list_empty(&vsock->send_pkt_list)) {
- spin_unlock_bh(&vsock->send_pkt_list_lock);
+ spin_lock_bh(lock);
+ if (list_empty(send_pkt_list)) {
+ spin_unlock_bh(lock);
vhost_enable_notify(&vsock->dev, vq);
break;
}

- pkt = list_first_entry(&vsock->send_pkt_list,
+ pkt = list_first_entry(send_pkt_list,
struct virtio_vsock_pkt, list);
list_del_init(&pkt->list);
- spin_unlock_bh(&vsock->send_pkt_list_lock);
+ spin_unlock_bh(lock);
+
+ if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_DGRAM)
+ is_dgram = true;

head = vhost_get_vq_desc(vq, vq->iov, ARRAY_SIZE(vq->iov),
&out, &in, NULL, NULL);
if (head < 0) {
- spin_lock_bh(&vsock->send_pkt_list_lock);
- list_add(&pkt->list, &vsock->send_pkt_list);
- spin_unlock_bh(&vsock->send_pkt_list_lock);
+ spin_lock_bh(lock);
+ list_add(&pkt->list, send_pkt_list);
+ spin_unlock_bh(lock);
break;
}

if (head == vq->num) {
- spin_lock_bh(&vsock->send_pkt_list_lock);
- list_add(&pkt->list, &vsock->send_pkt_list);
- spin_unlock_bh(&vsock->send_pkt_list_lock);
+ if (is_dgram) {
+ virtio_transport_free_pkt(pkt);
+ vq_err(vq, "Dgram virtqueue is full!");
+ spin_lock_bh(lock);
+ vsock->dgram_used--;
+ spin_unlock_bh(lock);
+ break;
+ }
+ spin_lock_bh(lock);
+ list_add(&pkt->list, send_pkt_list);
+ spin_unlock_bh(lock);

/* We cannot finish yet if more buffers snuck in while
- * re-enabling notify.
- */
+ * re-enabling notify.
+ */
if (unlikely(vhost_enable_notify(&vsock->dev, vq))) {
vhost_disable_notify(&vsock->dev, vq);
continue;
@@ -156,6 +188,12 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
if (out) {
virtio_transport_free_pkt(pkt);
vq_err(vq, "Expected 0 output buffers, got %u\n", out);
+ if (is_dgram) {
+ spin_lock_bh(lock);
+ vsock->dgram_used--;
+ spin_unlock_bh(lock);
+ }
+
break;
}

@@ -163,6 +201,18 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
if (iov_len < sizeof(pkt->hdr)) {
virtio_transport_free_pkt(pkt);
vq_err(vq, "Buffer len [%zu] too small\n", iov_len);
+ if (is_dgram) {
+ spin_lock_bh(lock);
+ vsock->dgram_used--;
+ spin_unlock_bh(lock);
+ }
+ break;
+ }
+
+ if (iov_len < pkt->len - pkt->off &&
+ vq == &vsock->vqs[VSOCK_VQ_DGRAM_RX]) {
+ virtio_transport_free_pkt(pkt);
+ vq_err(vq, "Buffer len [%zu] too small for dgram\n", iov_len);
break;
}

@@ -199,6 +249,11 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
if (nbytes != sizeof(pkt->hdr)) {
virtio_transport_free_pkt(pkt);
vq_err(vq, "Faulted on copying pkt hdr\n");
+ if (is_dgram) {
+ spin_lock_bh(lock);
+ vsock->dgram_used--;
+ spin_unlock_bh(lock);
+ }
break;
}

@@ -224,19 +279,19 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
/* If we didn't send all the payload we can requeue the packet
* to send it with the next available buffer.
*/
- if (pkt->off < pkt->len) {
+ if ((pkt->off < pkt->len)
+ && (vq == &vsock->vqs[VSOCK_VQ_RX])) {
if (restore_flag)
pkt->hdr.flags |= cpu_to_le32(VIRTIO_VSOCK_SEQ_EOR);
-
/* We are queueing the same virtio_vsock_pkt to handle
* the remaining bytes, and we want to deliver it
* to monitoring devices in the next iteration.
*/
pkt->tap_delivered = false;

- spin_lock_bh(&vsock->send_pkt_list_lock);
- list_add(&pkt->list, &vsock->send_pkt_list);
- spin_unlock_bh(&vsock->send_pkt_list_lock);
+ spin_lock_bh(lock);
+ list_add(&pkt->list, send_pkt_list);
+ spin_unlock_bh(lock);
} else {
if (pkt->reply) {
int val;
@@ -251,6 +306,11 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
}

virtio_transport_free_pkt(pkt);
+ if (is_dgram) {
+ spin_lock_bh(lock);
+ vsock->dgram_used--;
+ spin_unlock_bh(lock);
+ }
}
} while(likely(!vhost_exceeds_weight(vq, ++pkts, total_len)));
if (added)
@@ -274,11 +334,25 @@ static void vhost_transport_send_pkt_work(struct vhost_work *work)
vhost_transport_do_send_pkt(vsock, vq);
}

+static void vhost_transport_dgram_send_pkt_work(struct vhost_work *work)
+{
+ struct vhost_virtqueue *vq;
+ struct vhost_vsock *vsock;
+
+ vsock = container_of(work, struct vhost_vsock, dgram_send_pkt_work);
+ vq = &vsock->vqs[VSOCK_VQ_DGRAM_RX];
+
+ vhost_transport_do_send_pkt(vsock, vq);
+}
+
static int
vhost_transport_send_pkt(struct virtio_vsock_pkt *pkt)
{
struct vhost_vsock *vsock;
int len = pkt->len;
+ spinlock_t *lock;
+ struct list_head *send_pkt_list;
+ struct vhost_work *work;

rcu_read_lock();

@@ -290,14 +364,39 @@ vhost_transport_send_pkt(struct virtio_vsock_pkt *pkt)
return -ENODEV;
}

+ if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_STREAM ||
+ le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_SEQPACKET) {
+ lock = &vsock->send_pkt_list_lock;
+ send_pkt_list = &vsock->send_pkt_list;
+ work = &vsock->send_pkt_work;
+ } else if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_DGRAM) {
+ lock = &vsock->dgram_send_pkt_list_lock;
+ send_pkt_list = &vsock->dgram_send_pkt_list;
+ work = &vsock->dgram_send_pkt_work;
+ } else {
+ rcu_read_unlock();
+ virtio_transport_free_pkt(pkt);
+ return -EINVAL;
+ }
+
+
if (pkt->reply)
atomic_inc(&vsock->queued_replies);

- spin_lock_bh(&vsock->send_pkt_list_lock);
- list_add_tail(&pkt->list, &vsock->send_pkt_list);
- spin_unlock_bh(&vsock->send_pkt_list_lock);
+ spin_lock_bh(lock);
+ if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_DGRAM) {
+ if (vsock->dgram_used == VHOST_VSOCK_DGRM_MAX_PENDING_PKT)
+ len = -ENOMEM;
+ else {
+ vsock->dgram_used++;
+ list_add_tail(&pkt->list, send_pkt_list);
+ }
+ } else
+ list_add_tail(&pkt->list, send_pkt_list);

- vhost_work_queue(&vsock->dev, &vsock->send_pkt_work);
+ spin_unlock_bh(lock);
+
+ vhost_work_queue(&vsock->dev, work);

rcu_read_unlock();
return len;
@@ -487,6 +586,18 @@ static bool vhost_transport_seqpacket_allow(u32 remote_cid)
return seqpacket_allow;
}

+static inline unsigned long busy_clock(void)
+{
+ return local_clock() >> 10;
+}
+
+static bool vhost_can_busy_poll(unsigned long endtime)
+{
+ return likely(!need_resched() && !time_after(busy_clock(), endtime) &&
+ !signal_pending(current));
+}
+
+
static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
{
struct vhost_virtqueue *vq = container_of(work, struct vhost_virtqueue,
@@ -497,6 +608,8 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
int head, pkts = 0, total_len = 0;
unsigned int out, in;
bool added = false;
+ unsigned long busyloop_timeout = VHOST_VSOCK_BUSY_POLL_TIMEOUT;
+ unsigned long endtime;

mutex_lock(&vq->mutex);

@@ -506,11 +619,14 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
if (!vq_meta_prefetch(vq))
goto out;

+ endtime = busy_clock() + busyloop_timeout;
vhost_disable_notify(&vsock->dev, vq);
+ preempt_disable();
do {
u32 len;

- if (!vhost_vsock_more_replies(vsock)) {
+ if (vq == &vsock->vqs[VSOCK_VQ_TX]
+ && !vhost_vsock_more_replies(vsock)) {
/* Stop tx until the device processes already
* pending replies. Leave tx virtqueue
* callbacks disabled.
@@ -524,6 +640,11 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
break;

if (head == vq->num) {
+ if (vhost_can_busy_poll(endtime)) {
+ cpu_relax();
+ continue;
+ }
+
if (unlikely(vhost_enable_notify(&vsock->dev, vq))) {
vhost_disable_notify(&vsock->dev, vq);
continue;
@@ -555,6 +676,7 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
total_len += len;
added = true;
} while(likely(!vhost_exceeds_weight(vq, ++pkts, total_len)));
+ preempt_enable();

no_more_replies:
if (added)
@@ -593,14 +715,30 @@ static int vhost_vsock_start(struct vhost_vsock *vsock)

if (!vhost_vq_access_ok(vq)) {
ret = -EFAULT;
+ /* when running with old guest and qemu, vq 2 may
+ * not exist, so return 0 in this case.
+ */
+ if (i == 2) {
+ mutex_unlock(&vq->mutex);
+ break;
+ }
goto err_vq;
}

if (!vhost_vq_get_backend(vq)) {
vhost_vq_set_backend(vq, vsock);
ret = vhost_vq_init_access(vq);
- if (ret)
- goto err_vq;
+ if (ret) {
+ mutex_unlock(&vq->mutex);
+ /* when running with old guest and qemu, vq 2 may
+ * not exist, so return 0 in this case.
+ */
+ if (i == 2) {
+ mutex_unlock(&vq->mutex);
+ break;
+ }
+ goto err;
+ }
}

mutex_unlock(&vq->mutex);
@@ -610,6 +748,7 @@ static int vhost_vsock_start(struct vhost_vsock *vsock)
* let's kick the send worker to send them.
*/
vhost_work_queue(&vsock->dev, &vsock->send_pkt_work);
+ vhost_work_queue(&vsock->dev, &vsock->dgram_send_pkt_work);

mutex_unlock(&vsock->dev.mutex);
return 0;
@@ -684,8 +823,14 @@ static int vhost_vsock_dev_open(struct inode *inode, struct file *file)

vqs[VSOCK_VQ_TX] = &vsock->vqs[VSOCK_VQ_TX];
vqs[VSOCK_VQ_RX] = &vsock->vqs[VSOCK_VQ_RX];
+ vqs[VSOCK_VQ_DGRAM_TX] = &vsock->vqs[VSOCK_VQ_DGRAM_TX];
+ vqs[VSOCK_VQ_DGRAM_RX] = &vsock->vqs[VSOCK_VQ_DGRAM_RX];
vsock->vqs[VSOCK_VQ_TX].handle_kick = vhost_vsock_handle_tx_kick;
vsock->vqs[VSOCK_VQ_RX].handle_kick = vhost_vsock_handle_rx_kick;
+ vsock->vqs[VSOCK_VQ_DGRAM_TX].handle_kick =
+ vhost_vsock_handle_tx_kick;
+ vsock->vqs[VSOCK_VQ_DGRAM_RX].handle_kick =
+ vhost_vsock_handle_rx_kick;

vhost_dev_init(&vsock->dev, vqs, ARRAY_SIZE(vsock->vqs),
UIO_MAXIOV, VHOST_VSOCK_PKT_WEIGHT,
@@ -695,6 +840,11 @@ static int vhost_vsock_dev_open(struct inode *inode, struct file *file)
spin_lock_init(&vsock->send_pkt_list_lock);
INIT_LIST_HEAD(&vsock->send_pkt_list);
vhost_work_init(&vsock->send_pkt_work, vhost_transport_send_pkt_work);
+ spin_lock_init(&vsock->dgram_send_pkt_list_lock);
+ INIT_LIST_HEAD(&vsock->dgram_send_pkt_list);
+ vhost_work_init(&vsock->dgram_send_pkt_work,
+ vhost_transport_dgram_send_pkt_work);
+
return 0;

out:
@@ -769,6 +919,17 @@ static int vhost_vsock_dev_release(struct inode *inode, struct file *file)
}
spin_unlock_bh(&vsock->send_pkt_list_lock);

+ spin_lock_bh(&vsock->dgram_send_pkt_list_lock);
+ while (!list_empty(&vsock->dgram_send_pkt_list)) {
+ struct virtio_vsock_pkt *pkt;
+
+ pkt = list_first_entry(&vsock->dgram_send_pkt_list,
+ struct virtio_vsock_pkt, list);
+ list_del_init(&pkt->list);
+ virtio_transport_free_pkt(pkt);
+ }
+ spin_unlock_bh(&vsock->dgram_send_pkt_list_lock);
+
vhost_dev_cleanup(&vsock->dev);
kfree(vsock->dev.vqs);
vhost_vsock_free(vsock);
@@ -954,7 +1115,7 @@ static int __init vhost_vsock_init(void)
int ret;

ret = vsock_core_register(&vhost_transport.transport,
- VSOCK_TRANSPORT_F_H2G);
+ VSOCK_TRANSPORT_F_H2G | VSOCK_TRANSPORT_F_DGRAM);
if (ret < 0)
return ret;
return misc_register(&vhost_vsock_misc);
--
2.20.1