Re: [PATCH v2 0/2] Optimise io_uring completion waiting

From: Pavel Begunkov
Date: Tue Sep 24 2019 - 07:11:39 EST


On 24/09/2019 13:34, Jens Axboe wrote:
> On 9/24/19 4:13 AM, Jens Axboe wrote:
>> On 9/24/19 3:49 AM, Peter Zijlstra wrote:
>>> On Tue, Sep 24, 2019 at 10:36:28AM +0200, Jens Axboe wrote:
>>>
>>>> +struct io_wait_queue {
>>>> + struct wait_queue_entry wq;
>>>> + struct io_ring_ctx *ctx;
>>>> + struct task_struct *task;
>>>
>>> wq.private is where the normal waitqueue stores the task pointer.
>>>
>>> (I'm going to rename that)
>>
>> If you do that, then we can just base the io_uring parts on that.
>
> Just took a quick look at it, and ran into block/kyber-iosched.c that
> actually uses the private pointer for something that isn't a task
> struct...
>

Let's reuse autoremove_wake_function anyway. I've changed your patch a bit:


diff --git a/fs/io_uring.c b/fs/io_uring.c
index 5c3f2bb81637..a77971290fdd 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -2690,6 +2690,38 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
 	return submit;
 }
 
+struct io_wait_queue {
+	struct wait_queue_entry wq;
+	struct io_ring_ctx *ctx;
+	unsigned to_wait;
+	unsigned nr_timeouts;
+};
+
+static inline bool io_should_wake(struct io_wait_queue *iowq)
+{
+	struct io_ring_ctx *ctx = iowq->ctx;
+
+	/*
+	 * Wake up if we have enough events, or if a timeout occurred since we
+	 * started waiting. For timeouts, we always want to return to userspace,
+	 * regardless of event count.
+	 */
+	return io_cqring_events(ctx->rings) >= iowq->to_wait ||
+			atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
+}
+
+static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
+			    int wake_flags, void *key)
+{
+	struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
+							wq);
+
+	if (!io_should_wake(iowq))
+		return -1;
+
+	return autoremove_wake_function(curr, mode, wake_flags, key);
+}
+
 /*
  * Wait until events become available, if we don't already have some. The
  * application must reap them itself, as they reside on the shared cq ring.
@@ -2697,8 +2729,16 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 			  const sigset_t __user *sig, size_t sigsz)
 {
+	struct io_wait_queue iowq = {
+		.wq = {
+			.private = current,
+			.func = io_wake_function,
+			.entry = LIST_HEAD_INIT(iowq.wq.entry),
+		},
+		.ctx = ctx,
+		.to_wait = min_events,
+	};
 	struct io_rings *rings = ctx->rings;
-	unsigned nr_timeouts;
 	int ret;
 
 	if (io_cqring_events(rings) >= min_events)
@@ -2717,15 +2757,18 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 			return ret;
 	}
 
-	nr_timeouts = atomic_read(&ctx->cq_timeouts);
-	/*
-	 * Return if we have enough events, or if a timeout occured since
-	 * we started waiting. For timeouts, we always want to return to
-	 * userspace.
-	 */
-	ret = wait_event_interruptible(ctx->wait,
-			io_cqring_events(rings) >= min_events ||
-			atomic_read(&ctx->cq_timeouts) != nr_timeouts);
+	iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
+	prepare_to_wait_exclusive(&ctx->wait, &iowq.wq, TASK_INTERRUPTIBLE);
+	do {
+		if (io_should_wake(&iowq))
+			break;
+		schedule();
+		if (signal_pending(current))
+			break;
+		set_current_state(TASK_INTERRUPTIBLE);
+	} while (1);
+	finish_wait(&ctx->wait, &iowq.wq);
+
 	restore_saved_sigmask_unless(ret == -ERESTARTSYS);
 	if (ret == -ERESTARTSYS)
 		ret = -EINTR;
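
One thing I'm not sure about: with wait_event_interruptible() gone, nothing in
the new loop sets ret, so a wakeup by a signal is never reported as -EINTR and
restore_saved_sigmask_unless() only ever sees whatever the set_user_sigmask()
path left there. Completely untested sketch of what I mean (same loop as in the
diff above, the explicit ret handling is the only addition):

	ret = 0;
	iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
	prepare_to_wait_exclusive(&ctx->wait, &iowq.wq, TASK_INTERRUPTIBLE);
	do {
		if (io_should_wake(&iowq))
			break;
		schedule();
		if (signal_pending(current)) {
			/* woken by a signal rather than by enough CQEs */
			ret = -ERESTARTSYS;
			break;
		}
		set_current_state(TASK_INTERRUPTIBLE);
	} while (1);
	finish_wait(&ctx->wait, &iowq.wq);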


--
Yours sincerely,
Pavel Begunkov