[PATCH -v3 06/10] locking/mutex: Add lock handoff to avoid starvation

From: Peter Zijlstra
Date: Mon Sep 05 2016 - 08:44:11 EST


Implement lock handoff to avoid lock starvation.

Lock starvation is possible because mutex_lock() allows lock stealing,
where a running (or optimistic spinning) task beats the woken waiter
to the acquire.
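
To make the window concrete, here is a purely illustrative userspace
sketch (C11 atomics; the demo_* names and the exact encoding are mine,
only the idea mirrors the mutex owner word): any running task that sees
an unowned lock word can install itself as owner, ahead of a waiter that
still has to be scheduled.

/*
 * Purely illustrative userspace model, NOT part of the patch; names and
 * encoding are made up, only the idea mirrors the mutex owner word.
 */
#include <stdatomic.h>
#include <stdbool.h>

#define DEMO_FLAG_WAITERS 0x01UL
#define DEMO_FLAG_HANDOFF 0x02UL
#define DEMO_FLAGS        0x03UL

struct demo_mutex {
        _Atomic unsigned long owner;    /* task "pointer" | flag bits */
};

/*
 * Any task that observes an unowned word may take the lock, including one
 * that never slept: this is the steal that can starve the top waiter.
 * @curr stands in for the task pointer and must have its low two bits clear.
 */
bool demo_trylock(struct demo_mutex *lock, unsigned long curr)
{
        unsigned long owner = atomic_load_explicit(&lock->owner,
                                                   memory_order_relaxed);
        for (;;) {
                if (owner & ~DEMO_FLAGS)        /* already owned */
                        return false;

                /* install ourselves while preserving the flag bits */
                if (atomic_compare_exchange_weak_explicit(&lock->owner, &owner,
                                curr | (owner & DEMO_FLAGS),
                                memory_order_acquire, memory_order_relaxed))
                        return true;
                /* failed cmpxchg reloaded owner; a flag may have changed, retry */
        }
}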

Lock stealing is an important performance optimization because waiting
for a waiter to wake up and get runtime can take a significant amount of
time, during which everybody would stall on the lock.

The downside is of course that it allows for starvation.

This patch has the waiter request a handoff if it fails to acquire the
lock upon waking. This re-introduces some of the wait time, because once
we do a handoff we have to wait for the waiter to wake up again.
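
Continuing the illustrative model above (a sketch only, not the kernel
code): after an unsuccessful wakeup the top waiter sets the handoff flag,
and its trylock then also accepts the case where unlock has already
written the waiter into the owner field.

/*
 * Sketch of the reworked wait-loop in the model above; enqueueing,
 * sleeping and signal handling are elided, and so is clearing the
 * handoff flag once we own the lock (see the tricky details below).
 */
void demo_lock_wait_loop(struct demo_mutex *lock, unsigned long curr,
                         bool top_waiter)
{
        for (;;) {
                /* ... sleep until the unlock path wakes us ... */

                if (top_waiter) /* ask the next unlock to hand us the lock */
                        atomic_fetch_or_explicit(&lock->owner, DEMO_FLAG_HANDOFF,
                                                 memory_order_relaxed);

                /* handoff-aware trylock: unlock may already have made us owner */
                unsigned long owner = atomic_load_explicit(&lock->owner,
                                                           memory_order_relaxed);
                if ((owner & ~DEMO_FLAGS) == curr) {
                        /* the plain load orders nothing, supply ACQUIRE here */
                        atomic_thread_fence(memory_order_acquire);
                        return;
                }

                if (demo_trylock(lock, curr))
                        return;
        }
}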

A future patch will add a round of optimistic spinning to attempt to
alleviate this penalty, but if that turns out to not be enough, we can
add a counter and only request handoff after multiple failed wakeups.

There are a few tricky implementation details:

- accepting a handoff must only be done in the wait-loop. Since the
handoff condition is owner == current, accepting it anywhere else can
easily cause recursive locking trouble.

- accepting the handoff must be careful to provide ACQUIRE semantics.

- having the HANDOFF bit set on unlock requires care: we must not clear
the owner field, but instead hand the lock to the top waiter (the unlock
side is sketched right after this list).

- we must be careful not to leave HANDOFF set after we've acquired the
lock. An alternative approach is to combine the bit set and the trylock
in a single cmpxchg loop, which guarantees that we either steal/acquire
the lock ourselves or that unlock observes HANDOFF and clears it.
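
For the unlock side of the above (again only a sketch in the illustrative
model; in the patch this is __mutex_handoff(), called under the wait_lock
with the top waiter taken off the wait list):

/*
 * With the handoff flag set, unlock never clears the owner; it overwrites
 * it with the top waiter, keeping WAITERS and dropping HANDOFF. The
 * RELEASE pairs with the ACQUIRE in the wait-loop sketch.
 */
void demo_unlock_handoff(struct demo_mutex *lock, unsigned long top_waiter)
{
        unsigned long owner = atomic_load_explicit(&lock->owner,
                                                   memory_order_relaxed);
        for (;;) {
                unsigned long new = (owner & DEMO_FLAG_WAITERS) | top_waiter;

                if (atomic_compare_exchange_weak_explicit(&lock->owner, &owner,
                                new, memory_order_release, memory_order_relaxed))
                        return;
                /* raced against a flag update; owner was reloaded, retry */
        }
}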

Signed-off-by: Peter Zijlstra (Intel) <peterz@xxxxxxxxxxxxx>
---
kernel/locking/mutex.c | 139 +++++++++++++++++++++++++++++++++++++++++--------
1 file changed, 117 insertions(+), 22 deletions(-)

--- a/kernel/locking/mutex.c
+++ b/kernel/locking/mutex.c
@@ -54,8 +54,10 @@ EXPORT_SYMBOL(__mutex_init);
* bits to store extra state.
*
* Bit0 indicates a non-empty waiter list; unlock must issue a wakeup.
+ * Bit1 indicates unlock needs to hand the lock to the top-waiter
*/
#define MUTEX_FLAG_WAITERS 0x01
+#define MUTEX_FLAG_HANDOFF 0x02

#define MUTEX_FLAGS 0x03

@@ -71,8 +73,13 @@ static inline unsigned long __owner_flag

/*
* Actual trylock that will work on any unlocked state.
+ *
+ * When setting the owner field, we must preserve the low flag bits.
+ *
+ * Be careful with @handoff, only set that in a wait-loop (where you set
+ * HANDOFF) to avoid recursive lock attempts.
*/
-static inline bool __mutex_trylock(struct mutex *lock)
+static inline bool __mutex_trylock(struct mutex *lock, const bool handoff)
{
unsigned long owner, curr = (unsigned long)current;

@@ -80,8 +87,24 @@ static inline bool __mutex_trylock(struc
for (;;) { /* must loop, can race against a flag */
unsigned long old;

- if (__owner_task(owner))
+ if (__owner_task(owner)) {
+ if (handoff && unlikely(__owner_task(owner) == current)) {
+ /*
+ * Provide ACQUIRE semantics for the lock-handoff.
+ *
+ * We cannot easily use load-acquire here, since
+ * the actual load is a failed cmpxchg, which
+ * doesn't imply any barriers.
+ *
+ * Also, this is a fairly unlikely scenario, and
+ * this contains the cost.
+ */
+ smp_mb(); /* ACQUIRE */
+ return true;
+ }
+
return false;
+ }

old = atomic_long_cmpxchg_acquire(&lock->owner, owner,
curr | __owner_flags(owner));
@@ -134,6 +157,39 @@ static inline void __mutex_clear_flag(st
atomic_long_andnot(flag, &lock->owner);
}

+static inline bool __mutex_waiter_is_first(struct mutex *lock, struct mutex_waiter *waiter)
+{
+ return list_first_entry(&lock->wait_list, struct mutex_waiter, list) == waiter;
+}
+
+/*
+ * Give up ownership to a specific task, when @task = NULL, this is equivalent
+ * to a regular unlock. Clears HANDOFF, preserves WAITERS. Provides RELEASE
+ * semantics like a regular unlock, the __mutex_trylock() provides matching
+ * ACQUIRE semantics for the handoff.
+ */
+static void __mutex_handoff(struct mutex *lock, struct task_struct *task)
+{
+ unsigned long owner = atomic_long_read(&lock->owner);
+
+ for (;;) {
+ unsigned long old, new;
+
+#ifdef CONFIG_DEBUG_MUTEXES
+ DEBUG_LOCKS_WARN_ON(__owner_task(owner) != current);
+#endif
+
+ new = (owner & MUTEX_FLAG_WAITERS);
+ new |= (unsigned long)task;
+
+ old = atomic_long_cmpxchg_release(&lock->owner, owner, new);
+ if (old == owner)
+ break;
+
+ owner = old;
+ }
+}
+
#ifndef CONFIG_DEBUG_LOCK_ALLOC
/*
* We split the mutex lock/unlock logic into separate fastpath and
@@ -398,7 +454,7 @@ static bool mutex_optimistic_spin(struct
break;

/* Try to acquire the mutex if it is unlocked. */
- if (__mutex_trylock(lock)) {
+ if (__mutex_trylock(lock, false)) {
osq_unlock(&lock->osq);
return true;
}
@@ -523,6 +579,7 @@ __mutex_lock_common(struct mutex *lock,
struct task_struct *task = current;
struct mutex_waiter waiter;
unsigned long flags;
+ bool first = false;
int ret;

if (use_ww_ctx) {
@@ -534,7 +591,8 @@ __mutex_lock_common(struct mutex *lock,
preempt_disable();
mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);

- if (__mutex_trylock(lock) || mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
+ if (__mutex_trylock(lock, false) ||
+ mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
/* got the lock, yay! */
lock_acquired(&lock->dep_map, ip);
if (use_ww_ctx) {
@@ -551,7 +609,7 @@ __mutex_lock_common(struct mutex *lock,
/*
* After waiting to acquire the wait_lock, try again.
*/
- if (__mutex_trylock(lock))
+ if (__mutex_trylock(lock, false))
goto skip_wait;

debug_mutex_lock_common(lock, &waiter);
@@ -561,15 +619,15 @@ __mutex_lock_common(struct mutex *lock,
list_add_tail(&waiter.list, &lock->wait_list);
waiter.task = task;

- if (list_first_entry(&lock->wait_list, struct mutex_waiter, list) == &waiter)
+ if (__mutex_waiter_is_first(lock, &waiter))
__mutex_set_flag(lock, MUTEX_FLAG_WAITERS);

+ if (__mutex_trylock(lock, false))
+ goto remove_waiter;
+
lock_contended(&lock->dep_map, ip);

for (;;) {
- if (__mutex_trylock(lock))
- break;
-
/*
* got a signal? (This code gets eliminated in the
* TASK_UNINTERRUPTIBLE case.)
@@ -586,17 +644,26 @@ __mutex_lock_common(struct mutex *lock,
}

__set_task_state(task, state);
-
- /* didn't get the lock, go to sleep: */
spin_unlock_mutex(&lock->wait_lock, flags);
schedule_preempt_disabled();
spin_lock_mutex(&lock->wait_lock, flags);
+
+ if (__mutex_waiter_is_first(lock, &waiter)) {
+ first = true;
+ __mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
+ }
+
+ if (__mutex_trylock(lock, true))
+ break;
}
__set_task_state(task, TASK_RUNNING);

+remove_waiter:
mutex_remove_waiter(lock, &waiter, task);
if (likely(list_empty(&lock->wait_list)))
- __mutex_clear_flag(lock, MUTEX_FLAG_WAITERS);
+ __mutex_clear_flag(lock, MUTEX_FLAGS);
+ else if (first && (atomic_long_read(&lock->owner) & MUTEX_FLAG_HANDOFF))
+ __mutex_clear_flag(lock, MUTEX_FLAG_HANDOFF);

debug_mutex_free_waiter(&waiter);

@@ -724,33 +791,61 @@ EXPORT_SYMBOL_GPL(__ww_mutex_lock_interr
*/
static noinline void __sched __mutex_unlock_slowpath(struct mutex *lock, unsigned long ip)
{
+ struct task_struct *next = NULL;
unsigned long owner, flags;
WAKE_Q(wake_q);

mutex_release(&lock->dep_map, 1, ip);

/*
- * Release the lock before (potentially) taking the spinlock
- * such that other contenders can get on with things ASAP.
+ * Release the lock before (potentially) taking the spinlock such that
+ * other contenders can get on with things ASAP.
+ *
+ * Except when HANDOFF, in that case we must not clear the owner field,
+ * but instead set it to the top waiter.
*/
- owner = atomic_long_fetch_and_release(MUTEX_FLAGS, &lock->owner);
- if (!owner)
- return;
+ owner = atomic_long_read(&lock->owner);
+ for (;;) {
+ unsigned long old;
+
+#ifdef CONFIG_DEBUG_MUTEXES
+ DEBUG_LOCKS_WARN_ON(__owner_task(owner) != current);
+#endif
+
+ if (owner & MUTEX_FLAG_HANDOFF)
+ break;
+
+ old = atomic_long_cmpxchg_release(&lock->owner, owner,
+ __owner_flags(owner));
+ if (old == owner) {
+ if (owner & MUTEX_FLAG_WAITERS)
+ break;
+
+ return;
+ }
+
+ owner = old;
+ }

spin_lock_mutex(&lock->wait_lock, flags);
debug_mutex_unlock(lock);
-
if (!list_empty(&lock->wait_list)) {
/* get the first entry from the wait-list: */
struct mutex_waiter *waiter =
- list_entry(lock->wait_list.next,
- struct mutex_waiter, list);
+ list_first_entry(&lock->wait_list,
+ struct mutex_waiter, list);
+
+ next = waiter->task;

debug_mutex_wake_waiter(lock, waiter);
- wake_q_add(&wake_q, waiter->task);
+ wake_q_add(&wake_q, next);
}

+ if (owner & MUTEX_FLAG_HANDOFF)
+ __mutex_handoff(lock, next);
+
spin_unlock_mutex(&lock->wait_lock, flags);
+
wake_up_q(&wake_q);
}

@@ -853,7 +948,7 @@ __ww_mutex_lock_interruptible_slowpath(s
*/
int __sched mutex_trylock(struct mutex *lock)
{
- bool locked = __mutex_trylock(lock);
+ bool locked = __mutex_trylock(lock, false);

if (locked)
mutex_acquire(&lock->dep_map, 0, 1, _RET_IP_);