RT thread migration (?) problem

From: Phil Wilshire
Date: Wed Apr 10 2013 - 08:49:55 EST


We are using kernel 3.2.21

It looks like we are seeing stalls or lockups when allowing RT threads to
migrate.

Pinning threads to CPUs has greatly reduced the problem.

However, we still see unreasonable delays in triggering a high-priority RT
monitoring thread when it, alone, is allowed to migrate between CPUs. If we
lock this monitoring thread down to a single CPU, it all seems to work
properly.
By unreasonable, I mean a delay of 500 ms or more on a real-time thread
expected to schedule every 250 ms.

The kernel 3.2.21 config is attached, as is the test code.
We have tried other kernels and have seen exactly the same problem.

We have been looking at this problem for about three months now. It was
initially exposed on an Intel Atom D525 CPU. The system simply seemed to
stop working for a period of time, sometimes over 5 seconds. All threads
and interrupts seemed to stop until suddenly it all woke up again.
Of course, when the watchdog was set to 4 seconds we would get a surprise
reboot!
When we extended the watchdog timeout we started to see the system recover
within the 5-second period.

This problem happens very rarely under "normal" system loading, say once
every three months or so.
We managed to accelerate the problem by creating some special stress-test
code (attached) that reduced the time to failure to a day or so on some
systems.

The test code loads the system heavily with a number of RT worker threads
repeatedly performing a small task. We are not worried about small missed
deadlines on the worker threads. We understand that we are subjecting the
system to an unreasonable load. We are simply trying to expose the main
problem.

In addition, there is an overall supervisor thread at the highest priority.
Under the fault condition the supervisor thread would sleep for 5 or so
seconds and then wake up and continue. The supervisor thread is in charge
of pinging the watchdog.

When running the special stress test we hit the RT throttling warning, and
we understand what that means.
I have enabled LTT tracing and ftrace. I did manage to get some
limited traces showing the system simply hanging, with no activity for 5
seconds. When it wakes up we see a rush of thread migration
operations as the system tries to recover.

Neither trace option showed much detail just prior to the hang, and
when I enabled more tracing detail under ftrace the problem did not
seem to happen.

We originally thought this problem was associated with the Atom processor.
The reason for this posting is that we are now seeing the same pattern
on some larger Intel Xeon E5-2630/2680 systems.

I am ready to run any tests, and can possibly even provide hardware, to
explore this problem.
Note that the attached thread_test program currently pins the management
thread to CPU 0.


TIA

Attachment: config-t510v1-3.2.21
Description: Binary data

/* Copyright (c) 2013 by Talari Networks
*
* Author: Todd Martin <tmartin@xxxxxxxxxx>
*
* Description: This program creates a workload intended to test how a system handles a large number of threads
* constantly doing small bits of work followed by small sleeps. It creates a variety of worker threads of
* varying priorities with workloads that vary in size. It also creates a thread with real-time priority that
* continually sleeps for 250 ms and measures how long the sleep actually takes, logging whenever the sleep
* overruns the expected interval (the threshold in this version is 400 ms).
*/

#define _GNU_SOURCE /* for pthread_setaffinity_np() and CPU_SET() */
#include <stdio.h>
#include <stdlib.h>
#include <sys/param.h>
#include <sys/types.h>
#include <sys/prctl.h>
#include <errno.h>
#include <error.h>
#include <string.h>
#include <inttypes.h>
#include <pthread.h>
#include <sched.h>
#include <time.h>
#include <unistd.h>

u_int64_t max_time_delta_us = 0;
u_int64_t max_ever_time_delta_us = 0;
FILE * logout;
/*************************************/
/* Tunable parameters of the program */
/*************************************/

/* Which scheduling policy should we use for the worker threads */
//#define WORKER_THREAD_SCHEDULE_POLICY SCHED_FIFO

/* The priorities for the worker threads. The priorities will range from BASE_WORKER_PRIORITY to
* BASE_WORKER_PRIORITY+PRIORITY_RANGE-1
*/
#define BASE_WORKER_PRIORITY 66
#define PRIORITY_RANGE 20

