Re: [RFC PATCH V5 5/5] workqueue: introduce a way to set workqueue's scheduler

From: Lai Jiangshan
Date: Sun Jan 28 2018 - 23:15:50 EST


I think adding a priority boost to the workqueue (in flush_work()) is the
best way to fix this problem.
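
Something along these lines, as a rough sketch only (boost_flushed_worker()
is a hypothetical helper, not part of this patch, and the restore path when
the flush completes is omitted):

static void boost_flushed_worker(struct task_struct *worker,
                                 struct task_struct *flusher)
{
        struct sched_attr attr = {
                .size           = sizeof(attr),
                .sched_policy   = flusher->policy,
                .sched_priority = flusher->rt_priority,
                .sched_nice     = task_nice(flusher),
        };

        /*
         * Lend the (possibly RT) flusher's scheduling parameters to the
         * worker running the flushed work, so the work can make progress
         * ahead of the pinned RT tasks; only ever boost, never demote.
         */
        if (rt_task(flusher) && !rt_task(worker))
                sched_setattr(worker, &attr);
}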

On Sat, Jan 27, 2018 at 1:15 PM, Wen Yang <wen.yang99@xxxxxxxxxx> wrote:
> When pinning RT threads to specific cores using CPU affinity, the
> kworkers on the same CPU can starve, which may lead to a form of
> priority inversion. In that case, the RT threads themselves also
> suffer a severe performance impact.
>
> The priority inversion looks like this:
> CPU 0: libvirtd acquired cgroup_mutex and triggered
> lru_add_drain_per_cpu, then waited for all the kworkers to complete:
> PID: 44145 TASK: ffff8807bec7b980 CPU: 0 COMMAND: "libvirtd"
> #0 [ffff8807f2cbb9d0] __schedule at ffffffff816410ed
> #1 [ffff8807f2cbba38] schedule at ffffffff81641789
> #2 [ffff8807f2cbba48] schedule_timeout at ffffffff8163f479
> #3 [ffff8807f2cbbaf8] wait_for_completion at ffffffff81641b56
> #4 [ffff8807f2cbbb58] flush_work at ffffffff8109efdc
> #5 [ffff8807f2cbbbd0] lru_add_drain_all at ffffffff81179002
> #6 [ffff8807f2cbbc08] migrate_prep at ffffffff811c77be
> #7 [ffff8807f2cbbc18] do_migrate_pages at ffffffff811b8010
> #8 [ffff8807f2cbbcf8] cpuset_migrate_mm at ffffffff810fea6c
> #9 [ffff8807f2cbbd10] cpuset_attach at ffffffff810ff91e
> #10 [ffff8807f2cbbd50] cgroup_attach_task at ffffffff810f9972
> #11 [ffff8807f2cbbe08] attach_task_by_pid at ffffffff810fa520
> #12 [ffff8807f2cbbe58] cgroup_tasks_write at ffffffff810fa593
> #13 [ffff8807f2cbbe68] cgroup_file_write at ffffffff810f8773
> #14 [ffff8807f2cbbef8] vfs_write at ffffffff811dfdfd
> #15 [ffff8807f2cbbf38] sys_write at ffffffff811e089f
> #16 [ffff8807f2cbbf80] system_call_fastpath at ffffffff8164c809
>
> CPU 43: kworker/43 starved because of the RT threads:
> CURRENT: PID: 21294 TASK: ffff883fd2d45080 COMMAND: "lwip"
> RT PRIO_ARRAY: ffff883fff3f4950
> [ 79] PID: 21294 TASK: ffff883fd2d45080 COMMAND: "lwip"
> [ 79] PID: 21295 TASK: ffff88276d481700 COMMAND: "ovdk-ovsvswitch"
> [ 79] PID: 21351 TASK: ffff8807be822280 COMMAND: "dispatcher"
> [ 79] PID: 21129 TASK: ffff8807bef0f300 COMMAND: "ovdk-ovsvswitch"
> [ 79] PID: 21337 TASK: ffff88276d482e00 COMMAND: "handler_3"
> [ 79] PID: 21352 TASK: ffff8807be824500 COMMAND: "flow_dumper"
> [ 79] PID: 21336 TASK: ffff88276d480b80 COMMAND: "handler_2"
> [ 79] PID: 21342 TASK: ffff88276d484500 COMMAND: "handler_8"
> [ 79] PID: 21341 TASK: ffff88276d482280 COMMAND: "handler_7"
> [ 79] PID: 21338 TASK: ffff88276d483980 COMMAND: "handler_4"
> [ 79] PID: 21339 TASK: ffff88276d480000 COMMAND: "handler_5"
> [ 79] PID: 21340 TASK: ffff88276d486780 COMMAND: "handler_6"
> CFS RB_ROOT: ffff883fff3f4868
> [120] PID: 37959 TASK: ffff88276e148000 COMMAND: "kworker/43:1"
>
> CPU 28: systemd (the victim) was blocked on cgroup_mutex:
> PID: 1 TASK: ffff883fd2d40000 CPU: 28 COMMAND: "systemd"
> #0 [ffff881fd317bd60] __schedule at ffffffff816410ed
> #1 [ffff881fd317bdc8] schedule_preempt_disabled at ffffffff81642869
> #2 [ffff881fd317bdd8] __mutex_lock_slowpath at ffffffff81640565
> #3 [ffff881fd317be38] mutex_lock at ffffffff8163f9cf
> #4 [ffff881fd317be50] proc_cgroup_show at ffffffff810fd256
> #5 [ffff881fd317be98] seq_read at ffffffff81203cda
> #6 [ffff881fd317bf08] vfs_read at ffffffff811dfc6c
> #7 [ffff881fd317bf38] sys_read at ffffffff811e07bf
> #8 [ffff881fd317bf80] system_call_fastpath at ffffffff81
>
> The simplest way to fix this is to set the kworkers' scheduler to a
> higher RT priority, e.g.:
> chrt --fifo -p 61 <kworker_pid>
> However, this cannot prevent other WORK_CPU_BOUND worker threads from
> running and starving.
>
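(For reference, the userspace equivalent of that chrt invocation, via the
portable sched_setscheduler() wrapper, looks roughly like the sketch below;
the priority of 61 and the pid are just the example values from above:)

#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>

int main(int argc, char **argv)
{
        struct sched_param sp = { .sched_priority = 61 };
        pid_t pid;

        if (argc < 2) {
                fprintf(stderr, "usage: %s <kworker_pid>\n", argv[0]);
                return 1;
        }
        pid = atoi(argv[1]);

        /* Needs CAP_SYS_NICE, just like the sysfs knob added below. */
        if (sched_setscheduler(pid, SCHED_FIFO, &sp) < 0) {
                perror("sched_setscheduler");
                return 1;
        }
        return 0;
}
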
> This patch introduces a way to set the scheduler (policy and priority)
> of a percpu worker_pool. With it, users can set a suitable scheduler
> policy and priority for a worker_pool as needed, which then applies to
> all the WORK_CPU_BOUND workers on the same CPU. In addition,
> /sys/devices/virtual/workqueue/cpumask can be used for
> WORK_CPU_UNBOUND workers to prevent them from starving.
>
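(Side note on the WORK_CPU_UNBOUND half: the existing cpumask knob already
covers it; something like the following keeps unbound workers off the
isolated cores, with the mask value left as a placeholder:

# echo <hex cpumask excluding the RT cores> > /sys/devices/virtual/workqueue/cpumask
)
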
> Tejun Heo suggested:
> "* Add scheduler type to wq_attrs so that unbound workqueues can be
> configured.
>
> * Rename system_wq's wq->name from "events" to "system_percpu", and
> similarly for the similarly named workqueues.
>
> * Enable wq_attrs (only the applicable part should show up in the
> interface) for system_percpu and system_percpu_highpri, and use that
> to change the attributes of the percpu pools."
>
> This patch implements the basic infrastructure and the /sys interface,
> for example:
> # cat /sys/devices/virtual/workqueue/system_percpu/sched_attr
> policy=0 prio=0 nice=0
> # echo "policy=1 prio=1 nice=0" > /sys/devices/virtual/workqueue/system_percpu/sched_attr
> # cat /sys/devices/virtual/workqueue/system_percpu/sched_attr
> policy=1 prio=1 nice=0
> # cat /sys/devices/virtual/workqueue/system_percpu_highpri/sched_attr
> policy=0 prio=0 nice=-20
> # echo "policy=1 prio=2 nice=0" > /sys/devices/virtual/workqueue/system_percpu_highpri/sched_attr
> # cat /sys/devices/virtual/workqueue/system_percpu_highpri/sched_attr
> policy=1 prio=2 nice=0
>
> Signed-off-by: Wen Yang <wen.yang99@xxxxxxxxxx>
> Signed-off-by: Jiang Biao <jiang.biao2@xxxxxxxxxx>
> Signed-off-by: Tan Hu <tan.hu@xxxxxxxxxx>
> Suggested-by: Tejun Heo <tj@xxxxxxxxxx>
> Cc: Tejun Heo <tj@xxxxxxxxxx>
> Cc: Lai Jiangshan <jiangshanlai@xxxxxxxxx>
> Cc: kernel test robot <xiaolong.ye@xxxxxxxxx>
> Cc: linux-kernel@xxxxxxxxxxxxxxx
> ---
> kernel/workqueue.c | 196 +++++++++++++++++++++++++++++++++++++++++------------
> 1 file changed, 151 insertions(+), 45 deletions(-)
>
> diff --git a/kernel/workqueue.c b/kernel/workqueue.c
> index fca0e30..e58f9bd 100644
> --- a/kernel/workqueue.c
> +++ b/kernel/workqueue.c
> @@ -1699,6 +1699,7 @@ static void worker_attach_to_pool(struct worker *worker,
> * online CPUs. It'll be re-applied when any of the CPUs come up.
> */
> set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
> + sched_setattr(worker->task, &pool->attrs->sched_attr);
>
> /*
> * The pool->attach_mutex ensures %POOL_DISASSOCIATED remains
> @@ -3166,10 +3167,19 @@ struct workqueue_attrs *alloc_workqueue_attrs(gfp_t gfp_mask)
> return NULL;
> }
>
> +static void copy_sched_attr(struct sched_attr *to,
> + const struct sched_attr *from)
> +{
> + to->sched_policy = from->sched_policy;
> + to->sched_priority = from->sched_priority;
> + to->sched_nice = from->sched_nice;
> + to->sched_flags = from->sched_flags;
> +}
> +
> static void copy_workqueue_attrs(struct workqueue_attrs *to,
> const struct workqueue_attrs *from)
> {
> - to->sched_attr.sched_nice = from->sched_attr.sched_nice;
> + copy_sched_attr(&to->sched_attr, &from->sched_attr);
> cpumask_copy(to->cpumask, from->cpumask);
> /*
> * Unlike hash and equality test, this function doesn't ignore
> @@ -3184,17 +3194,32 @@ static u32 wqattrs_hash(const struct workqueue_attrs *attrs)
> {
> u32 hash = 0;
>
> - hash = jhash_1word(attrs->sched_attr.sched_nice, hash);
> + hash = jhash_3words(attrs->sched_attr.sched_policy,
> + attrs->sched_attr.sched_priority,
> + attrs->sched_attr.sched_nice,
> + hash);
> hash = jhash(cpumask_bits(attrs->cpumask),
> BITS_TO_LONGS(nr_cpumask_bits) * sizeof(long), hash);
> return hash;
> }
>
> +static bool sched_attr_equal(const struct sched_attr *a,
> + const struct sched_attr *b)
> +{
> + if (a->sched_policy != b->sched_policy)
> + return false;
> + if (a->sched_priority != b->sched_priority)
> + return false;
> + if (a->sched_nice != b->sched_nice)
> + return false;
> + return true;
> +}
> +
> /* content equality test */
> static bool wqattrs_equal(const struct workqueue_attrs *a,
> const struct workqueue_attrs *b)
> {
> - if (a->sched_attr.sched_nice != b->sched_attr.sched_nice)
> + if (!sched_attr_equal(&a->sched_attr, &b->sched_attr))
> return false;
> if (!cpumask_equal(a->cpumask, b->cpumask))
> return false;
> @@ -3911,6 +3936,11 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq)
> init_pwq(pwq, wq, &cpu_pools[highpri]);
>
> mutex_lock(&wq->mutex);
> + wq->attrs->sched_attr.sched_policy = SCHED_NORMAL;
> + wq->attrs->sched_attr.sched_priority = 0;
> + wq->attrs->sched_attr.sched_nice =
> + wq->flags & WQ_HIGHPRI ?
> + HIGHPRI_NICE_LEVEL : 0;
> link_pwq(pwq);
> mutex_unlock(&wq->mutex);
> }
> @@ -4336,7 +4366,9 @@ static void pr_cont_pool_info(struct worker_pool *pool)
> pr_cont(" cpus=%*pbl", nr_cpumask_bits, pool->attrs->cpumask);
> if (pool->node != NUMA_NO_NODE)
> pr_cont(" node=%d", pool->node);
> - pr_cont(" flags=0x%x nice=%d", pool->flags,
> + pr_cont(" flags=0x%x policy=%u prio=%u nice=%d", pool->flags,
> + pool->attrs->sched_attr.sched_policy,
> + pool->attrs->sched_attr.sched_priority,
> pool->attrs->sched_attr.sched_nice);
> }
>
> @@ -5041,9 +5073,124 @@ static ssize_t max_active_store(struct device *dev,
> }
> static DEVICE_ATTR_RW(max_active);
>
> +static ssize_t sched_attr_show(struct device *dev,
> + struct device_attribute *attr, char *buf)
> +{
> + size_t written;
> + struct workqueue_struct *wq = dev_to_wq(dev);
> +
> + mutex_lock(&wq->mutex);
> + written = scnprintf(buf, PAGE_SIZE,
> + "policy=%u prio=%u nice=%d\n",
> + wq->attrs->sched_attr.sched_policy,
> + wq->attrs->sched_attr.sched_priority,
> + wq->attrs->sched_attr.sched_nice);
> + mutex_unlock(&wq->mutex);
> +
> + return written;
> +}
> +
> +static struct workqueue_attrs *wq_sysfs_prep_attrs(struct workqueue_struct *wq);
> +
> +static int wq_set_unbound_sched_attr(struct workqueue_struct *wq,
> + const struct sched_attr *new)
> +{
> + struct workqueue_attrs *attrs;
> + int ret = -ENOMEM;
> +
> + apply_wqattrs_lock();
> + attrs = wq_sysfs_prep_attrs(wq);
> + if (!attrs)
> + goto out_unlock;
> + copy_sched_attr(&attrs->sched_attr, new);
> + ret = apply_workqueue_attrs_locked(wq, attrs);
> +
> +out_unlock:
> + apply_wqattrs_unlock();
> + free_workqueue_attrs(attrs);
> + return ret;
> +}
> +
> +static int wq_set_bound_sched_attr(struct workqueue_struct *wq,
> + const struct sched_attr *new)
> +{
> + struct pool_workqueue *pwq;
> + struct worker_pool *pool;
> + struct worker *worker;
> + int ret = 0;
> +
> + apply_wqattrs_lock();
> + for_each_pwq(pwq, wq) {
> + pool = pwq->pool;
> + mutex_lock(&pool->attach_mutex);
> + for_each_pool_worker(worker, pool) {
> + ret = sched_setattr(worker->task, new);
> + if (ret) {
> + pr_err("%s:%d err[%d]",
> + __func__, __LINE__, ret);
> + pr_err(" worker[%s] policy[%d] prio[%d] nice[%d]\n",
> + worker->task->comm,
> + new->sched_policy,
> + new->sched_priority,
> + new->sched_nice);
> + }
> + }
> + copy_sched_attr(&pool->attrs->sched_attr, new);
> + mutex_unlock(&pool->attach_mutex);
> + }
> + apply_wqattrs_unlock();
> +
> + mutex_lock(&wq->mutex);
> + copy_sched_attr(&wq->attrs->sched_attr, new);
> + mutex_unlock(&wq->mutex);
> +
> + return ret;
> +}
> +
> +static ssize_t sched_attr_store(struct device *dev,
> + struct device_attribute *attr, const char *buf, size_t count)
> +{
> + struct workqueue_struct *wq = dev_to_wq(dev);
> + struct sched_attr new = {
> + .size = sizeof(struct sched_attr),
> + .sched_policy = SCHED_NORMAL,
> + .sched_flags = 0,
> + .sched_priority = 0,
> + };
> + int ret = 0;
> +
> + if (!capable(CAP_SYS_NICE))
> + return -EPERM;
> +
> + if (sscanf(buf, "policy=%u prio=%u nice=%d",
> + &new.sched_policy,
> + &new.sched_priority,
> + &new.sched_nice) != 3)
> + return -EINVAL;
> +
> + pr_debug("set wq's sched_attr: policy=%u prio=%u nice=%d\n",
> + new.sched_policy,
> + new.sched_priority,
> + new.sched_nice);
> + mutex_lock(&wq->mutex);
> + if (sched_attr_equal(&wq->attrs->sched_attr, &new)) {
> + mutex_unlock(&wq->mutex);
> + return count;
> + }
> + mutex_unlock(&wq->mutex);
> +
> + if (wq->flags & WQ_UNBOUND)
> + ret = wq_set_unbound_sched_attr(wq, &new);
> + else
> + ret = wq_set_bound_sched_attr(wq, &new);
> + return ret ?: count;
> +}
> +static DEVICE_ATTR_RW(sched_attr);
> +
> static struct attribute *wq_sysfs_attrs[] = {
> &dev_attr_per_cpu.attr,
> &dev_attr_max_active.attr,
> + &dev_attr_sched_attr.attr,
> NULL,
> };
> ATTRIBUTE_GROUPS(wq_sysfs);
> @@ -5068,20 +5215,6 @@ static ssize_t wq_pool_ids_show(struct device *dev,
> return written;
> }
>
> -static ssize_t wq_nice_show(struct device *dev, struct device_attribute *attr,
> - char *buf)
> -{
> - struct workqueue_struct *wq = dev_to_wq(dev);
> - int written;
> -
> - mutex_lock(&wq->mutex);
> - written = scnprintf(buf, PAGE_SIZE, "%d\n",
> - wq->attrs->sched_attr.sched_nice);
> - mutex_unlock(&wq->mutex);
> -
> - return written;
> -}
> -
> /* prepare workqueue_attrs for sysfs store operations */
> static struct workqueue_attrs *wq_sysfs_prep_attrs(struct workqueue_struct *wq)
> {
> @@ -5097,32 +5230,6 @@ static struct workqueue_attrs *wq_sysfs_prep_attrs(struct workqueue_struct *wq)
> return attrs;
> }
>
> -static ssize_t wq_nice_store(struct device *dev, struct device_attribute *attr,
> - const char *buf, size_t count)
> -{
> - struct workqueue_struct *wq = dev_to_wq(dev);
> - struct workqueue_attrs *attrs;
> - int ret = -ENOMEM;
> -
> - apply_wqattrs_lock();
> -
> - attrs = wq_sysfs_prep_attrs(wq);
> - if (!attrs)
> - goto out_unlock;
> -
> - if (sscanf(buf, "%d", &attrs->sched_attr.sched_nice) == 1 &&
> - attrs->sched_attr.sched_nice >= MIN_NICE &&
> - attrs->sched_attr.sched_nice <= MAX_NICE)
> - ret = apply_workqueue_attrs_locked(wq, attrs);
> - else
> - ret = -EINVAL;
> -
> -out_unlock:
> - apply_wqattrs_unlock();
> - free_workqueue_attrs(attrs);
> - return ret ?: count;
> -}
> -
> static ssize_t wq_cpumask_show(struct device *dev,
> struct device_attribute *attr, char *buf)
> {
> @@ -5201,7 +5308,6 @@ static ssize_t wq_numa_store(struct device *dev, struct device_attribute *attr,
>
> static struct device_attribute wq_sysfs_unbound_attrs[] = {
> __ATTR(pool_ids, 0444, wq_pool_ids_show, NULL),
> - __ATTR(nice, 0644, wq_nice_show, wq_nice_store),
> __ATTR(cpumask, 0644, wq_cpumask_show, wq_cpumask_store),
> __ATTR(numa, 0644, wq_numa_show, wq_numa_store),
> __ATTR_NULL,
> --
> 1.8.3.1
>