Re: [PATCH] io_thread/x86: don't reset 'cs', 'ss', 'ds' and 'es' registers for io_threads
From: Olivier Langlois
Date: Fri May 21 2021 - 03:31:57 EST
On Thu, 2021-05-20 at 00:13 -0400, Olivier Langlois wrote:
> I know that my test case isn't conclusive. It is a failed attempt to
> capture what my program is doing.
>
> The priority of investigating my core dump issue dropped substantially
> last week because I solved my primary issue (a leak of the buffers
> provided to io_uring during disconnection). My program did run for
> days, but it crashed this morning, again without any core dump.
> It is a very frustrating situation because it is probably a bug that
> would be trivial to diagnose and fix, but without the core, the logs
> are opaque and give no clue about why the program crashed.
>
> A key characteristic of my program is that it generates at least one
> io-worker thread per io_uring instance.
As I get more familiar with the io_uring source code, I have come to
realize that it is practically impossible to end up with no io-wq
threads. They are created from io_uring_install_fd(), which is called
for every instance created.
I'm a bit lazy about rebooting my desktop and I am still running 5.11.5
on it. I guess that with this kernel version, the io_uring threads did
not belong to the user process and do not show up with ps when
searching for a specific process's LWPs.
I correctly see all the generated threads when I run the test program
on an up-to-date server (5.12.4).
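For reference, here is a small standalone sketch (separate from the test
program attached below) that lists the threads of a pid by reading /proc;
the "iou-" name prefix check is only an assumption based on how the io
threads appear in ps on 5.12:

/*
 * list_threads.c: print /proc/<pid>/task/<tid>/comm for every thread.
 * To compile: gcc list_threads.c -o list_threads
 */
#include <dirent.h>
#include <stdio.h>
#include <string.h>

int main(int argc, char *argv[])
{
    char path[64], comm[384], name[64];
    if (argc != 2) {
        fprintf(stderr, "usage: %s <pid>\n", argv[0]);
        return 1;
    }
    snprintf(path, sizeof(path), "/proc/%s/task", argv[1]);
    DIR *dir = opendir(path);
    if (!dir) {
        perror("opendir");
        return 1;
    }
    struct dirent *ent;
    while ((ent = readdir(dir)) != NULL) {
        if (ent->d_name[0] == '.')
            continue;
        snprintf(comm, sizeof(comm), "%s/%s/comm", path, ent->d_name);
        FILE *f = fopen(comm, "r");
        if (!f)
            continue;
        if (fgets(name, sizeof(name), f)) {
            name[strcspn(name, "\n")] = '\0';
            /* assumption: the io threads are named "iou-wrk-*" / "iou-mgr-*" */
            printf("tid %s: %s%s\n", ent->d_name, name,
                   !strncmp(name, "iou-", 4) ? "  <-- io thread" : "");
        }
        fclose(f);
    }
    closedir(dir);
    return 0;
}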
I have rewritten the whole test program. It has now become an io_uring
multi-connection HTTP client (it could make a nice io_uring example
program, IMHO). Still no success in reproducing the problem with it.
So, I am giving up on the idea of reproducing the problem with a
showcase program unless I get some clue about what triggers it.
However, I can reproduce it at will with my real program. So, as Linus
has suggested, I'll investigate by searching for where the PF_IO_WORKER
flag is used.
I'll keep the list updated if I discover something.
Greetings,
/*
* Test program to reproduce issue generating core dumps while using io_uring.
*
* To compile: g++ -pthread test_io_uring_coredump.cpp -luring
*
*/
#include <liburing.h>
#include <stdio.h>
#include <stdlib.h> // for abort()
#include <unistd.h> // for close()
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <string.h> // for strerror()
#include <memory.h> // for memset()
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netdb.h>
#include <stdint.h> // for uint32_t, uint64_t
#include <pthread.h>
#include <sstream>
#include <string>
#define QD 256
#define BGID1 0
#define BGID2 1
#define PORT1 1975
#define PORT2 1976
#define BUFSZ 5*4096
enum {
IOURING_POLL_ADD,
IOURING_POLL_REMOVE,
IOURING_PROV_BUF,
IOURING_BUF_READ,
IOURING_WRITE
};
static
const char *Io_Uring_OpTypeToStr(char type)
{
const char *res;
switch (type) {
case IOURING_POLL_ADD:
res = "IOURING_POLL_ADD";
break;
case IOURING_POLL_REMOVE:
res = "IOURING_POLL_REMOVE";
break;
case IOURING_PROV_BUF:
res = "IOURING_PROV_BUF";
break;
case IOURING_BUF_READ:
res = "IOURING_BUF_READ";
break;
case IOURING_WRITE:
res = "IOURING_WRITE";
break;
default:
res = "Unknown";
}
return res;
}
enum { THREAD_MAX_FD = 86, BIO_RING_SZ = 8 };
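/*
* The 64-bit user_data is packed as:
* bits 0..31: fd
* bits 32..55: clientId (24 bits)
* bits 56..63: operation type
*/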
inline
void *
iouring_build_user_data(char type, int fd, uint32_t clientId)
{
return (void *)((uint32_t)fd | ((__u64)(clientId & 0x00ffffff) << 32 ) |
((__u64)type << 56));
}
inline
void
iouring_decode_user_data(uint64_t data, char *type, int *fd, uint32_t *clientId)
{
*type = data >> 56;
*fd = data & 0xffffffffU;
*clientId = (data >> 32) & 0x00ffffffU;
}
/*
* provideSimpleBuffer()
*
* Registers 'nr' buffers of 'len' bytes each, starting at 'addr', into
* buffer group 'bgid' with first buffer id 'bid'. When 'block' is set,
* the SQE is submitted immediately and its completion is waited for.
*/
static int provideSimpleBuffer(struct io_uring *ring, void *addr, int len, int nr, int bgid, int bid, int block)
{
int res;
// register buffers for buffer selection
struct io_uring_sqe *sqe;
struct io_uring_cqe *cqe;
sqe = io_uring_get_sqe(ring);
io_uring_prep_provide_buffers(sqe, addr, len, nr, bgid, bid);
io_uring_sqe_set_data(sqe, iouring_build_user_data(IOURING_PROV_BUF, 0, -1));
if (block) {
io_uring_submit(ring);
io_uring_wait_cqe(ring, &cqe);
res = cqe->res;
if (res < 0) {
errno = -res;
}
io_uring_cqe_seen(ring, cqe);
}
else {
res = 0;
}
return res;
}
/*
* initBufSelectRead()
*/
static void initBufSelectRead(struct io_uring *ring, int fd, size_t max_read, int gid, uint32_t clientId)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_read(sqe, fd, NULL, max_read, 0);
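/*
* IOSQE_BUFFER_SELECT lets the kernel pick a buffer from group 'gid' at
* completion time; IOSQE_ASYNC asks for the read to be issued
* asynchronously (punted to an io-wq worker) instead of being attempted
* inline first.
*/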
io_uring_sqe_set_flags(sqe, IOSQE_BUFFER_SELECT|IOSQE_ASYNC);
sqe->buf_group = gid;
io_uring_sqe_set_data(sqe, iouring_build_user_data(IOURING_BUF_READ, fd, clientId));
}
static void
iouring_write(struct io_uring *ring, int fd, const void *addr, int len, uint32_t clientId)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_write(sqe, fd, addr, len, 0);
io_uring_sqe_set_data(sqe, iouring_build_user_data(IOURING_WRITE, fd, clientId));
}
class BufferManager
{
public:
BufferManager(struct io_uring *ring, int nr, int max_sz, int gid);
~BufferManager();
char *getBuffer(int bid) { return m_bufferBase + bid*m_max_sz; }
void releaseBuffer(int bid);
int getMaxSz() const { return m_max_sz; }
int getGid() const { return m_gid; }
private:
struct io_uring *m_ring;
char *m_bufferBase;
int m_nr;
int m_max_sz;
int m_gid;
};
BufferManager::BufferManager(struct io_uring *ring, int nr, int max_sz, int gid)
: m_ring(ring),
m_bufferBase(nullptr),
m_nr(nr),
m_max_sz(max_sz),
m_gid(gid)
{
int res = posix_memalign(reinterpret_cast<void **>(&m_bufferBase), 1024, m_nr*m_max_sz);
if (res) {
fprintf(stderr, "posix_memalign: (%d) %s", res, strerror(res));
exit(0);
}
if (provideSimpleBuffer(m_ring, m_bufferBase, m_max_sz, m_nr, gid, 0, 1) < 0) {
fprintf(stderr, "provideSimpleBuffer() failed: (%d) %s\n", errno, strerror(errno));
exit(0);
}
}
BufferManager::~BufferManager()
{
// We could remove the buffer from io_uring here but for the sake of the test we won't
}
/*
* releaseBuffer()
*/
void BufferManager::releaseBuffer(int bid)
{
provideSimpleBuffer(m_ring, getBuffer(bid), m_max_sz, 1, m_gid, bid, 0);
}
class HttpClient
{
public:
HttpClient();
~HttpClient();
/*
* I don't want to write an URL parser for a test.
*/
void initiate(struct io_uring *ring, const char *host, const char *path, uint32_t clientId);
/*
* As long as the read succeeds, we initiate a new read. Eventually, the server
* is going to close its side of the connection and we will stop submitting operations.
* When all the clients reach that state, the thread should block into io_uring_enter waiting
* for new completion that will never come
*/
void processCompletion(struct io_uring *ring, struct io_uring_cqe *cqe, BufferManager &bufmgr);
private:
void sendRequest(struct io_uring *ring, uint32_t clientId);
void recvReply(struct io_uring *ring, struct io_uring_cqe *cqe, BufferManager &bufmgr);
int m_fd;
enum { DOWN, CONNECTING, WAIT_REPLY } m_state;
const char *m_host;
const char *m_path;
std::string m_request;
};
HttpClient::HttpClient()
: m_fd(-1),
m_state(DOWN),
m_host(nullptr),
m_path(nullptr)
{
}
HttpClient::~HttpClient()
{
if (m_fd >= 0)
close(m_fd);
m_fd = -1;
}
void HttpClient::initiate(struct io_uring *ring, const char *host, const char *path, uint32_t clientId)
{
m_host = host;
m_path = path;
struct addrinfo hints;
struct addrinfo *result = nullptr;
memset(&hints, 0, sizeof(hints));
hints.ai_socktype = SOCK_STREAM;
hints.ai_family = AF_INET;
int gai = getaddrinfo(m_host, nullptr, &hints, &result);
if (gai != 0) {
fprintf(stderr, "getaddrinfo() failed: %s\n", gai_strerror(gai));
exit(0);
}
void *p = NULL;
struct addrinfo *res = result;
/* pick the first AF_INET (IPv4) result */
while (!p && res) {
switch (res->ai_family) {
case AF_INET:
p = &((struct sockaddr_in *)res->ai_addr)->sin_addr;
break;
}
res = res->ai_next;
}
struct sockaddr_in sockAddr;
memset(&sockAddr, 0, sizeof(struct sockaddr_in));
sockAddr.sin_family = AF_INET;
sockAddr.sin_addr = *((struct in_addr *)p);
sockAddr.sin_port = htons(80);
freeaddrinfo(result);
m_fd = socket(AF_INET, SOCK_STREAM, 0);
if (m_fd < 0) {
fprintf(stderr, "socket() failed: (%d) %s\n", errno, strerror(errno));
exit(0);
}
if (fcntl(m_fd, F_SETFL, O_NONBLOCK) < 0) {
fprintf(stderr, "fcntl() failed: (%d) %s\n", errno, strerror(errno));
exit(0);
}
if (connect(m_fd, reinterpret_cast<struct sockaddr *>(&sockAddr), sizeof(struct sockaddr_in)) < 0 &&
errno != EINPROGRESS) {
fprintf(stderr, "connect() failed: (%d) %s\n", errno, strerror(errno));
exit(0);
}
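// ask io_uring to tell us when the non-blocking connect() completes (socket becomes writable)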
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_poll_add(sqe, m_fd, POLLOUT);
io_uring_sqe_set_data(sqe, iouring_build_user_data(IOURING_POLL_ADD, m_fd, clientId));
m_state = CONNECTING;
}
/*
* sendRequest()
*
* GET / HTTP/1.1
* Host: kernel.org:80
*/
void HttpClient::sendRequest(struct io_uring *ring, uint32_t clientId)
{
std::ostringstream ost;
ost << "GET " << m_path << " HTTP/1.1\r\n";
ost << "Host: " << m_host << ":80\r\n";
ost << "User-Agent: io_uring coredump test 1.0\r\n\r\n";
m_request = ost.str();
iouring_write(ring, m_fd, m_request.c_str(), m_request.size(), clientId);
}
/*
* recvReply()
*/
void HttpClient::recvReply(struct io_uring *ring, struct io_uring_cqe *cqe, BufferManager &bufmgr)
{
char type;
int fd, bid;
uint32_t clientId;
int res = cqe->res;
iouring_decode_user_data(cqe->user_data, &type, &fd, &clientId);
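// the selected buffer id is carried in the upper bits of cqe->flags (valid when IORING_CQE_F_BUFFER is set)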
bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
char *buf = bufmgr.getBuffer(bid);
fprintf(stderr, "client %d reply:\n\n%.*s\n", clientId, res, buf);
// returns the buffer back to io_uring.
provideSimpleBuffer(ring, buf, bufmgr.getMaxSz(), 1, bufmgr.getGid(), bid, 0);
}
/*
* processCompletion()
*/
void HttpClient::processCompletion(struct io_uring *ring,
struct io_uring_cqe *cqe,
BufferManager &bufmgr)
{
char type;
int fd, bid;
uint32_t clientId;
int res = cqe->res;
iouring_decode_user_data(cqe->user_data, &type, &fd, &clientId);
switch (type) {
case IOURING_POLL_ADD:
if (m_state != CONNECTING) {
fprintf(stderr, "client %d: unexpected IOURING_POLL_ADD completion\n", clientId);
abort();
}
// clear the O_NONBLOCK flag
fcntl(m_fd, F_SETFL, 0);
/*
* Setup the next read before sending the request.
* Maybe it is having multiple concurrent operations on the same
* fd that is triggering the io thread spawn.
*/
initBufSelectRead(ring, m_fd, bufmgr.getMaxSz(), bufmgr.getGid(), clientId);
sendRequest(ring, clientId);
m_state = WAIT_REPLY;
break;
case IOURING_POLL_REMOVE:
fprintf(stderr, "client %d: unexpected IOURING_POLL_REMOVE completion\n", clientId);
abort();
break;
case IOURING_PROV_BUF:
if (res < 0) {
fprintf(stderr, "client %d: unexpected IOURING_PROV_BUF failure: (%d) %s\n",
clientId, -res, strerror(-res));
abort();
}
break;
case IOURING_BUF_READ:
if (res <= 0) {
fprintf(stderr, "client %d: unexpected IOURING_BUF_READ res: %d\n",
clientId, res);
}
else {
recvReply(ring, cqe, bufmgr);
// Reload the read
initBufSelectRead(ring, m_fd, bufmgr.getMaxSz(), bufmgr.getGid(), clientId);
}
break;
case IOURING_WRITE:
fprintf(stderr, "client %d: request tx completed: %d\n", clientId, res);
break;
}
}
#define NUMCLIENTS 6
int main(int argc, char *argv[])
{
int res;
struct io_uring_params params;
struct io_uring ring;
memset(&params, 0, sizeof(params));
res = io_uring_queue_init_params(QD, &ring, &params);
if (res < 0) {
fprintf(stderr, "io_uring_queue_init_params() failed: (%d) %s\n", -res, strerror(-res));
return res;
}
BufferManager bufmgr(&ring, THREAD_MAX_FD*BIO_RING_SZ, BUFSZ, BGID1);
const char *servers[NUMCLIENTS] = {
"google.com",
"facebook.com",
"twitter.com",
"www.cloudflare.com",
"kernel.org",
"phoronix.com"
};
HttpClient clients[NUMCLIENTS];
/*
* Give the tester some time (e.g. to inspect the process threads) before
* the connections are initiated.
*/
sleep(10);
for (uint32_t i = 0; i < NUMCLIENTS; ++i) {
clients[i].initiate(&ring, servers[i], "/", i);
}
while (1) {
io_uring_submit_and_wait(&ring, 1);
unsigned completed = 0;
unsigned head;
struct io_uring_cqe *cqe;
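/*
* Walk the available completions without consuming them; the CQ head is
* advanced once for the whole batch with io_uring_cq_advance() below.
*/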
io_uring_for_each_cqe(&ring, head, cqe) {
++completed;
char type;
int fd, bid;
uint32_t clientId;
iouring_decode_user_data(cqe->user_data, &type, &fd, &clientId);
if (clientId < NUMCLIENTS) {
clients[clientId].processCompletion(&ring, cqe, bufmgr);
}
else {
/*
* The only operation for which we don't assign a client id is IORING_OP_PROVIDE_BUFFERS
*/
if (type != IOURING_PROV_BUF) {
fprintf(stderr, "Unexpected completion event with no client association: %s\n",
Io_Uring_OpTypeToStr(type));
abort();
}
else if (cqe->res < 0) {
fprintf(stderr, "unexpected IOURING_PROV_BUF failure: (%d) %s\n",
-cqe->res, strerror(-cqe->res));
abort();
}
}
}
if (completed) {
io_uring_cq_advance(&ring, completed);
}
}
/* not reached: the event loop above never exits */
io_uring_queue_exit(&ring);
return res;
}