Re: [PATCH 1/2] locking: Use spin primitives for busy loops

From: Nicholas Piggin
Date: Tue Sep 05 2017 - 20:09:02 EST


Hi Peter,

Thanks for the very good reviews, sorry been a bit busy and I figure this
didn't get time to make 4.14 so I'll take your comments and submit again
for next window.

On Fri, 1 Sep 2017 14:23:20 +0200
Peter Zijlstra <peterz@xxxxxxxxxxxxx> wrote:

> This really should have been many patches.

Sure, will do.

>
> On Sun, Aug 20, 2017 at 07:25:01PM +1000, Nicholas Piggin wrote:
>
> > @@ -108,12 +109,10 @@ static inline unsigned __read_seqcount_begin(const seqcount_t *s)
> > {
> > unsigned ret;
> >
> > -repeat:
> > ret = READ_ONCE(s->sequence);
> > - if (unlikely(ret & 1)) {
> > - cpu_relax();
> > - goto repeat;
> > - }
> > + if (unlikely(ret & 1))
> > + spin_until_cond( !((ret = READ_ONCE(s->sequence)) & 1) );
> > +
>
> That seems to suggest something like: cond_load(), similar to
> smp_cond_load_acquire(), except without the smp/acquire parts.
>
> > return ret;
> > }
> >
> > diff --git a/kernel/locking/mcs_spinlock.h b/kernel/locking/mcs_spinlock.h
> > index 6a385aabcce7..a91a0cc46a4c 100644
> > --- a/kernel/locking/mcs_spinlock.h
> > +++ b/kernel/locking/mcs_spinlock.h
> > @@ -27,8 +27,7 @@ struct mcs_spinlock {
> > */
> > #define arch_mcs_spin_lock_contended(l) \
> > do { \
> > - while (!(smp_load_acquire(l))) \
> > - cpu_relax(); \
> > + spin_until_cond(smp_load_acquire(l)); \
> > } while (0)
>
> Torn on whether is makes sense to use smp_cond_load_acquire() here
> instead. That avoids issuing the barrier on each loop.

Oh, possibly. I'll take a look at the asm and maybe you have more
opinions when I resubmit as an individual patch.

>
> > #endif
> >
> > @@ -107,8 +106,7 @@ void mcs_spin_unlock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
> > if (likely(cmpxchg_release(lock, node, NULL) == node))
> > return;
> > /* Wait until the next pointer is set */
> > - while (!(next = READ_ONCE(node->next)))
> > - cpu_relax();
> > + spin_until_cond((next = READ_ONCE(node->next)) != 0);
> > }
> >
> > /* Pass lock to next waiter. */
>
> Hmm, you could've used that same pattern above too I suppose,
>
> spin_until_cond(!((ret = READ_ONCE(s->sequence)) & 1));
>
> > diff --git a/kernel/locking/mutex.c b/kernel/locking/mutex.c
> > index 858a07590e39..0ffa1cd7f12b 100644
> > --- a/kernel/locking/mutex.c
> > +++ b/kernel/locking/mutex.c
> > @@ -427,6 +427,7 @@ bool mutex_spin_on_owner(struct mutex *lock, struct task_struct *owner,
> > bool ret = true;
> >
> > rcu_read_lock();
> > + spin_begin();
> > while (__mutex_owner(lock) == owner) {
> > /*
> > * Ensure we emit the owner->on_cpu, dereference _after_
> > @@ -450,8 +451,9 @@ bool mutex_spin_on_owner(struct mutex *lock, struct task_struct *owner,
> > break;
> > }
> >
> > - cpu_relax();
> > + spin_cpu_relax();
> > }
> > + spin_end();
> > rcu_read_unlock();
>
> Since we just did a trylock we expect to wait, which is why we don't do
> a condition test before spin_begin(), right?

Yes. This is supposed to be the pattern although as you noticed I may have
got it wrong :P

>
> >
> > return ret;
> > @@ -532,6 +534,7 @@ mutex_optimistic_spin(struct mutex *lock, struct ww_acquire_ctx *ww_ctx,
> > goto fail;
> > }
> >
> > + spin_begin();
> > for (;;) {
> > struct task_struct *owner;
> >
> > @@ -553,8 +556,9 @@ mutex_optimistic_spin(struct mutex *lock, struct ww_acquire_ctx *ww_ctx,
> > * memory barriers as we'll eventually observe the right
> > * values at the cost of a few extra spins.
> > */
> > - cpu_relax();
> > + spin_cpu_relax();
> > }
> > + spin_end();
> >
> > if (!waiter)
> > osq_unlock(&lock->osq);
> > @@ -563,6 +567,8 @@ mutex_optimistic_spin(struct mutex *lock, struct ww_acquire_ctx *ww_ctx,
> >
> >
> > fail_unlock:
> > + spin_end();
> > +
> > if (!waiter)
> > osq_unlock(&lock->osq);
> >
>
> The same, we just did a trylock before calling this.
>
> However!, I think you just created a nested spin_begin()/spin_end,
> looking at how PPC implements that, this doesn't look right.

You're right, spin_on_owner does not need the spin_begin/spin_end because
it should be always called within spinning context. Thanks!

>
> > diff --git a/kernel/locking/osq_lock.c b/kernel/locking/osq_lock.c
> > index a3167941093b..9dd58bbe60b7 100644
> > --- a/kernel/locking/osq_lock.c
> > +++ b/kernel/locking/osq_lock.c
> > @@ -53,6 +53,7 @@ osq_wait_next(struct optimistic_spin_queue *lock,
> > */
> > old = prev ? prev->cpu : OSQ_UNLOCKED_VAL;
> >
> > + spin_begin();
> > for (;;) {
> > if (atomic_read(&lock->tail) == curr &&
> > atomic_cmpxchg_acquire(&lock->tail, curr, old) == curr) {
> > @@ -80,8 +81,9 @@ osq_wait_next(struct optimistic_spin_queue *lock,
> > break;
> > }
> >
> > - cpu_relax();
> > + spin_cpu_relax();
> > }
> > + spin_end();
> >
> > return next;
> > }
>
> This however doesn't have a fast-path, we unconditionally do
> spin_begin(). That looks sub-optimal. From looking at PPC that drops the
> HT priority even though we might not end up waiting at all.

Okay. Should we have a little spin loop at the end there which spins
on lock->tail == curr || node->next ?

>
>
> > @@ -107,6 +109,8 @@ bool osq_lock(struct optimistic_spin_queue *lock)
> > if (old == OSQ_UNLOCKED_VAL)
> > return true;
> >
> > + spin_begin();
> > +
> > prev = decode_cpu(old);
> > node->prev = prev;
> > WRITE_ONCE(prev->next, node);
>
> Why so early? You again do HMT_low() even though we might not hit the
> cpu_relax().

I was thinking this is a general slowpath, but I guess we don't want to
go to low priority if we are to unqueue and reschedule. Thanks.

>
> > @@ -129,8 +133,9 @@ bool osq_lock(struct optimistic_spin_queue *lock)
> > if (need_resched() || vcpu_is_preempted(node_cpu(node->prev)))
> > goto unqueue;
> >
> > - cpu_relax();
> > + spin_cpu_relax();
> > }
> > + spin_end();
> > return true;
> >
> > unqueue:
>
>
>
> > @@ -152,10 +157,12 @@ bool osq_lock(struct optimistic_spin_queue *lock)
> > * in which case we should observe @node->locked becomming
> > * true.
> > */
> > - if (smp_load_acquire(&node->locked))
> > + if (smp_load_acquire(&node->locked)) {
> > + spin_end();
> > return true;
> > + }
> >
> > - cpu_relax();
> > + spin_cpu_relax();
> >
> > /*
> > * Or we race against a concurrent unqueue()'s step-B, in which
> > @@ -164,6 +171,8 @@ bool osq_lock(struct optimistic_spin_queue *lock)
> > prev = READ_ONCE(node->prev);
> > }
> >
> > + spin_end();
> > +
>
> So here we did all of step-A in low-prio, that doesn't look right.

Yep, will fix.

>
> > /*
> > * Step - B -- stabilize @next
> > *
> > diff --git a/kernel/locking/qrwlock.c b/kernel/locking/qrwlock.c
> > index 2655f26ec882..186ff495097d 100644
> > --- a/kernel/locking/qrwlock.c
> > +++ b/kernel/locking/qrwlock.c
> > @@ -54,10 +54,12 @@ struct __qrwlock {
> > static __always_inline void
> > rspin_until_writer_unlock(struct qrwlock *lock, u32 cnts)
> > {
> > + spin_begin();
> > while ((cnts & _QW_WMASK) == _QW_LOCKED) {
> > - cpu_relax();
> > + spin_cpu_relax();
> > cnts = atomic_read_acquire(&lock->cnts);
> > }
> > + spin_end();
> > }
>
> Again, unconditional :/

This is not always a slowpath?

>
> > /**
> > @@ -124,6 +126,7 @@ void queued_write_lock_slowpath(struct qrwlock *lock)
> > * Set the waiting flag to notify readers that a writer is pending,
> > * or wait for a previous writer to go away.
> > */
> > + spin_begin();
> > for (;;) {
> > struct __qrwlock *l = (struct __qrwlock *)lock;
> >
> > @@ -131,7 +134,7 @@ void queued_write_lock_slowpath(struct qrwlock *lock)
> > (cmpxchg_relaxed(&l->wmode, 0, _QW_WAITING) == 0))
> > break;
> >
> > - cpu_relax();
> > + spin_cpu_relax();
> > }
> > /* When no more readers, set the locked flag */
> > @@ -142,8 +145,10 @@ void queued_write_lock_slowpath(struct qrwlock *lock)
> > _QW_LOCKED) == _QW_WAITING))
> > break;
> >
> > - cpu_relax();
> > + spin_cpu_relax();
> > }
> > + spin_end();
> > +
> > unlock:
> > arch_spin_unlock(&lock->wait_lock);
> > }
>
> Would not something like:
>
> do {
> spin_until_cond(!READ_ONCE(l->wmode));
> } while (try_cmpxchg_relaxed(&l->wmode, 0, _QW_WAITING));
>
> Be a better pattern here? Then we have a fast-path without low-medium
> flips.

