[PATCH v3 tip/core/rcu 37/40] srcu: Introduce CLASSIC_SRCU Kconfig option

From: Paul E. McKenney
Date: Wed Apr 19 2017 - 13:05:48 EST


The TREE_SRCU rewrite is large and a bit on the non-simple side, so
this commit helps reduce risk by allowing the old v4.11 SRCU algorithm
to be selected using a new CLASSIC_SRCU Kconfig option that depends
on RCU_EXPERT. The default is to use the new TREE_SRCU and TINY_SRCU
algorithms, in order to help get these the testing that they need.
However, if your users do not require the update-side scalability that
is to be provided by TREE_SRCU, select RCU_EXPERT and then CLASSIC_SRCU
to revert back to the old classic SRCU algorithm.

Signed-off-by: Paul E. McKenney <paulmck@xxxxxxxxxxxxxxxxxx>
---
include/linux/srcu.h | 2 +
include/linux/srcuclassic.h | 101 ++++++++
init/Kconfig | 21 +-
kernel/rcu/Makefile | 3 +-
kernel/rcu/rcutorture.c | 2 +-
kernel/rcu/srcu.c | 347 ++++++++++++++-----------
kernel/rcu/srcutree.c | 613 ++++++++++++++++++++++++++++++++++++++++++++
7 files changed, 934 insertions(+), 155 deletions(-)
create mode 100644 include/linux/srcuclassic.h
create mode 100644 kernel/rcu/srcutree.c