/* The minimum amount of time that a worker thread will sleep in microseconds */
#define MIN_WORKER_THREAD_SLEEP_TIME_US 150

/* The default number of worker threads to create */
#define DEFAULT_NUM_WORKER_THREADS 45

char * syscmd=NULL;

int t2_set_processor_affinity(pthread_t thread_id, unsigned long cpu_mask)
{
    cpu_set_t cpu_set;
    unsigned int cpu;

    /* Convert the caller's bitmask into a proper cpu_set_t */
    CPU_ZERO(&cpu_set);
    for (cpu = 0; cpu < 8 * sizeof(cpu_mask); cpu++)
        if (cpu_mask & (1UL << cpu))
            CPU_SET(cpu, &cpu_set);

    int ret = pthread_setaffinity_np(thread_id, sizeof(cpu_set), &cpu_set);
    if (ret)
    {
        printf("pthread_setaffinity_np failed. Error: %d cpu_mask: 0x%lX\n",
               ret, cpu_mask);
    }

    return ret;
}

#if __WORDSIZE == 64
#define rdtscll(val) do { \
        unsigned int __a, __d; \
        asm volatile("rdtsc" : "=a" (__a), "=d" (__d)); \
        (val) = ((unsigned long)__a) | (((unsigned long)__d) << 32); \
    } while (0)
#else
#define rdtscll(val) __asm__ __volatile__ ("rdtsc" : "=A" (val))
#endif




int fib(int n)
{
    if (n == 0 || n == 1)
        return n;
    else
        return fib(n - 1) + fib(n - 2);
}

#define MIN_SCALE 1
#define MAX_SCALE 5
void do_dummy_work(int scale)
{
    scale = MAX(MIN_SCALE, scale);
    scale = MIN(MAX_SCALE, scale);
    switch (scale)
    {
    case 1:
        fib(5);
        break;
    case 2:
        fib(10);
        break;
    case 3:
        //fib(13);
        fib(10);
        break;
    case 4:
        //fib(15);
        fib(12);
        break;
    case 5:
        //fib(16);
        fib(12);
        break;
    }

    /* Timings on T510
       Time for do_dummy_work(1) = 1 us
       Time for do_dummy_work(2) = 4 us
       Time for do_dummy_work(3) = 4 us
       Time for do_dummy_work(4) = 10 us
       Time for do_dummy_work(5) = 10 us
    */
}

float CPU_MHZ;
int num_worker_threads;
int sched_policy=SCHED_OTHER;
int num_cpus = 0;

int get_cpu_count(void)
{
    /* sysconf() avoids the original shell pipeline through /tmp */
    int cpu_count = (int) sysconf(_SC_NPROCESSORS_ONLN);

    if (cpu_count < 1)
    {
        int err = errno;
        error(-1, err, "Could not determine CPU count: %s", strerror(err));
    }

    printf("Cpu count= %d\n", cpu_count);
    return cpu_count;
}

void init_timer()
{
    char *pos;
    size_t len;
    FILE *file;

    if (!(file = fopen("/proc/cpuinfo", "r")))
    {
        int err = errno;
        error(-1, err, "Could not open /proc/cpuinfo: %s", strerror(err));
    }

    char buf[1024];
    while (fgets(buf, sizeof(buf), file))
    {
        if (!(pos = strchr(buf, ':')))
            continue;
        if (!(len = strspn(pos, ": \t")))
            continue;
        else if (!strncmp(buf, "cpu MHz", strlen("cpu MHz")))
            CPU_MHZ = atof(pos + len);
    }

    fclose(file);
}


/*
 * get_rtc_uS
 * Returns the TSC-derived time value in microseconds
 */
u_int64_t get_rtc_uS()
{
    u_int64_t now;
    double now_f;

    /* Read the time stamp counter */
    rdtscll(now);
    now_f = now;

    /* Convert it to microseconds */
    now_f /= CPU_MHZ;

    now = now_f;

    /* Return the result */
    return now;
}