Could be so, I'll take a look.


> > diff --git a/kernel/locking/qspinlock_paravirt.h b/kernel/locking/qspinlock_paravirt.h
> > index 4ccfcaae5b89..88817e41fadf 100644
> > --- a/kernel/locking/qspinlock_paravirt.h
> > +++ b/kernel/locking/qspinlock_paravirt.h
> > @@ -293,15 +293,19 @@ static void pv_wait_node(struct mcs_spinlock *node, struct mcs_spinlock *prev)
> > bool wait_early;
> >
> > for (;;) {
> > + spin_begin();
>
> again, unconditional..

Hmm, also thought could be expected to spin here. I'll try to understand
the code better.

> > @@ -373,12 +374,14 @@ static noinline bool rwsem_spin_on_owner(struct rw_semaphore *sem)
> > */
> > if (!owner->on_cpu || need_resched() ||
> > vcpu_is_preempted(task_cpu(owner))) {
> > + spin_end();
> > rcu_read_unlock();
> > return false;
> > }
> >
> > - cpu_relax();
> > + spin_cpu_relax();
> > }
> > + spin_end();
> > rcu_read_unlock();
> > out:
> > /*
>
> rwsem_spin_on_owner() is different from the mutex one in that we don't
> first do a trylock() which then makes this unconditional.
>
> So I think we want the rwsem_optimistic_spin() main loop re-ordered to
> first do the trylock and _then_ wait if it fails. Not first wait and
> then try.

Okay thanks.

>
> > @@ -408,6 +411,7 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem)
> > * 2) readers own the lock as we can't determine if they are
> > * actively running or not.
> > */
> > + spin_begin();
> > while (rwsem_spin_on_owner(sem)) {
> > /*
> > * Try to acquire the lock
> > @@ -432,8 +436,9 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem)
> > * memory barriers as we'll eventually observe the right
> > * values at the cost of a few extra spins.
> > */
> > - cpu_relax();
> > + spin_cpu_relax();
> > }
> > + spin_end();
> > osq_unlock(&sem->osq);
> > done:
> > preempt_enable();
>
> And I think you did the recursive spin_begin thing again here.

Much appreciated! Thanks Peter I'll split up and resubmit after the
merge window closes.

Thanks,
Nick