[RFC PATCH 2/4] workqueue: introduce support for attaching to cgroups

From: Bandan Das
Date: Fri Mar 18 2016 - 18:16:20 EST


Introduce cgroup-aware unbound worker pools. Whenever a new worker thread is
created for such a pool, it attaches itself to the cgroups of the task that
called alloc_workqueue(). A new worker pool is created only if none of the
pools in the global list of cgroup-aware worker pools already matches those
cgroups.
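
For illustration, a caller could opt in roughly like this (a minimal sketch;
the workqueue name, work item and init function are made up for this example,
only the WQ_CGROUPS flag itself comes from this series):

	#include <linux/workqueue.h>

	static struct workqueue_struct *cg_wq;

	static void my_work_fn(struct work_struct *work)
	{
		/* runs in a worker attached to the cgroups of the task
		 * that called alloc_workqueue() below */
	}
	static DECLARE_WORK(my_work, my_work_fn);

	static int my_init(void)
	{
		/* WQ_CGROUPS is honoured only for unbound, non-ordered wqs */
		cg_wq = alloc_workqueue("my_cg_wq", WQ_UNBOUND | WQ_CGROUPS, 0);
		if (!cg_wq)
			return -ENOMEM;

		queue_work(cg_wq, &my_work);
		return 0;
	}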

Signed-off-by: Bandan Das <bsd@xxxxxxxxxx>
---
include/linux/workqueue.h | 2 +
kernel/workqueue.c | 212 +++++++++++++++++++++++++++++++++++++++++---
kernel/workqueue_internal.h | 4 +
3 files changed, 204 insertions(+), 14 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index ca73c50..7afb72d 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -131,6 +131,7 @@ struct workqueue_attrs {
int nice; /* nice level */
cpumask_var_t cpumask; /* allowed CPUs */
bool no_numa; /* disable NUMA affinity */
+ bool cg_enabled; /* cgroups aware */
};

static inline struct delayed_work *to_delayed_work(struct work_struct *work)
@@ -308,6 +309,7 @@ enum {
* http://thread.gmane.org/gmane.linux.kernel/1480396
*/
WQ_POWER_EFFICIENT = 1 << 7,
+ WQ_CGROUPS = 1 << 8,

__WQ_DRAINING = 1 << 16, /* internal: workqueue is draining */
__WQ_ORDERED = 1 << 17, /* internal: workqueue is ordered */
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 7ff5dc7..f052d85 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -48,6 +48,7 @@
#include <linux/nodemask.h>
#include <linux/moduleparam.h>
#include <linux/uaccess.h>
+#include <linux/cgroup.h>

#include "workqueue_internal.h"

@@ -139,8 +140,18 @@ enum {
* MD: wq_mayday_lock protected.
*/

+/*
+ * list of tasks that "own" the cgroups that
+ * this pool is attached to
+ */
+struct cgroup_owners {
+ struct task_struct *owner;
+ struct list_head link;
+};
+
/* struct worker is defined in workqueue_internal.h */

+
struct worker_pool {
spinlock_t lock; /* the pool lock */
int cpu; /* I: the associated cpu */
@@ -169,6 +180,8 @@ struct worker_pool {
struct worker *manager; /* L: purely informational */
struct mutex attach_mutex; /* attach/detach exclusion */
struct list_head workers; /* A: attached workers */
+ struct list_head cg_owners; /* tasks using this pool */
+ struct list_head unbound_node; /* all cgroup aware pools */
struct completion *detach_completion; /* all workers detached */

struct ida worker_ida; /* worker IDs for task name */
@@ -219,6 +232,7 @@ struct pool_workqueue {
*/
struct work_struct unbound_release_work;
struct rcu_head rcu;
+ struct task_struct *owner; /* for cgroups */
} __aligned(1 << WORK_STRUCT_FLAG_BITS);

/*
@@ -299,6 +313,7 @@ static DEFINE_MUTEX(wq_pool_mutex); /* protects pools and workqueues list */
static DEFINE_SPINLOCK(wq_mayday_lock); /* protects wq->maydays list */

static LIST_HEAD(workqueues); /* PR: list of all workqueues */
+static LIST_HEAD(unbound_cgpool); /* list of cgroup aware worker pools */
static bool workqueue_freezing; /* PL: have wqs started freezing? */

/* PL: allowable cpus for unbound wqs and work items */
@@ -425,6 +440,12 @@ static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
if (({ assert_rcu_or_wq_mutex(wq); false; })) { } \
else

+#define for_each_unbound_cgpool(pool) \
+ list_for_each_entry_rcu((pool), &(unbound_cgpool), unbound_node)
+
+#define for_each_task_cgpool(cgtask, pool) \
+ list_for_each_entry_rcu((cgtask), &(pool)->cg_owners, link)
+
#ifdef CONFIG_DEBUG_OBJECTS_WORK

static struct debug_obj_descr work_debug_descr;
@@ -700,6 +721,7 @@ static struct pool_workqueue *get_work_pwq(struct work_struct *work)
*
* Return: The worker_pool @work was last associated with. %NULL if none.
*/
+
static struct worker_pool *get_work_pool(struct work_struct *work)
{
unsigned long data = atomic_long_read(&work->data);
@@ -757,6 +779,7 @@ static bool work_is_canceling(struct work_struct *work)
* they're being called with pool->lock held.
*/

+
static bool __need_more_worker(struct worker_pool *pool)
{
return !atomic_read(&pool->nr_running);
@@ -1072,6 +1095,7 @@ static void get_pwq(struct pool_workqueue *pwq)
static void put_pwq(struct pool_workqueue *pwq)
{
lockdep_assert_held(&pwq->pool->lock);
+
if (likely(--pwq->refcnt))
return;
if (WARN_ON_ONCE(!(pwq->wq->flags & WQ_UNBOUND)))
@@ -1387,6 +1411,9 @@ retry:
/* pwq which will be used unless @work is executing elsewhere */
if (!(wq->flags & WQ_UNBOUND))
pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
+ else if (wq->flags & WQ_CGROUPS)
+ /* use the default pwq */
+ pwq = unbound_pwq_by_node(wq, NUMA_NO_NODE);
else
pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));

@@ -1674,6 +1701,8 @@ static struct worker *alloc_worker(int node)
/* on creation a worker is in !idle && prep state */
worker->flags = WORKER_PREP;
}
+ worker->attach_pending = false;
+ worker->attach_to = NULL;
return worker;
}

@@ -1695,7 +1724,8 @@ static void worker_attach_to_pool(struct worker *worker,
* set_cpus_allowed_ptr() will fail if the cpumask doesn't have any
* online CPUs. It'll be re-applied when any of the CPUs come up.
*/
- set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
+ if (!pool->attrs->cg_enabled)
+ set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);

/*
* The pool->attach_mutex ensures %POOL_DISASSOCIATED remains
@@ -1760,6 +1790,7 @@ static struct worker *create_worker(struct worker_pool *pool)
if (id < 0)
goto fail;

+ /* Note: if user specified cgroups, node is NUMA_NO_NODE */
worker = alloc_worker(pool->node);
if (!worker)
goto fail;
@@ -1779,7 +1810,11 @@ static struct worker *create_worker(struct worker_pool *pool)
goto fail;

set_user_nice(worker->task, pool->attrs->nice);
- kthread_bind_mask(worker->task, pool->attrs->cpumask);
+ if (pool->attrs->cg_enabled) {
+ worker->attach_pending = true;
+ worker->attach_to = current;
+ } else
+ kthread_bind_mask(worker->task, pool->attrs->cpumask);

/* successful, attach the worker to the pool */
worker_attach_to_pool(worker, pool);
@@ -2172,6 +2207,7 @@ static int worker_thread(void *__worker)
{
struct worker *worker = __worker;
struct worker_pool *pool = worker->pool;
+ int cgattach;

/* tell the scheduler that this is a workqueue worker */
worker->task->flags |= PF_WQ_WORKER;
@@ -2191,6 +2227,14 @@ woke_up:
return 0;
}

+ /* runs only once, the first time through, to attach to the owner's cgroups */
+ if (worker->attach_pending) {
+ cgattach = cgroup_attach_task_all(worker->attach_to, current);
+ if (cgattach)
+ pr_warn("workqueue: worker cgroup attach failed but we will still run!");
+ worker->attach_pending = false;
+ }
+
worker_leave_idle(worker);
recheck:
/* no more worker necessary? */
@@ -3181,6 +3225,7 @@ static int init_worker_pool(struct worker_pool *pool)
pool->watchdog_ts = jiffies;
INIT_LIST_HEAD(&pool->worklist);
INIT_LIST_HEAD(&pool->idle_list);
+ INIT_LIST_HEAD(&pool->cg_owners);
hash_init(pool->busy_hash);

init_timer_deferrable(&pool->idle_timer);
@@ -3251,13 +3296,22 @@ static void put_unbound_pool(struct worker_pool *pool)

/* sanity checks */
if (WARN_ON(!(pool->cpu < 0)) ||
- WARN_ON(!list_empty(&pool->worklist)))
+ WARN_ON(!list_empty(&pool->worklist)) ||
+ WARN_ON(!list_empty(&pool->cg_owners)))
return;

/* release id and unhash */
if (pool->id >= 0)
idr_remove(&worker_pool_idr, pool->id);
- hash_del(&pool->hash_node);
+
+ /*
+ * this pool is going down, so remove from the list of
+ * cgroup aware pools
+ */
+ if (pool->attrs->cg_enabled)
+ list_del(&pool->unbound_node);
+ else
+ hash_del(&pool->hash_node);

/*
* Become the manager and destroy all workers. Grabbing
@@ -3290,6 +3344,65 @@ static void put_unbound_pool(struct worker_pool *pool)
call_rcu_sched(&pool->rcu, rcu_free_pool);
}

+static void remove_task_cgpool(struct worker_pool *pool,
+ struct task_struct *tsk)
+{
+ struct cgroup_owners *iter;
+
+ if (pool->attrs->cg_enabled) {
+ for_each_task_cgpool(iter, pool) {
+ if (iter->owner == tsk) {
+ list_del(&iter->link);
+ break;
+ }
+ }
+ }
+}
+
+static bool attach_task_cgpool(struct worker_pool *pool,
+ struct task_struct *tsk)
+{
+ bool result = true;
+ struct cgroup_owners *entry = kzalloc(sizeof(*entry), GFP_KERNEL);
+
+ if (!entry) {
+ result = false;
+ goto done;
+ }
+
+ entry->owner = tsk;
+ list_add_tail(&entry->link, &pool->cg_owners);
+
+done:
+ return result;
+}
+
+static struct worker_pool *find_cg_matching_pool(struct task_struct *tsk)
+{
+ struct worker_pool *pool = NULL, *iter;
+ bool found = false;
+
+ for_each_unbound_cgpool(iter) {
+ struct cgroup_owners *cgtask;
+
+ for_each_task_cgpool(cgtask, iter) {
+ if (cgtask->owner == tsk ||
+ cgroup_match_groups(cgtask->owner, tsk)) {
+ found = true;
+ break;
+ }
+ }
+
+ if (found) {
+ pool = iter;
+ pool->refcnt++;
+ break;
+ }
+ }
+
+ return pool;
+}
+
/**
* get_unbound_pool - get a worker_pool with the specified attributes
* @attrs: the attributes of the worker_pool to get
@@ -3310,9 +3423,19 @@ static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
struct worker_pool *pool;
int node;
int target_node = NUMA_NO_NODE;
+ bool cgroups_enabled = attrs->cg_enabled;

lockdep_assert_held(&wq_pool_mutex);

+ if (cgroups_enabled) {
+ /* "current" is the owner */
+ pool = find_cg_matching_pool(current);
+ if (pool)
+ return pool;
+
+ goto create;
+ }
+
/* do we already have a matching pool? */
hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {
if (wqattrs_equal(pool->attrs, attrs)) {
@@ -3332,6 +3455,8 @@ static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
}
}

+
+create:
/* nope, create a new one */
pool = kzalloc_node(sizeof(*pool), GFP_KERNEL, target_node);
if (!pool || init_worker_pool(pool) < 0)
@@ -3347,6 +3472,9 @@ static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
*/
pool->attrs->no_numa = false;

+ if (cgroups_enabled)
+ pool->attrs->cg_enabled = true;
+
if (worker_pool_assign_id(pool) < 0)
goto fail;

@@ -3355,7 +3483,10 @@ static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
goto fail;

/* install */
- hash_add(unbound_pool_hash, &pool->hash_node, hash);
+ if (cgroups_enabled)
+ list_add_tail(&pool->unbound_node, &unbound_cgpool);
+ else
+ hash_add(unbound_pool_hash, &pool->hash_node, hash);

return pool;
fail:
@@ -3390,6 +3521,8 @@ static void pwq_unbound_release_workfn(struct work_struct *work)
is_last = list_empty(&wq->pwqs);
mutex_unlock(&wq->mutex);

+ remove_task_cgpool(pool, pwq->owner);
+
mutex_lock(&wq_pool_mutex);
put_unbound_pool(pool);
mutex_unlock(&wq_pool_mutex);
@@ -3462,6 +3595,11 @@ static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq,
pwq->wq = wq;
pwq->flush_color = -1;
pwq->refcnt = 1;
+ if (pool->attrs->cg_enabled) {
+ /* Add the current task to pool cg_owners */
+ WARN_ON(!attach_task_cgpool(pool, current));
+ pwq->owner = current;
+ }
INIT_LIST_HEAD(&pwq->delayed_works);
INIT_LIST_HEAD(&pwq->pwqs_node);
INIT_LIST_HEAD(&pwq->mayday_node);
@@ -3502,7 +3640,11 @@ static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
if (!pool)
return NULL;

- pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);
+ if (wq->unbound_attrs->cg_enabled)
+ pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL);
+ else
+ pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);
+
if (!pwq) {
put_unbound_pool(pool);
return NULL;
@@ -3590,8 +3732,10 @@ static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
if (ctx) {
int node;

- for_each_node(node)
- put_pwq_unlocked(ctx->pwq_tbl[node]);
+ if (!ctx->attrs->cg_enabled) {
+ for_each_node(node)
+ put_pwq_unlocked(ctx->pwq_tbl[node]);
+ }
put_pwq_unlocked(ctx->dfl_pwq);

free_workqueue_attrs(ctx->attrs);
@@ -3607,11 +3751,14 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
{
struct apply_wqattrs_ctx *ctx;
struct workqueue_attrs *new_attrs, *tmp_attrs;
- int node;
+ int node, numa_nodes = nr_node_ids;
+ bool cgroups_enabled = wq->unbound_attrs->cg_enabled;

lockdep_assert_held(&wq_pool_mutex);

- ctx = kzalloc(sizeof(*ctx) + nr_node_ids * sizeof(ctx->pwq_tbl[0]),
+ if (cgroups_enabled)
+ numa_nodes = 0;
+ ctx = kzalloc(sizeof(*ctx) + numa_nodes * sizeof(ctx->pwq_tbl[0]),
GFP_KERNEL);

new_attrs = alloc_workqueue_attrs(GFP_KERNEL);
@@ -3623,6 +3770,7 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
* Calculate the attrs of the default pwq.
* If the user configured cpumask doesn't overlap with the
* wq_unbound_cpumask, we fallback to the wq_unbound_cpumask.
+ * This does not copy attrs->cg_enabled
*/
copy_workqueue_attrs(new_attrs, attrs);
cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_cpumask);
@@ -3640,11 +3788,16 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
* If something goes wrong during CPU up/down, we'll fall back to
* the default pwq covering whole @attrs->cpumask. Always create
* it even if we don't use it immediately.
+ * For cgroup aware wqs, there will be only one pwq
*/
+ new_attrs->cg_enabled = cgroups_enabled;
ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
if (!ctx->dfl_pwq)
goto out_free;

+ if (cgroups_enabled)
+ goto done;
+
for_each_node(node) {
if (wq_calc_node_cpumask(new_attrs, node, -1, tmp_attrs->cpumask)) {
ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
@@ -3656,8 +3809,10 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
}
}

+done:
/* save the user configured attrs and sanitize it. */
copy_workqueue_attrs(new_attrs, attrs);
+ /* at this point, note that cg_enabled is untouched */
cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
ctx->attrs = new_attrs;

@@ -3676,16 +3831,23 @@ out_free:
static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx)
{
int node;
+ bool cgroups_enabled = ctx->wq->unbound_attrs->cg_enabled;

/* all pwqs have been created successfully, let's install'em */
mutex_lock(&ctx->wq->mutex);

copy_workqueue_attrs(ctx->wq->unbound_attrs, ctx->attrs);

- /* save the previous pwq and install the new one */
+ /*
+ * Save the previous pwq and install the new one.
+ * If WQ_CGROUPS is set, no space was allocated for pwq_tbl at all,
+ * so in that case only dfl_pwq is valid.
+ */
+ if (!cgroups_enabled) {
for_each_node(node)
ctx->pwq_tbl[node] = numa_pwq_tbl_install(ctx->wq, node,
ctx->pwq_tbl[node]);
+ }

/* @dfl_pwq might not have been used, ensure it's linked */
link_pwq(ctx->dfl_pwq);
@@ -3882,6 +4044,7 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq)
static int wq_clamp_max_active(int max_active, unsigned int flags,
const char *name)
{
+ /* TODO: determine max_active limit for cgroup aware wqs? */
int lim = flags & WQ_UNBOUND ? WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE;

if (max_active < 1 || max_active > lim)
@@ -3901,14 +4064,30 @@ struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
va_list args;
struct workqueue_struct *wq;
struct pool_workqueue *pwq;
+ bool cgroups_enabled = false;
+
+#ifdef CONFIG_CGROUPS
+ /* only unbound, non-ordered workqueues can be cgroup aware */
+ if ((flags & WQ_CGROUPS) && (flags & WQ_UNBOUND) &&
+ !(flags & __WQ_ORDERED))
+ cgroups_enabled = true;
+#endif

/* see the comment above the definition of WQ_POWER_EFFICIENT */
- if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient)
+ if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient) {
flags |= WQ_UNBOUND;
+ if (cgroups_enabled) {
+ pr_warn("workqueue: disabling cgroups because WQ_POWER_EFFICIENT specified");
+ cgroups_enabled = false;
+ }
+ }

/* allocate wq and format name */
- if (flags & WQ_UNBOUND)
- tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);
+ if (flags & WQ_UNBOUND) {
+ if (!cgroups_enabled)
+ tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);
+ /* else: cgroup aware wqs use only dfl_pwq, no per-node table needed */
+ }

wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
if (!wq)
@@ -3918,6 +4097,8 @@ struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
wq->unbound_attrs = alloc_workqueue_attrs(GFP_KERNEL);
if (!wq->unbound_attrs)
goto err_free_wq;
+ if (cgroups_enabled)
+ wq->unbound_attrs->cg_enabled = true;
}

va_start(args, lock_name);
@@ -4980,6 +5161,9 @@ static ssize_t wq_pool_ids_show(struct device *dev,
const char *delim = "";
int node, written = 0;

+ if (wq->unbound_attrs->cg_enabled)
+ return 0;
+
rcu_read_lock_sched();
for_each_node(node) {
written += scnprintf(buf + written, PAGE_SIZE - written,
diff --git a/kernel/workqueue_internal.h b/kernel/workqueue_internal.h
index 4521587..49228cab 100644
--- a/kernel/workqueue_internal.h
+++ b/kernel/workqueue_internal.h
@@ -52,6 +52,10 @@ struct worker {

/* used only by rescuers to point to the target workqueue */
struct workqueue_struct *rescue_wq; /* I: the workqueue to rescue */
+
+ /* for cgroups */
+ bool attach_pending;
+ struct task_struct *attach_to;
};

/**
--
2.5.0