static inline u_int64_t delta_calc_u64(u_int64_t c1, u_int64_t c0)
{
    // take into account wrap...
    return (c1 >= c0) ? (c1 - c0) :
        (((u_int64_t)(0xffffffffffffffffLLU)) - c0) + c1 + 1;
}

int t2_usleep(u_int64_t u_sec)
{
    /* Split into seconds and nanoseconds; tv_nsec alone would
     * overflow for sleeps of one second or more. */
    struct timespec sleep_time;
    sleep_time.tv_sec = u_sec / 1000000;
    sleep_time.tv_nsec = (u_sec % 1000000) * 1000;
    struct timespec remaining_time = {0, 0};
    while (nanosleep(&sleep_time, &remaining_time) != 0)
    {
        int err = errno;
        if (err == EINTR)
        {
            sleep_time = remaining_time;
            continue;
        }
        else
        {
            printf("Unexpected error from nanosleep\n");
            exit(-1);
        }
    }
    return 1;
}

void time_work(int work_scale, const char *str)
{
    u_int64_t start_time_us = get_rtc_uS();
    do_dummy_work(work_scale);
    u_int64_t stop_time_us = get_rtc_uS();

    u_int64_t time_delta_us = delta_calc_u64(stop_time_us, start_time_us);
    printf("Time for %s = %"PRIu64" us\n", str, time_delta_us);
}

void worker_thread_task(int thread_num)
{
    while (1)
    {
        do_dummy_work((thread_num % MAX_SCALE) + 1);
        t2_usleep((MAX_SCALE - (thread_num % MAX_SCALE)) * MIN_WORKER_THREAD_SLEEP_TIME_US);
    }
}

#define WORKER_PRIORITY(n) (BASE_WORKER_PRIORITY + ((n) % PRIORITY_RANGE))
void *dispatch_worker_thread_task(void *thread_num)
{
    int num = (intptr_t) thread_num;
    char thread_name[100];
    snprintf(thread_name, sizeof(thread_name), "worker_%d", num);
    prctl(PR_SET_NAME, (unsigned long) thread_name);
    pthread_t pthread_id = pthread_self();
    struct sched_param param;
    param.sched_priority = WORKER_PRIORITY(num);
    pthread_setschedparam(pthread_id, sched_policy, &param);

    worker_thread_task(num);
    return NULL;
}

void start_worker_threads(void)
{
    intptr_t i;
    for (i = 0; i < num_worker_threads; i++)
    {
        unsigned long cpu_set = 1UL << (i % num_cpus);
        //printf(" [%ld] mod %ld cpu_set %lx\n", (long) i, (long) (i % num_cpus), cpu_set);
        pthread_t new_thread_id;
        pthread_create(&new_thread_id, NULL, dispatch_worker_thread_task, (void *) i);
        t2_set_processor_affinity(new_thread_id, cpu_set);

        // pthread_detach(new_thread_id);
    }
}

void *monitor_thread_task(void *dummy __attribute__((unused)))
{
    char thread_name[100];
    snprintf(thread_name, sizeof(thread_name), "monitor");
    prctl(PR_SET_NAME, (unsigned long) thread_name);
    pthread_t pthread_id = pthread_self();
    struct sched_param param;
    param.sched_priority = 99;
    pthread_setschedparam(pthread_id, SCHED_FIFO, &param);

    u_int32_t loop_counter = 0;
    while (1)
    {
        u_int64_t start_time_us = get_rtc_uS();
        t2_usleep(250000); /* 250 ms */
        u_int64_t stop_time_us = get_rtc_uS();

        u_int64_t time_delta_us = delta_calc_u64(stop_time_us, start_time_us);

        if (time_delta_us > max_time_delta_us)
            max_time_delta_us = time_delta_us;
        if (time_delta_us > max_ever_time_delta_us)
            max_ever_time_delta_us = time_delta_us;
        /* Log any 250 ms sleep that overruns the threshold
         * (earlier versions used 1000000 us) */
        if (time_delta_us > 400000)
        {
            time_t current_time_t = time(NULL);
            struct tm *current_tm = localtime(&current_time_t);
            char time_str[128];
            strftime(time_str, sizeof(time_str), "%c", current_tm);
            fprintf(logout, "%s: Error: 250 ms sleep took %"PRIu64" us\n", time_str, time_delta_us);
            fflush(logout);
            if (syscmd)
                system(syscmd);
        }

        if ((loop_counter % 1200) == 60)
        {
            /* Every 5 minutes log a message to indicate that this thread is still running. */
            time_t current_time_t = time(NULL);
            struct tm *current_tm = localtime(&current_time_t);
            char time_str[128];
            strftime(time_str, sizeof(time_str), "%c", current_tm);
            fprintf(logout, "%s:running %d threads max delta %"PRIu64" us max_ever %"PRIu64"\n",
                    time_str, num_worker_threads, max_time_delta_us, max_ever_time_delta_us);
            fflush(logout);
            max_time_delta_us = 0;
        }
        loop_counter++;
    }
}

