Re: [patch 21/25] debugobjects: Implement batch processing

From: Leizhen (ThunderTown)
Date: Thu Oct 10 2024 - 05:40:10 EST




On 2024/10/8 0:50, Thomas Gleixner wrote:
> Adding and removing single objects in a loop is bad in terms of lock
> contention and cache line accesses.
>
> To implement batching, record the last object in a batch in the object
> itself. This is trivially possible as hlists are strictly stacks. At a batch
> boundary, when the first object is added to the list, the object stores a
> pointer to itself in debug_obj::batch_last. When the next object is added
> to the list, the batch_last pointer is retrieved from the first object
> in the list and stored in the to-be-added one.
>
> That means for batch processing the first object always has a pointer to
> the last object in a batch, which allows moving batches in a cache-line
> efficient way and reduces the lock held time.

It seems that adding a helper function hlist_cut_position() would make the code
more concise and clear. But there are a lot of patches now, so we can do it
later; maybe I can do it then.

Similar to the current list_cut_position():
/**
* list_cut_position - cut a list into two
* @list: a new list to add all removed entries
* @head: a list with entries
* @entry: an entry within head
*/


Reviewed-by: Zhen Lei <thunder.leizhen@xxxxxxxxxx>

>
> Signed-off-by: Thomas Gleixner <tglx@xxxxxxxxxxxxx>
> ---
> lib/debugobjects.c | 61 +++++++++++++++++++++++++++++++++++++++--------------
> 1 file changed, 46 insertions(+), 15 deletions(-)
>
> --- a/lib/debugobjects.c
> +++ b/lib/debugobjects.c
> @@ -149,18 +149,31 @@ static __always_inline bool pool_must_re
>
> static bool pool_move_batch(struct obj_pool *dst, struct obj_pool *src)
> {
> - if (dst->cnt + ODEBUG_BATCH_SIZE > dst->max_cnt || !src->cnt)
> + struct hlist_node *last, *next_batch, *first_batch;
> + struct debug_obj *obj;
> +
> + if (dst->cnt >= dst->max_cnt || !src->cnt)
> return false;
>
> - for (int i = 0; i < ODEBUG_BATCH_SIZE && src->cnt; i++) {
> - struct hlist_node *node = src->objects.first;
> + first_batch = src->objects.first;
> + obj = hlist_entry(first_batch, typeof(*obj), node);
> + last = obj->batch_last;
> + next_batch = last->next;
>
> - WRITE_ONCE(src->cnt, src->cnt - 1);
> - WRITE_ONCE(dst->cnt, dst->cnt + 1);
> + /* Move the next batch to the front of the source pool */
> + src->objects.first = next_batch;
> + if (next_batch)
> + next_batch->pprev = &src->objects.first;
> +
> + /* Add the extracted batch to the destination pool */
> + last->next = dst->objects.first;
> + if (last->next)
> + last->next->pprev = &last->next;
> + first_batch->pprev = &dst->objects.first;
> + dst->objects.first = first_batch;
>
> - hlist_del(node);
> - hlist_add_head(node, &dst->objects);
> - }
> + WRITE_ONCE(src->cnt, src->cnt - ODEBUG_BATCH_SIZE);
> + WRITE_ONCE(dst->cnt, dst->cnt + ODEBUG_BATCH_SIZE);
> return true;
> }
>
> @@ -182,16 +195,27 @@ static bool pool_push_batch(struct obj_p
>
> static bool pool_pop_batch(struct hlist_head *head, struct obj_pool *src)
> {
> + struct hlist_node *last, *next;
> + struct debug_obj *obj;
> +
> if (!src->cnt)
> return false;
>
> - for (int i = 0; src->cnt && i < ODEBUG_BATCH_SIZE; i++) {
> - struct hlist_node *node = src->objects.first;
> + /* Move the complete list to the head */
> + hlist_move_list(&src->objects, head);
>
> - WRITE_ONCE(src->cnt, src->cnt - 1);
> - hlist_del(node);
> - hlist_add_head(node, head);
> - }
> + obj = hlist_entry(head->first, typeof(*obj), node);
> + last = obj->batch_last;
> + next = last->next;
> + /* Disconnect the batch from the list */
> + last->next = NULL;
> +
> + /* Move the node after last back to the source pool. */
> + src->objects.first = next;
> + if (next)
> + next->pprev = &src->objects.first;
> +
> + WRITE_ONCE(src->cnt, src->cnt - ODEBUG_BATCH_SIZE);
> return true;
> }
>
> @@ -226,7 +250,7 @@ static struct debug_obj *pcpu_alloc(void
> if (!pool_move_batch(pcp, &pool_global))
> return NULL;
> }
> - obj_pool_used += pcp->cnt;
> + obj_pool_used += ODEBUG_BATCH_SIZE;
>
> if (obj_pool_used > obj_pool_max_used)
> obj_pool_max_used = obj_pool_used;
> @@ -239,9 +263,16 @@ static struct debug_obj *pcpu_alloc(void
> static void pcpu_free(struct debug_obj *obj)
> {
> struct obj_pool *pcp = this_cpu_ptr(&pool_pcpu);
> + struct debug_obj *first;
>
> lockdep_assert_irqs_disabled();
>
> + if (!(pcp->cnt % ODEBUG_BATCH_SIZE)) {
> + obj->batch_last = &obj->node;
> + } else {
> + first = hlist_entry(pcp->objects.first, typeof(*first), node);
> + obj->batch_last = first->batch_last;
> + }
> hlist_add_head(&obj->node, &pcp->objects);
> pcp->cnt++;
>
>
> .
>

--
Regards,
Zhen Lei