[PATCH v8 1/6] rcu/tree: Make rcu_do_batch count how many callbacks were executed

From: Joel Fernandes (Google)
Date: Wed Oct 21 2020 - 15:08:24 EST


Currently, rcu_do_batch() depends on the unsegmented callback list's len field
to know how many CBs are executed. This fields counts down from 0 as CBs are
dequeued. It is possible that all CBs could not be run because of reaching
limits in which case the remaining unexecuted callbacks are requeued in the
CPU's segcblist.

The number of callbacks that were not requeued are then the negative count (how
many CBs were run) stored in the rcl->len which has been counting down on every
dequeue. This negative count is then added to the per-cpu segmented callback
list's to correct its count.

Such a design works against future efforts to track the length of each segment
of the segmented callback list. The reason is because
rcu_segcblist_extract_done_cbs() will be populating the unsegmented callback
list's length field (rcl->len) during extraction.

Also, the design of counting down from 0 is confusing and error-prone IMHO.

This commit therefore explicitly counts how many callbacks were executed in
rcu_do_batch() itself, and uses that to update the per-CPU segcb list's ->len
field, without relying on the negativity of rcl->len.

Signed-off-by: Joel Fernandes (Google) <joel@xxxxxxxxxxxxxxxxx>
Reviewed-by: Frederic Weisbecker <frederic@xxxxxxxxxx>
---
kernel/rcu/rcu_segcblist.c | 2 +-
kernel/rcu/rcu_segcblist.h | 1 +
kernel/rcu/tree.c | 11 +++++------
3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c
index 2d2a6b6b9dfb..bb246d8c6ef1 100644
--- a/kernel/rcu/rcu_segcblist.c
+++ b/kernel/rcu/rcu_segcblist.c
@@ -95,7 +95,7 @@ static void rcu_segcblist_set_len(struct rcu_segcblist *rsclp, long v)
* This increase is fully ordered with respect to the callers accesses
* both before and after.
*/
-static void rcu_segcblist_add_len(struct rcu_segcblist *rsclp, long v)
+void rcu_segcblist_add_len(struct rcu_segcblist *rsclp, long v)
{
#ifdef CONFIG_RCU_NOCB_CPU
smp_mb__before_atomic(); /* Up to the caller! */
diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h
index 492262bcb591..1d2d61406463 100644
--- a/kernel/rcu/rcu_segcblist.h
+++ b/kernel/rcu/rcu_segcblist.h
@@ -76,6 +76,7 @@ static inline bool rcu_segcblist_restempty(struct rcu_segcblist *rsclp, int seg)
}

void rcu_segcblist_inc_len(struct rcu_segcblist *rsclp);
+void rcu_segcblist_add_len(struct rcu_segcblist *rsclp, long v);
void rcu_segcblist_init(struct rcu_segcblist *rsclp);
void rcu_segcblist_disable(struct rcu_segcblist *rsclp);
void rcu_segcblist_offload(struct rcu_segcblist *rsclp);
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 286dc0a1b184..24c00020ab83 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2429,7 +2429,7 @@ static void rcu_do_batch(struct rcu_data *rdp)
const bool offloaded = rcu_segcblist_is_offloaded(&rdp->cblist);
struct rcu_head *rhp;
struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
- long bl, count;
+ long bl, count = 0;
long pending, tlimit = 0;

/* If no callbacks are ready, just return. */
@@ -2474,6 +2474,7 @@ static void rcu_do_batch(struct rcu_data *rdp)
for (; rhp; rhp = rcu_cblist_dequeue(&rcl)) {
rcu_callback_t f;

+ count++;
debug_rcu_head_unqueue(rhp);

rcu_lock_acquire(&rcu_callback_map);
@@ -2487,15 +2488,14 @@ static void rcu_do_batch(struct rcu_data *rdp)

/*
* Stop only if limit reached and CPU has something to do.
- * Note: The rcl structure counts down from zero.
*/
- if (-rcl.len >= bl && !offloaded &&
+ if (count >= bl && !offloaded &&
(need_resched() ||
(!is_idle_task(current) && !rcu_is_callbacks_kthread())))
break;
if (unlikely(tlimit)) {
/* only call local_clock() every 32 callbacks */
- if (likely((-rcl.len & 31) || local_clock() < tlimit))
+ if (likely((count & 31) || local_clock() < tlimit))
continue;
/* Exceeded the time limit, so leave. */
break;
@@ -2512,7 +2512,6 @@ static void rcu_do_batch(struct rcu_data *rdp)

local_irq_save(flags);
rcu_nocb_lock(rdp);
- count = -rcl.len;
rdp->n_cbs_invoked += count;
trace_rcu_batch_end(rcu_state.name, count, !!rcl.head, need_resched(),
is_idle_task(current), rcu_is_callbacks_kthread());
@@ -2520,7 +2519,7 @@ static void rcu_do_batch(struct rcu_data *rdp)
/* Update counts and requeue any remaining callbacks. */
rcu_segcblist_insert_done_cbs(&rdp->cblist, &rcl);
smp_mb(); /* List handling before counting for rcu_barrier(). */
- rcu_segcblist_insert_count(&rdp->cblist, &rcl);
+ rcu_segcblist_add_len(&rdp->cblist, -count);

/* Reinstate batch limit if we have worked down the excess. */
count = rcu_segcblist_n_cbs(&rdp->cblist);
--
2.29.0.rc1.297.gfa9743e501-goog