Re: [RFC][PATCH 12/13] stop_machine: Remove lglock

From: Peter Zijlstra
Date: Wed Jun 24 2015 - 12:42:23 EST


On Wed, Jun 24, 2015 at 09:09:04AM -0700, Paul E. McKenney wrote:
> Yes, good point, that would be a way of speeding the existing polling
> loop up in the case where the polling loop took longer than a normal
> grace period. Might also be a way to speed up the new "polling" regime,
> but I am still beating up the counters. ;-)
>
> But if the mutex serializes everything unconditionally, then you have
> already potentially waited for several grace periods worth of time
> before you get a chance to check the ticket, so the check doesn't help.
> Or am I missing something subtle here?

Observe gpnum before you acquire the mutex; once you get it, check it
against completed, and if you've waited long enough, bail.

The thing is, once you start bailing on this condition your 'queue'
drains very fast and this is around the same time sync_rcu() would've
released the waiters too.

Furthermore, until this point we can have 'slow' progress by kicking the
CPUs.

That said, the all cpus concurrent sync_rcu_expedited scenario is
absolutely horrid, it's everyone spraying everyone else.

> It looks like I do need to use smp_call_function_single() and your
> resched_cpu() because calling stop_one_cpu() sequentially is about
> twice as slow as try_stop_cpus() in rcutorture runs of up to 16 CPUs.
> But either way, your point about not stopping all the CPUs does hold.

Bah, I was afraid of that, the problem is that we wait for the
individual stop_work to complete before sending another.

The below is getting a little out of hand, but should avoid the problem
and might be easier than getting the IPI thing going, but who knows.