void start_monitor_thread(void)
{
    pthread_t new_thread_id;
    unsigned long cpu_set = 1; /* pin the monitor to cpu 0 */

    pthread_create(&new_thread_id, NULL, monitor_thread_task, NULL);
    t2_set_processor_affinity(new_thread_id, cpu_set);
    // pthread_detach(new_thread_id);
}

const char *schedule_policy_to_str(int policy)
{
    switch (policy)
    {
    case SCHED_FIFO:
        return "SCHED_FIFO";
    case SCHED_RR:
        return "SCHED_RR";
    case SCHED_OTHER:
        return "SCHED_OTHER";
    default:
        return "unknown";
    }
}

void print_thread_test_parameters(void)
{
    printf("thread_test parameters:\n");
    printf("\tnumber of worker threads = %d\n", num_worker_threads);
    printf("\tworker threads scheduling policy = %s\n", schedule_policy_to_str(sched_policy));
    printf("\tbase worker thread priority = %d\n", BASE_WORKER_PRIORITY);
    printf("\tmax worker thread priority = %d\n", BASE_WORKER_PRIORITY + PRIORITY_RANGE - 1);
    printf("\tmin worker thread sleep time = %d us\n", MIN_WORKER_THREAD_SLEEP_TIME_US);

    if (syscmd)
        printf("\tcommand used when delay detected (%s)\n", syscmd);
}

int main(int argc, const char *argv[])
{
    int pnum;

    logout = fopen("/var/log/thread_test", "w");
    if (!logout)
    {
        printf("unable to open log file (%s), quitting\n",
               "/var/log/thread_test");
        return 1;
    }
    if (argc > 1)
        num_worker_threads = atoi(argv[1]);
    else
        num_worker_threads = DEFAULT_NUM_WORKER_THREADS;

    pnum = 0;
    if (argc > 2)
        pnum = atoi(argv[2]);

    if (pnum > 0)
        sched_policy = SCHED_FIFO;
    else
        sched_policy = SCHED_OTHER;

    if (argc > 3)
        syscmd = (char *) argv[3];

    if (num_worker_threads <= 0)
        num_worker_threads = DEFAULT_NUM_WORKER_THREADS;

    print_thread_test_parameters();
    init_timer();
    time_work(1, "do_dummy_work(1)");
    time_work(2, "do_dummy_work(2)");
    time_work(3, "do_dummy_work(3)");
    time_work(4, "do_dummy_work(4)");
    time_work(5, "do_dummy_work(5)");
    num_cpus = get_cpu_count();

    printf("Starting clock monitoring thread\n");
    start_monitor_thread();

    printf("Starting %d worker threads\n", num_worker_threads);
    start_worker_threads();

    printf("All threads started\n");
    int i = 0;
    while (1)
    {
        i++;
        sleep(1);
        if (i == 30)
        {
            /*
             * This is for testing in case we accidentally set the load on the system so high that we cannot
             * get in to kill the program
            printf("Exiting after 30 seconds\n");
            exit(0);
            */
        }
    }
    return 0;
}