[PATCH RFC] Make call_srcu() available during very early boot
From: Paul E. McKenney
Date: Tue Aug 14 2018 - 12:24:58 EST
Event tracing is moving to SRCU in order to take advantage of the fact
that SRCU may be safely used from idle and even offline CPUs. However,
event tracing can invoke call_srcu() very early in the boot process,
even before workqueue_init_early() is invoked (let alone rcu_init()).
Therefore, call_srcu()'s attempts to queue work fail miserably.
This commit therefore detects this situation, and refrains from attempting
to queue work before rcu_init() time, but does everything else that it
would have done, and in addition, adds the srcu_struct to a global list.
The rcu_init() function now invokes a new srcu_init() function, which
is empty if CONFIG_SRCU=n. Otherwise, srcu_init() queues work for
each srcu_struct on the list. This all happens early enough in boot
that there is but a single CPU with interrupts disabled, which allows
synchronization to be dispensed with.
Of course, the queued work won't actually be invoked until after
workqueue_init() is invoked, which happens shortly after the scheduler
is up and running. This means that although call_srcu() may be invoked
any time after per-CPU variables have been set up, there is still a very
narrow window when synchronize_srcu() won't work, and this window
extends from the time that the scheduler starts until the time that
workqueue_init() returns. This can be fixed in a manner similar to
the fix for synchronize_rcu_expedited() and friends, but until someone
actually needs to use synchronize_srcu() during this window, this fix
is added churn for no benefit.
Finally, note that Tree SRCU's new srcu_init() function invokes
queue_work() rather than the queue_delayed_work() function that is invoked
post-boot. The reason is that queue_delayed_work() will (as you would
expect) post a timer, and timers have not yet been initialized. So use
of queue_delayed_work() avoids the complaints about use of uninitialized
spinlocks that would otherwise result. Besides, delay is in any case
provide by the aforementioned fact that the queued work won't actually
be invoked until after the scheduler is up and running.
Requested-by: Steven Rostedt <rostedt@xxxxxxxxxxx>
Signed-off-by: Paul E. McKenney <paulmck@xxxxxxxxxxxxxxxxxx>
diff --git a/include/linux/srcutiny.h b/include/linux/srcutiny.h
index f41d2fb09f87..2b5c0822e683 100644
--- a/include/linux/srcutiny.h
+++ b/include/linux/srcutiny.h
@@ -36,6 +36,7 @@ struct srcu_struct {
struct rcu_head *srcu_cb_head; /* Pending callbacks: Head. */
struct rcu_head **srcu_cb_tail; /* Pending callbacks: Tail. */
struct work_struct srcu_work; /* For driving grace periods. */
+ struct list_head srcu_boot_entry; /* Early-boot callbacks. */
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */
@@ -48,6 +49,7 @@ void srcu_drive_gp(struct work_struct *wp);
.srcu_wq = __SWAIT_QUEUE_HEAD_INITIALIZER(name.srcu_wq), \
.srcu_cb_tail = &name.srcu_cb_head, \
.srcu_work = __WORK_INITIALIZER(name.srcu_work, srcu_drive_gp), \
+ .srcu_boot_entry = LIST_HEAD_INIT(name.srcu_boot_entry), \
__SRCU_DEP_MAP_INIT(name) \
}
diff --git a/include/linux/srcutree.h b/include/linux/srcutree.h
index 745d4ca4dd50..86ad97111315 100644
--- a/include/linux/srcutree.h
+++ b/include/linux/srcutree.h
@@ -94,6 +94,7 @@ struct srcu_struct {
/* callback for the barrier */
/* operation. */
struct delayed_work work;
+ struct list_head srcu_boot_entry; /* Early-boot callbacks. */
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */
@@ -105,12 +106,13 @@ struct srcu_struct {
#define SRCU_STATE_SCAN2 2
#define __SRCU_STRUCT_INIT(name, pcpu_name) \
- { \
- .sda = &pcpu_name, \
- .lock = __SPIN_LOCK_UNLOCKED(name.lock), \
- .srcu_gp_seq_needed = 0 - 1, \
- __SRCU_DEP_MAP_INIT(name) \
- }
+{ \
+ .sda = &pcpu_name, \
+ .lock = __SPIN_LOCK_UNLOCKED(name.lock), \
+ .srcu_gp_seq_needed = 0 - 1, \
+ .srcu_boot_entry = LIST_HEAD_INIT(name.srcu_boot_entry), \
+ __SRCU_DEP_MAP_INIT(name) \
+}
/*
* Define and initialize a srcu struct at build time.
diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h
index 4c56c1d98fb3..8e92ecdf3e9f 100644
--- a/kernel/rcu/rcu.h
+++ b/kernel/rcu/rcu.h
@@ -434,6 +434,12 @@ do { \
#endif /* #if defined(SRCU) || !defined(TINY_RCU) */
+#ifdef CONFIG_SRCU
+void srcu_init(void);
+#else /* #ifdef CONFIG_SRCU */
+static inline void void srcu_init(void) { }
+#endif /* #else #ifdef CONFIG_SRCU */
+
#ifdef CONFIG_TINY_RCU
/* Tiny RCU doesn't expedite, as its purpose in life is instead to be tiny. */
static inline bool rcu_gp_is_normal(void) { return true; }
diff --git a/kernel/rcu/srcutiny.c b/kernel/rcu/srcutiny.c
index 622792abe41a..d4042fe1bedd 100644
--- a/kernel/rcu/srcutiny.c
+++ b/kernel/rcu/srcutiny.c
@@ -34,6 +34,8 @@
#include "rcu.h"
int rcu_scheduler_active __read_mostly;
+static LIST_HEAD(srcu_boot_list);
+static bool srcu_init_done;
static int init_srcu_struct_fields(struct srcu_struct *sp)
{
@@ -46,6 +48,7 @@ static int init_srcu_struct_fields(struct srcu_struct *sp)
sp->srcu_gp_waiting = false;
sp->srcu_idx = 0;
INIT_WORK(&sp->srcu_work, srcu_drive_gp);
+ INIT_LIST_HEAD(&sp->srcu_boot_entry);
return 0;
}
@@ -179,8 +182,12 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *rhp,
*sp->srcu_cb_tail = rhp;
sp->srcu_cb_tail = &rhp->next;
local_irq_restore(flags);
- if (!READ_ONCE(sp->srcu_gp_running))
- schedule_work(&sp->srcu_work);
+ if (!READ_ONCE(sp->srcu_gp_running)) {
+ if (likely(srcu_init_done))
+ schedule_work(&sp->srcu_work);
+ else if (list_empty(&sp->srcu_boot_entry))
+ list_add(&sp->srcu_boot_entry, &srcu_boot_list);
+ }
}
EXPORT_SYMBOL_GPL(call_srcu);
@@ -204,3 +211,21 @@ void __init rcu_scheduler_starting(void)
{
rcu_scheduler_active = RCU_SCHEDULER_RUNNING;
}
+
+/*
+ * Queue work for srcu_struct structures with early boot callbacks.
+ * The work won't actually execute until the workqueue initialization
+ * phase that takes place after the scheduler starts.
+ */
+void __init srcu_init(void)
+{
+ struct srcu_struct *sp;
+
+ srcu_init_done = true;
+ while (!list_empty(&srcu_boot_list)) {
+ sp = list_first_entry(&srcu_boot_list,
+ struct srcu_struct, srcu_boot_entry);
+ list_del_init(&sp->srcu_boot_entry);
+ schedule_work(&sp->srcu_work);
+ }
+}
diff --git a/kernel/rcu/srcutree.c b/kernel/rcu/srcutree.c
index 7f266b0f9832..60d4fd66905c 100644
--- a/kernel/rcu/srcutree.c
+++ b/kernel/rcu/srcutree.c
@@ -51,6 +51,10 @@ module_param(exp_holdoff, ulong, 0444);
static ulong counter_wrap_check = (ULONG_MAX >> 2);
module_param(counter_wrap_check, ulong, 0444);
+/* Early-boot callback-management, so early that no lock is required! */
+static LIST_HEAD(srcu_boot_list);
+static bool __read_mostly srcu_init_done;
+
static void srcu_invoke_callbacks(struct work_struct *work);
static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay);
static void process_srcu(struct work_struct *work);
@@ -182,6 +186,7 @@ static int init_srcu_struct_fields(struct srcu_struct *sp, bool is_static)
mutex_init(&sp->srcu_barrier_mutex);
atomic_set(&sp->srcu_barrier_cpu_cnt, 0);
INIT_DELAYED_WORK(&sp->work, process_srcu);
+ INIT_LIST_HEAD(&sp->srcu_boot_entry);
if (!is_static)
sp->sda = alloc_percpu(struct srcu_data);
init_srcu_struct_nodes(sp, is_static);
@@ -701,7 +706,11 @@ static void srcu_funnel_gp_start(struct srcu_struct *sp, struct srcu_data *sdp,
rcu_seq_state(sp->srcu_gp_seq) == SRCU_STATE_IDLE) {
WARN_ON_ONCE(ULONG_CMP_GE(sp->srcu_gp_seq, sp->srcu_gp_seq_needed));
srcu_gp_start(sp);
- queue_delayed_work(rcu_gp_wq, &sp->work, srcu_get_delay(sp));
+ if (likely(srcu_init_done))
+ queue_delayed_work(rcu_gp_wq, &sp->work,
+ srcu_get_delay(sp));
+ else if (list_empty(&sp->srcu_boot_entry))
+ list_add(&sp->srcu_boot_entry, &srcu_boot_list);
}
spin_unlock_irqrestore_rcu_node(sp, flags);
}
@@ -1308,3 +1317,17 @@ static int __init srcu_bootup_announce(void)
return 0;
}
early_initcall(srcu_bootup_announce);
+
+void __init srcu_init(void)
+{
+ struct srcu_struct *sp;
+
+ srcu_init_done = true;
+ while (!list_empty(&srcu_boot_list)) {
+ sp = list_first_entry(&srcu_boot_list,
+ struct srcu_struct, srcu_boot_entry);
+ check_init_srcu_struct(sp);
+ list_del_init(&sp->srcu_boot_entry);
+ queue_work(rcu_gp_wq, &sp->work.work);
+ }
+}
diff --git a/kernel/rcu/tiny.c b/kernel/rcu/tiny.c
index 1745d30e170e..5f5963ba313e 100644
--- a/kernel/rcu/tiny.c
+++ b/kernel/rcu/tiny.c
@@ -167,4 +167,5 @@ void __init rcu_init(void)
{
open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
rcu_early_boot_tests();
+ srcu_init();
}
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 821a665cc76d..bf016ff9f873 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -3737,6 +3737,7 @@ void __init rcu_init(void)
WARN_ON(!rcu_gp_wq);
rcu_par_gp_wq = alloc_workqueue("rcu_par_gp", WQ_MEM_RECLAIM, 0);
WARN_ON(!rcu_par_gp_wq);
+ srcu_init();
}
#include "tree_exp.h"
diff --git a/kernel/rcu/update.c b/kernel/rcu/update.c
index f6de0ba1d149..f203b94f6b5b 100644
--- a/kernel/rcu/update.c
+++ b/kernel/rcu/update.c
@@ -880,11 +880,16 @@ static void test_callback(struct rcu_head *r)
pr_info("RCU test callback executed %d\n", rcu_self_test_counter);
}
+DEFINE_STATIC_SRCU(early_srcu);
+
static void early_boot_test_call_rcu(void)
{
static struct rcu_head head;
+ static struct rcu_head shead;
call_rcu(&head, test_callback);
+ if (IS_ENABLED(CONFIG_SRCU))
+ call_srcu(&early_srcu, &shead, test_callback);
}
void rcu_early_boot_tests(void)
@@ -904,6 +909,10 @@ static int rcu_verify_early_boot_tests(void)
if (rcu_self_test) {
early_boot_test_counter++;
rcu_barrier();
+ if (IS_ENABLED(CONFIG_SRCU)) {
+ early_boot_test_counter++;
+ srcu_barrier(&early_srcu);
+ }
}
if (rcu_self_test_counter != early_boot_test_counter) {
WARN_ON(1);