Hello, Manfred!
Thank you for your review. I've reviewed your patch.

I forgot about the case with different message types. Now, with your patch,
a sender forces further message processing if the queue cannot hold its own
message.

I have measured the queue throughput and pushed the results to:
https://github.com/artur-barsegyan/systemv_queue_research
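For reference, the core of the measurement is just a sender/receiver pair on
one System V queue. Below is a minimal single-process sketch; the payload size
and message count are illustrative assumptions, not the parameters from the
repository above:

/* Minimal sketch of a SysV message queue throughput test: one sender,
 * one receiver, fixed-size messages. Illustration only. */
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/wait.h>

#define PAYLOAD 8192
#define COUNT   100000L

struct big_msg {
    long mtype;
    char mtext[PAYLOAD];
};

int main(void)
{
    int msqid = msgget(IPC_PRIVATE, IPC_CREAT | 0600);
    struct big_msg msg = { .mtype = 1 };
    struct timespec t0, t1;

    if (msqid < 0) {
        perror("msgget");
        return 1;
    }
    memset(msg.mtext, 'x', sizeof(msg.mtext));

    if (fork() == 0) {
        /* Receiver: drain COUNT messages of type 1. */
        for (long i = 0; i < COUNT; i++)
            msgrcv(msqid, &msg, PAYLOAD, 1, 0);
        _exit(0);
    }

    /* Sender: blocking sends; once the queue is full, each call sleeps
     * in do_msgsnd() until the receiver frees enough space. */
    clock_gettime(CLOCK_MONOTONIC, &t0);
    for (long i = 0; i < COUNT; i++)
        msgsnd(msqid, &msg, PAYLOAD, 0);
    wait(NULL);
    clock_gettime(CLOCK_MONOTONIC, &t1);

    double secs = (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec) / 1e9;
    printf("%.0f msgs/s\n", COUNT / secs);

    msgctl(msqid, IPC_RMID, NULL);
    return 0;
}

A run with several such senders working against one shared queue is what
exposes the wakeup behaviour discussed in this thread.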
But one point still confuses me: in the main loop of do_msgsnd(), we don't
check whether pipelined sending is possible. Your case would be optimized if
we checked for pipelined sending inside the loop.
On Sun, May 24, 2020 at 03:21:31PM +0200, Manfred Spraul wrote:
Hello Artur,
On 5/23/20 10:34 PM, Artur Barsegyan wrote:
Take into account the total size of the already enqueued messages of
previously handled senders before another one.
Otherwise, we have serious degradation of receiver throughput in the case
with multiple senders, because another sender wakes up, checks the queue
capacity and falls asleep again.
Each round-trip wastes a lot of CPU time and leads to perceptible
throughput degradation.
Source code of:
- sender/receiver
- benchmark script
- ready graphics of before/after results
is located here: https://github.com/artur-barsegyan/systemv_queue_research
Thanks for analyzing the issue!

Signed-off-by: Artur Barsegyan <a.barsegyan96@xxxxxxxxx>
---
ipc/msg.c | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/ipc/msg.c b/ipc/msg.c
index caca67368cb5..52d634b0a65a 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -214,6 +214,7 @@ static void ss_wakeup(struct msg_queue *msq,
struct msg_sender *mss, *t;
struct task_struct *stop_tsk = NULL;
struct list_head *h = &msq->q_senders;
+ size_t msq_quota_used = 0;
list_for_each_entry_safe(mss, t, h, list) {
if (kill)
@@ -233,7 +234,7 @@ static void ss_wakeup(struct msg_queue *msq,
* move the sender to the tail on behalf of the
* blocked task.
*/
- else if (!msg_fits_inqueue(msq, mss->msgsz)) {
+ else if (!msg_fits_inqueue(msq, msq_quota_used + mss->msgsz)) {
if (!stop_tsk)
stop_tsk = mss->tsk;
@@ -241,6 +242,7 @@ static void ss_wakeup(struct msg_queue *msq,
continue;
}
+ msq_quota_used += mss->msgsz;
wake_q_add(wake_q, mss->tsk);
You have missed the case of a do_msgsnd() that doesn't enqueue the message:

Situation:
- 2 messages of type 1 in the queue (2x8192 bytes, queue full)
- 6 senders waiting to send messages of type 2
- 6 receivers waiting to get messages of type 2.
If now a receiver reads one message of type 1, then all 6 senders can send.
With your patch applied, only one sender sends the message to one receiver,
and the remaining 10 tasks continue to sleep.
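For what it's worth, a hypothetical userspace reproducer of that situation
could look like the sketch below; it is not part of either patch, and the
queue limit, payload size and task counts simply follow the scenario above:

/* Hypothetical reproducer: two 8192-byte type-1 messages fill the queue,
 * six senders block with type-2 messages, six receivers wait for type 2,
 * then one type-1 message is received. */
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/wait.h>

#define PAYLOAD 8192

struct big_msg {
    long mtype;
    char mtext[PAYLOAD];
};

int main(void)
{
    int msqid = msgget(IPC_PRIVATE, IPC_CREAT | 0600);
    struct msqid_ds ds;
    struct big_msg msg;
    int done = 0;

    /* Size the queue so exactly two PAYLOAD-sized messages fill it. */
    msgctl(msqid, IPC_STAT, &ds);
    ds.msg_qbytes = 2 * PAYLOAD;
    msgctl(msqid, IPC_SET, &ds);

    memset(msg.mtext, 'x', sizeof(msg.mtext));
    msg.mtype = 1;
    msgsnd(msqid, &msg, PAYLOAD, 0);
    msgsnd(msqid, &msg, PAYLOAD, 0);        /* queue is now full */

    for (int i = 0; i < 6; i++)             /* senders of type 2 */
        if (fork() == 0) {
            msg.mtype = 2;
            msgsnd(msqid, &msg, PAYLOAD, 0);
            _exit(0);
        }
    for (int i = 0; i < 6; i++)             /* receivers of type 2 */
        if (fork() == 0) {
            msgrcv(msqid, &msg, PAYLOAD, 2, 0);
            _exit(0);
        }

    sleep(1);                               /* let all 12 tasks block */
    msgrcv(msqid, &msg, PAYLOAD, 1, 0);     /* free PAYLOAD bytes */
    sleep(1);

    while (waitpid(-1, NULL, WNOHANG) > 0)
        done++;
    printf("tasks finished after one type-1 receive: %d of 12\n", done);

    msgctl(msqid, IPC_RMID, NULL);          /* unblocks any remaining tasks */
    while (wait(NULL) > 0)
        ;
    return 0;
}

If the analysis above holds, only one sender/receiver pair should finish after
the type-1 receive with the first patch alone, while an unpatched kernel, or
the first patch together with the fix below, should let all twelve tasks
finish.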
Could you please check and (assuming that you agree) run your benchmarks
with the patch applied?
--
    Manfred
From fe2f257b1950a19bf5c6f67e71aa25c2f13bcdc3 Mon Sep 17 00:00:00 2001
From: Manfred Spraul <manfred@xxxxxxxxxxxxxxxx>
Date: Sun, 24 May 2020 14:47:31 +0200
Subject: [PATCH 2/2] ipc/msg.c: Handle case of senders not enqueuing the
message
The patch "ipc/msg.c: wake up senders until there is a queue empty
capacity" avoids the thundering herd problem by wakeing up
only as many potential senders as there is free space in the queue.
This patch is a fix: If one of the senders doesn't enqueue its message,
then a search for further potential senders must be performed.
Signed-off-by: Manfred Spraul <manfred@xxxxxxxxxxxxxxxx>
---
ipc/msg.c | 21 +++++++++++++++++++++
1 file changed, 21 insertions(+)
diff --git a/ipc/msg.c b/ipc/msg.c
index 52d634b0a65a..f6d5188db38a 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -208,6 +208,12 @@ static inline void ss_del(struct msg_sender *mss)
list_del(&mss->list);
}
+/*
+ * ss_wakeup() assumes that the stored senders will enqueue the pending message.
+ * Thus: If a woken up task doesn't send the enqueued message for whatever
+ * reason, then that task must call ss_wakeup() again, to ensure that no
+ * wakeup is lost.
+ */
static void ss_wakeup(struct msg_queue *msq,
struct wake_q_head *wake_q, bool kill)
{
@@ -843,6 +849,7 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext,
struct msg_queue *msq;
struct msg_msg *msg;
int err;
+ bool need_wakeup;
struct ipc_namespace *ns;
DEFINE_WAKE_Q(wake_q);
@@ -869,6 +876,7 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext,
ipc_lock_object(&msq->q_perm);
+ need_wakeup = false;
for (;;) {
struct msg_sender s;
@@ -898,6 +906,13 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext,
/* enqueue the sender and prepare to block */
ss_add(msq, &s, msgsz);
+ /* Enqueuing a sender is actually an obligation:
+ * The sender must either enqueue the message, or call
+ * ss_wakeup(). Thus track that we have added our message
+ * to the candidates for the message queue.
+ */
+ need_wakeup = true;
+
if (!ipc_rcu_getref(&msq->q_perm)) {
err = -EIDRM;
goto out_unlock0;
@@ -935,12 +950,18 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext,
msq->q_qnum++;
atomic_add(msgsz, &ns->msg_bytes);
atomic_inc(&ns->msg_hdrs);
+
+ /* we have fulfilled our obligation, no need for wakeup */
+ need_wakeup = false;
}
err = 0;
msg = NULL;
out_unlock0:
+ if (need_wakeup)
+ ss_wakeup(msq, &wake_q, false);
+
ipc_unlock_object(&msq->q_perm);
wake_up_q(&wake_q);
out_unlock1:
--
2.26.2