[PATCH net-next v3 3/3] ptr_ring: add barrier to ensure the visiblity of r->queue[]

From: Yunsheng Lin
Date: Thu Jul 01 2021 - 08:27:37 EST


After r->consumer_head is updated in __ptr_ring_discard_one(),
r->queue[r->consumer_head] is already cleared in the previous
round of __ptr_ring_discard_one(). But there is no guarantee
other thread will see the r->queue[r->consumer_head] being
NULL because there is no explicit barrier between r->queue[]
clearing and r->consumer_head updating.

So add two explicit barrier to make sure r->queue[] cleared in
__ptr_ring_discard_one() to be visible to other cpu, mainly to
make sure the cpu calling the __ptr_ring_empty() will see the
correct r->queue[r->consumer_head].

Hopefully the previous and this patch have ensured the correct
visibility of r->queue[], so update the comment accordingly
about __ptr_ring_empty().

Tested using the "perf stat -r 1000 ./ptr_ring_test -s 1000 -m 1
-N 100000000", comparing the elapsed time:

arch unpatched patched improvement
arm64 1.888224 sec 1.893673 sec -0.2%
X86 2.5422 sec 2.5587 sec -0.6%

Reported-by: Michael S. Tsirkin <mst@xxxxxxxxxx>
Signed-off-by: Yunsheng Lin <linyunsheng@xxxxxxxxxx>
---
include/linux/ptr_ring.h | 29 +++++++++++++++++++----------
1 file changed, 19 insertions(+), 10 deletions(-)

diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
index db9c282..d78aab8 100644
--- a/include/linux/ptr_ring.h
+++ b/include/linux/ptr_ring.h
@@ -178,15 +178,11 @@ static inline void *__ptr_ring_peek(struct ptr_ring *r)
*
* NB: This is only safe to call if ring is never resized.
*
- * However, if some other CPU consumes ring entries at the same time, the value
- * returned is not guaranteed to be correct.
- *
- * In this case - to avoid incorrectly detecting the ring
- * as empty - the CPU consuming the ring entries is responsible
- * for either consuming all ring entries until the ring is empty,
- * or synchronizing with some other CPU and causing it to
- * re-test __ptr_ring_empty and/or consume the ring enteries
- * after the synchronization point.
+ * caller might need to use the smp_rmb() to pair with smp_wmb()
+ * or smp_store_release() in __ptr_ring_discard_one() and smp_wmb()
+ * in __ptr_ring_produce() to ensure correct ordering between
+ * __ptr_ring_empty() checking and subsequent operation after
+ * __ptr_ring_empty() checking.
*
* Note: callers invoking this in a loop must use a compiler barrier,
* for example cpu_relax().
@@ -274,7 +270,12 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)

if (unlikely(consumer_head >= r->size)) {
r->consumer_tail = 0;
- WRITE_ONCE(r->consumer_head, 0);
+
+ /* Make sure r->queue[0] ~ r->queue[r->consumer_tail]
+ * cleared in previous __ptr_ring_discard_one() is
+ * visible to other cpu.
+ */
+ smp_store_release(&r->consumer_head, 0);
} else {
r->consumer_tail = consumer_head;
WRITE_ONCE(r->consumer_head, consumer_head);
@@ -288,6 +289,14 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
while (likely(--consumer_head >= tail))
r->queue[consumer_head] = NULL;

+ if (unlikely(!r->consumer_head)) {
+ /* Make sure r->queue[r->consumer_tail] ~
+ * r->queue[r->size - 1] cleared above is visible to
+ * other cpu.
+ */
+ smp_wmb();
+ }
+
return;
}

--
2.7.4