Re: [PATCH v5 3/4] bpf: Add libbpf logic for user-space ring buffer
From: Andrii Nakryiko
Date: Fri Sep 09 2022 - 19:59:57 EST
On Fri, Sep 2, 2022 at 4:43 PM David Vernet <void@xxxxxxxxxxxxx> wrote:
>
> Now that all of the logic is in place in the kernel to support user-space
> produced ring buffers, we can add the user-space logic to libbpf. This
> patch therefore adds the following public symbols to libbpf:
>
> struct user_ring_buffer *
> user_ring_buffer__new(int map_fd,
> const struct user_ring_buffer_opts *opts);
> void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
> void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
> __u32 size, int timeout_ms);
> void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample);
> void user_ring_buffer__discard(struct user_ring_buffer *rb,
> void user_ring_buffer__free(struct user_ring_buffer *rb);
>
> A user-space producer must first create a struct user_ring_buffer * object
> with user_ring_buffer__new(), and can then reserve samples in the
> ring buffer using one of the following two symbols:
>
> void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
> void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb,
> __u32 size, int timeout_ms);
>
> With user_ring_buffer__reserve(), a pointer to a 'size' region of the ring
> buffer will be returned if sufficient space is available in the buffer.
> user_ring_buffer__reserve_blocking() provides similar semantics, but will
> block for up to 'timeout_ms' in epoll_wait if there is insufficient space
> in the buffer. This function has the guarantee from the kernel that it will
> receive at least one event-notification per invocation to
> bpf_ringbuf_drain(), provided that at least one sample is drained, and the
> BPF program did not pass the BPF_RB_NO_WAKEUP flag to bpf_ringbuf_drain().
>
> Once a sample is reserved, it must either be committed to the ring buffer
> with user_ring_buffer__submit(), or discarded with
> user_ring_buffer__discard().
>
> Signed-off-by: David Vernet <void@xxxxxxxxxxxxx>
> ---
> tools/lib/bpf/libbpf.c | 10 +-
> tools/lib/bpf/libbpf.h | 105 +++++++++++++
> tools/lib/bpf/libbpf.map | 10 ++
> tools/lib/bpf/libbpf_probes.c | 1 +
> tools/lib/bpf/libbpf_version.h | 2 +-
> tools/lib/bpf/ringbuf.c | 270 +++++++++++++++++++++++++++++++++
> 6 files changed, 395 insertions(+), 3 deletions(-)
>
Thanks for adding nice doc comments! See below for few minor nits, but
ns_elapsed issue I think is pretty bad and should be fixed.
> @@ -1011,6 +1011,7 @@ LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook,
>
> /* Ring buffer APIs */
> struct ring_buffer;
> +struct user_ring_buffer;
>
> typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size);
>
> @@ -1030,6 +1031,110 @@ LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);
> LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb);
> LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb);
>
> +struct user_ring_buffer_opts {
> + size_t sz; /* size of this struct, for forward/backward compatibility */
> +};
> +
> +#define user_ring_buffer_opts__last_field sz
> +
> +/* @brief **user_ring_buffer__new()** creates a new instance of a user ring
> + * buffer.
> + *
> + * @param map_fd A file descriptor to a BPF_MAP_TYPE_RINGBUF map.
typo: USER_RINGBUF
> + * @param opts Options for how the ring buffer should be created.
> + * @return A user ring buffer on success; NULL and errno being set on a
> + * failure.
> + */
> +LIBBPF_API struct user_ring_buffer *
> +user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts);
> +
> +/* @brief **user_ring_buffer__reserve()** reserves a pointer to a sample in the
> + * user ring buffer.
> + * @param rb A pointer to a user ring buffer.
> + * @param size The size of the sample, in bytes.
> + * @return A pointer to a reserved region of the user ring buffer; NULL, and
> + * errno being set if a sample could not be reserved.
> + *
> + * This function is *not* thread safe, and callers must synchronize accessing
> + * this function if there are multiple producers. If a size is requested that
> + * is larger than the size of the entire ring buffer, errno will be set to
> + * E2BIG and NULL is returned. If the ring buffer could accommodate the size,
> + * but currently does not have enough space, errno is set to ENOSPC and NULL is
> + * returned.
we might want to mention that returned pointer is 8-byte aligned
> + *
> + * After initializing the sample, callers must invoke
> + * **user_ring_buffer__submit()** to post the sample to the kernel. Otherwise,
> + * the sample must be freed with **user_ring_buffer__discard()**.
> + */
> +LIBBPF_API void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
> +
[...]
> + err = user_ringbuf_map(rb, map_fd);
> + if (err)
> + goto err_out;
> +
> + return rb;
> +
> +err_out:
> + user_ring_buffer__free(rb);
> + return errno = -err, NULL;
> +}
> +
> +static void user_ringbuf__commit(struct user_ring_buffer *rb, void *sample, bool discard)
small nit if you are going to resubmit: we stopped using double
underscore naming for internal static functions, so this should be
called user_ringbuf_commit
> +{
> + __u32 new_len;
> + struct ringbuf_hdr *hdr;
> + uintptr_t hdr_offset;
> +
[...]
> +void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms)
> +{
> + int err;
> + struct timespec start;
> + __u64 ns_per_ms = 1000000, ns_elapsed = 0, timeout_ns;
> +
> + if (timeout_ms < 0 && timeout_ms != -1)
> + return errno = EINVAL, NULL;
> +
> + if (timeout_ms != -1) {
> + err = clock_gettime(CLOCK_MONOTONIC, &start);
> + if (err)
> + return NULL;
> + }
> +
> + timeout_ns = timeout_ms * ns_per_ms;
> + do {
> + __u64 ns_remaining = timeout_ns - ns_elapsed;
> + int cnt, ms_remaining;
> + void *sample;
> + struct timespec curr;
> +
> + sample = user_ring_buffer__reserve(rb, size);
> + if (sample)
> + return sample;
> + else if (errno != ENOSPC)
> + return NULL;
> +
> + ms_remaining = timeout_ms == -1 ? -1 : ns_remaining / ns_per_ms;
ok, so you've special-cased timeout_ms == -1 but still didn't do
max(0, ns_remaining). Can you prove that ns_elapsed will never be
bigger than timeout_ns due to various delays in waking up this thread?
If not, let's please have max(0) otherwise we can accidentally
epoll_wait(-1).
> + /* The kernel guarantees at least one event notification
> + * delivery whenever at least one sample is drained from the
> + * ring buffer in an invocation to bpf_ringbuf_drain(). Other
> + * additional events may be delivered at any time, but only one
> + * event is guaranteed per bpf_ringbuf_drain() invocation,
> + * provided that a sample is drained, and the BPF program did
> + * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain().
> + */
> + cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
> + if (cnt < 0)
> + return NULL;
> +
> + if (timeout_ms == -1)
> + continue;
> +
> + err = clock_gettime(CLOCK_MONOTONIC, &curr);
> + if (err)
> + return NULL;
> +
> + ns_elapsed = ns_elapsed_timespec(&start, &curr);
nit: if you move re-calculation of ms_remaining and ns_remaining to
here, I think overall loop logic will be even more straightforwad. You
can initialize ms_remaining to -1 if timeout_ms < 0 and never
recalculate it, right? Note that you can also do ns_elapsed conversion
to ms right here and then keep everything else in ms (so no need for
timeout_ns, ns_remaining, etc).
> + } while (ns_elapsed <= timeout_ns);
> +
> + errno = ENOSPC;
> + return NULL;
> +}
> --
> 2.37.1
>