Re: [RFC v2 3/5] vhost/vsock: add support for vhost dgram.
From: Michael S. Tsirkin
Date: Tue Sep 14 2021 - 02:53:01 EST
On Tue, Sep 14, 2021 at 05:54:36AM +0000, Jiang Wang wrote:
> This patch supports dgram on vhost side, including
> tx and rx. The vhost send packets asynchronously.
How exactly is fairness ensured? What prevents one socket
from monopolizing the device? The spec proposal seems to
leave this to the implementation.
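To make the question concrete: one purely illustrative option (the names
below are invented, not in this patch or the spec) would be per-socket
accounting in vhost_transport_send_pkt() instead of the single global
dgram_used counter, so one flooding sender gets pushed back without
starving the others:

	/* sketch only: hypothetical per-socket cap on queued dgrams;
	 * assumes pkt->vsk is valid here and that vsock_sock grows
	 * a dgram_queued counter (rcu_read_unlock() omitted for brevity)
	 */
	#define VSOCK_DGRAM_PER_SK_MAX	16

	struct vsock_sock *vsk = pkt->vsk;

	spin_lock_bh(lock);
	if (vsk->dgram_queued >= VSOCK_DGRAM_PER_SK_MAX) {
		spin_unlock_bh(lock);
		virtio_transport_free_pkt(pkt);
		return -ENOBUFS;	/* back-pressure this socket only */
	}
	vsk->dgram_queued++;
	list_add_tail(&pkt->list, send_pkt_list);
	spin_unlock_bh(lock);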
> Also, ignore vq errors when vq number is larger than 2,
> so it will be comptaible with old versions.
This needs more explanation: which old guest/QEMU versions are affected,
and why is it safe to silently ignore vq errors for them?
> Signed-off-by: Jiang Wang <jiang.wang@xxxxxxxxxxxxx>
> ---
> drivers/vhost/vsock.c | 217 ++++++++++++++++++++++++++++++++++++------
> 1 file changed, 189 insertions(+), 28 deletions(-)
>
> diff --git a/drivers/vhost/vsock.c b/drivers/vhost/vsock.c
> index c79789af0365..a8755cbebd40 100644
> --- a/drivers/vhost/vsock.c
> +++ b/drivers/vhost/vsock.c
> @@ -28,7 +28,10 @@
> * small pkts.
> */
> #define VHOST_VSOCK_PKT_WEIGHT 256
> +#define VHOST_VSOCK_DGRM_MAX_PENDING_PKT 128
>
> +/* Max wait time in busy poll in microseconds */
> +#define VHOST_VSOCK_BUSY_POLL_TIMEOUT 20
While adding busy polling isn't out of the question, I would think
it should be kept separate from initial dgram support.
> enum {
> VHOST_VSOCK_FEATURES = VHOST_FEATURES |
> (1ULL << VIRTIO_F_ACCESS_PLATFORM) |
> @@ -46,7 +49,7 @@ static DEFINE_READ_MOSTLY_HASHTABLE(vhost_vsock_hash, 8);
>
> struct vhost_vsock {
> struct vhost_dev dev;
> - struct vhost_virtqueue vqs[2];
> + struct vhost_virtqueue vqs[4];
>
> /* Link to global vhost_vsock_hash, writes use vhost_vsock_mutex */
> struct hlist_node hash;
> @@ -55,6 +58,11 @@ struct vhost_vsock {
> spinlock_t send_pkt_list_lock;
> struct list_head send_pkt_list; /* host->guest pending packets */
>
> + spinlock_t dgram_send_pkt_list_lock;
> + struct list_head dgram_send_pkt_list; /* host->guest pending packets */
> + struct vhost_work dgram_send_pkt_work;
> + int dgram_used; /*pending packets to be send */
to be sent?
> +
> atomic_t queued_replies;
>
> u32 guest_cid;
> @@ -92,10 +100,22 @@ static void
> vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
> struct vhost_virtqueue *vq)
> {
> - struct vhost_virtqueue *tx_vq = &vsock->vqs[VSOCK_VQ_TX];
> + struct vhost_virtqueue *tx_vq;
> int pkts = 0, total_len = 0;
> bool added = false;
> bool restart_tx = false;
> + spinlock_t *lock;
> + struct list_head *send_pkt_list;
> +
> + if (vq == &vsock->vqs[VSOCK_VQ_RX]) {
> + tx_vq = &vsock->vqs[VSOCK_VQ_TX];
> + lock = &vsock->send_pkt_list_lock;
> + send_pkt_list = &vsock->send_pkt_list;
> + } else {
> + tx_vq = &vsock->vqs[VSOCK_VQ_DGRAM_TX];
> + lock = &vsock->dgram_send_pkt_list_lock;
> + send_pkt_list = &vsock->dgram_send_pkt_list;
> + }
>
> mutex_lock(&vq->mutex);
>
> @@ -116,36 +136,48 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
> size_t iov_len, payload_len;
> int head;
> bool restore_flag = false;
> + bool is_dgram = false;
>
> - spin_lock_bh(&vsock->send_pkt_list_lock);
> - if (list_empty(&vsock->send_pkt_list)) {
> - spin_unlock_bh(&vsock->send_pkt_list_lock);
> + spin_lock_bh(lock);
> + if (list_empty(send_pkt_list)) {
> + spin_unlock_bh(lock);
> vhost_enable_notify(&vsock->dev, vq);
> break;
> }
>
> - pkt = list_first_entry(&vsock->send_pkt_list,
> + pkt = list_first_entry(send_pkt_list,
> struct virtio_vsock_pkt, list);
> list_del_init(&pkt->list);
> - spin_unlock_bh(&vsock->send_pkt_list_lock);
> + spin_unlock_bh(lock);
> +
> + if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_DGRAM)
> + is_dgram = true;
>
> head = vhost_get_vq_desc(vq, vq->iov, ARRAY_SIZE(vq->iov),
> &out, &in, NULL, NULL);
> if (head < 0) {
> - spin_lock_bh(&vsock->send_pkt_list_lock);
> - list_add(&pkt->list, &vsock->send_pkt_list);
> - spin_unlock_bh(&vsock->send_pkt_list_lock);
> + spin_lock_bh(lock);
> + list_add(&pkt->list, send_pkt_list);
> + spin_unlock_bh(lock);
> break;
> }
>
> if (head == vq->num) {
> - spin_lock_bh(&vsock->send_pkt_list_lock);
> - list_add(&pkt->list, &vsock->send_pkt_list);
> - spin_unlock_bh(&vsock->send_pkt_list_lock);
> + if (is_dgram) {
> + virtio_transport_free_pkt(pkt);
> + vq_err(vq, "Dgram virtqueue is full!");
> + spin_lock_bh(lock);
> + vsock->dgram_used--;
> + spin_unlock_bh(lock);
> + break;
> + }
> + spin_lock_bh(lock);
> + list_add(&pkt->list, send_pkt_list);
> + spin_unlock_bh(lock);
>
> /* We cannot finish yet if more buffers snuck in while
> - * re-enabling notify.
> - */
> + * re-enabling notify.
> + */
Looks like you are breaking the comment alignment here...
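I.e. keep it as it was:

	/* We cannot finish yet if more buffers snuck in while
	 * re-enabling notify.
	 */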
> if (unlikely(vhost_enable_notify(&vsock->dev, vq))) {
> vhost_disable_notify(&vsock->dev, vq);
> continue;
> @@ -156,6 +188,12 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
> if (out) {
> virtio_transport_free_pkt(pkt);
> vq_err(vq, "Expected 0 output buffers, got %u\n", out);
> + if (is_dgram) {
> + spin_lock_bh(lock);
> + vsock->dgram_used--;
> + spin_unlock_bh(lock);
> + }
> +
> break;
> }
>
> @@ -163,6 +201,18 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
> if (iov_len < sizeof(pkt->hdr)) {
> virtio_transport_free_pkt(pkt);
> vq_err(vq, "Buffer len [%zu] too small\n", iov_len);
> + if (is_dgram) {
> + spin_lock_bh(lock);
> + vsock->dgram_used--;
> + spin_unlock_bh(lock);
> + }
> + break;
> + }
> +
> + if (iov_len < pkt->len - pkt->off &&
> + vq == &vsock->vqs[VSOCK_VQ_DGRAM_RX]) {
> + virtio_transport_free_pkt(pkt);
> + vq_err(vq, "Buffer len [%zu] too small for dgram\n", iov_len);
> break;
> }
>
> @@ -199,6 +249,11 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
> if (nbytes != sizeof(pkt->hdr)) {
> virtio_transport_free_pkt(pkt);
> vq_err(vq, "Faulted on copying pkt hdr\n");
> + if (is_dgram) {
> + spin_lock_bh(lock);
> + vsock->dgram_used--;
> + spin_unlock_bh(lock);
> + }
> break;
> }
>
> @@ -224,19 +279,19 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
> /* If we didn't send all the payload we can requeue the packet
> * to send it with the next available buffer.
> */
> - if (pkt->off < pkt->len) {
> + if ((pkt->off < pkt->len)
> + && (vq == &vsock->vqs[VSOCK_VQ_RX])) {
Put the && at the end of the previous line, and there's no need for the
extra parentheses.
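I.e.:

	if (pkt->off < pkt->len &&
	    vq == &vsock->vqs[VSOCK_VQ_RX]) {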
> if (restore_flag)
> pkt->hdr.flags |= cpu_to_le32(VIRTIO_VSOCK_SEQ_EOR);
> -
No need to drop this blank line, IMHO.
> /* We are queueing the same virtio_vsock_pkt to handle
> * the remaining bytes, and we want to deliver it
> * to monitoring devices in the next iteration.
> */
> pkt->tap_delivered = false;
>
> - spin_lock_bh(&vsock->send_pkt_list_lock);
> - list_add(&pkt->list, &vsock->send_pkt_list);
> - spin_unlock_bh(&vsock->send_pkt_list_lock);
> + spin_lock_bh(lock);
> + list_add(&pkt->list, send_pkt_list);
> + spin_unlock_bh(lock);
> } else {
> if (pkt->reply) {
> int val;
> @@ -251,6 +306,11 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
> }
>
> virtio_transport_free_pkt(pkt);
> + if (is_dgram) {
> + spin_lock_bh(lock);
> + vsock->dgram_used--;
> + spin_unlock_bh(lock);
> + }
> }
> } while(likely(!vhost_exceeds_weight(vq, ++pkts, total_len)));
> if (added)
> @@ -274,11 +334,25 @@ static void vhost_transport_send_pkt_work(struct vhost_work *work)
> vhost_transport_do_send_pkt(vsock, vq);
> }
>
> +static void vhost_transport_dgram_send_pkt_work(struct vhost_work *work)
> +{
> + struct vhost_virtqueue *vq;
> + struct vhost_vsock *vsock;
> +
> + vsock = container_of(work, struct vhost_vsock, dgram_send_pkt_work);
> + vq = &vsock->vqs[VSOCK_VQ_DGRAM_RX];
> +
> + vhost_transport_do_send_pkt(vsock, vq);
> +}
> +
> static int
> vhost_transport_send_pkt(struct virtio_vsock_pkt *pkt)
> {
> struct vhost_vsock *vsock;
> int len = pkt->len;
> + spinlock_t *lock;
> + struct list_head *send_pkt_list;
> + struct vhost_work *work;
>
> rcu_read_lock();
>
> @@ -290,14 +364,39 @@ vhost_transport_send_pkt(struct virtio_vsock_pkt *pkt)
> return -ENODEV;
> }
>
> + if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_STREAM ||
> + le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_SEQPACKET) {
> + lock = &vsock->send_pkt_list_lock;
> + send_pkt_list = &vsock->send_pkt_list;
> + work = &vsock->send_pkt_work;
> + } else if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_DGRAM) {
> + lock = &vsock->dgram_send_pkt_list_lock;
> + send_pkt_list = &vsock->dgram_send_pkt_list;
> + work = &vsock->dgram_send_pkt_work;
> + } else {
> + rcu_read_unlock();
> + virtio_transport_free_pkt(pkt);
> + return -EINVAL;
> + }
> +
> +
> if (pkt->reply)
> atomic_inc(&vsock->queued_replies);
>
> - spin_lock_bh(&vsock->send_pkt_list_lock);
> - list_add_tail(&pkt->list, &vsock->send_pkt_list);
> - spin_unlock_bh(&vsock->send_pkt_list_lock);
> + spin_lock_bh(lock);
> + if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_DGRAM) {
> + if (vsock->dgram_used == VHOST_VSOCK_DGRM_MAX_PENDING_PKT)
> + len = -ENOMEM;
> + else {
> + vsock->dgram_used++;
> + list_add_tail(&pkt->list, send_pkt_list);
> + }
> + } else
> + list_add_tail(&pkt->list, send_pkt_list);
>
> - vhost_work_queue(&vsock->dev, &vsock->send_pkt_work);
> + spin_unlock_bh(lock);
> +
> + vhost_work_queue(&vsock->dev, work);
>
> rcu_read_unlock();
> return len;
> @@ -487,6 +586,18 @@ static bool vhost_transport_seqpacket_allow(u32 remote_cid)
> return seqpacket_allow;
> }
>
> +static inline unsigned long busy_clock(void)
> +{
> + return local_clock() >> 10;
> +}
> +
> +static bool vhost_can_busy_poll(unsigned long endtime)
> +{
> + return likely(!need_resched() && !time_after(busy_clock(), endtime) &&
> + !signal_pending(current));
> +}
> +
> +
> static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
> {
> struct vhost_virtqueue *vq = container_of(work, struct vhost_virtqueue,
> @@ -497,6 +608,8 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
> int head, pkts = 0, total_len = 0;
> unsigned int out, in;
> bool added = false;
> + unsigned long busyloop_timeout = VHOST_VSOCK_BUSY_POLL_TIMEOUT;
> + unsigned long endtime;
>
> mutex_lock(&vq->mutex);
>
> @@ -506,11 +619,14 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
> if (!vq_meta_prefetch(vq))
> goto out;
>
> + endtime = busy_clock() + busyloop_timeout;
> vhost_disable_notify(&vsock->dev, vq);
> + preempt_disable();
That's quite a bit of work done with preempt disabled. Why?
What's going on here?
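For comparison, vhost-net keeps the preempt-disabled region down to just
the poll wait (going from memory here, so double-check against
drivers/vhost/net.c), roughly:

	/* sketch modeled on vhost-net busy polling, not verbatim */
	preempt_disable();
	endtime = busy_clock() + busyloop_timeout;
	while (vhost_can_busy_poll(endtime) &&
	       vhost_vq_avail_empty(&vsock->dev, vq))
		cpu_relax();
	preempt_enable();

i.e. preemption is re-enabled before any descriptor is actually
processed, instead of around the whole tx loop.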
> do {
> u32 len;
>
> - if (!vhost_vsock_more_replies(vsock)) {
> + if (vq == &vsock->vqs[VSOCK_VQ_TX]
> + && !vhost_vsock_more_replies(vsock)) {
> /* Stop tx until the device processes already
> * pending replies. Leave tx virtqueue
> * callbacks disabled.
> @@ -524,6 +640,11 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
> break;
>
> if (head == vq->num) {
> + if (vhost_can_busy_poll(endtime)) {
> + cpu_relax();
> + continue;
> + }
> +
> if (unlikely(vhost_enable_notify(&vsock->dev, vq))) {
> vhost_disable_notify(&vsock->dev, vq);
> continue;
> @@ -555,6 +676,7 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
> total_len += len;
> added = true;
> } while(likely(!vhost_exceeds_weight(vq, ++pkts, total_len)));
> + preempt_enable();
>
> no_more_replies:
> if (added)
> @@ -593,14 +715,30 @@ static int vhost_vsock_start(struct vhost_vsock *vsock)
>
> if (!vhost_vq_access_ok(vq)) {
> ret = -EFAULT;
> + /* when running with old guest and qemu, vq 2 may
> + * not exist, so return 0 in this case.
Can't the new stuff be gated by a feature bit as usual?
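E.g. something like this, where VIRTIO_VSOCK_F_DGRAM is a placeholder
name for whatever bit the spec ends up allocating:

	/* hypothetical feature bit, not yet in the virtio spec */
	#define VIRTIO_VSOCK_F_DGRAM	3

	enum {
		VHOST_VSOCK_FEATURES = VHOST_FEATURES |
				       (1ULL << VIRTIO_F_ACCESS_PLATFORM) |
				       (1ULL << VIRTIO_VSOCK_F_DGRAM),
	};

	/* and in vhost_vsock_start(), skip the dgram vqs when the bit
	 * wasn't negotiated, instead of special-casing i == 2:
	 */
	if (i >= 2 && !vhost_has_feature(vq, VIRTIO_VSOCK_F_DGRAM))
		continue;

Then none of the error paths need to know about old guests at all.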
> + */
> + if (i == 2) {
> + mutex_unlock(&vq->mutex);
> + break;
> + }
> goto err_vq;
> }
>
> if (!vhost_vq_get_backend(vq)) {
> vhost_vq_set_backend(vq, vsock);
> ret = vhost_vq_init_access(vq);
> - if (ret)
> - goto err_vq;
> + if (ret) {
> + mutex_unlock(&vq->mutex);
> + /* when running with old guest and qemu, vq 2 may
> + * not exist, so return 0 in this case.
> + */
> + if (i == 2) {
> + mutex_unlock(&vq->mutex);
> + break;
> + }
> + goto err;
> + }
> }
>
> mutex_unlock(&vq->mutex);
> @@ -610,6 +748,7 @@ static int vhost_vsock_start(struct vhost_vsock *vsock)
> * let's kick the send worker to send them.
> */
> vhost_work_queue(&vsock->dev, &vsock->send_pkt_work);
> + vhost_work_queue(&vsock->dev, &vsock->dgram_send_pkt_work);
>
> mutex_unlock(&vsock->dev.mutex);
> return 0;
> @@ -684,8 +823,14 @@ static int vhost_vsock_dev_open(struct inode *inode, struct file *file)
>
> vqs[VSOCK_VQ_TX] = &vsock->vqs[VSOCK_VQ_TX];
> vqs[VSOCK_VQ_RX] = &vsock->vqs[VSOCK_VQ_RX];
> + vqs[VSOCK_VQ_DGRAM_TX] = &vsock->vqs[VSOCK_VQ_DGRAM_TX];
> + vqs[VSOCK_VQ_DGRAM_RX] = &vsock->vqs[VSOCK_VQ_DGRAM_RX];
> vsock->vqs[VSOCK_VQ_TX].handle_kick = vhost_vsock_handle_tx_kick;
> vsock->vqs[VSOCK_VQ_RX].handle_kick = vhost_vsock_handle_rx_kick;
> + vsock->vqs[VSOCK_VQ_DGRAM_TX].handle_kick =
> + vhost_vsock_handle_tx_kick;
> + vsock->vqs[VSOCK_VQ_DGRAM_RX].handle_kick =
> + vhost_vsock_handle_rx_kick;
>
> vhost_dev_init(&vsock->dev, vqs, ARRAY_SIZE(vsock->vqs),
> UIO_MAXIOV, VHOST_VSOCK_PKT_WEIGHT,
> @@ -695,6 +840,11 @@ static int vhost_vsock_dev_open(struct inode *inode, struct file *file)
> spin_lock_init(&vsock->send_pkt_list_lock);
> INIT_LIST_HEAD(&vsock->send_pkt_list);
> vhost_work_init(&vsock->send_pkt_work, vhost_transport_send_pkt_work);
> + spin_lock_init(&vsock->dgram_send_pkt_list_lock);
> + INIT_LIST_HEAD(&vsock->dgram_send_pkt_list);
> + vhost_work_init(&vsock->dgram_send_pkt_work,
> + vhost_transport_dgram_send_pkt_work);
> +
> return 0;
>
> out:
> @@ -769,6 +919,17 @@ static int vhost_vsock_dev_release(struct inode *inode, struct file *file)
> }
> spin_unlock_bh(&vsock->send_pkt_list_lock);
>
> + spin_lock_bh(&vsock->dgram_send_pkt_list_lock);
> + while (!list_empty(&vsock->dgram_send_pkt_list)) {
> + struct virtio_vsock_pkt *pkt;
> +
> + pkt = list_first_entry(&vsock->dgram_send_pkt_list,
> + struct virtio_vsock_pkt, list);
> + list_del_init(&pkt->list);
> + virtio_transport_free_pkt(pkt);
> + }
> + spin_unlock_bh(&vsock->dgram_send_pkt_list_lock);
> +
> vhost_dev_cleanup(&vsock->dev);
> kfree(vsock->dev.vqs);
> vhost_vsock_free(vsock);
> @@ -954,7 +1115,7 @@ static int __init vhost_vsock_init(void)
> int ret;
>
> ret = vsock_core_register(&vhost_transport.transport,
> - VSOCK_TRANSPORT_F_H2G);
> + VSOCK_TRANSPORT_F_H2G | VSOCK_TRANSPORT_F_DGRAM);
> if (ret < 0)
> return ret;
> return misc_register(&vhost_vsock_misc);
> --
> 2.20.1