[PATCH 05/13] aio: mostly crap

From: Jens Axboe
Date: Mon May 25 2009 - 03:35:22 EST


First attempt at getting rid of some of the locking in aio.

The single shared completion ring and its ring_lock are replaced with
one ring per possible CPU. Completions are added to the local CPU's
ring with interrupts disabled, so the tail needs no lock; readers pull
events off the rings locklessly via atomic_cmpxchg() on the ring head.
Ring sizes are rounded up to a power of two so that head and tail can
free-run and be masked on access.
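
For reference, the read side is the classic "claim the slot with
cmpxchg" scheme. Below is a minimal userspace C11 model of just that
loop, with C11 atomics standing in for the kernel's atomic_t; all
names are illustrative and nothing in it is the kernel's actual code:

    /* Model of the lockless ring consumer; illustrative only. */
    #include <stdatomic.h>
    #include <stdio.h>

    #define RING_SIZE 8 /* power of two, as roundup_pow_of_two() enforces */

    struct ring {
        atomic_uint head;       /* advanced by any consumer via cmpxchg */
        unsigned int tail;      /* advanced only by the producer */
        int events[RING_SIZE];
    };

    /* Copy one event into *ent; returns 1 on success, 0 if empty. */
    static int ring_read(struct ring *r, int *ent)
    {
        unsigned int head;

        do {
            head = atomic_load(&r->head);
            if (head == r->tail)
                return 0;       /* head caught up with tail */
            *ent = r->events[head & (RING_SIZE - 1)];
            /* the CAS below also orders the event read above */
        } while (!atomic_compare_exchange_strong(&r->head, &head, head + 1));

        return 1;
    }

    int main(void)
    {
        struct ring r = { .tail = 2, .events = { 11, 22 } };
        int ev;

        atomic_init(&r.head, 0);
        while (ring_read(&r, &ev))
            printf("got %d\n", ev); /* 11, then 22 */
        return 0;
    }

If the CAS fails, another reader claimed the slot first; the loop
simply rereads head and retries on the next event, so each event is
delivered exactly once.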

Signed-off-by: Jens Axboe <jens.axboe@xxxxxxxxxx>
---
fs/aio.c | 151 +++++++++++++++++++++++++++++++++------------------
include/linux/aio.h | 11 ++--
2 files changed, 103 insertions(+), 59 deletions(-)
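
A note on the aio_ring_avail() rework in include/linux/aio.h: with the
ring size forced to a power of two, head and tail become free-running
counters, so the reserved overlap slot and the modulo arithmetic go
away; "nr + head - tail" is the free space even across unsigned
wraparound. A quick standalone check of that arithmetic (illustrative
only, not kernel code):

    #include <assert.h>
    #include <stdio.h>

    #define NR 8u   /* ring entries, power of two */

    static unsigned int ring_avail(unsigned int head, unsigned int tail)
    {
        return NR + head - tail;    /* free slots = NR - (tail - head) */
    }

    int main(void)
    {
        /* tail ran 4 ahead of head: 4 events queued, 4 slots free */
        assert(ring_avail(5, 9) == 4);
        /* same answer when tail has wrapped past UINT_MAX back to 2 */
        assert(ring_avail(0xfffffffeu, 2u) == 4);
        printf("ok\n");
        return 0;
    }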

diff --git a/fs/aio.c b/fs/aio.c
index 76da125..98c82f2 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -79,9 +79,8 @@ static int __init aio_setup(void)
return 0;
}

-static void aio_free_ring(struct kioctx *ctx)
+static void __aio_free_ring(struct kioctx *ctx, struct aio_ring_info *info)
{
- struct aio_ring_info *info = &ctx->ring_info;
long i;

for (i=0; i<info->nr_pages; i++)
@@ -99,16 +98,28 @@ static void aio_free_ring(struct kioctx *ctx)
info->nr = 0;
}

-static int aio_setup_ring(struct kioctx *ctx)
+static void aio_free_ring(struct kioctx *ctx)
+{
+ unsigned int i;
+
+ for_each_possible_cpu(i) {
+ struct aio_ring_info *info = per_cpu_ptr(ctx->ring_info, i);
+
+ __aio_free_ring(ctx, info);
+ }
+ free_percpu(ctx->ring_info);
+ ctx->ring_info = NULL;
+}
+
+static int __aio_setup_ring(struct kioctx *ctx, struct aio_ring_info *info)
{
struct aio_ring *ring;
- struct aio_ring_info *info = &ctx->ring_info;
unsigned nr_events = ctx->max_reqs;
unsigned long size;
int nr_pages;

- /* Compensate for the ring buffer's head/tail overlap entry */
- nr_events += 2; /* 1 is required, 2 for good luck */
+ /* round nr_events up to the next power of 2 */
+ nr_events = roundup_pow_of_two(nr_events);

size = sizeof(struct aio_ring);
size += sizeof(struct io_event) * nr_events;
@@ -117,8 +128,6 @@ static int aio_setup_ring(struct kioctx *ctx)
if (nr_pages < 0)
return -EINVAL;

- nr_events = (PAGE_SIZE * nr_pages - sizeof(struct aio_ring)) / sizeof(struct io_event);
-
info->nr = 0;
info->ring_pages = info->internal_pages;
if (nr_pages > AIO_RING_PAGES) {
@@ -158,7 +167,8 @@ static int aio_setup_ring(struct kioctx *ctx)
ring = kmap_atomic(info->ring_pages[0], KM_USER0);
ring->nr = nr_events; /* user copy */
ring->id = ctx->user_id;
- ring->head = ring->tail = 0;
+ atomic_set(&ring->head, 0);
+ ring->tail = 0;
ring->magic = AIO_RING_MAGIC;
ring->compat_features = AIO_RING_COMPAT_FEATURES;
ring->incompat_features = AIO_RING_INCOMPAT_FEATURES;
@@ -168,6 +178,27 @@ static int aio_setup_ring(struct kioctx *ctx)
return 0;
}

+static int aio_setup_ring(struct kioctx *ctx)
+{
+ unsigned int i;
+ int ret;
+
+ ctx->ring_info = alloc_percpu(struct aio_ring_info);
+ if (!ctx->ring_info)
+ return -ENOMEM;
+
+ ret = 0;
+ for_each_possible_cpu(i) {
+ struct aio_ring_info *info = per_cpu_ptr(ctx->ring_info, i);
+ int err;
+
+ err = __aio_setup_ring(ctx, info);
+ if (err && !ret)
+ ret = err;
+ }
+
+ return ret;
+}

/* aio_ring_event: returns a pointer to the event at the given index from
* kmap_atomic(, km). Release the pointer with put_aio_ring_event();
@@ -176,8 +207,8 @@ static int aio_setup_ring(struct kioctx *ctx)
#define AIO_EVENTS_FIRST_PAGE ((PAGE_SIZE - sizeof(struct aio_ring)) / sizeof(struct io_event))
#define AIO_EVENTS_OFFSET (AIO_EVENTS_PER_PAGE - AIO_EVENTS_FIRST_PAGE)

-#define aio_ring_event(info, nr, km) ({ \
- unsigned pos = (nr) + AIO_EVENTS_OFFSET; \
+#define aio_ring_event(info, __nr, km) ({ \
+ unsigned pos = ((__nr) & ((info)->nr - 1)) + AIO_EVENTS_OFFSET; \
struct io_event *__event; \
__event = kmap_atomic( \
(info)->ring_pages[pos / AIO_EVENTS_PER_PAGE], km); \
@@ -262,7 +293,6 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)

atomic_set(&ctx->users, 1);
spin_lock_init(&ctx->ctx_lock);
- spin_lock_init(&ctx->ring_info.ring_lock);
init_waitqueue_head(&ctx->wait);

INIT_LIST_HEAD(&ctx->active_reqs);
@@ -426,6 +456,7 @@ void exit_aio(struct mm_struct *mm)
static struct kiocb *__aio_get_req(struct kioctx *ctx)
{
struct kiocb *req = NULL;
+ struct aio_ring_info *info;
struct aio_ring *ring;
int okay = 0;

@@ -448,15 +479,18 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx)
/* Check if the completion queue has enough free space to
* accept an event from this io.
*/
- spin_lock_irq(&ctx->ctx_lock);
- ring = kmap_atomic(ctx->ring_info.ring_pages[0], KM_USER0);
- if (ctx->reqs_active < aio_ring_avail(&ctx->ring_info, ring)) {
+ local_irq_disable();
+ info = per_cpu_ptr(ctx->ring_info, smp_processor_id());
+ ring = kmap_atomic(info->ring_pages[0], KM_IRQ0);
+ if (ctx->reqs_active < aio_ring_avail(info, ring)) {
+ spin_lock(&ctx->ctx_lock);
list_add(&req->ki_list, &ctx->active_reqs);
ctx->reqs_active++;
+ spin_unlock(&ctx->ctx_lock);
okay = 1;
}
- kunmap_atomic(ring, KM_USER0);
- spin_unlock_irq(&ctx->ctx_lock);
+ kunmap_atomic(ring, KM_IRQ0);
+ local_irq_enable();

if (!okay) {
kmem_cache_free(kiocb_cachep, req);
@@ -578,9 +612,11 @@ int aio_put_req(struct kiocb *req)
{
struct kioctx *ctx = req->ki_ctx;
int ret;
+
spin_lock_irq(&ctx->ctx_lock);
ret = __aio_put_req(ctx, req);
spin_unlock_irq(&ctx->ctx_lock);
+
return ret;
}

@@ -954,7 +990,7 @@ int aio_complete(struct kiocb *iocb, long res, long res2)
struct aio_ring *ring;
struct io_event *event;
unsigned long flags;
- unsigned long tail;
+ unsigned tail;
int ret;

/*
@@ -972,15 +1008,14 @@ int aio_complete(struct kiocb *iocb, long res, long res2)
return 1;
}

- info = &ctx->ring_info;
-
- /* add a completion event to the ring buffer.
- * must be done holding ctx->ctx_lock to prevent
- * other code from messing with the tail
- * pointer since we might be called from irq
- * context.
- */
+ /* add a completion event to this CPU's ring buffer.
+ * local interrupts are disabled and each CPU has its
+ * own ring, so nothing else can touch the tail
+ * pointer even when we are called from irq
+ * context.
+ */
- spin_lock_irqsave(&ctx->ctx_lock, flags);
+ local_irq_save(flags);
+ info = per_cpu_ptr(ctx->ring_info, smp_processor_id());

if (iocb->ki_run_list.prev && !list_empty(&iocb->ki_run_list))
list_del_init(&iocb->ki_run_list);
@@ -996,8 +1031,6 @@ int aio_complete(struct kiocb *iocb, long res, long res2)

tail = info->tail;
event = aio_ring_event(info, tail, KM_IRQ0);
- if (++tail >= info->nr)
- tail = 0;

event->obj = (u64)(unsigned long)iocb->ki_obj.user;
event->data = iocb->ki_user_data;
@@ -1013,13 +1046,14 @@ int aio_complete(struct kiocb *iocb, long res, long res2)
*/
smp_wmb(); /* make event visible before updating tail */

+ tail++;
info->tail = tail;
ring->tail = tail;

put_aio_ring_event(event, KM_IRQ0);
kunmap_atomic(ring, KM_IRQ1);

- pr_debug("added to ring %p at [%lu]\n", iocb, tail);
+ pr_debug("added to ring %p at [%u]\n", iocb, tail);

/*
* Check if the user asked us to deliver the result through an
@@ -1031,7 +1065,9 @@ int aio_complete(struct kiocb *iocb, long res, long res2)

put_rq:
/* everything turned out well, dispose of the aiocb. */
+ spin_lock(&ctx->ctx_lock);
ret = __aio_put_req(ctx, iocb);
+ spin_unlock(&ctx->ctx_lock);

/*
* We have to order our ring_info tail store above and test
@@ -1044,49 +1080,58 @@ put_rq:
if (waitqueue_active(&ctx->wait))
wake_up(&ctx->wait);

- spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+ local_irq_restore(flags);
+ return ret;
+}
+
+static int __aio_read_evt(struct aio_ring_info *info, struct aio_ring *ring,
+ struct io_event *ent)
+{
+ struct io_event *evp;
+ unsigned head;
+ int ret = 0;
+
+ do {
+ head = atomic_read(&ring->head);
+ if (head == ring->tail)
+ return 0;
+ evp = aio_ring_event(info, head, KM_USER1);
+ *ent = *evp;
+ smp_mb(); /* finish reading the event before updating the head */
+ ret = 1;
+ put_aio_ring_event(evp, KM_USER1);
+ } while (head != atomic_cmpxchg(&ring->head, head, head + 1));
+
return ret;
}

/* aio_read_evt
* Pull an event off of the ioctx's event ring. Returns the number of
* events fetched (0 or 1 ;-)
- * FIXME: make this use cmpxchg.
- * TODO: make the ringbuffer user mmap()able (requires FIXME).
+ * TODO: make the ringbuffer user mmap()able
*/
static int aio_read_evt(struct kioctx *ioctx, struct io_event *ent)
{
- struct aio_ring_info *info = &ioctx->ring_info;
- struct aio_ring *ring;
- unsigned long head;
- int ret = 0;
+ int i, ret = 0;

- ring = kmap_atomic(info->ring_pages[0], KM_USER0);
- dprintk("in aio_read_evt h%lu t%lu m%lu\n",
- (unsigned long)ring->head, (unsigned long)ring->tail,
- (unsigned long)ring->nr);
+ for_each_possible_cpu(i) {
+ struct aio_ring_info *info;
+ struct aio_ring *ring;

- if (ring->head == ring->tail)
- goto out;
+ info = per_cpu_ptr(ioctx->ring_info, i);
+ ring = kmap_atomic(info->ring_pages[0], KM_USER0);
+ dprintk("in aio_read_evt h%u t%u m%u\n",
+ atomic_read(&ring->head), ring->tail, ring->nr);

- spin_lock(&info->ring_lock);
-
- head = ring->head % info->nr;
- if (head != ring->tail) {
- struct io_event *evp = aio_ring_event(info, head, KM_USER1);
- *ent = *evp;
- head = (head + 1) % info->nr;
- smp_mb(); /* finish reading the event before updatng the head */
- ring->head = head;
- ret = 1;
- put_aio_ring_event(evp, KM_USER1);
+ ret = __aio_read_evt(info, ring, ent);
+ kunmap_atomic(ring, KM_USER0);
+ if (ret)
+ break;
}
- spin_unlock(&info->ring_lock);

-out:
- kunmap_atomic(ring, KM_USER0);
- dprintk("leaving aio_read_evt: %d h%lu t%lu\n", ret,
- (unsigned long)ring->head, (unsigned long)ring->tail);
+ dprintk("leaving aio_read_evt: %d h%u t%u\n", ret,
+ atomic_read(&ring->head), ring->tail);
+
return ret;
}

diff --git a/include/linux/aio.h b/include/linux/aio.h
index b16a957..9a7acb4 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -149,7 +149,7 @@ struct kiocb {
struct aio_ring {
unsigned id; /* kernel internal index number */
unsigned nr; /* number of io_events */
- unsigned head;
+ atomic_t head;
unsigned tail;

unsigned magic;
@@ -157,11 +157,11 @@ struct aio_ring {
unsigned incompat_features;
unsigned header_length; /* size of aio_ring */

-
- struct io_event io_events[0];
+ struct io_event io_events[0];
}; /* 128 bytes + ring size */

-#define aio_ring_avail(info, ring) (((ring)->head + (info)->nr - 1 - (ring)->tail) % (info)->nr)
+#define aio_ring_avail(info, ring) \
+ ((info)->nr + (unsigned) atomic_read(&(ring)->head) - (ring)->tail)

#define AIO_RING_PAGES 8
struct aio_ring_info {
@@ -169,7 +169,6 @@ struct aio_ring_info {
unsigned long mmap_size;

struct page **ring_pages;
- spinlock_t ring_lock;
long nr_pages;

unsigned nr, tail;
@@ -197,7 +196,7 @@ struct kioctx {
/* sys_io_setup currently limits this to an unsigned int */
unsigned max_reqs;

- struct aio_ring_info ring_info;
+ struct aio_ring_info *ring_info;

struct delayed_work wq;

--
1.6.3.rc0.1.gf800
