Re: [PATCH] ipc/msg.c: wake up senders until there is a queue empty capacity

From: Artur Barsegyan
Date: Mon Jun 01 2020 - 10:02:52 EST


Hi, Manfred.

Did you get my last message?

On Wed, May 27, 2020 at 02:22:57PM +0300, Artur Barsegyan wrote:
> [sorry for the duplicates; I have changed my email client]
>
> About your case:
>
> A new receiver is put at the end of the receivers list.
> pipelined_send() starts from the beginning of the list and iterates until the end.
>
> If our queue is always full, each receiver should get a message because new receivers are appended at the end.
> As I see it, we waste some time in that loop but should increase the throughput overall. This still needs to be tested.
>
> Yes, I'm going to implement it and run a benchmark. But maybe it should be done in a separate patch thread?
>
> On Wed, May 27, 2020 at 08:03:17AM +0200, Manfred Spraul wrote:
> > Hello Artur,
> >
> > On 5/26/20 9:56 AM, Artur Barsegyan wrote:
> > > Hello, Manfred!
> > >
> > > Thank you for your review. I've reviewed your patch.
> > >
> > > I forgot about the case with different message types. Now, with your patch,
> > > a sender will force message consumption if it doesn't use its own capacity.
> > >
> > > I have measured queue throughput and have pushed the results to:
> > > https://github.com/artur-barsegyan/systemv_queue_research
> > >
> > > But I'm unsure about one thing: in the main loop of the do_msgsnd()
> > > function, we don't check whether pipelined sending is available. Your case
> > > would be optimized if we checked for pipelined sending inside the loop.
> >
> > I don't get your concern, or perhaps this is a feature that I had always
> > assumed as "normal":
> >
> > "msg_fits_inqueue(msq, msgsz)" is in the loop, this ensures progress.
> >
> > The rationale is a design decision:
> >
> > The check for pipeline sending is only done if there would be space to store
> > the message in the queue.
> >
> > I was afraid that performing the pipeline send immediately, without checking
> > queue availability, could break apps:
> >
> > Some messages would arrive immediately (if there is a waiting receiver),
> > other messages are stuck forever (since the queue is full).
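
The trade-off described above can be sketched with a small user-space model. This is not kernel code: struct send_model, sm_send_checked() and sm_send_eager() are hypothetical names, only the byte limit is modelled, and "waiting_receivers" stands for receivers that match the new message but none of the queued ones.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical toy model of a queue; not the kernel's struct msg_queue. */
struct send_model {
    size_t used_bytes;        /* bytes currently stored in the queue */
    size_t max_bytes;         /* queue capacity in bytes */
    int waiting_receivers;    /* receivers that match the new message */
};

enum sm_result { SM_SENT_DIRECT, SM_SENT_QUEUED, SM_MUST_BLOCK };

/* Simplified fit check: only the byte limit is considered. */
bool sm_fits(const struct send_model *q, size_t msgsz)
{
    return q->used_bytes + msgsz <= q->max_bytes;
}

/* Policy described above: look for a receiver only if the message would fit. */
enum sm_result sm_send_checked(struct send_model *q, size_t msgsz)
{
    if (!sm_fits(q, msgsz))
        return SM_MUST_BLOCK;
    if (q->waiting_receivers > 0) {
        q->waiting_receivers--;   /* hand the message over directly */
        return SM_SENT_DIRECT;
    }
    q->used_bytes += msgsz;
    return SM_SENT_QUEUED;
}

/* Alternative: hand over to a waiting receiver before checking capacity. */
enum sm_result sm_send_eager(struct send_model *q, size_t msgsz)
{
    if (q->waiting_receivers > 0) {
        q->waiting_receivers--;
        return SM_SENT_DIRECT;
    }
    if (!sm_fits(q, msgsz))
        return SM_MUST_BLOCK;
    q->used_bytes += msgsz;
    return SM_SENT_QUEUED;
}
```

With a full queue and one matching receiver, sm_send_checked() blocks every sender, while sm_send_eager() delivers the first message and blocks the second. That is the mixed behaviour (some messages arrive immediately, others are stuck) that the design decision tries to avoid.
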
> >
> > Initial patch: https://lkml.org/lkml/1999/10/3/5 (without any remarks about
> > the design decision)
> >
> > The risk that I had seen was theoretical, I do not have any real bug
> > reports. So we could change it.
> >
> > Perhaps: Go in the same direction as it was done for POSIX mqueue: implement
> > pipelined receive.
> >
> > > On Sun, May 24, 2020 at 03:21:31PM +0200, Manfred Spraul wrote:
> > > > Hello Artur,
> > > >
> > > > On 5/23/20 10:34 PM, Artur Barsegyan wrote:
> > > > > Take into account the total size of the already enqueued messages of
> > > > > previously handled senders before another one.
> > > > >
> > > > > Otherwise, we have serious degradation of receiver throughput for
> > > > > case with multiple senders because another sender wakes up,
> > > > > checks the queue capacity and falls into sleep again.
> > > > >
> > > > > Each round trip wastes a lot of CPU time and leads to perceptible
> > > > > throughput degradation.
> > > > >
> > > > > Source code of:
> > > > > - sender/receiver
> > > > > - benchmark script
> > > > > - graphs of before/after results
> > > > >
> > > > > is located here: https://github.com/artur-barsegyan/systemv_queue_research
> > > > Thanks for analyzing the issue!
> > > >
> > > > > Signed-off-by: Artur Barsegyan <a.barsegyan96@xxxxxxxxx>
> > > > > ---
> > > > > ipc/msg.c | 4 +++-
> > > > > 1 file changed, 3 insertions(+), 1 deletion(-)
> > > > >
> > > > > diff --git a/ipc/msg.c b/ipc/msg.c
> > > > > index caca67368cb5..52d634b0a65a 100644
> > > > > --- a/ipc/msg.c
> > > > > +++ b/ipc/msg.c
> > > > > @@ -214,6 +214,7 @@ static void ss_wakeup(struct msg_queue *msq,
> > > > > struct msg_sender *mss, *t;
> > > > > struct task_struct *stop_tsk = NULL;
> > > > > struct list_head *h = &msq->q_senders;
> > > > > + size_t msq_quota_used = 0;
> > > > > list_for_each_entry_safe(mss, t, h, list) {
> > > > > if (kill)
> > > > > @@ -233,7 +234,7 @@ static void ss_wakeup(struct msg_queue *msq,
> > > > > * move the sender to the tail on behalf of the
> > > > > * blocked task.
> > > > > */
> > > > > - else if (!msg_fits_inqueue(msq, mss->msgsz)) {
> > > > > + else if (!msg_fits_inqueue(msq, msq_quota_used + mss->msgsz)) {
> > > > > if (!stop_tsk)
> > > > > stop_tsk = mss->tsk;
> > > > > @@ -241,6 +242,7 @@ static void ss_wakeup(struct msg_queue *msq,
> > > > > continue;
> > > > > }
> > > > > + msq_quota_used += mss->msgsz;
> > > > > wake_q_add(wake_q, mss->tsk);
> > > > You have missed the case of a do_msgsnd() that doesn't enqueue the message:
> > > >
> > > > Situation:
> > > >
> > > > - 2 messages of type 1 in the queue (2x8192 bytes, queue full)
> > > >
> > > > - 6 senders waiting to send messages of type 2
> > > >
> > > > - 6 receivers waiting to get messages of type 2.
> > > >
> > > > If now a receiver reads one message of type 1, then all 6 senders can send.
> > > >
> > > > With your patch applied, only one sender sends the message to one receiver,
> > > > and the remaining 10 tasks continue to sleep.
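
The numbers in this situation can be checked with a minimal user-space sketch. It is a simplification, not the kernel's ss_wakeup(): struct wake_model, wm_fits() and wm_count_woken() are hypothetical names, only the byte limit is modelled, and the 16384-byte capacity is an assumption derived from "2x8192 bytes, queue full".

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical toy model: byte accounting only, no locking, no types. */
struct wake_model {
    size_t used_bytes;   /* bytes currently stored in the queue */
    size_t max_bytes;    /* queue capacity in bytes */
};

/* Simplified fit check: only the byte limit is considered. */
bool wm_fits(const struct wake_model *q, size_t msgsz)
{
    return q->used_bytes + msgsz <= q->max_bytes;
}

/*
 * Count how many of the n waiting senders would be woken.
 * cumulative == false: each sender is checked against the queue alone.
 * cumulative == true:  the sizes of already selected senders are added
 *                      first (the msq_quota_used idea from the patch).
 */
size_t wm_count_woken(const struct wake_model *q, const size_t *sizes,
                      size_t n, bool cumulative)
{
    size_t quota_used = 0;
    size_t woken = 0;

    for (size_t i = 0; i < n; i++) {
        size_t need = sizes[i] + (cumulative ? quota_used : 0);

        if (!wm_fits(q, need))
            continue;             /* this sender keeps sleeping */
        quota_used += sizes[i];
        woken++;
    }
    return woken;
}
```

After one type 1 message has been read, the model holds 8192 of 16384 bytes. For six senders of 8192 bytes each, wm_count_woken() returns 6 without cumulative accounting and 1 with it, which matches the "only one sender" observation above.
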
> > > >
> > > >
> > > > Could you please check and (assuming that you agree) run your benchmarks
> > > > with the patch applied?
> > > >
> > > > --
> > > >
> > > >     Manfred
> > > >
> > > >
> > > >
> > > > From fe2f257b1950a19bf5c6f67e71aa25c2f13bcdc3 Mon Sep 17 00:00:00 2001
> > > > From: Manfred Spraul <manfred@xxxxxxxxxxxxxxxx>
> > > > Date: Sun, 24 May 2020 14:47:31 +0200
> > > > Subject: [PATCH 2/2] ipc/msg.c: Handle case of senders not enqueuing the
> > > > message
> > > >
> > > > The patch "ipc/msg.c: wake up senders until there is a queue empty
> > > > capacity" avoids the thundering herd problem by waking up
> > > > only as many potential senders as there is free space in the queue.
> > > >
> > > > This patch is a fix: If one of the senders doesn't enqueue its message,
> > > > then a search for further potential senders must be performed.
> > > >
> > > > Signed-off-by: Manfred Spraul <manfred@xxxxxxxxxxxxxxxx>
> > > > ---
> > > > ipc/msg.c | 21 +++++++++++++++++++++
> > > > 1 file changed, 21 insertions(+)
> > > >
> > > > diff --git a/ipc/msg.c b/ipc/msg.c
> > > > index 52d634b0a65a..f6d5188db38a 100644
> > > > --- a/ipc/msg.c
> > > > +++ b/ipc/msg.c
> > > > @@ -208,6 +208,12 @@ static inline void ss_del(struct msg_sender *mss)
> > > > list_del(&mss->list);
> > > > }
> > > > +/*
> > > > + * ss_wakeup() assumes that the stored senders will enqueue the pending message.
> > > > + * Thus: If a woken up task doesn't send the enqueued message for whatever
> > > > + * reason, then that task must call ss_wakeup() again, to ensure that no
> > > > + * wakeup is lost.
> > > > + */
> > > > static void ss_wakeup(struct msg_queue *msq,
> > > > struct wake_q_head *wake_q, bool kill)
> > > > {
> > > > @@ -843,6 +849,7 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext,
> > > > struct msg_queue *msq;
> > > > struct msg_msg *msg;
> > > > int err;
> > > > + bool need_wakeup;
> > > > struct ipc_namespace *ns;
> > > > DEFINE_WAKE_Q(wake_q);
> > > > @@ -869,6 +876,7 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext,
> > > > ipc_lock_object(&msq->q_perm);
> > > > + need_wakeup = false;
> > > > for (;;) {
> > > > struct msg_sender s;
> > > > @@ -898,6 +906,13 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext,
> > > > /* enqueue the sender and prepare to block */
> > > > ss_add(msq, &s, msgsz);
> > > > + /* Enqueuing a sender is actually an obligation:
> > > > + * The sender must either enqueue the message, or call
> > > > + * ss_wakeup(). Thus track that we have added our message
> > > > + * to the candidates for the message queue.
> > > > + */
> > > > + need_wakeup = true;
> > > > +
> > > > if (!ipc_rcu_getref(&msq->q_perm)) {
> > > > err = -EIDRM;
> > > > goto out_unlock0;
> > > > @@ -935,12 +950,18 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext,
> > > > msq->q_qnum++;
> > > > atomic_add(msgsz, &ns->msg_bytes);
> > > > atomic_inc(&ns->msg_hdrs);
> > > > +
> > > > + /* we have fulfilled our obligation, no need for wakeup */
> > > > + need_wakeup = false;
> > > > }
> > > > err = 0;
> > > > msg = NULL;
> > > > out_unlock0:
> > > > + if (need_wakeup)
> > > > + ss_wakeup(msq, &wake_q, false);
> > > > +
> > > > ipc_unlock_object(&msq->q_perm);
> > > > wake_up_q(&wake_q);
> > > > out_unlock1:
> > > > --
> > > > 2.26.2
> > > >
> >
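
The "obligation" rule from the second patch can be illustrated with a small user-space sketch. It is a toy under stated assumptions, not the kernel logic: struct chain_queue, struct chain_sender and chain_run() are hypothetical names, only bytes are accounted, and a sender with enqueues == false stands for one whose message is handed directly to a waiting receiver.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical toy model: byte accounting only, no locking. */
struct chain_queue {
    size_t used_bytes;
    size_t max_bytes;
};

struct chain_sender {
    size_t msgsz;
    bool enqueues;   /* false: message goes straight to a receiver */
    bool woken;      /* selected in the current wakeup pass */
    bool done;       /* message has been sent */
};

/*
 * Run wakeup passes with cumulative accounting. If rewake is true, a
 * sender that did not enqueue its message triggers another pass (the
 * need_wakeup idea). Returns the number of senders that completed.
 */
size_t chain_run(struct chain_queue *q, struct chain_sender *s, size_t n,
                 bool rewake)
{
    size_t completed = 0;
    bool wake_pass = true;   /* first pass: a receiver freed some space */

    while (wake_pass) {
        size_t quota_used = 0;

        wake_pass = false;

        /* Selection: reserve space for each sender that still fits. */
        for (size_t i = 0; i < n; i++) {
            s[i].woken = false;
            if (s[i].done)
                continue;
            if (q->used_bytes + quota_used + s[i].msgsz > q->max_bytes)
                continue;
            quota_used += s[i].msgsz;
            s[i].woken = true;
        }

        /* Woken senders run and either use or release their reservation. */
        for (size_t i = 0; i < n; i++) {
            if (!s[i].woken)
                continue;
            s[i].done = true;
            completed++;
            if (s[i].enqueues)
                q->used_bytes += s[i].msgsz;
            else if (rewake)
                wake_pass = true;   /* reservation unused: search again */
        }
    }
    return completed;
}
```

For the situation with six 8192-byte senders that never enqueue, and 8192 of 16384 bytes in use, chain_run() returns 1 with rewake == false and 6 with rewake == true: each sender that skips the queue passes the wakeup on to the next one.
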