diff --git a/include/linux/srcu.h b/include/linux/srcu.h
index 907f09b14eda..167ad8831aaf 100644
--- a/include/linux/srcu.h
+++ b/include/linux/srcu.h
@@ -60,6 +60,8 @@ int init_srcu_struct(struct srcu_struct *sp);
#include <linux/srcutiny.h>
#elif defined(CONFIG_TREE_SRCU)
#include <linux/srcutree.h>
+#elif defined(CONFIG_CLASSIC_SRCU)
+#include <linux/srcuclassic.h>
#else
#error "Unknown SRCU implementation specified to kernel configuration"
#endif
diff --git a/include/linux/srcuclassic.h b/include/linux/srcuclassic.h
new file mode 100644
index 000000000000..41cf99930f34
--- /dev/null
+++ b/include/linux/srcuclassic.h
@@ -0,0 +1,101 @@
+/*
+ * Sleepable Read-Copy Update mechanism for mutual exclusion,
+ * classic v4.11 variant.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, you can access it online at
+ * http://www.gnu.org/licenses/gpl-2.0.html.
+ *
+ * Copyright (C) IBM Corporation, 2017
+ *
+ * Author: Paul McKenney <paulmck@xxxxxxxxxx>
+ */
+
+#ifndef _LINUX_SRCU_CLASSIC_H
+#define _LINUX_SRCU_CLASSIC_H
+
+struct srcu_array {
+ unsigned long lock_count[2];
+ unsigned long unlock_count[2];
+};
+
+struct rcu_batch {
+ struct rcu_head *head, **tail;
+};
+
+#define RCU_BATCH_INIT(name) { NULL, &(name.head) }
+
+struct srcu_struct {
+ unsigned long completed;
+ struct srcu_array __percpu *per_cpu_ref;
+ spinlock_t queue_lock; /* protect ->batch_queue, ->running */
+ bool running;
+ /* callbacks just queued */
+ struct rcu_batch batch_queue;
+ /* callbacks try to do the first check_zero */
+ struct rcu_batch batch_check0;
+ /* callbacks done with the first check_zero and the flip */
+ struct rcu_batch batch_check1;
+ struct rcu_batch batch_done;
+ struct delayed_work work;
+#ifdef CONFIG_DEBUG_LOCK_ALLOC
+ struct lockdep_map dep_map;
+#endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */
+};
+
+void process_srcu(struct work_struct *work);
+
+#define __SRCU_STRUCT_INIT(name) \
+ { \
+ .completed = -300, \
+ .per_cpu_ref = &name##_srcu_array, \
+ .queue_lock = __SPIN_LOCK_UNLOCKED(name.queue_lock), \
+ .running = false, \
+ .batch_queue = RCU_BATCH_INIT(name.batch_queue), \
+ .batch_check0 = RCU_BATCH_INIT(name.batch_check0), \
+ .batch_check1 = RCU_BATCH_INIT(name.batch_check1), \
+ .batch_done = RCU_BATCH_INIT(name.batch_done), \
+ .work = __DELAYED_WORK_INITIALIZER(name.work, process_srcu, 0),\
+ __SRCU_DEP_MAP_INIT(name) \
+ }
+
+/*
+ * Define and initialize a srcu struct at build time.
+ * Do -not- call init_srcu_struct() nor cleanup_srcu_struct() on it.
+ *
+ * Note that although DEFINE_STATIC_SRCU() hides the name from other
+ * files, the per-CPU variable rules nevertheless require that the
+ * chosen name be globally unique. These rules also prohibit use of
+ * DEFINE_STATIC_SRCU() within a function. If these rules are too
+ * restrictive, declare the srcu_struct manually. For example, in
+ * each file:
+ *
+ * static struct srcu_struct my_srcu;
+ *
+ * Then, before the first use of each my_srcu, manually initialize it:
+ *
+ * init_srcu_struct(&my_srcu);
+ *
+ * See include/linux/percpu-defs.h for the rules on per-CPU variables.
+ */
+#define __DEFINE_SRCU(name, is_static) \
+ static DEFINE_PER_CPU(struct srcu_array, name##_srcu_array);\
+ is_static struct srcu_struct name = __SRCU_STRUCT_INIT(name)
+#define DEFINE_SRCU(name) __DEFINE_SRCU(name, /* not static */)
+#define DEFINE_STATIC_SRCU(name) __DEFINE_SRCU(name, static)
+
+void synchronize_srcu_expedited(struct srcu_struct *sp);
+void srcu_barrier(struct srcu_struct *sp);
+unsigned long srcu_batches_completed(struct srcu_struct *sp);
+
+#endif
diff --git a/init/Kconfig b/init/Kconfig
index d269f2ca17b8..558cc3638ab9 100644
--- a/init/Kconfig
+++ b/init/Kconfig
@@ -526,15 +526,32 @@ config SRCU
permits arbitrary sleeping or blocking within RCU read-side critical
sections.

+config CLASSIC_SRCU
+ bool "Use v4.11 classic SRCU implementation"
+ default n
+ depends on RCU_EXPERT && SRCU
+ help
+ This option selects the traditional well-tested classic SRCU
+ implementation from v4.11, as might be desired for enterprise
+ Linux distributions. Without this option, the shiny new
+ Tiny SRCU and Tree SRCU implementations are used instead.
+ At some point, it is hoped that Tiny SRCU and Tree SRCU
+ will accumulate enough test time and confidence to allow
+ Classic SRCU to be dropped entirely.
+
+ Say Y if you need a rock-solid SRCU.
+
+ Say N if you would like to help test Tree SRCU.
+
config TINY_SRCU
bool
- default y if TINY_RCU
+ default y if TINY_RCU && !CLASSIC_SRCU
help
This option selects the single-CPU non-preemptible version of SRCU.

config TREE_SRCU
bool
- default y if !TINY_RCU
+ default y if !TINY_RCU && !CLASSIC_SRCU
help
This option selects the full-fledged version of SRCU.

diff --git a/kernel/rcu/Makefile b/kernel/rcu/Makefile
index b853214a2b99..158e6593d58c 100644
--- a/kernel/rcu/Makefile
+++ b/kernel/rcu/Makefile
@@ -3,7 +3,8 @@
KCOV_INSTRUMENT := n

obj-y += update.o sync.o
-obj-$(CONFIG_TREE_SRCU) += srcu.o
+obj-$(CONFIG_CLASSIC_SRCU) += srcu.o
+obj-$(CONFIG_TREE_SRCU) += srcutree.o
obj-$(CONFIG_TINY_SRCU) += srcutiny.o
obj-$(CONFIG_RCU_TORTURE_TEST) += rcutorture.o
obj-$(CONFIG_RCU_PERF_TEST) += rcuperf.o
diff --git a/kernel/rcu/rcutorture.c b/kernel/rcu/rcutorture.c
index 9cbb8a7b909d..6f344b6748a8 100644
--- a/kernel/rcu/rcutorture.c
+++ b/kernel/rcu/rcutorture.c
@@ -562,7 +562,7 @@ static void srcu_torture_stats(void)
int __maybe_unused cpu;
int idx;

-#ifdef CONFIG_TREE_SRCU
+#if defined(CONFIG_TREE_SRCU) || defined(CONFIG_CLASSIC_SRCU)
idx = srcu_ctlp->completed & 0x1;
pr_alert("%s%s Tree SRCU per-CPU(idx=%d):",
torture_type, TORTURE_FLAG, idx);
diff --git a/kernel/rcu/srcu.c b/kernel/rcu/srcu.c
index 3cfcc59bddf3..584d8a983883 100644
--- a/kernel/rcu/srcu.c
+++ b/kernel/rcu/srcu.c
@@ -36,16 +36,75 @@
#include <linux/delay.h>
#include <linux/srcu.h>

-#include <linux/rcu_node_tree.h>
#include "rcu.h"

+/*
+ * Initialize an rcu_batch structure to empty.
+ */
+static inline void rcu_batch_init(struct rcu_batch *b)
+{
+ b->head = NULL;
+ b->tail = &b->head;
+}
+
+/*
+ * Enqueue a callback onto the tail of the specified rcu_batch structure.
+ */
+static inline void rcu_batch_queue(struct rcu_batch *b, struct rcu_head *head)
+{
+ *b->tail = head;
+ b->tail = &head->next;
+}
+
+/*
+ * Is the specified rcu_batch structure empty?
+ */
+static inline bool rcu_batch_empty(struct rcu_batch *b)
+{
+ return b->tail == &b->head;
+}
+
+/*
+ * Remove the callback at the head of the specified rcu_batch structure
+ * and return a pointer to it, or return NULL if the structure is empty.
+ */
+static inline struct rcu_head *rcu_batch_dequeue(struct rcu_batch *b)
+{
+ struct rcu_head *head;
+
+ if (rcu_batch_empty(b))
+ return NULL;
+
+ head = b->head;
+ b->head = head->next;
+ if (b->tail == &head->next)
+ rcu_batch_init(b);
+
+ return head;
+}
+
+/*
+ * Move all callbacks from the rcu_batch structure specified by "from" to
+ * the structure specified by "to".
+ */
+static inline void rcu_batch_move(struct rcu_batch *to, struct rcu_batch *from)
+{
+ if (!rcu_batch_empty(from)) {
+ *to->tail = from->head;
+ to->tail = from->tail;
+ rcu_batch_init(from);
+ }
+}
+
static int init_srcu_struct_fields(struct srcu_struct *sp)
{
sp->completed = 0;
- sp->srcu_gp_seq = 0;
- atomic_set(&sp->srcu_exp_cnt, 0);
spin_lock_init(&sp->queue_lock);
- rcu_segcblist_init(&sp->srcu_cblist);
+ sp->running = false;
+ rcu_batch_init(&sp->batch_queue);
+ rcu_batch_init(&sp->batch_check0);
+ rcu_batch_init(&sp->batch_check1);
+ rcu_batch_init(&sp->batch_done);
INIT_DELAYED_WORK(&sp->work, process_srcu);
sp->per_cpu_ref = alloc_percpu(struct srcu_array);
return sp->per_cpu_ref ? 0 : -ENOMEM;
@@ -180,8 +239,6 @@ static bool srcu_readers_active(struct srcu_struct *sp)
return sum;
}

-#define SRCU_INTERVAL 1
-
/**
* cleanup_srcu_struct - deconstruct a sleep-RCU structure
* @sp: structure to clean up.
@@ -197,16 +254,8 @@ static bool srcu_readers_active(struct srcu_struct *sp)
*/
void cleanup_srcu_struct(struct srcu_struct *sp)
{
- WARN_ON_ONCE(atomic_read(&sp->srcu_exp_cnt));
if (WARN_ON(srcu_readers_active(sp)))
return; /* Leakage unless caller handles error. */
- if (WARN_ON(!rcu_segcblist_empty(&sp->srcu_cblist)))
- return; /* Leakage unless caller handles error. */
- flush_delayed_work(&sp->work);
- if (WARN_ON(rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) != SRCU_STATE_IDLE)) {
- pr_info("cleanup_srcu_struct: Active srcu_struct %lu CBs %c state: %d\n", rcu_segcblist_n_cbs(&sp->srcu_cblist), ".E"[rcu_segcblist_empty(&sp->srcu_cblist)], rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)));
- return; /* Caller forgot to stop doing call_srcu()? */
- }
free_percpu(sp->per_cpu_ref);
sp->per_cpu_ref = NULL;
}
@@ -245,36 +294,26 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock);
* We use an adaptive strategy for synchronize_srcu() and especially for
* synchronize_srcu_expedited(). We spin for a fixed time period
* (defined below) to allow SRCU readers to exit their read-side critical
- * sections. If there are still some readers after a few microseconds,
- * we repeatedly block for 1-millisecond time periods.
+ * sections. If there are still some readers after 10 microseconds,
+ * we repeatedly block for 1-millisecond time periods. This approach
+ * has done well in testing, so there is no need for a config parameter.
*/
#define SRCU_RETRY_CHECK_DELAY 5
+#define SYNCHRONIZE_SRCU_TRYCOUNT 2
+#define SYNCHRONIZE_SRCU_EXP_TRYCOUNT 12

