Re: [PATCH] blk-wbt: get back the missed wakeup from __wbt_done
From: Jens Axboe
Date: Mon Aug 27 2018 - 10:51:51 EST
On 8/27/18 12:15 AM, jianchao.wang wrote:
>
>
> On 08/27/2018 11:52 AM, jianchao.wang wrote:
>> Hi Jens
>>
>> On 08/25/2018 11:41 PM, Jens Axboe wrote:
>>> do {
>>> - set_current_state(TASK_UNINTERRUPTIBLE);
>>> + if (test_bit(0, &data.flags))
>>> + break;
>>>
>>> - if (!has_sleeper && rq_wait_inc_below(rqw, get_limit(rwb, rw)))
>>> + WARN_ON_ONCE(list_empty(&data.wq.entry));
>>> +
>>> + if (!has_sleeper &&
>>> + rq_wait_inc_below(rqw, get_limit(rwb, rw))) {
>>> + finish_wait(&rqw->wait, &data.wq);
>>> +
>>> + /*
>>> + * We raced with wbt_wake_function() getting a token,
>>> + * which means we now have two. Put ours and wake
>>> + * anyone else potentially waiting for one.
>>> + */
>>> + if (test_bit(0, &data.flags))
>>> + wbt_rqw_done(rwb, rqw, wb_acct);
>>> break;
>>
>> Just use 'bool' variable should be OK
>> After finish_wait, no one could race with us here.
>>
>>> + }
>>>
>>> if (lock) {
>>> spin_unlock_irq(lock);
>>> @@ -511,11 +569,11 @@ static void __wbt_wait(struct rq_wb *rwb, enum wbt_flags wb_acct,
>>> spin_lock_irq(lock);
>>> } else
>>> io_schedule();
>>> +
>>> has_sleeper = false;
>>> } while (1);
>>
>> I cannot get the point of "since we can't rely on just being woken from the ->func handler
>> we set".
>> Do you mean there could be someone else could wake up this task ?
Yeah, you don't know for a fact that the wbt wait queue is the only
guy waking us up. Any sleep like this needs a loop. It was quite
easy to reproduce for me, and as expected, you'll get list corruption
on the wait queue since we leave it on the list and the stack goes
away.
> If we do need a recheck after the io_schedule, we could do as following:
>
> static void __wbt_wait(struct rq_wb *rwb, enum wbt_flags wb_acct,
> unsigned long rw, spinlock_t *lock)
> __releases(lock)
> __acquires(lock)
> {
> struct rq_wait *rqw = get_rq_wait(rwb, wb_acct);
> struct wbt_wait_data data = {
> .wq = {
> .func = wbt_wake_function,
> .entry = LIST_HEAD_INIT(data.wq.entry),
> },
> .curr = current,
> .rwb = rwb,
> .rqw = rqw,
> .rw = rw,
> };
> bool has_sleeper;
> bool got = false;
>
> retry:
> has_sleeper = wq_has_sleeper(&rqw->wait);
> if (!has_sleeper && rq_wait_inc_below(rqw, get_limit(rwb, rw)))
> return;
>
> prepare_to_wait_exclusive(&rqw->wait, &data.wq, TASK_UNINTERRUPTIBLE);
>
> if (!has_sleeper && rq_wait_inc_below(rqw, get_limit(rwb, rw))) {
> got = true;
> goto out;
> }
>
> if (lock) {
> spin_unlock_irq(lock);
> io_schedule();
> spin_lock_irq(lock);
> } else
> io_schedule();
>
> out:
> finish_wait(&rqw->wait, &data.wq);
>
> /*
> * We raced with wbt_wake_function() getting a token,
> * which means we now have two. Put ours and wake
> * anyone else potentially waiting for one.
> */
> if (data.got && got)
> wbt_rqw_done(rwb, rqw, wb_acct);
> else if (!data.got && !got)
> goto retry;
I think the other variant is cleaner and easier to read. This is just
a natural loop, I don't think we need to use goto's here.
FWIW, I split it into two patches, current version is here:
http://git.kernel.dk/cgit/linux-block/log/?h=for-linus
--
Jens Axboe