Re: [PATCH] softirq: fix memory corruption when freeing tasklet_struct
From: Allen
Date: Mon Jan 29 2024 - 12:01:10 EST
> The following is a draft patch which implements atomic workqueues and
> convert dm-crypt to use it instead of tasklet. It's an early draft and very
> lightly tested but seems to work more or less. It's on top of wq/for6.9 + a
> pending patchset. The following git branch can be used for testing.
>
> git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git wq-atomic-draft
>
> I'll go over it to make sure all the pieces work. While it adds some
> complications, it doesn't seem too bad and conversion from tasklet should be
> straightforward too.
>
> - It hooks into tasklet[_hi] for now but if we get to update all of tasklet
> users, we can just repurpose the tasklet softirq slots directly.
>
> - I thought about allowing busy-waits for flushes and cancels but it didn't
> seem necessary. Keeping them blocking has the benefit of avoiding possible
> nasty deadlocks. We can revisit if there's need.
>
> - Compared to tasklet, each work item goes through a bit more management
> code because I wanted to keep the code as unified as possible to regular
> threaded workqueues. That said, it's not a huge amount and my bet is that
> the difference is unlikely to be noticeable.
>
> Thanks.
>
> From 8224d2602ef454ca164f4added765dc4dddd5e16 Mon Sep 17 00:00:00 2001
> From: Tejun Heo <tj@xxxxxxxxxx>
> Date: Fri, 26 Jan 2024 13:21:42 -1000
> Subject: [PATCH] workqueue: DRAFT: Implement atomic workqueue and convert
> dmcrypt to use it
>
> ---
> drivers/md/dm-crypt.c | 36 +-----
> include/linux/workqueue.h | 6 +
> kernel/workqueue.c | 234 +++++++++++++++++++++++++++---------
> kernel/workqueue_internal.h | 3 +
> 4 files changed, 186 insertions(+), 93 deletions(-)
>
> diff --git a/drivers/md/dm-crypt.c b/drivers/md/dm-crypt.c
> index 855b482cbff1..d375285db202 100644
> --- a/drivers/md/dm-crypt.c
> +++ b/drivers/md/dm-crypt.c
> @@ -73,11 +73,8 @@ struct dm_crypt_io {
> struct bio *base_bio;
> u8 *integrity_metadata;
> bool integrity_metadata_from_pool:1;
> - bool in_tasklet:1;
>
> struct work_struct work;
> - struct tasklet_struct tasklet;
> -
> struct convert_context ctx;
>
> atomic_t io_pending;
> @@ -1762,7 +1759,6 @@ static void crypt_io_init(struct dm_crypt_io *io, struct crypt_config *cc,
> io->ctx.r.req = NULL;
> io->integrity_metadata = NULL;
> io->integrity_metadata_from_pool = false;
> - io->in_tasklet = false;
> atomic_set(&io->io_pending, 0);
> }
>
> @@ -1771,13 +1767,6 @@ static void crypt_inc_pending(struct dm_crypt_io *io)
> atomic_inc(&io->io_pending);
> }
>
> -static void kcryptd_io_bio_endio(struct work_struct *work)
> -{
> - struct dm_crypt_io *io = container_of(work, struct dm_crypt_io, work);
> -
> - bio_endio(io->base_bio);
> -}
> -
> /*
> * One of the bios was finished. Check for completion of
> * the whole request and correctly clean up the buffer.
> @@ -1800,21 +1789,6 @@ static void crypt_dec_pending(struct dm_crypt_io *io)
> kfree(io->integrity_metadata);
>
> base_bio->bi_status = error;
> -
> - /*
> - * If we are running this function from our tasklet,
> - * we can't call bio_endio() here, because it will call
> - * clone_endio() from dm.c, which in turn will
> - * free the current struct dm_crypt_io structure with
> - * our tasklet. In this case we need to delay bio_endio()
> - * execution to after the tasklet is done and dequeued.
> - */
> - if (io->in_tasklet) {
> - INIT_WORK(&io->work, kcryptd_io_bio_endio);
> - queue_work(cc->io_queue, &io->work);
> - return;
> - }
> -
> bio_endio(base_bio);
> }
>
> @@ -2246,11 +2220,6 @@ static void kcryptd_crypt(struct work_struct *work)
> kcryptd_crypt_write_convert(io);
> }
>
> -static void kcryptd_crypt_tasklet(unsigned long work)
> -{
> - kcryptd_crypt((struct work_struct *)work);
> -}
> -
> static void kcryptd_queue_crypt(struct dm_crypt_io *io)
> {
> struct crypt_config *cc = io->cc;
> @@ -2263,9 +2232,8 @@ static void kcryptd_queue_crypt(struct dm_crypt_io *io)
> * it is being executed with irqs disabled.
> */
> if (in_hardirq() || irqs_disabled()) {
> - io->in_tasklet = true;
> - tasklet_init(&io->tasklet, kcryptd_crypt_tasklet, (unsigned long)&io->work);
> - tasklet_schedule(&io->tasklet);
> + INIT_WORK(&io->work, kcryptd_crypt);
> + queue_work(system_atomic_wq, &io->work);
> return;
> }
>
> diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
> index 232baea90a1d..1e4938b5b176 100644
> --- a/include/linux/workqueue.h
> +++ b/include/linux/workqueue.h
> @@ -353,6 +353,7 @@ static inline unsigned int work_static(struct work_struct *work) { return 0; }
> * Documentation/core-api/workqueue.rst.
> */
> enum wq_flags {
> + WQ_ATOMIC = 1 << 0, /* execute in softirq context */
> WQ_UNBOUND = 1 << 1, /* not bound to any cpu */
> WQ_FREEZABLE = 1 << 2, /* freeze during suspend */
> WQ_MEM_RECLAIM = 1 << 3, /* may be used for memory reclaim */
> @@ -392,6 +393,9 @@ enum wq_flags {
> __WQ_ORDERED = 1 << 17, /* internal: workqueue is ordered */
> __WQ_LEGACY = 1 << 18, /* internal: create*_workqueue() */
> __WQ_ORDERED_EXPLICIT = 1 << 19, /* internal: alloc_ordered_workqueue() */
> +
> + /* atomic wq only allows the following flags */
> + __WQ_ATOMIC_ALLOWS = WQ_ATOMIC | WQ_HIGHPRI,
> };
>
> enum wq_consts {
> @@ -442,6 +446,8 @@ extern struct workqueue_struct *system_unbound_wq;
> extern struct workqueue_struct *system_freezable_wq;
> extern struct workqueue_struct *system_power_efficient_wq;
> extern struct workqueue_struct *system_freezable_power_efficient_wq;
> +extern struct workqueue_struct *system_atomic_wq;
> +extern struct workqueue_struct *system_atomic_highpri_wq;
>
> /**
> * alloc_workqueue - allocate a workqueue
> diff --git a/kernel/workqueue.c b/kernel/workqueue.c
> index 23740c9ed57a..2a8f21494676 100644
> --- a/kernel/workqueue.c
> +++ b/kernel/workqueue.c
> @@ -73,7 +73,8 @@ enum worker_pool_flags {
> * wq_pool_attach_mutex to avoid changing binding state while
> * worker_attach_to_pool() is in progress.
> */
> - POOL_MANAGER_ACTIVE = 1 << 0, /* being managed */
> + POOL_ATOMIC = 1 << 0, /* is an atomic pool */
> + POOL_MANAGER_ACTIVE = 1 << 1, /* being managed */
> POOL_DISASSOCIATED = 1 << 2, /* cpu can't serve workers */
> };
>
> @@ -115,6 +116,14 @@ enum wq_internal_consts {
> WQ_NAME_LEN = 32,
> };
>
> +/*
> + * We don't want to trap softirq for too long. See MAX_SOFTIRQ_TIME and
> + * MAX_SOFTIRQ_RESTART in kernel/softirq.c. These are macros because
> + * msecs_to_jiffies() can't be an initializer.
> + */
> +#define ATOMIC_WORKER_JIFFIES msecs_to_jiffies(2)
> +#define ATOMIC_WORKER_RESTARTS 10
> +
> /*
> * Structure fields follow one of the following exclusion rules.
> *
> @@ -441,8 +450,13 @@ static bool wq_debug_force_rr_cpu = false;
> #endif
> module_param_named(debug_force_rr_cpu, wq_debug_force_rr_cpu, bool, 0644);
>
> +/* the atomic worker pools */
> +static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
> + atomic_worker_pools);
> +
> /* the per-cpu worker pools */
> -static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools);
> +static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
> + cpu_worker_pools);
>
> static DEFINE_IDR(worker_pool_idr); /* PR: idr of all pools */
>
> @@ -476,8 +490,13 @@ struct workqueue_struct *system_power_efficient_wq __ro_after_init;
> EXPORT_SYMBOL_GPL(system_power_efficient_wq);
> struct workqueue_struct *system_freezable_power_efficient_wq __ro_after_init;
> EXPORT_SYMBOL_GPL(system_freezable_power_efficient_wq);
> +struct workqueue_struct *system_atomic_wq;
> +EXPORT_SYMBOL_GPL(system_atomic_wq);
> +struct workqueue_struct *system_atomic_highpri_wq;
> +EXPORT_SYMBOL_GPL(system_atomic_highpri_wq);
>
> static int worker_thread(void *__worker);
> +static void atomic_worker_taskletfn(struct tasklet_struct *tasklet);
> static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
> static void show_pwq(struct pool_workqueue *pwq);
> static void show_one_worker_pool(struct worker_pool *pool);
> @@ -496,6 +515,11 @@ static void show_one_worker_pool(struct worker_pool *pool);
> !lockdep_is_held(&wq_pool_mutex), \
> "RCU, wq->mutex or wq_pool_mutex should be held")
>
> +#define for_each_atomic_worker_pool(pool, cpu) \
> + for ((pool) = &per_cpu(atomic_worker_pools, cpu)[0]; \
> + (pool) < &per_cpu(atomic_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
> + (pool)++)
> +
> #define for_each_cpu_worker_pool(pool, cpu) \
> for ((pool) = &per_cpu(cpu_worker_pools, cpu)[0]; \
> (pool) < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
> @@ -1184,6 +1208,14 @@ static bool kick_pool(struct worker_pool *pool)
> if (!need_more_worker(pool) || !worker)
> return false;
>
> + if (pool->flags & POOL_ATOMIC) {
> + if (pool->attrs->nice == HIGHPRI_NICE_LEVEL)
> + tasklet_hi_schedule(&worker->atomic_tasklet);
> + else
> + tasklet_schedule(&worker->atomic_tasklet);
> + return true;
> + }
> +
> p = worker->task;
Tejun,
I rushed to reply to the draft patch you sent; I should have
looked harder. My apologies.
The idea that I have been working on is to completely move away from
using tasklets.
Essentially, "get rid of tasklets entirely in the kernel". So, the use
of tasklet_schedule() and tasklet_hi_schedule()
will have to go.
I have a very hacky draft that is still a work in progress. I am going
to borrow many bits from your patch, which will make
my work better.
Perhaps we should start a separate thread, thoughts?
Thanks.
>
> #ifdef CONFIG_SMP
> @@ -1663,8 +1695,15 @@ static bool pwq_tryinc_nr_active(struct pool_workqueue *pwq, bool fill)
> lockdep_assert_held(&pool->lock);
>
> if (!nna) {
> - /* per-cpu workqueue, pwq->nr_active is sufficient */
> - obtained = pwq->nr_active < READ_ONCE(wq->max_active);
> + /*
> + * An atomic workqueue always have a single worker per-cpu and
> + * doesn't impose additional max_active limit. For a per-cpu
> + * workqueue, checking pwq->nr_active is sufficient.
> + */
> + if (wq->flags & WQ_ATOMIC)
> + obtained = true;
> + else
> + obtained = pwq->nr_active < READ_ONCE(wq->max_active);
> goto out;
> }
>
> @@ -2591,27 +2630,31 @@ static struct worker *create_worker(struct worker_pool *pool)
>
> worker->id = id;
>
> - if (pool->cpu >= 0)
> - snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
> - pool->attrs->nice < 0 ? "H" : "");
> - else
> - snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
> -
> - worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
> - "kworker/%s", id_buf);
> - if (IS_ERR(worker->task)) {
> - if (PTR_ERR(worker->task) == -EINTR) {
> - pr_err("workqueue: Interrupted when creating a worker thread \"kworker/%s\"\n",
> - id_buf);
> - } else {
> - pr_err_once("workqueue: Failed to create a worker thread: %pe",
> - worker->task);
> + if (pool->flags & POOL_ATOMIC) {
> + tasklet_setup(&worker->atomic_tasklet, atomic_worker_taskletfn);
> + } else {
> + if (pool->cpu >= 0)
> + snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
> + pool->attrs->nice < 0 ? "H" : "");
> + else
> + snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
> +
> + worker->task = kthread_create_on_node(worker_thread, worker,
> + pool->node, "kworker/%s", id_buf);
> + if (IS_ERR(worker->task)) {
> + if (PTR_ERR(worker->task) == -EINTR) {
> + pr_err("workqueue: Interrupted when creating a worker thread \"kworker/%s\"\n",
> + id_buf);
> + } else {
> + pr_err_once("workqueue: Failed to create a worker thread: %pe",
> + worker->task);
> + }
> + goto fail;
> }
> - goto fail;
> - }
>
> - set_user_nice(worker->task, pool->attrs->nice);
> - kthread_bind_mask(worker->task, pool_allowed_cpus(pool));
> + set_user_nice(worker->task, pool->attrs->nice);
> + kthread_bind_mask(worker->task, pool_allowed_cpus(pool));
> + }
>
> /* successful, attach the worker to the pool */
> worker_attach_to_pool(worker, pool);
> @@ -2627,7 +2670,8 @@ static struct worker *create_worker(struct worker_pool *pool)
> * check if not woken up soon. As kick_pool() is noop if @pool is empty,
> * wake it up explicitly.
> */
> - wake_up_process(worker->task);
> + if (worker->task)
> + wake_up_process(worker->task);
>
> raw_spin_unlock_irq(&pool->lock);
>
> @@ -3043,25 +3087,35 @@ __acquires(&pool->lock)
> lock_map_release(&lockdep_map);
> lock_map_release(&pwq->wq->lockdep_map);
>
> - if (unlikely(in_atomic() || lockdep_depth(current) > 0 ||
> - rcu_preempt_depth() > 0)) {
> - pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d/%d\n"
> - " last function: %ps\n",
> - current->comm, preempt_count(), rcu_preempt_depth(),
> - task_pid_nr(current), worker->current_func);
> - debug_show_held_locks(current);
> - dump_stack();
> - }
> + if (worker->task) {
> + if (unlikely(in_atomic() || lockdep_depth(current) > 0 ||
> + rcu_preempt_depth() > 0)) {
> + pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d/%d\n"
> + " last function: %ps\n",
> + current->comm, preempt_count(),
> + rcu_preempt_depth(), task_pid_nr(current),
> + worker->current_func);
> + debug_show_held_locks(current);
> + dump_stack();
> + }
>
> - /*
> - * The following prevents a kworker from hogging CPU on !PREEMPTION
> - * kernels, where a requeueing work item waiting for something to
> - * happen could deadlock with stop_machine as such work item could
> - * indefinitely requeue itself while all other CPUs are trapped in
> - * stop_machine. At the same time, report a quiescent RCU state so
> - * the same condition doesn't freeze RCU.
> - */
> - cond_resched();
> + /*
> + * The following prevents a kworker from hogging CPU on
> + * !PREEMPTION kernels, where a requeueing work item waiting for
> + * something to happen could deadlock with stop_machine as such
> + * work item could indefinitely requeue itself while all other
> + * CPUs are trapped in stop_machine. At the same time, report a
> + * quiescent RCU state so the same condition doesn't freeze RCU.
> + */
> + if (worker->task)
> + cond_resched();
> + } else {
> + if (unlikely(lockdep_depth(current) > 0)) {
> + pr_err("BUG: atomic workqueue leaked lock: last function: %ps\n",
> + worker->current_func);
> + debug_show_held_locks(current);
> + }
> + }
>
> raw_spin_lock_irq(&pool->lock);
>
> @@ -3344,6 +3398,44 @@ static int rescuer_thread(void *__rescuer)
> goto repeat;
> }
>
> +void atomic_worker_taskletfn(struct tasklet_struct *tasklet)
> +{
> + struct worker *worker =
> + container_of(tasklet, struct worker, atomic_tasklet);
> + struct worker_pool *pool = worker->pool;
> + int nr_restarts = ATOMIC_WORKER_RESTARTS;
> + unsigned long end = jiffies + ATOMIC_WORKER_JIFFIES;
> +
> + raw_spin_lock_irq(&pool->lock);
> + worker_leave_idle(worker);
> +
> + /*
> + * This function follows the structure of worker_thread(). See there for
> + * explanations on each step.
> + */
> + if (need_more_worker(pool))
> + goto done;
> +
> + WARN_ON_ONCE(!list_empty(&worker->scheduled));
> + worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
> +
> + do {
> + struct work_struct *work =
> + list_first_entry(&pool->worklist,
> + struct work_struct, entry);
> +
> + if (assign_work(work, worker, NULL))
> + process_scheduled_works(worker);
> + } while (--nr_restarts && time_before(jiffies, end) &&
> + keep_working(pool));
> +
> + worker_set_flags(worker, WORKER_PREP);
> +done:
> + worker_enter_idle(worker);
> + kick_pool(pool);
> + raw_spin_unlock_irq(&pool->lock);
> +}
> +
> /**
> * check_flush_dependency - check for flush dependency sanity
> * @target_wq: workqueue being flushed
> @@ -5149,6 +5241,13 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
> size_t wq_size;
> int name_len;
>
> + if (flags & WQ_ATOMIC) {
> + if (WARN_ON_ONCE(flags & ~__WQ_ATOMIC_ALLOWS))
> + return NULL;
> + if (WARN_ON_ONCE(max_active))
> + return NULL;
> + }
> +
> /*
> * Unbound && max_active == 1 used to imply ordered, which is no longer
> * the case on many machines due to per-pod pools. While
> @@ -7094,6 +7193,22 @@ static void __init restrict_unbound_cpumask(const char *name, const struct cpuma
> cpumask_and(wq_unbound_cpumask, wq_unbound_cpumask, mask);
> }
>
> +static void __init init_cpu_worker_pool(struct worker_pool *pool, int cpu, int nice)
> +{
> + BUG_ON(init_worker_pool(pool));
> + pool->cpu = cpu;
> + cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
> + cpumask_copy(pool->attrs->__pod_cpumask, cpumask_of(cpu));
> + pool->attrs->nice = nice;
> + pool->attrs->affn_strict = true;
> + pool->node = cpu_to_node(cpu);
> +
> + /* alloc pool ID */
> + mutex_lock(&wq_pool_mutex);
> + BUG_ON(worker_pool_assign_id(pool));
> + mutex_unlock(&wq_pool_mutex);
> +}
> +
> /**
> * workqueue_init_early - early init for workqueue subsystem
> *
> @@ -7149,25 +7264,19 @@ void __init workqueue_init_early(void)
> pt->pod_node[0] = NUMA_NO_NODE;
> pt->cpu_pod[0] = 0;
>
> - /* initialize CPU pools */
> + /* initialize atomic and CPU pools */
> for_each_possible_cpu(cpu) {
> struct worker_pool *pool;
>
> i = 0;
> - for_each_cpu_worker_pool(pool, cpu) {
> - BUG_ON(init_worker_pool(pool));
> - pool->cpu = cpu;
> - cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
> - cpumask_copy(pool->attrs->__pod_cpumask, cpumask_of(cpu));
> - pool->attrs->nice = std_nice[i++];
> - pool->attrs->affn_strict = true;
> - pool->node = cpu_to_node(cpu);
> -
> - /* alloc pool ID */
> - mutex_lock(&wq_pool_mutex);
> - BUG_ON(worker_pool_assign_id(pool));
> - mutex_unlock(&wq_pool_mutex);
> + for_each_atomic_worker_pool(pool, cpu) {
> + init_cpu_worker_pool(pool, cpu, std_nice[i++]);
> + pool->flags |= POOL_ATOMIC;
> }
> +
> + i = 0;
> + for_each_cpu_worker_pool(pool, cpu)
> + init_cpu_worker_pool(pool, cpu, std_nice[i++]);
> }
>
> /* create default unbound and ordered wq attrs */
> @@ -7200,10 +7309,14 @@ void __init workqueue_init_early(void)
> system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_pwr_efficient",
> WQ_FREEZABLE | WQ_POWER_EFFICIENT,
> 0);
> + system_atomic_wq = alloc_workqueue("system_atomic_wq", WQ_ATOMIC, 0);
> + system_atomic_highpri_wq = alloc_workqueue("system_atomic_highpri_wq",
> + WQ_ATOMIC | WQ_HIGHPRI, 0);
> BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
> !system_unbound_wq || !system_freezable_wq ||
> !system_power_efficient_wq ||
> - !system_freezable_power_efficient_wq);
> + !system_freezable_power_efficient_wq ||
> + !system_atomic_wq || !system_atomic_highpri_wq);
> }
>
> static void __init wq_cpu_intensive_thresh_init(void)
> @@ -7269,9 +7382,10 @@ void __init workqueue_init(void)
> * up. Also, create a rescuer for workqueues that requested it.
> */
> for_each_possible_cpu(cpu) {
> - for_each_cpu_worker_pool(pool, cpu) {
> + for_each_atomic_worker_pool(pool, cpu)
> + pool->node = cpu_to_node(cpu);
> + for_each_cpu_worker_pool(pool, cpu)
> pool->node = cpu_to_node(cpu);
> - }
> }
>
> list_for_each_entry(wq, &workqueues, list) {
> @@ -7284,6 +7398,8 @@ void __init workqueue_init(void)
>
> /* create the initial workers */
> for_each_online_cpu(cpu) {
> + for_each_atomic_worker_pool(pool, cpu)
> + BUG_ON(!create_worker(pool));
> for_each_cpu_worker_pool(pool, cpu) {
> pool->flags &= ~POOL_DISASSOCIATED;
> BUG_ON(!create_worker(pool));
> diff --git a/kernel/workqueue_internal.h b/kernel/workqueue_internal.h
> index f6275944ada7..f65f204f38ea 100644
> --- a/kernel/workqueue_internal.h
> +++ b/kernel/workqueue_internal.h
> @@ -10,6 +10,7 @@
>
> #include <linux/workqueue.h>
> #include <linux/kthread.h>
> +#include <linux/interrupt.h>
> #include <linux/preempt.h>
>
> struct worker_pool;
> @@ -42,6 +43,8 @@ struct worker {
> struct list_head scheduled; /* L: scheduled works */
>
> struct task_struct *task; /* I: worker task */
> + struct tasklet_struct atomic_tasklet; /* I: tasklet for atomic pool */
> +
> struct worker_pool *pool; /* A: the associated pool */
> /* L: for rescuers */
> struct list_head node; /* A: anchored at pool->workers */
> --
> 2.43.0
>
>