Re: [PATCH 2/3] srcu: queue work without holding the lock

From: Paul E. McKenney
Date: Tue Oct 10 2017 - 17:43:25 EST


On Thu, Sep 28, 2017 at 06:10:46PM -0700, Paul E. McKenney wrote:
> On Thu, Sep 28, 2017 at 06:03:57PM +0200, Sebastian Andrzej Siewior wrote:
> > On 2017-09-22 11:46:10 [-0700], Paul E. McKenney wrote:
> > > On Fri, Sep 22, 2017 at 05:28:05PM +0200, Sebastian Andrzej Siewior wrote:
> > > > On RT we can't invoke queue_delayed_work() within an atomic section
> > > > (which is provided by raw_spin_lock_irqsave()).
> > > > srcu_reschedule() invokes queue_delayed_work() outside of the
> > > > raw_spin_lock_irq_rcu_node() section so this should be fine here, too.
> > > > If the remaining callers of call_srcu() aren't atomic
> > > > (spin_lock_irqsave() is fine) then this should work on RT, too.
> > >
> > > Just to make sure I understand... The problem is not the _irqsave,
> > > but rather the raw_?
> >
> > exactly. The _irqsave is translated into a sleeping lock on RT and does
> > not matter. The raw_ ones stay as they are and queue_delayed_work() uses
> > sleeping locks itself and this is where things fall apart.
>
> OK, internally I could get rid of raw_ at the expense of some code bloat,
> but in the call_srcu() case, the caller might well hold a raw_ lock.

Except that none currently do, so maybe downgrading from raw_ locks is
a reasonable course of action. Does the following patch help?

Thanx, Paul

------------------------------------------------------------------------

commit 7c4b15340e4e23668cb3cadbf4f76795ee495085
Author: Paul E. McKenney <paulmck@xxxxxxxxxxxxxxxxxx>
Date: Tue Oct 10 13:52:30 2017 -0700

srcu: Prohibit call_srcu() use under raw spinlocks

Invoking queue_delayed_work() while holding a raw spinlock is forbidden
in -rt kernels, which is exactly what __call_srcu() does, indirectly via
srcu_funnel_gp_start(). This commit therefore downgrades Tree SRCU's
locking from raw to non-raw spinlocks, which works because call_srcu()
is not ever called while holding a raw spinlock.

Reported-by: Sebastian Andrzej Siewior <bigeasy@xxxxxxxxxxxxx>
Signed-off-by: Paul E. McKenney <paulmck@xxxxxxxxxxxxxxxxxx>

