[RFC][PATCH] printk: do not flush printk_safe from irq_work

From: Sergey Senozhatsky
Date: Wed Jan 24 2018 - 04:37:54 EST


Tejun Heo reported that net_console can cause recursive printk()-s
from call_console_drivers() (under OOM, etc.) and this can lock up that
CPU: the CPU calls call_console_drivers() to print the pending logbuf
messages, but a recursive printk() issued by one of the console drivers
keeps adding new messages to the logbuf.

Note, this is not a printk_safe problem. In fact printk_safe helps
us to address the issue here and to rate-limit recursive printk()-s.
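
For reference, printk_safe redirects any printk() issued from within a
printk_safe section (or from NMI) into a per-CPU buffer instead of the
main logbuf. A trimmed sketch of vprintk_func(), which does the
redirection (details may differ slightly between kernel versions):

==================================================
/* kernel/printk/printk_safe.c, trimmed sketch */
__printf(1, 0) int vprintk_func(const char *fmt, va_list args)
{
        /* printk() from NMI goes to the per-CPU NMI buffer */
        if (this_cpu_read(printk_context) & PRINTK_NMI_CONTEXT_MASK)
                return vprintk_nmi(fmt, args);

        /*
         * Recursive printk() from a printk_safe section goes to the
         * per-CPU printk_safe buffer instead of the main logbuf.
         */
        if (this_cpu_read(printk_context) & PRINTK_SAFE_CONTEXT_MASK)
                return vprintk_safe(fmt, args);

        return vprintk_default(fmt, args);
}
==================================================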

Schematically, what we have at the moment is:

==================================================
printk()
  preempt_disable()
  console_unlock()
  {
      for (;;) {
          local_irq_save()
          printk_safe_enter()

          call_console_drivers()
              printk()
                vprintk_safe()
                  irq_work_queue()

          printk_safe_exit()
          local_irq_restore()

          << irq work >>
          printk_safe_flush()
              printk()
      }
  }
  preempt_enable()
==================================================
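
In the actual code, the local_irq_save()/printk_safe_enter() and
printk_safe_exit()/local_irq_restore() pairs above correspond to
printk_safe_enter_irqsave()/printk_safe_exit_irqrestore(). A heavily
trimmed sketch of the console_unlock() printing loop:

==================================================
/* console_unlock() printing loop, heavily trimmed sketch */
for (;;) {
        printk_safe_enter_irqsave(flags);
        raw_spin_lock(&logbuf_lock);
        /* ... pick up the next pending logbuf message ... */
        raw_spin_unlock(&logbuf_lock);

        /* a recursive printk() here lands in the printk_safe buffer */
        call_console_drivers(ext_text, ext_len, text, len);

        printk_safe_exit_irqrestore(flags);
}
==================================================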

So the CPU triggers printk() recursion in call_console_drivers() and,
immediately after local_irq_restore(), executes the printk_safe flush
irq_work, which adds even more pending messages to the logbuf that the
CPU has to print. The CPU then hits printk() recursion again while it
prints those pending messages, which results in even more pending
messages being flushed from printk_safe via irq_work. And so on.

The idea is to delay the printk_safe flush until the CPU is in a
preemptible context, so we won't lock it up.

The new behaviour is:

==================================================
printk()
  preempt_disable()
  console_unlock()
  {
      for (;;) {
          local_irq_save()
          printk_safe_enter()

          call_console_drivers()
              printk()
                vprintk_safe()
                  queue_work_on(smp_processor_id())

          printk_safe_exit()
          local_irq_restore()
      }
  }
  preempt_enable()

<< work queue >>
printk_safe_flush()
    printk()
==================================================
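
The schematic is slightly simplified: vprintk_safe() cannot call
queue_work_on() directly (it could deadlock on pool->lock, see the
comment in the patch below), so it still queues an irq_work, and that
irq_work merely queues the per-CPU flush work. Roughly:

==================================================
vprintk_safe()
  queue_irq_flush_work()          /* irq_work_queue() */

<< irq work >>
irq_to_wq_flush_work_fn()
  queue_wq_flush_work()           /* queue_work_on(smp_processor_id()) */

<< work queue, preemptible >>
wq_flush_work_fn()
  __printk_safe_flush()
==================================================

The printk_nmi buffer is not affected and keeps flushing directly from
its irq_work.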

Since console_unlock() is called with preemption disabled, we now
cannot schedule the flush work as long as the CPU is printing messages.
We can flush printk_safe messages only after the CPU returns from
console_unlock() and enables preemption.
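
For reference, the console_unlock() call in question sits under the
preempt_disable()/preempt_enable() pair in vprintk_emit(); a simplified
sketch:

==================================================
/* tail of vprintk_emit(), simplified sketch */
preempt_disable();
/*
 * Try to acquire and then immediately release the console semaphore.
 * The release will print out the buffered messages.
 */
if (console_trylock())
        console_unlock();
preempt_enable();
==================================================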

It might look like the delayed flush increases the possibility of lost
printk_safe messages (the printk_safe per-CPU buffers are rather
limited in size). But printk_safe was never meant to handle the huge
amounts of data that, e.g., debugging of serial consoles can generate.
Its goal is to make recursive printk()-s less deadlock prone. With this
patch we add one more thing: avoiding the CPU lockup which might be
caused by printk_safe_flush().
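
When a per-CPU buffer does overflow, the dropped messages at least do
not go unnoticed: the flush path reports how many were lost, roughly
along these lines:

==================================================
/* kernel/printk/printk_safe.c, simplified sketch */
static void report_message_lost(struct printk_safe_seq_buf *s)
{
        int lost = atomic_xchg(&s->message_lost, 0);

        if (lost)
                printk_deferred("Lost %d message(s)!\n", lost);
}
==================================================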

Signed-off-by: Sergey Senozhatsky <sergey.senozhatsky@xxxxxxxxx>
Reported-by: Tejun Heo <tj@xxxxxxxxxx>
---
kernel/printk/printk_safe.c | 90 +++++++++++++++++++++++++++++++++++++++------
1 file changed, 78 insertions(+), 12 deletions(-)

diff --git a/kernel/printk/printk_safe.c b/kernel/printk/printk_safe.c
index 3e3c2004bb23..eb5a4049314a 100644
--- a/kernel/printk/printk_safe.c
+++ b/kernel/printk/printk_safe.c
@@ -22,6 +22,7 @@
#include <linux/cpumask.h>
#include <linux/irq_work.h>
#include <linux/printk.h>
+#include <linux/workqueue.h>

#include "internal.h"

@@ -49,7 +50,10 @@ static int printk_safe_irq_ready __read_mostly;
struct printk_safe_seq_buf {
atomic_t len; /* length of written data */
atomic_t message_lost;
- struct irq_work work; /* IRQ work that flushes the buffer */
+ /* IRQ work that flushes printk_nmi buffer */
+ struct irq_work irq_flush_work;
+ /* WQ work that flushes printk_safe buffer */
+ struct work_struct wq_flush_work;
unsigned char buffer[SAFE_LOG_BUF_LEN];
};

@@ -61,10 +65,33 @@ static DEFINE_PER_CPU(struct printk_safe_seq_buf, nmi_print_seq);
#endif

/* Get flushed in a more safe context. */
-static void queue_flush_work(struct printk_safe_seq_buf *s)
+static void queue_irq_flush_work(struct printk_safe_seq_buf *s)
{
if (printk_safe_irq_ready)
- irq_work_queue(&s->work);
+ irq_work_queue(&s->irq_flush_work);
+}
+
+/*
+ * Get flushed from a process context. printk() recursion may happen in
+ * call_console_drivers(), while the CPU that hit printk() recursion is
+ * printing pending logbuf messages. In this case, flushing the printk_safe
+ * buffer from IRQ work may result in a CPU lockup - we flush printk_safe
+ * every time we enable local IRQs after call_console_drivers(), which adds
+ * new messages to the logbuf; thus the CPU has more messages to print,
+ * triggering more recursive printk()-s in call_console_drivers() and
+ * appending even more messages to the logbuf.
+ *
+ * A process context cannot flush the messages while the CPU is in
+ * console_unlock(), because printk() calls console_unlock() with preemption
+ * disabled. So recursive call_console_drivers() printk()-s can be flushed
+ * only after the CPU leaves the printing loop and enables preemption.
+ */
+static void queue_wq_flush_work(struct printk_safe_seq_buf *s)
+{
+ if (printk_safe_irq_ready)
+ queue_work_on(smp_processor_id(),
+ system_wq,
+ &s->wq_flush_work);
}

/*
@@ -89,7 +116,7 @@ static __printf(2, 0) int printk_safe_log_store(struct printk_safe_seq_buf *s,
/* The trailing '\0' is not counted into len. */
if (len >= sizeof(s->buffer) - 1) {
atomic_inc(&s->message_lost);
- queue_flush_work(s);
+ queue_irq_flush_work(s);
return 0;
}

@@ -112,7 +139,7 @@ static __printf(2, 0) int printk_safe_log_store(struct printk_safe_seq_buf *s,
if (atomic_cmpxchg(&s->len, len, len + add) != len)
goto again;

- queue_flush_work(s);
+ queue_irq_flush_work(s);
return add;
}

@@ -186,12 +213,10 @@ static void report_message_lost(struct printk_safe_seq_buf *s)
* Flush data from the associated per-CPU buffer. The function
* can be called either via IRQ work or independently.
*/
-static void __printk_safe_flush(struct irq_work *work)
+static void __printk_safe_flush(struct printk_safe_seq_buf *s)
{
static raw_spinlock_t read_lock =
__RAW_SPIN_LOCK_INITIALIZER(read_lock);
- struct printk_safe_seq_buf *s =
- container_of(work, struct printk_safe_seq_buf, work);
unsigned long flags;
size_t len;
int i;
@@ -243,6 +268,46 @@ static void __printk_safe_flush(struct irq_work *work)
raw_spin_unlock_irqrestore(&read_lock, flags);
}

+static void irq_flush_work_fn(struct irq_work *work)
+{
+ struct printk_safe_seq_buf *s =
+ container_of(work, struct printk_safe_seq_buf, irq_flush_work);
+
+ __printk_safe_flush(s);
+}
+
+static void wq_flush_work_fn(struct work_struct *work)
+{
+ struct printk_safe_seq_buf *s =
+ container_of(work, struct printk_safe_seq_buf, wq_flush_work);
+
+ __printk_safe_flush(s);
+}
+
+/*
+ * We can't queue wq work directly from vprintk_safe(), because we can
+ * deadlock. For instance:
+ *
+ * queue_work()
+ * spin_lock(pool->lock)
+ * printk()
+ * call_console_drivers()
+ * vprintk_safe()
+ * queue_work()
+ * spin_lock(pool->lock)
+ *
+ * So we use irq_work, from which we queue the wq work. The workqueue code
+ * disables local IRQs while it works with a pool, so if irq_work runs on
+ * this CPU then pool->lock cannot be held on this CPU.
+ */
+static void irq_to_wq_flush_work_fn(struct irq_work *work)
+{
+ struct printk_safe_seq_buf *s =
+ container_of(work, struct printk_safe_seq_buf, irq_flush_work);
+
+ queue_wq_flush_work(s);
+}
+
/**
* printk_safe_flush - flush all per-cpu nmi buffers.
*
@@ -256,9 +321,9 @@ void printk_safe_flush(void)

for_each_possible_cpu(cpu) {
#ifdef CONFIG_PRINTK_NMI
- __printk_safe_flush(&per_cpu(nmi_print_seq, cpu).work);
+ __printk_safe_flush(per_cpu_ptr(&nmi_print_seq, cpu));
#endif
- __printk_safe_flush(&per_cpu(safe_print_seq, cpu).work);
+ __printk_safe_flush(per_cpu_ptr(&safe_print_seq, cpu));
}
}

@@ -387,11 +452,12 @@ void __init printk_safe_init(void)
struct printk_safe_seq_buf *s;

s = &per_cpu(safe_print_seq, cpu);
- init_irq_work(&s->work, __printk_safe_flush);
+ init_irq_work(&s->irq_flush_work, irq_to_wq_flush_work_fn);
+ INIT_WORK(&s->wq_flush_work, wq_flush_work_fn);

#ifdef CONFIG_PRINTK_NMI
s = &per_cpu(nmi_print_seq, cpu);
- init_irq_work(&s->work, __printk_safe_flush);
+ init_irq_work(&s->irq_flush_work, irq_flush_work_fn);
#endif
}

--
2.16.1