Re: kvm: use-after-free in process_srcu
From: Paul McKenney
Date: Thu Jan 19 2017 - 17:03:35 EST
(Trouble with VPN, so replying from gmail.)
On Thu, Jan 19, 2017 at 1:27 AM, Paolo Bonzini <pbonzini@xxxxxxxxxx> wrote:
>
>
> On 18/01/2017 23:15, Paul E. McKenney wrote:
>> On Wed, Jan 18, 2017 at 09:53:19AM +0100, Paolo Bonzini wrote:
>>>
>>>
>>> On 17/01/2017 21:34, Paul E. McKenney wrote:
>>>> Do any of your callback functions invoke call_srcu()? (Hey, I have to ask!)
>>>
>>> No, we only use synchronize_srcu and synchronize_srcu_expedited, so our
>>> only callback comes from there.
>>
>> OK, so the next question is whether your code makes sure that all of its
>> synchronize_srcu() and synchronize_srcu_expedited() calls return before
>> the call to cleanup_srcu_struct().
>
> It certainly should! Or at least that would be our bug.
>
>> You should only need srcu_barrier() if there were calls to call_srcu().
>> Given that you only have synchronize_srcu() and synchronize_srcu_expedited(),
>> you -don't- need srcu_barrier(). What you need instead is to make sure
>> that all synchronize_srcu() and synchronize_srcu_expedited() have
>> returned before the call to cleanup_srcu_struct().
>
> Ok, good.
>
>>> If this is incorrect, then one flush_delayed_work is enough. If it is
>>> correct, the possible alternatives are:
>>>
>>> * srcu_barrier in the caller, flush_delayed_work+WARN_ON(sp->running) in
>>> cleanup_srcu_struct. I strongly dislike this one---because we don't use
>>> call_srcu at all, there should be no reason to use srcu_barrier in KVM
>>> code. Plus I think all other users have the same issue.
>>>
>>> * srcu_barrier+flush_delayed_work+WARN_ON(sp->running) in
>>> cleanup_srcu_struct
>>>
>>> * flush_delayed_work+flush_delayed_work+WARN_ON(sp->running) in
>>> cleanup_srcu_struct
>>>
>>> * while(flush_delayed_work) in cleanup_srcu_struct
>>>
>>> * "while(sp->running) flush_delayed_work" in cleanup_srcu_struct
>>
>> My current thought is flush_delayed_work() followed by a warning if
>> there are any callbacks still posted, and also as you say sp->running.
>
> Yes, that would work for KVM and anyone else who doesn't use call_srcu
> (and order synchronize_srcu correctly against destruction).
>
> On the other hand, users of call_srcu, such as rcutorture, _do_ need to
> place an srcu_barrier before cleanup_srcu_struct, or they need two
> flush_delayed_work() calls back to back in cleanup_srcu_struct. Do you
> agree?

The reason I am resisting the notion of placing an srcu_barrier() in
the cleanup_srcu_struct() path is that most users don't use call_srcu(),
and I don't feel that we should be inflicting the srcu_barrier()
performance penalty on them.

So I agree with the user invoking srcu_barrier() after preventing future
calls to call_srcu(), and with there being a flush_delayed_work() in
cleanup_srcu_struct(). As in the untested (and probably
whitespace-mangled) patch below.

Thoughts?

Thanx, Paul

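Separately from the patch, here is a toy userspace sketch of the teardown
ordering being proposed for call_srcu() users: stop posting callbacks, wait
for the posted ones, and only then clean up. It is not kernel code. Every
toy_* name is hypothetical and only stands in for the role of the
corresponding SRCU function, and being single-threaded it cannot show the
racing case that the sp->running check is meant to catch.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Toy userspace model, NOT kernel code. Every toy_* name is hypothetical
 * and only mirrors the roles of the real SRCU functions in the patch.
 */
struct toy_srcu {
	int pending_cbs;	/* callbacks posted by toy_call_srcu() */
	bool running;		/* models sp->running */
	bool freed;		/* models free_percpu(sp->per_cpu_ref) */
};

/* Models call_srcu(): post a callback and kick the work item. */
static void toy_call_srcu(struct toy_srcu *sp)
{
	sp->pending_cbs++;
	sp->running = true;
}

/* Models srcu_barrier(): return only once posted callbacks have run. */
static void toy_srcu_barrier(struct toy_srcu *sp)
{
	sp->pending_cbs = 0;	/* the work item may still be finishing up */
}

/* Models flush_delayed_work(): let the in-flight work pass finish. */
static void toy_flush_work(struct toy_srcu *sp)
{
	if (sp->pending_cbs == 0)
		sp->running = false;
}

/* Mirrors the check order in the patched cleanup_srcu_struct(). */
static int toy_cleanup(struct toy_srcu *sp)
{
	if (sp->pending_cbs != 0)
		return -1;	/* like WARN_ON(!rcu_all_batches_empty(sp)) */
	toy_flush_work(sp);
	if (sp->running)
		return -1;	/* like WARN_ON(sp->running) */
	sp->freed = true;
	return 0;
}

/* call_srcu() user that does the barrier first: cleanup succeeds. */
static int toy_teardown_with_barrier(void)
{
	struct toy_srcu sp = { 0 };

	toy_call_srcu(&sp);
	/* The caller must already have stopped further toy_call_srcu() here. */
	toy_srcu_barrier(&sp);
	return toy_cleanup(&sp);
}

/* Same user without the barrier: cleanup refuses and leaks. */
static int toy_teardown_without_barrier(void)
{
	struct toy_srcu sp = { 0 };

	toy_call_srcu(&sp);
	return toy_cleanup(&sp);
}
```

In this model the barrier-first teardown returns 0, while skipping the
barrier makes the cleanup bail out with -1, which corresponds to the
"leakage unless caller handles error" path in the patch.
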
diff --git a/kernel/rcu/srcu.c b/kernel/rcu/srcu.c
index f2abfbae258c..5813ed848821 100644
--- a/kernel/rcu/srcu.c
+++ b/kernel/rcu/srcu.c
@@ -65,6 +65,17 @@ static inline bool rcu_batch_empty(struct rcu_batch *b)
 }
 
 /*
+ * Are all batches empty for the specified srcu_struct?
+ */
+static inline bool rcu_all_batches_empty(struct srcu_struct *sp)
+{
+	return rcu_batch_empty(&sp->batch_done) &&
+	       rcu_batch_empty(&sp->batch_check1) &&
+	       rcu_batch_empty(&sp->batch_check0) &&
+	       rcu_batch_empty(&sp->batch_queue);
+}
+
+/*
  * Remove the callback at the head of the specified rcu_batch structure
  * and return a pointer to it, or return NULL if the structure is empty.
  */
@@ -251,6 +262,11 @@ void cleanup_srcu_struct(struct srcu_struct *sp)
 {
 	if (WARN_ON(srcu_readers_active(sp)))
 		return; /* Leakage unless caller handles error. */
+	if (WARN_ON(!rcu_all_batches_empty(sp)))
+		return; /* Leakage unless caller handles error. */
+	flush_delayed_work(&sp->work);
+	if (WARN_ON(sp->running))
+		return; /* Caller forgot to stop doing call_srcu()? */
 	free_percpu(sp->per_cpu_ref);
 	sp->per_cpu_ref = NULL;
 }
@@ -610,15 +626,9 @@ static void srcu_reschedule(struct srcu_struct *sp)
 {
 	bool pending = true;
 
-	if (rcu_batch_empty(&sp->batch_done) &&
-	    rcu_batch_empty(&sp->batch_check1) &&
-	    rcu_batch_empty(&sp->batch_check0) &&
-	    rcu_batch_empty(&sp->batch_queue)) {
+	if (rcu_all_batches_empty(sp)) {
 		spin_lock_irq(&sp->queue_lock);
-		if (rcu_batch_empty(&sp->batch_done) &&
-		    rcu_batch_empty(&sp->batch_check1) &&
-		    rcu_batch_empty(&sp->batch_check0) &&
-		    rcu_batch_empty(&sp->batch_queue)) {
+		if (rcu_all_batches_empty(sp)) {
 			sp->running = false;
 			pending = false;
 		}