/*
- * Start an SRCU grace period.
- */
-static void srcu_gp_start(struct srcu_struct *sp)
-{
- int state;
-
- rcu_segcblist_accelerate(&sp->srcu_cblist,
- rcu_seq_snap(&sp->srcu_gp_seq));
- rcu_seq_start(&sp->srcu_gp_seq);
- state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
- WARN_ON_ONCE(state != SRCU_STATE_SCAN1);
-}
-
-/*
- * Wait until all readers counted by array index idx complete, but
- * loop an additional time if there is an expedited grace period pending.
- * The caller must ensure that ->completed is not changed while checking.
+ * @@@ Wait until all pre-existing readers complete. Such readers
+ * will have used the index specified by "idx".
+ * The caller must ensure that ->completed is not changed while checking
+ * and that idx = (->completed & 1) ^ 1.
*/
static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount)
{
for (;;) {
if (srcu_readers_active_idx_check(sp, idx))
return true;
- if (--trycount + !!atomic_read(&sp->srcu_exp_cnt) <= 0)
+ if (--trycount <= 0)
return false;
udelay(SRCU_RETRY_CHECK_DELAY);
}
@@ -300,19 +339,6 @@ static void srcu_flip(struct srcu_struct *sp)
}

/*
- * End an SRCU grace period.
- */
-static void srcu_gp_end(struct srcu_struct *sp)
-{
- rcu_seq_end(&sp->srcu_gp_seq);
-
- spin_lock_irq(&sp->queue_lock);
- rcu_segcblist_advance(&sp->srcu_cblist,
- rcu_seq_current(&sp->srcu_gp_seq));
- spin_unlock_irq(&sp->queue_lock);
-}
-
-/*
* Enqueue an SRCU callback on the specified srcu_struct structure,
* initiating grace-period processing if it is not already running.
*
@@ -348,24 +374,26 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *head,
head->func = func;
spin_lock_irqsave(&sp->queue_lock, flags);
smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
- rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
- if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) {
- srcu_gp_start(sp);
+ rcu_batch_queue(&sp->batch_queue, head);
+ if (!sp->running) {
+ sp->running = true;
queue_delayed_work(system_power_efficient_wq, &sp->work, 0);
}
spin_unlock_irqrestore(&sp->queue_lock, flags);
}
EXPORT_SYMBOL_GPL(call_srcu);

-static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay);
+static void srcu_advance_batches(struct srcu_struct *sp, int trycount);
+static void srcu_reschedule(struct srcu_struct *sp);

/*
* Helper function for synchronize_srcu() and synchronize_srcu_expedited().
*/
-static void __synchronize_srcu(struct srcu_struct *sp)
+static void __synchronize_srcu(struct srcu_struct *sp, int trycount)
{
struct rcu_synchronize rcu;
struct rcu_head *head = &rcu.head;
+ bool done = false;

RCU_LOCKDEP_WARN(lock_is_held(&sp->dep_map) ||
lock_is_held(&rcu_bh_lock_map) ||
@@ -373,8 +401,6 @@ static void __synchronize_srcu(struct srcu_struct *sp)
lock_is_held(&rcu_sched_lock_map),
"Illegal synchronize_srcu() in same-type SRCU (or in RCU) read-side critical section");

- if (rcu_scheduler_active == RCU_SCHEDULER_INACTIVE)
- return;
might_sleep();
init_completion(&rcu.completion);

@@ -382,47 +408,31 @@ static void __synchronize_srcu(struct srcu_struct *sp)
head->func = wakeme_after_rcu;
spin_lock_irq(&sp->queue_lock);
smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
- if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) {
+ if (!sp->running) {
/* steal the processing owner */
- rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
- srcu_gp_start(sp);
+ sp->running = true;
+ rcu_batch_queue(&sp->batch_check0, head);
spin_unlock_irq(&sp->queue_lock);
+
+ srcu_advance_batches(sp, trycount);
+ if (!rcu_batch_empty(&sp->batch_done)) {
+ BUG_ON(sp->batch_done.head != head);
+ rcu_batch_dequeue(&sp->batch_done);
+ done = true;
+ }
/* give the processing owner to work_struct */
- srcu_reschedule(sp, 0);
+ srcu_reschedule(sp);
} else {
- rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
+ rcu_batch_queue(&sp->batch_queue, head);
spin_unlock_irq(&sp->queue_lock);
}

- wait_for_completion(&rcu.completion);
- smp_mb(); /* Caller's later accesses after GP. */
-}
-
-/**
- * synchronize_srcu_expedited - Brute-force SRCU grace period
- * @sp: srcu_struct with which to synchronize.
- *
- * Wait for an SRCU grace period to elapse, but be more aggressive about
- * spinning rather than blocking when waiting.
- *
- * Note that synchronize_srcu_expedited() has the same deadlock and
- * memory-ordering properties as does synchronize_srcu().
- */
-void synchronize_srcu_expedited(struct srcu_struct *sp)
-{
- bool do_norm = rcu_gp_is_normal();
-
- if (!do_norm) {
- atomic_inc(&sp->srcu_exp_cnt);
- smp_mb__after_atomic(); /* increment before GP. */
- }
- __synchronize_srcu(sp);
- if (!do_norm) {
- smp_mb__before_atomic(); /* GP before decrement. */
- atomic_dec(&sp->srcu_exp_cnt);
+ if (!done) {
+ wait_for_completion(&rcu.completion);
+ smp_mb(); /* Caller's later accesses after GP. */
}
+
}
-EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);

/**
* synchronize_srcu - wait for prior SRCU read-side critical-section completion
@@ -465,14 +475,29 @@ EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
*/
void synchronize_srcu(struct srcu_struct *sp)
{
- if (rcu_gp_is_expedited())
- synchronize_srcu_expedited(sp);
- else
- __synchronize_srcu(sp);
+ __synchronize_srcu(sp, (rcu_gp_is_expedited() && !rcu_gp_is_normal())
+ ? SYNCHRONIZE_SRCU_EXP_TRYCOUNT
+ : SYNCHRONIZE_SRCU_TRYCOUNT);
}
EXPORT_SYMBOL_GPL(synchronize_srcu);

