Re: [RFC v2] virtio: support packed ring

From: Tiwei Bie
Date: Fri Apr 13 2018 - 03:18:06 EST


On Fri, Apr 13, 2018 at 12:30:24PM +0800, Jason Wang wrote:
> On 2018-04-01 22:12, Tiwei Bie wrote:
> > Hello everyone,
> >
> > This RFC implements packed ring support for virtio driver.
> >
> > The code was tested with DPDK vhost (testpmd/vhost-PMD) implemented
> > by Jens at http://dpdk.org/ml/archives/dev/2018-January/089417.html
> > Minor changes are needed for the vhost code, e.g. to kick the guest.
> >
> > TODO:
> > - Refinements and bug fixes;
> > - Split into small patches;
> > - Test indirect descriptor support;
> > - Test/fix event suppression support;
> > - Test devices other than net;
> >
> > RFC v1 -> RFC v2:
> > - Add indirect descriptor support - compile test only;
> > - Add event suppression support - compile test only;
> > - Move vring_packed_init() out of uapi (Jason, MST);
> > - Merge two loops into one in virtqueue_add_packed() (Jason);
> > - Split vring_unmap_one() for packed ring and split ring (Jason);
> > - Avoid using '%' operator (Jason);
> > - Rename free_head -> next_avail_idx (Jason);
> > - Add comments for virtio_wmb() in virtqueue_add_packed() (Jason);
> > - Some other refinements and bug fixes;
> >
> > Thanks!
> >
> > Signed-off-by: Tiwei Bie <tiwei.bie@xxxxxxxxx>
> > ---
> > drivers/virtio/virtio_ring.c | 1094 +++++++++++++++++++++++++++++-------
> > include/linux/virtio_ring.h | 8 +-
> > include/uapi/linux/virtio_config.h | 12 +-
> > include/uapi/linux/virtio_ring.h | 61 ++
> > 4 files changed, 980 insertions(+), 195 deletions(-)
[...]
> > +static struct vring_packed_desc *alloc_indirect_packed(struct virtqueue *_vq,
> > + unsigned int total_sg,
> > + gfp_t gfp)
> > +{
> > + struct vring_packed_desc *desc;
> > +
> > + /*
> > + * We require lowmem mappings for the descriptors because
> > + * otherwise virt_to_phys will give us bogus addresses in the
> > + * virtqueue.
> > + */
> > + gfp &= ~__GFP_HIGHMEM;
> > +
> > + desc = kmalloc(total_sg * sizeof(struct vring_packed_desc), gfp);
>
> Can we simply check vq->packed here to avoid duplicating helpers?

Then it would be something like this:

static void *alloc_indirect(struct virtqueue *_vq, unsigned int total_sg,
			    gfp_t gfp)
{
	struct vring_virtqueue *vq = to_vvq(_vq);
	void *data;

	/*
	 * We require lowmem mappings for the descriptors because
	 * otherwise virt_to_phys will give us bogus addresses in the
	 * virtqueue.
	 */
	gfp &= ~__GFP_HIGHMEM;

	if (vq->packed) {
		data = kmalloc(total_sg * sizeof(struct vring_packed_desc),
			       gfp);
		if (!data)
			return NULL;
	} else {
		struct vring_desc *desc;
		unsigned int i;

		desc = kmalloc(total_sg * sizeof(struct vring_desc), gfp);
		if (!desc)
			return NULL;

		for (i = 0; i < total_sg; i++)
			desc[i].next = cpu_to_virtio16(_vq->vdev, i + 1);

		data = desc;
	}

	return data;
}

I would prefer to keep two simpler helpers (it is already very
clear to the callers which one they should call), i.e. the
current implementation:

static struct vring_packed_desc *alloc_indirect_packed(struct virtqueue *_vq,
							unsigned int total_sg,
							gfp_t gfp)
{
	struct vring_packed_desc *desc;

	/*
	 * We require lowmem mappings for the descriptors because
	 * otherwise virt_to_phys will give us bogus addresses in the
	 * virtqueue.
	 */
	gfp &= ~__GFP_HIGHMEM;

	desc = kmalloc(total_sg * sizeof(struct vring_packed_desc), gfp);

	return desc;
}

static struct vring_desc *alloc_indirect_split(struct virtqueue *_vq,
					       unsigned int total_sg,
					       gfp_t gfp)
{
	struct vring_desc *desc;
	unsigned int i;

	/*
	 * We require lowmem mappings for the descriptors because
	 * otherwise virt_to_phys will give us bogus addresses in the
	 * virtqueue.
	 */
	gfp &= ~__GFP_HIGHMEM;

	desc = kmalloc(total_sg * sizeof(struct vring_desc), gfp);
	if (!desc)
		return NULL;

	for (i = 0; i < total_sg; i++)
		desc[i].next = cpu_to_virtio16(_vq->vdev, i + 1);
	return desc;
}

>
> > +
> > + return desc;
> > +}
[...]
> > +static inline int virtqueue_add_packed(struct virtqueue *_vq,
> > + struct scatterlist *sgs[],
> > + unsigned int total_sg,
> > + unsigned int out_sgs,
> > + unsigned int in_sgs,
> > + void *data,
> > + void *ctx,
> > + gfp_t gfp)
> > +{
> > + struct vring_virtqueue *vq = to_vvq(_vq);
> > + struct vring_packed_desc *desc;
> > + struct scatterlist *sg;
> > + unsigned int i, n, descs_used, uninitialized_var(prev), err_idx;
> > + __virtio16 uninitialized_var(head_flags), flags;
> > + int head, wrap_counter;
> > + bool indirect;
> > +
> > + START_USE(vq);
> > +
> > + BUG_ON(data == NULL);
> > + BUG_ON(ctx && vq->indirect);
> > +
> > + if (unlikely(vq->broken)) {
> > + END_USE(vq);
> > + return -EIO;
> > + }
> > +
> > +#ifdef DEBUG
> > + {
> > + ktime_t now = ktime_get();
> > +
> > + /* No kick or get, with .1 second between? Warn. */
> > + if (vq->last_add_time_valid)
> > + WARN_ON(ktime_to_ms(ktime_sub(now, vq->last_add_time))
> > + > 100);
> > + vq->last_add_time = now;
> > + vq->last_add_time_valid = true;
> > + }
> > +#endif
> > +
> > + BUG_ON(total_sg == 0);
> > +
> > + head = vq->next_avail_idx;
> > + wrap_counter = vq->wrap_counter;
> > +
> > + /* If the host supports indirect descriptor tables, and we have multiple
> > + * buffers, then go indirect. FIXME: tune this threshold */
> > + if (vq->indirect && total_sg > 1 && vq->vq.num_free)
>
> Let's introduce a helper like virtqueue_need_indirect() to avoid duplicating
> the code and the FIXME.

Okay.
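
A minimal sketch of what such a helper might look like, based only
on the condition quoted above. The struct is a hypothetical stand-in
for the relevant fields of struct vring_virtqueue, not the kernel
definition, and the helper's signature is an assumption:

```c
#include <stdbool.h>

/* Hypothetical stand-in for the fields of struct vring_virtqueue used here. */
struct vq_sketch {
	bool indirect;		/* host supports indirect descriptor tables */
	unsigned int num_free;	/* free descriptors left in the ring */
};

/*
 * Same condition as in virtqueue_add_packed(): go indirect only when the
 * feature is available, there are multiple buffers, and at least one ring
 * slot is free. FIXME: tune this threshold.
 */
static inline bool virtqueue_need_indirect(const struct vq_sketch *vq,
					   unsigned int total_sg)
{
	return vq->indirect && total_sg > 1 && vq->num_free;
}
```

Both the split and the packed add paths could then call this one
helper, so the FIXME would live in a single place.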

>
> > + desc = alloc_indirect_packed(_vq, total_sg, gfp);
> > + else {
> > + desc = NULL;
> > + WARN_ON_ONCE(total_sg > vq->vring_packed.num && !vq->indirect);
> > + }
> > +
> > + if (desc) {
> > + /* Use a single buffer which doesn't continue */
> > + indirect = true;
> > + /* Set up rest to use this indirect table. */
> > + i = 0;
> > + descs_used = 1;
> > + } else {
> > + indirect = false;
> > + desc = vq->vring_packed.desc;
> > + i = head;
> > + descs_used = total_sg;
> > + }
> > +
> > + if (vq->vq.num_free < descs_used) {
> > + pr_debug("Can't add buf len %i - avail = %i\n",
> > + descs_used, vq->vq.num_free);
> > + /* FIXME: for historical reasons, we force a notify here if
> > + * there are outgoing parts to the buffer. Presumably the
> > + * host should service the ring ASAP. */
> > + if (out_sgs)
> > + vq->notify(&vq->vq);
> > + if (indirect)
> > + kfree(desc);
> > + END_USE(vq);
> > + return -ENOSPC;
> > + }
> > +
> > + for (n = 0; n < out_sgs + in_sgs; n++) {
> > + for (sg = sgs[n]; sg; sg = sg_next(sg)) {
> > + dma_addr_t addr = vring_map_one_sg(vq, sg, n < out_sgs ?
> > + DMA_TO_DEVICE : DMA_FROM_DEVICE);
> > + if (vring_mapping_error(vq, addr))
> > + goto unmap_release;
> > +
> > + flags = cpu_to_virtio16(_vq->vdev, VRING_DESC_F_NEXT |
> > + (n < out_sgs ? 0 : VRING_DESC_F_WRITE) |
> > + VRING_DESC_F_AVAIL(vq->wrap_counter) |
> > + VRING_DESC_F_USED(!vq->wrap_counter));
> > + if (!indirect && i == head)
> > + head_flags = flags;
> > + else
> > + desc[i].flags = flags;
> > +
> > + desc[i].addr = cpu_to_virtio64(_vq->vdev, addr);
> > + desc[i].len = cpu_to_virtio32(_vq->vdev, sg->length);
> > + desc[i].id = cpu_to_virtio32(_vq->vdev, head);
>
> Similar to V1, we only need this for the last descriptor.

Okay, will just set it for the last desc.

>
> > + prev = i;
>
> It looks to me there's no need to track prev inside the loop here.
>
> > + i++;
> > + if (!indirect && i >= vq->vring_packed.num) {
> > + i = 0;
> > + vq->wrap_counter ^= 1;
> > + }
> > + }
> > + }
> > + /* Last one doesn't continue. */
> > + if (total_sg == 1)
> > + head_flags &= cpu_to_virtio16(_vq->vdev, ~VRING_DESC_F_NEXT);
> > + else
> > + desc[prev].flags &= cpu_to_virtio16(_vq->vdev, ~VRING_DESC_F_NEXT);
>
> The only case when prev != i - 1 is i == 0, we can add an if here.

It's just a mirror of the existing implementation in split ring.
It seems that split ring implementation needs this just because
it's much harder for it to find the prev, which is not true for
packed ring. So I'll take your suggestion. Thanks!
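
A minimal sketch of the idea, assuming i is the position just past
the last descriptor written and num is the ring size. The helper
name packed_prev_idx() is hypothetical:

```c
/*
 * Index of the descriptor written just before position i.
 * The ring index only wraps to 0 after reaching num, so the previous
 * slot is i - 1 except when i == 0, where it is the last ring slot.
 */
static inline unsigned int packed_prev_idx(unsigned int i, unsigned int num)
{
	return i ? i - 1 : num - 1;
}
```

For an indirect table i never wraps and is at least 1 after the
loop, so the i == 0 branch is only taken for the ring itself.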

>
[...]
> > +static bool virtqueue_kick_prepare_packed(struct virtqueue *_vq)
> > +{
> > + struct vring_virtqueue *vq = to_vvq(_vq);
> > + u16 new, old, off_wrap;
> > + bool needs_kick;
> > +
> > + START_USE(vq);
> > + /* We need to expose the new flags value before checking notification
> > + * suppressions. */
> > + virtio_mb(vq->weak_barriers);
> > +
> > + old = vq->next_avail_idx - vq->num_added;
> > + new = vq->next_avail_idx;
> > + vq->num_added = 0;
> > +
> > +#ifdef DEBUG
> > + if (vq->last_add_time_valid) {
> > + WARN_ON(ktime_to_ms(ktime_sub(ktime_get(),
> > + vq->last_add_time)) > 100);
> > + }
> > + vq->last_add_time_valid = false;
> > +#endif
> > +
> > + off_wrap = virtio16_to_cpu(_vq->vdev, vq->vring_packed.device->off_wrap);
> > +
> > + if (vq->event) {
>
> It looks to me we should examine RING_EVENT_FLAGS_DESC in desc_event_flags
> instead of vq->event here. The spec does not force the use of event_off and
> event_wrap when event index is enabled.
>
> > + // FIXME: fix this!
> > + needs_kick = ((off_wrap >> 15) == vq->wrap_counter) &&
> > + vring_need_event(off_wrap & ~(1<<15), new, old);
>
> Why need a & here?

Because wrap_counter (the most significant bit in off_wrap)
isn't part of the index.
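
To illustrate, here is a small standalone sketch. need_event() uses
the same arithmetic as vring_need_event() in
include/uapi/linux/virtio_ring.h; the two off_wrap_*() helper names
are hypothetical:

```c
#include <stdbool.h>
#include <stdint.h>

/* Same arithmetic as vring_need_event(). */
static inline bool need_event(uint16_t event_idx, uint16_t new_idx,
			      uint16_t old)
{
	return (uint16_t)(new_idx - event_idx - 1) < (uint16_t)(new_idx - old);
}

/* Low 15 bits of off_wrap: the descriptor ring offset. */
static inline uint16_t off_wrap_offset(uint16_t off_wrap)
{
	return off_wrap & ~(1 << 15);
}

/* Most significant bit of off_wrap: the wrap counter. */
static inline uint16_t off_wrap_counter(uint16_t off_wrap)
{
	return off_wrap >> 15;
}
```

With off_wrap == 0x8005 the offset is 5 and the wrap counter is 1.
For old == 4 and new == 6, need_event(5, 6, 4) is true, whereas
passing the unmasked value, need_event(0x8005, 6, 4), is false,
because the wrap bit shifts the event index far away from the
[old, new) window.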

>
> > + } else {
>
> Need a smp_rmb() to make sure desc_event_flags was checked before flags.

I don't get your point. If my understanding is correct,
desc_event_flags is vq->vring_packed.device->flags. So
what's the "flags"?

>
> > + needs_kick = (vq->vring_packed.device->flags !=
> > + cpu_to_virtio16(_vq->vdev, VRING_EVENT_F_DISABLE));
> > + }
> > + END_USE(vq);
> > + return needs_kick;
> > +}
[...]
> > +static int detach_buf_packed(struct vring_virtqueue *vq, unsigned int head,
> > + void **ctx)
> > +{
> > + struct vring_packed_desc *desc;
> > + unsigned int i, j;
> > +
> > + /* Clear data ptr. */
> > + vq->desc_state[head].data = NULL;
> > +
> > + i = head;
> > +
> > + for (j = 0; j < vq->desc_state[head].num; j++) {
> > + desc = &vq->vring_packed.desc[i];
> > + vring_unmap_one_packed(vq, desc);
> > + desc->flags = 0x0;
>
> Looks like this is unnecessary.

It's safer to zero it. If we don't zero it, after we
call virtqueue_detach_unused_buf_packed() which calls
this function, the desc is still available to the
device.
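
A rough sketch of why zeroed flags are safe. It assumes the AVAIL
and USED bits sit at positions 7 and 15 of the flags field, and
that a device treats a descriptor as available when the AVAIL bit
matches its wrap counter and the USED bit does not. The macro and
function names are hypothetical:

```c
#include <stdbool.h>
#include <stdint.h>

/* Assumed bit positions of the AVAIL and USED flags. */
#define DESC_F_AVAIL_BIT	(1 << 7)
#define DESC_F_USED_BIT		(1 << 15)

/* Device-side view: is this descriptor available for wrap_counter? */
static inline bool desc_is_avail(uint16_t flags, bool wrap_counter)
{
	bool avail = flags & DESC_F_AVAIL_BIT;
	bool used = flags & DESC_F_USED_BIT;

	return avail == wrap_counter && used != wrap_counter;
}
```

Under these assumptions desc_is_avail(0, wrap_counter) is false
for both values of the wrap counter, since AVAIL and USED are
equal when flags is zero.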

>
> > + i++;
> > + if (i >= vq->vring_packed.num)
> > + i = 0;
> > + }
[...]
> > +static unsigned virtqueue_enable_cb_prepare_packed(struct virtqueue *_vq)
> > +{
> > + struct vring_virtqueue *vq = to_vvq(_vq);
> > + u16 last_used_idx, wrap_counter, off_wrap;
> > +
> > + START_USE(vq);
> > +
> > + last_used_idx = vq->last_used_idx;
> > + wrap_counter = vq->wrap_counter;
> > +
> > + if (last_used_idx > vq->next_avail_idx)
> > + wrap_counter ^= 1;
> > +
> > + off_wrap = last_used_idx | (wrap_counter << 15);
> > +
> > + /* We optimistically turn back on interrupts, then check if there was
> > + * more to do. */
> > + /* Depending on the VIRTIO_RING_F_EVENT_IDX feature, we need to
> > + * either clear the flags bit or point the event index at the next
> > + * entry. Always do both to keep code simple. */
> > + if (vq->event_flags_shadow == VRING_EVENT_F_DISABLE) {
> > + vq->event_flags_shadow = vq->event ? VRING_EVENT_F_DESC:
> > + VRING_EVENT_F_ENABLE;
> > + vq->vring_packed.driver->flags = cpu_to_virtio16(_vq->vdev,
> > + vq->event_flags_shadow);
> > + }
>
> Is an smp_wmb() missing here?
>
> > + vq->vring_packed.driver->off_wrap = cpu_to_virtio16(_vq->vdev, off_wrap);
>
> And according to the spec, it looks to me that writing VRING_EVENT_F_ENABLE
> is sufficient here.

I didn't think much about this when implementing the event
suppression for packed ring. After reading your comments, I
noticed something new. Indeed, unlike the split ring, the spec
doesn't say that the packed ring must use VRING_EVENT_F_DESC
when EVENT_IDX is negotiated. So does the following approach
make sense?

- For virtqueue_enable_cb_prepare(), we just need to enable
the ring by setting flags to VRING_EVENT_F_ENABLE in any
case.

- We will try to use VRING_EVENT_F_DESC (if EVENT_IDX is
negotiated) only when we want to delay the interrupts, i.e.
in virtqueue_enable_cb_delayed().
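
The two points above could be sketched roughly as follows. The
numeric values of the event flags are an assumption (0x0 for
enable, 0x1 for disable, 0x2 for desc), and the function name
event_flags_for_enable() is hypothetical:

```c
#include <stdbool.h>
#include <stdint.h>

/* Assumed values for the driver event suppression flags. */
enum {
	EVENT_F_ENABLE	= 0x0,
	EVENT_F_DISABLE	= 0x1,
	EVENT_F_DESC	= 0x2,
};

/*
 * Pick the flags value to write when re-enabling callbacks:
 * plain ENABLE for the enable_cb_prepare() path, and DESC only for the
 * delayed path when EVENT_IDX was negotiated.
 */
static inline uint16_t event_flags_for_enable(bool event_idx, bool delayed)
{
	if (event_idx && delayed)
		return EVENT_F_DESC;
	return EVENT_F_ENABLE;
}
```

In the delayed case the driver would also have to fill in
off_wrap before writing the flags; that part is left out here.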

>
> > + END_USE(vq);
> > + return last_used_idx;
> > +}
> > +
[...]
> > @@ -1157,14 +1852,18 @@ void vring_transport_features(struct virtio_device *vdev)
> > for (i = VIRTIO_TRANSPORT_F_START; i < VIRTIO_TRANSPORT_F_END; i++) {
> > switch (i) {
> > - case VIRTIO_RING_F_INDIRECT_DESC:
> > +#if 0
> > + case VIRTIO_RING_F_INDIRECT_DESC: // FIXME not tested yet.
> > break;
> > - case VIRTIO_RING_F_EVENT_IDX:
> > + case VIRTIO_RING_F_EVENT_IDX: // FIXME probably not work.
> > break;
> > +#endif
>
> It would be better if you can split EVENT_IDX and INDIRECT_DESC into
> separate patches too.

Sure. Will do it in the next version.

Thanks for the review!

>
> Thanks
>