Re: [PATCH RFC 09/16] prcu: Implement prcu_barrier() API

From: Paul E. McKenney
Date: Thu Jan 25 2018 - 01:31:32 EST


On Tue, Jan 23, 2018 at 03:59:34PM +0800, lianglihao@xxxxxxxxxx wrote:
> From: Lihao Liang <lianglihao@xxxxxxxxxx>
>
> This is PRCU's counterpart of RCU's rcu_barrier() API.
>
> Reviewed-by: Heng Zhang <heng.z@xxxxxxxxxx>
> Signed-off-by: Lihao Liang <lianglihao@xxxxxxxxxx>
> ---
> include/linux/prcu.h | 7 ++++++
> kernel/rcu/prcu.c | 63 ++++++++++++++++++++++++++++++++++++++++++++++++++++
> 2 files changed, 70 insertions(+)
>
> diff --git a/include/linux/prcu.h b/include/linux/prcu.h
> index 4e7d5d65..cce967fd 100644
> --- a/include/linux/prcu.h
> +++ b/include/linux/prcu.h
> @@ -5,6 +5,7 @@
> #include <linux/types.h>
> #include <linux/mutex.h>
> #include <linux/wait.h>
> +#include <linux/completion.h>
>
> #define CONFIG_PRCU
>
> @@ -32,6 +33,7 @@ struct prcu_local_struct {
> unsigned int online;
> unsigned long long version;
> unsigned long long cb_version;
> + struct rcu_head barrier_head;
> struct prcu_cblist cblist;
> };
>
> @@ -39,8 +41,11 @@ struct prcu_struct {
> atomic64_t global_version;
> atomic64_t cb_version;
> atomic_t active_ctr;
> + atomic_t barrier_cpu_count;
> struct mutex mtx;
> + struct mutex barrier_mtx;
> wait_queue_head_t wait_q;
> + struct completion barrier_completion;
> };
>
> #ifdef CONFIG_PRCU
> @@ -48,6 +53,7 @@ void prcu_read_lock(void);
> void prcu_read_unlock(void);
> void synchronize_prcu(void);
> void call_prcu(struct rcu_head *head, rcu_callback_t func);
> +void prcu_barrier(void);
> void prcu_init(void);
> void prcu_note_context_switch(void);
> int prcu_pending(void);
> @@ -60,6 +66,7 @@ void prcu_check_callbacks(void);
> #define prcu_read_unlock() do {} while (0)
> #define synchronize_prcu() do {} while (0)
> #define call_prcu() do {} while (0)
> +#define prcu_barrier() do {} while (0)
> #define prcu_init() do {} while (0)
> #define prcu_note_context_switch() do {} while (0)
> #define prcu_pending() 0
> diff --git a/kernel/rcu/prcu.c b/kernel/rcu/prcu.c
> index 373039c5..2664d091 100644
> --- a/kernel/rcu/prcu.c
> +++ b/kernel/rcu/prcu.c
> @@ -15,6 +15,7 @@ struct prcu_struct global_prcu = {
> .cb_version = ATOMIC64_INIT(0),
> .active_ctr = ATOMIC_INIT(0),
> .mtx = __MUTEX_INITIALIZER(global_prcu.mtx),
> + .barrier_mtx = __MUTEX_INITIALIZER(global_prcu.barrier_mtx),
> .wait_q = __WAIT_QUEUE_HEAD_INITIALIZER(global_prcu.wait_q)
> };
> struct prcu_struct *prcu = &global_prcu;
> @@ -250,6 +251,68 @@ static __latent_entropy void prcu_process_callbacks(struct softirq_action *unuse
> local_irq_restore(flags);
> }
>
> +/*
> + * PRCU callback function for prcu_barrier().
> + * If we are last, wake up the task executing prcu_barrier().
> + */
> +static void prcu_barrier_callback(struct rcu_head *rhp)
> +{
> + if (atomic_dec_and_test(&prcu->barrier_cpu_count))
> + complete(&prcu->barrier_completion);
> +}
> +
> +/*
> + * Called with preemption disabled, and from cross-cpu IRQ context.
> + */
> +static void prcu_barrier_func(void *info)
> +{
> + struct prcu_local_struct *local = this_cpu_ptr(&prcu_local);
> +
> + atomic_inc(&prcu->barrier_cpu_count);
> + call_prcu(&local->barrier_head, prcu_barrier_callback);
> +}
> +
> +/* Wait for all PRCU callbacks to complete. */
> +void prcu_barrier(void)
> +{
> + int cpu;
> +
> + /* Take mutex to serialize concurrent prcu_barrier() requests. */
> + mutex_lock(&prcu->barrier_mtx);
> +
> + /*
> + * Initialize the count to one rather than to zero in order to
> + * avoid a too-soon return to zero in case of a short grace period
> + * (or preemption of this task).
> + */
> + init_completion(&prcu->barrier_completion);
> + atomic_set(&prcu->barrier_cpu_count, 1);
> +
> + /*
> + * Register a new callback on each CPU using IPI to prevent races
> + * with call_prcu(). When that callback is invoked, we will know
> + * that all of the corresponding CPU's preceding callbacks have
> + * been invoked.
> + */
> + for_each_possible_cpu(cpu)
> + smp_call_function_single(cpu, prcu_barrier_func, NULL, 1);

This code seems to be assuming CONFIG_HOTPLUG_CPU=n. This might explain
your rcutorture failure.

> + /* Decrement the count as we initialize it to one. */
> + if (atomic_dec_and_test(&prcu->barrier_cpu_count))
> + complete(&prcu->barrier_completion);
> +
> + /*
> + * Now that we have a prcu_barrier_callback() callback on each
> + * CPU, and thus each counted, remove the initial count.
> + * Wait for all prcu_barrier_callback() callbacks to be invoked.
> + */
> + wait_for_completion(&prcu->barrier_completion);
> +
> + /* Other prcu_barrier() invocations can now safely proceed. */
> + mutex_unlock(&prcu->barrier_mtx);
> +}
> +EXPORT_SYMBOL(prcu_barrier);
> +
> void prcu_init_local_struct(int cpu)
> {
> struct prcu_local_struct *local;
> --
> 2.14.1.729.g59c0ea183
>