Do we need to correct barriering in circular-buffers.rst?

From: David Howells
Date: Wed Sep 18 2019 - 11:43:05 EST


Will Deacon <will@xxxxxxxxxx> wrote:

> If I'm understanding your code correctly (big 'if'), then you have things
> like this in pipe_read():
>
>
> 	unsigned int head = READ_ONCE(pipe->head);
> 	unsigned int tail = pipe->tail;
> 	unsigned int mask = pipe->buffers - 1;
>
> 	if (tail != head) {
> 		struct pipe_buffer *buf = &pipe->bufs[tail & mask];
>
> 		[...]
>
> 		written = copy_page_to_iter(buf->page, buf->offset, chars, to);
>
>
> where you want to make sure you don't read from 'buf->page' until after
> you've read the updated head index. Is that right? If so, then READ_ONCE()
> will not give you that guarantee on architectures such as Power and Arm,
> because the 'if (tail != head)' branch can be speculated and the buffer
> can be read before we've got around to looking at the head index.
>
> So I reckon you need smp_load_acquire() in this case. pipe_write() might be
> ok with the control dependency because CPUs don't tend to make speculative
> writes visible, but I didn't check it carefully and the compiler can do
> crazy stuff in this area, so I'd be inclined to use smp_load_acquire() here
> too unless you really need the last ounce of performance.

Yeah, I probably do.

Documentation/core-api/circular-buffers.rst might be wrong, then, I think.

It mandates using smp_store_release() to update buffer->head in the producer
and buffer->tail in the consumer - but each of those stores needs to pair with
a memory barrier where the other side reads buffer->head or buffer->tail.
Currently, for the producer we have:

	spin_lock(&producer_lock);

	unsigned long head = buffer->head;
	/* The spin_unlock() and next spin_lock() provide needed ordering. */
	unsigned long tail = READ_ONCE(buffer->tail);

	if (CIRC_SPACE(head, tail, buffer->size) >= 1) {
		/* insert one item into the buffer */
		struct item *item = buffer[head];

		produce_item(item);

		smp_store_release(buffer->head,
				  (head + 1) & (buffer->size - 1));

		/* wake_up() will make sure that the head is committed before
		 * waking anyone up */
		wake_up(consumer);
	}

	spin_unlock(&producer_lock);

I think the ordering comment about spin_unlock() and spin_lock() is wrong.
There's no requirement to have a spinlock on either side - and in any case,
the producer and consumer take different locks, so both sides could be inside
their respective locked sections when accessing the buffer. The READ_ONCE()
would theoretically provide the smp_read_barrier_depends() to pair with the
smp_store_release() in the consumer. Maybe I should change this to:

	spin_lock(&producer_lock);

	/* Barrier paired with consumer-side store-release on tail */
	unsigned long tail = smp_load_acquire(buffer->tail);
	unsigned long head = buffer->head;

	if (CIRC_SPACE(head, tail, buffer->size) >= 1) {
		/* insert one item into the buffer */
		struct item *item = buffer[head];

		produce_item(item);

		smp_store_release(buffer->head,
				  (head + 1) & (buffer->size - 1));

		/* wake_up() will make sure that the head is committed before
		 * waking anyone up */
		wake_up(consumer);
	}

	spin_unlock(&producer_lock);
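
For illustration, the same pairing can be sketched in userspace with C11
atomics standing in for the kernel primitives. This is only an analogue under
that assumption, not the text of the document: struct ring, RING_SIZE,
ring_space() and ring_produce() are made-up names, the slots hold plain ints,
and the locking and wake_up() are left out.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

#define RING_SIZE 8UL	/* hypothetical size; must be a power of two */

static_assert((RING_SIZE & (RING_SIZE - 1)) == 0, "RING_SIZE must be 2^n");

struct ring {
	_Atomic unsigned long head;	/* written only by the producer */
	_Atomic unsigned long tail;	/* written only by the consumer */
	int slots[RING_SIZE];
};

/* Free slots, keeping one slot empty in the same way CIRC_SPACE() does. */
static unsigned long ring_space(unsigned long head, unsigned long tail)
{
	return (tail - (head + 1)) & (RING_SIZE - 1);
}

/* Producer side: returns false if the ring is full. */
static bool ring_produce(struct ring *r, int value)
{
	/* Acquire: pairs with the consumer's store-release on tail. */
	unsigned long tail = atomic_load_explicit(&r->tail,
						  memory_order_acquire);
	/* head is only written by this side, so a relaxed load is enough. */
	unsigned long head = atomic_load_explicit(&r->head,
						  memory_order_relaxed);

	if (ring_space(head, tail) < 1)
		return false;

	/* insert one item into the buffer */
	r->slots[head] = value;

	/* Release: the slot contents become visible before the new head. */
	atomic_store_explicit(&r->head, (head + 1) & (RING_SIZE - 1),
			      memory_order_release);
	return true;
}
```

The acquire load of tail is what keeps the store into the slot from being
ordered before the consumer's last read of that slot; the ordering no longer
relies on any lock.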

The consumer is currently:

	spin_lock(&consumer_lock);

	/* Read index before reading contents at that index. */
	unsigned long head = smp_load_acquire(buffer->head);
	unsigned long tail = buffer->tail;

	if (CIRC_CNT(head, tail, buffer->size) >= 1) {

		/* extract one item from the buffer */
		struct item *item = buffer[tail];

		consume_item(item);

		/* Finish reading descriptor before incrementing tail. */
		smp_store_release(buffer->tail,
				  (tail + 1) & (buffer->size - 1));
	}

	spin_unlock(&consumer_lock);

which I think is okay.

And yes, I note that this does use smp_load_acquire(buffer->head) in the
consumer - which I should also be doing.
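
For the pipe case Will quoted, a userspace sketch of what that might look like
follows. Again this uses C11 atomics as stand-ins for smp_load_acquire() and
smp_store_release(), with invented names (struct demo_pipe, NBUFS,
demo_pipe_read()) and an int per slot instead of a struct pipe_buffer; it is
an assumption-laden illustration, not the actual pipe code.

```c
#include <assert.h>
#include <stdatomic.h>

#define NBUFS 4U	/* hypothetical ring size; must be a power of two */

static_assert((NBUFS & (NBUFS - 1)) == 0, "NBUFS must be 2^n");

struct demo_pipe {
	_Atomic unsigned int head;	/* advanced only by the writer */
	_Atomic unsigned int tail;	/* advanced only by the reader */
	int bufs[NBUFS];
};

/* Reader side: returns 1 and fills *out, or returns 0 if the ring is empty. */
static int demo_pipe_read(struct demo_pipe *p, int *out)
{
	/*
	 * Acquire rather than a READ_ONCE()-style relaxed load: the read of
	 * the slot below must not be satisfied before head has been read.
	 */
	unsigned int head = atomic_load_explicit(&p->head,
						 memory_order_acquire);
	unsigned int tail = atomic_load_explicit(&p->tail,
						 memory_order_relaxed);
	unsigned int mask = NBUFS - 1;

	if (tail == head)
		return 0;

	/* Indices run freely and are masked only when indexing the array. */
	*out = p->bufs[tail & mask];

	/* Release: finish reading the slot before the writer may reuse it. */
	atomic_store_explicit(&p->tail, tail + 1, memory_order_release);
	return 1;
}
```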

David