Re: [PATCH 2/3 v2] futex: avoid double wake up in futex_wake() on -RT

From: Davidlohr Bueso
Date: Thu Apr 16 2015 - 01:10:33 EST


On Sun, 2015-04-12 at 20:02 -0700, Davidlohr Bueso wrote:
> Doing the wakeups while holding the lock is also a general performance
> issue for futex_wake. The problem being dealing with spurious wakeups
> (wacky drivers), which makes no difference wrt nr_wake.

So I did some measurements with the patch below (Cc'ing Arnaldo for
perf-bench consideration, albeit it is probably still pretty crude). By
doing the lockless wakeups, on average we reduce the latency of
contending waker threads by about 2x per thread, which indicates that
the overall speedup depends on the number of futex_wake'ers.

I guess now we have both the code and the numbers, so I go back to
auditing drivers *sigh*. In any case, any important core code already
deals with spurious wakeups (the last silly offender being sysv sems),
so I'm really not _that_ concerned -- in fact, Peter, your patch to
trigger them no longer seems to trigger any issues. But perhaps it's
late and I'm in lala land.

Thanks,
Davidlohr

8<-------------------------------------------------------------
[PATCH] perf-bench/futex: Support parallel wakeup threads

The futex-wake benchmark only measures wakeups done within
a single process. While this has value on its own, it does
not really generate any hb->lock contention. A new -W option
is added, specifying the number of concurrent waker threads.
Naturally, different combinations of numbers of blocking and
waker threads will exhibit different information.

TODO: Improve verbosity of run-to-run stats in parallel.

Kinda-Signed-off-by: Davidlohr Bueso <dbueso@xxxxxxx>
---
tools/perf/bench/futex-wake.c | 237 ++++++++++++++++++++++++++++++------------
1 file changed, 173 insertions(+), 64 deletions(-)

diff --git a/tools/perf/bench/futex-wake.c b/tools/perf/bench/futex-wake.c
index 929f762..67014a2 100644
--- a/tools/perf/bench/futex-wake.c
+++ b/tools/perf/bench/futex-wake.c
@@ -1,11 +1,11 @@
/*
- * Copyright (C) 2013 Davidlohr Bueso <davidlohr@xxxxxx>
+ * Copyright (C) 2013, 2015 Davidlohr Bueso.
*
* futex-wake: Block a bunch of threads on a futex and wake'em up, N at a time.
*
* This program is particularly useful to measure the latency of nthread wakeups
- * in non-error situations: all waiters are queued and all wake calls wakeup
- * one or more tasks, and thus the waitqueue is never empty.
+ * in non-error situations: all waiters are queued and all futex_wake calls
+ * wakeup one or more tasks, and thus the waitqueue is never empty.
*/

#include "../perf.h"
@@ -21,7 +21,7 @@
#include <sys/time.h>
#include <pthread.h>

-/* all threads will block on the same futex */
+/* all threads will block on the same futex -- hash bucket chaos ;) */
static u_int32_t futex1 = 0;

/*
@@ -30,16 +30,27 @@ static u_int32_t futex1 = 0;
*/
static unsigned int nwakes = 1;

-pthread_t *worker;
-static bool done = false, silent = false, fshared = false;
+static pthread_t *blocked_worker, *waking_worker;
+static bool done = false, silent = false;
+static bool fshared = false;
static pthread_mutex_t thread_lock;
static pthread_cond_t thread_parent, thread_worker;
-static struct stats waketime_stats, wakeup_stats;
-static unsigned int ncpus, threads_starting, nthreads = 0;
+static unsigned int ncpus, threads_starting;
+static struct stats *waketime_stats, *wakeup_stats;
+static unsigned int nblocked_threads = 0, nwaking_threads = 0;
+static unsigned int repeat = 0, wakeup_limit = 1;
+static unsigned int *idxp;
static int futex_flag = 0;

