[PATCH 18/21] aio: Allow cancellation without a cancel callback, new kiocb lookup
From: Kent Overstreet
Date: Mon May 13 2013 - 21:19:47 EST
This patch does a couple things:
* Allows cancellation of any kiocb, even if the driver doesn't
implement a ki_cancel callback function. This will be used for block
layer cancellation - there, implementing a callback is problematic,
but we can implement useful cancellation by just checking if the
kiocb has been marked as cancelled when it goes to dequeue the
request.
* Implements a new lookup mechanism for cancellation.
Previously, to cancel a kiocb we had to look it up in a linked list,
and kiocbs were added to the linked list lazily. But if any kiocb is
cancellable, the lazy list adding no longer works, so we need a new
mechanism.
This is done by allocating kiocbs out of a (lazily allocated) array
of pages, which means we can refer to the kiocbs (and iterate over
them) with small integers - we use the percpu tag allocation code for
allocating individual kiocbs.
Signed-off-by: Kent Overstreet <koverstreet@xxxxxxxxxx>
Cc: Zach Brown <zab@xxxxxxxxxx>
Cc: Felipe Balbi <balbi@xxxxxx>
Cc: Greg Kroah-Hartman <gregkh@xxxxxxxxxxxxxxxxxxx>
Cc: Mark Fasheh <mfasheh@xxxxxxxx>
Cc: Joel Becker <jlbec@xxxxxxxxxxxx>
Cc: Rusty Russell <rusty@xxxxxxxxxxxxxxx>
Cc: Jens Axboe <axboe@xxxxxxxxx>
Cc: Asai Thambi S P <asamymuthupa@xxxxxxxxxx>
Cc: Selvan Mani <smani@xxxxxxxxxx>
Cc: Sam Bradshaw <sbradshaw@xxxxxxxxxx>
Cc: Jeff Moyer <jmoyer@xxxxxxxxxx>
Cc: Al Viro <viro@xxxxxxxxxxxxxxxxxx>
Cc: Benjamin LaHaise <bcrl@xxxxxxxxx>
---
fs/aio.c | 207 +++++++++++++++++++++++++++++++++-------------------
include/linux/aio.h | 92 ++++++++++++++++-------
2 files changed, 197 insertions(+), 102 deletions(-)
diff --git a/fs/aio.c b/fs/aio.c
index aa39194..f4ea8d5 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -39,6 +39,7 @@
#include <linux/compat.h>
#include <linux/percpu-refcount.h>
#include <linux/radix-tree.h>
+#include <linux/tags.h>
#include <asm/kmap_types.h>
#include <asm/uaccess.h>
@@ -74,6 +75,9 @@ struct kioctx {
struct __percpu kioctx_cpu *cpu;
+ struct tag_pool kiocb_tags;
+ struct page **kiocb_pages;
+
/*
* For percpu reqs_available, number of slots we move to/from global
* counter at a time:
@@ -113,11 +117,6 @@ struct kioctx {
} ____cacheline_aligned_in_smp;
struct {
- spinlock_t ctx_lock;
- struct list_head active_reqs; /* used for cancellation */
- } ____cacheline_aligned_in_smp;
-
- struct {
struct mutex ring_lock;
wait_queue_head_t wait;
} ____cacheline_aligned_in_smp;
@@ -136,16 +135,25 @@ unsigned long aio_nr; /* current system wide number of aio requests */
unsigned long aio_max_nr = 0x10000; /* system wide maximum number of aio requests */
/*----end sysctl variables---*/
-static struct kmem_cache *kiocb_cachep;
static struct kmem_cache *kioctx_cachep;
+#define KIOCBS_PER_PAGE (PAGE_SIZE / sizeof(struct kiocb))
+
+static inline struct kiocb *kiocb_from_id(struct kioctx *ctx, unsigned id)
+{
+ struct page *p = ctx->kiocb_pages[id / KIOCBS_PER_PAGE];
+
+ return p
+ ? ((struct kiocb *) page_address(p)) + (id % KIOCBS_PER_PAGE)
+ : NULL;
+}
+
/* aio_setup
* Creates the slab caches used by the aio routines, panic on
* failure as this is done early during the boot sequence.
*/
static int __init aio_setup(void)
{
- kiocb_cachep = KMEM_CACHE(kiocb, SLAB_HWCACHE_ALIGN|SLAB_PANIC);
kioctx_cachep = KMEM_CACHE(kioctx,SLAB_HWCACHE_ALIGN|SLAB_PANIC);
pr_debug("sizeof(struct page) = %zu\n", sizeof(struct page));
@@ -245,45 +253,58 @@ static int aio_setup_ring(struct kioctx *ctx)
void kiocb_set_cancel_fn(struct kiocb *req, kiocb_cancel_fn *cancel)
{
- struct kioctx *ctx = req->ki_ctx;
- unsigned long flags;
-
- spin_lock_irqsave(&ctx->ctx_lock, flags);
+ kiocb_cancel_fn *p, *old = req->ki_cancel;
- if (!req->ki_list.next)
- list_add(&req->ki_list, &ctx->active_reqs);
-
- req->ki_cancel = cancel;
+ do {
+ if (old == KIOCB_CANCELLED) {
+ cancel(req);
+ return;
+ }
- spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+ p = old;
+ old = cmpxchg(&req->ki_cancel, old, cancel);
+ } while (old != p);
}
EXPORT_SYMBOL(kiocb_set_cancel_fn);
-static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb)
+static void kiocb_cancel(struct kioctx *ctx, struct kiocb *req)
{
- kiocb_cancel_fn *old, *cancel;
+ kiocb_cancel_fn *old, *new, *cancel = req->ki_cancel;
- /*
- * Don't want to set kiocb->ki_cancel = KIOCB_CANCELLED unless it
- * actually has a cancel function, hence the cmpxchg()
- */
+ local_irq_disable();
- cancel = ACCESS_ONCE(kiocb->ki_cancel);
do {
- if (!cancel || cancel == KIOCB_CANCELLED)
- return -EINVAL;
+ if (cancel == KIOCB_CANCELLING ||
+ cancel == KIOCB_CANCELLED)
+ goto out;
old = cancel;
- cancel = cmpxchg(&kiocb->ki_cancel, old, KIOCB_CANCELLED);
- } while (cancel != old);
+ new = cancel ? KIOCB_CANCELLING : KIOCB_CANCELLED;
+ /* Install @new: with no callback, nothing below clears CANCELLING */
+ cancel = cmpxchg(&req->ki_cancel, old, new);
+ } while (old != cancel);
- return cancel(kiocb);
+ if (cancel) {
+ cancel(req);
+ smp_wmb();
+ req->ki_cancel = KIOCB_CANCELLED;
+ }
+out:
+ local_irq_enable();
}
static void free_ioctx_rcu(struct rcu_head *head)
{
struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
+ unsigned i;
+
+ for (i = 0; i < DIV_ROUND_UP(ctx->nr_events, KIOCBS_PER_PAGE); i++)
+ if (ctx->kiocb_pages[i])
+ __free_page(ctx->kiocb_pages[i]);
+ kfree(ctx->kiocb_pages);
+
+ tag_pool_free(&ctx->kiocb_tags);
free_percpu(ctx->cpu);
kmem_cache_free(kioctx_cachep, ctx);
}
@@ -296,21 +317,16 @@ static void free_ioctx_rcu(struct rcu_head *head)
static void free_ioctx(struct kioctx *ctx)
{
struct aio_ring *ring;
- struct kiocb *req;
- unsigned cpu, avail;
+ unsigned i, cpu, avail;
DEFINE_WAIT(wait);
- spin_lock_irq(&ctx->ctx_lock);
+ for (i = 0; i < ctx->nr_events; i++) {
+ struct kiocb *req = kiocb_from_id(ctx, i);
- while (!list_empty(&ctx->active_reqs)) {
- req = list_first_entry(&ctx->active_reqs,
- struct kiocb, ki_list);
-
- list_del_init(&req->ki_list);
- kiocb_cancel(ctx, req);
+ if (req)
+ kiocb_cancel(ctx, req);
}
- spin_unlock_irq(&ctx->ctx_lock);
for_each_possible_cpu(cpu) {
struct kioctx_cpu *kcpu = per_cpu_ptr(ctx->cpu, cpu);
@@ -409,13 +425,10 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
percpu_ref_get(&ctx->users);
rcu_read_unlock();
- spin_lock_init(&ctx->ctx_lock);
spin_lock_init(&ctx->completion_lock);
mutex_init(&ctx->ring_lock);
init_waitqueue_head(&ctx->wait);
- INIT_LIST_HEAD(&ctx->active_reqs);
-
ctx->cpu = alloc_percpu(struct kioctx_cpu);
if (!ctx->cpu)
goto out_freeref;
@@ -427,6 +440,15 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
ctx->req_batch = (ctx->nr_events - 1) / (num_possible_cpus() * 4);
BUG_ON(!ctx->req_batch);
+ if (tag_pool_init(&ctx->kiocb_tags, ctx->nr_events))
+ goto out_freering;
+
+ ctx->kiocb_pages =
+ kzalloc(DIV_ROUND_UP(ctx->nr_events, KIOCBS_PER_PAGE) *
+ sizeof(struct page *), GFP_KERNEL);
+ if (!ctx->kiocb_pages)
+ goto out_freetags;
+
/* limit the number of system wide aios */
spin_lock(&aio_nr_lock);
if (aio_nr + nr_events > aio_max_nr ||
@@ -456,6 +478,10 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
out_cleanup:
err = -EAGAIN;
+ kfree(ctx->kiocb_pages);
+out_freetags:
+ tag_pool_free(&ctx->kiocb_tags);
+out_freering:
aio_free_ring(ctx);
out_freepcpu:
free_percpu(ctx->cpu);
@@ -619,17 +645,48 @@ out:
static inline struct kiocb *aio_get_req(struct kioctx *ctx)
{
struct kiocb *req;
+ unsigned id;
if (!get_reqs_available(ctx))
return NULL;
- req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
- if (unlikely(!req))
- goto out_put;
+ id = tag_alloc(&ctx->kiocb_tags, false);
+ if (!id)
+ goto err;
+
+ req = kiocb_from_id(ctx, id);
+ if (!req) {
+ unsigned i, page_nr = id / KIOCBS_PER_PAGE;
+ struct page *p = alloc_page(GFP_KERNEL);
+ if (!p)
+ goto err_free_tag; /* don't leak the tag we just took */
+ req = page_address(p);
+
+ for (i = 0; i < KIOCBS_PER_PAGE; i++) {
+ req[i].ki_cancel = KIOCB_CANCELLED;
+ req[i].ki_id = page_nr * KIOCBS_PER_PAGE + i;
+ }
+
+ smp_wmb();
+
+ if (cmpxchg(&ctx->kiocb_pages[page_nr], NULL, p) != NULL)
+ __free_page(p);
+ }
+
+ req = kiocb_from_id(ctx, id);
+
+ /*
+ * Can't set ki_cancel to NULL until we're ready for it to be
+ * cancellable - leave it as KIOCB_CANCELLED until then
+ */
+ memset(req, 0, offsetof(struct kiocb, ki_cancel));
req->ki_ctx = ctx;
+
return req;
-out_put:
+err_free_tag:
+ tag_free(&ctx->kiocb_tags, id);
+err:
put_reqs_available(ctx, 1);
return NULL;
}
@@ -640,7 +695,7 @@ static void kiocb_free(struct kiocb *req)
fput(req->ki_filp);
if (req->ki_eventfd != NULL)
eventfd_ctx_put(req->ki_eventfd);
- kmem_cache_free(kiocb_cachep, req);
+ tag_free(&req->ki_ctx->kiocb_tags, req->ki_id);
}
static struct kioctx *lookup_ioctx(unsigned long ctx_id)
@@ -770,17 +825,21 @@ EXPORT_SYMBOL(batch_complete_aio);
void aio_complete_batch(struct kiocb *req, long res, long res2,
struct batch_complete *batch)
{
- req->ki_res = res;
- req->ki_res2 = res2;
+ kiocb_cancel_fn *old = NULL, *cancel = req->ki_cancel;
+
+ do {
+ if (cancel == KIOCB_CANCELLING) {
+ cpu_relax();
+ cancel = req->ki_cancel;
+ continue;
+ }
- if (req->ki_list.next) {
- struct kioctx *ctx = req->ki_ctx;
- unsigned long flags;
+ old = cancel;
+ cancel = cmpxchg(&req->ki_cancel, old, KIOCB_CANCELLED);
+ } while (old != cancel);
- spin_lock_irqsave(&ctx->ctx_lock, flags);
- list_del(&req->ki_list);
- spin_unlock_irqrestore(&ctx->ctx_lock, flags);
- }
+ req->ki_res = res;
+ req->ki_res2 = res2;
/*
* Special case handling for sync iocbs:
@@ -1204,7 +1263,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
}
}
- ret = put_user(KIOCB_KEY, &user_iocb->aio_key);
+ ret = put_user(req->ki_id, &user_iocb->aio_key);
if (unlikely(ret)) {
pr_debug("EFAULT: aio_key\n");
goto out_put_req;
@@ -1215,6 +1274,13 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
req->ki_pos = iocb->aio_offset;
req->ki_nbytes = iocb->aio_nbytes;
+ /*
+ * ki_obj.user must point to the right iocb before making the kiocb
+ * cancellable by setting ki_cancel = NULL:
+ */
+ smp_wmb();
+ req->ki_cancel = NULL;
+
ret = aio_run_iocb(req, iocb->aio_lio_opcode,
(char __user *)(unsigned long)iocb->aio_buf,
compat);
@@ -1305,19 +1371,16 @@ SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
static struct kiocb *lookup_kiocb(struct kioctx *ctx, struct iocb __user *iocb,
u32 key)
{
- struct list_head *pos;
-
- assert_spin_locked(&ctx->ctx_lock);
+ struct kiocb *req;
- if (key != KIOCB_KEY)
+ if (key >= ctx->nr_events) /* valid ids: [0, nr_events) */
return NULL;
- /* TODO: use a hash or array, this sucks. */
- list_for_each(pos, &ctx->active_reqs) {
- struct kiocb *kiocb = list_kiocb(pos);
- if (kiocb->ki_obj.user == iocb)
- return kiocb;
- }
+ req = kiocb_from_id(ctx, key);
+
+ if (req && req->ki_obj.user == iocb)
+ return req;
+
return NULL;
}
@@ -1347,17 +1410,9 @@ SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
if (unlikely(!ctx))
return -EINVAL;
- spin_lock_irq(&ctx->ctx_lock);
-
kiocb = lookup_kiocb(ctx, iocb, key);
- if (kiocb)
- ret = kiocb_cancel(ctx, kiocb);
- else
- ret = -EINVAL;
-
- spin_unlock_irq(&ctx->ctx_lock);
-
- if (!ret) {
+ if (kiocb) {
+ kiocb_cancel(ctx, kiocb);
/*
* The result argument is no longer used - the io_event is
* always delivered via the ring buffer. -EINPROGRESS indicates
diff --git a/include/linux/aio.h b/include/linux/aio.h
index a6fe048..985e664 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -13,31 +13,80 @@ struct kioctx;
struct kiocb;
struct batch_complete;
-#define KIOCB_KEY 0
-
/*
- * We use ki_cancel == KIOCB_CANCELLED to indicate that a kiocb has been either
- * cancelled or completed (this makes a certain amount of sense because
- * successful cancellation - io_cancel() - does deliver the completion to
- * userspace).
+ * CANCELLATION
+ *
+ * SEMANTICS:
+ *
+ * Userspace may indicate (via io_cancel()) that they wish an iocb to be
+ * cancelled. io_cancel() does nothing more than indicate that the iocb should
+ * be cancelled if possible; it does not indicate whether it succeeded (nor will
+ * it block).
+ *
+ * If cancellation does succeed, userspace should be informed by passing
+ * -ECANCELED to aio_complete(); userspace retrieves the io_event in the usual
+ * manner.
+ *
+ * DRIVERS:
+ *
+ * A driver that wishes to support cancellation may (but does not have to)
+ * implement a ki_cancel callback. If it doesn't implement a callback, it can
+ * check if the kiocb has been marked as cancelled (with kiocb_cancelled()).
+ * This is what the block layer does - when dequeuing requests it checks to see
+ * if it's for a bio that's been marked as cancelled, and if so doesn't send it
+ * to the device.
+ *
+ * Some drivers are going to need to kick something to notice that kiocb has
+ * been cancelled - those will want to implement a ki_cancel function. The
+ * callback could, say, issue a wakeup so that the thread processing the kiocb
+ * can notice the cancellation - or it might do something else entirely.
+ * kiocb->private is owned by the driver, so that ki_cancel can find the
+ * driver's state.
+ *
+ * A driver must guarantee that a kiocb completes in bounded time if it's been
+ * cancelled - this means that ki_cancel may have to guarantee forward progress.
+ *
+ * ki_cancel() may not call aio_complete().
*
- * And since most things don't implement kiocb cancellation and we'd really like
- * kiocb completion to be lockless when possible, we use ki_cancel to
- * synchronize cancellation and completion - we only set it to KIOCB_CANCELLED
- * with xchg() or cmpxchg(), see batch_complete_aio() and kiocb_cancel().
+ * SYNCHRONIZATION:
+ *
+ * The aio code ensures that after aio_complete() returns, no ki_cancel function
+ * can be called or still be executing. Thus, the driver should free whatever
+ * kiocb->private points to after calling aio_complete().
+ *
+ * Drivers must not set kiocb->ki_cancel directly; they should use
+ * kiocb_set_cancel_fn(), which guards against races with kiocb_cancel(). It
+ * might be the case that userspace cancelled the iocb before the driver called
+ * kiocb_set_cancel_fn() - in that case, kiocb_set_cancel_fn() will immediately
+ * call the cancel function you passed it, and leave ki_cancel set to
+ * KIOCB_CANCELLED.
+ */
+
+/*
+ * Special values for kiocb->ki_cancel - these indicate that a kiocb has either
+ * been cancelled, or has a ki_cancel function currently running.
*/
-#define KIOCB_CANCELLED ((void *) (~0ULL))
+#define KIOCB_CANCELLED ((void *) (-1LL))
+#define KIOCB_CANCELLING ((void *) (-2LL))
typedef int (kiocb_cancel_fn)(struct kiocb *);
struct kiocb {
struct kiocb *ki_next; /* batch completion */
+ /*
+ * If the aio_resfd field of the userspace iocb is not zero,
+ * this is the underlying eventfd context to deliver events to.
+ */
+ struct eventfd_ctx *ki_eventfd;
struct file *ki_filp;
struct kioctx *ki_ctx; /* NULL for sync ops */
- kiocb_cancel_fn *ki_cancel;
void *private;
+ /* Only zero up to here in aio_get_req() */
+ kiocb_cancel_fn *ki_cancel;
+ unsigned ki_id;
+
union {
void __user *user;
struct task_struct *tsk;
@@ -49,17 +98,13 @@ struct kiocb {
loff_t ki_pos;
size_t ki_nbytes; /* copy of iocb->aio_nbytes */
-
- struct list_head ki_list; /* the aio core uses this
- * for cancellation */
-
- /*
- * If the aio_resfd field of the userspace iocb is not zero,
- * this is the underlying eventfd context to deliver events to.
- */
- struct eventfd_ctx *ki_eventfd;
};
+static inline bool kiocb_cancelled(struct kiocb *kiocb)
+{
+ return kiocb->ki_cancel == KIOCB_CANCELLED;
+}
+
static inline bool is_sync_kiocb(struct kiocb *kiocb)
{
return kiocb->ki_ctx == NULL;
@@ -107,11 +152,6 @@ static inline void aio_complete(struct kiocb *iocb, long res, long res2)
aio_complete_batch(iocb, res, res2, NULL);
}
-static inline struct kiocb *list_kiocb(struct list_head *h)
-{
- return list_entry(h, struct kiocb, ki_list);
-}
-
/* for sysctl: */
extern unsigned long aio_nr;
extern unsigned long aio_max_nr;
--
1.8.2.1
--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/