Re: semaphore: lockless fastpath using atomic_{inc,dec}_return
From: Bruno Santos
Date: Wed Jul 09 2008 - 10:14:08 EST
So far, the machine I'm testing this on (a Core 2 Duo) has been up for almost 24 hours.
It seems the patch got mangled by the mailer, so I'm posting it again.
From 343d08a5d172d103e49c77e5580a45f02fab2b5e Mon Sep 17 00:00:00 2001
From: Bruno Santos <bsantos@xxxxxxxx>
Date: Tue, 8 Jul 2008 23:40:53 +0100
Subject: [PATCH] semaphore lockless fastpath
Signed-off-by: Bruno Santos <bsantos@xxxxxxxx>
---
include/linux/semaphore.h | 4 +-
kernel/semaphore.c | 131 +++++++++++++++++++++++++--------------------
2 files changed, 75 insertions(+), 60 deletions(-)
diff --git a/include/linux/semaphore.h b/include/linux/semaphore.h
index 9cae64b..d7850f0 100644
--- a/include/linux/semaphore.h
+++ b/include/linux/semaphore.h
@@ -14,15 +14,15 @@
/* Please don't access any members of this structure directly */
struct semaphore {
+ atomic_t count;
spinlock_t lock;
- unsigned int count;
struct list_head wait_list;
};
#define __SEMAPHORE_INITIALIZER(name, n) \
{ \
+ .count = ATOMIC_INIT(n), \
.lock = __SPIN_LOCK_UNLOCKED((name).lock), \
- .count = n, \
.wait_list = LIST_HEAD_INIT((name).wait_list), \
}
diff --git a/kernel/semaphore.c b/kernel/semaphore.c
index 5c2942e..2815980 100644
--- a/kernel/semaphore.c
+++ b/kernel/semaphore.c
@@ -8,6 +8,8 @@
* A counting semaphore may be acquired 'n' times before sleeping.
* See mutex.c for single-acquisition sleeping locks which enforce
* rules which allow code to be debugged more easily.
+ *
+ * Lockless fastpath by Bruno Santos <bsantos@xxxxxxxx>
*/
/*
@@ -22,7 +24,7 @@
* too.
*
* The ->count variable represents how many more tasks can acquire this
- * semaphore. If it's zero, there may be tasks waiting on the wait_list.
+ * semaphore. If it's negative, there may be tasks waiting on the wait_list.
*/
#include <linux/compiler.h>
@@ -51,14 +53,10 @@ static noinline void __up(struct semaphore *sem);
*/
void down(struct semaphore *sem)
{
- unsigned long flags;
+ if (likely(atomic_dec_return(&sem->count) >= 0))
+ return;
- spin_lock_irqsave(&sem->lock, flags);
- if (likely(sem->count > 0))
- sem->count--;
- else
- __down(sem);
- spin_unlock_irqrestore(&sem->lock, flags);
+ __down(sem);
}
EXPORT_SYMBOL(down);
@@ -73,17 +71,10 @@ EXPORT_SYMBOL(down);
*/
int down_interruptible(struct semaphore *sem)
{
- unsigned long flags;
- int result = 0;
+ if (likely(atomic_dec_return(&sem->count) >= 0))
+ return 0;
- spin_lock_irqsave(&sem->lock, flags);
- if (likely(sem->count > 0))
- sem->count--;
- else
- result = __down_interruptible(sem);
- spin_unlock_irqrestore(&sem->lock, flags);
-
- return result;
+ return __down_interruptible(sem);
}
EXPORT_SYMBOL(down_interruptible);
@@ -99,17 +90,10 @@ EXPORT_SYMBOL(down_interruptible);
*/
int down_killable(struct semaphore *sem)
{
- unsigned long flags;
- int result = 0;
+ if (likely(atomic_dec_return(&sem->count) >= 0))
+ return 0;
- spin_lock_irqsave(&sem->lock, flags);
- if (likely(sem->count > 0))
- sem->count--;
- else
- result = __down_killable(sem);
- spin_unlock_irqrestore(&sem->lock, flags);
-
- return result;
+ return __down_killable(sem);
}
EXPORT_SYMBOL(down_killable);
@@ -128,16 +112,21 @@ EXPORT_SYMBOL(down_killable);
*/
int down_trylock(struct semaphore *sem)
{
- unsigned long flags;
- int count;
+ int old, cmp;
- spin_lock_irqsave(&sem->lock, flags);
- count = sem->count - 1;
- if (likely(count >= 0))
- sem->count = count;
- spin_unlock_irqrestore(&sem->lock, flags);
+ /*
+ * The down_trylock fastpath is less efficient than the down and up
+ * fastpaths, but it should be used less frequently.
+ */
+ old = atomic_read(&sem->count);
+ while (old > 0) {
+ cmp = old;
+ old = atomic_cmpxchg(&sem->count, cmp, old - 1);
+ if (old == cmp)
+ return 0;
+ }
- return (count < 0);
+ return 1;
}
EXPORT_SYMBOL(down_trylock);
@@ -153,17 +142,10 @@ EXPORT_SYMBOL(down_trylock);
*/
int down_timeout(struct semaphore *sem, long jiffies)
{
- unsigned long flags;
- int result = 0;
-
- spin_lock_irqsave(&sem->lock, flags);
- if (likely(sem->count > 0))
- sem->count--;
- else
- result = __down_timeout(sem, jiffies);
- spin_unlock_irqrestore(&sem->lock, flags);
+ if (likely(atomic_dec_return(&sem->count) >= 0))
+ return 0;
- return result;
+ return __down_timeout(sem, jiffies);
}
EXPORT_SYMBOL(down_timeout);
@@ -176,14 +158,10 @@ EXPORT_SYMBOL(down_timeout);
*/
void up(struct semaphore *sem)
{
- unsigned long flags;
+ if (likely(atomic_inc_return(&sem->count) > 0))
+ return;
- spin_lock_irqsave(&sem->lock, flags);
- if (likely(list_empty(&sem->wait_list)))
- sem->count++;
- else
- __up(sem);
- spin_unlock_irqrestore(&sem->lock, flags);
+ __up(sem);
}
EXPORT_SYMBOL(up);
@@ -205,6 +183,15 @@ static inline int __sched __down_common(struct semaphore *sem, long state,
{
struct task_struct *task = current;
struct semaphore_waiter waiter;
+ unsigned long flags;
+
+ spin_lock_irqsave(&sem->lock, flags);
+ /*
+ * Someone may have incremented the count and failed to wake
+ * us before we acquired the spinlock.
+ */
+ if (atomic_dec_return(&sem->count) >= 0)
+ goto done;
list_add_tail(&waiter.list, &sem->wait_list);
waiter.task = task;
@@ -222,16 +209,22 @@ static inline int __sched __down_common(struct semaphore *sem, long state,
timeout = schedule_timeout(timeout);
spin_lock_irq(&sem->lock);
if (waiter.up)
- return 0;
+ goto done;
}
timed_out:
list_del(&waiter.list);
+ spin_unlock_irqrestore(&sem->lock, flags);
return -ETIME;
interrupted:
list_del(&waiter.list);
+ spin_unlock_irqrestore(&sem->lock, flags);
return -EINTR;
+
+ done:
+ spin_unlock_irqrestore(&sem->lock, flags);
+ return 0;
}
static noinline void __sched __down(struct semaphore *sem)
@@ -256,9 +249,31 @@ static noinline int __sched __down_timeout(struct semaphore *sem, long jiffies)
static noinline void __sched __up(struct semaphore *sem)
{
- struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
+ struct semaphore_waiter *waiter;
+ unsigned long flags;
+ int old, cmp;
+
+ spin_lock_irqsave(&sem->lock, flags);
+ /*
+ * If the wait list is empty, it means either we acquired the spinlock
+ * faster than a possible waiter or the possible waiter abandoned the
+ * wait because it got interrupted or timed out. In such a case we have
+ * to increment the count to a value that is greater than or equal to 1.
+ */
+ if (list_empty(&sem->wait_list)) {
+ old = atomic_read(&sem->count);
+ do {
+ cmp = old;
+ old = (old > 0) ? old + 1 : 1;
+ old = atomic_cmpxchg(&sem->count, cmp, old);
+ } while (cmp != old);
+
+ } else {
+ waiter = list_first_entry(&sem->wait_list,
struct semaphore_waiter, list);
- list_del(&waiter->list);
- waiter->up = 1;
- wake_up_process(waiter->task);
+ list_del(&waiter->list);
+ waiter->up = 1;
+ wake_up_process(waiter->task);
+ }
+ spin_unlock_irqrestore(&sem->lock, flags);
}
--
1.5.2.5
From 343d08a5d172d103e49c77e5580a45f02fab2b5e Mon Sep 17 00:00:00 2001
From: Bruno Santos <bsantos@xxxxxxxx>
Date: Tue, 8 Jul 2008 23:40:53 +0100
Subject: [PATCH] semaphore lockless fastpath
Signed-off-by: Bruno Santos <bsantos@xxxxxxxx>
---
include/linux/semaphore.h | 4 +-
kernel/semaphore.c | 131 +++++++++++++++++++++++++--------------------
2 files changed, 75 insertions(+), 60 deletions(-)
diff --git a/include/linux/semaphore.h b/include/linux/semaphore.h
index 9cae64b..d7850f0 100644
--- a/include/linux/semaphore.h
+++ b/include/linux/semaphore.h
@@ -14,15 +14,15 @@
/* Please don't access any members of this structure directly */
struct semaphore {
+ atomic_t count;
spinlock_t lock;
- unsigned int count;
struct list_head wait_list;
};
#define __SEMAPHORE_INITIALIZER(name, n) \
{ \
+ .count = ATOMIC_INIT(n), \
.lock = __SPIN_LOCK_UNLOCKED((name).lock), \
- .count = n, \
.wait_list = LIST_HEAD_INIT((name).wait_list), \
}
diff --git a/kernel/semaphore.c b/kernel/semaphore.c
index 5c2942e..2815980 100644
--- a/kernel/semaphore.c
+++ b/kernel/semaphore.c
@@ -8,6 +8,8 @@
* A counting semaphore may be acquired 'n' times before sleeping.
* See mutex.c for single-acquisition sleeping locks which enforce
* rules which allow code to be debugged more easily.
+ *
+ * Lockless fastpath by Bruno Santos <bsantos@xxxxxxxx>
*/
/*
@@ -22,7 +24,7 @@
* too.
*
* The ->count variable represents how many more tasks can acquire this
- * semaphore. If it's zero, there may be tasks waiting on the wait_list.
+ * semaphore. If it's negative, there may be tasks waiting on the wait_list.
*/
#include <linux/compiler.h>
@@ -51,14 +53,10 @@ static noinline void __up(struct semaphore *sem);
*/
void down(struct semaphore *sem)
{
- unsigned long flags;
+ if (likely(atomic_dec_return(&sem->count) >= 0))
+ return;
- spin_lock_irqsave(&sem->lock, flags);
- if (likely(sem->count > 0))
- sem->count--;
- else
- __down(sem);
- spin_unlock_irqrestore(&sem->lock, flags);
+ __down(sem);
}
EXPORT_SYMBOL(down);
@@ -73,17 +71,10 @@ EXPORT_SYMBOL(down);
*/
int down_interruptible(struct semaphore *sem)
{
- unsigned long flags;
- int result = 0;
+ if (likely(atomic_dec_return(&sem->count) >= 0))
+ return 0;
- spin_lock_irqsave(&sem->lock, flags);
- if (likely(sem->count > 0))
- sem->count--;
- else
- result = __down_interruptible(sem);
- spin_unlock_irqrestore(&sem->lock, flags);
-
- return result;
+ return __down_interruptible(sem);
}
EXPORT_SYMBOL(down_interruptible);
@@ -99,17 +90,10 @@ EXPORT_SYMBOL(down_interruptible);
*/
int down_killable(struct semaphore *sem)
{
- unsigned long flags;
- int result = 0;
+ if (likely(atomic_dec_return(&sem->count) >= 0))
+ return 0;
- spin_lock_irqsave(&sem->lock, flags);
- if (likely(sem->count > 0))
- sem->count--;
- else
- result = __down_killable(sem);
- spin_unlock_irqrestore(&sem->lock, flags);
-
- return result;
+ return __down_killable(sem);
}
EXPORT_SYMBOL(down_killable);
@@ -128,16 +112,21 @@ EXPORT_SYMBOL(down_killable);
*/
int down_trylock(struct semaphore *sem)
{
- unsigned long flags;
- int count;
+ int old, cmp;
- spin_lock_irqsave(&sem->lock, flags);
- count = sem->count - 1;
- if (likely(count >= 0))
- sem->count = count;
- spin_unlock_irqrestore(&sem->lock, flags);
+ /*
+ * The down_trylock fastpath is less efficient than the down and up
+ * fastpaths, but it should be used less frequently.
+ */
+ old = atomic_read(&sem->count);
+ while (old > 0) {
+ cmp = old;
+ old = atomic_cmpxchg(&sem->count, cmp, old - 1);
+ if (old == cmp)
+ return 0;
+ }
- return (count < 0);
+ return 1;
}
EXPORT_SYMBOL(down_trylock);
@@ -153,17 +142,10 @@ EXPORT_SYMBOL(down_trylock);
*/
int down_timeout(struct semaphore *sem, long jiffies)
{
- unsigned long flags;
- int result = 0;
-
- spin_lock_irqsave(&sem->lock, flags);
- if (likely(sem->count > 0))
- sem->count--;
- else
- result = __down_timeout(sem, jiffies);
- spin_unlock_irqrestore(&sem->lock, flags);
+ if (likely(atomic_dec_return(&sem->count) >= 0))
+ return 0;
- return result;
+ return __down_timeout(sem, jiffies);
}
EXPORT_SYMBOL(down_timeout);
@@ -176,14 +158,10 @@ EXPORT_SYMBOL(down_timeout);
*/
void up(struct semaphore *sem)
{
- unsigned long flags;
+ if (likely(atomic_inc_return(&sem->count) > 0))
+ return;
- spin_lock_irqsave(&sem->lock, flags);
- if (likely(list_empty(&sem->wait_list)))
- sem->count++;
- else
- __up(sem);
- spin_unlock_irqrestore(&sem->lock, flags);
+ __up(sem);
}
EXPORT_SYMBOL(up);
@@ -205,6 +183,15 @@ static inline int __sched __down_common(struct semaphore *sem, long state,
{
struct task_struct *task = current;
struct semaphore_waiter waiter;
+ unsigned long flags;
+
+ spin_lock_irqsave(&sem->lock, flags);
+ /*
+ * Someone may have incremented the count and failed to wake
+ * us before we acquired the spinlock.
+ */
+ if (atomic_dec_return(&sem->count) >= 0)
+ goto done;
list_add_tail(&waiter.list, &sem->wait_list);
waiter.task = task;
@@ -222,16 +209,22 @@ static inline int __sched __down_common(struct semaphore *sem, long state,
timeout = schedule_timeout(timeout);
spin_lock_irq(&sem->lock);
if (waiter.up)
- return 0;
+ goto done;
}
timed_out:
list_del(&waiter.list);
+ spin_unlock_irqrestore(&sem->lock, flags);
return -ETIME;
interrupted:
list_del(&waiter.list);
+ spin_unlock_irqrestore(&sem->lock, flags);
return -EINTR;
+
+ done:
+ spin_unlock_irqrestore(&sem->lock, flags);
+ return 0;
}
static noinline void __sched __down(struct semaphore *sem)
@@ -256,9 +249,31 @@ static noinline int __sched __down_timeout(struct semaphore *sem, long jiffies)
static noinline void __sched __up(struct semaphore *sem)
{
- struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
+ struct semaphore_waiter *waiter;
+ unsigned long flags;
+ int old, cmp;
+
+ spin_lock_irqsave(&sem->lock, flags);
+ /*
+ * If the wait list is empty, it means either we acquired the spinlock
+ * faster than a possible waiter or the possible waiter abandoned the
+ * wait because it got interrupted or timed out. In such a case we have
+ * to increment the count to a value that is greater than or equal to 1.
+ */
+ if (list_empty(&sem->wait_list)) {
+ old = atomic_read(&sem->count);
+ do {
+ cmp = old;
+ old = (old > 0) ? old + 1 : 1;
+ old = atomic_cmpxchg(&sem->count, cmp, old);
+ } while (cmp != old);
+
+ } else {
+ waiter = list_first_entry(&sem->wait_list,
struct semaphore_waiter, list);
- list_del(&waiter->list);
- waiter->up = 1;
- wake_up_process(waiter->task);
+ list_del(&waiter->list);
+ waiter->up = 1;
+ wake_up_process(waiter->task);
+ }
+ spin_unlock_irqrestore(&sem->lock, flags);
}
--
1.5.2.5