Re: [PATCH v11 11/18] virtio/vsock: dequeue callback for SOCK_SEQPACKET

From: Arseny Krasnov
Date: Fri Jun 18 2021 - 12:27:13 EST



On 18.06.2021 19:25, Stefano Garzarella wrote:
> On Fri, Jun 18, 2021 at 07:08:30PM +0300, Arseny Krasnov wrote:
>> On 18.06.2021 18:55, Stefano Garzarella wrote:
>>> On Fri, Jun 18, 2021 at 06:04:37PM +0300, Arseny Krasnov wrote:
>>>> On 18.06.2021 16:44, Stefano Garzarella wrote:
>>>>> Hi Arseny,
>>>>> the series looks great, I have just a question below about
>>>>> seqpacket_dequeue.
>>>>>
>>>>> I also sent a couple of simple fixes; it would be great if you could
>>>>> review them:
>>>>> https://lore.kernel.org/netdev/20210618133526.300347-1-sgarzare@xxxxxxxxxx/
>>>>>
>>>>>
>>>>> On Fri, Jun 11, 2021 at 02:12:38PM +0300, Arseny Krasnov wrote:
>>>>>> The callback fetches RW packets from the socket's rx queue until the
>>>>>> whole record is copied (if the user's buffer is full, the user is not
>>>>>> woken up). This is done so as not to stall the sender: if we woke the
>>>>>> user up and it left the syscall, nobody would send a credit update for
>>>>>> the rest of the record, and the sender would wait for the receiver's
>>>>>> next entry into the read syscall. So if the user's buffer is full, we
>>>>>> just send a credit update and drop the data.
>>>>>>
>>>>>> Signed-off-by: Arseny Krasnov <arseny.krasnov@xxxxxxxxxxxxx>
>>>>>> ---
>>>>>> v10 -> v11:
>>>>>> 1) 'msg_count' field added to count current number of EORs.
>>>>>> 2) 'msg_ready' argument removed from callback.
>>>>>> 3) If 'memcpy_to_msg()' fails during the copy loop, there will be
>>>>>> no further attempts to copy data; the rest of the record is freed.
>>>>>>
>>>>>> include/linux/virtio_vsock.h | 5 ++
>>>>>> net/vmw_vsock/virtio_transport_common.c | 84 +++++++++++++++++++++++++
>>>>>> 2 files changed, 89 insertions(+)
>>>>>>
>>>>>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>>>>>> index dc636b727179..1d9a302cb91d 100644
>>>>>> --- a/include/linux/virtio_vsock.h
>>>>>> +++ b/include/linux/virtio_vsock.h
>>>>>> @@ -36,6 +36,7 @@ struct virtio_vsock_sock {
>>>>>> u32 rx_bytes;
>>>>>> u32 buf_alloc;
>>>>>> struct list_head rx_queue;
>>>>>> + u32 msg_count;
>>>>>> };
>>>>>>
>>>>>> struct virtio_vsock_pkt {
>>>>>> @@ -80,6 +81,10 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>>>>>> struct msghdr *msg,
>>>>>> size_t len, int flags);
>>>>>>
>>>>>> +ssize_t
>>>>>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>>>>>> + struct msghdr *msg,
>>>>>> + int flags);
>>>>>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
>>>>>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>>>>>>
>>>>>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>>>>>> index ad0d34d41444..1e1df19ec164 100644
>>>>>> --- a/net/vmw_vsock/virtio_transport_common.c
>>>>>> +++ b/net/vmw_vsock/virtio_transport_common.c
>>>>>> @@ -393,6 +393,78 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
>>>>>> return err;
>>>>>> }
>>>>>>
>>>>>> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
>>>>>> + struct msghdr *msg,
>>>>>> + int flags)
>>>>>> +{
>>>>>> + struct virtio_vsock_sock *vvs = vsk->trans;
>>>>>> + struct virtio_vsock_pkt *pkt;
>>>>>> + int dequeued_len = 0;
>>>>>> + size_t user_buf_len = msg_data_left(msg);
>>>>>> + bool copy_failed = false;
>>>>>> + bool msg_ready = false;
>>>>>> +
>>>>>> + spin_lock_bh(&vvs->rx_lock);
>>>>>> +
>>>>>> + if (vvs->msg_count == 0) {
>>>>>> + spin_unlock_bh(&vvs->rx_lock);
>>>>>> + return 0;
>>>>>> + }
>>>>>> +
>>>>>> + while (!msg_ready) {
>>>>>> + pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
>>>>>> +
>>>>>> + if (!copy_failed) {
>>>>>> + size_t pkt_len;
>>>>>> + size_t bytes_to_copy;
>>>>>> +
>>>>>> + pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
>>>>>> + bytes_to_copy = min(user_buf_len, pkt_len);
>>>>>> +
>>>>>> + if (bytes_to_copy) {
>>>>>> + int err;
>>>>>> +
>>>>>> + /* sk_lock is held by caller so no one else can dequeue.
>>>>>> + * Unlock rx_lock since memcpy_to_msg() may sleep.
>>>>>> + */
>>>>>> + spin_unlock_bh(&vvs->rx_lock);
>>>>>> +
>>>>>> + err = memcpy_to_msg(msg, pkt->buf, bytes_to_copy);
>>>>>> + if (err) {
>>>>>> + /* Copying the message failed; set a flag to
>>>>>> + * skip the copy path for the remaining
>>>>>> + * fragments. They will be freed without
>>>>>> + * copying.
>>>>>> + */
>>>>>> + copy_failed = true;
>>>>>> + dequeued_len = err;
>>>>> If we fail to copy the message we will discard the entire packet.
>>>>> Is it acceptable for the user point of view, or we should leave the
>>>>> packet in the queue and the user can retry, maybe with a different
>>>>> buffer?
>>>>>
>>>>> Then we can remove the packets only when we successfully copied all the
>>>>> fragments.
>>>>>
>>>>> I'm not sure it makes sense; maybe it's better to also check other
>>>>> implementations :-)
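>>>>>
>>>>> Just to show the idea, a very rough sketch (not tested; locking and
>>>>> rewinding the msg iterator on a retry are ignored here):
>>>>>
>>>>> 	/* Walk the fragments without removing them; if any copy fails,
>>>>> 	 * return the error and leave the whole record in rx_queue, so
>>>>> 	 * the user can retry recv() with another buffer.
>>>>> 	 */
>>>>> 	list_for_each_entry(pkt, &vvs->rx_queue, list) {
>>>>> 		err = memcpy_to_msg(msg, pkt->buf,
>>>>> 				    le32_to_cpu(pkt->hdr.len));
>>>>> 		if (err)
>>>>> 			return err; /* record is still intact in rx_queue */
>>>>>
>>>>> 		if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR)
>>>>> 			break;
>>>>> 	}
>>>>>
>>>>> 	/* only now remove the copied fragments from rx_queue and free them */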
>>>>>
>>>>> Thanks,
>>>>> Stefano
>>>> Understood, I'll check it over the weekend; anyway, I think it is
>>>> not critical for the implementation.
>>> Yep, I agree.
>>>
>>>> I have another question: maybe it is worth exploring an approach
>>>> where packets are not queued until the whole message is received,
>>>> but are copied to the user's buffer as they arrive, thus freeing
>>>> memory (like the previous implementation, of course with a solution
>>>> to the problem where part of a message is still in the queue while
>>>> the reader was woken up by a timeout or signal).
>>>>
>>>> I think that would be better, because in the current version the
>>>> sender may set 'peer_alloc_buf' to, for example, 1MB, so at the
>>>> receiver we get 1MB of 'kmalloc()' memory allocated, while we have
>>>> the user's buffer to copy the data to, or to drop it (if the user's
>>>> buffer is full). This approach wouldn't change the spec (e.g. no
>>>> message id or SEQ_BEGIN would be added).
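>>>>
>>>> Something like this (just a sketch of the idea; 'rx_waiter_msg' is a
>>>> made-up field pointing to the msghdr of the reader sleeping in
>>>> recvmsg()):
>>>>
>>>> 	/* Copy each RW fragment straight to the waiting reader and free
>>>> 	 * it immediately, so no kmalloc() memory accumulates in rx_queue.
>>>> 	 */
>>>> 	static void rx_fragment(struct virtio_vsock_sock *vvs,
>>>> 				struct virtio_vsock_pkt *pkt)
>>>> 	{
>>>> 		struct msghdr *msg = vvs->rx_waiter_msg; /* made-up field */
>>>> 		size_t n = min_t(size_t, msg_data_left(msg),
>>>> 				 le32_to_cpu(pkt->hdr.len));
>>>>
>>>> 		if (n)
>>>> 			memcpy_to_msg(msg, pkt->buf, n); /* else just drop */
>>>>
>>>> 		virtio_transport_free_pkt(pkt); /* nothing stays queued */
>>>> 	}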
>>>>
>>>> What do you think?
>>> Yep, I see your point and it would be great, but I think the main issue
>>> to fix is how to handle a signal while we are waiting for other fragments,
>>> since the other peer can take an unspecified amount of time to send them.
>> What about a transport callback, something like 'seqpacket_drain()' or
>> 'seqpacket_drop_curr()': when we get a signal or a timeout, we notify
>> the transport to drop the current message. In the virtio case this sets
>> a special flag in the transport, so on the next dequeue this flag is
>> checked and, if it is set, we drop all packets until an EOR is found.
>> Then we can copy the untouched new record.
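>>
>> In code, something like this (rough sketch; 'drop_until_eor' is a
>> made-up flag):
>>
>> 	/* Called by the af_vsock core on signal/timeout. */
>> 	static void virtio_transport_seqpacket_drop_curr(struct vsock_sock *vsk)
>> 	{
>> 		struct virtio_vsock_sock *vvs = vsk->trans;
>>
>> 		vvs->drop_until_eor = true; /* made-up flag */
>> 	}
>>
>> 	/* ...and at the start of the next dequeue, before copying: */
>> 	while (vvs->drop_until_eor && !list_empty(&vvs->rx_queue)) {
>> 		pkt = list_first_entry(&vvs->rx_queue,
>> 				       struct virtio_vsock_pkt, list);
>> 		list_del(&pkt->list);
>> 		if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR)
>> 			vvs->drop_until_eor = false;
>> 		virtio_transport_free_pkt(pkt);
>> 	}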
>>
> But in this way, we will lose the entire message.
>
> Is it acceptable for seqpacket?
>
> Stefano
Hm, I'll check it. At least unix domain sockets support SEQPACKET.