Re: [for-next][PATCH] ring-buffer: Do not wake up a splice waiter when page is not full

From: Steven Rostedt
Date: Wed Feb 11 2015 - 13:30:50 EST



I added Rabin as a Cc in the patch tags, but since I send out single patches
manually, I forgot to add him to the actual Cc list of the email.

-- Steve


On Wed, 11 Feb 2015 07:44:59 -0500
Steven Rostedt <rostedt@xxxxxxxxxxx> wrote:

> git://git.kernel.org/pub/scm/linux/kernel/git/rostedt/linux-trace.git
> for-next
>
> Head SHA1: 1e0d6714aceb770b04161fbedd7765d0e1fc27bd
>
>
> Steven Rostedt (Red Hat) (1):
> ring-buffer: Do not wake up a splice waiter when page is not full
>
> ----
> kernel/trace/ring_buffer.c | 40 +++++++++++++++++++++++++++++++++++-----
> 1 file changed, 35 insertions(+), 5 deletions(-)
> ---------------------------
> commit 1e0d6714aceb770b04161fbedd7765d0e1fc27bd
> Author: Steven Rostedt (Red Hat) <rostedt@xxxxxxxxxxx>
> Date: Tue Feb 10 22:14:53 2015 -0500
>
> ring-buffer: Do not wake up a splice waiter when page is not full
>
> When an application connects to the ring buffer via splice, it can only
> read full pages. Splice does not work with partial pages. If there is
> not enough data to fill a page, the splice command will either block
> or return -EAGAIN (if set to nonblock).
>
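[ For anyone not familiar with the splice interface to the ring buffer, a
  minimal userspace sketch of such a reader follows. This is only an
  illustration of the behavior described above; the trace_pipe_raw path,
  page size, and polling interval are assumptions, not part of this patch. ]

#define _GNU_SOURCE
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>

int main(void)
{
	int pfd[2];
	int fd = open("/sys/kernel/debug/tracing/per_cpu/cpu0/trace_pipe_raw",
		      O_RDONLY);

	if (fd < 0 || pipe(pfd) < 0)
		return 1;

	for (;;) {
		/*
		 * splice() only moves full pages; with SPLICE_F_NONBLOCK a
		 * partial page shows up as -EAGAIN instead of blocking.
		 */
		ssize_t ret = splice(fd, NULL, pfd[1], NULL, 4096,
				     SPLICE_F_NONBLOCK);

		if (ret < 0 && errno == EAGAIN) {
			usleep(100 * 1000);	/* no full page yet; try again later */
			continue;
		}
		if (ret <= 0)
			break;

		/* Drain the pipe; here the page just goes to stdout. */
		splice(pfd[0], NULL, STDOUT_FILENO, NULL, ret, 0);
	}
	return 0;
}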
> Code was added so that if the page is not full, the waiter simply sleeps
> again. The problem is that it will get woken up again on the next event.
> That is, when something is written into the ring buffer and there is a
> waiter, the waiter is woken up. The waiter then checks the buffer, sees
> that it still does not have enough data to fill a page, and goes back to
> sleep. To make matters worse, when the waiter goes back to sleep, it can
> cause another event, which wakes it up again only to find there is still
> not enough data, so it sleeps again. This produces a tremendous amount of
> overhead and fills the ring buffer with noise.
>
> For example, recording sched_switch on an idle system for 10 seconds
> produces 25,350,475 events!!!
>
> Create another wait queue for those waiters wanting full pages.
> When an event is written, it only wakes up waiters if there's a full
> page of data. It does not wake up the waiter if the page is not yet
> full.
>
> After this change, recording sched_switch on an idle system for 10
> seconds produces only 800 events. Getting rid of 25,349,675 useless
> events (99.9969% of events!!) is something to take seriously.
>
> Cc: stable@xxxxxxxxxxxxxxx # 3.16+
> Cc: Rabin Vincent <rabin@xxxxxx>
> Fixes: e30f53aad220 "tracing: Do not busy wait in buffer splice"
> Signed-off-by: Steven Rostedt <rostedt@xxxxxxxxxxx>
>
> diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
> index 96079180de3d..5040d44fe5a3 100644
> --- a/kernel/trace/ring_buffer.c
> +++ b/kernel/trace/ring_buffer.c
> @@ -445,7 +445,10 @@ int ring_buffer_print_page_header(struct trace_seq *s)
> struct rb_irq_work {
> struct irq_work work;
> wait_queue_head_t waiters;
> + wait_queue_head_t full_waiters;
> bool waiters_pending;
> + bool full_waiters_pending;
> + bool wakeup_full;
> };
>
> /*
> @@ -527,6 +530,10 @@ static void rb_wake_up_waiters(struct irq_work *work)
> struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
>
> wake_up_all(&rbwork->waiters);
> + if (rbwork->wakeup_full) {
> + rbwork->wakeup_full = false;
> + wake_up_all(&rbwork->full_waiters);
> + }
> }
>
> /**
> @@ -551,9 +558,11 @@ int ring_buffer_wait(struct ring_buffer *buffer, int cpu, bool full)
> * data in any cpu buffer, or a specific buffer, put the
> * caller on the appropriate wait queue.
> */
> - if (cpu == RING_BUFFER_ALL_CPUS)
> + if (cpu == RING_BUFFER_ALL_CPUS) {
> work = &buffer->irq_work;
> - else {
> + /* Full only makes sense on per cpu reads */
> + full = false;
> + } else {
> if (!cpumask_test_cpu(cpu, buffer->cpumask))
> return -ENODEV;
> cpu_buffer = buffer->buffers[cpu];
> @@ -562,7 +571,10 @@ int ring_buffer_wait(struct ring_buffer *buffer, int cpu, bool full)
>
>
> while (true) {
> - prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);
> + if (full)
> + prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE);
> + else
> + prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);
>
> /*
> * The events can happen in critical sections where
> @@ -584,7 +596,10 @@ int ring_buffer_wait(struct ring_buffer *buffer, int cpu, bool full)
> * that is necessary is that the wake up happens after
> * a task has been queued. It's OK for spurious wake ups.
> */
> - work->waiters_pending = true;
> + if (full)
> + work->full_waiters_pending = true;
> + else
> + work->waiters_pending = true;
>
> if (signal_pending(current)) {
> ret = -EINTR;
> @@ -613,7 +628,10 @@ int ring_buffer_wait(struct ring_buffer *buffer, int cpu, bool full)
> schedule();
> }
>
> - finish_wait(&work->waiters, &wait);
> + if (full)
> + finish_wait(&work->full_waiters, &wait);
> + else
> + finish_wait(&work->waiters, &wait);
>
> return ret;
> }
> @@ -1228,6 +1246,7 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int nr_pages, int cpu)
> init_completion(&cpu_buffer->update_done);
> init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
> init_waitqueue_head(&cpu_buffer->irq_work.waiters);
> + init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
>
> bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
> GFP_KERNEL, cpu_to_node(cpu));
> @@ -2799,6 +2818,8 @@ static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
> static __always_inline void
> rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
> {
> + bool pagebusy;
> +
> if (buffer->irq_work.waiters_pending) {
> buffer->irq_work.waiters_pending = false;
> /* irq_work_queue() supplies it's own memory barriers */
> @@ -2810,6 +2831,15 @@ rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
> /* irq_work_queue() supplies it's own memory barriers */
> irq_work_queue(&cpu_buffer->irq_work.work);
> }
> +
> + pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
> +
> + if (!pagebusy && cpu_buffer->irq_work.full_waiters_pending) {
> + cpu_buffer->irq_work.wakeup_full = true;
> + cpu_buffer->irq_work.full_waiters_pending = false;
> + /* irq_work_queue() supplies it's own memory barriers */
> + irq_work_queue(&cpu_buffer->irq_work.work);
> + }
> }
>
> /**

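For completeness, the kernel-side consumer of the new full-page semantics
would look roughly like the sketch below. This is not code from the patch;
the function name and surrounding context are assumptions. It only shows how
a splice-style reader is expected to call ring_buffer_wait() after this
change.

#include <linux/ring_buffer.h>

/*
 * Illustrative sketch, not part of the patch: a splice-style reader that
 * needs a full page waits with full == true and is now only woken from the
 * new full_waiters queue once the reader page has been filled.
 */
static int wait_for_full_page(struct ring_buffer *buffer, int cpu, bool nonblock)
{
	if (nonblock)
		return -EAGAIN;		/* caller asked not to sleep */

	/* Returns 0 when woken, or -EINTR if a signal was received. */
	return ring_buffer_wait(buffer, cpu, true);
}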