[PATCH 2/2] rcu/kvfree: Split ready for reclaim objects from a batch

From: Uladzislau Rezki (Sony)
Date: Wed Dec 14 2022 - 07:09:41 EST


This patch splits ready to free pointers from the ones which
require an extra grace period. This is done before submitting
a new batch.

Once a split is done, ready to free are released right away
and after that a new batch is queued from the monitor work
over a queue_rcu_work().

Signed-off-by: Uladzislau Rezki (Sony) <urezki@xxxxxxxxx>
---
kernel/rcu/tree.c | 87 +++++++++++++++++++++++++++++------------------
1 file changed, 54 insertions(+), 33 deletions(-)

diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 312cb0dee117..50a585caf698 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2764,15 +2764,13 @@ struct kvfree_rcu_bulk_data {
* struct kfree_rcu_cpu_work - single batch of kfree_rcu() requests
* @rcu_work: Let queue_rcu_work() invoke workqueue handler after grace period
* @head_free: List of kfree_rcu() objects waiting for a grace period
- * @head_free_gp_snap: Snapshot of RCU state for objects placed to "@head_free"
* @bulk_head_free: Bulk-List of kvfree_rcu() objects waiting for a grace period
* @krcp: Pointer to @kfree_rcu_cpu structure
*/

struct kfree_rcu_cpu_work {
- struct work_struct rcu_work;
+ struct rcu_work rcu_work;
struct rcu_head *head_free;
- unsigned long head_free_gp_snap;
struct list_head bulk_head_free[FREE_N_CHANNELS];
struct kfree_rcu_cpu *krcp;
};
@@ -2780,6 +2778,7 @@ struct kfree_rcu_cpu_work {
/**
* struct kfree_rcu_cpu - batch up kfree_rcu() requests for RCU grace period
* @head: List of kfree_rcu() objects not yet waiting for a grace period
+ * @head_gp_snap: Snapshot of RCU state for objects placed to "@head"
* @bulk_head: Bulk-List of kvfree_rcu() objects not yet waiting for a grace period
* @krw_arr: Array of batches of kfree_rcu() objects waiting for a grace period
* @lock: Synchronize access to this structure
@@ -2807,6 +2806,7 @@ struct kfree_rcu_cpu {
// Objects queued on a linked list
// through their rcu_head structures.
struct rcu_head *head;
+ unsigned long head_gp_snap;
atomic_t head_count;

// Objects queued on a bulk-list.
@@ -2975,10 +2975,9 @@ static void kfree_rcu_work(struct work_struct *work)
struct rcu_head *head;
struct kfree_rcu_cpu *krcp;
struct kfree_rcu_cpu_work *krwp;
- unsigned long head_free_gp_snap;
int i;

- krwp = container_of(work,
+ krwp = container_of(to_rcu_work(work),
struct kfree_rcu_cpu_work, rcu_work);
krcp = krwp->krcp;

@@ -2990,26 +2989,11 @@ static void kfree_rcu_work(struct work_struct *work)
// Channel 3.
head = krwp->head_free;
krwp->head_free = NULL;
- head_free_gp_snap = krwp->head_free_gp_snap;
raw_spin_unlock_irqrestore(&krcp->lock, flags);

// Handle the first two channels.
for (i = 0; i < FREE_N_CHANNELS; i++) {
// Start from the tail page, so a GP is likely passed for it.
- list_for_each_entry_safe_reverse(bnode, n, &bulk_head[i], list) {
- // Not yet ready? Bail out since we need one more GP.
- if (!poll_state_synchronize_rcu(bnode->gp_snap))
- break;
-
- list_del_init(&bnode->list);
- kvfree_rcu_bulk(krcp, bnode, i);
- }
-
- // Please note a request for one more extra GP can
- // occur only once for all objects in this batch.
- if (!list_empty(&bulk_head[i]))
- synchronize_rcu();
-
list_for_each_entry_safe(bnode, n, &bulk_head[i], list)
kvfree_rcu_bulk(krcp, bnode, i);
}
@@ -3021,10 +3005,7 @@ static void kfree_rcu_work(struct work_struct *work)
* queued on a linked list through their rcu_head structures.
* This list is named "Channel 3".
*/
- if (head) {
- cond_synchronize_rcu(head_free_gp_snap);
- kvfree_rcu_list(head);
- }
+ kvfree_rcu_list(head);
}

static bool
@@ -3065,6 +3046,44 @@ schedule_delayed_monitor_work(struct kfree_rcu_cpu *krcp)
queue_delayed_work(system_wq, &krcp->monitor_work, delay);
}

