Re: [PATCH 1/2] locking: Use spin primitives for busy loops

From: Peter Zijlstra
Date: Fri Sep 01 2017 - 08:23:33 EST



This really should have been many patches.

On Sun, Aug 20, 2017 at 07:25:01PM +1000, Nicholas Piggin wrote:

> @@ -108,12 +109,10 @@ static inline unsigned __read_seqcount_begin(const seqcount_t *s)
> {
> unsigned ret;
>
> -repeat:
> ret = READ_ONCE(s->sequence);
> - if (unlikely(ret & 1)) {
> - cpu_relax();
> - goto repeat;
> - }
> + if (unlikely(ret & 1))
> + spin_until_cond( !((ret = READ_ONCE(s->sequence)) & 1) );
> +

That seems to suggest something like: cond_load(), similar to
smp_cond_load_acquire(), except without the smp/acquire parts.

> return ret;
> }
>
> diff --git a/kernel/locking/mcs_spinlock.h b/kernel/locking/mcs_spinlock.h
> index 6a385aabcce7..a91a0cc46a4c 100644
> --- a/kernel/locking/mcs_spinlock.h
> +++ b/kernel/locking/mcs_spinlock.h
> @@ -27,8 +27,7 @@ struct mcs_spinlock {
> */
> #define arch_mcs_spin_lock_contended(l) \
> do { \
> - while (!(smp_load_acquire(l))) \
> - cpu_relax(); \
> + spin_until_cond(smp_load_acquire(l)); \
> } while (0)

Torn on whether it makes sense to use smp_cond_load_acquire() here
instead. That avoids issuing the barrier on each loop.

> #endif
>
> @@ -107,8 +106,7 @@ void mcs_spin_unlock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
> if (likely(cmpxchg_release(lock, node, NULL) == node))
> return;
> /* Wait until the next pointer is set */
> - while (!(next = READ_ONCE(node->next)))
> - cpu_relax();
> + spin_until_cond((next = READ_ONCE(node->next)) != 0);
> }
>
> /* Pass lock to next waiter. */

Hmm, you could've used that same pattern above too I suppose,

spin_until_cond(!((ret = READ_ONCE(s->sequence)) & 1));

