Re: [RFC PATCH 23/35] rbd: Use ceph_databuf_enc_start/stop()
From: Viacheslav Dubeyko
Date: Tue Mar 18 2025 - 20:32:53 EST
On Thu, 2025-03-13 at 23:33 +0000, David Howells wrote:
> Make rbd use ceph_databuf_enc_start() and ceph_databuf_enc_stop() when
> filling out the request data. Also use ceph_encode_*() rather than
> ceph_databuf_encode_*() as the latter will do an iterator copy to deal with
> page crossing and misalignment (the latter being something that the CPU
> will handle on some arches).
>
> Signed-off-by: David Howells <dhowells@xxxxxxxxxx>
> cc: Viacheslav Dubeyko <slava@xxxxxxxxxxx>
> cc: Alex Markuze <amarkuze@xxxxxxxxxx>
> cc: Ilya Dryomov <idryomov@xxxxxxxxx>
> cc: ceph-devel@xxxxxxxxxxxxxxx
> cc: linux-fsdevel@xxxxxxxxxxxxxxx
> ---
> drivers/block/rbd.c | 64 ++++++++++++++++++++++-----------------------
> 1 file changed, 31 insertions(+), 33 deletions(-)
>
> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
> index a2674077edea..956fc4a8f1da 100644
> --- a/drivers/block/rbd.c
> +++ b/drivers/block/rbd.c
> @@ -1970,19 +1970,19 @@ static int rbd_cls_object_map_update(struct ceph_osd_request *req,
> int which, u64 objno, u8 new_state,
> const u8 *current_state)
> {
> - struct ceph_databuf *dbuf;
> - void *p, *start;
> + struct ceph_databuf *request;
> + void *p;
> int ret;
>
> ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
> if (ret)
> return ret;
>
> - dbuf = ceph_databuf_req_alloc(1, PAGE_SIZE, GFP_NOIO);
> - if (!dbuf)
> + request = ceph_databuf_req_alloc(1, 8 * 2 + 3 * 1, GFP_NOIO);
The size expression 8 * 2 + 3 * 1 is hard to follow. :) Could we introduce named
constants here?
> + if (!request)
> return -ENOMEM;
>
> - p = start = kmap_ceph_databuf_page(dbuf, 0);
> + p = ceph_databuf_enc_start(request);
> ceph_encode_64(&p, objno);
> ceph_encode_64(&p, objno + 1);
> ceph_encode_8(&p, new_state);
> @@ -1992,10 +1992,9 @@ static int rbd_cls_object_map_update(struct ceph_osd_request *req,
> } else {
> ceph_encode_8(&p, 0);
> }
> - kunmap_local(p);
> - ceph_databuf_added_data(dbuf, p - start);
> + ceph_databuf_enc_stop(request, p);
>
> - osd_req_op_cls_request_databuf(req, which, dbuf);
> + osd_req_op_cls_request_databuf(req, which, request);
> return 0;
> }
>
> @@ -2108,7 +2107,7 @@ static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
>
> static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
> {
> - struct ceph_databuf *dbuf;
> + struct ceph_databuf *request;
>
> /*
> * The response data for a STAT call consists of:
> @@ -2118,12 +2117,12 @@ static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
> * le32 tv_nsec;
> * } mtime;
> */
> - dbuf = ceph_databuf_reply_alloc(1, 8 + sizeof(struct ceph_timespec), GFP_NOIO);
> - if (!dbuf)
> + request = ceph_databuf_reply_alloc(1, 8 + sizeof(struct ceph_timespec), GFP_NOIO);
Ditto. Why is the buffer size 8 + sizeof(struct ceph_timespec) here?
Thanks,
Slava.
> + if (!request)
> return -ENOMEM;
>
> osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
> - osd_req_op_raw_data_in_databuf(osd_req, which, dbuf);
> + osd_req_op_raw_data_in_databuf(osd_req, which, request);
> return 0;
> }
>
> @@ -2964,16 +2963,16 @@ static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
>
> static int setup_copyup_buf(struct rbd_obj_request *obj_req, u64 obj_overlap)
> {
> - struct ceph_databuf *dbuf;
> + struct ceph_databuf *request;
>
> rbd_assert(!obj_req->copyup_buf);
>
> - dbuf = ceph_databuf_req_alloc(calc_pages_for(0, obj_overlap),
> + request = ceph_databuf_req_alloc(calc_pages_for(0, obj_overlap),
> obj_overlap, GFP_NOIO);
> - if (!dbuf)
> + if (!request)
> return -ENOMEM;
>
> - obj_req->copyup_buf = dbuf;
> + obj_req->copyup_buf = request;
> return 0;
> }
>
> @@ -4580,10 +4579,9 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
> if (!request)
> return -ENOMEM;
>
> - p = kmap_ceph_databuf_page(request, 0);
> - memcpy(p, outbound, outbound_size);
> - kunmap_local(p);
> - ceph_databuf_added_data(request, outbound_size);
> + p = ceph_databuf_enc_start(request);
> + ceph_encode_copy(&p, outbound, outbound_size);
> + ceph_databuf_enc_stop(request, p);
> }
>
> reply = ceph_databuf_reply_alloc(1, inbound_size, GFP_KERNEL);
> @@ -4712,7 +4710,7 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
> static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
> struct ceph_object_id *oid,
> struct ceph_object_locator *oloc,
> - struct ceph_databuf *dbuf, int len)
> + struct ceph_databuf *request, int len)
> {
> struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
> struct ceph_osd_request *req;
> @@ -4727,7 +4725,7 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
> req->r_flags = CEPH_OSD_FLAG_READ;
>
> osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, len, 0, 0);
> - osd_req_op_extent_osd_databuf(req, 0, dbuf);
> + osd_req_op_extent_osd_databuf(req, 0, request);
>
> ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
> if (ret)
> @@ -4750,16 +4748,16 @@ static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev,
> bool first_time)
> {
> struct rbd_image_header_ondisk *ondisk;
> - struct ceph_databuf *dbuf = NULL;
> + struct ceph_databuf *request = NULL;
> u32 snap_count = 0;
> u64 names_size = 0;
> u32 want_count;
> int ret;
>
> - dbuf = ceph_databuf_req_alloc(1, sizeof(*ondisk), GFP_KERNEL);
> - if (!dbuf)
> + request = ceph_databuf_req_alloc(1, sizeof(*ondisk), GFP_KERNEL);
> + if (!request)
> return -ENOMEM;
> - ondisk = kmap_ceph_databuf_page(dbuf, 0);
> + ondisk = kmap_ceph_databuf_page(request, 0);
>
> /*
> * The complete header will include an array of its 64-bit
> @@ -4776,13 +4774,13 @@ static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev,
> size += names_size;
>
> ret = -ENOMEM;
> - if (size > dbuf->limit &&
> - ceph_databuf_reserve(dbuf, size - dbuf->limit,
> + if (size > request->limit &&
> + ceph_databuf_reserve(request, size - request->limit,
> GFP_KERNEL) < 0)
> goto out;
>
> ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
> - &rbd_dev->header_oloc, dbuf, size);
> + &rbd_dev->header_oloc, request, size);
> if (ret < 0)
> goto out;
> if ((size_t)ret < size) {
> @@ -4806,7 +4804,7 @@ static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev,
> ret = rbd_header_from_disk(header, ondisk, first_time);
> out:
> kunmap_local(ondisk);
> - ceph_databuf_release(dbuf);
> + ceph_databuf_release(request);
> return ret;
> }
>
> @@ -5625,10 +5623,10 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev,
> if (!reply)
> goto out_free;
>
> - p = kmap_ceph_databuf_page(request, 0);
> + p = ceph_databuf_enc_start(request);
> ceph_encode_64(&p, rbd_dev->spec->snap_id);
> - kunmap_local(p);
> - ceph_databuf_added_data(request, sizeof(__le64));
> + ceph_databuf_enc_stop(request, p);
> +
> ret = __get_parent_info(rbd_dev, request, reply, pii);
> if (ret > 0)
> ret = __get_parent_info_legacy(rbd_dev, request, reply, pii);
>
>