[PATCH v2 -rcu dev 2/5] rcu/tree: Add multiple in-flight batches of kfree_rcu work

From: Joel Fernandes (Google)
Date: Fri Aug 30 2019 - 12:37:03 EST


During testing, it was observed that the amount of memory consumed due to
kfree_rcu() batching is 300-400MB. Previously we had only a single
head_free pointer pointing to the list of rcu_head(s) that are to be
freed after a grace period. Until this list is drained, we cannot queue
any more objects on it since such objects may not be ready to be
reclaimed when the worker thread eventually gets to draining the
head_free list.

We can do better by maintaining multiple lists as done by this patch.
Testing shows that memory consumption came down by around 100-150MB with
just adding another list. Adding more than 1 additional list did not
show any improvement.

Suggested-by: Paul E. McKenney <paulmck@xxxxxxxxxxxxx>
Signed-off-by: Joel Fernandes (Google) <joel@xxxxxxxxxxxxxxxxx>
---
kernel/rcu/tree.c | 80 +++++++++++++++++++++++++++++++++--------------
1 file changed, 56 insertions(+), 24 deletions(-)

diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 68ebf0eb64c8..2e1772469de9 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2688,28 +2688,37 @@ EXPORT_SYMBOL_GPL(call_rcu);

/* Maximum number of jiffies to wait before draining a batch. */
#define KFREE_DRAIN_JIFFIES (HZ / 50)
+#define KFREE_N_BATCHES 2
+
+struct kfree_rcu_work {
+ /* The rcu_work node for queuing work with queue_rcu_work(). The work
+ * is done after a grace period.
+ */
+ struct rcu_work rcu_work;
+
+ /* The list of objects that have now left ->head and are queued for
+ * freeing after a grace period.
+ */
+ struct rcu_head *head_free;
+
+ struct kfree_rcu_cpu *krcp;
+};

/*
* Maximum number of kfree(s) to batch, if this limit is hit then the batch of
* kfree(s) is queued for freeing after a grace period, right away.
*/
struct kfree_rcu_cpu {
- /* The rcu_work node for queuing work with queue_rcu_work(). The work
- * is done after a grace period.
- */
- struct rcu_work rcu_work;

/* The list of objects being queued in a batch but are not yet
* scheduled to be freed.
*/
struct rcu_head *head;

- /* The list of objects that have now left ->head and are queued for
- * freeing after a grace period.
- */
- struct rcu_head *head_free;
+ /* Array of kfree_rcu_work structures, one per in-flight batch. */
+ struct kfree_rcu_work krw_arr[KFREE_N_BATCHES];

- /* Protect concurrent access to this structure. */
+ /* Protect concurrent access to this structure and kfree_rcu_work. */
spinlock_t lock;

/* The delayed work that flushes ->head to ->head_free incase ->head
@@ -2717,7 +2726,7 @@ struct kfree_rcu_cpu {
* is busy, ->head just continues to grow and we retry flushing later.
*/
struct delayed_work monitor_work;
- int monitor_todo; /* Is a delayed work pending execution? */
+ bool monitor_todo; /* Is a delayed work pending execution? */
};

static DEFINE_PER_CPU(struct kfree_rcu_cpu, krc);
@@ -2730,12 +2739,15 @@ static void kfree_rcu_work(struct work_struct *work)
{
unsigned long flags;
struct rcu_head *head, *next;
- struct kfree_rcu_cpu *krcp = container_of(to_rcu_work(work),
- struct kfree_rcu_cpu, rcu_work);
+ struct kfree_rcu_work *krwp = container_of(to_rcu_work(work),
+ struct kfree_rcu_work, rcu_work);
+ struct kfree_rcu_cpu *krcp;
+
+ krcp = krwp->krcp;

spin_lock_irqsave(&krcp->lock, flags);
- head = krcp->head_free;
- krcp->head_free = NULL;
+ head = krwp->head_free;
+ krwp->head_free = NULL;
spin_unlock_irqrestore(&krcp->lock, flags);

/*
@@ -2758,19 +2770,30 @@ static void kfree_rcu_work(struct work_struct *work)
*/
static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
{
+ int i = 0;
+ struct kfree_rcu_work *krwp = NULL;
+
lockdep_assert_held(&krcp->lock);
+ while (i < KFREE_N_BATCHES) {
+ if (!krcp->krw_arr[i].head_free) {
+ krwp = &(krcp->krw_arr[i]);
+ break;
+ }
+ i++;
+ }

- /* If a previous RCU batch work is already in progress, we cannot queue
+ /*
+ * If all RCU batch slots are already in progress, we cannot queue
* another one, just refuse the optimization and it will be retried
* again in KFREE_DRAIN_JIFFIES time.
*/
- if (krcp->head_free)
+ if (!krwp)
return false;

- krcp->head_free = krcp->head;
+ krwp->head_free = krcp->head;
krcp->head = NULL;
- INIT_RCU_WORK(&krcp->rcu_work, kfree_rcu_work);
- queue_rcu_work(system_wq, &krcp->rcu_work);
+ INIT_RCU_WORK(&krwp->rcu_work, kfree_rcu_work);
+ queue_rcu_work(system_wq, &krwp->rcu_work);

return true;
}
@@ -2778,7 +2801,8 @@ static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
static inline void kfree_rcu_drain_unlock(struct kfree_rcu_cpu *krcp,
unsigned long flags)
{
- /* Flush ->head to ->head_free, all objects on ->head_free will be
+ /*
+ * Flush ->head to ->head_free, all objects on ->head_free will be
* kfree'd after a grace period.
*/
if (queue_kfree_rcu_work(krcp)) {
@@ -2787,11 +2811,14 @@ static inline void kfree_rcu_drain_unlock(struct kfree_rcu_cpu *krcp,
return;
}

- /* Previous batch that was queued to RCU did not get free yet, let us
+ /*
+ * The batches previously queued to RCU are not freed yet, let us
* try again soon.
*/
- if (!xchg(&krcp->monitor_todo, true))
+ if (!krcp->monitor_todo) {
+ krcp->monitor_todo = true;
schedule_delayed_work(&krcp->monitor_work, KFREE_DRAIN_JIFFIES);
+ }
spin_unlock_irqrestore(&krcp->lock, flags);
}

@@ -2806,10 +2833,12 @@ static void kfree_rcu_monitor(struct work_struct *work)
monitor_work.work);

spin_lock_irqsave(&krcp->lock, flags);
- if (xchg(&krcp->monitor_todo, false))
+ if (krcp->monitor_todo) {
+ krcp->monitor_todo = false;
kfree_rcu_drain_unlock(krcp, flags);
- else
+ } else {
spin_unlock_irqrestore(&krcp->lock, flags);
+ }
}

/*
@@ -3736,8 +3765,11 @@ static void __init kfree_rcu_batch_init(void)

for_each_possible_cpu(cpu) {
struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
+ int i;

spin_lock_init(&krcp->lock);
+ for (i = 0; i < KFREE_N_BATCHES; i++)
+ krcp->krw_arr[i].krcp = krcp;
INIT_DELAYED_WORK(&krcp->monitor_work, kfree_rcu_monitor);
}
}
--
2.23.0.187.g17f5b7556c-goog