Re: [PATCH] [RFC] perf: Fix a race between ring_buffer_detach() and ring_buffer_wakeup()

From: Peter Zijlstra
Date: Wed May 07 2014 - 08:35:42 EST


On Fri, Mar 14, 2014 at 10:50:33AM +0100, Peter Zijlstra wrote:
> How about something like so that only does the sync_rcu() if really
> needed.

Paul reminded me we still have this issue open; I had another look and
the patch I proposed was completely broken, so I had to write me a new
one :-)

---
Subject: perf: Fix a race between ring_buffer_detach() and ring_buffer_attach()
From: Peter Zijlstra <peterz@xxxxxxxxxxxxx>
Date: Fri, 14 Mar 2014 10:50:33 +0100

Alexander noticed that we use RCU iteration on rb->event_list but do
not use list_{add,del}_rcu() to add/remove entries to/from that list,
nor do we observe proper grace periods when re-using the entries.
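
For reference, the reader side this races against is
ring_buffer_wakeup(), which (roughly, as in that era's
kernel/events/core.c) does:

	rcu_read_lock();
	rb = rcu_dereference(event->rb);
	if (rb) {
		list_for_each_entry_rcu(event, &rb->event_list, rb_entry)
			wake_up_all(&event->waitq);
	}
	rcu_read_unlock();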

Merge ring_buffer_detach() into ring_buffer_attach() such that
attaching to the NULL buffer is detaching.

Furthermore, ensure that between any 'detach' and 'attach' of the same
event we observe the required grace period, but only when strictly
required. In effect this means that only ioctl(.request =
PERF_EVENT_IOC_SET_OUTPUT) will wait for a grace period, while the
normal initial attach and final detach will not be delayed.

This patch should, I think, do the right thing under all
circumstances: the 'normal' cases should never see the extra grace
period, but the following two cases will:

1) PERF_EVENT_IOC_SET_OUTPUT on an event which already has a
ring_buffer set will now observe the required grace period between
removing itself from the old buffer and attaching itself to the new one.

This case is 'simple' in that both buffers are present in
perf_event_set_output(), so one could think an unconditional
synchronize_rcu() would be sufficient; however...

2) an event that has a buffer attached has that buffer destroyed
(munmap) and is then attached to a new/different buffer using
PERF_EVENT_IOC_SET_OUTPUT.

This case is more complex because the buffer destruction does:

	ring_buffer_attach(.rb = NULL)

followed by the ioctl() doing:

	ring_buffer_attach(.rb = foo);

and we still need to observe the grace period between these two
calls due to us reusing the event->rb_entry list_head.
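
To illustrate, a hypothetical interleaving (CPU1 being an RCU reader
in ring_buffer_wakeup()):

  CPU0 (munmap + ioctl)               CPU1 (ring_buffer_wakeup)
  ---------------------               -------------------------
                                      rcu_read_lock();
                                      iterates old_rb->event_list and
                                      reaches event->rb_entry ...
  ring_buffer_attach(event, NULL);
    list_del_rcu(&event->rb_entry);
  ring_buffer_attach(event, foo);
    list_add_rcu(&event->rb_entry,
                 &foo->event_list);
                                      ... event->rb_entry.next now
                                      points into foo's list; the
                                      reader wanders onto the wrong
                                      list.
                                      rcu_read_unlock();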

In order to make case 2 work we use Paul's latest
cond_synchronize_rcu() call.
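
Distilled to the bare pattern (illustrative fragments, not the exact
hunks below):

	/* detach: snapshot the grace-period state, then unlink */
	event->rcu_batches = get_state_synchronize_rcu();
	event->rcu_pending = 1;
	list_del_rcu(&event->rb_entry);

	/*
	 * re-attach: wait only if that grace period has not already
	 * elapsed on its own in the meantime
	 */
	if (event->rcu_pending) {
		cond_synchronize_rcu(event->rcu_batches);
		event->rcu_pending = 0;
	}
	list_add_rcu(&event->rb_entry, &rb->event_list);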

Cc: Frederic Weisbecker <fweisbec@xxxxxxxxx>
Cc: Mike Galbraith <efault@xxxxxx>
Cc: Paul Mackerras <paulus@xxxxxxxxx>
Cc: Stephane Eranian <eranian@xxxxxxxxxx>
Cc: Andi Kleen <ak@xxxxxxxxxxxxxxx>
Cc: "Paul E. McKenney" <paulmck@xxxxxxxxxxxxxxxxxx>
Cc: Ingo Molnar <mingo@xxxxxxxxxx>
Reported-by: Alexander Shishkin <alexander.shishkin@xxxxxxxxxxxxxxx>
Signed-off-by: Peter Zijlstra <peterz@xxxxxxxxxxxxx>
---
 include/linux/perf_event.h |   2 
 kernel/events/core.c       | 109 ++++++++++++++++++++-------------------
 2 files changed, 51 insertions(+), 60 deletions(-)

--- a/include/linux/perf_event.h
+++ b/include/linux/perf_event.h
@@ -403,6 +403,8 @@ struct perf_event {
 
 	struct ring_buffer		*rb;
 	struct list_head		rb_entry;
+	unsigned long			rcu_batches;
+	int				rcu_pending;
 
 	/* poll related */
 	wait_queue_head_t		waitq;
--- a/kernel/events/core.c
+++ b/kernel/events/core.c
@@ -3179,7 +3179,8 @@ static void free_event_rcu(struct rcu_he
 }
 
 static void ring_buffer_put(struct ring_buffer *rb);
-static void ring_buffer_detach(struct perf_event *event, struct ring_buffer *rb);
+static void ring_buffer_attach(struct perf_event *event,
+			       struct ring_buffer *rb);
 
 static void unaccount_event_cpu(struct perf_event *event, int cpu)
 {
@@ -3242,8 +3243,6 @@ static void free_event(struct perf_event
 	unaccount_event(event);
 
 	if (event->rb) {
-		struct ring_buffer *rb;
-
 		/*
 		 * Can happen when we close an event with re-directed output.
 		 *
@@ -3251,12 +3250,7 @@ static void free_event(struct perf_event
 		 * over us; possibly making our ring_buffer_put() the last.
 		 */
		mutex_lock(&event->mmap_mutex);
-		rb = event->rb;
-		if (rb) {
-			rcu_assign_pointer(event->rb, NULL);
-			ring_buffer_detach(event, rb);
-			ring_buffer_put(rb); /* could be last */
-		}
+		ring_buffer_attach(event, NULL);
		mutex_unlock(&event->mmap_mutex);
 	}
 
@@ -3843,28 +3837,47 @@ static int perf_mmap_fault(struct vm_are
 static void ring_buffer_attach(struct perf_event *event,
 			       struct ring_buffer *rb)
 {
+	struct ring_buffer *old_rb = NULL;
 	unsigned long flags;
 
-	if (!list_empty(&event->rb_entry))
-		return;
+	if (event->rb) {
+		/*
+		 * Should be impossible, we set this when removing
+		 * event->rb_entry and wait/clear when adding event->rb_entry.
+		 */
+		WARN_ON_ONCE(event->rcu_pending);
 
-	spin_lock_irqsave(&rb->event_lock, flags);
-	if (list_empty(&event->rb_entry))
-		list_add(&event->rb_entry, &rb->event_list);
-	spin_unlock_irqrestore(&rb->event_lock, flags);
-}
+		old_rb = event->rb;
+		event->rcu_batches = get_state_synchronize_rcu();
+		event->rcu_pending = 1;
 
-static void ring_buffer_detach(struct perf_event *event, struct ring_buffer *rb)
-{
-	unsigned long flags;
+		spin_lock_irqsave(&old_rb->event_lock, flags);
+		list_del_rcu(&event->rb_entry);
+		spin_unlock_irqrestore(&old_rb->event_lock, flags);
+	}
 
-	if (list_empty(&event->rb_entry))
-		return;
+	if (event->rcu_pending && rb) {
+		cond_synchronize_rcu(event->rcu_batches);
+		event->rcu_pending = 0;
+	}
 
-	spin_lock_irqsave(&rb->event_lock, flags);
-	list_del_init(&event->rb_entry);
-	wake_up_all(&event->waitq);
-	spin_unlock_irqrestore(&rb->event_lock, flags);
+	if (rb) {
+		spin_lock_irqsave(&rb->event_lock, flags);
+		list_add_rcu(&event->rb_entry, &rb->event_list);
+		spin_unlock_irqrestore(&rb->event_lock, flags);
+	}
+
+	rcu_assign_pointer(event->rb, rb);
+
+	if (old_rb) {
+		ring_buffer_put(old_rb);
+		/*
+		 * Since we detached before setting the new rb, so that we
+		 * could attach the new rb, we could have missed a wakeup.
+		 * Provide it now.
+		 */
+		wake_up_all(&event->waitq);
+	}
 }
 
 static void ring_buffer_wakeup(struct perf_event *event)
@@ -3933,7 +3946,7 @@ static void perf_mmap_close(struct vm_ar
 {
 	struct perf_event *event = vma->vm_file->private_data;
 
-	struct ring_buffer *rb = event->rb;
+	struct ring_buffer *rb = ring_buffer_get(event);
 	struct user_struct *mmap_user = rb->mmap_user;
 	int mmap_locked = rb->mmap_locked;
 	unsigned long size = perf_data_size(rb);
@@ -3941,18 +3954,14 @@ static void perf_mmap_close(struct vm_ar
 	atomic_dec(&rb->mmap_count);
 
 	if (!atomic_dec_and_mutex_lock(&event->mmap_count, &event->mmap_mutex))
-		return;
+		goto out_put;
 
-	/* Detach current event from the buffer. */
-	rcu_assign_pointer(event->rb, NULL);
-	ring_buffer_detach(event, rb);
+	ring_buffer_attach(event, NULL);
 	mutex_unlock(&event->mmap_mutex);
 
 	/* If there's still other mmap()s of this buffer, we're done. */
-	if (atomic_read(&rb->mmap_count)) {
-		ring_buffer_put(rb); /* can't be last */
-		return;
-	}
+	if (atomic_read(&rb->mmap_count))
+		goto out_put;
 
 	/*
 	 * No other mmap()s, detach from all other events that might redirect
@@ -3982,11 +3991,9 @@ static void perf_mmap_close(struct vm_ar
 		 * still restart the iteration to make sure we're not now
 		 * iterating the wrong list.
 		 */
-		if (event->rb == rb) {
-			rcu_assign_pointer(event->rb, NULL);
-			ring_buffer_detach(event, rb);
-			ring_buffer_put(rb); /* can't be last, we still have one */
-		}
+		if (event->rb == rb)
+			ring_buffer_attach(event, NULL);
+
 		mutex_unlock(&event->mmap_mutex);
 		put_event(event);
 
@@ -4011,6 +4018,7 @@ static void perf_mmap_close(struct vm_ar
 	vma->vm_mm->pinned_vm -= mmap_locked;
 	free_uid(mmap_user);
 
+out_put:
 	ring_buffer_put(rb); /* could be last */
 }
 
@@ -4128,7 +4136,6 @@ static int perf_mmap(struct file *file,
 	vma->vm_mm->pinned_vm += extra;
 
 	ring_buffer_attach(event, rb);
-	rcu_assign_pointer(event->rb, rb);
 
 	perf_event_init_userpage(event);
 	perf_event_update_userpage(event);
@@ -6929,7 +6936,7 @@ static int perf_copy_attr(struct perf_ev
 static int
 perf_event_set_output(struct perf_event *event, struct perf_event *output_event)
 {
-	struct ring_buffer *rb = NULL, *old_rb = NULL;
+	struct ring_buffer *rb = NULL;
 	int ret = -EINVAL;
 
 	if (!output_event)
@@ -6957,8 +6964,6 @@ perf_event_set_output(struct perf_event
 	if (atomic_read(&event->mmap_count))
 		goto unlock;
 
-	old_rb = event->rb;
-
 	if (output_event) {
 		/* get the rb we want to redirect to */
 		rb = ring_buffer_get(output_event);
@@ -6966,23 +6971,7 @@ perf_event_set_output(struct perf_event
 			goto unlock;
 	}
 
-	if (old_rb)
-		ring_buffer_detach(event, old_rb);
-
-	if (rb)
-		ring_buffer_attach(event, rb);
-
-	rcu_assign_pointer(event->rb, rb);
-
-	if (old_rb) {
-		ring_buffer_put(old_rb);
-		/*
-		 * Since we detached before setting the new rb, so that we
-		 * could attach the new rb, we could have missed a wakeup.
-		 * Provide it now.
-		 */
-		wake_up_all(&event->waitq);
-	}
+	ring_buffer_attach(event, rb);
 
 	ret = 0;
 unlock:
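
For completeness, case 2 can be driven from userspace with a sequence
like the below (a minimal sketch: software events, 2^3 data pages,
assumes 4 KiB pages, all error handling omitted):

	#include <linux/perf_event.h>
	#include <string.h>
	#include <sys/ioctl.h>
	#include <sys/mman.h>
	#include <sys/syscall.h>
	#include <unistd.h>

	int main(void)
	{
		struct perf_event_attr attr;
		size_t len = (1 + 8) * 4096;	/* header page + 2^3 data pages */
		void *p;
		int a, b;

		memset(&attr, 0, sizeof(attr));
		attr.size = sizeof(attr);
		attr.type = PERF_TYPE_SOFTWARE;
		attr.config = PERF_COUNT_SW_CPU_CLOCK;

		a = syscall(__NR_perf_event_open, &attr, 0, -1, -1, 0);
		b = syscall(__NR_perf_event_open, &attr, 0, -1, -1, 0);

		/* give 'a' a buffer, then destroy it: ring_buffer_attach(.rb = NULL) */
		p = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, a, 0);
		munmap(p, len);

		/*
		 * give 'b' a buffer and redirect 'a' into it:
		 * ring_buffer_attach(.rb = foo)
		 */
		p = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, b, 0);
		ioctl(a, PERF_EVENT_IOC_SET_OUTPUT, b);

		munmap(p, len);
		close(b);
		close(a);
		return 0;
	}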
