Re: [RFC v3 6/7] virtio: in order support for virtio_ring

From: Guo Zhi
Date: Thu Sep 08 2022 - 04:49:02 EST




----- Original Message -----
> From: "jasowang" <jasowang@xxxxxxxxxx>
> To: "Guo Zhi" <qtxuning1999@xxxxxxxxxxx>, "eperezma" <eperezma@xxxxxxxxxx>, "sgarzare" <sgarzare@xxxxxxxxxx>, "Michael
> Tsirkin" <mst@xxxxxxxxxx>
> Cc: "netdev" <netdev@xxxxxxxxxxxxxxx>, "linux-kernel" <linux-kernel@xxxxxxxxxxxxxxx>, "kvm list" <kvm@xxxxxxxxxxxxxxx>,
> "virtualization" <virtualization@xxxxxxxxxxxxxxxxxxxxxxxxxx>
> Sent: Wednesday, September 7, 2022 1:38:03 PM
> Subject: Re: [RFC v3 6/7] virtio: in order support for virtio_ring

> 在 2022/9/1 13:54, Guo Zhi 写道:
>> If in order feature negotiated, we can skip the used ring to get
>> buffer's desc id sequentially. For skipped buffers in the batch, the
>> used ring doesn't contain the buffer length, actually there is not need
>> to get skipped buffers' length as they are tx buffer.
>>
>> Signed-off-by: Guo Zhi <qtxuning1999@xxxxxxxxxxx>
>> ---
>> drivers/virtio/virtio_ring.c | 74 +++++++++++++++++++++++++++++++-----
>> 1 file changed, 64 insertions(+), 10 deletions(-)
>>
>> diff --git a/drivers/virtio/virtio_ring.c b/drivers/virtio/virtio_ring.c
>> index 00aa4b7a49c2..d52624179b43 100644
>> --- a/drivers/virtio/virtio_ring.c
>> +++ b/drivers/virtio/virtio_ring.c
>> @@ -103,6 +103,9 @@ struct vring_virtqueue {
>> /* Host supports indirect buffers */
>> bool indirect;
>>
>> + /* Host supports in order feature */
>> + bool in_order;
>> +
>> /* Host publishes avail event idx */
>> bool event;
>>
>> @@ -144,6 +147,19 @@ struct vring_virtqueue {
>> /* DMA address and size information */
>> dma_addr_t queue_dma_addr;
>> size_t queue_size_in_bytes;
>> +
>> + /* If in_order feature is negotiated, here is the next head to consume */
>> + u16 next_desc_begin;
>> + /*
>> + * If in_order feature is negotiated,
>> + * here is the last descriptor's id in the batch
>> + */
>> + u16 last_desc_in_batch;
>> + /*
>> + * If in_order feature is negotiated,
>> + * buffers except last buffer in the batch are skipped buffer
>> + */
>> + bool is_skipped_buffer;
>> } split;
>>
>> /* Available for packed ring */
>> @@ -584,8 +600,6 @@ static inline int virtqueue_add_split(struct virtqueue *_vq,
>> total_sg * sizeof(struct vring_desc),
>> VRING_DESC_F_INDIRECT,
>> false);
>> - vq->split.desc_extra[head & (vq->split.vring.num - 1)].flags &=
>> - ~VRING_DESC_F_NEXT;
>
>
> This seems irrelevant.
>
We have to unmask VRING_DESC_F_NEXT, so that we can calculate the length of a descriptor chain
in get_buf_ctx_split.
Thanks.
>
>> }
>>
>> /* We're using some buffers from the free list. */
>> @@ -701,8 +715,16 @@ static void detach_buf_split(struct vring_virtqueue *vq,
>> unsigned int head,
>> }
>>
>> vring_unmap_one_split(vq, i);
>> - vq->split.desc_extra[i].next = vq->free_head;
>> - vq->free_head = head;
>> + /*
>> + * If in_order feature is negotiated,
>> + * the descriptors are made available in order.
>> + * Since the free_head is already a circular list,
>> + * it must consume it sequentially.
>> + */
>> + if (!vq->in_order) {
>> + vq->split.desc_extra[i].next = vq->free_head;
>> + vq->free_head = head;
>> + }
>>
>> /* Plus final descriptor */
>> vq->vq.num_free++;
>> @@ -744,7 +766,7 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue
>> *_vq,
>> {
>> struct vring_virtqueue *vq = to_vvq(_vq);
>> void *ret;
>> - unsigned int i;
>> + unsigned int i, j;
>> u16 last_used;
>>
>> START_USE(vq);
>> @@ -763,11 +785,38 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue
>> *_vq,
>> /* Only get used array entries after they have been exposed by host. */
>> virtio_rmb(vq->weak_barriers);
>>
>> - last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
>> - i = virtio32_to_cpu(_vq->vdev,
>> - vq->split.vring.used->ring[last_used].id);
>> - *len = virtio32_to_cpu(_vq->vdev,
>> - vq->split.vring.used->ring[last_used].len);
>> + if (vq->in_order) {
>> + last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
>
>
> Let's move this beyond the in_order check.
>
Sorry for my mistake.
>
>> + if (!vq->split.is_skipped_buffer) {
>> + vq->split.last_desc_in_batch =
>> + virtio32_to_cpu(_vq->vdev,
>> + vq->split.vring.used->ring[last_used].id);
>> + vq->split.is_skipped_buffer = true;
>> + }
>> + /* For skipped buffers in batch, we can ignore the len info, simply set len
>> as 0 */
>
>
> This seems to break the caller that depends on a correct len.
>

IMHO, we can do this because the device will only batch for skipped buffers which is tx.

>
>> + if (vq->split.next_desc_begin != vq->split.last_desc_in_batch) {
>> + *len = 0;
>> + } else {
>> + *len = virtio32_to_cpu(_vq->vdev,
>> + vq->split.vring.used->ring[last_used].len);
>> + vq->split.is_skipped_buffer = false;
>> + }
>> + i = vq->split.next_desc_begin;
>> + j = i;
>> + /* Indirect only takes one descriptor in descriptor table */
>> + while (!vq->indirect && (vq->split.desc_extra[j].flags & VRING_DESC_F_NEXT))
>> + j = (j + 1) & (vq->split.vring.num - 1);
>
>
> Any reason indirect descriptors can't be chained?
>
>
>> + /* move to next */
>> + j = (j + 1) % vq->split.vring.num;
>> + /* Next buffer will use this descriptor in order */
>> + vq->split.next_desc_begin = j;
>
>
> Is it more efficient to poke the available ring?
>
> Thanks
>
>
>> + } else {
>> + last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
>> + i = virtio32_to_cpu(_vq->vdev,
>> + vq->split.vring.used->ring[last_used].id);
>> + *len = virtio32_to_cpu(_vq->vdev,
>> + vq->split.vring.used->ring[last_used].len);
>> + }
>>
>> if (unlikely(i >= vq->split.vring.num)) {
>> BAD_RING(vq, "id %u out of range\n", i);
>> @@ -2223,6 +2272,7 @@ struct virtqueue *__vring_new_virtqueue(unsigned int
>> index,
>>
>> vq->indirect = virtio_has_feature(vdev, VIRTIO_RING_F_INDIRECT_DESC) &&
>> !context;
>> + vq->in_order = virtio_has_feature(vdev, VIRTIO_F_IN_ORDER);
>> vq->event = virtio_has_feature(vdev, VIRTIO_RING_F_EVENT_IDX);
>>
>> if (virtio_has_feature(vdev, VIRTIO_F_ORDER_PLATFORM))
>> @@ -2235,6 +2285,10 @@ struct virtqueue *__vring_new_virtqueue(unsigned int
>> index,
>> vq->split.avail_flags_shadow = 0;
>> vq->split.avail_idx_shadow = 0;
>>
>> + vq->split.next_desc_begin = 0;
>> + vq->split.last_desc_in_batch = 0;
>> + vq->split.is_skipped_buffer = false;
>> +
>> /* No callback? Tell other side not to bother us. */
>> if (!callback) {
>> vq->split.avail_flags_shadow |= VRING_AVAIL_F_NO_INTERRUPT;