[PATCH v4] locking/rwsem: Make handoff bit handling more consistent

From: Waiman Long
Date: Thu Nov 11 2021 - 23:10:10 EST


There are some inconsistency in the way that the handoff bit is being
handled in readers and writers.

Firstly, when a queue head writer set the handoff bit, it will clear it
when the writer is being killed or interrupted on its way out without
acquiring the lock. That is not the case for a queue head reader. The
handoff bit will simply be inherited by the next waiter.

Secondly, in the out_nolock path of rwsem_down_read_slowpath(), both
the waiter and handoff bits are cleared if the wait queue becomes empty.
For rwsem_down_write_slowpath(), however, the handoff bit is not checked
and cleared if the wait queue is empty. This can potentially make the
handoff bit set with empty wait queue.

To make the handoff bit handling more consistent and robust,
extract out the rwsem flags handling code into a common
rwsem_out_nolock_clear_flags() function and call it from both the
reader and writer's out_nolock paths. The common function will only
use atomic_long_andnot() to clear bits to avoid possible race condition.
It will also let the next waiter inherit the handoff bit if it had been
set.

This will elminate the handoff bit set with empty wait queue case as
well as the possible race condition that may screw up the count value.

While at it, simplify the trylock for loop in rwsem_down_write_slowpath()
to make it easier to read.

Fixes: 4f23dbc1e657 ("locking/rwsem: Implement lock handoff to prevent lock starvation")
Suggested-by: Peter Zijlstra <peterz@xxxxxxxxxxxxx>
Signed-off-by: Waiman Long <longman@xxxxxxxxxx>
---
kernel/locking/rwsem.c | 120 ++++++++++++++++-------------------------
1 file changed, 47 insertions(+), 73 deletions(-)

diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
index c51387a43265..f376e6eeb36a 100644
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -104,10 +104,11 @@
* atomic_long_fetch_add() is used to obtain reader lock, whereas
* atomic_long_cmpxchg() will be used to obtain writer lock.
*
- * There are three places where the lock handoff bit may be set or cleared.
- * 1) rwsem_mark_wake() for readers.
- * 2) rwsem_try_write_lock() for writers.
- * 3) Error path of rwsem_down_write_slowpath().
+ * There are four places where the lock handoff bit may be set or cleared.
+ * 1) rwsem_mark_wake() for readers -- set, clear
+ * 2) rwsem_try_write_lock() for writers -- set, clear
+ * 3) Error path of rwsem_down_write_slowpath() -- clear
+ * 4) Error path of rwsem_down_read_slowpath() -- clear
*
* For all the above cases, wait_lock will be held. A writer must also
* be the first one in the wait_list to be eligible for setting the handoff
@@ -334,6 +335,9 @@ struct rwsem_waiter {
struct task_struct *task;
enum rwsem_waiter_type type;
unsigned long timeout;
+
+ /* Writer only, not initialized in reader */
+ bool handoff_set;
};
#define rwsem_first_waiter(sem) \
list_first_entry(&sem->wait_list, struct rwsem_waiter, list)
@@ -344,12 +348,6 @@ enum rwsem_wake_type {
RWSEM_WAKE_READ_OWNED /* Waker thread holds the read lock */
};

-enum writer_wait_state {
- WRITER_NOT_FIRST, /* Writer is not first in wait list */
- WRITER_FIRST, /* Writer is first in wait list */
- WRITER_HANDOFF /* Writer is first & handoff needed */
-};
-
/*
* The typical HZ value is either 250 or 1000. So set the minimum waiting
* time to at least 4ms or 1 jiffy (if it is higher than 4ms) in the wait
@@ -531,13 +529,11 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
* This function must be called with the sem->wait_lock held to prevent
* race conditions between checking the rwsem wait list and setting the
* sem->count accordingly.
- *
- * If wstate is WRITER_HANDOFF, it will make sure that either the handoff
- * bit is set or the lock is acquired with handoff bit cleared.
*/
static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
- enum writer_wait_state wstate)
+ struct rwsem_waiter *waiter)
{
+ bool first = rwsem_first_waiter(sem) == waiter;
long count, new;

lockdep_assert_held(&sem->wait_lock);
@@ -546,13 +542,17 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
do {
bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);

- if (has_handoff && wstate == WRITER_NOT_FIRST)
+ if (has_handoff && !first)
return false;

new = count;

if (count & RWSEM_LOCK_MASK) {
- if (has_handoff || (wstate != WRITER_HANDOFF))
+ /* May have inherited a previously set handoff bit */
+ waiter->handoff_set = has_handoff;
+
+ if (has_handoff || (!rt_task(waiter->task) &&
+ !time_after(jiffies, waiter->timeout)))
return false;

new |= RWSEM_FLAG_HANDOFF;
@@ -569,8 +569,11 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
* We have either acquired the lock with handoff bit cleared or
* set the handoff bit.
*/
- if (new & RWSEM_FLAG_HANDOFF)
+ if (new & RWSEM_FLAG_HANDOFF) {
+ waiter->handoff_set = true;
+ lockevent_inc(rwsem_wlock_handoff);
return false;
+ }

rwsem_set_owner(sem);
return true;
@@ -889,6 +892,20 @@ rwsem_spin_on_owner(struct rw_semaphore *sem)
}
#endif

+/*
+ * Common code to handle rwsem flags in out_nolock path with wait_lock held.
+ * If there is more than one waiter in the queue and the HANDOFF bit is set,
+ * the next waiter will inherit it if the first waiter is removed.
+ */
+static inline void rwsem_out_nolock_clear_flags(struct rw_semaphore *sem,
+ struct rwsem_waiter *waiter)
+{
+ list_del(&waiter->list);
+ if (list_empty(&sem->wait_list))
+ atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS,
+ &sem->count);
+}
+
/*
* Wait for the read lock to be granted
*/
@@ -1002,11 +1019,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat
return sem;

