Re: [RFC PATCH 03/21] list: Annotate lockless list primitives with data_race()
From: Jann Horn
Date: Tue Mar 24 2020 - 14:23:22 EST
On Tue, Mar 24, 2020 at 5:59 PM Greg KH <greg@xxxxxxxxx> wrote:
> On Tue, Mar 24, 2020 at 05:38:30PM +0100, Jann Horn wrote:
> > On Tue, Mar 24, 2020 at 5:26 PM Greg KH <greg@xxxxxxxxx> wrote:
> > > On Tue, Mar 24, 2020 at 05:20:45PM +0100, Jann Horn wrote:
> > > > On Tue, Mar 24, 2020 at 4:37 PM Will Deacon <will@xxxxxxxxxx> wrote:
> > > > > Some list predicates can be used locklessly even with the non-RCU list
> > > > > implementations, since they effectively boil down to a test against
> > > > > NULL. For example, checking whether or not a list is empty is safe even
> > > > > in the presence of a concurrent, tearing write to the list head pointer.
> > > > > Similarly, checking whether or not an hlist node has been hashed is safe
> > > > > as well.
> > > > >
> > > > > Annotate these lockless list predicates with data_race() and READ_ONCE()
> > > > > so that KCSAN and the compiler are aware of what's going on. The writer
> > > > > side can then avoid having to use WRITE_ONCE() in the non-RCU
> > > > > implementation.
> > > > [...]
> > > > > static inline int list_empty(const struct list_head *head)
> > > > > {
> > > > > -	return READ_ONCE(head->next) == head;
> > > > > +	return data_race(READ_ONCE(head->next) == head);
> > > > > }
> > > > [...]
> > > > > static inline int hlist_unhashed(const struct hlist_node *h)
> > > > > {
> > > > > -	return !READ_ONCE(h->pprev);
> > > > > +	return data_race(!READ_ONCE(h->pprev));
> > > > > }
> > > >
> > > > This is probably valid in practice for hlist_unhashed(), which
> > > > compares with NULL, as long as the most significant byte of all kernel
> > > > pointers is non-zero; but I think list_empty() could realistically
> > > > return false positives in the presence of a concurrent tearing store?
> > > > This could break the following code pattern:
> > > >
> > > > /* optimistic lockless check */
> > > > if (!list_empty(&some_list)) {
> > > >   /* slowpath */
> > > >   mutex_lock(&some_mutex);
> > > >   list_for_each(tmp, &some_list) {
> > > >     ...
> > > >   }
> > > >   mutex_unlock(&some_mutex);
> > > > }
> > > >
> > > > (I'm not sure whether patterns like this appear commonly though.)
> > >
> > >
> > > I would hope not as the list could go "empty" before the lock is
> > > grabbed. That pattern would be wrong.
> >
> > If the list becomes empty in between, the loop just iterates over
> > nothing, and the effect is no different from what you'd get if you had
> > bailed out before. But sure, you have to be aware that that can
> > happen.
>
> Doh, yeah, so it is safe, crazy, but safe :)

Here's what I think is an example of that pattern (and which I think is
technically incorrect if what peterz said is accurate?):

/**
 * waitqueue_active -- locklessly test for waiters on the queue
 * @wq_head: the waitqueue to test for waiters
 *
 * returns true if the wait list is not empty
 *
 * NOTE: this function is lockless and requires care, incorrect usage _will_
 * lead to sporadic and non-obvious failure.
 *
 * Use either while holding wait_queue_head::lock or when used for wakeups
 * with an extra smp_mb() like::
 *
 *      CPU0 - waker                    CPU1 - waiter
 *
 *                                      for (;;) {
 *      @cond = true;                     prepare_to_wait(&wq_head, &wait, state);
 *      smp_mb();                         // smp_mb() from set_current_state()
 *      if (waitqueue_active(wq_head))         if (@cond)
 *        wake_up(wq_head);                      break;
 *                                        schedule();
 *                                      }
 *                                      finish_wait(&wq_head, &wait);
 *
 * Because without the explicit smp_mb() it's possible for the
 * waitqueue_active() load to get hoisted over the @cond store such that we'll
 * observe an empty wait list while the waiter might not observe @cond.
 *
 * Also note that this 'optimization' trades a spin_lock() for an smp_mb(),
 * which (when the lock is uncontended) are of roughly equal cost.
 */
static inline int waitqueue_active(struct wait_queue_head *wq_head)
{
	return !list_empty(&wq_head->head);
}

void signalfd_cleanup(struct sighand_struct *sighand)
{
	wait_queue_head_t *wqh = &sighand->signalfd_wqh;
	/*
	 * The lockless check can race with remove_wait_queue() in progress,
	 * but in this case its caller should run under rcu_read_lock() and
	 * sighand_cachep is SLAB_TYPESAFE_BY_RCU, we can safely return.
	 */
	if (likely(!waitqueue_active(wqh)))
		return;

	/* wait_queue_entry_t->func(POLLFREE) should do remove_wait_queue() */
	wake_up_poll(wqh, EPOLLHUP | POLLFREE);
}

and __add_wait_queue() just uses plain list_add(&wq_entry->entry,
&wq_head->head) under a lock.
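
To make the list_empty() concern concrete, here is a minimal userspace
sketch (not kernel code) of how a torn store could make the check
transiently report "empty". It assumes, purely for illustration, that a
64-bit pointer store is split into two 32-bit halves with the low half
written first; whether any real compiler/architecture combination does
this is exactly the open question. All names (torn_ptr,
model_list_empty(), torn_store_looks_empty()) and the addresses used
with them are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical model: one 64-bit pointer kept as two 32-bit halves. */
struct torn_ptr {
	uint32_t lo;
	uint32_t hi;
};

static uint64_t torn_ptr_read(const struct torn_ptr *p)
{
	return ((uint64_t)p->hi << 32) | p->lo;
}

/* First half of the assumed torn store: only the low 32 bits change. */
static void torn_ptr_store_lo(struct torn_ptr *p, uint64_t val)
{
	p->lo = (uint32_t)val;
}

/* Second half of the assumed torn store: the high 32 bits follow. */
static void torn_ptr_store_hi(struct torn_ptr *p, uint64_t val)
{
	p->hi = (uint32_t)(val >> 32);
}

/* Same comparison as list_empty(): does head->next point back at head? */
static int model_list_empty(const struct torn_ptr *next, uint64_t head)
{
	return torn_ptr_read(next) == head;
}

/*
 * Returns 1 if a reader sampling between the two halves of the store
 * would see "empty" although neither the old nor the new value of
 * head->next equals head, and 0 otherwise.
 */
static int torn_store_looks_empty(uint64_t head, uint64_t old_next,
				  uint64_t new_next)
{
	struct torn_ptr next = {
		.lo = (uint32_t)old_next,
		.hi = (uint32_t)(old_next >> 32),
	};
	int before, during, after;

	before = model_list_empty(&next, head);
	torn_ptr_store_lo(&next, new_next);
	during = model_list_empty(&next, head);
	torn_ptr_store_hi(&next, new_next);
	after = model_list_empty(&next, head);

	return !before && during && !after;
}
```

The bad case needs the old value of head->next to share its upper half
with the address of the list head and the new value to share its lower
half with it, e.g. head = 0xffff888000001000, old next =
0xffff888000002000, new next = 0xffff888100001000. With those made-up
values the intermediate state compares equal to head, so an optimistic
lockless check would skip the slowpath even though the list was never
empty.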