[PATCH 11/12] rwsem: wake all readers when first waiter is a reader

From: Michel Lespinasse
Date: Wed Mar 06 2013 - 18:22:48 EST


When the first queued waiter is a reader, wake all readers instead of
just those that are at the front of the queue. There are really two
motivations for this change:

- This should result in increased parallelism for workloads that mix
readers and writers.

- As we want to enable fast-path write lock stealing (without taking the
wait_lock), we don't want to spend much time counting readers to wake
while fast-path write lock stealing might still get the lock. By
maintaining a count of queued readers, and waking them all at once, we
avoid having to count readers before attempting to grant them read locks.

Regarding the implementation, a few things are worth mentioning:

- On 64-bit architectures, wait_readers fills a hole that would otherwise
be present between wait_lock and wait_list, so it doesn't actually grow
the struct rw_semaphore size.

- The only difference between the RWSEM_WAKE_ANY and RWSEM_WAKE_NO_ACTIVE
wake types was whether we need to check for write locks before granting
read locks. The updated code always starts by granting read locks, so
the difference is now moot. We can thus replace the wake_type with a
boolean 'wakewrite' value, just as in rwsem-spinlock.

Signed-off-by: Michel Lespinasse <walken@xxxxxxxxxx>

---
include/linux/rwsem.h | 2 +
lib/rwsem.c | 128 ++++++++++++++++++++------------------------------
2 files changed, 53 insertions(+), 77 deletions(-)

diff --git a/include/linux/rwsem.h b/include/linux/rwsem.h
index 8da67d625e13..be6f9649512d 100644
--- a/include/linux/rwsem.h
+++ b/include/linux/rwsem.h
@@ -25,6 +25,7 @@ struct rw_semaphore;
struct rw_semaphore {
long count;
raw_spinlock_t wait_lock;
+ unsigned int wait_readers;
struct list_head wait_list;
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
@@ -58,6 +59,7 @@ static inline int rwsem_is_locked(struct rw_semaphore *sem)
#define __RWSEM_INITIALIZER(name) \
{ RWSEM_UNLOCKED_VALUE, \
__RAW_SPIN_LOCK_UNLOCKED(name.wait_lock), \
+ 0, \
LIST_HEAD_INIT((name).wait_list) \
__RWSEM_DEP_MAP_INIT(name) }

diff --git a/lib/rwsem.c b/lib/rwsem.c
index 0d50e46d5b0c..f9a57054e9f2 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -41,14 +41,6 @@ struct rwsem_waiter {
enum rwsem_waiter_type type;
};

-/* Wake types for __rwsem_do_wake(). Note that RWSEM_WAKE_NO_ACTIVE and
- * RWSEM_WAKE_READ_OWNED imply that the spinlock must have been kept held
- * since the rwsem value was observed.
- */
-#define RWSEM_WAKE_ANY 0 /* Wake whatever's at head of wait list */
-#define RWSEM_WAKE_NO_ACTIVE 1 /* rwsem was observed with no active thread */
-#define RWSEM_WAKE_READ_OWNED 2 /* rwsem was observed to be read owned */
-
/*
* handle the lock release when processes blocked on it that can now run
* - if we come here from up_xxxx(), then:
@@ -60,83 +52,64 @@ struct rwsem_waiter {
* - writers are only woken if downgrading is false
*/
static struct rw_semaphore *
-__rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
+__rwsem_do_wake(struct rw_semaphore *sem, bool wakewrite)
{
struct rwsem_waiter *waiter;
struct task_struct *tsk;
struct list_head *next;
- signed long woken, loop, adjustment;
+ signed long oldcount, readers, adjustment;

waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
- if (waiter->type != RWSEM_WAITING_FOR_WRITE)
- goto readers_only;
-
- if (wake_type == RWSEM_WAKE_READ_OWNED)
- /* Another active reader was observed, so wakeup is not
- * likely to succeed. Save the atomic op.
- */
+ if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
+ if (wakewrite)
+ /* Wake writer at the front of the queue, but do not
+ * grant it the lock yet as we want other writers
+ * to be able to steal it. Readers, on the other hand,
+ * will block as they will notice the queued writer.
+ */
+ wake_up_process(waiter->task);
goto out;
+ }

- /* Wake up the writing waiter and let the task grab the sem: */
- wake_up_process(waiter->task);
- goto out;
-
- readers_only:
- /* If we come here from up_xxxx(), another thread might have reached
- * rwsem_down_failed_common() before we acquired the spinlock and
- * woken up a waiter, making it now active. We prefer to check for
- * this first in order to not spend too much time with the spinlock
- * held if we're not going to be able to wake up readers in the end.
- *
- * Note that we do not need to update the rwsem count: any writer
- * trying to acquire rwsem will run rwsem_down_write_failed() due
- * to the waiting threads and block trying to acquire the spinlock.
- *
- * We use a dummy atomic update in order to acquire the cache line
- * exclusively since we expect to succeed and run the final rwsem
- * count adjustment pretty soon.
+ /* Writers might steal the lock before we grant it to the next reader.
+ * We grant the lock to readers ASAP so we can bail out early if a
+ * writer stole the lock.
*/
- if (wake_type == RWSEM_WAKE_ANY &&
- rwsem_atomic_update(0, sem) < RWSEM_WAITING_BIAS)
- /* Someone grabbed the sem for write already */
- goto out;
+ readers = sem->wait_readers;
+ adjustment = readers * RWSEM_ACTIVE_READ_BIAS - RWSEM_WAITING_BIAS;
+ retry_reader_grants:
+ oldcount = rwsem_atomic_update(adjustment, sem) - adjustment;
+ if (unlikely(oldcount < RWSEM_WAITING_BIAS)) {
+ /* A writer stole the lock. Undo our reader grants. */
+ if (rwsem_atomic_update(-adjustment, sem) < RWSEM_WAITING_BIAS)
+ goto out;
+ /* The writer left. Retry waking readers. */
+ goto retry_reader_grants;
+ }

- /* Grant an infinite number of read locks to the readers at the front
- * of the queue. Note we increment the 'active part' of the count by
- * the number of readers before waking any processes up.
+ /* Read locks have been granted to all queued readers; we can now
+ * actually wake them.
*/
- woken = 0;
do {
- woken++;
-
- if (waiter->list.next == &sem->wait_list)
- break;
-
- waiter = list_entry(waiter->list.next,
- struct rwsem_waiter, list);
-
- } while (waiter->type != RWSEM_WAITING_FOR_WRITE);
-
- adjustment = woken * RWSEM_ACTIVE_READ_BIAS;
- if (waiter->type != RWSEM_WAITING_FOR_WRITE)
- /* hit end of list above */
- adjustment -= RWSEM_WAITING_BIAS;
-
- rwsem_atomic_add(adjustment, sem);
-
- next = sem->wait_list.next;
- for (loop = woken; loop > 0; loop--) {
- waiter = list_entry(next, struct rwsem_waiter, list);
next = waiter->list.next;
- tsk = waiter->task;
- smp_mb();
- waiter->task = NULL;
- wake_up_process(tsk);
- put_task_struct(tsk);
- }
-
- sem->wait_list.next = next;
- next->prev = &sem->wait_list;
+ if (waiter->type != RWSEM_WAITING_FOR_WRITE) {
+ list_del(&waiter->list);
+
+ /* Set RWSEM_WAITING_BIAS before waking the last reader
+ * so we know there will be a remaining active locker.
+ */
+ if (!(--readers) && !list_empty(&sem->wait_list))
+ rwsem_atomic_add(RWSEM_WAITING_BIAS, sem);
+
+ tsk = waiter->task;
+ smp_mb();
+ waiter->task = NULL;
+ wake_up_process(tsk);
+ put_task_struct(tsk);
+ }
+ waiter = list_entry(next, struct rwsem_waiter, list);
+ } while (readers);
+ sem->wait_readers = 0;

