[PATCH 30/50] mars: add new file drivers/block/mars/xio_bricks/xio_aio.c
From: Thomas Schoebel-Theuer
Date: Tue Jul 01 2014 - 17:59:36 EST
Signed-off-by: Thomas Schoebel-Theuer <tst@xxxxxxxxxxxxxxxxxx>
---
drivers/block/mars/xio_bricks/xio_aio.c | 1224 +++++++++++++++++++++++++++++++
1 file changed, 1224 insertions(+)
create mode 100644 drivers/block/mars/xio_bricks/xio_aio.c
diff --git a/drivers/block/mars/xio_bricks/xio_aio.c b/drivers/block/mars/xio_bricks/xio_aio.c
new file mode 100644
index 0000000..5356fdb
--- /dev/null
+++ b/drivers/block/mars/xio_bricks/xio_aio.c
@@ -0,0 +1,1224 @@
+/* (c) 2010 Thomas Schoebel-Theuer / 1&1 Internet AG */
+
+#define XIO_DEBUGGING
+
+#include <linux/kernel.h>
+#include <linux/module.h>
+#include <linux/version.h>
+#include <linux/string.h>
+#include <linux/list.h>
+#include <linux/types.h>
+#include <linux/blkdev.h>
+#include <linux/spinlock.h>
+#include <linux/wait.h>
+#include <linux/file.h>
+
+#include <linux/xio.h>
+#include <linux/brick/lib_timing.h>
+#include <linux/lib_mapfree.h>
+
+#include <linux/xio/xio_aio.h>
+
+#define XIO_MAX_AIO 1024
+#define XIO_MAX_AIO_READ 32
+
+static struct timing_stats timings[3];
+
+struct threshold aio_submit_threshold = {
+ .thr_ban = &xio_global_ban,
+ .thr_limit = AIO_SUBMIT_MAX_LATENCY,
+ .thr_factor = 10,
+ .thr_plus = 10000,
+};
+EXPORT_SYMBOL_GPL(aio_submit_threshold);
+
+struct threshold aio_io_threshold[2] = {
+ [0] = {
+ .thr_ban = &xio_global_ban,
+ .thr_limit = AIO_IO_R_MAX_LATENCY,
+ .thr_factor = 100,
+ .thr_plus = 0,
+ },
+ [1] = {
+ .thr_ban = &xio_global_ban,
+ .thr_limit = AIO_IO_W_MAX_LATENCY,
+ .thr_factor = 100,
+ .thr_plus = 0,
+ },
+};
+EXPORT_SYMBOL_GPL(aio_io_threshold);
+
+struct threshold aio_sync_threshold = {
+ .thr_ban = &xio_global_ban,
+ .thr_limit = AIO_SYNC_MAX_LATENCY,
+ .thr_factor = 100,
+ .thr_plus = 0,
+};
+EXPORT_SYMBOL_GPL(aio_sync_threshold);
+
+int aio_sync_mode = 2;
+EXPORT_SYMBOL_GPL(aio_sync_mode);
+
+/************************ mmu faking (provisionary) ***********************/
+
+/* Kludge: our kernel threads will have no mm context, but need one
+ * for stuff like ioctx_alloc() / aio_setup_ring() etc
+ * which expect userspace resources.
+ * We fake one.
+ * TODO: factor out the userspace stuff from AIO such that
+ * this fake is no longer necessary.
+ * Even better: replace do_mmap() in AIO stuff by something
+ * more friendly to kernelspace apps.
+ */
+#include <linux/mmu_context.h>
+
+struct mm_struct *mm_fake = NULL;
+struct task_struct *mm_fake_task = NULL;
+atomic_t mm_fake_count = ATOMIC_INIT(0);
+
+static inline void set_fake(void)
+{
+ mm_fake = current->mm;
+ if (mm_fake) {
+ XIO_DBG("initialized fake\n");
+ mm_fake_task = current;
+ get_task_struct(current); /* paired with put_task_struct() */
+ atomic_inc(&mm_fake->mm_count); /* paired with mmdrop() */
+ atomic_inc(&mm_fake->mm_users); /* paired with mmput() */
+ }
+}
+
+static inline void put_fake(void)
+{
+ int count = 0;
+
+ while (mm_fake && mm_fake_task) {
+ int remain = atomic_read(&mm_fake_count);
+
+ if (unlikely(remain != 0)) {
+ if (count++ < 10) {
+ XIO_WRN("cannot cleanup fake, remain = %d\n", remain);
+ brick_msleep(1000);
+ continue;
+ }
+ XIO_ERR("cannot cleanup fake, remain = %d\n", remain);
+ break;
+ } else {
+ XIO_DBG("cleaning up fake\n");
+ mmput(mm_fake);
+ mmdrop(mm_fake);
+ mm_fake = NULL;
+ put_task_struct(mm_fake_task);
+ mm_fake_task = NULL;
+ }
+ }
+}
+
+static inline void use_fake_mm(void)
+{
+ if (!current->mm && mm_fake) {
+ atomic_inc(&mm_fake_count);
+ XIO_DBG("using fake, count=%d\n", atomic_read(&mm_fake_count));
+ use_mm(mm_fake);
+ }
+}
+
+/* Cleanup faked mm, otherwise do_exit() will crash
+ */
+static inline void unuse_fake_mm(void)
+{
+ if (current->mm == mm_fake && mm_fake) {
+ XIO_DBG("unusing fake, count=%d\n", atomic_read(&mm_fake_count));
+ atomic_dec(&mm_fake_count);
+ unuse_mm(mm_fake);
+ current->mm = NULL;
+ }
+}
+
+/************************ own type definitions ***********************/
+
+/***************** some helpers *****************/
+
+static inline
+void _enqueue(struct aio_threadinfo *tinfo, struct aio_aio_aspect *aio_a, int prio, bool at_end)
+{
+ prio++;
+ if (unlikely(prio < 0))
+ prio = 0;
+ else if (unlikely(prio >= XIO_PRIO_NR))
+ prio = XIO_PRIO_NR - 1;
+
+ aio_a->enqueue_stamp = cpu_clock(raw_smp_processor_id());
+
+ spin_lock(&tinfo->lock);
+
+ if (at_end)
+ list_add_tail(&aio_a->io_head, &tinfo->aio_list[prio]);
+ else
+ list_add(&aio_a->io_head, &tinfo->aio_list[prio]);
+ tinfo->queued[prio]++;
+ atomic_inc(&tinfo->queued_sum);
+
+ spin_unlock(&tinfo->lock);
+
+ atomic_inc(&tinfo->total_enqueue_count);
+
+ wake_up_interruptible_all(&tinfo->event);
+}
+
+static inline
+struct aio_aio_aspect *_dequeue(struct aio_threadinfo *tinfo)
+{
+ struct aio_aio_aspect *aio_a = NULL;
+ int prio;
+
+ spin_lock(&tinfo->lock);
+
+ for (prio = 0; prio < XIO_PRIO_NR; prio++) {
+ struct list_head *start = &tinfo->aio_list[prio];
+ struct list_head *tmp = start->next;
+
+ if (tmp != start) {
+ list_del_init(tmp);
+ tinfo->queued[prio]--;
+ atomic_dec(&tinfo->queued_sum);
+ aio_a = container_of(tmp, struct aio_aio_aspect, io_head);
+ goto done;
+ }
+ }
+
+done:
+ spin_unlock(&tinfo->lock);
+
+ if (likely(aio_a && aio_a->object)) {
+ unsigned long long latency;
+
+ latency = cpu_clock(raw_smp_processor_id()) - aio_a->enqueue_stamp;
+ threshold_check(&aio_io_threshold[aio_a->object->io_rw & 1], latency);
+ }
+ return aio_a;
+}
+
+/***************** own brick * input * output operations *****************/
+
+static
+loff_t get_total_size(struct aio_output *output)
+{
+ struct file *file;
+ struct inode *inode;
+ loff_t min;
+
+ file = output->mf->mf_filp;
+ if (unlikely(!file)) {
+ XIO_ERR("file is not open\n");
+ return -EILSEQ;
+ }
+ if (unlikely(!file->f_mapping)) {
+ XIO_ERR("file %p has no mapping\n", file);
+ return -EILSEQ;
+ }
+ inode = file->f_mapping->host;
+ if (unlikely(!inode)) {
+ XIO_ERR("file %p has no inode\n", file);
+ return -EILSEQ;
+ }
+
+ min = i_size_read(inode);
+
+ /* Workaround for races in the page cache.
+ * It appears that concurrent reads and writes seem to
+ * result in inconsistent reads in some very rare cases, due to
+ * races. Sometimes, the inode claims that the file has been already
+ * appended by a write operation, but the data has not actually hit
+ * the page cache, such that a concurrent read gets NULL blocks.
+ */
+ if (!output->brick->is_static_device) {
+ loff_t max = 0;
+
+ mf_get_dirty(output->mf, &min, &max, 0, 99);
+ }
+
+ return min;
+}
+
+static int aio_io_get(struct aio_output *output, struct aio_object *aio)
+{
+ loff_t total_size;
+
+ if (unlikely(!output->mf)) {
+ XIO_ERR("brick is not switched on\n");
+ return -EILSEQ;
+ }
+
+ if (unlikely(aio->io_len <= 0)) {
+ XIO_ERR("bad io_len=%d\n", aio->io_len);
+ return -EILSEQ;
+ }
+
+ total_size = get_total_size(output);
+ if (unlikely(total_size < 0))
+ return total_size;
+ aio->io_total_size = total_size;
+
+ if (aio->obj_initialized) {
+ obj_get(aio);
+ return aio->io_len;
+ }
+
+ /* Buffered IO.
+ */
+ if (!aio->io_data) {
+ struct aio_aio_aspect *aio_a = aio_aio_get_aspect(output->brick, aio);
+
+ if (unlikely(!aio_a)) {
+ XIO_ERR("bad aio_a\n");
+ return -EILSEQ;
+ }
+ if (unlikely(aio->io_len <= 0)) {
+ XIO_ERR("bad io_len = %d\n", aio->io_len);
+ return -ENOMEM;
+ }
+ aio->io_data = brick_block_alloc(aio->io_pos, (aio_a->alloc_len = aio->io_len));
+ aio_a->do_dealloc = true;
+ atomic_inc(&output->total_alloc_count);
+ atomic_inc(&output->alloc_count);
+ }
+
+ obj_get_first(aio);
+ return aio->io_len;
+}
+
+static void aio_io_put(struct aio_output *output, struct aio_object *aio)
+{
+ struct file *file;
+ struct aio_aio_aspect *aio_a;
+
+ if (!obj_put(aio))
+ goto done;
+
+ if (likely(output->mf)) {
+ file = output->mf->mf_filp;
+ if (likely(file && file->f_mapping && file->f_mapping->host))
+ aio->io_total_size = get_total_size(output);
+ }
+
+ aio_a = aio_aio_get_aspect(output->brick, aio);
+ if (aio_a && aio_a->do_dealloc) {
+ brick_block_free(aio->io_data, aio_a->alloc_len);
+ atomic_dec(&output->alloc_count);
+ }
+ obj_free(aio);
+done:;
+}
+
+static
+void _complete(struct aio_output *output, struct aio_aio_aspect *aio_a, int err)
+{
+ struct aio_object *aio;
+
+ CHECK_PTR(aio_a, fatal);
+ aio = aio_a->object;
+ CHECK_PTR(aio, fatal);
+
+ if (err < 0) {
+ XIO_ERR("IO error %d at pos=%lld len=%d (aio=%p io_data=%p)\n",
+ err,
+ aio->io_pos,
+ aio->io_len,
+ aio,
+ aio->io_data);
+ } else {
+ aio_checksum(aio);
+ aio->io_flags |= AIO_UPTODATE;
+ }
+
+ CHECKED_CALLBACK(aio, err, err_found);
+
+done:
+ if (aio->io_rw)
+ atomic_dec(&output->write_count);
+ else
+ atomic_dec(&output->read_count);
+ mf_remove_dirty(output->mf, &aio_a->di);
+
+ aio_io_put(output, aio);
+ atomic_dec(&xio_global_io_flying);
+ goto out_return;
+err_found:
+ XIO_FAT("giving up...\n");
+ goto done;
+
+fatal:
+ XIO_FAT("bad pointer, giving up...\n");
+out_return:;
+}
+
+static
+void _complete_aio(struct aio_output *output, struct aio_object *aio, int err)
+{
+ struct aio_aio_aspect *aio_a;
+
+ obj_check(aio);
+ aio_a = aio_aio_get_aspect(output->brick, aio);
+ CHECK_PTR(aio_a, fatal);
+ _complete(output, aio_a, err);
+ goto out_return;
+fatal:
+ XIO_FAT("bad pointer, giving up...\n");
+out_return:;
+}
+
+static
+void _complete_all(struct list_head *tmp_list, struct aio_output *output, int err)
+{
+ while (!list_empty(tmp_list)) {
+ struct list_head *tmp = tmp_list->next;
+ struct aio_aio_aspect *aio_a = container_of(tmp, struct aio_aio_aspect, io_head);
+
+ list_del_init(tmp);
+ aio_a->di.dirty_stage = 3;
+ _complete(output, aio_a, err);
+ }
+}
+
+static void aio_io_io(struct aio_output *output, struct aio_object *aio)
+{
+ struct aio_threadinfo *tinfo = &output->tinfo[0];
+ struct aio_aio_aspect *aio_a;
+ int err = -EINVAL;
+
+ obj_get(aio);
+ atomic_inc(&xio_global_io_flying);
+
+ /* statistics */
+ if (aio->io_rw) {
+ atomic_inc(&output->total_write_count);
+ atomic_inc(&output->write_count);
+ } else {
+ atomic_inc(&output->total_read_count);
+ atomic_inc(&output->read_count);
+ }
+
+ if (unlikely(!output->mf || !output->mf->mf_filp))
+ goto done;
+
+ mapfree_set(output->mf, aio->io_pos, -1);
+
+ aio_a = aio_aio_get_aspect(output->brick, aio);
+ if (unlikely(!aio_a))
+ goto done;
+
+ _enqueue(tinfo, aio_a, aio->io_prio, true);
+ goto out_return;
+done:
+ _complete_aio(output, aio, err);
+out_return:;
+}
+
+static int aio_submit(struct aio_output *output, struct aio_aio_aspect *aio_a, bool use_fdsync)
+{
+ struct aio_object *aio = aio_a->object;
+
+ mm_segment_t oldfs;
+ int res;
+
+ struct iocb iocb = {
+ .aio_data = (__u64)aio_a,
+ .aio_lio_opcode = use_fdsync ? IOCB_CMD_FDSYNC : (aio->io_rw != 0 ? IOCB_CMD_PWRITE : IOCB_CMD_PREAD),
+ .aio_fildes = output->fd,
+ .aio_buf = (unsigned long)aio->io_data,
+ .aio_nbytes = aio->io_len,
+ .aio_offset = aio->io_pos,
+ /* .aio_reqprio = something(aio->io_prio) field exists, but not yet implemented in kernelspace :( */
+ };
+ struct iocb *iocbp = &iocb;
+ unsigned long long latency;
+
+ if (unlikely(output->fd < 0)) {
+ XIO_ERR("bad fd = %d\n", output->fd);
+ res = -EBADF;
+ goto done;
+ }
+
+ oldfs = get_fs();
+ set_fs(get_ds());
+ latency = TIME_STATS(&timings[aio->io_rw & 1], res = sys_io_submit(output->ctxp, 1, &iocbp));
+ set_fs(oldfs);
+
+ threshold_check(&aio_submit_threshold, latency);
+
+ atomic_inc(&output->total_submit_count);
+
+ if (likely(res >= 0))
+ atomic_inc(&output->submit_count);
+ else if (likely(res == -EAGAIN))
+ atomic_inc(&output->total_again_count);
+ else
+ XIO_ERR("error = %d\n", res);
+
+done:
+ return res;
+}
+
+static int aio_submit_dummy(struct aio_output *output)
+{
+ mm_segment_t oldfs;
+ int res;
+ int dummy;
+
+ struct iocb iocb = {
+ .aio_buf = (__u64)&dummy,
+ };
+ struct iocb *iocbp = &iocb;
+
+ oldfs = get_fs();
+ set_fs(get_ds());
+ res = sys_io_submit(output->ctxp, 1, &iocbp);
+ set_fs(oldfs);
+
+ if (likely(res >= 0))
+ atomic_inc(&output->submit_count);
+ return res;
+}
+
+static
+int aio_start_thread(
+ struct aio_output *output,
+ struct aio_threadinfo *tinfo,
+ int (*fn)(void *),
+ char class)
+{
+ int j;
+
+ for (j = 0; j < XIO_PRIO_NR; j++)
+ INIT_LIST_HEAD(&tinfo->aio_list[j]);
+ tinfo->output = output;
+ spin_lock_init(&tinfo->lock);
+ init_waitqueue_head(&tinfo->event);
+ init_waitqueue_head(&tinfo->terminate_event);
+ tinfo->terminated = false;
+ tinfo->thread = brick_thread_create(fn, tinfo, "xio_aio_%c%d", class, output->index);
+ if (unlikely(!tinfo->thread)) {
+ XIO_ERR("cannot create thread\n");
+ return -ENOENT;
+ }
+ return 0;
+}
+
+static
+void aio_stop_thread(struct aio_output *output, int i, bool do_submit_dummy)
+{
+ struct aio_threadinfo *tinfo = &output->tinfo[i];
+
+ if (tinfo->thread) {
+ XIO_DBG("stopping thread %d ...\n", i);
+ brick_thread_stop_nowait(tinfo->thread);
+
+/**/
+ if (do_submit_dummy) {
+ XIO_DBG("submitting dummy for wakeup %d...\n", i);
+ use_fake_mm();
+ aio_submit_dummy(output);
+ if (likely(current->mm))
+ unuse_fake_mm();
+ }
+
+ /* wait for termination */
+ XIO_DBG("waiting for thread %d ...\n", i);
+ wait_event_interruptible_timeout(
+ tinfo->terminate_event,
+ tinfo->terminated,
+ (60 - i * 2) * HZ);
+ if (likely(tinfo->terminated))
+ brick_thread_stop(tinfo->thread);
+ else
+ XIO_ERR("thread %d did not terminate - leaving a zombie\n", i);
+ }
+}
+
+static
+int aio_sync(struct file *file)
+{
+ int err;
+
+ switch (aio_sync_mode) {
+ case 1:
+#if defined(S_BIAS) || (defined(RHEL_MAJOR) && (RHEL_MAJOR < 7))
+ err = vfs_fsync_range(file, file->f_path.dentry, 0, LLONG_MAX, 1);
+#else
+ err = vfs_fsync_range(file, 0, LLONG_MAX, 1);
+#endif
+ break;
+ case 2:
+#if defined(S_BIAS) || (defined(RHEL_MAJOR) && (RHEL_MAJOR < 7))
+ err = vfs_fsync_range(file, file->f_path.dentry, 0, LLONG_MAX, 0);
+#else
+ err = vfs_fsync_range(file, 0, LLONG_MAX, 0);
+#endif
+ break;
+ default:
+ err = filemap_write_and_wait_range(file->f_mapping, 0, LLONG_MAX);
+ }
+
+ return err;
+}
+
+static
+void aio_sync_all(struct aio_output *output, struct list_head *tmp_list)
+{
+ unsigned long long latency;
+ int err;
+
+ output->fdsync_active = true;
+ atomic_inc(&output->total_fdsync_count);
+
+ latency = TIME_STATS(
+ &timings[2],
+ err = aio_sync(output->mf->mf_filp)
+ );
+
+ threshold_check(&aio_sync_threshold, latency);
+
+ output->fdsync_active = false;
+ wake_up_interruptible_all(&output->fdsync_event);
+ if (err < 0)
+ XIO_ERR("FDSYNC error %d\n", err);
+
+ /* Signal completion for the whole list.
+ * No locking needed, it's on the stack.
+ */
+ _complete_all(tmp_list, output, err);
+}
+
+/* Workaround for non-implemented aio_fsync()
+ */
+static
+int aio_sync_thread(void *data)
+{
+ struct aio_threadinfo *tinfo = data;
+ struct aio_output *output = tinfo->output;
+
+ XIO_DBG("sync thread has started on '%s'.\n", output->brick->brick_path);
+ /* set_user_nice(current, -20); */
+
+ while (!brick_thread_should_stop() || atomic_read(&tinfo->queued_sum) > 0) {
+ LIST_HEAD(tmp_list);
+ int i;
+
+ output->fdsync_active = false;
+ wake_up_interruptible_all(&output->fdsync_event);
+
+ wait_event_interruptible_timeout(
+ tinfo->event,
+ atomic_read(&tinfo->queued_sum) > 0,
+ HZ / 4);
+
+ spin_lock(&tinfo->lock);
+ for (i = 0; i < XIO_PRIO_NR; i++) {
+ struct list_head *start = &tinfo->aio_list[i];
+
+ if (!list_empty(start)) {
+ /* move over the whole list */
+ list_replace_init(start, &tmp_list);
+ atomic_sub(tinfo->queued[i], &tinfo->queued_sum);
+ tinfo->queued[i] = 0;
+ break;
+ }
+ }
+ spin_unlock(&tinfo->lock);
+
+ if (!list_empty(&tmp_list))
+ aio_sync_all(output, &tmp_list);
+ }
+
+ XIO_DBG("sync thread has stopped.\n");
+ tinfo->terminated = true;
+ wake_up_interruptible_all(&tinfo->terminate_event);
+ return 0;
+}
+
+static int aio_event_thread(void *data)
+{
+ struct aio_threadinfo *tinfo = data;
+ struct aio_output *output = tinfo->output;
+ struct aio_threadinfo *other = &output->tinfo[2];
+ struct io_event *events;
+ int err = -ENOMEM;
+
+ events = brick_mem_alloc(sizeof(struct io_event) * XIO_MAX_AIO_READ);
+
+ XIO_DBG("event thread has started.\n");
+
+ use_fake_mm();
+ if (!current->mm)
+ goto err;
+
+ err = aio_start_thread(output, &output->tinfo[2], aio_sync_thread, 'y');
+ if (unlikely(err < 0))
+ goto err;
+
+ while (!brick_thread_should_stop() || atomic_read(&tinfo->queued_sum) > 0) {
+ mm_segment_t oldfs;
+ int count;
+ int i;
+
+ struct timespec timeout = {
+ .tv_sec = 1,
+ };
+
+ oldfs = get_fs();
+ set_fs(get_ds());
+ /* TODO: don't timeout upon termination.
+ * Probably we should submit a dummy request.
+ */
+ count = sys_io_getevents(output->ctxp, 1, XIO_MAX_AIO_READ, events, &timeout);
+ set_fs(oldfs);
+
+ if (likely(count > 0))
+ atomic_sub(count, &output->submit_count);
+
+ for (i = 0; i < count; i++) {
+ struct aio_aio_aspect *aio_a = (void *)events[i].data;
+ struct aio_object *aio;
+ int err = events[i].res;
+
+ if (!aio_a)
+ continue; /* this was a dummy request */
+
+ aio_a->di.dirty_stage = 2;
+ aio = aio_a->object;
+
+ mapfree_set(output->mf, aio->io_pos, aio->io_pos + aio->io_len);
+
+ if (output->brick->o_fdsync
+ && err >= 0
+ && aio->io_rw != READ
+ && !aio->io_skip_sync
+ && !aio_a->resubmit++) {
+ /* workaround for non-implemented AIO FSYNC operation */
+ if (output->mf &&
+ output->mf->mf_filp &&
+ output->mf->mf_filp->f_op &&
+ !output->mf->mf_filp->f_op->aio_fsync) {
+ _enqueue(other, aio_a, aio->io_prio, true);
+ continue;
+ }
+ err = aio_submit(output, aio_a, true);
+ if (likely(err >= 0))
+ continue;
+ }
+
+ aio_a->di.dirty_stage = 3;
+ _complete(output, aio_a, err);
+
+ }
+ }
+ err = 0;
+
+err:
+ XIO_DBG("event thread has stopped, err = %d\n", err);
+
+ aio_stop_thread(output, 2, false);
+
+ unuse_fake_mm();
+
+ tinfo->terminated = true;
+ wake_up_interruptible_all(&tinfo->terminate_event);
+ brick_mem_free(events);
+ return err;
+}
+
+#if 1
+/* This should go to fs/open.c (as long as vfs_submit() is not implemented)
+ */
+#include <linux/fdtable.h>
+void fd_uninstall(unsigned int fd)
+{
+ struct files_struct *files = current->files;
+ struct fdtable *fdt;
+
+ XIO_DBG("fd = %d\n", fd);
+ if (unlikely(fd < 0)) {
+ XIO_ERR("bad fd = %d\n", fd);
+ goto out_return;
+ }
+ spin_lock(&files->file_lock);
+ fdt = files_fdtable(files);
+ rcu_assign_pointer(fdt->fd[fd], NULL);
+ spin_unlock(&files->file_lock);
+out_return:;
+}
+EXPORT_SYMBOL(fd_uninstall);
+#endif
+
+static
+atomic_t ioctx_count = ATOMIC_INIT(0);
+
+static
+void _destroy_ioctx(struct aio_output *output)
+{
+ if (unlikely(!output))
+ goto done;
+
+ aio_stop_thread(output, 1, true);
+
+ use_fake_mm();
+
+ if (likely(output->ctxp)) {
+ mm_segment_t oldfs;
+ int err;
+
+ XIO_DBG("ioctx count = %d destroying %p\n", atomic_read(&ioctx_count), (void *)output->ctxp);
+ oldfs = get_fs();
+ set_fs(get_ds());
+ err = sys_io_destroy(output->ctxp);
+ set_fs(oldfs);
+ atomic_dec(&ioctx_count);
+ XIO_DBG("ioctx count = %d status = %d\n", atomic_read(&ioctx_count), err);
+ output->ctxp = 0;
+ }
+
+ if (likely(output->fd >= 0)) {
+ XIO_DBG("destroying fd %d\n", output->fd);
+ fd_uninstall(output->fd);
+ put_unused_fd(output->fd);
+ output->fd = -1;
+ }
+
+done:
+ if (likely(current->mm))
+ unuse_fake_mm();
+}
+
+static
+int _create_ioctx(struct aio_output *output)
+{
+ struct file *file;
+
+ mm_segment_t oldfs;
+ int err = -EINVAL;
+
+ CHECK_PTR_NULL(output, done);
+ CHECK_PTR_NULL(output->mf, done);
+ file = output->mf->mf_filp;
+ CHECK_PTR_NULL(file, done);
+
+ /* TODO: this is provisionary. We only need it for sys_io_submit()
+ * which uses userspace concepts like file handles.
+ * This should be accompanied by a future kernelsapce vfs_submit() or
+ * do_submit() which currently does not exist :(
+ */
+ err = get_unused_fd();
+ XIO_DBG("file %p '%s' new fd = %d\n", file, output->mf->mf_name, err);
+ if (unlikely(err < 0)) {
+ XIO_ERR("cannot get fd, err=%d\n", err);
+ goto done;
+ }
+ output->fd = err;
+ fd_install(err, file);
+
+ use_fake_mm();
+
+ err = -ENOMEM;
+ if (unlikely(!current->mm)) {
+ XIO_ERR("cannot fake mm\n");
+ goto done;
+ }
+
+ XIO_DBG("ioctx count = %d old = %p\n", atomic_read(&ioctx_count), (void *)output->ctxp);
+ output->ctxp = 0;
+
+ oldfs = get_fs();
+ set_fs(get_ds());
+ err = sys_io_setup(XIO_MAX_AIO, &output->ctxp);
+ set_fs(oldfs);
+ if (likely(output->ctxp))
+ atomic_inc(&ioctx_count);
+ XIO_DBG("ioctx count = %d new = %p status = %d\n", atomic_read(&ioctx_count), (void *)output->ctxp, err);
+ if (unlikely(err < 0)) {
+ XIO_ERR("io_setup failed, err=%d\n", err);
+ goto done;
+ }
+
+ err = aio_start_thread(output, &output->tinfo[1], aio_event_thread, 'e');
+ if (unlikely(err < 0)) {
+ XIO_ERR("could not start event thread\n");
+ goto done;
+ }
+
+done:
+ if (likely(current->mm))
+ unuse_fake_mm();
+ return err;
+}
+
+static int aio_submit_thread(void *data)
+{
+ struct aio_threadinfo *tinfo = data;
+ struct aio_output *output = tinfo->output;
+ struct file *file;
+ int err = -EINVAL;
+
+ XIO_DBG("submit thread has started.\n");
+
+ file = output->mf->mf_filp;
+
+ use_fake_mm();
+
+ while (!brick_thread_should_stop() || atomic_read(&output->read_count) + atomic_read(&output->write_count) + atomic_read(&tinfo->queued_sum) > 0) {
+ struct aio_aio_aspect *aio_a;
+ struct aio_object *aio;
+ int sleeptime;
+ int status;
+
+ wait_event_interruptible_timeout(
+ tinfo->event,
+ atomic_read(&tinfo->queued_sum) > 0,
+ HZ / 4);
+
+ aio_a = _dequeue(tinfo);
+ if (!aio_a)
+ continue;
+
+ aio = aio_a->object;
+ status = -EINVAL;
+ CHECK_PTR(aio, error);
+
+ mapfree_set(output->mf, aio->io_pos, -1);
+
+ aio_a->di.dirty_stage = 0;
+ if (aio->io_rw)
+ mf_insert_dirty(output->mf, &aio_a->di);
+
+ aio->io_total_size = get_total_size(output);
+
+ /* check for reads crossing the EOF boundary (special case) */
+ if (aio->io_timeout > 0 &&
+ !aio->io_rw &&
+ aio->io_pos + aio->io_len > aio->io_total_size) {
+ loff_t len = aio->io_total_size - aio->io_pos;
+
+ if (len > 0) {
+ if (aio->io_len > len)
+ aio->io_len = len;
+ } else {
+ if (!aio_a->start_jiffies)
+ aio_a->start_jiffies = jiffies;
+ if ((long long)jiffies - aio_a->start_jiffies <= aio->io_timeout) {
+ if (atomic_read(&tinfo->queued_sum) <= 0) {
+ atomic_inc(&output->total_msleep_count);
+ brick_msleep(1000 * 4 / HZ);
+ }
+ _enqueue(tinfo, aio_a, XIO_PRIO_LOW, true);
+ continue;
+ }
+ XIO_DBG("ENODATA %lld\n", len);
+ _complete(output, aio_a, -ENODATA);
+ continue;
+ }
+ }
+
+ sleeptime = 1;
+ for (;;) {
+ aio_a->di.dirty_stage = 1;
+ status = aio_submit(output, aio_a, false);
+
+ if (likely(status != -EAGAIN))
+ break;
+ aio_a->di.dirty_stage = 0;
+ atomic_inc(&output->total_delay_count);
+ brick_msleep(sleeptime);
+ if (sleeptime < 100)
+ sleeptime++;
+ }
+
+error:
+ if (unlikely(status < 0))
+ _complete_aio(output, aio, status);
+ }
+
+ XIO_DBG("submit thread has stopped, status = %d.\n", err);
+
+ if (likely(current->mm))
+ unuse_fake_mm();
+
+ tinfo->terminated = true;
+ wake_up_interruptible_all(&tinfo->terminate_event);
+ return err;
+}
+
+static int aio_get_info(struct aio_output *output, struct xio_info *info)
+{
+ struct file *file;
+
+ if (unlikely(!output || !output->mf))
+ return -EINVAL;
+ file = output->mf->mf_filp;
+ if (unlikely(!file || !file->f_mapping || !file->f_mapping->host))
+ return -EINVAL;
+
+ info->tf_align = 1;
+ info->tf_min_size = 1;
+ info->current_size = get_total_size(output);
+
+ XIO_DBG("determined file size = %lld\n", info->current_size);
+
+ return 0;
+}
+
+/*************** informational * statistics **************/
+
+static noinline
+char *aio_statistics(struct aio_brick *brick, int verbose)
+{
+ struct aio_output *output = brick->outputs[0];
+ char *res = brick_string_alloc(4096);
+ char *sync = NULL;
+ int pos = 0;
+
+ pos += report_timing(&timings[0], res + pos, 4096 - pos);
+ pos += report_timing(&timings[1], res + pos, 4096 - pos);
+ pos += report_timing(&timings[2], res + pos, 4096 - pos);
+
+ snprintf(res + pos, 4096 - pos,
+ "total reads = %d writes = %d allocs = %d submits = %d again = %d delays = %d msleeps = %d fdsyncs = %d fdsync_waits = %d map_free = %d | flying reads = %d writes = %d allocs = %d submits = %d q0 = %d q1 = %d q2 = %d | total q0 = %d q1 = %d q2 = %d %s\n",
+ atomic_read(&output->total_read_count),
+ atomic_read(&output->total_write_count),
+ atomic_read(&output->total_alloc_count),
+ atomic_read(&output->total_submit_count),
+ atomic_read(&output->total_again_count),
+ atomic_read(&output->total_delay_count),
+ atomic_read(&output->total_msleep_count),
+ atomic_read(&output->total_fdsync_count),
+ atomic_read(&output->total_fdsync_wait_count),
+ atomic_read(&output->total_mapfree_count),
+ atomic_read(&output->read_count),
+ atomic_read(&output->write_count),
+ atomic_read(&output->alloc_count),
+ atomic_read(&output->submit_count),
+ atomic_read(&output->tinfo[0].queued_sum),
+ atomic_read(&output->tinfo[1].queued_sum),
+ atomic_read(&output->tinfo[2].queued_sum),
+ atomic_read(&output->tinfo[0].total_enqueue_count),
+ atomic_read(&output->tinfo[1].total_enqueue_count),
+ atomic_read(&output->tinfo[2].total_enqueue_count),
+ sync ? sync : "");
+
+ if (sync)
+ brick_string_free(sync);
+
+ return res;
+}
+
+static noinline
+void aio_reset_statistics(struct aio_brick *brick)
+{
+ struct aio_output *output = brick->outputs[0];
+ int i;
+
+ atomic_set(&output->total_read_count, 0);
+ atomic_set(&output->total_write_count, 0);
+ atomic_set(&output->total_alloc_count, 0);
+ atomic_set(&output->total_submit_count, 0);
+ atomic_set(&output->total_again_count, 0);
+ atomic_set(&output->total_delay_count, 0);
+ atomic_set(&output->total_msleep_count, 0);
+ atomic_set(&output->total_fdsync_count, 0);
+ atomic_set(&output->total_fdsync_wait_count, 0);
+ atomic_set(&output->total_mapfree_count, 0);
+ for (i = 0; i < 3; i++) {
+ struct aio_threadinfo *tinfo = &output->tinfo[i];
+
+ atomic_set(&tinfo->total_enqueue_count, 0);
+ }
+}
+
+/*************** object * aspect constructors * destructors **************/
+
+static int aio_aio_aspect_init_fn(struct generic_aspect *_ini)
+{
+ struct aio_aio_aspect *ini = (void *)_ini;
+
+ INIT_LIST_HEAD(&ini->io_head);
+ INIT_LIST_HEAD(&ini->di.dirty_head);
+ ini->di.dirty_aio = ini->object;
+ return 0;
+}
+
+static void aio_aio_aspect_exit_fn(struct generic_aspect *_ini)
+{
+ struct aio_aio_aspect *ini = (void *)_ini;
+
+ CHECK_HEAD_EMPTY(&ini->di.dirty_head);
+ CHECK_HEAD_EMPTY(&ini->io_head);
+}
+
+XIO_MAKE_STATICS(aio);
+
+/********************* brick constructors * destructors *******************/
+
+static int aio_brick_construct(struct aio_brick *brick)
+{
+ return 0;
+}
+
+static int aio_switch(struct aio_brick *brick)
+{
+ static int index;
+ struct aio_output *output = brick->outputs[0];
+ const char *path = output->brick->brick_path;
+ int flags = O_RDWR | O_LARGEFILE;
+ int status = 0;
+
+ XIO_DBG("power.button = %d\n", brick->power.button);
+ if (!brick->power.button)
+ goto cleanup;
+
+ if (brick->power.on_led || output->mf)
+ goto done;
+
+ xio_set_power_off_led((void *)brick, false);
+
+ if (brick->o_creat) {
+ flags |= O_CREAT;
+ XIO_DBG("using O_CREAT on %s\n", path);
+ }
+ if (brick->o_direct) {
+ flags |= O_DIRECT;
+ XIO_DBG("using O_DIRECT on %s\n", path);
+ }
+
+ output->mf = mapfree_get(path, flags);
+ if (unlikely(!output->mf)) {
+ XIO_ERR("could not open file = '%s' flags = %d\n", path, flags);
+ status = -ENOENT;
+ goto err;
+ }
+
+ output->index = ++index;
+
+ status = _create_ioctx(output);
+ if (unlikely(status < 0)) {
+ XIO_ERR("could not create ioctx, status = %d\n", status);
+ goto err;
+ }
+
+ status = aio_start_thread(output, &output->tinfo[0], aio_submit_thread, 's');
+ if (unlikely(status < 0)) {
+ XIO_ERR("could not start theads, status = %d\n", status);
+ goto err;
+ }
+
+ XIO_DBG("opened file '%s'\n", path);
+ xio_set_power_on_led((void *)brick, true);
+
+done:
+ return 0;
+
+err:
+ XIO_ERR("status = %d\n", status);
+cleanup:
+ if (brick->power.off_led)
+ goto done;
+
+ xio_set_power_on_led((void *)brick, false);
+
+ aio_stop_thread(output, 0, false);
+
+ _destroy_ioctx(output);
+
+ xio_set_power_off_led((void *)brick,
+ (output->tinfo[0].thread == NULL &&
+ output->tinfo[1].thread == NULL &&
+ output->tinfo[2].thread == NULL));
+
+ XIO_DBG("switch off off_led = %d status = %d\n", brick->power.off_led, status);
+ if (brick->power.off_led) {
+ if (output->mf) {
+ XIO_DBG("closing file = '%s'\n", output->mf->mf_name);
+ mapfree_put(output->mf);
+ output->mf = NULL;
+ }
+ }
+ return status;
+}
+
+static int aio_output_construct(struct aio_output *output)
+{
+ init_waitqueue_head(&output->fdsync_event);
+ output->fd = -1;
+ return 0;
+}
+
+static int aio_output_destruct(struct aio_output *output)
+{
+ if (unlikely(output->fd >= 0))
+ XIO_ERR("active fd = %d detected\n", output->fd);
+ return 0;
+}
+
+/************************ static structs ***********************/
+
+static struct aio_brick_ops aio_brick_ops = {
+ .brick_switch = aio_switch,
+ .brick_statistics = aio_statistics,
+ .reset_statistics = aio_reset_statistics,
+};
+
+static struct aio_output_ops aio_output_ops = {
+ .aio_get = aio_io_get,
+ .aio_put = aio_io_put,
+ .aio_io = aio_io_io,
+ .xio_get_info = aio_get_info,
+};
+
+const struct aio_input_type aio_input_type = {
+ .type_name = "aio_input",
+ .input_size = sizeof(struct aio_input),
+};
+
+static const struct aio_input_type *aio_input_types[] = {
+ &aio_input_type,
+};
+
+const struct aio_output_type aio_output_type = {
+ .type_name = "aio_output",
+ .output_size = sizeof(struct aio_output),
+ .master_ops = &aio_output_ops,
+ .output_construct = &aio_output_construct,
+ .output_destruct = &aio_output_destruct,
+};
+
+static const struct aio_output_type *aio_output_types[] = {
+ &aio_output_type,
+};
+
+const struct aio_brick_type aio_brick_type = {
+ .type_name = "aio_brick",
+ .brick_size = sizeof(struct aio_brick),
+ .max_inputs = 0,
+ .max_outputs = 1,
+ .master_ops = &aio_brick_ops,
+ .aspect_types = aio_aspect_types,
+ .default_input_types = aio_input_types,
+ .default_output_types = aio_output_types,
+ .brick_construct = &aio_brick_construct,
+};
+EXPORT_SYMBOL_GPL(aio_brick_type);
+
+/***************** module init stuff ************************/
+
+int __init init_xio_aio(void)
+{
+ XIO_DBG("init_aio()\n");
+ _aio_brick_type = (void *)&aio_brick_type;
+ set_fake();
+ return aio_register_brick_type();
+}
+
+void exit_xio_aio(void)
+{
+ XIO_DBG("exit_aio()\n");
+ put_fake();
+ aio_unregister_brick_type();
+}
--
2.0.0
--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/