[PATCH v2 3/5] locking/pvqspinlock: Make pv_unhash() atomic

From: Waiman Long
Date: Tue May 31 2016 - 13:03:27 EST


Boqun Feng came up with a scenario where the hashing/unhashing of a
PV node entry may produce unexpected results when the lock holder
vCPU races with the queue head vCPU:

  CPU 0 (lock holder)            CPU 1 (queue head)
  ===================            ==================
  spin_lock():                   spin_lock():
    pv_kick_node():                pv_wait_head_or_lock():
                                     if (READ_ONCE(l->locked) != _Q_SLOW_VAL) {
                                       pv_hash();

      cmpxchg(&l->locked,
              _Q_LOCKED_VAL, _Q_SLOW_VAL);
      pv_hash();
                                       locked = xchg(&l->locked, _Q_SLOW_VAL);
  do_something();                      if(...) {
                                       }
  spin_unlock():
    pv_unhash();
                                       else if (unlikely(locked == _Q_SLOW_VAL)) {
                                         WRITE_ONCE(*lp, NULL);

In this case, pv_unhash() and WRITE_ONCE() both erase the same entry,
leaving the other entry behind. That stale entry may cause an incorrect
lookup the next time the same lock is hashed, leading to a missed wakeup
of the queue head.
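
The interleaving above can be replayed sequentially in user space to see
the leftover entry. This is only a rough sketch: demo_entry, demo_hash(),
demo_unhash_plain() and demo_replay_race() are hypothetical stand-ins, and
the table is a simplified first-free-slot array rather than the kernel's
hash table.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_SLOTS 4

/* Hypothetical stand-in for struct pv_hash_entry, not the kernel type. */
struct demo_entry {
	void *lock;
	int node;
};

static struct demo_entry demo_table[DEMO_SLOTS];

/* Like pv_hash(): claim the first free slot, return a pointer to its lock field. */
static void **demo_hash(void *lock, int node)
{
	for (size_t i = 0; i < DEMO_SLOTS; i++) {
		if (demo_table[i].lock == NULL) {
			demo_table[i].lock = lock;
			demo_table[i].node = node;
			return &demo_table[i].lock;
		}
	}
	return NULL;	/* table full */
}

/* Like the old pv_unhash(): clear the first matching entry with a plain store. */
static int demo_unhash_plain(void *lock)
{
	for (size_t i = 0; i < DEMO_SLOTS; i++) {
		if (demo_table[i].lock == lock) {
			demo_table[i].lock = NULL;
			return demo_table[i].node;
		}
	}
	return -1;
}

/* Count the entries that still refer to the given lock. */
static size_t demo_count(void *lock)
{
	size_t n = 0;

	for (size_t i = 0; i < DEMO_SLOTS; i++)
		if (demo_table[i].lock == lock)
			n++;
	return n;
}

/* Replay the racy interleaving step by step; return the number of stale entries. */
static size_t demo_replay_race(void)
{
	static int dummy_lock;
	void *lock = &dummy_lock;
	void **lp;

	for (size_t i = 0; i < DEMO_SLOTS; i++)
		demo_table[i].lock = NULL;

	lp = demo_hash(lock, 1);	/* CPU 1: pv_hash() in pv_wait_head_or_lock() */
	assert(lp != NULL);
	demo_hash(lock, 1);		/* CPU 0: pv_hash() in pv_kick_node() */
	demo_unhash_plain(lock);	/* CPU 0: pv_unhash() at unlock hits CPU 1's entry */
	*lp = NULL;			/* CPU 1: WRITE_ONCE(*lp, NULL) clears it again */

	return demo_count(lock);
}
```

In this sketch demo_replay_race() returns 1: two entries were hashed, but
both removals landed on the first one, so the second entry stays behind.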

This patch fixes that particular problem by making pv_unhash() atomic
and replacing the WRITE_ONCE() above with the atomic pv_unhash(). So
as long as the number of pv_hash() calls matches that of pv_unhash()
calls, no unwanted entry will be left.
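
The idea can be illustrated in user space with C11 atomics. This is a
minimal sketch under stated assumptions, not the kernel code: sketch_entry,
sketch_hash(), sketch_unhash() and sketch_replay() are hypothetical names,
atomic_compare_exchange_strong() stands in for cmpxchg(), and the table is
a plain first-free-slot array.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

#define SKETCH_SLOTS 4

/* Hypothetical stand-in for struct pv_hash_entry, not the kernel type. */
struct sketch_entry {
	_Atomic(void *) lock;
	int node;
};

static struct sketch_entry sketch_table[SKETCH_SLOTS];

/* Claim the first free slot with a compare-and-swap from NULL to lock. */
static int sketch_hash(void *lock, int node)
{
	for (size_t i = 0; i < SKETCH_SLOTS; i++) {
		void *expected = NULL;

		if (atomic_compare_exchange_strong(&sketch_table[i].lock,
						   &expected, lock)) {
			sketch_table[i].node = node;
			return 1;
		}
	}
	return 0;	/* table full */
}

/* Atomic unhash: only one caller can swing a given slot from lock to NULL. */
static int sketch_unhash(void *lock)
{
	for (size_t i = 0; i < SKETCH_SLOTS; i++) {
		void *expected = lock;
		int node;

		if (atomic_load(&sketch_table[i].lock) != lock)
			continue;
		node = sketch_table[i].node;
		if (!atomic_compare_exchange_strong(&sketch_table[i].lock,
						    &expected, (void *)NULL))
			continue;	/* entry has been changed, keep looking */
		return node;
	}
	return -1;	/* no entry found for this lock */
}

/* Count the entries that still refer to the given lock. */
static size_t sketch_count(void *lock)
{
	size_t n = 0;

	for (size_t i = 0; i < SKETCH_SLOTS; i++)
		if (atomic_load(&sketch_table[i].lock) == lock)
			n++;
	return n;
}

/* Two hashes matched by two atomic unhashes; return the entries left over. */
static size_t sketch_replay(void)
{
	static int dummy_lock;
	void *lock = &dummy_lock;

	for (size_t i = 0; i < SKETCH_SLOTS; i++)
		atomic_store(&sketch_table[i].lock, (void *)NULL);

	sketch_hash(lock, 1);	/* CPU 1: pv_hash() in pv_wait_head_or_lock() */
	sketch_hash(lock, 1);	/* CPU 0: pv_hash() in pv_kick_node() */
	sketch_unhash(lock);	/* CPU 0: pv_unhash() at unlock */
	sketch_unhash(lock);	/* CPU 1: pv_unhash() instead of WRITE_ONCE(*lp, NULL) */

	return sketch_count(lock);
}
```

Here sketch_replay() returns 0. Each unhash removes a distinct entry, so
the table is clean as long as hashes and unhashes are balanced. The replay
is sequential and only shows the bookkeeping, it does not exercise real
concurrency.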

Reported-by: Boqun Feng <boqun.feng@xxxxxxxxx>
Signed-off-by: Waiman Long <Waiman.Long@xxxxxxx>
---
kernel/locking/qspinlock_paravirt.h | 71 ++++++++++++++++++++--------------
1 files changed, 42 insertions(+), 29 deletions(-)

diff --git a/kernel/locking/qspinlock_paravirt.h b/kernel/locking/qspinlock_paravirt.h
index 3df975d..54e03b9 100644
--- a/kernel/locking/qspinlock_paravirt.h
+++ b/kernel/locking/qspinlock_paravirt.h
@@ -233,10 +233,16 @@ static struct pv_node *pv_unhash(struct qspinlock *lock)
 	struct pv_hash_entry *he;
 	struct pv_node *node;
 
+	/*
+	 * As pv_unhash() can be called from both pv_wait_head_or_lock() and
+	 * __pv_queued_spin_unlock_slowpath(), it is made atomic to avoid
+	 * racing.
+	 */
 	for_each_hash_entry(he, offset, hash) {
 		if (READ_ONCE(he->lock) == lock) {
 			node = READ_ONCE(he->node);
-			WRITE_ONCE(he->lock, NULL);
+			if (cmpxchg(&he->lock, lock, NULL) != lock)
+				continue;	/* Entry has been changed */
 			return node;
 		}
 	}
@@ -424,39 +430,46 @@ pv_wait_head_or_lock(struct qspinlock *lock, struct mcs_spinlock *node)

 		/*
 		 * Set _Q_SLOW_VAL and hash the PV node, if necessary.
+		 *
+		 * We must hash before setting _Q_SLOW_VAL, such that
+		 * when we observe _Q_SLOW_VAL in __pv_queued_spin_unlock()
+		 * we'll be sure to be able to observe our hash entry.
+		 *
+		 *   [S] <hash>                 [Rmw] l->locked == _Q_SLOW_VAL
+		 *       MB                           RMB
+		 * [RmW] l->locked = _Q_SLOW_VAL  [L] <unhash>
+		 *
+		 * Matches the smp_rmb() in __pv_queued_spin_unlock().
 		 */
 		if (READ_ONCE(l->locked) != _Q_SLOW_VAL) {
-			struct qspinlock **lp = pv_hash(lock, pn);
-			u8 locked;
+			u8 old;
+
+			pv_hash(lock, pn);
+			old = xchg(&l->locked, _Q_SLOW_VAL);
 
 			/*
-			 * We must hash before setting _Q_SLOW_VAL, such that
-			 * when we observe _Q_SLOW_VAL in __pv_queued_spin_unlock()
-			 * we'll be sure to be able to observe our hash entry.
-			 *
-			 *   [S] <hash>                 [Rmw] l->locked == _Q_SLOW_VAL
-			 *       MB                           RMB
-			 * [RmW] l->locked = _Q_SLOW_VAL  [L] <unhash>
-			 *
-			 * Matches the smp_rmb() in __pv_queued_spin_unlock().
+			 * If the old lock value isn't _Q_LOCKED_VAL, it
+			 * will be either 0 or _Q_SLOW_VAL. In both cases,
+			 * this vCPU may be racing with the lock holder vCPU
+			 * in pv_kick_node() and/or __pv_queued_spin_unlock().
+			 * So we should use the atomic pv_unhash() to remove
+			 * the unwanted node entry.
 			 */
-			locked = xchg(&l->locked, _Q_SLOW_VAL);
-			if (locked == 0) {
-				/*
-				 * The lock was free and now we own the lock.
-				 * Change the lock value back to _Q_LOCKED_VAL
-				 * and unhash the table.
-				 */
-				WRITE_ONCE(l->locked, _Q_LOCKED_VAL);
-				WRITE_ONCE(*lp, NULL);
-				clear_pending(lock);
-				goto gotlock;
-			} else if (unlikely(locked == _Q_SLOW_VAL)) {
-				/*
-				 * Racing with pv_kick_node(), need to undo
-				 * the pv_hash().
-				 */
-				WRITE_ONCE(*lp, NULL);
+			if (old != _Q_LOCKED_VAL) {
+				pv_unhash(lock);
+
+				if (likely(old == 0)) {
+					/*
+					 * The lock was free and now we own the
+					 * lock. Change the lock value back to
+					 * _Q_LOCKED_VAL as its node has been
+					 * unhashed.
+					 */
+					WRITE_ONCE(l->locked, _Q_LOCKED_VAL);
+					clear_pending(lock);
+					goto gotlock;
+				}
+				/* old == _Q_SLOW_VAL. */
 			}
 		}
 		clear_pending(lock);	/* Enable lock stealing */
--
1.7.1