Re: Pending splice(file -> FIFO) excludes all other FIFO operations forever (was: ... always blocks read(FIFO), regardless of O_NONBLOCK on read side?)

From: Ahelenia Ziemiańska
Date: Sun Jul 09 2023 - 18:33:18 EST


On Sun, Jul 09, 2023 at 03:03:22AM +0200, Ahelenia Ziemiańska wrote:
> On Sat, Jul 08, 2023 at 01:06:56PM -0700, Linus Torvalds wrote:
> > I guess combined with something like
> >
> > if (!(in->f_mode & FMODE_NOWAIT))
> > return -EINVAL;
> >
> > it might all work.
> Yes, that makes the splice instantly -EINVAL for ttys, but doesn't
> affect the socketpair() case above, which still blocks forever.

This check also triggers (returning -EINVAL) for regular file -> pipe
splices, which is probably a no-go.


Attaching a summary diff that implements everything I described in the previous mail.

filemap_get_pages() does inspect and honour IOCB_NOWAIT if it is set in
filemap_splice_read(), but the result doesn't make much sense:
it returns EAGAIN for the first splice() from a
blockdev and then instantly returns with data on the next go-around.

Since open(2) says blockdevs don't have O_NONBLOCK semantics, I'm
assuming this behaviour is not supposed to be exposed to userspace,
so the diff does not set IOCB_NOWAIT in filemap_splice_read().


