[PATCH v1 13/15] perf record: stop threads at the end of trace streaming

From: Alexey Budankov
Date: Mon Oct 12 2020 - 05:10:16 EST



Close the write fd of the comm.msg pipe to signal a thread to terminate,
and receive the THREAD_MSG__READY confirmation on its termination.
Accumulate the per-thread stats into the global stats so that they are
correctly calculated and displayed in the perf tool output.

Signed-off-by: Alexey Budankov <alexey.budankov@xxxxxxxxxxxxxxx>
---
tools/perf/builtin-record.c | 64 ++++++++++++++++++++++++++++++++++---
1 file changed, 60 insertions(+), 4 deletions(-)

diff --git a/tools/perf/builtin-record.c b/tools/perf/builtin-record.c
index 3b7e9026f25b..a15642656066 100644
--- a/tools/perf/builtin-record.c
+++ b/tools/perf/builtin-record.c
@@ -85,6 +85,16 @@ struct switch_output {
int cur_file;
};

+enum thread_msg { /* message codes read back from a thread's comm.ack pipe */
+ THREAD_MSG__UNSUPPORTED = 0, /* initial value of an ack before anything is read */
+ THREAD_MSG__READY, /* confirmation expected from a thread on termination */
+ THREAD_MSG__MAX, /* number of codes; sizes thread_msg_tags[] */
+};
+
+/* Printable name for each enum thread_msg code, indexed by the code itself. */
+static const char *thread_msg_tags[THREAD_MSG__MAX] = {
+	[THREAD_MSG__UNSUPPORTED] = "UNSUPPORTED",
+	[THREAD_MSG__READY]       = "READY",
+};
+
struct thread_data {
pid_t tid;
struct {
@@ -1796,6 +1806,50 @@ static void hit_auxtrace_snapshot_trigger(struct record *rec)
}
}

+/*
+ * Ask one thread to terminate and wait for its acknowledgement.
+ *
+ * Closing the write end of the thread's comm.msg pipe is the termination
+ * request; the thread is then expected to confirm by writing a thread_msg
+ * code to its comm.ack pipe, which is read here.
+ *
+ * Returns 0 when a complete, in-range message was received, -1 otherwise
+ * (read error, EOF, short read or unknown message code).
+ */
+static int record__terminate_thread(struct thread_data *thread_data)
+{
+	ssize_t res;
+	enum thread_msg ack = THREAD_MSG__UNSUPPORTED;
+	pid_t tid = thread_data->tid;
+
+	close(thread_data->comm.msg[1]);
+	/* Do not leave a stale descriptor behind for later cleanup code. */
+	thread_data->comm.msg[1] = -1;
+
+	res = read(thread_data->comm.ack[0], &ack, sizeof(ack));
+
+	/*
+	 * Only a full-sized read carrying a known code counts as success:
+	 * read() returns 0 on EOF and may return fewer bytes than requested,
+	 * and the value is used as an index into thread_msg_tags[] below.
+	 */
+	if (res == (ssize_t)sizeof(ack) && (unsigned int)ack < THREAD_MSG__MAX) {
+		pr_debug("threads: %d -> %s\n", tid, thread_msg_tags[ack]);
+		return 0;
+	}
+
+	/* Whatever landed in ack is untrusted here; report it as unsupported. */
+	pr_err("threads: failed to recv msg=%s from %d\n",
+	       thread_msg_tags[THREAD_MSG__UNSUPPORTED], tid);
+
+	return -1;
+}
+
+/*
+ * Stop the recording threads and fold their statistics into the totals.
+ *
+ * Does nothing when record__threads_enabled() reports false. Otherwise a
+ * termination request is issued for every thread_data entry except entry 0
+ * (presumably the calling/main thread - TODO confirm), and then the samples,
+ * wakeups and per-mmap byte counters of all entries, including entry 0, are
+ * added to rec->samples, *waking and the session byte counters.
+ *
+ * Always returns 0.
+ */
+static int record__stop_threads(struct record *rec, unsigned long *waking)
+{
+	int nr_threads = rec->nr_thread_data;
+	struct thread_data *threads = rec->thread_data;
+	int t, m;
+
+	if (!record__threads_enabled(rec))
+		return 0;
+
+	for (t = 1; t < nr_threads; t++)
+		record__terminate_thread(threads + t);
+
+	for (t = 0; t < nr_threads; t++) {
+		struct thread_data *td = threads + t;
+
+		pr_debug("threads: %d : samples %lld, wakes %ld\n",
+			 td->tid, td->samples, td->waking);
+
+		rec->samples += td->samples;
+		*waking += td->waking;
+
+		/* Sum the transfer/compression byte counts of each of this entry's mmaps. */
+		for (m = 0; m < td->nr_mmaps; m++) {
+			rec->session->bytes_transferred += td->maps[m].bytes_transferred;
+			rec->session->bytes_compressed += td->maps[m].bytes_compressed;
+		}
+	}
+
+	return 0;
+}
+
static int __cmd_record(struct record *rec, int argc, const char **argv)
{
int err;
@@ -1903,7 +1957,7 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)

if (record__open(rec) != 0) {
err = -1;
- goto out_child;
+ goto out_free_threads;
}
session->header.env.comp_mmap_len = session->evlist->core.mmap_len;

@@ -2203,18 +2257,20 @@ static int __cmd_record(struct record *rec, int argc, const char **argv)
goto out_child;
}

- if (!quiet)
- fprintf(stderr, "[ perf record: Woken up %ld times to write data ]\n", waking);
-
if (target__none(&rec->opts.target))
record__synthesize_workload(rec, true);

out_child:
+ record__stop_threads(rec, &waking);
+out_free_threads:
record__free_thread_data(rec);
evlist__finalize_ctlfd(rec->evlist);
record__mmap_read_all(rec, true);
record__aio_mmap_read_sync(rec);

+ if (!quiet)
+ fprintf(stderr, "[ perf record: Woken up %ld times to write data ]\n", waking);
+
if (rec->session->bytes_transferred && rec->session->bytes_compressed) {
ratio = (float)rec->session->bytes_transferred/(float)rec->session->bytes_compressed;
session->header.env.comp_ratio = ratio + 0.5;
--
2.24.1