Re: [RFC PATCH 06/10] perf workqueue: introduce workqueue struct
From: Riccardo Mancini
Date: Thu Jul 15 2021 - 12:50:03 EST
Hi Arnaldo,
thanks again for having a look at this patchset!
On Wed, 2021-07-14 at 12:22 -0300, Arnaldo Carvalho de Melo wrote:
> Em Tue, Jul 13, 2021 at 02:11:17PM +0200, Riccardo Mancini escreveu:
> > This patch adds the workqueue definition, along with simple creation and
> > destruction functions.
> > Furthermore, a simple subtest is added.
> >
> > A workqueue is attached to a pool, on which it executes its workers.
> > Next patches will introduce workers.
> >
> > Signed-off-by: Riccardo Mancini <rickyman7@xxxxxxxxx>
> >
<SNIP>
> > +
> > +/**
> > + * create_workqueue - create a workqueue associated to @pool
> > + *
> > + * Only one workqueue can execute on a pool at a time.
> > + */
> > +struct workqueue_struct *create_workqueue(struct threadpool_struct *pool)
>
> I wonder if we should use the exact same kernel signature and not pass a
> threadpool, essentially having just one threadpool in tools/perf/ that
> is used by create_workqueue(void)?
I wondered the same thing, but I thought that, in that case, the single shared
threadpool would need to be created dynamically, so that we don't spawn threads
at startup that might never be used.
I think this could be a follow-up patch.
Thanks,
Riccardo
>
> > +{
> > + int err;
> > + struct workqueue_struct *wq = malloc(sizeof(struct workqueue_struct));
> > +
> > +
> > + err = pthread_mutex_init(&wq->lock, NULL);
> > + if (err)
> > + goto out_free_wq;
> > +
> > + err = pthread_cond_init(&wq->idle_cond, NULL);
> > + if (err)
> > + goto out_destroy_mutex;
> > +
> > + wq->pool = NULL;
> > + INIT_LIST_HEAD(&wq->busy_list);
> > + INIT_LIST_HEAD(&wq->idle_list);
> > +
> > + INIT_LIST_HEAD(&wq->pending);
> > +
> > + err = pipe(wq->msg_pipe);
> > + if (err)
> > + goto out_destroy_cond;
> > +
> > + wq->task.fn = worker_thread;
> > +
> > + err = attach_threadpool_to_workqueue(wq, pool);
> > + if (err)
> > + goto out_destroy_cond;
> > +
> > + wq->status = WORKQUEUE_STATUS__READY;
> > +
> > + return wq;
> > +
> > +out_destroy_cond:
> > + pthread_cond_destroy(&wq->idle_cond);
> > +out_destroy_mutex:
> > + pthread_mutex_destroy(&wq->lock);
> > +out_free_wq:
> > + free(wq);
> > + return NULL;
> > +}
> > +
> > +/**
> > + * destroy_workqueue - stop @wq workers and destroy @wq
> > + */
> > +int destroy_workqueue(struct workqueue_struct *wq)
> > +{
> > + int err = 0, ret;
> > +
> > + ret = detach_threadpool_from_workqueue(wq);
> > + if (ret) {
> > + pr_err("workqueue: error detaching from threadpool.\n");
> > + err = -1;
> > + }
> > +
> > + ret = pthread_mutex_destroy(&wq->lock);
> > + if (ret) {
> > + err = -1;
> > + pr_err("workqueue: error pthread_mutex_destroy: %s\n",
> > + strerror(errno));
> > + }
> > +
> > + ret = pthread_cond_destroy(&wq->idle_cond);
> > + if (ret) {
> > + err = -1;
> > + pr_err("workqueue: error pthread_cond_destroy: %s\n",
> > + strerror(errno));
> > + }
> > +
> > + ret = close(wq->msg_pipe[0]);
> > + if (ret) {
> > + err = -1;
> > + pr_err("workqueue: error close msg_pipe[0]: %s\n",
> > + strerror(errno));
> > + }
> > +
> > + ret = close(wq->msg_pipe[1]);
> > + if (ret) {
> > + err = -1;
> > + pr_err("workqueue: error close msg_pipe[1]: %s\n",
> > + strerror(errno));
> > + }
> > +
> > + free(wq);
> > +
> > + return err;
> > +}
> > +
> > +/**
> > + * workqueue_nr_threads - get size of threadpool underlying @wq
> > + */
> > +int workqueue_nr_threads(struct workqueue_struct *wq)
> > +{
> > + return threadpool_size(wq->pool);
> > +}
> > diff --git a/tools/perf/util/workqueue/workqueue.h b/tools/perf/util/workqueue/workqueue.h
> > new file mode 100644
> > index 0000000000000000..86ec1d69274f41db
> > --- /dev/null
> > +++ b/tools/perf/util/workqueue/workqueue.h
> > @@ -0,0 +1,24 @@
> > +/* SPDX-License-Identifier: GPL-2.0 */
> > +#ifndef __WORKQUEUE_WORKQUEUE_H
> > +#define __WORKQUEUE_WORKQUEUE_H
> > +
> > +#include <stdlib.h>
> > +#include <sys/types.h>
> > +#include <linux/list.h>
> > +#include "threadpool.h"
> > +
> > +struct work_struct;
> > +typedef void (*work_func_t)(struct work_struct *work);
> > +
> > +struct work_struct {
> > + struct list_head entry;
> > + work_func_t func;
> > +};
> > +
> > +struct workqueue_struct;
> > +
> > +extern struct workqueue_struct *create_workqueue(struct threadpool_struct *pool);
> > +extern int destroy_workqueue(struct workqueue_struct *wq);
> > +
> > +extern int workqueue_nr_threads(struct workqueue_struct *wq);
> > +#endif /* __WORKQUEUE_WORKQUEUE_H */
> > --
> > 2.31.1
> >
>