Possible PREEMPT_RT live-lock / priority-inversion between FUTEX_CMP_REQUEUE_PI and FUTEX_WAIT_REQUEUE_PI
From: Moritz KLAMMLER (FERCHAU)
Date: Fri Mar 20 2026 - 15:24:39 EST
Hello,
we're running Linux 6.6 with PREEMPT_RT on a single-core armv7l machine
and observed our devices getting locked-up every few days. We're using
RT/PI condition variables from librtpi [1] and determined that the RT
(SCHED_FIFO) thread making the FUTEX_CMP_REQUEUE_PI syscall from within
pi_cond_broadcast seems to occasionally live-lock inside the kernel.
Thanks to a possibly less than ideal design decision in our system, the
"producer" thread calling pi_cond_broadcast (i.e. doing the
FUTEX_CMP_REQUEUE_PI) has a higher priority than the "consumer" threads
that are waiting on the condition variable (calling pi_cond_timedwait
which eventually makes a FUTEX_WAIT_REQUEUE_PI call). While this might
not be ideal, I suppose that it still ought to be allowed; please
correct me if I should be mistaken on that point.
What seems to happen next is this: the waiter exceeds its finite
timeout [2] and, half an eye-blink later, the producer thread decides to
call FUTEX_CMP_REQUEUE_PI after all. The lower-priority consumer might
make it to the point where it sets the requeue state to
Q_REQUEUE_PI_IGNORE in futex_requeue_pi_wakeup_sync but then gets
preempted before it has a chance to remove itself from the waiters list.
Now, the higher-priority producer thread calls futex_requeue_pi_prepare
which will return false because it sees the Q_REQUEUE_PI_IGNORE.
Subsequently, futex_proxy_trylock_atomic will fail with -EAGAIN and
futex_requeue will "goto retry", which effectively results in the
higher-priority RT thread busy-waiting on the lower-priority thread
forever. It will call cond_resched before the "goto retry" but since it
is the highest-priority runnable task on the only CPU, it doesn't seem
to be scheduled away anymore.
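To make the sequence above concrete, here is a minimal user-space model of it. This is only a sketch and not the kernel code: the enum, struct waiter_model and all model_* functions are made up for illustration, and the two compare-and-swap steps merely mimic what futex_requeue_pi_wakeup_sync and futex_requeue_pi_prepare are described as doing above. The waiter_runs flag is an assumption that stands in for "the scheduler lets the lower-priority waiter run between two retries".

```c
#include <assert.h>
#include <errno.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Simplified stand-ins for the requeue states discussed above. */
enum requeue_state { REQUEUE_NONE, REQUEUE_IGNORE, REQUEUE_IN_PROGRESS };

struct waiter_model {
    _Atomic int requeue_state; /* models the waiter's requeue state */
    bool queued;               /* is the waiter still on the waiters list? */
};

/* Waiter side after a timeout: mark the entry so a requeue will skip it. */
static void model_waiter_timed_out(struct waiter_model *w) {
    int expected = REQUEUE_NONE;
    atomic_compare_exchange_strong(&w->requeue_state, &expected, REQUEUE_IGNORE);
}

/* Waiter side, later: remove itself from the list (needs CPU time to happen). */
static void model_waiter_dequeue(struct waiter_model *w) {
    w->queued = false;
}

/* Requeuer side, one attempt: -EAGAIN if the waiter is marked but still queued. */
static int model_requeue_attempt(struct waiter_model *w) {
    if (!w->queued) {
        return 0; /* nothing left to requeue */
    }
    int expected = REQUEUE_NONE;
    if (!atomic_compare_exchange_strong(&w->requeue_state, &expected, REQUEUE_IN_PROGRESS)) {
        return -EAGAIN; /* state is not NONE, so the caller has to retry */
    }
    return 0;
}

/*
 * Retry loop of the requeuer. Returns the number of attempts that were
 * needed, or -1 if the limit was hit without making progress.
 */
static long model_requeue(struct waiter_model *w, bool waiter_runs, long limit) {
    assert(limit > 0);
    for (long attempt = 1; attempt <= limit; ++attempt) {
        if (model_requeue_attempt(w) == 0) {
            return attempt;
        }
        if (waiter_runs) {
            model_waiter_dequeue(w); /* only possible if the waiter gets the CPU */
        }
    }
    return -1;
}
```

With waiter_runs set to false, model_requeue never gets past -EAGAIN no matter how large the limit is, which corresponds to the observed lock-up. As soon as the waiter is allowed to run once, the second attempt succeeds.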
We have experimentally added a "retry counter" in futex_requeue and, if
it exceeds a reasonable limit [3], tried one of two evasive actions.
Both of them seem to resolve the live-lock, but neither feels great to
me:
- Decrement the current task prio before each subsequent retry until it
eventually drops below the corresponding FUTEX_WAIT_REQUEUE_PI caller
and restore it again before returning from the syscall.
- Do a usleep_range_state(1, 100, TASK_KILLABLE) before the
cond_resched(). I'm clueless which numbers would be (and whether
TASK_KILLABLE is) appropriate for this. All I can say about the
chosen ones is that they did seem to reliably resolve the situation
in our use-case upon the first sleep.
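The second variant follows a generic "retry, then back off" pattern. The following is a user-space sketch of that pattern only; it is not the kernel patch we tested. retry_with_backoff, struct retry_stats and fails_n_times are hypothetical names, nanosleep stands in for usleep_range_state, and the 1 µs pause is an arbitrary choice. RETRY_LIMIT is set to 10 to match the limit mentioned in [3].

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <time.h>

#define RETRY_LIMIT 10 /* retries before countermeasures start, see [3] */

struct retry_stats {
    long attempts; /* how often the operation was tried */
    long sleeps;   /* how often we backed off by sleeping */
};

typedef int (*try_fn)(void *ctx);

/*
 * Call attempt(ctx) until it returns something other than -EAGAIN or
 * until max_attempts is reached. Once more than RETRY_LIMIT attempts
 * have failed, sleep briefly before each further retry so that a
 * lower-priority thread gets a chance to run.
 */
static int retry_with_backoff(try_fn attempt, void *ctx, long max_attempts,
                              struct retry_stats *stats) {
    assert(attempt != NULL && stats != NULL);
    const struct timespec pause = { .tv_sec = 0, .tv_nsec = 1000 }; /* 1 us */
    stats->attempts = 0;
    stats->sleeps = 0;
    while (stats->attempts < max_attempts) {
        stats->attempts++;
        const int ret = attempt(ctx);
        if (ret != -EAGAIN) {
            return ret;
        }
        if (stats->attempts > RETRY_LIMIT) {
            nanosleep(&pause, NULL); /* really give up the CPU */
            stats->sleeps++;
        }
    }
    return -EAGAIN;
}

/* Hypothetical operation for demonstration: fails *ctx times, then succeeds. */
static int fails_n_times(void *ctx) {
    int *const remaining = ctx;
    if (*remaining > 0) {
        --*remaining;
        return -EAGAIN;
    }
    return 0;
}
```

Unlike cond_resched, a sleep takes the caller off the run queue regardless of its priority, which is presumably why the first sleep was enough in our experiments.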
I think the nicest option might be to somehow prevent the
FUTEX_WAIT_REQUEUE_PI syscall from being preempted during the critical
section in the first place. But I don't know whether, let alone how,
this could be accomplished. The only (probably flawed) idea that comes
to my mind is temporarily escalating the task's priority to the maximum,
which might have all kinds of other undesirable consequences.
The lock-up is reliably reproduced by having a higher-priority
"producer" thread doing
    while (true) {
        pi_mutex_lock(mutex);
        ++global_counter;
        very_short_sleep();
        pi_cond_broadcast(condvar, mutex);
        pi_mutex_unlock(mutex);
        short_sleep();
    }
and a lower-priority "consumer" thread doing
    while (true) {
        pi_mutex_lock(mutex);
        while (global_counter == 0) {
            pi_cond_timedwait(condvar, mutex, super_short_timeout);
        }
        --global_counter;
        pi_mutex_unlock(mutex);
    }
concurrently on a single-core machine. The very_short_sleep() in the
producer thread gives the lower-priority consumer thread a chance to
enter its critical section only to be immediately preempted again before
completing it.
I'm attaching an almost self-contained (only depending on librtpi)
example program [4] that implements the above pseudo-code, but it is
unfortunately a bit verbose. You might have to tweak the numbers for
different CPU speeds. On our device in question, it has never been
observed making it through 2^20 iterations on an unmodified kernel if
the producer's priority exceeds the consumer's. With different
priorities or one of the workarounds mentioned above, it does 2^30
iterations without issues.
Thanks for any insights on this topic.
Moritz
[1] https://github.com/dvhart/librtpi
[2] Exceeding the timeout is the only way by which I've been able to
reliably provoke this situation. I'm not sure if there would also
be other ways to prematurely wake up the waiter.
[3] I don't know what would be reasonable. We've used a limit of 10
before starting to take countermeasures in our experiments so far.
[4] Alas, librtpi seems to be assuming 32 bit timestamps, so the example
uses that, too. We've been able to observe the symptom with both
32 and 64 bit syscalls.
#undef _FILE_OFFSET_BITS
#undef _TIME_BITS
#define _GNU_SOURCE
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <math.h>
#include <pthread.h>
#include <rtpi.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/random.h>
#include <time.h>
#include <unistd.h>
#define NANOS_PER_UNIT 1000000000LL
#define ARRAY_SIZE(A) (sizeof(A) / sizeof((A)[0]))
#define CSV(...) __VA_ARGS__
#define S(P) ((const char *)(P))
#define CONSOLE_PRINT(Format, ...) \
do { \
struct timespec _temp_ts; \
if (0 != clock_gettime(CLOCK_MONOTONIC, &_temp_ts)) { \
_temp_ts.tv_sec = _temp_ts.tv_nsec = 0; \
} \
const int _temp_id = gettid(); \
dprintf(STDERR_FILENO, "[%ld.%09lu][%d] %s:%d: " Format "\n", (long)_temp_ts.tv_sec, \
(unsigned long)_temp_ts.tv_nsec, _temp_id, __FILE__, __LINE__, ##__VA_ARGS__); \
} while (false)
#define FATAL_ERROR(...) \
do { \
CONSOLE_PRINT(__VA_ARGS__); \
abort(); \
} while (false)
#define MUST_ZERO_PTHREAD(...) \
do { \
const int _temp_err = (__VA_ARGS__); \
if (_temp_err != 0) { \
FATAL_ERROR("%s = %d (%s)", #__VA_ARGS__, _temp_err, strerror(_temp_err)); \
} \
} while (false)
#define MUST_ZERO_POSIX(...) \
do { \
const int _temp_ret = (__VA_ARGS__); \
if (_temp_ret != 0) { \
FATAL_ERROR("%s = %d (%s)", #__VA_ARGS__, _temp_ret, strerror(errno)); \
} \
} while (false)
#define MUST_NOT_NULL(...) \
do { \
const void *const _temp_ptr = (__VA_ARGS__); \
if (_temp_ptr == NULL) { \
FATAL_ERROR("%s = NULL (%s)\n", #__VA_ARGS__, strerror(errno)); \
} \
} while (false)
static pthread_barrier_t barrier;
static pi_mutex_t *mutex;
static pi_cond_t *condvar;
static int64_t global_limit = 0;
static int64_t global_tally = 0;
struct worker_thread_state {
struct timespec t1;
struct timespec t2;
struct timespec last_update;
};
static int64_t clamp(const int64_t floor, const int64_t value, const int64_t ceil) {
if (value < floor) {
return floor;
}
if (value > ceil) {
return ceil;
}
return value;
}
static int64_t nanoseconds(const struct timespec *const t1, const struct timespec *const t2) {
return NANOS_PER_UNIT * (t2->tv_sec - t1->tv_sec) + (t2->tv_nsec - t1->tv_nsec);
}
static int64_t little_sleepy(const useconds_t micmod) {
struct timespec t1, t2;
const useconds_t duration = random() % micmod;
MUST_ZERO_POSIX(clock_gettime(CLOCK_MONOTONIC, &t1));
MUST_ZERO_POSIX(usleep(duration));
MUST_ZERO_POSIX(clock_gettime(CLOCK_MONOTONIC, &t2));
return nanoseconds(&t1, &t2);
}
static const char *policy_name(const int policy) {
switch (policy) {
case SCHED_OTHER:
return "SCHED_OTHER";
case SCHED_IDLE:
return "SCHED_IDLE";
case SCHED_BATCH:
return "SCHED_BATCH";
case SCHED_FIFO:
return "SCHED_FIFO";
case SCHED_RR:
return "SCHED_RR";
default:
return "UNKNOWN";
}
}
static void overcome_barrier(const char *const who) {
const int ret = pthread_barrier_wait(&barrier);
if (ret == 0) {
CONSOLE_PRINT("The %s thread took the barrier as a follower", who);
} else if (ret == PTHREAD_BARRIER_SERIAL_THREAD) {
CONSOLE_PRINT("The %s thread took the barrier as the leader", who);
} else {
FATAL_ERROR("The %s thread didn't take the barrier: %s", who, strerror(ret));
}
}
static void worker_thread_enter(struct worker_thread_state *const state, const char *const who) {
int policy;
struct sched_param param;
MUST_ZERO_PTHREAD(pthread_getschedparam(pthread_self(), &policy, &param));
CONSOLE_PRINT("The %s thread runs with scheduling policy %s at priority %d",
CSV(who, policy_name(policy), param.sched_priority));
overcome_barrier(who);
MUST_ZERO_POSIX(clock_gettime(CLOCK_MONOTONIC, &state->t1));
}
static void worker_thread_progress(struct worker_thread_state *const state, const int64_t iter, const char *const who) {
struct timespec now;
MUST_ZERO_POSIX(clock_gettime(CLOCK_MONOTONIC, &now));
const int64_t since_start = nanoseconds(&state->t1, &now);
const int64_t since_last = nanoseconds(&state->last_update, &now);
const int64_t update_interval = clamp(10 * NANOS_PER_UNIT, since_start / 10, 1000 * NANOS_PER_UNIT);
if (iter >= 10 && since_last > update_interval) {
CONSOLE_PRINT("The %s thread has already processed %lld items (%.2f %%) in %.3f s (i.e. ≈ %.1f Hz)",
CSV(who, iter, 100.0 * iter / global_limit, 1.0E-9 * since_start, 1.0E+9 * iter / since_start));
state->last_update = now;
}
}
static int64_t worker_thread_completed(struct worker_thread_state *const state, const char *const who) {
MUST_ZERO_POSIX(clock_gettime(CLOCK_MONOTONIC, &state->t2));
const int64_t duration = nanoseconds(&state->t1, &state->t2);
CONSOLE_PRINT("The %s thread is done processing all %lld items in %.3f s (i.e. ≈ %.1f Hz)",
CSV(who, global_limit, 1.0E-9 * duration, 1.0E+9 * global_limit / duration));
return duration;
}
static void *worker_thread_exit(struct worker_thread_state *const state, const char *const who) {
(void)state;
CONSOLE_PRINT("The %s thread is about to exit", who);
return NULL;
}
static void *producer_function(void *const ident) {
struct worker_thread_state state = {0};
worker_thread_enter(&state, S(ident));
int64_t sleep_locked = 0, sleep_unlocked = 0;
for (int64_t i = 0; i < global_limit; ++i) {
worker_thread_progress(&state, i, S(ident));
MUST_ZERO_PTHREAD(pi_mutex_lock(mutex));
const int64_t local_tally = ++global_tally;
if (local_tally < 0) {
FATAL_ERROR("Negative value observed by %s thread", S(ident));
}
sleep_locked += little_sleepy(50);
MUST_ZERO_PTHREAD(pi_cond_broadcast(condvar, mutex));
MUST_ZERO_PTHREAD(pi_mutex_unlock(mutex));
sleep_unlocked += little_sleepy(100);
}
const int64_t loop_duration = worker_thread_completed(&state, S(ident));
const int64_t sleep_total = sleep_locked + sleep_unlocked;
CONSOLE_PRINT("The %s thread was sleeping for a total of %.3f seconds (%.2f %% of the time)",
CSV(S(ident), 1.0E-9 * sleep_total, 100.0 * sleep_total / loop_duration));
CONSOLE_PRINT("It was sleeping %.3f s (%.2f %%) with and %.3f s (%.2f %%) without holding the lock",
CSV(1.0E-9 * sleep_locked, 100.0 * sleep_locked / sleep_total),
CSV(1.0E-9 * sleep_unlocked, 100.0 * sleep_unlocked / sleep_total));
return worker_thread_exit(&state, S(ident));
}
static void *consumer_function(void *const ident) {
struct worker_thread_state state = {0};
worker_thread_enter(&state, S(ident));
int64_t wait_success = 0, wait_timeout = 0;
for (int64_t i = 0; i < global_limit; ++i) {
worker_thread_progress(&state, i, S(ident));
MUST_ZERO_PTHREAD(pi_mutex_lock(mutex));
for (uint64_t wait_nanos = 10; global_tally == 0; wait_nanos += wait_nanos / 2) {
struct timespec timeout;
MUST_ZERO_POSIX(clock_gettime(CLOCK_MONOTONIC, &timeout));
uint64_t more_nanos = timeout.tv_nsec + wait_nanos;
timeout.tv_sec += more_nanos / NANOS_PER_UNIT;
timeout.tv_nsec = more_nanos % NANOS_PER_UNIT;
const int ret = pi_cond_timedwait(condvar, mutex, &timeout);
if (ret == 0) {
wait_success += 1;
} else if (ret == ETIMEDOUT) {
wait_timeout += 1;
} else {
FATAL_ERROR("pi_cond_timedwait with relative timeout of %lld ns returned unexpected error code %d: %s",
CSV(wait_nanos, ret, strerror(ret)));
}
}
const int64_t local_tally = --global_tally;
if (local_tally < 0) {
FATAL_ERROR("Negative value observed by %s thread", S(ident));
}
MUST_ZERO_PTHREAD(pi_mutex_unlock(mutex));
}
worker_thread_completed(&state, S(ident));
const int64_t wait_total = wait_success + wait_timeout;
if (wait_total > 0) {
CONSOLE_PRINT("The %s thread encountered timeouts with %.2f %% of the %lld pi_cond_timedwait calls",
CSV(S(ident), 100.0 * wait_timeout / wait_total, wait_total));
} else {
CONSOLE_PRINT("The %s thread didn't call pi_cond_timedwait at all!", S(ident));
}
return worker_thread_exit(&state, S(ident));
}
static int64_t great_power(const int n) {
// Don't accept 2^0 because of atoi(3) deficiencies.
return (0 < n && n < 60) ? (1LL << n) : -1LL;
}
int main(int argc, char **argv) {
typedef void *(*thread_function_t)(void *);
struct {
pthread_t thread;
struct sched_param schedparams;
thread_function_t function;
char name[24];
} threads[] = {
{.function = producer_function, .name = "producer"},
{.function = consumer_function, .name = "consumer"},
};
const int thread_count = ARRAY_SIZE(threads);
if (argc != 4) {
CONSOLE_PRINT("Usage: %s N PRO CON", argv[0]);
CONSOLE_PRINT("");
CONSOLE_PRINT("Process 2^N items with producer and consumer thread running with FIFO scheduling");
CONSOLE_PRINT("policy at priorities PRO and CON respectively.");
return EXIT_FAILURE;
} else if ((global_limit = great_power(atoi(argv[1]))) <= 0) {
CONSOLE_PRINT("Please choose a sensible number of iterations.");
return EXIT_FAILURE;
} else if ((threads[0].schedparams.sched_priority = atoi(argv[2])) <= 0) {
CONSOLE_PRINT("Please choose a positive real-time priority for the producer thread.");
return EXIT_FAILURE;
} else if ((threads[1].schedparams.sched_priority = atoi(argv[3])) <= 0) {
CONSOLE_PRINT("Please choose a positive real-time priority for the consumer thread.");
return EXIT_FAILURE;
}
{
unsigned entropy;
if (getrandom(&entropy, sizeof(entropy), 0) != (ssize_t)sizeof(entropy)) {
FATAL_ERROR("Unable to obtain %zu bytes of system entropy (errno = %d): %s",
CSV(sizeof(entropy), errno, strerror(errno)));
}
srandom(entropy);
}
if (threads[0].schedparams.sched_priority == threads[1].schedparams.sched_priority) {
CONSOLE_PRINT("About to process %lld items with the producer running at %s priority %s the consumer",
CSV(global_limit, "the same", "as"));
} else if (threads[0].schedparams.sched_priority > threads[1].schedparams.sched_priority) {
CONSOLE_PRINT("About to process %lld items with the producer running at %s priority %s the consumer",
CSV(global_limit, "higher", "than"));
} else if (threads[0].schedparams.sched_priority < threads[1].schedparams.sched_priority) {
CONSOLE_PRINT("About to process %lld items with the producer running at %s priority %s the consumer",
CSV(global_limit, "lower", "than"));
} else {
FATAL_ERROR("The impossible has happened.");
}
CONSOLE_PRINT("Preparing synchronization primitives ...");
MUST_ZERO_PTHREAD(pthread_barrier_init(&barrier, NULL, thread_count + 1));
MUST_NOT_NULL(mutex = pi_mutex_alloc());
MUST_NOT_NULL(condvar = pi_cond_alloc());
MUST_ZERO_PTHREAD(pi_mutex_init(mutex, 0));
MUST_ZERO_PTHREAD(pi_cond_init(condvar, 0));
CONSOLE_PRINT("Starting %d threads ...", thread_count);
for (int i = 0; i < thread_count; ++i) {
pthread_attr_t attrs;
MUST_ZERO_PTHREAD(pthread_attr_init(&attrs));
MUST_ZERO_PTHREAD(pthread_attr_setschedpolicy(&attrs, SCHED_FIFO));
MUST_ZERO_PTHREAD(pthread_attr_setschedparam(&attrs, &threads[i].schedparams));
MUST_ZERO_PTHREAD(pthread_attr_setinheritsched(&attrs, PTHREAD_EXPLICIT_SCHED));
MUST_ZERO_PTHREAD(pthread_create(&threads[i].thread, &attrs, threads[i].function, threads[i].name));
MUST_ZERO_PTHREAD(pthread_attr_destroy(&attrs));
}
CONSOLE_PRINT("Releasing worker threads ...");
overcome_barrier("main");
struct timespec ts_before;
MUST_ZERO_POSIX(clock_gettime(CLOCK_MONOTONIC, &ts_before));
CONSOLE_PRINT("Joining %d threads ...", thread_count);
for (int i = 0; i < thread_count; ++i) {
MUST_ZERO_PTHREAD(pthread_join(threads[i].thread, NULL));
}
struct timespec ts_after;
MUST_ZERO_POSIX(clock_gettime(CLOCK_MONOTONIC, &ts_after));
if (global_tally != 0) {
FATAL_ERROR("Main thread observed non-zero tally %lld afterwards", global_tally);
}
CONSOLE_PRINT("Cleaning up synchronization primitives ...");
MUST_ZERO_PTHREAD(pi_cond_destroy(condvar));
MUST_ZERO_PTHREAD(pi_mutex_destroy(mutex));
pi_cond_free(condvar);
pi_mutex_free(mutex);
MUST_ZERO_PTHREAD(pthread_barrier_destroy(&barrier));
const int64_t elapsed_nanos = nanoseconds(&ts_before, &ts_after);
CONSOLE_PRINT("Overall processing of %lld items took %.3f s (i.e. ≈ %.1f Hz)",
CSV(global_limit, 1.0E-9 * elapsed_nanos, 1.0E+9 * global_limit / elapsed_nanos));
return EXIT_SUCCESS;
}