I like the idea, the pairing in ipc is not good.

We currently use a full barrier on the sender side
to avoid receiver tasks disappearing on us while still
performing the wakeup on the sender side. However, we lack
the proper CPU-CPU interaction pairing on the receiver
side, which busy-waits for the message. Similarly, we do
not need a full smp_mb, and can relax the semantics for
the writer and reader sides of the message. This is safe
as we are only ordering loads and stores to r_msg. And in
both smp_wmb and smp_rmb, there are no stores after the
calls _anyway_.

This obviously applies to pipelined_send and expunge_all,
for EIDRM when destroying a queue.

Idea for improvement: Add here /* smp_barrier_pair: ipc_msg_01 */
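
To make the writer/reader pairing described above concrete, here is a
minimal userspace sketch in C11. It is not the kernel code: the release
and acquire fences are only rough analogues of smp_wmb() and smp_rmb(),
the acquire fence is placed after the load (the usual message-passing
pattern, which may differ from the placement in the patch), and every
demo_* name is hypothetical and introduced purely for illustration.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical stand-ins; NOT the kernel's msg_msg / msg_receiver. */
struct demo_msg {
	int payload;
};

struct demo_receiver {
	int woken;                        /* stand-in for the wake-up side effects */
	_Atomic(struct demo_msg *) r_msg; /* NULL until the sender publishes */
};

static void demo_receiver_init(struct demo_receiver *r)
{
	r->woken = 0;
	atomic_init(&r->r_msg, NULL);
}

/* Writer side: all stores before the fence are ordered before r_msg. */
static void demo_send(struct demo_receiver *r, struct demo_msg *m, int value)
{
	m->payload = value;
	r->woken = 1;
	atomic_thread_fence(memory_order_release); /* rough analogue of smp_wmb() */
	atomic_store_explicit(&r->r_msg, m, memory_order_relaxed);
}

/* Reader side, single poll: returns NULL if nothing is published yet. */
static struct demo_msg *demo_try_receive(struct demo_receiver *r)
{
	struct demo_msg *m =
		atomic_load_explicit(&r->r_msg, memory_order_relaxed);

	if (m == NULL)
		return NULL;
	/* Pairs with the release fence in demo_send(). */
	atomic_thread_fence(memory_order_acquire); /* rough analogue of smp_rmb() */
	assert(r->woken == 1); /* earlier writer stores are visible here */
	return m;
}

/* Reader side, busy-wait until the sender has published a message. */
static struct demo_msg *demo_receive(struct demo_receiver *r)
{
	struct demo_msg *m;

	while ((m = demo_try_receive(r)) == NULL)
		; /* kernel code would call cpu_relax() here */
	return m;
}
```

In real use demo_send() and demo_receive() would run on different
threads. The point of the sketch is only that each barrier has a
partner: the writer's fence before the r_msg store, and the reader's
fence after observing a non-NULL r_msg.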
Signed-off-by: Davidlohr Bueso <dbueso@xxxxxxx>
---
ipc/msg.c | 31 ++++++++++++++++++++++---------
1 file changed, 22 insertions(+), 9 deletions(-)
diff --git a/ipc/msg.c b/ipc/msg.c
index 2b6fdbb..ac5116e 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -196,7 +196,7 @@ static void expunge_all(struct msg_queue *msq, int res)
* or dealing with -EAGAIN cases. See lockless receive part 1
* and 2 in do_msgrcv().
*/
- smp_mb();
+ smp_wmb();
msr->r_msg = ERR_PTR(res);

Idea for improvement: Add here /* smp_barrier_pair: ipc_msg_02 */

}
}
@@ -580,7 +580,7 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
/* initialize pipelined send ordering */
msr->r_msg = NULL;
wake_up_process(msr->r_tsk);
- smp_mb(); /* see barrier comment below */
+ smp_wmb(); /* see barrier comment below */
msr->r_msg = ERR_PTR(-E2BIG);
} else {
msr->r_msg = NULL;
@@ -589,11 +589,12 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
wake_up_process(msr->r_tsk);
/*
* Ensure that the wakeup is visible before
- * setting r_msg, as the receiving end depends
- * on it. See lockless receive part 1 and 2 in
- * do_msgrcv().
+ * setting r_msg, as the receiver can otherwise
+ * exit - once r_msg is set, the receiver can
+ * continue. See lockless receive part 1 and 2
+ * in do_msgrcv().
*/
- smp_mb();
+ smp_wmb();
msr->r_msg = msg;

And here again /* smp_barrier_pair: for ipc_msg_01 and ipc_msg_02 */

return 1;
@@ -934,10 +935,22 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
* wake_up_process(). There is a race with exit(), see
* ipc/mqueue.c for the details.
*/
- msg = (struct msg_msg *)msr_d.r_msg;
- while (msg == NULL) {
- cpu_relax();
+ for (;;) {
+ /*
+ * Pairs with writer barrier in pipelined_send
+ * or expunge_all
+ */
+ smp_rmb();