I've tested this for:
* tty: -EINVAL
* socketpair(AF_UNIX, SOCK_STREAM): works as expected
$ wc -c fifo &
[1] 261
$ ./af_unix ./s > fifo
5 Success
6454 Resource temporarily unavailable
5 Success
6445 Resource temporarily unavailable
0 Success
10 fifo
* socket(AF_INET, SOCK_STREAM, 0): works as expected
$ wc fifo &
[1] 249
$ ./tcp ./s > fifo
23099 Resource temporarily unavailable
7 Success
2099 Resource temporarily unavailable
4 Success
1751 Resource temporarily unavailable
3 Success
21655 Resource temporarily unavailable
95 Success
19589 Resource temporarily unavailable
0 Success
4 15 109 fifo
corresponding to
host$ nc 127.0.0.1 14640
żupan
asd
ad
asdda dasj aiojd askdl; jasiopdij as[pkdo[p askd9p ias90dk aso[pjd 890pasid90[ jaskl;dj il;asd
^C
* blockdev (/dev/vda): as expected (with filemap_splice_read() unchanged), copies it all
* regular file: -EINVAL(!)

Splicers still sleep if the pipe is full, of course,
unless SPLICE_F_NONBLOCK is set.

Test drivers attached as well.
diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index 1a8f82f478cb..4e8caf66c01e 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -1202,7 +1202,8 @@ __releases(fiq->lock)
* the 'sent' flag.
*/
static ssize_t fuse_dev_do_read(struct fuse_dev *fud, struct file *file,
- struct fuse_copy_state *cs, size_t nbytes)
+ struct fuse_copy_state *cs, size_t nbytes,
+ bool nonblock)
{
ssize_t err;
struct fuse_conn *fc = fud->fc;
@@ -1238,7 +1239,7 @@ static ssize_t fuse_dev_do_read(struct fuse_dev *fud, struct file *file,
break;
spin_unlock(&fiq->lock);

- if (file->f_flags & O_NONBLOCK)
+ if (nonblock)
return -EAGAIN;
err = wait_event_interruptible_exclusive(fiq->waitq,
!fiq->connected || request_pending(fiq));
@@ -1364,7 +1365,8 @@ static ssize_t fuse_dev_read(struct kiocb *iocb, struct iov_iter *to)

fuse_copy_init(&cs, 1, to);

- return fuse_dev_do_read(fud, file, &cs, iov_iter_count(to));
+ return fuse_dev_do_read(fud, file, &cs, iov_iter_count(to),
+ file->f_flags & O_NONBLOCK);
}

static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
@@ -1388,7 +1390,7 @@ static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
fuse_copy_init(&cs, 1, NULL);
cs.pipebufs = bufs;
cs.pipe = pipe;
- ret = fuse_dev_do_read(fud, in, &cs, len);
+ ret = fuse_dev_do_read(fud, in, &cs, len, true);
if (ret < 0)
goto out;

diff --git a/fs/splice.c b/fs/splice.c
index 004eb1c4ce31..e52cc42fff46 100644
--- a/fs/splice.c
+++ b/fs/splice.c
@@ -364,6 +364,7 @@ ssize_t copy_splice_read(struct file *in, loff_t *ppos,
iov_iter_bvec(&to, ITER_DEST, bv, npages, len);
init_sync_kiocb(&kiocb, in);
kiocb.ki_pos = *ppos;
+ kiocb.ki_flags |= IOCB_NOWAIT;
ret = call_read_iter(in, &kiocb, &to);

if (ret > 0) {
@@ -1309,6 +1310,8 @@ long do_splice(struct file *in, loff_t *off_in, struct file *out,
if (opipe) {
if (off_out)
return -ESPIPE;
+ if (!(in->f_mode & FMODE_NOWAIT))
+ return -EINVAL;
if (off_in) {
if (!(in->f_mode & FMODE_PREAD))
return -EINVAL;
diff --git a/kernel/relay.c b/kernel/relay.c
index a80fa01042e9..d3f5682c4a12 100644
--- a/kernel/relay.c
+++ b/kernel/relay.c
@@ -1215,8 +1215,7 @@ static ssize_t relay_file_splice_read(struct file *in,
if (ret < 0)
break;
else if (!ret) {
- if (flags & SPLICE_F_NONBLOCK)
- ret = -EAGAIN;
+ ret = -EAGAIN;
break;
}

diff --git a/kernel/trace/trace.c b/kernel/trace/trace.c
index 4529e264cb86..821bcbcd9e35 100644
--- a/kernel/trace/trace.c
+++ b/kernel/trace/trace.c
@@ -8391,7 +8391,6 @@ tracing_buffers_splice_read(struct file *file, loff_t *ppos,
if (splice_grow_spd(pipe, &spd))
return -ENOMEM;

- again:
trace_access_lock(iter->cpu_file);
entries = ring_buffer_entries_cpu(iter->array_buffer->buffer, iter->cpu_file);

@@ -8442,35 +8441,12 @@ tracing_buffers_splice_read(struct file *file, loff_t *ppos,

/* did we read anything? */
if (!spd.nr_pages) {
- long wait_index;
-
- if (ret)
- goto out;
-
- ret = -EAGAIN;
- if ((file->f_flags & O_NONBLOCK) || (flags & SPLICE_F_NONBLOCK))
- goto out;
-
- wait_index = READ_ONCE(iter->wait_index);
-
- ret = wait_on_pipe(iter, iter->tr->buffer_percent);
- if (ret)
- goto out;
-
- /* No need to wait after waking up when tracing is off */
- if (!tracer_tracing_is_on(iter->tr))
- goto out;
-
- /* Make sure we see the new wait_index */
- smp_rmb();
- if (wait_index != iter->wait_index)
- goto out;
-
- goto again;
+ if (!ret)
+ ret = -EAGAIN;
+ } else {
+ ret = splice_to_pipe(pipe, &spd);
}

- ret = splice_to_pipe(pipe, &spd);
-out:
splice_shrink_spd(&spd);

return ret;
diff --git a/net/ipv4/tcp.c b/net/ipv4/tcp.c
index e03e08745308..92a2be52123e 100644
--- a/net/ipv4/tcp.c
+++ b/net/ipv4/tcp.c
@@ -780,7 +780,6 @@ ssize_t tcp_splice_read(struct socket *sock, loff_t *ppos,
.len = len,
.flags = flags,
};
- long timeo;
ssize_t spliced;
int ret;

@@ -795,7 +794,6 @@ ssize_t tcp_splice_read(struct socket *sock, loff_t *ppos,

lock_sock(sk);

- timeo = sock_rcvtimeo(sk, sock->file->f_flags & O_NONBLOCK);
while (tss.len) {
ret = __tcp_splice_read(sk, &tss);
if (ret < 0)
@@ -819,35 +817,13 @@ ssize_t tcp_splice_read(struct socket *sock, loff_t *ppos,
ret = -ENOTCONN;
break;
}
- if (!timeo) {
- ret = -EAGAIN;
- break;
- }
- /* if __tcp_splice_read() got nothing while we have
- * an skb in receive queue, we do not want to loop.
- * This might happen with URG data.
- */
- if (!skb_queue_empty(&sk->sk_receive_queue))
- break;
- sk_wait_data(sk, &timeo, NULL);
- if (signal_pending(current)) {
- ret = sock_intr_errno(timeo);
- break;
- }
- continue;
+ ret = -EAGAIN;
+ break;
}
tss.len -= ret;
spliced += ret;

- if (!tss.len || !timeo)
- break;
- release_sock(sk);
- lock_sock(sk);
-
- if (sk->sk_err || sk->sk_state == TCP_CLOSE ||
- (sk->sk_shutdown & RCV_SHUTDOWN) ||
- signal_pending(current))
- break;
+ break;
}

release_sock(sk);
diff --git a/net/kcm/kcmsock.c b/net/kcm/kcmsock.c
index 393f01b2a7e6..f96b52a8be0e 100644
--- a/net/kcm/kcmsock.c
+++ b/net/kcm/kcmsock.c
@@ -1025,7 +1025,7 @@ static ssize_t kcm_splice_read(struct socket *sock, loff_t *ppos,

/* Only support splice for SOCKSEQPACKET */

- skb = skb_recv_datagram(sk, flags, &err);
+ skb = skb_recv_datagram(sk, MSG_DONTWAIT, &err);
if (!skb)
goto err_out;

diff --git a/net/smc/af_smc.c b/net/smc/af_smc.c
index a7f887d91d89..4ba8f93ddbe5 100644
--- a/net/smc/af_smc.c
+++ b/net/smc/af_smc.c
@@ -3172,12 +3172,8 @@ static ssize_t smc_splice_read(struct socket *sock, loff_t *ppos,
rc = -ESPIPE;
goto out;
}
- if (flags & SPLICE_F_NONBLOCK)
- flags = MSG_DONTWAIT;
- else
- flags = 0;
SMC_STAT_INC(smc, splice_cnt);
- rc = smc_rx_recvmsg(smc, NULL, pipe, len, flags);
+ rc = smc_rx_recvmsg(smc, NULL, pipe, len, MSG_DONTWAIT);
}
out:
release_sock(sk);
diff --git a/net/tls/tls_sw.c b/net/tls/tls_sw.c
index 53f944e6d8ef..7df1ea6a62a5 100644
--- a/net/tls/tls_sw.c
+++ b/net/tls/tls_sw.c
@@ -2136,7 +2136,7 @@ ssize_t tls_sw_splice_read(struct socket *sock, loff_t *ppos,
int chunk;
int err;

- err = tls_rx_reader_lock(sk, ctx, flags & SPLICE_F_NONBLOCK);
+ err = tls_rx_reader_lock(sk, ctx, true);
if (err < 0)
return err;

@@ -2145,8 +2145,7 @@ ssize_t tls_sw_splice_read(struct socket *sock, loff_t *ppos,
} else {
struct tls_decrypt_arg darg;

- err = tls_rx_rec_wait(sk, NULL, flags & SPLICE_F_NONBLOCK,
- true);
+ err = tls_rx_rec_wait(sk, NULL, true, true);
if (err <= 0)
goto splice_read_end;

diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
index 123b35ddfd71..384d5a479b4a 100644
--- a/net/unix/af_unix.c
+++ b/net/unix/af_unix.c
@@ -2880,15 +2880,12 @@ static ssize_t unix_stream_splice_read(struct socket *sock, loff_t *ppos,
.pipe = pipe,
.size = size,
.splice_flags = flags,
+ .flags = MSG_DONTWAIT,
};

if (unlikely(*ppos))
return -ESPIPE;

- if (sock->file->f_flags & O_NONBLOCK ||
- flags & SPLICE_F_NONBLOCK)
- state.flags = MSG_DONTWAIT;
-
return unix_stream_read_generic(&state, false);
}

#define _GNU_SOURCE
#include <fcntl.h>
#include <stdio.h>
#include <errno.h>
/*
 * Repeatedly splice() from stdin (fd 0) to stdout (fd 1), reporting each
 * outcome on stderr.
 *
 * A new report line ("\t<strerror(errno)>") is started whenever a call
 * succeeds or fails with a different errno than the previous call.  On
 * success the byte count is written over the start of the line with '\r';
 * on failure a repeat counter is written there instead, so runs of the same
 * error collapse into a single line.
 *
 * The loop ends on EOF (splice() returns 0) or on an error other than
 * EAGAIN/EINTR: such errors (e.g. EINVAL) are permanent and retrying them
 * would only busy-loop forever.
 */
int main(void) {
	/* Maximum number of bytes requested per splice() call (128 MiB). */
	const size_t chunk = 128 * 1024 * 1024;
	int lasterr = -1;	/* -1: nothing reported yet */
	unsigned ctr = 0;

	for (;;) {
		errno = 0;
		ssize_t ret = splice(0, NULL, 1, NULL, chunk, 0);
		/* Capture errno immediately: fprintf() below may overwrite it. */
		int err = errno;

		if (ret >= 0 || err != lasterr) {
			/* %m expands strerror(errno); make sure it sees splice()'s value. */
			errno = err;
			/* No leading newline before the very first report. */
			fprintf(stderr, lasterr == -1 ? "\t%m" : "\n\t%m");
			lasterr = err;
			ctr = 0;
		}

		if (ret == -1) {
			fprintf(stderr, "\r%u", ++ctr);
			if (err != EAGAIN && err != EINTR) {
				fprintf(stderr, "\n");
				return 1;
			}
		} else {
			fprintf(stderr, "\r%zd", ret);
			if (ret == 0)
				break;	/* EOF on the input side */
		}
	}
	fprintf(stderr, "\n");
	return 0;
}
#define _GNU_SOURCE
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <unistd.h>

/*
 * Accept a single TCP connection on port 14640 (any local address), make it
 * the standard input, and exec argv[1..] in place of this process, e.g.
 * "./tcp ./s > fifo".  Any setup failure aborts.
 */
int main(int argc, char ** argv) {
	if (argc < 2)
		abort();	/* no program to exec */

	int s = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0);
	/*
	 * sockaddr_in fields are in network byte order, so the port and
	 * address must go through htons()/htonl().  14640 is the port clients
	 * connect to (e.g. "nc 127.0.0.1 14640").
	 */
	struct sockaddr_in addr = {
		.sin_family = AF_INET,
		.sin_port = htons(14640),
		.sin_addr.s_addr = htonl(INADDR_ANY),
	};
	if (s == -1 ||
	    bind(s, (struct sockaddr *)&addr, sizeof(addr)) == -1 ||
	    listen(s, 128) == -1)
		abort();

	/* Both s and fd are close-on-exec; only the dup'd stdin survives exec. */
	int fd = accept4(s, NULL, NULL, SOCK_CLOEXEC);
	if (fd == -1 || dup2(fd, 0) == -1)
		abort();

	execvp(argv[1], argv + 1);
	abort();	/* only reached if execvp() failed */
}
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>

/*
 * Create a connected AF_UNIX stream socket pair and run argv[1..] in a child
 * with one end as its standard input.  The parent makes the other end its
 * standard output and writes "dupa\n" to it twice, one second apart.  Both
 * socketpair fds are close-on-exec, so once the parent exits no writer is
 * left and the child's stdin reads EOF.
 */
int main(int argc, char ** argv) {
	int fds[2];

	if (argc < 2)
		abort();	/* no program to exec */
	if (socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0, fds))
		abort();

	/*
	 * fork() rather than vfork(): a vfork() child may only call _exit()
	 * or an exec function, so calling dup2() there is undefined behaviour.
	 */
	pid_t pid = fork();
	if (pid == -1)
		abort();
	if (pid == 0) {
		if (dup2(fds[0], 0) == -1)
			_exit(127);
		execvp(argv[1], argv + 1);
		_exit(127);	/* exec failed */
	}

	if (dup2(fds[1], 1) == -1)
		abort();
	write(1, "dupa\n", 5);
	sleep(1);
	write(1, "dupa\n", 5);
	sleep(1);
	return 0;
}

Attachment: signature.asc
Description: PGP signature