Re: [PATCH bpf v2 8/8] libbpf: ringbuf: Prevent missed wakeups
From: Andrii Nakryiko
Date: Tue Jun 23 2026 - 16:28:56 EST
On Thu, Jun 18, 2026 at 5:27 PM Tamir Duberstein <tamird@xxxxxxxxxx> wrote:
>
> After consuming the last visible record, ringbuf_process_ring()
> publishes the consumer position and checks the producer position. These
> operations lack a full StoreLoad barrier. A producer can therefore
> commit a new record but read the old consumer position while the
> consumer reads the old producer position. The producer sends no
> notification and the consumer waits despite a queued record.
>
> Insert a full barrier between publishing a consumer position and the
> next producer position load. When a record bound or callback ends the
> current invocation first, execute the barrier before returning so the
> load in a later invocation completes the same handshake.
>
> Add an edge-triggered epoll test that drains one record per call while a
> concurrent producer fills the ring. Without the barrier, a missed
> notification leaves the producer dropping records from a full ring while
> the consumer times out. Document that bounded consumers and callbacks
> that terminate consumption must drain before waiting again.
>
> Fixes: bf99c936f947 ("libbpf: Add BPF ring buffer support")
> Reported-by: Andrew Werner <awerner32@xxxxxxxxx>
> Reported-by: Sashiko <sashiko-bot@xxxxxxxxxx>
> Closes: https://lore.kernel.org/bpf/20260614015716.945AF1F000E9@xxxxxxxxxxxxxxx/
> Assisted-by: Codex:gpt-5.5
> Signed-off-by: Tamir Duberstein <tamird@xxxxxxxxxx>
> ---
> tools/lib/bpf/libbpf.h | 23 +++++++
> tools/lib/bpf/ringbuf.c | 24 +++++--
> tools/testing/selftests/bpf/prog_tests/ringbuf.c | 83 ++++++++++++++++++++++++
> 3 files changed, 123 insertions(+), 7 deletions(-)
>
> diff --git a/tools/lib/bpf/libbpf.h b/tools/lib/bpf/libbpf.h
> index ae46b17feaa6..3a649ed87034 100644
> --- a/tools/lib/bpf/libbpf.h
> +++ b/tools/lib/bpf/libbpf.h
> @@ -1440,6 +1440,11 @@ struct ring;
> struct user_ring_buffer;
>
> /* Callback-based consumption is unsupported for BPF_F_RB_OVERWRITE maps. */
> +/*
> + * A negative return stops consumption; non-negative values continue. Stopping
> + * can leave records queued without a new readiness notification. Before
> + * waiting for readiness again, consume until no records remain.
> + */
> typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size);
>
> struct ring_buffer_opts {
> @@ -1456,6 +1461,20 @@ LIBBPF_API int ring_buffer__add(struct ring_buffer *rb, int map_fd,
> ring_buffer_sample_fn sample_cb, void *ctx);
> LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);
> LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb);
> +
> +/**
> + * @brief **ring_buffer__consume_n()** consumes up to a requested number of
> + * records from a ring buffer manager without event polling.
> + *
> + * @param rb A ring buffer manager object.
> + * @param n Maximum number of records to consume.
> + * @return The number of records consumed, or a negative error code on failure.
> + *
> + * Reaching the requested bound does not establish that every ring is empty.
> + * Records can remain queued without a new readiness notification. Before
> + * calling ring_buffer__poll() or waiting on ring_buffer__epoll_fd(), call
> + * ring_buffer__consume() until it returns 0.
> + */
> LIBBPF_API int ring_buffer__consume_n(struct ring_buffer *rb, size_t n);
> LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb);
>
> @@ -1538,6 +1557,10 @@ LIBBPF_API int ring__consume(struct ring *r);
> * @param r A ringbuffer object.
> * @param n Maximum number of records to consume.
> * @return The number of records consumed, or a negative error code on failure.
> + *
> + * Reaching the requested bound does not establish that the ring is empty.
> + * Records can remain queued without a new readiness notification. Before
> + * waiting on ring__map_fd(), call ring__consume() until it returns 0.
> */
> LIBBPF_API int ring__consume_n(struct ring *r, size_t n);
>
> diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
> index 141f2cbe56eb..0598f6c2f7da 100644
> --- a/tools/lib/bpf/ringbuf.c
> +++ b/tools/lib/bpf/ringbuf.c
> @@ -271,7 +271,7 @@ static int64_t ringbuf_process_ring(struct ring *r, size_t n)
> return 0;
>
> cons_pos = __atomic_load_n(r->consumer_pos, __ATOMIC_ACQUIRE);
> - do {
> + for (;;) {
> got_new_data = false;
> prod_pos = __atomic_load_n(r->producer_pos, __ATOMIC_ACQUIRE);
> /* Positions wrap; the consumer cannot logically pass the producer. */
> @@ -279,9 +279,9 @@ static int64_t ringbuf_process_ring(struct ring *r, size_t n)
> len_ptr = r->data + (cons_pos & r->mask);
> len = __atomic_load_n(len_ptr, __ATOMIC_ACQUIRE);
>
> - /* sample not committed yet, bail out for now */
> + /* Retry a busy record once after publishing prior records. */
> if (len & BPF_RINGBUF_BUSY_BIT)
> - goto done;
> + break;
So we do this one-time retry just so that we can reuse the same memory
barrier, is that right? And similarly for all those error-returning
conditions with cnt = err. Wouldn't it be cleaner and easier to follow
to keep `goto done`, with the done path just duplicating the memory
barrier and exiting with whatever value err holds?
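
For reference, the handshake the commit message describes is the classic
store-buffering pattern: each side stores its own position and then loads
the other's, and without a full barrier between the two, both sides can
read stale values. Below is a minimal sketch of that pattern in C11
atomics. It is not the libbpf or kernel code; struct toy_ring and both
helpers are hypothetical names, and the wakeup condition is simplified to
"the consumer had caught up with the previous producer position".

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the shared positions; not libbpf's layout. */
struct toy_ring {
	_Atomic uint64_t consumer_pos;
	_Atomic uint64_t producer_pos;
};

/*
 * Consumer side: publish the new consumer position, then re-check the
 * producer position. Returns true if more data is visible.
 */
static bool toy_publish_and_recheck(struct toy_ring *r, uint64_t new_cons_pos)
{
	atomic_store_explicit(&r->consumer_pos, new_cons_pos,
			      memory_order_release);
	/* Full barrier: the store above is ordered before the load below. */
	atomic_thread_fence(memory_order_seq_cst);
	return atomic_load_explicit(&r->producer_pos,
				    memory_order_acquire) != new_cons_pos;
}

/*
 * Producer side: commit len bytes by advancing producer_pos, then read
 * the consumer position. Returns true if a wakeup should be sent, which
 * in this simplified model means the consumer had already caught up.
 */
static bool toy_commit_and_check(struct toy_ring *r, uint64_t len)
{
	uint64_t old_prod = atomic_fetch_add_explicit(&r->producer_pos, len,
						      memory_order_release);
	/* Matching full barrier on the producer side. */
	atomic_thread_fence(memory_order_seq_cst);
	return atomic_load_explicit(&r->consumer_pos,
				    memory_order_acquire) == old_prod;
}
```

With a full barrier on both sides, at least one of the two loads observes
the other side's store: either the consumer sees the new record, or the
producer sees the caught-up consumer position and notifies. Whichever
exit path the consumer takes, the barrier has to sit between its position
store and the next producer position load.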
>
> got_new_data = true;
> cons_pos += roundup_len(len);
[...]