Re: [patch 58/63] futex: Prevent requeue_pi() lock nesting issue on RT
From: Peter Zijlstra
Date: Mon Aug 02 2021 - 08:56:43 EST
On Fri, Jul 30, 2021 at 03:51:05PM +0200, Thomas Gleixner wrote:
> +static inline bool futex_requeue_pi_prepare(struct futex_q *q,
> + struct futex_pi_state *pi_state)
> +{
> + int cur, res, new;
> +
> + /*
> + * Set state to Q_REQUEUE_PI_IN_PROGRESS unless an early wakeup has
> + * already set Q_REQUEUE_PI_IGNORE to signal that requeue should
> + * ignore the waiter.
> + */
> + for (cur = atomic_read(&q->requeue_state);; cur = res) {
> + if (cur == Q_REQUEUE_PI_IGNORE)
> + return false;
> +
> + /*
> + * futex_proxy_trylock_atomic() might have set it to
> + * IN_PROGRESS and an interleaved early wake to WAIT.
> + *
> + * It was considered to have an extra state for that
> + * trylock, but that would just add more conditionals
> + * all over the place for a dubious value.
> + */
> + if (cur != Q_REQUEUE_PI_NONE)
> + break;
> +
> + new = Q_REQUEUE_PI_IN_PROGRESS;
> + res = atomic_cmpxchg(&q->requeue_state, cur, new);
> + if (likely(cur == res))
> + break;
> + }
> + q->pi_state = pi_state;
> + return true;
> +}
> +
> +static inline void futex_requeue_pi_complete(struct futex_q *q, int locked)
> +{
> + int cur, res, new;
> +
> + for (cur = atomic_read(&q->requeue_state);; cur = res) {
> + if (locked >= 0) {
> + /* Requeue succeeded. Set DONE or LOCKED */
> + new = Q_REQUEUE_PI_DONE + locked;
> + } else if (cur == Q_REQUEUE_PI_IN_PROGRESS) {
> + /* Deadlock, no early wakeup interleave */
> + new = Q_REQUEUE_PI_NONE;
> + } else {
> + /* Deadlock, early wakeup interleave. */
> + new = Q_REQUEUE_PI_IGNORE;
> + }
> +
> + res = atomic_cmpxchg(&q->requeue_state, cur, new);
> + if (likely(cur == res))
> + break;
> + }
> +
> +#ifdef CONFIG_PREEMPT_RT
> + /* If the waiter interleaved with the requeue let it know */
> + if (unlikely(cur == Q_REQUEUE_PI_WAIT))
> + rcuwait_wake_up(&q->requeue_wait);
> +#endif
> +}
> +
> +static inline int futex_requeue_pi_wakeup_sync(struct futex_q *q)
> +{
> + int cur, new, res;
> +
> + for (cur = atomic_read(&q->requeue_state);; cur = res) {
> + /* Is requeue done already? */
> + if (cur >= Q_REQUEUE_PI_DONE)
> + break;
> +
> + /*
> + * If not done, then tell the requeue code to either ignore
> + * the waiter or to wake it up once the requeue is done.
> + */
> + new = !cur ? Q_REQUEUE_PI_IGNORE : Q_REQUEUE_PI_WAIT;
> + res = atomic_cmpxchg(&q->requeue_state, cur, new);
> + if (likely(cur == res))
> + break;
> + }
> +
> + /* If the requeue was in progress, wait for it to complete */
> + if (cur == Q_REQUEUE_PI_IN_PROGRESS) {
> +#ifdef CONFIG_PREEMPT_RT
> + rcuwait_wait_event(&q->requeue_wait,
> + atomic_read(&q->requeue_state) != Q_REQUEUE_PI_WAIT,
> + TASK_UNINTERRUPTIBLE);
> +#else
> + while (atomic_read(&q->requeue_state) == Q_REQUEUE_PI_WAIT)
> + cpu_relax();
> +#endif
> + }
> +
> + /*
> + * Requeue is now either prohibited or complete. Reread state
> + * because during the wait above it might have changed. Nothing
> + * will modify q->requeue_state after this point.
> + */
> + return atomic_read(&q->requeue_state);
> +}
I made the following changes:
- atomic_cmpxchg() -> atomic_try_cmpxchg()
- atomic_read() -> atomic_read_acquire(), which I think is required at
least for the (old >= Q_REQUEUE_PI_DONE) case in
futex_requeue_pi_wakeup_sync(), to ensure we observe the state as
written by whoever set DONE/LOCKED.
- use atomic_cond_read_relaxed() instead of the open-coded cpu_relax() loop
- replaced one ternary operator with an if statement for (IMO) clearer code.
--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -1853,15 +1853,16 @@ enum {
static inline bool futex_requeue_pi_prepare(struct futex_q *q,
struct futex_pi_state *pi_state)
{
- int cur, res, new;
+ int old, new;
/*
* Set state to Q_REQUEUE_PI_IN_PROGRESS unless an early wakeup has
* already set Q_REQUEUE_PI_IGNORE to signal that requeue should
* ignore the waiter.
*/
- for (cur = atomic_read(&q->requeue_state);; cur = res) {
- if (cur == Q_REQUEUE_PI_IGNORE)
+ old = atomic_read_acquire(&q->requeue_state);
+ do {
+ if (old == Q_REQUEUE_PI_IGNORE)
return false;
/*
@@ -1872,74 +1873,68 @@ static inline bool futex_requeue_pi_prep
* trylock, but that would just add more conditionals
* all over the place for a dubious value.
*/
- if (cur != Q_REQUEUE_PI_NONE)
+ if (old != Q_REQUEUE_PI_NONE)
break;
new = Q_REQUEUE_PI_IN_PROGRESS;
- res = atomic_cmpxchg(&q->requeue_state, cur, new);
- if (likely(cur == res))
- break;
- }
+ } while (!atomic_try_cmpxchg(&q->requeue_state, &old, new));
+
q->pi_state = pi_state;
return true;
}
static inline void futex_requeue_pi_complete(struct futex_q *q, int locked)
{
- int cur, res, new;
+ int old, new;
- for (cur = atomic_read(&q->requeue_state);; cur = res) {
+ old = atomic_read_acquire(&q->requeue_state);
+ do {
if (locked >= 0) {
/* Requeue succeeded. Set DONE or LOCKED */
new = Q_REQUEUE_PI_DONE + locked;
- } else if (cur == Q_REQUEUE_PI_IN_PROGRESS) {
+ } else if (old == Q_REQUEUE_PI_IN_PROGRESS) {
/* Deadlock, no early wakeup interleave */
new = Q_REQUEUE_PI_NONE;
} else {
/* Deadlock, early wakeup interleave. */
new = Q_REQUEUE_PI_IGNORE;
}
-
- res = atomic_cmpxchg(&q->requeue_state, cur, new);
- if (likely(cur == res))
- break;
- }
+ } while (!atomic_try_cmpxchg(&q->requeue_state, &old, new));
#ifdef CONFIG_PREEMPT_RT
/* If the waiter interleaved with the requeue let it know */
- if (unlikely(cur == Q_REQUEUE_PI_WAIT))
+ if (unlikely(old == Q_REQUEUE_PI_WAIT))
rcuwait_wake_up(&q->requeue_wait);
#endif
}
static inline int futex_requeue_pi_wakeup_sync(struct futex_q *q)
{
- int cur, new, res;
+ int old, new;
- for (cur = atomic_read(&q->requeue_state);; cur = res) {
+ old = atomic_read_acquire(&q->requeue_state);
+ do {
/* Is requeue done already? */
- if (cur >= Q_REQUEUE_PI_DONE)
+ if (old >= Q_REQUEUE_PI_DONE)
break;
/*
* If not done, then tell the requeue code to either ignore
* the waiter or to wake it up once the requeue is done.
*/
- new = !cur ? Q_REQUEUE_PI_IGNORE : Q_REQUEUE_PI_WAIT;
- res = atomic_cmpxchg(&q->requeue_state, cur, new);
- if (likely(cur == res))
- break;
- }
+ new = Q_REQUEUE_PI_WAIT;
+ if (old == Q_REQUEUE_PI_NONE)
+ new = Q_REQUEUE_PI_IGNORE;
+ } while (!atomic_try_cmpxchg(&q->requeue_state, &old, new));
/* If the requeue was in progress, wait for it to complete */
- if (cur == Q_REQUEUE_PI_IN_PROGRESS) {
+ if (old == Q_REQUEUE_PI_IN_PROGRESS) {
#ifdef CONFIG_PREEMPT_RT
rcuwait_wait_event(&q->requeue_wait,
atomic_read(&q->requeue_state) != Q_REQUEUE_PI_WAIT,
TASK_UNINTERRUPTIBLE);
#else
- while (atomic_read(&q->requeue_state) == Q_REQUEUE_PI_WAIT)
- cpu_relax();
+ (void)atomic_cond_read_relaxed(&q->requeue_state, VAL != Q_REQUEUE_PI_WAIT);
#endif
}