[PATCH 17/31] workqueue: implement attribute-based unbound worker_pool management

From: Tejun Heo
Date: Fri Mar 01 2013 - 22:30:42 EST


This patch makes unbound worker_pools reference-counted and
dynamically created and destroyed as workqueues needing them come and
go. All unbound worker_pools are hashed on unbound_pool_hash, which is
keyed by the content of worker_pool->attrs.
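
To make the keying concrete, here is a minimal userspace sketch (not
the kernel code) of hashing and comparing pool attributes by content.
pool_attrs, fnv1a(), attrs_hash() and attrs_equal() are made-up names
standing in for workqueue_attrs, wqattrs_hash() and wqattrs_equal()
in the patch below.

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

struct pool_attrs {
	int nice;			/* scheduling nice level */
	unsigned long cpumask;		/* toy one-word CPU mask */
};

/* FNV-1a over a byte range; the patch uses jhash instead */
static uint32_t fnv1a(const void *data, size_t len, uint32_t h)
{
	const unsigned char *p = data;

	while (len--)
		h = (h ^ *p++) * 16777619u;
	return h;
}

/* hash the *content* field by field, so equal attrs hash alike */
static uint32_t attrs_hash(const struct pool_attrs *a)
{
	uint32_t h = 2166136261u;	/* FNV offset basis */

	h = fnv1a(&a->nice, sizeof(a->nice), h);
	h = fnv1a(&a->cpumask, sizeof(a->cpumask), h);
	return h;
}

/* content equality, the tie-breaker within a hash bucket */
static bool attrs_equal(const struct pool_attrs *a,
			const struct pool_attrs *b)
{
	return a->nice == b->nice && a->cpumask == b->cpumask;
}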

When an unbound workqueue is allocated, get_unbound_pool() is called
with the attributes of the workqueue. If a matching worker_pool
already exists, its reference count is bumped and the pool is
returned. If not, a new worker_pool with matching attributes is
created and returned.
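
The lookup-or-create flow can be illustrated with the same toy types.
This is only a sketch of the idea behind get_unbound_pool(): it reuses
pool_attrs, attrs_hash() and attrs_equal() from the sketch above,
drops all locking, and uses made-up names (toy_pool, toy_get_pool,
pool_hash, NR_BUCKETS).

#include <stdint.h>
#include <stdlib.h>

#define NR_BUCKETS 64			/* 1 << UNBOUND_POOL_HASH_ORDER */

struct toy_pool {
	struct pool_attrs attrs;	/* the key: pool attributes */
	int refcnt;			/* # of workqueues using this pool */
	struct toy_pool *next;		/* hash-bucket chain */
};

static struct toy_pool *pool_hash[NR_BUCKETS];

static struct toy_pool *toy_get_pool(const struct pool_attrs *attrs)
{
	uint32_t bkt = attrs_hash(attrs) % NR_BUCKETS;
	struct toy_pool *pool;

	/* do we already have a matching pool?  compare by content */
	for (pool = pool_hash[bkt]; pool; pool = pool->next) {
		if (attrs_equal(&pool->attrs, attrs)) {
			pool->refcnt++;		/* share the existing pool */
			return pool;
		}
	}

	/* nope, create a new one and hash it */
	pool = calloc(1, sizeof(*pool));
	if (!pool)
		return NULL;
	pool->attrs = *attrs;
	pool->refcnt = 1;
	pool->next = pool_hash[bkt];
	pool_hash[bkt] = pool;
	return pool;
}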

When an unbound workqueue is destroyed, put_unbound_pool() is called,
which decrements the reference count of the associated worker_pool.
If the refcnt reaches zero, the worker_pool is destroyed in a
sched-RCU-safe way.
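
The matching put side, in the same toy terms: drop a reference and
tear the pool down only when the last workqueue using it goes away.
Where this sketch calls free() directly, the patch defers the actual
destruction through call_rcu_sched() so that get_work_pool() callers
running under sched-RCU never dereference a freed pool.

/* counterpart to toy_get_pool() above; not the kernel code */
static void toy_put_pool(struct toy_pool *pool)
{
	struct toy_pool **pp;
	uint32_t bkt;

	if (--pool->refcnt > 0)
		return;			/* still used by other workqueues */

	/* last reference gone: unhash ... */
	bkt = attrs_hash(&pool->attrs) % NR_BUCKETS;
	for (pp = &pool_hash[bkt]; *pp; pp = &(*pp)->next) {
		if (*pp == pool) {
			*pp = pool->next;
			break;
		}
	}

	/* ... then destroy (kernel: destroy workers, call_rcu_sched()) */
	free(pool);
}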

Note that the standard unbound worker_pools - the normal and highpri
ones with no specific cpumask affinity - are no longer created
explicitly during init_workqueues(). init_workqueues() now only
initializes the workqueue_attrs used for the standard unbound pools -
unbound_std_wq_attrs[]. The pools themselves are spawned on demand as
workqueues are created.

Signed-off-by: Tejun Heo <tj@xxxxxxxxxx>
---
kernel/workqueue.c | 230 ++++++++++++++++++++++++++++++++++++++++++++++++++---
1 file changed, 218 insertions(+), 12 deletions(-)

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 7eba824..fb91b67 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -41,6 +41,7 @@
#include <linux/debug_locks.h>
#include <linux/lockdep.h>
#include <linux/idr.h>
+#include <linux/jhash.h>
#include <linux/hashtable.h>
#include <linux/rculist.h>

@@ -80,6 +81,7 @@ enum {

NR_STD_WORKER_POOLS = 2, /* # standard pools per cpu */

+ UNBOUND_POOL_HASH_ORDER = 6, /* hashed by pool->attrs */
BUSY_WORKER_HASH_ORDER = 6, /* 64 pointers */

MAX_IDLE_WORKERS_RATIO = 4, /* 1/4 of busy can be idle */
@@ -149,6 +151,8 @@ struct worker_pool {
struct ida worker_ida; /* L: for worker IDs */

struct workqueue_attrs *attrs; /* I: worker attributes */
+ struct hlist_node hash_node; /* R: unbound_pool_hash node */
+ atomic_t refcnt; /* refcnt for unbound pools */

/*
* The current concurrency level. As it's likely to be accessed
@@ -156,6 +160,12 @@ struct worker_pool {
* cacheline.
*/
atomic_t nr_running ____cacheline_aligned_in_smp;
+
+ /*
+ * Destruction of pool is sched-RCU protected to allow dereferences
+ * from get_work_pool().
+ */
+ struct rcu_head rcu;
} ____cacheline_aligned_in_smp;

/*
@@ -218,6 +228,11 @@ struct workqueue_struct {

static struct kmem_cache *pwq_cache;

+/* hash of all unbound pools keyed by pool->attrs */
+static DEFINE_HASHTABLE(unbound_pool_hash, UNBOUND_POOL_HASH_ORDER);
+
+static struct workqueue_attrs *unbound_std_wq_attrs[NR_STD_WORKER_POOLS];
+
struct workqueue_struct *system_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_wq);
struct workqueue_struct *system_highpri_wq __read_mostly;
@@ -1740,7 +1755,7 @@ static struct worker *create_worker(struct worker_pool *pool)
worker->pool = pool;
worker->id = id;

- if (pool->cpu != WORK_CPU_UNBOUND)
+ if (pool->cpu >= 0)
worker->task = kthread_create_on_node(worker_thread,
worker, cpu_to_node(pool->cpu),
"kworker/%d:%d%s", pool->cpu, id, pri);
@@ -3159,6 +3174,54 @@ fail:
return NULL;
}

+static void copy_workqueue_attrs(struct workqueue_attrs *to,
+ const struct workqueue_attrs *from)
+{
+ to->nice = from->nice;
+ cpumask_copy(to->cpumask, from->cpumask);
+}
+
+/*
+ * Hacky implementation of jhash of bitmaps which only considers the
+ * specified number of bits. We probably want a proper implementation in
+ * include/linux/jhash.h.
+ */
+static u32 jhash_bitmap(const unsigned long *bitmap, int bits, u32 hash)
+{
+ int nr_longs = bits / BITS_PER_LONG;
+ int nr_leftover = bits % BITS_PER_LONG;
+ unsigned long leftover = 0;
+
+ if (nr_longs)
+ hash = jhash(bitmap, nr_longs * sizeof(long), hash);
+ if (nr_leftover) {
+ bitmap_copy(&leftover, bitmap + nr_longs, nr_leftover);
+ hash = jhash(&leftover, sizeof(long), hash);
+ }
+ return hash;
+}
+
+/* hash value of the content of @attr */
+static u32 wqattrs_hash(const struct workqueue_attrs *attrs)
+{
+ u32 hash = 0;
+
+ hash = jhash_1word(attrs->nice, hash);
+ hash = jhash_bitmap(cpumask_bits(attrs->cpumask), nr_cpu_ids, hash);
+ return hash;
+}
+
+/* content equality test */
+static bool wqattrs_equal(const struct workqueue_attrs *a,
+ const struct workqueue_attrs *b)
+{
+ if (a->nice != b->nice)
+ return false;
+ if (!cpumask_equal(a->cpumask, b->cpumask))
+ return false;
+ return true;
+}
+
/**
* init_worker_pool - initialize a newly zalloc'd worker_pool
* @pool: worker_pool to initialize
@@ -3169,6 +3232,8 @@ fail:
static int init_worker_pool(struct worker_pool *pool)
{
spin_lock_init(&pool->lock);
+ pool->id = -1;
+ pool->cpu = -1;
pool->flags |= POOL_DISASSOCIATED;
INIT_LIST_HEAD(&pool->worklist);
INIT_LIST_HEAD(&pool->idle_list);
@@ -3185,12 +3250,133 @@ static int init_worker_pool(struct worker_pool *pool)
mutex_init(&pool->assoc_mutex);
ida_init(&pool->worker_ida);

+ INIT_HLIST_NODE(&pool->hash_node);
+ atomic_set(&pool->refcnt, 1);
pool->attrs = alloc_workqueue_attrs(GFP_KERNEL);
if (!pool->attrs)
return -ENOMEM;
return 0;
}

+static void rcu_free_pool(struct rcu_head *rcu)
+{
+ struct worker_pool *pool = container_of(rcu, struct worker_pool, rcu);
+
+ ida_destroy(&pool->worker_ida);
+ free_workqueue_attrs(pool->attrs);
+ kfree(pool);
+}
+
+/**
+ * put_unbound_pool - put a worker_pool
+ * @pool: worker_pool to put
+ *
+ * Put @pool. If its refcnt reaches zero, it gets destroyed in sched-RCU
+ * safe manner.
+ */
+static void put_unbound_pool(struct worker_pool *pool)
+{
+ struct worker *worker;
+
+ if (!atomic_dec_and_test(&pool->refcnt))
+ return;
+
+ /* sanity checks */
+ if (WARN_ON(!(pool->flags & POOL_DISASSOCIATED)))
+ return;
+ if (WARN_ON(pool->nr_workers != pool->nr_idle))
+ return;
+ if (WARN_ON(!list_empty(&pool->worklist)))
+ return;
+
+ /* release id and unhash */
+ spin_lock_irq(&workqueue_lock);
+ if (pool->id >= 0)
+ idr_remove(&worker_pool_idr, pool->id);
+ hash_del(&pool->hash_node);
+ spin_unlock_irq(&workqueue_lock);
+
+ /* lock out manager and destroy all workers */
+ mutex_lock(&pool->manager_mutex);
+ spin_lock_irq(&pool->lock);
+
+ while ((worker = first_worker(pool)))
+ destroy_worker(worker);
+ WARN_ON(pool->nr_workers || pool->nr_idle);
+
+ spin_unlock_irq(&pool->lock);
+ mutex_unlock(&pool->manager_mutex);
+
+ /* shut down the timers */
+ del_timer_sync(&pool->idle_timer);
+ del_timer_sync(&pool->mayday_timer);
+
+ /* sched-RCU protected to allow dereferences from get_work_pool() */
+ call_rcu_sched(&pool->rcu, rcu_free_pool);
+}
+
+/**
+ * get_unbound_pool - get a worker_pool with the specified attributes
+ * @attrs: the attributes of the worker_pool to get
+ *
+ * Obtain a worker_pool which has the same attributes as @attrs, bump the
+ * reference count and return it. If there already is a matching
+ * worker_pool, it will be used; otherwise, this function attempts to
+ * create a new one. On failure, returns NULL.
+ */
+static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
+{
+ static DEFINE_MUTEX(create_mutex);
+ u32 hash = wqattrs_hash(attrs);
+ struct worker_pool *pool;
+ struct hlist_node *tmp;
+ struct worker *worker;
+
+ mutex_lock(&create_mutex);
+
+ /* do we already have a matching pool? */
+ spin_lock_irq(&workqueue_lock);
+ hash_for_each_possible(unbound_pool_hash, pool, tmp, hash_node, hash) {
+ if (wqattrs_equal(pool->attrs, attrs)) {
+ atomic_inc(&pool->refcnt);
+ goto out_unlock;
+ }
+ }
+ spin_unlock_irq(&workqueue_lock);
+
+ /* nope, create a new one */
+ pool = kzalloc(sizeof(*pool), GFP_KERNEL);
+ if (!pool || init_worker_pool(pool) < 0)
+ goto fail;
+
+ copy_workqueue_attrs(pool->attrs, attrs);
+
+ if (worker_pool_assign_id(pool) < 0)
+ goto fail;
+
+ /* create and start the initial worker */
+ worker = create_worker(pool);
+ if (!worker)
+ goto fail;
+
+ spin_lock_irq(&pool->lock);
+ start_worker(worker);
+ spin_unlock_irq(&pool->lock);
+
+ /* install */
+ spin_lock_irq(&workqueue_lock);
+ hash_add(unbound_pool_hash, &pool->hash_node, hash);
+out_unlock:
+ spin_unlock_irq(&workqueue_lock);
+ mutex_unlock(&create_mutex);
+ return pool;
+fail:
+ mutex_unlock(&create_mutex);
+ if (pool)
+ put_unbound_pool(pool);
+ return NULL;
+}
+
static int alloc_and_link_pwqs(struct workqueue_struct *wq)
{
bool highpri = wq->flags & WQ_HIGHPRI;
@@ -3215,7 +3401,12 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq)
if (!pwq)
return -ENOMEM;

- pwq->pool = get_std_worker_pool(WORK_CPU_UNBOUND, highpri);
+ pwq->pool = get_unbound_pool(unbound_std_wq_attrs[highpri]);
+ if (!pwq->pool) {
+ kmem_cache_free(pwq_cache, pwq);
+ return -ENOMEM;
+ }
+
list_add_tail_rcu(&pwq->pwqs_node, &wq->pwqs);
}

@@ -3393,6 +3584,15 @@ void destroy_workqueue(struct workqueue_struct *wq)
kfree(wq->rescuer);
}

+ /*
+ * We're the sole accessor of @wq at this point. Directly access
+ * the first pwq and put its pool.
+ */
+ if (wq->flags & WQ_UNBOUND) {
+ pwq = list_first_entry(&wq->pwqs, struct pool_workqueue,
+ pwqs_node);
+ put_unbound_pool(pwq->pool);
+ }
free_pwqs(wq);
kfree(wq);
}
@@ -3856,19 +4056,14 @@ static int __init init_workqueues(void)
hotcpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN);

/* initialize CPU pools */
- for_each_wq_cpu(cpu) {
+ for_each_possible_cpu(cpu) {
struct worker_pool *pool;

i = 0;
for_each_std_worker_pool(pool, cpu) {
BUG_ON(init_worker_pool(pool));
pool->cpu = cpu;
-
- if (cpu != WORK_CPU_UNBOUND)
- cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
- else
- cpumask_setall(pool->attrs->cpumask);
-
+ cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
pool->attrs->nice = std_nice[i++];

/* alloc pool ID */
@@ -3877,14 +4072,13 @@ static int __init init_workqueues(void)
}

/* create the initial worker */
- for_each_online_wq_cpu(cpu) {
+ for_each_online_cpu(cpu) {
struct worker_pool *pool;

for_each_std_worker_pool(pool, cpu) {
struct worker *worker;

- if (cpu != WORK_CPU_UNBOUND)
- pool->flags &= ~POOL_DISASSOCIATED;
+ pool->flags &= ~POOL_DISASSOCIATED;

worker = create_worker(pool);
BUG_ON(!worker);
@@ -3894,6 +4088,18 @@ static int __init init_workqueues(void)
}
}

+ /* create default unbound wq attrs */
+ for (i = 0; i < NR_STD_WORKER_POOLS; i++) {
+ struct workqueue_attrs *attrs;
+
+ BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL)));
+
+ attrs->nice = std_nice[i];
+ cpumask_setall(attrs->cpumask);
+
+ unbound_std_wq_attrs[i] = attrs;
+ }
+
system_wq = alloc_workqueue("events", 0, 0);
system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
system_long_wq = alloc_workqueue("events_long", 0, 0);
--
1.8.1.2
