Re: threadlets as 'naive pool of threads', epoll, some measurements

From: Ingo Molnar
Date: Mon Feb 26 2007 - 06:56:07 EST



Yet another performance update: with the fixed 'heaps of stupid
threads' evserver_threadlet.c code (attached below) I got:

> evserver_epoll: 9400 reqs/sec
> evserver_epoll_threadlet: 9400 reqs/sec

evserver_threadlet: 9000 reqs/sec

So the overhead, instead of the 10x slowdown Evgeniy predicted/feared,
is about 4% (9000 vs. 9400 reqs/sec) for this particular, very
event-centric workload.

why? because Evgeniy still overlooks what i've mentioned so many times:
that there is lots of inherent 'caching' possible even in this
particular '8000 clients' workload, which even the most stupid threadlet
queueing model is able to take advantage of. The maximum level of
parallelism that i've measured during this test was 161 threads.

Ingo

-----------{ evserver_threadlet.c }--------------->
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <sys/wait.h>
#include <linux/futex.h>
#include <sys/time.h>
#include <sys/types.h>
#include <linux/unistd.h>
#include <netinet/tcp.h>
#include <stdio.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sched.h>
#include <sys/time.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>

#include <sys/poll.h>
#include <sys/sendfile.h>
#include <sys/epoll.h>

#define DEBUG 0

#include "syslet.h"
#include "sys.h"
#include "threadlet.h"

/*
 * Per-connection request descriptor.  Requests are recycled through
 * a freelist once their threadlet has finished.
 */
struct request {
	/* Link to the next idle request while this one is on the freelist. */
	struct request *next_free;

	/*
	 * Stack used by the threadlet that serves this request.  It is
	 * stored in the request itself, so it is reused together with
	 * the request as threadlets complete.
	 */
	unsigned long threadlet_stack;

	/* Request-specific parameter: the accepted client socket. */
	long sock;
};

/*
 * Logging helpers (GNU named-variadic macro syntax).
 *
 * ulog() currently expands to nothing, so its call sites compile away
 * together with their arguments.  The commented-out definition below is
 * the variant that prints to stderr.
 */
//#define ulog(f, a...) fprintf(stderr, f, ##a)
#define ulog(f, a...)
/*
 * ulog_err() prints the message with printf() (i.e. to stdout) and appends
 * ": <strerror(errno)> [<errno>].\n".  'f' has to be a string literal since
 * it is pasted onto that suffix, and it should not end in a newline itself.
 * errno is read at the point of the call.
 */
#define ulog_err(f, a...) printf(f ": %s [%d].\n", ##a, strerror(errno), errno)

/*
 * Threadlet body: serve one client connection.
 *
 * Reads whatever the client sent (the data is not inspected), then
 * transmits up to 40960 bytes of /tmp/index.html with sendfile() and
 * closes the connection.
 *
 * @__req: the struct request for this connection; req->sock is the
 *         connected socket.  The socket is closed on every path.
 *
 * Returns the result of complete_threadlet_fn(), on success and on
 * failure alike.
 */
