Re: [PATCH 2/2] rcu/nocb: Spare bypass locking upon normal enqueue

From: Joel Fernandes
Date: Mon Oct 10 2022 - 22:00:48 EST


On Tue, Oct 11, 2022 at 12:39:56AM +0200, Frederic Weisbecker wrote:
> When a callback is to be enqueued to the normal queue and not the bypass
> one, a flush to the bypass queue is always tried anyway. This attempt
> involves locking the bypass lock unconditionally. Although it is
> guaranteed not to be contended at this point, because only call_rcu()
> can lock the bypass lock without holding the nocb lock, it's still not
> free and the operation can easily be spared most of the time by just
> checking if the bypass list is empty. The check is safe as nobody can
> queue nor flush the bypass concurrently.
>
> Signed-off-by: Frederic Weisbecker <frederic@xxxxxxxxxx>
> ---
> kernel/rcu/tree_nocb.h | 6 ++++--
> 1 file changed, 4 insertions(+), 2 deletions(-)
>
> diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
> index 094fd454b6c3..30c3d473ffd8 100644
> --- a/kernel/rcu/tree_nocb.h
> +++ b/kernel/rcu/tree_nocb.h
> @@ -423,8 +423,10 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> if (*was_alldone)
> trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
> TPS("FirstQ"));
> - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
> - WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
> + if (rcu_cblist_n_cbs(&rdp->nocb_bypass)) {
> + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
> + WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
> + }
> return false; // Caller must enqueue the callback.
> }

Instead of this, since as you mentioned that the bypass lock is not contended
in this path, isn't it unnecessary to even check or attempt to acquire the
lock in call_rcu() path? So how about something like the following, or would
this not work for some reason?

Thanks.

---8<-----------------------

diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index ad8d4e52ae92..6235e72cca07 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -3950,7 +3950,7 @@ static void rcu_barrier_entrain(struct rcu_data *rdp)
debug_rcu_head_queue(&rdp->barrier_head);
rcu_nocb_lock(rdp);
was_done = rcu_rdp_is_offloaded(rdp) && !rcu_segcblist_pend_cbs(&rdp->cblist);
- WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false));
+ WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false, false));
if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head)) {
atomic_inc(&rcu_state.barrier_cpu_count);
} else {
@@ -4379,7 +4379,7 @@ void rcutree_migrate_callbacks(int cpu)
my_rdp = this_cpu_ptr(&rcu_data);
my_rnp = my_rdp->mynode;
rcu_nocb_lock(my_rdp); /* irqs already disabled. */
- WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies, false));
+ WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies, false, false));
raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */
/* Leverage recent GPs and set GP for new callbacks. */
needwake = rcu_advance_cbs(my_rnp, rdp) ||
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index 1d803d39f0d1..0adb8f97a56d 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -442,7 +442,7 @@ static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp);
static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq);
static void rcu_init_one_nocb(struct rcu_node *rnp);
static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
- unsigned long j, bool lazy);
+ unsigned long j, bool lazy, bool nolock);
static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
bool *was_alldone, unsigned long flags,
bool lazy);
diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
index c9a791407650..2164f5d79dec 100644
--- a/kernel/rcu/tree_nocb.h
+++ b/kernel/rcu/tree_nocb.h
@@ -328,7 +328,7 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
* Note that this function always returns true if rhp is NULL.
*/
static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp_in,
- unsigned long j, bool lazy)
+ unsigned long j, bool lazy, bool nolock)
{
struct rcu_cblist rcl;
struct rcu_head *rhp = rhp_in;
@@ -359,7 +359,8 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp_

rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl);
WRITE_ONCE(rdp->nocb_bypass_first, j);
- rcu_nocb_bypass_unlock(rdp);
+ if (!nolock)
+ rcu_nocb_bypass_unlock(rdp);
return true;
}

@@ -372,13 +373,14 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp_
* Note that this function always returns true if rhp is NULL.
*/
static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
- unsigned long j, bool lazy)
+ unsigned long j, bool lazy, bool nolock)
{
if (!rcu_rdp_is_offloaded(rdp))
return true;
rcu_lockdep_assert_cblist_protected(rdp);
- rcu_nocb_bypass_lock(rdp);
- return rcu_nocb_do_flush_bypass(rdp, rhp, j, lazy);
+ if (!nolock)
+ rcu_nocb_bypass_lock(rdp);
+ return rcu_nocb_do_flush_bypass(rdp, rhp, j, lazy, nolock);
}

/*
@@ -391,7 +393,7 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
if (!rcu_rdp_is_offloaded(rdp) ||
!rcu_nocb_bypass_trylock(rdp))
return;
- WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j, false));
+ WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j, false, false));
}

/*
@@ -473,7 +475,7 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
TPS("FirstQ"));

- WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false));
+ WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false, true));
WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
return false; // Caller must enqueue the callback.
}
@@ -487,7 +489,7 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
rcu_nocb_lock(rdp);
*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);

- if (!rcu_nocb_flush_bypass(rdp, rhp, j, lazy)) {
+ if (!rcu_nocb_flush_bypass(rdp, rhp, j, lazy, true)) {
if (*was_alldone)
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
TPS("FirstQ"));
@@ -1136,7 +1138,7 @@ static long rcu_nocb_rdp_deoffload(void *arg)
* return false, which means that future calls to rcu_nocb_try_bypass()
* will refuse to put anything into the bypass.
*/
- WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false));
+ WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false, false));
/*
* Start with invoking rcu_core() early. This way if the current thread
* happens to preempt an ongoing call to rcu_core() in the middle,
@@ -1717,7 +1719,7 @@ static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
}

static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
- unsigned long j, bool lazy)
+ unsigned long j, bool lazy, bool nolock)
{
return true;
}