Re: [RFC PATCH v1 04/25] printk-rb: add writer interface

From: John Ogness
Date: Sat Feb 16 2019 - 20:47:27 EST


Hi Petr,

I've made changes to the patch that hopefully align with what you are
looking for. I would appreciate it if you could go over it and see if
the changes are in the right direction. And if so, you should decide
whether I should make these kinds of changes for the whole series and
submit a v2 before you continue with the review.

The list of changes:

- Added comments everywhere I think they could be useful. Is it too
much?

- Renamed struct prb_handle to prb_reserved_entry (more appropriate).

- Fixed up macros as you requested.

- The implementation from prb_commit() has been moved to a new
prb_commit_all_reserved(). This should resolve the confusion in the
"failed to push_tail()" code.

- I tried moving calc_next() into prb_reserve(), but it was pure
insanity. I played with refactoring for a while until I found
something that I think looks nice. I moved the implementation of
calc_next() along with its containing loop into a new function
find_res_ptrs(). This function does what calc_next() and push_tail()
did. With this solution, I think prb_reserve() looks pretty
clean. However, the optimization of communicating about the wrap is
gone. So even though find_res_ptrs() knew if a wrap occurred,
prb_reserve() figures it out again for itself. If we want the
optimization, I still think the best approach is the -1,0,1 return
value of find_res_ptrs().

I'm looking forward to your response.

John Ogness


diff --git a/include/linux/printk_ringbuffer.h b/include/linux/printk_ringbuffer.h
index 4239dc86e029..ab6177c9fe0a 100644
--- a/include/linux/printk_ringbuffer.h
+++ b/include/linux/printk_ringbuffer.h
@@ -25,6 +25,23 @@ struct printk_ringbuffer {
atomic_t ctx;
};

+/*
+ * struct prb_reserved_entry: Reserved but not yet committed entry.
+ * @rb: The printk_ringbuffer where the entry was reserved.
+ *
+ * This is a handle used by the writer to represent an entry that has been
+ * reserved but not yet committed.
+ *
+ * The structure does not actually store any information about the entry that
+ * has been reserved because this information is not required by the
+ * implementation. The struct could prove useful if extra tracking or even
+ * fundamental changes to the ringbuffer were to be implemented. And as such
+ * would not require changes to writers.
+ */
+struct prb_reserved_entry {
+ struct printk_ringbuffer *rb;
+};
+
#define DECLARE_STATIC_PRINTKRB_CPULOCK(name) \
static struct prb_cpulock name = { \
.owner = ATOMIC_INIT(-1), \
@@ -46,6 +63,11 @@ static struct printk_ringbuffer name = { \
.ctx = ATOMIC_INIT(0), \
}

+/* writer interface */
+char *prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+ unsigned int size);
+void prb_commit(struct prb_reserved_entry *e);
+
/* utility functions */
void prb_lock(struct prb_cpulock *cpu_lock);
void prb_unlock(struct prb_cpulock *cpu_lock);
diff --git a/lib/printk_ringbuffer.c b/lib/printk_ringbuffer.c
index 54c750092810..fbe1d92b9b60 100644
--- a/lib/printk_ringbuffer.c
+++ b/lib/printk_ringbuffer.c
@@ -2,6 +2,59 @@
#include <linux/smp.h>
#include <linux/printk_ringbuffer.h>

