Re: [PATCH v2 4/6] debugobjects: Don't start fill if there are remaining nodes locally

From: Thomas Gleixner
Date: Mon Sep 09 2024 - 08:10:55 EST


On Mon, Sep 09 2024 at 11:22, Thomas Gleixner wrote:
> On Thu, Sep 05 2024 at 11:45, Leizhen wrote:
>> +	obj = kmem_cache_zalloc(obj_cache, __GFP_HIGH | GFP_NOWAIT);
>> +	if (!obj)
>> +		return NULL;
>
> No. That fails on RT. See debug_object_fill_pool().

Some more thoughts on this. The goal is to reduce contention on the
pool lock. At the same time, this needs to ensure that the
opportunistic fill mechanism actually works reliably.

debug_objects_fill_pool() is invoked from

- debug_object_init()
- debug_object_assert_init()
- debug_object_activate()
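
All three do the opportunistic refill before they look up or touch the
tracking object. A condensed sketch of the entry pattern (not the
exact code; locking and the actual state handling are omitted):

void debug_object_activate(void *addr, const struct debug_obj_descr *descr)
{
	struct debug_bucket *db;

	if (!debug_objects_enabled)
		return;

	/* Opportunistic pool refill before a tracking object may be needed */
	debug_objects_fill_pool();

	db = get_bucket((unsigned long) addr);
	/* Lookup and state transition under db->lock follow here. */
}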

debug_object_init() is usually invoked from preemptible context. Most
of the time it will consume a tracking object from the per-CPU or the
global pool because re-initialization of an already tracked object is
rather rare.
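
The consumption side prefers the per-CPU pool and only falls back to
the global pool under pool_lock. Roughly (condensed from alloc_object();
the helper name below is only for the sketch):

static struct debug_obj *alloc_object_sketch(void)
{
	struct debug_percpu_free *percpu_pool = this_cpu_ptr(&percpu_obj_pool);
	struct debug_obj *obj;

	/* Lockless fast path: per-CPU free list */
	if (likely(obj_cache)) {
		obj = __alloc_object(&percpu_pool->free_objs);
		if (obj) {
			percpu_pool->obj_free--;
			return obj;
		}
	}

	/* Fallback: global pool, serialized by pool_lock */
	raw_spin_lock(&pool_lock);
	obj = __alloc_object(&obj_pool);
	if (obj)
		WRITE_ONCE(obj_pool_free, obj_pool_free - 1);
	raw_spin_unlock(&pool_lock);

	return obj;
}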

debug_object_assert_init() and debug_object_activate() only consume a
tracking object when the to-be-tracked object is statically initialized
or the call site failed to initialize the object. Both can be called
from arbitrary contexts even under PREEMPT_RT, where preemptible
context is a prerequisite for refilling the pool via allocations.
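
That is why debug_objects_fill_pool() gates the allocation path on
preemptibility. Condensed (the real function also wraps the call in a
lockdep wait-type override):

static void debug_objects_fill_pool(void)
{
	/* On RT, refilling via the allocator requires preemptible context */
	if (!IS_ENABLED(CONFIG_PREEMPT_RT) || preemptible())
		fill_pool();
}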

And of course any CPU which sits in fill_pool() can be preempted if the
calling context is preemptible. And no, we can't disable preemption
across the whole thing due to RT.

So something like the uncompiled patch below should reduce lock
contention significantly with a reasonable safety net.

Thanks,

tglx
---
--- a/lib/debugobjects.c
+++ b/lib/debugobjects.c
@@ -125,14 +125,10 @@ static const char *obj_states[ODEBUG_STA
 	[ODEBUG_STATE_NOTAVAILABLE]	= "not available",
 };
 
-static void fill_pool(void)
+static void fill_pool_from_freelist(void)
 {
-	gfp_t gfp = __GFP_HIGH | __GFP_NOWARN;
+	static unsigned long state;
 	struct debug_obj *obj;
-	unsigned long flags;
-
-	if (likely(READ_ONCE(obj_pool_free) >= debug_objects_pool_min_level))
-		return;
 
 	/*
 	 * Reuse objs from the global obj_to_free list; they will be
@@ -141,25 +137,57 @@ static void fill_pool(void)
 	 * obj_nr_tofree is checked locklessly; the READ_ONCE() pairs with
 	 * the WRITE_ONCE() in pool_lock critical sections.
 	 */
-	if (READ_ONCE(obj_nr_tofree)) {
-		raw_spin_lock_irqsave(&pool_lock, flags);
-		/*
-		 * Recheck with the lock held as the worker thread might have
-		 * won the race and freed the global free list already.
-		 */
-		while (obj_nr_tofree && (obj_pool_free < debug_objects_pool_min_level)) {
-			obj = hlist_entry(obj_to_free.first, typeof(*obj), node);
-			hlist_del(&obj->node);
-			WRITE_ONCE(obj_nr_tofree, obj_nr_tofree - 1);
-			hlist_add_head(&obj->node, &obj_pool);
-			WRITE_ONCE(obj_pool_free, obj_pool_free + 1);
-		}
-		raw_spin_unlock_irqrestore(&pool_lock, flags);
+	if (!READ_ONCE(obj_nr_tofree))
+		return;
+
+	/*
+	 * Prevent the context from being scheduled or interrupted after
+	 * setting the state flag;
+	 */
+	guard(irqsave)();
+
+	/*
+	 * Avoid lock contention on &pool_lock and avoid making the cache
+	 * line exclusive by testing the bit before attempting to set it.
+	 */
+	if (test_bit(0, &state) || test_and_set_bit(0, &state))
+		return;
+
+	guard(raw_spinlock)(&pool_lock);
+	/*
+	 * Recheck with the lock held as the worker thread might have
+	 * won the race and freed the global free list already.
+	 */
+	while (obj_nr_tofree && (obj_pool_free < debug_objects_pool_min_level)) {
+		obj = hlist_entry(obj_to_free.first, typeof(*obj), node);
+		hlist_del(&obj->node);
+		WRITE_ONCE(obj_nr_tofree, obj_nr_tofree - 1);
+		hlist_add_head(&obj->node, &obj_pool);
+		WRITE_ONCE(obj_pool_free, obj_pool_free + 1);
 	}
+	clear_bit(0, &state);
+}
+
+static void fill_pool(void)
+{
+	gfp_t gfp = __GFP_HIGH | __GFP_NOWARN;
+	static atomic_t cpus_allocating;
 
 	if (unlikely(!obj_cache))
 		return;
 
+	/*
+	 * Avoid allocation and lock contention when
+	 *
+	 * - the CPU local pool has at least 2 objects left
+	 * - another CPU is already in the allocation path
+	 * - the global pool has not reached the critical level yet
+	 */
+	if (this_cpu_read(percpu_obj_pool.obj_free) > 1 && atomic_read(&cpus_allocating) &&
+	    READ_ONCE(obj_pool_free) > (debug_objects_pool_min_level / 2))
+		return;
+
+	atomic_inc(&cpus_allocating);
 	while (READ_ONCE(obj_pool_free) < debug_objects_pool_min_level) {
 		struct debug_obj *new[ODEBUG_BATCH_SIZE];
 		int cnt;
@@ -172,14 +200,14 @@ static void fill_pool(void)
 		if (!cnt)
 			return;
 
-		raw_spin_lock_irqsave(&pool_lock, flags);
+		guard(raw_spinlock_irqsave)(&pool_lock);
 		while (cnt) {
 			hlist_add_head(&new[--cnt]->node, &obj_pool);
 			debug_objects_allocated++;
 			WRITE_ONCE(obj_pool_free, obj_pool_free + 1);
 		}
-		raw_spin_unlock_irqrestore(&pool_lock, flags);
 	}
+	atomic_dec(&cpus_allocating);
 }
 
 /*
@@ -598,6 +626,15 @@ static struct debug_obj *lookup_object_o
 
 static void debug_objects_fill_pool(void)
 {
+	if (likely(READ_ONCE(obj_pool_free) >= debug_objects_pool_min_level))
+		return;
+
+	/* Try reusing objects from obj_to_free_list */
+	fill_pool_from_freelist();
+
+	if (likely(READ_ONCE(obj_pool_free) >= debug_objects_pool_min_level))
+		return;
+
 	/*
 	 * On RT enabled kernels the pool refill must happen in preemptible
 	 * context -- for !RT kernels we rely on the fact that spinlock_t and