Re: [PATCH v6 05/20] perf record: Start threads in the beginning of trace streaming

From: Riccardo Mancini
Date: Thu Jun 03 2021 - 19:02:40 EST


Hi,

On Wed, 2021-05-26 at 13:52 +0300, Alexey Bayduraev wrote:
> Start thread in detached state because its management is implemented
> via messaging to avoid any scaling issues. Block signals prior thread
> start so only main tool thread would be notified on external async
> signals during data collection. Thread affinity mask is used to assign
> eligible cpus for the thread to run. Wait and sync on thread start using
> thread ack pipe.
>
> Signed-off-by: Alexey Bayduraev <alexey.v.bayduraev@xxxxxxxxxxxxxxx>
> ---
>  tools/perf/builtin-record.c | 106 +++++++++++++++++++++++++++++++++++-
>  1 file changed, 105 insertions(+), 1 deletion(-)
>
> diff --git a/tools/perf/builtin-record.c b/tools/perf/builtin-record.c
> index 838c1f779849..88fad12cbe5b 100644
> --- a/tools/perf/builtin-record.c
> +++ b/tools/perf/builtin-record.c
> @@ -1423,6 +1423,66 @@ static void record__thread_munmap_filtered(struct
> fdarray *fda, int fd,
>                 perf_mmap__put(map);
>  }
>  
> +static void *record__thread(void *arg)
> +{
> +       enum thread_msg msg = THREAD_MSG__READY;
> +       bool terminate = false;
> +       struct fdarray *pollfd;
> +       int err, ctlfd_pos;
> +
> +       thread = arg;
> +       thread->tid = syscall(SYS_gettid);
> +
> +       err = write(thread->pipes.ack[1], &msg, sizeof(msg));
> +       if (err == -1)
> +               pr_err("threads[%d]: failed to notify on start: %s", thread-
> >tid, strerror(errno));
> +
> +       pr_debug("threads[%d]: started on cpu=%d\n", thread->tid,
> sched_getcpu());
> +
> +       pollfd = &thread->pollfd;
> +       ctlfd_pos = thread->ctlfd_pos;
> +
> +       for (;;) {
> +               unsigned long long hits = thread->samples;
> +
> +               if (record__mmap_read_all(thread->rec, false) < 0 ||
> terminate)
> +                       break;
> +
> +               if (hits == thread->samples) {
> +
> +                       err = fdarray__poll(pollfd, -1);
> +                       /*
> +                        * Propagate error, only if there's any. Ignore
> positive
> +                        * number of returned events and interrupt error.
> +                        */
> +                       if (err > 0 || (err < 0 && errno == EINTR))
> +                               err = 0;
> +                       thread->waking++;
> +
> +                       if (fdarray__filter(pollfd, POLLERR | POLLHUP,
> +                                           record__thread_munmap_filtered,
> NULL) == 0)
> +                               break;
> +               }
> +
> +               if (pollfd->entries[ctlfd_pos].revents & POLLHUP) {
> +                       terminate = true;
> +                       close(thread->pipes.msg[0]);
> +                       pollfd->entries[ctlfd_pos].fd = -1;
> +                       pollfd->entries[ctlfd_pos].events = 0;
> +               }
> +
> +               pollfd->entries[ctlfd_pos].revents = 0;
> +       }
> +       record__mmap_read_all(thread->rec, true);
> +
> +       err = write(thread->pipes.ack[1], &msg, sizeof(msg));
> +       if (err == -1)
> +               pr_err("threads[%d]: failed to notify on termination: %s",
> +                      thread->tid, strerror(errno));
> +
> +       return NULL;
> +}
> +
>  static void record__init_features(struct record *rec)
>  {
>         struct perf_session *session = rec->session;
> @@ -1886,13 +1946,57 @@ static int record__terminate_thread(struct thread_data
> *thread_data)
>  
>  static int record__start_threads(struct record *rec)
>  {
> +       int t, tt, ret = 0, nr_threads = rec->nr_threads;
>         struct thread_data *thread_data = rec->thread_data;
> +       sigset_t full, mask;
> +       pthread_t handle;
> +       pthread_attr_t attrs;
> +
> +       sigfillset(&full);
> +       if (sigprocmask(SIG_SETMASK, &full, &mask)) {
> +               pr_err("Failed to block signals on threads start: %s\n",
> strerror(errno));
> +               return -1;
> +       }
> +
> +       pthread_attr_init(&attrs);
> +       pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
> +
> +       for (t = 1; t < nr_threads; t++) {
> +               enum thread_msg msg = THREAD_MSG__UNDEFINED;
> +
> +               pthread_attr_setaffinity_np(&attrs,
> +                                          
> MMAP_CPU_MASK_BYTES(&(thread_data[t].mask->affinity)),
> +                                           (cpu_set_t *)(thread_data[t].mask-
> >affinity.bits));
> +
> +               if (pthread_create(&handle, &attrs, record__thread,
> &thread_data[t])) {
> +                       for (tt = 1; tt < t; tt++)
> +                               record__terminate_thread(&thread_data[t]);
> +                       pr_err("Failed to start threads: %s\n",
> strerror(errno));
> +                       ret = -1;
> +                       goto out_err;
> +               }
> +
> +               if (read(thread_data[t].pipes.ack[0], &msg, sizeof(msg)) > 0)
> +                       pr_debug2("threads[%d]: sent %s\n", rec-
> >thread_data[t].tid,
> +                                thread_msg_tags[msg]);
> +       }
> +
> +       if (nr_threads > 1) {
> +               sched_setaffinity(0, MMAP_CPU_MASK_BYTES(&thread_data[0].mask-
> >affinity),
> +                                 (cpu_set_t *)thread_data[0].mask-
> >affinity.bits);
> +       }
>  
>         thread = &thread_data[0];
>  
>         pr_debug("threads[%d]: started on cpu=%d\n", thread->tid,
> sched_getcpu());
>  
> -       return 0;
> +out_err:
> +       if (sigprocmask(SIG_SETMASK, &mask, NULL)) {
> +               pr_err("Failed to unblock signals on threads start: %s\n",
> strerror(errno));
> +               ret = -1;
> +       }
> +
> +       return ret;
>  }

ASan reports a memory leak of attrs, since the call to pthread_attr_destroy()
is missing. It could be added right after the out_err label.

Thanks,
Riccardo

>  
>  static int record__stop_threads(struct record *rec, unsigned long *waking)