Re: [RFC PATCH 03/10] perf workqueue: add threadpool start and stop functions

From: Riccardo Mancini
Date: Fri Jul 16 2021 - 09:54:18 EST


Hi Namhyung,

On Thu, 2021-07-15 at 16:48 -0700, Namhyung Kim wrote:
> On Tue, Jul 13, 2021 at 5:11 AM Riccardo Mancini <rickyman7@xxxxxxxxx> wrote:
> >
> > This patch adds the start and stop functions, alongside the thread
> > function.
> > Each thread will run until a stop signal is received.
> > Furthermore, start and stop are added to the test.
> >
> > Thread management is based on the prototype from Alexey:
> > https://lore.kernel.org/lkml/cover.1625227739.git.alexey.v.bayduraev@xxxxxxxxxxxxxxx/
> >
> > Suggested-by: Alexey Bayduraev <alexey.v.bayduraev@xxxxxxxxxxxxxxx>
> > Signed-off-by: Riccardo Mancini <rickyman7@xxxxxxxxx>
> > ---
> >  tools/perf/tests/workqueue.c           |  13 ++
> >  tools/perf/util/workqueue/threadpool.c | 238 +++++++++++++++++++++++++
> >  tools/perf/util/workqueue/threadpool.h |   5 +
> >  3 files changed, 256 insertions(+)
> >
> > diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
> > index 1bd4d78c13eb3b14..be377e9897bab4e9 100644
> > --- a/tools/perf/tests/workqueue.c
> > +++ b/tools/perf/tests/workqueue.c
> > @@ -10,16 +10,29 @@ struct threadpool_test_args_t {
> >
> >  static int __threadpool__prepare(struct threadpool_struct **pool, int
> > pool_size)
> >  {
> > +       int ret;
> > +
> >         *pool = create_threadpool(pool_size);
> >         TEST_ASSERT_VAL("threadpool creation failure", *pool != NULL);
> >         TEST_ASSERT_VAL("threadpool size is wrong",
> >                         threadpool_size(*pool) == pool_size);
> >
> > +       ret = start_threadpool(*pool);
> > +       TEST_ASSERT_VAL("threadpool start failure", ret == 0);
> > +       TEST_ASSERT_VAL("threadpool is not ready",
> > threadpool_is_ready(*pool));
> > +
> >         return 0;
> >  }
> >
> >  static int __threadpool__teardown(struct threadpool_struct *pool)
> >  {
> > +       int ret;
> > +
> > +       ret = stop_threadpool(pool);
> > +       TEST_ASSERT_VAL("threadpool start failure", ret == 0);
>
> s/start/stop/
Thanks.
>
> > +       TEST_ASSERT_VAL("stopped threadpool is ready",
> > +                       !threadpool_is_ready(pool));
> > +
> >         destroy_threadpool(pool);
> >
> >         return 0;
> > diff --git a/tools/perf/util/workqueue/threadpool.c
> > b/tools/perf/util/workqueue/threadpool.c
> > index 70c67569f956a3e2..f4635ff782b9388e 100644
> > --- a/tools/perf/util/workqueue/threadpool.c
> > +++ b/tools/perf/util/workqueue/threadpool.c
> [SNIP]
> > +/**
> > + * wait_thread - receive ack from thread
> > + *
> > + * NB: call only from main thread!
> > + */
> > +static int wait_thread(struct thread_struct *thread)
> > +{
> > +       int res;
> > +       enum thread_msg msg = THREAD_MSG__UNDEFINED;
> > +
> > +       res = read(thread->pipes.from[0], &msg, sizeof(msg));
> > +       if (res < 0) {
>
> Maybe it needs to handle -EINTR.

Its behaviour should be retry, right?
Since these reads are used multiple times in the code, maybe I'm better off
writing a wrapper function handling also EINTR.

>
> > +               pr_err("threadpool: failed to recv msg from tid=%d: %s\n",
> > +                      thread->tid, strerror(errno));
> > +               return -1;
> > +       }
> > +       if (msg != THREAD_MSG__ACK) {
> > +               pr_err("threadpool: received unexpected msg from tid=%d:
> > %s\n",
> > +                      thread->tid, thread_msg_tags[msg]);
> > +               return -1;
> > +       }
> > +
> > +       pr_debug2("threadpool: received ack from tid=%d\n", thread->tid);
> > +
> > +       return 0;
> > +}
> >
<SNIP>
> > +static int __start_threadpool(struct threadpool_struct *pool)
> > +{
> > +       int t, tt, ret = 0, nr_threads = pool->nr_threads;
> > +       sigset_t full, mask;
> > +       pthread_t handle;
> > +       pthread_attr_t attrs;
> > +
> > +       sigfillset(&full);
> > +       if (sigprocmask(SIG_SETMASK, &full, &mask)) {
> > +               pr_err("Failed to block signals on threads start: %s\n",
> > +                       strerror(errno));
> > +               return -1;
> > +       }
> > +
> > +       pthread_attr_init(&attrs);
> > +       pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
> > +
> > +       for (t = 0; t < nr_threads; t++) {
> > +               struct thread_struct *thread = &pool->threads[t];
> > +
> > +               if (pthread_create(&handle, &attrs, threadpool_thread,
> > thread)) {
> > +                       for (tt = 1; tt < t; tt++)
> > +                               terminate_thread(thread);
> > +                       pr_err("Failed to start threads: %s\n",
> > strerror(errno));
> > +                       ret = -1;
> > +                       goto out_free_attr;
> > +               }
> > +
> > +               if (wait_thread(thread)) {
> > +                       for (tt = 1; tt <= t; tt++)
> > +                               terminate_thread(thread);
> > +                       ret = -1;
> > +                       goto out_free_attr;
> > +               }
> > +       }
>
> Isn't it better doing this way?
>
> for (t = 0; t < nr_threads; t++) {
>     pthread_create(t)
> }
>
> for (t = 0; t < nr_threads; t++) {
>     wait_thread(t)
> }

I wondered the same thing, but I saw that it was done like that also in Alexey
patch, so I kept it like so.
To me, it also looks like it should be not a problem doing as you suggest. It
should also be more efficient.

Thanks,
Riccardo

>
> Thanks,
> Namhyung
>
>
> > +
> > +out_free_attr:
> > +       pthread_attr_destroy(&attrs);
> > +
> > +       if (sigprocmask(SIG_SETMASK, &mask, NULL)) {
> > +               pr_err("Failed to unblock signals on threads start: %s\n",
> > +                       strerror(errno));
> > +               ret = -1;
> > +       }
> > +
> > +       return ret;
> > +}
> > +