[PATCH 2/4] workqueue: split apply_workqueue_attrs() into 3 stages

From: Lai Jiangshan
Date: Thu Mar 12 2015 - 00:59:02 EST


Current apply_workqueue_attrs() includes pwqs-allocation and pwqs-installation,
so when we batch multiple apply_workqueue_attrs()s as a transaction, we can't
ensure the transaction must succeed or fail as a complete unit.

To solve this, we split apply_workqueue_attrs() into three stages.
The first stage does the preparation: allocation memory, pwqs.
The second stage does the attrs-installaion and pwqs-installation.
The third stage frees the allocated memory and (old or unused) pwqs.

As the result, batching multiple apply_workqueue_attrs()s can
succeed or fail as a complete unit:
1) batch do all the first stage for all the workqueues
2) only commit all when all the above succeed.

This patch is a preparation for the next patch ("Allow modifying low level
unbound workqueue cpumask") which will do a multiple apply_workqueue_attrs().

The patch doesn't have functionality changed except two minor adjustment:
1) free_unbound_pwq() for the error path is removed, we use the
heavier version put_pwq_unlocked() instead since the error path
is rare. this adjustment simplifies the code.
2) the memory-allocation is also moved into wq_pool_mutex.
this is needed to avoid to do the further splitting.

Cc: Christoph Lameter <cl@xxxxxxxxx>
Cc: Kevin Hilman <khilman@xxxxxxxxxx>
Cc: Lai Jiangshan <laijs@xxxxxxxxxxxxxx>
Cc: Mike Galbraith <bitbucket@xxxxxxxxx>
Cc: Paul E. McKenney <paulmck@xxxxxxxxxxxxxxxxxx>
Cc: Tejun Heo <tj@xxxxxxxxxx>
Cc: Viresh Kumar <viresh.kumar@xxxxxxxxxx>
Cc: Frederic Weisbecker <fweisbec@xxxxxxxxx>
Signed-off-by: Lai Jiangshan <laijs@xxxxxxxxxxxxxx>
---
kernel/workqueue.c | 199 +++++++++++++++++++++++++++++++----------------------
1 file changed, 115 insertions(+), 84 deletions(-)

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index c6845fa..957b244 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -3424,17 +3424,6 @@ static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
return pwq;
}

-/* undo alloc_unbound_pwq(), used only in the error path */
-static void free_unbound_pwq(struct pool_workqueue *pwq)
-{
- lockdep_assert_held(&wq_pool_mutex);
-
- if (pwq) {
- put_unbound_pool(pwq->pool);
- kmem_cache_free(pwq_cache, pwq);
- }
-}
-
/**
* wq_calc_node_mask - calculate a wq_attrs' cpumask for the specified node
* @attrs: the wq_attrs of interest
@@ -3497,42 +3486,46 @@ static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq,
return old_pwq;
}

-/**
- * apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue
- * @wq: the target workqueue
- * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs()
- *
- * Apply @attrs to an unbound workqueue @wq. Unless disabled, on NUMA
- * machines, this function maps a separate pwq to each NUMA node with
- * possibles CPUs in @attrs->cpumask so that work items are affine to the
- * NUMA node it was issued on. Older pwqs are released as in-flight work
- * items finish. Note that a work item which repeatedly requeues itself
- * back-to-back will stay on its current pwq.
- *
- * Performs GFP_KERNEL allocations.
- *
- * Return: 0 on success and -errno on failure.
- */
-int apply_workqueue_attrs(struct workqueue_struct *wq,
- const struct workqueue_attrs *attrs)
+struct wq_unbound_install_ctx {
+ struct workqueue_struct *wq; /* target to be installed */
+ struct workqueue_attrs *attrs; /* attrs for installing */
+ struct pool_workqueue *dfl_pwq;
+ struct pool_workqueue *pwq_tbl[];
+};
+
+static void wq_unbound_install_ctx_free(struct wq_unbound_install_ctx *ctx)
{
+ int node;
+
+ if (ctx) {
+ /* put the pwqs */
+ for_each_node(node)
+ put_pwq_unlocked(ctx->pwq_tbl[node]);
+ put_pwq_unlocked(ctx->dfl_pwq);
+
+ free_workqueue_attrs(ctx->attrs);
+ }
+
+ kfree(ctx);
+}
+
+static struct wq_unbound_install_ctx *
+wq_unbound_install_ctx_prepare(struct workqueue_struct *wq,
+ const struct workqueue_attrs *attrs)
+{
+ struct wq_unbound_install_ctx *ctx;
struct workqueue_attrs *new_attrs, *tmp_attrs;
- struct pool_workqueue **pwq_tbl, *dfl_pwq;
- int node, ret;
+ int node;

- /* only unbound workqueues can change attributes */
- if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
- return -EINVAL;
+ lockdep_assert_held(&wq_pool_mutex);

- /* creating multiple pwqs breaks ordering guarantee */
- if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs)))
- return -EINVAL;
+ ctx = kzalloc(sizeof(*ctx) + nr_node_ids * sizeof(ctx->pwq_tbl[0]),
+ GFP_KERNEL);

- pwq_tbl = kzalloc(nr_node_ids * sizeof(pwq_tbl[0]), GFP_KERNEL);
new_attrs = alloc_workqueue_attrs(GFP_KERNEL);
tmp_attrs = alloc_workqueue_attrs(GFP_KERNEL);
- if (!pwq_tbl || !new_attrs || !tmp_attrs)
- goto enomem;
+ if (!ctx || !new_attrs || !tmp_attrs)
+ goto out_free;

