Re: [BUG]locking/rwsem: only clean RWSEM_FLAG_HANDOFF when already set
From: Peter Zijlstra
Date: Thu Nov 11 2021 - 15:27:05 EST
On Thu, Nov 11, 2021 at 02:36:52PM -0500, Waiman Long wrote:
> @@ -434,6 +430,7 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
> if (!(oldcount & RWSEM_FLAG_HANDOFF) &&
> time_after(jiffies, waiter->timeout)) {
> adjustment -= RWSEM_FLAG_HANDOFF;
> + waiter->handoff_set = true;
> lockevent_inc(rwsem_rlock_handoff);
> }
>
Do we really need this flag? Wouldn't it be the same as waiter-is-first
AND sem-has-handoff?
> static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
> + struct rwsem_waiter *waiter)
> {
> long count, new;
> + bool first = rwsem_first_waiter(sem) == waiter;
Please flip those two declarations so they follow reverse xmas tree (longest line first) order.
>
> lockdep_assert_held(&sem->wait_lock);
>
> @@ -546,13 +541,14 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
> do {
> bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
>
> - if (has_handoff && wstate == WRITER_NOT_FIRST)
> + if (has_handoff && !first)
> return false;
>
> new = count;
>
> if (count & RWSEM_LOCK_MASK) {
> - if (has_handoff || (wstate != WRITER_HANDOFF))
> + if (has_handoff || (!waiter->rt_task &&
> + !time_after(jiffies, waiter->timeout)))
Does ->rt_task really help over rt_task(current)? I suppose there's an
argument for locality, but that should be pretty much it, no?
> return false;
>
> new |= RWSEM_FLAG_HANDOFF;
> @@ -889,6 +888,24 @@ rwsem_spin_on_owner(struct rw_semaphore *sem)
> }
> #endif
>
> +/*
> + * Common code to handle rwsem flags in out_nolock path with wait_lock held.
> + */
> +static inline void rwsem_out_nolock_clear_flags(struct rw_semaphore *sem,
> + struct rwsem_waiter *waiter)
> +{
> + long flags = 0;
> +
> + list_del(&waiter->list);
> + if (list_empty(&sem->wait_list))
> + flags = RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS;
> + else if (waiter->handoff_set)
> + flags = RWSEM_FLAG_HANDOFF;
> +
> + if (flags)
> + atomic_long_andnot(flags, &sem->count);
> +}
Right, so I like sharing this between the two _slowpath functions, that
makes sense.
The difference between this and my approach is that I unconditionally
clear HANDOFF when @waiter was the first, because if it was set, it
must've been ours, and if it wasn't set, clearing it doesn't really hurt
much. This is an unlikely path, so I don't think the potentially extra
atomic is an issue here.
> +
> /*
> * Wait for the read lock to be granted
> */
> @@ -936,6 +953,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat
> waiter.task = current;
> waiter.type = RWSEM_WAITING_FOR_READ;
> waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
> + waiter.handoff_set = false;
Forgot to set rt_task
>
> raw_spin_lock_irq(&sem->wait_lock);
> if (list_empty(&sem->wait_list)) {
> @@ -1038,16 +1051,13 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
> waiter.task = current;
> waiter.type = RWSEM_WAITING_FOR_WRITE;
> waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
Forgot to set handoff_set
> + waiter.rt_task = rt_task(current);
>
> raw_spin_lock_irq(&sem->wait_lock);
Again, I'm not convinced we need these variables.
> @@ -1083,13 +1093,16 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
> /* wait until we successfully acquire the lock */
> set_current_state(state);
> for (;;) {
> - if (rwsem_try_write_lock(sem, wstate)) {
> + if (rwsem_try_write_lock(sem, &waiter)) {
> /* rwsem_try_write_lock() implies ACQUIRE on success */
> break;
> }
>
> raw_spin_unlock_irq(&sem->wait_lock);
>
> + if (signal_pending_state(state, current))
> + goto out_nolock;
> +
> /*
> * After setting the handoff bit and failing to acquire
> * the lock, attempt to spin on owner to accelerate lock
> @@ -1098,7 +1111,7 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
> * In this case, we attempt to acquire the lock again
> * without sleeping.
> */
> - if (wstate == WRITER_HANDOFF) {
> + if (waiter.handoff_set) {
> enum owner_state owner_state;
>
> preempt_disable();
Does it matter much if we spin-wait for every first waiter or only for
handoff? Either way around, I think spin-wait ought to terminate on
sigpending (same for mutex I suppose).
> @@ -1109,40 +1122,9 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
> goto trylock_again;
> }
>
> - /* Block until there are no active lockers. */
> - for (;;) {
> - if (signal_pending_state(state, current))
> - goto out_nolock;
> -
> - schedule();
> - lockevent_inc(rwsem_sleep_writer);
> - set_current_state(state);
> - /*
> - * If HANDOFF bit is set, unconditionally do
> - * a trylock.
> - */
> - if (wstate == WRITER_HANDOFF)
> - break;
> -
> - if ((wstate == WRITER_NOT_FIRST) &&
> - (rwsem_first_waiter(sem) == &waiter))
> - wstate = WRITER_FIRST;
> -
> - count = atomic_long_read(&sem->count);
> - if (!(count & RWSEM_LOCK_MASK))
> - break;
> -
> - /*
> - * The setting of the handoff bit is deferred
> - * until rwsem_try_write_lock() is called.
> - */
> - if ((wstate == WRITER_FIRST) && (rt_task(current) ||
> - time_after(jiffies, waiter.timeout))) {
> - wstate = WRITER_HANDOFF;
> - lockevent_inc(rwsem_wlock_handoff);
> - break;
> - }
> - }
> + schedule();
> + lockevent_inc(rwsem_sleep_writer);
> + set_current_state(state);
> trylock_again:
> raw_spin_lock_irq(&sem->wait_lock);
> }
Nice.