static long handle_request(void *__req)
{
	struct request *req = __req;
	int s = req->sock, err, fd;
	int saved_errno;
	off_t offset = 0;
	int count = 40960;
	char path[] = "/tmp/index.html";
	char buf[4096];
	struct timeval tm;

	err = recv(s, buf, sizeof(buf), 0);
	if (err < 0) {
		ulog_err("Failed to read data from s=%d", s);
		goto err_out_remove;
	}
	if (err == 0) {
		/* Orderly shutdown by the peer: nothing to serve. */
		gettimeofday(&tm, NULL);
		ulog("%08lu:%06lu: Client exited: fd=%d.\n", tm.tv_sec, tm.tv_usec, s);
		goto err_out_remove;
	}

	fd = open(path, O_RDONLY);
	if (fd == -1) {
		ulog_err("Failed to open '%s'", path);
		err = -1;
		goto err_out_remove;
	}
#if 0
	/* Disabled alternative to sendfile(): plain read()/send() copy loop. */
	do {
		err = read(fd, buf, sizeof(buf));
		if (err <= 0)
			break;
		err = send(s, buf, err, 0);
		if (err <= 0)
			break;
	} while (1);
#endif
	err = sendfile(s, fd, &offset, count);
	/*
	 * Remember why sendfile() failed: the setsockopt() and close()
	 * calls below may overwrite errno before it gets reported.
	 */
	saved_errno = errno;
	{
		/* Clear TCP_CORK so that any held-back data is pushed out. */
		int on = 0;
		setsockopt(s, SOL_TCP, TCP_CORK, &on, sizeof(on));
	}

	close(fd);
	if (err < 0) {
		errno = saved_errno;
		/* No trailing newline here: ulog_err() appends its own. */
		ulog_err("Failed to send %d bytes: fd=%d", count, s);
		goto err_out_remove;
	}
	close(s);

	gettimeofday(&tm, NULL);
	ulog("%08lu:%06lu: %d bytes has been sent to client fd=%d.\n", tm.tv_sec, tm.tv_usec, err, s);

	return complete_threadlet_fn(req, &async_head);

err_out_remove:
	/*
	 * NOTE(review): placeholder diagnostic; it is printed for every
	 * failed request and for every client that closed its connection.
	 */
	printf("blah\n");
	close(s);
	return complete_threadlet_fn(req, &async_head);
}

/*
 * Freelist to recycle requests: a singly linked stack of idle requests
 * chained through ->next_free.  Requests are pushed and popped at the
 * head; NULL means the list is empty.
 */
static struct request *freelist;

/*
 * Allocate a request, recycling one from the freelist when possible.
 *
 * As a side effect, async_head.new_thread_stack is refilled whenever
 * that slot is found empty.
 *
 * Returns a request that has a threadlet stack attached, or NULL if
 * memory for a new request could not be allocated.
 */
static struct request *alloc_req(void)
{
	struct request *req;

	/*
	 * Occasionally we have to refill the new-thread stack
	 * entry:
	 */
	if (!async_head.new_thread_stack) {
		async_head.new_thread_stack = thread_stack_alloc();
		pr("allocated new thread stack: %08lx\n",
		   async_head.new_thread_stack);
	}

	/* Fast path: pop a previously used request, stack included. */
	if (freelist) {
		req = freelist;
		pr("reusing req %p, threadlet stack %08lx\n",
		   req, req->threadlet_stack);
		freelist = freelist->next_free;
		req->next_free = NULL;
		return req;
	}

	req = calloc(1, sizeof(*req));
	if (!req)
		return NULL;	/* the caller already checks for NULL */
	pr("allocated req %p\n", req);

	/*
	 * NOTE(review): thread_stack_alloc() is defined elsewhere; its
	 * failure behaviour is not visible here - confirm whether the
	 * result needs checking.
	 */
	req->threadlet_stack = thread_stack_alloc();
	pr("allocated thread stack %08lx\n", req->threadlet_stack);

	return req;
}

/*
 * Reap every completion currently queued in the completion ring and
 * put the finished requests back on the freelist.
 *
 * Returns the number of requests that were reaped.
 */
static unsigned long complete(void)
{
	unsigned long nr_done = 0;
	struct request *done;

	/* An empty (NULL) slot at the user index ends the scan. */
	while ((done = (void *)completion_ring[async_head.user_ring_idx]) != NULL) {
		nr_done++;
		pr("completed req %p (threadlet stack %08lx)\n",
		   done, done->threadlet_stack);

		/* Recycle the request. */
		done->next_free = freelist;
		freelist = done;

		/*
		 * The kernel only ever writes into NULL ring entries, so
		 * that unhandled completions are never overwritten.  It
		 * is therefore user-space's job to clear the slot before
		 * moving on to the next one.
		 */
		completion_ring[async_head.user_ring_idx] = NULL;
		if (++async_head.user_ring_idx == MAX_PENDING)
			async_head.user_ring_idx = 0;
	}

	return nr_done;
}

/* Number of submitted requests whose completion has not been reaped yet. */
static unsigned int pending_requests;

/*
 * Book-keeping for a request that has just been handed to
 * threadlet_exec().
 *
 * @req:  the submitted request
 * @done: non-zero if the request has already been executed, zero if
 *        it is still in flight and must be accounted as pending
 */