/* make a copy of @attrs and sanitize it */
copy_workqueue_attrs(new_attrs, attrs);
@@ -3546,75 +3539,113 @@ int apply_workqueue_attrs(struct workqueue_struct *wq,
copy_workqueue_attrs(tmp_attrs, new_attrs);

/*
- * CPUs should stay stable across pwq creations and installations.
- * Pin CPUs, determine the target cpumask for each node and create
- * pwqs accordingly.
- */
- get_online_cpus();
-
- mutex_lock(&wq_pool_mutex);
-
- /*
* If something goes wrong during CPU up/down, we'll fall back to
* the default pwq covering whole @attrs->cpumask. Always create
* it even if we don't use it immediately.
*/
- dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
- if (!dfl_pwq)
- goto enomem_pwq;
+ ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
+ if (!ctx->dfl_pwq)
+ goto out_free;

for_each_node(node) {
if (wq_calc_node_cpumask(attrs, node, -1, tmp_attrs->cpumask)) {
- pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
- if (!pwq_tbl[node])
- goto enomem_pwq;
+ ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
+ if (!ctx->pwq_tbl[node])
+ goto out_free;
} else {
- dfl_pwq->refcnt++;
- pwq_tbl[node] = dfl_pwq;
+ ctx->dfl_pwq->refcnt++;
+ ctx->pwq_tbl[node] = ctx->dfl_pwq;
}
}

- mutex_unlock(&wq_pool_mutex);
+ ctx->wq = wq;
+ ctx->attrs = new_attrs;
+
+out_free:
+ free_workqueue_attrs(tmp_attrs);
+
+ if (!ctx || !ctx->wq) {
+ kfree(new_attrs);
+ wq_unbound_install_ctx_free(ctx);
+ return NULL;
+ } else {
+ return ctx;
+ }
+}
+
+static void wq_unbound_install_ctx_commit(struct wq_unbound_install_ctx *ctx)
+{
+ int node;

/* all pwqs have been created successfully, let's install'em */
- mutex_lock(&wq->mutex);
+ mutex_lock(&ctx->wq->mutex);

- copy_workqueue_attrs(wq->unbound_attrs, new_attrs);
+ copy_workqueue_attrs(ctx->wq->unbound_attrs, ctx->attrs);

/* save the previous pwq and install the new one */
for_each_node(node)
- pwq_tbl[node] = numa_pwq_tbl_install(wq, node, pwq_tbl[node]);
+ ctx->pwq_tbl[node] = numa_pwq_tbl_install(ctx->wq, node,
+ ctx->pwq_tbl[node]);

/* @dfl_pwq might not have been used, ensure it's linked */
- link_pwq(dfl_pwq);
- swap(wq->dfl_pwq, dfl_pwq);
+ link_pwq(ctx->dfl_pwq);
+ swap(ctx->wq->dfl_pwq, ctx->dfl_pwq);

- mutex_unlock(&wq->mutex);
+ mutex_unlock(&ctx->wq->mutex);
+}

- /* put the old pwqs */
- for_each_node(node)
- put_pwq_unlocked(pwq_tbl[node]);
- put_pwq_unlocked(dfl_pwq);
+/**
+ * apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue
+ * @wq: the target workqueue
+ * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs()
+ *
+ * Apply @attrs to an unbound workqueue @wq. Unless disabled, on NUMA
+ * machines, this function maps a separate pwq to each NUMA node with
+ * possibles CPUs in @attrs->cpumask so that work items are affine to the
+ * NUMA node it was issued on. Older pwqs are released as in-flight work
+ * items finish. Note that a work item which repeatedly requeues itself
+ * back-to-back will stay on its current pwq.
+ *
+ * Performs GFP_KERNEL allocations.
+ *
+ * Return: 0 on success and -errno on failure.
+ */
+int apply_workqueue_attrs(struct workqueue_struct *wq,
+ const struct workqueue_attrs *attrs)
+{
+ struct wq_unbound_install_ctx *ctx;
+ int ret = -ENOMEM;

- put_online_cpus();
- ret = 0;
- /* fall through */
-out_free:
- free_workqueue_attrs(tmp_attrs);
- free_workqueue_attrs(new_attrs);
- kfree(pwq_tbl);
- return ret;
+ /* only unbound workqueues can change attributes */
+ if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
+ return -EINVAL;

-enomem_pwq:
- free_unbound_pwq(dfl_pwq);
- for_each_node(node)
- if (pwq_tbl && pwq_tbl[node] != dfl_pwq)
- free_unbound_pwq(pwq_tbl[node]);
+ /* creating multiple pwqs breaks ordering guarantee */
+ if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs)))
+ return -EINVAL;
+
+ /*
+ * CPUs should stay stable across pwq creations and installations.
+ * Pin CPUs, determine the target cpumask for each node and create
+ * pwqs accordingly.
+ */
+ get_online_cpus();
+
+ mutex_lock(&wq_pool_mutex);
+ ctx = wq_unbound_install_ctx_prepare(wq, attrs);
mutex_unlock(&wq_pool_mutex);
+
put_online_cpus();
-enomem:
- ret = -ENOMEM;
- goto out_free;
+
+ /* the ctx has been prepared successfully, let's commit it */
+ if (ctx) {
+ wq_unbound_install_ctx_commit(ctx);
+ ret = 0;
+ }
+
+ wq_unbound_install_ctx_free(ctx);
+
+ return ret;
}

/**
--
2.1.0

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/