Subject: [PATCH 2/2] printk: Don't trap random context in infinite log_buf flush

From: Tejun Heo
Date: Thu Nov 02 2017 - 09:53:11 EST


When printk flushing isn't contended, whoever creates messages gets to
flush them, which is great both for fairness and for keeping log
delivery synchronous. However, when console drivers can't keep up
with the rate of new messages, which happens a lot, which task ends up
with the flushing duty is determined by timing, and everyone else's
messages become asynchronous.

Unfortunately, the arbitrarily chosen flusher can easily be someone
calling printk from a non-sleepable and/or non-preemptible context,
such as while holding a spinlock. This means that the flusher, which
can be stuck with the flushing duty for a long time, can't yield or be
preempted, pegging the CPU. This can lead to RCU stall warnings, hung
task warnings, work_on_cpu() stalls and all sorts of other failures,
some of which may generate more printk messages warning about the
condition, further pegging the flusher and grinding the whole system
to a halt.

Even if preemptible, it can be really harmful to peg a random task
with the flushing duty for a very long time. There can easily be
locking or other dependency chains where such pegging leads to more
messages, leading to a meltdown of the system.

The problem can be solved by introducing a dedicated async flush
worker, without harming any of the synchronicity guarantees. This
patch updates console_unlock() so that each non-flusher caller is
responsible only for messages up to what it sees on the first
iteration, which is guaranteed to include the message it printed, if
it printed any.

If more messages came in while flushing was in progress, those
messages are already asynchronous anyway, so the current flusher
schedules the dedicated worker, which keeps flushing until the buffer
is empty.
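
In simplified form, the new flush loop looks like this (a sketch of
the logic in the diff below, with the locking, printk_safe handling
and dropped-message handling omitted):

u64 target_seq = 0;

for (;;) {
	/*
	 * Non-async callers latch target_seq once, on the first
	 * iteration, which is guaranteed to cover their own message.
	 * The async worker re-reads log_next_seq each time around and
	 * thus keeps going until the buffer is drained.
	 */
	if (!target_seq || current_is_console_async())
		target_seq = log_next_seq;

	if (console_seq >= target_seq)
		break;	/* this caller's share is flushed */

	/* ... format the next record and write it to the consoles ... */
}

/* anything that arrived meanwhile is punted to the async worker */
if (console_seq != log_next_seq)
	kthread_queue_work(console_async_worker, &console_async_work);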

The following repro makes the system completely unusable without this
patch. After the patch, the system stays completely functional no
matter how long the console stays saturated.

#include <linux/module.h>
#include <linux/delay.h>
#include <linux/sched.h>
#include <linux/mutex.h>
#include <linux/workqueue.h>

static DEFINE_MUTEX(test_mutex);
static bool stop_testing;

/* printk from a non-preemptible context, on CPU 0 */
static void atomic_printk_workfn(struct work_struct *work)
{
	while (!READ_ONCE(stop_testing)) {
		msleep(100);
		mutex_lock(&test_mutex);
		mutex_unlock(&test_mutex);
		preempt_disable();
		printk("XXX ATOMIC\n");
		preempt_enable();
		cond_resched();
	}
}
static DECLARE_WORK(atomic_printk_work, atomic_printk_workfn);

/* saturate the console by printking in a tight loop, on CPU 1 */
static void hog_printk_workfn(struct work_struct *work)
{
	while (!READ_ONCE(stop_testing)) {
		mutex_lock(&test_mutex);
		printk("XXX HOG\n");
		mutex_unlock(&test_mutex);
		cond_resched();
	}
}
static DECLARE_WORK(hog_printk_work, hog_printk_workfn);

static int __init test_init(void)
{
	queue_work_on(0, system_wq, &atomic_printk_work);
	msleep(100);
	queue_work_on(1, system_wq, &hog_printk_work);

	return 0;
}

static void __exit test_exit(void)
{
	WRITE_ONCE(stop_testing, true);
	flush_work(&atomic_printk_work);
	flush_work(&hog_printk_work);
}

module_init(test_init);
module_exit(test_exit);
MODULE_LICENSE("GPL");
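
To try the repro, build it as an out-of-tree module and load it with
insmod; the effect is easiest to trigger with a slow console such as
a serial console. Unloading the module sets stop_testing and waits
for both workers to finish.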

Signed-off-by: Tejun Heo <tj@xxxxxxxxxx>
Cc: Petr Mladek <pmladek@xxxxxxxx>
Cc: Sergey Senozhatsky <sergey.senozhatsky@xxxxxxxxx>
Cc: Steven Rostedt <rostedt@xxxxxxxxxxx>
Cc: Linus Torvalds <torvalds@xxxxxxxxxxxxxxxxxxxx>
Cc: Andrew Morton <akpm@xxxxxxxxxxxxxxxxxxxx>
---
kernel/printk/printk.c | 66 +++++++++++++++++++++++++++++++++++++++++++------
1 file changed, 58 insertions(+), 8 deletions(-)

--- a/kernel/printk/printk.c
+++ b/kernel/printk/printk.c
@@ -45,6 +45,7 @@
 #include <linux/utsname.h>
 #include <linux/ctype.h>
 #include <linux/uio.h>
+#include <linux/kthread.h>
 #include <linux/sched/clock.h>
 #include <linux/sched/debug.h>
 #include <linux/sched/task_stack.h>
@@ -436,6 +437,12 @@ static char __log_buf[__LOG_BUF_LEN] __a
 static char *log_buf = __log_buf;
 static u32 log_buf_len = __LOG_BUF_LEN;
 
+/* async flush */
+static struct kthread_worker *console_async_worker;
+
+static void console_async_workfn(struct kthread_work *work);
+static DEFINE_KTHREAD_WORK(console_async_work, console_async_workfn);
+
 /* Return log buffer address */
 char *log_buf_addr_get(void)
 {
@@ -2092,6 +2099,17 @@ int is_console_locked(void)
 	return console_locked;
 }
 
+static void console_async_workfn(struct kthread_work *work)
+{
+	console_lock();
+	console_unlock();
+}
+
+static bool current_is_console_async(void)
+{
+	return console_async_worker && console_async_worker->task == current;
+}
+
 /*
  * Check if we have any console that is capable of printing while cpu is
  * booting or shutting down. Requires console_sem.
@@ -2141,7 +2159,8 @@ void console_unlock(void)
 	static u64 seen_seq;
 	unsigned long flags;
 	bool wake_klogd = false;
-	bool do_cond_resched, retry;
+	bool do_cond_resched, request_async;
+	u64 target_seq;
 
 	if (console_suspended) {
 		up_console_sem();
@@ -2165,6 +2184,7 @@ void console_unlock(void)
 	do_cond_resched = console_may_schedule;
 again:
 	console_may_schedule = 0;
+	target_seq = 0;
 
 	/*
 	 * We released the console_sem lock, so we need to recheck if
@@ -2185,6 +2205,18 @@ again:
 		printk_safe_enter_irqsave(flags);
 		raw_spin_lock(&logbuf_lock);
 
+		/*
+		 * This function can be called from any context and we don't
+		 * want to get live-locked by others' messages.  Unless we're
+		 * the async worker, flush only up to whatever was visible on
+		 * the first iteration, which is guaranteed to include the
+		 * message this task printed, if it printed any.  If there are
+		 * more messages left to be printed after that, we'll punt
+		 * them to the async worker.
+		 */
+		if (!target_seq || current_is_console_async())
+			target_seq = log_next_seq;
+
 		if (console_seq < log_first_seq) {
 			len = sprintf(text, "** %u printk messages dropped ** ",
 				      (unsigned)(log_first_seq - console_seq));
@@ -2196,7 +2228,7 @@ again:
 			len = 0;
 		}
 skip:
-		if (console_seq == log_next_seq)
+		if (console_seq >= target_seq)
 			break;
 
 		msg = log_from_idx(console_idx);
@@ -2246,21 +2278,33 @@ skip:
 	/*
 	 * Check whether userland needs notification. Also, someone could
 	 * have filled up the buffer again, so re-check if there's
-	 * something to flush. In case we cannot trylock the console_sem
-	 * again, there's a new owner and the console_unlock() from them
-	 * will do the flush, no worries.
+	 * something to flush.
 	 */
 	raw_spin_lock(&logbuf_lock);
 	if (seen_seq != log_next_seq) {
 		wake_klogd = true;
 		seen_seq = log_next_seq;
 	}
-	retry = console_seq != log_next_seq;
+	request_async = console_seq != log_next_seq;
 	raw_spin_unlock(&logbuf_lock);
 	printk_safe_exit_irqrestore(flags);
 
-	if (retry && console_trylock())
-		goto again;
+	if (request_async) {
+		if (console_async_worker) {
+			/* async is online, punt */
+			kthread_queue_work(console_async_worker,
+					   &console_async_work);
+		} else {
+			/*
+			 * We're responsible for flushing. In case we
+			 * cannot trylock the console_sem again, there's a
+			 * new owner and the console_unlock() from them
+			 * will do the flush, no worries.
+			 */
+			if (console_trylock())
+				goto again;
+		}
+	}

 	if (wake_klogd)
 		wake_up_klogd();
@@ -2647,6 +2691,12 @@ static int __init printk_late_init(void)
 	struct console *con;
 	int ret;
 
+	console_async_worker = kthread_create_worker(0, "console_async");
+	if (IS_ERR(console_async_worker)) {
+		pr_err("console: failed to initialize async flusher\n");
+		console_async_worker = NULL;
+	}
+
 	for_each_console(con) {
 		if (!(con->flags & CON_BOOT))
 			continue;