out:
return sem;
@@ -158,6 +131,7 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
get_task_struct(tsk);

raw_spin_lock_irq(&sem->wait_lock);
+ sem->wait_readers++;
if (list_empty(&sem->wait_list))
adjustment += RWSEM_WAITING_BIAS;
list_add_tail(&waiter.list, &sem->wait_list);
@@ -166,8 +140,8 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
count = rwsem_atomic_update(adjustment, sem);

/* If there are no active locks, wake the front queued process(es). */
- if (count == RWSEM_WAITING_BIAS)
- sem = __rwsem_do_wake(sem, RWSEM_WAKE_NO_ACTIVE);
+ if (!(count & RWSEM_ACTIVE_MASK))
+ sem = __rwsem_do_wake(sem, true);

raw_spin_unlock_irq(&sem->wait_lock);

@@ -211,7 +185,7 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
* any read locks that were queued ahead of us. */
if (count > RWSEM_WAITING_BIAS &&
adjustment == -RWSEM_ACTIVE_WRITE_BIAS)
- sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED);
+ sem = __rwsem_do_wake(sem, false);

/* wait until we successfully acquire the lock */
set_task_state(tsk, TASK_UNINTERRUPTIBLE);
@@ -256,7 +230,7 @@ struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem)

/* do nothing if list empty */
if (!list_empty(&sem->wait_list))
- sem = __rwsem_do_wake(sem, RWSEM_WAKE_ANY);
+ sem = __rwsem_do_wake(sem, true);

raw_spin_unlock_irqrestore(&sem->wait_lock, flags);

@@ -276,7 +250,7 @@ struct rw_semaphore *rwsem_downgrade_wake(struct rw_semaphore *sem)

/* do nothing if list empty */
if (!list_empty(&sem->wait_list))
- sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED);
+ sem = __rwsem_do_wake(sem, false);

raw_spin_unlock_irqrestore(&sem->wait_lock, flags);

--
1.8.1.3
--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/