> diff --git a/kernel/locking/mutex.c b/kernel/locking/mutex.c
> index 858a07590e39..0ffa1cd7f12b 100644
> --- a/kernel/locking/mutex.c
> +++ b/kernel/locking/mutex.c
> @@ -427,6 +427,7 @@ bool mutex_spin_on_owner(struct mutex *lock, struct task_struct *owner,
> bool ret = true;
>
> rcu_read_lock();
> + spin_begin();
> while (__mutex_owner(lock) == owner) {
> /*
> * Ensure we emit the owner->on_cpu, dereference _after_
> @@ -450,8 +451,9 @@ bool mutex_spin_on_owner(struct mutex *lock, struct task_struct *owner,
> break;
> }
>
> - cpu_relax();
> + spin_cpu_relax();
> }
> + spin_end();
> rcu_read_unlock();

Since we just did a trylock we expect to wait, which is why we don't do
a condition test before spin_begin(), right?

>
> return ret;
> @@ -532,6 +534,7 @@ mutex_optimistic_spin(struct mutex *lock, struct ww_acquire_ctx *ww_ctx,
> goto fail;
> }
>
> + spin_begin();
> for (;;) {
> struct task_struct *owner;
>
> @@ -553,8 +556,9 @@ mutex_optimistic_spin(struct mutex *lock, struct ww_acquire_ctx *ww_ctx,
> * memory barriers as we'll eventually observe the right
> * values at the cost of a few extra spins.
> */
> - cpu_relax();
> + spin_cpu_relax();
> }
> + spin_end();
>
> if (!waiter)
> osq_unlock(&lock->osq);
> @@ -563,6 +567,8 @@ mutex_optimistic_spin(struct mutex *lock, struct ww_acquire_ctx *ww_ctx,
>
>
> fail_unlock:
> + spin_end();
> +
> if (!waiter)
> osq_unlock(&lock->osq);
>

The same, we just did a trylock before calling this.

However, I think you just created a nested spin_begin()/spin_end() pair;
looking at how PPC implements that, this doesn't look right.

> diff --git a/kernel/locking/osq_lock.c b/kernel/locking/osq_lock.c
> index a3167941093b..9dd58bbe60b7 100644
> --- a/kernel/locking/osq_lock.c
> +++ b/kernel/locking/osq_lock.c
> @@ -53,6 +53,7 @@ osq_wait_next(struct optimistic_spin_queue *lock,
> */
> old = prev ? prev->cpu : OSQ_UNLOCKED_VAL;
>
> + spin_begin();
> for (;;) {
> if (atomic_read(&lock->tail) == curr &&
> atomic_cmpxchg_acquire(&lock->tail, curr, old) == curr) {
> @@ -80,8 +81,9 @@ osq_wait_next(struct optimistic_spin_queue *lock,
> break;
> }
>
> - cpu_relax();
> + spin_cpu_relax();
> }
> + spin_end();
>
> return next;
> }

This however doesn't have a fast-path; we unconditionally do
spin_begin(). That looks sub-optimal. Looking at PPC, that drops the
HT priority even though we might not end up waiting at all.


> @@ -107,6 +109,8 @@ bool osq_lock(struct optimistic_spin_queue *lock)
> if (old == OSQ_UNLOCKED_VAL)
> return true;
>
> + spin_begin();
> +
> prev = decode_cpu(old);
> node->prev = prev;
> WRITE_ONCE(prev->next, node);

Why so early? You again do HMT_low() even though we might not hit the
cpu_relax().

> @@ -129,8 +133,9 @@ bool osq_lock(struct optimistic_spin_queue *lock)
> if (need_resched() || vcpu_is_preempted(node_cpu(node->prev)))
> goto unqueue;
>
> - cpu_relax();
> + spin_cpu_relax();
> }
> + spin_end();
> return true;
>
> unqueue:



> @@ -152,10 +157,12 @@ bool osq_lock(struct optimistic_spin_queue *lock)
> * in which case we should observe @node->locked becomming
> * true.
> */
> - if (smp_load_acquire(&node->locked))
> + if (smp_load_acquire(&node->locked)) {
> + spin_end();
> return true;
> + }
>
> - cpu_relax();
> + spin_cpu_relax();
>
> /*
> * Or we race against a concurrent unqueue()'s step-B, in which
> @@ -164,6 +171,8 @@ bool osq_lock(struct optimistic_spin_queue *lock)
> prev = READ_ONCE(node->prev);
> }
>
> + spin_end();
> +

So here we did all of step-A in low-prio, that doesn't look right.

> /*
> * Step - B -- stabilize @next
> *
> diff --git a/kernel/locking/qrwlock.c b/kernel/locking/qrwlock.c
> index 2655f26ec882..186ff495097d 100644
> --- a/kernel/locking/qrwlock.c
> +++ b/kernel/locking/qrwlock.c
> @@ -54,10 +54,12 @@ struct __qrwlock {
> static __always_inline void
> rspin_until_writer_unlock(struct qrwlock *lock, u32 cnts)
> {
> + spin_begin();
> while ((cnts & _QW_WMASK) == _QW_LOCKED) {
> - cpu_relax();
> + spin_cpu_relax();
> cnts = atomic_read_acquire(&lock->cnts);
> }
> + spin_end();
> }

Again, unconditional :/

> /**
> @@ -124,6 +126,7 @@ void queued_write_lock_slowpath(struct qrwlock *lock)
> * Set the waiting flag to notify readers that a writer is pending,
> * or wait for a previous writer to go away.
> */
> + spin_begin();
> for (;;) {
> struct __qrwlock *l = (struct __qrwlock *)lock;
>
> @@ -131,7 +134,7 @@ void queued_write_lock_slowpath(struct qrwlock *lock)
> (cmpxchg_relaxed(&l->wmode, 0, _QW_WAITING) == 0))
> break;
>
> - cpu_relax();
> + spin_cpu_relax();
> }
> /* When no more readers, set the locked flag */
> @@ -142,8 +145,10 @@ void queued_write_lock_slowpath(struct qrwlock *lock)
> _QW_LOCKED) == _QW_WAITING))
> break;
>
> - cpu_relax();
> + spin_cpu_relax();
> }
> + spin_end();
> +
> unlock:
> arch_spin_unlock(&lock->wait_lock);
> }