+/*
+ * struct prb_entry: An entry within the ringbuffer.
+ * @size: The size in bytes of the entry or -1 if terminating.
+ * @seq: The unique sequence number of the entry.
+ * @data: The data bytes of the entry.
+ *
+ * The struct is typecasted directly into the ringbuffer data array to access
+ * an entry. The @size specifies the complete size of the entry including any
+ * padding. The next entry will be located at &this_entry + this_entry.size.
+ * The only exception is if the entry is terminating (size = -1). In this case
+ * @seq and @data are invalid and the next entry is at the beginning of the
+ * ringbuffer data array.
+ */
+struct prb_entry {
+ unsigned int size;
+ u64 seq;
+ char data[0];
+};
+
+/* the size and size bitmask of the ringbuffer data array */
+#define PRB_SIZE(rb) (1L << rb->size_bits)
+#define PRB_SIZE_BITMASK(rb) (PRB_SIZE(rb) - 1)
+
+/* given a logical position, return its index in the ringbuffer data array */
+#define PRB_INDEX(rb, lpos) (lpos & PRB_SIZE_BITMASK(rb))
+
+/*
+ * given a logical position, return how many times the data buffer has
+ * wrapped, where logical position 0 begins at index 0 with no wraps
+ */
+#define PRB_WRAPS(rb, lpos) (lpos >> rb->size_bits)
+
+/*
+ * given a logical position, return the logical position that represents the
+ * beginning of the ringbuffer data array for this wrap
+ */
+#define PRB_THIS_WRAP_START_LPOS(rb, lpos) \
+ (PRB_WRAPS(rb, lpos) << rb->size_bits)
+
+/*
+ * given a logical position, return the logical position that represents the
+ * beginning of the ringbuffer data array for the next wrap
+ */
+#define PRB_NEXT_WRAP_START_LPOS(rb, lpos) \
+ ((PRB_WRAPS(rb, lpos) + 1) << rb->size_bits)
+
+/*
+ * entries are aligned to allow direct typecasts as struct prb_entry
+ */
+#define PRB_ENTRY_ALIGN sizeof(long)
+#define PRB_ENTRY_ALIGN_SIZE(sz) \
+ ((sz + (PRB_ENTRY_ALIGN - 1)) & ~(PRB_ENTRY_ALIGN - 1))
+
/*
* prb_lock: Perform a processor-reentrant spin lock.
* @cpu_lock: A pointer to the lock object.
@@ -58,3 +111,257 @@ void prb_unlock(struct prb_cpulock *cpu_lock)
}
put_cpu();
}
+
+/* translate a logical position to an entry in the data array */
+static struct prb_entry *to_entry(struct printk_ringbuffer *rb,
+ unsigned long lpos)
+{
+ char *buffer = rb->buffer;
+
+ buffer += PRB_INDEX(rb, lpos);
+ return (struct prb_entry *)buffer;
+}
+
+/* try to move the tail pointer forward, thus removing the oldest entry */
+static bool push_tail(struct printk_ringbuffer *rb, unsigned long tail)
+{
+ unsigned long new_tail, head;
+ struct prb_entry *e;
+
+ /* maybe another context already pushed the tail */
+ if (tail != atomic_long_read(&rb->tail))
+ return true;
+
+ /*
+ * Determine what the new tail should be. If the tail is a
+ * terminating entry, the new tail will be beyond the entry
+ * at the beginning of the data array.
+ */
+ e = to_entry(rb, tail);
+ if (e->size != -1)
+ new_tail = tail + e->size;
+ else
+ new_tail = PRB_NEXT_WRAP_START_LPOS(rb, tail);
+
+ /* make sure the new tail does not overtake the head */
+ head = atomic_long_read(&rb->head);
+ if (head - new_tail > PRB_SIZE(rb))
+ return false;
+
+ /*
+ * The result of this cmpxchg does not matter. If it succeeds,
+ * this context pushed the tail. If it fails, some other context
+ * pushed the tail. Either way, the tail was pushed.
+ */
+ atomic_long_cmpxchg(&rb->tail, tail, new_tail);
+ return true;
+}
+
+/*
+ * If this context incremented rb->ctx to 1, move the head pointer
+ * beyond all reserved entries.
+ */
+static void prb_commit_all_reserved(struct printk_ringbuffer *rb)
+{
+ unsigned long head, res;
+ struct prb_entry *e;
+
+ for (;;) {
+ if (atomic_read(&rb->ctx) > 1) {
+ /* another context will adjust the head pointer */
+ atomic_dec(&rb->ctx);
+ break;
+ }
+
+ /*
+ * This is the only context that will adjust the head pointer.
+ * If NMIs interrupt at any time, they can reserve/commit new
+ * entries, but they will not adjust the head pointer.
+ */
+
+ /* assign sequence numbers before moving the head pointer */
+ head = atomic_long_read(&rb->head);
+ res = atomic_long_read(&rb->reserve);
+ while (head != res) {
+ e = to_entry(rb, head);
+ if (e->size == -1) {
+ head = PRB_NEXT_WRAP_START_LPOS(rb, head);
+ continue;
+ }
+ e->seq = ++rb->seq;
+ head += e->size;
+ }
+
+ /*
+ * move the head pointer, thus making all reserved entries
+ * visible to any readers
+ */
+ atomic_long_set_release(&rb->head, res);
+
+ atomic_dec(&rb->ctx);
+ if (atomic_long_read(&rb->reserve) == res)
+ break;
+ /*
+ * The reserve pointer is different than previously read. New
+ * entries were reserve/committed by NMI contexts, possibly
+ * before ctx was decremented by this context. Go back and move
+ * the head pointer beyond those entries as well.
+ */
+ atomic_inc(&rb->ctx);
+ }
+
+ /* Enable interrupts and allow other CPUs to reserve/commit. */
+ prb_unlock(rb->cpulock);
+}
+
+/*
+ * prb_commit: Commit a reserved entry to the ring buffer.
+ * @e: A structure referencing a the reserved entry to commit.
+ *
+ * Commit data that has been reserved using prb_reserve(). Once the entry
+ * has been committed, it can be invalidated at any time. If a writer is
+ * interested in using the data after committing, the writer should make
+ * its own copy first or use the prb_iter_ reader functions to access the
+ * data in the ring buffer.
+ *
+ * It is safe to call this function from any context and state.
+ */
+void prb_commit(struct prb_reserved_entry *e)
+{
+ prb_commit_all_reserved(e->rb);
+}
+
+/* given the size to reserve, determine current and next reserve pointers */
+static bool find_res_ptrs(struct printk_ringbuffer *rb, unsigned long *res_old,
+ unsigned long *res_new, unsigned int size)
+{
+ unsigned long tail, entry_begin;
+
+ /*
+ * The reserve pointer is not allowed to overtake the index of the
+ * tail pointer. If this would happen, the tail pointer must be
+ * pushed, thus removing the oldest entry.
+ */
+ for (;;) {
+ tail = atomic_long_read(&rb->tail);
+ *res_old = atomic_long_read(&rb->reserve);
+
+ /*
+ * If the new reserve pointer wraps, the new entry will
+ * begin at the beginning of the data array. This loop
+ * exists only to handle the wrap.
+ */
+ for (entry_begin = *res_old;;) {
+
+ *res_new = entry_begin + size;
+
+ if (*res_new - tail > PRB_SIZE(rb)) {
+ /* would overtake tail, push tail */
+
+ if (!push_tail(rb, tail)) {
+ /* couldn't push tail, can't reserve */
+ return false;
+ }
+
+ /* tail pushed, try again */
+ break;
+ }
+
+ if (PRB_WRAPS(rb, entry_begin) ==
+ PRB_WRAPS(rb, *res_new)) {
+ /* reserve pointer values determined */
+ return true;
+ }
+
+ /*
+ * The new entry will wrap. Calculate the new reserve
+ * pointer based on the beginning of the data array
+ * for the wrap of the new reserve pointer.
+ */
+ entry_begin = PRB_THIS_WRAP_START_LPOS(rb, *res_new);
+ }
+ }
+}
+
+/*
+ * prb_reserve: Reserve an entry within a ring buffer.
+ * @e: A structure to be setup and reference a reserved entry.
+ * @rb: A ring buffer to reserve data within.
+ * @size: The number of bytes to reserve.
+ *
+ * Reserve an entry of at least @size bytes to be used by the caller. If
+ * successful, the data region of the entry belongs to the caller and cannot
+ * be invalidated by any other task/context. For this reason, the caller
+ * should call prb_commit() as quickly as possible in order to avoid preventing
+ * other tasks/contexts from reserving data in the case that the ring buffer
+ * has wrapped.
+ *
+ * It is safe to call this function from any context and state.
+ *
+ * Returns a pointer to the reserved data (and @e is setup to reference the
+ * entry containing that data) or NULL if it was not possible to reserve data.
+ */
+char *prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+ unsigned int size)
+{
+ unsigned long res_old, res_new;
+ struct prb_entry *entry;
+
+ if (size == 0)
+ return NULL;
+
+ /* add entry header to size and align for the following entry */
+ size = PRB_ENTRY_ALIGN_SIZE(sizeof(struct prb_entry) + size);
+
+ if (size >= PRB_SIZE(rb))
+ return NULL;
+
+ /*
+ * Lock out all other CPUs and disable interrupts. Only NMIs will
+ * be able to interrupt. It will stay this way until the matching
+ * commit is called.
+ */
+ prb_lock(rb->cpulock);
+
+ /*
+ * Clarify the responsibility of this context. If this context
+ * increments ctx to 1, this context is responsible for pushing
+ * the head pointer beyond all reserved entries on commit.
+ */
+ atomic_inc(&rb->ctx);
+
+ /*
+ * Move the reserve pointer forward. Since NMIs can interrupt at any
+ * time, modifying the reserve pointer is done in a cmpxchg loop.
+ */
+ do {
+ if (!find_res_ptrs(rb, &res_old, &res_new, size)) {
+ /*
+ * Not possible to move the reserve pointer. Try to
+ * commit all reserved entries because this context
+ * might have that responsibility (if it incremented
+ * ctx to 1).
+ */
+ prb_commit_all_reserved(rb);
+ return NULL;
+ }
+ } while (!atomic_long_try_cmpxchg_acquire(&rb->reserve,
+ &res_old, res_new));
+
+ entry = to_entry(rb, res_old);
+ if (PRB_WRAPS(rb, res_old) != PRB_WRAPS(rb, res_new)) {
+ /*
+ * The reserve wraps. Create the terminating entry and get the
+ * pointer to the actually reserved entry at the beginning of
+ * the data array on the wrap of the new reserve pointer.
+ */
+ entry->size = -1;
+ entry = to_entry(rb, PRB_THIS_WRAP_START_LPOS(rb, res_new));
+ }
+
+ /* The size is set now. The seq is set later, on commit. */
+ entry->size = size;
+
+ e->rb = rb;
+ return &entry->data[0];
+}