[RFC][PATCH 4/4] futex: Rewrite FUTEX_UNLOCK_PI
From: Peter Zijlstra
Date: Mon Oct 03 2016 - 05:25:08 EST
There are a number of 'interesting' problems with FUTEX_UNLOCK_PI, all
caused by holding hb->lock while doing the rt_mutex_unlock()
equivalent.
This patch doesn't attempt to fix any of the actual problems, but
instead reworks the code to not hold hb->lock across the unlock,
paving the way to actually fix the problems later.
The current reason we hold hb->lock over unlock is that it serializes
against FUTEX_LOCK_PI and prevents new waiters from coming in; this
ensures the rt_mutex_next_owner() value is stable and can be written
into the user-space futex value before doing the unlock, such that the
unlock will indeed end up at new_owner.
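Schematically, with all error handling left out, the current
wake_futex_pi() path looks roughly like this (a simplified sketch of
the existing code, not the literal implementation):

	spin_lock(&hb->lock);	/* blocks FUTEX_LOCK_PI, no new waiters */
	new_owner = rt_mutex_next_owner(&pi_state->pi_mutex);
	newval = FUTEX_WAITERS | task_pid_vnr(new_owner);
	cmpxchg_futex_value_locked(&curval, uaddr, uval, newval);
	deboost = rt_mutex_futex_unlock(&pi_state->pi_mutex, &wake_q);
	spin_unlock(&hb->lock);
	wake_up_q(&wake_q);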
This patch recognises that holding rt_mutex::wait_lock provides the
very same guarantee: no new waiters can come in while we hold that
lock -- after all, waiters need this lock to queue themselves. It
therefore restructures the code to keep rt_mutex::wait_lock held
across the whole operation.
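With wait_lock doing the serialization, the same sequence becomes,
again as a rough sketch with error paths omitted:

	raw_spin_lock_irq(&pi_state->pi_mutex.wait_lock);
	spin_unlock(&hb->lock);	/* wait_lock now holds off new waiters */
	new_owner = rt_mutex_next_owner(&pi_state->pi_mutex);
	cmpxchg_futex_value_locked(&curval, uaddr, uval, newval);
	ret = rt_mutex_futex_unlock(&pi_state->pi_mutex, &wake_q);
	raw_spin_unlock_irq(&pi_state->pi_mutex.wait_lock);
	if (ret > 0) {
		wake_up_q(&wake_q);
		rt_mutex_adjust_prio(current);
	}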
This (of course) is not entirely straightforward either; see the
comment in rt_mutex_slowunlock(): doing the unlock itself might drop
wait_lock, letting new waiters in. To cure this,
rt_mutex_futex_unlock() becomes a variant of rt_mutex_slowunlock()
that returns -EAGAIN in that case. This ensures the FUTEX_UNLOCK_PI
code aborts and restarts the entire operation.
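On the FUTEX_UNLOCK_PI side that restart is simply (a sketch of the
futex_unlock_pi() hunk below):

	ret = wake_futex_pi(uaddr, uval, top_waiter, hb);
	/* wake_futex_pi() dropped hb->lock for us */
	if (ret == -EAGAIN) {
		put_futex_key(&key);
		goto retry;
	}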
Signed-off-by: Peter Zijlstra (Intel) <peterz@xxxxxxxxxxxxx>
---
kernel/futex.c | 63 +++++++++++++++++--------------
kernel/locking/rtmutex.c | 81 ++++++++++++++++++++++++++++++++++++----
kernel/locking/rtmutex_common.h | 4 -
3 files changed, 110 insertions(+), 38 deletions(-)
--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -1294,24 +1294,21 @@ static void mark_wake_futex(struct wake_
static int wake_futex_pi(u32 __user *uaddr, u32 uval, struct futex_q *top_waiter,
struct futex_hash_bucket *hb)
{
- struct task_struct *new_owner;
struct futex_pi_state *pi_state = top_waiter->pi_state;
u32 uninitialized_var(curval), newval;
+ struct task_struct *new_owner;
WAKE_Q(wake_q);
- bool deboost;
int ret = 0;
- if (!pi_state)
- return -EINVAL;
+ raw_spin_lock_irq(&pi_state->pi_mutex.wait_lock);
+ WARN_ON_ONCE(!atomic_inc_not_zero(&pi_state->refcount));
/*
- * If current does not own the pi_state then the futex is
- * inconsistent and user space fiddled with the futex value.
+ * Now that we hold wait_lock, no new waiters can happen on the
+ * rt_mutex and new owner is stable. Drop hb->lock.
*/
- if (pi_state->owner != current)
- return -EINVAL;
+ spin_unlock(&hb->lock);
- raw_spin_lock_irq(&pi_state->pi_mutex.wait_lock);
new_owner = rt_mutex_next_owner(&pi_state->pi_mutex);
/*
@@ -1334,6 +1331,7 @@ static int wake_futex_pi(u32 __user *uad
if (cmpxchg_futex_value_locked(&curval, uaddr, uval, newval)) {
ret = -EFAULT;
+
} else if (curval != uval) {
/*
* If a unconditional UNLOCK_PI operation (user space did not
@@ -1346,10 +1344,9 @@ static int wake_futex_pi(u32 __user *uad
else
ret = -EINVAL;
}
- if (ret) {
- raw_spin_unlock_irq(&pi_state->pi_mutex.wait_lock);
- return ret;
- }
+
+ if (ret)
+ goto out_unlock;
raw_spin_lock(&pi_state->owner->pi_lock);
WARN_ON(list_empty(&pi_state->list));
@@ -1362,22 +1359,20 @@ static int wake_futex_pi(u32 __user *uad
pi_state->owner = new_owner;
raw_spin_unlock(&new_owner->pi_lock);
- raw_spin_unlock_irq(&pi_state->pi_mutex.wait_lock);
+ ret = rt_mutex_futex_unlock(&pi_state->pi_mutex, &wake_q);
- deboost = rt_mutex_futex_unlock(&pi_state->pi_mutex, &wake_q);
+out_unlock:
+ raw_spin_unlock_irq(&pi_state->pi_mutex.wait_lock);
- /*
- * First unlock HB so the waiter does not spin on it once he got woken
- * up. Second wake up the waiter before the priority is adjusted. If we
- * deboost first (and lose our higher priority), then the task might get
- * scheduled away before the wake up can take place.
- */
- spin_unlock(&hb->lock);
- wake_up_q(&wake_q);
- if (deboost)
+ if (ret > 0) {
+ wake_up_q(&wake_q);
rt_mutex_adjust_prio(current);
+ ret = 0;
+ }
- return 0;
+ put_pi_state(pi_state);
+
+ return ret;
}
/*
@@ -2656,6 +2651,20 @@ static int futex_unlock_pi(u32 __user *u
*/
top_waiter = futex_top_waiter(hb, &key);
if (top_waiter) {
+
+ if (!top_waiter->pi_state)
+ goto out_unlock;
+
+ /*
+ * If current does not own the pi_state then the futex is
+ * inconsistent and user space fiddled with the futex value.
+ */
+ if (top_waiter->pi_state->owner != current)
+ goto out_unlock;
+
+ /*
+ * wake_futex_pi() _will_ drop hb->lock
+ */
ret = wake_futex_pi(uaddr, uval, top_waiter, hb);
/*
* In case of success wake_futex_pi dropped the hash
@@ -2674,7 +2683,6 @@ static int futex_unlock_pi(u32 __user *u
* setting the FUTEX_WAITERS bit. Try again.
*/
if (ret == -EAGAIN) {
- spin_unlock(&hb->lock);
put_futex_key(&key);
goto retry;
}
@@ -2682,7 +2690,7 @@ static int futex_unlock_pi(u32 __user *u
* wake_futex_pi has detected invalid state. Tell user
* space.
*/
- goto out_unlock;
+ goto out_putkey;
}
/*
@@ -2707,7 +2715,6 @@ static int futex_unlock_pi(u32 __user *u
return ret;
pi_faulted:
- spin_unlock(&hb->lock);
put_futex_key(&key);
ret = fault_in_user_writeable(uaddr);
--- a/kernel/locking/rtmutex.c
+++ b/kernel/locking/rtmutex.c
@@ -1489,19 +1489,84 @@ void __sched rt_mutex_unlock(struct rt_m
EXPORT_SYMBOL_GPL(rt_mutex_unlock);
/**
- * rt_mutex_futex_unlock - Futex variant of rt_mutex_unlock
+ * rt_mutex_futex_unlock - Futex variant of rt_mutex_slowunlock
* @lock: the rt_mutex to be unlocked
+ * @wqh: wake queue
*
- * Returns: true/false indicating whether priority adjustment is
- * required or not.
+ * This cannot just be rt_mutex_slowunlock() since that does the wait_lock
+ * dance, and the futex code is tricky (ha!) and uses the wait_lock
+ * to hold off new waiters, so dropping this lock invalidates prior state
+ * and we need to redo all of it.
+ *
+ * Returns:
+ * -EAGAIN: if we need to retry the whole operation;
+ * 1: if we need to wake and deboost;
+ * 0: if we're done.
*/
-bool __sched rt_mutex_futex_unlock(struct rt_mutex *lock,
- struct wake_q_head *wqh)
+int __sched rt_mutex_futex_unlock(struct rt_mutex *lock,
+ struct wake_q_head *wake_q)
{
- if (likely(rt_mutex_cmpxchg_release(lock, current, NULL)))
- return false;
+ lockdep_assert_held(&lock->wait_lock);
+
+ debug_rt_mutex_unlock(lock);
+
+ /*
+ * We must be careful here if the fast path is enabled. If we
+ * have no waiters queued we cannot set owner to NULL here
+ * because of:
+ *
+ * foo->lock->owner = NULL;
+ * rtmutex_lock(foo->lock); <- fast path
+ * free = atomic_dec_and_test(foo->refcnt);
+ * rtmutex_unlock(foo->lock); <- fast path
+ * if (free)
+ * kfree(foo);
+ * raw_spin_unlock(foo->lock->wait_lock);
+ *
+ * So for the fastpath enabled kernel:
+ *
+ * Nothing can set the waiters bit as long as we hold
+ * lock->wait_lock. So we do the following sequence:
+ *
+ * owner = rt_mutex_owner(lock);
+ * clear_rt_mutex_waiters(lock);
+ * raw_spin_unlock(&lock->wait_lock);
+ * if (cmpxchg(&lock->owner, owner, 0) == owner)
+ * return;
+ * goto retry;
+ *
+ * The fastpath disabled variant is simple as all access to
+ * lock->owner is serialized by lock->wait_lock:
+ *
+ * lock->owner = NULL;
+ * raw_spin_unlock(&lock->wait_lock);
+ */
+ if (!rt_mutex_has_waiters(lock)) {
+ unsigned long flags;
+ int ret = 0;
+
+ local_save_flags(flags);
+
+ /* Drops lock->wait_lock ! */
+ if (unlock_rt_mutex_safe(lock, flags))
+ ret = -EAGAIN;
+
+ /* Relock the rtmutex and try again */
+ raw_spin_lock_irq(&lock->wait_lock);
+
+ return ret;
+ }
+
+ /*
+ * The wakeup next waiter path does not suffer from the above
+ * race. See the comments there.
+ *
+ * Queue the next waiter for wakeup once we release the wait_lock.
+ */
+ mark_wakeup_next_waiter(wake_q, lock);
- return rt_mutex_slowunlock(lock, wqh);
+ /* Do wakeups and deboost. */
+ return 1;
}
/**
--- a/kernel/locking/rtmutex_common.h
+++ b/kernel/locking/rtmutex_common.h
@@ -109,8 +109,8 @@ extern int rt_mutex_finish_proxy_lock(st
struct hrtimer_sleeper *to,
struct rt_mutex_waiter *waiter);
extern int rt_mutex_timed_futex_lock(struct rt_mutex *l, struct hrtimer_sleeper *to);
-extern bool rt_mutex_futex_unlock(struct rt_mutex *lock,
- struct wake_q_head *wqh);
+extern int rt_mutex_futex_unlock(struct rt_mutex *lock,
+ struct wake_q_head *wqh);
extern void rt_mutex_adjust_prio(struct task_struct *task);
#ifdef CONFIG_DEBUG_RT_MUTEXES