[PATCH RFC v3 3/3] sbitmap: fix that 'wait_cnt' can be decreased while waitqueue is empty

From: Yu Kuai
Date: Sun Jul 10 2022 - 00:09:50 EST


From: Yu Kuai <yukuai3@xxxxxxxxxx>

As pointed out by Jan Kara, the following race is still possible:

CPU1                                    CPU2
__sbq_wake_up                           __sbq_wake_up
 sbq_wake_ptr()                          sbq_wake_ptr() -> the same
 wait_cnt = atomic_dec_return()
 /* decreased to 0 */
 sbq_index_atomic_inc()
 /* move to next waitqueue */
 atomic_set()
 /* reset wait_cnt */
 wake_up_nr()
 /* wake up on the old waitqueue */
                                         wait_cnt = atomic_dec_return()
                                         /*
                                          * decrease wait_cnt in the old
                                          * waitqueue, while it can be
                                          * empty.
                                          */

Fix the problem by waking up before updating 'wake_index' and
'wait_cnt'.

Note that with this patch 'wait_cnt' can still be decreased on the old,
empty waitqueue. However, the wakeup is redirected to an active waitqueue;
the extra decrement on the old empty waitqueue is not handled.

Fixes: 88459642cba4 ("blk-mq: abstract tag allocation out into sbitmap library")
Signed-off-by: Yu Kuai <yukuai3@xxxxxxxxxx>
---
lib/sbitmap.c | 20 +++++++++++++++-----
1 file changed, 15 insertions(+), 5 deletions(-)
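
For readers who want to step through the new ordering outside the kernel,
below is a minimal single-threaded userspace sketch of the control flow
after this patch. It is an illustration only and not the kernel code: every
model_* name and MODEL_WAIT_QUEUES are hypothetical stand-ins, the waitqueue
is reduced to a plain counter, and C11 seq_cst atomics take the place of
atomic_dec_return()/atomic_set() and smp_mb__before_atomic().

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Assumed model constant; stands in for the number of wait queues. */
#define MODEL_WAIT_QUEUES 8

/* Hypothetical stand-in for one wait state ('ws'). */
struct model_ws {
	atomic_int wait_cnt;	/* wakeups left before a batch is due */
	int nr_waiters;		/* stand-in for the contents of ws->wait */
};

/* Stand-in for waitqueue_active(): is anyone sleeping on this queue? */
static bool model_waitqueue_active(const struct model_ws *ws)
{
	return ws->nr_waiters > 0;
}

/* Stand-in for wake_up_nr(): wake at most 'nr' waiters. */
static void model_wake_up_nr(struct model_ws *ws, int nr)
{
	int woken = nr < ws->nr_waiters ? nr : ws->nr_waiters;

	ws->nr_waiters -= woken;
}

/*
 * Model of the post-patch flow. Returns true when the caller should
 * retry on a different wait state, false otherwise.
 */
static bool model_sbq_wake_up(struct model_ws *ws, int *wake_index,
			      int wake_batch)
{
	int wait_cnt;

	assert(wake_batch > 0);

	/* atomic_fetch_sub() returns the old value, so subtract one more. */
	wait_cnt = atomic_fetch_sub(&ws->wait_cnt, 1) - 1;

	/* Lost the race for this batch, or nobody is waiting here: retry. */
	if (wait_cnt < 0 || !model_waitqueue_active(ws))
		return true;

	if (wait_cnt > 0)
		return false;

	/* Wake up first ... */
	model_wake_up_nr(ws, wake_batch);

	/* ... and only then move to the next queue and reset the count. */
	*wake_index = (*wake_index + 1) % MODEL_WAIT_QUEUES;
	atomic_store(&ws->wait_cnt, wake_batch);

	return false;
}
```

Calling model_sbq_wake_up() on a wait state whose nr_waiters is 0 returns
true while still leaving wait_cnt one lower, which is the unhandled extra
decrement mentioned in the commit message. The sketch does not reproduce
the concurrent race itself, since nr_waiters is not protected.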

diff --git a/lib/sbitmap.c b/lib/sbitmap.c
index 57095dd88a33..55826ebbe7db 100644
--- a/lib/sbitmap.c
+++ b/lib/sbitmap.c
@@ -616,22 +616,33 @@ static bool __sbq_wake_up(struct sbitmap_queue *sbq)
 		return false;
 
 	wait_cnt = atomic_dec_return(&ws->wait_cnt);
-	if (wait_cnt > 0)
-		return false;
-
 	/*
 	 * For concurrent callers of this, callers should call this function
 	 * again to wakeup a new batch on a different 'ws'.
 	 */
-	if (wait_cnt < 0)
+	if (wait_cnt < 0 || !waitqueue_active(&ws->wait))
 		return true;
 
+	if (wait_cnt > 0)
+		return false;
+
 	wake_batch = READ_ONCE(sbq->wake_batch);
 
+	/*
+	 * Wake up first in case that concurrent callers decrease wait_cnt
+	 * while waitqueue is empty.
+	 */
+	wake_up_nr(&ws->wait, wake_batch);
+
 	/*
 	 * Pairs with the memory barrier in sbitmap_queue_resize() to
 	 * ensure that we see the batch size update before the wait
 	 * count is reset.
+	 *
+	 * Also pairs with the implicit barrier between decrementing wait_cnt
+	 * and checking for waitqueue_active() to make sure waitqueue_active()
+	 * sees result of the wakeup if atomic_dec_return() has seen the result
+	 * of atomic_set().
 	 */
 	smp_mb__before_atomic();
 
@@ -642,7 +653,6 @@ static bool __sbq_wake_up(struct sbitmap_queue *sbq)
 	 */
 	sbq_index_atomic_inc(&sbq->wake_index);
 	atomic_set(&ws->wait_cnt, wake_batch);
-	wake_up_nr(&ws->wait, wake_batch);
 
 	return false;
 }
--
2.31.1