[PATCH v2 3/4] locking/mutex: Avoid missed wakeup of mutex waiter

From: Waiman Long
Date: Fri Feb 12 2016 - 12:32:44 EST


The current mutex code sets count to -1 and then sets the task
state. The mutex unlock path accesses them in the same order: it
sets count and then checks the task state. That could lead to a missed
wakeup, although the problem is cleared when a new waiter enters the
wait queue.

This patch reverses the order in the locking slowpath so that the task
state is set before the count. This should eliminate the potential
missed wakeup and improve latency.

Signed-off-by: Waiman Long <Waiman.Long@xxxxxx>
---
kernel/locking/mutex.c | 46 +++++++++++++++++++++++++++++++---------------
1 files changed, 31 insertions(+), 15 deletions(-)
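
For illustration only, and not part of the patch: a minimal userspace
sketch of the Sleep/Wakeup ordering argument. It assumes sequential
consistency (which the MB on each side is meant to provide), a single
waiter, and a lock that starts out held (count = 0). The names
struct model, waiter_step(), waker_step() and count_missed_wakeups()
are hypothetical and do not exist in the kernel. The sketch enumerates
every interleaving of the two waiter steps with the two unlocker steps
and counts those in which the waiter neither gets the lock nor gets
woken.

```c
#include <stdbool.h>

enum { TASK_RUNNING = 0, TASK_SLEEPING = 1 };

/* Hypothetical model state; not the kernel's struct mutex. */
struct model {
	int count;		/* 1: unlocked, 0: locked, -1: locked, waiters */
	int state;		/* task state of the single waiter */
	int waiter_prev;	/* value returned by the waiter's xchg */
	int waker_prev;		/* value returned by the unlocker's xchg */
	bool woken;		/* unlocker issued an effective wakeup */
};

/* Plain exchange; atomicity is implied because steps never overlap. */
static int xchg(int *p, int v)
{
	int old = *p;

	*p = v;
	return old;
}

/* Waiter: one state store and one xchg, in the selected order. */
static void waiter_step(struct model *m, int step, bool state_first)
{
	bool do_state = state_first ? (step == 0) : (step == 1);

	if (do_state)
		m->state = TASK_SLEEPING;
	else
		m->waiter_prev = xchg(&m->count, -1);
}

/* Unlocker: release the lock, then wake only a waiter seen sleeping. */
static void waker_step(struct model *m, int step)
{
	if (step == 0) {
		m->waker_prev = xchg(&m->count, 1);
	} else if (m->waker_prev < 0 && m->state == TASK_SLEEPING) {
		m->state = TASK_RUNNING;
		m->woken = true;
	}
}

/* Count interleavings where the waiter sleeps and is never woken. */
int count_missed_wakeups(bool state_first)
{
	int missed = 0;

	for (unsigned int mask = 0; mask < 16; mask++) {
		struct model m = { .count = 0, .state = TASK_RUNNING };
		int bits = 0, w = 0, k = 0;

		for (int i = 0; i < 4; i++)
			bits += (mask >> i) & 1;
		if (bits != 2)	/* exactly two of four slots are waiter steps */
			continue;

		for (int slot = 0; slot < 4; slot++) {
			if (mask & (1u << slot))
				waiter_step(&m, w++, state_first);
			else
				waker_step(&m, k++);
		}

		/* The waiter got the lock only if its xchg returned 1. */
		if (m.waiter_prev != 1 && !m.woken)
			missed++;
	}
	return missed;
}
```

Under these assumptions, count_missed_wakeups(false) models the old
order (count first) and reports one losing interleaving out of six:
the unlocker checks the task state between the waiter's xchg and its
state store. count_missed_wakeups(true) models the new order and
reports none.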

diff --git a/kernel/locking/mutex.c b/kernel/locking/mutex.c
index 29c6d90..27123bd 100644
--- a/kernel/locking/mutex.c
+++ b/kernel/locking/mutex.c
@@ -564,20 +564,6 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,

while (!acquired) {
/*
- * Lets try to take the lock again - this is needed even if
- * we get here for the first time (shortly after failing to
- * acquire the lock), to make sure that we get a wakeup once
- * it's unlocked. Later on, if we sleep, this is the
- * operation that gives us the lock. We xchg it to -1, so
- * that when we release the lock, we properly wake up the
- * other waiters. We only attempt the xchg if the count is
- * non-negative in order to avoid unnecessary xchg operations:
- */
- if (atomic_read(&lock->count) >= 0 &&
- (atomic_xchg_acquire(&lock->count, -1) == 1))
- break;
-
- /*
* got a signal? (This code gets eliminated in the
* TASK_UNINTERRUPTIBLE case.)
*/
@@ -592,7 +578,37 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
goto err;
}

- __set_task_state(task, state);
+
+ /*
+ * Set the task state before changing the count to -1 to
+ * avoid a missed wakeup, even though a new waiter entering
+ * the queue would clear the problem.
+ *
+ *      Sleep                          Wakeup
+ *      -----                          ------
+ * [S]   p->state = state              [RmW] count = 1
+ *       MB                                  MB
+ * [RmW] count = -1                    [L]   if ((prev_count < 0) &&
+ *       if (prev_count < 1)                     (p->state & state))
+ *           sleep                                wakeup
+ */
+ set_task_state(task, state);
+
+ /*
+ * Let's try to take the lock again - this is needed even if
+ * we get here for the first time (shortly after failing to
+ * acquire the lock), to make sure that we get a wakeup once
+ * it's unlocked. Later on, if we sleep, this is the
+ * operation that gives us the lock. We xchg it to -1, so
+ * that when we release the lock, we properly wake up the
+ * other waiters. We only attempt the xchg if the count is
+ * non-negative in order to avoid unnecessary xchg operations:
+ */
+ if (atomic_read(&lock->count) >= 0 &&
+ (atomic_xchg_acquire(&lock->count, -1) == 1)) {
+ __set_task_state(task, TASK_RUNNING);
+ break;
+ }

/* didn't get the lock, go to sleep: */
spin_unlock_mutex(&lock->wait_lock, flags);
--
1.7.1