+/**
+ * queue_read_trylock - try to acquire read lock of a queue rwlock
+ * @lock : Pointer to queue rwlock structure
+ * Return: 1 if lock acquired, 0 if failed
+ */
+static inline int queue_read_trylock(struct qrwlock *lock)
+{
+ union qrwcnts cnts;
+
+ cnts.rw = ACCESS_ONCE(lock->cnts.rw);
+ if (likely(!cnts.writer)) {
+ cnts.rw = xadd(&lock->cnts.rw, _QR_BIAS);
Looks like xadd() is x86-specific, but this is common code. One
approach would be to provide xadd() for the other arches; another
would be to make .rw an atomic_t rather than a u32. Making it
atomic_t is probably easiest. (The cmpxchg()s would then need to
become atomic_cmpxchg().)

Ditto for add_smp() further down.
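
Something along these lines for the atomic_t variant, perhaps (untested
sketch; the .rwa member name is made up here, and the add_smp() later in
the slowpath would become an atomic_sub() in the same way):

	static inline int queue_read_trylock(struct qrwlock *lock)
	{
		union qrwcnts cnts;

		/* Assumes the u32 .rw has been replaced by an atomic_t .rwa */
		cnts.rw = (u32)atomic_read(&lock->cnts.rwa);
		if (likely(!cnts.writer)) {
			/*
			 * atomic_add_return() is available on all arches.
			 * It returns the post-add value, but _QR_BIAS does
			 * not touch the writer byte, so the check below is
			 * equivalent to testing xadd()'s pre-add return.
			 */
			cnts.rw = (u32)atomic_add_return(_QR_BIAS,
							 &lock->cnts.rwa);
			if (likely(!cnts.writer))
				return 1;
			/* A writer slipped in, back out the reader count */
			atomic_sub(_QR_BIAS, &lock->cnts.rwa);
		}
		return 0;
	}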
+
+/**
+ * queue_write_unlock - release write lock of a queue rwlock
+ * @lock : Pointer to queue rwlock structure
+ */
+static inline void queue_write_unlock(struct qrwlock *lock)
+{
+ /*
+ * Make sure that none of the critical section will be leaked out.
+ */
+ smp_mb__before_clear_bit();
+ ACCESS_ONCE(lock->cnts.writer) = 0;
+ smp_mb__after_clear_bit();
Interesting combination... It does seem to work out, though.
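
For what it is worth, once smp_store_release() is available (see below),
this whole sequence could become a single release store; a sketch,
assuming the one-byte writer field keeps its current layout:

	static inline void queue_write_unlock(struct qrwlock *lock)
	{
		/*
		 * The release store keeps the critical section from
		 * leaking past the clearing of the writer byte.
		 */
		smp_store_release(&lock->cnts.writer, 0);
	}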
+++ b/kernel/locking/qrwlock.c
@@ -0,0 +1,265 @@
+/*
+ * Queue read/write lock
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * (C) Copyright 2013 Hewlett-Packard Development Company, L.P.
+ *
+ * Authors: Waiman Long <waiman.long@xxxxxx>
+ */
+#include <linux/smp.h>
+#include <linux/bug.h>
+#include <linux/cpumask.h>
+#include <linux/percpu.h>
+#include <linux/hardirq.h>
+#include <asm-generic/qrwlock.h>
+
+/*
+ * Compared with the regular rwlock, the queue rwlock has the following
+ * advantages:
+ * 1. Even though there is a slight chance of stealing the lock if one comes
+ * at the right moment, the granting of the lock is mostly in FIFO order.
+ * 2. It is usually faster in high-contention situations.
+ *
+ * The only downside is that the lock is 4 bytes larger in 32-bit systems
+ * and 12 bytes larger in 64-bit systems.
+ *
+ * There are two queues for writers. The writer field of the lock is a
+ * one-slot wait queue. The writers that follow will have to wait in the
+ * combined reader/writer queue (waitq).
+ *
+ * Compared with the x86 ticket spinlock, the queue rwlock is faster in
+ * high-contention situations. The writer lock is also faster in
+ * single-thread operations. Therefore, the queue rwlock can be considered
+ * a replacement for those spinlocks that are highly contended, as long as
+ * an increase in lock size is not an issue.
+ */

Judging from the #defines at the end of qrwlock.h, this replacement is
done on a file-by-file basis? Looks to me more like the replacement
must be all-or-nothing across the entire kernel, otherwise trouble would
ensue for locks used across multiple files. What am I missing here?
+
+#ifndef arch_mutex_cpu_relax
+# define arch_mutex_cpu_relax() cpu_relax()
+#endif
+
+#ifndef smp_mb__load_acquire
+# ifdef CONFIG_X86
+# define smp_mb__load_acquire() barrier()
+# else
+# define smp_mb__load_acquire() smp_mb()
+# endif
+#endif
+
+#ifndef smp_mb__store_release
+# ifdef CONFIG_X86
+# define smp_mb__store_release() barrier()
+# else
+# define smp_mb__store_release() smp_mb()
+# endif
+#endif
These are now smp_load_acquire() and smp_store_release().
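
For reference, and glossing over the compile-time type checks, the
generic fallbacks look roughly like this (quoting from memory, so treat
it as a sketch); the x86 versions likewise degrade smp_mb() to
barrier(), which is why they can subsume the ad hoc macros above:

	#define smp_store_release(p, v)				\
	do {								\
		smp_mb();						\
		ACCESS_ONCE(*p) = (v);					\
	} while (0)

	#define smp_load_acquire(p)					\
	({								\
		typeof(*p) ___p1 = ACCESS_ONCE(*p);			\
		smp_mb();						\
		___p1;							\
	})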
+/**
+ * wait_in_queue - Add to queue and wait until it is at the head
+ * @lock: Pointer to queue rwlock structure
+ * @node: Node pointer to be added to the queue
+ */
+static __always_inline void
+wait_in_queue(struct qrwlock *lock, struct qrwnode *node)
+{
+ struct qrwnode *prev;
+
+ node->next = NULL;
+ node->wait = true;
+ prev = xchg(&lock->waitq, node);
+ if (prev) {
+ prev->next = node;
+ /*
+ * Wait until the waiting flag is off
+ */
+ while (ACCESS_ONCE(node->wait))
+ arch_mutex_cpu_relax();
+ smp_mb__load_acquire();
The above pair of lines can be simply:

	while (smp_load_acquire(&node->wait))
		arch_mutex_cpu_relax();

On TSO systems like x86, this should generate the same code.

+ }
+}
+
+/**
+ * signal_next - Signal the next one in queue to be at the head
+ * @lock: Pointer to queue rwlock structure
+ * @node: Node pointer to the current head of queue
+ */
+static __always_inline void
+signal_next(struct qrwlock *lock, struct qrwnode *node)
+{
+ struct qrwnode *next;
+
+ /*
+ * Try to notify the next node first without disturbing the cacheline
+ * of the lock. If that fails, check to see if it is the last node
+ * and so should clear the wait queue.
+ */
+ next = ACCESS_ONCE(node->next);
+ if (likely(next))
+ goto notify_next;
+
+ /*
+ * Clear the wait queue if it is the last node
+ */
+ if ((ACCESS_ONCE(lock->waitq) == node) &&
+ (cmpxchg(&lock->waitq, node, NULL) == node))
+ return;
+ /*
+ * Wait until the next one in queue set up the next field
+ */
+ while (likely(!(next = ACCESS_ONCE(node->next))))
+ arch_mutex_cpu_relax();
+ /*
+ * The next one in queue is now at the head
+ */
+notify_next:
+ smp_mb__store_release();
+ ACCESS_ONCE(next->wait) = false;
The above pair of lines can be simply:

	smp_store_release(&next->wait, false);

This pairs nicely with the smp_load_acquire() in wait_in_queue().

+}
+
+/**
+ * rspin_until_writer_unlock - inc reader count & spin until writer is gone
+ * @lock: Pointer to queue rwlock structure
+ * @cnts: Current queue rwlock counts structure
+ *
+ * In interrupt context or at the head of the queue, the reader will just
+ * increment the reader count & wait until the writer releases the lock.
+ */
+static __always_inline void
+rspin_until_writer_unlock(struct qrwlock *lock, union qrwcnts cnts)
+{
+ while (cnts.writer == _QW_LOCKED) {
+ arch_mutex_cpu_relax();
+ cnts.rw = ACCESS_ONCE(lock->cnts.rw);
+ }
+}
+
+/**
+ * queue_read_lock_slowpath - acquire read lock of a queue rwlock
+ * @lock: Pointer to queue rwlock structure
+ */
+void queue_read_lock_slowpath(struct qrwlock *lock)
+{
+ struct qrwnode node;
+ union qrwcnts cnts;
+
+ /*
+ * Readers come here when it cannot get the lock without waiting
+ */

Grammar nit: s/it/they/

Or s/Readers come/Each reader comes/
+ if (unlikely(irq_count())) {
+ /*
+ * Readers in interrupt context will spin until the lock is
+ * available without waiting in the queue.
+ */
+ cnts.rw = ACCESS_ONCE(lock->cnts.rw);
This needs to be:

	cnts.rw = smp_load_acquire(&lock->cnts.rw);
I was going to argue that the above assignment should be pushed into
rspin_until_writer_unlock(), but I see that this won't work for the
call later in this function. ;-)
+ rspin_until_writer_unlock(lock, cnts);

We do need a memory barrier in this path, otherwise we are not guaranteed
to see the writer's critical section. One approach would be to make
rspin_until_writer_unlock()'s "while" loop body do:
	arch_mutex_cpu_relax();
	cnts.rw = smp_load_acquire(&lock->cnts.rw);
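
Spelled out, that suggestion would look something like the following
(untested sketch):

	static __always_inline void
	rspin_until_writer_unlock(struct qrwlock *lock, union qrwcnts cnts)
	{
		while (cnts.writer == _QW_LOCKED) {
			arch_mutex_cpu_relax();
			/*
			 * Acquire load: once the loop exits, the writer's
			 * critical section is guaranteed to be visible.
			 */
			cnts.rw = smp_load_acquire(&lock->cnts.rw);
		}
	}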
+ return;
+ }
+ add_smp(&lock->cnts.rw, -_QR_BIAS);
+
+ /*
+ * Put the reader into the wait queue
+ */
+ wait_in_queue(lock, &node);
+
+ /*
+ * At the head of the wait queue now, wait until the writer state
+ * goes to 0 and then try to increment the reader count and get
+ * the lock.
+ */
+ while (ACCESS_ONCE(lock->cnts.writer))
+ arch_mutex_cpu_relax();
+ cnts.rw = xadd(&lock->cnts.rw, _QR_BIAS);
+ rspin_until_writer_unlock(lock, cnts);
+ /*
+ * Need to have a barrier with read-acquire semantics
+ */
+ smp_mb__load_acquire();
Making rspin_until_writer_unlock() do an smp_load_acquire() makes this
unnecessary.
+ signal_next(lock, &node);

Good, this allows multiple readers to acquire the lock concurrently,
give or take memory latency compared to critical-section duration.
When the first writer shows up, it presumably spins on the lock word.