[PATCH 08/23] io_uring: internally pass CQ indexes

From: Pavel Begunkov
Date: Wed May 19 2021 - 10:15:05 EST


Allow to pass CQ index from SQE to the end CQE generators, but support
only one CQ for now.

Signed-off-by: Pavel Begunkov <asml.silence@xxxxxxxxx>
---
fs/io_uring.c | 113 ++++++++++++++++++++++------------
include/uapi/linux/io_uring.h | 1 +
2 files changed, 75 insertions(+), 39 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 4fecd9da689e..356a5dc90f46 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -90,6 +90,8 @@
#define IORING_MAX_ENTRIES 32768
#define IORING_MAX_CQ_ENTRIES (2 * IORING_MAX_ENTRIES)

+#define IO_DEFAULT_CQ 0
+
/*
* Shift of 9 is 512 entries, or exactly one page on 64-bit archs
*/
@@ -416,6 +418,7 @@ struct io_ring_ctx {
unsigned cq_extra;
struct wait_queue_head cq_wait;
struct io_cqring cqs[1];
+ unsigned int cq_nr;

struct {
spinlock_t completion_lock;
@@ -832,6 +835,7 @@ struct io_kiocb {

struct io_kiocb *link;
struct percpu_ref *fixed_rsrc_refs;
+ u16 cq_idx;

/* used with ctx->iopoll_list with reads/writes */
struct list_head inflight_entry;
@@ -1034,7 +1038,8 @@ static void io_uring_cancel_sqpoll(struct io_sq_data *sqd);
static struct io_rsrc_node *io_rsrc_node_alloc(struct io_ring_ctx *ctx);

static bool io_cqring_fill_event(struct io_ring_ctx *ctx, u64 user_data,
- long res, unsigned int cflags);
+ long res, unsigned int cflags,
+ unsigned int cq_idx);
static void io_put_req(struct io_kiocb *req);
static void io_put_req_deferred(struct io_kiocb *req, int nr);
static void io_dismantle_req(struct io_kiocb *req);
@@ -1207,13 +1212,15 @@ static void io_account_cq_overflow(struct io_ring_ctx *ctx)

static bool req_need_defer(struct io_kiocb *req, u32 seq)
{
- if (unlikely(req->flags & REQ_F_IO_DRAIN)) {
- struct io_ring_ctx *ctx = req->ctx;
-
- return seq + READ_ONCE(ctx->cq_extra) != ctx->cqs[0].cached_tail;
- }
+ struct io_ring_ctx *ctx = req->ctx;
+ u32 cnt = 0;
+ int i;

- return false;
+ if (!(req->flags & REQ_F_IO_DRAIN))
+ return false;
+ for (i = 0; i < ctx->cq_nr; i++)
+ cnt += ctx->cqs[i].cached_tail;
+ return seq + READ_ONCE(ctx->cq_extra) != cnt;
}

static void io_req_track_inflight(struct io_kiocb *req)
@@ -1289,7 +1296,8 @@ static void io_kill_timeout(struct io_kiocb *req, int status)
atomic_set(&req->ctx->cq_timeouts,
atomic_read(&req->ctx->cq_timeouts) + 1);
list_del_init(&req->timeout.list);
- io_cqring_fill_event(req->ctx, req->user_data, status, 0);
+ io_cqring_fill_event(req->ctx, req->user_data, status, 0,
+ req->cq_idx);
io_put_req_deferred(req, 1);
}
}
@@ -1346,10 +1354,13 @@ static void io_flush_timeouts(struct io_ring_ctx *ctx)

static void io_commit_cqring(struct io_ring_ctx *ctx)
{
+ int i;
+
io_flush_timeouts(ctx);

/* order cqe stores with ring update */
- smp_store_release(&ctx->rings->cq.tail, ctx->cqs[0].cached_tail);
+ for (i = 0; i < ctx->cq_nr; i++)
+ smp_store_release(&ctx->cqs[i].rings->cq.tail, ctx->cqs[i].cached_tail);

if (unlikely(!list_empty(&ctx->defer_list)))
__io_queue_deferred(ctx);
@@ -1362,25 +1373,27 @@ static inline bool io_sqring_full(struct io_ring_ctx *ctx)
return READ_ONCE(r->sq.tail) - ctx->cached_sq_head == ctx->sq_entries;
}

