[RFC] rcu/nocb: Fix possible bugs in rcu_barrier()

From: Joel Fernandes (Google)
Date: Sun Sep 18 2022 - 18:13:00 EST


When going through the lazy-rcu work, I noticed that
rcu_barrier_entrain() does not really wake up the rcuog GP thread in any
path after entraining. This means it is possible the GP thread is not
awakened soon (say there were no CBs in the cblist after entraining
time).

Further, nothing appears to be calling the rcu_barrier callback
directly in the case the ->cblist was empty which means if the IPI gets
delayed enough to make the ->cblist empty and it turns out to be the last
CPU holding, then nothing completes rcu_state.barrier_completion.

Fix both these issues.

A note on the wakeup, there are 3 cases AFAICS after the call to
rcu_nocb_flush_bypass():

1. The rdp->cblist has pending CBs.

2. The rdp->cblist has all done CBs.

3. The rdp->cblist has no CBs at all (say the IPI took a long time to
arrive and some other path dequeued them in the meanwhile).

For #3, entraining a CB is not needed and we should bail. For #1 and #3,
a wakeup of the rcuog GP thread is not needed. But for #2 it is needed.

Signed-off-by: Joel Fernandes (Google) <joel@xxxxxxxxxxxxxxxxx>
---
I only build tested this and wanted to post it in advance for discussions. I
will test it more soon. Thanks.

kernel/rcu/tree.c | 22 +++++++++++++++-------
1 file changed, 15 insertions(+), 7 deletions(-)

diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 18f07e167d5e..65d439286757 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -3904,10 +3904,11 @@ static void rcu_barrier_callback(struct rcu_head *rhp)
/*
* If needed, entrain an rcu_barrier() callback on rdp->cblist.
*/
-static void rcu_barrier_entrain(struct rcu_data *rdp)
+static void rcu_barrier_entrain(struct rcu_data *rdp, unsigned long flags)
{
unsigned long gseq = READ_ONCE(rcu_state.barrier_sequence);
unsigned long lseq = READ_ONCE(rdp->barrier_seq_snap);
+ bool was_alldone;

lockdep_assert_held(&rcu_state.barrier_lock);
if (rcu_seq_state(lseq) || !rcu_seq_state(gseq) || rcu_seq_ctr(lseq) != rcu_seq_ctr(gseq))
@@ -3916,14 +3917,20 @@ static void rcu_barrier_entrain(struct rcu_data *rdp)
rdp->barrier_head.func = rcu_barrier_callback;
debug_rcu_head_queue(&rdp->barrier_head);
rcu_nocb_lock(rdp);
+ was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
+
if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head)) {
atomic_inc(&rcu_state.barrier_cpu_count);
+ __call_rcu_nocb_wake(rdp, was_alldone, flags); /* unlocks */
} else {
+ /* rdp->cblist is empty so directly call the callback. */
+ atomic_inc(&rcu_state.barrier_cpu_count);
+ rcu_barrier_callback(&rdp->barrier_head);
debug_rcu_head_unqueue(&rdp->barrier_head);
rcu_barrier_trace(TPS("IRQNQ"), -1, rcu_state.barrier_sequence);
+ rcu_nocb_unlock(rdp);
}
- rcu_nocb_unlock(rdp);
smp_store_release(&rdp->barrier_seq_snap, gseq);
}

@@ -3932,15 +3939,16 @@ static void rcu_barrier_entrain(struct rcu_data *rdp)
*/
static void rcu_barrier_handler(void *cpu_in)
{
+ unsigned long flags;
uintptr_t cpu = (uintptr_t)cpu_in;
struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);

lockdep_assert_irqs_disabled();
WARN_ON_ONCE(cpu != rdp->cpu);
WARN_ON_ONCE(cpu != smp_processor_id());
- raw_spin_lock(&rcu_state.barrier_lock);
- rcu_barrier_entrain(rdp);
- raw_spin_unlock(&rcu_state.barrier_lock);
+ raw_spin_lock_irqsave(&rcu_state.barrier_lock, flags);
+ rcu_barrier_entrain(rdp, flags);
+ raw_spin_unlock_irqrestore(&rcu_state.barrier_lock, flags);
}

/**
@@ -4007,7 +4015,7 @@ void rcu_barrier(void)
continue;
}
if (!rcu_rdp_cpu_online(rdp)) {
- rcu_barrier_entrain(rdp);
+ rcu_barrier_entrain(rdp, flags);
WARN_ON_ONCE(READ_ONCE(rdp->barrier_seq_snap) != gseq);
raw_spin_unlock_irqrestore(&rcu_state.barrier_lock, flags);
rcu_barrier_trace(TPS("OfflineNoCBQ"), cpu, rcu_state.barrier_sequence);
@@ -4333,7 +4341,7 @@ void rcutree_migrate_callbacks(int cpu)

raw_spin_lock_irqsave(&rcu_state.barrier_lock, flags);
WARN_ON_ONCE(rcu_rdp_cpu_online(rdp));
- rcu_barrier_entrain(rdp);
+ rcu_barrier_entrain(rdp, flags);
my_rdp = this_cpu_ptr(&rcu_data);
my_rnp = my_rdp->mynode;
rcu_nocb_lock(my_rdp); /* irqs already disabled. */
--
2.37.3.968.ga6b4b080e4-goog