[RFC PATCH 5/7] locking/rtqspinlock: Handle priority boosting
From: Waiman Long
Date: Tue Jan 03 2017 - 13:02:02 EST
As the priority of a task may get boosted due to an acquired rtmutex,
we will need to periodically check the task priority to see if it
gets boosted. For an originally non-RT task, that means unqueuing from
the MCS wait queue before doing an RT spinning. So the unqueuing code
from osq_lock is borrowed to handle the unqueuing aspect of it.
Signed-off-by: Waiman Long <longman@xxxxxxxxxx>
---
kernel/locking/qspinlock.c | 25 ++++-
kernel/locking/qspinlock_rt.h | 235 +++++++++++++++++++++++++++++++++++++++++-
2 files changed, 253 insertions(+), 7 deletions(-)
diff --git a/kernel/locking/qspinlock.c b/kernel/locking/qspinlock.c
index cb5c2e5..b25ad58 100644
--- a/kernel/locking/qspinlock.c
+++ b/kernel/locking/qspinlock.c
@@ -270,10 +270,18 @@ static __always_inline u32 __pv_wait_head_or_lock(struct qspinlock *lock,
/*
* Realtime queued spinlock is mutual exclusive with native and PV spinlocks.
*/
+#define RT_RETRY ((u32)-1)
+
#ifdef CONFIG_REALTIME_QUEUED_SPINLOCKS
#include "qspinlock_rt.h"
#else
-static __always_inline u32 rt_wait_head_or_retry(struct qspinlock *lock)
+static __always_inline void rt_init_node(struct mcs_spinlock *node, u32 tail) {}
+static __always_inline bool rt_wait_node_or_unqueue(struct qspinlock *lock,
+ struct mcs_spinlock *node,
+ struct mcs_spinlock *prev)
+ { return false; }
+static __always_inline u32 rt_spin_lock_or_retry(struct qspinlock *lock,
+ struct mcs_spinlock *node)
{ return 0; }
#define rt_pending(v) 0
#define rt_enabled() false
@@ -514,6 +522,7 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
node->locked = 0;
node->next = NULL;
pv_init_node(node);
+ rt_init_node(node, tail);
/*
* We touched a (possibly) cold cacheline in the per-cpu queue node;
@@ -552,6 +561,10 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
WRITE_ONCE(prev->next, node);
pv_wait_node(node, prev);
+
+ if (rt_wait_node_or_unqueue(lock, node, prev))
+ goto queue; /* Retry RT locking */
+
arch_mcs_spin_lock_contended(&node->locked);
/*
@@ -591,10 +604,14 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
/*
* The RT rt_wait_head_or_lock function, if active, will acquire the
- * lock and return a non-zero value.
+ * lock, release the MCS lock and return a non-zero value. We need to
+ * retry with RT spinning when RT_RETRY is returned.
*/
- if ((val = rt_wait_head_or_retry(lock)))
- goto locked;
+ val = rt_spin_lock_or_retry(lock, node);
+ if (val == RT_RETRY)
+ goto queue;
+ else if (val)
+ return;
val = smp_cond_load_acquire(&lock->val.counter, !(VAL & _Q_LOCKED_PENDING_MASK));
diff --git a/kernel/locking/qspinlock_rt.h b/kernel/locking/qspinlock_rt.h
index c2f2722..0c4d051 100644
--- a/kernel/locking/qspinlock_rt.h
+++ b/kernel/locking/qspinlock_rt.h
@@ -37,6 +37,12 @@
* doing so, we can query the highest priority task that is waiting on the
* outer lock and adjust our waiting priority accordingly. To speed up nested
* spinlock calls, they will have a minimum RT priority of 1 to begin with.
+ *
+ * To handle priority boosting due to an acquired rt-mutex, The task->prio
+ * field is queried in each iteration of the loop. For originally non-RT tasks,
+ * it will have to break out of the MCS wait queue just like what is done
+ * in the OSQ lock. Then it has to retry RT spinning if it has been boosted
+ * to RT priority.
*/
#include <linux/sched.h>
@@ -45,9 +51,56 @@
#endif
/*
+ * For proper unqueuing from the MCS wait queue, we need to store the encoded
+ * tail code as well the previous node pointer into the extra MCS node. Since
+ * CPUs in interrupt context won't use the per-CPU MCS nodes anymore. So only
+ * one is needed for process context CPUs. As a result, we can use the
+ * additional nodes for data storage. Here, we allow 2 nodes per cpu in case
+ * we want to put softIRQ CPUs into the queue as well.
+ */
+struct rt_node {
+ struct mcs_spinlock mcs;
+ struct mcs_spinlock __reserved;
+ struct mcs_spinlock *prev;
+ u32 tail;
+};
+
+/*
* =========================[ Helper Functions ]=========================
*/
+static u32 cmpxchg_tail_acquire(struct qspinlock *lock, u32 old, u32 new)
+{
+ struct __qspinlock *l = (void *)lock;
+
+ return cmpxchg_acquire(&l->tail, old >> _Q_TAIL_OFFSET,
+ new >> _Q_TAIL_OFFSET) << _Q_TAIL_OFFSET;
+}
+
+static u32 cmpxchg_tail_release(struct qspinlock *lock, u32 old, u32 new)
+{
+ struct __qspinlock *l = (void *)lock;
+
+ return cmpxchg_release(&l->tail, old >> _Q_TAIL_OFFSET,
+ new >> _Q_TAIL_OFFSET) << _Q_TAIL_OFFSET;
+}
+
+static inline void rt_write_prev(struct mcs_spinlock *node,
+ struct mcs_spinlock *prev)
+{
+ WRITE_ONCE(((struct rt_node *)node)->prev, prev);
+}
+
+static inline u32 rt_read_tail(struct mcs_spinlock *node)
+{
+ return READ_ONCE(((struct rt_node *)node)->tail);
+}
+
+static inline struct mcs_spinlock *rt_read_prev(struct mcs_spinlock *node)
+{
+ return READ_ONCE(((struct rt_node *)node)->prev);
+}
+
/*
* Translate the priority of a task to an equivalent RT priority
*/
@@ -141,6 +194,56 @@ static bool __rt_spin_trylock(struct qspinlock *lock,
}
/*
+ * MCS wait queue unqueuing code, borrow mostly from osq_lock.c.
+ */
+static struct mcs_spinlock *
+mcsq_wait_next(struct qspinlock *lock, struct mcs_spinlock *node,
+ struct mcs_spinlock *prev)
+{
+ struct mcs_spinlock *next = NULL;
+ u32 tail = rt_read_tail(node);
+ u32 old;
+
+ /*
+ * If there is a prev node in queue, the 'old' value will be
+ * the prev node's tail value. Otherwise, it's set to 0 since if
+ * we're the only one in queue, the queue will then become empty.
+ */
+ old = prev ? rt_read_tail(prev) : 0;
+
+ for (;;) {
+ if ((atomic_read(&lock->val) & _Q_TAIL_MASK) == tail &&
+ cmpxchg_tail_acquire(lock, tail, old) == tail) {
+ /*
+ * We are at the queue tail, we moved the @lock back.
+ * @prev will now observe @lock and will complete its
+ * unlock()/unqueue().
+ */
+ break;
+ }
+
+ /*
+ * We must xchg() the @node->next value, because if we were to
+ * leave it in, a concurrent unlock()/unqueue() from
+ * @node->next might complete Step-A and think its @prev is
+ * still valid.
+ *
+ * If the concurrent unlock()/unqueue() wins the race, we'll
+ * wait for either @lock to point to us, through its Step-B, or
+ * wait for a new @node->next from its Step-C.
+ */
+ if (node->next) {
+ next = xchg(&node->next, NULL);
+ if (next)
+ break;
+ }
+
+ cpu_relax();
+ }
+ return next;
+}
+
+/*
* ====================[ Functions Used by qspinlock.c ]====================
*/
@@ -158,6 +261,17 @@ static inline int rt_pending(int val)
}
/*
+ * Initialize the RT fields of a MCS node.
+ */
+static inline void rt_init_node(struct mcs_spinlock *node, u32 tail)
+{
+ struct rt_node *n = (struct rt_node *)node;
+
+ n->prev = NULL;
+ n->tail = tail;
+}
+
+/*
* Return: true if locked acquired
* false if queuing in the MCS wait queue is needed.
*/
@@ -167,16 +281,98 @@ static inline bool rt_spin_trylock(struct qspinlock *lock)
}
/*
+ * Return: true if it has been unqueued and need to retry locking.
+ * false if it becomes the wait queue head & proceed to next step.
+ */
+static bool rt_wait_node_or_unqueue(struct qspinlock *lock,
+ struct mcs_spinlock *node,
+ struct mcs_spinlock *prev)
+{
+ struct mcs_spinlock *next;
+
+ rt_write_prev(node, prev); /* Save previous node pointer */
+
+ while (!READ_ONCE(node->locked)) {
+ if (rt_task_priority(current, 0))
+ goto unqueue;
+ cpu_relax();
+ }
+ return false;
+
+unqueue:
+ /*
+ * Step - A -- stabilize @prev
+ *
+ * Undo our @prev->next assignment; this will make @prev's
+ * unlock()/unqueue() wait for a next pointer since @lock points
+ * to us (or later).
+ */
+ for (;;) {
+ if (prev->next == node &&
+ cmpxchg(&prev->next, node, NULL) == node)
+ break;
+
+ /*
+ * We can only fail the cmpxchg() racing against an unlock(),
+ * in which case we should observe @node->locked becoming
+ * true.
+ */
+ if (smp_load_acquire(&node->locked))
+ return false;
+
+ cpu_relax();
+
+ /*
+ * Or we race against a concurrent unqueue()'s step-B, in which
+ * case its step-C will write us a new @node->prev pointer.
+ */
+ prev = rt_read_prev(node);
+ }
+
+ /*
+ * Step - B -- stabilize @next
+ *
+ * Similar to unlock(), wait for @node->next or move @lock from @node
+ * back to @prev.
+ */
+ next = mcsq_wait_next(lock, node, prev);
+
+ /*
+ * Step - C -- unlink
+ *
+ * @prev is stable because its still waiting for a new @prev->next
+ * pointer, @next is stable because our @node->next pointer is NULL and
+ * it will wait in Step-A.
+ */
+ if (next) {
+ rt_write_prev(next, prev);
+ WRITE_ONCE(prev->next, next);
+ }
+
+ /*
+ * Release the node.
+ */
+ __this_cpu_dec(mcs_nodes[0].count);
+
+ return true; /* Need to retry RT spinning */
+}
+
+/*
* We need to make the non-RT tasks wait longer if RT tasks are spinning for
* the lock. This is done to reduce the chance that a non-RT task may
* accidently grab the lock away from the RT tasks in the short interval
* where the pending priority may be reset after an RT task acquires the lock.
*
- * Return: Current value of the lock.
+ * Return: RT_RETRY if it needs to retry locking.
+ * 1 if lock acquired.
*/
-static u32 rt_wait_head_or_retry(struct qspinlock *lock)
+static u32 rt_spin_lock_or_retry(struct qspinlock *lock,
+ struct mcs_spinlock *node)
{
struct __qspinlock *l = (void *)lock;
+ struct mcs_spinlock *next;
+ bool retry = false;
+ u32 tail;
for (;;) {
u16 lockpend = READ_ONCE(l->locked_pending);
@@ -187,6 +383,14 @@ static u32 rt_wait_head_or_retry(struct qspinlock *lock)
if (!lockpend)
break;
}
+ /*
+ * We need to break out of the non-RT wait queue and do
+ * RT spinnning if we become an RT task.
+ */
+ if (rt_task_priority(current, 0)) {
+ retry = true;
+ goto unlock;
+ }
/*
* 4 cpu_relax's if RT tasks present.
@@ -198,7 +402,32 @@ static u32 rt_wait_head_or_retry(struct qspinlock *lock)
}
cpu_relax();
}
- return atomic_read(&lock->val);
+
+unlock:
+ /*
+ * Remove itself from the MCS wait queue (unlock).
+ */
+ tail = rt_read_tail(node);
+ if (cmpxchg_tail_release(lock, tail, 0) == tail)
+ goto release;
+
+ /*
+ * Second case.
+ */
+ next = xchg(&node->next, NULL);
+ if (!next)
+ next = mcsq_wait_next(lock, node, NULL);
+
+ if (next)
+ WRITE_ONCE(next->locked, 1);
+
+release:
+ /*
+ * Release the node.
+ */
+ __this_cpu_dec(mcs_nodes[0].count);
+
+ return retry ? RT_RETRY : 1;
}
/*
--
1.8.3.1