Re: [PATCH RFC ticketlock] v3 Auto-queued ticketlock

From: Paul E. McKenney
Date: Wed Jun 12 2013 - 12:59:33 EST


On Thu, Jun 13, 2013 at 12:13:47AM +0800, Lai Jiangshan wrote:
> On Wed, Jun 12, 2013 at 11:40 PM, Paul E. McKenney
> <paulmck@xxxxxxxxxxxxxxxxxx> wrote:
> > Breaking up locks is better than implementing high-contention locks, but
> > if we must have high-contention locks, why not make them automatically
> > switch between light-weight ticket locks at low contention and queued
> > locks at high contention? After all, this would remove the need for
> > the developer to predict which locks will be highly contended.
> >
> > This commit allows ticket locks to automatically switch between pure
> > ticketlock and queued-lock operation as needed. If too many CPUs are
> > spinning on a given ticket lock, a queue structure will be allocated
> > and the lock will switch to queued-lock operation. When the lock becomes
> > free, it will switch back into ticketlock operation. The low-order bit
> > of the head counter is used to indicate that the lock is in queued mode,
> > which forces an unconditional mismatch between the head and tail counters.
> > This approach means that the common-case code path under conditions of
> > low contention is very nearly that of a plain ticket lock.
> >
> > A fixed number of queueing structures is statically allocated in an
> > array. The ticket-lock address is used to hash into an initial element,
> > but if that element is already in use, it moves to the next element. If
> > the entire array is already in use, continue to spin in ticket mode.
> >
> > Signed-off-by: Paul E. McKenney <paulmck@xxxxxxxxxxxxxxxxxx>
> > [ paulmck: Eliminate duplicate code and update comments (Steven Rostedt). ]
> > [ paulmck: Address Eric Dumazet review feedback. ]
> > [ paulmck: Use Lai Jiangshan idea to eliminate smp_mb(). ]
> > [ paulmck: Expand ->head_tkt from s32 to s64 (Waiman Long). ]
> > [ paulmck: Move cpu_relax() to main spin loop (Steven Rostedt). ]
> > [ paulmck: Reduce queue-switch contention (Waiman Long). ]
> > [ paulmck: __TKT_SPIN_INC for __ticket_spin_trylock() (Steffen Persvold). ]
> > [ paulmck: Type safety fixes (Steven Rostedt). ]
> > [ paulmck: Pre-check cmpxchg() value (Waiman Long). ]
> > [ paulmck: smp_mb() downgrade to smp_wmb() (Lai Jiangshan). ]
> >
> > diff --git a/arch/x86/include/asm/spinlock.h b/arch/x86/include/asm/spinlock.h
> > index 33692ea..5aa0177 100644
> > --- a/arch/x86/include/asm/spinlock.h
> > +++ b/arch/x86/include/asm/spinlock.h
> > @@ -34,6 +34,21 @@
> > # define UNLOCK_LOCK_PREFIX
> > #endif
> >
> > +#ifdef CONFIG_TICKET_LOCK_QUEUED
> > +
> > +#define __TKT_SPIN_INC 2
> > +bool tkt_spin_pass(arch_spinlock_t *ap, struct __raw_tickets inc);
> > +
> > +#else /* #ifdef CONFIG_TICKET_LOCK_QUEUED */
> > +
> > +#define __TKT_SPIN_INC 1
> > +static inline bool tkt_spin_pass(arch_spinlock_t *ap, struct __raw_tickets inc)
> > +{
> > + return false;
> > +}
> > +
> > +#endif /* #else #ifdef CONFIG_TICKET_LOCK_QUEUED */
> > +
> > /*
> > * Ticket locks are conceptually two parts, one indicating the current head of
> > * the queue, and the other indicating the current tail. The lock is acquired
> > @@ -49,17 +64,16 @@
> > */
> > static __always_inline void __ticket_spin_lock(arch_spinlock_t *lock)
> > {
> > - register struct __raw_tickets inc = { .tail = 1 };
> > + register struct __raw_tickets inc = { .tail = __TKT_SPIN_INC };
> >
> > inc = xadd(&lock->tickets, inc);
> > -
> > for (;;) {
> > - if (inc.head == inc.tail)
> > + if (inc.head == inc.tail || tkt_spin_pass(lock, inc))
> > break;
> > cpu_relax();
> > inc.head = ACCESS_ONCE(lock->tickets.head);
> > }
> > - barrier(); /* make sure nothing creeps before the lock is taken */
> > + barrier(); /* Make sure nothing creeps in before the lock is taken. */
> > }
> >
> > static __always_inline int __ticket_spin_trylock(arch_spinlock_t *lock)
> > @@ -70,17 +84,33 @@ static __always_inline int __ticket_spin_trylock(arch_spinlock_t *lock)
> > if (old.tickets.head != old.tickets.tail)
> > return 0;
> >
> > - new.head_tail = old.head_tail + (1 << TICKET_SHIFT);
> > + new.head_tail = old.head_tail + (__TKT_SPIN_INC << TICKET_SHIFT);
> >
> > /* cmpxchg is a full barrier, so nothing can move before it */
> > return cmpxchg(&lock->head_tail, old.head_tail, new.head_tail) == old.head_tail;
> > }
> >
> > +#ifndef CONFIG_TICKET_LOCK_QUEUED
> > +
> > static __always_inline void __ticket_spin_unlock(arch_spinlock_t *lock)
> > {
> > __add(&lock->tickets.head, 1, UNLOCK_LOCK_PREFIX);
> > }
> >
> > +#else /* #ifndef CONFIG_TICKET_LOCK_QUEUED */
> > +
> > +extern void tkt_q_do_wake(arch_spinlock_t *lock);
> > +
> > +static __always_inline void __ticket_spin_unlock(arch_spinlock_t *lock)
> > +{
> > + __ticket_t head = 2;
> > +
> > + head = xadd(&lock->tickets.head, head);
> > + if (head & 0x1)
> > + tkt_q_do_wake(lock);
> > +}
> > +#endif /* #else #ifndef CONFIG_TICKET_LOCK_QUEUED */
> > +
> > static inline int __ticket_spin_is_locked(arch_spinlock_t *lock)
> > {
> > struct __raw_tickets tmp = ACCESS_ONCE(lock->tickets);
> > diff --git a/arch/x86/include/asm/spinlock_types.h b/arch/x86/include/asm/spinlock_types.h
> > index ad0ad07..cdaefdd 100644
> > --- a/arch/x86/include/asm/spinlock_types.h
> > +++ b/arch/x86/include/asm/spinlock_types.h
> > @@ -7,12 +7,18 @@
> >
> > #include <linux/types.h>
> >
> > -#if (CONFIG_NR_CPUS < 256)
> > +#if (CONFIG_NR_CPUS < 128)
> > typedef u8 __ticket_t;
> > typedef u16 __ticketpair_t;
> > -#else
> > +#define TICKET_T_CMP_GE(a, b) (UCHAR_MAX / 2 >= (unsigned char)((a) - (b)))
> > +#elif (CONFIG_NR_CPUS < 32768)
> > typedef u16 __ticket_t;
> > typedef u32 __ticketpair_t;
> > +#define TICKET_T_CMP_GE(a, b) (USHRT_MAX / 2 >= (unsigned short)((a) - (b)))
> > +#else
> > +typedef u32 __ticket_t;
> > +typedef u64 __ticketpair_t;
> > +#define TICKET_T_CMP_GE(a, b) (UINT_MAX / 2 >= (unsigned int)((a) - (b)))
> > #endif
> >
> > #define TICKET_SHIFT (sizeof(__ticket_t) * 8)
> > @@ -21,7 +27,11 @@ typedef struct arch_spinlock {
> > union {
> > __ticketpair_t head_tail;
> > struct __raw_tickets {
> > +#ifdef __BIG_ENDIAN__
> > + __ticket_t tail, head;
> > +#else /* #ifdef __BIG_ENDIAN__ */
> > __ticket_t head, tail;
> > +#endif /* #else #ifdef __BIG_ENDIAN__ */
> > } tickets;
> > };
> > } arch_spinlock_t;
> > diff --git a/include/linux/kernel.h b/include/linux/kernel.h
> > index e9ef6d6..816a87c 100644
> > --- a/include/linux/kernel.h
> > +++ b/include/linux/kernel.h
> > @@ -15,6 +15,7 @@
> > #include <asm/byteorder.h>
> > #include <uapi/linux/kernel.h>
> >
> > +#define UCHAR_MAX ((u8)(~0U))
> > #define USHRT_MAX ((u16)(~0U))
> > #define SHRT_MAX ((s16)(USHRT_MAX>>1))
> > #define SHRT_MIN ((s16)(-SHRT_MAX - 1))
> > diff --git a/kernel/Kconfig.locks b/kernel/Kconfig.locks
> > index 44511d1..900c0f0 100644
> > --- a/kernel/Kconfig.locks
> > +++ b/kernel/Kconfig.locks
> > @@ -223,3 +223,38 @@ endif
> > config MUTEX_SPIN_ON_OWNER
> > def_bool y
> > depends on SMP && !DEBUG_MUTEXES
> > +
> > +config TICKET_LOCK_QUEUED
> > + bool "Dynamically switch between ticket and queued locking"
> > + depends on SMP
> > + default n
> > + ---help---
> > + Enable dynamic switching between ticketlock and queued locking
> > + on a per-lock basis. This option will slow down low-contention
> > + acquisition and release very slightly (additional conditional
> > + in release path), but will provide more efficient operation at
> > + high levels of lock contention. High-contention operation will
> > + not be quite as efficient as would be a pure queued lock, but
> > + this dynamic approach consumes less memory than queud locks
> > + and also runs faster at low levels of contention.
> > +
> > + Say "Y" if you are running on a large system with a workload
> > + that is likely to result in high levels of contention.
> > +
> > + Say "N" if you are unsure.
> > +
> > +config TICKET_LOCK_QUEUED_SWITCH
> > + int "When to switch from ticket to queued locking"
> > + depends on TICKET_LOCK_QUEUED
> > + default 8
> > + range 3 32
> > + ---help---
> > + Specify how many tasks should be spinning on the lock before
> > + switching to queued mode. Systems with low-latency memory/cache
> > + interconnects will prefer larger numbers, while extreme low-latency
> > + and real-time workloads will prefer a smaller number. Of course,
> > + extreme real-time workloads would be even happier if contention
> > + on the locks were reduced to the point that there was never any
> > + need for queued locking in the first place.
> > +
> > + Take the default if you are unsure.
> > diff --git a/kernel/Makefile b/kernel/Makefile
> > index 271fd31..70a91f7 100644
> > --- a/kernel/Makefile
> > +++ b/kernel/Makefile
> > @@ -51,6 +51,7 @@ endif
> > obj-$(CONFIG_SMP) += spinlock.o
> > obj-$(CONFIG_DEBUG_SPINLOCK) += spinlock.o
> > obj-$(CONFIG_PROVE_LOCKING) += spinlock.o
> > +obj-$(CONFIG_TICKET_LOCK_QUEUED) += tktqlock.o
> > obj-$(CONFIG_UID16) += uid16.o
> > obj-$(CONFIG_MODULES) += module.o
> > obj-$(CONFIG_MODULE_SIG) += module_signing.o modsign_pubkey.o modsign_certificate.o
> > diff --git a/kernel/tktqlock.c b/kernel/tktqlock.c
> > new file mode 100644
> > index 0000000..912817c
> > --- /dev/null
> > +++ b/kernel/tktqlock.c
> > @@ -0,0 +1,383 @@
> > +/*
> > + * Queued ticket spinlocks.
> > + *
> > + * This program is free software; you can redistribute it and/or modify
> > + * it under the terms of the GNU General Public License as published by
> > + * the Free Software Foundation; either version 2 of the License, or
> > + * (at your option) any later version.
> > + *
> > + * This program is distributed in the hope that it will be useful,
> > + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> > + * GNU General Public License for more details.
> > + *
> > + * You should have received a copy of the GNU General Public License
> > + * along with this program; if not, write to the Free Software
> > + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
> > + *
> > + * Copyright IBM Corporation, 2013
> > + *
> > + * Authors: Paul E. McKenney <paulmck@xxxxxxxxxxxxxxxxxx>
> > + */
> > +#include <linux/types.h>
> > +#include <linux/kernel.h>
> > +#include <linux/spinlock.h>
> > +#include <linux/smp.h>
> > +#include <linux/percpu.h>
> > +
> > +struct tkt_q {
> > + int cpu;
> > + __ticket_t tail;
> > + struct tkt_q *next;
> > +};
> > +
> > +struct tkt_q_head {
> > + arch_spinlock_t *ref; /* Pointer to spinlock if in use. */
> > + s64 head_tkt; /* Head ticket when started queuing. */
> > + struct tkt_q *spin; /* Head of queue. */
> > + struct tkt_q **spin_tail; /* Tail of queue. */
> > +};
> > +
> > +/*
> > + * TKT_Q_SWITCH is twice the number of CPUs that must be spinning on a
> > + * given ticket lock to motivate switching to spinning on a queue.
> > + * The reason that it is twice the number is because the bottom bit of
> > + * the ticket is reserved for the bit that indicates that a queue is
> > + * associated with the lock.
> > + */
> > +#define TKT_Q_SWITCH ((__ticket_t)(CONFIG_TICKET_LOCK_QUEUED_SWITCH * 2))
> > +
> > +/*
> > + * TKT_Q_NQUEUES is the number of queues to maintain. Large systems
> > + * might have multiple highly contended locks, so provide more queues for
> > + * systems with larger numbers of CPUs.
> > + */
> > +#define TKT_Q_NQUEUES (2 * DIV_ROUND_UP(NR_CPUS + ((int)TKT_Q_SWITCH) - 1, \
> > + (int)TKT_Q_SWITCH))
> > +
> > +/* The queues themselves. */
> > +struct tkt_q_head tkt_q_heads[TKT_Q_NQUEUES];
> > +
> > +/* Advance to the next queue slot, wrapping around to the beginning. */
> > +static int tkt_q_next_slot(int i)
> > +{
> > + return (++i < TKT_Q_NQUEUES) ? i : 0;
> > +}
> > +
> > +/* Very crude hash from lock address to queue slot number. */
> > +static unsigned long tkt_q_hash(arch_spinlock_t *lock)
> > +{
> > + return (((unsigned long)lock) >> 8) % TKT_Q_NQUEUES;
> > +}
> > +
> > +/*
> > + * Return a pointer to the queue header associated with the specified lock,
> > + * or return NULL if there is no queue for the lock or if the lock's queue
> > + * is in transition.
> > + */
> > +static struct tkt_q_head *tkt_q_find_head(arch_spinlock_t *lock)
> > +{
> > + int i;
> > + int start;
> > +
> > + start = i = tkt_q_hash(lock);
> > + do
> > + if (ACCESS_ONCE(tkt_q_heads[i].ref) == lock)
> > + return &tkt_q_heads[i];
> > + while ((i = tkt_q_next_slot(i)) != start);
> > + return NULL;
> > +}
> > +
> > +/*
> > + * Try to stop queuing, reverting back to normal ticket-lock operation.
> > + * We can only stop queuing when the queue is empty, which means that
> > + * we need to correctly handle races where someone shows up in the queue
> > + * just as we are trying to dispense with the queue. They win, we lose.
> > + */
> > +static bool tkt_q_try_unqueue(arch_spinlock_t *lock, struct tkt_q_head *tqhp)
> > +{
> > + arch_spinlock_t asold;
> > + arch_spinlock_t asnew;
> > +
> > + /* Pick up the ticket values. */
> > + asold = ACCESS_ONCE(*lock);
> > + if ((asold.tickets.head & ~0x1) == asold.tickets.tail) {
> > +
> > + /* Attempt to mark the lock as not having a queue. */
> > + asnew = asold;
> > + asnew.tickets.head &= ~0x1;
> > + if (cmpxchg(&lock->head_tail,
> > + asold.head_tail,
> > + asnew.head_tail) == asold.head_tail) {
> > +
> > + /* Succeeded, mark the queue as unused. */
> > + ACCESS_ONCE(tqhp->ref) = NULL;
> > + return true;
> > + }
> > + }
> > +
> > + /* Failed, tell the caller there is still a queue to pass off to. */
> > + return false;
> > +}
> > +
> > +/*
> > + * Hand the lock off to the first CPU on the queue.
> > + */
> > +void tkt_q_do_wake(arch_spinlock_t *lock)
> > +{
> > + struct tkt_q_head *tqhp;
> > + struct tkt_q *tqp;
> > +
> > + /*
> > + * If the queue is still being set up, wait for it. Note that
> > + * the caller's xadd() provides the needed memory ordering.
> > + */
> > + while ((tqhp = tkt_q_find_head(lock)) == NULL)
> > + cpu_relax();
> > +
> > + for (;;) {
> > +
> > + /* Find the first queue element. */
> > + tqp = ACCESS_ONCE(tqhp->spin);
> > + if (tqp != NULL)
> > + break; /* Element exists, hand off lock. */
> > + if (tkt_q_try_unqueue(lock, tqhp))
> > + return; /* No element, successfully removed queue. */
> > + cpu_relax();
> > + }
> > + if (ACCESS_ONCE(tqhp->head_tkt) != -1)
> > + ACCESS_ONCE(tqhp->head_tkt) = -1;
> > + smp_mb(); /* Order pointer fetch and assignment against handoff. */
> > + ACCESS_ONCE(tqp->cpu) = -1;
> > +}
> > +EXPORT_SYMBOL(tkt_q_do_wake);
> > +
> > +/*
> > + * Given a lock that already has a queue associated with it, spin on
> > + * that queue. Return false if there was no queue (which means we do not
> > + * hold the lock) and true otherwise (meaning we -do- hold the lock).
> > + */
> > +bool tkt_q_do_spin(arch_spinlock_t *lock, struct __raw_tickets inc)
> > +{
> > + struct tkt_q **oldtail;
> > + struct tkt_q tq;
> > + struct tkt_q_head *tqhp;
> > +
> > + /*
> > + * Ensure that accesses to queue header happen after sensing
> > + * the lock's have-queue bit.
> > + */
> > + smp_mb(); /* See above block comment. */
> > +
> > + /* If there no longer is a queue, leave. */
> > + tqhp = tkt_q_find_head(lock);
> > + if (tqhp == NULL)
> > + return false;
> > +
> > + /* Initialize our queue element. */
> > + tq.cpu = raw_smp_processor_id();
> > + tq.tail = inc.tail;
> > + tq.next = NULL;
> > +
> > + /* Check to see if we already hold the lock. */
> > + if (ACCESS_ONCE(tqhp->head_tkt) == inc.tail) {
> > + /* The last holder left before queue formed, we hold lock. */
> > + tqhp->head_tkt = -1;
> > + return true;
> > + }
> > +
> > + /*
> > + * Add our element to the tail of the queue. Note that if the
> > + * queue is empty, the ->spin_tail pointer will reference
> > + * the queue's head pointer, namely ->spin.
> > + */
> > + oldtail = xchg(&tqhp->spin_tail, &tq.next);
> > + ACCESS_ONCE(*oldtail) = &tq;
> > +
> > + /* Spin until handoff. */
> > + while (ACCESS_ONCE(tq.cpu) != -1)
> > + cpu_relax();
> > +
> > + /*
> > + * Remove our element from the queue. If the queue is now empty,
> > + * update carefully so that the next acquisition will enqueue itself
> > + * at the head of the list. Of course, the next enqueue operation
> > + * might be happening concurrently, and this code needs to handle all
> > + * of the possible combinations, keeping in mind that the enqueue
> > + * operation happens in two stages: (1) update the tail pointer and
> > + * (2) update the predecessor's ->next pointer. With this in mind,
> > + * the following code needs to deal with three scenarios:
> > + *
> > + * 1. tq is the last entry. In this case, we use cmpxchg to
> > + * point the list tail back to the list head (->spin). If
> > + * the cmpxchg fails, that indicates that we are instead
> > + * in scenario 2 below. If the cmpxchg succeeds, the next
> > + * enqueue operation's tail-pointer exchange will enqueue
> > + * the next element at the queue head, because the ->spin_tail
> > + * pointer now references the queue head.
> > + *
> > + * 2. tq is the last entry, and the next entry has updated the
> > + * tail pointer but has not yet updated tq.next. In this
> > + * case, tq.next is NULL, the cmpxchg will fail, and the
> > + * code will wait for the enqueue to complete before completing
> > + * removal of tq from the list.
> > + *
> > + * 3. tq is not the last pointer. In this case, tq.next is non-NULL,
> > + * so the following code simply removes tq from the list.
> > + */
> > + if (tq.next == NULL) {
> > +
> > + /* Mark the queue empty. */
> > + tqhp->spin = NULL;
> > +
> > + /* Try to point the tail back at the head. */
> > + if (cmpxchg(&tqhp->spin_tail,
> > + &tq.next,
> > + &tqhp->spin) == &tq.next)
> > + return true; /* Succeeded, queue is now empty. */
> > +
> > + /* Failed, if needed, wait for the enqueue to complete. */
> > + while (tq.next == NULL)
> > + cpu_relax();
> > +
> > + /* The following code will repair the head. */
> > + }
> > + smp_mb(); /* Force ordering between handoff and critical section. */
> > +
> > + /*
> > + * Advance list-head pointer. This same task will be the next to
> > + * access this when releasing the lock, so no need for a memory
> > + * barrier after the following assignment.
> > + */
> > + ACCESS_ONCE(tqhp->spin) = tq.next;
> > + return true;
> > +}
> > +
> > +/*
> > + * Given a lock that does not have a queue, attempt to associate the
> > + * i-th queue with it, returning true if successful (meaning we hold
> > + * the lock) or false otherwise (meaning we do -not- hold the lock).
> > + * Note that the caller has already filled in ->ref with 0x1, so we
> > + * own the queue.
> > + */
> > +static bool
> > +tkt_q_init_contend(int i, arch_spinlock_t *lock, struct __raw_tickets inc)
> > +{
> > + arch_spinlock_t asold;
> > + arch_spinlock_t asnew;
> > + struct tkt_q_head *tqhp;
> > +
> > + /* Initialize the i-th queue header. */
> > + tqhp = &tkt_q_heads[i];
> > + tqhp->spin = NULL;
> > + tqhp->spin_tail = &tqhp->spin;
> > +
> > + /* Each pass through this loop attempts to mark the lock as queued. */
> > + do {
> > + asold.head_tail = ACCESS_ONCE(lock->head_tail);
> > + asnew = asold;
> > + if (asnew.tickets.head & 0x1) {
> > +
> > + /* Someone beat us to it, back out. */
> > + smp_wmb(); /* Ensure init before NULLing. */
> > + ACCESS_ONCE(tqhp->ref) = NULL;
> > +
> > + /* Spin on the queue element they set up. */
> > + return tkt_q_do_spin(lock, inc);
> > + }
> > +
> > + /*
> > + * Record the head counter in case one of the spinning
> > + * CPUs already holds the lock but doesn't realize it yet.
> > + */
> > + tqhp->head_tkt = asold.tickets.head;
> > +
> > + /* The low-order bit in the head counter says "queued". */
> > + asnew.tickets.head |= 0x1;
>
> if asold.tickets.head == inc.tail, we will quickly success in the next steps,
> we don't need to cancel&return directly to avoid unneeded redo from other cpu.
> but what if in tkt_q_start_contend() ... ?

I am not completely sure that I understand what you are asking.

The straightforward answer is that ->head_tkt handles this case, and it
is checked before we start spinning on our queue element.

Another possible answer is that the current version tends to avoid
having multiple CPUs attempting to concurrently switch a given lock to
queued mode.

So what was the real question? ;-)

> > + } while (cmpxchg(&lock->head_tail,
> > + asold.head_tail,
> > + asnew.head_tail) != asold.head_tail);
> > +
> > + /* Point the queue at the lock and go spin on it. */
> > + ACCESS_ONCE(tqhp->ref) = lock;
> > + return tkt_q_do_spin(lock, inc);
> > +}
> > +
> > +/*
> > + * Start handling a period of high contention by finding a queue to associate
> > + * with this lock. Returns true if successful (in which case we hold the
> > + * lock) and false otherwise (in which case we do -not- hold the lock).
> > + */
> > +bool tkt_q_start_contend(arch_spinlock_t *lock, struct __raw_tickets inc)
> > +{
> > + int i;
> > + int start;
> > +
> > + /* Hash the lock address to find a starting point. */
> > + start = i = tkt_q_hash(lock);
> > +
> > + /*
> > + * Each pass through the following loop attempts to associate
> > + * the lock with the corresponding queue.
> > + */
> > + do {
> > + /*
> > + * Use 0x1 to mark the queue in use, but also avoiding
> > + * any spinners trying to use it before we get it all
> > + * initialized.
> > + */
> > + if (!tkt_q_heads[i].ref &&
> > + cmpxchg(&tkt_q_heads[i].ref,
> > + NULL,
> > + (arch_spinlock_t *)0x1) == NULL) {
> > +
> > + /* Succeeded, now go initialize it. */
> > + return tkt_q_init_contend(i, lock, inc);
> > + }
> > +
> > + /* If someone beat us to it, go spin on their queue. */
> > + if (ACCESS_ONCE(lock->tickets.head) & 0x1)
> > + return tkt_q_do_spin(lock, inc);
>
> if (ACCESS_ONCE(lock->tickets.head) == inc.tail)
> return true;

Is this really a good change? The downside is that we fail to put the
lock into queued mode when it needs it. Sooner or later, someone else
should do so, but we will be suffering from memory contention in the
meantime. It is not clear to me that this is a win.

> > + } while ((i = tkt_q_next_slot(i)) != start);
> > +
> > + /* All the queues are in use, revert to spinning on the ticket lock. */
> > + return false;
> > +}
> > +
> > +bool tkt_spin_pass(arch_spinlock_t *ap, struct __raw_tickets inc)
> > +{
> > + if (unlikely(inc.head & 0x1)) {
> > +
> > + /* This lock has a queue, so go spin on the queue. */
> > + if (tkt_q_do_spin(ap, inc))
> > + return true;
> > +
> > + /* Get here if the queue is in transition: Retry next time. */
> > +
> > + } else if (inc.tail - TKT_Q_SWITCH == inc.head) {
> > +
> > + /*
> > + * This lock has lots of spinners, but no queue. Go create
> > + * a queue to spin on.
> > + *
> > + * In the common case, only the single task that
> > + * sees the head and tail tickets being different by
> > + * exactly TKT_Q_SWITCH will come here set up the queue,
> > + * which prevents a "thundering herd" of queue setups.
> > + * Although it is still possible for an unfortunate series
> > + * of lock handoffs and newly arrived tasks to result
> > + * in more than one task performing a queue setup, this
> > + * is unlikely. Of course, this situation must still be
> > + * handled correctly, which is the job of the cmpxchg()
> > + * in tkt_q_start_contend().
> > + */
> > + if (tkt_q_start_contend(ap, inc))
> > + return true;
> > +
> > + /* Get here if the queue is in transition: Retry next time. */
> > + }
> > +
> > + /* Either no need for a queue or the queue is in transition. Spin. */
> > + return false;
> > +}
> > +EXPORT_SYMBOL(tkt_spin_pass);
> >
> > --
> > To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
> > the body of a message to majordomo@xxxxxxxxxxxxxxx
> > More majordomo info at http://vger.kernel.org/majordomo-info.html
> > Please read the FAQ at http://www.tux.org/lkml/
>

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/