Re: [PATCH v5 3/4] bpf: Add libbpf logic for user-space ring buffer
From: David Vernet
Date: Mon Sep 19 2022 - 16:22:17 EST
On Fri, Sep 09, 2022 at 04:59:27PM -0700, Andrii Nakryiko wrote:
> > @@ -1011,6 +1011,7 @@ LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook,
> >
> > /* Ring buffer APIs */
> > struct ring_buffer;
> > +struct user_ring_buffer;
> >
> > typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size);
> >
> > @@ -1030,6 +1031,110 @@ LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);
> > LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb);
> > LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb);
> >
> > +struct user_ring_buffer_opts {
> > + size_t sz; /* size of this struct, for forward/backward compatibility */
> > +};
> > +
> > +#define user_ring_buffer_opts__last_field sz
> > +
> > +/* @brief **user_ring_buffer__new()** creates a new instance of a user ring
> > + * buffer.
> > + *
> > + * @param map_fd A file descriptor to a BPF_MAP_TYPE_RINGBUF map.
>
> typo: USER_RINGBUF
Good catch.
> > + * @param opts Options for how the ring buffer should be created.
> > + * @return A user ring buffer on success; NULL and errno being set on a
> > + * failure.
> > + */
> > +LIBBPF_API struct user_ring_buffer *
> > +user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts);
> > +
> > +/* @brief **user_ring_buffer__reserve()** reserves a pointer to a sample in the
> > + * user ring buffer.
> > + * @param rb A pointer to a user ring buffer.
> > + * @param size The size of the sample, in bytes.
> > + * @return A pointer to a reserved region of the user ring buffer; NULL, and
> > + * errno being set if a sample could not be reserved.
> > + *
> > + * This function is *not* thread safe, and callers must synchronize accessing
> > + * this function if there are multiple producers. If a size is requested that
> > + * is larger than the size of the entire ring buffer, errno will be set to
> > + * E2BIG and NULL is returned. If the ring buffer could accommodate the size,
> > + * but currently does not have enough space, errno is set to ENOSPC and NULL is
> > + * returned.
>
> we might want to mention that returned pointer is 8-byte aligned
Will do.
> > + *
> > + * After initializing the sample, callers must invoke
> > + * **user_ring_buffer__submit()** to post the sample to the kernel. Otherwise,
> > + * the sample must be freed with **user_ring_buffer__discard()**.
> > + */
> > +LIBBPF_API void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size);
> > +
>
> [...]
>
> > + err = user_ringbuf_map(rb, map_fd);
> > + if (err)
> > + goto err_out;
> > +
> > + return rb;
> > +
> > +err_out:
> > + user_ring_buffer__free(rb);
> > + return errno = -err, NULL;
> > +}
> > +
> > +static void user_ringbuf__commit(struct user_ring_buffer *rb, void *sample, bool discard)
>
> small nit if you are going to resubmit: we stopped using double
> underscore naming for internal static functions, so this should be
> called user_ringbuf_commit
Will do.
> > +{
> > + __u32 new_len;
> > + struct ringbuf_hdr *hdr;
> > + uintptr_t hdr_offset;
> > +
>
> [...]
>
> > +void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms)
> > +{
> > + int err;
> > + struct timespec start;
> > + __u64 ns_per_ms = 1000000, ns_elapsed = 0, timeout_ns;
> > +
> > + if (timeout_ms < 0 && timeout_ms != -1)
> > + return errno = EINVAL, NULL;
> > +
> > + if (timeout_ms != -1) {
> > + err = clock_gettime(CLOCK_MONOTONIC, &start);
> > + if (err)
> > + return NULL;
> > + }
> > +
> > + timeout_ns = timeout_ms * ns_per_ms;
> > + do {
> > + __u64 ns_remaining = timeout_ns - ns_elapsed;
> > + int cnt, ms_remaining;
> > + void *sample;
> > + struct timespec curr;
> > +
> > + sample = user_ring_buffer__reserve(rb, size);
> > + if (sample)
> > + return sample;
> > + else if (errno != ENOSPC)
> > + return NULL;
> > +
> > + ms_remaining = timeout_ms == -1 ? -1 : ns_remaining / ns_per_ms;
>
> ok, so you've special-cased timeout_ms == -1 but still didn't do
> max(0, ns_remaining). Can you prove that ns_elapsed will never be
> bigger than timeout_ns due to various delays in waking up this thread?
> If not, let's please have max(0) otherwise we can accidentally
> epoll_wait(-1).
Yes you're right, this was an oversight. Thanks for catching this!
> > + /* The kernel guarantees at least one event notification
> > + * delivery whenever at least one sample is drained from the
> > + * ring buffer in an invocation to bpf_ringbuf_drain(). Other
> > + * additional events may be delivered at any time, but only one
> > + * event is guaranteed per bpf_ringbuf_drain() invocation,
> > + * provided that a sample is drained, and the BPF program did
> > + * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain().
> > + */
> > + cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
> > + if (cnt < 0)
> > + return NULL;
> > +
> > + if (timeout_ms == -1)
> > + continue;
> > +
> > + err = clock_gettime(CLOCK_MONOTONIC, &curr);
> > + if (err)
> > + return NULL;
> > +
> > + ns_elapsed = ns_elapsed_timespec(&start, &curr);
>
> nit: if you move re-calculation of ms_remaining and ns_remaining to
> here, I think overall loop logic will be even more straightforwad. You
> can initialize ms_remaining to -1 if timeout_ms < 0 and never
> recalculate it, right? Note that you can also do ns_elapsed conversion
> to ms right here and then keep everything else in ms (so no need for
> timeout_ns, ns_remaining, etc).
Sounds good, let me give this a shot in v6.
Thanks for another detailed review!