/**
+ * synchronize_srcu_expedited - Brute-force SRCU grace period
+ * @sp: srcu_struct with which to synchronize.
+ *
+ * Wait for an SRCU grace period to elapse, but be more aggressive about
+ * spinning rather than blocking when waiting.
+ *
+ * Note that synchronize_srcu_expedited() has the same deadlock and
+ * memory-ordering properties as does synchronize_srcu().
+ */
+void synchronize_srcu_expedited(struct srcu_struct *sp)
+{
+ __synchronize_srcu(sp, SYNCHRONIZE_SRCU_EXP_TRYCOUNT);
+}
+EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
+
+/**
* srcu_barrier - Wait until all in-flight call_srcu() callbacks complete.
* @sp: srcu_struct on which to wait for in-flight callbacks.
*/
@@ -495,13 +520,29 @@ unsigned long srcu_batches_completed(struct srcu_struct *sp)
}
EXPORT_SYMBOL_GPL(srcu_batches_completed);

+#define SRCU_CALLBACK_BATCH 10
+#define SRCU_INTERVAL 1
+
+/*
+ * Move any new SRCU callbacks to the first stage of the SRCU grace
+ * period pipeline.
+ */
+static void srcu_collect_new(struct srcu_struct *sp)
+{
+ if (!rcu_batch_empty(&sp->batch_queue)) {
+ spin_lock_irq(&sp->queue_lock);
+ rcu_batch_move(&sp->batch_check0, &sp->batch_queue);
+ spin_unlock_irq(&sp->queue_lock);
+ }
+}
+
/*
* Core SRCU state machine. Advance callbacks from ->batch_check0 to
* ->batch_check1 and then to ->batch_done as readers drain.
*/
-static void srcu_advance_batches(struct srcu_struct *sp)
+static void srcu_advance_batches(struct srcu_struct *sp, int trycount)
{
- int idx;
+ int idx = 1 ^ (sp->completed & 1);

/*
* Because readers might be delayed for an extended period after
@@ -509,44 +550,50 @@ static void srcu_advance_batches(struct srcu_struct *sp)
* might well be readers using both idx=0 and idx=1. We therefore
* need to wait for readers to clear from both index values before
* invoking a callback.
- *
- * The load-acquire ensures that we see the accesses performed
- * by the prior grace period.
*/
- idx = rcu_seq_state(smp_load_acquire(&sp->srcu_gp_seq)); /* ^^^ */
- if (idx == SRCU_STATE_IDLE) {
- spin_lock_irq(&sp->queue_lock);
- if (rcu_segcblist_empty(&sp->srcu_cblist)) {
- spin_unlock_irq(&sp->queue_lock);
- return;
- }
- idx = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
- if (idx == SRCU_STATE_IDLE)
- srcu_gp_start(sp);
- spin_unlock_irq(&sp->queue_lock);
- if (idx != SRCU_STATE_IDLE)
- return; /* Someone else started the grace period. */
- }

- if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN1) {
- idx = 1 ^ (sp->completed & 1);
- if (!try_check_zero(sp, idx, 1))
- return; /* readers present, retry later. */
- srcu_flip(sp);
- rcu_seq_set_state(&sp->srcu_gp_seq, SRCU_STATE_SCAN2);
- }
+ if (rcu_batch_empty(&sp->batch_check0) &&
+ rcu_batch_empty(&sp->batch_check1))
+ return; /* no callbacks need to be advanced */

- if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN2) {
+ if (!try_check_zero(sp, idx, trycount))
+ return; /* failed to advance, will try after SRCU_INTERVAL */

- /*
- * SRCU read-side critical sections are normally short,
- * so check at least twice in quick succession after a flip.
- */
- idx = 1 ^ (sp->completed & 1);
- if (!try_check_zero(sp, idx, 2))
- return; /* readers present, retry after later. */
- srcu_gp_end(sp);
- }
+ /*
+ * The callbacks in ->batch_check1 already completed their
+ * first zero check and flip back when they were enqueued on
+ * ->batch_check0 in a previous invocation of srcu_advance_batches().
+ * (Presumably try_check_zero() returned false during that
+ * invocation, leaving the callbacks stranded on ->batch_check1.)
+ * They are therefore ready to invoke, so move them to ->batch_done.
+ */
+ rcu_batch_move(&sp->batch_done, &sp->batch_check1);
+
+ if (rcu_batch_empty(&sp->batch_check0))
+ return; /* no callbacks need to be advanced */
+ srcu_flip(sp);
+
+ /*
+ * The callbacks in ->batch_check0 just finished their
+ * first check zero and flip, so move them to ->batch_check1
+ * for future checking on the other idx.
+ */
+ rcu_batch_move(&sp->batch_check1, &sp->batch_check0);
+
+ /*
+ * SRCU read-side critical sections are normally short, so check
+ * at least twice in quick succession after a flip.
+ */
+ trycount = trycount < 2 ? 2 : trycount;
+ if (!try_check_zero(sp, idx^1, trycount))
+ return; /* failed to advance, will try after SRCU_INTERVAL */
+
+ /*
+ * The callbacks in ->batch_check1 have now waited for all
+ * pre-existing readers using both idx values. They are therefore
+ * ready to invoke, so move them to ->batch_done.
+ */
+ rcu_batch_move(&sp->batch_done, &sp->batch_check1);
}