diff --git a/include/linux/srcutree.h b/include/linux/srcutree.h
index a949f4f9e4d7..4eda108abee0 100644
--- a/include/linux/srcutree.h
+++ b/include/linux/srcutree.h
@@ -40,7 +40,7 @@ struct srcu_data {
unsigned long srcu_unlock_count[2]; /* Unlocks per CPU. */

/* Update-side state. */
- raw_spinlock_t __private lock ____cacheline_internodealigned_in_smp;
+ spinlock_t __private lock ____cacheline_internodealigned_in_smp;
struct rcu_segcblist srcu_cblist; /* List of callbacks.*/
unsigned long srcu_gp_seq_needed; /* Furthest future GP needed. */
unsigned long srcu_gp_seq_needed_exp; /* Furthest future exp GP. */
@@ -58,7 +58,7 @@ struct srcu_data {
* Node in SRCU combining tree, similar in function to rcu_data.
*/
struct srcu_node {
- raw_spinlock_t __private lock;
+ spinlock_t __private lock;
unsigned long srcu_have_cbs[4]; /* GP seq for children */
/* having CBs, but only */
/* is > ->srcu_gq_seq. */
@@ -78,7 +78,7 @@ struct srcu_struct {
struct srcu_node *level[RCU_NUM_LVLS + 1];
/* First node at each level. */
struct mutex srcu_cb_mutex; /* Serialize CB preparation. */
- raw_spinlock_t __private lock; /* Protect counters */
+ spinlock_t __private lock; /* Protect counters */
struct mutex srcu_gp_mutex; /* Serialize GP work. */
unsigned int srcu_idx; /* Current rdr array element. */
unsigned long srcu_gp_seq; /* Grace-period seq #. */
@@ -107,7 +107,7 @@ struct srcu_struct {
#define __SRCU_STRUCT_INIT(name) \
{ \
.sda = &name##_srcu_data, \
- .lock = __RAW_SPIN_LOCK_UNLOCKED(name.lock), \
+ .lock = __SPIN_LOCK_UNLOCKED(name.lock), \
.srcu_gp_seq_needed = 0 - 1, \
__SRCU_DEP_MAP_INIT(name) \
}
diff --git a/kernel/rcu/srcutree.c b/kernel/rcu/srcutree.c
index 6d5880089ff6..558f9e7b283e 100644
--- a/kernel/rcu/srcutree.c
+++ b/kernel/rcu/srcutree.c
@@ -53,6 +53,42 @@ static void srcu_invoke_callbacks(struct work_struct *work);
static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay);
static void process_srcu(struct work_struct *work);

+/* Wrappers for lock acquisition and release, see raw_spin_lock_rcu_node(). */
+#define spin_lock_rcu_node(p) \
+do { \
+ spin_lock(&ACCESS_PRIVATE(p, lock)); \
+ smp_mb__after_unlock_lock(); \
+} while (0)
+
+#define spin_unlock_rcu_node(p) spin_unlock(&ACCESS_PRIVATE(p, lock))
+
+#define spin_lock_irq_rcu_node(p) \
+do { \
+ spin_lock_irq(&ACCESS_PRIVATE(p, lock)); \
+ smp_mb__after_unlock_lock(); \
+} while (0)
+
+#define spin_unlock_irq_rcu_node(p) \
+ spin_unlock_irq(&ACCESS_PRIVATE(p, lock))
+
+#define spin_lock_irqsave_rcu_node(p, flags) \
+do { \
+ spin_lock_irqsave(&ACCESS_PRIVATE(p, lock), flags); \
+ smp_mb__after_unlock_lock(); \
+} while (0)
+
+#define spin_unlock_irqrestore_rcu_node(p, flags) \
+ spin_unlock_irqrestore(&ACCESS_PRIVATE(p, lock), flags) \
+
+#define spin_trylock_rcu_node(p) \
+({ \
+ bool ___locked = spin_trylock(&ACCESS_PRIVATE(p, lock)); \
+ \
+ if (___locked) \
+ smp_mb__after_unlock_lock(); \
+ ___locked; \
+})
+
/*
* Initialize SRCU combining tree. Note that statically allocated
* srcu_struct structures might already have srcu_read_lock() and
@@ -77,7 +113,7 @@ static void init_srcu_struct_nodes(struct srcu_struct *sp, bool is_static)

/* Each pass through this loop initializes one srcu_node structure. */
rcu_for_each_node_breadth_first(sp, snp) {
- raw_spin_lock_init(&ACCESS_PRIVATE(snp, lock));
+ spin_lock_init(&ACCESS_PRIVATE(snp, lock));
WARN_ON_ONCE(ARRAY_SIZE(snp->srcu_have_cbs) !=
ARRAY_SIZE(snp->srcu_data_have_cbs));
for (i = 0; i < ARRAY_SIZE(snp->srcu_have_cbs); i++) {
@@ -111,7 +147,7 @@ static void init_srcu_struct_nodes(struct srcu_struct *sp, bool is_static)
snp_first = sp->level[level];
for_each_possible_cpu(cpu) {
sdp = per_cpu_ptr(sp->sda, cpu);
- raw_spin_lock_init(&ACCESS_PRIVATE(sdp, lock));
+ spin_lock_init(&ACCESS_PRIVATE(sdp, lock));
rcu_segcblist_init(&sdp->srcu_cblist);
sdp->srcu_cblist_invoking = false;
sdp->srcu_gp_seq_needed = sp->srcu_gp_seq;
@@ -170,7 +206,7 @@ int __init_srcu_struct(struct srcu_struct *sp, const char *name,
/* Don't re-initialize a lock while it is held. */
debug_check_no_locks_freed((void *)sp, sizeof(*sp));
lockdep_init_map(&sp->dep_map, name, key, 0);
- raw_spin_lock_init(&ACCESS_PRIVATE(sp, lock));
+ spin_lock_init(&ACCESS_PRIVATE(sp, lock));
return init_srcu_struct_fields(sp, false);
}
EXPORT_SYMBOL_GPL(__init_srcu_struct);
@@ -187,7 +223,7 @@ EXPORT_SYMBOL_GPL(__init_srcu_struct);
*/
int init_srcu_struct(struct srcu_struct *sp)
{
- raw_spin_lock_init(&ACCESS_PRIVATE(sp, lock));
+ spin_lock_init(&ACCESS_PRIVATE(sp, lock));
return init_srcu_struct_fields(sp, false);
}
EXPORT_SYMBOL_GPL(init_srcu_struct);
@@ -210,13 +246,13 @@ static void check_init_srcu_struct(struct srcu_struct *sp)
/* The smp_load_acquire() pairs with the smp_store_release(). */
if (!rcu_seq_state(smp_load_acquire(&sp->srcu_gp_seq_needed))) /*^^^*/
return; /* Already initialized. */
- raw_spin_lock_irqsave_rcu_node(sp, flags);
+ spin_lock_irqsave_rcu_node(sp, flags);
if (!rcu_seq_state(sp->srcu_gp_seq_needed)) {
- raw_spin_unlock_irqrestore_rcu_node(sp, flags);
+ spin_unlock_irqrestore_rcu_node(sp, flags);
return;
}
init_srcu_struct_fields(sp, true);
- raw_spin_unlock_irqrestore_rcu_node(sp, flags);
+ spin_unlock_irqrestore_rcu_node(sp, flags);
}

/*
@@ -513,7 +549,7 @@ static void srcu_gp_end(struct srcu_struct *sp)
mutex_lock(&sp->srcu_cb_mutex);

/* End the current grace period. */
- raw_spin_lock_irq_rcu_node(sp);
+ spin_lock_irq_rcu_node(sp);
idx = rcu_seq_state(sp->srcu_gp_seq);
WARN_ON_ONCE(idx != SRCU_STATE_SCAN2);
cbdelay = srcu_get_delay(sp);
@@ -522,7 +558,7 @@ static void srcu_gp_end(struct srcu_struct *sp)
gpseq = rcu_seq_current(&sp->srcu_gp_seq);
if (ULONG_CMP_LT(sp->srcu_gp_seq_needed_exp, gpseq))
sp->srcu_gp_seq_needed_exp = gpseq;
- raw_spin_unlock_irq_rcu_node(sp);
+ spin_unlock_irq_rcu_node(sp);
mutex_unlock(&sp->srcu_gp_mutex);
/* A new grace period can start at this point. But only one. */

@@ -530,7 +566,7 @@ static void srcu_gp_end(struct srcu_struct *sp)
idx = rcu_seq_ctr(gpseq) % ARRAY_SIZE(snp->srcu_have_cbs);
idxnext = (idx + 1) % ARRAY_SIZE(snp->srcu_have_cbs);
rcu_for_each_node_breadth_first(sp, snp) {
- raw_spin_lock_irq_rcu_node(snp);
+ spin_lock_irq_rcu_node(snp);
cbs = false;
if (snp >= sp->level[rcu_num_lvls - 1])
cbs = snp->srcu_have_cbs[idx] == gpseq;
@@ -540,7 +576,7 @@ static void srcu_gp_end(struct srcu_struct *sp)
snp->srcu_gp_seq_needed_exp = gpseq;
mask = snp->srcu_data_have_cbs[idx];
snp->srcu_data_have_cbs[idx] = 0;
- raw_spin_unlock_irq_rcu_node(snp);
+ spin_unlock_irq_rcu_node(snp);
if (cbs)
srcu_schedule_cbs_snp(sp, snp, mask, cbdelay);

@@ -548,11 +584,11 @@ static void srcu_gp_end(struct srcu_struct *sp)
if (!(gpseq & counter_wrap_check))
for (cpu = snp->grplo; cpu <= snp->grphi; cpu++) {
sdp = per_cpu_ptr(sp->sda, cpu);
- raw_spin_lock_irqsave_rcu_node(sdp, flags);
+ spin_lock_irqsave_rcu_node(sdp, flags);
if (ULONG_CMP_GE(gpseq,
sdp->srcu_gp_seq_needed + 100))
sdp->srcu_gp_seq_needed = gpseq;
- raw_spin_unlock_irqrestore_rcu_node(sdp, flags);
+ spin_unlock_irqrestore_rcu_node(sdp, flags);
}
}

@@ -560,17 +596,17 @@ static void srcu_gp_end(struct srcu_struct *sp)
mutex_unlock(&sp->srcu_cb_mutex);

/* Start a new grace period if needed. */
- raw_spin_lock_irq_rcu_node(sp);
+ spin_lock_irq_rcu_node(sp);
gpseq = rcu_seq_current(&sp->srcu_gp_seq);
if (!rcu_seq_state(gpseq) &&
ULONG_CMP_LT(gpseq, sp->srcu_gp_seq_needed)) {
srcu_gp_start(sp);
- raw_spin_unlock_irq_rcu_node(sp);
+ spin_unlock_irq_rcu_node(sp);
/* Throttle expedited grace periods: Should be rare! */
srcu_reschedule(sp, rcu_seq_ctr(gpseq) & 0x3ff
? 0 : SRCU_INTERVAL);
} else {
- raw_spin_unlock_irq_rcu_node(sp);
+ spin_unlock_irq_rcu_node(sp);
}
}

@@ -590,18 +626,18 @@ static void srcu_funnel_exp_start(struct srcu_struct *sp, struct srcu_node *snp,
if (rcu_seq_done(&sp->srcu_gp_seq, s) ||
ULONG_CMP_GE(READ_ONCE(snp->srcu_gp_seq_needed_exp), s))
return;
- raw_spin_lock_irqsave_rcu_node(snp, flags);
+ spin_lock_irqsave_rcu_node(snp, flags);
if (ULONG_CMP_GE(snp->srcu_gp_seq_needed_exp, s)) {
- raw_spin_unlock_irqrestore_rcu_node(snp, flags);
+ spin_unlock_irqrestore_rcu_node(snp, flags);
return;
}
WRITE_ONCE(snp->srcu_gp_seq_needed_exp, s);
- raw_spin_unlock_irqrestore_rcu_node(snp, flags);
+ spin_unlock_irqrestore_rcu_node(snp, flags);
}
- raw_spin_lock_irqsave_rcu_node(sp, flags);
+ spin_lock_irqsave_rcu_node(sp, flags);
if (!ULONG_CMP_LT(sp->srcu_gp_seq_needed_exp, s))
sp->srcu_gp_seq_needed_exp = s;
- raw_spin_unlock_irqrestore_rcu_node(sp, flags);
+ spin_unlock_irqrestore_rcu_node(sp, flags);
}

/*
@@ -623,12 +659,12 @@ static void srcu_funnel_gp_start(struct srcu_struct *sp, struct srcu_data *sdp,
for (; snp != NULL; snp = snp->srcu_parent) {
if (rcu_seq_done(&sp->srcu_gp_seq, s) && snp != sdp->mynode)
return; /* GP already done and CBs recorded. */
- raw_spin_lock_irqsave_rcu_node(snp, flags);
+ spin_lock_irqsave_rcu_node(snp, flags);
if (ULONG_CMP_GE(snp->srcu_have_cbs[idx], s)) {
snp_seq = snp->srcu_have_cbs[idx];
if (snp == sdp->mynode && snp_seq == s)
snp->srcu_data_have_cbs[idx] |= sdp->grpmask;
- raw_spin_unlock_irqrestore_rcu_node(snp, flags);
+ spin_unlock_irqrestore_rcu_node(snp, flags);
if (snp == sdp->mynode && snp_seq != s) {
srcu_schedule_cbs_sdp(sdp, do_norm
? SRCU_INTERVAL
@@ -644,11 +680,11 @@ static void srcu_funnel_gp_start(struct srcu_struct *sp, struct srcu_data *sdp,
snp->srcu_data_have_cbs[idx] |= sdp->grpmask;
if (!do_norm && ULONG_CMP_LT(snp->srcu_gp_seq_needed_exp, s))
snp->srcu_gp_seq_needed_exp = s;
- raw_spin_unlock_irqrestore_rcu_node(snp, flags);
+ spin_unlock_irqrestore_rcu_node(snp, flags);
}

/* Top of tree, must ensure the grace period will be started. */
- raw_spin_lock_irqsave_rcu_node(sp, flags);
+ spin_lock_irqsave_rcu_node(sp, flags);
if (ULONG_CMP_LT(sp->srcu_gp_seq_needed, s)) {
/*
* Record need for grace period s. Pair with load
@@ -667,7 +703,7 @@ static void srcu_funnel_gp_start(struct srcu_struct *sp, struct srcu_data *sdp,
queue_delayed_work(system_power_efficient_wq, &sp->work,
srcu_get_delay(sp));
}
- raw_spin_unlock_irqrestore_rcu_node(sp, flags);
+ spin_unlock_irqrestore_rcu_node(sp, flags);
}

/*
@@ -830,7 +866,7 @@ void __call_srcu(struct srcu_struct *sp, struct rcu_head *rhp,
rhp->func = func;
local_irq_save(flags);
sdp = this_cpu_ptr(sp->sda);
- raw_spin_lock_rcu_node(sdp);
+ spin_lock_rcu_node(sdp);
rcu_segcblist_enqueue(&sdp->srcu_cblist, rhp, false);
rcu_segcblist_advance(&sdp->srcu_cblist,
rcu_seq_current(&sp->srcu_gp_seq));
@@ -844,7 +880,7 @@ void __call_srcu(struct srcu_struct *sp, struct rcu_head *rhp,
sdp->srcu_gp_seq_needed_exp = s;
needexp = true;
}
- raw_spin_unlock_irqrestore_rcu_node(sdp, flags);
+ spin_unlock_irqrestore_rcu_node(sdp, flags);
if (needgp)
srcu_funnel_gp_start(sp, sdp, s, do_norm);
else if (needexp)
@@ -900,7 +936,7 @@ static void __synchronize_srcu(struct srcu_struct *sp, bool do_norm)

/*
* Make sure that later code is ordered after the SRCU grace
- * period. This pairs with the raw_spin_lock_irq_rcu_node()
+ * period. This pairs with the spin_lock_irq_rcu_node()
* in srcu_invoke_callbacks(). Unlike Tree RCU, this is needed
* because the current CPU might have been totally uninvolved with
* (and thus unordered against) that grace period.
@@ -1024,7 +1060,7 @@ void srcu_barrier(struct srcu_struct *sp)
*/
for_each_possible_cpu(cpu) {
sdp = per_cpu_ptr(sp->sda, cpu);
- raw_spin_lock_irq_rcu_node(sdp);
+ spin_lock_irq_rcu_node(sdp);
atomic_inc(&sp->srcu_barrier_cpu_cnt);
sdp->srcu_barrier_head.func = srcu_barrier_cb;
debug_rcu_head_queue(&sdp->srcu_barrier_head);
@@ -1033,7 +1069,7 @@ void srcu_barrier(struct srcu_struct *sp)
debug_rcu_head_unqueue(&sdp->srcu_barrier_head);
atomic_dec(&sp->srcu_barrier_cpu_cnt);
}
- raw_spin_unlock_irq_rcu_node(sdp);
+ spin_unlock_irq_rcu_node(sdp);
}

/* Remove the initial count, at which point reaching zero can happen. */
@@ -1082,17 +1118,17 @@ static void srcu_advance_state(struct srcu_struct *sp)
*/
idx = rcu_seq_state(smp_load_acquire(&sp->srcu_gp_seq)); /* ^^^ */
if (idx == SRCU_STATE_IDLE) {
- raw_spin_lock_irq_rcu_node(sp);
+ spin_lock_irq_rcu_node(sp);
if (ULONG_CMP_GE(sp->srcu_gp_seq, sp->srcu_gp_seq_needed)) {
WARN_ON_ONCE(rcu_seq_state(sp->srcu_gp_seq));
- raw_spin_unlock_irq_rcu_node(sp);
+ spin_unlock_irq_rcu_node(sp);
mutex_unlock(&sp->srcu_gp_mutex);
return;
}
idx = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
if (idx == SRCU_STATE_IDLE)
srcu_gp_start(sp);
- raw_spin_unlock_irq_rcu_node(sp);
+ spin_unlock_irq_rcu_node(sp);
if (idx != SRCU_STATE_IDLE) {
mutex_unlock(&sp->srcu_gp_mutex);
return; /* Someone else started the grace period. */
@@ -1141,19 +1177,19 @@ static void srcu_invoke_callbacks(struct work_struct *work)
sdp = container_of(work, struct srcu_data, work.work);
sp = sdp->sp;
rcu_cblist_init(&ready_cbs);
- raw_spin_lock_irq_rcu_node(sdp);
+ spin_lock_irq_rcu_node(sdp);
rcu_segcblist_advance(&sdp->srcu_cblist,
rcu_seq_current(&sp->srcu_gp_seq));
if (sdp->srcu_cblist_invoking ||
!rcu_segcblist_ready_cbs(&sdp->srcu_cblist)) {
- raw_spin_unlock_irq_rcu_node(sdp);
+ spin_unlock_irq_rcu_node(sdp);
return; /* Someone else on the job or nothing to do. */
}

/* We are on the job! Extract and invoke ready callbacks. */
sdp->srcu_cblist_invoking = true;
rcu_segcblist_extract_done_cbs(&sdp->srcu_cblist, &ready_cbs);
- raw_spin_unlock_irq_rcu_node(sdp);
+ spin_unlock_irq_rcu_node(sdp);
rhp = rcu_cblist_dequeue(&ready_cbs);
for (; rhp != NULL; rhp = rcu_cblist_dequeue(&ready_cbs)) {
debug_rcu_head_unqueue(rhp);
@@ -1166,13 +1202,13 @@ static void srcu_invoke_callbacks(struct work_struct *work)
* Update counts, accelerate new callbacks, and if needed,
* schedule another round of callback invocation.
*/
- raw_spin_lock_irq_rcu_node(sdp);
+ spin_lock_irq_rcu_node(sdp);
rcu_segcblist_insert_count(&sdp->srcu_cblist, &ready_cbs);
(void)rcu_segcblist_accelerate(&sdp->srcu_cblist,
rcu_seq_snap(&sp->srcu_gp_seq));
sdp->srcu_cblist_invoking = false;
more = rcu_segcblist_ready_cbs(&sdp->srcu_cblist);
- raw_spin_unlock_irq_rcu_node(sdp);
+ spin_unlock_irq_rcu_node(sdp);
if (more)
srcu_schedule_cbs_sdp(sdp, 0);
}
@@ -1185,7 +1221,7 @@ static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay)
{
bool pushgp = true;

- raw_spin_lock_irq_rcu_node(sp);
+ spin_lock_irq_rcu_node(sp);
if (ULONG_CMP_GE(sp->srcu_gp_seq, sp->srcu_gp_seq_needed)) {
if (!WARN_ON_ONCE(rcu_seq_state(sp->srcu_gp_seq))) {
/* All requests fulfilled, time to go idle. */
@@ -1195,7 +1231,7 @@ static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay)
/* Outstanding request and no GP. Start one. */
srcu_gp_start(sp);
}
- raw_spin_unlock_irq_rcu_node(sp);
+ spin_unlock_irq_rcu_node(sp);

if (pushgp)
queue_delayed_work(system_power_efficient_wq, &sp->work, delay);