Re: [RFC PATCH 11/35] ceph: Use ceph_databuf in DIO

From: Viacheslav Dubeyko
Date: Mon Mar 17 2025 - 16:04:01 EST


On Thu, 2025-03-13 at 23:33 +0000, David Howells wrote:
> Stash the list of pages to be read into/written from during a ceph fs
> direct read/write in a ceph_databuf struct rather than using a bvec array.
> Eventually this will be replaced with just an iterator supplied by
> netfslib.
>
> Signed-off-by: David Howells <dhowells@xxxxxxxxxx>
> cc: Viacheslav Dubeyko <slava@xxxxxxxxxxx>
> cc: Alex Markuze <amarkuze@xxxxxxxxxx>
> cc: Ilya Dryomov <idryomov@xxxxxxxxx>
> cc: ceph-devel@xxxxxxxxxxxxxxx
> cc: linux-fsdevel@xxxxxxxxxxxxxxx
> ---
> fs/ceph/file.c | 110 +++++++++++++++++++++----------------------------
> 1 file changed, 47 insertions(+), 63 deletions(-)
>
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index 9de2960748b9..fb4024bc8274 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -82,11 +82,10 @@ static __le32 ceph_flags_sys2wire(struct ceph_mds_client *mdsc, u32 flags)
> */
> #define ITER_GET_BVECS_PAGES 64
>
> -static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
> - struct bio_vec *bvecs)
> +static int __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
> + struct ceph_databuf *dbuf)
> {
> size_t size = 0;
> - int bvec_idx = 0;
>
> if (maxsize > iov_iter_count(iter))
> maxsize = iov_iter_count(iter);
> @@ -98,22 +97,24 @@ static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
> int idx = 0;
>
> bytes = iov_iter_get_pages2(iter, pages, maxsize - size,
> - ITER_GET_BVECS_PAGES, &start);
> - if (bytes < 0)
> - return size ?: bytes;
> -
> - size += bytes;
> + ITER_GET_BVECS_PAGES, &start);
> + if (bytes < 0) {
> + if (size == 0)
> + return bytes;
> + break;

I am slightly confused by the 'break;' here. Is there an enclosing loop? The
hunk context doesn't show one.
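
If I'm reading the current fs/ceph/file.c correctly, the hunk sits inside the
while (size < maxsize) loop just above the quoted context, so the 'break'
would stop extracting pages early while keeping what has already been
appended -- roughly (my sketch, not the patch):

        while (size < maxsize) {
                ...
                bytes = iov_iter_get_pages2(iter, pages, maxsize - size,
                                            ITER_GET_BVECS_PAGES, &start);
                if (bytes < 0) {
                        if (size == 0)
                                return bytes;
                        break;  /* keep the pages appended so far */
                }
                ...
        }

If that is the intention, a short comment on the break would help.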

> + }
>
> - for ( ; bytes; idx++, bvec_idx++) {
> + while (bytes) {
> int len = min_t(int, bytes, PAGE_SIZE - start);
>
> - bvec_set_page(&bvecs[bvec_idx], pages[idx], len, start);
> + ceph_databuf_append_page(dbuf, pages[idx++], start, len);
> bytes -= len;
> + size += len;
> start = 0;
> }
> }
>
> - return size;
> + return 0;

Do we really need to return zero here? It looks to me like we calculated
@size specifically so that we could return it. Am I wrong?
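
From the ceph_direct_read_write() hunk further down, it looks like the caller
is now expected to pick the length up from the databuf instead of from the
return value:

        dbuf = iter_get_bvecs_alloc(iter, size, write);
        ...
        len = ceph_databuf_len(dbuf);

If that is the intent, the comment above iter_get_bvecs_alloc() ("Return the
number of bytes in the created bio_vec array, or an error.") should probably
be updated to match.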

> }
>
> /*
> @@ -124,52 +125,44 @@ static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
> * Attempt to get up to @maxsize bytes worth of pages from @iter.
> * Return the number of bytes in the created bio_vec array, or an error.
> */
> -static ssize_t iter_get_bvecs_alloc(struct iov_iter *iter, size_t maxsize,
> - struct bio_vec **bvecs, int *num_bvecs)
> +static struct ceph_databuf *iter_get_bvecs_alloc(struct iov_iter *iter,
> + size_t maxsize, bool write)
> {
> - struct bio_vec *bv;
> + struct ceph_databuf *dbuf;
> size_t orig_count = iov_iter_count(iter);
> - ssize_t bytes;
> - int npages;
> + int npages, ret;
>
> iov_iter_truncate(iter, maxsize);
> npages = iov_iter_npages(iter, INT_MAX);
> iov_iter_reexpand(iter, orig_count);
>
> - /*
> - * __iter_get_bvecs() may populate only part of the array -- zero it
> - * out.
> - */
> - bv = kvmalloc_array(npages, sizeof(*bv), GFP_KERNEL | __GFP_ZERO);
> - if (!bv)
> - return -ENOMEM;
> + if (write)
> + dbuf = ceph_databuf_req_alloc(npages, 0, GFP_KERNEL);

I am still confused by allocating npages with a size of zero. :)
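
If I understand ceph_databuf_req_alloc() from earlier in the series
correctly, the prototype is something like (my reading -- please correct me):

        struct ceph_databuf *ceph_databuf_req_alloc(size_t min_bvec,
                                                    size_t space, gfp_t gfp);

so the zero would mean "reserve npages bvec slots but no buffer space",
because the pages themselves come from the iterator. If so, a comment saying
that here would help.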

> + else
> + dbuf = ceph_databuf_reply_alloc(npages, 0, GFP_KERNEL);
> + if (!dbuf)
> + return ERR_PTR(-ENOMEM);
>
> - bytes = __iter_get_bvecs(iter, maxsize, bv);
> - if (bytes < 0) {
> + ret = __iter_get_bvecs(iter, maxsize, dbuf);
> + if (ret < 0) {
> /*
> * No pages were pinned -- just free the array.
> */
> - kvfree(bv);
> - return bytes;
> + ceph_databuf_release(dbuf);
> + return ERR_PTR(ret);
> }
>
> - *bvecs = bv;
> - *num_bvecs = npages;
> - return bytes;
> + return dbuf;
> }
>
> -static void put_bvecs(struct bio_vec *bvecs, int num_bvecs, bool should_dirty)
> +static void ceph_dirty_pages(struct ceph_databuf *dbuf)

Does this mean that we never used the should_dirty argument with a false
value? Or is the main goal of this method now to always mark the pages dirty?
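
I can see further down in the patch that the check has moved to the caller:

        if (aio_req->should_dirty)
                ceph_dirty_pages(osd_data->dbuf);

so I guess the answer is that the helper now always dirties and the caller
decides whether to call it at all. A one-line comment on ceph_dirty_pages()
would make that obvious.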

> {
> + struct bio_vec *bvec = dbuf->bvec;
> int i;
>
> - for (i = 0; i < num_bvecs; i++) {
> - if (bvecs[i].bv_page) {
> - if (should_dirty)
> - set_page_dirty_lock(bvecs[i].bv_page);
> - put_page(bvecs[i].bv_page);

So, which code will call put_page() now?
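
My guess is that ceph_databuf_release() now owns the page references taken by
iov_iter_get_pages2() and drops them when the databuf is freed, but that
ownership transfer is easy to miss. If that's what happens, a comment along
these lines would help (my wording, assuming those are the semantics):

        /*
         * Page refs obtained by iov_iter_get_pages2() are owned by the
         * databuf and dropped by ceph_databuf_release().
         */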

Thanks,
Slava.

> - }
> - }
> - kvfree(bvecs);
> + for (i = 0; i < dbuf->nr_bvec; i++)
> + if (bvec[i].bv_page)
> + set_page_dirty_lock(bvec[i].bv_page);
> }
>
> /*
> @@ -1338,14 +1331,11 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
> struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
> struct ceph_osd_req_op *op = &req->r_ops[0];
> struct ceph_client_metric *metric = &ceph_sb_to_mdsc(inode->i_sb)->metric;
> - unsigned int len = osd_data->bvec_pos.iter.bi_size;
> + size_t len = osd_data->iter.count;
> bool sparse = (op->op == CEPH_OSD_OP_SPARSE_READ);
> struct ceph_client *cl = ceph_inode_to_client(inode);
>
> - BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS);
> - BUG_ON(!osd_data->num_bvecs);
> -
> - doutc(cl, "req %p inode %p %llx.%llx, rc %d bytes %u\n", req,
> + doutc(cl, "req %p inode %p %llx.%llx, rc %d bytes %zu\n", req,
> inode, ceph_vinop(inode), rc, len);
>
> if (rc == -EOLDSNAPC) {
> @@ -1367,7 +1357,6 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
> if (rc == -ENOENT)
> rc = 0;
> if (rc >= 0 && len > rc) {
> - struct iov_iter i;
> int zlen = len - rc;
>
> /*
> @@ -1384,10 +1373,8 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
> aio_req->total_len = rc + zlen;
> }
>
> - iov_iter_bvec(&i, ITER_DEST, osd_data->bvec_pos.bvecs,
> - osd_data->num_bvecs, len);
> - iov_iter_advance(&i, rc);
> - iov_iter_zero(zlen, &i);
> + iov_iter_advance(&osd_data->iter, rc);
> + iov_iter_zero(zlen, &osd_data->iter);
> }
> }
>
> @@ -1401,8 +1388,8 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
> req->r_end_latency, len, rc);
> }
>
> - put_bvecs(osd_data->bvec_pos.bvecs, osd_data->num_bvecs,
> - aio_req->should_dirty);
> + if (aio_req->should_dirty)
> + ceph_dirty_pages(osd_data->dbuf);
> ceph_osdc_put_request(req);
>
> if (rc < 0)
> @@ -1491,9 +1478,8 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
> struct ceph_client_metric *metric = &fsc->mdsc->metric;
> struct ceph_vino vino;
> struct ceph_osd_request *req;
> - struct bio_vec *bvecs;
> struct ceph_aio_request *aio_req = NULL;
> - int num_pages = 0;
> + struct ceph_databuf *dbuf = NULL;
> int flags;
> int ret = 0;
> struct timespec64 mtime = current_time(inode);
> @@ -1529,8 +1515,8 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
>
> while (iov_iter_count(iter) > 0) {
> u64 size = iov_iter_count(iter);
> - ssize_t len;
> struct ceph_osd_req_op *op;
> + size_t len;
> int readop = sparse ? CEPH_OSD_OP_SPARSE_READ : CEPH_OSD_OP_READ;
> int extent_cnt;
>
> @@ -1563,16 +1549,17 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
> }
> }
>
> - len = iter_get_bvecs_alloc(iter, size, &bvecs, &num_pages);
> - if (len < 0) {
> + dbuf = iter_get_bvecs_alloc(iter, size, write);
> + if (IS_ERR(dbuf)) {
> ceph_osdc_put_request(req);
> - ret = len;
> + ret = PTR_ERR(dbuf);
> break;
> }
> + len = ceph_databuf_len(dbuf);
> if (len != size)
> osd_req_op_extent_update(req, 0, len);
>
> - osd_req_op_extent_osd_data_bvecs(req, 0, bvecs, num_pages, len);
> + osd_req_op_extent_osd_databuf(req, 0, dbuf);
>
> /*
> * To simplify error handling, allow AIO when IO within i_size
> @@ -1637,20 +1624,17 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
> ret = 0;
>
> if (ret >= 0 && ret < len && pos + ret < size) {
> - struct iov_iter i;
> int zlen = min_t(size_t, len - ret,
> size - pos - ret);
>
> - iov_iter_bvec(&i, ITER_DEST, bvecs, num_pages, len);
> - iov_iter_advance(&i, ret);
> - iov_iter_zero(zlen, &i);
> + iov_iter_advance(&dbuf->iter, ret);
> + iov_iter_zero(zlen, &dbuf->iter);
> ret += zlen;
> }
> if (ret >= 0)
> len = ret;
> }
>
> - put_bvecs(bvecs, num_pages, should_dirty);
> ceph_osdc_put_request(req);
> if (ret < 0)
> break;
>
>