Re: [RESEND PATCH] fs/pipe: Introduce a check to skip sleeping processes during pipe read/write

From: Manfred Spraul
Date: Tue Dec 31 2024 - 06:14:58 EST


Hi Oleg,

just FYI, I did some quick tests with:
- your changes to fs/pipe.c
- my change, to skip locking in wake-up (and some smp_mb())
- statistics, to check how often wake_up is called/how often the
wait queue is actually empty

Known issue: Statistic printing every 10 seconds doesn't work, it prints
at erratic times. And the comment in __wake_up is still wrong, the
memory barrier would pair with smp_mb() after updating wq_head->head.

Result: (all with a 2 core 4 thread i3, fully idle system)
- your change has no impact on 'find /proc /sys | grep doesnotexist'
(using busybox)
- Running your test app for around 100 seconds
- 3 wakeups with non-empty queue
- 26 wakeups with empty queue
- 2107 __add_wait_queue
- find|grep produces insane numbers of wakeups. I've seen 20k, I've
now seen 50k wakeup calls. With just around 2k __add_wait_queue,
...

Thus, at least for pipe:
Should we add the missing memory barriers and switch to
waitqueue_active() in front of all wakeup calls?

---
fs/pipe.c | 13 +++++++------
include/linux/wait.h | 5 +++++
kernel/sched/wait.c | 45 ++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 57 insertions(+), 6 deletions(-)

diff --git a/fs/pipe.c b/fs/pipe.c
index 12b22c2723b7..27ffb650f131 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -253,7 +253,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
size_t total_len = iov_iter_count(to);
struct file *filp = iocb->ki_filp;
struct pipe_inode_info *pipe = filp->private_data;
- bool was_full, wake_next_reader = false;
+ bool wake_writer = false, wake_next_reader = false;
ssize_t ret;

/* Null read succeeds. */
@@ -271,7 +271,6 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
* (WF_SYNC), because we want them to get going and generate more
* data for us.
*/
- was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
for (;;) {
/* Read ->head with a barrier vs post_one_notification() */
unsigned int head = smp_load_acquire(&pipe->head);
@@ -340,8 +339,10 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
buf->len = 0;
}

- if (!buf->len)
+ if (!buf->len) {
+ wake_writer |= pipe_full(head, tail, pipe->max_usage);
tail = pipe_update_tail(pipe, buf, tail);
+ }
total_len -= chars;
if (!total_len)
break; /* common path: read succeeded */
@@ -377,7 +378,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
* _very_ unlikely case that the pipe was full, but we got
* no data.
*/
- if (unlikely(was_full))
+ if (unlikely(wake_writer))
wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);

@@ -391,14 +392,14 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
return -ERESTARTSYS;

mutex_lock(&pipe->mutex);
- was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
wake_next_reader = true;
+ wake_writer = false;
}
if (pipe_empty(pipe->head, pipe->tail))
wake_next_reader = false;
mutex_unlock(&pipe->mutex);

- if (was_full)
+ if (wake_writer)
wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
if (wake_next_reader)
wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
diff --git a/include/linux/wait.h b/include/linux/wait.h
index 6d90ad974408..0fdad3c3c513 100644
--- a/include/linux/wait.h
+++ b/include/linux/wait.h
@@ -166,6 +166,7 @@ extern void add_wait_queue_exclusive(struct wait_queue_head *wq_head, struct wai
extern void add_wait_queue_priority(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry);
extern void remove_wait_queue(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry);

+extern atomic_t g_add_count;
static inline void __add_wait_queue(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
{
struct list_head *head = &wq_head->head;
@@ -177,6 +178,8 @@ static inline void __add_wait_queue(struct wait_queue_head *wq_head, struct wait
head = &wq->entry;
}
list_add(&wq_entry->entry, head);
+ smp_mb();
+ atomic_inc(&g_add_count);
}

/*
@@ -192,6 +195,8 @@ __add_wait_queue_exclusive(struct wait_queue_head *wq_head, struct wait_queue_en
static inline void __add_wait_queue_entry_tail(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
{
list_add_tail(&wq_entry->entry, &wq_head->head);
+ smp_mb();
+ atomic_inc(&g_add_count);
}

static inline void
diff --git a/kernel/sched/wait.c b/kernel/sched/wait.c
index 51e38f5f4701..07487429dddf 100644
--- a/kernel/sched/wait.c
+++ b/kernel/sched/wait.c
@@ -110,6 +110,10 @@ static int __wake_up_common_lock(struct wait_queue_head *wq_head, unsigned int m
return nr_exclusive - remaining;
}

+#if 1
+atomic_t g_add_count = ATOMIC_INIT(0);
+#endif
+
/**
* __wake_up - wake up threads blocked on a waitqueue.
* @wq_head: the waitqueue
@@ -124,6 +128,47 @@ static int __wake_up_common_lock(struct wait_queue_head *wq_head, unsigned int m
int __wake_up(struct wait_queue_head *wq_head, unsigned int mode,
int nr_exclusive, void *key)
{
+#if 1
+static atomic_t g_slow = ATOMIC_INIT(0);
+static atomic_t g_mid = ATOMIC_INIT(0);
+static atomic_t g_fast = ATOMIC_INIT(0);
+static u64 printtime = 10*HZ;
+#endif
+ if (list_empty(&wq_head->head)) {
+ struct list_head *pn;
+
+ /*
+ * pairs with spin_unlock_irqrestore(&wq_head->lock);
+ * We actually do not need to acquire wq_head->lock, we just
+ * need to be sure that there is no prepare_to_wait() that
+ * completed on any CPU before __wake_up was called.
+ * Thus instead of load_acquiring the spinlock and dropping
+ * it again, we load_acquire the next list entry and check
+ * that the list is not empty.
+ */
+ pn = smp_load_acquire(&wq_head->head.next);
+
+ if(pn == &wq_head->head) {
+#if 1
+ atomic_inc(&g_fast);
+#endif
+ return 0;
+ } else {
+#if 1
+ atomic_inc(&g_mid);
+#endif
+ }
+ } else {
+#if 1
+ atomic_inc(&g_slow);
+#endif
+ }
+#if 1
+ if (get_jiffies_64() > printtime) {
+ printtime = get_jiffies_64() + 10*HZ;
+ pr_info("__wakeup: slow/obvious: %d, mid/nearly raced: %d, fast: %d, add: %d.\n", atomic_read(&g_slow), atomic_read(&g_mid), atomic_read(&g_fast), atomic_read(&g_add_count));
+ }
+#endif
return __wake_up_common_lock(wq_head, mode, nr_exclusive, 0, key);
}
EXPORT_SYMBOL(__wake_up);
--
2.47.1