Re: [PATCH RFC 1/2] qrwlock: A queue read/write lock implementation

From: Steven Rostedt
Date: Mon Jul 15 2013 - 10:39:17 EST


On Fri, 2013-07-12 at 21:34 -0400, Waiman Long wrote:

> Signed-off-by: Waiman Long <Waiman.Long@xxxxxx>
> ---
> include/asm-generic/qrwlock.h | 124 +++++++++++++++++++++
> lib/Kconfig | 11 ++
> lib/Makefile | 1 +
> lib/qrwlock.c | 246 +++++++++++++++++++++++++++++++++++++++++
> 4 files changed, 382 insertions(+), 0 deletions(-)
> create mode 100644 include/asm-generic/qrwlock.h
> create mode 100644 lib/qrwlock.c
>
> diff --git a/include/asm-generic/qrwlock.h b/include/asm-generic/qrwlock.h
> new file mode 100644
> index 0000000..d758dd0
> --- /dev/null
> +++ b/include/asm-generic/qrwlock.h
> @@ -0,0 +1,124 @@
> +/*
> + * Queue read/write lock
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 2 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU General Public License for more details.
> + *
> + * (C) Copyright 2013 Hewlett-Packard Development Company, L.P.
> + *
> + * Authors: Waiman Long <waiman.long@xxxxxx>
> + */
> +#ifndef __ASM_GENERIC_QRWLOCK_H
> +#define __ASM_GENERIC_QRWLOCK_H
> +
> +#include <linux/types.h>
> +#include <asm/cmpxchg.h>
> +#include <asm/barrier.h>
> +
> +#if (CONFIG_NR_CPUS < 65536)
> +typedef u16 __nrcpu_t;
> +typedef u32 __nrcpupair_t;
> +#else
> +typedef u32 __nrcpu_t;
> +typedef u64 __nrcpupair_t;
> +#endif
> +
> +/*
> + * The queue read/write lock data structure
> + * The reader stealing flag, if sea,t will enable reader at the head of the

"sea,t"?

> + * waiting queue to steal the read lock even if a writer is waiting. However,
> + * that may cause starving of a writer by a perpetual stream of readers.
> + */
> +struct qrwnode {
> + struct qrwnode *next;
> + bool wait; /* Waiting flag */
> +};
> +
> +typedef struct qrwlock {
> + union {
> + __nrcpupair_t rw; /* Reader/writer number pair */
> + struct {
> + u8 writer; /* Set if a writer is waiting */
> + u8 rsteal; /* Reader stealing flag */
> + __nrcpu_t readers; /* Number of active readers */
> + };
> + };
> + struct qrwnode *waitq; /* Tail of waiting queue */
> +} arch_rwlock_t;
> +
> +/**
> + * queue_read_can_lock- would read_trylock() succeed?
> + * @lock: Pointer to queue read/writer lock structure
> + */
> +static inline int queue_read_can_lock(struct qrwlock *lock)
> +{
> + return lock->writer == 0;
> +}
> +
> +/**
> + * queue_write_can_lock- would write_trylock() succeed?
> + * @lock: Pointer to queue read/writer lock structure
> + */
> +static inline int queue_write_can_lock(struct qrwlock *lock)
> +{
> + return lock->rw == 0;
> +}
> +
> +/**
> + * queue_read_unlock - release read lock of a queue read/write lock
> + * @lock : Pointer to queue read/writer lock structure
> + */
> +static inline void queue_read_unlock(struct qrwlock *lock)
> +{
> + /*
> + * Atomically decrement the reader count
> + */
> + add_smp(&lock->readers, -1);
> +}
> +
> +/**
> + * queue_write_unlock - release write lock of a queue read/write lock
> + * @lock : Pointer to queue read/writer lock structure
> + */
> +static inline void queue_write_unlock(struct qrwlock *lock)
> +{
> + ACCESS_ONCE(lock->writer) = 0;
> + smp_wmb();
> +}
> +
> +/*
> + * External function declarations
> + */
> +extern void queue_read_lock(struct qrwlock *);
> +extern void queue_write_lock(struct qrwlock *);
> +extern int queue_read_trylock(struct qrwlock *);
> +extern int queue_write_trylock(struct qrwlock *);
> +
> +/*
> + * Initializier
> + */
> +#define __ARCH_RW_LOCK_UNLOCKED { { .rw = 0 }, .waitq = NULL }
> +#define __ARCH_RW_LOCK_UNLOCKED_RSTEAL \
> + { { .writer = 0, .rsteal = 1, .readers = 0 }, waitq = NULL }
> +
> +/*
> + * Remapping read/write lock architecture specific functions to the
> + * corresponding queue read/write lock functions.
> + */
> +#define arch_read_can_lock(l) queue_read_can_lock(l)
> +#define arch_write_can_lock(l) queue_write_can_lock(l)
> +#define arch_read_lock(l) queue_read_lock(l)
> +#define arch_write_lock(l) queue_write_lock(l)
> +#define arch_read_trylock(l) queue_read_trylock(l)
> +#define arch_write_trylock(l) queue_write_trylock(l)
> +#define arch_read_unlock(l) queue_read_unlock(l)
> +#define arch_write_unlock(l) queue_write_unlock(l)
> +
> +#endif /* __ASM_GENERIC_QRWLOCK_H */
> diff --git a/lib/Kconfig b/lib/Kconfig
> index 35da513..de32799 100644
> --- a/lib/Kconfig
> +++ b/lib/Kconfig
> @@ -412,6 +412,17 @@ config SIGNATURE
> Implementation is done using GnuPG MPI library
>
> #
> +# Generic queue read/write lock
> +#
> +config QUEUE_RWLOCK
> + bool "Generic queue read/write lock"
> + depends on ARCH_QUEUE_RWLOCK
> + help
> + Use a NUMA optimized queue read/write lock implementation. This
> + improves performance under lock contention on systems with more
> + than two sockets.
> +
> +#
> # libfdt files, only selected if needed.
> #
> config LIBFDT
> diff --git a/lib/Makefile b/lib/Makefile
> index 7baccfd..2888c17 100644
> --- a/lib/Makefile
> +++ b/lib/Makefile
> @@ -187,3 +187,4 @@ quiet_cmd_build_OID_registry = GEN $@
> clean-files += oid_registry_data.c
>
> obj-$(CONFIG_UCS2_STRING) += ucs2_string.o
> +obj-$(CONFIG_QUEUE_RWLOCK) += qrwlock.o
> diff --git a/lib/qrwlock.c b/lib/qrwlock.c
> new file mode 100644
> index 0000000..a206fae
> --- /dev/null
> +++ b/lib/qrwlock.c
> @@ -0,0 +1,246 @@
> +/*
> + * Queue read/write lock
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 2 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU General Public License for more details.
> + *
> + * (C) Copyright 2013 Hewlett-Packard Development Company, L.P.
> + *
> + * Authors: Waiman Long <waiman.long@xxxxxx>
> + */
> +#include <linux/smp.h>
> +#include <linux/bug.h>
> +#include <linux/cpumask.h>
> +#include <linux/percpu.h>
> +#include <asm-generic/qrwlock.h>
> +
> +/*
> + * Compared with regular read/write lock, the queue read/write lock has
> + * has the following advantages:
> + * 1. It is more deterministic. Even though there is a slight chance of
> + * stealing the lock if come at the right moment, the granting of the
> + * lock is mostly in FIFO order.
> + * 2. It is faster in high contention situation.
> + *
> + * The only downside is that the lock is 4 bytes larger in 32-bit systems
> + * and 12 bytes larger in 64-bit systems.
> + *
> + * There are two queues for writers. The writer field of the lock is a
> + * one-slot waiting queue. The writers that follows will have to wait
> + * in the combined reader/writer queue (waitq).
> + *
> + * Compared with x86 ticket spinlock, the queue read/write lock is faster
> + * in high contention situation. The writer lock is also faster in single
> + * thread operations. Therefore, queue read/write lock can be considered as
> + * a replacement for those spinlocks that are highly contended as long as
> + * an increase in lock size is not an issue.
> + */
> +
> +/**
> + * wait_in_queue - Add to queue and wait until it is at the head
> + * @lock: Pointer to queue read/writer lock structure
> + * @node: Node pointer to be added to the queue
> + */
> +static __always_inline void
> +wait_in_queue(struct qrwlock *lock, struct qrwnode *node)
> +{
> + struct qrwnode *prev;
> +
> + node->next = NULL;
> + node->wait = true;
> + barrier();
> + prev = xchg(&lock->waitq, node);

"barrier()" isn't needed, as xchg() is a full blown smp_mb(), it also
acts as a compiler barrier.

> + if (prev) {
> + prev->next = node;
> + smp_wmb();
> + /*
> + * Wait until the waiting flag is off
> + */
> + while (ACCESS_ONCE(node->wait))
> + cpu_relax();
> + } else
> + node->wait = false;
> +}
> +
> +/**
> + * signal_next - Signal the next one in queue to be at the head
> + * @lock: Pointer to queue read/writer lock structure
> + * @node: Node pointer to the current head of queue
> + */
> +static __always_inline void
> +signal_next(struct qrwlock *lock, struct qrwnode *node)
> +{
> + struct qrwnode *next;
> +
> + /*
> + * Notify the next one in queue or clear the waiting queue
> + */
> + if (ACCESS_ONCE(lock->waitq) == node) {
> + if (cmpxchg(&lock->waitq, node, NULL) == node)
> + return;
> + }
> + /*
> + * Wait until the next one in queue set up the next field
> + */
> + while (!likely(next = ACCESS_ONCE(node->next)))
> + cpu_relax();
> + /*
> + * The next one in queue is now at the head
> + */
> + ACCESS_ONCE(next->wait) = false;
> + smp_wmb();
> +}
> +
> +/**
> + * queue_read_lock - acquire read lock of a queue read/write lock
> + * @lock: Pointer to queue read/writer lock structure
> + */
> +void queue_read_lock(struct qrwlock *lock)
> +{
> + struct qrwlock old, new;
> + struct qrwnode node;
> +
> + old.rw = ACCESS_ONCE(lock->rw);
> + while (likely(!old.writer)) {
> + /*
> + * Atomically increment the reader count while writer is 0
> + */
> + new.rw = old.rw;
> + new.readers++;
> +
> + if (cmpxchg(&lock->rw, old.rw, new.rw) == old.rw)
> + return;
> + cpu_relax();
> + old.rw = ACCESS_ONCE(lock->rw);
> + }
> + /*
> + * Slowpath
> + * Put the reader into the waiting queue
> + */
> + wait_in_queue(lock, &node);
> +
> + /*
> + * At the head of the wait queue now, wait until no writer is pending
> + * or when the reader stealing flag is set and readers are present.
> + * Then try to atomically inc the reader number.
> + */
> + while (true) {
> + old.rw = ACCESS_ONCE(lock->rw);
> + if (old.writer && (!old.rsteal || !old.readers)) {
> + cpu_relax();
> + continue;
> + }
> + new.rw = old.rw;
> + new.readers++;
> + if (cmpxchg(&lock->rw, old.rw, new.rw) == old.rw)
> + break;
> + cpu_relax();
> + }
> + signal_next(lock, &node);
> +}
> +EXPORT_SYMBOL(queue_read_lock);
> +
> +
> +/*
> + * queue_read_trylock - try to acquire read lock of a queue read/write lock
> + * @lock : Pointer to queue read/writer lock structure
> + * Return: 1 if lock acquired, 0 if failed
> + */
> +int queue_read_trylock(struct qrwlock *lock)
> +{
> + struct qrwlock old, new;
> +
> + old.rw = ACCESS_ONCE(lock->rw);
> + if (unlikely(old.writer))
> + return 0;
> + new.rw = old.rw;
> + new.readers++;
> +
> + if (cmpxchg(&lock->rw, old.rw, new.rw) == old.rw)
> + return 1;
> + cpu_relax();

What's the cpu_relax() for? It's not in a loop.


> + return 0;
> +}
> +EXPORT_SYMBOL(queue_read_trylock);
> +
> +/**
> + * queue_write_lock - acquire write lock of a queue read/write lock
> + * @lock : Pointer to queue read/writer lock structure
> + */
> +void queue_write_lock(struct qrwlock *lock)
> +{
> + struct qrwnode node, *next;
> +
> + if (likely(!ACCESS_ONCE(lock->writer))) {
> + /*
> + * Atomically set the writer to 1, then wait until reader
> + * count goes to 0.
> + */
> + if (xchg(&lock->writer, 1) == 0) {
> + while (ACCESS_ONCE(lock->readers))
> + cpu_relax();
> + return;
> + }
> + cpu_relax();

Another cpu_relax() outside of a loop.

> + }
> + /*
> + * Slowpath
> + * Put the writer into the waiting queue
> + */
> + wait_in_queue(lock, &node);
> +
> + /*
> + * At the head of the wait queue now, wait until no writer is pending
> + * and then atomically set it again.
> + */
> + while (true) {
> + if (ACCESS_ONCE(lock->writer)) {
> + cpu_relax();
> + continue;
> + }
> + if (xchg(&lock->writer, 1) != 0) {
> + cpu_relax();
> + continue;
> + }
> + break;
> + }
> + /*
> + * Wait until the reader count go to zero
> + */
> + while (ACCESS_ONCE(lock->readers))
> + cpu_relax();
> +
> + signal_next(lock, &node);
> +}
> +EXPORT_SYMBOL(queue_write_lock);
> +
> +/**
> + * queue_write_trylock - try to acquire write lock of a queue read/write lock
> + * @lock : Pointer to queue read/writer lock structure
> + * Return: 1 if lock acquired, 0 if failed
> + */
> +int queue_write_trylock(struct qrwlock *lock)
> +{
> + struct qrwlock old, new;
> +
> + old.rw = ACCESS_ONCE(lock->rw);
> + if (!old.rw) {
> + /*
> + * Atomically set the writer to 1 if readers = 0
> + */
> + new.rw = old.rw;
> + new.writer = 1;
> + if (cmpxchg(&lock->rw, old.rw, new.rw) == old.rw)
> + return 1;
> + cpu_relax();

Again the cpu_relax with no loop.

> + }
> + return 0;
> +}
> +EXPORT_SYMBOL(queue_write_trylock);

I haven't seen anything bad about this with a quick review. But it
should have a more thorough review to check all corner cases.

-- Steve


--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/