Re: [PATCH] pipe_read: don't wake up the writer if the pipe is still full
From: Oleg Nesterov
Date: Tue Feb 04 2025 - 09:03:56 EST
On 02/03, K Prateek Nayak wrote:
>
> With the below patch on mainline, I see more improvements for a
> modified version of sched-messaging (sched-messaging is same as
> hackbench as you noted on the parallel thread) that uses
> pipe2(O_NOATIME)
Thanks,
> The original regression is still noticeable despite the improvements
> but if folks believe this is a corner case with the original changes
> exhibited by sched-messaging, I'll just continue further testing with
> the new baseline.
I still don't know if we should worry or not... But if we want to try
to improve the wake_writer logic, then I think it makes sense to clean up
this code first.
IMO the (untested) patch below makes sense regardless, I am going to send
it after I grep fs/splice.c a bit more.
a194dfe6e6f6f ("pipe: Rearrange sequence in pipe_write() to preallocate slot")
changed pipe_write() to increment pipe->head in advance. IIUC, this was to
avoid the race with the post_one_notification()-like code, which can add
another buffer under pipe->rd_wait.lock without pipe->mutex.
This is no longer necessary after c73be61cede ("pipe: Add general notification
queue support"), pipe_write() checks pipe_has_watch_queue() and returns -EXDEV
at the start. And it can't help in any case, because pipe_write() no longer
takes this spinlock.
Change pipe_write() to call copy_page_from_iter() first and do nothing if it
fails. This way pipe_write() can't add a zero-sized buffer and we can simplify
pipe_read() which currently has to handle this very unlikely case.
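To illustrate the ordering only, here is a minimal user-space sketch, not the kernel code. The names struct ring, copy_in() and ring_write() are hypothetical, and a NULL source pointer stands in for a user buffer that faults. The point is that head is advanced only after the copy succeeded, so a failed write can never leave an empty slot for the reader to find.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define RING_SLOTS 4u   /* power of two, so "index & (RING_SLOTS - 1)" wraps */
#define SLOT_SIZE  8u

struct slot {
	char data[SLOT_SIZE];
	size_t len;
};

struct ring {
	struct slot bufs[RING_SLOTS];
	unsigned int head;   /* next slot to fill, visible to the reader */
	unsigned int tail;   /* next slot to consume */
};

static unsigned int ring_used(const struct ring *r)
{
	return r->head - r->tail;
}

/*
 * Hypothetical stand-in for copy_page_from_iter(): copies at most
 * SLOT_SIZE bytes and "faults" (copies nothing) when src is NULL.
 */
static size_t copy_in(char *dst, const char *src, size_t n)
{
	if (src == NULL)
		return 0;
	if (n > SLOT_SIZE)
		n = SLOT_SIZE;
	memcpy(dst, src, n);
	return n;
}

/* Copy first, publish afterwards. Returns bytes written, or -1 on failure. */
static long ring_write(struct ring *r, const char *src, size_t n)
{
	struct slot *s;
	size_t copied;

	if (n == 0)
		return 0;                 /* empty write: nothing to publish */
	if (ring_used(r) == RING_SLOTS)
		return -1;                /* ring is full */

	s = &r->bufs[r->head & (RING_SLOTS - 1)];
	copied = copy_in(s->data, src, n);
	if (copied == 0)
		return -1;                /* fault: head untouched, no empty slot */

	s->len = copied;
	r->head++;                        /* the slot becomes visible only now */
	return (long)copied;
}

/* Small helpers that exercise the two paths. */
static unsigned int slots_after_faulting_write(void)
{
	struct ring r;

	memset(&r, 0, sizeof(r));
	(void)ring_write(&r, NULL, 4);
	return ring_used(&r);
}

static unsigned int slots_after_good_write(void)
{
	struct ring r;

	memset(&r, 0, sizeof(r));
	(void)ring_write(&r, "abc", 3);
	return ring_used(&r);
}
```

With this ordering a reader of the sketch never sees a slot with len == 0, which is the same property that lets pipe_read() drop its special case.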
Oleg.
diff --git a/fs/pipe.c b/fs/pipe.c
index baaa8c0817f1..0816070a5e7a 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -312,6 +312,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
size_t written;
int error;
+ WARN_ON_ONCE(chars == 0);
if (chars > total_len) {
if (buf->flags & PIPE_BUF_FLAG_WHOLE) {
if (ret == 0)
@@ -365,29 +366,9 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
break;
}
mutex_unlock(&pipe->mutex);
-
/*
* We only get here if we didn't actually read anything.
*
- * However, we could have seen (and removed) a zero-sized
- * pipe buffer, and might have made space in the buffers
- * that way.
- *
- * You can't make zero-sized pipe buffers by doing an empty
- * write (not even in packet mode), but they can happen if
- * the writer gets an EFAULT when trying to fill a buffer
- * that already got allocated and inserted in the buffer
- * array.
- *
- * So we still need to wake up any pending writers in the
- * _very_ unlikely case that the pipe was full, but we got
- * no data.
- */
- if (unlikely(wake_writer))
- wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
- kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
-
- /*
* But because we didn't read anything, at this point we can
* just return directly with -ERESTARTSYS if we're interrupted,
* since we've done any required wakeups and there's no need
@@ -396,7 +377,6 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
if (wait_event_interruptible_exclusive(pipe->rd_wait, pipe_readable(pipe)) < 0)
return -ERESTARTSYS;
- wake_writer = false;
wake_next_reader = true;
mutex_lock(&pipe->mutex);
}
@@ -524,31 +504,25 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from)
pipe->tmp_page = page;
}
- /* Allocate a slot in the ring in advance and attach an
- * empty buffer. If we fault or otherwise fail to use
- * it, either the reader will consume it or it'll still
- * be there for the next write.
- */
- pipe->head = head + 1;
+ copied = copy_page_from_iter(page, 0, PAGE_SIZE, from);
+ if (unlikely(copied < PAGE_SIZE && iov_iter_count(from))) {
+ if (!ret)
+ ret = -EFAULT;
+ break;
+ }
+ pipe->head = head + 1;
+ pipe->tmp_page = NULL;
/* Insert it into the buffer array */
buf = &pipe->bufs[head & mask];
buf->page = page;
buf->ops = &anon_pipe_buf_ops;
buf->offset = 0;
- buf->len = 0;
if (is_packetized(filp))
buf->flags = PIPE_BUF_FLAG_PACKET;
else
buf->flags = PIPE_BUF_FLAG_CAN_MERGE;
- pipe->tmp_page = NULL;
- copied = copy_page_from_iter(page, 0, PAGE_SIZE, from);
- if (unlikely(copied < PAGE_SIZE && iov_iter_count(from))) {
- if (!ret)
- ret = -EFAULT;
- break;
- }
ret += copied;
buf->len = copied;