On Fri, Oct 30, 2015 at 07:26:36PM -0400, Waiman Long wrote:
@@ -431,35 +432,44 @@ queue:
* sequentiality; this is because the set_locked() function below
* does not imply a full barrier.
*
+ * The PV pv_wait_head_lock function, if active, will acquire the lock
+ * and return a non-zero value. So we have to skip the
+ * smp_load_acquire() call. As the next PV queue head hasn't been
+ * designated yet, there is no way for the locked value to become
+ * _Q_SLOW_VAL. So both the redundant set_locked() and the
+ * atomic_cmpxchg_relaxed() calls will be safe. The cost of the
+ * redundant set_locked() call below should be negligible, too.
+ *
+ * If PV isn't active, 0 will be returned instead.
*/
- pv_wait_head(lock, node);
- while ((val = smp_load_acquire(&lock->val.counter)) & _Q_LOCKED_PENDING_MASK)
- cpu_relax();
+ val = pv_wait_head_lock(lock, node);
+ if (!val) {
+ while ((val = smp_load_acquire(&lock->val.counter))
+ & _Q_LOCKED_PENDING_MASK)
+ cpu_relax();
+ /*
+ * Claim the lock now:
+ *
+ * 0,0 -> 0,1
+ */
+ set_locked(lock);
+ val |= _Q_LOCKED_VAL;
+ }
/*
* If the next pointer is defined, we are not tail anymore.
- * In this case, claim the spinlock & release the MCS lock.
*/
- if (next) {
- set_locked(lock);
+ if (next)
goto mcs_unlock;
- }
/*
- * claim the lock:
- *
- * n,0,0 -> 0,0,1 : lock, uncontended
- * *,0,0 -> *,0,1 : lock, contended
- *
* If the queue head is the only one in the queue (lock value == tail),
- * clear the tail code and grab the lock. Otherwise, we only need
- * to grab the lock.
+ * we have to clear the tail code.
*/
for (;;) {
- if (val != tail) {
- set_locked(lock);
+ if ((val & _Q_TAIL_MASK) != tail)
break;
- }
+
/*
* The smp_load_acquire() call above has provided the necessary
* acquire semantics required for locking. At most two
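
If I read the hunk right, the restructured queue-head path thus condenses to
roughly this (my own summary of the diff above; the tail-clearing cmpxchg
loop body is elided):

	val = pv_wait_head_lock(lock, node);	/* non-zero: PV code already took the lock */
	if (!val) {
		while ((val = smp_load_acquire(&lock->val.counter)) & _Q_LOCKED_PENDING_MASK)
			cpu_relax();
		set_locked(lock);		/* 0,0 -> 0,1 */
		val |= _Q_LOCKED_VAL;
	}

	if (next)				/* not tail anymore */
		goto mcs_unlock;

	for (;;) {
		if ((val & _Q_TAIL_MASK) != tail)
			break;
		/* queue head is the only entry: clear the tail code */
		...
	}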

*urgh*, last time we had:

+	if (pv_wait_head_or_steal())
+		goto stolen;

while ((val = smp_load_acquire(&lock->val.counter)) & _Q_LOCKED_PENDING_MASK)
cpu_relax();
...
+stolen:
while (!(next = READ_ONCE(node->next)))
cpu_relax();
...
Now you completely overhaul the native code.. what happened?
-static void pv_wait_head(struct qspinlock *lock, struct mcs_spinlock *node)
+static u32 pv_wait_head_lock(struct qspinlock *lock, struct mcs_spinlock *node)
{
struct pv_node *pn = (struct pv_node *)node;
struct __qspinlock *l = (void *)lock;
@@ -276,11 +330,24 @@ static void pv_wait_head(struct qspinlock *lock, struct mcs_spinlock *node)
lp = (struct qspinlock **)1;
for (;; waitcnt++) {
+ /*
+ * Set the pending bit in the active lock spinning loop to
+ * disable lock stealing. However, the pending bit check in
+ * pv_queued_spin_trylock_unfair() and the setting/clearing
+ * of pending bit here aren't memory barriers. So a cmpxchg()
+ * is used to acquire the lock to be sure.
+ */
+ set_pending(lock);

OK, so we mark ourselves 'pending' such that a new lock() will not steal
and is forced to queue behind us.
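
pv_queued_spin_trylock_unfair() itself isn't quoted in this hunk, so the
following is only my guess at its shape, not the actual patch; presumably the
steal is gated on both the locked byte and the pending bit, something like:

	/* guess at the unfair trylock shape; not the quoted patch */
	static inline bool pv_queued_spin_trylock_unfair(struct qspinlock *lock)
	{
		struct __qspinlock *l = (void *)lock;

		/* don't steal while the lock is held or a queue head is pending */
		if (READ_ONCE(l->locked) ||
		    (atomic_read(&lock->val) & _Q_PENDING_MASK))
			return false;

		return cmpxchg(&l->locked, 0, _Q_LOCKED_VAL) == 0;
	}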

for (loop = SPIN_THRESHOLD; loop; loop--) {
- if (!READ_ONCE(l->locked))
- return;
+ if (!READ_ONCE(l->locked) &&
+ (cmpxchg(&l->locked, 0, _Q_LOCKED_VAL) == 0)) {
+ clear_pending(lock);
+ goto gotlock;

Would not: cmpxchg(&l->locked_pending, _Q_PENDING_VAL, _Q_LOCKED_VAL),
make sense to avoid the clear_pending() call?
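
That is, the inner loop would become something like this (a sketch only; it
assumes the _Q_PENDING_BITS == 8 layout, where locked_pending spans both the
locked byte and the pending byte):

	/* sketch of the suggested variant, not actual code */
	for (loop = SPIN_THRESHOLD; loop; loop--) {
		if (!READ_ONCE(l->locked) &&
		    cmpxchg(&l->locked_pending, _Q_PENDING_VAL, _Q_LOCKED_VAL)
				== _Q_PENDING_VAL)
			goto gotlock;	/* clears pending and takes the lock in one op */
		cpu_relax();
	}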