Re: [PATCH v3] lib/dlock-list: Scale dlock_lists_empty()

From: Waiman Long
Date: Fri Nov 03 2017 - 12:34:07 EST


On 11/03/2017 10:22 AM, Davidlohr Bueso wrote:
> Instead of the current O(N) implementation, at the cost
> of adding an atomic counter, we can convert the call to
> an atomic_read(). The counter only serves for accounting
> empty to non-empty transitions, and vice versa; therefore
> only modified twice for each of the lists during the
> lifetime of the dlock (while used).
>
> In addition, to be able to unaccount a list_del(), we
> add a dlist pointer to each head, thus minimizing the
> overall memory footprint.
>
> Signed-off-by: Davidlohr Bueso <dbueso@xxxxxxx>
> ---
>
> Changes from v2: Removed some bogus lines and an overoptimistic
> changelog.
>
> include/linux/dlock-list.h | 2 ++
> lib/dlock-list.c | 52 +++++++++++++++++++++++++++++++++++++---------
> 2 files changed, 44 insertions(+), 10 deletions(-)
>
> diff --git a/include/linux/dlock-list.h b/include/linux/dlock-list.h
> index c00c7f92ada4..d176a2d00cd1 100644
> --- a/include/linux/dlock-list.h
> +++ b/include/linux/dlock-list.h
> @@ -32,10 +32,12 @@
> struct dlock_list_head {
> struct list_head list;
> spinlock_t lock;
> + struct dlock_list_heads *dlist;
> } ____cacheline_aligned_in_smp;
>
> struct dlock_list_heads {
> struct dlock_list_head *heads;
> + atomic_t waiters;

We are tracking the number of non-empty lists here, so "waiters" isn't
really an accurate name. I think we need a better name and maybe some
documentation of what it actually counts.
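Something along these lines, perhaps (the name used_lists is only a
suggestion):

struct dlock_list_heads {
        struct dlock_list_head *heads;
        /*
         * Number of per-CPU lists that are currently non-empty. Only
         * updated on the empty <-> non-empty transitions of an
         * individual list, and read by dlock_lists_empty().
         */
        atomic_t used_lists;
};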

> };
>
> /*
> diff --git a/lib/dlock-list.c b/lib/dlock-list.c
> index a4ddecc01b12..5814e42c5b81 100644
> --- a/lib/dlock-list.c
> +++ b/lib/dlock-list.c
> @@ -122,8 +122,11 @@ int __alloc_dlock_list_heads(struct dlock_list_heads *dlist,
>
> INIT_LIST_HEAD(&head->list);
> head->lock = __SPIN_LOCK_UNLOCKED(&head->lock);
> + head->dlist = dlist;
> lockdep_set_class(&head->lock, key);
> }
> +
> + atomic_set(&dlist->waiters, 0);
> return 0;
> }
> EXPORT_SYMBOL(__alloc_dlock_list_heads);
> @@ -139,29 +142,36 @@ void free_dlock_list_heads(struct dlock_list_heads *dlist)
> {
> kfree(dlist->heads);
> dlist->heads = NULL;
> + atomic_set(&dlist->waiters, 0);
> }
> EXPORT_SYMBOL(free_dlock_list_heads);
>
> /**
> * dlock_lists_empty - Check if all the dlock lists are empty
> * @dlist: Pointer to the dlock_list_heads structure
> - * Return: true if list is empty, false otherwise.
> *
> - * This can be a pretty expensive function call. If this function is required
> - * in a performance critical path, we may have to maintain a global count
> - * of the list entries in the global dlock_list_heads structure instead.
> + * Return: true if all dlock lists are empty, false otherwise.
> */
> bool dlock_lists_empty(struct dlock_list_heads *dlist)
> {
> - int idx;
> -
> /* Shouldn't be called before nr_dlock_lists is initialized */
> WARN_ON_ONCE(!nr_dlock_lists);
>
> - for (idx = 0; idx < nr_dlock_lists; idx++)
> - if (!list_empty(&dlist->heads[idx].list))
> - return false;
> - return true;
> + /*
> + * Serialize dlist->waiters such that a 0->1 transition is not missed
> + * by another thread checking if any of the dlock lists are used.
> + *
> + * CPU0 CPU1
> + * dlock_list_add() dlock_lists_empty()
> + * [S] atomic_inc(waiters);
> + * smp_mb__after_atomic();
> + * smp_mb__before_atomic();
> + * [L] atomic_read(waiters)
> + * list_add()
> + *
> + */
> + smp_mb__before_atomic();
> + return !atomic_read(&dlist->waiters);
> }
> EXPORT_SYMBOL(dlock_lists_empty);
>
> @@ -179,6 +189,16 @@ void dlock_lists_add(struct dlock_list_node *node,
> struct dlock_list_head *head = &dlist->heads[this_cpu_read(cpu2idx)];
>
> /*
> + * Bump the waiters counter _before_ taking the head->lock
> + * such that we don't miss a thread adding itself to a list
> + * while spinning for the lock.
> + */
> + if (list_empty_careful(&head->list)) {
> + atomic_inc(&dlist->waiters);
> + smp_mb__after_atomic();
> + }

That is racy. You will need to remember that you have opportunistically
incremented the count. After acquiring the spinlock, the code will have
to decrement it again if the list turns out not to be empty after all,
and it will also have to increment it after taking the lock if the list
is empty now but was not when you did the check.
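
IOW, something roughly like this (untested, and leaving aside where the
barriers need to go), just to illustrate the re-check under the lock:

void dlock_lists_add(struct dlock_list_node *node,
                     struct dlock_list_heads *dlist)
{
        struct dlock_list_head *head = &dlist->heads[this_cpu_read(cpu2idx)];
        bool incremented = false;

        /*
         * Opportunistically bump the counter before taking head->lock
         * so that a concurrent dlock_lists_empty() doesn't miss the
         * 0 -> 1 transition while we are spinning for the lock.
         */
        if (list_empty_careful(&head->list)) {
                atomic_inc(&dlist->waiters);
                incremented = true;
        }

        spin_lock(&head->lock);
        if (list_empty(&head->list)) {
                /* Really empty; account for it if we haven't already. */
                if (!incremented)
                        atomic_inc(&dlist->waiters);
        } else if (incremented) {
                /* Someone else made the list non-empty; undo our increment. */
                atomic_dec(&dlist->waiters);
        }
        node->head = head;
        list_add(&node->list, &head->list);
        spin_unlock(&head->lock);
}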

>
> +
> + /*
> * There is no need to disable preemption
> */
> spin_lock(&head->lock);
> @@ -212,6 +232,18 @@ void dlock_lists_del(struct dlock_list_node *node)
> spin_lock(&head->lock);
> if (likely(head == node->head)) {
> list_del_init(&node->list);
> + /*
> + * We still hold the head->lock, a normal list_empty()
> + * check will do.
> + */
> + if (list_empty(&head->list)) {
> + struct dlock_list_heads *dlist;
> + dlist = node->head->dlist;
> +
> + atomic_dec(&dlist->waiters);
> + smp_mb__after_atomic();

Do you really need that smp_mb__after_atomic() when the code is going to
do a spin_unlock() shortly afterwards?

Cheers,
Longman