-static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
+static inline unsigned int __io_cqring_events(struct io_cqring *cq)
{
- return ctx->cqs[0].cached_tail - READ_ONCE(ctx->rings->cq.head);
+ return cq->cached_tail - READ_ONCE(cq->rings->cq.head);
}

-static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx)
+static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx,
+ unsigned int idx)
{
- struct io_rings *rings = ctx->rings;
- unsigned tail, mask = ctx->cqs[0].entries - 1;
+ struct io_cqring *cq = &ctx->cqs[idx];
+ struct io_rings *rings = cq->rings;
+ unsigned tail, mask = cq->entries - 1;

/*
* writes to the cq entry need to come after reading head; the
* control dependency is enough as we're using WRITE_ONCE to
* fill the cq entry
*/
- if (__io_cqring_events(ctx) == ctx->cqs[0].entries)
+ if (__io_cqring_events(cq) == cq->entries)
return NULL;

- tail = ctx->cqs[0].cached_tail++;
+ tail = cq->cached_tail++;
return &rings->cqes[tail & mask];
}

@@ -1432,16 +1445,18 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
{
unsigned long flags;
bool all_flushed, posted;
+ struct io_cqring *cq = &ctx->cqs[IO_DEFAULT_CQ];

- if (!force && __io_cqring_events(ctx) == ctx->cqs[0].entries)
+ if (!force && __io_cqring_events(cq) == cq->entries)
return false;

posted = false;
spin_lock_irqsave(&ctx->completion_lock, flags);
while (!list_empty(&ctx->cq_overflow_list)) {
- struct io_uring_cqe *cqe = io_get_cqe(ctx);
+ struct io_uring_cqe *cqe = io_get_cqe(ctx, IO_DEFAULT_CQ);
struct io_overflow_cqe *ocqe;

+
if (!cqe && !force)
break;
ocqe = list_first_entry(&ctx->cq_overflow_list,
@@ -1523,12 +1538,17 @@ static inline void req_ref_get(struct io_kiocb *req)
}

static bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data,
- long res, unsigned int cflags)
+ long res, unsigned int cflags,
+ unsigned int cq_idx)
{
struct io_overflow_cqe *ocqe;

+ if (cq_idx != IO_DEFAULT_CQ)
+ goto overflow;
+
ocqe = kmalloc(sizeof(*ocqe), GFP_ATOMIC | __GFP_ACCOUNT);
if (!ocqe) {
+overflow:
/*
* If we're in ring overflow flush mode, or in task cancel mode,
* or cannot allocate an overflow entry, then we need to drop it
@@ -1550,7 +1570,8 @@ static bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data,
}

static inline bool __io_cqring_fill_event(struct io_ring_ctx *ctx, u64 user_data,
- long res, unsigned int cflags)
+ long res, unsigned int cflags,
+ unsigned int cq_idx)
{
struct io_uring_cqe *cqe;

@@ -1561,21 +1582,22 @@ static inline bool __io_cqring_fill_event(struct io_ring_ctx *ctx, u64 user_data
* submission (by quite a lot). Increment the overflow count in
* the ring.
*/
- cqe = io_get_cqe(ctx);
+ cqe = io_get_cqe(ctx, cq_idx);
if (likely(cqe)) {
WRITE_ONCE(cqe->user_data, user_data);
WRITE_ONCE(cqe->res, res);
WRITE_ONCE(cqe->flags, cflags);
return true;
}
- return io_cqring_event_overflow(ctx, user_data, res, cflags);
+ return io_cqring_event_overflow(ctx, user_data, res, cflags, cq_idx);
}

/* not as hot to bloat with inlining */
static noinline bool io_cqring_fill_event(struct io_ring_ctx *ctx, u64 user_data,
- long res, unsigned int cflags)
+ long res, unsigned int cflags,
+ unsigned int cq_idx)
{
- return __io_cqring_fill_event(ctx, user_data, res, cflags);
+ return __io_cqring_fill_event(ctx, user_data, res, cflags, cq_idx);
}

