[PATCH RFC v1 1/2] rcu/tree: Add basic support for kfree_rcu batching
From: Joel Fernandes (Google)
Date: Tue Aug 06 2019 - 17:20:58 EST
Recently a discussion about performance of system involving a high rate
of kfree_rcu() calls surfaced on the list [1] which led to another
discussion how to prepare for this situation.
This patch adds basic batching support for kfree_rcu. It is "basic"
because we do none of the slab management, dynamic allocation, code
moving or any of the other things, some of which previous attempts did
[2]. These fancier improvements can be follow-up patches and there are
several ideas being experimented in those regards.
Torture tests follow in the next patch and show improvements of around
~13% with continuous flooding of kfree_rcu() calls on a 16 CPU system.
[1] http://lore.kernel.org/lkml/20190723035725-mutt-send-email-mst@xxxxxxxxxx
[2] https://lkml.org/lkml/2017/12/19/824
This is an effort just to start simple, and build up from there.
Cc: Rao Shoaib <rao.shoaib@xxxxxxxxxx>
Cc: max.byungchul.park@xxxxxxxxx
Cc: byungchul.park@xxxxxxx
Cc: kernel-team@xxxxxxxxxxx
Cc: kernel-team@xxxxxxx
Co-developed-by: Byungchul Park <byungchul.park@xxxxxxx>
Signed-off-by: Byungchul Park <byungchul.park@xxxxxxx>
Signed-off-by: Joel Fernandes (Google) <joel@xxxxxxxxxxxxxxxxx>
---
kernel/rcu/tree.c | 198 ++++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 193 insertions(+), 5 deletions(-)
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index a14e5fbbea46..bdbd483606ce 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2593,19 +2593,194 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
}
EXPORT_SYMBOL_GPL(call_rcu);
+
+/* Maximum number of jiffies to wait before draining batch */
+#define KFREE_DRAIN_JIFFIES 50
+
+/*
+ * Maximum number of kfree(s) to batch, if limit is hit
+ * then RCU work is queued right away
+ */
+#define KFREE_MAX_BATCH 200000ULL
+
+struct kfree_rcu_cpu {
+ /* The work done to free objects after GP */
+ struct rcu_work rcu_work;
+
+ /* The list of objects being queued */
+ struct rcu_head *head;
+ int kfree_batch_len;
+
+ /* The list of objects pending a free */
+ struct rcu_head *head_free;
+
+ /* Protect concurrent access to this structure */
+ spinlock_t lock;
+
+ /* The work done to monitor whether objects need free */
+ struct delayed_work monitor_work;
+ bool monitor_todo;
+};
+
+static DEFINE_PER_CPU(struct kfree_rcu_cpu, krc);
+
+/* Free all heads after a grace period (worker function) */
+static void kfree_rcu_work(struct work_struct *work)
+{
+ unsigned long flags;
+ struct rcu_head *head, *next;
+ struct kfree_rcu_cpu *krc = container_of(to_rcu_work(work),
+ struct kfree_rcu_cpu, rcu_work);
+
+ spin_lock_irqsave(&krc->lock, flags);
+ head = krc->head_free;
+ krc->head_free = NULL;
+ spin_unlock_irqrestore(&krc->lock, flags);
+
+ /* The head must be detached and not referenced from anywhere */
+ for (; head; head = next) {
+ next = head->next;
+ head->next = NULL;
+ /* Could be possible to optimize with kfree_bulk in future */
+ __rcu_reclaim(rcu_state.name, head);
+ }
+}
+
+/*
+ * Schedule the kfree batch RCU work to run after GP.
+ *
+ * Either the batch reached its maximum size, or the monitor's
+ * time reached, either way schedule the batch work.
+ */
+static bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krc)
+{
+ lockdep_assert_held(&krc->lock);
+
+ /*
+ * Someone already drained, probably before the monitor's worker
+ * thread ran. Just return to avoid useless work.
+ */
+ if (!krc->head)
+ return true;
+
+ /*
+ * If RCU batch work already in progress, we cannot
+ * queue another one, just refuse the optimization.
+ */
+ if (krc->head_free)
+ return false;
+
+ krc->head_free = krc->head;
+ krc->head = NULL;
+ krc->kfree_batch_len = 0;
+ INIT_RCU_WORK(&krc->rcu_work, kfree_rcu_work);
+ queue_rcu_work(system_wq, &krc->rcu_work);
+
+ return true;
+}
+
+static void kfree_rcu_drain_unlock(struct kfree_rcu_cpu *krc,
+ unsigned long flags)
+{
+ struct rcu_head *head, *next;
+
+ /* It is time to do bulk reclaim after grace period */
+ krc->monitor_todo = false;
+ if (queue_kfree_rcu_work(krc)) {
+ spin_unlock_irqrestore(&krc->lock, flags);
+ return;
+ }
+
+ /*
+ * Use non-batch regular call_rcu for kfree_rcu in case things are too
+ * busy and batching of kfree_rcu could not be used.
+ */
+ head = krc->head;
+ krc->head = NULL;
+ krc->kfree_batch_len = 0;
+ spin_unlock_irqrestore(&krc->lock, flags);
+
+ for (; head; head = next) {
+ next = head->next;
+ head->next = NULL;
+ __call_rcu(head, head->func, -1, 1);
+ }
+}
+
+/*
+ * If enough time has passed, the kfree batch has to be drained
+ * and the monitor takes care of that.
+ */
+static void kfree_rcu_monitor(struct work_struct *work)
+{
+ bool todo;
+ unsigned long flags;
+ struct kfree_rcu_cpu *krc = container_of(work, struct kfree_rcu_cpu,
+ monitor_work.work);
+
+ /* It is time to do bulk reclaim after grace period */
+ spin_lock_irqsave(&krc->lock, flags);
+ todo = krc->monitor_todo;
+ krc->monitor_todo = false;
+ if (todo)
+ kfree_rcu_drain_unlock(krc, flags);
+ else
+ spin_unlock_irqrestore(&krc->lock, flags);
+}
+
+static void kfree_rcu_batch(struct rcu_head *head, rcu_callback_t func)
+{
+ unsigned long flags;
+ struct kfree_rcu_cpu *this_krc;
+ bool monitor_todo;
+
+ local_irq_save(flags);
+ this_krc = this_cpu_ptr(&krc);
+
+ spin_lock(&this_krc->lock);
+
+ /* Queue the kfree but don't yet schedule the batch */
+ head->func = func;
+ head->next = this_krc->head;
+ this_krc->head = head;
+ this_krc->kfree_batch_len++;
+
+ if (this_krc->kfree_batch_len == KFREE_MAX_BATCH) {
+ kfree_rcu_drain_unlock(this_krc, flags);
+ return;
+ }
+
+ /* Maximum has not been reached, schedule monitor for timely drain */
+ monitor_todo = this_krc->monitor_todo;
+ this_krc->monitor_todo = true;
+ spin_unlock(&this_krc->lock);
+
+ if (!monitor_todo) {
+ schedule_delayed_work_on(smp_processor_id(),
+ &this_krc->monitor_work, KFREE_DRAIN_JIFFIES);
+ }
+ local_irq_restore(flags);
+}
+
/*
* Queue an RCU callback for lazy invocation after a grace period.
- * This will likely be later named something like "call_rcu_lazy()",
- * but this change will require some way of tagging the lazy RCU
- * callbacks in the list of pending callbacks. Until then, this
- * function may only be called from __kfree_rcu().
*/
void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
{
- __call_rcu(head, func, -1, 1);
+ kfree_rcu_batch(head, func);
}
EXPORT_SYMBOL_GPL(kfree_call_rcu);
+/*
+ * The version of kfree_call_rcu that does not do batching of kfree_rcu()
+ * requests. To be used only for performance testing comparisons with
+ * kfree_rcu_batch().
+ */
+void kfree_call_rcu_nobatch(struct rcu_head *head, rcu_callback_t func)
+{
+ __call_rcu(head, func, -1, 1);
+}
+
/*
* During early boot, any blocking grace-period wait automatically
* implies a grace period. Later on, this is never the case for PREEMPT.
@@ -3452,6 +3627,17 @@ static void __init rcu_dump_rcu_node_tree(void)
pr_cont("\n");
}
+void kfree_rcu_batch_init(void)
+{
+ int cpu;
+
+ for_each_possible_cpu(cpu) {
+ struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
+
+ INIT_DELAYED_WORK(&krcp->monitor_work, kfree_rcu_monitor);
+ }
+}
+
struct workqueue_struct *rcu_gp_wq;
struct workqueue_struct *rcu_par_gp_wq;
@@ -3459,6 +3645,8 @@ void __init rcu_init(void)
{
int cpu;
+ kfree_rcu_batch_init();
+
rcu_early_boot_tests();
rcu_bootup_announce();
--
2.22.0.770.g0f2c4a37fd-goog