Re: [PATCH v2] ipc/mqueue: remove STATE_PENDING
From: Davidlohr Bueso
Date: Mon Apr 27 2015 - 23:25:23 EST
On Fri, 2015-04-24 at 00:18 +0200, Thomas Gleixner wrote:
> Can you please convert that over to Peters lockless wake queues so we
> do not reimplement the same thing open coded here.
So I'd like to include this in my v2 of the wake_q stuff, along with the
futex conversion. What do you guys think of the following?
Thanks,
Davidlohr
8<-------------------------------------------------------------
Subject: [PATCH] ipc/mqueue: lockless pipelined wakeups
This patch moves the wakeup_process() invocation so it is not done under
the info->lock by making use of a lockless wake_q. With this change, the
waiter is woken up once it is STATE_READY and it does not need to loop
on SMP if it is still in STATE_PENDING. In the timeout case we still need
to grab the info->lock to verify the state.
This change should also avoid the introduction of preempt_disable() in
-RT which avoids a busy-loop which pools for the STATE_PENDING -> STATE_READY
change if the waiter has a higher priority compared to the waker.
Additionally, this patch micro-optimizes wq_sleep by using the cheaper
cousin of set_current_state(TASK_INTERRUPTABLE) as we will block no
matter what, thus get rid of the implied barrier. Secondly, and related
to the lockless wakeups, comment the smp_wmb and add barrier pairing on
the reader side.
Based-on-work-from: Sebastian Andrzej Siewior <bigeasy@xxxxxxxxxxxxx>
Signed-off-by: Davidlohr Bueso <dbueso@xxxxxxx>
---
ipc/mqueue.c | 52 +++++++++++++++++++++++++++++++++-------------------
1 file changed, 33 insertions(+), 19 deletions(-)
diff --git a/ipc/mqueue.c b/ipc/mqueue.c
index 3aaea7f..11c7b92 100644
--- a/ipc/mqueue.c
+++ b/ipc/mqueue.c
@@ -47,8 +47,7 @@
#define RECV 1
#define STATE_NONE 0
-#define STATE_PENDING 1
-#define STATE_READY 2
+#define STATE_READY 1
struct posix_msg_tree_node {
struct rb_node rb_node;
@@ -571,15 +570,13 @@ static int wq_sleep(struct mqueue_inode_info *info, int sr,
wq_add(info, sr, ewp);
for (;;) {
- set_current_state(TASK_INTERRUPTIBLE);
+ __set_current_state(TASK_INTERRUPTIBLE);
spin_unlock(&info->lock);
time = schedule_hrtimeout_range_clock(timeout, 0,
HRTIMER_MODE_ABS, CLOCK_REALTIME);
- while (ewp->state == STATE_PENDING)
- cpu_relax();
-
+ smp_rmb(); /* pairs with smp_wmb() in pipelined_send/receive */
if (ewp->state == STATE_READY) {
retval = 0;
goto out;
@@ -909,9 +906,14 @@ out_name:
* bypasses the message array and directly hands the message over to the
* receiver.
* The receiver accepts the message and returns without grabbing the queue
- * spinlock. Therefore an intermediate STATE_PENDING state and memory barriers
- * are necessary. The same algorithm is used for sysv semaphores, see
- * ipc/sem.c for more details.
+ * spinlock. The used algorithm is different from sysv semaphores (ipc/sem.c):
+ *
+ * - Set pointer to message.
+ * - Queue the receiver task's for later wakeup (without the info->lock).
+ * - Update its state to STATE_READY. Now the receiver can continue.
+ * - Wake up the process after the lock is dropped. Should the process wake up
+ * before this wakeup (due to a timeout or a signal) it will either see
+ * STATE_READY and continue or acquire the lock to check the sate again.
*
* The same algorithm is used for senders.
*/
@@ -919,21 +921,29 @@ out_name:
/* pipelined_send() - send a message directly to the task waiting in
* sys_mq_timedreceive() (without inserting message into a queue).
*/
-static inline void pipelined_send(struct mqueue_inode_info *info,
+static inline void pipelined_send(struct wake_q_head *wake_q,
+ struct mqueue_inode_info *info,
struct msg_msg *message,
struct ext_wait_queue *receiver)
{
receiver->msg = message;
list_del(&receiver->list);
- receiver->state = STATE_PENDING;
- wake_up_process(receiver->task);
- smp_wmb();
+ wake_q_add(wake_q, receiver->task);
+ /*
+ * Ensure that updating receiver->state is the last
+ * write operation: As once set, the receiver can continue,
+ * and if we don't have the reference count from the wake_q,
+ * yet, at that point we can later have a use-after-free
+ * condition and bogus wakeup.
+ */
+ smp_wmb(); /* pairs with smp_rmb() in wq_sleep */
receiver->state = STATE_READY;
}
/* pipelined_receive() - if there is task waiting in sys_mq_timedsend()
* gets its message and put to the queue (we have one free place for sure). */
-static inline void pipelined_receive(struct mqueue_inode_info *info)
+static inline void pipelined_receive(struct wake_q_head *wake_q,
+ struct mqueue_inode_info *info)
{
struct ext_wait_queue *sender = wq_get_first_waiter(info, SEND);
@@ -944,10 +954,10 @@ static inline void pipelined_receive(struct mqueue_inode_info *info)
}
if (msg_insert(sender->msg, info))
return;
+
list_del(&sender->list);
- sender->state = STATE_PENDING;
- wake_up_process(sender->task);
- smp_wmb();
+ wake_q_add(wake_q, sender->task);
+ smp_wmb(); /* pairs with smp_rmb() in wq_sleep */
sender->state = STATE_READY;
}
@@ -965,6 +975,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
struct timespec ts;
struct posix_msg_tree_node *new_leaf = NULL;
int ret = 0;
+ WAKE_Q(wake_q);
if (u_abs_timeout) {
int res = prepare_timeout(u_abs_timeout, &expires, &ts);
@@ -1049,7 +1060,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
} else {
receiver = wq_get_first_waiter(info, RECV);
if (receiver) {
- pipelined_send(info, msg_ptr, receiver);
+ pipelined_send(&wake_q, info, msg_ptr, receiver);
} else {
/* adds message to the queue */
ret = msg_insert(msg_ptr, info);
@@ -1062,6 +1073,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
}
out_unlock:
spin_unlock(&info->lock);
+ wake_up_q(&wake_q);
out_free:
if (ret)
free_msg(msg_ptr);
@@ -1084,6 +1096,7 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
ktime_t expires, *timeout = NULL;
struct timespec ts;
struct posix_msg_tree_node *new_leaf = NULL;
+ WAKE_Q(wake_q);
if (u_abs_timeout) {
int res = prepare_timeout(u_abs_timeout, &expires, &ts);
@@ -1155,8 +1168,9 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
CURRENT_TIME;
/* There is now free space in queue. */
- pipelined_receive(info);
+ pipelined_receive(&wake_q, info);
spin_unlock(&info->lock);
+ wake_up_q(&wake_q);
ret = 0;
}
if (ret == 0) {
--
2.1.4
--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/