Re: [PATCH v8 05/22] perf record: Start threads in the beginning of trace streaming

From: Arnaldo Carvalho de Melo
Date: Wed Jun 30 2021 - 13:21:45 EST


Em Wed, Jun 30, 2021 at 06:54:44PM +0300, Alexey Bayduraev escreveu:
> Start thread in detached state because its management is implemented
> via messaging to avoid any scaling issues. Block signals prior to thread
> start so only the main tool thread would be notified on external async
> signals during data collection. Thread affinity mask is used to assign
> eligible cpus for the thread to run. Wait and sync on thread start using
> thread ack pipe.
>
> Acked-by: Namhyung Kim <namhyung@xxxxxxxxx>
> Signed-off-by: Alexey Bayduraev <alexey.v.bayduraev@xxxxxxxxxxxxxxx>
> ---
> tools/perf/builtin-record.c | 108 +++++++++++++++++++++++++++++++++++-
> 1 file changed, 107 insertions(+), 1 deletion(-)
>
> diff --git a/tools/perf/builtin-record.c b/tools/perf/builtin-record.c
> index 82a21da2af16..cead2b3c56d7 100644
> --- a/tools/perf/builtin-record.c
> +++ b/tools/perf/builtin-record.c
> @@ -1423,6 +1423,66 @@ static void record__thread_munmap_filtered(struct fdarray *fda, int fd,
> perf_mmap__put(map);
> }
>
> +static void *record__thread(void *arg)
> +{
> + enum thread_msg msg = THREAD_MSG__READY;
> + bool terminate = false;
> + struct fdarray *pollfd;
> + int err, ctlfd_pos;
> +
> + thread = arg;
> + thread->tid = syscall(SYS_gettid);

We have 'gettid()' in tools/perf. It's not in a nice place, but we have
tools/build/feature/test-gettid.c to test whether gettid() is available in
the system headers, and if it isn't, we fall back to the definition in:

tools/perf/jvmti/jvmti_agent.c

#ifndef HAVE_GETTID
static inline pid_t gettid(void)
{
return (pid_t)syscall(__NR_gettid);
}
#endif

I'll move it to a more suitable place so that you can use it here.


> + err = write(thread->pipes.ack[1], &msg, sizeof(msg));
> + if (err == -1)
> + pr_err("threads[%d]: failed to notify on start: %s", thread->tid, strerror(errno));
> +
> + pr_debug("threads[%d]: started on cpu=%d\n", thread->tid, sched_getcpu());
> +
> + pollfd = &thread->pollfd;
> + ctlfd_pos = thread->ctlfd_pos;
> +
> + for (;;) {
> + unsigned long long hits = thread->samples;
> +
> + if (record__mmap_read_all(thread->rec, false) < 0 || terminate)
> + break;
> +
> + if (hits == thread->samples) {
> +
> + err = fdarray__poll(pollfd, -1);
> + /*
> + * Propagate error, only if there's any. Ignore positive
> + * number of returned events and interrupt error.
> + */
> + if (err > 0 || (err < 0 && errno == EINTR))
> + err = 0;
> + thread->waking++;
> +
> + if (fdarray__filter(pollfd, POLLERR | POLLHUP,
> + record__thread_munmap_filtered, NULL) == 0)
> + break;
> + }
> +
> + if (pollfd->entries[ctlfd_pos].revents & POLLHUP) {
> + terminate = true;
> + close(thread->pipes.msg[0]);
> + pollfd->entries[ctlfd_pos].fd = -1;
> + pollfd->entries[ctlfd_pos].events = 0;
> + }
> +
> + pollfd->entries[ctlfd_pos].revents = 0;
> + }
> + record__mmap_read_all(thread->rec, true);
> +
> + err = write(thread->pipes.ack[1], &msg, sizeof(msg));
> + if (err == -1)
> + pr_err("threads[%d]: failed to notify on termination: %s",
> + thread->tid, strerror(errno));
> +
> + return NULL;
> +}
> +
> static void record__init_features(struct record *rec)
> {
> struct perf_session *session = rec->session;
> @@ -1886,13 +1946,59 @@ static int record__terminate_thread(struct thread_data *thread_data)
>
> static int record__start_threads(struct record *rec)
> {
> + int t, tt, ret = 0, nr_threads = rec->nr_threads;
> struct thread_data *thread_data = rec->thread_data;
> + sigset_t full, mask;
> + pthread_t handle;
> + pthread_attr_t attrs;
> +
> + sigfillset(&full);
> + if (sigprocmask(SIG_SETMASK, &full, &mask)) {
> + pr_err("Failed to block signals on threads start: %s\n", strerror(errno));
> + return -1;
> + }
> +
> + pthread_attr_init(&attrs);
> + pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
> +
> + for (t = 1; t < nr_threads; t++) {
> + enum thread_msg msg = THREAD_MSG__UNDEFINED;
> +
> + pthread_attr_setaffinity_np(&attrs,
> + MMAP_CPU_MASK_BYTES(&(thread_data[t].mask->affinity)),
> + (cpu_set_t *)(thread_data[t].mask->affinity.bits));
> +
> + if (pthread_create(&handle, &attrs, record__thread, &thread_data[t])) {
> + for (tt = 1; tt < t; tt++)
> + record__terminate_thread(&thread_data[t]);
> + pr_err("Failed to start threads: %s\n", strerror(errno));
> + ret = -1;
> + goto out_err;
> + }
> +
> + if (read(thread_data[t].pipes.ack[0], &msg, sizeof(msg)) > 0)
> + pr_debug2("threads[%d]: sent %s\n", rec->thread_data[t].tid,
> + thread_msg_tags[msg]);
> + }
> +
> + if (nr_threads > 1) {
> + sched_setaffinity(0, MMAP_CPU_MASK_BYTES(&thread_data[0].mask->affinity),
> + (cpu_set_t *)thread_data[0].mask->affinity.bits);
> + }
>
> thread = &thread_data[0];
>
> pr_debug("threads[%d]: started on cpu=%d\n", thread->tid, sched_getcpu());
>
> - return 0;
> +out_err:
> + pthread_attr_destroy(&attrs);
> +
> + if (sigprocmask(SIG_SETMASK, &mask, NULL)) {
> + pr_err("Failed to unblock signals on threads start: %s\n", strerror(errno));
> + ret = -1;
> + }
> +
> + return ret;
> }
>
> static int record__stop_threads(struct record *rec, unsigned long *waking)
> --
> 2.19.0
>

--

- Arnaldo