[RFC 21/32] mars: add new module xio_copy
From: Thomas Schoebel-Theuer
Date: Fri Dec 30 2016 - 17:59:11 EST
Signed-off-by: Thomas Schoebel-Theuer <tst@xxxxxxxxxxxxxxxxxx>
---
drivers/staging/mars/xio_bricks/xio_copy.c | 1005 ++++++++++++++++++++++++++++
include/linux/xio/xio_copy.h | 115 ++++
2 files changed, 1120 insertions(+)
create mode 100644 drivers/staging/mars/xio_bricks/xio_copy.c
create mode 100644 include/linux/xio/xio_copy.h
diff --git a/drivers/staging/mars/xio_bricks/xio_copy.c b/drivers/staging/mars/xio_bricks/xio_copy.c
new file mode 100644
index 000000000000..56b60f2f837e
--- /dev/null
+++ b/drivers/staging/mars/xio_bricks/xio_copy.c
@@ -0,0 +1,1005 @@
+/*
+ * MARS Long Distance Replication Software
+ *
+ * Copyright (C) 2010-2014 Thomas Schoebel-Theuer
+ * Copyright (C) 2011-2014 1&1 Internet AG
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+/* Copy brick (just for demonstration) */
+
+#include <linux/kernel.h>
+#include <linux/module.h>
+#include <linux/string.h>
+
+#include <linux/xio/xio.h>
+#include <linux/brick/lib_limiter.h>
+
+/* Fallback direction codes, used only when nothing included so far defines READ. */
+#ifndef READ
+#define READ 0
+#define WRITE 1
+#endif
+
+/* Size of one copy request. */
+#define COPY_CHUNK (PAGE_SIZE)
+/* Number of chunk slots in the state table: as many chunks as fit into 32 MiB. */
+#define NR_COPY_REQUESTS (32 * 1024 * 1024 / COPY_CHUNK)
+
+/*
+ * The state table has two levels: brick->st is one page of pointers,
+ * each of which refers to a page-sized array of struct copy_state.
+ */
+#define STATES_PER_PAGE (PAGE_SIZE / sizeof(struct copy_state))
+/* Sub-tables needed for NR_COPY_REQUESTS slots, rounded up. */
+#define MAX_SUB_TABLES \
+	((NR_COPY_REQUESTS + STATES_PER_PAGE - 1) / STATES_PER_PAGE)
+/* Number of slots addressable through a single page of sub-table pointers. */
+#define MAX_COPY_REQUESTS (PAGE_SIZE / sizeof(struct copy_state *) * STATES_PER_PAGE)
+
+/* Map a flat slot index onto its struct copy_state (usable as an lvalue). */
+#define GET_STATE(brick, index) \
+	((brick)->st[(index) / STATES_PER_PAGE][(index) % STATES_PER_PAGE])
+
+/************************ own type definitions ***********************/
+
+#include <linux/xio/xio_copy.h>
+
+int xio_copy_overlap = 1; /* nonzero: a chunk counts as "written out" as soon as its write is submitted, not only when it completes */
+
+int xio_copy_read_prio = XIO_PRIO_NORMAL; /* io_prio assigned to copy reads in _make_aio() */
+
+int xio_copy_write_prio = XIO_PRIO_NORMAL; /* io_prio assigned to copy writes in _make_aio() */
+
+int xio_copy_read_max_fly; /* cap on concurrently flying copy reads; <= 0 means no cap */
+
+int xio_copy_write_max_fly; /* cap on concurrently flying copy writes; <= 0 means no cap */
+
+#define is_read_limited(brick) \
+ (xio_copy_read_max_fly > 0 && atomic_read(&(brick)->copy_read_flight) >= xio_copy_read_max_fly) /* true when the read cap is enabled and reached */
+
+#define is_write_limited(brick) \
+ (xio_copy_write_max_fly > 0 && atomic_read(&(brick)->copy_write_flight) >= xio_copy_write_max_fly) /* true when the write cap is enabled and reached */
+
+/************************ own helper functions ***********************/
+
+/*
+ * Raise the clash flag and wake the copy thread.
+ *
+ * Bit 0 of brick->clash is set and the total clash counter is bumped;
+ * _run_copy() consumes the flag via _clear_clash().
+ *
+ * TODO:
+ * The clash logic is untested / alpha stage (Feb. 2011).
+ *
+ * For now, the output is never used, so this cannot do harm.
+ *
+ * In order to get the output really working / enterprise grade,
+ * some larger test effort should be invested.
+ */
+static inline void _clash(struct copy_brick *brick)
+{
+	brick->trigger = true;
+	set_bit(0, &brick->clash);
+	atomic_inc(&brick->total_clash_count);
+	wake_up_interruptible(&brick->event);
+}
+
+/*
+ * Atomically fetch and reset the clash flag (bit 0 of brick->clash).
+ * Returns the previous value of the bit.
+ */
+static inline int _clear_clash(struct copy_brick *brick)
+{
+	return test_and_clear_bit(0, &brick->clash);
+}
+
+/* Current semantics:
+ *
+ * All writes are always going to the original input A. They are _not_
+ * replicated to B.
+ *
+ * In order to get B really uptodate, you have to replay the right
+ * transaction logs there (at the right time).
+ * [If you had no writes on A at all during the copy, of course
+ * this is not necessary]
+ *
+ * When utilize_mode is on, reads can utilize the already copied
+ * region from B, but only as long as this region has not been
+ * invalidated by writes (indicated by low_dirty).
+ *
+ * TODO: implement replicated writes, together with some transaction
+ * replay logic applying the transaction logs _only_ after
+ * crashes during inconsistency caused by partial replication of writes.
+ *
+ * Returns INPUT_A_IO or INPUT_B_IO.  Note the side effects: a write
+ * that is not entirely behind copy_end sets low_dirty, and if it also
+ * reaches beyond copy_start a clash is raised.
+ */
+static
+int _determine_input(struct copy_brick *brick, struct aio_object *aio)
+{
+	loff_t io_end;
+	bool is_below;
+	bool is_behind;
+
+	if (!brick->utilize_mode || brick->low_dirty)
+		return INPUT_A_IO;
+
+	io_end = aio->io_pos + aio->io_len;
+	/* entirely inside the region already accounted as copied */
+	is_below = io_end <= brick->copy_start;
+	/* entirely after the region to copy (or no region configured) */
+	is_behind = !brick->copy_end || aio->io_pos >= brick->copy_end;
+
+	if (aio->io_may_write | aio->io_rw) {
+		if (!is_behind) {
+			brick->low_dirty = true;
+			if (!is_below) {
+				_clash(brick);
+				wake_up_interruptible(&brick->event);
+			}
+		}
+		return INPUT_A_IO;
+	}
+
+	return is_below ? INPUT_B_IO : INPUT_A_IO;
+}
+
+#define GET_INDEX(pos) (((pos) / COPY_CHUNK) % NR_COPY_REQUESTS) /* state-table slot for the chunk containing pos; slots are reused modulo NR_COPY_REQUESTS */
+#define GET_OFFSET(pos) ((pos) % COPY_CHUNK) /* byte offset of pos inside its chunk */
+
+/*
+ * Hand an aio back (aio_put) to the copy input it was obtained from:
+ * queue 0 maps to INPUT_A_COPY, any other value to INPUT_B_COPY.
+ */
+static
+void __clear_aio(struct copy_brick *brick, struct aio_object *aio, int queue)
+{
+	struct copy_input *input;
+
+	if (queue)
+		input = brick->inputs[INPUT_B_COPY];
+	else
+		input = brick->inputs[INPUT_A_COPY];
+
+	GENERIC_INPUT_CALL(input, aio_put, aio);
+}
+
+/*
+ * Release the aio parked in slot @index for @queue, if there is one,
+ * and empty the table entry.  A slot that is still marked active is
+ * reported as an error and forcibly marked inactive first.
+ */
+static
+void _clear_aio(struct copy_brick *brick, int index, int queue)
+{
+	struct copy_state *st = &GET_STATE(brick, index);
+	struct aio_object *aio = st->table[queue];
+
+	if (!aio)
+		return;
+
+	if (unlikely(st->active[queue])) {
+		XIO_ERR("clearing active aio, index = %d queue = %d\n", index, queue);
+		st->active[queue] = false;
+	}
+	__clear_aio(brick, aio, queue);
+	st->table[queue] = NULL;
+}
+
+/*
+ * Put every slot back into COPY_STATE_START and release whatever aios
+ * are still parked in either queue.
+ */
+static
+void _clear_all_aio(struct copy_brick *brick)
+{
+	int index;
+
+	for (index = 0; index < NR_COPY_REQUESTS; index++) {
+		int queue;
+
+		GET_STATE(brick, index).state = COPY_STATE_START;
+		for (queue = 0; queue < 2; queue++)
+			_clear_aio(brick, index, queue);
+	}
+}
+
+/*
+ * Zero all sub-tables of the state table.  Since COPY_STATE_START is 0,
+ * this leaves every slot in the start state with empty table entries.
+ */
+static
+void _clear_state_table(struct copy_brick *brick)
+{
+	int nr;
+
+	for (nr = 0; nr < MAX_SUB_TABLES; nr++)
+		memset(brick->st[nr], 0, PAGE_SIZE);
+}
+
+/*
+ * Completion callback for the copy requests submitted by _make_aio().
+ *
+ * On success the finished aio is parked in st->table[queue] of its
+ * chunk slot, where _next_state() finds it.  On IO error the aio is
+ * released immediately, the error is stored in st->error and a clash
+ * is raised.  A bad queue number or an already occupied table entry is
+ * treated as an error as well.  In all of these cases the matching
+ * in-flight counter is decremented and the copy thread is woken.
+ */
+static
+void copy_endio(struct generic_callback *cb)
+{
+	struct copy_aio_aspect *aio_a;
+	struct aio_object *aio;
+	struct copy_brick *brick;
+	struct copy_state *st;
+	int index;
+	int queue;
+	int rw;
+	int error = 0;
+
+	LAST_CALLBACK(cb);
+	aio_a = cb->cb_private;
+	CHECK_PTR(aio_a, err);
+	aio = aio_a->object;
+	CHECK_PTR(aio, err);
+	brick = aio_a->brick;
+	CHECK_PTR(brick, err);
+
+	queue = aio_a->queue;
+	index = GET_INDEX(aio->io_pos);
+	st = &GET_STATE(brick, index);
+	/* Sample the direction now: the error path below drops our
+	 * reference, and the aio should not be touched after that.
+	 */
+	rw = aio->io_rw;
+
+	if (unlikely(queue < 0 || queue >= 2)) {
+		XIO_ERR("bad queue %d\n", queue);
+		error = -EINVAL;
+		goto exit;
+	}
+	st->active[queue] = false;
+	if (unlikely(st->table[queue])) {
+		XIO_ERR("table corruption at %d %d (%p => %p)\n", index, queue, st->table[queue], aio);
+		error = -EEXIST;
+		goto exit;
+	}
+	if (unlikely(cb->cb_error < 0)) {
+		error = cb->cb_error;
+		__clear_aio(brick, aio, queue);
+		/* This is racy, but does no harm.
+		 * Worst case just produces more error output.
+		 */
+		if (!brick->copy_error_count++)
+			XIO_WRN("IO error %d on index %d, old state = %d\n", cb->cb_error, index, st->state);
+	} else {
+		/* The entry is known to be empty here (checked above). */
+		st->table[queue] = aio;
+	}
+
+exit:
+	if (unlikely(error < 0)) {
+		st->error = error;
+		_clash(brick);
+	}
+	if (rw)
+		atomic_dec(&brick->copy_write_flight);
+	else
+		atomic_dec(&brick->copy_read_flight);
+	brick->trigger = true;
+	wake_up_interruptible(&brick->event);
+	goto out_return;
+err:
+	XIO_FAT("cannot handle callback\n");
+out_return:;
+}
+
+/*
+ * Allocate, set up and submit one copy request.
+ *
+ * @brick:   the copy brick
+ * @index:   state-table slot belonging to the chunk
+ * @queue:   0 selects INPUT_A_COPY, anything else INPUT_B_COPY
+ * @data:    value for io_data (callers pass NULL for reads and the
+ *           source buffer for writes)
+ * @pos:     start position of the request
+ * @end_pos: exclusive upper bound; the request is clipped to the end
+ *           of the chunk containing @pos and to @end_pos
+ * @rw:      READ or WRITE
+ * @cs_mode: value for io_cs_mode
+ *
+ * Returns the non-negative aio_get status after the request has been
+ * submitted, -EAGAIN when a clash is pending or @end_pos <= 0, -ENOMEM
+ * when the aspect cannot be obtained, or the negative aio_get status.
+ * On every failure the aio has been freed and nothing is in flight.
+ */
+static
+int _make_aio(
+struct copy_brick *brick, int index, int queue, void *data, loff_t pos, loff_t end_pos, int rw, int cs_mode)
+{
+	struct aio_object *aio;
+	struct copy_aio_aspect *aio_a;
+	struct copy_input *input;
+	int offset;
+	int len;
+	int status = -EAGAIN;
+
+	if (brick->clash || end_pos <= 0)
+		goto done;
+
+	aio = copy_alloc_aio(brick);
+	status = -ENOMEM;
+
+	aio_a = copy_aio_get_aspect(brick, aio);
+	if (unlikely(!aio_a)) {
+		XIO_FAT("cannot get own aspect\n");
+		/* don't leak the freshly allocated object */
+		obj_free(aio);
+		goto done;
+	}
+
+	aio_a->brick = brick;
+	aio_a->queue = queue;
+	aio->io_may_write = rw;
+	aio->io_rw = rw;
+	aio->io_data = data;
+	aio->io_pos = pos;
+	aio->io_cs_mode = cs_mode;
+	/* never cross a chunk boundary, never exceed end_pos */
+	offset = GET_OFFSET(pos);
+	len = COPY_CHUNK - offset;
+	if (pos + len > end_pos)
+		len = end_pos - pos;
+	aio->io_len = len;
+	aio->io_prio = rw ?
+		xio_copy_write_prio :
+		xio_copy_read_prio;
+	/* out-of-range global priority: fall back to the brick's own */
+	if (aio->io_prio < XIO_PRIO_HIGH || aio->io_prio > XIO_PRIO_LOW)
+		aio->io_prio = brick->io_prio;
+
+	SETUP_CALLBACK(aio, copy_endio, aio_a);
+
+	input = queue ? brick->inputs[INPUT_B_COPY] : brick->inputs[INPUT_A_COPY];
+	status = GENERIC_INPUT_CALL(input, aio_get, aio);
+	if (unlikely(status < 0)) {
+		XIO_ERR("status = %d\n", status);
+		obj_free(aio);
+		goto done;
+	}
+	if (unlikely(aio->io_len < len))
+		XIO_DBG("shorten len %d < %d\n", aio->io_len, len);
+	/* queue 0 defines the slot's length; queue 1 may only shrink it */
+	if (queue == 0) {
+		GET_STATE(brick, index).len = aio->io_len;
+	} else if (unlikely(aio->io_len < GET_STATE(brick, index).len)) {
+		XIO_DBG("shorten len %d < %d at index %d\n", aio->io_len, GET_STATE(brick, index).len, index);
+		GET_STATE(brick, index).len = aio->io_len;
+	}
+
+	/* mark the slot busy before the IO can possibly complete */
+	GET_STATE(brick, index).active[queue] = true;
+	if (rw)
+		atomic_inc(&brick->copy_write_flight);
+	else
+		atomic_inc(&brick->copy_read_flight);
+	GENERIC_INPUT_CALL(input, aio_io, aio);
+
+done:
+	return status;
+}
+
+/*
+ * Publish the working position: copy_start is moved up to copy_last
+ * and power.percent_done is recomputed and logged.
+ *
+ * Unless @force is set, this happens only when copy_last is more than
+ * 8 MiB ahead of copy_start, when more than 5 seconds have passed
+ * since the last update, or when the copy has reached copy_end.
+ */
+static
+void _update_percent(struct copy_brick *brick, bool force)
+{
+	bool due;
+
+	due = force ||
+		brick->copy_last > brick->copy_start + 8 * 1024 * 1024 ||
+		time_is_before_jiffies(brick->last_jiffies + 5 * HZ) ||
+		(brick->copy_last == brick->copy_end && brick->copy_end > 0);
+	if (!due)
+		return;
+
+	brick->copy_start = brick->copy_last;
+	brick->last_jiffies = jiffies;
+	if (brick->copy_end > 0)
+		brick->power.percent_done = brick->copy_start * 100 / brick->copy_end;
+	else
+		brick->power.percent_done = 0;
+
+	XIO_INF(
+		"'%s' copied %lld / %lld bytes (%d%%)\n",
+		brick->brick_path,
+		brick->copy_last,
+		brick->copy_end,
+		brick->power.percent_done);
+}
+
+/* The heart of this brick.
+ * State transition function of the finite automaton.
+ * In case no progress is possible (e.g. preconditions not
+ * yet true), the state is left as is (idempotence property:
+ * calling this too often does no harm, just costs performance).
+ *
+ * @brick: the copy brick
+ * @index: state-table slot of the chunk (GET_INDEX(pos))
+ * @pos:   position of the chunk in the copy area
+ *
+ * The switch is re-entered ("restart") as long as a pass changed the
+ * state without reaching a waiting point.  The resulting state is
+ * stored in the slot before returning.
+ *
+ * Returns 1 when the stored state differs from the one found on entry,
+ * else 0.  Failures are not returned: they are recorded in st->error
+ * (unless an error is already recorded there), a clash is raised, and
+ * 0 is returned.
+ */
+static
+int _next_state(struct copy_brick *brick, int index, loff_t pos)
+{
+ struct aio_object *aio0;
+ struct aio_object *aio1;
+ struct copy_state *st;
+ char state;
+ char next_state;
+ bool do_restart = false;
+ int progress = 0;
+ int status;
+
+ st = &GET_STATE(brick, index);
+ next_state = st->state;
+
+restart:
+ state = next_state;
+
+ do_restart = false;
+
+ switch (state) {
+ case COPY_STATE_RESET:
+ /* This state is only entered after errors or
+ * in restarting situations.
+ */
+ _clear_aio(brick, index, 1);
+ _clear_aio(brick, index, 0);
+ next_state = COPY_STATE_START;
+ /* fallthrough */
+ case COPY_STATE_START:
+ /* This is the regular starting state.
+ * It must be zero, automatically entered via memset()
+ */
+ if (st->table[0] || st->table[1]) {
+ XIO_ERR("index %d not startable\n", index);
+ progress = -EPROTO;
+ goto idle;
+ }
+
+ _clear_aio(brick, index, 1);
+ _clear_aio(brick, index, 0);
+ st->writeout = false;
+ st->error = 0;
+
+ if (brick->is_aborting ||
+ is_read_limited(brick))
+ goto idle;
+
+ status = _make_aio(brick, index, 0, NULL, pos, brick->copy_end, READ, brick->verify_mode ? 2 : 0); /* verify mode: read with cs_mode 2 first */
+ if (unlikely(status < 0)) {
+ XIO_DBG("status = %d\n", status);
+ progress = status;
+ break;
+ }
+
+ next_state = COPY_STATE_READ1;
+ if (!brick->verify_mode)
+ break;
+
+ next_state = COPY_STATE_START2;
+ /* fallthrough */
+ case COPY_STATE_START2:
+ status = _make_aio(brick, index, 1, NULL, pos, brick->copy_end, READ, 2); /* same read on the B side, for comparison */
+ if (unlikely(status < 0)) {
+ XIO_DBG("status = %d\n", status);
+ progress = status;
+ break;
+ }
+ next_state = COPY_STATE_READ2;
+ /* fallthrough */
+ case COPY_STATE_READ2:
+ aio1 = st->table[1];
+ if (!aio1) { /* idempotence: wait by unchanged state */
+ goto idle;
+ }
+ /* fallthrough = > wait for both aios to appear */
+ case COPY_STATE_READ1:
+ case COPY_STATE_READ3:
+ aio0 = st->table[0];
+ if (!aio0) { /* idempotence: wait by unchanged state */
+ goto idle;
+ }
+ if (brick->copy_limiter) {
+ int amount = (aio0->io_len - 1) / 1024 + 1; /* io_len in KiB, rounded up */
+
+ rate_limit_sleep(brick->copy_limiter, amount);
+ }
+ /* on append mode: increase the end pointer dynamically */
+ if (brick->append_mode > 0 && aio0->io_total_size && aio0->io_total_size > brick->copy_end)
+ brick->copy_end = aio0->io_total_size;
+ /* do verify (when applicable) */
+ aio1 = st->table[1];
+ if (aio1 && state != COPY_STATE_READ3) {
+ int len = aio0->io_len;
+ bool ok;
+
+ if (len != aio1->io_len) {
+ ok = false;
+ } else if (aio0->io_cs_mode) {
+ static unsigned char null[sizeof(aio0->io_checksum)];
+
+ ok = !memcmp(aio0->io_checksum, aio1->io_checksum, sizeof(aio0->io_checksum));
+ if (ok)
+ ok = memcmp(aio0->io_checksum, null, sizeof(aio0->io_checksum)) != 0; /* an all-zero checksum never counts as verified */
+ } else if (!aio0->io_data || !aio1->io_data) {
+ ok = false;
+ } else {
+ ok = !memcmp(aio0->io_data, aio1->io_data, len);
+ }
+
+ _clear_aio(brick, index, 1);
+
+ if (ok)
+ brick->verify_ok_count++;
+ else
+ brick->verify_error_count++;
+
+ if (ok || !brick->repair_mode) {
+ /* skip start of writing, goto final treatment of writeout */
+ next_state = COPY_STATE_CLEANUP;
+ break;
+ }
+ }
+
+ if (aio0->io_cs_mode > 1) { /* re-read, this time with data */
+ _clear_aio(brick, index, 0);
+ status = _make_aio(brick, index, 0, NULL, pos, brick->copy_end, READ, 0);
+ if (unlikely(status < 0)) {
+ XIO_DBG("status = %d\n", status);
+ progress = status;
+ next_state = COPY_STATE_RESET;
+ break;
+ }
+ next_state = COPY_STATE_READ3;
+ break;
+ }
+ next_state = COPY_STATE_WRITE;
+ /* fallthrough */
+ case COPY_STATE_WRITE:
+ if (is_write_limited(brick))
+ goto idle;
+ /* Obey ordering to get a strict "append" behaviour.
+ * We assume that we don't need to wait for completion
+ * of the previous write to avoid a sparse result file
+ * under all circumstances, i.e. we only assure that
+ * _starting_ the writes is in order.
+ * This is only correct when all lower bricks obey the
+ * order of io_io() operations.
+ * Currently, bio and aio are obeying this. Be careful when
+ * implementing new IO bricks!
+ */
+ if (st->prev >= 0 && !GET_STATE(brick, st->prev).writeout)
+ goto idle;
+ aio0 = st->table[0];
+ if (unlikely(!aio0 || !aio0->io_data)) {
+ XIO_ERR("src buffer for write does not exist, state %d at index %d\n", state, index);
+ progress = -EILSEQ;
+ break;
+ }
+ if (unlikely(brick->is_aborting)) {
+ progress = -EINTR;
+ break;
+ }
+ /* start writeout */
+ status = _make_aio(brick, index, 1, aio0->io_data, pos, pos + aio0->io_len, WRITE, 0);
+ if (unlikely(status < 0)) {
+ XIO_DBG("status = %d\n", status);
+ progress = status;
+ next_state = COPY_STATE_RESET;
+ break;
+ }
+ /* Attention! overlapped IO behind EOF could
+ * lead to temporary inconsistent state of the
+ * file, because the write order may be different from
+ * strict O_APPEND behaviour.
+ */
+ if (xio_copy_overlap)
+ st->writeout = true;
+ next_state = COPY_STATE_WRITTEN;
+ /* fallthrough */
+ case COPY_STATE_WRITTEN:
+ aio1 = st->table[1];
+ if (!aio1) { /* idempotence: wait by unchanged state */
+ goto idle;
+ }
+ st->writeout = true;
+ /* rechecking means to start over again.
+ * ATTENTION! this may lead to infinite request
+ * submission loops, intentionally.
+ * TODO: implement some timeout means.
+ */
+ if (brick->recheck_mode && brick->repair_mode) {
+ next_state = COPY_STATE_RESET;
+ break;
+ }
+ next_state = COPY_STATE_CLEANUP;
+ /* fallthrough */
+ case COPY_STATE_CLEANUP:
+ _clear_aio(brick, index, 1);
+ _clear_aio(brick, index, 0);
+ next_state = COPY_STATE_FINISHED;
+ /* fallthrough */
+ case COPY_STATE_FINISHED:
+ /* Indicate successful completion by remaining in this state.
+ * Restart of the finite automaton must be done externally.
+ */
+ goto idle;
+ default:
+ XIO_ERR("illegal state %d at index %d\n", state, index);
+ _clash(brick);
+ progress = -EILSEQ;
+ }
+
+ do_restart = (state != next_state); /* only reached via break: run the switch again if the state moved */
+
+idle:
+ if (unlikely(progress < 0)) {
+ if (st->error >= 0)
+ st->error = progress;
+ XIO_DBG("progress = %d\n", progress);
+ progress = 0;
+ _clash(brick);
+ } else if (do_restart) {
+ goto restart;
+ } else if (st->state != next_state) {
+ progress++;
+ }
+
+ /* save the resulting state */
+ st->state = next_state;
+ return progress;
+}
+
+/*
+ * One pass of the copy engine.
+ *
+ * 1. If a clash is pending, wait (by returning 0) until no copy IO is
+ *    in flight any more, then drop all aios and reset the state table.
+ * 2. Walk the chunks starting at copy_last and run _next_state() on
+ *    every slot that has no IO in flight.  The number of iterations is
+ *    bounded by NR_COPY_REQUESTS minus twice the io_flight counter.
+ * 3. If progress was made and no clash showed up, advance copy_last
+ *    over the leading run of slots in COPY_STATE_FINISHED and roll
+ *    those slots over to COPY_STATE_START.
+ *
+ * Returns the summed progress reported by _next_state().
+ */
+static
+int _run_copy(struct copy_brick *brick)
+{
+	int budget;
+	int count = 0;
+	loff_t pos;
+	loff_t limit = -1;
+	short prev = -1;
+	int progress = 0;
+
+	if (unlikely(_clear_clash(brick))) {
+		int flying;
+
+		XIO_DBG("clash\n");
+		flying = atomic_read(&brick->copy_read_flight) + atomic_read(&brick->copy_write_flight);
+		if (flying > 0) {
+			/* wait until all pending copy IO has finished
+			 */
+			_clash(brick);
+			XIO_DBG("re-clash\n");
+			brick_msleep(100);
+			return 0;
+		}
+		_clear_all_aio(brick);
+		_clear_state_table(brick);
+	}
+
+	/* Do at most this many iterations in the below loop
+	 */
+	budget = NR_COPY_REQUESTS - atomic_read(&brick->io_flight) * 2;
+
+	pos = brick->copy_last;
+	while (pos < brick->copy_end || brick->append_mode > 1) {
+		int index = GET_INDEX(pos);
+		struct copy_state *st = &GET_STATE(brick, index);
+
+		if (budget-- <= 0)
+			break;
+		/* chain the slots so that writes can be started in order */
+		st->prev = prev;
+		prev = index;
+		/* call the finite state automaton */
+		if (!(st->active[0] | st->active[1])) {
+			progress += _next_state(brick, index, pos);
+			limit = pos;
+		}
+		/* step to the start of the next chunk */
+		pos = ((pos / COPY_CHUNK) + 1) * COPY_CHUNK;
+	}
+
+	/* check the resulting state: can we advance the copy_last pointer? */
+	if (unlikely(!progress || brick->clash))
+		return progress;
+
+	pos = brick->copy_last;
+	while (pos <= limit) {
+		int index = GET_INDEX(pos);
+		struct copy_state *st = &GET_STATE(brick, index);
+
+		if (st->state != COPY_STATE_FINISHED)
+			break;
+		if (unlikely(st->error < 0)) {
+			/* check for fatal consistency errors */
+			if (st->error == -EMEDIUMTYPE) {
+				brick->copy_error = st->error;
+				brick->abort_mode = true;
+				XIO_WRN("Consistency is violated\n");
+			}
+			if (!brick->copy_error) {
+				brick->copy_error = st->error;
+				XIO_WRN("IO error = %d\n", st->error);
+			}
+			if (brick->abort_mode)
+				brick->is_aborting = true;
+			break;
+		}
+		/* rollover */
+		st->state = COPY_STATE_START;
+		count += st->len;
+		/* check contiguity */
+		if (unlikely(GET_OFFSET(pos) + st->len != COPY_CHUNK))
+			break;
+		pos = ((pos / COPY_CHUNK) + 1) * COPY_CHUNK;
+	}
+
+	if (count > 0) {
+		brick->copy_last += count;
+		get_lamport(&brick->copy_last_stamp);
+		_update_percent(brick, false);
+	}
+	return progress;
+}
+
+/*
+ * Termination test for the copy thread.  A stop request for the thread
+ * turns on is_aborting; the thread is done once it is aborting and no
+ * copy read or write is in flight any more.
+ */
+static
+bool _is_done(struct copy_brick *brick)
+{
+	int flying;
+
+	if (brick_thread_should_stop())
+		brick->is_aborting = true;
+
+	if (!brick->is_aborting)
+		return false;
+
+	flying = atomic_read(&brick->copy_read_flight) + atomic_read(&brick->copy_write_flight);
+	return flying <= 0;
+}
+
+/*
+ * Main function of the copy thread.
+ *
+ * Resets the error / verify counters, switches the power LED on and
+ * then calls _run_copy() repeatedly (as long as copy_end > 0) until
+ * _is_done() reports termination.  Between passes it sleeps for up to
+ * one second unless progress was made, a trigger arrived or the copy
+ * boundaries changed.  On exit all parked aios are released and the
+ * power LED is switched off.  A recorded -EMEDIUMTYPE error resets
+ * copy_start and copy_last to 0 beforehand.
+ *
+ * Always returns 0.
+ */
+static int _copy_thread(void *data)
+{
+	struct copy_brick *brick = data;
+	int rounds = 0;
+
+	XIO_DBG("--------------- copy_thread %p starting\n", brick);
+	brick->copy_error = 0;
+	brick->copy_error_count = 0;
+	brick->verify_ok_count = 0;
+	brick->verify_error_count = 0;
+
+	_update_percent(brick, true);
+
+	xio_set_power_on_led((void *)brick, true);
+	brick->trigger = true;
+
+	for (;;) {
+		loff_t old_start;
+		loff_t old_end;
+		int progress;
+
+		if (_is_done(brick))
+			break;
+
+		old_start = brick->copy_start;
+		old_end = brick->copy_end;
+		progress = 0;
+
+		if (old_end > 0) {
+			progress = _run_copy(brick);
+			if (!progress || ++rounds > 1000)
+				rounds = 0;
+		}
+
+		wait_event_interruptible_timeout(
+			brick->event,
+			progress > 0 ||
+			brick->trigger ||
+			brick->copy_start != old_start ||
+			brick->copy_end != old_end ||
+			_is_done(brick),
+			1 * HZ);
+		brick->trigger = false;
+	}
+
+	/* check for fatal consistency errors */
+	if (brick->copy_error == -EMEDIUMTYPE) {
+		/* reset the whole area */
+		brick->copy_start = 0;
+		brick->copy_last = 0;
+		XIO_WRN("resetting the full copy area\n");
+	}
+	_update_percent(brick, true);
+
+	XIO_DBG(
+		"--------------- copy_thread terminating (%d read requests / %d write requests flying, copy_start = %lld copy_end = %lld)\n",
+		atomic_read(&brick->copy_read_flight),
+		atomic_read(&brick->copy_write_flight),
+		brick->copy_start,
+		brick->copy_end);
+
+	_clear_all_aio(brick);
+	xio_set_power_off_led((void *)brick, true);
+	XIO_DBG("--------------- copy_thread done.\n");
+	return 0;
+}
+
+/***************** own brick * input * output operations *****************/
+
+/* Forward an info query to the INPUT_B_IO input and return its result. */
+static int copy_get_info(struct copy_output *output, struct xio_info *info)
+{
+	struct copy_brick *brick = output->brick;
+	struct copy_input *input = brick->inputs[INPUT_B_IO];
+
+	return GENERIC_INPUT_CALL(input, xio_get_info, info);
+}
+
+/*
+ * aio_get on the output: route the request to the input chosen by
+ * _determine_input() and count it in io_flight when that succeeds.
+ * Returns the status of the underlying aio_get.
+ */
+static int copy_io_get(struct copy_output *output, struct aio_object *aio)
+{
+	struct copy_brick *brick = output->brick;
+	struct copy_input *input;
+	int status;
+
+	input = brick->inputs[_determine_input(brick, aio)];
+	status = GENERIC_INPUT_CALL(input, aio_get, aio);
+	if (status < 0)
+		return status;
+
+	atomic_inc(&brick->io_flight);
+	return status;
+}
+
+/*
+ * aio_put on the output: route the put to the input chosen by
+ * _determine_input(), drop io_flight and wake the copy thread when the
+ * counter reaches zero.
+ *
+ * NOTE(review): the input is derived afresh here instead of being
+ * remembered from copy_io_get(); if the brick state changed in between,
+ * the put may go to a different input than the get -- confirm.
+ */
+static void copy_io_put(struct copy_output *output, struct aio_object *aio)
+{
+	struct copy_brick *brick = output->brick;
+	struct copy_input *input;
+
+	input = brick->inputs[_determine_input(brick, aio)];
+	GENERIC_INPUT_CALL(input, aio_put, aio);
+
+	if (!atomic_dec_and_test(&brick->io_flight))
+		return;
+
+	brick->trigger = true;
+	wake_up_interruptible(&brick->event);
+}
+
+/* aio_io on the output: submit the request to the input chosen by _determine_input(). */
+static void copy_io_io(struct copy_output *output, struct aio_object *aio)
+{
+	struct copy_brick *brick = output->brick;
+	struct copy_input *input;
+
+	input = brick->inputs[_determine_input(brick, aio)];
+	GENERIC_INPUT_CALL(input, aio_io, aio);
+}
+
+/*
+ * Power switch of the brick.
+ *
+ * Button on:  unless already on, clear is_aborting and, when no thread
+ *             exists yet, restart from copy_start and create the copy
+ *             thread (which switches the on-LED itself).
+ * Button off: unless already off, ask a running copy thread to stop.
+ *
+ * Always returns 0; failure to create the thread is only logged.
+ */
+static int copy_switch(struct copy_brick *brick)
+{
+	static int version;
+
+	XIO_DBG("power.button = %d\n", brick->power.button);
+
+	if (!brick->power.button) {
+		if (brick->power.off_led)
+			return 0;
+		xio_set_power_on_led((void *)brick, false);
+		if (brick->thread) {
+			XIO_INF("stopping thread...\n");
+			brick_thread_stop(brick->thread);
+		}
+		return 0;
+	}
+
+	if (brick->power.on_led)
+		return 0;
+	xio_set_power_off_led((void *)brick, false);
+	brick->is_aborting = false;
+	if (brick->thread)
+		return 0;
+
+	brick->copy_last = brick->copy_start;
+	get_lamport(&brick->copy_last_stamp);
+	brick->thread = brick_thread_create(_copy_thread, brick, "xio_copy%d", version++);
+	if (brick->thread) {
+		brick->trigger = true;
+	} else {
+		xio_set_power_off_led((void *)brick, true);
+		XIO_ERR("could not start copy thread\n");
+	}
+	return 0;
+}
+
+/*************** informational * statistics **************/
+
+/*
+ * Format the brick's positions, error / verify counters, flags and
+ * in-flight counters into a freshly allocated string of at most 1024
+ * bytes.  @verbose is not used.
+ *
+ * The buffer comes from brick_string_alloc(); presumably the caller is
+ * responsible for freeing it -- TODO confirm.
+ */
+static
+char *copy_statistics(struct copy_brick *brick, int verbose)
+{
+	const int maxlen = 1024;
+	char *res;
+
+	res = brick_string_alloc(maxlen);
+	snprintf(
+		res, maxlen,
+		"copy_start = %lld copy_last = %lld copy_end = %lld copy_error = %d copy_error_count = %d verify_ok_count = %d verify_error_count = %d low_dirty = %d is_aborting = %d clash = %lu | total clash_count = %d | io_flight = %d copy_read_flight = %d copy_write_flight = %d\n",
+		brick->copy_start,
+		brick->copy_last,
+		brick->copy_end,
+		brick->copy_error,
+		brick->copy_error_count,
+		brick->verify_ok_count,
+		brick->verify_error_count,
+		brick->low_dirty,
+		brick->is_aborting,
+		brick->clash,
+		atomic_read(&brick->total_clash_count),
+		atomic_read(&brick->io_flight),
+		atomic_read(&brick->copy_read_flight),
+		atomic_read(&brick->copy_write_flight));
+
+	return res;
+}
+
+/* Reset the only cumulative statistic kept by this brick: the total clash count. */
+static void copy_reset_statistics(struct copy_brick *brick)
+{
+	atomic_set(&brick->total_clash_count, 0);
+}
+
+/*************** object * aspect constructors * destructors **************/
+
+/* Aspect constructor: the copy aspect needs no initialisation. Always returns 0. */
+static int copy_aio_aspect_init_fn(struct generic_aspect *_ini)
+{
+	(void)_ini;
+	return 0;
+}
+
+/* Aspect destructor: nothing to tear down. */
+static void copy_aio_aspect_exit_fn(struct generic_aspect *_ini)
+{
+	(void)_ini;
+}
+
+XIO_MAKE_STATICS(copy);
+
+/********************* brick constructors * destructors *******************/
+
+/*
+ * Free every allocated sub-table of the state table and then the
+ * top-level pointer page itself.  Unallocated (NULL) entries are
+ * skipped.
+ */
+static
+void _free_pages(struct copy_brick *brick)
+{
+	int nr;
+
+	for (nr = 0; nr < MAX_SUB_TABLES; nr++) {
+		if (brick->st[nr])
+			brick_block_free(brick->st[nr], PAGE_SIZE);
+	}
+	brick_block_free(brick->st, PAGE_SIZE);
+}
+
+/*
+ * Brick constructor: allocate and zero the two-level state table (one
+ * page of sub-table pointers plus MAX_SUB_TABLES page-sized sub-tables)
+ * and initialise the wait queue and the mutex.
+ *
+ * Returns 0 on success, or -EINVAL when the compile-time configuration
+ * needs more sub-tables than fit into the single pointer page.
+ */
+static int copy_brick_construct(struct copy_brick *brick)
+{
+	int i;
+
+	/* The top-level table is exactly one page of pointers.  Check
+	 * the configuration against that capacity before anything is
+	 * allocated.  Both sides are compile-time constants, so this
+	 * should be usually optimized away as dead code.
+	 */
+	if (unlikely(MAX_SUB_TABLES > PAGE_SIZE / sizeof(struct copy_state *))) {
+		XIO_ERR("sorry, subtable index %d is too large.\n", (int)(MAX_SUB_TABLES - 1));
+		return -EINVAL;
+	}
+
+	brick->st = brick_block_alloc(0, PAGE_SIZE);
+	memset(brick->st, 0, PAGE_SIZE);
+
+	for (i = 0; i < MAX_SUB_TABLES; i++) {
+		struct copy_state *sub_table;
+
+		sub_table = brick_block_alloc(0, PAGE_SIZE);
+		brick->st[i] = sub_table;
+		/* all-zero means COPY_STATE_START with empty tables */
+		memset(sub_table, 0, PAGE_SIZE);
+	}
+
+	init_waitqueue_head(&brick->event);
+	sema_init(&brick->mutex, 1);
+	return 0;
+}
+
+/* Brick destructor: release the state table. Always returns 0. */
+static int copy_brick_destruct(struct copy_brick *brick)
+{
+	_free_pages(brick);
+
+	return 0;
+}
+
+/* Output constructor: the output carries no state of its own. Always returns 0. */
+static int copy_output_construct(struct copy_output *output)
+{
+	return 0;
+}
+
+/* Output destructor: nothing to release. Always returns 0. */
+static int copy_output_destruct(struct copy_output *output)
+{
+	return 0;
+}
+
+/************************ static structs ***********************/
+
+/* Brick-level operations: power switching and statistics. */
+static struct copy_brick_ops copy_brick_ops = {
+	.brick_switch		= copy_switch,
+	.brick_statistics	= copy_statistics,
+	.reset_statistics	= copy_reset_statistics,
+};
+
+/* Operations offered on the brick's single output. */
+static struct copy_output_ops copy_output_ops = {
+	.xio_get_info	= copy_get_info,
+	.aio_get	= copy_io_get,
+	.aio_put	= copy_io_put,
+	.aio_io		= copy_io_io,
+};
+
+/* Type descriptor shared by all inputs of the copy brick. */
+const struct copy_input_type copy_input_type = {
+	.type_name	= "copy_input",
+	.input_size	= sizeof(struct copy_input),
+};
+
+/*
+ * Default type for each of the four input slots
+ * (INPUT_A_IO, INPUT_A_COPY, INPUT_B_IO, INPUT_B_COPY).
+ */
+static const struct copy_input_type *copy_input_types[] = {
+	&copy_input_type,
+	&copy_input_type,
+	&copy_input_type,
+	&copy_input_type,
+};
+
+/* Type descriptor of the brick's output: size, operations and constructor / destructor. */
+const struct copy_output_type copy_output_type = {
+	.type_name = "copy_output",
+	.output_size = sizeof(struct copy_output),
+	.master_ops = &copy_output_ops,
+	.output_construct = &copy_output_construct,
+	.output_destruct = &copy_output_destruct,
+};
+
+/* Default type for the single output slot. */
+static const struct copy_output_type *copy_output_types[] = {
+	&copy_output_type,
+};
+
+/*
+ * Type descriptor of the copy brick: four inputs, one output, plus the
+ * operation tables and the constructor / destructor defined above.
+ */
+const struct copy_brick_type copy_brick_type = {
+	.type_name = "copy_brick",
+	.brick_size = sizeof(struct copy_brick),
+	.max_inputs = 4,
+	.max_outputs = 1,
+	.master_ops = &copy_brick_ops,
+	.aspect_types = copy_aspect_types,
+	.default_input_types = copy_input_types,
+	.default_output_types = copy_output_types,
+	.brick_construct = &copy_brick_construct,
+	.brick_destruct = &copy_brick_destruct,
+};
+
+/***************** module init stuff ************************/
+
+/* Module init hook: register the copy brick type and return the registration status. */
+int __init init_xio_copy(void)
+{
+	int status;
+
+	XIO_INF("init_copy()\n");
+	status = copy_register_brick_type();
+	return status;
+}
+
+/* Module exit hook: unregister the copy brick type. */
+void exit_xio_copy(void)
+{
+	XIO_INF("exit_copy()\n");
+
+	copy_unregister_brick_type();
+}
diff --git a/include/linux/xio/xio_copy.h b/include/linux/xio/xio_copy.h
new file mode 100644
index 000000000000..f92e898419f1
--- /dev/null
+++ b/include/linux/xio/xio_copy.h
@@ -0,0 +1,115 @@
+/*
+ * MARS Long Distance Replication Software
+ *
+ * Copyright (C) 2010-2014 Thomas Schoebel-Theuer
+ * Copyright (C) 2011-2014 1&1 Internet AG
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ */
+
+#ifndef XIO_COPY_H
+#define XIO_COPY_H
+
+#include <linux/wait.h>
+#include <linux/semaphore.h>
+
+#define INPUT_A_IO 0 /* input A, used for IO arriving at the brick's output */
+#define INPUT_A_COPY 1 /* input A, used for copy requests (queue 0) */
+#define INPUT_B_IO 2 /* input B, used for IO arriving at the brick's output */
+#define INPUT_B_COPY 3 /* input B, used for copy requests (queue 1) */
+
+extern int xio_copy_overlap; /* nonzero: treat a chunk as written out once its write is submitted */
+extern int xio_copy_read_prio; /* io_prio for copy reads */
+extern int xio_copy_write_prio; /* io_prio for copy writes */
+extern int xio_copy_read_max_fly; /* cap on flying copy reads, <= 0 = no cap */
+extern int xio_copy_write_max_fly; /* cap on flying copy writes, <= 0 = no cap */
+
+enum {
+ COPY_STATE_RESET = -1, /* drop both aios, then continue as START */
+ COPY_STATE_START = 0, /* don't change this, it _must_ be zero (slots are initialised by memset) */
+ COPY_STATE_START2, /* verify mode: submit the second read on queue 1 */
+ COPY_STATE_READ1, /* waiting for the read on queue 0 */
+ COPY_STATE_READ2, /* verify mode: waiting for the reads on both queues */
+ COPY_STATE_READ3, /* waiting for the re-read with data on queue 0 */
+ COPY_STATE_WRITE, /* ready to submit the write on queue 1 */
+ COPY_STATE_WRITTEN, /* waiting for the write to complete */
+ COPY_STATE_CLEANUP, /* release both aios */
+ COPY_STATE_FINISHED, /* chunk done; stays here until restarted externally */
+};
+
+/*
+ * Per-chunk slot of the copy state machine.  An all-zero slot is a
+ * valid slot in COPY_STATE_START.
+ */
+struct copy_state {
+	struct aio_object *table[2]; /* completed aio per queue, NULL if none */
+	bool active[2]; /* per queue: request submitted, completion pending */
+	char state; /* one of COPY_STATE_* */
+	bool writeout; /* the write for this chunk has been started / completed */
+
+	short prev; /* slot index of the preceding chunk, -1 if none */
+	short error; /* negative errno recorded for this chunk, 0 if none */
+	/* Length of the chunk's request.  This can be as large as
+	 * COPY_CHUNK == PAGE_SIZE, which does not fit into a short on
+	 * configurations with 64 KiB pages, hence int.
+	 */
+	int len;
+};
+
+struct copy_aio_aspect {
+ GENERIC_ASPECT(aio);
+ struct copy_brick *brick; /* brick that submitted the request */
+ int queue; /* 0 = request went to INPUT_A_COPY, 1 = INPUT_B_COPY */
+};
+
+struct copy_brick {
+ XIO_BRICK(copy);
+ /* parameters */
+ struct rate_limiter *copy_limiter; /* optional; when set, copy reads are throttled through it */
+ loff_t copy_start; /* start of the area still to copy; moved up as progress is published */
+
+ loff_t copy_end; /* stop working if == 0 */
+ int io_prio; /* fallback priority when the global copy priorities are out of range */
+
+ int append_mode; /* 1 = passively, 2 = actively */
+ bool verify_mode; /* 0 = copy, 1 = checksum+compare */
+ bool repair_mode; /* whether to repair in case of verify errors */
+ bool recheck_mode; /* whether to re-check after repairs (costs performance) */
+ bool utilize_mode; /* utilize already copied data */
+ bool abort_mode; /* abort on IO error (default is retry forever) */
+ /* readonly from outside */
+ loff_t copy_last; /* current working position */
+ struct timespec copy_last_stamp; /* lamport stamp taken whenever copy_last is set */
+ int copy_error; /* first error recorded by the copy run, 0 if none */
+ int copy_error_count; /* number of failed copy requests */
+ int verify_ok_count; /* chunks that compared equal */
+ int verify_error_count; /* chunks that compared unequal */
+ bool low_dirty; /* a write hit the area before copy_end; B is no longer used for reads */
+ bool is_aborting; /* the copy thread is shutting down */
+
+ /* internal */
+ bool trigger; /* set together with a wakeup of the copy thread */
+ unsigned long clash; /* bit 0: clash pending */
+ atomic_t total_clash_count; /* number of clashes raised (statistics) */
+ atomic_t io_flight; /* requests obtained through the output and not yet put */
+ atomic_t copy_read_flight; /* copy reads submitted and not yet completed */
+ atomic_t copy_write_flight; /* copy writes submitted and not yet completed */
+ unsigned long last_jiffies; /* time of the last progress update */
+
+ wait_queue_head_t event; /* the copy thread sleeps here */
+ struct semaphore mutex;
+ struct task_struct *thread; /* the copy thread, if created */
+ struct copy_state **st; /* two-level state table: page of pointers to sub-tables */
+};
+
+struct copy_input {
+ XIO_INPUT(copy); /* no members beyond the generic input part */
+};
+
+struct copy_output {
+ XIO_OUTPUT(copy); /* no members beyond the generic output part */
+};
+
+XIO_TYPES(copy);
+
+#endif
--
2.11.0