[RFC PATCH 6/6] stop_machine: optimize stop_work_alloc()

From: Oleg Nesterov
Date: Thu Jun 25 2015 - 22:17:31 EST


The wait_event()/wake_up_all() logic in stop_work_alloc()/stop_work_free()
is very suboptimal because of non-exclusive wakeups. So we add the
wait_queue_func_t alloc_wake() helper which wakes the waiter up only
a) if it actually waits for a stop_work in the "freed" cpumask, and
b) only after we have already set ->stop_owner = waiter.

So if 2 stop_machine()'s race with each other, the loser will likely
call schedule() only once and we will have a single wakeup.

TODO: we can optimize (and simplify!) this code more. We can remove
stop_work_alloc_one() and fold it into stop_work_alloc(), so that
prepare_to_wait() will be the outer loop. Let's do this later.

TODO: the init_waitqueue_func_entry() code in stop_work_alloc_one()
is really annoying, we need a trivial new __init_wait(wait, func)
helper.

Signed-off-by: Oleg Nesterov <oleg@xxxxxxxxxx>
---
kernel/stop_machine.c | 47 +++++++++++++++++++++++++++++++++++++++++++----
1 files changed, 43 insertions(+), 4 deletions(-)

diff --git a/kernel/stop_machine.c b/kernel/stop_machine.c
index 572abc9..bbfc670 100644
--- a/kernel/stop_machine.c
+++ b/kernel/stop_machine.c
@@ -63,21 +63,60 @@ static void stop_work_free(const struct cpumask *cpumask)

for_each_cpu(cpu, cpumask)
stop_work_free_one(cpu);
- wake_up_all(&stop_work_wq);
+ __wake_up(&stop_work_wq, TASK_ALL, 0, (void *)cpumask);
+}
+
+struct alloc_wait {
+ wait_queue_t wait;
+ int cpu;
+};
+
+static int alloc_wake(wait_queue_t *wait, unsigned int mode, int sync, void *key)
+{
+ struct alloc_wait *aw = container_of(wait, struct alloc_wait, wait);
+ struct cpu_stopper *stopper = &per_cpu(cpu_stopper, aw->cpu);
+ const struct cpumask *cpumask = key;
+
+ if (!cpumask_test_cpu(aw->cpu, cpumask))
+ return 0;
+ if (cmpxchg(&stopper->stop_owner, NULL, aw->wait.private) != NULL)
+ return 0;
+
+ return autoremove_wake_function(wait, mode, sync, key);
}

static struct cpu_stop_work *stop_work_alloc_one(int cpu, bool wait)
{
struct cpu_stopper *stopper = &per_cpu(cpu_stopper, cpu);
+ struct task_struct *me = current;
+ struct alloc_wait aw;

- if (cmpxchg(&stopper->stop_owner, NULL, current) == NULL)
+ if (cmpxchg(&stopper->stop_owner, NULL, me) == NULL)
goto done;

if (!wait)
return NULL;

- __wait_event(stop_work_wq,
- cmpxchg(&stopper->stop_owner, NULL, current) == NULL);
+ /* TODO: add __init_wait(wait, func) helper! */
+ INIT_LIST_HEAD(&aw.wait.task_list);
+ init_waitqueue_func_entry(&aw.wait, alloc_wake);
+ aw.wait.private = me;
+ aw.cpu = cpu;
+ for (;;) {
+ prepare_to_wait(&stop_work_wq, &aw.wait, TASK_UNINTERRUPTIBLE);
+ /*
+ * This can "falsely" fail if we race with alloc_wake() and
+ * stopper->stop_owner is already me, in this case schedule()
+ * won't block and the check below will notice this change.
+ */
+ if (cmpxchg(&stopper->stop_owner, NULL, me) == NULL)
+ break;
+
+ schedule();
+ if (likely(stopper->stop_owner == me))
+ break;
+ }
+ finish_wait(&stop_work_wq, &aw.wait);
done:
return &stopper->stop_work;
}
--
1.5.5.1

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/