Re: [PATCH v6 1/4] rcu/tree: Make rcu_do_batch count how many callbacks were executed

From: Neeraj Upadhyay
Date: Wed Oct 14 2020 - 11:06:47 EST




On 9/23/2020 8:52 PM, Joel Fernandes (Google) wrote:
Currently, rcu_do_batch() depends on the unsegmented callback list's len field
to know how many CBs are executed. This fields counts down from 0 as CBs are
dequeued. It is possible that all CBs could not be run because of reaching
limits in which case the remaining unexecuted callbacks are requeued in the
CPU's segcblist.

The number of callbacks that were not requeued are then the negative count (how
many CBs were run) stored in the rcl->len which has been counting down on every
dequeue. This negative count is then added to the per-cpu segmented callback
list's to correct its count.

Such a design works against future efforts to track the length of each segment
of the segmented callback list. The reason is because
rcu_segcblist_extract_done_cbs() will be populating the unsegmented callback
list's length field (rcl->len) during extraction.

Also, the design of counting down from 0 is confusing and error-prone IMHO.

This commit therefore explicitly counts have many callbacks were executed in
rcu_do_batch() itself, and uses that to update the per-CPU segcb list's ->len
field, without relying on the negativity of rcl->len.

Signed-off-by: Joel Fernandes (Google) <joel@xxxxxxxxxxxxxxxxx>
---
kernel/rcu/rcu_segcblist.c | 2 +-
kernel/rcu/rcu_segcblist.h | 1 +
kernel/rcu/tree.c | 9 ++++-----
3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c
index 2d2a6b6b9dfb..bb246d8c6ef1 100644
--- a/kernel/rcu/rcu_segcblist.c
+++ b/kernel/rcu/rcu_segcblist.c
@@ -95,7 +95,7 @@ static void rcu_segcblist_set_len(struct rcu_segcblist *rsclp, long v)
* This increase is fully ordered with respect to the callers accesses
* both before and after.
*/
-static void rcu_segcblist_add_len(struct rcu_segcblist *rsclp, long v)
+void rcu_segcblist_add_len(struct rcu_segcblist *rsclp, long v)
{
#ifdef CONFIG_RCU_NOCB_CPU
smp_mb__before_atomic(); /* Up to the caller! */
diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h
index 5c293afc07b8..b90725f81d77 100644
--- a/kernel/rcu/rcu_segcblist.h
+++ b/kernel/rcu/rcu_segcblist.h
@@ -76,6 +76,7 @@ static inline bool rcu_segcblist_restempty(struct rcu_segcblist *rsclp, int seg)
}
void rcu_segcblist_inc_len(struct rcu_segcblist *rsclp);
+void rcu_segcblist_add_len(struct rcu_segcblist *rsclp, long v);
void rcu_segcblist_init(struct rcu_segcblist *rsclp);
void rcu_segcblist_disable(struct rcu_segcblist *rsclp);
void rcu_segcblist_offload(struct rcu_segcblist *rsclp);
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 7623128d0020..50af465729f4 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2427,7 +2427,7 @@ static void rcu_do_batch(struct rcu_data *rdp)
rcu_segcblist_is_offloaded(&rdp->cblist);
struct rcu_head *rhp;
struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
- long bl, count;
+ long bl, count = 0;
long pending, tlimit = 0;
/* If no callbacks are ready, just return. */
@@ -2472,6 +2472,7 @@ static void rcu_do_batch(struct rcu_data *rdp)
for (; rhp; rhp = rcu_cblist_dequeue(&rcl)) {
rcu_callback_t f;
+ count++;
debug_rcu_head_unqueue(rhp);
rcu_lock_acquire(&rcu_callback_map);
@@ -2485,9 +2486,8 @@ static void rcu_do_batch(struct rcu_data *rdp)
/*
* Stop only if limit reached and CPU has something to do.
- * Note: The rcl structure counts down from zero.
*/
- if (-rcl.len >= bl && !offloaded &&
+ if (count >= bl && !offloaded &&
(need_resched() ||
(!is_idle_task(current) && !rcu_is_callbacks_kthread())))
break;

Update below usage of -rcl.len also?

if (likely((-rcl.len & 31) || local_clock() < tlimit))


Thanks
Neeraj

@@ -2510,7 +2510,6 @@ static void rcu_do_batch(struct rcu_data *rdp)
local_irq_save(flags);
rcu_nocb_lock(rdp);
- count = -rcl.len;
rdp->n_cbs_invoked += count;
trace_rcu_batch_end(rcu_state.name, count, !!rcl.head, need_resched(),
is_idle_task(current), rcu_is_callbacks_kthread());
@@ -2518,7 +2517,7 @@ static void rcu_do_batch(struct rcu_data *rdp)
/* Update counts and requeue any remaining callbacks. */
rcu_segcblist_insert_done_cbs(&rdp->cblist, &rcl);
smp_mb(); /* List handling before counting for rcu_barrier(). */
- rcu_segcblist_insert_count(&rdp->cblist, &rcl);
+ rcu_segcblist_add_len(&rdp->cblist, -count);
/* Reinstate batch limit if we have worked down the excess. */
count = rcu_segcblist_n_cbs(&rdp->cblist);


--
QUALCOMM INDIA, on behalf of Qualcomm Innovation Center, Inc. is a member of the Code Aurora Forum, hosted by The Linux Foundation