[PATCH net-next v2 2/2] ptr_ring: make __ptr_ring_empty() checking more reliable

From: Yunsheng Lin
Date: Thu Jun 24 2021 - 23:19:33 EST


Currently r->queue[] is cleared before r->consumer_head is moved
forward, which makes the __ptr_ring_empty() check called in
page_pool_refill_alloc_cache() unreliable if the check is done
after r->queue[] is cleared and before consumer_head is moved
forward. In that window __ptr_ring_empty() can read the just-cleared
slot at the stale consumer_head and report the ring as empty while
entries remain.

Move the r->queue[] clearing after the consumer_head update
to make the __ptr_ring_empty() check more reliable.
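
The window can be reproduced in a simplified user-space model. The
sketch below is not the kernel code: toy_ring, toy_ring_empty(),
toy_ring_fill() and the two discard helpers are hypothetical names,
RING_SIZE and BATCH are arbitrary, and the model is single-threaded.
Each helper simply samples the emptiness check at the point where a
concurrent caller could run between the two kinds of store.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define RING_SIZE 8	/* hypothetical ring size */
#define BATCH     2	/* hypothetical batch threshold */

struct toy_ring {
	void *queue[RING_SIZE];
	int consumer_head;
	int consumer_tail;
};

/* Model of the lockless check: a NULL slot at consumer_head means empty. */
static bool toy_ring_empty(const struct toy_ring *r)
{
	return r->queue[r->consumer_head] == NULL;
}

/* Fill the first n slots with a non-NULL marker and reset the indices. */
static void toy_ring_fill(struct toy_ring *r, int n)
{
	static int marker;

	assert(n >= 0 && n <= RING_SIZE);
	for (int i = 0; i < RING_SIZE; i++)
		r->queue[i] = i < n ? (void *)&marker : NULL;
	r->consumer_head = 0;
	r->consumer_tail = 0;
}

/* Pre-patch ordering: clear the slots, then publish consumer_head.
 * Returns what an observer would see just before the publish.
 */
static bool discard_clear_then_advance(struct toy_ring *r)
{
	int consumer_head = r->consumer_head;
	int head = consumer_head++;
	bool seen_empty;

	if (consumer_head - r->consumer_tail >= BATCH ||
	    consumer_head >= RING_SIZE) {
		while (head >= r->consumer_tail)
			r->queue[head--] = NULL;
		r->consumer_tail = consumer_head;
	}
	if (consumer_head >= RING_SIZE) {
		consumer_head = 0;
		r->consumer_tail = 0;
	}
	seen_empty = toy_ring_empty(r);	/* observer runs inside the window */
	r->consumer_head = consumer_head;
	return seen_empty;
}

/* Patched ordering: publish consumer_head, then clear the slots.
 * Returns what an observer would see right after the publish.
 */
static bool discard_advance_then_clear(struct toy_ring *r)
{
	int consumer_head = r->consumer_head + 1;
	bool seen_empty;

	if (consumer_head - r->consumer_tail >= BATCH ||
	    consumer_head >= RING_SIZE) {
		int tail = r->consumer_tail;

		if (consumer_head >= RING_SIZE) {
			r->consumer_tail = 0;
			r->consumer_head = 0;
		} else {
			r->consumer_tail = consumer_head;
			r->consumer_head = consumer_head;
		}
		seen_empty = toy_ring_empty(r);	/* observer runs before clearing */
		while (--consumer_head >= tail)
			r->queue[consumer_head] = NULL;
		return seen_empty;
	}
	r->consumer_head = consumer_head;
	return toy_ring_empty(r);
}
```

With four entries queued and a batch of two, the second call to
discard_clear_then_advance() returns true although slots 2 and 3 are
still occupied, while discard_advance_then_clear() returns false for
the same sequence.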

As a side effect of the above change, one consumer_head check is
avoided in the likely case, which gives a noticeable performance
improvement when tested with the ptr_ring_test selftest added in
the previous patch.

Using "taskset -c 1 ./ptr_ring_test -s 1000 -m 0 -N 100000000"
to test the case of a single thread doing both the enqueuing and
dequeuing:

 arch     unpatched     patched     delta
 arm64    4648 ms       4464 ms     +3.9%
 x86      2562 ms       2401 ms     +6.2%

Using "taskset -c 1-2 ./ptr_ring_test -s 1000 -m 1 -N 100000000"
to test the case of one thread enqueuing and another thread
dequeuing concurrently, also known as
single-producer/single-consumer:

 arch     unpatched             patched               delta
 arm64    3624 ms + 3624 ms     3462 ms + 3462 ms     +4.4%
 x86      2758 ms + 2758 ms     2547 ms + 2547 ms     +7.6%
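
The delta columns are not defined in the text. They appear consistent
with the relative reduction in run time, (unpatched - patched) /
unpatched, truncated to one decimal place. The sketch below checks
that reading with integer arithmetic; delta_permille() is a
hypothetical helper, and the formula is an inference from the numbers
rather than something stated in this patch.

```c
/* Hypothetical helper: relative time reduction in tenths of a percent,
 * truncated toward zero. Assumes unpatched_ms > 0.
 */
static long delta_permille(long unpatched_ms, long patched_ms)
{
	return (unpatched_ms - patched_ms) * 1000 / unpatched_ms;
}
```

For example, delta_permille(4648, 4464) yields 39, which corresponds
to the +3.9% listed for arm64 in the single-thread case.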

Signed-off-by: Yunsheng Lin <linyunsheng@xxxxxxxxxx>
---
V2: Add performance data.
---
include/linux/ptr_ring.h | 25 ++++++++++++++++---------
1 file changed, 16 insertions(+), 9 deletions(-)

diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
index 808f9d3..db9c282 100644
--- a/include/linux/ptr_ring.h
+++ b/include/linux/ptr_ring.h
@@ -261,8 +261,7 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
 	/* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
 	 * to work correctly.
 	 */
-	int consumer_head = r->consumer_head;
-	int head = consumer_head++;
+	int consumer_head = r->consumer_head + 1;
 
 	/* Once we have processed enough entries invalidate them in
 	 * the ring all at once so producer can reuse their space in the ring.
@@ -271,19 +270,27 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
 	 */
 	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
 		     consumer_head >= r->size)) {
+		int tail = r->consumer_tail;
+
+		if (unlikely(consumer_head >= r->size)) {
+			r->consumer_tail = 0;
+			WRITE_ONCE(r->consumer_head, 0);
+		} else {
+			r->consumer_tail = consumer_head;
+			WRITE_ONCE(r->consumer_head, consumer_head);
+		}
+
 		/* Zero out entries in the reverse order: this way we touch the
 		 * cache line that producer might currently be reading the last;
 		 * producer won't make progress and touch other cache lines
 		 * besides the first one until we write out all entries.
 		 */
-		while (likely(head >= r->consumer_tail))
-			r->queue[head--] = NULL;
-		r->consumer_tail = consumer_head;
-	}
-	if (unlikely(consumer_head >= r->size)) {
-		consumer_head = 0;
-		r->consumer_tail = 0;
+		while (likely(--consumer_head >= tail))
+			r->queue[consumer_head] = NULL;
+
+		return;
 	}
+
 	/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
 	WRITE_ONCE(r->consumer_head, consumer_head);
 }
--
2.7.4