Re: [PATCH] ipc/msg: Implement lockless pipelined wakeups

From: Thomas Gleixner
Date: Sat Oct 31 2015 - 06:31:52 EST


On Fri, 30 Oct 2015, Sebastian Andrzej Siewior wrote:
> This was just compile tested. It would be nice if somone with real
> workload could test it. Otherwise I could hack something myself and check
> if it still works.

ltp and glibc should have tests at least at the functional level

> -static void expunge_all(struct msg_queue *msq, int res)
> +static void expunge_all(struct msg_queue *msq, int res,
> + struct wake_q_head *wake_q)
> {
> struct msg_receiver *msr, *t;
>
> list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) {
> - msr->r_msg = NULL; /* initialize expunge ordering */

Don't we need to keep that NULL init? I might be missing something.

> - wake_up_process(msr->r_tsk);
> - /*
> - * Ensure that the wakeup is visible before setting r_msg as
> - * the receiving end depends on it: either spinning on a nil,
> - * or dealing with -EAGAIN cases. See lockless receive part 1
> - * and 2 in do_msgrcv().
> - */
> - smp_wmb(); /* barrier (B) */

Please keep the comment intact and fix it as it documents the mechanism
including the barriers. The barrier is now inside of wake_q_add().

> +
> + wake_q_add(wake_q, msr->r_tsk);
> msr->r_msg = ERR_PTR(res);


> -static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
> +static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg,
> + struct wake_q_head *wake_q)
> {
> struct msg_receiver *msr, *t;
>
> @@ -577,27 +576,13 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
>
> list_del(&msr->r_list);
> if (msr->r_maxsize < msg->m_ts) {
> - /* initialize pipelined send ordering */
> - msr->r_msg = NULL;

See above.

> - wake_up_process(msr->r_tsk);
> - /* barrier (B) see barrier comment below */
> - smp_wmb();

Please do not remove barrier documentation. Update it.

> + wake_q_add(wake_q, msr->r_tsk);
> msr->r_msg = ERR_PTR(-E2BIG);
> } else {
> - msr->r_msg = NULL;
> msq->q_lrpid = task_pid_vnr(msr->r_tsk);
> msq->q_rtime = get_seconds();
> - wake_up_process(msr->r_tsk);
> - /*
> - * Ensure that the wakeup is visible before
> - * setting r_msg, as the receiving can otherwise
> - * exit - once r_msg is set, the receiver can
> - * continue. See lockless receive part 1 and 2
> - * in do_msgrcv(). Barrier (B).
> - */

Ditto.

> - smp_wmb();
> + wake_q_add(wake_q, msr->r_tsk);
> msr->r_msg = msg;
> -
> return 1;
> }
> }

> @@ -932,57 +919,25 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
> rcu_read_lock();
>
> /* Lockless receive, part 2:
> - * Wait until pipelined_send or expunge_all are outside of
> - * wake_up_process(). There is a race with exit(), see
> - * ipc/mqueue.c for the details. The correct serialization
> - * ensures that a receiver cannot continue without the wakeup
> - * being visibible _before_ setting r_msg:
> + * The work in pipelined_send() and expunge_all():
> + * - Set pointer to message
> + * - Queue the receiver task for later wakeup
> + * - Wake up the process after the lock is dropped.
> *
> - * CPU 0 CPU 1
> - * <loop receiver>
> - * smp_rmb(); (A) <-- pair -. <waker thread>
> - * <load ->r_msg> | msr->r_msg = NULL;
> - * | wake_up_process();
> - * <continue> `------> smp_wmb(); (B)
> - * msr->r_msg = msg;
> - *
> - * Where (A) orders the message value read and where (B) orders
> - * the write to the r_msg -- done in both pipelined_send and
> - * expunge_all.
> + * Should the process wake up before this wakeup (due to a
> + * signal) it will either see the message and continue â

Please update the barrier documentation so it matches the new code.

> */
> - for (;;) {
> - /*
> - * Pairs with writer barrier in pipelined_send
> - * or expunge_all.
> - */
> - smp_rmb(); /* barrier (A) */
> - msg = (struct msg_msg *)msr_d.r_msg;
> - if (msg)
> - break;
>
> - /*
> - * The cpu_relax() call is a compiler barrier
> - * which forces everything in this loop to be
> - * re-loaded.
> - */
> - cpu_relax();
> - }

Are you sure that this is not longer required? If so then please
explain and update the documentation accordingly.

Thanks,

tglx