Currently readers and writers have distinctly different wait/wake
methods. For readers the ->count adjustment happens on the wakeup
side, while for writers the ->count adjustment happens on the wait
side.

This asymmetry is unfortunate since the wake side has an additional
guarantee -- specifically, the wake side has observed the unlocked
state, and thus it can know that speculative READER_BIAS perturbations
on ->count are just that, they will be undone.

Additionally, unifying the wait/wake methods allows sharing code.

As such, do a straight-forward transform of the writer wakeup into the
wake side.
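
As an illustration only (not part of this patch): a minimal userspace
model of the wake-side handoff, with made-up names (model_sem,
model_writer_wake, ...), C11 acquire/release atomics standing in for
smp_store_release()/smp_load_acquire(), and a spin standing in for
schedule(). The waker, having observed the unlocked state, takes the
lock on the waiter's behalf and only then clears ->task; the waiter
does nothing but wait for ->task to become NULL -- the shape both
slowpaths now share.

/* rwsem-model.c -- illustration only, not kernel code */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define MODEL_WRITER_LOCKED	1L	/* stand-in for RWSEM_WRITER_LOCKED */

struct model_waiter {
	_Atomic(void *) task;		/* non-NULL while still waiting */
};

struct model_sem {
	atomic_long count;		/* stand-in for sem->count */
};

/*
 * Wake side: having observed the lock as free, claim it on the waiter's
 * behalf and only then clear ->task with a release store -- the moral
 * equivalent of rwsem_writer_wake() + rwsem_waiter_wake().
 */
static void model_writer_wake(struct model_sem *sem, struct model_waiter *w)
{
	long c = atomic_load_explicit(&sem->count, memory_order_relaxed);

	while (!(c & MODEL_WRITER_LOCKED)) {
		if (atomic_compare_exchange_weak_explicit(&sem->count, &c,
				c | MODEL_WRITER_LOCKED,
				memory_order_acquire, memory_order_relaxed)) {
			atomic_store_explicit(&w->task, NULL,
					      memory_order_release);
			return;
		}
	}
	/* Lock turned out to be held: the kernel would set HANDOFF here. */
}

/* Wait side: nothing to do but wait for ->task to be cleared. */
static void *model_writer_wait(void *arg)
{
	struct model_waiter *w = arg;

	while (atomic_load_explicit(&w->task, memory_order_acquire))
		;	/* the kernel schedule()s here instead of spinning */

	puts("writer woken with the lock already held on its behalf");
	return NULL;
}

int main(void)
{
	struct model_sem sem = { .count = 0 };
	struct model_waiter w = { .task = (void *)1 };
	pthread_t t;

	pthread_create(&t, NULL, model_writer_wait, &w);
	model_writer_wake(&sem, &w);	/* waker hands the lock over */
	pthread_join(t, NULL);
	return 0;
}

Builds with something like: cc -pthread rwsem-model.c (the file name is
made up).
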
Signed-off-by: Peter Zijlstra (Intel) <peterz@xxxxxxxxxxxxx>
---
kernel/locking/rwsem.c | 253 ++++++++++++++++++++++---------------------------
1 file changed, 115 insertions(+), 138 deletions(-)
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -107,7 +107,7 @@
*
* There are three places where the lock handoff bit may be set or cleared.
* 1) rwsem_mark_wake() for readers -- set, clear
- * 2) rwsem_try_write_lock() for writers -- set, clear
+ * 2) rwsem_writer_wake() for writers -- set, clear
* 3) rwsem_del_waiter() -- clear
*
* For all the above cases, wait_lock will be held. A writer must also
@@ -377,7 +377,7 @@ rwsem_add_waiter(struct rw_semaphore *se
/*
* Remove a waiter from the wait_list and clear flags.
*
- * Both rwsem_mark_wake() and rwsem_try_write_lock() contain a full 'copy' of
+ * Both rwsem_mark_wake() and rwsem_writer_wake() contain a full 'copy' of
* this function. Modify with care.
*
* Return: true if wait_list isn't empty and false otherwise
@@ -394,6 +394,100 @@ rwsem_del_waiter(struct rw_semaphore *se
return false;
}
+static inline void
+rwsem_waiter_wake(struct rwsem_waiter *waiter, struct wake_q_head *wake_q)
+{
+ struct task_struct *tsk;
+
+ tsk = waiter->task;
+ get_task_struct(tsk);
+
+ /*
+ * Ensure calling get_task_struct() before setting the reader
+ * waiter to nil such that rwsem_down_read_slowpath() cannot
+ * race with do_exit() by always holding a reference count
+ * to the task to wakeup.
+ */
+ smp_store_release(&waiter->task, NULL);
+ /*
+ * Ensure issuing the wakeup (either by us or someone else)
+ * after setting the reader waiter to nil.
+ */
+ wake_q_add_safe(wake_q, tsk);
+}
+
+/*
+ * This function must be called with the sem->wait_lock held to prevent
+ * race conditions between checking the rwsem wait list and setting the
+ * sem->count accordingly.
+ *
+ * Implies rwsem_del_waiter() on success.
+ */
+static void rwsem_writer_wake(struct rw_semaphore *sem,
+ struct rwsem_waiter *waiter,
+ struct wake_q_head *wake_q)
+{
+ struct rwsem_waiter *first = rwsem_first_waiter(sem);
+ long count, new;
+
+ lockdep_assert_held(&sem->wait_lock);
+
+ count = atomic_long_read(&sem->count);
+ do {
+ bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
+
+ if (has_handoff) {
+ /*
+ * Honor handoff bit and yield only when the first
+ * waiter is the one that set it. Otherwise, we
+ * still try to acquire the rwsem.
+ */
+ if (first->handoff_set && (waiter != first))
+ return;
+ }
+
+ new = count;
+
+ if (count & RWSEM_LOCK_MASK) {
+ /*
+ * A waiter (first or not) can set the handoff bit
+ * if it is an RT task or has been waiting in the
+ * wait queue for too long.
+ */
+ if (has_handoff || (!rt_task(waiter->task) &&
+ !time_after(jiffies, waiter->timeout)))
+ return;
+
+ new |= RWSEM_FLAG_HANDOFF;
+ } else {
+ new |= RWSEM_WRITER_LOCKED;
+ new &= ~RWSEM_FLAG_HANDOFF;
+
+ if (list_is_singular(&sem->wait_list))
+ new &= ~RWSEM_FLAG_WAITERS;
+ }
+ } while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
+
+ /*
+ * We have either acquired the lock with handoff bit cleared or set
+ * the handoff bit. Only the first waiter can have its handoff_set
+ * set here to enable optimistic spinning in slowpath loop.
+ */
+ if (new & RWSEM_FLAG_HANDOFF) {
+ first->handoff_set = true;
+ lockevent_inc(rwsem_wlock_handoff);
+ return;
+ }
+
+ /*
+ * Have rwsem_writer_wake() fully imply rwsem_del_waiter() on
+ * success.
+ */
+ list_del(&waiter->list);
+ rwsem_set_owner(sem);
+ rwsem_waiter_wake(waiter, wake_q);
+}
+
/*
* handle the lock release when processes blocked on it that can now run
* - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must
@@ -424,23 +518,12 @@ static void rwsem_mark_wake(struct rw_se
*/
waiter = rwsem_first_waiter(sem);
- if (waiter->type != RWSEM_WAITING_FOR_WRITE)
- goto wake_readers;
-
- if (wake_type == RWSEM_WAKE_ANY) {
- /*
- * Mark writer at the front of the queue for wakeup.
- * Until the task is actually later awoken later by
- * the caller, other writers are able to steal it.
- * Readers, on the other hand, will block as they
- * will notice the queued writer.
- */
- wake_q_add(wake_q, waiter->task);
- lockevent_inc(rwsem_wake_writer);
+ if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
+ if (wake_type == RWSEM_WAKE_ANY)
+ rwsem_writer_wake(sem, waiter, wake_q);
+ return;
}
- return;
-wake_readers:
/*
* No reader wakeup if there are too many of them already.
*/
@@ -547,25 +630,8 @@ static void rwsem_mark_wake(struct rw_se
atomic_long_add(adjustment, &sem->count);
/* 2nd pass */
- list_for_each_entry_safe(waiter, tmp, &wlist, list) {
- struct task_struct *tsk;
-
- tsk = waiter->task;
- get_task_struct(tsk);
-
- /*
- * Ensure calling get_task_struct() before setting the reader
- * waiter to nil such that rwsem_down_read_slowpath() cannot
- * race with do_exit() by always holding a reference count
- * to the task to wakeup.
- */
- smp_store_release(&waiter->task, NULL);
- /*
- * Ensure issuing the wakeup (either by us or someone else)
- * after setting the reader waiter to nil.
- */
- wake_q_add_safe(wake_q, tsk);
- }
+ list_for_each_entry_safe(waiter, tmp, &wlist, list)
+ rwsem_waiter_wake(waiter, wake_q);
}
/*
@@ -596,77 +662,6 @@ rwsem_del_wake_waiter(struct rw_semaphor
}
/*
- * This function must be called with the sem->wait_lock held to prevent
- * race conditions between checking the rwsem wait list and setting the
- * sem->count accordingly.
- *
- * Implies rwsem_del_waiter() on success.
- */
-static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
- struct rwsem_waiter *waiter)
-{
- struct rwsem_waiter *first = rwsem_first_waiter(sem);
- long count, new;
-
- lockdep_assert_held(&sem->wait_lock);
-
- count = atomic_long_read(&sem->count);
- do {
- bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
-
- if (has_handoff) {
- /*
- * Honor handoff bit and yield only when the first
- * waiter is the one that set it. Otherwisee, we
- * still try to acquire the rwsem.
- */
- if (first->handoff_set && (waiter != first))
- return false;
- }
-
- new = count;
-
- if (count & RWSEM_LOCK_MASK) {
- /*
- * A waiter (first or not) can set the handoff bit
- * if it is an RT task or wait in the wait queue
- * for too long.
- */
- if (has_handoff || (!rt_task(waiter->task) &&
- !time_after(jiffies, waiter->timeout)))
- return false;
-
- new |= RWSEM_FLAG_HANDOFF;
- } else {
- new |= RWSEM_WRITER_LOCKED;
- new &= ~RWSEM_FLAG_HANDOFF;
-
- if (list_is_singular(&sem->wait_list))
- new &= ~RWSEM_FLAG_WAITERS;
- }
- } while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
-
- /*
- * We have either acquired the lock with handoff bit cleared or set
- * the handoff bit. Only the first waiter can have its handoff_set
- * set here to enable optimistic spinning in slowpath loop.
- */
- if (new & RWSEM_FLAG_HANDOFF) {
- first->handoff_set = true;
- lockevent_inc(rwsem_wlock_handoff);
- return false;
- }
-
- /*
- * Have rwsem_try_write_lock() fully imply rwsem_del_waiter() on
- * success.
- */
- list_del(&waiter->list);
- rwsem_set_owner(sem);
- return true;
-}
-
-/*
* The rwsem_spin_on_owner() function returns the following 4 values
* depending on the lock owner state.
* OWNER_NULL : owner is currently NULL
@@ -1072,7 +1067,7 @@ rwsem_down_read_slowpath(struct rw_semap
for (;;) {
set_current_state(state);
if (!smp_load_acquire(&waiter.task)) {
- /* Matches rwsem_mark_wake()'s smp_store_release(). */
+ /* Matches rwsem_waiter_wake()'s smp_store_release(). */
break;
}
if (signal_pending_state(state, current)) {
@@ -1143,54 +1138,36 @@ rwsem_down_write_slowpath(struct rw_sema
} else {
atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);
}
+ raw_spin_unlock_irq(&sem->wait_lock);
/* wait until we successfully acquire the lock */
- set_current_state(state);
trace_contention_begin(sem, LCB_F_WRITE);
for (;;) {
- if (rwsem_try_write_lock(sem, &waiter)) {
- /* rwsem_try_write_lock() implies ACQUIRE on success */
+ set_current_state(state);
+ if (!smp_load_acquire(&waiter.task)) {
+ /* Matches rwsem_waiter_wake()'s smp_store_release(). */
break;
}
-
- raw_spin_unlock_irq(&sem->wait_lock);
-
- if (signal_pending_state(state, current))
- goto out_nolock;
-
- /*
- * After setting the handoff bit and failing to acquire
- * the lock, attempt to spin on owner to accelerate lock
- * transfer. If the previous owner is a on-cpu writer and it
- * has just released the lock, OWNER_NULL will be returned.
- * In this case, we attempt to acquire the lock again
- * without sleeping.
- */
- if (waiter.handoff_set) {
- enum owner_state owner_state;
-
- owner_state = rwsem_spin_on_owner(sem);
- if (owner_state == OWNER_NULL)
- goto trylock_again;
+ if (signal_pending_state(state, current)) {
+ raw_spin_lock_irq(&sem->wait_lock);
+ if (waiter.task)
+ goto out_nolock;
+ raw_spin_unlock_irq(&sem->wait_lock);
+ /* Ordered by sem->wait_lock against rwsem_mark_wake(). */
+ break;
}
-
schedule_preempt_disabled();
lockevent_inc(rwsem_sleep_writer);
- set_current_state(state);
-trylock_again:
- raw_spin_lock_irq(&sem->wait_lock);
}
__set_current_state(TASK_RUNNING);
- raw_spin_unlock_irq(&sem->wait_lock);
lockevent_inc(rwsem_wlock);
trace_contention_end(sem, 0);
return sem;
out_nolock:
- __set_current_state(TASK_RUNNING);
- raw_spin_lock_irq(&sem->wait_lock);
rwsem_del_wake_waiter(sem, &waiter, &wake_q);
+ __set_current_state(TASK_RUNNING);
lockevent_inc(rwsem_wlock_fail);
trace_contention_end(sem, -EINTR);
return ERR_PTR(-EINTR);