/*
@@ -557,48 +604,45 @@ static void srcu_advance_batches(struct srcu_struct *sp)
*/
static void srcu_invoke_callbacks(struct srcu_struct *sp)
{
- struct rcu_cblist ready_cbs;
- struct rcu_head *rhp;
+ int i;
+ struct rcu_head *head;

- spin_lock_irq(&sp->queue_lock);
- if (!rcu_segcblist_ready_cbs(&sp->srcu_cblist)) {
- spin_unlock_irq(&sp->queue_lock);
- return;
- }
- rcu_cblist_init(&ready_cbs);
- rcu_segcblist_extract_done_cbs(&sp->srcu_cblist, &ready_cbs);
- spin_unlock_irq(&sp->queue_lock);
- rhp = rcu_cblist_dequeue(&ready_cbs);
- for (; rhp != NULL; rhp = rcu_cblist_dequeue(&ready_cbs)) {
+ for (i = 0; i < SRCU_CALLBACK_BATCH; i++) {
+ head = rcu_batch_dequeue(&sp->batch_done);
+ if (!head)
+ break;
local_bh_disable();
- rhp->func(rhp);
+ head->func(head);
local_bh_enable();
}
- spin_lock_irq(&sp->queue_lock);
- rcu_segcblist_insert_count(&sp->srcu_cblist, &ready_cbs);
- spin_unlock_irq(&sp->queue_lock);
}

/*
* Finished one round of SRCU grace period. Start another if there are
* more SRCU callbacks queued, otherwise put SRCU into not-running state.
*/
-static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay)
+static void srcu_reschedule(struct srcu_struct *sp)
{
bool pending = true;
- int state;

- if (rcu_segcblist_empty(&sp->srcu_cblist)) {
+ if (rcu_batch_empty(&sp->batch_done) &&
+ rcu_batch_empty(&sp->batch_check1) &&
+ rcu_batch_empty(&sp->batch_check0) &&
+ rcu_batch_empty(&sp->batch_queue)) {
spin_lock_irq(&sp->queue_lock);
- state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
- if (rcu_segcblist_empty(&sp->srcu_cblist) &&
- state == SRCU_STATE_IDLE)
+ if (rcu_batch_empty(&sp->batch_done) &&
+ rcu_batch_empty(&sp->batch_check1) &&
+ rcu_batch_empty(&sp->batch_check0) &&
+ rcu_batch_empty(&sp->batch_queue)) {
+ sp->running = false;
pending = false;
+ }
spin_unlock_irq(&sp->queue_lock);
}

if (pending)
- queue_delayed_work(system_power_efficient_wq, &sp->work, delay);
+ queue_delayed_work(system_power_efficient_wq,
+ &sp->work, SRCU_INTERVAL);
}

/*
@@ -610,8 +654,9 @@ void process_srcu(struct work_struct *work)

sp = container_of(work, struct srcu_struct, work.work);

- srcu_advance_batches(sp);
+ srcu_collect_new(sp);
+ srcu_advance_batches(sp, 1);
srcu_invoke_callbacks(sp);
- srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL);
+ srcu_reschedule(sp);
}
EXPORT_SYMBOL_GPL(process_srcu);
diff --git a/kernel/rcu/srcutree.c b/kernel/rcu/srcutree.c
new file mode 100644
index 000000000000..da676b0d016b
--- /dev/null
+++ b/kernel/rcu/srcutree.c
@@ -0,0 +1,613 @@
+/*
+ * Sleepable Read-Copy Update mechanism for mutual exclusion.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, you can access it online at
+ * http://www.gnu.org/licenses/gpl-2.0.html.
+ *
+ * Copyright (C) IBM Corporation, 2006
+ * Copyright (C) Fujitsu, 2012
+ *
+ * Author: Paul McKenney <paulmck@xxxxxxxxxx>
+ * Lai Jiangshan <laijs@xxxxxxxxxxxxxx>
+ *
+ * For detailed explanation of Read-Copy Update mechanism see -
+ * Documentation/RCU/ *.txt
+ *
+ */
+
+#include <linux/export.h>
+#include <linux/mutex.h>
+#include <linux/percpu.h>
+#include <linux/preempt.h>
+#include <linux/rcupdate_wait.h>
+#include <linux/sched.h>
+#include <linux/smp.h>
+#include <linux/delay.h>
+#include <linux/srcu.h>
+
+#include <linux/rcu_node_tree.h>
+#include "rcu.h"
+
+static int init_srcu_struct_fields(struct srcu_struct *sp)
+{
+ sp->completed = 0;
+ sp->srcu_gp_seq = 0;
+ atomic_set(&sp->srcu_exp_cnt, 0);
+ spin_lock_init(&sp->queue_lock);
+ rcu_segcblist_init(&sp->srcu_cblist);
+ INIT_DELAYED_WORK(&sp->work, process_srcu);
+ sp->per_cpu_ref = alloc_percpu(struct srcu_array);
+ return sp->per_cpu_ref ? 0 : -ENOMEM;
+}
+
+#ifdef CONFIG_DEBUG_LOCK_ALLOC
+
+int __init_srcu_struct(struct srcu_struct *sp, const char *name,
+ struct lock_class_key *key)
+{
+ /* Don't re-initialize a lock while it is held. */
+ debug_check_no_locks_freed((void *)sp, sizeof(*sp));
+ lockdep_init_map(&sp->dep_map, name, key, 0);
+ return init_srcu_struct_fields(sp);
+}
+EXPORT_SYMBOL_GPL(__init_srcu_struct);
+
+#else /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */
+
+/**
+ * init_srcu_struct - initialize a sleep-RCU structure
+ * @sp: structure to initialize.
+ *
+ * Must invoke this on a given srcu_struct before passing that srcu_struct
+ * to any other function. Each srcu_struct represents a separate domain
+ * of SRCU protection.
+ */
+int init_srcu_struct(struct srcu_struct *sp)
+{
+ return init_srcu_struct_fields(sp);
+}
+EXPORT_SYMBOL_GPL(init_srcu_struct);
+
+#endif /* #else #ifdef CONFIG_DEBUG_LOCK_ALLOC */
+
+/*
+ * Returns approximate total of the readers' ->lock_count[] values for the
+ * rank of per-CPU counters specified by idx.
+ */
+static unsigned long srcu_readers_lock_idx(struct srcu_struct *sp, int idx)
+{
+ int cpu;
+ unsigned long sum = 0;
+
+ for_each_possible_cpu(cpu) {
+ struct srcu_array *cpuc = per_cpu_ptr(sp->per_cpu_ref, cpu);
+
+ sum += READ_ONCE(cpuc->lock_count[idx]);
+ }
+ return sum;
+}
+
+/*
+ * Returns approximate total of the readers' ->unlock_count[] values for the
+ * rank of per-CPU counters specified by idx.
+ */
+static unsigned long srcu_readers_unlock_idx(struct srcu_struct *sp, int idx)
+{
+ int cpu;
+ unsigned long sum = 0;
+
+ for_each_possible_cpu(cpu) {
+ struct srcu_array *cpuc = per_cpu_ptr(sp->per_cpu_ref, cpu);
+
+ sum += READ_ONCE(cpuc->unlock_count[idx]);
+ }
+ return sum;
+}
+
+/*
+ * Return true if the number of pre-existing readers is determined to
+ * be zero.
+ */
+static bool srcu_readers_active_idx_check(struct srcu_struct *sp, int idx)
+{
+ unsigned long unlocks;
+
+ unlocks = srcu_readers_unlock_idx(sp, idx);
+
+ /*
+ * Make sure that a lock is always counted if the corresponding
+ * unlock is counted. Needs to be a smp_mb() as the read side may
+ * contain a read from a variable that is written to before the
+ * synchronize_srcu() in the write side. In this case smp_mb()s
+ * A and B act like the store buffering pattern.
+ *
+ * This smp_mb() also pairs with smp_mb() C to prevent accesses
+ * after the synchronize_srcu() from being executed before the
+ * grace period ends.
+ */
+ smp_mb(); /* A */
+
+ /*
+ * If the locks are the same as the unlocks, then there must have
+ * been no readers on this index at some time in between. This does
+ * not mean that there are no more readers, as one could have read
+ * the current index but not have incremented the lock counter yet.
+ *
+ * Possible bug: There is no guarantee that there haven't been
+ * ULONG_MAX increments of ->lock_count[] since the unlocks were
+ * counted, meaning that this could return true even if there are
+ * still active readers. Since there are no memory barriers around
+ * srcu_flip(), the CPU is not required to increment ->completed
+ * before running srcu_readers_unlock_idx(), which means that there
+ * could be an arbitrarily large number of critical sections that
+ * execute after srcu_readers_unlock_idx() but use the old value
+ * of ->completed.
+ */
+ return srcu_readers_lock_idx(sp, idx) == unlocks;
+}
+
+/**
+ * srcu_readers_active - returns true if there are readers, and false
+ * otherwise
+ * @sp: which srcu_struct to count active readers (holding srcu_read_lock).
+ *
+ * Note that this is not an atomic primitive, and can therefore suffer
+ * severe errors when invoked on an active srcu_struct. That said, it
+ * can be useful as an error check at cleanup time.
+ */
+static bool srcu_readers_active(struct srcu_struct *sp)
+{
+ int cpu;
+ unsigned long sum = 0;
+
+ for_each_possible_cpu(cpu) {
+ struct srcu_array *cpuc = per_cpu_ptr(sp->per_cpu_ref, cpu);
+
+ sum += READ_ONCE(cpuc->lock_count[0]);
+ sum += READ_ONCE(cpuc->lock_count[1]);
+ sum -= READ_ONCE(cpuc->unlock_count[0]);
+ sum -= READ_ONCE(cpuc->unlock_count[1]);
+ }
+ return sum;
+}
+
+#define SRCU_INTERVAL 1
+
+/**
+ * cleanup_srcu_struct - deconstruct a sleep-RCU structure
+ * @sp: structure to clean up.
+ *
+ * Must invoke this after you are finished using a given srcu_struct that
+ * was initialized via init_srcu_struct(), else you leak memory.
+ */
+void cleanup_srcu_struct(struct srcu_struct *sp)
+{
+ WARN_ON_ONCE(atomic_read(&sp->srcu_exp_cnt));
+ if (WARN_ON(srcu_readers_active(sp)))
+ return; /* Leakage unless caller handles error. */
+ if (WARN_ON(!rcu_segcblist_empty(&sp->srcu_cblist)))
+ return; /* Leakage unless caller handles error. */
+ flush_delayed_work(&sp->work);
+ if (WARN_ON(rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) != SRCU_STATE_IDLE)) {
+ pr_info("cleanup_srcu_struct: Active srcu_struct %lu CBs %c state: %d\n", rcu_segcblist_n_cbs(&sp->srcu_cblist), ".E"[rcu_segcblist_empty(&sp->srcu_cblist)], rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)));
+ return; /* Caller forgot to stop doing call_srcu()? */
+ }
+ free_percpu(sp->per_cpu_ref);
+ sp->per_cpu_ref = NULL;
+}
+EXPORT_SYMBOL_GPL(cleanup_srcu_struct);
+
+/*
+ * Counts the new reader in the appropriate per-CPU element of the
+ * srcu_struct. Must be called from process context.
+ * Returns an index that must be passed to the matching srcu_read_unlock().
+ */
+int __srcu_read_lock(struct srcu_struct *sp)
+{
+ int idx;
+
+ idx = READ_ONCE(sp->completed) & 0x1;
+ __this_cpu_inc(sp->per_cpu_ref->lock_count[idx]);
+ smp_mb(); /* B */ /* Avoid leaking the critical section. */
+ return idx;
+}
+EXPORT_SYMBOL_GPL(__srcu_read_lock);
+
+/*
+ * Removes the count for the old reader from the appropriate per-CPU
+ * element of the srcu_struct. Note that this may well be a different
+ * CPU than that which was incremented by the corresponding srcu_read_lock().
+ * Must be called from process context.
+ */
+void __srcu_read_unlock(struct srcu_struct *sp, int idx)
+{
+ smp_mb(); /* C */ /* Avoid leaking the critical section. */
+ this_cpu_inc(sp->per_cpu_ref->unlock_count[idx]);
+}
+EXPORT_SYMBOL_GPL(__srcu_read_unlock);
+
+/*
+ * We use an adaptive strategy for synchronize_srcu() and especially for
+ * synchronize_srcu_expedited(). We spin for a fixed time period
+ * (defined below) to allow SRCU readers to exit their read-side critical
+ * sections. If there are still some readers after a few microseconds,
+ * we repeatedly block for 1-millisecond time periods.
+ */
+#define SRCU_RETRY_CHECK_DELAY 5
+
+/*
+ * Start an SRCU grace period, warning once if it does not enter SCAN1.
+ */
+static void srcu_gp_start(struct srcu_struct *sp)
+{
+	unsigned long gp_snap = rcu_seq_snap(&sp->srcu_gp_seq);
+	int gp_state;
+
+	rcu_segcblist_accelerate(&sp->srcu_cblist, gp_snap);
+	rcu_seq_start(&sp->srcu_gp_seq);
+	gp_state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
+	WARN_ON_ONCE(gp_state != SRCU_STATE_SCAN1);
+}
+
+/*
+ * Poll until all readers counted by array index idx complete, giving up
+ * (and returning false) after trycount checks, or after one additional
+ * check if an expedited grace period is pending per ->srcu_exp_cnt.
+ * The caller must ensure that ->completed is not changed while checking.
+ */
+static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount)
+{
+	while (!srcu_readers_active_idx_check(sp, idx)) {
+		if (--trycount + !!atomic_read(&sp->srcu_exp_cnt) <= 0)
+			return false;
+		udelay(SRCU_RETRY_CHECK_DELAY);
+	}
+	return true;
+}
+
+/*
+ * Increment ->completed so that future SRCU readers, which index by its
+ * low-order bit, use the other rank of the ->(un)lock_count[] arrays. This
+ * allows us to wait for pre-existing readers in a starvation-free manner.
+ */
+static void srcu_flip(struct srcu_struct *sp)
+{
+ WRITE_ONCE(sp->completed, sp->completed + 1);
+
+ /*
+ * Ensure that if the updater misses an __srcu_read_unlock()
+ * increment, that task's next __srcu_read_lock() will see the
+ * above counter update. Note that both this memory barrier
+ * and the one in srcu_readers_active_idx_check() provide the
+ * guarantee for __srcu_read_lock().
+ */
+ smp_mb(); /* D */ /* Pairs with C. */
+}
+
+/*
+ * End an SRCU grace period, then advance ->srcu_cblist under ->queue_lock.
+ */
+static void srcu_gp_end(struct srcu_struct *sp)
+{
+ rcu_seq_end(&sp->srcu_gp_seq);
+
+ spin_lock_irq(&sp->queue_lock);
+ rcu_segcblist_advance(&sp->srcu_cblist,
+ rcu_seq_current(&sp->srcu_gp_seq));
+ spin_unlock_irq(&sp->queue_lock);
+}
+
+/*
+ * Enqueue an SRCU callback on the specified srcu_struct structure,
+ * initiating grace-period processing if it is not already running.
+ *
+ * Note that all CPUs must agree that the grace period extended beyond
+ * all pre-existing SRCU read-side critical sections. On systems with
+ * more than one CPU, this means that when "func()" is invoked, each CPU
+ * is guaranteed to have executed a full memory barrier since the end of
+ * its last corresponding SRCU read-side critical section whose beginning
+ * preceded the call to call_srcu(). It also means that each CPU executing
+ * an SRCU read-side critical section that continues beyond the start of
+ * "func()" must have executed a memory barrier after the call_srcu()
+ * but before the beginning of that SRCU read-side critical section.
+ * Note that these guarantees include CPUs that are offline, idle, or
+ * executing in user mode, as well as CPUs that are executing in the kernel.
+ *
+ * Furthermore, if CPU A invoked call_srcu() and CPU B invoked the
+ * resulting SRCU callback function "func()", then both CPU A and CPU
+ * B are guaranteed to execute a full memory barrier during the time
+ * interval between the call to call_srcu() and the invocation of "func()".
+ * This guarantee applies even if CPU A and CPU B are the same CPU (but
+ * again only if the system has more than one CPU).
+ *
+ * Of course, these guarantees apply only for invocations of call_srcu(),
+ * srcu_read_lock(), and srcu_read_unlock() that are all passed the same
+ * srcu_struct structure.
+ */
+void call_srcu(struct srcu_struct *sp, struct rcu_head *head,
+ rcu_callback_t func)
+{
+ unsigned long flags;
+
+ head->next = NULL;
+ head->func = func;
+ spin_lock_irqsave(&sp->queue_lock, flags);
+ smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
+ rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
+ if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) {
+ srcu_gp_start(sp);
+ queue_delayed_work(system_power_efficient_wq, &sp->work, 0);
+ }
+ spin_unlock_irqrestore(&sp->queue_lock, flags);
+}
+EXPORT_SYMBOL_GPL(call_srcu);
+
+static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay);
+
+/*
+ * Helper function for synchronize_srcu() and synchronize_srcu_expedited().
+ */
+static void __synchronize_srcu(struct srcu_struct *sp)
+{
+ struct rcu_synchronize rcu;
+ struct rcu_head *head = &rcu.head;
+
+ RCU_LOCKDEP_WARN(lock_is_held(&sp->dep_map) ||
+ lock_is_held(&rcu_bh_lock_map) ||
+ lock_is_held(&rcu_lock_map) ||
+ lock_is_held(&rcu_sched_lock_map),
+ "Illegal synchronize_srcu() in same-type SRCU (or in RCU) read-side critical section");
+
+ if (rcu_scheduler_active == RCU_SCHEDULER_INACTIVE)
+ return;
+ might_sleep();
+ init_completion(&rcu.completion);
+
+ head->next = NULL;
+ head->func = wakeme_after_rcu;
+ spin_lock_irq(&sp->queue_lock);
+ smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
+ if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) {
+ /* No GP in progress: queue our callback and start one ourselves. */
+ rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
+ srcu_gp_start(sp);
+ spin_unlock_irq(&sp->queue_lock);
+ /* Hand grace-period processing over to the delayed work. */
+ srcu_reschedule(sp, 0);
+ } else { /* GP already in progress: just queue behind it. */
+ rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
+ spin_unlock_irq(&sp->queue_lock);
+ }
+
+ wait_for_completion(&rcu.completion);
+ smp_mb(); /* Caller's later accesses after GP. */
+}
+
+/**
+ * synchronize_srcu_expedited - Brute-force SRCU grace period
+ * @sp: srcu_struct with which to synchronize.
+ *
+ * Wait for an SRCU grace period to elapse, keeping ->srcu_exp_cnt elevated
+ * meanwhile (unless rcu_gp_is_normal()) to favor spinning over blocking.
+ *
+ * Note that synchronize_srcu_expedited() has the same deadlock and
+ * memory-ordering properties as does synchronize_srcu().
+ */
+void synchronize_srcu_expedited(struct srcu_struct *sp)
+{
+	if (rcu_gp_is_normal()) {
+		/* Normal GPs requested: skip the ->srcu_exp_cnt bookkeeping. */
+		__synchronize_srcu(sp);
+		return;
+	}
+
+	atomic_inc(&sp->srcu_exp_cnt);
+	smp_mb__after_atomic(); /* increment before GP. */
+	__synchronize_srcu(sp);
+	smp_mb__before_atomic(); /* GP before decrement. */
+	atomic_dec(&sp->srcu_exp_cnt);
+}
+EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
+
+/**
+ * synchronize_srcu - wait for prior SRCU read-side critical-section completion
+ * @sp: srcu_struct with which to synchronize.
+ *
+ * Wait for the reader counts of both indexes to drain to zero. To avoid
+ * possible starvation of synchronize_srcu(), first wait for the count of
+ * index ((->completed & 1) ^ 1) to drain to zero, then flip ->completed
+ * and wait for the count of the other index.
+ *
+ * Can block; must be called from process context.
+ *
+ * Note that it is illegal to call synchronize_srcu() from the corresponding
+ * SRCU read-side critical section; doing so will result in deadlock.
+ * However, it is perfectly legal to call synchronize_srcu() on one
+ * srcu_struct from some other srcu_struct's read-side critical section,
+ * as long as the resulting graph of srcu_structs is acyclic.
+ *
+ * There are memory-ordering constraints implied by synchronize_srcu().
+ * On systems with more than one CPU, when synchronize_srcu() returns,
+ * each CPU is guaranteed to have executed a full memory barrier since
+ * the end of its last corresponding SRCU read-side critical section
+ * whose beginning preceded the call to synchronize_srcu(). In addition,
+ * each CPU having an SRCU read-side critical section that extends beyond
+ * the return from synchronize_srcu() is guaranteed to have executed a
+ * full memory barrier after the beginning of synchronize_srcu() and before
+ * the beginning of that SRCU read-side critical section. Note that these
+ * guarantees include CPUs that are offline, idle, or executing in user mode,
+ * as well as CPUs that are executing in the kernel.
+ *
+ * Furthermore, if CPU A invoked synchronize_srcu(), which returned
+ * to its caller on CPU B, then both CPU A and CPU B are guaranteed
+ * to have executed a full memory barrier during the execution of
+ * synchronize_srcu(). This guarantee applies even if CPU A and CPU B
+ * are the same CPU, but again only if the system has more than one CPU.
+ *
+ * Of course, these memory-ordering guarantees apply only when
+ * synchronize_srcu(), srcu_read_lock(), and srcu_read_unlock() are
+ * passed the same srcu_struct structure.
+ */
+void synchronize_srcu(struct srcu_struct *sp)
+{
+	if (!rcu_gp_is_expedited())
+		__synchronize_srcu(sp);
+	else
+		synchronize_srcu_expedited(sp);
+}
+EXPORT_SYMBOL_GPL(synchronize_srcu);
+
+/**
+ * srcu_barrier - Wait until all in-flight call_srcu() callbacks complete.
+ * @sp: srcu_struct on which to wait for in-flight callbacks.
+ */
+void srcu_barrier(struct srcu_struct *sp)
+{
+ synchronize_srcu(sp); /* Presumably queues behind all earlier callbacks. */
+}
+EXPORT_SYMBOL_GPL(srcu_barrier);
+
+/**
+ * srcu_batches_completed - return batches completed.
+ * @sp: srcu_struct on which to report batch completion.
+ *
+ * Report the ->completed count of batches, which is correlated with, but not
+ * necessarily precisely the same as, the number of elapsed grace periods.
+ */
+unsigned long srcu_batches_completed(struct srcu_struct *sp)
+{
+	return READ_ONCE(sp->completed); /* Updated locklessly by srcu_flip(). */
+}
+EXPORT_SYMBOL_GPL(srcu_batches_completed);
+
+/*
+ * Core SRCU state machine. Start a grace period if one is needed, then
+ * step ->srcu_gp_seq through SCAN1 and SCAN2, ending it as readers drain.
+ */
+static void srcu_advance_batches(struct srcu_struct *sp)
+{
+ int idx;
+
+ /*
+ * Because readers might be delayed for an extended period after
+ * fetching ->completed for their index, at any point in time there
+ * might well be readers using both idx=0 and idx=1. We therefore
+ * need to wait for readers to clear from both index values before
+ * invoking a callback.
+ *
+ * The load-acquire ensures that we see the accesses performed
+ * by the prior grace period.
+ */
+ idx = rcu_seq_state(smp_load_acquire(&sp->srcu_gp_seq)); /* ^^^ */
+ if (idx == SRCU_STATE_IDLE) {
+ spin_lock_irq(&sp->queue_lock);
+ if (rcu_segcblist_empty(&sp->srcu_cblist)) {
+ spin_unlock_irq(&sp->queue_lock);
+ return;
+ }
+ idx = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
+ if (idx == SRCU_STATE_IDLE)
+ srcu_gp_start(sp);
+ spin_unlock_irq(&sp->queue_lock);
+ if (idx != SRCU_STATE_IDLE)
+ return; /* Someone else started the grace period. */
+ }
+
+ if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN1) {
+ idx = 1 ^ (sp->completed & 1);
+ if (!try_check_zero(sp, idx, 1))
+ return; /* readers present, retry later. */
+ srcu_flip(sp);
+ rcu_seq_set_state(&sp->srcu_gp_seq, SRCU_STATE_SCAN2);
+ }
+
+ if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN2) {
+
+ /*
+ * SRCU read-side critical sections are normally short,
+ * so check at least twice in quick succession after a flip.
+ */
+ idx = 1 ^ (sp->completed & 1);
+ if (!try_check_zero(sp, idx, 2))
+ return; /* readers present, retry later. */
+ srcu_gp_end(sp);
+ }
+}
+
+/*
+ * Invoke the SRCU callbacks that have passed through their grace period:
+ * extract the done callbacks under ->queue_lock, invoke each one with
+ * bottom halves disabled, then pass the emptied list to
+ * rcu_segcblist_insert_count(). Note that needed memory barriers have been
+ * executed in this task's context by srcu_readers_active_idx_check().
+ */
+static void srcu_invoke_callbacks(struct srcu_struct *sp)
+{
+	struct rcu_cblist done_cbs;
+	struct rcu_head *cb;
+
+	spin_lock_irq(&sp->queue_lock);
+	if (!rcu_segcblist_ready_cbs(&sp->srcu_cblist)) {
+		spin_unlock_irq(&sp->queue_lock);
+		return;
+	}
+	rcu_cblist_init(&done_cbs);
+	rcu_segcblist_extract_done_cbs(&sp->srcu_cblist, &done_cbs);
+	spin_unlock_irq(&sp->queue_lock);
+	while ((cb = rcu_cblist_dequeue(&done_cbs)) != NULL) {
+		local_bh_disable();
+		cb->func(cb);
+		local_bh_enable();
+	}
+	spin_lock_irq(&sp->queue_lock);
+	rcu_segcblist_insert_count(&sp->srcu_cblist, &done_cbs);
+	spin_unlock_irq(&sp->queue_lock);
+}
+
+/*
+ * Requeue the SRCU work item after @delay unless, with ->queue_lock held,
+ * the callback list is empty and ->srcu_gp_seq shows no grace period.
+ */
+static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay)
+{
+	bool pending = true;
+
+	if (rcu_segcblist_empty(&sp->srcu_cblist)) {
+		int gp_state;
+
+		spin_lock_irq(&sp->queue_lock);
+		gp_state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
+		pending = !rcu_segcblist_empty(&sp->srcu_cblist) ||
+			  gp_state != SRCU_STATE_IDLE;
+		spin_unlock_irq(&sp->queue_lock);
+	}
+
+	if (pending)
+		queue_delayed_work(system_power_efficient_wq, &sp->work, delay);
+}
+
+/*
+ * This is the work-queue function that handles SRCU grace periods and
+ * callbacks, asking for zero requeue delay while ->srcu_exp_cnt is nonzero.
+ */
+void process_srcu(struct work_struct *work)
+{
+	struct srcu_struct *sp =
+		container_of(work, struct srcu_struct, work.work);
+
+	srcu_advance_batches(sp);
+	srcu_invoke_callbacks(sp);
+	srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL);
+}
+EXPORT_SYMBOL_GPL(process_srcu);
--
2.5.2