[PATCH -v2] x86: MCE: Re-implement MCE log ring buffer as per-CPU ring buffer

From: Huang Ying
Date: Tue Apr 28 2009 - 05:27:39 EST


Re-implement the MCE log ring buffer as a per-CPU ring buffer for better
scalability. The basic design is as follows:

- One ring buffer for each CPU

+ MCEs are added to the corresponding local per-CPU buffer, instead of
one big global buffer. Contention/unfairness between CPUs is
eliminated.

+ MCE records are read out and removed from the per-CPU buffers by a
mutex-protected global reader function, because in most cases there
are not many readers in the system to contend.

- Per-CPU ring buffer data structure

+ An array is used to hold MCE records. The integer "head" indicates
the next writing position and the integer "tail" indicates the next
reading position.

+ To distinguish buffer empty from buffer full, head and tail wrap to 0
at MCE_LOG_LIMIT instead of MCE_LOG_LEN. The real next writing
position is then head % MCE_LOG_LEN, and the real next reading position
is tail % MCE_LOG_LEN. If the buffer is empty, head == tail; if the
buffer is full, head % MCE_LOG_LEN == tail % MCE_LOG_LEN and head != tail.

- Lock-less for writer side

+ MCE log writer may come from NMI, so the writer side must be
lock-less. For per-CPU buffer of one CPU, writers may come from
process, IRQ or NMI context, so "head" is increased with
cmpxchg_local() to allocate buffer space.

+ Reader side is protected with a mutex to guarantee only one reader
is active in the whole system.


Performance tests show that the throughput of the per-CPU mcelog buffer
can reach 430k records/s, compared with 5.3k records/s for the original
implementation, on a 2-core 2.1GHz Core2 machine.

ChangeLog:

v2:

- Use alloc_percpu() to allocate per_cpu mcelog buffer

- Use ndelay to implement writer timeout


Signed-off-by: Huang Ying <ying.huang@xxxxxxxxx>
CC: Andi Kleen <ak@xxxxxxxxxxxxxxx>

---
arch/x86/include/asm/mce.h | 13 +
arch/x86/kernel/cpu/mcheck/mce_64.c | 257 ++++++++++++++++++++++--------------
2 files changed, 169 insertions(+), 101 deletions(-)

--- a/arch/x86/include/asm/mce.h
+++ b/arch/x86/include/asm/mce.h
@@ -50,15 +50,22 @@ struct mce {
* is set.
*/

-#define MCE_LOG_LEN 32
+#define MCE_LOG_LEN 32
+#define MCE_LOG_LIMIT (MCE_LOG_LEN * 2 - 1)
+
+static inline int mce_log_index(int n)
+{
+ return n >= MCE_LOG_LEN ? n - MCE_LOG_LEN : n;
+}
+
+struct mce_log_cpu;

struct mce_log {
char signature[12]; /* "MACHINECHECK" */
unsigned len; /* = MCE_LOG_LEN */
- unsigned next;
unsigned flags;
unsigned pad0;
- struct mce entry[MCE_LOG_LEN];
+ struct mce_log_cpu *mcelog_cpus;
};

#define MCE_OVERFLOW 0 /* bit 0 in flags means overflow */
--- a/arch/x86/kernel/cpu/mcheck/mce_64.c
+++ b/arch/x86/kernel/cpu/mcheck/mce_64.c
@@ -38,6 +38,9 @@

#define MISC_MCELOG_MINOR 227

+/* Timeout log reader to wait writer to finish */
+#define WRITER_TIMEOUT_NS NSEC_PER_MSEC
+
atomic_t mce_entry;

static int mce_dont_init;
@@ -86,38 +89,43 @@ static struct mce_log mcelog = {
MCE_LOG_LEN,
};

+struct mce_log_cpu {
+ int head;
+ int tail;
+ unsigned long flags;
+ struct mce entry[MCE_LOG_LEN];
+};
+
void mce_log(struct mce *mce)
{
- unsigned next, entry;
+ int cpu = smp_processor_id();
+ struct mce_log_cpu *mcelog_cpu = per_cpu_ptr(mcelog.mcelog_cpus, cpu);
+ int head, ihead, tail, next;
+
atomic_inc(&mce_events);
mce->finished = 0;
- wmb();
- for (;;) {
- entry = rcu_dereference(mcelog.next);
- for (;;) {
- /* When the buffer fills up discard new entries. Assume
- that the earlier errors are the more interesting. */
- if (entry >= MCE_LOG_LEN) {
- set_bit(MCE_OVERFLOW, (unsigned long *)&mcelog.flags);
- return;
- }
- /* Old left over entry. Skip. */
- if (mcelog.entry[entry].finished) {
- entry++;
- continue;
- }
- break;
+ /* mce->finished must be set to 0 before written to ring
+ * buffer */
+ smp_wmb();
+ do {
+ head = mcelog_cpu->head;
+ tail = mcelog_cpu->tail;
+ ihead = mce_log_index(head);
+ /* When the buffer fills up discard new entries. Assume
+ that the earlier errors are the more interesting. */
+ if (ihead == mce_log_index(tail) && head != tail) {
+ set_bit(MCE_OVERFLOW, &mcelog_cpu->flags);
+ return;
}
- smp_rmb();
- next = entry + 1;
- if (cmpxchg(&mcelog.next, entry, next) == entry)
- break;
- }
- memcpy(mcelog.entry + entry, mce, sizeof(struct mce));
- wmb();
- mcelog.entry[entry].finished = 1;
- wmb();
-
+ next = head == MCE_LOG_LIMIT ? 0 : head + 1;
+ } while (cmpxchg_local(&mcelog_cpu->head, head, next) != head);
+ memcpy(mcelog_cpu->entry + ihead, mce, sizeof(struct mce));
+ /* ".finished" of MCE record in ring buffer must be set after
+ * copy */
+ smp_wmb();
+ mcelog_cpu->entry[ihead].finished = 1;
+ /* bit 0 of notify_user should be set after finished be set */
+ smp_wmb();
set_bit(0, &notify_user);
}

@@ -147,21 +155,37 @@ static void print_mce(struct mce *m)
"and contact your hardware vendor\n");
}

