Re: [PATCH v8 1/6] lib/dlock-list: Distributed and lock-protected lists
From: Davidlohr Bueso
Date: Thu Nov 02 2017 - 13:05:40 EST
On Tue, 31 Oct 2017, Waiman Long wrote:
+/**
+ * dlock_lists_empty - Check if all the dlock lists are empty
+ * @dlist: Pointer to the dlock_list_heads structure
+ * Return: true if list is empty, false otherwise.
+ *
+ * This can be a pretty expensive function call. If this function is required
+ * in a performance critical path, we may have to maintain a global count
+ * of the list entries in the global dlock_list_heads structure instead.
+ */
I vote for doing this in the original version. How about the following?
+bool dlock_lists_empty(struct dlock_list_heads *dlist)
+{
+ int idx;
+
+ for (idx = 0; idx < nr_cpu_ids; idx++)
+ if (!list_empty(&dlist->heads[idx].list))
+ return false;
+ return true;
+}
+EXPORT_SYMBOL(dlock_lists_empty);
----------8<-----------------------------------------------
From: Davidlohr Bueso <dave@xxxxxxxxxxxx>
Subject: [PATCH] lib/dlock-list: Scale dlock_lists_empty()
Instead of the current O(N) implementation; at the cost
of adding an atomic counter. We also need to add a heads
pointer to the node structure such that we can unaccount
a thread doing list_del().
Signed-off-by: Davidlohr Bueso <dbueso@xxxxxxx>
---
include/linux/dlock-list.h | 2 ++
lib/dlock-list.c | 40 ++++++++++++++++++++++++++++------------
2 files changed, 30 insertions(+), 12 deletions(-)
diff --git a/include/linux/dlock-list.h b/include/linux/dlock-list.h
index c00c7f92ada4..dd73d5787885 100644
--- a/include/linux/dlock-list.h
+++ b/include/linux/dlock-list.h
@@ -36,6 +36,7 @@ struct dlock_list_head {
struct dlock_list_heads {
struct dlock_list_head *heads;
+ atomic_t waiters;
};
/*
@@ -44,6 +45,7 @@ struct dlock_list_heads {
struct dlock_list_node {
struct list_head list;
struct dlock_list_head *head;
+ struct dlock_list_heads *heads;
};
/*
diff --git a/lib/dlock-list.c b/lib/dlock-list.c
index a4ddecc01b12..bd11fc0da254 100644
--- a/lib/dlock-list.c
+++ b/lib/dlock-list.c
@@ -124,6 +124,8 @@ int __alloc_dlock_list_heads(struct dlock_list_heads *dlist,
head->lock = __SPIN_LOCK_UNLOCKED(&head->lock);
lockdep_set_class(&head->lock, key);
}
+
+ atomic_set(&dlist->waiters, 0);
return 0;
}
EXPORT_SYMBOL(__alloc_dlock_list_heads);
@@ -139,29 +141,23 @@ void free_dlock_list_heads(struct dlock_list_heads *dlist)
{
kfree(dlist->heads);
dlist->heads = NULL;
+ atomic_set(&dlist->waiters, 0);
}
EXPORT_SYMBOL(free_dlock_list_heads);
/**
* dlock_lists_empty - Check if all the dlock lists are empty
* @dlist: Pointer to the dlock_list_heads structure
- * Return: true if list is empty, false otherwise.
*
- * This can be a pretty expensive function call. If this function is required
- * in a performance critical path, we may have to maintain a global count
- * of the list entries in the global dlock_list_heads structure instead.
+ * Return: true if all dlock lists are empty, false otherwise.
*/
bool dlock_lists_empty(struct dlock_list_heads *dlist)
{
- int idx;
-
/* Shouldn't be called before nr_dlock_lists is initialized */
WARN_ON_ONCE(!nr_dlock_lists);
- for (idx = 0; idx < nr_dlock_lists; idx++)
- if (!list_empty(&dlist->heads[idx].list))
- return false;
- return true;
+ smp_mb__before_atomic();
+ return !atomic_read(&dlist->waiters);
}
EXPORT_SYMBOL(dlock_lists_empty);
@@ -179,10 +175,30 @@ void dlock_lists_add(struct dlock_list_node *node,
struct dlock_list_head *head = &dlist->heads[this_cpu_read(cpu2idx)];
/*
+ * Serialize dlist->waiters such that a 0->1 transition is not missed,
+ * by another thread checking if any of the dlock lists are used.
+ *
+ * CPU0 CPU1
+ * dlock_list_add() dlock_lists_empty()
+ * [S] atomic_inc(waiters);
+ * smp_mb__after_atomic();
+ * smp_mb__before_atomic();
+ * [L] atomic_read(waiters)
+ * list_add()
+ *
+ * Bump the waiters counter _before_ taking the head->lock such that we
+ * don't miss a thread adding itself to a list while spinning for the
+ * lock.
+ */
+ atomic_inc(&dlist->waiters);
+ smp_mb__after_atomic();
+
+ /*
* There is no need to disable preemption
*/
spin_lock(&head->lock);
node->head = head;
+ node->heads = dlist;
list_add(&node->list, &head->list);
spin_unlock(&head->lock);
}
@@ -199,8 +215,7 @@ EXPORT_SYMBOL(dlock_lists_add);
* a bug.
*/
void dlock_lists_del(struct dlock_list_node *node)
-{
- struct dlock_list_head *head;
+{ struct dlock_list_head *head;
bool retry;
do {
@@ -214,6 +229,7 @@ void dlock_lists_del(struct dlock_list_node *node)
list_del_init(&node->list);
node->head = NULL;
retry = false;
+ atomic_dec(&node->heads->waiters);
} else {
/*
* The lock has somehow changed. Retry again if it is
--
2.13.6