Re: timers: HARDIRQ-safe -> HARDIRQ-unsafe lock order detected

From: Thomas Gleixner
Date: Thu Jan 14 2016 - 14:48:51 EST


On Thu, 14 Jan 2016, Paul E. McKenney wrote:
> On Thu, Jan 14, 2016 at 06:43:16PM +0100, Thomas Gleixner wrote:
> > Right. We have the pattern
> >
> > rcu_read_lock();
> > x = lookup();
> > if (x)
> > keep_hold(x)
> > rcu_read_unlock();
> > return x;
> >
> > all over the place. Now that keep_hold() can be everything from a refcount to
> > a spinlock and I'm not sure that we can force stuff depending on the mechanism
> > to be completely symetric. So we are probably better off by making that rcu
> > unlock machinery more robust.
>
> OK. If I read the lockdep reports correctly, the issue occurs
> when rcu_read_unlock_special() finds that it needs to unboost,
> which means doing an rt_mutex_unlock(). This is done outside of
> rcu_read_unlock_special()'s irq-disabled region, but of course the caller
> might have disabled irqs.
>
> If I remember correctly, disabling irqs across rt_mutex_unlock() gets
> me lockdep splats.

That shouldn't be the case. The splats come from this scenario:

CPU0 CPU1
rtmutex_lock(rcu)
raw_spin_lock(&rcu->lock)
rcu_read_lock()
Interrupt spin_lock_irq(some->lock);
rcu_read_unlock()
rcu_read_unlock_special()
rtmutex_unlock(rcu)
raw_spin_lock(&rcu->lock)
spin_lock(some->lock)

Now we are deadlocked.

> I could imagine having a per-CPU pointer to rt_mutex that
> rcu_read_unlock() sets, and that is checked at every point that irqs
> are enabled, with a call to rt_mutex_unlock() if that pointer is non-NULL.
>
> But perhaps you had something else in mind?

We can solve that issue by taking rtmutex->wait_lock with irqsave. So the
above becomes:

CPU0 CPU1
rtmutex_lock(rcu)
raw_spin_lock_irq(&rcu->lock)
rcu_read_lock()
spin_lock_irq(some->lock);
rcu_read_unlock()
rcu_read_unlock_special()
rtmutex_unlock(rcu)
raw_spin_lock_irqsave(&rcu->lock, flags)
raw_spin_unlock_irq(&rcu->lock)
Interrupt ...
spin_lock(some->lock) raw_spin_unlock_irqrestore(&rcu->lock, flags)
...
spin_unlock_irq(some->lock);

Untested patch below.

Thanks,

tglx

8<-------------------------

Subject: rtmutex: Make wait_lock irq safe
From: Thomas Gleixner <tglx@xxxxxxxxxxxxx>
Date: Wed, 13 Jan 2016 11:25:38 +0100

Add some blurb here.

Not-Yet-Signed-off-by: Thomas Gleixner <tglx@xxxxxxxxxxxxx>
---
kernel/futex.c | 18 +++----
kernel/locking/rtmutex.c | 111 +++++++++++++++++++++++++----------------------
2 files changed, 69 insertions(+), 60 deletions(-)

