Re: [PATCH] rcu/kvfree: Add kvfree_rcu_barrier() API
From: Vlastimil Babka
Date: Thu Aug 01 2024 - 10:28:51 EST
On 8/1/24 13:10, Uladzislau Rezki (Sony) wrote:
> Add a kvfree_rcu_barrier() function. It waits until all
> in-flight pointers are freed over RCU machinery. It does
> not wait any GP completion and it is within its right to
> return immediately if there are no outstanding pointers.
>
> This function is useful when there is a need to guarantee
> that a memory is fully freed before destroying memory caches.
> For example, during unloading a kernel module.
>
> Signed-off-by: Uladzislau Rezki (Sony) <urezki@xxxxxxxxx>
Thanks a lot, Ulad! Locally it makde the kunit test stop failing and for now
I've pushed it as part of my series for bots to test:
https://git.kernel.org/pub/scm/linux/kernel/git/vbabka/linux.git/log/?h=slab-kfree_rcu-destroy-v2r0
I hope after RCU folks review I can carry this in my series and later the
slab git tree so we don't have inter-tree dependencies. Thanks!
Vlastimil
> ---
> include/linux/rcutiny.h | 5 ++
> include/linux/rcutree.h | 1 +
> kernel/rcu/tree.c | 103 ++++++++++++++++++++++++++++++++++++----
> 3 files changed, 101 insertions(+), 8 deletions(-)
>
> diff --git a/include/linux/rcutiny.h b/include/linux/rcutiny.h
> index d9ac7b136aea..522123050ff8 100644
> --- a/include/linux/rcutiny.h
> +++ b/include/linux/rcutiny.h
> @@ -111,6 +111,11 @@ static inline void __kvfree_call_rcu(struct rcu_head *head, void *ptr)
> kvfree(ptr);
> }
>
> +static inline void kvfree_rcu_barrier(void)
> +{
> + rcu_barrier();
> +}
> +
> #ifdef CONFIG_KASAN_GENERIC
> void kvfree_call_rcu(struct rcu_head *head, void *ptr);
> #else
> diff --git a/include/linux/rcutree.h b/include/linux/rcutree.h
> index 254244202ea9..58e7db80f3a8 100644
> --- a/include/linux/rcutree.h
> +++ b/include/linux/rcutree.h
> @@ -35,6 +35,7 @@ static inline void rcu_virt_note_context_switch(void)
>
> void synchronize_rcu_expedited(void);
> void kvfree_call_rcu(struct rcu_head *head, void *ptr);
> +void kvfree_rcu_barrier(void);
>
> void rcu_barrier(void);
> void rcu_momentary_dyntick_idle(void);
> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> index 28c7031711a3..1423013f9fe6 100644
> --- a/kernel/rcu/tree.c
> +++ b/kernel/rcu/tree.c
> @@ -3550,18 +3550,15 @@ kvfree_rcu_drain_ready(struct kfree_rcu_cpu *krcp)
> }
>
> /*
> - * This function is invoked after the KFREE_DRAIN_JIFFIES timeout.
> + * Return: %true if a work is queued, %false otherwise.
> */
> -static void kfree_rcu_monitor(struct work_struct *work)
> +static bool
> +kvfree_rcu_queue_batch(struct kfree_rcu_cpu *krcp)
> {
> - struct kfree_rcu_cpu *krcp = container_of(work,
> - struct kfree_rcu_cpu, monitor_work.work);
> unsigned long flags;
> + bool queued = false;
> int i, j;
>
> - // Drain ready for reclaim.
> - kvfree_rcu_drain_ready(krcp);
> -
> raw_spin_lock_irqsave(&krcp->lock, flags);
>
> // Attempt to start a new batch.
> @@ -3600,11 +3597,27 @@ static void kfree_rcu_monitor(struct work_struct *work)
> // be that the work is in the pending state when
> // channels have been detached following by each
> // other.
> - queue_rcu_work(system_wq, &krwp->rcu_work);
> + queued = queue_rcu_work(system_wq, &krwp->rcu_work);
> }
> }
>
> raw_spin_unlock_irqrestore(&krcp->lock, flags);
> + return queued;
> +}
> +
> +/*
> + * This function is invoked after the KFREE_DRAIN_JIFFIES timeout.
> + */
> +static void kfree_rcu_monitor(struct work_struct *work)
> +{
> + struct kfree_rcu_cpu *krcp = container_of(work,
> + struct kfree_rcu_cpu, monitor_work.work);
> +
> + // Drain ready for reclaim.
> + kvfree_rcu_drain_ready(krcp);
> +
> + // Queue a batch for a rest.
> + kvfree_rcu_queue_batch(krcp);
>
> // If there is nothing to detach, it means that our job is
> // successfully done here. In case of having at least one
> @@ -3825,6 +3838,80 @@ void kvfree_call_rcu(struct rcu_head *head, void *ptr)
> }
> EXPORT_SYMBOL_GPL(kvfree_call_rcu);
>
> +/**
> + * kvfree_rcu_barrier - Wait until all in-flight kvfree_rcu() complete.
> + *
> + * Note that a single argument of kvfree_rcu() call has a slow path that
> + * triggers synchronize_rcu() following by freeing a pointer. It is done
> + * before the return from the function. Therefore for any single-argument
> + * call that will result in a kfree() to a cache that is to be destroyed
> + * during module exit, it is developer's responsibility to ensure that all
> + * such calls have returned before the call to kmem_cache_destroy().
> + */
> +void kvfree_rcu_barrier(void)
> +{
> + struct kfree_rcu_cpu_work *krwp;
> + struct kfree_rcu_cpu *krcp;
> + bool queued;
> + int i, cpu;
> +
> + /*
> + * Firstly we detach objects and queue them over an RCU-batch
> + * for all CPUs. Finally queued works are flushed for each CPU.
> + *
> + * Please note. If there are outstanding batches for a particular
> + * CPU, those have to be finished first following by queuing a new.
> + */
> + for_each_possible_cpu(cpu) {
> + krcp = per_cpu_ptr(&krc, cpu);
> +
> + /*
> + * Check if this CPU has any objects which have been queued for a
> + * new GP completion. If not(means nothing to detach), we are done
> + * with it. If any batch is pending/running for this "krcp", below
> + * per-cpu flush_rcu_work() waits its completion(see last step).
> + */
> + if (!need_offload_krc(krcp))
> + continue;
> +
> + while (1) {
> + /*
> + * If we are not able to queue a new RCU work it means:
> + * - batches for this CPU are still in flight which should
> + * be flushed first and then repeat;
> + * - no objects to detach, because of concurrency.
> + */
> + queued = kvfree_rcu_queue_batch(krcp);
> +
> + /*
> + * Bail out, if there is no need to offload this "krcp"
> + * anymore. As noted earlier it can run concurrently.
> + */
> + if (queued || !need_offload_krc(krcp))
> + break;
> +
> + /* There are ongoing batches. */
> + for (i = 0; i < KFREE_N_BATCHES; i++) {
> + krwp = &(krcp->krw_arr[i]);
> + flush_rcu_work(&krwp->rcu_work);
> + }
> + }
> + }
> +
> + /*
> + * Now we guarantee that all objects are flushed.
> + */
> + for_each_possible_cpu(cpu) {
> + krcp = per_cpu_ptr(&krc, cpu);
> +
> + for (i = 0; i < KFREE_N_BATCHES; i++) {
> + krwp = &(krcp->krw_arr[i]);
> + flush_rcu_work(&krwp->rcu_work);
> + }
> + }
> +}
> +EXPORT_SYMBOL_GPL(kvfree_rcu_barrier);
> +
> static unsigned long
> kfree_rcu_shrink_count(struct shrinker *shrink, struct shrink_control *sc)
> {