[RFC v1 06/14] bus1: util - queue utility library
From: David Herrmann
Date: Wed Oct 26 2016 - 15:22:12 EST
From: Tom Gundersen <teg@xxxxxxx>
(Please refer to 'Lamport Timestamps', the concept of
'happened-before', and 'causal ordering'. The queue implementation
has its roots in Lamport Timestamps, treating a set of local CPUs
as a distributed system to avoid any global synchronization.)
A bus1 message queue is a FIFO, i.e., messages are linearly ordered by
the time they were sent. Moreover, atomic delivery of messages to
multiple queues are supported, without any global synchronization, i.e.,
the order of message delivery is consistent across queues.
Messages can be destined for multiple queues, hence, we need to be
careful that all queues get a consistent order of incoming messages. We
define the concept of `global order' to provide a basic set of
guarantees. This global order is a partial order on the set of all
messages. The order is defined as:
1) If a message B was queued *after* a message A, then: A < B
2) If a message B was queued *after* a message A was dequeued,
then: A < B
3) If a message B was dequeued *after* a message A on the same queue,
then: A < B
(Note: Causality is honored. `after' and `before' do not refer to
the same task, nor the same queue, but rather any kind of
synchronization between the two operations.)
The queue object implements this global order in a lockless fashion. It
solely relies on a distributed clock on each queue. Each message to be
sent causes a clock tick on the local clock and on all destination
clocks. Furthermore, all clocks are synchronized, meaning they're
fast-forwarded in case they're behind the highest of all participating
peers. No global state tracking is involved.
During a message transaction, we first queue a message as 'staging'
entry in each destination with a preliminary timestamp. This timestamp
is explicitly odd numbered. Any odd numbered timestamp is considered
'staging' and causes *any* message ordered after it to be blocked until
it is no longer staging. This allows us to queue the message in parallel
with any racing multicast, and be guaranteed that all possible conflicts
are blocked until we eventually commit a transaction. To commit a
transaction (after all staging entries are queued), we choose the
highest timestamp we have seen across all destinations and re-queue all
our entries on each peer using that timestamp. Here we use a commit
timestamp (even numbered).
With this in mind, we define that a client can only dequeue messages
from its queue that have an even timestamp. Furthermore, if there is a
message queued with an odd timestamp that is lower than the even
timestamp of another message, then neither message can be dequeued.
They're considered to be in-flight conflicts. This guarantees that two
concurrent multicast messages can be queued without any *global* locks,
but either can only be dequeued by a peer if their ordering has been
established (via commit timestamps).
NOTE: A fully committed message is not guaranteed to be ready to be
dequeued as it may be blocked by a staging entry. This means
that there is an arbitrary (though bounded) time from a
message transaction completing when the queue may still appear
to be empty. In other words, message transmission is not
instantaneous. It would be possible to change this at the
cost of shortly blocking each message transaction on all other
conflicting tasks.
The queue implementation uses an rb-tree (ordered by timestamps and
sender), with a cached pointer to the front of the queue. It will be
embedded in every peer participating on the bus1 kernel message bus1.
Signed-off-by: Tom Gundersen <teg@xxxxxxx>
Signed-off-by: David Herrmann <dh.herrmann@xxxxxxxxx>
---
ipc/bus1/Makefile | 3 +-
ipc/bus1/util/queue.c | 445 ++++++++++++++++++++++++++++++++++++++++++++++++++
ipc/bus1/util/queue.h | 351 +++++++++++++++++++++++++++++++++++++++
3 files changed, 798 insertions(+), 1 deletion(-)
create mode 100644 ipc/bus1/util/queue.c
create mode 100644 ipc/bus1/util/queue.h
diff --git a/ipc/bus1/Makefile b/ipc/bus1/Makefile
index ca8e19d..3c90657 100644
--- a/ipc/bus1/Makefile
+++ b/ipc/bus1/Makefile
@@ -2,7 +2,8 @@ bus1-y := \
main.o \
util/active.o \
util/flist.o \
- util/pool.o
+ util/pool.o \
+ util/queue.o
obj-$(CONFIG_BUS1) += bus1.o
diff --git a/ipc/bus1/util/queue.c b/ipc/bus1/util/queue.c
new file mode 100644
index 0000000..38d7b98
--- /dev/null
+++ b/ipc/bus1/util/queue.c
@@ -0,0 +1,445 @@
+/*
+ * Copyright (C) 2013-2016 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published by the
+ * Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ */
+
+#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
+#include <linux/kernel.h>
+#include <linux/rbtree.h>
+#include <linux/rcupdate.h>
+#include <linux/sched.h>
+#include <linux/wait.h>
+#include "queue.h"
+
+static void bus1_queue_node_set_timestamp(struct bus1_queue_node *node, u64 ts)
+{
+ WARN_ON(ts & BUS1_QUEUE_TYPE_MASK);
+ node->timestamp_and_type &= BUS1_QUEUE_TYPE_MASK;
+ node->timestamp_and_type |= ts;
+}
+
+static int bus1_queue_node_order(struct bus1_queue_node *a,
+ struct bus1_queue_node *b)
+{
+ int r;
+
+ r = bus1_queue_compare(bus1_queue_node_get_timestamp(a), a->group,
+ bus1_queue_node_get_timestamp(b), b->group);
+ if (r)
+ return r;
+ if (a < b)
+ return -1;
+ if (a > b)
+ return 1;
+
+ WARN(1, "Duplicate queue entry");
+ return 0;
+}
+
+/**
+ * bus1_queue_init() - initialize queue
+ * @queue: queue to initialize
+ *
+ * This initializes a new queue. The queue memory is considered uninitialized,
+ * any previous content is unrecoverable.
+ */
+void bus1_queue_init(struct bus1_queue *queue)
+{
+ queue->clock = 0;
+ queue->flush = 0;
+ queue->leftmost = NULL;
+ rcu_assign_pointer(queue->front, NULL);
+ queue->messages = RB_ROOT;
+}
+
+/**
+ * bus1_queue_deinit() - destroy queue
+ * @queue: queue to destroy
+ *
+ * This destroys a queue that was previously initialized via bus1_queue_init().
+ * The caller must make sure the queue is empty before calling this.
+ *
+ * This function is a no-op, and only does safety checks on the queue. It is
+ * safe to call this function multiple times on the same queue.
+ *
+ * The caller must guarantee that the backing memory of @queue is freed in an
+ * rcu-delayed manner.
+ */
+void bus1_queue_deinit(struct bus1_queue *queue)
+{
+ WARN_ON(!RB_EMPTY_ROOT(&queue->messages));
+ WARN_ON(queue->leftmost);
+ WARN_ON(rcu_access_pointer(queue->front));
+}
+
+/**
+ * bus1_queue_flush() - flush message queue
+ * @queue: queue to flush
+ * @ts: flush timestamp
+ *
+ * This flushes all committed entries from @queue and returns them as
+ * singly-linked list for the caller to clean up. Staged entries are left in
+ * the queue.
+ *
+ * You must acquire a timestamp before flushing the queue (e.g., tick the
+ * clock). This timestamp must be given as @ts. Only entries lower than, or
+ * equal to, this timestamp are flushed. The timestamp is remembered as
+ * queue->flush.
+ *
+ * Return: Single-linked list of flushed entries.
+ */
+struct bus1_queue_node *bus1_queue_flush(struct bus1_queue *queue, u64 ts)
+{
+ struct bus1_queue_node *node, *list = NULL;
+ struct rb_node *n;
+
+ /*
+ * A queue contains staging and committed nodes. A committed node is
+ * fully owned by the queue, but a staging entry is always still owned
+ * by a transaction.
+ *
+ * On flush, we push all committed (i.e., queue-owned) nodes into a
+ * list and transfer them to the caller, as if they dequeued them
+ * manually. But any staging node is left linked. Depending on the
+ * timestamp that will be assigned by their transaction, they will be
+ * either lazily discarded or not.
+ */
+
+ WARN_ON(ts & 1);
+ WARN_ON(ts > queue->clock + 1);
+ WARN_ON(ts < queue->flush);
+
+ rcu_assign_pointer(queue->front, NULL);
+ queue->leftmost = NULL;
+ queue->flush = ts;
+
+ n = rb_first(&queue->messages);
+ while (n) {
+ node = container_of(n, struct bus1_queue_node, rb);
+ n = rb_next(n);
+ ts = bus1_queue_node_get_timestamp(node);
+
+ if (!(ts & 1) && ts <= queue->flush) {
+ rb_erase(&node->rb, &queue->messages);
+ RB_CLEAR_NODE(&node->rb);
+ node->next = list;
+ list = node;
+ } else if (!queue->leftmost) {
+ queue->leftmost = &node->rb;
+ }
+ }
+
+ return list;
+}
+
+static void bus1_queue_add(struct bus1_queue *queue,
+ wait_queue_head_t *waitq,
+ struct bus1_queue_node *node,
+ u64 timestamp)
+{
+ struct rb_node *front, *n, **slot;
+ struct bus1_queue_node *iter;
+ bool is_leftmost, readable;
+ u64 ts;
+ int r;
+
+ ts = bus1_queue_node_get_timestamp(node);
+ readable = rcu_access_pointer(queue->front);
+
+ /* provided timestamp must be valid */
+ if (WARN_ON(timestamp == 0 || timestamp > queue->clock + 1))
+ return;
+ /* if unstamped, it must be unlinked, and vice versa */
+ if (WARN_ON(!ts == !RB_EMPTY_NODE(&node->rb)))
+ return;
+ /* if stamped, it must be a valid staging timestamp from earlier */
+ if (WARN_ON(ts != 0 && (!(ts & 1) || timestamp < ts)))
+ return;
+ /* nothing to do? */
+ if (ts == timestamp)
+ return;
+
+ /*
+ * We update the timestamp of @node *before* erasing it. This
+ * guarantees that the comparisons to NEXT/PREV are done based on the
+ * new values.
+ * The rb-tree does not care for async key-updates, since all accesses
+ * are done locked, and tree maintenance is always stable (never looks
+ * at the keys).
+ */
+ bus1_queue_node_set_timestamp(node, timestamp);
+
+ /*
+ * On updates, we remove our entry and re-insert it with a higher
+ * timestamp. Hence, _iff_ we were the first entry, we might uncover
+ * some new front entry. Make sure we mark it as front entry then. Note
+ * that we know that our entry must be marked staging, so it cannot be
+ * set as front, yet. If there is a front, it is some other node.
+ */
+ if (&node->rb == queue->leftmost) {
+ /*
+ * We are linked into the queue as staging entry *and* we are
+ * the first entry. Now look at the following entry. If it is
+ * already committed *and* has a lower timestamp than we do, it
+ * will become the new front, so mark it as such!
+ */
+ WARN_ON(readable);
+ queue->leftmost = rb_next(&node->rb);
+ if (queue->leftmost) {
+ iter = container_of(queue->leftmost,
+ struct bus1_queue_node, rb);
+ if (!bus1_queue_node_is_staging(iter) &&
+ bus1_queue_node_order(iter, node) <= 0)
+ rcu_assign_pointer(queue->front,
+ queue->leftmost);
+ }
+ } else if ((front = rcu_dereference_raw(queue->front))) {
+ /*
+ * If there already is a front entry, just verify that we will
+ * not order *before* it. We *must not* replace it as front.
+ */
+ iter = container_of(front, struct bus1_queue_node, rb);
+ WARN_ON(bus1_queue_node_order(node, iter) <= 0);
+ }
+
+ /* must be staging, so it cannot be pointed to by queue->front */
+ if (!RB_EMPTY_NODE(&node->rb))
+ rb_erase(&node->rb, &queue->messages);
+
+ /* re-insert into sorted rb-tree with new timestamp */
+ slot = &queue->messages.rb_node;
+ n = NULL;
+ is_leftmost = true;
+ while (*slot) {
+ n = *slot;
+ iter = container_of(n, struct bus1_queue_node, rb);
+ r = bus1_queue_node_order(node, iter);
+ if (r < 0) {
+ slot = &n->rb_left;
+ } else /* if (r >= 0) */ {
+ slot = &n->rb_right;
+ is_leftmost = false;
+ }
+ }
+
+ rb_link_node(&node->rb, n, slot);
+ rb_insert_color(&node->rb, &queue->messages);
+
+ if (is_leftmost) {
+ queue->leftmost = &node->rb;
+ if (!(timestamp & 1))
+ rcu_assign_pointer(queue->front, &node->rb);
+ else
+ WARN_ON(readable);
+ }
+
+ if (waitq && !readable && rcu_access_pointer(queue->front))
+ wake_up_interruptible(waitq);
+}
+
+/**
+ * bus1_queue_stage() - stage queue entry with fresh timestamp
+ * @queue: queue to operate on
+ * @node: queue entry to stage
+ * @timestamp: minimum timestamp for @node
+ *
+ * Link a queue entry with a new timestamp. The staging entry blocks all
+ * messages with timestamps synced on this queue in the future, as well as any
+ * messages with a timestamp greater than @timestamp. However, it does not block
+ * any messages already committed to this queue.
+ *
+ * The caller must provide an even timestamp and the entry may not already have
+ * been committed.
+ *
+ * Return: The timestamp used.
+ */
+u64 bus1_queue_stage(struct bus1_queue *queue,
+ struct bus1_queue_node *node,
+ u64 timestamp)
+{
+ WARN_ON(timestamp & 1);
+ WARN_ON(bus1_queue_node_is_queued(node));
+
+ timestamp = bus1_queue_sync(queue, timestamp);
+ bus1_queue_add(queue, NULL, node, timestamp + 1);
+ WARN_ON(rcu_access_pointer(queue->front) == &node->rb);
+
+ return timestamp;
+}
+
+/**
+ * bus1_queue_commit_staged() - commit staged queue entry with new timestamp
+ * @queue: queue to operate on
+ * @waitq: wait-queue to wake up on change, or NULL
+ * @node: queue entry to commit
+ * @timestamp: new timestamp for @node
+ *
+ * Update a staging queue entry according to @timestamp. The timestamp must be
+ * even and the entry may not already have been committed.
+ *
+ * Furthermore, the queue clock must be synced with the new timestamp *before*
+ * staging an entry. Similarly, the timestamp of an entry can only be
+ * increased, never decreased.
+ */
+void bus1_queue_commit_staged(struct bus1_queue *queue,
+ wait_queue_head_t *waitq,
+ struct bus1_queue_node *node,
+ u64 timestamp)
+{
+ WARN_ON(timestamp & 1);
+ WARN_ON(!bus1_queue_node_is_queued(node));
+
+ bus1_queue_add(queue, waitq, node, timestamp);
+}
+
+/**
+ * bus1_queue_commit_unstaged() - commit unstaged queue entry with new timestamp
+ * @queue: queue to operate on
+ * @waitq: wait-queue to wake up on change, or NULL
+ * @node: queue entry to commit
+ *
+ * Directly commit an unstaged queue entry to the destination queue. The entry
+ * must not be queued, yet.
+ *
+ * The destination queue is ticked and the resulting timestamp is used to commit
+ * the queue entry.
+ */
+void bus1_queue_commit_unstaged(struct bus1_queue *queue,
+ wait_queue_head_t *waitq,
+ struct bus1_queue_node *node)
+{
+ WARN_ON(bus1_queue_node_is_queued(node));
+
+ bus1_queue_add(queue, waitq, node, bus1_queue_tick(queue));
+}
+
+/**
+ * bus1_queue_commit_synthetic() - commit synthetic entry
+ * @queue: queue to operate on
+ * @node: entry to commit
+ * @timestamp: timestamp to use
+ *
+ * This inserts the unqueued entry @node into the queue with the commit
+ * timestamp @timestamp (just like bus1_queue_commit_unstaged()). However, it
+ * only does so if the new entry would NOT become the new front. It thus allows
+ * inserting fake synthetic entries somewhere in the middle of a queue, but
+ * accepts the possibility of failure.
+ *
+ * Return: True if committed, false if not.
+ */
+bool bus1_queue_commit_synthetic(struct bus1_queue *queue,
+ struct bus1_queue_node *node,
+ u64 timestamp)
+{
+ struct bus1_queue_node *t;
+ bool queued = false;
+ int r;
+
+ WARN_ON(timestamp & 1);
+ WARN_ON(timestamp > queue->clock + 1);
+ WARN_ON(bus1_queue_node_is_queued(node));
+
+ if (queue->leftmost) {
+ t = container_of(queue->leftmost, struct bus1_queue_node, rb);
+ r = bus1_queue_compare(bus1_queue_node_get_timestamp(t),
+ t->group, timestamp, node->group);
+ if (r < 0 || (r == 0 && node < t)) {
+ bus1_queue_add(queue, NULL, node, timestamp);
+ WARN_ON(rcu_access_pointer(queue->front) == &node->rb);
+ queued = true;
+ }
+ }
+
+ return queued;
+}
+
+/**
+ * bus1_queue_remove() - remove entry from queue
+ * @queue: queue to operate on
+ * @waitq: wait-queue to wake up on change, or NULL
+ * @node: queue entry to remove
+ *
+ * This unlinks @node and fully removes it from the queue @queue. If you want
+ * to re-insert the node into a queue, you must re-initialize it first.
+ *
+ * It is an error to call this on an unlinked entry.
+ */
+void bus1_queue_remove(struct bus1_queue *queue,
+ wait_queue_head_t *waitq,
+ struct bus1_queue_node *node)
+{
+ bool readable;
+
+ if (WARN_ON(RB_EMPTY_NODE(&node->rb)))
+ return;
+
+ readable = rcu_access_pointer(queue->front);
+
+ if (queue->leftmost == &node->rb) {
+ /*
+ * We are the first entry in the queue. Regardless whether we
+ * are marked as front or not, our removal might uncover a new
+ * front. Hence, always look at the next following entry and
+ * see whether it is fully committed. If it is, mark it as
+ * front, but otherwise reset the front to NULL.
+ */
+ queue->leftmost = rb_next(queue->leftmost);
+ if (queue->leftmost &&
+ !bus1_queue_node_is_staging(container_of(queue->leftmost,
+ struct bus1_queue_node,
+ rb)))
+ rcu_assign_pointer(queue->front, queue->leftmost);
+ else
+ rcu_assign_pointer(queue->front, NULL);
+ }
+
+ rb_erase(&node->rb, &queue->messages);
+ RB_CLEAR_NODE(&node->rb);
+
+ if (waitq && !readable && rcu_access_pointer(queue->front))
+ wake_up_interruptible(waitq);
+}
+
+/**
+ * bus1_queue_peek() - peek first available entry
+ * @queue: queue to operate on
+ * @morep: where to store group-state
+ *
+ * This returns a pointer to the first available entry in the given queue, or
+ * NULL if there is none. The queue stays unmodified and the returned entry
+ * remains on the queue.
+ *
+ * This only returns entries that are ready to be dequeued. Entries that are
+ * still in staging mode will not be considered.
+ *
+ * If a node is returned, its group-state is stored in @morep. That means,
+ * if there are more messages queued as part of the same transaction, true is
+ * stored in @morep. But if the returned node is the last part of the
+ * transaction, false is returned.
+ *
+ * Return: Pointer to first available entry, NULL if none available.
+ */
+struct bus1_queue_node *bus1_queue_peek(struct bus1_queue *queue, bool *morep)
+{
+ struct bus1_queue_node *node, *t;
+ struct rb_node *n;
+
+ n = rcu_dereference_raw(queue->front);
+ if (!n)
+ return NULL;
+
+ node = container_of(n, struct bus1_queue_node, rb);
+ n = rb_next(n);
+ if (n)
+ t = container_of(n, struct bus1_queue_node, rb);
+
+ *morep = n && !bus1_queue_compare(bus1_queue_node_get_timestamp(node),
+ node->group,
+ bus1_queue_node_get_timestamp(t),
+ t->group);
+ return node;
+}
diff --git a/ipc/bus1/util/queue.h b/ipc/bus1/util/queue.h
new file mode 100644
index 0000000..1a59a60
--- /dev/null
+++ b/ipc/bus1/util/queue.h
@@ -0,0 +1,351 @@
+#ifndef __BUS1_QUEUE_H
+#define __BUS1_QUEUE_H
+
+/*
+ * Copyright (C) 2013-2016 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published by the
+ * Free Software Foundation; either version 2.1 of the License, or (at
+ * your option) any later version.
+ */
+
+/**
+ * DOC: Message Queue
+ *
+ * (You are highly encouraged to read up on 'Lamport Timestamps', the
+ * concept of 'happened-before', and 'causal ordering'. The queue
+ * implementation has its roots in Lamport Timestamps, treating a set of local
+ * CPUs as a distributed system to avoid any global synchronization.)
+ *
+ * A message queue is a FIFO, i.e., messages are linearly ordered by the time
+ * they were sent. Moreover, atomic delivery of messages to multiple queues are
+ * supported, without any global synchronization, i.e., the order of message
+ * delivery is consistent across queues.
+ *
+ * Messages can be destined for multiple queues, hence, we need to be careful
+ * that all queues get a consistent order of incoming messages. We define the
+ * concept of `global order' to provide a basic set of guarantees. This global
+ * order is a partial order on the set of all messages. The order is defined as:
+ *
+ * 1) If a message B was queued *after* a message A, then: A < B
+ *
+ * 2) If a message B was queued *after* a message A was dequeued, then: A < B
+ *
+ * 3) If a message B was dequeued *after* a message A on the same queue,
+ * then: A < B
+ *
+ * (Note: Causality is honored. `after' and `before' do not refer to the
+ * same task, nor the same queue, but rather any kind of
+ * synchronization between the two operations.)
+ *
+ * The queue object implements this global order in a lockless fashion. It
+ * solely relies on a distributed clock on each queue. Each message to be sent
+ * causes a clock tick on the local clock and on all destination clocks.
+ * Furthermore, all clocks are synchronized, meaning they're fast-forwarded in
+ * case they're behind the highest of all participating peers. No global state
+ * tracking is involved.
+ *
+ * During a message transaction, we first queue a message as 'staging' entry in
+ * each destination with a preliminary timestamp. This timestamp is explicitly
+ * odd numbered. Any odd numbered timestamp is considered 'staging' and causes
+ * *any* message ordered after it to be blocked until it is no longer staging.
+ * This allows us to queue the message in parallel with any racing multicast,
+ * and be guaranteed that all possible conflicts are blocked until we eventually
+ * commit a transaction. To commit a transaction (after all staging entries are
+ * queued), we choose the highest timestamp we have seen across all destinations
+ * and re-queue all our entries on each peer using that timestamp. Here we use a
+ * commit timestamp (even numbered).
+ *
+ * With this in mind, we define that a client can only dequeue messages from
+ * its queue that have an even timestamp. Furthermore, if there is a message
+ * queued with an odd timestamp that is lower than the even timestamp of
+ * another message, then neither message can be dequeued. They're considered to
+ * be in-flight conflicts. This guarantees that two concurrent multicast
+ * messages can be queued without any *global* locks, but either can only be
+ * dequeued by a peer if their ordering has been established (via commit
+ * timestamps).
+ *
+ * NOTE: A fully committed message is not guaranteed to be ready to be dequeued
+ * as it may be blocked by a staging entry. This means that there is an
+ * arbitrary (though bounded) time from a message transaction completing
+ * when the queue may still appear to be empty. In other words, message
+ * transmission is not instantaneous. It would be possible to change this
+ * at the cost of shortly blocking each message transaction on all other
+ * conflicting tasks.
+ *
+ * The queue implementation uses an rb-tree (ordered by timestamps and sender),
+ * with a cached pointer to the front of the queue.
+ */
+
+#include <linux/kernel.h>
+#include <linux/rbtree.h>
+#include <linux/rcupdate.h>
+#include <linux/wait.h>
+
+/* shift/mask for @timestamp_and_type field of queue nodes */
+#define BUS1_QUEUE_TYPE_SHIFT (62)
+#define BUS1_QUEUE_TYPE_MASK (((u64)3ULL) << BUS1_QUEUE_TYPE_SHIFT)
+#define BUS1_QUEUE_TYPE_N (4)
+
+/**
+ * struct bus1_queue_node - node into message queue
+ * @rcu: rcu-delayed destruction
+ * @rb: link into sorted message queue
+ * @timestamp_and_type: message timestamp and type of parent object
+ * @next: single-linked utility list
+ * @group: group association
+ * @owner: node owner
+ */
+struct bus1_queue_node {
+ union {
+ struct rcu_head rcu;
+ struct rb_node rb;
+ };
+ u64 timestamp_and_type;
+ struct bus1_queue_node *next;
+ void *group;
+ void *owner;
+};
+
+/**
+ * struct bus1_queue - message queue
+ * @clock: local clock (used for Lamport Timestamps)
+ * @flush: last flush timestamp
+ * @leftmost: cached left-most entry
+ * @front: cached front entry
+ * @messages: queued messages
+ */
+struct bus1_queue {
+ u64 clock;
+ u64 flush;
+ struct rb_node *leftmost;
+ struct rb_node __rcu *front;
+ struct rb_root messages;
+};
+
+void bus1_queue_init(struct bus1_queue *queue);
+void bus1_queue_deinit(struct bus1_queue *queue);
+struct bus1_queue_node *bus1_queue_flush(struct bus1_queue *queue, u64 ts);
+u64 bus1_queue_stage(struct bus1_queue *queue,
+ struct bus1_queue_node *node,
+ u64 timestamp);
+void bus1_queue_commit_staged(struct bus1_queue *queue,
+ wait_queue_head_t *waitq,
+ struct bus1_queue_node *node,
+ u64 timestamp);
+void bus1_queue_commit_unstaged(struct bus1_queue *queue,
+ wait_queue_head_t *waitq,
+ struct bus1_queue_node *node);
+bool bus1_queue_commit_synthetic(struct bus1_queue *queue,
+ struct bus1_queue_node *node,
+ u64 timestamp);
+void bus1_queue_remove(struct bus1_queue *queue,
+ wait_queue_head_t *waitq,
+ struct bus1_queue_node *node);
+struct bus1_queue_node *bus1_queue_peek(struct bus1_queue *queue, bool *morep);
+
+/**
+ * bus1_queue_node_init() - initialize queue node
+ * @node: node to initialize
+ * @type: message type
+ *
+ * This initializes a previously unused node, and prepares it for use with a
+ * message queue.
+ */
+static inline void bus1_queue_node_init(struct bus1_queue_node *node,
+ unsigned int type)
+{
+ BUILD_BUG_ON((BUS1_QUEUE_TYPE_N - 1) > (BUS1_QUEUE_TYPE_MASK >>
+ BUS1_QUEUE_TYPE_SHIFT));
+ WARN_ON(type & ~(BUS1_QUEUE_TYPE_MASK >> BUS1_QUEUE_TYPE_SHIFT));
+
+ RB_CLEAR_NODE(&node->rb);
+ node->timestamp_and_type = (u64)type << BUS1_QUEUE_TYPE_SHIFT;
+ node->next = NULL;
+ node->group = NULL;
+ node->owner = NULL;
+}
+
+/**
+ * bus1_queue_node_deinit() - destroy queue node
+ * @node: node to destroy
+ *
+ * This destroys a previously initialized queue node. This is a no-op and only
+ * serves as debugger, testing whether the node was properly unqueued before.
+ */
+static inline void bus1_queue_node_deinit(struct bus1_queue_node *node)
+{
+ WARN_ON(!RB_EMPTY_NODE(&node->rb));
+ WARN_ON(node->next);
+}
+
+/**
+ * bus1_queue_node_get_type() - query node type
+ * @node: node to query
+ *
+ * This queries the node type that was provided via the node constructor. A
+ * node never changes its type during its entire lifetime.
+ *
+ * Return: Type of @node is returned.
+ */
+static inline unsigned int
+bus1_queue_node_get_type(struct bus1_queue_node *node)
+{
+ return (node->timestamp_and_type & BUS1_QUEUE_TYPE_MASK) >>
+ BUS1_QUEUE_TYPE_SHIFT;
+}
+
+/**
+ * bus1_queue_node_get_timestamp() - query node timestamp
+ * @node: node to query
+ *
+ * This queries the node timestamp that is currently set on this node.
+ *
+ * Return: Timestamp of @node is returned.
+ */
+static inline u64 bus1_queue_node_get_timestamp(struct bus1_queue_node *node)
+{
+ return node->timestamp_and_type & ~BUS1_QUEUE_TYPE_MASK;
+}
+
+/**
+ * bus1_queue_node_is_queued() - check whether a node is queued
+ * @node: node to query
+ *
+ * This checks whether a node is currently queued in a message queue. That is,
+ * the node was linked and has not been dequeued, yet.
+ *
+ * Return: True if @node is currently queued.
+ */
+static inline bool bus1_queue_node_is_queued(struct bus1_queue_node *node)
+{
+ return !RB_EMPTY_NODE(&node->rb);
+}
+
+/**
+ * bus1_queue_node_is_staging() - check whether a node is marked staging
+ * @node: node to query
+ *
+ * This checks whether a given node is queued, but still marked staging. That
+ * means, the node has been put on the queue but there is still a transaction
+ * that pins it to commit it later.
+ *
+ * Return: True if @node is queued as staging entry.
+ */
+static inline bool bus1_queue_node_is_staging(struct bus1_queue_node *node)
+{
+ return bus1_queue_node_get_timestamp(node) & 1;
+}
+
+/**
+ * bus1_queue_tick() - increment queue clock
+ * @queue: queue to operate on
+ *
+ * This performs a clock-tick on @queue. The clock is incremented by a full
+ * interval (+2). The caller is free to use both, the new value (even numbered)
+ * and its successor (odd numbered). Both are uniquely allocated to the
+ * caller.
+ *
+ * Return: New clock value is returned.
+ */
+static inline u64 bus1_queue_tick(struct bus1_queue *queue)
+{
+ queue->clock += 2;
+ return queue->clock;
+}
+
+/**
+ * bus1_queue_sync() - sync queue clock
+ * @queue: queue to operate on
+ * @timestamp: timestamp to sync on
+ *
+ * This synchronizes the clock of @queue with the externally provided timestamp
+ * @timestamp. That is, the queue clock is fast-forwarded to @timestamp, in
+ * case it is newer than the queue clock. Otherwise, nothing is done.
+ *
+ * The passed in timestamp must be even.
+ *
+ * Return: New clock value is returned.
+ */
+static inline u64 bus1_queue_sync(struct bus1_queue *queue, u64 timestamp)
+{
+ WARN_ON(timestamp & 1);
+ queue->clock = max(queue->clock, timestamp);
+ return queue->clock;
+}
+
+/**
+ * bus1_queue_is_readable_rcu() - check whether a queue is readable
+ * @queue: queue to operate on
+ *
+ * This checks whether the given queue is readable.
+ *
+ * This does not require any locking, except for an rcu-read-side critical
+ * section.
+ *
+ * Return: True if the queue is readable, false if not.
+ */
+static inline bool bus1_queue_is_readable_rcu(struct bus1_queue *queue)
+{
+ return rcu_access_pointer(queue->front);
+}
+
+/**
+ * bus1_queue_compare() - comparator for queue ordering
+ * @a_ts: timestamp of first node to compare
+ * @a_g: group of first node to compare
+ * @b_ts: timestamp of second node to compare against
+ * @b_g: group of second node to compare against
+ *
+ * Messages on a message queue are ordered. This function implements the
+ * comparator used for all message ordering in queues. Two tags are used for
+ * ordering, the timestamp and the group-tag of a node. Both must be passed to
+ * this function.
+ *
+ * This compares the tuples (@a_ts, @a_g) and (@b_ts, @b_g).
+ *
+ * Return: <0 if (@a_ts, @a_g) is ordered before, >0 if after, 0 if same.
+ */
+static inline int bus1_queue_compare(u64 a_ts, void *a_g, u64 b_ts, void *b_g)
+{
+ /*
+ * This orders two possible queue nodes. As first-level ordering we
+ * use the timestamps, as second-level ordering we use the group-tag.
+ *
+ * Timestamp-based ordering should be obvious. We simply make sure that
+ * any message with a lower timestamp is always considered to be first.
+ * However, due to the distributed nature of the queue-clocks, multiple
+ * messages might end up with the same timestamp. A multicast picks the
+ * highest of its destination clocks and bumps everyone else. As such,
+ * the picked timestamp for a multicast might not be unique, if another
+ * multicast with only partial destination overlap races it and happens
+ * to get the same timestamp via a distinct destination clock. If that
+ * happens, we guarantee a stable order by comparing the group-tag of
+ * the nodes. The group-tag is only ever equal if both messages belong
+ * to the same transaction.
+ *
+ * Note that we strictly rely on any multicast to be staged before its
+ * final commit. This guarantees that if a node is queued with a commit
+ * timestamp, it can never be lower than the commit timestamp of any
+ * other committed node, except if it was already staged with a lower
+ * staging timestamp (as such it blocks the conflicting entry). This
+ * also implies that if two nodes share a timestamp, both will
+ * necessarily block each other until both are committed (since shared
+ * timestamps imply that an entry is guaranteed to be staged before a
+ * conflicting entry is committed).
+ */
+
+ if (a_ts < b_ts)
+ return -1;
+ else if (a_ts > b_ts)
+ return 1;
+ else if (a_g < b_g)
+ return -1;
+ else if (a_g > b_g)
+ return 1;
+
+ return 0;
+}
+
+#endif /* __BUS1_QUEUE_H */
--
2.10.1