Would not something like:

do {
spin_until_cond(!READ_ONCE(l->wmode));
} while (!try_cmpxchg_relaxed(&l->wmode, 0, _QW_WAITING));

be a better pattern here? Then we have a fast-path without low/medium
priority flips.


> diff --git a/kernel/locking/qspinlock_paravirt.h b/kernel/locking/qspinlock_paravirt.h
> index 4ccfcaae5b89..88817e41fadf 100644
> --- a/kernel/locking/qspinlock_paravirt.h
> +++ b/kernel/locking/qspinlock_paravirt.h
> @@ -293,15 +293,19 @@ static void pv_wait_node(struct mcs_spinlock *node, struct mcs_spinlock *prev)
> bool wait_early;
>
> for (;;) {
> + spin_begin();

again, unconditional..

> for (wait_early = false, loop = SPIN_THRESHOLD; loop; loop--) {
> - if (READ_ONCE(node->locked))
> + if (READ_ONCE(node->locked)) {
> + spin_end();
> return;
> + }
> if (pv_wait_early(pp, loop)) {
> wait_early = true;
> break;
> }
> - cpu_relax();
> + spin_cpu_relax();
> }
> + spin_end();
>
> /*
> * Order pn->state vs pn->locked thusly:
> @@ -417,11 +421,15 @@ pv_wait_head_or_lock(struct qspinlock *lock, struct mcs_spinlock *node)
> * disable lock stealing before attempting to acquire the lock.
> */
> set_pending(lock);
> + spin_begin();

and again...

> for (loop = SPIN_THRESHOLD; loop; loop--) {
> - if (trylock_clear_pending(lock))
> + if (trylock_clear_pending(lock)) {
> + spin_end();
> goto gotlock;
> - cpu_relax();
> + }
> + spin_cpu_relax();
> }
> + spin_end();
> clear_pending(lock);
>



> diff --git a/kernel/locking/rwsem-xadd.c b/kernel/locking/rwsem-xadd.c
> index 34e727f18e49..2d0e539f1a95 100644
> --- a/kernel/locking/rwsem-xadd.c
> +++ b/kernel/locking/rwsem-xadd.c
> @@ -358,6 +358,7 @@ static noinline bool rwsem_spin_on_owner(struct rw_semaphore *sem)
> goto out;
>
> rcu_read_lock();
> + spin_begin();
> while (sem->owner == owner) {
> /*
> * Ensure we emit the owner->on_cpu, dereference _after_
> @@ -373,12 +374,14 @@ static noinline bool rwsem_spin_on_owner(struct rw_semaphore *sem)
> */
> if (!owner->on_cpu || need_resched() ||
> vcpu_is_preempted(task_cpu(owner))) {
> + spin_end();
> rcu_read_unlock();
> return false;
> }
>
> - cpu_relax();
> + spin_cpu_relax();
> }
> + spin_end();
> rcu_read_unlock();
> out:
> /*

rwsem_spin_on_owner() is different from the mutex one in that we don't
first do a trylock(), which makes the spin_begin() here unconditional.

So I think we want the rwsem_optimistic_spin() main loop re-ordered to
first do the trylock and _then_ wait if it fails. Not first wait and
then try.

> @@ -408,6 +411,7 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem)
> * 2) readers own the lock as we can't determine if they are
> * actively running or not.
> */
> + spin_begin();
> while (rwsem_spin_on_owner(sem)) {
> /*
> * Try to acquire the lock
> @@ -432,8 +436,9 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem)
> * memory barriers as we'll eventually observe the right
> * values at the cost of a few extra spins.
> */
> - cpu_relax();
> + spin_cpu_relax();
> }
> + spin_end();
> osq_unlock(&sem->osq);
> done:
> preempt_enable();

And I think you did the recursive spin_begin thing again here.