Re: [PATCH 1/2] rcu/kvfree: Carefully reset number of objects in krcp

From: Paul E. McKenney
Date: Thu Dec 15 2022 - 15:40:33 EST


On Wed, Dec 14, 2022 at 01:06:29PM +0100, Uladzislau Rezki (Sony) wrote:
> Problem. schedule_delayed_monitor_work() relies on the
> number of pointers queued into krcp. Based on that number
> and a threshold, the work is rearmed with a different delay
> interval, i.e. sooner or later.
>
> There are three pipes where pointers can be placed. When
> any pipe is offloaded, the krcp->count counter is set to
> zero, which is wrong because the other pipes might not be
> empty.
>
> Fix it by maintaining an individual counter per pipe.
> In order to get the number of objects per krcp, introduce
> a krc_count() helper.
>
> Signed-off-by: Uladzislau Rezki (Sony) <urezki@xxxxxxxxx>

Queued and pushed for further review and testing, and with the usual
wordsmithing. Much better use of workqueue handlers, thank you!!!

Thanx, Paul

> ---
> kernel/rcu/tree.c | 40 ++++++++++++++++++++++++++++++----------
> 1 file changed, 30 insertions(+), 10 deletions(-)
>
> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> index d155f2594317..312cb0dee117 100644
> --- a/kernel/rcu/tree.c
> +++ b/kernel/rcu/tree.c
> @@ -2785,7 +2785,8 @@ struct kfree_rcu_cpu_work {
> * @lock: Synchronize access to this structure
> * @monitor_work: Promote @head to @head_free after KFREE_DRAIN_JIFFIES
> * @initialized: The @rcu_work fields have been initialized
> - * @count: Number of objects for which GP not started
> + * @head_count: Number of objects in rcu_head singular list
> + * @bulk_count: Number of objects in bulk-list
> * @bkvcache:
> * A simple cache list that contains objects for reuse purpose.
> * In order to save some per-cpu space the list is singular.
> @@ -2803,13 +2804,19 @@ struct kfree_rcu_cpu_work {
> * the interactions with the slab allocators.
> */
> struct kfree_rcu_cpu {
> + // Objects queued on a linked list
> + // through their rcu_head structures.
> struct rcu_head *head;
> + atomic_t head_count;
> +
> + // Objects queued on a bulk-list.
> struct list_head bulk_head[FREE_N_CHANNELS];
> + atomic_t bulk_count[FREE_N_CHANNELS];
> +
> struct kfree_rcu_cpu_work krw_arr[KFREE_N_BATCHES];
> raw_spinlock_t lock;
> struct delayed_work monitor_work;
> bool initialized;
> - int count;
>
> struct delayed_work page_cache_work;
> atomic_t backoff_page_cache_fill;
> @@ -3032,12 +3039,23 @@ need_offload_krc(struct kfree_rcu_cpu *krcp)
> return !!READ_ONCE(krcp->head);
> }
>
> +static int krc_count(struct kfree_rcu_cpu *krcp)
> +{
> + int sum = atomic_read(&krcp->head_count);
> + int i;
> +
> + for (i = 0; i < FREE_N_CHANNELS; i++)
> + sum += atomic_read(&krcp->bulk_count[i]);
> +
> + return sum;
> +}
> +
> static void
> schedule_delayed_monitor_work(struct kfree_rcu_cpu *krcp)
> {
> long delay, delay_left;
>
> - delay = READ_ONCE(krcp->count) >= KVFREE_BULK_MAX_ENTR ? 1:KFREE_DRAIN_JIFFIES;
> + delay = krc_count(krcp) >= KVFREE_BULK_MAX_ENTR ? 1:KFREE_DRAIN_JIFFIES;
> if (delayed_work_pending(&krcp->monitor_work)) {
> delay_left = krcp->monitor_work.timer.expires - jiffies;
> if (delay < delay_left)
> @@ -3075,8 +3093,10 @@ static void kfree_rcu_monitor(struct work_struct *work)
> // Channel 1 corresponds to the SLAB-pointer bulk path.
> // Channel 2 corresponds to vmalloc-pointer bulk path.
> for (j = 0; j < FREE_N_CHANNELS; j++) {
> - if (list_empty(&krwp->bulk_head_free[j]))
> + if (list_empty(&krwp->bulk_head_free[j])) {
> list_replace_init(&krcp->bulk_head[j], &krwp->bulk_head_free[j]);
> + atomic_set(&krcp->bulk_count[j], 0);
> + }
> }
>
> // Channel 3 corresponds to both SLAB and vmalloc
> @@ -3084,6 +3104,7 @@ static void kfree_rcu_monitor(struct work_struct *work)
> if (!krwp->head_free) {
> krwp->head_free = krcp->head;
> WRITE_ONCE(krcp->head, NULL);
> + atomic_set(&krcp->head_count, 0);
>
> // Take a snapshot for this krwp. Please note no more
> // any objects can be added to attached head_free channel
> @@ -3091,8 +3112,6 @@ static void kfree_rcu_monitor(struct work_struct *work)
> krwp->head_free_gp_snap = get_state_synchronize_rcu();
> }
>
> - WRITE_ONCE(krcp->count, 0);
> -
> // One work is per one batch, so there are three
> // "free channels", the batch can handle. It can
> // be that the work is in the pending state when
> @@ -3229,6 +3248,8 @@ add_ptr_to_bulk_krc_lock(struct kfree_rcu_cpu **krcp,
> // Finally insert and update the GP for this page.
> bnode->records[bnode->nr_records++] = ptr;
> bnode->gp_snap = get_state_synchronize_rcu();
> + atomic_inc(&(*krcp)->bulk_count[idx]);
> +
> return true;
> }
>
> @@ -3282,11 +3303,10 @@ void kvfree_call_rcu(struct rcu_head *head, void *ptr)
> head->func = ptr;
> head->next = krcp->head;
> WRITE_ONCE(krcp->head, head);
> + atomic_inc(&krcp->head_count);
> success = true;
> }
>
> - WRITE_ONCE(krcp->count, krcp->count + 1);
> -
> // Set timer to drain after KFREE_DRAIN_JIFFIES.
> if (rcu_scheduler_active == RCU_SCHEDULER_RUNNING)
> schedule_delayed_monitor_work(krcp);
> @@ -3317,7 +3337,7 @@ kfree_rcu_shrink_count(struct shrinker *shrink, struct shrink_control *sc)
> for_each_possible_cpu(cpu) {
> struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
>
> - count += READ_ONCE(krcp->count);
> + count += krc_count(krcp);
> count += READ_ONCE(krcp->nr_bkv_objs);
> atomic_set(&krcp->backoff_page_cache_fill, 1);
> }
> @@ -3334,7 +3354,7 @@ kfree_rcu_shrink_scan(struct shrinker *shrink, struct shrink_control *sc)
> int count;
> struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
>
> - count = krcp->count;
> + count = krc_count(krcp);
> count += drain_page_cache(krcp);
> kfree_rcu_monitor(&krcp->monitor_work.work);
>
> --
> 2.30.2
>