+#define parallel (wakeup_limit > 1 || nwaking_threads > 1)
+
static const struct option options[] = {
- OPT_UINTEGER('t', "threads", &nthreads, "Specify amount of threads"),
+ OPT_UINTEGER('t', "threads", &nblocked_threads, "Specify amount of threads"),
+ /*
+ * The difference between w and W is that W threads will perform w wakeups.
+ * If -W is not passed, the program defaults to 1 waking thread.
+ */
+ OPT_UINTEGER('W', "nwakers", &nwaking_threads, "Specify amount of waking threads (default 1)"),
OPT_UINTEGER('w', "nwakes", &nwakes, "Specify amount of threads to wake at once"),
OPT_BOOLEAN( 's', "silent", &silent, "Silent mode: do not display data/details"),
OPT_BOOLEAN( 'S', "shared", &fshared, "Use shared futexes instead of private ones"),
@@ -51,7 +62,46 @@ static const char * const bench_futex_wake_usage[] = {
NULL
};

-static void *workerfn(void *arg __maybe_unused)
+static void print_summary(void)
+{
+ unsigned int i, wakeup_avg;
+ double waketime_avg, waketime_stddev;
+ struct stats __waketime_stats, __wakeup_stats;
+
+ for (i = 0; i < wakeup_limit; i++) {
+ double waketime = avg_stats(&waketime_stats[i]);
+ unsigned int wakeup = avg_stats(&wakeup_stats[i]);
+
+ update_stats(&__wakeup_stats, wakeup);
+ update_stats(&__waketime_stats, waketime);
+ }
+
+ waketime_avg = avg_stats(&__waketime_stats);
+ waketime_stddev = stddev_stats(&__waketime_stats);
+ wakeup_avg = avg_stats(&__wakeup_stats);
+
+ printf("Wokeup %d of %d threads in %.4f ms (+-%.2f%%)\n",
+ wakeup_avg * wakeup_limit,
+ nblocked_threads,
+ waketime_avg/1e3,
+ rel_stddev_stats(waketime_stddev, waketime_avg));
+}
+
+static void print_banner(void)
+{
+ if (parallel)
+ printf("Run summary [PID %d]: blocking on %d threads (at [%s] "
+ "futex %p), %d threads waking up %d at a time.\n\n",
+ getpid(), nblocked_threads, fshared ? "shared":"private",
+ &futex1, nwaking_threads, nwakes);
+ else
+ printf("Run summary [PID %d]: blocking on %d threads (at [%s] "
+ "futex %p), waking up %d at a time.\n\n",
+ getpid(), nblocked_threads, fshared ? "shared":"private",
+ &futex1, nwakes);
+}
+
+static void *blocked_workerfn(void *arg __maybe_unused)
{
pthread_mutex_lock(&thread_lock);
threads_starting--;
@@ -64,36 +114,63 @@ static void *workerfn(void *arg __maybe_unused)
return NULL;
}

-static void print_summary(void)
-{
- double waketime_avg = avg_stats(&waketime_stats);
- double waketime_stddev = stddev_stats(&waketime_stats);
- unsigned int wakeup_avg = avg_stats(&wakeup_stats);
-
- printf("Wokeup %d of %d threads in %.4f ms (+-%.2f%%)\n",
- wakeup_avg,
- nthreads,
- waketime_avg/1e3,
- rel_stddev_stats(waketime_stddev, waketime_avg));
-}
-
static void block_threads(pthread_t *w,
pthread_attr_t thread_attr)
{
cpu_set_t cpu;
unsigned int i;

- threads_starting = nthreads;
+ threads_starting = nblocked_threads;

/* create and block all threads */
- for (i = 0; i < nthreads; i++) {
+ for (i = 0; i < nblocked_threads; i++) {
CPU_ZERO(&cpu);
CPU_SET(i % ncpus, &cpu);

if (pthread_attr_setaffinity_np(&thread_attr, sizeof(cpu_set_t), &cpu))
err(EXIT_FAILURE, "pthread_attr_setaffinity_np");

- if (pthread_create(&w[i], &thread_attr, workerfn, NULL))
+ if (pthread_create(&w[i], &thread_attr, blocked_workerfn, NULL))
+ err(EXIT_FAILURE, "pthread_create");
+ }
+}
+
+static void *waking_workerfn(void *arg)
+{
+ unsigned int nwoken = 0, idx = *(unsigned int *)arg;
+ unsigned int limit = parallel ? nwakes : nblocked_threads;
+ struct timeval start, end, runtime;
+
+ gettimeofday(&start, NULL);
+ while (nwoken != limit)
+ nwoken += futex_wake(&futex1, nwakes, futex_flag);
+
+ gettimeofday(&end, NULL);
+ timersub(&end, &start, &runtime);
+
+ update_stats(&wakeup_stats[idx], nwoken);
+ update_stats(&waketime_stats[idx], runtime.tv_usec);
+
+ if (!silent && !parallel) {
+ printf("[Run %d]: Wokeup %d of %d threads in %.4f ms\n",
+ repeat, nwoken, nblocked_threads, runtime.tv_usec/1e3);
+ }
+
+ pthread_exit(NULL);
+ return NULL;
+}
+
+static void wakeup_threads(pthread_t *w)
+{
+ unsigned int i, *idx;
+ idxp = calloc(wakeup_limit, sizeof(unsigned int));
+ if (!idxp)
+ errx(EXIT_FAILURE, "calloc");
+
+ /* create and block all threads */
+ for (i = 0; i < wakeup_limit; i++) {
+ *(idxp + i) = i;
+ if (pthread_create(&w[i], NULL, waking_workerfn, (void *)(idxp+i)))
err(EXIT_FAILURE, "pthread_create");
}
}
@@ -105,11 +182,40 @@ static void toggle_done(int sig __maybe_unused,
done = true;
}

+static void alloc_workers(void)
+{
+ waking_worker = calloc(nwaking_threads, sizeof(pthread_t));
+ if (!waking_worker)
+ err(EXIT_FAILURE, "calloc");
+
+ blocked_worker = calloc(nblocked_threads, sizeof(pthread_t));
+ if (!blocked_worker)
+ err(EXIT_FAILURE, "calloc");
+}
+
+static void alloc_init_stats(void)
+{
+ unsigned int i;
+
+ waketime_stats = calloc(wakeup_limit, sizeof(struct stats));
+ if (!waketime_stats)
+ errx(EXIT_FAILURE, "calloc");
+
+ wakeup_stats = calloc(wakeup_limit, sizeof(struct stats));
+ if (!wakeup_stats)
+ errx(EXIT_FAILURE, "calloc");
+
+ for (i = 0; i < wakeup_limit; i++) {
+ init_stats(&wakeup_stats[i]);
+ init_stats(&waketime_stats[i]);
+ }
+}
+
int bench_futex_wake(int argc, const char **argv,
const char *prefix __maybe_unused)
{
int ret = 0;
- unsigned int i, j;
+ unsigned int i;
struct sigaction act;
pthread_attr_t thread_attr;

@@ -125,35 +231,45 @@ int bench_futex_wake(int argc, const char **argv,
act.sa_sigaction = toggle_done;
sigaction(SIGINT, &act, NULL);

- if (!nthreads)
- nthreads = ncpus;
-
- worker = calloc(nthreads, sizeof(*worker));
- if (!worker)
- err(EXIT_FAILURE, "calloc");
+ if (!nblocked_threads)
+ nblocked_threads = ncpus;
+
+ /* sanity checks */
+ if (parallel) {
+ if (nwaking_threads & 0x1)
+ nwaking_threads++;
+
+ if (nwaking_threads > nblocked_threads)
+ nwaking_threads = nblocked_threads;
+
+ if (nblocked_threads % nwaking_threads)
+ errx(EXIT_FAILURE, "Must be perfectly divisible");
+ /*
+ * Each thread will wakeup nwakes tasks in
+ *a single futex_wait call.
+ */
+ nwakes = nblocked_threads/nwaking_threads;
+ wakeup_limit = nwaking_threads;
+ }

if (!fshared)
futex_flag = FUTEX_PRIVATE_FLAG;

- printf("Run summary [PID %d]: blocking on %d threads (at [%s] futex %p), "
- "waking up %d at a time.\n\n",
- getpid(), nthreads, fshared ? "shared":"private", &futex1, nwakes);
+ alloc_workers();
+ alloc_init_stats();
+
+ print_banner();

- init_stats(&wakeup_stats);
- init_stats(&waketime_stats);
pthread_attr_init(&thread_attr);
pthread_mutex_init(&thread_lock, NULL);
pthread_cond_init(&thread_parent, NULL);
pthread_cond_init(&thread_worker, NULL);

- for (j = 0; j < bench_repeat && !done; j++) {
- unsigned int nwoken = 0;
- struct timeval start, end, runtime;
-
+ for (repeat = 0; repeat < bench_repeat && !done; repeat++) {
/* create, launch & block all threads */
- block_threads(worker, thread_attr);
+ block_threads(blocked_worker, thread_attr);

- /* make sure all threads are already blocked */
+ /* ensure all threads are already blocked */
pthread_mutex_lock(&thread_lock);
while (threads_starting)
pthread_cond_wait(&thread_parent, &thread_lock);
@@ -162,37 +278,30 @@ int bench_futex_wake(int argc, const char **argv,

usleep(100000);

- /* Ok, all threads are patiently blocked, start waking folks up */
- gettimeofday(&start, NULL);
- while (nwoken != nthreads)
- nwoken += futex_wake(&futex1, nwakes, futex_flag);
- gettimeofday(&end, NULL);
- timersub(&end, &start, &runtime);
-
- update_stats(&wakeup_stats, nwoken);
- update_stats(&waketime_stats, runtime.tv_usec);
-
- if (!silent) {
- printf("[Run %d]: Wokeup %d of %d threads in %.4f ms\n",
- j + 1, nwoken, nthreads, runtime.tv_usec/1e3);
- }
+ /* ok, all threads are patiently blocked, start waking folks up */
+ wakeup_threads(waking_worker);
+ free(idxp);

- for (i = 0; i < nthreads; i++) {
- ret = pthread_join(worker[i], NULL);
+ /* we're done with this run */
+ for (i = 0; i < nblocked_threads; i++) {
+ ret = pthread_join(blocked_worker[i], NULL);
if (ret)
err(EXIT_FAILURE, "pthread_join");
}
-
}

- /* cleanup & report results */
+ /* report and cleanup */
+ print_summary();
+
pthread_cond_destroy(&thread_parent);
pthread_cond_destroy(&thread_worker);
pthread_mutex_destroy(&thread_lock);
pthread_attr_destroy(&thread_attr);

- print_summary();
+ free(waking_worker);
+ free(blocked_worker);
+ free(wakeup_stats);
+ free(waketime_stats);

- free(worker);
return ret;
}
--
2.1.4



--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/