On Thu, 2013-12-19 at 15:14 -0800, Linus Torvalds wrote:On Thu, Dec 19, 2013 at 10:45 AM, Davidlohr Bueso<davidlohr@xxxxxx> wrote:So the following has passed all testing, just like the atomics variant.- increment the counter at queue_lock() as we always end up callingI still hate this whole separate counter thing. It seems really annoying.
queue_me() which adds the element to the list. Upon any error,
queue_unlock() is called for housekeeping, for which we decrement
to mach the increment done in queue_lock().
- decrement the counter at __unqueue_me() to reflect when an element is
removed from the queue for wakeup related purposes.
If re-ordering things didn't work out, then why can't just the counter
we *already* have in the spinlock itself work as the counter? Your
counter update logic seems to basically match when you take the
spinlock anyway.
Thoughts?
Thanks,
Davidlohr
diff --git a/kernel/futex.c b/kernel/futex.c
index fcc6850..c8c7ce5 100644
--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -73,19 +73,22 @@
* Basic futex operation and ordering guarantees:
*
* The waiter reads the futex value in user space and calls
- * futex_wait(). This function computes the hash bucket and acquires
- * the hash bucket lock. After that it reads the futex user space value
- * again and verifies that the data has not changed. If it has not
- * changed it enqueues itself into the hash bucket, releases the hash
+ * futex_wait(). It computes the hash bucket and acquires the hash
+ * bucket lock. After that it reads the futex user space value again
+ * and verifies that the data has not changed. If it has not changed
+ * it enqueues itself into the hash bucket, releases the hash
* bucket lock and schedules.
*
* The waker side modifies the user space value of the futex and calls
- * futex_wake(). This functions computes the hash bucket and acquires
- * the hash bucket lock. Then it looks for waiters on that futex in the
- * hash bucket and wakes them.
+ * futex_wake(). It computes the hash bucket and acquires the hash
+ * bucket lock. Then it looks for waiters on that futex in the hash
+ * bucket and wakes them.
*
- * Note that the spin_lock serializes waiters and wakers, so that the
- * following scenario is avoided:
+ * In scenarios where wakeups are called and no tasks are blocked on a futex,
+ * taking the hb spinlock can be avoided and simply return. In order for this
+ * optimization to work, ordering guarantees must exist so that the waiter
+ * being added to the list is acknowledged when the list is concurrently being
+ * checked by the waker, avoiding scenarios like the following:
*
* CPU 0 CPU 1
* val = *futex;
@@ -106,24 +109,50 @@
* This would cause the waiter on CPU 0 to wait forever because it
* missed the transition of the user space value from val to newval
* and the waker did not find the waiter in the hash bucket queue.
- * The spinlock serializes that:
+ *
+ * The correct serialization ensures that a waiter either observes
+ * the changed user space value before blocking or is woken by a
+ * concurrent waker:
*
* CPU 0 CPU 1
* val = *futex;
* sys_futex(WAIT, futex, val);
* futex_wait(futex, val);
- * lock(hash_bucket(futex));
- * uval = *futex;
- * *futex = newval;
- * sys_futex(WAKE, futex);
- * futex_wake(futex);
- * lock(hash_bucket(futex));
+ *
+ * waiters++;
+ * mb(); (A)<-- paired with -.
+ * |
+ * lock(hash_bucket(futex)); |
+ * |
+ * uval = *futex; |
+ * | *futex = newval;
+ * | sys_futex(WAKE, futex);
+ * | futex_wake(futex);
+ * |
+ * `-------> mb(); (B)
* if (uval == val)
- * queue();
+ * queue();
* unlock(hash_bucket(futex));
- * schedule(); if (!queue_empty())
- * wake_waiters(futex);
- * unlock(hash_bucket(futex));
+ * schedule(); if (waiters)
+ * lock(hash_bucket(futex));
+ * wake_waiters(futex);
+ * unlock(hash_bucket(futex));
+ *
+ * Where (A) orders the waiters increment and the futex value read -- this
+ * is guaranteed by the head counter in the hb spinlock; and where (B)
+ * orders the write to futex and the waiters read.
+ *
+ * This yields the following case (where X:=waiters, Y:=futex):
+ *
+ * X = Y = 0
+ *
+ * w[X]=1 w[Y]=1
+ * MB MB
+ * r[Y]=y r[X]=x
+ *
+ * Which guarantees that x==0&& y==0 is impossible; which translates back into
+ * the guarantee that we cannot both miss the futex variable change and the
+ * enqueue.
*/
int __read_mostly futex_cmpxchg_enabled;
@@ -211,6 +240,35 @@ static unsigned long __read_mostly futex_hashsize;
static struct futex_hash_bucket *futex_queues;
+static inline void futex_get_mm(union futex_key *key)
+{
+ atomic_inc(&key->private.mm->mm_count);
+#ifdef CONFIG_SMP
+ /*
+ * Ensure futex_get_mm() implies a full barrier such that
+ * get_futex_key() implies a full barrier. This is relied upon
+ * as full barrier (B), see the ordering comment above.
+ */
+ smp_mb__after_atomic_inc();
+#endif
+}
+
+static inline bool hb_waiters_pending(struct futex_hash_bucket *hb)
+{
+#ifdef CONFIG_SMP
+ /*
+ * If the hash bucket is locked then we know the ticket counter
+ * is non-zero and thus there is at least one waiter in the queue.
+ */
+ if (spin_is_locked(&hb->lock))
+ return true;
+ smp_rmb(); /* Make sure we check the lock state first */
+ return !plist_head_empty(&hb->chain);
+#else
+ return true;
+#endif
+}