Re: [RFC PATCH 07/35] libceph: Change ceph_osdc_call()'s reply to a ceph_databuf
From: Viacheslav Dubeyko
Date: Mon Mar 17 2025 - 15:41:56 EST
On Thu, 2025-03-13 at 23:32 +0000, David Howells wrote:
> Change the type of ceph_osdc_call()'s reply to a ceph_databuf struct rather
> than a list of pages and access it with kmap_local rather than
> page_address().
>
> Signed-off-by: David Howells <dhowells@xxxxxxxxxx>
> cc: Viacheslav Dubeyko <slava@xxxxxxxxxxx>
> cc: Alex Markuze <amarkuze@xxxxxxxxxx>
> cc: Ilya Dryomov <idryomov@xxxxxxxxx>
> cc: ceph-devel@xxxxxxxxxxxxxxx
> cc: linux-fsdevel@xxxxxxxxxxxxxxx
> ---
> drivers/block/rbd.c | 135 ++++++++++++++++++--------------
> include/linux/ceph/osd_client.h | 2 +-
> net/ceph/cls_lock_client.c | 41 +++++-----
> net/ceph/osd_client.c | 16 ++--
> 4 files changed, 109 insertions(+), 85 deletions(-)
>
> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
> index bb953634c7cb..073e80d2d966 100644
> --- a/drivers/block/rbd.c
> +++ b/drivers/block/rbd.c
> @@ -1826,9 +1826,8 @@ static int __rbd_object_map_load(struct rbd_device *rbd_dev)
> {
> struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
> CEPH_DEFINE_OID_ONSTACK(oid);
> - struct page **pages;
> - void *p, *end;
> - size_t reply_len;
> + struct ceph_databuf *reply;
> + void *p, *q, *end;
If I understood the logic correctly, q represents a pointer to the current
position. So maybe it makes sense to rename p to something like "begin"? In
that case, we would have a begin pointer and an end pointer, and p could be
used as the name of the pointer to the current position.
> u64 num_objects;
> u64 object_map_bytes;
> u64 object_map_size;
> @@ -1842,48 +1841,57 @@ static int __rbd_object_map_load(struct rbd_device *rbd_dev)
> object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
> BITS_PER_BYTE);
> num_pages = calc_pages_for(0, object_map_bytes) + 1;
> - pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
> - if (IS_ERR(pages))
> - return PTR_ERR(pages);
>
> - reply_len = num_pages * PAGE_SIZE;
> + reply = ceph_databuf_reply_alloc(num_pages, num_pages * PAGE_SIZE,
> + GFP_KERNEL);
> + if (!reply)
> + return -ENOMEM;
> +
> rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
> ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
> "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
> - NULL, 0, pages, &reply_len);
> + NULL, 0, reply);
> if (ret)
> goto out;
>
> - p = page_address(pages[0]);
> - end = p + min(reply_len, (size_t)PAGE_SIZE);
> - ret = decode_object_map_header(&p, end, &object_map_size);
> + p = kmap_ceph_databuf_page(reply, 0);
> + end = p + min(ceph_databuf_len(reply), (size_t)PAGE_SIZE);
> + q = p;
> + ret = decode_object_map_header(&q, end, &object_map_size);
> if (ret)
> - goto out;
> + goto out_unmap;
>
> if (object_map_size != num_objects) {
> rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
> object_map_size, num_objects);
> ret = -EINVAL;
> - goto out;
> + goto out_unmap;
> }
> + iov_iter_advance(&reply->iter, q - p);
>
> - if (offset_in_page(p) + object_map_bytes > reply_len) {
> + if (object_map_bytes > ceph_databuf_len(reply)) {
Does this mean that we had a bug here before? The check used to be
offset_in_page(p) + object_map_bytes.
> ret = -EINVAL;
> - goto out;
> + goto out_unmap;
> }
>
> rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
> if (!rbd_dev->object_map) {
> ret = -ENOMEM;
> - goto out;
> + goto out_unmap;
> }
>
> rbd_dev->object_map_size = object_map_size;
Why do we have both object_map_size and object_map_bytes at the same time? That
is confusing, to my taste. Maybe we should rename object_map_size to
object_map_num_objects?
> - ceph_copy_from_page_vector(pages, rbd_dev->object_map,
> - offset_in_page(p), object_map_bytes);
>
> + ret = -EIO;
> + if (copy_from_iter(rbd_dev->object_map, object_map_bytes,
> + &reply->iter) != object_map_bytes)
> + goto out_unmap;
> +
> + ret = 0;
> +out_unmap:
> + kunmap_local(p);
> out:
> - ceph_release_page_vector(pages, num_pages);
> + ceph_databuf_release(reply);
> return ret;
> }
>
> @@ -1952,6 +1960,7 @@ static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
> {
> struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
> struct ceph_osd_data *osd_data;
> + struct ceph_databuf *dbuf;
> u64 objno;
> u8 state, new_state, current_state;
> bool has_current_state;
> @@ -1971,9 +1980,10 @@ static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
> */
> rbd_assert(osd_req->r_num_ops == 2);
> osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
> - rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
> + rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_DATABUF);
> + dbuf = osd_data->dbuf;
>
> - p = page_address(osd_data->pages[0]);
> + p = kmap_ceph_databuf_page(dbuf, 0);
> objno = ceph_decode_64(&p);
> rbd_assert(objno == obj_req->ex.oe_objno);
> rbd_assert(ceph_decode_64(&p) == objno + 1);
> @@ -1981,6 +1991,7 @@ static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
> has_current_state = ceph_decode_8(&p);
> if (has_current_state)
> current_state = ceph_decode_8(&p);
> + kunmap_local(p);
>
> spin_lock(&rbd_dev->object_map_lock);
> state = __rbd_object_map_get(rbd_dev, objno);
> @@ -2020,7 +2031,7 @@ static int rbd_cls_object_map_update(struct ceph_osd_request *req,
> int which, u64 objno, u8 new_state,
> const u8 *current_state)
> {
> - struct page **pages;
> + struct ceph_databuf *dbuf;
> void *p, *start;
> int ret;
>
> @@ -2028,11 +2039,11 @@ static int rbd_cls_object_map_update(struct ceph_osd_request *req,
> if (ret)
> return ret;
>
> - pages = ceph_alloc_page_vector(1, GFP_NOIO);
> - if (IS_ERR(pages))
> - return PTR_ERR(pages);
> + dbuf = ceph_databuf_req_alloc(1, PAGE_SIZE, GFP_NOIO);
> + if (!dbuf)
> + return -ENOMEM;
>
> - p = start = page_address(pages[0]);
> + p = start = kmap_ceph_databuf_page(dbuf, 0);
> ceph_encode_64(&p, objno);
> ceph_encode_64(&p, objno + 1);
> ceph_encode_8(&p, new_state);
> @@ -2042,9 +2053,10 @@ static int rbd_cls_object_map_update(struct ceph_osd_request *req,
> } else {
> ceph_encode_8(&p, 0);
> }
> + kunmap_local(p);
> + ceph_databuf_added_data(dbuf, p - start);
>
> - osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
> - false, true);
> + osd_req_op_cls_request_databuf(req, which, dbuf);
> return 0;
> }
>
> @@ -4673,8 +4685,8 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
> size_t inbound_size)
> {
> struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
> + struct ceph_databuf *reply;
> struct page *req_page = NULL;
> - struct page *reply_page;
> int ret;
>
> /*
> @@ -4695,8 +4707,8 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
> memcpy(page_address(req_page), outbound, outbound_size);
> }
>
> - reply_page = alloc_page(GFP_KERNEL);
> - if (!reply_page) {
> + reply = ceph_databuf_reply_alloc(1, inbound_size, GFP_KERNEL);
Interesting... Previously, we allocated a single memory page. Now we allocate
memory of inbound_size bytes. Potentially, that could be any size, from zero
bytes up to several memory pages. Could we have an issue here?
Thanks,
Slava.
> + if (!reply) {
> if (req_page)
> __free_page(req_page);
> return -ENOMEM;
> @@ -4704,15 +4716,16 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
>
> ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
> CEPH_OSD_FLAG_READ, req_page, outbound_size,
> - &reply_page, &inbound_size);
> + reply);
> if (!ret) {
> - memcpy(inbound, page_address(reply_page), inbound_size);
> - ret = inbound_size;
> + ret = ceph_databuf_len(reply);
> + if (copy_from_iter(inbound, ret, &reply->iter) != ret)
> + ret = -EIO;
> }
>
> if (req_page)
> __free_page(req_page);
> - __free_page(reply_page);
> + ceph_databuf_release(reply);
> return ret;
> }
>
> @@ -5633,7 +5646,7 @@ static int decode_parent_image_spec(void **p, void *end,
>
> static int __get_parent_info(struct rbd_device *rbd_dev,
> struct page *req_page,
> - struct page *reply_page,
> + struct ceph_databuf *reply,
> struct parent_image_info *pii)
> {
> struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
> @@ -5643,27 +5656,31 @@ static int __get_parent_info(struct rbd_device *rbd_dev,
>
> ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
> "rbd", "parent_get", CEPH_OSD_FLAG_READ,
> - req_page, sizeof(u64), &reply_page, &reply_len);
> + req_page, sizeof(u64), reply);
> if (ret)
> return ret == -EOPNOTSUPP ? 1 : ret;
>
> - p = page_address(reply_page);
> + p = kmap_ceph_databuf_page(reply, 0);
> end = p + reply_len;
> ret = decode_parent_image_spec(&p, end, pii);
> + kunmap_local(p);
> if (ret)
> return ret;
>
> + ceph_databuf_reset_reply(reply);
> +
> ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
> "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
> - req_page, sizeof(u64), &reply_page, &reply_len);
> + req_page, sizeof(u64), reply);
> if (ret)
> return ret;
>
> - p = page_address(reply_page);
> + p = kmap_ceph_databuf_page(reply, 0);
> end = p + reply_len;
> ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval);
> if (pii->has_overlap)
> ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
> + kunmap_local(p);
>
> dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
> __func__, pii->pool_id, pii->pool_ns, pii->image_id, pii->snap_id,
> @@ -5679,25 +5696,25 @@ static int __get_parent_info(struct rbd_device *rbd_dev,
> */
> static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
> struct page *req_page,
> - struct page *reply_page,
> + struct ceph_databuf *reply,
> struct parent_image_info *pii)
> {
> struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
> - size_t reply_len = PAGE_SIZE;
> void *p, *end;
> int ret;
>
> ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
> "rbd", "get_parent", CEPH_OSD_FLAG_READ,
> - req_page, sizeof(u64), &reply_page, &reply_len);
> + req_page, sizeof(u64), reply);
> if (ret)
> return ret;
>
> - p = page_address(reply_page);
> - end = p + reply_len;
> + p = kmap_ceph_databuf_page(reply, 0);
> + end = p + ceph_databuf_len(reply);
> ceph_decode_64_safe(&p, end, pii->pool_id, e_inval);
> pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
> if (IS_ERR(pii->image_id)) {
> + kunmap_local(p);
> ret = PTR_ERR(pii->image_id);
> pii->image_id = NULL;
> return ret;
> @@ -5705,6 +5722,7 @@ static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
> ceph_decode_64_safe(&p, end, pii->snap_id, e_inval);
> pii->has_overlap = true;
> ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
> + kunmap_local(p);
>
> dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
> __func__, pii->pool_id, pii->pool_ns, pii->image_id, pii->snap_id,
> @@ -5718,29 +5736,30 @@ static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
> static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev,
> struct parent_image_info *pii)
> {
> - struct page *req_page, *reply_page;
> + struct ceph_databuf *reply;
> + struct page *req_page;
> void *p;
> - int ret;
> + int ret = -ENOMEM;
>
> req_page = alloc_page(GFP_KERNEL);
> if (!req_page)
> - return -ENOMEM;
> + goto out;
>
> - reply_page = alloc_page(GFP_KERNEL);
> - if (!reply_page) {
> - __free_page(req_page);
> - return -ENOMEM;
> - }
> + reply = ceph_databuf_reply_alloc(1, PAGE_SIZE, GFP_KERNEL);
> + if (!reply)
> + goto out_free;
>
> - p = page_address(req_page);
> + p = kmap_local_page(req_page);
> ceph_encode_64(&p, rbd_dev->spec->snap_id);
> - ret = __get_parent_info(rbd_dev, req_page, reply_page, pii);
> + kunmap_local(p);
> + ret = __get_parent_info(rbd_dev, req_page, reply, pii);
> if (ret > 0)
> - ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page,
> - pii);
> + ret = __get_parent_info_legacy(rbd_dev, req_page, reply, pii);
>
> + ceph_databuf_release(reply);
> +out_free:
> __free_page(req_page);
> - __free_page(reply_page);
> +out:
> return ret;
> }
>
> diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
> index 172ee515a0f3..57b8aff53f28 100644
> --- a/include/linux/ceph/osd_client.h
> +++ b/include/linux/ceph/osd_client.h
> @@ -610,7 +610,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
> const char *class, const char *method,
> unsigned int flags,
> struct page *req_page, size_t req_len,
> - struct page **resp_pages, size_t *resp_len);
> + struct ceph_databuf *response);
>
> /* watch/notify */
> struct ceph_osd_linger_request *
> diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c
> index 66136a4c1ce7..37bb8708e8bb 100644
> --- a/net/ceph/cls_lock_client.c
> +++ b/net/ceph/cls_lock_client.c
> @@ -74,7 +74,7 @@ int ceph_cls_lock(struct ceph_osd_client *osdc,
> __func__, lock_name, type, cookie, tag, desc, flags);
> ret = ceph_osdc_call(osdc, oid, oloc, "lock", "lock",
> CEPH_OSD_FLAG_WRITE, lock_op_page,
> - lock_op_buf_size, NULL, NULL);
> + lock_op_buf_size, NULL);
>
> dout("%s: status %d\n", __func__, ret);
> __free_page(lock_op_page);
> @@ -124,7 +124,7 @@ int ceph_cls_unlock(struct ceph_osd_client *osdc,
> dout("%s lock_name %s cookie %s\n", __func__, lock_name, cookie);
> ret = ceph_osdc_call(osdc, oid, oloc, "lock", "unlock",
> CEPH_OSD_FLAG_WRITE, unlock_op_page,
> - unlock_op_buf_size, NULL, NULL);
> + unlock_op_buf_size, NULL);
>
> dout("%s: status %d\n", __func__, ret);
> __free_page(unlock_op_page);
> @@ -179,7 +179,7 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc,
> cookie, ENTITY_NAME(*locker));
> ret = ceph_osdc_call(osdc, oid, oloc, "lock", "break_lock",
> CEPH_OSD_FLAG_WRITE, break_op_page,
> - break_op_buf_size, NULL, NULL);
> + break_op_buf_size, NULL);
>
> dout("%s: status %d\n", __func__, ret);
> __free_page(break_op_page);
> @@ -230,7 +230,7 @@ int ceph_cls_set_cookie(struct ceph_osd_client *osdc,
> __func__, lock_name, type, old_cookie, tag, new_cookie);
> ret = ceph_osdc_call(osdc, oid, oloc, "lock", "set_cookie",
> CEPH_OSD_FLAG_WRITE, cookie_op_page,
> - cookie_op_buf_size, NULL, NULL);
> + cookie_op_buf_size, NULL);
>
> dout("%s: status %d\n", __func__, ret);
> __free_page(cookie_op_page);
> @@ -337,10 +337,10 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc,
> char *lock_name, u8 *type, char **tag,
> struct ceph_locker **lockers, u32 *num_lockers)
> {
> + struct ceph_databuf *reply;
> int get_info_op_buf_size;
> int name_len = strlen(lock_name);
> - struct page *get_info_op_page, *reply_page;
> - size_t reply_len = PAGE_SIZE;
> + struct page *get_info_op_page;
> void *p, *end;
> int ret;
>
> @@ -353,8 +353,8 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc,
> if (!get_info_op_page)
> return -ENOMEM;
>
> - reply_page = alloc_page(GFP_NOIO);
> - if (!reply_page) {
> + reply = ceph_databuf_reply_alloc(1, PAGE_SIZE, GFP_NOIO);
> + if (!reply) {
> __free_page(get_info_op_page);
> return -ENOMEM;
> }
> @@ -370,18 +370,19 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc,
> dout("%s lock_name %s\n", __func__, lock_name);
> ret = ceph_osdc_call(osdc, oid, oloc, "lock", "get_info",
> CEPH_OSD_FLAG_READ, get_info_op_page,
> - get_info_op_buf_size, &reply_page, &reply_len);
> + get_info_op_buf_size, reply);
>
> dout("%s: status %d\n", __func__, ret);
> if (ret >= 0) {
> - p = page_address(reply_page);
> - end = p + reply_len;
> + p = kmap_ceph_databuf_page(reply, 0);
> + end = p + ceph_databuf_len(reply);
>
> ret = decode_lockers(&p, end, type, tag, lockers, num_lockers);
> + kunmap_local(p);
> }
>
> __free_page(get_info_op_page);
> - __free_page(reply_page);
> + ceph_databuf_release(reply);
> return ret;
> }
> EXPORT_SYMBOL(ceph_cls_lock_info);
> @@ -389,11 +390,11 @@ EXPORT_SYMBOL(ceph_cls_lock_info);
> int ceph_cls_assert_locked(struct ceph_osd_request *req, int which,
> char *lock_name, u8 type, char *cookie, char *tag)
> {
> + struct ceph_databuf *dbuf;
> int assert_op_buf_size;
> int name_len = strlen(lock_name);
> int cookie_len = strlen(cookie);
> int tag_len = strlen(tag);
> - struct page **pages;
> void *p, *end;
> int ret;
>
> @@ -408,11 +409,11 @@ int ceph_cls_assert_locked(struct ceph_osd_request *req, int which,
> if (ret)
> return ret;
>
> - pages = ceph_alloc_page_vector(1, GFP_NOIO);
> - if (IS_ERR(pages))
> - return PTR_ERR(pages);
> + dbuf = ceph_databuf_req_alloc(1, PAGE_SIZE, GFP_NOIO);
> + if (!dbuf)
> + return -ENOMEM;
>
> - p = page_address(pages[0]);
> + p = kmap_ceph_databuf_page(dbuf, 0);
> end = p + assert_op_buf_size;
>
> /* encode cls_lock_assert_op struct */
> @@ -422,10 +423,12 @@ int ceph_cls_assert_locked(struct ceph_osd_request *req, int which,
> ceph_encode_8(&p, type);
> ceph_encode_string(&p, end, cookie, cookie_len);
> ceph_encode_string(&p, end, tag, tag_len);
> + kunmap(p);
> WARN_ON(p != end);
> + ceph_databuf_added_data(dbuf, assert_op_buf_size);
>
> - osd_req_op_cls_request_data_pages(req, which, pages, assert_op_buf_size,
> - 0, false, true);
> + osd_req_op_cls_request_databuf(req, which, dbuf);
> return 0;
> }
> EXPORT_SYMBOL(ceph_cls_assert_locked);
> +
> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index 720d8a605fc4..b6cf875d3de4 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -5195,7 +5195,10 @@ EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
> * Execute an OSD class method on an object.
> *
> * @flags: CEPH_OSD_FLAG_*
> - * @resp_len: in/out param for reply length
> + * @response: Pointer to the storage descriptor for the reply or NULL.
> + *
> + * The size of the response buffer is set by the caller in @response->limit and
> + * the size of the response obtained is set in @response->iter.
> */
> int ceph_osdc_call(struct ceph_osd_client *osdc,
> struct ceph_object_id *oid,
> @@ -5203,7 +5206,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
> const char *class, const char *method,
> unsigned int flags,
> struct page *req_page, size_t req_len,
> - struct page **resp_pages, size_t *resp_len)
> + struct ceph_databuf *response)
> {
> struct ceph_osd_request *req;
> int ret;
> @@ -5226,9 +5229,8 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
> if (req_page)
> osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
> 0, false, false);
> - if (resp_pages)
> - osd_req_op_cls_response_data_pages(req, 0, resp_pages,
> - *resp_len, 0, false, false);
> + if (response)
> + osd_req_op_cls_response_databuf(req, 0, response);
>
> ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
> if (ret)
> @@ -5238,8 +5240,8 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
> ret = ceph_osdc_wait_request(osdc, req);
> if (ret >= 0) {
> ret = req->r_ops[0].rval;
> - if (resp_pages)
> - *resp_len = req->r_ops[0].outdata_len;
> + if (response)
> + ceph_databuf_reply_ready(response, req->r_ops[0].outdata_len);
> }
>
> out_put_req:
>
>