Re: [RFC PATCH 01/10] perf workqueue: threadpool creation and destruction
From: Arnaldo Carvalho de Melo
Date: Wed Jul 14 2021 - 10:16:10 EST
Em Tue, Jul 13, 2021 at 02:11:12PM +0200, Riccardo Mancini escreveu:
> The workqueue library is made up by two components:
> - threadpool: handles the lifetime of the threads
> - workqueue: handles work distribution among the threads
>
> This first patch introduces the threadpool, starting from its creation
> and destruction functions.
> Thread management is based on the prototype from Alexey:
> https://lore.kernel.org/lkml/cover.1625227739.git.alexey.v.bayduraev@xxxxxxxxxxxxxxx/
>
> Each thread in the threadpool executes the same function (aka task)
> with a different argument tidx.
> Threads use a pair of pipes to communicate with the main process.
> The threadpool is static (all threads will be spawned at the same time).
> Future work could include making it resizable and adding affinity support
> (as in Alexey prototype).
>
> Suggested-by: Alexey Bayduraev <alexey.v.bayduraev@xxxxxxxxxxxxxxx>
> Signed-off-by: Riccardo Mancini <rickyman7@xxxxxxxxx>
> ---
> tools/perf/util/Build | 1 +
> tools/perf/util/workqueue/Build | 1 +
> tools/perf/util/workqueue/threadpool.c | 175 +++++++++++++++++++++++++
> tools/perf/util/workqueue/threadpool.h | 19 +++
> 4 files changed, 196 insertions(+)
> create mode 100644 tools/perf/util/workqueue/Build
> create mode 100644 tools/perf/util/workqueue/threadpool.c
> create mode 100644 tools/perf/util/workqueue/threadpool.h
>
> diff --git a/tools/perf/util/Build b/tools/perf/util/Build
> index 2d4fa13041789cd6..c7b09701661c869d 100644
> --- a/tools/perf/util/Build
> +++ b/tools/perf/util/Build
> @@ -180,6 +180,7 @@ perf-$(CONFIG_LIBBABELTRACE) += data-convert-bt.o
> perf-y += data-convert-json.o
>
> perf-y += scripting-engines/
> +perf-y += workqueue/
>
> perf-$(CONFIG_ZLIB) += zlib.o
> perf-$(CONFIG_LZMA) += lzma.o
> diff --git a/tools/perf/util/workqueue/Build b/tools/perf/util/workqueue/Build
> new file mode 100644
> index 0000000000000000..8b72a6cd4e2cba0d
> --- /dev/null
> +++ b/tools/perf/util/workqueue/Build
> @@ -0,0 +1 @@
> +perf-y += threadpool.o
> diff --git a/tools/perf/util/workqueue/threadpool.c b/tools/perf/util/workqueue/threadpool.c
> new file mode 100644
> index 0000000000000000..70c67569f956a3e2
> --- /dev/null
> +++ b/tools/perf/util/workqueue/threadpool.c
> @@ -0,0 +1,175 @@
> +// SPDX-License-Identifier: GPL-2.0
> +#include <stdlib.h>
> +#include <stdio.h>
> +#include <unistd.h>
> +#include <errno.h>
> +#include <string.h>
> +#include "debug.h"
> +#include "asm/bug.h"
> +#include "threadpool.h"
> +
> +enum threadpool_status {
> + THREADPOOL_STATUS__STOPPED, /* no threads */
> + THREADPOOL_STATUS__ERROR, /* errors */
> + THREADPOOL_STATUS__MAX
> +};
> +
> +struct threadpool_struct {
Can this be just 'struct threadpool'? I think it's descriptive enough:
> + int nr_threads; /* number of threads in the pool */
> + struct thread_struct *threads; /* array of threads in the pool */
> + struct task_struct *current_task; /* current executing function */
> + enum threadpool_status status; /* current status of the pool */
> +};
> +
> +struct thread_struct {
> + int idx; /* idx of thread in pool->threads */
> + pid_t tid; /* tid of thread */
> + struct threadpool_struct *pool; /* parent threadpool */
> + struct {
> + int from[2]; /* messages from thread (acks) */
> + int to[2]; /* messages to thread (commands) */
> + } pipes;
> +};
This one, since we already have a 'struct thread' in tools/perf to
represent a PERF_RECORD_FORK, perhaps we can call it 'struct threadpool_entry'?
> +
> +/**
> + * init_pipes - initialize all pipes of @thread
> + */
> +static void init_pipes(struct thread_struct *thread)
> +{
> + thread->pipes.from[0] = -1;
> + thread->pipes.from[1] = -1;
> + thread->pipes.to[0] = -1;
> + thread->pipes.to[1] = -1;
> +}
> +
> +/**
> + * open_pipes - open all pipes of @thread
> + */
> +static int open_pipes(struct thread_struct *thread)
Here please:
threadpool_entry__open_pipes()
It's longer, but helps with ctags/cscope navigation and we can go
directly to it via:
:ta threadpool_entry__open_p<TAB>
While ':ta open_pipes' may go to various places where this idiom is
used.
> +{
> + if (pipe(thread->pipes.from)) {
> + pr_err("threadpool: failed to create comm pipe 'from': %s\n",
> + strerror(errno));
> + return -ENOMEM;
> + }
> +
> + if (pipe(thread->pipes.to)) {
> + pr_err("threadpool: failed to create comm pipe 'to': %s\n",
> + strerror(errno));
> + close(thread->pipes.from[0]);
> + thread->pipes.from[0] = -1;
> + close(thread->pipes.from[1]);
> + thread->pipes.from[1] = -1;
> + return -ENOMEM;
> + }
> +
> + return 0;
> +}
> +
> +/**
> + * close_pipes - close all communication pipes of @thread
> + */
> +static void close_pipes(struct thread_struct *thread)
> +{
> + if (thread->pipes.from[0] != -1) {
> + close(thread->pipes.from[0]);
> + thread->pipes.from[0] = -1;
> + }
> + if (thread->pipes.from[1] != -1) {
> + close(thread->pipes.from[1]);
> + thread->pipes.from[1] = -1;
> + }
> + if (thread->pipes.to[0] != -1) {
> + close(thread->pipes.to[0]);
> + thread->pipes.to[0] = -1;
> + }
> + if (thread->pipes.to[1] != -1) {
> + close(thread->pipes.to[1]);
> + thread->pipes.to[1] = -1;
> + }
> +}
> +
> +/**
> + * create_threadpool - create a fixed threadpool with @n_threads threads
> + */
> +struct threadpool_struct *create_threadpool(int n_threads)
Is this already something the kernel has and thus we should keep the
naming? I couldn't find it in the kernel, so please name it:
struct threadpool *threadpool__new(int nthreads)
> +{
> + int ret, t;
> + struct threadpool_struct *pool = malloc(sizeof(*pool));
> +
> + if (!pool) {
> + pr_err("threadpool: cannot allocate pool: %s\n",
> + strerror(errno));
Humm, pr_err() at this level isn't appropriate, please make callers
complain.
> + return NULL;
> + }
> +
> + if (n_threads <= 0) {
> + pr_err("threadpool: invalid number of threads: %d\n",
> + n_threads);
pr_debug()
> + goto out_free_pool;
> + }
> +
> + pool->nr_threads = n_threads;
> + pool->current_task = NULL;
> +
> + pool->threads = malloc(n_threads * sizeof(*pool->threads));
> + if (!pool->threads) {
> + pr_err("threadpool: cannot allocate threads: %s\n",
> + strerror(errno));
> + goto out_free_pool;
> + }
> +
> + for (t = 0; t < n_threads; t++) {
> + pool->threads[t].idx = t;
> + pool->threads[t].tid = -1;
> + pool->threads[t].pool = pool;
> + init_pipes(&pool->threads[t]);
> + }
> +
> + for (t = 0; t < n_threads; t++) {
> + ret = open_pipes(&pool->threads[t]);
> + if (ret)
> + goto out_close_pipes;
> + }
> +
> + pool->status = THREADPOOL_STATUS__STOPPED;
> +
> + return pool;
> +
> +out_close_pipes:
> + for (t = 0; t < n_threads; t++)
> + close_pipes(&pool->threads[t]);
> +
> + free(pool->threads);
> +out_free_pool:
> + free(pool);
> + return NULL;
Here we can use ERR_PTR()/PTR_ERR() to let the caller know what the
problem was, i.e. we can ditch all the pr_err()/pr_debug() calls and instead
have a threadpool__strerror(struct threadpool *pool, int err) like we
have for 'struct evsel'. Please take a look at evsel__open_strerror().
> +}
> +
> +/**
> + * destroy_threadpool - free the @pool and all its resources
> + */
> +void destroy_threadpool(struct threadpool_struct *pool)
void threadpool__delete(struct threadpool *pool)
> +{
> + int t;
> +
> + if (!pool)
> + return;
> +
> + WARN_ON(pool->status != THREADPOOL_STATUS__STOPPED
> + && pool->status != THREADPOOL_STATUS__ERROR);
> +
> + for (t = 0; t < pool->nr_threads; t++)
> + close_pipes(&pool->threads[t]);
reset pool->threads[t] to -1
> +
> + free(pool->threads);
zfree
> + free(pool);
> +}
> +
> +/**
> + * threadpool_size - get number of threads in the threadpool
> + */
> +int threadpool_size(struct threadpool_struct *pool)
threadpool__size()
> +{
> + return pool->nr_threads;
> +}
> diff --git a/tools/perf/util/workqueue/threadpool.h b/tools/perf/util/workqueue/threadpool.h
> new file mode 100644
> index 0000000000000000..2b9388c768a0b588
> --- /dev/null
> +++ b/tools/perf/util/workqueue/threadpool.h
> @@ -0,0 +1,19 @@
> +/* SPDX-License-Identifier: GPL-2.0 */
> +#ifndef __WORKQUEUE_THREADPOOL_H
> +#define __WORKQUEUE_THREADPOOL_H
> +
> +struct threadpool_struct;
> +struct task_struct;
> +
> +typedef void (*task_func_t)(int tidx, struct task_struct *task);
> +
> +struct task_struct {
> + task_func_t fn;
> +};
> +
> +extern struct threadpool_struct *create_threadpool(int n_threads);
> +extern void destroy_threadpool(struct threadpool_struct *pool);
> +
> +extern int threadpool_size(struct threadpool_struct *pool);
> +
> +#endif /* __WORKQUEUE_THREADPOOL_H */
> --
> 2.31.1
>
--
- Arnaldo