Re: [RFC][PATCH 12/13] stop_machine: Remove lglock

From: Peter Zijlstra
Date: Wed Jun 24 2015 - 14:15:00 EST


On Wed, Jun 24, 2015 at 07:28:18PM +0200, Peter Zijlstra wrote:
> How about something like this? It replaces the mutex and start/done ticket
> thing with an MCS-style lockless FIFO queue.
>
> It further uses the gpnum/completed thing to short-circuit things if
> we've waited long enough.
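
For anyone less familiar with the pattern, here is a minimal stand-alone
sketch of the MCS-style FIFO handoff the patch relies on. The names are
hypothetical; C11 atomics and sched_yield() stand in for the kernel's
xchg()/cmpxchg() and the set_current_state()/schedule()/wake_up_process()
sleep/wake, and the gpnum/completed short-circuit is left out.

/* Hypothetical user-space sketch, not kernel code. */
#include <sched.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct mcs_node {
        struct mcs_node *_Atomic next;
        atomic_bool done;
};

/* Tail of the queue; NULL means the queue is empty. */
static struct mcs_node *_Atomic queue_tail;

static void mcs_wait_turn(struct mcs_node *self)
{
        struct mcs_node *prev;

        atomic_init(&self->next, NULL);
        atomic_init(&self->done, false);

        /* Enqueue: swap ourselves in as the new tail. */
        prev = atomic_exchange(&queue_tail, self);
        if (prev) {
                /* Publish ourselves to the predecessor, then wait for handoff. */
                atomic_store_explicit(&prev->next, self, memory_order_release);
                while (!atomic_load_explicit(&self->done, memory_order_acquire))
                        sched_yield();  /* the kernel code sleeps here instead */
        }
}

static void mcs_hand_off(struct mcs_node *self)
{
        struct mcs_node *next = atomic_load(&self->next);

        if (!next) {
                /* Nobody queued behind us (yet): try to reset the tail. */
                struct mcs_node *expected = self;

                if (atomic_compare_exchange_strong(&queue_tail, &expected, NULL))
                        return;
                /* Lost the race: a successor is about to link itself in. */
                while (!(next = atomic_load(&self->next)))
                        sched_yield();
        }
        atomic_store_explicit(&next->done, true, memory_order_release);
        /* the kernel code also does wake_up_process(next->task) here */
}

Each waiter swaps itself in as the new tail, links itself behind its
predecessor and waits for its 'done' flag; on exit it either resets the
tail with a cmpxchg (empty queue) or hands 'done' to its successor, which
is exactly the 'unlock' path in the patch below.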

Prettier version

--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -3253,23 +3253,41 @@ void cond_synchronize_rcu(unsigned long
}
EXPORT_SYMBOL_GPL(cond_synchronize_rcu);

+struct expedited_task_state {
+ struct expedited_task_state *next;
+ struct task_struct *task;
+ atomic_t count;
+ int done;
+};
+
static int synchronize_sched_expedited_cpu_stop(void *data)
{
+ struct expedited_task_state *ets = data;
+
/*
* There must be a full memory barrier on each affected CPU
* between the time that try_stop_cpus() is called and the
* time that it returns.
- *
- * In the current initial implementation of cpu_stop, the
- * above condition is already met when the control reaches
- * this point and the following smp_mb() is not strictly
- * necessary. Do smp_mb() anyway for documentation and
- * robustness against future implementation changes.
*/
- smp_mb(); /* See above comment block. */
+ if (atomic_dec_and_test(&ets->count))
+ wake_up_process(ets->task);
+
return 0;
}

+static DEFINE_PER_CPU(struct cpu_stop_work, exp_stop_work);
+
+#define current_wait(cond) \
+do { \
+ for (;;) { \
+ set_current_state(TASK_UNINTERRUPTIBLE); \
+ if (cond) \
+ break; \
+ schedule(); \
+ } \
+ __set_current_state(TASK_RUNNING); \
+} while (0)
+
/**
* synchronize_sched_expedited - Brute-force RCU-sched grace period
*
@@ -3304,138 +3322,71 @@ static int synchronize_sched_expedited_c
*/
void synchronize_sched_expedited(void)
{
- cpumask_var_t cm;
- bool cma = false;
- int cpu;
- long firstsnap, s, snap;
- int trycount = 0;
struct rcu_state *rsp = &rcu_sched_state;
+ struct expedited_task_state *prev, *next, entry = {
+ .task = current,
+ .count = ATOMIC_INIT(1), /* avoid spurious wakeups */
+ };
+ long gpnum;
+ int cpu;

- /*
- * If we are in danger of counter wrap, just do synchronize_sched().
- * By allowing sync_sched_expedited_started to advance no more than
- * ULONG_MAX/8 ahead of sync_sched_expedited_done, we are ensuring
- * that more than 3.5 billion CPUs would be required to force a
- * counter wrap on a 32-bit system. Quite a few more CPUs would of
- * course be required on a 64-bit system.
- */
- if (ULONG_CMP_GE((ulong)atomic_long_read(&rsp->expedited_start),
- (ulong)atomic_long_read(&rsp->expedited_done) +
- ULONG_MAX / 8)) {
- wait_rcu_gp(call_rcu_sched);
- atomic_long_inc(&rsp->expedited_wrap);
- return;
- }
-
- /*
- * Take a ticket. Note that atomic_inc_return() implies a
- * full memory barrier.
- */
- snap = atomic_long_inc_return(&rsp->expedited_start);
- firstsnap = snap;
if (!try_get_online_cpus()) {
/* CPU hotplug operation in flight, fall back to normal GP. */
wait_rcu_gp(call_rcu_sched);
- atomic_long_inc(&rsp->expedited_normal);
return;
}
WARN_ON_ONCE(cpu_is_offline(raw_smp_processor_id()));

- /* Offline CPUs, idle CPUs, and any CPU we run on are quiescent. */
- cma = zalloc_cpumask_var(&cm, GFP_KERNEL);
- if (cma) {
- cpumask_copy(cm, cpu_online_mask);
- cpumask_clear_cpu(raw_smp_processor_id(), cm);
- for_each_cpu(cpu, cm) {
- struct rcu_dynticks *rdtp = &per_cpu(rcu_dynticks, cpu);
-
- if (!(atomic_add_return(0, &rdtp->dynticks) & 0x1))
- cpumask_clear_cpu(cpu, cm);
- }
- if (cpumask_weight(cm) == 0)
- goto all_cpus_idle;
+ smp_mb();
+ gpnum = smp_load_acquire(&rsp->gpnum);
+
+ /* MCS style queue 'lock' */
+ prev = xchg(&rsp->expedited_queue, &entry);
+ if (prev) {
+ WRITE_ONCE(prev->next, &entry);
+ current_wait(smp_load_acquire(&entry.done));
}

/*
- * Each pass through the following loop attempts to force a
- * context switch on each CPU.
+ * Check to see if someone else did our work for us, while we were
+ * waiting on the queue.
*/
- while (try_stop_cpus(cma ? cm : cpu_online_mask,
- synchronize_sched_expedited_cpu_stop,
- NULL) == -EAGAIN) {
- put_online_cpus();
- atomic_long_inc(&rsp->expedited_tryfail);
-
- /* Check to see if someone else did our work for us. */
- s = atomic_long_read(&rsp->expedited_done);
- if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
- /* ensure test happens before caller kfree */
- smp_mb__before_atomic(); /* ^^^ */
- atomic_long_inc(&rsp->expedited_workdone1);
- free_cpumask_var(cm);
- return;
- }
-
- /* No joy, try again later. Or just synchronize_sched(). */
- if (trycount++ < 10) {
- udelay(trycount * num_online_cpus());
- } else {
- wait_rcu_gp(call_rcu_sched);
- atomic_long_inc(&rsp->expedited_normal);
- free_cpumask_var(cm);
- return;
- }
-
- /* Recheck to see if someone else did our work for us. */
- s = atomic_long_read(&rsp->expedited_done);
- if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
- /* ensure test happens before caller kfree */
- smp_mb__before_atomic(); /* ^^^ */
- atomic_long_inc(&rsp->expedited_workdone2);
- free_cpumask_var(cm);
- return;
- }
-
- /*
- * Refetching sync_sched_expedited_started allows later
- * callers to piggyback on our grace period. We retry
- * after they started, so our grace period works for them,
- * and they started after our first try, so their grace
- * period works for us.
- */
- if (!try_get_online_cpus()) {
- /* CPU hotplug operation in flight, use normal GP. */
- wait_rcu_gp(call_rcu_sched);
- atomic_long_inc(&rsp->expedited_normal);
- free_cpumask_var(cm);
- return;
- }
- snap = atomic_long_read(&rsp->expedited_start);
- smp_mb(); /* ensure read is before try_stop_cpus(). */
+ if (ULONG_CMP_LT(gpnum, smp_load_acquire(&rsp->completed)))
+ goto unlock;
+
+ /* Stop each CPU that is online, non-idle, and not us. */
+ for_each_online_cpu(cpu) {
+ struct rcu_dynticks *rdtp = &per_cpu(rcu_dynticks, cpu);
+
+ /* Skip our CPU, */
+ if (raw_smp_processor_id() == cpu)
+ continue;
+
+ /* and any idle CPUs. */
+ if (!(atomic_add_return(0, &rdtp->dynticks) & 0x1))
+ continue;
+
+ atomic_inc(&entry.count);
+ stop_one_cpu_nowait(cpu, synchronize_sched_expedited_cpu_stop,
+ &entry, &per_cpu(exp_stop_work, cpu));
}
- atomic_long_inc(&rsp->expedited_stoppedcpus);

-all_cpus_idle:
- free_cpumask_var(cm);
+ atomic_dec(&entry.count); /* let the wakeups in */
+ current_wait(!atomic_read(&entry.count));

- /*
- * Everyone up to our most recent fetch is covered by our grace
- * period. Update the counter, but only if our work is still
- * relevant -- which it won't be if someone who started later
- * than we did already did their update.
- */
- do {
- atomic_long_inc(&rsp->expedited_done_tries);
- s = atomic_long_read(&rsp->expedited_done);
- if (ULONG_CMP_GE((ulong)s, (ulong)snap)) {
- /* ensure test happens before caller kfree */
- smp_mb__before_atomic(); /* ^^^ */
- atomic_long_inc(&rsp->expedited_done_lost);
- break;
- }
- } while (atomic_long_cmpxchg(&rsp->expedited_done, s, snap) != s);
- atomic_long_inc(&rsp->expedited_done_exit);
+unlock:
+ /* MCS style queue 'unlock' */
+ next = READ_ONCE(entry.next);
+ if (!next) {
+ if (cmpxchg(&rsp->expedited_queue, &entry, NULL) == &entry)
+ goto done;
+ while (!(next = READ_ONCE(entry.next)))
+ cpu_relax();
+ }
+ smp_store_release(&next->done, 1);
+ wake_up_process(next->task);

+done:
put_online_cpus();
}
EXPORT_SYMBOL_GPL(synchronize_sched_expedited);
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -483,17 +483,7 @@ struct rcu_state {
/* _rcu_barrier(). */
/* End of fields guarded by barrier_mutex. */

- atomic_long_t expedited_start; /* Starting ticket. */
- atomic_long_t expedited_done; /* Done ticket. */
- atomic_long_t expedited_wrap; /* # near-wrap incidents. */
- atomic_long_t expedited_tryfail; /* # acquisition failures. */
- atomic_long_t expedited_workdone1; /* # done by others #1. */
- atomic_long_t expedited_workdone2; /* # done by others #2. */
- atomic_long_t expedited_normal; /* # fallbacks to normal. */
- atomic_long_t expedited_stoppedcpus; /* # successful stop_cpus. */
- atomic_long_t expedited_done_tries; /* # tries to update _done. */
- atomic_long_t expedited_done_lost; /* # times beaten to _done. */
- atomic_long_t expedited_done_exit; /* # times exited _done loop. */
+ void *expedited_queue;

unsigned long jiffies_force_qs; /* Time at which to invoke */
/* force_quiescent_state(). */
--