Re: [PATCH v3 3/4] rcu/nocb: Protect NOCB state via local_lock() under PREEMPT_RT

From: Thomas Gleixner
Date: Tue Sep 21 2021 - 10:05:12 EST


Valentin,

On Wed, Aug 11 2021 at 21:13, Valentin Schneider wrote:
> Running v5.13-rt1 on my arm64 Juno board triggers:
>
> [ 0.156302] =============================
> [ 0.160416] WARNING: suspicious RCU usage
> [ 0.172409] kernel/rcu/tree_plugin.h:69 Unsafe read of RCU_NOCB offloaded state!
> [ 0.260328] rcu_rdp_is_offloaded (kernel/rcu/tree_plugin.h:69 kernel/rcu/tree_plugin.h:58)
> [ 0.264537] rcu_core (kernel/rcu/tree.c:2332 kernel/rcu/tree.c:2398 kernel/rcu/tree.c:2777)
> [ 0.267786] rcu_cpu_kthread (./include/linux/bottom_half.h:32 kernel/rcu/tree.c:2876)
>
> In this case, this is the RCU core kthread accessing the local CPU's
> rdp. Before that, rcu_cpu_kthread() invokes local_bh_disable().
>
> Under !CONFIG_PREEMPT_RT (and rcutree.use_softirq=0), this ends up
> incrementing the preempt_count, which satisfies the "local non-preemptible
> read" of rcu_rdp_is_offloaded().
>
> Under CONFIG_PREEMPT_RT however, this becomes
>
> local_lock(&softirq_ctrl.lock)
>
> which, under the same config, is migrate_disable() + rt_spin_lock(). As
> pointed out by Frederic, this is not sufficient to safely access an rdp's
> offload state, as the RCU core kthread can be preempted by a kworker
> executing rcu_nocb_rdp_offload() [1].
>
> Introduce a local_lock to serialize an rdp's offload state while the rdp's
> associated core kthread is executing rcu_core().

Yes, sure. But I don't think that a local_lock is required at all.

The point is that the two places where this actually matters invoke
rcu_rdp_is_offloaded() at the top of the function, outside of the
protection sections which already exist there. Moving the invocation
into those sections makes it just work for both RT and !RT.
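
For reference, the lockdep assertion in rcu_rdp_is_offloaded() which
triggers the above splat boils down to roughly the following
(paraphrased sketch, not a verbatim copy of kernel/rcu/tree_plugin.h):

static bool rcu_rdp_is_offloaded(struct rcu_data *rdp)
{
	/*
	 * The read is only safe when the offloaded state cannot
	 * change underneath us: either one of the locks serializing
	 * (de)offloading must be held, or the read must be local and
	 * non-preemptible.
	 */
	RCU_LOCKDEP_WARN(
		!(lockdep_is_held(&rcu_state.barrier_mutex) ||
		  (IS_ENABLED(CONFIG_HOTPLUG_CPU) && lockdep_is_cpus_held()) ||
		  rcu_lockdep_is_held_nocb(rdp) ||
		  (rdp == this_cpu_ptr(&rcu_data) &&
		   !(IS_ENABLED(CONFIG_PREEMPT_COUNT) && preemptible())) ||
		  rcu_current_is_nocb_kthread(rdp)),
		"Unsafe read of RCU_NOCB offloaded state");

	return rcu_segcblist_is_offloaded(&rdp->cblist);
}

Both protection sections in question satisfy one of those conditions,
see the annotated call sites after the patch.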

Thanks,

tglx
---

--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2278,13 +2278,13 @@ rcu_report_qs_rdp(struct rcu_data *rdp)
 {
         unsigned long flags;
         unsigned long mask;
-        bool needwake = false;
-        const bool offloaded = rcu_rdp_is_offloaded(rdp);
+        bool offloaded, needwake = false;
         struct rcu_node *rnp;
 
         WARN_ON_ONCE(rdp->cpu != smp_processor_id());
         rnp = rdp->mynode;
         raw_spin_lock_irqsave_rcu_node(rnp, flags);
+        offloaded = rcu_rdp_is_offloaded(rdp);
         if (rdp->cpu_no_qs.b.norm || rdp->gp_seq != rnp->gp_seq ||
             rdp->gpwrap) {
 
@@ -2446,7 +2446,7 @@ static void rcu_do_batch(struct rcu_data
         int div;
         bool __maybe_unused empty;
         unsigned long flags;
-        const bool offloaded = rcu_rdp_is_offloaded(rdp);
+        bool offloaded;
         struct rcu_head *rhp;
         struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
         long bl, count = 0;
@@ -2472,6 +2472,7 @@ static void rcu_do_batch(struct rcu_data
         rcu_nocb_lock(rdp);
         WARN_ON_ONCE(cpu_is_offline(smp_processor_id()));
         pending = rcu_segcblist_n_cbs(&rdp->cblist);
+        offloaded = rcu_rdp_is_offloaded(rdp);
         div = READ_ONCE(rcu_divisor);
         div = div < 0 ? 7 : div > sizeof(long) * 8 - 2 ? sizeof(long) * 8 - 2 : div;
         bl = max(rdp->blimit, pending >> div);
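
IOW, after the change the two reads end up like this (abridged view,
the annotations are mine and not part of the patch):

rcu_report_qs_rdp():
	raw_spin_lock_irqsave_rcu_node(rnp, flags);	/* IRQs off, on RT too */
	offloaded = rcu_rdp_is_offloaded(rdp);		/* local non-preemptible read */

rcu_do_batch():
	rcu_nocb_lock(rdp);				/* nocb lock held when offloaded */
	offloaded = rcu_rdp_is_offloaded(rdp);

raw_spin_lock_irqsave_rcu_node() takes a raw spinlock with interrupts
disabled, which behaves the same on RT and !RT, so the read in
rcu_report_qs_rdp() is local and non-preemptible. rcu_do_batch() reads
the state either with interrupts disabled by the preceding
local_irq_save() (!offloaded case) or with the nocb lock held
(offloaded case), and the nocb lock is exactly what serializes against
rcu_nocb_rdp_offload() in the first place.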