static void handle_submitted_request(struct request *req, long done)
{
	if (done) {
		/* Cached case: the request is finished, recycle it now. */
		pr("cache completed req %p (threadlet stack %08lx)\n",
		   req, req->threadlet_stack);
		req->next_free = freelist;
		freelist = req;
		return;
	}

	/*
	 * 'Cachemiss' case: the request is still running and its
	 * completion will show up in the completion ring later.
	 */
	assert(pending_requests < MAX_PENDING-1);
	pending_requests++;
	pr("req %p is pending. %d reqs pending.\n", req, pending_requests);

	/* Opportunistic reap - cheap when nothing has completed. */
	pending_requests -= complete();

	/* Ring full: block until some completions can be reaped. */
	while (pending_requests == MAX_PENDING-1) {
		unsigned int reaped;

		pr("sys_async_wait()");
		/* Ask for 4 events at once to batch things a bit. */
		sys_async_wait(4, async_head.user_ring_idx, &async_head);
		reaped = complete();
		pending_requests -= reaped;
		pr("after wait: completed %d requests - still pending: %d\n",
		   reaped, pending_requests);
	}
}

static void webserver(int l_sock)
{
struct sockaddr addr;
struct request *req;
socklen_t addrlen;
long done;
int sock;

async_head_init();

/*
* We dont use syslets for the first request (or for oversized
* and/or erroneous requests):
*/
for (;;) {
req = alloc_req();
if (!req)
break;

sock = accept(l_sock, &addr, &addrlen);

if (sock < 0)
break;

req->sock = sock;
done = threadlet_exec(handle_request, req,
req->threadlet_stack, &async_head);

handle_submitted_request(req, done);
}
}

/*
 * Create a TCP socket listening on port 2222 on all local IPv4
 * addresses (INADDR_ANY), with SO_REUSEADDR set and a backlog of 1000.
 *
 * @l_socket: receives the listening socket on success; it is set to -1
 *            on failure, in which case no descriptor is left open.
 *
 * Returns 0 on success or the negative result of the failing call
 * after reporting it with perror().
 */
static int tcp_listen_socket(int *l_socket)
{
	struct sockaddr_in ad;
	int one = 1;
	int sock;
	int ret;

	*l_socket = -1;

	/*
	 * Setup for server socket
	 */
	memset(&ad, 0, sizeof(ad));
	ad.sin_family = AF_INET;
	ad.sin_addr.s_addr = htonl(INADDR_ANY);
	ad.sin_port = htons(2222);

	sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
	if (sock < 0) {
		perror("socket (server)");
		return sock;
	}

	ret = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
	if (ret < 0) {
		perror("setsockopt");
		goto err_close;
	}

	ret = bind(sock, (struct sockaddr *)&ad, sizeof(ad));
	if (ret < 0) {
		perror("bind");
		goto err_close;
	}

	ret = listen(sock, 1000);
	if (ret < 0) {
		perror("listen");
		goto err_close;
	}

	*l_socket = sock;
	return 0;

err_close:
	/* perror() has already run, so close() cannot disturb the report. */
	close(sock);
	return ret;
}

/*
 * Program entry point: switch to batch scheduling (best effort), set up
 * the listening socket and run the threadlet based web server.
 *
 * The command line arguments are not used.
 *
 * Exits with EXIT_FAILURE if the listening socket cannot be set up and
 * with 0 once webserver() has returned.
 */
int main(int argc, char *argv[])
{
	int ret;
	int l_sock;
	struct sched_param p = { .sched_priority = 0 };

	(void)argc;
	(void)argv;

	/* Best effort only: keep going with the default policy on failure. */
	if (sched_setscheduler(getpid(), 3 /* SCHED_BATCH */, &p) < 0)
		perror("sched_setscheduler");

	ret = tcp_listen_socket(&l_sock);
	if (ret)
		exit(EXIT_FAILURE);
	printf("listening on port 2222\n");
	printf("using threadlets\n");

	webserver(l_sock);
	async_head_exit();

	exit(0);
}
-
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/