static void io_req_complete_post(struct io_kiocb *req, long res,
@@ -1585,7 +1607,7 @@ static void io_req_complete_post(struct io_kiocb *req, long res,
unsigned long flags;

spin_lock_irqsave(&ctx->completion_lock, flags);
- __io_cqring_fill_event(ctx, req->user_data, res, cflags);
+ __io_cqring_fill_event(ctx, req->user_data, res, cflags, req->cq_idx);
/*
* If we're the last reference to this request, add to our locked
* free_list cache.
@@ -1797,7 +1819,7 @@ static bool io_kill_linked_timeout(struct io_kiocb *req)
link->timeout.head = NULL;
if (hrtimer_try_to_cancel(&io->timer) != -1) {
io_cqring_fill_event(link->ctx, link->user_data,
- -ECANCELED, 0);
+ -ECANCELED, 0, link->cq_idx);
io_put_req_deferred(link, 1);
return true;
}
@@ -1816,7 +1838,8 @@ static void io_fail_links(struct io_kiocb *req)
link->link = NULL;

trace_io_uring_fail_link(req, link);
- io_cqring_fill_event(link->ctx, link->user_data, -ECANCELED, 0);
+ io_cqring_fill_event(link->ctx, link->user_data, -ECANCELED, 0,
+ link->cq_idx);
io_put_req_deferred(link, 2);
link = nxt;
}
@@ -2138,7 +2161,7 @@ static void io_submit_flush_completions(struct io_comp_state *cs,
for (i = 0; i < nr; i++) {
req = cs->reqs[i];
__io_cqring_fill_event(ctx, req->user_data, req->result,
- req->compl.cflags);
+ req->compl.cflags, req->cq_idx);
}
io_commit_cqring(ctx);
spin_unlock_irq(&ctx->completion_lock);
@@ -2201,7 +2224,7 @@ static unsigned io_cqring_events(struct io_ring_ctx *ctx)
{
/* See comment at the top of this file */
smp_rmb();
- return __io_cqring_events(ctx);
+ return __io_cqring_events(&ctx->cqs[IO_DEFAULT_CQ]);
}

static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
@@ -2278,7 +2301,8 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
if (req->flags & REQ_F_BUFFER_SELECTED)
cflags = io_put_rw_kbuf(req);

- __io_cqring_fill_event(ctx, req->user_data, req->result, cflags);
+ __io_cqring_fill_event(ctx, req->user_data, req->result, cflags,
+ req->cq_idx);
(*nr_events)++;

if (req_ref_put_and_test(req))
@@ -4911,7 +4935,7 @@ static bool io_poll_complete(struct io_kiocb *req, __poll_t mask)
}
if (req->poll.events & EPOLLONESHOT)
flags = 0;
- if (!io_cqring_fill_event(ctx, req->user_data, error, flags)) {
+ if (!io_cqring_fill_event(ctx, req->user_data, error, flags, req->cq_idx)) {
io_poll_remove_waitqs(req);
req->poll.done = true;
flags = 0;
@@ -5242,7 +5266,8 @@ static bool io_poll_remove_one(struct io_kiocb *req)

do_complete = io_poll_remove_waitqs(req);
if (do_complete) {
- io_cqring_fill_event(req->ctx, req->user_data, -ECANCELED, 0);
+ io_cqring_fill_event(req->ctx, req->user_data, -ECANCELED, 0,
+ req->cq_idx);
io_commit_cqring(req->ctx);
req_set_fail_links(req);
io_put_req_deferred(req, 1);
@@ -5494,7 +5519,7 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
atomic_set(&req->ctx->cq_timeouts,
atomic_read(&req->ctx->cq_timeouts) + 1);

- io_cqring_fill_event(ctx, req->user_data, -ETIME, 0);
+ io_cqring_fill_event(ctx, req->user_data, -ETIME, 0, req->cq_idx);
io_commit_cqring(ctx);
spin_unlock_irqrestore(&ctx->completion_lock, flags);

@@ -5536,7 +5561,7 @@ static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data)
return PTR_ERR(req);

req_set_fail_links(req);
- io_cqring_fill_event(ctx, req->user_data, -ECANCELED, 0);
+ io_cqring_fill_event(ctx, req->user_data, -ECANCELED, 0, req->cq_idx);
io_put_req_deferred(req, 1);
return 0;
}
@@ -5609,7 +5634,7 @@ static int io_timeout_remove(struct io_kiocb *req, unsigned int issue_flags)
ret = io_timeout_update(ctx, tr->addr, &tr->ts,
io_translate_timeout_mode(tr->flags));

- io_cqring_fill_event(ctx, req->user_data, ret, 0);
+ io_cqring_fill_event(ctx, req->user_data, ret, 0, req->cq_idx);
io_commit_cqring(ctx);
spin_unlock_irq(&ctx->completion_lock);
io_cqring_ev_posted(ctx);
@@ -5761,7 +5786,7 @@ static void io_async_find_and_cancel(struct io_ring_ctx *ctx,
done:
if (!ret)
ret = success_ret;
- io_cqring_fill_event(ctx, req->user_data, ret, 0);
+ io_cqring_fill_event(ctx, req->user_data, ret, 0, req->cq_idx);
io_commit_cqring(ctx);
spin_unlock_irqrestore(&ctx->completion_lock, flags);
io_cqring_ev_posted(ctx);
@@ -5818,7 +5843,7 @@ static int io_async_cancel(struct io_kiocb *req, unsigned int issue_flags)

spin_lock_irq(&ctx->completion_lock);
done:
- io_cqring_fill_event(ctx, req->user_data, ret, 0);
+ io_cqring_fill_event(ctx, req->user_data, ret, 0, req->cq_idx);
io_commit_cqring(ctx);
spin_unlock_irq(&ctx->completion_lock);
io_cqring_ev_posted(ctx);
@@ -6516,6 +6541,11 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
req->result = 0;
req->work.creds = NULL;

+ req->cq_idx = READ_ONCE(sqe->cq_idx);
+ if (unlikely(req->cq_idx >= ctx->cq_nr)) {
+ req->cq_idx = IO_DEFAULT_CQ;
+ return -EINVAL;
+ }
/* enforce forwards compatibility on users */
if (unlikely(sqe_flags & ~SQE_VALID_FLAGS))
return -EINVAL;
@@ -7548,7 +7578,7 @@ static void __io_rsrc_put_work(struct io_rsrc_node *ref_node)

io_ring_submit_lock(ctx, lock_ring);
spin_lock_irqsave(&ctx->completion_lock, flags);
- io_cqring_fill_event(ctx, prsrc->tag, 0, 0);
+ io_cqring_fill_event(ctx, prsrc->tag, 0, 0, IO_DEFAULT_CQ);
ctx->cq_extra++;
io_commit_cqring(ctx);
spin_unlock_irqrestore(&ctx->completion_lock, flags);
@@ -9484,7 +9514,6 @@ static int io_allocate_scq_urings(struct io_ring_ctx *ctx,

/* make sure these are sane, as we already accounted them */
ctx->sq_entries = p->sq_entries;
- ctx->cqs[0].entries = p->cq_entries;

size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset);
if (size == SIZE_MAX)
@@ -9501,6 +9530,11 @@ static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
rings->sq_ring_entries = p->sq_entries;
rings->cq_ring_entries = p->cq_entries;

+ ctx->cqs[0].cached_tail = 0;
+ ctx->cqs[0].rings = rings;
+ ctx->cqs[0].entries = p->cq_entries;
+ ctx->cq_nr = 1;
+
size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
if (size == SIZE_MAX) {
io_mem_free(ctx->rings);
@@ -10164,6 +10198,7 @@ static int __init io_uring_init(void)
BUILD_BUG_SQE_ELEM(40, __u16, buf_index);
BUILD_BUG_SQE_ELEM(42, __u16, personality);
BUILD_BUG_SQE_ELEM(44, __s32, splice_fd_in);
+ BUILD_BUG_SQE_ELEM(48, __u16, cq_idx);

BUILD_BUG_ON(sizeof(struct io_uring_files_update) !=
sizeof(struct io_uring_rsrc_update));
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index e1ae46683301..c2dfb179360a 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -58,6 +58,7 @@ struct io_uring_sqe {
/* personality to use, if used */
__u16 personality;
__s32 splice_fd_in;
+ __u16 cq_idx;
};
__u64 __pad2[3];
};
--
2.31.1