Re: [PATCH 0/5] stop_machine: cleanups and fix
From: Oleg Nesterov
Date: Tue Jun 30 2015 - 08:53:53 EST
On 06/30, Oleg Nesterov wrote:
>
> On 06/30, Oleg Nesterov wrote:
> >
> > But let me send some cleanups first. Plus I believe I found another
> > stop_machine bug, see the last patch. So I hope these changes make
> > sense in any case.
>
> The last patch fixes the bug, I think. Say, stop_one_cpu(X) can race
> with _cpu_down(X)->stop_machine() so that the kernel will crash if this
> CPU X becomes online again.
>
> The window after cpu_stopper_thread() returns and before smpboot_thread()
> calls ->park() is tiny, but still this is possible afaics.
Now lets try to remove lglocks from stop_machine.c. See the patch at
the end. It lacks the comments and the changelog, just for review.
On top of this series, so let me show the code with this patch applied,
static int cpu_stop_queue_two_works(int cpu1, struct cpu_stop_work *work1,
int cpu2, struct cpu_stop_work *work2)
{
struct cpu_stopper *stopper1 = per_cpu_ptr(&cpu_stopper, cpu1);
struct cpu_stopper *stopper2 = per_cpu_ptr(&cpu_stopper, cpu2);
int err;
retry:
spin_lock_irq(&stopper1->lock);
spin_lock_nested(&stopper2->lock, SINGLE_DEPTH_NESTING);
err = -ENOENT;
if (!cpu_active(cpu1) || !cpu_active(cpu2))
goto unlock;
BUG_ON(!stopper1->enabled || !stopper2->enabled);
err = -EDEADLK;
if (list_empty(&stopper1->stop_work.list) !=
list_empty(&stopper2->stop_work.list))
goto unlock;
err = 0;
list_add_tail(&work1->list, &stopper1->works);
list_add_tail(&work2->list, &stopper2->works);
wake_up_process(stopper1->thread);
wake_up_process(stopper2->thread);
unlock:
spin_unlock(&stopper2->lock);
spin_unlock_irq(&stopper1->lock);
if (unlikely(err == -EDEADLK)) {
cond_resched();
goto retry;
}
return err;
}
int stop_two_cpus(unsigned int cpu1, unsigned int cpu2, cpu_stop_fn_t fn, void *arg)
{
struct cpu_stop_done done;
struct cpu_stop_work work1, work2;
struct multi_stop_data msdata = {
.fn = fn,
.data = arg,
.num_threads = 2,
.active_cpus = cpumask_of(cpu1),
};
set_state(&msdata, MULTI_STOP_PREPARE);
cpu_stop_init_done(&done, 2);
work1.fn = work2.fn = multi_cpu_stop;
work1.arg = work2.arg = &msdata;
work1.done = work2.done = &done;
if (cpu1 > cpu2)
swap(cpu1, cpu2);
if (cpu_stop_queue_two_works(cpu1, &work1, cpu2, &work2))
return -ENOENT;
wait_for_completion(&done.completion);
BUG_ON(!done.executed);
return done.ret;
}
Note the -EDEADLK check in cpu_stop_queue_two_works(). This avoids the
race with stop_cpus(). We need to ensure that if we race with stop_cpus()
then either stop_cpus() wins and queues both works on these CPU's, or
we win this race and queue both works.
The "false positive" -EDEADLK can happen if we race with, say,
stop_cpus(cpumask_of(cpu1)). But this is very unlikely (in fact it is
always called with cpumask == cpu_online_mask), and in this case we
need to wait anyway, so I think the "busy wait" loop can work.
As for cpu_active() checks... This was copied from the current code,
but I think they should die later. This needs another cleanup, imo
the stopper->enabled logic should be improved.
What do you think?
Oleg.
------------------------------------------------------------------------------
Subject: [PATCH] stop_machine: kill stop_cpus_lock and lg_double_lock/unlock()
---
include/linux/lglock.h | 5 ---
kernel/locking/lglock.c | 22 -----------
kernel/stop_machine.c | 92 +++++++++++++++++++++++++---------------------
3 files changed, 50 insertions(+), 69 deletions(-)
diff --git a/include/linux/lglock.h b/include/linux/lglock.h
index c92ebd1..0081f00 100644
--- a/include/linux/lglock.h
+++ b/include/linux/lglock.h
@@ -52,15 +52,10 @@ struct lglock {
static struct lglock name = { .lock = &name ## _lock }
void lg_lock_init(struct lglock *lg, char *name);
-
void lg_local_lock(struct lglock *lg);
void lg_local_unlock(struct lglock *lg);
void lg_local_lock_cpu(struct lglock *lg, int cpu);
void lg_local_unlock_cpu(struct lglock *lg, int cpu);
-
-void lg_double_lock(struct lglock *lg, int cpu1, int cpu2);
-void lg_double_unlock(struct lglock *lg, int cpu1, int cpu2);
-
void lg_global_lock(struct lglock *lg);
void lg_global_unlock(struct lglock *lg);
diff --git a/kernel/locking/lglock.c b/kernel/locking/lglock.c
index 951cfcd..86ae2ae 100644
--- a/kernel/locking/lglock.c
+++ b/kernel/locking/lglock.c
@@ -60,28 +60,6 @@ void lg_local_unlock_cpu(struct lglock *lg, int cpu)
}
EXPORT_SYMBOL(lg_local_unlock_cpu);
-void lg_double_lock(struct lglock *lg, int cpu1, int cpu2)
-{
- BUG_ON(cpu1 == cpu2);
-
- /* lock in cpu order, just like lg_global_lock */
- if (cpu2 < cpu1)
- swap(cpu1, cpu2);
-
- preempt_disable();
- lock_acquire_shared(&lg->lock_dep_map, 0, 0, NULL, _RET_IP_);
- arch_spin_lock(per_cpu_ptr(lg->lock, cpu1));
- arch_spin_lock(per_cpu_ptr(lg->lock, cpu2));
-}
-
-void lg_double_unlock(struct lglock *lg, int cpu1, int cpu2)
-{
- lock_release(&lg->lock_dep_map, 1, _RET_IP_);
- arch_spin_unlock(per_cpu_ptr(lg->lock, cpu1));
- arch_spin_unlock(per_cpu_ptr(lg->lock, cpu2));
- preempt_enable();
-}
-
void lg_global_lock(struct lglock *lg)
{
int i;
diff --git a/kernel/stop_machine.c b/kernel/stop_machine.c
index 12484e5..d53c86c 100644
--- a/kernel/stop_machine.c
+++ b/kernel/stop_machine.c
@@ -20,7 +20,6 @@
#include <linux/kallsyms.h>
#include <linux/smpboot.h>
#include <linux/atomic.h>
-#include <linux/lglock.h>
/*
* Structure to determine completion condition and record errors. May
@@ -47,14 +46,6 @@ struct cpu_stopper {
static DEFINE_PER_CPU(struct cpu_stopper, cpu_stopper);
static bool stop_machine_initialized = false;
-/*
- * Avoids a race between stop_two_cpus and global stop_cpus, where
- * the stoppers could get queued up in reverse order, leading to
- * system deadlock. Using an lglock means stop_two_cpus remains
- * relatively cheap.
- */
-DEFINE_STATIC_LGLOCK(stop_cpus_lock);
-
static void cpu_stop_init_done(struct cpu_stop_done *done, unsigned int nr_todo)
{
memset(done, 0, sizeof(*done));
@@ -213,6 +204,42 @@ static int multi_cpu_stop(void *data)
return err;
}
+static int cpu_stop_queue_two_works(int cpu1, struct cpu_stop_work *work1,
+ int cpu2, struct cpu_stop_work *work2)
+{
+ struct cpu_stopper *stopper1 = per_cpu_ptr(&cpu_stopper, cpu1);
+ struct cpu_stopper *stopper2 = per_cpu_ptr(&cpu_stopper, cpu2);
+ int err;
+retry:
+ spin_lock_irq(&stopper1->lock);
+ spin_lock_nested(&stopper2->lock, SINGLE_DEPTH_NESTING);
+ err = -ENOENT;
+ if (!cpu_active(cpu1) || !cpu_active(cpu2))
+ goto unlock;
+
+ BUG_ON(!stopper1->enabled || !stopper2->enabled);
+
+ err = -EDEADLK;
+ if (list_empty(&stopper1->stop_work.list) !=
+ list_empty(&stopper2->stop_work.list))
+ goto unlock;
+
+ err = 0;
+ list_add_tail(&work1->list, &stopper1->works);
+ list_add_tail(&work2->list, &stopper2->works);
+ wake_up_process(stopper1->thread);
+ wake_up_process(stopper2->thread);
+unlock:
+ spin_unlock(&stopper2->lock);
+ spin_unlock_irq(&stopper1->lock);
+
+ if (unlikely(err == -EDEADLK)) {
+ cond_resched();
+ goto retry;
+ }
+ return err;
+}
+
/**
* stop_two_cpus - stops two cpus
* @cpu1: the cpu to stop
@@ -228,48 +255,28 @@ int stop_two_cpus(unsigned int cpu1, unsigned int cpu2, cpu_stop_fn_t fn, void *
{
struct cpu_stop_done done;
struct cpu_stop_work work1, work2;
- struct multi_stop_data msdata;
-
- preempt_disable();
- msdata = (struct multi_stop_data){
+ struct multi_stop_data msdata = {
.fn = fn,
.data = arg,
.num_threads = 2,
.active_cpus = cpumask_of(cpu1),
};
- work1 = work2 = (struct cpu_stop_work){
- .fn = multi_cpu_stop,
- .arg = &msdata,
- .done = &done
- };
-
- cpu_stop_init_done(&done, 2);
set_state(&msdata, MULTI_STOP_PREPARE);
+ cpu_stop_init_done(&done, 2);
- /*
- * If we observe both CPUs active we know _cpu_down() cannot yet have
- * queued its stop_machine works and therefore ours will get executed
- * first. Or its not either one of our CPUs that's getting unplugged,
- * in which case we don't care.
- *
- * This relies on the stopper workqueues to be FIFO.
- */
- if (!cpu_active(cpu1) || !cpu_active(cpu2)) {
- preempt_enable();
- return -ENOENT;
- }
-
- lg_double_lock(&stop_cpus_lock, cpu1, cpu2);
- cpu_stop_queue_work(cpu1, &work1);
- cpu_stop_queue_work(cpu2, &work2);
- lg_double_unlock(&stop_cpus_lock, cpu1, cpu2);
+ work1.fn = work2.fn = multi_cpu_stop;
+ work1.arg = work2.arg = &msdata;
+ work1.done = work2.done = &done;
- preempt_enable();
+ if (cpu1 > cpu2)
+ swap(cpu1, cpu2);
+ if (cpu_stop_queue_two_works(cpu1, &work1, cpu2, &work2))
+ return -ENOENT;
wait_for_completion(&done.completion);
-
- return done.executed ? done.ret : -ENOENT;
+ BUG_ON(!done.executed);
+ return done.ret;
}
/**
@@ -308,7 +315,7 @@ static void queue_stop_cpus_work(const struct cpumask *cpumask,
* preempted by a stopper which might wait for other stoppers
* to enter @fn which can lead to deadlock.
*/
- lg_global_lock(&stop_cpus_lock);
+ preempt_disable();
for_each_cpu(cpu, cpumask) {
work = &per_cpu(cpu_stopper.stop_work, cpu);
work->fn = fn;
@@ -316,7 +323,7 @@ static void queue_stop_cpus_work(const struct cpumask *cpumask,
work->done = done;
cpu_stop_queue_work(cpu, work);
}
- lg_global_unlock(&stop_cpus_lock);
+ preempt_enable();
}
static int __stop_cpus(const struct cpumask *cpumask,
@@ -505,6 +512,7 @@ static int __init cpu_stop_init(void)
spin_lock_init(&stopper->lock);
INIT_LIST_HEAD(&stopper->works);
+ INIT_LIST_HEAD(&stopper->stop_work.list);
}
BUG_ON(smpboot_register_percpu_thread(&cpu_stop_threads));
--
1.5.5.1
--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/