Re: [PATCH 4/5] futex: Avoid taking hb lock if nothing to wakeup

From: Thomas Gleixner
Date: Sat Nov 23 2013 - 08:16:24 EST


On Fri, 22 Nov 2013, Davidlohr Bueso wrote:
> On Fri, 2013-11-22 at 17:25 -0800, Linus Torvalds wrote:
> > On Fri, Nov 22, 2013 at 4:56 PM, Davidlohr Bueso <davidlohr@xxxxxx> wrote:
> > > In futex_wake() there is clearly no point in taking the hb->lock if
> > > we know beforehand that there are no tasks to be woken. This comes
> > > at the smaller cost of doing some atomic operations to keep track of
> > > the list's size.
> >
> > Hmm. Why? Afaik, you only care about "empty or not". And if you don't
> > need the serialization from locking, then afaik you can just do a
> > "plist_head_empty()" without holding the lock.
>
> I remember this being the original approach, but after noticing some
> strange behavior we quickly decided it wasn't the right path. And sure
> enough, I just double checked and tried the patch without atomic ops
> and can see things being off: one of the futextest performance cases
> is stuck blocked on a futex and I couldn't reboot the machine either --
> nothing apparent in dmesg, just not 100% functional. The thing is, we
> can only avoid taking the lock if nobody else is trying to add itself
> to the list.

Right. The reason is:

CPU 0                           CPU 1

val = *futex;
futex_wait(futex, val);

spin_lock(&hb->lock);

uval = *futex;
                                *futex = newval;

if (uval != val) {              futex_wake();
   spin_unlock(&hb->lock);      if (plist_empty(hb))
   return;                         return;
}

plist_add(hb, self);
spin_unlock(&hb->lock);
schedule();
...

So the waiter on CPU 0 gets queued after the user space value changed,
and the waker on CPU 1 returns because the plist is empty. The waiter
therefore waits forever.
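
To see the lost wakeup in plain code rather than in a diagram, here is
a minimal sketch of the two paths. This is simplified pseudo-kernel C,
not futex.c as it stands: hb_for_futex() is a made-up stand-in for the
key hashing, and *uaddr is shown as a plain load instead of the user
space accessors.

struct futex_hash_bucket {
	spinlock_t lock;
	struct plist_head chain;
};

/* Waiter side, roughly the shape of futex_wait(): */
static int futex_wait_sketch(u32 *uaddr, u32 val, struct futex_q *q)
{
	struct futex_hash_bucket *hb = hb_for_futex(uaddr);

	spin_lock(&hb->lock);
	if (*uaddr != val) {			/* value test ...    */
		spin_unlock(&hb->lock);
		return -EWOULDBLOCK;
	}
	plist_add(&q->list, &hb->chain);	/* ... then queueing */
	spin_unlock(&hb->lock);
	schedule();
	return 0;
}

/* Waker side with the lockless empty check: */
static int futex_wake_sketch(u32 *uaddr)
{
	struct futex_hash_bucket *hb = hb_for_futex(uaddr);

	/*
	 * Not serialized against the waiter: this can run after the
	 * waiter's value test but before its plist_add() is visible,
	 * see an empty list and return. The wakeup is lost.
	 */
	if (plist_head_empty(&hb->chain))
		return 0;

	spin_lock(&hb->lock);
	/* walk hb->chain and wake matching waiters */
	spin_unlock(&hb->lock);
	return 0;
}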

So with the atomic ops you are changing that to:

CPU 0                           CPU 1

val = *futex;
futex_wait(futex, val);

spin_lock(&hb->lock);

atomic_inc(&hb->waiters);
uval = *futex;
                                *futex = newval;

if (uval != val) {              futex_wake();
   spin_unlock(&hb->lock);      if (!atomic_read(&hb->waiters))
   return;                         return;
}
                                spin_lock(&hb->lock);
plist_add(hb, self);
spin_unlock(&hb->lock);
schedule();                     wakeup_waiters(hb);
...

which restores the ordering guarantee that the hash bucket lock
provided so far.
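
In code, the counter variant looks roughly like this. Same sketch
conventions as above; the ->waiters field follows the patch, the error
path additionally has to drop the count again (the diagram elides
that), and barrier details are glossed over exactly as in the diagram.

struct futex_hash_bucket {
	atomic_t waiters;
	spinlock_t lock;
	struct plist_head chain;
};

/* Waiter side: announce yourself before testing *uaddr. */
spin_lock(&hb->lock);
atomic_inc(&hb->waiters);
if (*uaddr != val) {
	atomic_dec(&hb->waiters);	/* undo on the error path */
	spin_unlock(&hb->lock);
	return -EWOULDBLOCK;
}
plist_add(&q->list, &hb->chain);
spin_unlock(&hb->lock);
schedule();

/* Waker side: cheap exit when nobody can possibly be queued. */
if (!atomic_read(&hb->waiters))
	return 0;	/* no waiter has passed atomic_inc() yet */
spin_lock(&hb->lock);
/* walk hb->chain, wake waiters; they drop ->waiters on dequeue */
spin_unlock(&hb->lock);
return 0;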

Now the question is why we queue the waiter _AFTER_ reading the user
space value. The comment in the code is pretty nonsensical:

* On the other hand, we insert q and release the hash-bucket only
* after testing *uaddr. This guarantees that futex_wait() will NOT
* absorb a wakeup if *uaddr does not match the desired values
* while the syscall executes.

There is no reason why we cannot queue _BEFORE_ reading the user space
value. We just have to dequeue in all the error handling cases, but
for the fast path it does not matter at all.

CPU 0                           CPU 1

val = *futex;
futex_wait(futex, val);

spin_lock(&hb->lock);

plist_add(hb, self);
smp_wmb();

uval = *futex;
                                *futex = newval;
                                futex_wake();

                                smp_rmb();
                                if (plist_empty(hb))
                                   return;

                                spin_lock(&hb->lock);
if (uval != val) {
   plist_del(self);
   spin_unlock(&hb->lock);
   return;
}

spin_unlock(&hb->lock);
schedule();                     wakeup_waiters(hb);
...
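
Rendered as code, the queue-first scheme becomes the fragment below.
Same sketch conventions as before, and the smp_wmb()/smp_rmb() pairing
is kept exactly as in the diagram; whether those one-way barriers are
strong enough to order the plist accesses against the *uaddr store and
load, or whether this needs a full smp_mb() pair, is left open here.

/* Waiter side: queue first, then test the user space value. */
spin_lock(&hb->lock);
plist_add(&q->list, &hb->chain);
smp_wmb();				/* barrier as in the diagram */

if (*uaddr != val) {
	/* the price of queueing early: dequeue on mismatch */
	plist_del(&q->list, &hb->chain);
	spin_unlock(&hb->lock);
	return -EWOULDBLOCK;
}
spin_unlock(&hb->lock);
schedule();

/* Waker side, after the caller stored newval to *uaddr: */
smp_rmb();				/* barrier as in the diagram */
if (plist_head_empty(&hb->chain))
	return 0;			/* nobody queued, skip hb->lock */
spin_lock(&hb->lock);
/* wake queued waiters */
spin_unlock(&hb->lock);
return 0;

The fast path, where the user space value still matches, takes exactly
the same locks as before; only the error handling pays for the early
queueing with the extra plist_del().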

Thanks,

tglx