[RFC PATCH 10/17] perf: Implement pinning and scheduling for SHMEM events

From: Alexander Shishkin
Date: Tue Sep 05 2017 - 09:42:06 EST


A SHMEM buffer is only pinned in while its task is scheduled in and the
pinning is done in a task work, which also implies that the corresponding
event can only be started from that task work.

Pinning is done on a per-cpu basis: if an event has previously been
pinned on the local cpu, it is unpinned and its pin count is dropped, and
the new event is pinned on this cpu. When the pin count goes to zero, we
unpin the pages; when it goes to one, we pin them.

Signed-off-by: Alexander Shishkin <alexander.shishkin@xxxxxxxxxxxxxxx>
---
include/linux/perf_event.h | 10 +++
kernel/events/core.c | 134 ++++++++++++++++++++++++++++-
kernel/events/internal.h | 5 ++
kernel/events/ring_buffer.c | 202 +++++++++++++++++++++++++++++++++++++++++++-
4 files changed, 347 insertions(+), 4 deletions(-)

diff --git a/include/linux/perf_event.h b/include/linux/perf_event.h
index 341e9960bc..4b966dd0d8 100644
--- a/include/linux/perf_event.h
+++ b/include/linux/perf_event.h
@@ -703,6 +703,13 @@ struct perf_event {
unsigned long rcu_batches;
int rcu_pending;

+ /*
+ * Number of times (CPUs) this event's been pinned (on):
+ * xpinned -> 0: unpin the pages,
+ * xpinned -> 1: pin the pages. See get_pages_work().
+ */
+ atomic_t xpinned;
+
/* poll related */
wait_queue_head_t waitq;
struct fasync_struct *fasync;
@@ -735,6 +742,9 @@ struct perf_event {
struct bpf_prog *prog;
#endif

+ /* Task work to pin event's rb pages if needed */
+ struct callback_head get_pages_work;
+
#ifdef CONFIG_EVENT_TRACING
struct trace_event_call *tp_event;
struct event_filter *filter;
diff --git a/kernel/events/core.c b/kernel/events/core.c
index feff812e30..c80ffcdb5c 100644
--- a/kernel/events/core.c
+++ b/kernel/events/core.c
@@ -50,6 +50,7 @@
#include <linux/sched/mm.h>
#include <linux/proc_ns.h>
#include <linux/mount.h>
+#include <linux/task_work.h>
#include <linux/tracefs.h>

#include "internal.h"
@@ -383,6 +384,7 @@ static atomic_t perf_sched_count;
static DEFINE_PER_CPU(atomic_t, perf_cgroup_events);
static DEFINE_PER_CPU(int, perf_sched_cb_usages);
static DEFINE_PER_CPU(struct pmu_event_list, pmu_sb_events);
+static DEFINE_PER_CPU(struct perf_event *, shmem_events);

static atomic_t nr_mmap_events __read_mostly;
static atomic_t nr_comm_events __read_mostly;
@@ -2058,6 +2060,94 @@ static void perf_set_shadow_time(struct perf_event *event,
event->shadow_ctx_time = tstamp - ctx->timestamp;
}

+/*
+ * Drop one pin of @event. Only when the pin count reaches zero is the event
+ * stopped (if it is currently active) and its ring buffer's kernel pages
+ * put back.
+ *
+ * This is passed to event_function_call(); @cpuctx, @ctx and @info are not
+ * used here.
+ */
+static void __unpin_event_pages(struct perf_event *event,
+				struct perf_cpu_context *cpuctx,
+				struct perf_event_context *ctx,
+				void *info)
+{
+	if (atomic_dec_and_test(&event->xpinned)) {
+		/*
+		 * A running event has to be stopped before its pages can be
+		 * pulled. Note that this will be happening if we allow
+		 * concurrent shmem events, which seems like a bad idea.
+		 */
+		if (READ_ONCE(event->state) == PERF_EVENT_STATE_ACTIVE)
+			event->pmu->stop(event, PERF_EF_UPDATE);
+
+		rb_put_kernel_pages(event->rb, false);
+	}
+}
+
+/* Result of pin_event_pages(), acted upon by get_pages_work() */
+enum pin_event_t {
+ /* the event's pin count went to one: the caller has to pin the pages */
+ PIN_IN = 0,
+ /* the event was already pinned (on this cpu or another): nothing to do */
+ PIN_NOP,
+};
+
+/*
+ * Make @event the shmem event pinned on the local cpu.
+ *
+ * An event previously pinned on this cpu is unpinned first, provided its
+ * state is above PERF_EVENT_STATE_DEAD.
+ *
+ * Returns PIN_IN if this call took @event's pin count to one, in which case
+ * the caller has to pin the pages; PIN_NOP otherwise.
+ */
+static enum pin_event_t pin_event_pages(struct perf_event *event)
+{
+	struct perf_event **slot = this_cpu_ptr(&shmem_events);
+	struct perf_event *prev = *slot;
+
+	/* Already the event pinned on this cpu */
+	if (prev == event)
+		return PIN_NOP;
+
+	if (prev && prev->state > PERF_EVENT_STATE_DEAD)
+		event_function_call(prev, __unpin_event_pages, NULL);
+
+	*slot = event;
+
+	return atomic_inc_return(&event->xpinned) == 1 ? PIN_IN : PIN_NOP;
+}
+
+static int perf_event_stop(struct perf_event *event, int restart);
+
+/*
+ * Task work callback (see perf_event_queue_work()): pin the pages of the
+ * event's ring buffer on this cpu if needed and, if that didn't fail, call
+ * perf_event_stop() with restart set.
+ */
+static void get_pages_work(struct callback_head *work)
+{
+ struct perf_event *event = container_of(work, struct perf_event, get_pages_work);
+ int ret;
+ struct ring_buffer *rb = event->rb;
+ int (*get_fn)(struct perf_event *event) = rb_get_kernel_pages;
+
+ /* perf_event_queue_work() treats a non-NULL func as "already queued" */
+ work->func = NULL;
+
+ if (!rb || current->flags & PF_EXITING)
+ return;
+
+ /*
+ * No mapping address recorded for this buffer yet: go through
+ * rb_inject(), which maps the shmem file in addition to getting the
+ * pages, and drop an existing pin first so that pin_event_pages()
+ * below can return PIN_IN.
+ * NOTE(review): atomic_cmpxchg() returns the previous value, so the
+ * put also runs when xpinned was greater than one and nothing was
+ * exchanged -- confirm whether "== 1" was intended.
+ */
+ if (!rb->shmem_file_addr) {
+ get_fn = rb_inject;
+ if (atomic_cmpxchg(&event->xpinned, 1, 0))
+ rb_put_kernel_pages(rb, false);
+ }
+
+ /* Only the transition of the pin count to one needs to get the pages */
+ if (pin_event_pages(event) == PIN_IN) {
+ ret = get_fn(event);
+ } else {
+ ret = 0;
+ }
+
+ /* Pages are in place (or already were): restart the event */
+ if (!ret)
+ perf_event_stop(event, 1);
+}
+
+/*
+ * Queue get_pages_work() for @event as a task work on @task, unless the
+ * work's func is still set from an earlier queueing that hasn't run yet.
+ *
+ * Returns 0 if the work was queued or was already pending, otherwise the
+ * error returned by task_work_add().
+ */
+static int perf_event_queue_work(struct perf_event *event,
+				 struct task_struct *task)
+{
+	struct callback_head *work = &event->get_pages_work;
+
+	if (work->func)
+		return 0;
+
+	init_task_work(work, get_pages_work);
+
+	return task_work_add(task, work, true);
+}
+
#define MAX_INTERRUPTS (~0ULL)

static void perf_log_throttle(struct perf_event *event, int enable);
@@ -2069,7 +2159,7 @@ event_sched_in(struct perf_event *event,
struct perf_event_context *ctx)
{
u64 tstamp = perf_event_time(event);
- int ret = 0;
+ int ret = 0, shmem = event->attach_state & PERF_ATTACH_SHMEM;

lockdep_assert_held(&ctx->lock);

@@ -2105,13 +2195,21 @@ event_sched_in(struct perf_event *event,

perf_log_itrace_start(event);

- if (event->pmu->add(event, PERF_EF_START)) {
+ /*
+ * For shmem events pmu::start will fail because of
+ * rb::aux_mmap_count==0, so skip the PERF_EF_START, but
+ * queue the task work that will actually start it.
+ */
+ if (event->pmu->add(event, shmem ? 0 : PERF_EF_START)) {
event->state = PERF_EVENT_STATE_INACTIVE;
event->oncpu = -1;
ret = -EAGAIN;
goto out;
}

+ if (shmem)
+ perf_event_queue_work(event, ctx->task);
+
event->tstamp_running += tstamp - event->tstamp_stopped;

if (!is_software_event(event))
@@ -4182,6 +4280,30 @@ static void _free_event(struct perf_event *event)

unaccount_event(event);

+ if (event->attach_state & PERF_ATTACH_SHMEM) {
+ struct perf_event_context *ctx = event->ctx;
+ int cpu;
+
+ atomic_set(&event->xpinned, 0);
+ for_each_possible_cpu(cpu) {
+ struct perf_event **pinned_event =
+ per_cpu_ptr(&shmem_events, cpu);
+
+ cmpxchg(pinned_event, event, NULL);
+ }
+
+ event->attach_state &= ~PERF_ATTACH_SHMEM;
+
+ /*
+ * XXX: !ctx means event is still being created;
+ * we can get here via tracefs file though
+ */
+ if (ctx && ctx->task && ctx->task != TASK_TOMBSTONE)
+ task_work_cancel(ctx->task, get_pages_work);
+
+ rb_put_kernel_pages(event->rb, false);
+ }
+
if (event->dent) {
tracefs_remove(event->dent);

@@ -4948,6 +5070,10 @@ void perf_event_update_userpage(struct perf_event *event)
if (!rb)
goto unlock;

+ /* Don't bother with the file backed rb when it's inactive */
+ if (rb->shmem_file && rb->paused)
+ goto unlock;
+
/*
* compute total_time_enabled, total_time_running
* based on snapshot values taken when the event
@@ -10684,6 +10810,8 @@ void perf_event_exit_task(struct task_struct *child)
}
mutex_unlock(&child->perf_event_mutex);

+ task_work_cancel(child, get_pages_work);
+
for_each_task_context_nr(ctxn)
perf_event_exit_task_context(child, ctxn);

@@ -10881,6 +11009,8 @@ inherit_event(struct perf_event *parent_event,
* For per-task detached events with ring buffers, set_output doesn't
* make sense, but we can allocate a new buffer here. CPU-wide events
* don't have inheritance.
+ * If we have to allocate a ring buffer, it must be shmem backed,
+ * otherwise inheritance is disallowed in rb_alloc_detached().
*/
if (detached) {
int err;
diff --git a/kernel/events/internal.h b/kernel/events/internal.h
index 8de9e9cb6a..80d36a7277 100644
--- a/kernel/events/internal.h
+++ b/kernel/events/internal.h
@@ -55,11 +55,16 @@ struct ring_buffer {

/* tmpfs file for kernel-owned ring buffers */
struct file *shmem_file;
+ unsigned long shmem_file_addr;
+ int shmem_pages_in;

struct perf_event_mmap_page *user_page;
void *data_pages[0];
};

+extern int rb_inject(struct perf_event *event);
+extern int rb_get_kernel_pages(struct perf_event *event);
+extern void rb_put_kernel_pages(struct ring_buffer *rb, bool final);
extern void rb_free(struct ring_buffer *rb);
extern void ring_buffer_unaccount(struct ring_buffer *rb, bool aux);

diff --git a/kernel/events/ring_buffer.c b/kernel/events/ring_buffer.c
index 25159fe038..771dfdb71f 100644
--- a/kernel/events/ring_buffer.c
+++ b/kernel/events/ring_buffer.c
@@ -15,6 +15,8 @@
#include <linux/circ_buf.h>
#include <linux/poll.h>
#include <linux/shmem_fs.h>
+#include <linux/err.h>
+#include <linux/mman.h>
+#include <linux/sched/mm.h>

#include "internal.h"

@@ -384,8 +386,11 @@ void *perf_aux_output_begin(struct perf_output_handle *handle,
unsigned long aux_head, aux_tail;
struct ring_buffer *rb;

- if (output_event->parent)
+ if (output_event->parent) {
+ WARN_ON_ONCE(is_detached_event(event));
+ WARN_ON_ONCE(event->attach_state & PERF_ATTACH_SHMEM);
output_event = output_event->parent;
+ }

/*
* Since this will typically be open across pmu::add/pmu::del, we
@@ -851,6 +856,64 @@ void rb_free_aux(struct ring_buffer *rb)
}
}

+/*
+ * Size of @rb in bytes: its data area, its AUX area and one more page
+ * (rb_get_kernel_pages() uses page 0 as the user page).
+ */
+static unsigned long perf_rb_size(struct ring_buffer *rb)
+{
+	unsigned long size = PAGE_SIZE;
+
+	size += perf_data_size(rb);
+	size += perf_aux_size(rb);
+
+	return size;
+}
+
+/*
+ * Get @event's ring buffer pages and map the backing shmem file read-only
+ * into the current task's address space, recording the address of the
+ * mapping in rb::shmem_file_addr.
+ *
+ * Returns 0 on success, -ESRCH if the current task has no mm, or the error
+ * from rb_get_kernel_pages() or vm_mmap(). On failure no pages are left
+ * pinned by this call and rb::shmem_file_addr is not modified.
+ */
+int rb_inject(struct perf_event *event)
+{
+	struct ring_buffer *rb = event->rb;
+	struct mm_struct *mm;
+	unsigned long addr;
+	int err;
+
+	mm = get_task_mm(current);
+	if (!mm)
+		return -ESRCH;
+
+	err = rb_get_kernel_pages(event);
+	if (err)
+		goto out_mmput;
+
+	addr = vm_mmap(rb->shmem_file, 0, perf_rb_size(rb), PROT_READ,
+		       MAP_SHARED | MAP_POPULATE, 0);
+	if (IS_ERR_VALUE(addr)) {
+		/*
+		 * vm_mmap() reports failure as an errno encoded in the
+		 * returned address. Don't record that as the mapping
+		 * address; undo rb_get_kernel_pages() and pass the error on.
+		 */
+		rb_put_kernel_pages(rb, false);
+		err = (int)addr;
+		goto out_mmput;
+	}
+
+	rb->mmap_mapping = mm;
+	rb->shmem_file_addr = addr;
+
+out_mmput:
+	/* rb::mmap_mapping is recorded without keeping a reference on the mm */
+	mmput(mm);
+
+	return err;
+}
+
+/*
+ * Pause @event's ring buffer and, if rb::shmem_file_addr records a mapping
+ * of its shmem file, remove that mapping from the mm it was created in.
+ */
+static void rb_shmem_unmap(struct perf_event *event)
+{
+	struct ring_buffer *buf = event->rb;
+	struct mm_struct *target_mm = buf->mmap_mapping;
+
+	rb_toggle_paused(buf, true);
+
+	/*
+	 * Nothing to unmap if no mapping was recorded, or if the event is in
+	 * EXIT state: that means the task is past exit_mm().
+	 */
+	if (!buf->shmem_file_addr || event->state == PERF_EVENT_STATE_EXIT)
+		return;
+
+	down_write(&target_mm->mmap_sem);
+	(void)do_munmap(target_mm, buf->shmem_file_addr, perf_rb_size(buf), NULL);
+	up_write(&target_mm->mmap_sem);
+
+	buf->shmem_file_addr = 0;
+}
+
static int rb_shmem_setup(struct perf_event *event,
struct task_struct *task,
struct ring_buffer *rb)
@@ -892,6 +955,138 @@ static int rb_shmem_setup(struct perf_event *event,
return 0;
}

+/*
+ * Pin ring_buffer's pages to memory while the task is scheduled in;
+ * populate its page arrays (data_pages, aux_pages, user_page).
+ *
+ * Takes a reference on both rb::mmap_count and rb::aux_mmap_count; the
+ * pages are only looked up if both counts were zero before. Unpauses the
+ * buffer on success.
+ *
+ * Returns 0 on success. On error the page references taken so far and both
+ * counts are dropped again and a negative error code is returned.
+ */
+int rb_get_kernel_pages(struct perf_event *event)
+{
+ struct ring_buffer *rb = event->rb;
+ struct address_space *mapping;
+ int nr_pages, i = 0, err = -EINVAL, changed = 0, mc = 0;
+ struct page *page;
+
+ /*
+ * The mmap_count rules for SHMEM buffers:
+ * - they are always taken together
+ * - except for perf_mmap(), which doesn't work for shmem buffers:
+ * mmapping will force-pin more user's pages than is allowed
+ * - if either of them was taken before us, the pages are there
+ */
+ if (atomic_inc_return(&rb->mmap_count) == 1)
+ mc++;
+
+ if (atomic_inc_return(&rb->aux_mmap_count) == 1)
+ mc++;
+
+ /* mc == 2 only if we were the first to take both counts */
+ if (mc < 2)
+ goto done;
+
+ if (WARN_ON_ONCE(!rb->shmem_file))
+ goto err_put;
+
+ nr_pages = perf_rb_size(rb) >> PAGE_SHIFT;
+
+ mapping = rb->shmem_file->f_mapping;
+
+ /*
+ * Page 0 is the user page, pages 1..rb->nr_pages are data pages and
+ * the remaining ones are AUX pages.
+ */
+restart:
+ for (i = 0; i < nr_pages; i++) {
+ /*
+ * Publish the index being worked on; rb_put_kernel_pages()
+ * resets it to zero, which the check below turns into a
+ * restart from the first page.
+ * NOTE(review): after a complete pass this is left at
+ * nr_pages - 1, while rb_put_kernel_pages() uses it as an
+ * exclusive upper bound -- confirm the last page isn't leaked.
+ */
+ WRITE_ONCE(rb->shmem_pages_in, i);
+ err = shmem_getpage(mapping->host, i, &page, SGP_NOHUGE);
+ if (err)
+ goto err_put;
+
+ unlock_page(page);
+
+ if (READ_ONCE(rb->shmem_pages_in) != i) {
+ put_page(page);
+ goto restart;
+ }
+
+ mark_page_accessed(page);
+ set_page_dirty(page);
+ page->mapping = mapping;
+
+ /* Same page as already recorded for this index: nothing to update */
+ if (page == perf_mmap_to_page(rb, i))
+ continue;
+
+ changed++;
+ if (!i) {
+ /* Only initialize the user page the first time we see one */
+ bool init = !rb->user_page;
+
+ rb->user_page = page_address(page);
+ if (init)
+ perf_event_init_userpage(event, rb);
+ } else if (i <= rb->nr_pages) {
+ rb->data_pages[i - 1] = page_address(page);
+ } else {
+ rb->aux_pages[i - rb->nr_pages - 1] = page_address(page);
+ }
+ }
+
+ /* rebuild SG tables: pages may have changed */
+ if (changed) {
+ if (rb->aux_priv)
+ rb->free_aux(rb->aux_priv);
+
+ rb->aux_priv = event->pmu->setup_aux(smp_processor_id(),
+ rb->aux_pages,
+ rb->aux_nr_pages, true);
+ }
+
+ if (!rb->aux_priv) {
+ err = -ENOMEM;
+ goto err_put;
+ }
+
+done:
+ rb_toggle_paused(rb, false);
+ if (changed)
+ perf_event_update_userpage(event);
+
+ return 0;
+
+err_put:
+ /* Drop the references on pages [0, i) that were taken above */
+ for (i--; i >= 0; i--) {
+ page = perf_mmap_to_page(rb, i);
+ put_page(page);
+ }
+
+ atomic_dec(&rb->aux_mmap_count);
+ atomic_dec(&rb->mmap_count);
+
+ return err;
+}
+
+/*
+ * Counterpart of rb_get_kernel_pages(): pause @rb, drop one reference on
+ * each of its mmap counts and, once both have reached zero, mark the pages
+ * dirty and put them.
+ *
+ * @final: also clear page->mapping on every page that is put.
+ *
+ * Does nothing if @rb is NULL or has no shmem file.
+ */
+void rb_put_kernel_pages(struct ring_buffer *rb, bool final)
+{
+	bool aux_unused, data_unused;
+	struct page *page;
+	int i;
+
+	if (!rb || !rb->shmem_file)
+		return;
+
+	rb_toggle_paused(rb, true);
+
+	/*
+	 * rb_get_kernel_pages() takes both counts unconditionally, so both
+	 * have to be dropped here as well. Keep the two decrements out of a
+	 * short-circuiting condition: otherwise mmap_count is not decremented
+	 * whenever aux_mmap_count doesn't reach zero, the counts go out of
+	 * balance and the pages are never put.
+	 */
+	aux_unused = atomic_dec_and_test(&rb->aux_mmap_count);
+	data_unused = atomic_dec_and_test(&rb->mmap_count);
+
+	/* If both mmap_counts went to zero, put the pages, otherwise do nothing. */
+	if (!aux_unused || !data_unused)
+		return;
+
+	for (i = 0; i < READ_ONCE(rb->shmem_pages_in); i++) {
+		page = perf_mmap_to_page(rb, i);
+		set_page_dirty(page);
+		if (final)
+			page->mapping = NULL;
+		put_page(page);
+	}
+
+	/* Also makes a concurrent rb_get_kernel_pages() loop start over */
+	WRITE_ONCE(rb->shmem_pages_in, 0);
+}
+
/*
* Allocate a ring_buffer for a detached event and attach it to this event.
* There's one ring_buffer per detached event and vice versa, so
@@ -962,8 +1157,11 @@ void rb_free_detached(struct ring_buffer *rb, struct perf_event *event)
/* Must be the last one */
WARN_ON_ONCE(atomic_read(&rb->refcount) != 1);

- if (rb->shmem_file)
+ if (rb->shmem_file) {
+ rb_shmem_unmap(event);
shmem_truncate_range(rb->shmem_file->f_inode, 0, (loff_t)-1);
+ rb_put_kernel_pages(rb, true);
+ }

atomic_set(&rb->aux_mmap_count, 0);
rcu_assign_pointer(event->rb, NULL);
--
2.14.1