[PATCH 1/2] libbpf: ringbuf: allow to consume up to a certain amount of items

From: Andrea Righi
Date: Mon Apr 01 2024 - 03:32:50 EST


In some cases, instead of always consuming all items from ring buffers
in a greedy way, we may want to consume up to a certain amount of items,
for example when we need to copy items from the BPF ring buffer to a
limited user buffer.

This change allows to set an upper limit to the amount of items consumed
from one or more ring buffers.

Link: https://lore.kernel.org/lkml/20240310154726.734289-1-andrea.righi@xxxxxxxxxxxxx/T
Signed-off-by: Andrea Righi <andrea.righi@xxxxxxxxxxxxx>
---
tools/lib/bpf/ringbuf.c | 29 +++++++++++++++++------------
1 file changed, 17 insertions(+), 12 deletions(-)

diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
index aacb64278a01..81df535040d1 100644
--- a/tools/lib/bpf/ringbuf.c
+++ b/tools/lib/bpf/ringbuf.c
@@ -231,7 +231,7 @@ static inline int roundup_len(__u32 len)
return (len + 7) / 8 * 8;
}

-static int64_t ringbuf_process_ring(struct ring *r)
+static int64_t ringbuf_process_ring(struct ring *r, int64_t max_items)
{
int *len_ptr, len, err;
/* 64-bit to avoid overflow in case of extreme application behavior */
@@ -264,7 +264,14 @@ static int64_t ringbuf_process_ring(struct ring *r)
cons_pos);
return err;
}
- cnt++;
+ if (++cnt >= max_items) {
+ /* update consumer pos and return the
+ * total amount of items consumed.
+ */
+ smp_store_release(r->consumer_pos,
+ cons_pos);
+ goto done;
+ }
}

smp_store_release(r->consumer_pos, cons_pos);
@@ -281,19 +288,18 @@ static int64_t ringbuf_process_ring(struct ring *r)
*/
int ring_buffer__consume(struct ring_buffer *rb)
{
- int64_t err, res = 0;
+ int64_t err, res = 0, max_items = INT_MAX;
int i;

for (i = 0; i < rb->ring_cnt; i++) {
struct ring *ring = rb->rings[i];

- err = ringbuf_process_ring(ring);
+ err = ringbuf_process_ring(ring, max_items);
if (err < 0)
return libbpf_err(err);
res += err;
+ max_items -= err;
}
- if (res > INT_MAX)
- return INT_MAX;
return res;
}

@@ -304,7 +310,7 @@ int ring_buffer__consume(struct ring_buffer *rb)
int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms)
{
int i, cnt;
- int64_t err, res = 0;
+ int64_t err, res = 0, max_items = INT_MAX;

cnt = epoll_wait(rb->epoll_fd, rb->events, rb->ring_cnt, timeout_ms);
if (cnt < 0)
@@ -314,13 +320,12 @@ int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms)
__u32 ring_id = rb->events[i].data.fd;
struct ring *ring = rb->rings[ring_id];

- err = ringbuf_process_ring(ring);
+ err = ringbuf_process_ring(ring, max_items);
if (err < 0)
return libbpf_err(err);
res += err;
+ max_items -= err;
}
- if (res > INT_MAX)
- return INT_MAX;
return res;
}

@@ -375,11 +380,11 @@ int ring__consume(struct ring *r)
{
int64_t res;

- res = ringbuf_process_ring(r);
+ res = ringbuf_process_ring(r, INT_MAX);
if (res < 0)
return libbpf_err(res);

- return res > INT_MAX ? INT_MAX : res;
+ return res;
}

static void user_ringbuf_unmap_ring(struct user_ring_buffer *rb)
--
2.43.0