+static void
+kvfree_rcu_drain_ready(struct kfree_rcu_cpu *krcp)
+{
+ struct list_head bulk_ready[FREE_N_CHANNELS];
+ struct kvfree_rcu_bulk_data *bnode, *n;
+ struct rcu_head *head_ready = NULL;
+ unsigned long flags;
+ int i;
+
+ raw_spin_lock_irqsave(&krcp->lock, flags);
+ for (i = 0; i < FREE_N_CHANNELS; i++) {
+ INIT_LIST_HEAD(&bulk_ready[i]);
+
+ list_for_each_entry_safe_reverse(bnode, n, &krcp->bulk_head[i], list) {
+ if (!poll_state_synchronize_rcu(bnode->gp_snap))
+ break;
+
+ atomic_sub(bnode->nr_records, &krcp->bulk_count[i]);
+ list_move(&bnode->list, &bulk_ready[i]);
+ }
+ }
+
+ if (krcp->head && poll_state_synchronize_rcu(krcp->head_gp_snap)) {
+ head_ready = krcp->head;
+ atomic_set(&krcp->head_count, 0);
+ WRITE_ONCE(krcp->head, NULL);
+ }
+ raw_spin_unlock_irqrestore(&krcp->lock, flags);
+
+ for (i = 0; i < FREE_N_CHANNELS; i++) {
+ list_for_each_entry_safe(bnode, n, &bulk_ready[i], list)
+ kvfree_rcu_bulk(krcp, bnode, i);
+ }
+
+ if (head_ready)
+ kvfree_rcu_list(head_ready);
+}
+
/*
* This function is invoked after the KFREE_DRAIN_JIFFIES timeout.
*/
@@ -3075,6 +3094,9 @@ static void kfree_rcu_monitor(struct work_struct *work)
unsigned long flags;
int i, j;

+ // Drain ready for reclaim.
+ kvfree_rcu_drain_ready(krcp);
+
raw_spin_lock_irqsave(&krcp->lock, flags);

// Attempt to start a new batch.
@@ -3094,8 +3116,9 @@ static void kfree_rcu_monitor(struct work_struct *work)
// Channel 2 corresponds to vmalloc-pointer bulk path.
for (j = 0; j < FREE_N_CHANNELS; j++) {
if (list_empty(&krwp->bulk_head_free[j])) {
- list_replace_init(&krcp->bulk_head[j], &krwp->bulk_head_free[j]);
atomic_set(&krcp->bulk_count[j], 0);
+ list_replace_init(&krcp->bulk_head[j],
+ &krwp->bulk_head_free[j]);
}
}

@@ -3103,13 +3126,8 @@ static void kfree_rcu_monitor(struct work_struct *work)
// objects queued on the linked list.
if (!krwp->head_free) {
krwp->head_free = krcp->head;
- WRITE_ONCE(krcp->head, NULL);
atomic_set(&krcp->head_count, 0);
-
- // Take a snapshot for this krwp. Please note no more
- // any objects can be added to attached head_free channel
- // therefore fixate a GP for it here.
- krwp->head_free_gp_snap = get_state_synchronize_rcu();
+ WRITE_ONCE(krcp->head, NULL);
}

// One work is per one batch, so there are three
@@ -3117,7 +3135,7 @@ static void kfree_rcu_monitor(struct work_struct *work)
// be that the work is in the pending state when
// channels have been detached following by each
// other.
- queue_work(system_wq, &krwp->rcu_work);
+ queue_rcu_work(system_wq, &krwp->rcu_work);
}
}

@@ -3304,6 +3322,9 @@ void kvfree_call_rcu(struct rcu_head *head, void *ptr)
head->next = krcp->head;
WRITE_ONCE(krcp->head, head);
atomic_inc(&krcp->head_count);
+
+ // Take a snapshot for this krcp.
+ krcp->head_gp_snap = get_state_synchronize_rcu();
success = true;
}

@@ -4860,7 +4881,7 @@ static void __init kfree_rcu_batch_init(void)
struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);

for (i = 0; i < KFREE_N_BATCHES; i++) {
- INIT_WORK(&krcp->krw_arr[i].rcu_work, kfree_rcu_work);
+ INIT_RCU_WORK(&krcp->krw_arr[i].rcu_work, kfree_rcu_work);
krcp->krw_arr[i].krcp = krcp;

for (j = 0; j < FREE_N_CHANNELS; j++)
--
2.30.2