Re: [RFC] locking: rwbase: Take care of ordering guarantee for fastpath reader

From: Boqun Feng
Date: Wed Sep 08 2021 - 09:29:10 EST


On Wed, Sep 08, 2021 at 01:51:24PM +0200, Peter Zijlstra wrote:
[...]
> @@ -201,23 +207,30 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
> {
> struct rt_mutex_base *rtm = &rwb->rtmutex;
> unsigned long flags;
> + int readers;
>
> /* Take the rtmutex as a first step */
> if (rwbase_rtmutex_lock_state(rtm, state))
> return -EINTR;
>
> /* Force readers into slow path */
> - atomic_sub(READER_BIAS, &rwb->readers);
> + readers = atomic_sub_return_relaxed(READER_BIAS, &rwb->readers);
>
> - raw_spin_lock_irqsave(&rtm->wait_lock, flags);
> /*
> * set_current_state() for rw_semaphore
> * current_save_and_set_rtlock_wait_state() for rwlock
> */
> rwbase_set_and_save_current_state(state);
> + raw_spin_lock_irqsave(&rtm->wait_lock, flags);
>
> - /* Block until all readers have left the critical section. */
> - for (; atomic_read(&rwb->readers);) {
> + /*
> + * Block until all readers have left the critical section.
> + *
> + * In the case of !readers, the above implies TSO ordering
> + * at the very least and hence provides ACQUIRE vs the earlier
> + * atomic_sub_return_relaxed().
> + */
> + while (readers) {
> /* Optimized out for rwlocks */
> if (rwbase_signal_pending_state(state, current)) {
> __set_current_state(TASK_RUNNING);
> @@ -230,8 +243,12 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
> * Schedule and wait for the readers to leave the critical
> * section. The last reader leaving it wakes the waiter.
> */
> - if (atomic_read(&rwb->readers) != 0)
> + readers = atomic_read(&rwb->readers);
> + if (readers != 0)
> rwbase_schedule();
> + /*
> + * Implies smp_mb() and provides ACQUIRE for the !readers case.
> + */

->readers may get changed to non-zero here, because ->wait_lock is not
held by the writer, and there could be readers in slow-path running.
We need to re-read ->readers after holding ->wait_lock. Otherwise, we
may use an old value of ->readers, and grab a write lock while there
still exists readers.

Regards,
Boqun

> set_current_state(state);
> raw_spin_lock_irqsave(&rtm->wait_lock, flags);
> }
[...]