Re: [RFC PATCH 07/10] rcu: Wake NOCB rcuog kthreads on expedited grace period completion

From: Puranjay Mohan

Date: Wed Jun 24 2026 - 12:20:31 EST


Hi Frederic,

Sorry for the late reply.

I have sent another version addressing all your comments:
https://lore.kernel.org/all/20260624132356.516959-1-puranjay@xxxxxxxxxx/

Thanks,
Puranjay

On Wed, Jun 3, 2026 at 3:07 PM Frederic Weisbecker <frederic@xxxxxxxxxx> wrote:
>
> Le Fri, Apr 17, 2026 at 04:11:55PM -0700, Puranjay Mohan a écrit :
> > When an expedited grace period completes, rcu_exp_wait_wake() wakes
> > waiters on rnp->exp_wq[] but does not notify NOCB rcuog kthreads. These
> > kthreads may be sleeping waiting for a grace period to complete.
> > Without this wakeup, callbacks on offloaded CPUs that could benefit from
> > the expedited GP must wait until the rcuog kthread wakes for some other
> > reason (e.g., next normal GP completion or a timer).
> >
> > Add rcu_exp_wake_nocb() which wakes rcuog kthreads for leaf-node CPUs,
> > deduplicating via rdp->nocb_gp_rdp since multiple CPUs share one rcuog
> > kthread. Uses for_each_leaf_node_possible_cpu() because offline CPUs
> > can have pending callbacks. The function is defined in tree_nocb.h with
> > an empty stub for CONFIG_RCU_NOCB_CPU=n builds.
> >
> > Reviewed-by: Paul E. McKenney <paulmck@xxxxxxxxxx>
> > Signed-off-by: Puranjay Mohan <puranjay@xxxxxxxxxx>
> > ---
> > kernel/rcu/tree.h | 1 +
> > kernel/rcu/tree_exp.h | 1 +
> > kernel/rcu/tree_nocb.h | 29 +++++++++++++++++++++++++++++
> > 3 files changed, 31 insertions(+)
> >
> > diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
> > index 7dfc57e9adb1..40f778453591 100644
> > --- a/kernel/rcu/tree.h
> > +++ b/kernel/rcu/tree.h
> > @@ -500,6 +500,7 @@ static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp);
> > static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq);
> > static void rcu_init_one_nocb(struct rcu_node *rnp);
> > static bool wake_nocb_gp(struct rcu_data *rdp);
> > +static void rcu_exp_wake_nocb(struct rcu_node *rnp);
> > static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> > unsigned long j, bool lazy);
> > static void call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *head,
> > diff --git a/kernel/rcu/tree_exp.h b/kernel/rcu/tree_exp.h
> > index 82cada459e5d..0df1009c6e97 100644
> > --- a/kernel/rcu/tree_exp.h
> > +++ b/kernel/rcu/tree_exp.h
> > @@ -708,6 +708,7 @@ static void rcu_exp_wait_wake(unsigned long s)
> > }
> > smp_mb(); /* All above changes before wakeup. */
> > wake_up_all(&rnp->exp_wq[rcu_seq_ctr(s) & 0x3]);
> > + rcu_exp_wake_nocb(rnp);
> > }
> > trace_rcu_exp_grace_period(rcu_state.name, s, TPS("endwake"));
> > mutex_unlock(&rcu_state.exp_wake_mutex);
> > diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
> > index 7462cd5e2507..f37ee56d62a9 100644
> > --- a/kernel/rcu/tree_nocb.h
> > +++ b/kernel/rcu/tree_nocb.h
> > @@ -190,6 +190,31 @@ static void rcu_init_one_nocb(struct rcu_node *rnp)
> > init_swait_queue_head(&rnp->nocb_gp_wq[1]);
> > }
> >
> > +/*
> > + * Wake NOCB rcuog kthreads for leaf-node CPUs so that they can advance
> > + * callbacks that were waiting for the just-completed expedited GP.
> > + * Deduplicate via nocb_gp_rdp since multiple CPUs share one rcuog
> > + * kthread. Use for_each_leaf_node_possible_cpu() because offline CPUs
> > + * may have pending callbacks.
> > + */
> > +static void rcu_exp_wake_nocb(struct rcu_node *rnp)
>
> Please consolidate the naming to match rcu_nocb_gp_cleanup().
>
> > +{
> > + struct rcu_data *last_rdp_gp = NULL;
> > + int cpu;
> > +
> > + if (!rcu_is_leaf_node(rnp))
> > + return;
> > +
> > + for_each_leaf_node_possible_cpu(rnp, cpu) {
> > + struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
> > +
> > + if (rdp->nocb_gp_rdp == last_rdp_gp)
> > + continue;
> > + last_rdp_gp = rdp->nocb_gp_rdp;
> > + wake_nocb_gp(rdp);
> > + }
>
> There are two waitqueues for rcuog wake-ups:
>
> 1) rdp->rdp_gp->nocb_gp_wq: to wait for callbacks on the queue
> 2) rnp->nocb_gp_wq: to wait for grace periods
>
> So you're waking up the wrong one.
>
> Something like the below? (untested)
>
> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> index 0e43866dc4cd..436e12e313c2 100644
> --- a/kernel/rcu/tree.c
> +++ b/kernel/rcu/tree.c
> @@ -2193,8 +2193,13 @@ static noinline void rcu_gp_cleanup(void)
> dump_blkd_tasks(rnp, 10);
> WARN_ON_ONCE(rnp->qsmask);
> WRITE_ONCE(rnp->gp_seq, new_gp_seq);
> - if (!rnp->parent)
> - smp_mb(); // Order against failing poll_state_synchronize_rcu_full().
> + if (!rnp->parent) {
> + /*
> + * Order against failing poll_state_synchronize_rcu_full()
> + * and also rcu_nocb_cleanup_wake() -> swait_active().
> + */
> + smp_mb();
> + }
> rdp = this_cpu_ptr(&rcu_data);
> if (rnp == rdp->mynode)
> needgp = __note_gp_changes(rnp, rdp) || needgp;
> diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
> index 40f778453591..8f272cb4e4f3 100644
> --- a/kernel/rcu/tree.h
> +++ b/kernel/rcu/tree.h
> @@ -253,7 +253,7 @@ struct rcu_data {
> u8 nocb_gp_sleep; /* Is the nocb GP thread asleep? */
> u8 nocb_gp_bypass; /* Found a bypass on last scan? */
> u8 nocb_gp_gp; /* GP to wait for on last scan? */
> - unsigned long nocb_gp_seq; /* If so, ->gp_seq to wait for. */
> + struct rcu_gp_oldstate nocb_gp_seq; /* If so, ->gp_seq to wait for. */
> unsigned long nocb_gp_loops; /* # passes through wait code. */
> struct swait_queue_head nocb_gp_wq; /* For nocb kthreads to sleep on. */
> bool nocb_cb_sleep; /* Is the nocb CB thread asleep? */
> @@ -498,9 +498,9 @@ static bool rcu_preempt_need_deferred_qs(struct task_struct *t);
> static void zero_cpu_stall_ticks(struct rcu_data *rdp);
> static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp);
> static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq);
> +static void rcu_nocb_exp_cleanup(struct rcu_node *rnp);
> static void rcu_init_one_nocb(struct rcu_node *rnp);
> static bool wake_nocb_gp(struct rcu_data *rdp);
> -static void rcu_exp_wake_nocb(struct rcu_node *rnp);
> static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> unsigned long j, bool lazy);
> static void call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *head,
> diff --git a/kernel/rcu/tree_exp.h b/kernel/rcu/tree_exp.h
> index 0df1009c6e97..43c167a8a145 100644
> --- a/kernel/rcu/tree_exp.h
> +++ b/kernel/rcu/tree_exp.h
> @@ -708,7 +708,8 @@ static void rcu_exp_wait_wake(unsigned long s)
> }
> smp_mb(); /* All above changes before wakeup. */
> wake_up_all(&rnp->exp_wq[rcu_seq_ctr(s) & 0x3]);
> - rcu_exp_wake_nocb(rnp);
> + if (rcu_is_leaf_node(rnp))
> + rcu_nocb_exp_cleanup(rnp);
> }
> trace_rcu_exp_grace_period(rcu_state.name, s, TPS("endwake"));
> mutex_unlock(&rcu_state.exp_wake_mutex);
> diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
> index f37ee56d62a9..60d4182b9509 100644
> --- a/kernel/rcu/tree_nocb.h
> +++ b/kernel/rcu/tree_nocb.h
> @@ -170,13 +170,59 @@ static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp)
> lockdep_assert_held(&rdp->nocb_lock);
> }
>
> +static void rcu_nocb_cleanup_wake(struct swait_queue_head *sq)
> +{
> + if (swait_active(sq))
> + swake_up_all(sq);
> +}
> +
> /*
> * Wake up any no-CBs CPUs' kthreads that were waiting on the just-ended
> * grace period.
> */
> static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq)
> {
> - swake_up_all(sq);
> + /*
> + * swait_active() can be checked first because:
> + *
> + * rcu_gp_cleanup() nocb_gp_wait()
> + * --------------- --------------
> + * WRITE_ONCE(root->gp_seq, new_gp_seq); swait_event_interruptible_exclusive(sq)
> + * smp_mb() prepare_to_swait()
> + * if swait_active(sq) list_add_tail(&wait->task_list, &q->task_list);
> + * swake_up_all(sq) set_current_state()
> + * smp_mb()
> + * if (poll_state_synchronize_rcu_full())
> + * if (rcu_seq_done_exact(&root->gp_seq, rgosp->rgos_norm))
> + * ...
> + */
> + rcu_nocb_cleanup_wake(sq);
> +}
> +
> +/*
> + * Wake NOCB rcuog kthreads for leaf-node CPUs so that they can advance
> + * callbacks that were waiting for the just-completed expedited GP.
> + * Wake up the waitqueues for both even and odd GP numbers because the
> + * expedited and normal GP sequence numbers don't match.
> + */
> +static void rcu_nocb_exp_cleanup(struct rcu_node *rnp)
> +{
> +/*
> + * swait_active() can be checked first because:
> + *
> + * rcu_exp_wait_wake() nocb_gp_wait()
> + * --------------- --------------
> + * rcu_seq_end(&rcu_state.expedited_sequence); swait_event_interruptible_exclusive(sq)
> + * smp_mb() prepare_to_swait()
> + * if swait_active(sq) list_add_tail(&wait->task_list, &q->task_list);
> + * swake_up_all(sq) set_current_state()
> + * smp_mb()
> + * if (poll_state_synchronize_rcu_full())
> + * if (rcu_seq_done_exact(&rcu_state.expedited_sequence, rgosp->rgos_exp))
> + * ...
> + */
> + rcu_nocb_cleanup_wake(&rnp->nocb_gp_wq[0]);
> + rcu_nocb_cleanup_wake(&rnp->nocb_gp_wq[1]);
> }
>
> static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp)
> @@ -190,31 +236,6 @@ static void rcu_init_one_nocb(struct rcu_node *rnp)
> init_swait_queue_head(&rnp->nocb_gp_wq[1]);
> }
>
> -/*
> - * Wake NOCB rcuog kthreads for leaf-node CPUs so that they can advance
> - * callbacks that were waiting for the just-completed expedited GP.
> - * Deduplicate via nocb_gp_rdp since multiple CPUs share one rcuog
> - * kthread. Use for_each_leaf_node_possible_cpu() because offline CPUs
> - * may have pending callbacks.
> - */
> -static void rcu_exp_wake_nocb(struct rcu_node *rnp)
> -{
> - struct rcu_data *last_rdp_gp = NULL;
> - int cpu;
> -
> - if (!rcu_is_leaf_node(rnp))
> - return;
> -
> - for_each_leaf_node_possible_cpu(rnp, cpu) {
> - struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
> -
> - if (rdp->nocb_gp_rdp == last_rdp_gp)
> - continue;
> - last_rdp_gp = rdp->nocb_gp_rdp;
> - wake_nocb_gp(rdp);
> - }
> -}
> -
> /* Clear any pending deferred wakeup timer (nocb_gp_lock must be held). */
> static void nocb_defer_wakeup_cancel(struct rcu_data *rdp_gp)
> {
> @@ -684,7 +705,7 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
> {
> bool bypass = false;
> int __maybe_unused cpu = my_rdp->cpu;
> - struct rcu_gp_oldstate cur_gp_seq_full;
> + struct rcu_gp_oldstate wait_gp_seq = {0}; //remove uninitialized warning
> unsigned long flags;
> bool gotcbs = false;
> unsigned long j = jiffies;
> @@ -694,7 +715,6 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
> bool needwake_gp;
> struct rcu_data *rdp, *rdp_toggling = NULL;
> struct rcu_node *rnp;
> - unsigned long wait_gp_seq = 0; // Suppress "use uninitialized" warning.
> bool wasempty = false;
>
> /*
> @@ -718,6 +738,7 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
> * won't be ignored for long.
> */
> list_for_each_entry(rdp, &my_rdp->nocb_head_rdp, nocb_entry_rdp) {
> + struct rcu_gp_oldstate cur_gp_seq;
> long bypass_ncbs;
> bool flush_bypass = false;
> long lazy_ncbs;
> @@ -755,8 +776,8 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
> needwake_gp = false;
> if (!rcu_segcblist_restempty(&rdp->cblist,
> RCU_NEXT_READY_TAIL) ||
> - (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq_full) &&
> - poll_state_synchronize_rcu_full(&cur_gp_seq_full))) {
> + (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
> + poll_state_synchronize_rcu_full(&cur_gp_seq))) {
> raw_spin_lock_rcu_node(rnp); /* irqs disabled. */
> needwake_gp = rcu_advance_cbs(rnp, rdp);
> wasempty = rcu_segcblist_restempty(&rdp->cblist,
> @@ -777,11 +798,15 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
> * numbers from rcu_accelerate_cbs() inside
> * rcu_advance_cbs() and will be handled on the next pass.
> */
> - if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq_full) &&
> - !poll_state_synchronize_rcu_full(&cur_gp_seq_full)) {
> + if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
> + !poll_state_synchronize_rcu_full(&cur_gp_seq)) {
> + if (!needwait_gp ||
> + ULONG_CMP_LT(cur_gp_seq.rgos_norm, wait_gp_seq.rgos_norm))
> + wait_gp_seq.rgos_norm = cur_gp_seq.rgos_norm;
> if (!needwait_gp ||
> - ULONG_CMP_LT(cur_gp_seq_full.rgos_norm, wait_gp_seq))
> - wait_gp_seq = cur_gp_seq_full.rgos_norm;
> + ULONG_CMP_LT(cur_gp_seq.rgos_exp, wait_gp_seq.rgos_exp))
> + wait_gp_seq.rgos_exp = cur_gp_seq.rgos_exp;
> +
> needwait_gp = true;
> trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
> TPS("NeedWaitGP"));
> @@ -803,7 +828,8 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
>
> my_rdp->nocb_gp_bypass = bypass;
> my_rdp->nocb_gp_gp = needwait_gp;
> - my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0;
> + if (needwait_gp)
> + my_rdp->nocb_gp_seq = wait_gp_seq;
>
> // At least one child with non-empty ->nocb_bypass, so set
> // timer in order to avoid stranding its callbacks.
> @@ -838,12 +864,12 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
> nocb_gp_sleep(my_rdp, cpu);
> } else {
> rnp = my_rdp->mynode;
> - trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("StartWait"));
> + trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq.rgos_norm, TPS("StartWait"));
> swait_event_interruptible_exclusive(
> - rnp->nocb_gp_wq[rcu_seq_ctr(wait_gp_seq) & 0x1],
> - rcu_seq_done(&rnp->gp_seq, wait_gp_seq) ||
> + rnp->nocb_gp_wq[rcu_seq_ctr(wait_gp_seq.rgos_norm) & 0x1],
> + poll_state_synchronize_rcu_full(&wait_gp_seq) ||
> !READ_ONCE(my_rdp->nocb_gp_sleep));
> - trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("EndWait"));
> + trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq.rgos_norm, TPS("EndWait"));
> }
>
> if (!rcu_nocb_poll) {
> @@ -877,7 +903,8 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
> swake_up_one(&rdp_toggling->nocb_state_wq);
> }
>
> - my_rdp->nocb_gp_seq = -1;
> + my_rdp->nocb_gp_seq.rgos_norm = -1;
> + my_rdp->nocb_gp_seq.rgos_exp = -1;
> WARN_ON(signal_pending(current));
> }
>
> @@ -1561,7 +1588,7 @@ static void show_rcu_nocb_gp_state(struct rcu_data *rdp)
> {
> struct rcu_node *rnp = rdp->mynode;
>
> - pr_info("nocb GP %d %c%c%c%c%c %c[%c%c] %c%c:%ld rnp %d:%d %lu %c CPU %d%s\n",
> + pr_info("nocb GP %d %c%c%c%c%c %c[%c%c] %c%c:%ld/%ld rnp %d:%d %lu %c CPU %d%s\n",
> rdp->cpu,
> "kK"[!!rdp->nocb_gp_kthread],
> "lL"[raw_spin_is_locked(&rdp->nocb_gp_lock)],
> @@ -1573,7 +1600,8 @@ static void show_rcu_nocb_gp_state(struct rcu_data *rdp)
> ".W"[swait_active(&rnp->nocb_gp_wq[1])],
> ".B"[!!rdp->nocb_gp_bypass],
> ".G"[!!rdp->nocb_gp_gp],
> - (long)rdp->nocb_gp_seq,
> + (long)rdp->nocb_gp_seq.rgos_norm,
> + (long)rdp->nocb_gp_seq.rgos_exp,
> rnp->grplo, rnp->grphi, READ_ONCE(rdp->nocb_gp_loops),
> rdp->nocb_gp_kthread ? task_state_to_char(rdp->nocb_gp_kthread) : '.',
> rdp->nocb_gp_kthread ? (int)task_cpu(rdp->nocb_gp_kthread) : -1,
> @@ -1684,16 +1712,16 @@ static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq)
> {
> }
>
> -static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp)
> +static void rcu_nocb_exp_cleanup(struct rcu_node *rnp)
> {
> - return NULL;
> }
>
> -static void rcu_init_one_nocb(struct rcu_node *rnp)
> +static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp)
> {
> + return NULL;
> }
>
> -static void rcu_exp_wake_nocb(struct rcu_node *rnp)
> +static void rcu_init_one_nocb(struct rcu_node *rnp)
> {
> }
>