Re: [RFC PATCH 04/10] pipe: Use head and tail pointers for the ring, not cursor and length [ver #2]

From: Ilya Dryomov
Date: Wed Oct 30 2019 - 12:18:59 EST


On Thu, Oct 24, 2019 at 11:49 AM David Howells <dhowells@xxxxxxxxxx> wrote:
>
> Convert pipes to use head and tail pointers for the buffer ring rather than
> pointer and length as the latter requires two atomic ops to update (or a
> combined op) whereas the former only requires one.
>
> (1) The head pointer is the point at which production occurs and points to
> the slot in which the next buffer will be placed. This is equivalent
> to pipe->curbuf + pipe->nrbufs.
>
> The head pointer belongs to the write-side.
>
> (2) The tail pointer is the point at which consumption occurs. It points
> to the next slot to be consumed. This is equivalent to pipe->curbuf.
>
> The tail pointer belongs to the read-side.
>
> (3) head and tail are allowed to run to UINT_MAX and wrap naturally. They
> are only masked off when the array is being accessed, e.g.:
>
> pipe->bufs[head & mask]
>
> This means that it is not necessary to have a dead slot in the ring as
> head == tail isn't ambiguous.
>
> (4) The ring is empty if "head == tail".
>
> A helper, pipe_empty(), is provided for this.
>
> (5) The occupancy of the ring is "head - tail".
>
> A helper, pipe_occupancy(), is provided for this.
>
> (6) The number of free slots in the ring is "pipe->ring_size - occupancy".
>
> A helper, pipe_space_for_user() is provided to indicate how many slots
> userspace may use.
>
> (7) The ring is full if "head - tail >= pipe->ring_size".
>
> A helper, pipe_full(), is provided for this.
>
> Signed-off-by: David Howells <dhowells@xxxxxxxxxx>
> ---
>
> fs/fuse/dev.c | 31 +++--
> fs/pipe.c | 169 ++++++++++++++++-------------
> fs/splice.c | 188 ++++++++++++++++++++------------
> include/linux/pipe_fs_i.h | 86 ++++++++++++++-
> include/linux/uio.h | 4 -
> lib/iov_iter.c | 266 +++++++++++++++++++++++++--------------------
> 6 files changed, 464 insertions(+), 280 deletions(-)
>
> diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
> index dadd617d826c..1e4bc27573cc 100644
> --- a/fs/fuse/dev.c
> +++ b/fs/fuse/dev.c
> @@ -703,7 +703,7 @@ static int fuse_copy_fill(struct fuse_copy_state *cs)
> cs->pipebufs++;
> cs->nr_segs--;
> } else {
> - if (cs->nr_segs == cs->pipe->buffers)
> + if (cs->nr_segs >= cs->pipe->ring_size)
> return -EIO;
>
> page = alloc_page(GFP_HIGHUSER);
> @@ -879,7 +879,7 @@ static int fuse_ref_page(struct fuse_copy_state *cs, struct page *page,
> struct pipe_buffer *buf;
> int err;
>
> - if (cs->nr_segs == cs->pipe->buffers)
> + if (cs->nr_segs >= cs->pipe->ring_size)
> return -EIO;
>
> err = unlock_request(cs->req);
> @@ -1341,7 +1341,7 @@ static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
> if (!fud)
> return -EPERM;
>
> - bufs = kvmalloc_array(pipe->buffers, sizeof(struct pipe_buffer),
> + bufs = kvmalloc_array(pipe->ring_size, sizeof(struct pipe_buffer),
> GFP_KERNEL);
> if (!bufs)
> return -ENOMEM;
> @@ -1353,7 +1353,7 @@ static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
> if (ret < 0)
> goto out;
>
> - if (pipe->nrbufs + cs.nr_segs > pipe->buffers) {
> + if (pipe_occupancy(pipe->head, pipe->tail) + cs.nr_segs > pipe->ring_size) {
> ret = -EIO;
> goto out;
> }
> @@ -1935,6 +1935,7 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
> struct file *out, loff_t *ppos,
> size_t len, unsigned int flags)
> {
> + unsigned int head, tail, mask, count;
> unsigned nbuf;
> unsigned idx;
> struct pipe_buffer *bufs;
> @@ -1949,8 +1950,12 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
>
> pipe_lock(pipe);
>
> - bufs = kvmalloc_array(pipe->nrbufs, sizeof(struct pipe_buffer),
> - GFP_KERNEL);
> + head = pipe->head;
> + tail = pipe->tail;
> + mask = pipe->ring_size - 1;
> + count = head - tail;
> +
> + bufs = kvmalloc_array(count, sizeof(struct pipe_buffer), GFP_KERNEL);
> if (!bufs) {
> pipe_unlock(pipe);
> return -ENOMEM;
> @@ -1958,8 +1963,8 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
>
> nbuf = 0;
> rem = 0;
> - for (idx = 0; idx < pipe->nrbufs && rem < len; idx++)
> - rem += pipe->bufs[(pipe->curbuf + idx) & (pipe->buffers - 1)].len;
> + for (idx = tail; idx < head && rem < len; idx++)
> + rem += pipe->bufs[idx & mask].len;
>
> ret = -EINVAL;
> if (rem < len)
> @@ -1970,16 +1975,16 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
> struct pipe_buffer *ibuf;
> struct pipe_buffer *obuf;
>
> - BUG_ON(nbuf >= pipe->buffers);
> - BUG_ON(!pipe->nrbufs);
> - ibuf = &pipe->bufs[pipe->curbuf];
> + BUG_ON(nbuf >= pipe->ring_size);
> + BUG_ON(tail == head);
> + ibuf = &pipe->bufs[tail & mask];
> obuf = &bufs[nbuf];
>
> if (rem >= ibuf->len) {
> *obuf = *ibuf;
> ibuf->ops = NULL;
> - pipe->curbuf = (pipe->curbuf + 1) & (pipe->buffers - 1);
> - pipe->nrbufs--;
> + tail++;
> + pipe_commit_read(pipe, tail);
> } else {
> if (!pipe_buf_get(pipe, ibuf))
> goto out_free;
> diff --git a/fs/pipe.c b/fs/pipe.c
> index 8a2ab2f974bd..8a0806fe12d3 100644
> --- a/fs/pipe.c
> +++ b/fs/pipe.c
> @@ -43,10 +43,11 @@ unsigned long pipe_user_pages_hard;
> unsigned long pipe_user_pages_soft = PIPE_DEF_BUFFERS * INR_OPEN_CUR;
>
> /*
> - * We use a start+len construction, which provides full use of the
> - * allocated memory.
> - * -- Florian Coosmann (FGC)
> - *
> + * We use head and tail indices that aren't masked off, except at the point of
> + * dereference, but rather they're allowed to wrap naturally. This means there
> + * isn't a dead spot in the buffer, provided the ring size < INT_MAX.
> + * -- David Howells 2019-09-23.

Hi David,

Is "ring size < INT_MAX" constraint correct?

I've never had to implement this free running indices scheme, but
the way I've always visualized it is that the top bit of the index is
used as a lap (as in a race) indicator, leaving 31 bits to work with
(in case of unsigned ints). Should that be

ring size <= 2^31

or more precisely

ring size is a power of two <= 2^31

or am I missing something?

Thanks,

Ilya