Re: [RESEND PATCH] fs/pipe: Introduce a check to skip sleeping processes during pipe read/write

From: Oleg Nesterov
Date: Fri Dec 27 2024 - 11:44:31 EST


On 12/27, Oleg Nesterov wrote:
>
> Consider
>
> int main(void)
> {
> int fd[2], cnt;
> char c;
>
> pipe(fd);
>
> if (!fork()) {
> // wait until the parent blocks in pipe_write() ->
> // wait_event_interruptible_exclusive(pipe->wr_wait, pipe_writable(pipe));
> sleep(1);
>
> for (cnt = 0; cnt < 4096; ++cnt)
> read(fd[0], &c, 1);
> return 0;
> }
>
> // parent
> for (;;)
> write(fd[1], &c, 1);
> }
>
> In this case the child will wake up the parent 4095 times for no reason:
> pipe_writable() == !pipe_full() will still be false until the last
> read(fd[0], &c, 1) does
>
> if (!buf->len)
> tail = pipe_update_tail(pipe, buf, tail);
>
> and after that the parent can write the next char.

Perhaps something like the patch below makes sense in this particular case.
It is incomplete and ugly, just for illustration.

Oleg.

diff --git a/fs/pipe.c b/fs/pipe.c
index 12b22c2723b7..b8eef9e75639 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -253,7 +253,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
size_t total_len = iov_iter_count(to);
struct file *filp = iocb->ki_filp;
struct pipe_inode_info *pipe = filp->private_data;
- bool was_full, wake_next_reader = false;
+ bool was_full, xxx, wake_next_reader = false;
ssize_t ret;

/* Null read succeeds. */
@@ -277,6 +277,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
unsigned int head = smp_load_acquire(&pipe->head);
unsigned int tail = pipe->tail;
unsigned int mask = pipe->ring_size - 1;
+ xxx = false;

#ifdef CONFIG_WATCH_QUEUE
if (pipe->note_loss) {
@@ -340,8 +341,10 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
buf->len = 0;
}

- if (!buf->len)
+ if (!buf->len) {
tail = pipe_update_tail(pipe, buf, tail);
+ xxx = true;
+ }
total_len -= chars;
if (!total_len)
break; /* common path: read succeeded */
@@ -398,7 +401,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
wake_next_reader = false;
mutex_unlock(&pipe->mutex);

- if (was_full)
+ if (was_full && xxx)
wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
if (wake_next_reader)
wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);