Re: [PATCH V4 13/27] ublk: add batch I/O dispatch infrastructure

From: Caleb Sander Mateos

Date: Sun Nov 30 2025 - 14:24:25 EST


On Thu, Nov 20, 2025 at 6:00 PM Ming Lei <ming.lei@xxxxxxxxxx> wrote:
>
> Add infrastructure for delivering I/O commands to ublk server in batches,
> preparing for the upcoming UBLK_U_IO_FETCH_IO_CMDS feature.
>
> Key components:
>
> - struct ublk_batch_fcmd: Represents a batch fetch uring_cmd that will
> receive multiple I/O tags in a single operation, using io_uring's
> multishot command for efficient ublk IO delivery.
>
> - ublk_batch_dispatch(): Batch version of ublk_dispatch_req() that:
> * Pulls multiple request tags from the events FIFO (lock-free reader)
> * Prepares each I/O for delivery (including auto buffer registration)
> * Delivers tags to userspace via single uring_cmd notification
> * Handles partial failures by restoring undelivered tags to FIFO
>
> The batch approach significantly reduces notification overhead by aggregating
> multiple I/O completions into single uring_cmd, while maintaining the same
> I/O processing semantics as individual operations.
>
> Error handling ensures system consistency: if buffer selection or CQE
> posting fails, undelivered tags are restored to the FIFO for retry,
> meantime IO state has to be restored.
>
> This runs in task work context, scheduled via io_uring_cmd_complete_in_task()
> or called directly from ->uring_cmd(), enabling efficient batch processing
> without blocking the I/O submission path.
>
> Signed-off-by: Ming Lei <ming.lei@xxxxxxxxxx>
> ---
> drivers/block/ublk_drv.c | 189 +++++++++++++++++++++++++++++++++++++++
> 1 file changed, 189 insertions(+)
>
> diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
> index 6ff284243630..cc9c92d97349 100644
> --- a/drivers/block/ublk_drv.c
> +++ b/drivers/block/ublk_drv.c
> @@ -91,6 +91,12 @@
> UBLK_BATCH_F_HAS_BUF_ADDR | \
> UBLK_BATCH_F_AUTO_BUF_REG_FALLBACK)
>
> +/* ublk batch fetch uring_cmd */
> +struct ublk_batch_fcmd {

I would prefer "fetch_cmd" instead of "fcmd" for clarity.

> + struct io_uring_cmd *cmd;
> + unsigned short buf_group;
> +};
> +
> struct ublk_uring_cmd_pdu {
> /*
> * Store requests in same batch temporarily for queuing them to
> @@ -168,6 +174,9 @@ struct ublk_batch_io_data {
> */
> #define UBLK_REFCOUNT_INIT (REFCOUNT_MAX / 2)
>
> +/* used for UBLK_F_BATCH_IO only */
> +#define UBLK_BATCH_IO_UNUSED_TAG ((unsigned short)-1)
> +
> union ublk_io_buf {
> __u64 addr;
> struct ublk_auto_buf_reg auto_reg;
> @@ -616,6 +625,32 @@ static wait_queue_head_t ublk_idr_wq; /* wait until one idr is freed */
> static DEFINE_MUTEX(ublk_ctl_mutex);
>
>
> +static void ublk_batch_deinit_fetch_buf(const struct ublk_batch_io_data *data,
> + struct ublk_batch_fcmd *fcmd,
> + int res)
> +{
> + io_uring_cmd_done(fcmd->cmd, res, data->issue_flags);
> + fcmd->cmd = NULL;
> +}
> +
> +static int ublk_batch_fetch_post_cqe(struct ublk_batch_fcmd *fcmd,
> + struct io_br_sel *sel,
> + unsigned int issue_flags)
> +{
> + if (io_uring_mshot_cmd_post_cqe(fcmd->cmd, sel, issue_flags))
> + return -ENOBUFS;
> + return 0;
> +}
> +
> +static ssize_t ublk_batch_copy_io_tags(struct ublk_batch_fcmd *fcmd,
> + void __user *buf, const u16 *tag_buf,
> + unsigned int len)
> +{
> + if (copy_to_user(buf, tag_buf, len))
> + return -EFAULT;
> + return len;
> +}
> +
> #define UBLK_MAX_UBLKS UBLK_MINORS
>
> /*
> @@ -1378,6 +1413,160 @@ static void ublk_dispatch_req(struct ublk_queue *ubq,
> }
> }
>
> +static bool __ublk_batch_prep_dispatch(struct ublk_queue *ubq,
> + const struct ublk_batch_io_data *data,
> + unsigned short tag)
> +{
> + struct ublk_device *ub = data->ub;
> + struct ublk_io *io = &ubq->ios[tag];
> + struct request *req = blk_mq_tag_to_rq(ub->tag_set.tags[ubq->q_id], tag);
> + enum auto_buf_reg_res res = AUTO_BUF_REG_FALLBACK;
> + struct io_uring_cmd *cmd = data->cmd;
> +
> + if (!ublk_start_io(ubq, req, io))

This doesn't look correct for UBLK_F_NEED_GET_DATA. If that's not
supported in batch mode, then it should probably be disallowed when
creating a batch-mode ublk device. The ublk_need_get_data() check in
ublk_batch_commit_io_check() could also be dropped.

> + return false;
> +
> + if (ublk_support_auto_buf_reg(ubq) && ublk_rq_has_data(req))
> + res = __ublk_do_auto_buf_reg(ubq, req, io, cmd,
> + data->issue_flags);

__ublk_do_auto_buf_reg() reads io->buf.auto_reg. That seems racy
without holding the io spinlock.

> +
> + if (res == AUTO_BUF_REG_FAIL)
> + return false;

Could be moved into the if (ublk_support_auto_buf_reg(ubq) &&
ublk_rq_has_data(req)) statement since it won't be true otherwise?

> +
> + ublk_io_lock(io);
> + ublk_prep_auto_buf_reg_io(ubq, req, io, cmd, res);
> + ublk_io_unlock(io);
> +
> + return true;
> +}
> +
> +static bool ublk_batch_prep_dispatch(struct ublk_queue *ubq,
> + const struct ublk_batch_io_data *data,
> + unsigned short *tag_buf,
> + unsigned int len)
> +{
> + bool has_unused = false;
> + int i;

unsigned? (i is compared against len, which is unsigned int.)

> +
> + for (i = 0; i < len; i += 1) {

i++ instead of i += 1?

> + unsigned short tag = tag_buf[i];
> +
> + if (!__ublk_batch_prep_dispatch(ubq, data, tag)) {
> + tag_buf[i] = UBLK_BATCH_IO_UNUSED_TAG;
> + has_unused = true;
> + }
> + }
> +
> + return has_unused;
> +}
> +
> +/*
> + * Filter out UBLK_BATCH_IO_UNUSED_TAG entries from tag_buf.
> + * Returns the new length after filtering.
> + */
> +static unsigned int ublk_filter_unused_tags(unsigned short *tag_buf,
> + unsigned int len)
> +{
> + unsigned int i, j;
> +
> + for (i = 0, j = 0; i < len; i++) {
> + if (tag_buf[i] != UBLK_BATCH_IO_UNUSED_TAG) {
> + if (i != j)
> + tag_buf[j] = tag_buf[i];
> + j++;
> + }
> + }
> +
> + return j;
> +}
> +
> +#define MAX_NR_TAG 128
> +static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> + const struct ublk_batch_io_data *data,
> + struct ublk_batch_fcmd *fcmd)
> +{
> + unsigned short tag_buf[MAX_NR_TAG];
> + struct io_br_sel sel;
> + size_t len = 0;
> + bool needs_filter;
> + int ret;
> +
> + sel = io_uring_cmd_buffer_select(fcmd->cmd, fcmd->buf_group, &len,
> + data->issue_flags);
> + if (sel.val < 0)
> + return sel.val;
> + if (!sel.addr)
> + return -ENOBUFS;
> +
> + /* single reader needn't lock and sizeof(kfifo element) is 2 bytes */
> + len = min(len, sizeof(tag_buf)) / 2;

sizeof(unsigned short) instead of 2?

> + len = kfifo_out(&ubq->evts_fifo, tag_buf, len);
> +
> + needs_filter = ublk_batch_prep_dispatch(ubq, data, tag_buf, len);
> + /* Filter out unused tags before posting to userspace */
> + if (unlikely(needs_filter)) {
> + int new_len = ublk_filter_unused_tags(tag_buf, len);
> +
> + if (!new_len)
> + return len;

Is the purpose of this return value just to make ublk_batch_dispatch()
retry __ublk_batch_dispatch()? Otherwise, it seems like a strange
value to return.

Also, shouldn't this path release the selected buffer to avoid leaking it?

> + len = new_len;
> + }
> +
> + sel.val = ublk_batch_copy_io_tags(fcmd, sel.addr, tag_buf, len * 2);

sizeof(unsigned short) instead of 2 here as well?

> + ret = ublk_batch_fetch_post_cqe(fcmd, &sel, data->issue_flags);
> + if (unlikely(ret < 0)) {
> + int i, res;
> +
> + /*
> + * Undo prep state for all IOs since userspace never received them.
> + * This restores IOs to pre-prepared state so they can be cleanly
> + * re-prepared when tags are pulled from FIFO again.
> + */
> + for (i = 0; i < len; i++) {
> + struct ublk_io *io = &ubq->ios[tag_buf[i]];
> + int index = -1;
> +
> + ublk_io_lock(io);
> + if (io->flags & UBLK_IO_FLAG_AUTO_BUF_REG)
> + index = io->buf.auto_reg.index;

This is missing the io->buf_ctx_handle == io_uring_cmd_ctx_handle(cmd)
check from ublk_handle_auto_buf_reg().

> + io->flags &= ~(UBLK_IO_FLAG_OWNED_BY_SRV | UBLK_IO_FLAG_AUTO_BUF_REG);
> + io->flags |= UBLK_IO_FLAG_ACTIVE;
> + ublk_io_unlock(io);
> +
> + if (index != -1)
> + io_buffer_unregister_bvec(data->cmd, index,
> + data->issue_flags);
> + }
> +
> + res = kfifo_in_spinlocked_noirqsave(&ubq->evts_fifo,
> + tag_buf, len, &ubq->evts_lock);
> +
> + pr_warn("%s: copy tags or post CQE failure, move back "
> + "tags(%d %zu) ret %d\n", __func__, res, len,
> + ret);
> + }
> + return ret;
> +}
> +
> +static __maybe_unused int

The return value looks completely unused. Just return void instead?

Best,
Caleb

> +ublk_batch_dispatch(struct ublk_queue *ubq,
> + const struct ublk_batch_io_data *data,
> + struct ublk_batch_fcmd *fcmd)
> +{
> + int ret = 0;
> +
> + while (!ublk_io_evts_empty(ubq)) {
> + ret = __ublk_batch_dispatch(ubq, data, fcmd);
> + if (ret <= 0)
> + break;
> + }
> +
> + if (ret < 0)
> + ublk_batch_deinit_fetch_buf(data, fcmd, ret);
> +
> + return ret;
> +}
> +
> static void ublk_cmd_tw_cb(struct io_uring_cmd *cmd,
> unsigned int issue_flags)
> {
> --
> 2.47.0
>