[RFC] io_uring: avoid ring quiesce while registering/unregistering eventfd
From: Usama Arif
Date: Wed Feb 02 2022 - 10:59:49 EST
Acquire completion_lock at the start of __io_uring_register before
registering/unregistering the eventfd and release it at the end. Hence
all calls to io_cqring_ev_posted, which adds to the eventfd counter,
will finish before the spin_lock is acquired in io_uring_register, and
all new calls will wait until the eventfd is registered. This avoids
a ring quiesce, which is much more expensive than acquiring the spin_lock.
On the system tested with this patch, io_uring_register with
IORING_REGISTER_EVENTFD takes less than 1ms, compared to 15ms before.
Signed-off-by: Usama Arif <usama.arif@xxxxxxxxxxxxx>
Reviewed-by: Fam Zheng <fam.zheng@xxxxxxxxxxxxx>
---
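Not part of the patch: a minimal userspace sketch of the registration
path this change speeds up. It assumes recent kernel headers that
provide __NR_io_uring_register and IORING_REGISTER_EVENTFD, uses the
raw syscall rather than liburing's io_uring_register_eventfd(), and
trims error handling; the ring fd is assumed to come from
io_uring_setup().

#include <unistd.h>
#include <sys/eventfd.h>
#include <sys/syscall.h>
#include <linux/io_uring.h>

/* Register an eventfd that is signalled whenever a CQE is posted. */
static int register_cq_eventfd(int ring_fd)
{
	int efd = eventfd(0, EFD_CLOEXEC);

	if (efd < 0)
		return -1;

	/* With this patch, the call below no longer forces a ring quiesce. */
	if (syscall(__NR_io_uring_register, ring_fd,
		    IORING_REGISTER_EVENTFD, &efd, 1) < 0) {
		close(efd);
		return -1;
	}
	return efd;
}
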
fs/io_uring.c | 50 ++++++++++++++++++++++++++++++++++----------------
1 file changed, 34 insertions(+), 16 deletions(-)
diff --git a/fs/io_uring.c b/fs/io_uring.c
index 2e04f718319d..e75d8abd225a 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -1803,11 +1803,11 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
ctx->rings->sq_flags & ~IORING_SQ_CQ_OVERFLOW);
}
- if (posted)
+ if (posted) {
io_commit_cqring(ctx);
- spin_unlock(&ctx->completion_lock);
- if (posted)
io_cqring_ev_posted(ctx);
+ }
+ spin_unlock(&ctx->completion_lock);
return all_flushed;
}
@@ -1971,8 +1971,8 @@ static void io_req_complete_post(struct io_kiocb *req, s32 res,
spin_lock(&ctx->completion_lock);
__io_req_complete_post(req, res, cflags);
io_commit_cqring(ctx);
- spin_unlock(&ctx->completion_lock);
io_cqring_ev_posted(ctx);
+ spin_unlock(&ctx->completion_lock);
}
static inline void io_req_complete_state(struct io_kiocb *req, s32 res,
@@ -2231,11 +2231,11 @@ static void __io_req_find_next_prep(struct io_kiocb *req)
spin_lock(&ctx->completion_lock);
posted = io_disarm_next(req);
- if (posted)
+ if (posted) {
io_commit_cqring(ctx);
- spin_unlock(&ctx->completion_lock);
- if (posted)
io_cqring_ev_posted(ctx);
+ }
+ spin_unlock(&ctx->completion_lock);
}
static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
@@ -2272,8 +2272,8 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, bool *locked)
static inline void ctx_commit_and_unlock(struct io_ring_ctx *ctx)
{
io_commit_cqring(ctx);
- spin_unlock(&ctx->completion_lock);
io_cqring_ev_posted(ctx);
+ spin_unlock(&ctx->completion_lock);
}
static void handle_prev_tw_list(struct io_wq_work_node *node,
@@ -2535,8 +2535,8 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx)
}
io_commit_cqring(ctx);
- spin_unlock(&ctx->completion_lock);
io_cqring_ev_posted(ctx);
+ spin_unlock(&ctx->completion_lock);
state->flush_cqes = false;
}
@@ -5541,10 +5541,12 @@ static int io_poll_check_events(struct io_kiocb *req)
filled = io_fill_cqe_aux(ctx, req->user_data, mask,
IORING_CQE_F_MORE);
io_commit_cqring(ctx);
- spin_unlock(&ctx->completion_lock);
- if (unlikely(!filled))
+ if (unlikely(!filled)) {
+ spin_unlock(&ctx->completion_lock);
return -ECANCELED;
+ }
io_cqring_ev_posted(ctx);
+ spin_unlock(&ctx->completion_lock);
} else if (req->result) {
return 0;
}
@@ -5579,8 +5581,8 @@ static void io_poll_task_func(struct io_kiocb *req, bool *locked)
hash_del(&req->hash_node);
__io_req_complete_post(req, req->result, 0);
io_commit_cqring(ctx);
- spin_unlock(&ctx->completion_lock);
io_cqring_ev_posted(ctx);
+ spin_unlock(&ctx->completion_lock);
}
static void io_apoll_task_func(struct io_kiocb *req, bool *locked)
@@ -8351,8 +8353,8 @@ static void __io_rsrc_put_work(struct io_rsrc_node *ref_node)
spin_lock(&ctx->completion_lock);
io_fill_cqe_aux(ctx, prsrc->tag, 0, 0);
io_commit_cqring(ctx);
- spin_unlock(&ctx->completion_lock);
io_cqring_ev_posted(ctx);
+ spin_unlock(&ctx->completion_lock);
io_ring_submit_unlock(ctx, lock_ring);
}
@@ -9639,11 +9641,11 @@ static __cold bool io_kill_timeouts(struct io_ring_ctx *ctx,
}
}
spin_unlock_irq(&ctx->timeout_lock);
- if (canceled != 0)
+ if (canceled != 0) {
io_commit_cqring(ctx);
- spin_unlock(&ctx->completion_lock);
- if (canceled != 0)
io_cqring_ev_posted(ctx);
+ }
+ spin_unlock(&ctx->completion_lock);
return canceled != 0;
}
@@ -10970,6 +10972,8 @@ static bool io_register_op_must_quiesce(int op)
case IORING_REGISTER_IOWQ_AFF:
case IORING_UNREGISTER_IOWQ_AFF:
case IORING_REGISTER_IOWQ_MAX_WORKERS:
+ case IORING_REGISTER_EVENTFD:
+ case IORING_UNREGISTER_EVENTFD:
return false;
default:
return true;
@@ -11030,6 +11034,17 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
return -EACCES;
}
+ /*
+ * Acquire completion_lock at the start of __io_uring_register before
+ * registering/unregistering eventfd and release it at the end. Any
+ * completion events pending before this call will finish before acquiring
+ * the spin_lock here, and all new completion events will wait until the
+ * eventfd is registered. This avoids ring quiesce, which is much more
+ * expensive than acquiring the spin_lock.
+ */
+ if (opcode == IORING_REGISTER_EVENTFD || opcode == IORING_UNREGISTER_EVENTFD)
+ spin_lock(&ctx->completion_lock);
+
if (io_register_op_must_quiesce(opcode)) {
ret = io_ctx_quiesce(ctx);
if (ret)
@@ -11141,6 +11156,9 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
break;
}
+ if (opcode == IORING_REGISTER_EVENTFD || opcode == IORING_UNREGISTER_EVENTFD)
+ spin_unlock(&ctx->completion_lock);
+
if (io_register_op_must_quiesce(opcode)) {
/* bring the ctx back to life */
percpu_ref_reinit(&ctx->refs);
--
2.25.1