Re: [PATCH v6 05/20] perf record: Start threads in the beginning of trace streaming
From: Riccardo Mancini
Date: Thu Jun 03 2021 - 19:02:40 EST
Hi,
On Wed, 2021-05-26 at 13:52 +0300, Alexey Bayduraev wrote:
> Start thread in detached state because its management is implemented
> via messaging to avoid any scaling issues. Block signals prior thread
> start so only main tool thread would be notified on external async
> signals during data collection. Thread affinity mask is used to assign
> eligible cpus for the thread to run. Wait and sync on thread start using
> thread ack pipe.
>
> Signed-off-by: Alexey Bayduraev <alexey.v.bayduraev@xxxxxxxxxxxxxxx>
> ---
> tools/perf/builtin-record.c | 106 +++++++++++++++++++++++++++++++++++-
> 1 file changed, 105 insertions(+), 1 deletion(-)
>
> diff --git a/tools/perf/builtin-record.c b/tools/perf/builtin-record.c
> index 838c1f779849..88fad12cbe5b 100644
> --- a/tools/perf/builtin-record.c
> +++ b/tools/perf/builtin-record.c
> @@ -1423,6 +1423,66 @@ static void record__thread_munmap_filtered(struct
> fdarray *fda, int fd,
> perf_mmap__put(map);
> }
>
> +static void *record__thread(void *arg)
> +{
> + enum thread_msg msg = THREAD_MSG__READY;
> + bool terminate = false;
> + struct fdarray *pollfd;
> + int err, ctlfd_pos;
> +
> + thread = arg;
> + thread->tid = syscall(SYS_gettid);
> +
> + err = write(thread->pipes.ack[1], &msg, sizeof(msg));
> + if (err == -1)
> + pr_err("threads[%d]: failed to notify on start: %s", thread-
> >tid, strerror(errno));
> +
> + pr_debug("threads[%d]: started on cpu=%d\n", thread->tid,
> sched_getcpu());
> +
> + pollfd = &thread->pollfd;
> + ctlfd_pos = thread->ctlfd_pos;
> +
> + for (;;) {
> + unsigned long long hits = thread->samples;
> +
> + if (record__mmap_read_all(thread->rec, false) < 0 ||
> terminate)
> + break;
> +
> + if (hits == thread->samples) {
> +
> + err = fdarray__poll(pollfd, -1);
> + /*
> + * Propagate error, only if there's any. Ignore
> positive
> + * number of returned events and interrupt error.
> + */
> + if (err > 0 || (err < 0 && errno == EINTR))
> + err = 0;
> + thread->waking++;
> +
> + if (fdarray__filter(pollfd, POLLERR | POLLHUP,
> + record__thread_munmap_filtered,
> NULL) == 0)
> + break;
> + }
> +
> + if (pollfd->entries[ctlfd_pos].revents & POLLHUP) {
> + terminate = true;
> + close(thread->pipes.msg[0]);
> + pollfd->entries[ctlfd_pos].fd = -1;
> + pollfd->entries[ctlfd_pos].events = 0;
> + }
> +
> + pollfd->entries[ctlfd_pos].revents = 0;
> + }
> + record__mmap_read_all(thread->rec, true);
> +
> + err = write(thread->pipes.ack[1], &msg, sizeof(msg));
> + if (err == -1)
> + pr_err("threads[%d]: failed to notify on termination: %s",
> + thread->tid, strerror(errno));
> +
> + return NULL;
> +}
> +
> static void record__init_features(struct record *rec)
> {
> struct perf_session *session = rec->session;
> @@ -1886,13 +1946,57 @@ static int record__terminate_thread(struct thread_data
> *thread_data)
>
> static int record__start_threads(struct record *rec)
> {
> + int t, tt, ret = 0, nr_threads = rec->nr_threads;
> struct thread_data *thread_data = rec->thread_data;
> + sigset_t full, mask;
> + pthread_t handle;
> + pthread_attr_t attrs;
> +
> + sigfillset(&full);
> + if (sigprocmask(SIG_SETMASK, &full, &mask)) {
> + pr_err("Failed to block signals on threads start: %s\n",
> strerror(errno));
> + return -1;
> + }
> +
> + pthread_attr_init(&attrs);
> + pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
> +
> + for (t = 1; t < nr_threads; t++) {
> + enum thread_msg msg = THREAD_MSG__UNDEFINED;
> +
> + pthread_attr_setaffinity_np(&attrs,
> +
> MMAP_CPU_MASK_BYTES(&(thread_data[t].mask->affinity)),
> + (cpu_set_t *)(thread_data[t].mask-
> >affinity.bits));
> +
> + if (pthread_create(&handle, &attrs, record__thread,
> &thread_data[t])) {
> + for (tt = 1; tt < t; tt++)
> + record__terminate_thread(&thread_data[t]);
> + pr_err("Failed to start threads: %s\n",
> strerror(errno));
> + ret = -1;
> + goto out_err;
> + }
> +
> + if (read(thread_data[t].pipes.ack[0], &msg, sizeof(msg)) > 0)
> + pr_debug2("threads[%d]: sent %s\n", rec-
> >thread_data[t].tid,
> + thread_msg_tags[msg]);
> + }
> +
> + if (nr_threads > 1) {
> + sched_setaffinity(0, MMAP_CPU_MASK_BYTES(&thread_data[0].mask-
> >affinity),
> + (cpu_set_t *)thread_data[0].mask-
> >affinity.bits);
> + }
>
> thread = &thread_data[0];
>
> pr_debug("threads[%d]: started on cpu=%d\n", thread->tid,
> sched_getcpu());
>
> - return 0;
> +out_err:
> + if (sigprocmask(SIG_SETMASK, &mask, NULL)) {
> + pr_err("Failed to unblock signals on threads start: %s\n",
> strerror(errno));
> + ret = -1;
> + }
> +
> + return ret;
> }
ASan complains of a memory leak of the attrs, since pthread_attr_destroy is
missing. It could be added just after out_err label.
Thanks,
Riccardo
>
> static int record__stop_threads(struct record *rec, unsigned long *waking)