[PATCH-tip 3/3] locking/rwsem: Stop active read lock ASAP
From: Waiman Long
Date: Wed Feb 22 2017 - 13:05:58 EST
Currently, when down_read() fails, the active read locking isn't undone
until the rwsem_down_read_failed() function grabs the wait_lock. If the
wait_lock is contended, it may take a while to get the lock. During
that period, writer lock stealing will be disabled because of the
active read lock.
This patch will release the active read lock ASAP when either the
optimistic spinners are present or the trylock fails so that writer
lock stealing can happen sooner.
On a 2-socket 36-core 72-thread x86-64 E5-2699 v3 system, a rwsem
microbenchmark was run with 36 locking threads (one/core) doing 100k
reader and writer lock/unlock operations each. The resulting locking
rates (avg of 3 runs) on a 4.10 kernel were 561.4 Mop/s and 588.8
Mop/s without and with the patch respectively. That was an increase
of about 5%.
Signed-off-by: Waiman Long <longman@xxxxxxxxxx>
---
kernel/locking/rwsem-xadd.c | 33 ++++++++++++++++++++++++++-------
1 file changed, 26 insertions(+), 7 deletions(-)
diff --git a/kernel/locking/rwsem-xadd.c b/kernel/locking/rwsem-xadd.c
index fb6d6f3..6080afc 100644
--- a/kernel/locking/rwsem-xadd.c
+++ b/kernel/locking/rwsem-xadd.c
@@ -418,20 +418,40 @@ static inline bool rwsem_has_spinner(struct rw_semaphore *sem)
__visible
struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
{
- long count, adjustment = -RWSEM_ACTIVE_READ_BIAS;
+ bool first_in_queue = false;
+ long count, adjustment;
struct rwsem_waiter waiter;
DEFINE_WAKE_Q(wake_q);
waiter.task = current;
waiter.type = RWSEM_WAITING_FOR_READ;
- raw_spin_lock_irq(&sem->wait_lock);
- if (list_empty(&sem->wait_list))
+ /*
+ * Undo read bias from down_read operation to stop active locking
+ * if the lock isn't free which means it may take a while to acquire
+ * the lock or when spinners are present. Doing that after taking the
+ * wait_lock may block writer lock stealing for too long impacting
+ * performance.
+ */
+ if (rwsem_has_spinner(sem) || !raw_spin_trylock_irq(&sem->wait_lock)) {
+ atomic_long_add(-RWSEM_ACTIVE_READ_BIAS, &sem->count);
+ adjustment = 0;
+ raw_spin_lock_irq(&sem->wait_lock);
+ } else {
+ adjustment = -RWSEM_ACTIVE_READ_BIAS;
+ }
+
+ if (list_empty(&sem->wait_list)) {
adjustment += RWSEM_WAITING_BIAS;
+ first_in_queue = true;
+ }
list_add_tail(&waiter.list, &sem->wait_list);
- /* we're now waiting on the lock, but no longer actively locking */
- count = atomic_long_add_return(adjustment, &sem->count);
+ /* we're now waiting on the lock */
+ if (adjustment)
+ count = atomic_long_add_return(adjustment, &sem->count);
+ else
+ count = atomic_long_read(&sem->count);
/*
* If there are no active locks, wake the front queued process(es).
@@ -440,8 +460,7 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
* wake our own waiter to join the existing active readers !
*/
if (count == RWSEM_WAITING_BIAS ||
- (count > RWSEM_WAITING_BIAS &&
- adjustment != -RWSEM_ACTIVE_READ_BIAS))
+ (count > RWSEM_WAITING_BIAS && first_in_queue))
__rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
raw_spin_unlock_irq(&sem->wait_lock);
--
1.8.3.1