---
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -103,6 +103,7 @@ struct rcu_state sname##_state = { \
.orphan_nxttail = &sname##_state.orphan_nxtlist, \
.orphan_donetail = &sname##_state.orphan_donelist, \
.barrier_mutex = __MUTEX_INITIALIZER(sname##_state.barrier_mutex), \
+ .expedited_mutex = __MUTEX_INITIALIZER(sname##_state.expedited_mutex), \
.name = RCU_STATE_NAME(sname), \
.abbr = sabbr, \
}
@@ -3253,23 +3254,28 @@ void cond_synchronize_rcu(unsigned long
}
EXPORT_SYMBOL_GPL(cond_synchronize_rcu);

+struct exp_stop_state {
+ wait_queue_head_t *wq;
+ atomic_t count;
+};
+
static int synchronize_sched_expedited_cpu_stop(void *data)
{
+ struct exp_stop_state *ess = data;
+
/*
* There must be a full memory barrier on each affected CPU
* between the time that try_stop_cpus() is called and the
* time that it returns.
- *
- * In the current initial implementation of cpu_stop, the
- * above condition is already met when the control reaches
- * this point and the following smp_mb() is not strictly
- * necessary. Do smp_mb() anyway for documentation and
- * robustness against future implementation changes.
*/
- smp_mb(); /* See above comment block. */
+ if (atomic_dec_and_test(&ess->count))
+ wake_up(ess->wq);
+
return 0;
}

+static DEFINE_PER_CPU(struct cpu_stop_work, exp_stop_work);
+
/**
* synchronize_sched_expedited - Brute-force RCU-sched grace period
*
@@ -3304,12 +3310,11 @@ static int synchronize_sched_expedited_c
*/
void synchronize_sched_expedited(void)
{
- cpumask_var_t cm;
- bool cma = false;
- int cpu;
- long firstsnap, s, snap;
- int trycount = 0;
+ DECLARE_WAIT_QUEUE_HEAD_ONSTACK(stop_wait);
+ struct exp_stop_state ess = { .wq = &stop_wait, };
struct rcu_state *rsp = &rcu_sched_state;
+ long s, snap;
+ int cpu;

/*
* If we are in danger of counter wrap, just do synchronize_sched().
@@ -3332,7 +3337,6 @@ void synchronize_sched_expedited(void)
* full memory barrier.
*/
snap = atomic_long_inc_return(&rsp->expedited_start);
- firstsnap = snap;
if (!try_get_online_cpus()) {
/* CPU hotplug operation in flight, fall back to normal GP. */
wait_rcu_gp(call_rcu_sched);
@@ -3341,82 +3345,44 @@ void synchronize_sched_expedited(void)
}
WARN_ON_ONCE(cpu_is_offline(raw_smp_processor_id()));

- /* Offline CPUs, idle CPUs, and any CPU we run on are quiescent. */
- cma = zalloc_cpumask_var(&cm, GFP_KERNEL);
- if (cma) {
- cpumask_copy(cm, cpu_online_mask);
- cpumask_clear_cpu(raw_smp_processor_id(), cm);
- for_each_cpu(cpu, cm) {
- struct rcu_dynticks *rdtp = &per_cpu(rcu_dynticks, cpu);
-
- if (!(atomic_add_return(0, &rdtp->dynticks) & 0x1))
- cpumask_clear_cpu(cpu, cm);
- }
- if (cpumask_weight(cm) == 0)
- goto all_cpus_idle;
- }
-
/*
* Each pass through the following loop attempts to force a
* context switch on each CPU.
*/
- while (try_stop_cpus(cma ? cm : cpu_online_mask,
- synchronize_sched_expedited_cpu_stop,
- NULL) == -EAGAIN) {
- put_online_cpus();
- atomic_long_inc(&rsp->expedited_tryfail);
+ mutex_lock(&rsp->expedited_mutex);

- /* Check to see if someone else did our work for us. */
- s = atomic_long_read(&rsp->expedited_done);
- if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
- /* ensure test happens before caller kfree */
- smp_mb__before_atomic(); /* ^^^ */
- atomic_long_inc(&rsp->expedited_workdone1);
- free_cpumask_var(cm);
- return;
- }
+ /*
+ * Check to see if someone else did our work for us, while we were
+ * waiting for the mutex.
+ */
+ s = atomic_long_read(&rsp->expedited_done);
+ if (ULONG_CMP_GE((ulong)s, (ulong)snap)) {
+ /* ensure test happens before caller kfree */
+ smp_mb__before_atomic(); /* ^^^ */
+ atomic_long_inc(&rsp->expedited_workdone1);
+ goto unlock;
+ }

- /* No joy, try again later. Or just synchronize_sched(). */
- if (trycount++ < 10) {
- udelay(trycount * num_online_cpus());
- } else {
- wait_rcu_gp(call_rcu_sched);
- atomic_long_inc(&rsp->expedited_normal);
- free_cpumask_var(cm);
- return;
- }
+ /* Stop each CPU that is online, non-idle, and not us. */
+ for_each_online_cpu(cpu) {
+ struct rcu_dynticks *rdtp = &per_cpu(rcu_dynticks, cpu);

- /* Recheck to see if someone else did our work for us. */
- s = atomic_long_read(&rsp->expedited_done);
- if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
- /* ensure test happens before caller kfree */
- smp_mb__before_atomic(); /* ^^^ */
- atomic_long_inc(&rsp->expedited_workdone2);
- free_cpumask_var(cm);
- return;
- }
+ /* Skip our CPU, */
+ if (raw_smp_processor_id() == cpu)
+ continue;

- /*
- * Refetching sync_sched_expedited_started allows later
- * callers to piggyback on our grace period. We retry
- * after they started, so our grace period works for them,
- * and they started after our first try, so their grace
- * period works for us.
- */
- if (!try_get_online_cpus()) {
- /* CPU hotplug operation in flight, use normal GP. */
- wait_rcu_gp(call_rcu_sched);
- atomic_long_inc(&rsp->expedited_normal);
- free_cpumask_var(cm);
- return;
- }
- snap = atomic_long_read(&rsp->expedited_start);
- smp_mb(); /* ensure read is before try_stop_cpus(). */
+ /* and any idle CPUs. */
+ if (!(atomic_add_return(0, &rdtp->dynticks) & 0x1))
+ continue;
+
+ atomic_inc(&ess.count);
+ stop_one_cpu_nowait(cpu, synchronize_sched_expedited_cpu_stop, &ess,
+ &per_cpu(exp_stop_work, cpu));
}
- atomic_long_inc(&rsp->expedited_stoppedcpus);

-all_cpus_idle:
- free_cpumask_var(cm);
+ wait_event(ess.wq, !atomic_read(&ess.count));
+
+ atomic_long_inc(&rsp->expedited_stoppedcpus);

/*
* Everyone up to our most recent fetch is covered by our grace
@@ -3435,6 +3401,8 @@ void synchronize_sched_expedited(void)
}
} while (atomic_long_cmpxchg(&rsp->expedited_done, s, snap) != s);
atomic_long_inc(&rsp->expedited_done_exit);
+unlock:
+ mutex_unlock(&rsp->expedited_mutex);

put_online_cpus();
}
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -483,6 +483,7 @@ struct rcu_state {
/* _rcu_barrier(). */
/* End of fields guarded by barrier_mutex. */

+ struct mutex expedited_mutex; /* Serializes expediting. */
atomic_long_t expedited_start; /* Starting ticket. */
atomic_long_t expedited_done; /* Done ticket. */
atomic_long_t expedited_wrap; /* # near-wrap incidents. */
--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/