Re: [RFC PATCH v2 4/8] virtio/vsock: add transport zerocopy callback

From: Arseniy Krasnov
Date: Thu Jun 09 2022 - 08:24:29 EST


On 09.06.2022 11:41, Stefano Garzarella wrote:
> On Fri, Jun 03, 2022 at 05:37:48AM +0000, Arseniy Krasnov wrote:
>> This adds a transport callback which processes the
>> rx queue of a socket and, instead of copying data
>> to a user-provided buffer, inserts the data pages
>> of each packet into the user's VM area.
>>
>> Signed-off-by: Arseniy Krasnov <AVKrasnov@xxxxxxxxxxxxxx>
>> ---
>> include/linux/virtio_vsock.h            |   4 +
>> include/uapi/linux/virtio_vsock.h       |   6 +
>> net/vmw_vsock/virtio_transport_common.c | 208 +++++++++++++++++++++++-
>> 3 files changed, 215 insertions(+), 3 deletions(-)
>>
>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>> index d02cb7aa922f..47a68a2ea838 100644
>> --- a/include/linux/virtio_vsock.h
>> +++ b/include/linux/virtio_vsock.h
>> @@ -51,6 +51,7 @@ struct virtio_vsock_pkt {
>>     bool reply;
>>     bool tap_delivered;
>>     bool slab_buf;
>> +    bool split;
>> };
>>
>> struct virtio_vsock_pkt_info {
>> @@ -131,6 +132,9 @@ int virtio_transport_dgram_bind(struct vsock_sock *vsk,
>>                 struct sockaddr_vm *addr);
>> bool virtio_transport_dgram_allow(u32 cid, u32 port);
>>
>> +int virtio_transport_zerocopy_dequeue(struct vsock_sock *vsk,
>> +                      struct vm_area_struct *vma,
>> +                      unsigned long addr);
>> int virtio_transport_connect(struct vsock_sock *vsk);
>>
>> int virtio_transport_shutdown(struct vsock_sock *vsk, int mode);
>> diff --git a/include/uapi/linux/virtio_vsock.h b/include/uapi/linux/virtio_vsock.h
>> index 64738838bee5..6775c6c44b5b 100644
>> --- a/include/uapi/linux/virtio_vsock.h
>> +++ b/include/uapi/linux/virtio_vsock.h
>> @@ -66,6 +66,12 @@ struct virtio_vsock_hdr {
>>     __le32    fwd_cnt;
>> } __attribute__((packed));
>>
>> +struct virtio_vsock_usr_hdr {
>> +    __u32 flags;
>> +    __u32 len;
>> +    __u32 copy_len;
>> +} __attribute__((packed));
>> +
>> enum virtio_vsock_type {
>>     VIRTIO_VSOCK_TYPE_STREAM = 1,
>>     VIRTIO_VSOCK_TYPE_SEQPACKET = 2,
>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>> index 278567f748f2..3a3e84176c75 100644
>> --- a/net/vmw_vsock/virtio_transport_common.c
>> +++ b/net/vmw_vsock/virtio_transport_common.c
>> @@ -12,6 +12,7 @@
>> #include <linux/ctype.h>
>> #include <linux/list.h>
>> #include <linux/virtio_vsock.h>
>> +#include <linux/mm.h>
>> #include <uapi/linux/vsockmon.h>
>>
>> #include <net/sock.h>
>> @@ -347,6 +348,196 @@ virtio_transport_stream_do_peek(struct vsock_sock *vsk,
>>     return err;
>> }
>>
>> +#define MAX_PAGES_TO_MAP 256
>> +
>> +int virtio_transport_zerocopy_dequeue(struct vsock_sock *vsk,
>> +                      struct vm_area_struct *vma,
>> +                      unsigned long addr)
>> +{
>> +    struct virtio_vsock_sock *vvs = vsk->trans;
>> +    struct virtio_vsock_usr_hdr *usr_hdr_buffer;
>> +    unsigned long max_pages_to_insert;
>> +    unsigned long tmp_pages_inserted;
>> +    unsigned long pages_to_insert = 0;
>> +    struct page *usr_hdr_page;
>> +    unsigned long vma_size;
>> +    struct page **pages;
>> +    int max_vma_pages;
>> +    int max_usr_hdrs;
>> +    int res;
>> +    int err;
>> +    int i;
>> +
>> +    /* Only allow mapping from the first page of the VMA. */
>> +    if (vma->vm_start != addr)
>> +        return -EFAULT;
>> +
>> +    vma_size = vma->vm_end - vma->vm_start;
>> +
>> +    /* The vma is too small (we need at least one page
>> +     * for headers and one page for data).
>> +     */
>> +    if (vma_size < 2 * PAGE_SIZE)
>> +        return -EFAULT;
>> +
>> +    /* Page for metadata. */
>> +    usr_hdr_page = alloc_page(GFP_KERNEL);
>
> I think all these checks should be done in af_vsock.c.
> It would be nice to avoid every transport reimplementing the same thing, and especially to ensure that all transports have the same behavior.
>
> If you can, it would be nice to have the transports return an array of pages to map, and have af_vsock handle it and the usr_hdr_page.
>
> Do you think it's doable?
Yes, as we discussed in the patch above, this part could be made common.
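Roughly something like this, maybe (just a sketch; all names below are
hypothetical and not final):

    /* The transport only collects data pages; af_vsock.c performs the
     * VMA checks, owns the header page and calls vm_insert_pages().
     */
    struct vsock_zc_pages {
            struct page **pages;   /* data pages, reference already taken */
            u32 *lens;             /* bytes of payload in each page */
            unsigned int nr_pages; /* filled by the transport */
    };

    /* New transport callback, dequeues up to 'max_pages' pages. */
    int (*zerocopy_dequeue)(struct vsock_sock *vsk,
                            struct vsock_zc_pages *zc,
                            unsigned int max_pages);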
>
>> +
>> +    if (!usr_hdr_page)
>> +        return -EFAULT;
>> +
>> +    pages = kmalloc_array(MAX_PAGES_TO_MAP, sizeof(pages[0]), GFP_KERNEL);
>> +
>> +    if (!pages) {
>> +        __free_page(usr_hdr_page);
>> +        return -EFAULT;
>> +    }
>> +
>> +    pages[pages_to_insert++] = usr_hdr_page;
>> +
>> +    usr_hdr_buffer = page_to_virt(usr_hdr_page);
>> +
>> +    err = 0;
>> +
>> +    /* As we use the first page for headers, the total number
>> +     * of pages for the user is the min between the number of
>> +     * headers fitting in the first page and the size of the
>> +     * vma (in pages, excluding the first page).
>> +     */
>> +    max_usr_hdrs = PAGE_SIZE / sizeof(*usr_hdr_buffer);
>> +    max_vma_pages = (vma_size / PAGE_SIZE) - 1;
>> +    max_pages_to_insert = min(max_usr_hdrs, max_vma_pages);
>> +
>> +    if (max_pages_to_insert > MAX_PAGES_TO_MAP)
>> +        max_pages_to_insert = MAX_PAGES_TO_MAP;
>> +
>> +    spin_lock_bh(&vvs->rx_lock);
>> +
>> +    while (!list_empty(&vvs->rx_queue) &&
>> +           pages_to_insert < max_pages_to_insert) {
>> +        struct virtio_vsock_pkt *pkt;
>> +        ssize_t rest_data_bytes;
>> +        size_t moved_data_bytes;
>> +        unsigned long pg_offs;
>> +
>> +        pkt = list_first_entry(&vvs->rx_queue,
>> +                       struct virtio_vsock_pkt, list);
>> +
>> +        /* Buffer was allocated by 'kmalloc()'. This can
>> +         * happen when zerocopy was enabled, but we still
>> +         * have a pending packet created before that.
>> +         */
>> +        if (pkt->slab_buf) {
>> +            usr_hdr_buffer->flags = le32_to_cpu(pkt->hdr.flags);
>> +            usr_hdr_buffer->len = 0;
>> +            usr_hdr_buffer->copy_len = le32_to_cpu(pkt->hdr.len);
>> +            /* Report user to read it using copy. */
>
> Is it a "to do"?
No, it seems I need to move my comment under the opening bracket.
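I.e. in the next version it will be something like this (same code, the
comment just moved inside the block):

            if (pkt->slab_buf) {
                    /* Buffer was allocated by 'kmalloc()': report to
                     * the user that this data must be read using copy.
                     */
                    usr_hdr_buffer->flags = le32_to_cpu(pkt->hdr.flags);
                    usr_hdr_buffer->len = 0;
                    usr_hdr_buffer->copy_len = le32_to_cpu(pkt->hdr.len);
                    break;
            }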
>
>> +            break;
>> +        }
>> +
>> +        /* This can happen when the packet was partially
>> +         * dequeued earlier by an ordinary 'read()' call. We
>> +         * can't handle such a packet. Drop it.
>
> We can't drop packets.
> I think we should allow to enable/disable this new feature before the connection.
Yes, I'll allow enabling/disabling it only while the socket is not connected.
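For example, a check like this in the option handler ('SO_VM_SOCKETS_ZEROCOPY'
and the 'zerocopy_rx' field are hypothetical names here, just to show the
idea):

            case SO_VM_SOCKETS_ZEROCOPY:
                    /* Toggling zerocopy mode is only allowed while
                     * the socket is not connected yet.
                     */
                    if (sk->sk_state == TCP_ESTABLISHED) {
                            err = -EBUSY;
                            break;
                    }
                    vsk->zerocopy_rx = !!val;
                    break;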
>
>> +         */
>> +        if (pkt->off % PAGE_SIZE) {
>> +            list_del(&pkt->list);
>> +            virtio_transport_dec_rx_pkt(vvs, pkt);
>> +            virtio_transport_free_pkt(pkt);
>> +            continue;
>> +        }
>> +
>> +        rest_data_bytes = le32_to_cpu(pkt->hdr.len) - pkt->off;
>> +
>> +        /* For packets bigger than one page, split the
>> +         * high-order allocated buffer into 0-order pages.
>> +         * Otherwise 'vm_insert_pages()' will fail for all
>> +         * pages except the first one.
>> +         */
>> +        if (rest_data_bytes > PAGE_SIZE) {
>> +            /* High order buffer not split yet. */
>> +            if (!pkt->split) {
>> +                split_page(virt_to_page(pkt->buf),
>> +                       get_order(le32_to_cpu(pkt->hdr.len)));
>> +                pkt->split = true;
>> +            }
>> +        }
>> +
>> +        pg_offs = pkt->off;
>> +        moved_data_bytes = 0;
>> +
>> +        while (rest_data_bytes &&
>> +               pages_to_insert < max_pages_to_insert) {
>> +            struct page *buf_page;
>> +
>> +            buf_page = virt_to_page(pkt->buf + pg_offs);
>> +
>> +            pages[pages_to_insert++] = buf_page;
>> +            /* Get a reference to prevent this page from being
>> +             * returned to the page allocator when the packet
>> +             * is freed. The refcount will be 2.
>> +             */
>> +            get_page(buf_page);
>> +            pg_offs += PAGE_SIZE;
>> +
>> +            if (rest_data_bytes >= PAGE_SIZE) {
>> +                moved_data_bytes += PAGE_SIZE;
>> +                rest_data_bytes -= PAGE_SIZE;
>> +            } else {
>> +                moved_data_bytes += rest_data_bytes;
>> +                rest_data_bytes = 0;
>> +            }
>> +        }
>> +
>> +        usr_hdr_buffer->flags = le32_to_cpu(pkt->hdr.flags);
>> +        usr_hdr_buffer->len = moved_data_bytes;
>> +        usr_hdr_buffer->copy_len = 0;
>> +        usr_hdr_buffer++;
>> +
>> +        pkt->off = pg_offs;
>> +
>> +        if (rest_data_bytes == 0) {
>> +            list_del(&pkt->list);
>> +            virtio_transport_dec_rx_pkt(vvs, pkt);
>> +            virtio_transport_free_pkt(pkt);
>> +        }
>> +
>> +        /* Now the refcount of every page of the packet is 1. */
>> +    }
>> +
>> +    /* Set the last buffer empty (if we have one). */
>> +    if (pages_to_insert - 1 < max_usr_hdrs)
>> +        usr_hdr_buffer->len = 0;
>> +
>> +    spin_unlock_bh(&vvs->rx_lock);
>> +
>> +    tmp_pages_inserted = pages_to_insert;
>> +
>> +    res = vm_insert_pages(vma, addr, pages, &tmp_pages_inserted);
>> +
>> +    if (res || tmp_pages_inserted) {
>> +        /* Failed to insert some pages, so we have a "partially"
>> +         * mapped vma. Do not return here; set the error code,
>> +         * which will be returned to the user. The user needs to
>> +         * call 'madvise()/munmap()' to clear this vma. Anyway,
>> +         * references to all pages will be dropped below.
>> +         */
>> +        err = -EFAULT;
>> +    }
>> +
>> +    /* Put reference for every page. */
>> +    for (i = 0; i < pages_to_insert; i++) {
>> +        /* Refcount is 2 ('get_page()' + 'vm_insert_pages()' above).
>> +         * Put the reference once; the page will be returned to the
>> +         * allocator after the user's 'madvise()/munmap()' call (or
>> +         * it wasn't mapped at all if 'vm_insert_pages()' failed).
>> +         */
>> +        put_page(pages[i]);
>> +    }
>> +
>> +    virtio_transport_send_credit_update(vsk);
>> +    kfree(pages);
>> +
>> +    return err;
>> +}
>> +EXPORT_SYMBOL_GPL(virtio_transport_zerocopy_dequeue);
>> +
>> static ssize_t
>> virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
>>                    struct msghdr *msg,
>> @@ -1344,10 +1535,21 @@ EXPORT_SYMBOL_GPL(virtio_transport_recv_pkt);
>> void virtio_transport_free_pkt(struct virtio_vsock_pkt *pkt)
>> {
>>     if (pkt->buf_len) {
>> -        if (pkt->slab_buf)
>> +        if (pkt->slab_buf) {
>>             kfree(pkt->buf);
>> -        else
>> -            free_pages(buf, get_order(pkt->buf_len));
>> +        } else {
>> +            unsigned int order = get_order(pkt->buf_len);
>> +            unsigned long buf = (unsigned long)pkt->buf;
>> +
>> +            if (pkt->split) {
>> +                int i;
>> +
>> +                for (i = 0; i < (1 << order); i++)
>> +                    free_page(buf + i * PAGE_SIZE);
>> +            } else {
>> +                free_pages(buf, order);
>> +            }
>> +        }
>>     }
>>
>>     kfree(pkt);
>> --
>> 2.25.1
>
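
P.S. to make the expected userspace flow clearer, here is a rough sketch
of the receive loop over the mapped area ('process_data()', 'fd', 'buf',
'map' and 'map_size' are made-up names; the details may change in the
next version):

    /* The first page of the mapped area holds the array of
     * struct virtio_vsock_usr_hdr, the data pages follow it.
     */
    struct virtio_vsock_usr_hdr *hdr = map;
    char *data = (char *)map + PAGE_SIZE;

    while (hdr->len || hdr->copy_len) {
            if (hdr->copy_len) {
                    /* Pending non-zerocopy packet: fall back to an
                     * ordinary read().
                     */
                    read(fd, buf, hdr->copy_len);
                    break;
            }

            process_data(data, hdr->len);
            /* Each entry's data occupies a whole number of pages. */
            data += (hdr->len + PAGE_SIZE - 1) / PAGE_SIZE * PAGE_SIZE;
            hdr++;
    }

    /* Return the pages to the kernel when done with them. */
    madvise(map, map_size, MADV_DONTNEED);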