Re: [PATCH] epoll: ensure ep_poll() doesn't miss wakeup events
From: Jason Baron
Date: Mon May 04 2020 - 00:31:01 EST
On 5/3/20 6:24 AM, Roman Penyaev wrote:
> On 2020-05-02 00:09, Jason Baron wrote:
>> On 5/1/20 5:02 PM, Roman Penyaev wrote:
>>> Hi Jason,
>>>
>>> That is indeed a nice catch.
>>> Seems we need an smp_rmb() pair between list_empty_careful(&ep->rdllist) and
>>> READ_ONCE(ep->ovflist) for ep_events_available(), don't we?
>>>
>>
>> Hi Roman,
>>
>> Good point, even if we order those reads it's still racy, since the
>> read of the ready list could come after it's been cleared and the
>> read of the overflow list could again come after it's been cleared.
>
> You mean the second chunk? True. Sigh.
>
>> So I'm afraid we might need instead something like this to make
>> sure they are read together:
>
> No, impossible, I can't believe that :) We can't give up.
>
> All we need is to keep a mark that ep->rdllist is not empty,
> even if we've just spliced it. ep_poll_callback() always takes
> the ->ovflist path if ->ovflist is not EP_UNACTIVE_PTR, but
> ep_events_available() does not need to observe ->ovflist at
> all, just ->rdllist.
>
> Take a look at this; am I missing something?
>
> diff --git a/fs/eventpoll.c b/fs/eventpoll.c
> index aba03ee749f8..a8770f9a917e 100644
> --- a/fs/eventpoll.c
> +++ b/fs/eventpoll.c
> @@ -376,8 +376,7 @@ static void ep_nested_calls_init(struct nested_calls *ncalls)
>   */
>  static inline int ep_events_available(struct eventpoll *ep)
>  {
> -       return !list_empty_careful(&ep->rdllist) ||
> -               READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR;
> +       return !list_empty_careful(&ep->rdllist);
>  }
>
>  #ifdef CONFIG_NET_RX_BUSY_POLL
> @@ -683,7 +682,8 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
>  {
>         __poll_t res;
>         struct epitem *epi, *nepi;
> -       LIST_HEAD(txlist);
> +       LIST_HEAD(rdllist);
> +       LIST_HEAD(ovflist);
>
>         lockdep_assert_irqs_enabled();
>
> @@ -704,14 +704,22 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
>          * in a lockless way.
>          */
>         write_lock_irq(&ep->lock);
> -       list_splice_init(&ep->rdllist, &txlist);
> +       /*
> +        * We do not call list_splice_init() because for lockless
> +        * ep_events_available() ->rdllist is still "not empty".
> +        * Otherwise the feature that there is something left in
> +        * the list can be lost which causes missed wakeup.
> +        */
> +       list_splice(&ep->rdllist, &rdllist);
> +       /*
> +        * If ->rdllist was empty we should pretend it was not,
> +        * because after the unlock ->ovflist comes into play,
> +        * which is invisible for lockless ep_events_available().
> +        */
> +       ep->rdllist.next = LIST_POISON1;
>         WRITE_ONCE(ep->ovflist, NULL);
>         write_unlock_irq(&ep->lock);
>
>         /*
>          * Now call the callback function.
>          */
> -       res = (*sproc)(ep, &txlist, priv);
> +       res = (*sproc)(ep, &rdllist, priv);
>
>         write_lock_irq(&ep->lock);
>         /*
> @@ -724,7 +732,7 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
>                 /*
>                  * We need to check if the item is already in the list.
>                  * During the "sproc" callback execution time, items are
> -                * queued into ->ovflist but the "txlist" might already
> +                * queued into ->ovflist but the "rdllist" might already
>                  * contain them, and the list_splice() below takes care of them.
>                  */
>                 if (!ep_is_linked(epi)) {
> @@ -732,7 +740,7 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
>                          * ->ovflist is LIFO, so we have to reverse it in order
>                          * to keep in FIFO.
>                          */
> -                       list_add(&epi->rdllink, &ep->rdllist);
> +                       list_add(&epi->rdllink, &ovflist);
>                         ep_pm_stay_awake(epi);
>                 }
>         }
> @@ -743,10 +751,11 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
>          */
>         WRITE_ONCE(ep->ovflist, EP_UNACTIVE_PTR);
>
> -       /*
> -        * Quickly re-inject items left on "txlist".
> -        */
> -       list_splice(&txlist, &ep->rdllist);
> +       /* Events from ->ovflist happened later, thus splice to the tail */
> +       list_splice_tail(&ovflist, &rdllist);
> +       /* Just replace list */
> +       list_replace(&rdllist, &ep->rdllist);
> +
>         __pm_relax(ep->ws);
>         write_unlock_irq(&ep->lock);
>
> @@ -1763,13 +1772,13 @@ static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head
>                          * Trigger mode, we need to insert back inside
>                          * the ready list, so that the next call to
>                          * epoll_wait() will check again the events
> -                        * availability. At this point, no one can insert
> -                        * into ep->rdllist besides us. The epoll_ctl()
> -                        * callers are locked out by
> -                        * ep_scan_ready_list() holding "mtx" and the
> -                        * poll callback will queue them in ep->ovflist.
> +                        * availability. What we do here is simply
> +                        * return the epi to the same position where
> +                        * it was, the ep_scan_ready_list() will
> +                        * re-inject the leftovers to the ->rdllist
> +                        * under the proper lock.
>                          */
> -                       list_add_tail(&epi->rdllink, &ep->rdllist);
> +                       list_add_tail(&epi->rdllink, &tmp->rdllink);
>                         ep_pm_stay_awake(epi);
>                 }
>         }
>
>
> --
> Roman
>
Hi Roman,
I think this misses an important case - the initial ep_poll_callback()
may queue to the overflow list. In this case ep_poll() has no visibility
into the event since it's only checking ep->rdllist.
Here's a different approach using a seqcount that avoids expanding the
ep->lock region.
Thanks,
-Jason
--- a/fs/eventpoll.c
+++ b/fs/eventpoll.c
@@ -221,6 +221,8 @@ struct eventpoll {
struct list_head visited_list_link;
int visited;
+ seqcount_t seq;
+
#ifdef CONFIG_NET_RX_BUSY_POLL
/* used to track busy poll napi_id */
unsigned int napi_id;
@@ -704,8 +706,10 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
* in a lockless way.
*/
write_lock_irq(&ep->lock);
+ write_seqcount_begin(&ep->seq);
list_splice_init(&ep->rdllist, &txlist);
WRITE_ONCE(ep->ovflist, NULL);
+ write_seqcount_end(&ep->seq);
write_unlock_irq(&ep->lock);
/*
@@ -741,12 +745,14 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
* releasing the lock, events will be queued in the normal way inside
* ep->rdllist.
*/
+ write_seqcount_begin(&ep->seq);
WRITE_ONCE(ep->ovflist, EP_UNACTIVE_PTR);
/*
* Quickly re-inject items left on "txlist".
*/
list_splice(&txlist, &ep->rdllist);
+ write_seqcount_end(&ep->seq);
__pm_relax(ep->ws);
write_unlock_irq(&ep->lock);
@@ -1025,6 +1031,7 @@ static int ep_alloc(struct eventpoll **pep)
ep->rbr = RB_ROOT_CACHED;
ep->ovflist = EP_UNACTIVE_PTR;
ep->user = user;
+ seqcount_init(&ep->seq);
*pep = ep;
@@ -1824,6 +1831,7 @@ static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
u64 slack = 0;
wait_queue_entry_t wait;
ktime_t expires, *to = NULL;
+ unsigned int seq;
lockdep_assert_irqs_enabled();
@@ -1900,7 +1908,10 @@ static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
break;
}
- eavail = ep_events_available(ep);
+ do {
+ seq = read_seqcount_begin(&ep->seq);
+ eavail = ep_events_available(ep);
+ } while (read_seqcount_retry(&ep->seq, seq));
if (eavail)
break;
if (signal_pending(current)) {