out_nolock:
- list_del(&waiter.list);
- if (list_empty(&sem->wait_list)) {
- atomic_long_andnot(RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF,
- &sem->count);
- }
+ rwsem_out_nolock_clear_flags(sem, &waiter);
raw_spin_unlock_irq(&sem->wait_lock);
__set_current_state(TASK_RUNNING);
lockevent_inc(rwsem_rlock_fail);
@@ -1020,7 +1033,6 @@ static struct rw_semaphore *
rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
{
long count;
- enum writer_wait_state wstate;
struct rwsem_waiter waiter;
struct rw_semaphore *ret = sem;
DEFINE_WAKE_Q(wake_q);
@@ -1038,16 +1050,13 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
waiter.task = current;
waiter.type = RWSEM_WAITING_FOR_WRITE;
waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
+ waiter.handoff_set = false;

raw_spin_lock_irq(&sem->wait_lock);
-
- /* account for this before adding a new element to the list */
- wstate = list_empty(&sem->wait_list) ? WRITER_FIRST : WRITER_NOT_FIRST;
-
list_add_tail(&waiter.list, &sem->wait_list);

/* we're now waiting on the lock */
- if (wstate == WRITER_NOT_FIRST) {
+ if (rwsem_first_waiter(sem) != &waiter) {
count = atomic_long_read(&sem->count);

/*
@@ -1083,13 +1092,16 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
/* wait until we successfully acquire the lock */
set_current_state(state);
for (;;) {
- if (rwsem_try_write_lock(sem, wstate)) {
+ if (rwsem_try_write_lock(sem, &waiter)) {
/* rwsem_try_write_lock() implies ACQUIRE on success */
break;
}

raw_spin_unlock_irq(&sem->wait_lock);

+ if (signal_pending_state(state, current))
+ goto out_nolock;
+
/*
* After setting the handoff bit and failing to acquire
* the lock, attempt to spin on owner to accelerate lock
@@ -1098,7 +1110,7 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
* In this case, we attempt to acquire the lock again
* without sleeping.
*/
- if (wstate == WRITER_HANDOFF) {
+ if (waiter.handoff_set) {
enum owner_state owner_state;

preempt_disable();
@@ -1109,40 +1121,9 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
goto trylock_again;
}

- /* Block until there are no active lockers. */
- for (;;) {
- if (signal_pending_state(state, current))
- goto out_nolock;
-
- schedule();
- lockevent_inc(rwsem_sleep_writer);
- set_current_state(state);
- /*
- * If HANDOFF bit is set, unconditionally do
- * a trylock.
- */
- if (wstate == WRITER_HANDOFF)
- break;
-
- if ((wstate == WRITER_NOT_FIRST) &&
- (rwsem_first_waiter(sem) == &waiter))
- wstate = WRITER_FIRST;
-
- count = atomic_long_read(&sem->count);
- if (!(count & RWSEM_LOCK_MASK))
- break;
-
- /*
- * The setting of the handoff bit is deferred
- * until rwsem_try_write_lock() is called.
- */
- if ((wstate == WRITER_FIRST) && (rt_task(current) ||
- time_after(jiffies, waiter.timeout))) {
- wstate = WRITER_HANDOFF;
- lockevent_inc(rwsem_wlock_handoff);
- break;
- }
- }
+ schedule();
+ lockevent_inc(rwsem_sleep_writer);
+ set_current_state(state);
trylock_again:
raw_spin_lock_irq(&sem->wait_lock);
}
@@ -1156,19 +1137,12 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
out_nolock:
__set_current_state(TASK_RUNNING);
raw_spin_lock_irq(&sem->wait_lock);
- list_del(&waiter.list);
-
- if (unlikely(wstate == WRITER_HANDOFF))
- atomic_long_add(-RWSEM_FLAG_HANDOFF, &sem->count);
-
- if (list_empty(&sem->wait_list))
- atomic_long_andnot(RWSEM_FLAG_WAITERS, &sem->count);
- else
+ rwsem_out_nolock_clear_flags(sem, &waiter);
+ if (!list_empty(&sem->wait_list))
rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
raw_spin_unlock_irq(&sem->wait_lock);
wake_up_q(&wake_q);
lockevent_inc(rwsem_wlock_fail);
-
return ERR_PTR(-EINTR);
}

--
2.27.0