On Fri, Jul 15, 2016 at 06:35:56PM +0200, Peter Zijlstra wrote:
On Fri, Jul 15, 2016 at 12:07:03PM +0200, Peter Zijlstra wrote:I think Waiman's patch does have the problem of two pv_hash() calls for
Urgh, brain hurt.So if we are kicked by the unlock_slowpath, and the lock is stealed byRight, let me go think about this a bit.
someone else, we need hash its node again and set l->locked to
_Q_SLOW_VAL, then enter pv_wait.
So I _think_ the below does for it but I could easily have missed yet
another case.
Waiman's patch has the problem that it can have two pv_hash() calls for
the same lock in progress and I'm thinking that means we can hit the
BUG() in pv_hash() due to that.
the same lock in progress. As I mentioned in the first version:
http://lkml.kernel.org/g/20160527074331.GB8096@insomnia
And he tried to address this in the patch #3 of this series. However,
I think what is proper here is either to reorder patch 2 and 3 or to
merge patch 2 and 3, otherwise, we are introducing a bug in the middle
of this series.
Thoughts, Waiman?
That said, I found Peter's way is much simpler and easier to understand
;-)
If we can't, it still has a problem because its not telling us either.We actually don't need this extra running state, right? Because nobody
--- a/kernel/locking/qspinlock_paravirt.h
+++ b/kernel/locking/qspinlock_paravirt.h
@@ -20,7 +20,8 @@
* native_queued_spin_unlock().
*/
-#define _Q_SLOW_VAL (3U<< _Q_LOCKED_OFFSET)
+#define _Q_HASH_VAL (3U<< _Q_LOCKED_OFFSET)
+#define _Q_SLOW_VAL (7U<< _Q_LOCKED_OFFSET)
/*
* Queue Node Adaptive Spinning
@@ -36,14 +37,11 @@
*/
#define PV_PREV_CHECK_MASK 0xff
-/*
- * Queue node uses: vcpu_running& vcpu_halted.
- * Queue head uses: vcpu_running& vcpu_hashed.
- */
enum vcpu_state {
- vcpu_running = 0,
- vcpu_halted, /* Used only in pv_wait_node */
- vcpu_hashed, /* = pv_hash'ed + vcpu_halted */
+ vcpu_node_running = 0,
+ vcpu_node_halted,
+ vcpu_head_running,
cares about the difference between two running states right now.
+ vcpu_head_halted,This does give a chance to let the lock waiter block the lock holder,
};
struct pv_node {
@@ -263,7 +261,7 @@ pv_wait_early(struct pv_node *prev, int
if ((loop& PV_PREV_CHECK_MASK) != 0)
return false;
- return READ_ONCE(prev->state) != vcpu_running;
+ return READ_ONCE(prev->state)& 1;
}
/*
@@ -311,20 +309,19 @@ static void pv_wait_node(struct mcs_spin
*
* Matches the cmpxchg() from pv_kick_node().
*/
- smp_store_mb(pn->state, vcpu_halted);
+ smp_store_mb(pn->state, vcpu_node_halted);
- if (!READ_ONCE(node->locked)) {
- qstat_inc(qstat_pv_wait_node, true);
- qstat_inc(qstat_pv_wait_early, wait_early);
- pv_wait(&pn->state, vcpu_halted);
- }
+ if (READ_ONCE(node->locked))
+ return;
+
+ qstat_inc(qstat_pv_wait_node, true);
+ qstat_inc(qstat_pv_wait_early, wait_early);
+ pv_wait(&pn->state, vcpu_node_halted);
/*
- * If pv_kick_node() changed us to vcpu_hashed, retain that
- * value so that pv_wait_head_or_lock() knows to not also try
- * to hash this lock.
+ * If pv_kick_node() advanced us, retain that state.
*/
- cmpxchg(&pn->state, vcpu_halted, vcpu_running);
+ cmpxchg(&pn->state, vcpu_node_halted, vcpu_node_running);
/*
* If the locked flag is still not set after wakeup, it is a
@@ -362,18 +359,17 @@ static void pv_kick_node(struct qspinloc
*
* Matches with smp_store_mb() and cmpxchg() in pv_wait_node()
*/
- if (cmpxchg(&pn->state, vcpu_halted, vcpu_hashed) != vcpu_halted)
+ if (cmpxchg(&pn->state, vcpu_node_halted, vcpu_head_running) != vcpu_node_halted)
return;
/*
- * Put the lock into the hash table and set the _Q_SLOW_VAL.
- *
- * As this is the same vCPU that will check the _Q_SLOW_VAL value and
- * the hash table later on at unlock time, no atomic instruction is
- * needed.
+ * See pv_wait_head_or_lock(). We have to hash and force the unlock
+ * into the slow path to deliver the actual kick for waking.
*/
- WRITE_ONCE(l->locked, _Q_SLOW_VAL);
- (void)pv_hash(lock, pn);
+ if (cmpxchg(&l->locked, _Q_LOCKED_VAL, _Q_HASH_VAL) == _Q_LOCKED_VAL) {
+ (void)pv_hash(lock, pn);
+ smp_store_release(&l->locked, _Q_SLOW_VAL);
+ }
}
/*
@@ -388,28 +384,22 @@ pv_wait_head_or_lock(struct qspinlock *l
{
struct pv_node *pn = (struct pv_node *)node;
struct __qspinlock *l = (void *)lock;
- struct qspinlock **lp = NULL;
int waitcnt = 0;
int loop;
/*
- * If pv_kick_node() already advanced our state, we don't need to
- * insert ourselves into the hash table anymore.
- */
- if (READ_ONCE(pn->state) == vcpu_hashed)
- lp = (struct qspinlock **)1;
-
- /*
* Tracking # of slowpath locking operations
*/
qstat_inc(qstat_pv_lock_slowpath, true);
for (;; waitcnt++) {
+ u8 locked;
+
/*
* Set correct vCPU state to be used by queue node wait-early
* mechanism.
*/
- WRITE_ONCE(pn->state, vcpu_running);
+ WRITE_ONCE(pn->state, vcpu_head_running);
/*
* Set the pending bit in the active lock spinning loop to
@@ -423,33 +413,38 @@ pv_wait_head_or_lock(struct qspinlock *l
}
clear_pending(lock);
+ /*
+ * We want to go sleep; ensure we're hashed so that
+ * __pv_queued_spin_unlock_slow() can find us for a wakeup.
+ */
+ locked = cmpxchg(&l->locked, _Q_LOCKED_VAL, _Q_HASH_VAL);
+ switch (locked) {
+ /*
+ * We're not hashed yet, either we're fresh from pv_wait_node()
+ * or __pv_queued_spin_unlock_slow() unhashed us but we lost
+ * the trylock to a steal and have to re-hash.
+ */
+ case _Q_LOCKED_VAL:
+ (void)pv_hash(lock, pn);
+ smp_store_release(&l->locked, _Q_SLOW_VAL);
+ break;
- if (!lp) { /* ONCE */
- lp = pv_hash(lock, pn);
+ /*
+ * pv_kick_node() is hashing us, wait for it.
+ */
+ case _Q_HASH_VAL:
+ while (READ_ONCE(l->locked) == _Q_HASH_VAL)
+ cpu_relax();
+ break;
- /*
- * We must hash before setting _Q_SLOW_VAL, such that
- * when we observe _Q_SLOW_VAL in __pv_queued_spin_unlock()
- * we'll be sure to be able to observe our hash entry.
- *
- * [S]<hash> [Rmw] l->locked == _Q_SLOW_VAL
- * MB RMB
- * [RmW] l->locked = _Q_SLOW_VAL [L]<unhash>
- *
- * Matches the smp_rmb() in __pv_queued_spin_unlock().
- */
- if (xchg(&l->locked, _Q_SLOW_VAL) == 0) {
- /*
- * The lock was free and now we own the lock.
- * Change the lock value back to _Q_LOCKED_VAL
- * and unhash the table.
- */
- WRITE_ONCE(l->locked, _Q_LOCKED_VAL);
- WRITE_ONCE(*lp, NULL);
- goto gotlock;
- }
+ /*
+ * Ooh, unlocked, try and grab it.
+ */
+ case 0:
+ continue;
}
- WRITE_ONCE(pn->state, vcpu_hashed);
+
+ WRITE_ONCE(pn->state, vcpu_head_halted);
qstat_inc(qstat_pv_wait_head, true);
qstat_inc(qstat_pv_wait_again, waitcnt);
pv_wait(&l->locked, _Q_SLOW_VAL);
@@ -480,7 +475,7 @@ __pv_queued_spin_unlock_slowpath(struct
struct __qspinlock *l = (void *)lock;
struct pv_node *node;
- if (unlikely(locked != _Q_SLOW_VAL)) {
+ if (unlikely(locked != _Q_SLOW_VAL&& locked != _Q_HASH_VAL)) {
WARN(!debug_locks_silent,
"pvqspinlock: lock 0x%lx has corrupted value 0x%x!\n",
(unsigned long)lock, atomic_read(&lock->val));
@@ -488,18 +483,17 @@ __pv_queued_spin_unlock_slowpath(struct
}
/*
- * A failed cmpxchg doesn't provide any memory-ordering guarantees,
- * so we need a barrier to order the read of the node data in
- * pv_unhash *after* we've read the lock being _Q_SLOW_VAL.
- *
- * Matches the cmpxchg() in pv_wait_head_or_lock() setting _Q_SLOW_VAL.
+ * Wait until the hash-bucket is complete.
*/
- smp_rmb();
+ while (READ_ONCE(l->locked) == _Q_HASH_VAL)
+ cpu_relax();
right? Considering a lock queue head's vcpu is preempted before it could
set the lock to _Q_SLOW_VAL.
Could we do something like this here:
if (unlikely(cmpxchg(l->locked, _Q_HASH_VAL, 0) == _Q_HASH_VAL))
return;
And in pv_wait_head_or_lock()
locked = cmpxchg(&l->locked, _Q_LOCKED_VAL, _Q_HASH_VAL);
switch(locked) {
case _Q_LOCKED_VAL:
(void)pv_hash(lock, pn);
locked = cmpxchg_release(&l->locked, _Q_HASH_VAL, _Q_SLOW_VAL);
/*
* Only the holder will change the ->locked from
* _Q_HASH_VAL to another value, if this
* happens, the holder has already released the
* lock without trying to wake the head, in this
* case, we need to unhash ourselves and there
* is a great chance we can get the locke.
*/
if (unlikely(locked != _Q_HASH_VAL)) {
pv_unhash(lock, pn);
if (!cmpxchg_relaxed(&l->locked, 0, _Q_LOCKED_VAL)
goto gotlock;
}
break;
Wrote those in my mailbox, may miss something.
Thoughts?