[PATCH 5/6] ipc/sem.c: Document and update memory barriers

From: Manfred Spraul
Date: Sat Oct 12 2019 - 01:50:30 EST


The patch documents and updates the memory barriers in ipc/sem.c:
- Add smp_store_release() to wake_up_sem_queue_prepare() and
document why it is needed.

- Read q->status using READ_ONCE() + smp_acquire__after_ctrl_dep(),
as the pair for the barrier inside wake_up_sem_queue_prepare().

- Add comments to all barriers, and mention the rules in the block
regarding locking.

Signed-off-by: Manfred Spraul <manfred@xxxxxxxxxxxxxxxx>
Cc: Waiman Long <longman@xxxxxxxxxx>
Cc: Davidlohr Bueso <dave@xxxxxxxxxxxx>
---
ipc/sem.c | 63 ++++++++++++++++++++++++++++++++++++++++++++-----------
1 file changed, 51 insertions(+), 12 deletions(-)

diff --git a/ipc/sem.c b/ipc/sem.c
index ec97a7072413..c6c5954a2030 100644
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -205,7 +205,9 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it);
*
* Memory ordering:
* Most ordering is enforced by using spin_lock() and spin_unlock().
- * The special case is use_global_lock:
+ *
+ * Exceptions:
+ * 1) use_global_lock:
* Setting it from non-zero to 0 is a RELEASE, this is ensured by
* using smp_store_release().
* Testing if it is non-zero is an ACQUIRE, this is ensured by using
@@ -214,6 +216,24 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it);
* this smp_load_acquire(), this is guaranteed because the smp_load_acquire()
* is inside a spin_lock() and after a write from 0 to non-zero a
* spin_lock()+spin_unlock() is done.
+ *
+ * 2) queue.status:
+ * Initialization is done while holding sem_lock(), so no further barrier is
+ * required.
+ * Setting it to a result code is a RELEASE, this is ensured by both a
+ * smp_store_release() (for case a) and while holding sem_lock()
+ * (for case b).
+ * The ACQUIRE when reading the result code without holding sem_lock() is
+ * achieved by using READ_ONCE() + smp_acquire__after_ctrl_dep().
+ * (case a above).
+ * Reading the result code while holding sem_lock() needs no further barriers,
+ * the locks inside sem_lock() enforce ordering (case b above)
+ *
+ * 3) current->state:
+ * current->state is set to TASK_INTERRUPTIBLE while holding sem_lock().
+ * The wakeup is handled using the wake_q infrastructure. wake_q wakeups may
+ * happen immediately after calling wake_q_add. As wake_q_add() is called
+ * when holding sem_lock(), no further barriers are required.
*/

#define sc_semmsl sem_ctls[0]
@@ -766,15 +786,24 @@ static int perform_atomic_semop(struct sem_array *sma, struct sem_queue *q)
static inline void wake_up_sem_queue_prepare(struct sem_queue *q, int error,
struct wake_q_head *wake_q)
{
+ /*
+ * When the wakeup is performed, q->sleeper->state is read and later
+ * set to TASK_RUNNING. This may happen at any time, even before
+ * wake_q_add() returns. Memory ordering for q->sleeper->state is
+ * enforced by sem_lock(): we own sem_lock now (that was the ACQUIRE),
+ * and q->sleeper wrote q->sleeper->state before calling sem_unlock()
+ * (->RELEASE).
+ */
wake_q_add(wake_q, q->sleeper);
/*
- * Rely on the above implicit barrier, such that we can
- * ensure that we hold reference to the task before setting
- * q->status. Otherwise we could race with do_exit if the
- * task is awoken by an external event before calling
- * wake_up_process().
+ * Here, we need a barrier to protect the refcount increase inside
+ * wake_q_add().
+ * case a: The barrier inside wake_q_add() pairs with
+ * READ_ONCE(q->status) + smp_acquire__after_ctrl_dep() in
+ * do_semtimedop().
+ * case b: nothing, ordering is enforced by the locks in sem_lock().
*/
- WRITE_ONCE(q->status, error);
+ smp_store_release(&q->status, error);
}

static void unlink_queue(struct sem_array *sma, struct sem_queue *q)
@@ -2148,9 +2177,11 @@ static long do_semtimedop(int semid, struct sembuf __user *tsops,
}

do {
+ /* memory ordering ensured by the lock in sem_lock() */
WRITE_ONCE(queue.status, -EINTR);
queue.sleeper = current;

+ /* memory ordering is ensured by the lock in sem_lock() */
__set_current_state(TASK_INTERRUPTIBLE);
sem_unlock(sma, locknum);
rcu_read_unlock();
@@ -2174,12 +2205,16 @@ static long do_semtimedop(int semid, struct sembuf __user *tsops,
error = READ_ONCE(queue.status);
if (error != -EINTR) {
/*
- * User space could assume that semop() is a memory
- * barrier: Without the mb(), the cpu could
- * speculatively read in userspace stale data that was
- * overwritten by the previous owner of the semaphore.
+ * Memory barrier for queue.status, case a):
+ * The smp_acquire__after_ctrl_dep(), together with the
+ * READ_ONCE() above pairs with the barrier inside
+ * wake_up_sem_queue_prepare().
+ * The barrier protects user space, too: User space may
+ * assume that all data from the CPU that did the wakeup
+ * semop() is visible on the wakee CPU when the sleeping
+ * semop() returns.
*/
- smp_mb();
+ smp_acquire__after_ctrl_dep();
goto out_free;
}

@@ -2189,6 +2224,10 @@ static long do_semtimedop(int semid, struct sembuf __user *tsops,
if (!ipc_valid_object(&sma->sem_perm))
goto out_unlock_free;

+ /*
+ * No necessity for any barrier:
+ * We are protected by sem_lock() (case b)
+ */
error = READ_ONCE(queue.status);

/*
--
2.21.0