[PATCH v6 1/4] rcu: Make call_rcu() lazy to save power
From: Joel Fernandes (Google)
Date: Thu Sep 22 2022 - 18:01:46 EST
Implement timer-based RCU lazy callback batching. The batch is flushed
whenever a certain amount of time has passed, or the batch on a
particular CPU grows too big. A future patch will also flush it under
memory pressure.
To handle several corner cases automagically (such as rcu_barrier() and
hotplug), we re-use bypass lists to handle lazy CBs. The bypass list
length has the lazy CB length included in it. A separate lazy CB length
counter is also introduced to keep track of the number of lazy CBs.
v5->v6:
[ Frederic Weisbecker: Program the lazy timer only if WAKE_NOT, since other
deferral levels wake much earlier so for those it is not needed. ]
[ Frederic Weisbecker: Use flush flags to keep bypass API code clean. ]
[ Frederic Weisbecker: Make rcu_barrier() wake up only if main list empty. ]
[ Frederic Weisbecker: Remove extra 'else if' branch in rcu_nocb_try_bypass(). ]
[ Joel: Fix issue where I was not resetting lazy_len after moving it to rdp ]
[ Paul/Thomas/Joel: Make call_rcu() default lazy so users don't mess up. ]
Suggested-by: Paul McKenney <paulmck@xxxxxxxxxx>
Signed-off-by: Joel Fernandes (Google) <joel@xxxxxxxxxxxxxxxxx>
---
include/linux/rcupdate.h | 7 ++
kernel/rcu/Kconfig | 8 ++
kernel/rcu/rcu.h | 8 ++
kernel/rcu/tiny.c | 2 +-
kernel/rcu/tree.c | 133 ++++++++++++++++++----------
kernel/rcu/tree.h | 17 +++-
kernel/rcu/tree_exp.h | 2 +-
kernel/rcu/tree_nocb.h | 184 ++++++++++++++++++++++++++++++++-------
8 files changed, 277 insertions(+), 84 deletions(-)
diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
index 08605ce7379d..40ae36904825 100644
--- a/include/linux/rcupdate.h
+++ b/include/linux/rcupdate.h
@@ -108,6 +108,13 @@ static inline int rcu_preempt_depth(void)
#endif /* #else #ifdef CONFIG_PREEMPT_RCU */
+#ifdef CONFIG_RCU_LAZY
+void call_rcu_flush(struct rcu_head *head, rcu_callback_t func);
+#else
+static inline void call_rcu_flush(struct rcu_head *head,
+ rcu_callback_t func) { call_rcu(head, func); }
+#endif
+
/* Internal to kernel */
void rcu_init(void);
extern int rcu_scheduler_active;
diff --git a/kernel/rcu/Kconfig b/kernel/rcu/Kconfig
index f53ad63b2bc6..edd632e68497 100644
--- a/kernel/rcu/Kconfig
+++ b/kernel/rcu/Kconfig
@@ -314,4 +314,12 @@ config TASKS_TRACE_RCU_READ_MB
Say N here if you hate read-side memory barriers.
Take the default if you are unsure.
+config RCU_LAZY
+ bool "RCU callback lazy invocation functionality"
+ depends on RCU_NOCB_CPU
+ default n
+ help
+ To save power, batch RCU callbacks and flush them after a delay, on
+ memory pressure, or when the callback list grows too big.
+
endmenu # "RCU Subsystem"
diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h
index be5979da07f5..65704cbc9df7 100644
--- a/kernel/rcu/rcu.h
+++ b/kernel/rcu/rcu.h
@@ -474,6 +474,14 @@ enum rcutorture_type {
INVALID_RCU_FLAVOR
};
+#if defined(CONFIG_RCU_LAZY)
+unsigned long rcu_lazy_get_jiffies_till_flush(void);
+void rcu_lazy_set_jiffies_till_flush(unsigned long j);
+#else
+static inline unsigned long rcu_lazy_get_jiffies_till_flush(void) { return 0; }
+static inline void rcu_lazy_set_jiffies_till_flush(unsigned long j) { }
+#endif
+
#if defined(CONFIG_TREE_RCU)
void rcutorture_get_gp_data(enum rcutorture_type test_type, int *flags,
unsigned long *gp_seq);
diff --git a/kernel/rcu/tiny.c b/kernel/rcu/tiny.c
index a33a8d4942c3..810479cf17ba 100644
--- a/kernel/rcu/tiny.c
+++ b/kernel/rcu/tiny.c
@@ -44,7 +44,7 @@ static struct rcu_ctrlblk rcu_ctrlblk = {
void rcu_barrier(void)
{
- wait_rcu_gp(call_rcu);
+ wait_rcu_gp(call_rcu_flush);
}
EXPORT_SYMBOL(rcu_barrier);
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 5ec97e3f7468..736d0d724207 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2728,47 +2728,8 @@ static void check_cb_ovld(struct rcu_data *rdp)
raw_spin_unlock_rcu_node(rnp);
}
-/**
- * call_rcu() - Queue an RCU callback for invocation after a grace period.
- * @head: structure to be used for queueing the RCU updates.
- * @func: actual callback function to be invoked after the grace period
- *
- * The callback function will be invoked some time after a full grace
- * period elapses, in other words after all pre-existing RCU read-side
- * critical sections have completed. However, the callback function
- * might well execute concurrently with RCU read-side critical sections
- * that started after call_rcu() was invoked.
- *
- * RCU read-side critical sections are delimited by rcu_read_lock()
- * and rcu_read_unlock(), and may be nested. In addition, but only in
- * v5.0 and later, regions of code across which interrupts, preemption,
- * or softirqs have been disabled also serve as RCU read-side critical
- * sections. This includes hardware interrupt handlers, softirq handlers,
- * and NMI handlers.
- *
- * Note that all CPUs must agree that the grace period extended beyond
- * all pre-existing RCU read-side critical section. On systems with more
- * than one CPU, this means that when "func()" is invoked, each CPU is
- * guaranteed to have executed a full memory barrier since the end of its
- * last RCU read-side critical section whose beginning preceded the call
- * to call_rcu(). It also means that each CPU executing an RCU read-side
- * critical section that continues beyond the start of "func()" must have
- * executed a memory barrier after the call_rcu() but before the beginning
- * of that RCU read-side critical section. Note that these guarantees
- * include CPUs that are offline, idle, or executing in user mode, as
- * well as CPUs that are executing in the kernel.
- *
- * Furthermore, if CPU A invoked call_rcu() and CPU B invoked the
- * resulting RCU callback function "func()", then both CPU A and CPU B are
- * guaranteed to execute a full memory barrier during the time interval
- * between the call to call_rcu() and the invocation of "func()" -- even
- * if CPU A and CPU B are the same CPU (but again only if the system has
- * more than one CPU).
- *
- * Implementation of these memory-ordering guarantees is described here:
- * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
- */
-void call_rcu(struct rcu_head *head, rcu_callback_t func)
+static void
+__call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy)
{
static atomic_t doublefrees;
unsigned long flags;
@@ -2809,7 +2770,7 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
}
check_cb_ovld(rdp);
- if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags))
+ if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags, lazy))
return; // Enqueued onto ->nocb_bypass, so just leave.
// If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock.
rcu_segcblist_enqueue(&rdp->cblist, head);
@@ -2831,8 +2792,84 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
local_irq_restore(flags);
}
}
-EXPORT_SYMBOL_GPL(call_rcu);
+#ifdef CONFIG_RCU_LAZY
+/**
+ * call_rcu_flush() - Queue RCU callback for invocation after grace period, and
+ * flush all lazy callbacks (including the new one) to the main ->cblist while
+ * doing so.
+ *
+ * @head: structure to be used for queueing the RCU updates.
+ * @func: actual callback function to be invoked after the grace period
+ *
+ * The callback function will be invoked some time after a full grace
+ * period elapses, in other words after all pre-existing RCU read-side
+ * critical sections have completed.
+ *
+ * Use this API instead of call_rcu() if you do not want the callback to be
+ * invoked after very long periods of time, which can happen on systems without
+ * memory pressure and on systems which are lightly loaded or mostly idle.
+ *
+ * Other than callbacks not being queued lazily, this function is
+ * identical to, and reuses call_rcu()'s logic. Refer to call_rcu() for more
+ * details about memory ordering and other functionality.
+ */
+void call_rcu_flush(struct rcu_head *head, rcu_callback_t func)
+{
+ __call_rcu_common(head, func, false); /* false => non-lazy callback */
+}
+EXPORT_SYMBOL_GPL(call_rcu_flush);
+#endif
+
+/**
+ * call_rcu() - Queue an RCU callback for invocation after a grace period.
+ * By default the callbacks are 'lazy' and are kept hidden from the main
+ * ->cblist to prevent starting of grace periods too soon.
+ * If you desire grace periods to start very soon, use call_rcu_flush().
+ *
+ * @head: structure to be used for queueing the RCU updates.
+ * @func: actual callback function to be invoked after the grace period
+ *
+ * The callback function will be invoked some time after a full grace
+ * period elapses, in other words after all pre-existing RCU read-side
+ * critical sections have completed. However, the callback function
+ * might well execute concurrently with RCU read-side critical sections
+ * that started after call_rcu() was invoked.
+ *
+ * RCU read-side critical sections are delimited by rcu_read_lock()
+ * and rcu_read_unlock(), and may be nested. In addition, but only in
+ * v5.0 and later, regions of code across which interrupts, preemption,
+ * or softirqs have been disabled also serve as RCU read-side critical
+ * sections. This includes hardware interrupt handlers, softirq handlers,
+ * and NMI handlers.
+ *
+ * Note that all CPUs must agree that the grace period extended beyond
+ * all pre-existing RCU read-side critical section. On systems with more
+ * than one CPU, this means that when "func()" is invoked, each CPU is
+ * guaranteed to have executed a full memory barrier since the end of its
+ * last RCU read-side critical section whose beginning preceded the call
+ * to call_rcu(). It also means that each CPU executing an RCU read-side
+ * critical section that continues beyond the start of "func()" must have
+ * executed a memory barrier after the call_rcu() but before the beginning
+ * of that RCU read-side critical section. Note that these guarantees
+ * include CPUs that are offline, idle, or executing in user mode, as
+ * well as CPUs that are executing in the kernel.
+ *
+ * Furthermore, if CPU A invoked call_rcu() and CPU B invoked the
+ * resulting RCU callback function "func()", then both CPU A and CPU B are
+ * guaranteed to execute a full memory barrier during the time interval
+ * between the call to call_rcu() and the invocation of "func()" -- even
+ * if CPU A and CPU B are the same CPU (but again only if the system has
+ * more than one CPU).
+ *
+ * Implementation of these memory-ordering guarantees is described here:
+ * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
+ */
+void call_rcu(struct rcu_head *head, rcu_callback_t func)
+{
+ __call_rcu_common(head, func, true); /* true => lazy callback */
+}
+EXPORT_SYMBOL_GPL(call_rcu);
/* Maximum number of jiffies to wait before draining a batch. */
#define KFREE_DRAIN_JIFFIES (5 * HZ)
@@ -3507,7 +3544,7 @@ void synchronize_rcu(void)
if (rcu_gp_is_expedited())
synchronize_rcu_expedited();
else
- wait_rcu_gp(call_rcu);
+ wait_rcu_gp(call_rcu_flush);
return;
}
@@ -3902,7 +3939,11 @@ static void rcu_barrier_entrain(struct rcu_data *rdp)
rdp->barrier_head.func = rcu_barrier_callback;
debug_rcu_head_queue(&rdp->barrier_head);
rcu_nocb_lock(rdp);
- WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
+ /*
+ * Flush the bypass list, but also wake up the GP thread as otherwise
+ * bypass/lazy CBs may not be noticed, and can cause very long delays!
+ */
+ WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, FLUSH_BP_WAKE));
if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head)) {
atomic_inc(&rcu_state.barrier_cpu_count);
} else {
@@ -4323,7 +4364,7 @@ void rcutree_migrate_callbacks(int cpu)
my_rdp = this_cpu_ptr(&rcu_data);
my_rnp = my_rdp->mynode;
rcu_nocb_lock(my_rdp); /* irqs already disabled. */
- WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies));
+ WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies, FLUSH_BP_NONE));
raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */
/* Leverage recent GPs and set GP for new callbacks. */
needwake = rcu_advance_cbs(my_rnp, rdp) ||
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index d4a97e40ea9c..361c41d642c7 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -263,14 +263,16 @@ struct rcu_data {
unsigned long last_fqs_resched; /* Time of last rcu_resched(). */
unsigned long last_sched_clock; /* Jiffies of last rcu_sched_clock_irq(). */
+ long lazy_len; /* Length of buffered lazy callbacks. */
int cpu;
};
/* Values for nocb_defer_wakeup field in struct rcu_data. */
#define RCU_NOCB_WAKE_NOT 0
#define RCU_NOCB_WAKE_BYPASS 1
-#define RCU_NOCB_WAKE 2
-#define RCU_NOCB_WAKE_FORCE 3
+#define RCU_NOCB_WAKE_LAZY 2
+#define RCU_NOCB_WAKE 3
+#define RCU_NOCB_WAKE_FORCE 4
#define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500))
/* For jiffies_till_first_fqs and */
@@ -439,10 +441,17 @@ static void zero_cpu_stall_ticks(struct rcu_data *rdp);
static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp);
static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq);
static void rcu_init_one_nocb(struct rcu_node *rnp);
+
+#define FLUSH_BP_NONE 0
+/* Is the CB being enqueued after the flush a lazy CB? */
+#define FLUSH_BP_LAZY BIT(0)
+/* Wake up nocb-GP thread after flush? */
+#define FLUSH_BP_WAKE BIT(1)
static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
- unsigned long j);
+ unsigned long j, unsigned long flush_flags);
static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
- bool *was_alldone, unsigned long flags);
+ bool *was_alldone, unsigned long flags,
+ bool lazy);
static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
unsigned long flags);
static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level);
diff --git a/kernel/rcu/tree_exp.h b/kernel/rcu/tree_exp.h
index 18e9b4cd78ef..5cac05600798 100644
--- a/kernel/rcu/tree_exp.h
+++ b/kernel/rcu/tree_exp.h
@@ -937,7 +937,7 @@ void synchronize_rcu_expedited(void)
/* If expedited grace periods are prohibited, fall back to normal. */
if (rcu_gp_is_normal()) {
- wait_rcu_gp(call_rcu);
+ wait_rcu_gp(call_rcu_flush);
return;
}
diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
index f77a6d7e1356..661c685aba3f 100644
--- a/kernel/rcu/tree_nocb.h
+++ b/kernel/rcu/tree_nocb.h
@@ -256,6 +256,31 @@ static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
return __wake_nocb_gp(rdp_gp, rdp, force, flags);
}
+/*
+ * LAZY_FLUSH_JIFFIES decides the maximum amount of time that
+ * can elapse before lazy callbacks are flushed. Lazy callbacks
+ * could be flushed much earlier for a number of other reasons;
+ * however, LAZY_FLUSH_JIFFIES will ensure no lazy callbacks are
+ * left unsubmitted to RCU after that many jiffies.
+ */
+#define LAZY_FLUSH_JIFFIES (10 * HZ)
+static unsigned long jiffies_till_flush = LAZY_FLUSH_JIFFIES;
+
+#ifdef CONFIG_RCU_LAZY
+// To be called only from test code.
+void rcu_lazy_set_jiffies_till_flush(unsigned long jif)
+{
+ jiffies_till_flush = jif;
+}
+EXPORT_SYMBOL(rcu_lazy_set_jiffies_till_flush);
+
+unsigned long rcu_lazy_get_jiffies_till_flush(void)
+{
+ return jiffies_till_flush;
+}
+EXPORT_SYMBOL(rcu_lazy_get_jiffies_till_flush);
+#endif
+
/*
* Arrange to wake the GP kthread for this NOCB group at some future
* time when it is safe to do so.
@@ -269,10 +294,14 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
/*
- * Bypass wakeup overrides previous deferments. In case
- * of callback storm, no need to wake up too early.
+ * Bypass wakeup overrides previous deferments. In case of
+ * callback storm, no need to wake up too early.
*/
- if (waketype == RCU_NOCB_WAKE_BYPASS) {
+ if (waketype == RCU_NOCB_WAKE_LAZY
+ && READ_ONCE(rdp->nocb_defer_wakeup) == RCU_NOCB_WAKE_NOT) {
+ mod_timer(&rdp_gp->nocb_timer, jiffies + jiffies_till_flush);
+ WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
+ } else if (waketype == RCU_NOCB_WAKE_BYPASS) {
mod_timer(&rdp_gp->nocb_timer, jiffies + 2);
WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
} else {
@@ -293,12 +322,16 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
* proves to be initially empty, just return false because the no-CB GP
* kthread may need to be awakened in this case.
*
+ * Return true if there was something to be flushed and it succeeded, otherwise
+ * false.
+ *
* Note that this function always returns true if rhp is NULL.
*/
static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
- unsigned long j)
+ unsigned long j, unsigned long flush_flags)
{
struct rcu_cblist rcl;
+ bool lazy = flush_flags & FLUSH_BP_LAZY;
WARN_ON_ONCE(!rcu_rdp_is_offloaded(rdp));
rcu_lockdep_assert_cblist_protected(rdp);
@@ -310,7 +343,20 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
/* Note: ->cblist.len already accounts for ->nocb_bypass contents. */
if (rhp)
rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
- rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
+
+ /*
+ * If the new CB requested was a lazy one, queue it onto the main
+ * ->cblist so we can take advantage of a sooner grace period.
+ */
+ if (lazy && rhp) {
+ rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, NULL);
+ rcu_cblist_enqueue(&rcl, rhp);
+ WRITE_ONCE(rdp->lazy_len, 0);
+ } else {
+ rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
+ WRITE_ONCE(rdp->lazy_len, 0);
+ }
+
rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl);
WRITE_ONCE(rdp->nocb_bypass_first, j);
rcu_nocb_bypass_unlock(rdp);
@@ -326,13 +372,33 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
* Note that this function always returns true if rhp is NULL.
*/
static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
- unsigned long j)
+ unsigned long j, unsigned long flush_flags)
{
+ bool ret;
+ bool was_alldone = false;
+ bool bypass_all_lazy = false;
+ long bypass_ncbs;
+
if (!rcu_rdp_is_offloaded(rdp))
return true;
rcu_lockdep_assert_cblist_protected(rdp);
rcu_nocb_bypass_lock(rdp);
- return rcu_nocb_do_flush_bypass(rdp, rhp, j);
+
+ if (flush_flags & FLUSH_BP_WAKE) {
+ was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
+ bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+ bypass_all_lazy = bypass_ncbs && (bypass_ncbs == rdp->lazy_len);
+ }
+
+ ret = rcu_nocb_do_flush_bypass(rdp, rhp, j, flush_flags);
+
+ // Wake up the nocb GP thread if needed. GP thread could be sleeping
+ // while waiting for lazy timer to expire (otherwise rcu_barrier may
+ // end up waiting for the duration of the lazy timer).
+ if (flush_flags & FLUSH_BP_WAKE && was_alldone && bypass_all_lazy)
+ wake_nocb_gp(rdp, false);
+
+ return ret;
}
/*
@@ -345,7 +411,7 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
if (!rcu_rdp_is_offloaded(rdp) ||
!rcu_nocb_bypass_trylock(rdp))
return;
- WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j));
+ WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j, FLUSH_BP_NONE));
}
/*
@@ -367,12 +433,14 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
* there is only one CPU in operation.
*/
static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
- bool *was_alldone, unsigned long flags)
+ bool *was_alldone, unsigned long flags,
+ bool lazy)
{
unsigned long c;
unsigned long cur_gp_seq;
unsigned long j = jiffies;
long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+ bool bypass_is_lazy = (ncbs == READ_ONCE(rdp->lazy_len));
lockdep_assert_irqs_disabled();
@@ -417,25 +485,30 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
// If there hasn't yet been all that many ->cblist enqueues
// this jiffy, tell the caller to enqueue onto ->cblist. But flush
// ->nocb_bypass first.
- if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
+ // Lazy CBs throttle this back and do immediate bypass queuing.
+ if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) {
rcu_nocb_lock(rdp);
*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
if (*was_alldone)
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
TPS("FirstQ"));
- WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
+
+ WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, FLUSH_BP_NONE));
WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
return false; // Caller must enqueue the callback.
}
// If ->nocb_bypass has been used too long or is too full,
// flush ->nocb_bypass to ->cblist.
- if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) ||
+ if ((ncbs && !bypass_is_lazy && j != READ_ONCE(rdp->nocb_bypass_first)) ||
+ (ncbs && bypass_is_lazy &&
+ (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + jiffies_till_flush))) ||
ncbs >= qhimark) {
rcu_nocb_lock(rdp);
*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
- if (!rcu_nocb_flush_bypass(rdp, rhp, j)) {
+ if (!rcu_nocb_flush_bypass(rdp, rhp, j,
+ lazy ? FLUSH_BP_LAZY : FLUSH_BP_NONE)) {
if (*was_alldone)
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
TPS("FirstQ"));
@@ -460,16 +533,29 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
// We need to use the bypass.
rcu_nocb_wait_contended(rdp);
rcu_nocb_bypass_lock(rdp);
+
ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
+
+ if (IS_ENABLED(CONFIG_RCU_LAZY) && lazy)
+ WRITE_ONCE(rdp->lazy_len, rdp->lazy_len + 1);
+
if (!ncbs) {
WRITE_ONCE(rdp->nocb_bypass_first, j);
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ"));
}
+
rcu_nocb_bypass_unlock(rdp);
smp_mb(); /* Order enqueue before wake. */
- if (ncbs) {
+
+ // A wake up of the grace period kthread or timer adjustment needs to
+ // be done only if either:
+ // (a) The bypass list was fully empty before (this is the first bypass
+ //     list entry), or
+ // (b) The bypass list had only lazy CBs before and the new CB is
+ //     non-lazy.
+ if (ncbs && (!bypass_is_lazy || lazy)) {
local_irq_restore(flags);
} else {
// No-CBs GP kthread might be indefinitely asleep, if so, wake.
@@ -499,7 +585,7 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
{
unsigned long cur_gp_seq;
unsigned long j;
- long len;
+ long len, lazy_len, bypass_len;
struct task_struct *t;
// If we are being polled or there is no kthread, just leave.
@@ -512,9 +598,16 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
}
// Need to actually to a wakeup.
len = rcu_segcblist_n_cbs(&rdp->cblist);
+ bypass_len = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+ lazy_len = READ_ONCE(rdp->lazy_len);
if (was_alldone) {
rdp->qlen_last_fqs_check = len;
- if (!irqs_disabled_flags(flags)) {
+ // Only lazy CBs in bypass list
+ if (lazy_len && bypass_len == lazy_len) {
+ rcu_nocb_unlock_irqrestore(rdp, flags);
+ wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_LAZY,
+ TPS("WakeLazy"));
+ } else if (!irqs_disabled_flags(flags)) {
/* ... if queue was empty ... */
rcu_nocb_unlock_irqrestore(rdp, flags);
wake_nocb_gp(rdp, false);
@@ -604,8 +697,8 @@ static void nocb_gp_sleep(struct rcu_data *my_rdp, int cpu)
*/
static void nocb_gp_wait(struct rcu_data *my_rdp)
{
- bool bypass = false;
- long bypass_ncbs;
+ bool bypass = false, lazy = false;
+ long bypass_ncbs, lazy_ncbs;
int __maybe_unused cpu = my_rdp->cpu;
unsigned long cur_gp_seq;
unsigned long flags;
@@ -640,24 +733,41 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
* won't be ignored for long.
*/
list_for_each_entry(rdp, &my_rdp->nocb_head_rdp, nocb_entry_rdp) {
+ bool flush_bypass = false;
+
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check"));
rcu_nocb_lock_irqsave(rdp, flags);
lockdep_assert_held(&rdp->nocb_lock);
bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
- if (bypass_ncbs &&
+ lazy_ncbs = READ_ONCE(rdp->lazy_len);
+
+ if (bypass_ncbs && (lazy_ncbs == bypass_ncbs) &&
+ (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + jiffies_till_flush) ||
+ bypass_ncbs > 2 * qhimark)) {
+ flush_bypass = true;
+ } else if (bypass_ncbs && (lazy_ncbs != bypass_ncbs) &&
(time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) ||
bypass_ncbs > 2 * qhimark)) {
- // Bypass full or old, so flush it.
- (void)rcu_nocb_try_flush_bypass(rdp, j);
- bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+ flush_bypass = true;
} else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) {
rcu_nocb_unlock_irqrestore(rdp, flags);
continue; /* No callbacks here, try next. */
}
+
+ if (flush_bypass) {
+ // Bypass full or old, so flush it.
+ (void)rcu_nocb_try_flush_bypass(rdp, j);
+ bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+ lazy_ncbs = READ_ONCE(rdp->lazy_len);
+ }
+
if (bypass_ncbs) {
trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
- TPS("Bypass"));
- bypass = true;
+ bypass_ncbs == lazy_ncbs ? TPS("Lazy") : TPS("Bypass"));
+ if (bypass_ncbs == lazy_ncbs)
+ lazy = true;
+ else
+ bypass = true;
}
rnp = rdp->mynode;
@@ -705,12 +815,21 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
my_rdp->nocb_gp_gp = needwait_gp;
my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0;
- if (bypass && !rcu_nocb_poll) {
- // At least one child with non-empty ->nocb_bypass, so set
- // timer in order to avoid stranding its callbacks.
- wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
- TPS("WakeBypassIsDeferred"));
+ // At least one child with non-empty ->nocb_bypass, so set
+ // timer in order to avoid stranding its callbacks.
+ if (!rcu_nocb_poll) {
+ // If the bypass list only has lazy CBs, add a deferred
+ // lazy wake up.
+ if (lazy && !bypass) {
+ wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_LAZY,
+ TPS("WakeLazyIsDeferred"));
+ // Otherwise add a deferred bypass wake up.
+ } else if (bypass) {
+ wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
+ TPS("WakeBypassIsDeferred"));
+ }
}
+
if (rcu_nocb_poll) {
/* Polling, so trace if first poll in the series. */
if (gotcbs)
@@ -1036,7 +1155,7 @@ static long rcu_nocb_rdp_deoffload(void *arg)
* return false, which means that future calls to rcu_nocb_try_bypass()
* will refuse to put anything into the bypass.
*/
- WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
+ WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, FLUSH_BP_NONE));
/*
* Start with invoking rcu_core() early. This way if the current thread
* happens to preempt an ongoing call to rcu_core() in the middle,
@@ -1278,6 +1397,7 @@ static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
raw_spin_lock_init(&rdp->nocb_gp_lock);
timer_setup(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer, 0);
rcu_cblist_init(&rdp->nocb_bypass);
+ WRITE_ONCE(rdp->lazy_len, 0);
mutex_init(&rdp->nocb_gp_kthread_mutex);
}
@@ -1559,13 +1679,13 @@ static void rcu_init_one_nocb(struct rcu_node *rnp)
}
static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
- unsigned long j)
+ unsigned long j, unsigned long flush_flags)
{
return true;
}
static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
- bool *was_alldone, unsigned long flags)
+ bool *was_alldone, unsigned long flags, bool lazy)
{
return false;
}
--
2.37.3.998.g577e59143f-goog