[PATCH RFC] v4 somewhat-expedited "big hammer" RCU grace periods

From: Paul E. McKenney
Date: Fri May 08 2009 - 13:08:38 EST


Fourth cut of "big hammer" expedited RCU grace periods. This uses
a kthread that schedules itself on each online CPU in turn, thus
forcing a grace period. The new synchronize_sched_expedited(),
synchronize_rcu_expedited(), and synchronize_rcu_bh_expedited()
primitives wake this kthread up and then wait for it to force the
grace period.
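
For illustration, here is roughly how an updater might use the new
primitive. The list and lock names below are made up, but the pattern
is just the usual RCU removal sequence with the expedited variant
substituted for synchronize_rcu():

	spin_lock(&mylist_lock);
	list_del_rcu(&p->list);		/* Unlink the element. */
	spin_unlock(&mylist_lock);
	synchronize_rcu_expedited();	/* Expedited grace period. */
	kfree(p);			/* Now safe to free. */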

As before, this does nothing to expedite callbacks already registered
with call_rcu() or call_rcu_bh(), but there is no need to. The
expedited primitives just map to synchronize_rcu() and a new
synchronize_rcu_bh() on preemptable RCU, which has more complex
grace-period detection -- this can be fixed later.

Passes light rcutorture testing. Grace periods take around 200
microseconds on an 8-CPU Power machine. This is a good order of magnitude
better than v3, but an order of magnitude slower than v2. Furthermore,
it will get slower the more CPUs you have, and eight CPUs is not all
that many these days. So this implementation still does not cut it.

Once again, I am posting this on the off-chance that I made some stupid
mistake that someone might spot. Absent that, I am taking yet another
approach, namely setting up per-CPU kthreads that are awakened via
smp_call_function(), permitting the quiescent states to be waited for
in parallel.
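
For concreteness, a rough and entirely untested sketch of that
per-CPU-kthread idea follows. None of this is in the patch below, the
names are made up, and kthread creation, serialization of concurrent
callers, and memory ordering are all omitted:

	/* Assumes <linux/kthread.h>, <linux/percpu.h>, <linux/smp.h>,
	 * <linux/cpu.h>, and <linux/wait.h>. */
	struct expedited_cpu {
		struct task_struct *kthread;	/* Bound to this CPU at boot. */
		int needed;			/* Quiescent state requested? */
	};
	static DEFINE_PER_CPU(struct expedited_cpu, expedited_cpu);
	static atomic_t expedited_pending;
	static DECLARE_WAIT_QUEUE_HEAD(expedited_done_wq);

	/*
	 * One of these kthreads is bound to each CPU.  The mere fact
	 * that it gets to run means that its CPU has done a context
	 * switch, hence a sched quiescent state.
	 */
	static int expedited_percpu_kthread(void *arg)
	{
		struct expedited_cpu *ecp = &per_cpu(expedited_cpu, (long)arg);

		while (!kthread_should_stop()) {
			set_current_state(TASK_INTERRUPTIBLE);
			if (!ecp->needed)
				schedule();
			__set_current_state(TASK_RUNNING);
			if (ecp->needed) {
				ecp->needed = 0;
				if (atomic_dec_and_test(&expedited_pending))
					wake_up(&expedited_done_wq);
			}
		}
		return 0;
	}

	/* Runs on each CPU via smp_call_function(): poke its kthread. */
	static void expedited_poke(void *unused)
	{
		struct expedited_cpu *ecp = &__get_cpu_var(expedited_cpu);

		ecp->needed = 1;
		wake_up_process(ecp->kthread);
	}

	void synchronize_sched_expedited(void)
	{
		get_online_cpus();
		atomic_set(&expedited_pending, num_online_cpus());
		smp_call_function(expedited_poke, NULL, 0);
		preempt_disable();
		expedited_poke(NULL);	/* smp_call_function() skips this CPU. */
		preempt_enable();
		wait_event(expedited_done_wq,
			   !atomic_read(&expedited_pending));
		put_online_cpus();
	}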

Shortcomings:

o Too slow!!! Thinking in terms of using per-CPU kthreads.

o The wait_event() calls result in 120-second hung-task warnings;
need to use something like wait_event_interruptible() instead (see
the sketch after this list). There are probably other corner cases
that need attention.

o Does not address preemptable RCU.
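
As a rough idea of that wait_event() fix (untested; the flag and
wait-queue names are the ones used in the patch below), the kthread's
wait could become:

	/*
	 * The 120-second hung-task warnings fire only for tasks in
	 * TASK_UNINTERRUPTIBLE state, so an interruptible wait avoids
	 * them.  The return value would still need checking.
	 */
	wait_event_interruptible(need_sched_expedited_wq,
				 need_sched_expedited ||
				 kthread_should_stop());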

Changes since v3:

o Use a kthread that schedules itself on each CPU in turn to
force a grace period. The synchronize_sched_expedited() primitive
wakes up this kthread, which avoids messing with the affinity
masks of user tasks.

o Tried a number of additional variations on the v3 approach, none
of which helped much.

Changes since v2:

o Use reschedule IPIs rather than a softirq.

Changes since v1:

o Added rcutorture support, and added exports required by
rcutorture.

o Added comment stating that smp_call_function() implies a
memory barrier, suggested by Mathieu.

o Added #include for delay.h.

Signed-off-by: Paul E. McKenney <paulmck@xxxxxxxxxxxxxxxxxx>
---

include/linux/rcuclassic.h | 16 +++
include/linux/rcupdate.h | 24 ++---
include/linux/rcupreempt.h | 10 ++
include/linux/rcutree.h | 13 ++
kernel/rcupdate.c | 103 +++++++++++++++++++++++
kernel/rcupreempt.c | 1
kernel/rcutorture.c | 200 ++++++++++++++++++++++++---------------------
7 files changed, 261 insertions(+), 106 deletions(-)

diff --git a/include/linux/rcuclassic.h b/include/linux/rcuclassic.h
index bfd92e1..ea1ceb2 100644
--- a/include/linux/rcuclassic.h
+++ b/include/linux/rcuclassic.h
@@ -158,14 +158,28 @@ extern struct lockdep_map rcu_lock_map;

#define call_rcu_sched(head, func) call_rcu(head, func)

+static inline void synchronize_rcu_expedited(void)
+{
+ synchronize_sched_expedited();
+}
+
+static inline void synchronize_rcu_bh_expedited(void)
+{
+ synchronize_sched_expedited();
+}
+
extern void __rcu_init(void);
-#define rcu_init_sched() do { } while (0)
extern void rcu_check_callbacks(int cpu, int user);
extern void rcu_restart_cpu(int cpu);

extern long rcu_batches_completed(void);
extern long rcu_batches_completed_bh(void);

+static inline void rcu_init_sched(void)
+{
+ synchronize_sched_expedited_init();
+}
+
#define rcu_enter_nohz() do { } while (0)
#define rcu_exit_nohz() do { } while (0)

diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
index 15fbb3c..46ddd78 100644
--- a/include/linux/rcupdate.h
+++ b/include/linux/rcupdate.h
@@ -51,7 +51,18 @@ struct rcu_head {
void (*func)(struct rcu_head *head);
};

-/* Internal to kernel, but needed by rcupreempt.h. */
+/* Exported common interfaces */
+extern void synchronize_rcu(void);
+extern void rcu_barrier(void);
+extern void rcu_barrier_bh(void);
+extern void rcu_barrier_sched(void);
+extern void synchronize_sched_expedited(void);
+
+/* Internal to kernel */
+extern void rcu_init(void);
+extern void rcu_scheduler_starting(void);
+extern void synchronize_sched_expedited_init(void);
+extern int rcu_needs_cpu(int cpu);
extern int rcu_scheduler_active;

#if defined(CONFIG_CLASSIC_RCU)
@@ -259,15 +270,4 @@ extern void call_rcu(struct rcu_head *head,
extern void call_rcu_bh(struct rcu_head *head,
void (*func)(struct rcu_head *head));

-/* Exported common interfaces */
-extern void synchronize_rcu(void);
-extern void rcu_barrier(void);
-extern void rcu_barrier_bh(void);
-extern void rcu_barrier_sched(void);
-
-/* Internal to kernel */
-extern void rcu_init(void);
-extern void rcu_scheduler_starting(void);
-extern int rcu_needs_cpu(int cpu);
-
#endif /* __LINUX_RCUPDATE_H */
diff --git a/include/linux/rcupreempt.h b/include/linux/rcupreempt.h
index fce5227..78117ed 100644
--- a/include/linux/rcupreempt.h
+++ b/include/linux/rcupreempt.h
@@ -74,6 +74,16 @@ extern int rcu_needs_cpu(int cpu);

extern void __synchronize_sched(void);

+static inline void synchronize_rcu_expedited(void)
+{
+ synchronize_rcu(); /* Placeholder for new rcupreempt implementation. */
+}
+
+static inline void synchronize_rcu_bh_expedited(void)
+{
+ synchronize_rcu(); /* Placeholder for new rcupreempt implementation. */
+}
+
extern void __rcu_init(void);
extern void rcu_init_sched(void);
extern void rcu_check_callbacks(int cpu, int user);
diff --git a/include/linux/rcutree.h b/include/linux/rcutree.h
index 58b2aa5..7b533ec 100644
--- a/include/linux/rcutree.h
+++ b/include/linux/rcutree.h
@@ -279,8 +279,14 @@ static inline void __rcu_read_unlock_bh(void)

#define call_rcu_sched(head, func) call_rcu(head, func)

-static inline void rcu_init_sched(void)
+static inline void synchronize_rcu_expedited(void)
+{
+ synchronize_sched_expedited();
+}
+
+static inline void synchronize_rcu_bh_expedited(void)
{
+ synchronize_sched_expedited();
}

extern void __rcu_init(void);
@@ -290,6 +296,11 @@ extern void rcu_restart_cpu(int cpu);
extern long rcu_batches_completed(void);
extern long rcu_batches_completed_bh(void);

+static inline void rcu_init_sched(void)
+{
+ synchronize_sched_expedited_init();
+}
+
#ifdef CONFIG_NO_HZ
void rcu_enter_nohz(void);
void rcu_exit_nohz(void);
diff --git a/kernel/rcupdate.c b/kernel/rcupdate.c
index a967c9f..d82d4f9 100644
--- a/kernel/rcupdate.c
+++ b/kernel/rcupdate.c
@@ -45,6 +45,8 @@
#include <linux/mutex.h>
#include <linux/module.h>
#include <linux/kernel_stat.h>
+#include <linux/delay.h>
+#include <linux/kthread.h>

enum rcu_barrier {
RCU_BARRIER_STD,
@@ -98,6 +100,30 @@ void synchronize_rcu(void)
}
EXPORT_SYMBOL_GPL(synchronize_rcu);

+/**
+ * synchronize_rcu_bh - wait until an rcu_bh grace period has elapsed.
+ *
+ * Control will return to the caller some time after a full rcu_bh grace
+ * period has elapsed, in other words after all currently executing rcu_bh
+ * read-side critical sections have completed. RCU read-side critical
+ * sections are delimited by rcu_read_lock_bh() and rcu_read_unlock_bh(),
+ * and may be nested.
+ */
+void synchronize_rcu_bh(void)
+{
+ struct rcu_synchronize rcu;
+
+ if (rcu_blocking_is_gp())
+ return;
+
+ init_completion(&rcu.completion);
+ /* Will wake me after RCU finished. */
+ call_rcu_bh(&rcu.head, wakeme_after_rcu);
+ /* Wait for it. */
+ wait_for_completion(&rcu.completion);
+}
+EXPORT_SYMBOL_GPL(synchronize_rcu_bh);
+
static void rcu_barrier_callback(struct rcu_head *notused)
{
if (atomic_dec_and_test(&rcu_barrier_cpu_count))
@@ -217,6 +243,83 @@ static int __cpuinit rcu_barrier_cpu_hotplug(struct notifier_block *self,
return NOTIFY_OK;
}

+
+#ifndef CONFIG_SMP
+
+void __init synchronize_sched_expedited_init(void)
+{
+}
+
+void synchronize_sched_expedited(void)
+{
+}
+EXPORT_SYMBOL_GPL(synchronize_sched_expedited);
+
+#else /* #ifndef CONFIG_SMP */
+
+static DEFINE_MUTEX(rcu_sched_expedited_mutex);
+static DECLARE_WAIT_QUEUE_HEAD(need_sched_expedited_wq);
+static DECLARE_WAIT_QUEUE_HEAD(sched_expedited_done_wq);
+static int need_sched_expedited;
+static int sched_expedited_done;
+static struct task_struct *krcu_sched_expedited_task;
+
+/*
+ * Kernel thread that processes synchronize_sched_expedited() requests.
+ * This is implemented as a separate kernel thread to avoid the need
+ * to mess with other tasks' cpumasks.
+ */
+static int krcu_sched_expedited(void *arg)
+{
+ int cpu;
+
+ do {
+ wait_event(need_sched_expedited_wq, need_sched_expedited);
+ need_sched_expedited = 0;
+ get_online_cpus();
+ for_each_online_cpu(cpu) {
+ sched_setaffinity(0, &cpumask_of_cpu(cpu));
+ schedule();
+ }
+ put_online_cpus();
+ sched_expedited_done = 1;
+ wake_up(&sched_expedited_done_wq);
+ } while (!kthread_should_stop());
+ return 0;
+}
+
+/*
+ * Late-boot initialization for synchronize_sched_expedited().
+ * The scheduler must be running before this can be called.
+ */
+void __init synchronize_sched_expedited_init(void)
+{
+ krcu_sched_expedited_task = kthread_run(krcu_sched_expedited,
+ NULL,
+ "krcu_sched_expedited");
+ WARN_ON(IS_ERR(krcu_sched_expedited_task));
+}
+
+void synchronize_sched_expedited(void)
+{
+ mutex_lock(&rcu_sched_expedited_mutex);
+
+ /* If there is only one CPU, we are done. */
+ if (num_online_cpus() == 1) {
+ mutex_unlock(&rcu_sched_expedited_mutex);
+ return;
+ }
+
+ /* Multiple CPUs, make krcu_sched_expedited() sequence through them. */
+ need_sched_expedited = 1;
+ wake_up(&need_sched_expedited_wq);
+ wait_event(sched_expedited_done_wq, sched_expedited_done);
+ sched_expedited_done = 0;
+ mutex_unlock(&rcu_sched_expedited_mutex);
+}
+EXPORT_SYMBOL_GPL(synchronize_sched_expedited);
+
+#endif /* #else #ifndef CONFIG_SMP */
+
void __init rcu_init(void)
{
__rcu_init();
diff --git a/kernel/rcupreempt.c b/kernel/rcupreempt.c
index ce97a4d..4485758 100644
--- a/kernel/rcupreempt.c
+++ b/kernel/rcupreempt.c
@@ -1507,6 +1507,7 @@ void __init rcu_init_sched(void)
NULL,
"rcu_sched_grace_period");
WARN_ON(IS_ERR(rcu_sched_grace_period_task));
+ synchronize_sched_expedited_init();
}

#ifdef CONFIG_RCU_TRACE
diff --git a/kernel/rcutorture.c b/kernel/rcutorture.c
index 9b4a975..e8e4b8a 100644
--- a/kernel/rcutorture.c
+++ b/kernel/rcutorture.c
@@ -257,14 +257,14 @@ struct rcu_torture_ops {
void (*init)(void);
void (*cleanup)(void);
int (*readlock)(void);
- void (*readdelay)(struct rcu_random_state *rrsp);
+ void (*read_delay)(struct rcu_random_state *rrsp);
void (*readunlock)(int idx);
int (*completed)(void);
- void (*deferredfree)(struct rcu_torture *p);
+ void (*deferred_free)(struct rcu_torture *p);
void (*sync)(void);
void (*cb_barrier)(void);
int (*stats)(char *page);
- int irqcapable;
+ int irq_capable;
char *name;
};
static struct rcu_torture_ops *cur_ops = NULL;
@@ -320,7 +320,7 @@ rcu_torture_cb(struct rcu_head *p)
rp->rtort_mbtest = 0;
rcu_torture_free(rp);
} else
- cur_ops->deferredfree(rp);
+ cur_ops->deferred_free(rp);
}

static void rcu_torture_deferred_free(struct rcu_torture *p)
@@ -329,18 +329,18 @@ static void rcu_torture_deferred_free(struct rcu_torture *p)
}

static struct rcu_torture_ops rcu_ops = {
- .init = NULL,
- .cleanup = NULL,
- .readlock = rcu_torture_read_lock,
- .readdelay = rcu_read_delay,
- .readunlock = rcu_torture_read_unlock,
- .completed = rcu_torture_completed,
- .deferredfree = rcu_torture_deferred_free,
- .sync = synchronize_rcu,
- .cb_barrier = rcu_barrier,
- .stats = NULL,
- .irqcapable = 1,
- .name = "rcu"
+ .init = NULL,
+ .cleanup = NULL,
+ .readlock = rcu_torture_read_lock,
+ .read_delay = rcu_read_delay,
+ .readunlock = rcu_torture_read_unlock,
+ .completed = rcu_torture_completed,
+ .deferred_free = rcu_torture_deferred_free,
+ .sync = synchronize_rcu,
+ .cb_barrier = rcu_barrier,
+ .stats = NULL,
+ .irq_capable = 1,
+ .name = "rcu"
};

static void rcu_sync_torture_deferred_free(struct rcu_torture *p)
@@ -370,18 +370,18 @@ static void rcu_sync_torture_init(void)
}

static struct rcu_torture_ops rcu_sync_ops = {
- .init = rcu_sync_torture_init,
- .cleanup = NULL,
- .readlock = rcu_torture_read_lock,
- .readdelay = rcu_read_delay,
- .readunlock = rcu_torture_read_unlock,
- .completed = rcu_torture_completed,
- .deferredfree = rcu_sync_torture_deferred_free,
- .sync = synchronize_rcu,
- .cb_barrier = NULL,
- .stats = NULL,
- .irqcapable = 1,
- .name = "rcu_sync"
+ .init = rcu_sync_torture_init,
+ .cleanup = NULL,
+ .readlock = rcu_torture_read_lock,
+ .read_delay = rcu_read_delay,
+ .readunlock = rcu_torture_read_unlock,
+ .completed = rcu_torture_completed,
+ .deferred_free = rcu_sync_torture_deferred_free,
+ .sync = synchronize_rcu,
+ .cb_barrier = NULL,
+ .stats = NULL,
+ .irq_capable = 1,
+ .name = "rcu_sync"
};

/*
@@ -432,33 +432,33 @@ static void rcu_bh_torture_synchronize(void)
}

static struct rcu_torture_ops rcu_bh_ops = {
- .init = NULL,
- .cleanup = NULL,
- .readlock = rcu_bh_torture_read_lock,
- .readdelay = rcu_read_delay, /* just reuse rcu's version. */
- .readunlock = rcu_bh_torture_read_unlock,
- .completed = rcu_bh_torture_completed,
- .deferredfree = rcu_bh_torture_deferred_free,
- .sync = rcu_bh_torture_synchronize,
- .cb_barrier = rcu_barrier_bh,
- .stats = NULL,
- .irqcapable = 1,
- .name = "rcu_bh"
+ .init = NULL,
+ .cleanup = NULL,
+ .readlock = rcu_bh_torture_read_lock,
+ .read_delay = rcu_read_delay, /* just reuse rcu's version. */
+ .readunlock = rcu_bh_torture_read_unlock,
+ .completed = rcu_bh_torture_completed,
+ .deferred_free = rcu_bh_torture_deferred_free,
+ .sync = rcu_bh_torture_synchronize,
+ .cb_barrier = rcu_barrier_bh,
+ .stats = NULL,
+ .irq_capable = 1,
+ .name = "rcu_bh"
};

static struct rcu_torture_ops rcu_bh_sync_ops = {
- .init = rcu_sync_torture_init,
- .cleanup = NULL,
- .readlock = rcu_bh_torture_read_lock,
- .readdelay = rcu_read_delay, /* just reuse rcu's version. */
- .readunlock = rcu_bh_torture_read_unlock,
- .completed = rcu_bh_torture_completed,
- .deferredfree = rcu_sync_torture_deferred_free,
- .sync = rcu_bh_torture_synchronize,
- .cb_barrier = NULL,
- .stats = NULL,
- .irqcapable = 1,
- .name = "rcu_bh_sync"
+ .init = rcu_sync_torture_init,
+ .cleanup = NULL,
+ .readlock = rcu_bh_torture_read_lock,
+ .read_delay = rcu_read_delay, /* just reuse rcu's version. */
+ .readunlock = rcu_bh_torture_read_unlock,
+ .completed = rcu_bh_torture_completed,
+ .deferred_free = rcu_sync_torture_deferred_free,
+ .sync = rcu_bh_torture_synchronize,
+ .cb_barrier = NULL,
+ .stats = NULL,
+ .irq_capable = 1,
+ .name = "rcu_bh_sync"
};

/*
@@ -530,17 +530,17 @@ static int srcu_torture_stats(char *page)
}

static struct rcu_torture_ops srcu_ops = {
- .init = srcu_torture_init,
- .cleanup = srcu_torture_cleanup,
- .readlock = srcu_torture_read_lock,
- .readdelay = srcu_read_delay,
- .readunlock = srcu_torture_read_unlock,
- .completed = srcu_torture_completed,
- .deferredfree = rcu_sync_torture_deferred_free,
- .sync = srcu_torture_synchronize,
- .cb_barrier = NULL,
- .stats = srcu_torture_stats,
- .name = "srcu"
+ .init = srcu_torture_init,
+ .cleanup = srcu_torture_cleanup,
+ .readlock = srcu_torture_read_lock,
+ .read_delay = srcu_read_delay,
+ .readunlock = srcu_torture_read_unlock,
+ .completed = srcu_torture_completed,
+ .deferred_free = rcu_sync_torture_deferred_free,
+ .sync = srcu_torture_synchronize,
+ .cb_barrier = NULL,
+ .stats = srcu_torture_stats,
+ .name = "srcu"
};

/*
@@ -574,32 +574,47 @@ static void sched_torture_synchronize(void)
}

static struct rcu_torture_ops sched_ops = {
- .init = rcu_sync_torture_init,
- .cleanup = NULL,
- .readlock = sched_torture_read_lock,
- .readdelay = rcu_read_delay, /* just reuse rcu's version. */
- .readunlock = sched_torture_read_unlock,
- .completed = sched_torture_completed,
- .deferredfree = rcu_sched_torture_deferred_free,
- .sync = sched_torture_synchronize,
- .cb_barrier = rcu_barrier_sched,
- .stats = NULL,
- .irqcapable = 1,
- .name = "sched"
+ .init = rcu_sync_torture_init,
+ .cleanup = NULL,
+ .readlock = sched_torture_read_lock,
+ .read_delay = rcu_read_delay, /* just reuse rcu's version. */
+ .readunlock = sched_torture_read_unlock,
+ .completed = sched_torture_completed,
+ .deferred_free = rcu_sched_torture_deferred_free,
+ .sync = sched_torture_synchronize,
+ .cb_barrier = rcu_barrier_sched,
+ .stats = NULL,
+ .irq_capable = 1,
+ .name = "sched"
};

static struct rcu_torture_ops sched_ops_sync = {
- .init = rcu_sync_torture_init,
- .cleanup = NULL,
- .readlock = sched_torture_read_lock,
- .readdelay = rcu_read_delay, /* just reuse rcu's version. */
- .readunlock = sched_torture_read_unlock,
- .completed = sched_torture_completed,
- .deferredfree = rcu_sync_torture_deferred_free,
- .sync = sched_torture_synchronize,
- .cb_barrier = NULL,
- .stats = NULL,
- .name = "sched_sync"
+ .init = rcu_sync_torture_init,
+ .cleanup = NULL,
+ .readlock = sched_torture_read_lock,
+ .read_delay = rcu_read_delay, /* just reuse rcu's version. */
+ .readunlock = sched_torture_read_unlock,
+ .completed = sched_torture_completed,
+ .deferred_free = rcu_sync_torture_deferred_free,
+ .sync = sched_torture_synchronize,
+ .cb_barrier = NULL,
+ .stats = NULL,
+ .name = "sched_sync"
+};
+
+static struct rcu_torture_ops sched_expedited_ops = {
+ .init = rcu_sync_torture_init,
+ .cleanup = NULL,
+ .readlock = sched_torture_read_lock,
+ .read_delay = rcu_read_delay, /* just reuse rcu's version. */
+ .readunlock = sched_torture_read_unlock,
+ .completed = sched_torture_completed,
+ .deferred_free = rcu_sched_torture_deferred_free,
+ .sync = synchronize_sched_expedited,
+ .cb_barrier = NULL,
+ .stats = NULL,
+ .irq_capable = 1,
+ .name = "sched_expedited"
};

/*
@@ -635,7 +650,7 @@ rcu_torture_writer(void *arg)
i = RCU_TORTURE_PIPE_LEN;
atomic_inc(&rcu_torture_wcount[i]);
old_rp->rtort_pipe_count++;
- cur_ops->deferredfree(old_rp);
+ cur_ops->deferred_free(old_rp);
}
rcu_torture_current_version++;
oldbatch = cur_ops->completed();
@@ -700,7 +715,7 @@ static void rcu_torture_timer(unsigned long unused)
if (p->rtort_mbtest == 0)
atomic_inc(&n_rcu_torture_mberror);
spin_lock(&rand_lock);
- cur_ops->readdelay(&rand);
+ cur_ops->read_delay(&rand);
n_rcu_torture_timers++;
spin_unlock(&rand_lock);
preempt_disable();
@@ -738,11 +753,11 @@ rcu_torture_reader(void *arg)

VERBOSE_PRINTK_STRING("rcu_torture_reader task started");
set_user_nice(current, 19);
- if (irqreader && cur_ops->irqcapable)
+ if (irqreader && cur_ops->irq_capable)
setup_timer_on_stack(&t, rcu_torture_timer, 0);

do {
- if (irqreader && cur_ops->irqcapable) {
+ if (irqreader && cur_ops->irq_capable) {
if (!timer_pending(&t))
mod_timer(&t, 1);
}
@@ -757,7 +772,7 @@ rcu_torture_reader(void *arg)
}
if (p->rtort_mbtest == 0)
atomic_inc(&n_rcu_torture_mberror);
- cur_ops->readdelay(&rand);
+ cur_ops->read_delay(&rand);
preempt_disable();
pipe_count = p->rtort_pipe_count;
if (pipe_count > RCU_TORTURE_PIPE_LEN) {
@@ -778,7 +793,7 @@ rcu_torture_reader(void *arg)
} while (!kthread_should_stop() && fullstop == FULLSTOP_DONTSTOP);
VERBOSE_PRINTK_STRING("rcu_torture_reader task stopping");
rcutorture_shutdown_absorb("rcu_torture_reader");
- if (irqreader && cur_ops->irqcapable)
+ if (irqreader && cur_ops->irq_capable)
del_timer_sync(&t);
while (!kthread_should_stop())
schedule_timeout_uninterruptible(1);
@@ -1078,6 +1093,7 @@ rcu_torture_init(void)
int firsterr = 0;
static struct rcu_torture_ops *torture_ops[] =
{ &rcu_ops, &rcu_sync_ops, &rcu_bh_ops, &rcu_bh_sync_ops,
+ &sched_expedited_ops,
&srcu_ops, &sched_ops, &sched_ops_sync, };

mutex_lock(&fullstop_mutex);
--