Re: [PATCH v4] lib/dlock-list: Scale dlock_lists_empty()

From: Jan Kara
Date: Tue Nov 07 2017 - 06:59:33 EST


On Mon 06-11-17 10:47:08, Davidlohr Bueso wrote:
> Instead of the current O(N) implementation, at the cost
> of adding an atomic counter, we can convert the call to
> an atomic_read(). The counter only serves for accounting
> empty to non-empty transitions, and vice versa; it is therefore
> only modified twice for each of the lists during the
> lifetime of the dlock (while used).
>
> In addition, to be able to unaccount a list_del(), we
> add a dlist pointer to each head, thus minimizing the
> overall memory footprint.
>
> Signed-off-by: Davidlohr Bueso <dbueso@xxxxxxx>

Looks good to me. You can add:

Reviewed-by: Jan Kara <jack@xxxxxxx>

Honza Kara

> ---
> Changes from v3:
> - s/waiters/used_lists, more doc around the counter.
> - fixed racy scenario when the list empty/non-empty
> condition changes after taking the lock.
> - sprinkled unlikely() around all checks, these are
> only corner cases in the lifetime of the lock.
>
> include/linux/dlock-list.h | 8 ++++++
> lib/dlock-list.c | 67 +++++++++++++++++++++++++++++++++++++++-------
> 2 files changed, 65 insertions(+), 10 deletions(-)
>
> diff --git a/include/linux/dlock-list.h b/include/linux/dlock-list.h
> index c00c7f92ada4..e18690a9bba6 100644
> --- a/include/linux/dlock-list.h
> +++ b/include/linux/dlock-list.h
> @@ -32,10 +32,18 @@
> struct dlock_list_head {
> struct list_head list;
> spinlock_t lock;
> + struct dlock_list_heads *dlist;
> } ____cacheline_aligned_in_smp;
>
> +/*
> + * This is the main dlist data structure, with the array of heads
> + * and a counter that atomically tracks if any of the lists are
> + * being used. That is, empty to non-empty (and vice versa)
> + * head->list transitions.
> + */
> struct dlock_list_heads {
> struct dlock_list_head *heads;
> + atomic_t used_lists;
> };
>
> /*
> diff --git a/lib/dlock-list.c b/lib/dlock-list.c
> index a4ddecc01b12..a9c855d492b8 100644
> --- a/lib/dlock-list.c
> +++ b/lib/dlock-list.c
> @@ -122,8 +122,11 @@ int __alloc_dlock_list_heads(struct dlock_list_heads *dlist,
>
> INIT_LIST_HEAD(&head->list);
> head->lock = __SPIN_LOCK_UNLOCKED(&head->lock);
> + head->dlist = dlist;
> lockdep_set_class(&head->lock, key);
> }
> +
> + atomic_set(&dlist->used_lists, 0);
> return 0;
> }
> EXPORT_SYMBOL(__alloc_dlock_list_heads);
> @@ -139,29 +142,36 @@ void free_dlock_list_heads(struct dlock_list_heads *dlist)
> {
> kfree(dlist->heads);
> dlist->heads = NULL;
> + atomic_set(&dlist->used_lists, 0);
> }
> EXPORT_SYMBOL(free_dlock_list_heads);
>
> /**
> * dlock_lists_empty - Check if all the dlock lists are empty
> * @dlist: Pointer to the dlock_list_heads structure
> - * Return: true if list is empty, false otherwise.
> *
> - * This can be a pretty expensive function call. If this function is required
> - * in a performance critical path, we may have to maintain a global count
> - * of the list entries in the global dlock_list_heads structure instead.
> + * Return: true if all dlock lists are empty, false otherwise.
> */
> bool dlock_lists_empty(struct dlock_list_heads *dlist)
> {
> - int idx;
> -
> /* Shouldn't be called before nr_dlock_lists is initialized */
> WARN_ON_ONCE(!nr_dlock_lists);
>
> - for (idx = 0; idx < nr_dlock_lists; idx++)
> - if (!list_empty(&dlist->heads[idx].list))
> - return false;
> - return true;
> + /*
> + * Serialize dlist->used_lists such that a 0->1 transition is not
> + * missed by another thread checking if any of the dlock lists are
> + * used.
> + *
> + * CPU0                               CPU1
> + * dlock_lists_add()                  dlock_lists_empty()
> + *   [S] atomic_inc(used_lists);
> + *       smp_mb__after_atomic();
> + *                                      smp_mb__before_atomic();
> + *                                  [L] atomic_read(used_lists)
> + *       list_add()
> + */
> + smp_mb__before_atomic();
> + return !atomic_read(&dlist->used_lists);
> }
> EXPORT_SYMBOL(dlock_lists_empty);
>
> @@ -177,11 +187,39 @@ void dlock_lists_add(struct dlock_list_node *node,
> struct dlock_list_heads *dlist)
> {
> struct dlock_list_head *head = &dlist->heads[this_cpu_read(cpu2idx)];
> + bool list_empty_before_lock = false;
> +
> + /*
> + * Optimistically bump the used_lists counter _before_ taking
> + * the head->lock such that we don't miss a thread adding itself
> + * to a list while spinning for the lock.
> + *
> + * Then, after taking the lock, recheck if the empty to non-empty
> + * transition changed and (un)account for ourselves, accordingly.
> + * Note that all these scenarios are corner cases, and not the
> + * common scenario, where the lists are actually populated most
> + * of the time.
> + */
> + if (unlikely(list_empty_careful(&head->list))) {
> + list_empty_before_lock = true;
> + atomic_inc(&dlist->used_lists);
> + smp_mb__after_atomic();
> + }
>
> /*
> * There is no need to disable preemption
> */
> spin_lock(&head->lock);
> +
> + if (unlikely(!list_empty_before_lock && list_empty(&head->list))) {
> + atomic_inc(&dlist->used_lists);
> + smp_mb__after_atomic();
> + }
> + if (unlikely(list_empty_before_lock && !list_empty(&head->list))) {
> + atomic_dec(&dlist->used_lists);
> + smp_mb__after_atomic();
> + }
> +
> node->head = head;
> list_add(&node->list, &head->list);
> spin_unlock(&head->lock);
> @@ -212,6 +250,15 @@ void dlock_lists_del(struct dlock_list_node *node)
> spin_lock(&head->lock);
> if (likely(head == node->head)) {
> list_del_init(&node->list);
> +
> + if (unlikely(list_empty(&head->list))) {
> + struct dlock_list_heads *dlist;
> + dlist = node->head->dlist;
> +
> + atomic_dec(&dlist->used_lists);
> + smp_mb__after_atomic();
> + }
> +
> node->head = NULL;
> retry = false;
> } else {
> --
> 2.13.6
>
--
Jan Kara <jack@xxxxxxxx>
SUSE Labs, CR