[PATCH v2] rcu/tree: Add multiple in-flight batches of kfree_rcu work

From: Joel Fernandes (Google)
Date: Wed Aug 28 2019 - 10:10:02 EST


During testing, it was observed that the amount of memory consumed due to
kfree_rcu() batching is 300-400MB. Previously we had only a single
head_free pointer pointing to the list of rcu_head(s) that are to be
freed after a grace period. Until this list is drained, we cannot queue
any more objects on it since such objects may not be ready to be
reclaimed when the worker thread eventually gets to draining the
head_free list.

We can do better by maintaining multiple lists as done by this patch.
Testing shows that memory consumption came down by around 100-150MB with
just adding another list. Adding more than 1 additional list did not
show any improvement.

Suggested-by: Paul E. McKenney <paulmck@xxxxxxxxxxxxx>
Signed-off-by: Joel Fernandes (Google) <joel@xxxxxxxxxxxxxxxxx>
---
kernel/rcu/tree.c | 61 ++++++++++++++++++++++++++++++++---------------
1 file changed, 42 insertions(+), 19 deletions(-)

diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 4f7c3096d786..5bf8f7e793ea 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2688,28 +2688,37 @@ EXPORT_SYMBOL_GPL(call_rcu);

/* Maximum number of jiffies to wait before draining a batch. */
#define KFREE_DRAIN_JIFFIES (HZ / 50)
+#define KFREE_N_BATCHES 2
+
+struct kfree_rcu_work {
+ /* The rcu_work node for queuing work with queue_rcu_work(). The work
+ * is done after a grace period.
+ */
+ struct rcu_work rcu_work;
+
+ /* The list of objects that have now left ->head and are queued for
+ * freeing after a grace period.
+ */
+ struct rcu_head *head_free;
+
+ struct kfree_rcu_cpu *krcp;
+};

/*
* Maximum number of kfree(s) to batch, if this limit is hit then the batch of
* kfree(s) is queued for freeing after a grace period, right away.
*/
struct kfree_rcu_cpu {
- /* The rcu_work node for queuing work with queue_rcu_work(). The work
- * is done after a grace period.
- */
- struct rcu_work rcu_work;

/* The list of objects being queued in a batch but are not yet
* scheduled to be freed.
*/
struct rcu_head *head;

- /* The list of objects that have now left ->head and are queued for
- * freeing after a grace period.
- */
- struct rcu_head *head_free;
+ /* Array of kfree_rcu_work structures, one per batch that can be in flight */
+ struct kfree_rcu_work krw_arr[KFREE_N_BATCHES];

- /* Protect concurrent access to this structure. */
+ /* Protect concurrent access to this structure and kfree_rcu_work. */
spinlock_t lock;

/* The delayed work that flushes ->head to ->head_free incase ->head
@@ -2730,12 +2739,14 @@ static void kfree_rcu_work(struct work_struct *work)
{
unsigned long flags;
struct rcu_head *head, *next;
- struct kfree_rcu_cpu *krcp = container_of(to_rcu_work(work),
- struct kfree_rcu_cpu, rcu_work);
+ struct kfree_rcu_work *krwp = container_of(to_rcu_work(work),
+ struct kfree_rcu_work, rcu_work);
+ struct kfree_rcu_cpu *krcp;
+
+ krcp = krwp->krcp;

spin_lock_irqsave(&krcp->lock, flags);
- head = krcp->head_free;
- krcp->head_free = NULL;
+ head = xchg(&krwp->head_free, NULL);
spin_unlock_irqrestore(&krcp->lock, flags);

/*
@@ -2758,19 +2769,28 @@ static void kfree_rcu_work(struct work_struct *work)
*/
static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
{
+ int i = 0;
+ struct kfree_rcu_work *krwp = NULL;
+
lockdep_assert_held(&krcp->lock);
+ while (i < KFREE_N_BATCHES) {
+ if (!krcp->krw_arr[i].head_free) {
+ krwp = &(krcp->krw_arr[i]);
+ break;
+ }
+ i++;
+ }

- /* If a previous RCU batch work is already in progress, we cannot queue
+ /* If both RCU batches are already in progress, we cannot queue
* another one, just refuse the optimization and it will be retried
* again in KFREE_DRAIN_JIFFIES time.
*/
- if (krcp->head_free)
+ if (!krwp)
return false;

- krcp->head_free = krcp->head;
- krcp->head = NULL;
- INIT_RCU_WORK(&krcp->rcu_work, kfree_rcu_work);
- queue_rcu_work(system_wq, &krcp->rcu_work);
+ krwp->head_free = xchg(&krcp->head, NULL);
+ INIT_RCU_WORK(&krwp->rcu_work, kfree_rcu_work);
+ queue_rcu_work(system_wq, &krwp->rcu_work);

return true;
}
@@ -3736,8 +3756,11 @@ static void __init kfree_rcu_batch_init(void)

for_each_possible_cpu(cpu) {
struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
+ int i = KFREE_N_BATCHES;

spin_lock_init(&krcp->lock);
+ while (i--)
+ krcp->krw_arr[i].krcp = krcp;
INIT_DELAYED_WORK(&krcp->monitor_work, kfree_rcu_monitor);
}
}
--
2.23.0.187.g17f5b7556c-goog