Re: [PATCH 6/8] RCU, workqueue: Implement rcu_work
From: Paul E. McKenney
Date: Wed Mar 14 2018 - 16:12:34 EST
On Wed, Mar 14, 2018 at 12:45:13PM -0700, Tejun Heo wrote:
> There are cases where an RCU callback needs to be bounced to a sleepable
> context. This is currently done by the RCU callback queueing a work
> item, which can be cumbersome to write and confusing to read.
>
> This patch introduces rcu_work, a workqueue work variant which gets
> executed after an RCU grace period, and converts the open coded
> bouncing in fs/aio and kernel/cgroup.
>
> v3: Dropped queue_rcu_work_on(). Documented rcu grace period behavior
> after queue_rcu_work().
>
> v2: Use rcu_barrier() instead of synchronize_rcu() to wait for
> completion of previously queued rcu callback as per Paul.
>
> Signed-off-by: Tejun Heo <tj@xxxxxxxxxx>
> Cc: "Paul E. McKenney" <paulmck@xxxxxxxxxxxxxxxxxx>
Looks good to me!
Reviewed-by: "Paul E. McKenney" <paulmck@xxxxxxxxxxxxxxxxxx>
> Cc: Linus Torvalds <torvalds@xxxxxxxxxxxxxxxxxxxx>
> ---
> include/linux/workqueue.h | 23 ++++++++++++++++++++
> kernel/workqueue.c | 54 +++++++++++++++++++++++++++++++++++++++++++++++
> 2 files changed, 77 insertions(+)
>
> diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
> index bc0cda1..d026f8f 100644
> --- a/include/linux/workqueue.h
> +++ b/include/linux/workqueue.h
> @@ -13,6 +13,7 @@
> #include <linux/threads.h>
> #include <linux/atomic.h>
> #include <linux/cpumask.h>
> +#include <linux/rcupdate.h>
>
> struct workqueue_struct;
>
> @@ -120,6 +121,14 @@ struct delayed_work {
> int cpu;
> };
>
> +struct rcu_work {
> + struct work_struct work;
> + struct rcu_head rcu;
> +
> + /* target workqueue ->rcu uses to queue ->work */
> + struct workqueue_struct *wq;
> +};
> +
> /**
> * struct workqueue_attrs - A struct for workqueue attributes.
> *
> @@ -151,6 +160,11 @@ static inline struct delayed_work *to_delayed_work(struct work_struct *work)
> return container_of(work, struct delayed_work, work);
> }
>
> +static inline struct rcu_work *to_rcu_work(struct work_struct *work)
> +{
> + return container_of(work, struct rcu_work, work);
> +}
> +
> struct execute_work {
> struct work_struct work;
> };
> @@ -266,6 +280,12 @@ static inline unsigned int work_static(struct work_struct *work) { return 0; }
> #define INIT_DEFERRABLE_WORK_ONSTACK(_work, _func) \
> __INIT_DELAYED_WORK_ONSTACK(_work, _func, TIMER_DEFERRABLE)
>
> +#define INIT_RCU_WORK(_work, _func) \
> + INIT_WORK(&(_work)->work, (_func))
> +
> +#define INIT_RCU_WORK_ONSTACK(_work, _func) \
> + INIT_WORK_ONSTACK(&(_work)->work, (_func))
> +
> /**
> * work_pending - Find out whether a work item is currently pending
> * @work: The work item in question
> @@ -447,6 +467,7 @@ extern bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
> struct delayed_work *work, unsigned long delay);
> extern bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
> struct delayed_work *dwork, unsigned long delay);
> +extern bool queue_rcu_work(struct workqueue_struct *wq, struct rcu_work *rwork);
>
> extern void flush_workqueue(struct workqueue_struct *wq);
> extern void drain_workqueue(struct workqueue_struct *wq);
> @@ -463,6 +484,8 @@ extern bool flush_delayed_work(struct delayed_work *dwork);
> extern bool cancel_delayed_work(struct delayed_work *dwork);
> extern bool cancel_delayed_work_sync(struct delayed_work *dwork);
>
> +extern bool flush_rcu_work(struct rcu_work *rwork);
> +
> extern void workqueue_set_max_active(struct workqueue_struct *wq,
> int max_active);
> extern struct work_struct *current_work(void);
> diff --git a/kernel/workqueue.c b/kernel/workqueue.c
> index bb9a519..7df85fa 100644
> --- a/kernel/workqueue.c
> +++ b/kernel/workqueue.c
> @@ -1604,6 +1604,40 @@ bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
> }
> EXPORT_SYMBOL_GPL(mod_delayed_work_on);
>
> +static void rcu_work_rcufn(struct rcu_head *rcu)
> +{
> + struct rcu_work *rwork = container_of(rcu, struct rcu_work, rcu);
> +
> + /* read the comment in __queue_work() */
> + local_irq_disable();
> + __queue_work(WORK_CPU_UNBOUND, rwork->wq, &rwork->work);
> + local_irq_enable();
> +}
> +
> +/**
> + * queue_rcu_work - queue work after an RCU grace period
> + * @wq: workqueue to use
> + * @rwork: work to queue
> + *
> + * Return: %false if @rwork was already pending, %true otherwise. Note
> + * that a full RCU grace period is guaranteed only after a %true return.
> + * While @rwork is guaranteed to be executed after a %false return, the
> + * execution may happen before a full RCU grace period has passed.
> + */
> +bool queue_rcu_work(struct workqueue_struct *wq, struct rcu_work *rwork)
> +{
> + struct work_struct *work = &rwork->work;
> +
> + if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
> + rwork->wq = wq;
> + call_rcu(&rwork->rcu, rcu_work_rcufn);
> + return true;
> + }
> +
> + return false;
> +}
> +EXPORT_SYMBOL(queue_rcu_work);
> +
> /**
> * worker_enter_idle - enter idle state
> * @worker: worker which is entering idle state
> @@ -3001,6 +3035,26 @@ bool flush_delayed_work(struct delayed_work *dwork)
> }
> EXPORT_SYMBOL(flush_delayed_work);
>
> +/**
> + * flush_rcu_work - wait for a rwork to finish executing the last queueing
> + * @rwork: the rcu work to flush
> + *
> + * Return:
> + * %true if flush_rcu_work() waited for the work to finish execution,
> + * %false if it was already idle.
> + */
> +bool flush_rcu_work(struct rcu_work *rwork)
> +{
> + if (test_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&rwork->work))) {
> + rcu_barrier();
> + flush_work(&rwork->work);
> + return true;
> + } else {
> + return flush_work(&rwork->work);
> + }
> +}
> +EXPORT_SYMBOL(flush_rcu_work);
> +
> static bool __cancel_work(struct work_struct *work, bool is_dwork)
> {
> unsigned long flags;
> --
> 2.9.5
>