[tip:locking/core] locking/qspinlock, x86: Provide liveness guarantee
From: tip-bot for Peter Zijlstra
Date: Tue Oct 16 2018 - 12:06:35 EST
Commit-ID: 7aa54be2976550f17c11a1c3e3630002dea39303
Gitweb: https://git.kernel.org/tip/7aa54be2976550f17c11a1c3e3630002dea39303
Author: Peter Zijlstra <peterz@xxxxxxxxxxxxx>
AuthorDate: Wed, 26 Sep 2018 13:01:20 +0200
Committer: Ingo Molnar <mingo@xxxxxxxxxx>
CommitDate: Tue, 16 Oct 2018 17:33:54 +0200
locking/qspinlock, x86: Provide liveness guarantee
On x86 we cannot do fetch_or() with a single instruction and thus end up
using a cmpxchg loop, this reduces determinism. Replace the fetch_or()
with a composite operation: tas-pending + load.
Using two instructions of course opens a window we previously did not
have. Consider the scenario:
CPU0 CPU1 CPU2
1) lock
trylock -> (0,0,1)
2) lock
trylock /* fail */
3) unlock -> (0,0,0)
4) lock
trylock -> (0,0,1)
5) tas-pending -> (0,1,1)
load-val <- (0,1,0) from 3
6) clear-pending-set-locked -> (0,0,1)
FAIL: _2_ owners
where 5) is our new composite operation. When we consider each part of
the qspinlock state as a separate variable (as we can when
_Q_PENDING_BITS == 8) then the above is entirely possible, because
tas-pending will only RmW the pending byte, so the later load is able
to observe prior tail and lock state (but not earlier than its own
trylock, which operates on the whole word, due to coherence).
To avoid this we need 2 things:
- the load must come after the tas-pending (obviously, otherwise it
can trivially observe prior state).
- the tas-pending must be a full word RmW instruction, it cannot be an XCHGB for
example, such that we cannot observe other state prior to setting
pending.
On x86 we can realize this by using "LOCK BTS m32, r32" for
tas-pending followed by a regular load.
Note that observing later state is not a problem:
- if we fail to observe a later unlock, we'll simply spin-wait for
that store to become visible.
- if we observe a later xchg_tail(), there is no difference from that
xchg_tail() having taken place before the tas-pending.
Suggested-by: Will Deacon <will.deacon@xxxxxxx>
Reported-by: Thomas Gleixner <tglx@xxxxxxxxxxxxx>
Signed-off-by: Peter Zijlstra (Intel) <peterz@xxxxxxxxxxxxx>
Reviewed-by: Will Deacon <will.deacon@xxxxxxx>
Cc: Linus Torvalds <torvalds@xxxxxxxxxxxxxxxxxxxx>
Cc: Peter Zijlstra <peterz@xxxxxxxxxxxxx>
Cc: andrea.parri@xxxxxxxxxxxxxxxxxxxx
Cc: longman@xxxxxxxxxx
Fixes: 59fb586b4a07 ("locking/qspinlock: Remove unbounded cmpxchg() loop from locking slowpath")
Link: https://lkml.kernel.org/r/20181003130957.183726335@xxxxxxxxxxxxx
Signed-off-by: Ingo Molnar <mingo@xxxxxxxxxx>
---
arch/x86/include/asm/qspinlock.h | 15 +++++++++++++++
kernel/locking/qspinlock.c | 16 +++++++++++++++-
2 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/arch/x86/include/asm/qspinlock.h b/arch/x86/include/asm/qspinlock.h
index 3e70bed8a978..87623c6b13db 100644
--- a/arch/x86/include/asm/qspinlock.h
+++ b/arch/x86/include/asm/qspinlock.h
@@ -6,9 +6,24 @@
#include <asm/cpufeature.h>
#include <asm-generic/qspinlock_types.h>
#include <asm/paravirt.h>
+#include <asm/rmwcc.h>
#define _Q_PENDING_LOOPS (1 << 9)
+#define queued_fetch_set_pending_acquire queued_fetch_set_pending_acquire
+static __always_inline u32 queued_fetch_set_pending_acquire(struct qspinlock *lock)
+{
+ u32 val = 0;
+
+ if (GEN_BINARY_RMWcc(LOCK_PREFIX "btsl", lock->val.counter, c,
+ "I", _Q_PENDING_OFFSET))
+ val |= _Q_PENDING_VAL;
+
+ val |= atomic_read(&lock->val) & ~_Q_PENDING_MASK;
+
+ return val;
+}
+
#ifdef CONFIG_PARAVIRT_SPINLOCKS
extern void native_queued_spin_lock_slowpath(struct qspinlock *lock, u32 val);
extern void __pv_init_lock_hash(void);
diff --git a/kernel/locking/qspinlock.c b/kernel/locking/qspinlock.c
index 47cb99787e4d..341ca666bc60 100644
--- a/kernel/locking/qspinlock.c
+++ b/kernel/locking/qspinlock.c
@@ -231,6 +231,20 @@ static __always_inline u32 xchg_tail(struct qspinlock *lock, u32 tail)
}
#endif /* _Q_PENDING_BITS == 8 */
+/**
+ * queued_fetch_set_pending_acquire - fetch the whole lock value and set pending
+ * @lock : Pointer to queued spinlock structure
+ * Return: The previous lock value
+ *
+ * *,*,* -> *,1,*
+ */
+#ifndef queued_fetch_set_pending_acquire
+static __always_inline u32 queued_fetch_set_pending_acquire(struct qspinlock *lock)
+{
+ return atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val);
+}
+#endif
+
/**
* set_locked - Set the lock bit and own the lock
* @lock: Pointer to queued spinlock structure
@@ -328,7 +342,7 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
*
* 0,0,* -> 0,1,* -> 0,0,1 pending, trylock
*/
- val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val);
+ val = queued_fetch_set_pending_acquire(lock);
/*
* If we observe contention, there is a concurrent locker.