[PATCH 2/5] ipc/mqueue.c: Update/document memory barriers

From: Manfred Spraul
Date: Fri Oct 11 2019 - 07:20:33 EST


Update and document memory barriers for mqueue.c:
- ewp->state is read without any locks, thus READ_ONCE() is required; the matching stores in pipelined_send()/pipelined_receive() use WRITE_ONCE().

- add smp_acquire__after_ctrl_dep() after the READ_ONCE(): we need
acquire semantics if the value is STATE_READY.

- document that the code relies on the barrier inside wake_q_add()

- document why __set_current_state() may be used:
Reading task->state cannot happen before the wake_q_add() call,
which happens while holding info->lock.

Signed-off-by: Manfred Spraul <manfred@xxxxxxxxxxxxxxxx>
Cc: Waiman Long <longman@xxxxxxxxxx>
Cc: Davidlohr Bueso <dave@xxxxxxxxxxxx>
---
ipc/mqueue.c | 32 +++++++++++++++++++++-----------
1 file changed, 21 insertions(+), 11 deletions(-)

diff --git a/ipc/mqueue.c b/ipc/mqueue.c
index 3d920ff15c80..902167407737 100644
--- a/ipc/mqueue.c
+++ b/ipc/mqueue.c
@@ -646,17 +646,25 @@ static int wq_sleep(struct mqueue_inode_info *info, int sr,
wq_add(info, sr, ewp);

for (;;) {
+ /* memory barrier not required, we hold info->lock */
__set_current_state(TASK_INTERRUPTIBLE);

spin_unlock(&info->lock);
time = schedule_hrtimeout_range_clock(timeout, 0,
HRTIMER_MODE_ABS, CLOCK_REALTIME);

- if (ewp->state == STATE_READY) {
+ if (READ_ONCE(ewp->state) == STATE_READY) {
+ /*
+ * Pairs, together with READ_ONCE(), with
+ * the barrier in wake_q_add().
+ */
+ smp_acquire__after_ctrl_dep();
retval = 0;
goto out;
}
spin_lock(&info->lock);
+
+ /* we hold info->lock, so no memory barrier required */
if (ewp->state == STATE_READY) {
retval = 0;
goto out_unlock;
@@ -928,16 +936,11 @@ static inline void pipelined_send(struct wake_q_head *wake_q,
{
receiver->msg = message;
list_del(&receiver->list);
+
wake_q_add(wake_q, receiver->task);
- /*
- * Rely on the implicit cmpxchg barrier from wake_q_add such
- * that we can ensure that updating receiver->state is the last
- * write operation: As once set, the receiver can continue,
- * and if we don't have the reference count from the wake_q,
- * yet, at that point we can later have a use-after-free
- * condition and bogus wakeup.
- */
- receiver->state = STATE_READY;
+
+ /* The memory barrier is provided by wake_q_add(). */
+ WRITE_ONCE(receiver->state, STATE_READY);
}

/* pipelined_receive() - if there is task waiting in sys_mq_timedsend()
@@ -956,8 +959,11 @@ static inline void pipelined_receive(struct wake_q_head *wake_q,
return;

list_del(&sender->list);
+
wake_q_add(wake_q, sender->task);
- sender->state = STATE_READY;
+
+ /* The memory barrier is provided by wake_q_add(). */
+ WRITE_ONCE(sender->state, STATE_READY);
}

static int do_mq_timedsend(mqd_t mqdes, const char __user *u_msg_ptr,
@@ -1044,6 +1050,8 @@ static int do_mq_timedsend(mqd_t mqdes, const char __user *u_msg_ptr,
} else {
wait.task = current;
wait.msg = (void *) msg_ptr;
+
+ /* memory barrier not required, we hold info->lock */
wait.state = STATE_NONE;
ret = wq_sleep(info, SEND, timeout, &wait);
/*
@@ -1147,6 +1155,8 @@ static int do_mq_timedreceive(mqd_t mqdes, char __user *u_msg_ptr,
ret = -EAGAIN;
} else {
wait.task = current;
+
+ /* memory barrier not required, we hold info->lock */
wait.state = STATE_NONE;
ret = wq_sleep(info, RECV, timeout, &wait);
msg_ptr = wait.msg;
--
2.21.0