Re: [PATCH 0/5] stop_machine: cleanups and fix

From: Oleg Nesterov
Date: Wed Jul 01 2015 - 15:24:21 EST


OK, let me send this patch as 6/5 in reply to 0/5. Added a couple
of helpers and the comment.

Please let me know what you think.

On 06/30, Oleg Nesterov wrote:
>
> On 06/30, Oleg Nesterov wrote:
> >
> > On 06/30, Oleg Nesterov wrote:
> > >
> > > But let me send some cleanups first. Plus I believe I found another
> > > stop_machine bug; see the last patch. So I hope these changes make
> > > sense in any case.
> >
> > The last patch fixes the bug, I think. Say, stop_one_cpu(X) can race
> > with _cpu_down(X)->stop_machine() in such a way that the kernel will
> > crash if CPU X becomes online again.
> >
> > The window after cpu_stopper_thread() returns and before smpboot_thread()
> > calls ->park() is tiny, but this is still possible afaics.
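> >
> > To spell out the window (a sketch of the interleaving I have in mind,
> > not a reproduced trace):
> >
> >     stopper thread on CPU X              another CPU
> >
> >     cpu_stopper_thread() returns
> >                                          stop_one_cpu(X) sees
> >                                          stopper->enabled and queues
> >                                          its work
> >     smpboot_thread() calls ->park()
> >
> > The queued work is never executed; the stale cpu_stop_work sits on
> > the ->works list until CPU X becomes online again.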
>
> Now let's try to remove the lglock from stop_machine.c. See the patch at
> the end. It lacks comments and a changelog; it is just for review.
>
> It sits on top of this series, so let me show the code with this patch applied:
>
> static int cpu_stop_queue_two_works(int cpu1, struct cpu_stop_work *work1,
> 				    int cpu2, struct cpu_stop_work *work2)
> {
> 	struct cpu_stopper *stopper1 = per_cpu_ptr(&cpu_stopper, cpu1);
> 	struct cpu_stopper *stopper2 = per_cpu_ptr(&cpu_stopper, cpu2);
> 	int err;
> retry:
> 	spin_lock_irq(&stopper1->lock);
> 	spin_lock_nested(&stopper2->lock, SINGLE_DEPTH_NESTING);
> 	err = -ENOENT;
> 	if (!cpu_active(cpu1) || !cpu_active(cpu2))
> 		goto unlock;
>
> 	BUG_ON(!stopper1->enabled || !stopper2->enabled);
>
> 	err = -EDEADLK;
> 	if (list_empty(&stopper1->stop_work.list) !=
> 	    list_empty(&stopper2->stop_work.list))
> 		goto unlock;
>
> 	err = 0;
> 	list_add_tail(&work1->list, &stopper1->works);
> 	list_add_tail(&work2->list, &stopper2->works);
> 	wake_up_process(stopper1->thread);
> 	wake_up_process(stopper2->thread);
> unlock:
> 	spin_unlock(&stopper2->lock);
> 	spin_unlock_irq(&stopper1->lock);
>
> 	if (unlikely(err == -EDEADLK)) {
> 		cond_resched();
> 		goto retry;
> 	}
> 	return err;
> }
>
> int stop_two_cpus(unsigned int cpu1, unsigned int cpu2, cpu_stop_fn_t fn, void *arg)
> {
> 	struct cpu_stop_done done;
> 	struct cpu_stop_work work1, work2;
> 	struct multi_stop_data msdata = {
> 		.fn = fn,
> 		.data = arg,
> 		.num_threads = 2,
> 		.active_cpus = cpumask_of(cpu1),
> 	};
>
> 	set_state(&msdata, MULTI_STOP_PREPARE);
> 	cpu_stop_init_done(&done, 2);
>
> 	work1.fn = work2.fn = multi_cpu_stop;
> 	work1.arg = work2.arg = &msdata;
> 	work1.done = work2.done = &done;
>
> 	if (cpu1 > cpu2)
> 		swap(cpu1, cpu2);
> 	if (cpu_stop_queue_two_works(cpu1, &work1, cpu2, &work2))
> 		return -ENOENT;
>
> 	wait_for_completion(&done.completion);
> 	BUG_ON(!done.executed);
> 	return done.ret;
> }
>
> Note the -EDEADLK check in cpu_stop_queue_two_works(). This avoids the
> race with stop_cpus(). We need to ensure that if we race with stop_cpus()
> then either stop_cpus() wins and queues both of its works on these CPUs,
> or we win the race and queue both of ours.
>
> The "false positive" -EDEADLK can happen if we race with, say,
> stop_cpus(cpumask_of(cpu1)). But this is very unlikely (in fact it is
> always called with cpumask == cpu_online_mask), and in this case we
> need to wait anyway, so I think the "busy wait" loop can work.
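>
> To illustrate the deadlock this check prevents (again just a sketch,
> the scenario the old lglock comment warned about): without ordering,
> stop_cpus() and stop_two_cpus(A, B) could queue their multi_cpu_stop()
> works in reverse order,
>
>     CPU A ->works:  stop_cpus work,     stop_two_cpus work
>     CPU B ->works:  stop_two_cpus work, stop_cpus work
>
> and then each multi_cpu_stop() spins in MULTI_STOP_PREPARE waiting for
> its partner, which is queued behind the other group and can never run.
> Since stop_cpus() queues each ->stop_work under that CPU's
> stopper->lock, and we hold both locks, seeing both ->stop_work.list's
> in the same empty/non-empty state means stop_cpus() has queued on both
> CPUs or on neither; the FIFO stopper workqueues do the rest.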
>
> As for the cpu_active() checks... These were copied from the current
> code, but I think they should die later. This needs another cleanup;
> imo the stopper->enabled logic should be improved.
>
> What do you think?
>
> Oleg.
>
> ------------------------------------------------------------------------------
> Subject: [PATCH] stop_machine: kill stop_cpus_lock and lg_double_lock/unlock()
>
> ---
>  include/linux/lglock.h  |  5 ---
>  kernel/locking/lglock.c | 22 -----------
>  kernel/stop_machine.c   | 92 +++++++++++++++++++++++++---------------------
>  3 files changed, 50 insertions(+), 69 deletions(-)
>
> diff --git a/include/linux/lglock.h b/include/linux/lglock.h
> index c92ebd1..0081f00 100644
> --- a/include/linux/lglock.h
> +++ b/include/linux/lglock.h
> @@ -52,15 +52,10 @@ struct lglock {
>  	static struct lglock name = { .lock = &name ## _lock }
>
>  void lg_lock_init(struct lglock *lg, char *name);
> -
>  void lg_local_lock(struct lglock *lg);
>  void lg_local_unlock(struct lglock *lg);
>  void lg_local_lock_cpu(struct lglock *lg, int cpu);
>  void lg_local_unlock_cpu(struct lglock *lg, int cpu);
> -
> -void lg_double_lock(struct lglock *lg, int cpu1, int cpu2);
> -void lg_double_unlock(struct lglock *lg, int cpu1, int cpu2);
> -
>  void lg_global_lock(struct lglock *lg);
>  void lg_global_unlock(struct lglock *lg);
>
> diff --git a/kernel/locking/lglock.c b/kernel/locking/lglock.c
> index 951cfcd..86ae2ae 100644
> --- a/kernel/locking/lglock.c
> +++ b/kernel/locking/lglock.c
> @@ -60,28 +60,6 @@ void lg_local_unlock_cpu(struct lglock *lg, int cpu)
>  }
>  EXPORT_SYMBOL(lg_local_unlock_cpu);
>
> -void lg_double_lock(struct lglock *lg, int cpu1, int cpu2)
> -{
> -	BUG_ON(cpu1 == cpu2);
> -
> -	/* lock in cpu order, just like lg_global_lock */
> -	if (cpu2 < cpu1)
> -		swap(cpu1, cpu2);
> -
> -	preempt_disable();
> -	lock_acquire_shared(&lg->lock_dep_map, 0, 0, NULL, _RET_IP_);
> -	arch_spin_lock(per_cpu_ptr(lg->lock, cpu1));
> -	arch_spin_lock(per_cpu_ptr(lg->lock, cpu2));
> -}
> -
> -void lg_double_unlock(struct lglock *lg, int cpu1, int cpu2)
> -{
> -	lock_release(&lg->lock_dep_map, 1, _RET_IP_);
> -	arch_spin_unlock(per_cpu_ptr(lg->lock, cpu1));
> -	arch_spin_unlock(per_cpu_ptr(lg->lock, cpu2));
> -	preempt_enable();
> -}
> -
>  void lg_global_lock(struct lglock *lg)
>  {
>  	int i;
> diff --git a/kernel/stop_machine.c b/kernel/stop_machine.c
> index 12484e5..d53c86c 100644
> --- a/kernel/stop_machine.c
> +++ b/kernel/stop_machine.c
> @@ -20,7 +20,6 @@
>  #include <linux/kallsyms.h>
>  #include <linux/smpboot.h>
>  #include <linux/atomic.h>
> -#include <linux/lglock.h>
>
>  /*
>   * Structure to determine completion condition and record errors. May
> @@ -47,14 +46,6 @@ struct cpu_stopper {
>  static DEFINE_PER_CPU(struct cpu_stopper, cpu_stopper);
>  static bool stop_machine_initialized = false;
>
> -/*
> - * Avoids a race between stop_two_cpus and global stop_cpus, where
> - * the stoppers could get queued up in reverse order, leading to
> - * system deadlock. Using an lglock means stop_two_cpus remains
> - * relatively cheap.
> - */
> -DEFINE_STATIC_LGLOCK(stop_cpus_lock);
> -
>  static void cpu_stop_init_done(struct cpu_stop_done *done, unsigned int nr_todo)
>  {
>  	memset(done, 0, sizeof(*done));
> @@ -213,6 +204,42 @@ static int multi_cpu_stop(void *data)
>  	return err;
>  }
>
> +static int cpu_stop_queue_two_works(int cpu1, struct cpu_stop_work *work1,
> +				    int cpu2, struct cpu_stop_work *work2)
> +{
> +	struct cpu_stopper *stopper1 = per_cpu_ptr(&cpu_stopper, cpu1);
> +	struct cpu_stopper *stopper2 = per_cpu_ptr(&cpu_stopper, cpu2);
> +	int err;
> +retry:
> +	spin_lock_irq(&stopper1->lock);
> +	spin_lock_nested(&stopper2->lock, SINGLE_DEPTH_NESTING);
> +	err = -ENOENT;
> +	if (!cpu_active(cpu1) || !cpu_active(cpu2))
> +		goto unlock;
> +
> +	BUG_ON(!stopper1->enabled || !stopper2->enabled);
> +
> +	err = -EDEADLK;
> +	if (list_empty(&stopper1->stop_work.list) !=
> +	    list_empty(&stopper2->stop_work.list))
> +		goto unlock;
> +
> +	err = 0;
> +	list_add_tail(&work1->list, &stopper1->works);
> +	list_add_tail(&work2->list, &stopper2->works);
> +	wake_up_process(stopper1->thread);
> +	wake_up_process(stopper2->thread);
> +unlock:
> +	spin_unlock(&stopper2->lock);
> +	spin_unlock_irq(&stopper1->lock);
> +
> +	if (unlikely(err == -EDEADLK)) {
> +		cond_resched();
> +		goto retry;
> +	}
> +	return err;
> +}
> +
>  /**
>   * stop_two_cpus - stops two cpus
>   * @cpu1: the cpu to stop
> @@ -228,48 +255,28 @@ int stop_two_cpus(unsigned int cpu1, unsigned int cpu2, cpu_stop_fn_t fn, void *
>  {
>  	struct cpu_stop_done done;
>  	struct cpu_stop_work work1, work2;
> -	struct multi_stop_data msdata;
> -
> -	preempt_disable();
> -	msdata = (struct multi_stop_data){
> +	struct multi_stop_data msdata = {
>  		.fn = fn,
>  		.data = arg,
>  		.num_threads = 2,
>  		.active_cpus = cpumask_of(cpu1),
>  	};
>
> -	work1 = work2 = (struct cpu_stop_work){
> -		.fn = multi_cpu_stop,
> -		.arg = &msdata,
> -		.done = &done
> -	};
> -
> -	cpu_stop_init_done(&done, 2);
>  	set_state(&msdata, MULTI_STOP_PREPARE);
> +	cpu_stop_init_done(&done, 2);
>
> -	/*
> -	 * If we observe both CPUs active we know _cpu_down() cannot yet have
> -	 * queued its stop_machine works and therefore ours will get executed
> -	 * first. Or its not either one of our CPUs that's getting unplugged,
> -	 * in which case we don't care.
> -	 *
> -	 * This relies on the stopper workqueues to be FIFO.
> -	 */
> -	if (!cpu_active(cpu1) || !cpu_active(cpu2)) {
> -		preempt_enable();
> -		return -ENOENT;
> -	}
> -
> -	lg_double_lock(&stop_cpus_lock, cpu1, cpu2);
> -	cpu_stop_queue_work(cpu1, &work1);
> -	cpu_stop_queue_work(cpu2, &work2);
> -	lg_double_unlock(&stop_cpus_lock, cpu1, cpu2);
> +	work1.fn = work2.fn = multi_cpu_stop;
> +	work1.arg = work2.arg = &msdata;
> +	work1.done = work2.done = &done;
>
> -	preempt_enable();
> +	if (cpu1 > cpu2)
> +		swap(cpu1, cpu2);
> +	if (cpu_stop_queue_two_works(cpu1, &work1, cpu2, &work2))
> +		return -ENOENT;
>
>  	wait_for_completion(&done.completion);
> -
> -	return done.executed ? done.ret : -ENOENT;
> +	BUG_ON(!done.executed);
> +	return done.ret;
>  }
>
>  /**
> @@ -308,7 +315,7 @@ static void queue_stop_cpus_work(const struct cpumask *cpumask,
>  	 * preempted by a stopper which might wait for other stoppers
>  	 * to enter @fn which can lead to deadlock.
>  	 */
> -	lg_global_lock(&stop_cpus_lock);
> +	preempt_disable();
>  	for_each_cpu(cpu, cpumask) {
>  		work = &per_cpu(cpu_stopper.stop_work, cpu);
>  		work->fn = fn;
> @@ -316,7 +323,7 @@ static void queue_stop_cpus_work(const struct cpumask *cpumask,
>  		work->done = done;
>  		cpu_stop_queue_work(cpu, work);
>  	}
> -	lg_global_unlock(&stop_cpus_lock);
> +	preempt_enable();
>  }
>
>  static int __stop_cpus(const struct cpumask *cpumask,
> @@ -505,6 +512,7 @@ static int __init cpu_stop_init(void)
>
>  		spin_lock_init(&stopper->lock);
>  		INIT_LIST_HEAD(&stopper->works);
> +		INIT_LIST_HEAD(&stopper->stop_work.list);
>  	}
>
>  	BUG_ON(smpboot_register_percpu_thread(&cpu_stop_threads));
> --
> 1.5.5.1
>