-static void mce_panic(char *msg, struct mce *backup, unsigned long start)
+static int mce_print_cpu(int cpu, struct mce *backup, unsigned long start)
{
int i;
+ struct mce_log_cpu *mcelog_cpu;

- oops_begin();
+ mcelog_cpu = per_cpu_ptr(mcelog.mcelog_cpus, cpu);
for (i = 0; i < MCE_LOG_LEN; i++) {
- unsigned long tsc = mcelog.entry[i].tsc;
+ unsigned long tsc = mcelog_cpu->entry[i].tsc;

if (time_before(tsc, start))
continue;
- print_mce(&mcelog.entry[i]);
- if (backup && mcelog.entry[i].tsc == backup->tsc)
+ print_mce(&mcelog_cpu->entry[i]);
+ if (backup && mcelog_cpu->entry[i].tsc == backup->tsc)
backup = NULL;
}
- if (backup)
+ return backup == NULL;
+}
+
+static void mce_panic(char *msg, struct mce *backup, u64 start)
+{
+ int cpu, cpu_self;
+
+ oops_begin();
+ cpu_self = smp_processor_id();
+ for_each_online_cpu(cpu) {
+ if (cpu == cpu_self)
+ continue;
+ if (mce_print_cpu(cpu, backup, start))
+ backup = NULL;
+ }
+ if (!mce_print_cpu(cpu_self, backup, start))
print_mce(backup);
panic(msg);
}
@@ -574,6 +598,16 @@ static int mce_cap_init(void)
return 0;
}