--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -1226,7 +1226,7 @@ static int wake_futex_pi(u32 __user *uad
if (pi_state->owner != current)
return -EINVAL;

- raw_spin_lock(&pi_state->pi_mutex.wait_lock);
+ raw_spin_lock_irq(&pi_state->pi_mutex.wait_lock);
new_owner = rt_mutex_next_owner(&pi_state->pi_mutex);

/*
@@ -1252,22 +1252,22 @@ static int wake_futex_pi(u32 __user *uad
else if (curval != uval)
ret = -EINVAL;
if (ret) {
- raw_spin_unlock(&pi_state->pi_mutex.wait_lock);
+ raw_spin_unlock_irq(&pi_state->pi_mutex.wait_lock);
return ret;
}

- raw_spin_lock_irq(&pi_state->owner->pi_lock);
+ raw_spin_lock(&pi_state->owner->pi_lock);
WARN_ON(list_empty(&pi_state->list));
list_del_init(&pi_state->list);
- raw_spin_unlock_irq(&pi_state->owner->pi_lock);
+ raw_spin_unlock(&pi_state->owner->pi_lock);

- raw_spin_lock_irq(&new_owner->pi_lock);
+ raw_spin_lock(&new_owner->pi_lock);
WARN_ON(!list_empty(&pi_state->list));
list_add(&pi_state->list, &new_owner->pi_state_list);
pi_state->owner = new_owner;
- raw_spin_unlock_irq(&new_owner->pi_lock);
+ raw_spin_unlock(&new_owner->pi_lock);

- raw_spin_unlock(&pi_state->pi_mutex.wait_lock);
+ raw_spin_unlock_irq(&pi_state->pi_mutex.wait_lock);

deboost = rt_mutex_futex_unlock(&pi_state->pi_mutex, &wake_q);

@@ -2162,11 +2162,11 @@ static int fixup_owner(u32 __user *uaddr
* we returned due to timeout or signal without taking the
* rt_mutex. Too late.
*/
- raw_spin_lock(&q->pi_state->pi_mutex.wait_lock);
+ raw_spin_lock_irq(&q->pi_state->pi_mutex.wait_lock);
owner = rt_mutex_owner(&q->pi_state->pi_mutex);
if (!owner)
owner = rt_mutex_next_owner(&q->pi_state->pi_mutex);
- raw_spin_unlock(&q->pi_state->pi_mutex.wait_lock);
+ raw_spin_unlock_irq(&q->pi_state->pi_mutex.wait_lock);
ret = fixup_pi_state_owner(uaddr, q, owner);
goto out;
}
--- a/kernel/locking/rtmutex.c
+++ b/kernel/locking/rtmutex.c
@@ -99,13 +99,14 @@ static inline void mark_rt_mutex_waiters
* 2) Drop lock->wait_lock
* 3) Try to unlock the lock with cmpxchg
*/
-static inline bool unlock_rt_mutex_safe(struct rt_mutex *lock)
+static inline bool unlock_rt_mutex_safe(struct rt_mutex *lock,
+ unsigned long flags)
__releases(lock->wait_lock)
{
struct task_struct *owner = rt_mutex_owner(lock);

clear_rt_mutex_waiters(lock);
- raw_spin_unlock(&lock->wait_lock);
+ raw_spin_unlock_irqrestore(&lock->wait_lock, flags);
/*
* If a new waiter comes in between the unlock and the cmpxchg
* we have two situations:
@@ -147,11 +148,12 @@ static inline void mark_rt_mutex_waiters
/*
* Simple slow path only version: lock->owner is protected by lock->wait_lock.
*/
-static inline bool unlock_rt_mutex_safe(struct rt_mutex *lock)
+static inline bool unlock_rt_mutex_safe(struct rt_mutex *lock,
+ unsigned long flags)
__releases(lock->wait_lock)
{
lock->owner = NULL;
- raw_spin_unlock(&lock->wait_lock);
+ raw_spin_unlock_irqrestore(&lock->wait_lock, flags);
return true;
}
#endif
@@ -591,7 +593,7 @@ static int rt_mutex_adjust_prio_chain(st
/*
* No requeue[7] here. Just release @task [8]
*/
- raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+ raw_spin_unlock(&task->pi_lock);
put_task_struct(task);

/*
@@ -599,14 +601,14 @@ static int rt_mutex_adjust_prio_chain(st
* If there is no owner of the lock, end of chain.
*/
if (!rt_mutex_owner(lock)) {
- raw_spin_unlock(&lock->wait_lock);
+ raw_spin_unlock_irqrestore(&lock->wait_lock, flags);
return 0;
}

/* [10] Grab the next task, i.e. owner of @lock */
task = rt_mutex_owner(lock);
get_task_struct(task);
- raw_spin_lock_irqsave(&task->pi_lock, flags);
+ raw_spin_lock(&task->pi_lock);

/*
* No requeue [11] here. We just do deadlock detection.
@@ -621,8 +623,8 @@ static int rt_mutex_adjust_prio_chain(st
top_waiter = rt_mutex_top_waiter(lock);

/* [13] Drop locks */
- raw_spin_unlock_irqrestore(&task->pi_lock, flags);
- raw_spin_unlock(&lock->wait_lock);
+ raw_spin_unlock(&task->pi_lock);
+ raw_spin_unlock_irqrestore(&lock->wait_lock, flags);

/* If owner is not blocked, end of chain. */
if (!next_lock)
@@ -643,7 +645,7 @@ static int rt_mutex_adjust_prio_chain(st
rt_mutex_enqueue(lock, waiter);

/* [8] Release the task */
- raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+ raw_spin_unlock(&task->pi_lock);
put_task_struct(task);

/*
@@ -661,14 +663,14 @@ static int rt_mutex_adjust_prio_chain(st
*/
if (prerequeue_top_waiter != rt_mutex_top_waiter(lock))
wake_up_process(rt_mutex_top_waiter(lock)->task);
- raw_spin_unlock(&lock->wait_lock);
+ raw_spin_unlock_irqrestore(&lock->wait_lock, flags);
return 0;
}

/* [10] Grab the next task, i.e. the owner of @lock */
task = rt_mutex_owner(lock);
get_task_struct(task);
- raw_spin_lock_irqsave(&task->pi_lock, flags);
+ raw_spin_lock(&task->pi_lock);

/* [11] requeue the pi waiters if necessary */
if (waiter == rt_mutex_top_waiter(lock)) {
@@ -722,8 +724,8 @@ static int rt_mutex_adjust_prio_chain(st
top_waiter = rt_mutex_top_waiter(lock);

/* [13] Drop the locks */
- raw_spin_unlock_irqrestore(&task->pi_lock, flags);
- raw_spin_unlock(&lock->wait_lock);
+ raw_spin_unlock(&task->pi_lock);
+ raw_spin_unlock_irqrestore(&lock->wait_lock, flags);

/*
* Make the actual exit decisions [12], based on the stored
@@ -766,8 +768,6 @@ static int rt_mutex_adjust_prio_chain(st
static int try_to_take_rt_mutex(struct rt_mutex *lock, struct task_struct *task,
struct rt_mutex_waiter *waiter)
{
- unsigned long flags;
-
/*
* Before testing whether we can acquire @lock, we set the
* RT_MUTEX_HAS_WAITERS bit in @lock->owner. This forces all
@@ -852,7 +852,7 @@ static int try_to_take_rt_mutex(struct r
* case, but conditionals are more expensive than a redundant
* store.
*/
- raw_spin_lock_irqsave(&task->pi_lock, flags);
+ raw_spin_lock(&task->pi_lock);
task->pi_blocked_on = NULL;
/*
* Finish the lock acquisition. @task is the new owner. If
@@ -861,7 +861,7 @@ static int try_to_take_rt_mutex(struct r
*/
if (rt_mutex_has_waiters(lock))
rt_mutex_enqueue_pi(task, rt_mutex_top_waiter(lock));
- raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+ raw_spin_unlock(&task->pi_lock);

takeit:
/* We got the lock. */
@@ -894,7 +894,6 @@ static int task_blocks_on_rt_mutex(struc
struct rt_mutex_waiter *top_waiter = waiter;
struct rt_mutex *next_lock;
int chain_walk = 0, res;
- unsigned long flags;

/*
* Early deadlock detection. We really don't want the task to
@@ -908,7 +907,7 @@ static int task_blocks_on_rt_mutex(struc
if (owner == task)
return -EDEADLK;

- raw_spin_lock_irqsave(&task->pi_lock, flags);
+ raw_spin_lock(&task->pi_lock);
__rt_mutex_adjust_prio(task);
waiter->task = task;
waiter->lock = lock;
@@ -921,12 +920,12 @@ static int task_blocks_on_rt_mutex(struc

task->pi_blocked_on = waiter;

- raw_spin_unlock_irqrestore(&task->pi_lock, flags);
+ raw_spin_unlock(&task->pi_lock);

if (!owner)
return 0;

- raw_spin_lock_irqsave(&owner->pi_lock, flags);
+ raw_spin_lock(&owner->pi_lock);
if (waiter == rt_mutex_top_waiter(lock)) {
rt_mutex_dequeue_pi(owner, top_waiter);
rt_mutex_enqueue_pi(owner, waiter);
@@ -941,7 +940,7 @@ static int task_blocks_on_rt_mutex(struc
/* Store the lock on which owner is blocked or NULL */
next_lock = task_blocked_on_lock(owner);

- raw_spin_unlock_irqrestore(&owner->pi_lock, flags);
+ raw_spin_unlock(&owner->pi_lock);
/*
* Even if full deadlock detection is on, if the owner is not
* blocked itself, we can avoid finding this out in the chain
@@ -957,12 +956,12 @@ static int task_blocks_on_rt_mutex(struc
*/
get_task_struct(owner);

- raw_spin_unlock(&lock->wait_lock);
+ raw_spin_unlock_irq(&lock->wait_lock);

res = rt_mutex_adjust_prio_chain(owner, chwalk, lock,
next_lock, waiter, task);

- raw_spin_lock(&lock->wait_lock);
+ raw_spin_lock_irq(&lock->wait_lock);

return res;
}
@@ -977,9 +976,8 @@ static void mark_wakeup_next_waiter(stru
struct rt_mutex *lock)
{
struct rt_mutex_waiter *waiter;
- unsigned long flags;

- raw_spin_lock_irqsave(&current->pi_lock, flags);
+ raw_spin_lock(&current->pi_lock);

waiter = rt_mutex_top_waiter(lock);

@@ -1001,7 +999,7 @@ static void mark_wakeup_next_waiter(stru
*/
lock->owner = (void *) RT_MUTEX_HAS_WAITERS;

- raw_spin_unlock_irqrestore(&current->pi_lock, flags);
+ raw_spin_unlock(&current->pi_lock);

wake_q_add(wake_q, waiter->task);
}
@@ -1018,12 +1016,11 @@ static void remove_waiter(struct rt_mute
bool is_top_waiter = (waiter == rt_mutex_top_waiter(lock));
struct task_struct *owner = rt_mutex_owner(lock);
struct rt_mutex *next_lock;
- unsigned long flags;

- raw_spin_lock_irqsave(&current->pi_lock, flags);
+ raw_spin_lock(&current->pi_lock);
rt_mutex_dequeue(lock, waiter);
current->pi_blocked_on = NULL;
- raw_spin_unlock_irqrestore(&current->pi_lock, flags);
+ raw_spin_unlock(&current->pi_lock);

/*
* Only update priority if the waiter was the highest priority
@@ -1032,7 +1029,7 @@ static void remove_waiter(struct rt_mute
if (!owner || !is_top_waiter)
return;

- raw_spin_lock_irqsave(&owner->pi_lock, flags);
+ raw_spin_lock(&owner->pi_lock);

rt_mutex_dequeue_pi(owner, waiter);

@@ -1044,7 +1041,7 @@ static void remove_waiter(struct rt_mute
/* Store the lock on which owner is blocked or NULL */
next_lock = task_blocked_on_lock(owner);

- raw_spin_unlock_irqrestore(&owner->pi_lock, flags);
+ raw_spin_unlock(&owner->pi_lock);

/*
* Don't walk the chain, if the owner task is not blocked
@@ -1056,12 +1053,12 @@ static void remove_waiter(struct rt_mute
/* gets dropped in rt_mutex_adjust_prio_chain()! */
get_task_struct(owner);

- raw_spin_unlock(&lock->wait_lock);
+ raw_spin_unlock_irq(&lock->wait_lock);

rt_mutex_adjust_prio_chain(owner, RT_MUTEX_MIN_CHAINWALK, lock,
next_lock, NULL, current);

- raw_spin_lock(&lock->wait_lock);
+ raw_spin_lock_irq(&lock->wait_lock);
}

/*
@@ -1129,13 +1126,13 @@ static int __sched
break;
}

- raw_spin_unlock(&lock->wait_lock);
+ raw_spin_unlock_irq(&lock->wait_lock);

debug_rt_mutex_print_deadlock(waiter);

schedule();

- raw_spin_lock(&lock->wait_lock);
+ raw_spin_lock_irq(&lock->wait_lock);
set_current_state(state);
}

@@ -1172,17 +1169,26 @@ rt_mutex_slowlock(struct rt_mutex *lock,
enum rtmutex_chainwalk chwalk)
{
struct rt_mutex_waiter waiter;
+ unsigned long flags;
int ret = 0;

debug_rt_mutex_init_waiter(&waiter);
RB_CLEAR_NODE(&waiter.pi_tree_entry);
RB_CLEAR_NODE(&waiter.tree_entry);

- raw_spin_lock(&lock->wait_lock);
+ /*
+ * Technically we could use raw_spin_[un]lock_irq() here, but this can
+ * be called in early boot if the cmpxchg() fast path is disabled
+ * (debug, no architecture support). In this case we will acquire the
+ * rtmutex with lock->wait_lock held. But we cannot unconditionally
+ * enable interrupts in that early boot case. So we need to use the
+ * irqsave/restore variants.
+ */
+ raw_spin_lock_irqsave(&lock->wait_lock, flags);

/* Try to acquire the lock again: */
if (try_to_take_rt_mutex(lock, current, NULL)) {
- raw_spin_unlock(&lock->wait_lock);
+ raw_spin_unlock_irqrestore(&lock->wait_lock, flags);
return 0;
}

@@ -1211,7 +1217,7 @@ rt_mutex_slowlock(struct rt_mutex *lock,
*/
fixup_rt_mutex_waiters(lock);

- raw_spin_unlock(&lock->wait_lock);
+ raw_spin_unlock_irqrestore(&lock->wait_lock, flags);

/* Remove pending timer: */
if (unlikely(timeout))
@@ -1227,6 +1233,7 @@ rt_mutex_slowlock(struct rt_mutex *lock,
*/
static inline int rt_mutex_slowtrylock(struct rt_mutex *lock)
{
+ unsigned long flags;
int ret;

/*
@@ -1241,7 +1248,7 @@ static inline int rt_mutex_slowtrylock(s
* The mutex has currently no owner. Lock the wait lock and
* try to acquire the lock.
*/
- raw_spin_lock(&lock->wait_lock);
+ raw_spin_lock_irqsave(&lock->wait_lock, flags);

ret = try_to_take_rt_mutex(lock, current, NULL);

@@ -1251,7 +1258,7 @@ static inline int rt_mutex_slowtrylock(s
*/
fixup_rt_mutex_waiters(lock);

- raw_spin_unlock(&lock->wait_lock);
+ raw_spin_unlock_irqrestore(&lock->wait_lock, flags);

return ret;
}
@@ -1263,7 +1270,9 @@ static inline int rt_mutex_slowtrylock(s
static bool __sched rt_mutex_slowunlock(struct rt_mutex *lock,
struct wake_q_head *wake_q)
{
- raw_spin_lock(&lock->wait_lock);
+ unsigned long flags;
+
+ raw_spin_lock_irqsave(&lock->wait_lock, flags);

debug_rt_mutex_unlock(lock);

@@ -1302,10 +1311,10 @@ static bool __sched rt_mutex_slowunlock(
*/
while (!rt_mutex_has_waiters(lock)) {
/* Drops lock->wait_lock ! */
- if (unlock_rt_mutex_safe(lock) == true)
+ if (unlock_rt_mutex_safe(lock, flags) == true)
return false;
/* Relock the rtmutex and try again */
- raw_spin_lock(&lock->wait_lock);
+ raw_spin_lock_irqsave(&lock->wait_lock, flags);
}

/*
@@ -1316,7 +1325,7 @@ static bool __sched rt_mutex_slowunlock(
*/
mark_wakeup_next_waiter(wake_q, lock);

- raw_spin_unlock(&lock->wait_lock);
+ raw_spin_unlock_irqrestore(&lock->wait_lock, flags);

/* check PI boosting */
return true;
@@ -1596,10 +1605,10 @@ int rt_mutex_start_proxy_lock(struct rt_
{
int ret;

- raw_spin_lock(&lock->wait_lock);
+ raw_spin_lock_irq(&lock->wait_lock);

if (try_to_take_rt_mutex(lock, task, NULL)) {
- raw_spin_unlock(&lock->wait_lock);
+ raw_spin_unlock_irq(&lock->wait_lock);
return 1;
}

@@ -1620,7 +1629,7 @@ int rt_mutex_start_proxy_lock(struct rt_
if (unlikely(ret))
remove_waiter(lock, waiter);

- raw_spin_unlock(&lock->wait_lock);
+ raw_spin_unlock_irq(&lock->wait_lock);

debug_rt_mutex_print_deadlock(waiter);

@@ -1668,7 +1677,7 @@ int rt_mutex_finish_proxy_lock(struct rt
{
int ret;

- raw_spin_lock(&lock->wait_lock);
+ raw_spin_lock_irq(&lock->wait_lock);

set_current_state(TASK_INTERRUPTIBLE);

@@ -1684,7 +1693,7 @@ int rt_mutex_finish_proxy_lock(struct rt
*/
fixup_rt_mutex_waiters(lock);

- raw_spin_unlock(&lock->wait_lock);
+ raw_spin_unlock_irq(&lock->wait_lock);

return ret;
}