+/*
+ * Initialize MCE per-CPU log buffer
+ */
+static __cpuinit void mce_log_init(void)
+{
+ if (mcelog.mcelog_cpus)
+ return;
+ mcelog.mcelog_cpus = alloc_percpu(struct mce_log_cpu);
+}
+
static void mce_init(void *dummy)
{
u64 cap;
@@ -656,6 +690,7 @@ void __cpuinit mcheck_init(struct cpuinf
mce_dont_init = 1;
return;
}
+ mce_log_init();
mce_cpu_quirks(c);

mce_init(NULL);
@@ -704,90 +739,116 @@ static int mce_release(struct inode *ino
return 0;
}

-static void collect_tscs(void *data)
+static ssize_t mce_read_cpu(struct mce_log_cpu *mcelog_cpu,
+ char __user *inubuf, size_t usize)
{
- unsigned long *cpu_tsc = (unsigned long *)data;
-
- rdtscll(cpu_tsc[smp_processor_id()]);
-}
-
-static ssize_t mce_read(struct file *filp, char __user *ubuf, size_t usize,
- loff_t *off)
-{
- unsigned long *cpu_tsc;
- static DEFINE_MUTEX(mce_read_mutex);
- unsigned prev, next;
- char __user *buf = ubuf;
- int i, err;
-
- cpu_tsc = kmalloc(nr_cpu_ids * sizeof(long), GFP_KERNEL);
- if (!cpu_tsc)
- return -ENOMEM;
-
- mutex_lock(&mce_read_mutex);
- next = rcu_dereference(mcelog.next);
+ char __user *ubuf = inubuf;
+ int head, tail, pos, i, err = 0;

- /* Only supports full reads right now */
- if (*off != 0 || usize < MCE_LOG_LEN*sizeof(struct mce)) {
- mutex_unlock(&mce_read_mutex);
- kfree(cpu_tsc);
- return -EINVAL;
- }
+ head = mcelog_cpu->head;
+ tail = mcelog_cpu->tail;

- err = 0;
- prev = 0;
- do {
- for (i = prev; i < next; i++) {
- unsigned long start = jiffies;
+ if (head == tail)
+ return 0;

- while (!mcelog.entry[i].finished) {
- if (time_after_eq(jiffies, start + 2)) {
- memset(mcelog.entry + i, 0,
+ for (pos = tail; pos != head && usize >= sizeof(struct mce);
+ pos = pos == MCE_LOG_LIMIT ? 0 : pos+1) {
+ i = mce_log_index(pos);
+ if (!mcelog_cpu->entry[i].finished) {
+ int timeout = WRITER_TIMEOUT_NS;
+ while (!mcelog_cpu->entry[i].finished) {
+ if (timeout-- <= 0) {
+ memset(mcelog_cpu->entry + i, 0,
sizeof(struct mce));
+ head = mcelog_cpu->head;
+ printk(KERN_WARNING "mcelog: timeout "
+ "waiting for writer to finish!\n");
goto timeout;
}
- cpu_relax();
+ ndelay(1);
}
- smp_rmb();
- err |= copy_to_user(buf, mcelog.entry + i,
- sizeof(struct mce));
- buf += sizeof(struct mce);
-timeout:
- ;
}
+ /* finished field should be checked before
+ * copy_to_user() */
+ smp_rmb();
+ err |= copy_to_user(ubuf, mcelog_cpu->entry + i,
+ sizeof(struct mce));
+ ubuf += sizeof(struct mce);
+ usize -= sizeof(struct mce);
+timeout:
+ ;
+ }
+ /* mcelog_cpu->tail must be updated after corresponding MCE
+ * records are copied out */
+ smp_wmb();
+ mcelog_cpu->tail = pos;

- memset(mcelog.entry + prev, 0,
- (next - prev) * sizeof(struct mce));
- prev = next;
- next = cmpxchg(&mcelog.next, prev, 0);
- } while (next != prev);
+ return err ? -EFAULT : ubuf - inubuf;
+}

- synchronize_sched();
+static int mce_empty_cpu(struct mce_log_cpu *mcelog_cpu)
+{
+ int head, tail;

- /*
- * Collect entries that were still getting written before the
- * synchronize.
- */
- on_each_cpu(collect_tscs, cpu_tsc, 1);
- for (i = next; i < MCE_LOG_LEN; i++) {
- if (mcelog.entry[i].finished &&
- mcelog.entry[i].tsc < cpu_tsc[mcelog.entry[i].cpu]) {
- err |= copy_to_user(buf, mcelog.entry+i,
- sizeof(struct mce));
- smp_rmb();
- buf += sizeof(struct mce);
- memset(&mcelog.entry[i], 0, sizeof(struct mce));
- }
+ head = mcelog_cpu->head;
+ tail = mcelog_cpu->tail;
+
+ return head == tail;
+}
+
+static int mce_empty(void)
+{
+ int cpu;
+ struct mce_log_cpu *mcelog_cpu;
+
+ for_each_possible_cpu(cpu) {
+ mcelog_cpu = per_cpu_ptr(mcelog.mcelog_cpus, cpu);
+ if (!mce_empty_cpu(mcelog_cpu))
+ return 0;
}
+ return 1;
+}
+
+static ssize_t mce_read(struct file *filp, char __user *inubuf, size_t usize,
+ loff_t *off)
+{
+ char __user *ubuf = inubuf;
+ struct mce_log_cpu *mcelog_cpu;
+ int cpu, new_mce, err = 0;
+ static DEFINE_MUTEX(mce_read_mutex);
+ size_t usize_limit;
+
+ /* Too large user buffer size may cause system not response */
+ usize_limit = num_possible_cpus() * MCE_LOG_LEN * sizeof(struct mce);
+ if (usize > usize_limit)
+ usize = usize_limit;
+ mutex_lock(&mce_read_mutex);
+ do {
+ new_mce = 0;
+ for_each_possible_cpu(cpu) {
+ if (usize < sizeof(struct mce))
+ goto out;
+ mcelog_cpu = per_cpu_ptr(mcelog.mcelog_cpus, cpu);
+ err = mce_read_cpu(mcelog_cpu, ubuf,
+ sizeof(struct mce));
+ if (err > 0) {
+ ubuf += sizeof(struct mce);
+ usize -= sizeof(struct mce);
+ new_mce = 1;
+ err = 0;
+ } else if (err < 0)
+ goto out;
+ }
+ } while (new_mce || !mce_empty());
+out:
mutex_unlock(&mce_read_mutex);
- kfree(cpu_tsc);
- return err ? -EFAULT : buf - ubuf;
+ return err ? : ubuf - inubuf;
}

static unsigned int mce_poll(struct file *file, poll_table *wait)
{
poll_wait(file, &mce_wait, wait);
- if (rcu_dereference(mcelog.next))
+ if (!mce_empty())
return POLLIN | POLLRDNORM;
return 0;
}

Attachment: signature.asc
Description: This is a digitally signed message part