[PATCH 15/20] drbd: rework request processing for DRBD 9 multi-peer IO
From: Christoph Böhmwalder
Date: Fri Mar 27 2026 - 18:42:24 EST
Restructure the request state model to support simultaneous replication
to multiple peers.
Split the single request state word into a local and per-peer-node
state, with a per-request spinlock protecting concurrent updates from
independent peer connections.
Move the transfer log from per-connection to per-resource scope, and
replace coarse request-path locking by fine-grained locks for the
interval tree, completion lists, and transfer log.
Track request lifetime with a three-level reference counting scheme
that separates upper-layer completion, bitmap/activity-log cleanup,
and peer-ack processing.
Destruction defers memory reclaim via call_rcu() to allow lock-free
transfer log traversal.
Replace blocking write conflict waits by an asynchronous conflict
resolution system that queues overlapping requests by type onto
per-device work lists for deferred submission.
A new peer acknowledgment subsystem batches cross-node write
confirmations using dagtag-based sequencing, ensuring all peers have
acknowledged a write window before its requests are reclaimed.
The write path independently decides replication or out-of-sync
notification for each peer, while the read path adds peer selection
with read balancing.
Co-developed-by: Philipp Reisner <philipp.reisner@xxxxxxxxxx>
Signed-off-by: Philipp Reisner <philipp.reisner@xxxxxxxxxx>
Co-developed-by: Lars Ellenberg <lars.ellenberg@xxxxxxxxxx>
Signed-off-by: Lars Ellenberg <lars.ellenberg@xxxxxxxxxx>
Co-developed-by: Joel Colledge <joel.colledge@xxxxxxxxxx>
Signed-off-by: Joel Colledge <joel.colledge@xxxxxxxxxx>
Co-developed-by: Christoph Böhmwalder <christoph.boehmwalder@xxxxxxxxxx>
Signed-off-by: Christoph Böhmwalder <christoph.boehmwalder@xxxxxxxxxx>
---
drivers/block/drbd/drbd_req.c | 2990 +++++++++++++++++++++++----------
1 file changed, 2143 insertions(+), 847 deletions(-)
diff --git a/drivers/block/drbd/drbd_req.c b/drivers/block/drbd/drbd_req.c
index 70f75ef07945..8652824b1d2e 100644
--- a/drivers/block/drbd/drbd_req.c
+++ b/drivers/block/drbd/drbd_req.c
@@ -18,7 +18,6 @@
#include "drbd_int.h"
#include "drbd_req.h"
-
static bool drbd_may_do_local_read(struct drbd_device *device, sector_t sector, int size);
static struct drbd_request *drbd_req_new(struct drbd_device *device, struct bio *bio_src)
@@ -28,160 +27,554 @@ static struct drbd_request *drbd_req_new(struct drbd_device *device, struct bio
req = mempool_alloc(&drbd_request_mempool, GFP_NOIO);
if (!req)
return NULL;
+
memset(req, 0, sizeof(*req));
- req->rq_state = (bio_data_dir(bio_src) == WRITE ? RQ_WRITE : 0)
- | (bio_op(bio_src) == REQ_OP_WRITE_ZEROES ? RQ_ZEROES : 0)
- | (bio_op(bio_src) == REQ_OP_DISCARD ? RQ_UNMAP : 0);
+ kref_get(&device->kref);
+
req->device = device;
req->master_bio = bio_src;
req->epoch = 0;
drbd_clear_interval(&req->i);
- req->i.sector = bio_src->bi_iter.bi_sector;
- req->i.size = bio_src->bi_iter.bi_size;
- req->i.local = true;
- req->i.waiting = false;
+ req->i.sector = bio_src->bi_iter.bi_sector;
+ req->i.size = bio_src->bi_iter.bi_size;
+ req->i.type = bio_data_dir(bio_src) == WRITE ? INTERVAL_LOCAL_WRITE : INTERVAL_LOCAL_READ;
INIT_LIST_HEAD(&req->tl_requests);
- INIT_LIST_HEAD(&req->w.list);
+ INIT_LIST_HEAD(&req->list);
INIT_LIST_HEAD(&req->req_pending_master_completion);
INIT_LIST_HEAD(&req->req_pending_local);
/* one reference to be put by __drbd_make_request */
atomic_set(&req->completion_ref, 1);
- /* one kref as long as completion_ref > 0 */
+ /* one reference as long as completion_ref > 0 */
+ refcount_set(&req->done_ref, 1);
+ /* one reference as long as done_ref > 0 */
+ refcount_set(&req->oos_send_ref, 1);
+ /* one kref as long as oos_send_ref > 0 */
kref_init(&req->kref);
+ spin_lock_init(&req->rq_lock);
+
+ req->local_rq_state = (bio_data_dir(bio_src) == WRITE ? RQ_WRITE : 0)
+ | (bio_op(bio_src) == REQ_OP_WRITE_ZEROES ? RQ_ZEROES : 0)
+ | (bio_op(bio_src) == REQ_OP_DISCARD ? RQ_UNMAP : 0);
+
return req;
}
+void drbd_reclaim_req(struct rcu_head *rp)
+{
+ struct drbd_request *req = container_of(rp, struct drbd_request, rcu);
+
+ kref_put(&req->device->kref, drbd_destroy_device);
+
+ mempool_free(req, &drbd_request_mempool);
+}
+
+static u64 peer_ack_mask(struct drbd_request *req)
+{
+ struct drbd_resource *resource = req->device->resource;
+ struct drbd_connection *connection;
+ u64 mask = 0;
+
+ spin_lock_irq(&req->rq_lock);
+ if (req->local_rq_state & RQ_LOCAL_OK)
+ mask |= NODE_MASK(resource->res_opts.node_id);
+
+ rcu_read_lock();
+ for_each_connection_rcu(connection, resource) {
+ int node_id = connection->peer_node_id;
+
+ if (req->net_rq_state[node_id] & RQ_NET_OK)
+ mask |= NODE_MASK(node_id);
+ }
+ rcu_read_unlock();
+ spin_unlock_irq(&req->rq_lock);
+
+ return mask;
+}
+
+static void queue_peer_ack_send(struct drbd_resource *resource,
+ struct drbd_request *req, struct drbd_peer_ack *peer_ack)
+{
+ struct drbd_connection *connection;
+
+ rcu_read_lock();
+ for_each_connection_rcu(connection, resource) {
+ unsigned int node_id = connection->peer_node_id;
+ if (connection->agreed_pro_version < 110 ||
+ connection->cstate[NOW] != C_CONNECTED) {
+ connection->last_peer_ack_dagtag_seen = peer_ack->dagtag_sector;
+ continue;
+ }
+
+ if (req->net_rq_state[node_id] & RQ_NET_SENT)
+ peer_ack->pending_mask |= NODE_MASK(node_id);
+
+ peer_ack->queued_mask |= NODE_MASK(node_id);
+ queue_work(connection->ack_sender, &connection->peer_ack_work);
+ }
+ rcu_read_unlock();
+}
+
+void drbd_destroy_peer_ack_if_done(struct drbd_peer_ack *peer_ack)
+{
+ struct drbd_resource *resource = peer_ack->resource;
+
+ lockdep_assert_held(&resource->peer_ack_lock);
+
+ if (peer_ack->queued_mask)
+ return;
+
+ list_del(&peer_ack->list);
+ kfree(peer_ack);
+}
+
+int w_queue_peer_ack(struct drbd_work *w, int cancel)
+{
+ struct drbd_resource *resource =
+ container_of(w, struct drbd_resource, peer_ack_work);
+ LIST_HEAD(work_list);
+ struct drbd_request *req, *tmp;
+
+ spin_lock_irq(&resource->peer_ack_lock);
+ list_splice_init(&resource->peer_ack_req_list, &work_list);
+ spin_unlock_irq(&resource->peer_ack_lock);
+
+ list_for_each_entry_safe(req, tmp, &work_list, list) {
+ struct drbd_peer_ack *peer_ack =
+ kzalloc_obj(struct drbd_peer_ack);
+
+ peer_ack->resource = resource;
+ INIT_LIST_HEAD(&peer_ack->list);
+ peer_ack->mask = peer_ack_mask(req);
+ peer_ack->dagtag_sector = req->dagtag_sector;
+
+ spin_lock_irq(&resource->peer_ack_lock);
+ list_add_tail(&peer_ack->list, &resource->peer_ack_list);
+ queue_peer_ack_send(resource, req, peer_ack);
+ drbd_destroy_peer_ack_if_done(peer_ack);
+ spin_unlock_irq(&resource->peer_ack_lock);
+
+ kref_put(&req->kref, drbd_req_destroy);
+ }
+ return 0;
+}
+
+void drbd_queue_peer_ack(struct drbd_resource *resource, struct drbd_request *req)
+{
+ lockdep_assert_held(&resource->peer_ack_lock);
+
+ list_add_tail(&req->list, &resource->peer_ack_req_list);
+ drbd_queue_work_if_unqueued(&resource->work, &resource->peer_ack_work);
+}
+
+static bool peer_ack_differs(struct drbd_request *req1, struct drbd_request *req2)
+{
+ unsigned int max_node_id = req1->device->resource->max_node_id;
+ unsigned int node_id;
+
+ for (node_id = 0; node_id <= max_node_id; node_id++)
+ if ((req1->net_rq_state[node_id] & RQ_NET_OK) !=
+ (req2->net_rq_state[node_id] & RQ_NET_OK))
+ return true;
+ return false;
+}
+
+static bool peer_ack_window_full(struct drbd_request *req)
+{
+ struct drbd_resource *resource = req->device->resource;
+ u32 peer_ack_window = resource->res_opts.peer_ack_window;
+ u64 last_dagtag = resource->last_peer_acked_dagtag + peer_ack_window;
+
+ return dagtag_newer_eq(req->dagtag_sector, last_dagtag);
+}
+
static void drbd_remove_request_interval(struct rb_root *root,
struct drbd_request *req)
{
struct drbd_device *device = req->device;
- struct drbd_interval *i = &req->i;
-
- drbd_remove_interval(root, i);
+ unsigned long flags;
- /* Wake up any processes waiting for this request to complete. */
- if (i->waiting)
- wake_up(&device->misc_wait);
+ spin_lock_irqsave(&device->interval_lock, flags);
+ drbd_remove_interval(root, &req->i);
+ spin_unlock_irqrestore(&device->interval_lock, flags);
}
void drbd_req_destroy(struct kref *kref)
{
struct drbd_request *req = container_of(kref, struct drbd_request, kref);
+
+ call_rcu(&req->rcu, drbd_reclaim_req);
+}
+
+static void drbd_req_done(struct drbd_request *req)
+{
struct drbd_device *device = req->device;
- const unsigned s = req->rq_state;
+ struct drbd_resource *resource = device->resource;
+ struct drbd_peer_device *peer_device;
+ unsigned int s = req->local_rq_state;
+ unsigned long modified_mask = 0;
+
+ lockdep_assert_held(&resource->state_rwlock);
+ lockdep_assert_irqs_disabled();
+
+#ifdef CONFIG_DRBD_TIMING_STATS
+ if (s & RQ_WRITE && req->i.size != 0) {
+ spin_lock(&device->timing_lock); /* local irq already disabled */
+ device->reqs++;
+ ktime_aggregate(device, req, in_actlog_kt);
+ ktime_aggregate(device, req, pre_submit_kt);
+ for_each_peer_device(peer_device, device) {
+ int node_id = peer_device->node_id;
+ unsigned ns = req->net_rq_state[node_id];
+ if (!(ns & RQ_NET_MASK))
+ continue;
+ ktime_aggregate_pd(peer_device, node_id, req, pre_send_kt);
+ ktime_aggregate_pd(peer_device, node_id, req, acked_kt);
+ ktime_aggregate_pd(peer_device, node_id, req, net_done_kt);
+ }
+ spin_unlock(&device->timing_lock);
+ }
+#endif
+
+ /* paranoia */
+ for_each_peer_device(peer_device, device) {
+ unsigned ns = req->net_rq_state[peer_device->node_id];
+ if (!(ns & RQ_NET_MASK))
+ continue;
+ if (ns & RQ_NET_DONE)
+ continue;
+
+ drbd_err(device,
+ "%s: Logic BUG rq_state: (0:%x, %d:%x), completion_ref = %d\n",
+ __func__, s, peer_device->node_id, ns, atomic_read(&req->completion_ref));
+ return;
+ }
+ /* more paranoia */
if ((req->master_bio && !(s & RQ_POSTPONED)) ||
- atomic_read(&req->completion_ref) ||
- (s & RQ_LOCAL_PENDING) ||
- ((s & RQ_NET_MASK) && !(s & RQ_NET_DONE))) {
- drbd_err(device, "drbd_req_destroy: Logic BUG rq_state = 0x%x, completion_ref = %d\n",
- s, atomic_read(&req->completion_ref));
+ atomic_read(&req->completion_ref) || (s & RQ_LOCAL_PENDING)) {
+ drbd_err(device, "%s: Logic BUG master_bio:%d rq_state: %x, completion_ref = %d\n",
+ __func__, !!req->master_bio, s, atomic_read(&req->completion_ref));
return;
}
- /* If called from mod_rq_state (expected normal case) or
- * drbd_send_and_submit (the less likely normal path), this holds the
- * req_lock, and req->tl_requests will typicaly be on ->transfer_log,
- * though it may be still empty (never added to the transfer log).
- *
- * If called from do_retry(), we do NOT hold the req_lock, but we are
- * still allowed to unconditionally list_del(&req->tl_requests),
- * because it will be on a local on-stack list only. */
- list_del_init(&req->tl_requests);
-
/* finally remove the request from the conflict detection
* respective block_id verification interval tree. */
- if (!drbd_interval_empty(&req->i)) {
- struct rb_root *root;
+ if (s & RQ_WRITE && !drbd_interval_empty(&req->i))
+ drbd_remove_request_interval(&device->requests, req);
+
+ /* There is a special case:
+ * we may notice late that IO was suspended,
+ * and postpone, or schedule for retry, a write,
+ * before it even was submitted or sent.
+ * In that case we do not want to touch the bitmap at all.
+ */
+ if ((s & RQ_WRITE) && (s & (RQ_POSTPONED|RQ_LOCAL_MASK)) != RQ_POSTPONED &&
+ req->i.size && get_ldev_if_state(device, D_DETACHING)) {
+ struct drbd_peer_md *peer_md = device->ldev->md.peers;
+ unsigned long bits = -1, mask = -1;
+ int node_id, max_node_id = device->resource->max_node_id;
+
+ for (node_id = 0; node_id <= max_node_id; node_id++) {
+ unsigned int net_rq_state;
+
+ net_rq_state = req->net_rq_state[node_id];
+ if (net_rq_state & RQ_NET_OK) {
+ int bitmap_index = peer_md[node_id].bitmap_index;
+
+ if (bitmap_index == -1)
+ continue;
+
+ if (net_rq_state & RQ_NET_SIS)
+ clear_bit(bitmap_index, &bits);
+ else
+ clear_bit(bitmap_index, &mask);
+ }
+ }
+ if (device->bitmap)
+ modified_mask =
+ drbd_set_sync(device, req->i.sector, req->i.size, bits, mask);
+ put_ldev(device);
+ }
- if (s & RQ_WRITE)
- root = &device->write_requests;
- else
- root = &device->read_requests;
- drbd_remove_request_interval(root, req);
- } else if (s & (RQ_NET_MASK & ~RQ_NET_DONE) && req->i.size != 0)
- drbd_err(device, "drbd_req_destroy: Logic BUG: interval empty, but: rq_state=0x%x, sect=%llu, size=%u\n",
- s, (unsigned long long)req->i.sector, req->i.size);
-
- /* if it was a write, we may have to set the corresponding
- * bit(s) out-of-sync first. If it had a local part, we need to
- * release the reference to the activity log. */
if (s & RQ_WRITE) {
- /* Set out-of-sync unless both OK flags are set
- * (local only or remote failed).
- * Other places where we set out-of-sync:
- * READ with local io-error */
-
- /* There is a special case:
- * we may notice late that IO was suspended,
- * and postpone, or schedule for retry, a write,
- * before it even was submitted or sent.
- * In that case we do not want to touch the bitmap at all.
- */
- struct drbd_peer_device *peer_device = first_peer_device(device);
- if ((s & (RQ_POSTPONED|RQ_LOCAL_MASK|RQ_NET_MASK)) != RQ_POSTPONED) {
- if (!(s & RQ_NET_OK) || !(s & RQ_LOCAL_OK))
- drbd_set_out_of_sync(peer_device, req->i.sector, req->i.size);
+ for_each_peer_device(peer_device, device) {
+ if (!(req->net_rq_state[peer_device->node_id] & RQ_NET_PENDING_OOS))
+ continue;
+
+ if (s & RQ_POSTPONED) {
+ drbd_err(device, "%s: Logic BUG RQ_NET_PENDING_OOS|RQ_POSTPONED\n",
+ __func__);
+ continue;
+ }
- if ((s & RQ_NET_OK) && (s & RQ_LOCAL_OK) && (s & RQ_NET_SIS))
- drbd_set_in_sync(peer_device, req->i.sector, req->i.size);
+ /*
+ * As an optimization, we only send out-of-sync if we
+ * set some bit for this peer. If we are not
+ * replicating to this peer and the same block(s) are
+ * overwritten several times, the peer only needs to be
+ * informed of the first change.
+ */
+ if (peer_device->bitmap_index != -1 &&
+ test_bit(peer_device->bitmap_index, &modified_mask))
+ _req_mod(req, READY_FOR_NET, peer_device);
+ else
+ _req_mod(req, SKIP_OOS, peer_device);
+
+ wake_up(&peer_device->connection->sender_work.q_wait);
}
+ }
+
+ /* one might be tempted to move the drbd_al_complete_io
+ * to the local io completion callback drbd_request_endio.
+ * but, if this was a mirror write, we may only
+ * drbd_al_complete_io after this is RQ_NET_DONE,
+ * otherwise the extent could be dropped from the al
+ * before it has actually been written on the peer.
+ * if we crash before our peer knows about the request,
+ * but after the extent has been dropped from the al,
+ * we would forget to resync the corresponding extent.
+ */
+ if (s & RQ_IN_ACT_LOG) {
+ if (get_ldev_if_state(device, D_DETACHING)) {
+ if (drbd_al_complete_io(device, &req->i))
+ set_bit(INTERVAL_AL_EXTENT_LAST, &req->i.flags);
+ put_ldev(device);
+ } else if (drbd_device_ratelimit(device, BACKEND)) {
+ drbd_warn(device, "Should have called drbd_al_complete_io(, %llu, %u), but my Disk seems to have failed :(\n",
+ (unsigned long long) req->i.sector, req->i.size);
- /* one might be tempted to move the drbd_al_complete_io
- * to the local io completion callback drbd_request_endio.
- * but, if this was a mirror write, we may only
- * drbd_al_complete_io after this is RQ_NET_DONE,
- * otherwise the extent could be dropped from the al
- * before it has actually been written on the peer.
- * if we crash before our peer knows about the request,
- * but after the extent has been dropped from the al,
- * we would forget to resync the corresponding extent.
- */
- if (s & RQ_IN_ACT_LOG) {
- if (get_ldev_if_state(device, D_FAILED)) {
- drbd_al_complete_io(device, &req->i);
- put_ldev(device);
- } else if (drbd_ratelimit()) {
- drbd_warn(device, "Should have called drbd_al_complete_io(, %llu, %u), "
- "but my Disk seems to have failed :(\n",
- (unsigned long long) req->i.sector, req->i.size);
- }
}
}
+}
- mempool_free(req, &drbd_request_mempool);
+static void drbd_req_oos_sent(struct drbd_request *req)
+{
+ struct drbd_device *device = req->device;
+ struct drbd_resource *resource = device->resource;
+ unsigned int s = req->local_rq_state;
+
+ lockdep_assert_held(&resource->state_rwlock);
+ lockdep_assert_irqs_disabled();
+
+ if (s & RQ_WRITE && req->i.size) {
+ struct drbd_resource *resource = device->resource;
+ struct drbd_request *peer_ack_req;
+
+ spin_lock(&resource->peer_ack_lock); /* local irq already disabled */
+ peer_ack_req = resource->peer_ack_req;
+ if (peer_ack_req) {
+ bool al_extent_last = test_bit(INTERVAL_AL_EXTENT_LAST, &req->i.flags);
+
+ if (peer_ack_differs(req, peer_ack_req) ||
+ (al_extent_last && atomic_read(&device->ap_actlog_cnt)) ||
+ peer_ack_window_full(req)) {
+ drbd_queue_peer_ack(resource, peer_ack_req);
+ peer_ack_req = NULL;
+ } else
+ kref_put(&peer_ack_req->kref, drbd_req_destroy);
+ }
+ resource->peer_ack_req = req;
+
+ if (!peer_ack_req)
+ resource->last_peer_acked_dagtag = req->dagtag_sector;
+ spin_unlock(&resource->peer_ack_lock);
+
+ mod_timer(&resource->peer_ack_timer,
+ jiffies + resource->res_opts.peer_ack_delay * HZ / 1000);
+ } else
+ kref_put(&req->kref, drbd_req_destroy);
}
-static void wake_all_senders(struct drbd_connection *connection)
+static void wake_all_senders(struct drbd_resource *resource)
{
- wake_up(&connection->sender_work.q_wait);
+ struct drbd_connection *connection;
+ /* We need make sure any update is visible before we wake up the
+ * threads that may check the values in their wait_event() condition.
+ * Do we need smp_mb here? Or rather switch to atomic_t? */
+ rcu_read_lock();
+ for_each_connection_rcu(connection, resource)
+ wake_up(&connection->sender_work.q_wait);
+ rcu_read_unlock();
}
-/* must hold resource->req_lock */
-void start_new_tl_epoch(struct drbd_connection *connection)
+bool start_new_tl_epoch(struct drbd_resource *resource)
{
+ unsigned long flags;
+ bool new_epoch_started;
+
+ spin_lock_irqsave(&resource->current_tle_lock, flags);
/* no point closing an epoch, if it is empty, anyways. */
- if (connection->current_tle_writes == 0)
- return;
+ if (resource->current_tle_writes == 0) {
+ new_epoch_started = false;
+ } else {
+ resource->current_tle_writes = 0;
+ atomic_inc(&resource->current_tle_nr);
+ wake_all_senders(resource);
+ new_epoch_started = true;
+ }
+ spin_unlock_irqrestore(&resource->current_tle_lock, flags);
- connection->current_tle_writes = 0;
- atomic_inc(&connection->current_tle_nr);
- wake_all_senders(connection);
+ return new_epoch_started;
}
void complete_master_bio(struct drbd_device *device,
struct bio_and_error *m)
{
+ int rw = bio_data_dir(m->bio);
if (unlikely(m->error))
m->bio->bi_status = errno_to_blk_status(m->error);
bio_endio(m->bio);
- dec_ap_bio(device);
+ dec_ap_bio(device, rw);
+}
+
+static void queue_conflicting_resync_write(
+ struct conflict_worker *submit_conflict, struct drbd_interval *i)
+{
+ struct drbd_peer_request *peer_req = container_of(i, struct drbd_peer_request, i);
+
+ list_add_tail(&peer_req->w.list, &submit_conflict->resync_writes);
+}
+
+static void queue_conflicting_resync_read(
+ struct conflict_worker *submit_conflict, struct drbd_interval *i)
+{
+ struct drbd_peer_request *peer_req = container_of(i, struct drbd_peer_request, i);
+
+ list_add_tail(&peer_req->w.list, &submit_conflict->resync_reads);
+}
+
+static void queue_conflicting_write(
+ struct conflict_worker *submit_conflict, struct drbd_interval *i)
+{
+ struct drbd_request *req = container_of(i, struct drbd_request, i);
+
+ list_add_tail(&req->list, &submit_conflict->writes);
+}
+
+static void queue_conflicting_peer_write(
+ struct conflict_worker *submit_conflict, struct drbd_interval *i)
+{
+ struct drbd_peer_request *peer_req = container_of(i, struct drbd_peer_request, i);
+
+ list_add_tail(&peer_req->w.list, &submit_conflict->peer_writes);
+}
+
+/* Queue any conflicting requests in this interval to be submitted. */
+void drbd_release_conflicts(struct drbd_device *device, struct drbd_interval *release_interval)
+{
+ struct conflict_worker *submit_conflict = &device->submit_conflict;
+ struct drbd_interval *i;
+ bool any_queued = false;
+
+ lockdep_assert_held(&device->interval_lock);
+
+ drbd_for_each_overlap(i, &device->requests, release_interval->sector, release_interval->size) {
+ if (test_bit(INTERVAL_SUBMITTED, &i->flags))
+ continue;
+
+ /* If we are waiting for a reply from the peer, then there is
+ * no need to process the conflict. */
+ if (test_bit(INTERVAL_READY_TO_SEND, &i->flags) &&
+ !test_bit(INTERVAL_RECEIVED, &i->flags))
+ continue;
+
+ dynamic_drbd_dbg(device,
+ "%s %s request at %llus+%u after conflict with %llus+%u\n",
+ test_bit(INTERVAL_SUBMIT_CONFLICT_QUEUED, &i->flags) ? "Already queued" : "Queue",
+ drbd_interval_type_str(i),
+ (unsigned long long) i->sector, i->size,
+ (unsigned long long) release_interval->sector, release_interval->size);
+
+ if (test_bit(INTERVAL_SUBMIT_CONFLICT_QUEUED, &i->flags))
+ continue;
+
+ /* Verify requests never wait for conflicting intervals. If
+ * there are no conflicts, they are marked direcly as
+ * submitted. Hence we should not see any here. */
+ if (unlikely(drbd_interval_is_verify(i))) {
+ if (drbd_ratelimit())
+ drbd_err(device, "Found verify request that was not yet submitted\n");
+ continue;
+ }
+
+ set_bit(INTERVAL_SUBMIT_CONFLICT_QUEUED, &i->flags);
+
+ spin_lock(&submit_conflict->lock);
+ /* Queue the request regardless of whether other conflicts
+ * remain. The conflict submitter will only actually submit the
+ * request if there are no conflicts. */
+ switch (i->type) {
+ case INTERVAL_LOCAL_WRITE:
+ queue_conflicting_write(submit_conflict, i);
+ break;
+ case INTERVAL_PEER_WRITE:
+ queue_conflicting_peer_write(submit_conflict, i);
+ break;
+ case INTERVAL_RESYNC_WRITE:
+ queue_conflicting_resync_write(submit_conflict, i);
+ break;
+ case INTERVAL_RESYNC_READ:
+ queue_conflicting_resync_read(submit_conflict, i);
+ break;
+ default:
+ BUG();
+ }
+ spin_unlock(&submit_conflict->lock);
+
+ any_queued = true;
+ }
+
+ if (any_queued)
+ queue_work(submit_conflict->wq, &submit_conflict->worker);
}
+void drbd_put_ref_tl_walk(struct drbd_request *req, int done_put, int oos_send_put)
+{
+ struct drbd_resource *resource = req->device->resource;
+
+ lockdep_assert_held(&resource->state_rwlock);
+
+ while (req) {
+ struct drbd_request *next_write;
+ bool done = false;
+ bool oos_sent = false;
+
+ if (done_put && refcount_sub_and_test(done_put, &req->done_ref)) {
+ done = true;
+ drbd_req_done(req);
+ oos_send_put++;
+ }
+
+ if (oos_send_put && refcount_sub_and_test(oos_send_put, &req->oos_send_ref))
+ oos_sent = true;
+
+ if (!done && !oos_sent)
+ break;
+
+ spin_lock(&resource->tl_update_lock); /* local irq already disabled */
+ next_write = req->next_write;
+ if (oos_sent) {
+ list_del_rcu(&req->tl_requests);
+ if (resource->tl_previous_write == req)
+ resource->tl_previous_write = NULL;
+ } else if (done) {
+ set_bit(INTERVAL_DONE, &req->i.flags);
+ }
+ spin_unlock(&resource->tl_update_lock);
+
+ if (oos_sent)
+ /* potentially destroy */
+ drbd_req_oos_sent(req);
+
+ req = next_write;
+ done_put = done ? 1 : 0;
+ oos_send_put = oos_sent ? 1 : 0;
+ }
+}
/* Helper for __req_mod().
* Set m->bio to the master bio, if it is fit to be completed,
@@ -192,30 +585,11 @@ void complete_master_bio(struct drbd_device *device,
static
void drbd_req_complete(struct drbd_request *req, struct bio_and_error *m)
{
- const unsigned s = req->rq_state;
+ const unsigned s = req->local_rq_state;
struct drbd_device *device = req->device;
- int error, ok;
-
- /* we must not complete the master bio, while it is
- * still being processed by _drbd_send_zc_bio (drbd_send_dblock)
- * not yet acknowledged by the peer
- * not yet completed by the local io subsystem
- * these flags may get cleared in any order by
- * the worker,
- * the receiver,
- * the bio_endio completion callbacks.
- */
- if ((s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED)) ||
- (s & RQ_NET_QUEUED) || (s & RQ_NET_PENDING) ||
- (s & RQ_COMPLETION_SUSP)) {
- drbd_err(device, "drbd_req_complete: Logic BUG rq_state = 0x%x\n", s);
- return;
- }
-
- if (!req->master_bio) {
- drbd_err(device, "drbd_req_complete: Logic BUG, master_bio == NULL!\n");
- return;
- }
+ struct drbd_peer_device *peer_device;
+ unsigned long flags;
+ int error, ok = 0;
/*
* figure out whether to report success or failure.
@@ -230,69 +604,136 @@ void drbd_req_complete(struct drbd_request *req, struct bio_and_error *m)
* local completion error, if any, has been stored as ERR_PTR
* in private_bio within drbd_request_endio.
*/
- ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
+ if (s & RQ_LOCAL_OK)
+ ++ok;
error = PTR_ERR(req->private_bio);
+ for_each_peer_device(peer_device, device) {
+ unsigned ns = req->net_rq_state[peer_device->node_id];
+ /* any net ok ok local ok is good enough to complete this bio as OK */
+ if (ns & RQ_NET_OK)
+ ++ok;
+ /* paranoia */
+ /* we must not complete the master bio, while it is
+ * still being processed by _drbd_send_zc_bio (drbd_send_dblock),
+ * respectively still needed for the second drbd_csum_bio() there.
+ * not yet acknowledged by the peer
+ * not yet completed by the local io subsystem
+ * these flags may get cleared in any order by
+ * the worker,
+ * the sender,
+ * the receiver,
+ * the bio_endio completion callbacks.
+ */
+ if (!(ns & RQ_NET_MASK))
+ continue;
+ if (ns & RQ_NET_PENDING_OOS)
+ continue;
+ if (!(ns & (RQ_NET_PENDING|RQ_NET_QUEUED)))
+ continue;
+
+ drbd_err(device,
+ "drbd_req_complete: Logic BUG rq_state: (0:%x, %d:%x), completion_ref = %d\n",
+ s, peer_device->node_id, ns, atomic_read(&req->completion_ref));
+ return;
+ }
+
+ /* more paranoia */
+ if (atomic_read(&req->completion_ref) ||
+ ((s & RQ_LOCAL_PENDING) && !(s & RQ_LOCAL_ABORTED))) {
+ drbd_err(device, "drbd_req_complete: Logic BUG rq_state: %x, completion_ref = %d\n",
+ s, atomic_read(&req->completion_ref));
+ return;
+ }
+
+ if (!req->master_bio) {
+ drbd_err(device, "drbd_req_complete: Logic BUG, master_bio == NULL!\n");
+ return;
+ }
+
/* Before we can signal completion to the upper layers,
* we may need to close the current transfer log epoch.
- * We are within the request lock, so we can simply compare
- * the request epoch number with the current transfer log
- * epoch number. If they match, increase the current_tle_nr,
- * and reset the transfer log epoch write_cnt.
+ * We simply compare the request epoch number with the current
+ * transfer log epoch number.
+ * With very specific timing, this may cause unnecessary barriers
+ * to be sent, but that is harmless.
+ *
+ * There is no need to close the transfer log epoch for empty flushes.
+ * The completion of the previous requests had the required effect on
+ * the peers already.
*/
- if (op_is_write(bio_op(req->master_bio)) &&
- req->epoch == atomic_read(&first_peer_device(device)->connection->current_tle_nr))
- start_new_tl_epoch(first_peer_device(device)->connection);
+ if (bio_data_dir(req->master_bio) == WRITE &&
+ likely(req->i.size != 0) &&
+ req->epoch == atomic_read(&device->resource->current_tle_nr))
+ start_new_tl_epoch(device->resource);
/* Update disk stats */
bio_end_io_acct(req->master_bio, req->start_jif);
- /* If READ failed,
- * have it be pushed back to the retry work queue,
- * so it will re-enter __drbd_make_request(),
- * and be re-assigned to a suitable local or remote path,
- * or failed if we do not have access to good data anymore.
- *
- * Unless it was failed early by __drbd_make_request(),
- * because no path was available, in which case
- * it was not even added to the transfer_log.
- *
- * read-ahead may fail, and will not be retried.
- *
- * WRITE should have used all available paths already.
- */
- if (!ok &&
- bio_op(req->master_bio) == REQ_OP_READ &&
- !(req->master_bio->bi_opf & REQ_RAHEAD) &&
- !list_empty(&req->tl_requests))
- req->rq_state |= RQ_POSTPONED;
-
- if (!(req->rq_state & RQ_POSTPONED)) {
- m->error = ok ? 0 : (error ?: -EIO);
+ if (device->cached_err_io) {
+ ok = 0;
+ req->local_rq_state &= ~RQ_POSTPONED;
+ } else if (!ok &&
+ bio_op(req->master_bio) == REQ_OP_READ &&
+ !(req->master_bio->bi_opf & REQ_RAHEAD) &&
+ !list_empty(&req->tl_requests)) {
+ /* If READ failed,
+ * have it be pushed back to the retry work queue,
+ * so it will re-enter __drbd_make_request(),
+ * and be re-assigned to a suitable local or remote path,
+ * or failed if we do not have access to good data anymore.
+ *
+ * Unless it was failed early by __drbd_make_request(),
+ * because no path was available, in which case
+ * it was not even added to the transfer_log.
+ *
+ * read-ahead may fail, and will not be retried.
+ *
+ * WRITE should have used all available paths already.
+ */
+ req->local_rq_state |= RQ_POSTPONED;
+ }
+
+ if (!(req->local_rq_state & RQ_POSTPONED)) {
+ struct drbd_resource *resource = device->resource;
+ bool quorum =
+ resource->res_opts.on_no_quorum == ONQ_IO_ERROR ?
+ resource->cached_all_devices_have_quorum : true;
+
+ m->error = ok && quorum ? 0 : (error ?: -EIO);
m->bio = req->master_bio;
req->master_bio = NULL;
- /* We leave it in the tree, to be able to verify later
- * write-acks in protocol != C during resync.
- * But we mark it as "complete", so it won't be counted as
- * conflict in a multi-primary setup. */
- req->i.completed = true;
+
+ if (req->local_rq_state & RQ_WRITE) {
+ spin_lock_irqsave(&device->interval_lock, flags);
+ /* We leave it in the tree, to be able to verify later
+ * write-acks in protocol != C during resync.
+ * But we mark it as "complete", so it won't be counted as
+ * conflict in a multi-primary setup.
+ */
+ set_bit(INTERVAL_COMPLETED, &req->i.flags);
+ drbd_release_conflicts(device, &req->i);
+ spin_unlock_irqrestore(&device->interval_lock, flags);
+ }
}
- if (req->i.waiting)
- wake_up(&device->misc_wait);
+ if (!(req->local_rq_state & RQ_WRITE))
+ drbd_remove_request_interval(&device->read_requests, req);
/* Either we are about to complete to upper layers,
* or we will restart this request.
* In either case, the request object will be destroyed soon,
* so better remove it from all lists. */
+ spin_lock_irqsave(&device->pending_completion_lock, flags);
list_del_init(&req->req_pending_master_completion);
+ spin_unlock_irqrestore(&device->pending_completion_lock, flags);
}
-/* still holds resource->req_lock */
static void drbd_req_put_completion_ref(struct drbd_request *req, struct bio_and_error *m, int put)
{
- struct drbd_device *device = req->device;
- D_ASSERT(device, m || (req->rq_state & RQ_POSTPONED));
+ D_ASSERT(req->device, m || (req->local_rq_state & RQ_POSTPONED));
+
+ lockdep_assert_held(&req->device->resource->state_rwlock);
if (!put)
return;
@@ -304,229 +745,368 @@ static void drbd_req_put_completion_ref(struct drbd_request *req, struct bio_and
/* local completion may still come in later,
* we need to keep the req object around. */
- if (req->rq_state & RQ_LOCAL_ABORTED)
+ if (req->local_rq_state & RQ_LOCAL_ABORTED)
return;
- if (req->rq_state & RQ_POSTPONED) {
+ if (req->local_rq_state & RQ_POSTPONED) {
/* don't destroy the req object just yet,
* but queue it for retry */
drbd_restart_request(req);
return;
}
- kref_put(&req->kref, drbd_req_destroy);
+ drbd_put_ref_tl_walk(req, 1, 0);
}
-static void set_if_null_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req)
+void drbd_set_pending_out_of_sync(struct drbd_peer_device *peer_device)
{
- struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
- if (!connection)
- return;
- if (connection->req_next == NULL)
- connection->req_next = req;
+ struct drbd_device *device = peer_device->device;
+ struct drbd_resource *resource = device->resource;
+ const int node_id = peer_device->node_id;
+ struct drbd_request *req;
+
+ rcu_read_lock();
+ list_for_each_entry_rcu(req, &resource->transfer_log, tl_requests) {
+ unsigned int local_rq_state, net_rq_state;
+
+ /*
+ * This is similar to the bitmap modification performed in
+ * drbd_req_done(), but simplified for this special case.
+ */
+
+ spin_lock_irq(&req->rq_lock);
+ local_rq_state = req->local_rq_state;
+ net_rq_state = req->net_rq_state[node_id];
+ spin_unlock_irq(&req->rq_lock);
+
+ if (!(local_rq_state & RQ_WRITE))
+ continue;
+
+ if ((local_rq_state & (RQ_POSTPONED|RQ_LOCAL_MASK)) == RQ_POSTPONED)
+ continue;
+
+ if (!req->i.size)
+ continue;
+
+ if (net_rq_state & RQ_NET_OK)
+ continue;
+
+ drbd_set_out_of_sync(peer_device, req->i.sector, req->i.size);
+ }
+ rcu_read_unlock();
}
-static void advance_conn_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req)
+static void advance_conn_req_next(struct drbd_connection *connection, struct drbd_request *req)
{
- struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
- struct drbd_request *iter = req;
- if (!connection)
+ struct drbd_request *found_req = NULL;
+ /* Only the sender thread comes here. No other caller context of req_mod() ever arrives here */
+ if (connection->todo.req_next != req)
return;
- if (connection->req_next != req)
- return;
-
- req = NULL;
- list_for_each_entry_continue(iter, &connection->transfer_log, tl_requests) {
- const unsigned int s = iter->rq_state;
+ rcu_read_lock();
+ list_for_each_entry_continue_rcu(req, &connection->resource->transfer_log, tl_requests) {
+ const unsigned s = req->net_rq_state[connection->peer_node_id];
- if (s & RQ_NET_QUEUED) {
- req = iter;
+ if (likely(s & RQ_NET_QUEUED)) {
+ found_req = req;
break;
}
}
- connection->req_next = req;
+ rcu_read_unlock();
+ connection->todo.req_next = found_req;
}
-static void set_if_null_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req)
+/**
+ * set_cache_ptr_if_null() - Set caching pointer to given request if not currently set.
+ * @connection: DRBD connection to operate on.
+ * @cache_ptr: Pointer to set.
+ * @req: Request to potentially set the pointer to.
+ *
+ * The caching pointer system is designed to track the oldest request in the
+ * transfer log fulfilling some condition. In particular, a combination of
+ * flags towards a given peer. This condition must guarantee that the request
+ * will not be destroyed.
+ *
+ * This system is implemented by set_cache_ptr_if_null() and
+ * advance_cache_ptr(). A request must be in the transfer log and fulfil the
+ * condition before set_cache_ptr_if_null() is called. If
+ * set_cache_ptr_if_null() is called before this request is in the transfer log
+ * or before it fulfils the condition, the pointer may be advanced past this
+ * request, or unset, which also has the effect of skipping the request.
+ *
+ * Once the condition is no longer fulfilled for a request, advance_cache_ptr()
+ * must be called. If the caching pointer currently points to this request,
+ * this will advance it to the next request fulfilling the condition.
+ *
+ * set_cache_ptr_if_null() may be called concurrently with advance_cache_ptr().
+ */
+static void set_cache_ptr_if_null(struct drbd_connection *connection,
+ struct drbd_request **cache_ptr, struct drbd_request *req)
{
- struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
- if (!connection)
- return;
- if (connection->req_ack_pending == NULL)
- connection->req_ack_pending = req;
+ spin_lock(&connection->advance_cache_ptr_lock); /* local IRQ already disabled */
+ if (*cache_ptr == NULL) {
+ smp_wmb(); /* make list_add_tail_rcu(req, transfer_log) visible before cache_ptr */
+ WRITE_ONCE(*cache_ptr, req);
+ }
+ spin_unlock(&connection->advance_cache_ptr_lock);
}
-static void advance_conn_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req)
+/* See set_cache_ptr_if_null(). */
+static void advance_cache_ptr(struct drbd_connection *connection,
+ struct drbd_request __rcu **cache_ptr, struct drbd_request *req,
+ unsigned int is_set, unsigned int is_clear)
{
- struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
- struct drbd_request *iter = req;
- if (!connection)
- return;
- if (connection->req_ack_pending != req)
- return;
+ struct drbd_request *old_req;
+ struct drbd_request *found_req = NULL;
- req = NULL;
- list_for_each_entry_continue(iter, &connection->transfer_log, tl_requests) {
- const unsigned int s = iter->rq_state;
+ /*
+ * Prevent concurrent updates of the same caching pointer. Otherwise if
+ * this function is called concurrently for a given caching pointer,
+ * the call for the older request may advance the pointer to the newer
+ * request, although the newer request has concurrently been modified
+ * such that it no longer fulfils the condition.
+ */
+ spin_lock(&connection->advance_cache_ptr_lock); /* local IRQ already disabled */
- if ((s & RQ_NET_SENT) && (s & RQ_NET_PENDING)) {
- req = iter;
+ rcu_read_lock();
+ old_req = rcu_dereference(*cache_ptr);
+ if (old_req != req) {
+ rcu_read_unlock();
+ spin_unlock(&connection->advance_cache_ptr_lock);
+ return;
+ }
+ list_for_each_entry_continue_rcu(req, &connection->resource->transfer_log, tl_requests) {
+ const unsigned s = READ_ONCE(req->net_rq_state[connection->peer_node_id]);
+ if (!(s & RQ_NET_MASK))
+ continue;
+ if (((s & is_set) == is_set) && !(s & is_clear)) {
+ found_req = req;
break;
}
}
- connection->req_ack_pending = req;
+
+ WRITE_ONCE(*cache_ptr, found_req);
+ rcu_read_unlock();
+
+ spin_unlock(&connection->advance_cache_ptr_lock);
}
-static void set_if_null_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req)
+/* for wsame, discard, and zero-out requests, the payload (amount of data we
+ * need to send) is much smaller than the number of storage sectors affected */
+static unsigned int req_payload_sectors(struct drbd_request *req)
{
- struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
- if (!connection)
- return;
- if (connection->req_not_net_done == NULL)
- connection->req_not_net_done = req;
+ /* actually: physical_block_size,
+ * but lets just hardcode 4k in sectors: */
+ if (unlikely(req->local_rq_state & RQ_WSAME))
+ return 8;
+ /* really only a few bytes, but let's pretend one sector */
+ if (unlikely(req->local_rq_state & (RQ_UNMAP|RQ_ZEROES)))
+ return 1;
+ /* other have all the data as payload on the wire */
+ return req->i.size >> 9;
}
-static void advance_conn_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req)
+static bool drbd_sender_needs_master_bio(unsigned int net_rq_state)
{
- struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
- struct drbd_request *iter = req;
- if (!connection)
- return;
- if (connection->req_not_net_done != req)
- return;
-
- req = NULL;
- list_for_each_entry_continue(iter, &connection->transfer_log, tl_requests) {
- const unsigned int s = iter->rq_state;
-
- if ((s & RQ_NET_SENT) && !(s & RQ_NET_DONE)) {
- req = iter;
- break;
- }
- }
- connection->req_not_net_done = req;
+ return (net_rq_state & RQ_NET_QUEUED) && !(net_rq_state & RQ_NET_DONE);
}
/* I'd like this to be the only place that manipulates
* req->completion_ref and req->kref. */
static void mod_rq_state(struct drbd_request *req, struct bio_and_error *m,
+ struct drbd_peer_device *peer_device,
int clear, int set)
{
- struct drbd_device *device = req->device;
- struct drbd_peer_device *peer_device = first_peer_device(device);
- unsigned s = req->rq_state;
+ unsigned int old_local, old_net = 0, new_net = 0;
+ unsigned int set_local = set & RQ_STATE_0_MASK;
+ unsigned int clear_local = clear & RQ_STATE_0_MASK;
int c_put = 0;
-
- if (drbd_suspended(device) && !((s | clear) & RQ_COMPLETION_SUSP))
- set |= RQ_COMPLETION_SUSP;
+ int d_put = 0;
+ int o_put = 0;
+ const int idx = peer_device ? peer_device->node_id : -1;
+ struct drbd_connection *connection = NULL;
+ bool unchanged;
+
+ set &= ~RQ_STATE_0_MASK;
+ clear &= ~RQ_STATE_0_MASK;
+
+ if (idx == -1) {
+ /* do not try to manipulate net state bits
+ * without an associated state slot! */
+ BUG_ON(set);
+ BUG_ON(clear);
+ }
/* apply */
+ spin_lock(&req->rq_lock); /* local IRQ already disabled */
- req->rq_state &= ~clear;
- req->rq_state |= set;
+ old_local = req->local_rq_state;
+ req->local_rq_state &= ~clear_local;
+ req->local_rq_state |= set_local;
+
+ if (idx != -1) {
+ old_net = req->net_rq_state[idx];
+ new_net = (req->net_rq_state[idx] & ~clear) | set;
+ WRITE_ONCE(req->net_rq_state[idx], new_net);
+ connection = peer_device->connection;
+ }
/* no change? */
- if (req->rq_state == s)
+ unchanged = req->local_rq_state == old_local &&
+ (idx == -1 || req->net_rq_state[idx] == old_net);
+
+ if (unchanged) {
+ spin_unlock(&req->rq_lock);
return;
+ }
/* intent: get references */
- kref_get(&req->kref);
-
- if (!(s & RQ_LOCAL_PENDING) && (set & RQ_LOCAL_PENDING))
+ if (!(old_local & RQ_LOCAL_PENDING) && (set_local & RQ_LOCAL_PENDING))
atomic_inc(&req->completion_ref);
- if (!(s & RQ_NET_PENDING) && (set & RQ_NET_PENDING)) {
- inc_ap_pending(device);
+ if (!(old_net & RQ_NET_PENDING) && (set & RQ_NET_PENDING)) {
+ inc_ap_pending(peer_device);
atomic_inc(&req->completion_ref);
}
- if (!(s & RQ_NET_QUEUED) && (set & RQ_NET_QUEUED)) {
+ if (!(old_net & RQ_NET_QUEUED) && (set & RQ_NET_QUEUED)) {
+ /* Keep request on transfer log while queued for sender */
+ refcount_inc(&req->oos_send_ref);
+ }
+
+ if (!drbd_sender_needs_master_bio(old_net) && drbd_sender_needs_master_bio(new_net)) {
+ /*
+ * This completion ref is necessary to avoid premature
+ * completion in case a WRITE_ACKED_BY_PEER comes in before the
+ * sender can do HANDED_OVER_TO_NETWORK.
+ */
atomic_inc(&req->completion_ref);
- set_if_null_req_next(peer_device, req);
}
- if (!(s & RQ_EXP_BARR_ACK) && (set & RQ_EXP_BARR_ACK))
- kref_get(&req->kref); /* wait for the DONE */
+ if (!(old_net & RQ_NET_READY) && (set & RQ_NET_READY) &&
+ !(req->net_rq_state[idx] & RQ_NET_DONE))
+ set_cache_ptr_if_null(connection, &connection->req_not_net_done, req);
+
+ if (!(old_net & RQ_EXP_BARR_ACK) && (set & RQ_EXP_BARR_ACK))
+ refcount_inc(&req->done_ref); /* wait for the DONE */
- if (!(s & RQ_NET_SENT) && (set & RQ_NET_SENT)) {
+ if (!(old_net & RQ_NET_SENT) && (set & RQ_NET_SENT)) {
/* potentially already completed in the ack_receiver thread */
- if (!(s & RQ_NET_DONE)) {
- atomic_add(req->i.size >> 9, &device->ap_in_flight);
- set_if_null_req_not_net_done(peer_device, req);
- }
- if (req->rq_state & RQ_NET_PENDING)
- set_if_null_req_ack_pending(peer_device, req);
+ if (!(old_net & RQ_NET_DONE))
+ atomic_add(req_payload_sectors(req), &peer_device->connection->ap_in_flight);
+ if (req->net_rq_state[idx] & RQ_NET_PENDING)
+ set_cache_ptr_if_null(connection, &connection->req_ack_pending, req);
}
- if (!(s & RQ_COMPLETION_SUSP) && (set & RQ_COMPLETION_SUSP))
+ if (!(old_local & RQ_COMPLETION_SUSP) && (set_local & RQ_COMPLETION_SUSP))
atomic_inc(&req->completion_ref);
+ spin_unlock(&req->rq_lock);
+
/* progress: put references */
- if ((s & RQ_COMPLETION_SUSP) && (clear & RQ_COMPLETION_SUSP))
+ if ((old_local & RQ_COMPLETION_SUSP) && (clear_local & RQ_COMPLETION_SUSP))
++c_put;
- if (!(s & RQ_LOCAL_ABORTED) && (set & RQ_LOCAL_ABORTED)) {
- D_ASSERT(device, req->rq_state & RQ_LOCAL_PENDING);
+ if (!(old_local & RQ_LOCAL_ABORTED) && (set_local & RQ_LOCAL_ABORTED)) {
+ D_ASSERT(req->device, req->local_rq_state & RQ_LOCAL_PENDING);
++c_put;
}
- if ((s & RQ_LOCAL_PENDING) && (clear & RQ_LOCAL_PENDING)) {
- if (req->rq_state & RQ_LOCAL_ABORTED)
- kref_put(&req->kref, drbd_req_destroy);
+ if ((old_local & RQ_LOCAL_PENDING) && (clear_local & RQ_LOCAL_PENDING)) {
+ struct drbd_device *device = req->device;
+
+ if (req->local_rq_state & RQ_LOCAL_ABORTED)
+ ++d_put;
else
++c_put;
+ spin_lock(&device->pending_completion_lock); /* local irq already disabled */
list_del_init(&req->req_pending_local);
+ spin_unlock(&device->pending_completion_lock);
}
- if ((s & RQ_NET_PENDING) && (clear & RQ_NET_PENDING)) {
- dec_ap_pending(device);
+ if ((old_net & RQ_NET_PENDING) && (clear & RQ_NET_PENDING)) {
+ dec_ap_pending(peer_device);
++c_put;
- req->acked_jif = jiffies;
- advance_conn_req_ack_pending(peer_device, req);
+ ktime_get_accounting(req->acked_kt[peer_device->node_id]);
+ advance_cache_ptr(connection, &connection->req_ack_pending,
+ req, RQ_NET_SENT | RQ_NET_PENDING, 0);
}
- if ((s & RQ_NET_QUEUED) && (clear & RQ_NET_QUEUED)) {
- ++c_put;
- advance_conn_req_next(peer_device, req);
+ if ((old_net & RQ_NET_QUEUED) && (clear & RQ_NET_QUEUED)) {
+ ++o_put;
+ advance_conn_req_next(connection, req);
}
- if (!(s & RQ_NET_DONE) && (set & RQ_NET_DONE)) {
- if (s & RQ_NET_SENT)
- atomic_sub(req->i.size >> 9, &device->ap_in_flight);
- if (s & RQ_EXP_BARR_ACK)
- kref_put(&req->kref, drbd_req_destroy);
- req->net_done_jif = jiffies;
+ if (drbd_sender_needs_master_bio(old_net) && !drbd_sender_needs_master_bio(new_net))
+ ++c_put;
- /* in ahead/behind mode, or just in case,
- * before we finally destroy this request,
- * the caching pointers must not reference it anymore */
- advance_conn_req_next(peer_device, req);
- advance_conn_req_ack_pending(peer_device, req);
- advance_conn_req_not_net_done(peer_device, req);
- }
+ if (!(old_net & RQ_NET_DONE) && (set & RQ_NET_DONE)) {
+ if (old_net & RQ_NET_SENT)
+ atomic_sub(req_payload_sectors(req),
+ &peer_device->connection->ap_in_flight);
+ if (old_net & RQ_EXP_BARR_ACK)
+ ++d_put;
+ ktime_get_accounting(req->net_done_kt[peer_device->node_id]);
- /* potentially complete and destroy */
+ advance_cache_ptr(connection, &connection->req_not_net_done,
+ req, 0, RQ_NET_DONE);
+ }
- /* If we made progress, retry conflicting peer requests, if any. */
- if (req->i.waiting)
- wake_up(&device->misc_wait);
+ if ((old_net & RQ_NET_PENDING_OOS) && (clear & RQ_NET_PENDING_OOS)) {
+ if (peer_device->repl_state[NOW] == L_AHEAD &&
+ atomic_read(&peer_device->connection->ap_in_flight) == 0) {
+ struct drbd_peer_device *pd;
+ int vnr;
+ /* The first peer device to notice that it is time to
+ * go Ahead -> SyncSource tries to trigger that
+ * transition for *all* peer devices currently in
+ * L_AHEAD for this connection. */
+ idr_for_each_entry(&peer_device->connection->peer_devices, pd, vnr) {
+ if (pd->repl_state[NOW] != L_AHEAD)
+ continue;
+ if (test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &pd->flags))
+ continue; /* already done */
+ pd->start_resync_side = L_SYNC_SOURCE;
+ mod_timer(&pd->start_resync_timer, jiffies + HZ);
+ }
+ }
+ }
+ /* potentially complete and destroy */
drbd_req_put_completion_ref(req, m, c_put);
- kref_put(&req->kref, drbd_req_destroy);
+
+ /* req cannot have been destroyed if there are still references */
+ if (d_put || o_put)
+ /* potentially destroy */
+ drbd_put_ref_tl_walk(req, d_put, o_put);
}
static void drbd_report_io_error(struct drbd_device *device, struct drbd_request *req)
{
- if (!drbd_ratelimit())
+ if (!drbd_device_ratelimit(device, BACKEND))
return;
drbd_warn(device, "local %s IO error sector %llu+%u on %pg\n",
- (req->rq_state & RQ_WRITE) ? "WRITE" : "READ",
- (unsigned long long)req->i.sector,
- req->i.size >> 9,
- device->ldev->backing_bdev);
+ (req->local_rq_state & RQ_WRITE) ? "WRITE" : "READ",
+ (unsigned long long)req->i.sector,
+ req->i.size >> 9,
+ device->ldev->backing_bdev);
+}
+
+static int drbd_protocol_state_bits(struct drbd_connection *connection)
+{
+ struct net_conf *nc;
+ int p;
+
+ rcu_read_lock();
+ nc = rcu_dereference(connection->transport.net_conf);
+ p = nc->wire_protocol;
+ rcu_read_unlock();
+
+ return p == DRBD_PROT_C ? RQ_EXP_WRITE_ACK :
+ p == DRBD_PROT_B ? RQ_EXP_RECEIVE_ACK : 0;
+
}
/* Helper for HANDED_OVER_TO_NETWORK.
@@ -535,11 +1115,12 @@ static void drbd_report_io_error(struct drbd_device *device, struct drbd_request
* --> If so, clear PENDING and set NET_OK below.
* If it is a protocol A write, but not RQ_PENDING anymore, neg-ack was faster
* (and we must not set RQ_NET_OK) */
-static inline bool is_pending_write_protocol_A(struct drbd_request *req)
+static inline bool is_pending_write_protocol_A(struct drbd_request *req, int idx)
{
- return (req->rq_state &
- (RQ_WRITE|RQ_NET_PENDING|RQ_EXP_WRITE_ACK|RQ_EXP_RECEIVE_ACK))
- == (RQ_WRITE|RQ_NET_PENDING);
+ return (req->local_rq_state & RQ_WRITE) == 0 ? 0 :
+ (req->net_rq_state[idx] &
+ (RQ_NET_PENDING|RQ_EXP_WRITE_ACK|RQ_EXP_RECEIVE_ACK))
+ == RQ_NET_PENDING;
}
/* obviously this could be coded as many single functions
@@ -550,95 +1131,76 @@ static inline bool is_pending_write_protocol_A(struct drbd_request *req)
* but having it this way
* enforces that it is all in this one place, where it is easier to audit,
* it makes it obvious that whatever "event" "happens" to a request should
- * happen "atomically" within the req_lock,
+ * happen with the state_rwlock read lock held,
* and it enforces that we have to think in a very structured manner
* about the "events" that may happen to a request during its life time ...
*
*
* peer_device == NULL means local disk
*/
-int __req_mod(struct drbd_request *req, enum drbd_req_event what,
+void __req_mod(struct drbd_request *req, enum drbd_req_event what,
struct drbd_peer_device *peer_device,
struct bio_and_error *m)
{
- struct drbd_device *const device = req->device;
- struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
+ struct drbd_device *device = req->device;
struct net_conf *nc;
- int p, rv = 0;
+ unsigned long flags;
+ int p;
+ int idx;
+
+ lockdep_assert_held(&device->resource->state_rwlock);
if (m)
m->bio = NULL;
+ idx = peer_device ? peer_device->node_id : -1;
+
switch (what) {
default:
drbd_err(device, "LOGIC BUG in %s:%u\n", __FILE__ , __LINE__);
break;
- /* does not happen...
- * initialization done in drbd_req_new
- case CREATED:
- break;
- */
-
- case TO_BE_SENT: /* via network */
- /* reached via __drbd_make_request
- * and from w_read_retry_remote */
- D_ASSERT(device, !(req->rq_state & RQ_NET_MASK));
- rcu_read_lock();
- nc = rcu_dereference(connection->net_conf);
- p = nc->wire_protocol;
- rcu_read_unlock();
- req->rq_state |=
- p == DRBD_PROT_C ? RQ_EXP_WRITE_ACK :
- p == DRBD_PROT_B ? RQ_EXP_RECEIVE_ACK : 0;
- mod_rq_state(req, m, 0, RQ_NET_PENDING);
- break;
-
case TO_BE_SUBMITTED: /* locally */
/* reached via __drbd_make_request */
- D_ASSERT(device, !(req->rq_state & RQ_LOCAL_MASK));
- mod_rq_state(req, m, 0, RQ_LOCAL_PENDING);
+ D_ASSERT(device, !(req->local_rq_state & RQ_LOCAL_MASK));
+ mod_rq_state(req, m, peer_device, 0, RQ_LOCAL_PENDING);
break;
case COMPLETED_OK:
- if (req->rq_state & RQ_WRITE)
+ if (req->local_rq_state & RQ_WRITE)
device->writ_cnt += req->i.size >> 9;
else
device->read_cnt += req->i.size >> 9;
- mod_rq_state(req, m, RQ_LOCAL_PENDING,
+ mod_rq_state(req, m, peer_device, RQ_LOCAL_PENDING,
RQ_LOCAL_COMPLETED|RQ_LOCAL_OK);
break;
case ABORT_DISK_IO:
- mod_rq_state(req, m, 0, RQ_LOCAL_ABORTED);
+ mod_rq_state(req, m, peer_device, 0, RQ_LOCAL_ABORTED);
break;
case WRITE_COMPLETED_WITH_ERROR:
drbd_report_io_error(device, req);
- __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
- mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
+ mod_rq_state(req, m, peer_device, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
break;
case READ_COMPLETED_WITH_ERROR:
- drbd_set_out_of_sync(first_peer_device(device),
- req->i.sector, req->i.size);
+ drbd_set_all_out_of_sync(device, req->i.sector, req->i.size);
drbd_report_io_error(device, req);
- __drbd_chk_io_error(device, DRBD_READ_ERROR);
fallthrough;
case READ_AHEAD_COMPLETED_WITH_ERROR:
- /* it is legal to fail read-ahead, no __drbd_chk_io_error in that case. */
- mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
+ mod_rq_state(req, m, peer_device, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
break;
case DISCARD_COMPLETED_NOTSUPP:
case DISCARD_COMPLETED_WITH_ERROR:
/* I'd rather not detach from local disk just because it
* failed a REQ_OP_DISCARD. */
- mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
+ mod_rq_state(req, m, peer_device, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
break;
- case QUEUE_FOR_NET_READ:
+ case NEW_NET_READ:
/* READ, and
* no local disk,
* or target area marked as invalid,
@@ -650,27 +1212,19 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
* Corresponding drbd_remove_request_interval is in
* drbd_req_complete() */
D_ASSERT(device, drbd_interval_empty(&req->i));
+ spin_lock_irqsave(&device->interval_lock, flags);
drbd_insert_interval(&device->read_requests, &req->i);
+ spin_unlock_irqrestore(&device->interval_lock, flags);
- set_bit(UNPLUG_REMOTE, &device->flags);
-
- D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
- D_ASSERT(device, (req->rq_state & RQ_LOCAL_MASK) == 0);
- mod_rq_state(req, m, 0, RQ_NET_QUEUED);
- req->w.cb = w_send_read_req;
- drbd_queue_work(&connection->sender_work,
- &req->w);
+ D_ASSERT(device, !(req->net_rq_state[idx] & RQ_NET_MASK));
+ D_ASSERT(device, !(req->local_rq_state & RQ_LOCAL_MASK));
+ mod_rq_state(req, m, peer_device, 0, RQ_NET_PENDING|RQ_NET_QUEUED);
break;
- case QUEUE_FOR_NET_WRITE:
+ case NEW_NET_WRITE:
/* assert something? */
/* from __drbd_make_request only */
- /* Corresponding drbd_remove_request_interval is in
- * drbd_req_complete() */
- D_ASSERT(device, drbd_interval_empty(&req->i));
- drbd_insert_interval(&device->write_requests, &req->i);
-
/* NOTE
* In case the req ended up on the transfer log before being
* queued on the worker, it could lead to this request being
@@ -685,85 +1239,109 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
*
* Add req to the (now) current epoch (barrier). */
- /* otherwise we may lose an unplug, which may cause some remote
- * io-scheduler timeout to expire, increasing maximum latency,
- * hurting performance. */
- set_bit(UNPLUG_REMOTE, &device->flags);
+ D_ASSERT(device, !(req->net_rq_state[idx] & RQ_NET_MASK));
/* queue work item to send data */
- D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
- mod_rq_state(req, m, 0, RQ_NET_QUEUED|RQ_EXP_BARR_ACK);
- req->w.cb = w_send_dblock;
- drbd_queue_work(&connection->sender_work,
- &req->w);
+ mod_rq_state(req, m, peer_device, 0, RQ_NET_PENDING|RQ_NET_QUEUED|RQ_EXP_BARR_ACK|
+ drbd_protocol_state_bits(peer_device->connection));
+
+ /* Close the epoch, in case it outgrew the limit.
+ * Or if this is a "batch bio", and some of our peers is "old",
+ * because a batch bio "storm" (like, large scale discarding
+ * during mkfs time) would be likely to starve out the peers
+ * activity log, if it is smaller than ours (or we don't have
+ * any). And a fix for the resulting potential distributed
+ * deadlock was only implemented with P_CONFIRM_STABLE with
+ * protocol version 114.
+ */
+ if (device->resource->cached_min_aggreed_protocol_version < 114 &&
+ (req->local_rq_state & (RQ_UNMAP|RQ_WSAME|RQ_ZEROES)))
+ p = 1;
+ else {
+ rcu_read_lock();
+ nc = rcu_dereference(peer_device->connection->transport.net_conf);
+ p = nc->max_epoch_size;
+ rcu_read_unlock();
+ }
+ if (device->resource->current_tle_writes >= p)
+ start_new_tl_epoch(device->resource);
+ break;
- /* close the epoch, in case it outgrew the limit */
- rcu_read_lock();
- nc = rcu_dereference(connection->net_conf);
- p = nc->max_epoch_size;
- rcu_read_unlock();
- if (connection->current_tle_writes >= p)
- start_new_tl_epoch(connection);
+ case NEW_NET_OOS:
+ /* We will just send P_OUT_OF_SYNC to this peer. The request is
+ * "done" from the start in the sense that everything necessary
+ * in the data stage has been done.
+ */
+ mod_rq_state(req, m, peer_device, 0, RQ_NET_PENDING_OOS|RQ_NET_QUEUED|RQ_NET_DONE);
+ break;
+ case READY_FOR_NET:
+ mod_rq_state(req, m, peer_device, 0, RQ_NET_READY);
break;
- case QUEUE_FOR_SEND_OOS:
- mod_rq_state(req, m, 0, RQ_NET_QUEUED);
- req->w.cb = w_send_out_of_sync;
- drbd_queue_work(&connection->sender_work,
- &req->w);
+ case SKIP_OOS:
+ mod_rq_state(req, m, peer_device, RQ_NET_PENDING_OOS, RQ_NET_READY);
break;
- case READ_RETRY_REMOTE_CANCELED:
+ case OOS_HANDED_TO_NETWORK:
case SEND_CANCELED:
case SEND_FAILED:
- /* real cleanup will be done from tl_clear. just update flags
- * so it is no longer marked as on the worker queue */
- mod_rq_state(req, m, RQ_NET_QUEUED, 0);
+ /* Sending P_OUT_OF_SYNC is irrelevant if the connection was
+ * lost. Hence, when the intention was to send P_OUT_OF_SYNC,
+ * the effect of successfully sending the packet and connection
+ * loss are the same.
+ *
+ * Otherwise just update flags so it is no longer marked as on
+ * the sender queue; real cleanup will be done from
+ * tl_walk(,CONNECTION_LOST*).
+ */
+ mod_rq_state(req, m, peer_device, RQ_NET_PENDING_OOS|RQ_NET_QUEUED, 0);
break;
case HANDED_OVER_TO_NETWORK:
/* assert something? */
- if (is_pending_write_protocol_A(req))
+ if (is_pending_write_protocol_A(req, idx))
/* this is what is dangerous about protocol A:
* pretend it was successfully written on the peer. */
- mod_rq_state(req, m, RQ_NET_QUEUED|RQ_NET_PENDING,
- RQ_NET_SENT|RQ_NET_OK);
+ mod_rq_state(req, m, peer_device, RQ_NET_QUEUED|RQ_NET_PENDING,
+ RQ_NET_SENT|RQ_NET_OK);
else
- mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_SENT);
+ mod_rq_state(req, m, peer_device, RQ_NET_QUEUED, RQ_NET_SENT);
/* It is still not yet RQ_NET_DONE until the
* corresponding epoch barrier got acked as well,
* so we know what to dirty on connection loss. */
break;
- case OOS_HANDED_TO_NETWORK:
- /* Was not set PENDING, no longer QUEUED, so is now DONE
- * as far as this connection is concerned. */
- mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_DONE);
- break;
-
- case CONNECTION_LOST_WHILE_PENDING:
- /* transfer log cleanup after connection loss */
- mod_rq_state(req, m,
- RQ_NET_OK|RQ_NET_PENDING|RQ_COMPLETION_SUSP,
- RQ_NET_DONE);
- break;
+ case CONNECTION_LOST:
+ case CONNECTION_LOST_WHILE_SUSPENDED:
+ /* Only apply to requests that were for this peer but not done. */
+ if (!(req->net_rq_state[idx] & RQ_NET_MASK) || req->net_rq_state[idx] & RQ_NET_DONE)
+ break;
- case CONFLICT_RESOLVED:
- /* for superseded conflicting writes of multiple primaries,
- * there is no need to keep anything in the tl, potential
- * node crashes are covered by the activity log.
+ /* For protocol A, or when not suspended, we consider the
+ * request to be lost towards this peer.
+ *
+ * Protocol B&C requests are kept while suspended because
+ * resending is allowed. If such a request is pending to this
+ * peer, we suspend its completion until IO is resumed. This is
+ * a conservative simplification. We could complete it while
+ * suspended once we know it has been received by "enough"
+ * peers. However, we do not track that.
*
- * If this request had been marked as RQ_POSTPONED before,
- * it will actually not be completed, but "restarted",
- * resubmitted from the retry worker context. */
- D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
- D_ASSERT(device, req->rq_state & RQ_EXP_WRITE_ACK);
- mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_DONE|RQ_NET_OK);
+ * If the request is no longer pending to this peer, then we
+ * have already received the corresponding ack. The request may
+ * complete as far as this peer is concerned. */
+ if (what == CONNECTION_LOST ||
+ !(req->net_rq_state[idx] & (RQ_EXP_RECEIVE_ACK|RQ_EXP_WRITE_ACK)))
+ mod_rq_state(req, m, peer_device, RQ_NET_PENDING|RQ_NET_OK, RQ_NET_DONE);
+ else if (req->net_rq_state[idx] & RQ_NET_PENDING)
+ mod_rq_state(req, m, peer_device, 0, RQ_COMPLETION_SUSP);
break;
case WRITE_ACKED_BY_PEER_AND_SIS:
- req->rq_state |= RQ_NET_SIS;
+ spin_lock_irqsave(&req->rq_lock, flags);
+ req->net_rq_state[idx] |= RQ_NET_SIS;
+ spin_unlock_irqrestore(&req->rq_lock, flags);
fallthrough;
case WRITE_ACKED_BY_PEER:
/* Normal operation protocol C: successfully written on peer.
@@ -775,155 +1353,162 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
* for volatile write-back caches on lower level devices. */
goto ack_common;
case RECV_ACKED_BY_PEER:
- D_ASSERT(device, req->rq_state & RQ_EXP_RECEIVE_ACK);
+ D_ASSERT(device, req->net_rq_state[idx] & RQ_EXP_RECEIVE_ACK);
/* protocol B; pretends to be successfully written on peer.
* see also notes above in HANDED_OVER_TO_NETWORK about
* protocol != C */
ack_common:
- mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK);
- break;
-
- case POSTPONE_WRITE:
- D_ASSERT(device, req->rq_state & RQ_EXP_WRITE_ACK);
- /* If this node has already detected the write conflict, the
- * worker will be waiting on misc_wait. Wake it up once this
- * request has completed locally.
- */
- D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
- req->rq_state |= RQ_POSTPONED;
- if (req->i.waiting)
- wake_up(&device->misc_wait);
- /* Do not clear RQ_NET_PENDING. This request will make further
- * progress via restart_conflicting_writes() or
- * fail_postponed_requests(). Hopefully. */
+ mod_rq_state(req, m, peer_device, RQ_NET_PENDING, RQ_NET_OK);
break;
case NEG_ACKED:
- mod_rq_state(req, m, RQ_NET_OK|RQ_NET_PENDING, 0);
+ mod_rq_state(req, m, peer_device, RQ_NET_OK|RQ_NET_PENDING,
+ (req->local_rq_state & RQ_WRITE) ? 0 : RQ_NET_DONE);
break;
- case FAIL_FROZEN_DISK_IO:
- if (!(req->rq_state & RQ_LOCAL_COMPLETED))
- break;
- mod_rq_state(req, m, RQ_COMPLETION_SUSP, 0);
+ case COMPLETION_RESUMED:
+ mod_rq_state(req, m, peer_device, RQ_COMPLETION_SUSP, 0);
break;
- case RESTART_FROZEN_DISK_IO:
- if (!(req->rq_state & RQ_LOCAL_COMPLETED))
+ case CANCEL_SUSPENDED_IO:
+ /* Only apply to requests that were for this peer but not done. */
+ if (!(req->net_rq_state[idx] & RQ_NET_MASK) || req->net_rq_state[idx] & RQ_NET_DONE)
break;
- mod_rq_state(req, m,
- RQ_COMPLETION_SUSP|RQ_LOCAL_COMPLETED,
- RQ_LOCAL_PENDING);
-
- rv = MR_READ;
- if (bio_data_dir(req->master_bio) == WRITE)
- rv = MR_WRITE;
-
- get_ldev(device); /* always succeeds in this call path */
- req->w.cb = w_restart_disk_io;
- drbd_queue_work(&connection->sender_work,
- &req->w);
+ /* CONNECTION_LOST_WHILE_SUSPENDED followed by
+ * CANCEL_SUSPENDED_IO should be essentially the same as
+ * CONNECTION_LOST. Make the corresponding changes. The
+ * RQ_COMPLETION_SUSP flag is handled by COMPLETION_RESUMED. */
+ mod_rq_state(req, m, peer_device, RQ_NET_PENDING|RQ_NET_OK, RQ_NET_DONE);
break;
case RESEND:
- /* Simply complete (local only) READs. */
- if (!(req->rq_state & RQ_WRITE) && !req->w.cb) {
- mod_rq_state(req, m, RQ_COMPLETION_SUSP, 0);
- break;
- }
-
/* If RQ_NET_OK is already set, we got a P_WRITE_ACK or P_RECV_ACK
before the connection loss (B&C only); only P_BARRIER_ACK
(or the local completion?) was missing when we suspended.
Throwing them out of the TL here by pretending we got a BARRIER_ACK.
- During connection handshake, we ensure that the peer was not rebooted. */
- if (!(req->rq_state & RQ_NET_OK)) {
- /* FIXME could this possibly be a req->dw.cb == w_send_out_of_sync?
- * in that case we must not set RQ_NET_PENDING. */
-
- mod_rq_state(req, m, RQ_COMPLETION_SUSP, RQ_NET_QUEUED|RQ_NET_PENDING);
- if (req->w.cb) {
- /* w.cb expected to be w_send_dblock, or w_send_read_req */
- drbd_queue_work(&connection->sender_work,
- &req->w);
- rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
- } /* else: FIXME can this happen? */
+ During connection handshake, we ensure that the peer was not rebooted.
+
+ Protocol A requests always have RQ_NET_OK removed when the
+ connection is lost, so this will never apply to them.
+
+ Resending is only allowed on synchronous connections,
+ where all requests not yet completed to upper layers would
+ be in the same "reorder-domain", there can not possibly be
+ any dependency between incomplete requests, and we are
+ allowed to complete this one "out-of-sequence".
+ */
+ if (req->net_rq_state[idx] & RQ_NET_OK)
+ goto barrier_acked;
+
+ /* Only apply to requests that are pending a response from
+ * this peer. */
+ if (!(req->net_rq_state[idx] & RQ_NET_PENDING))
break;
- }
- fallthrough; /* to BARRIER_ACKED */
+
+ D_ASSERT(device, !(req->net_rq_state[idx] & RQ_NET_QUEUED));
+ mod_rq_state(req, m, peer_device, RQ_NET_SENT, RQ_NET_QUEUED);
+ break;
case BARRIER_ACKED:
+barrier_acked:
/* barrier ack for READ requests does not make sense */
- if (!(req->rq_state & RQ_WRITE))
+ if (!(req->local_rq_state & RQ_WRITE))
break;
- if (req->rq_state & RQ_NET_PENDING) {
+ if (req->net_rq_state[idx] & RQ_NET_PENDING) {
/* barrier came in before all requests were acked.
* this is bad, because if the connection is lost now,
* we won't be able to clean them up... */
drbd_err(device, "FIXME (BARRIER_ACKED but pending)\n");
+ mod_rq_state(req, m, peer_device, RQ_NET_PENDING, RQ_NET_OK);
}
- /* Allowed to complete requests, even while suspended.
- * As this is called for all requests within a matching epoch,
+ /* As this is called for all requests within a matching epoch,
* we need to filter, and only set RQ_NET_DONE for those that
* have actually been on the wire. */
- mod_rq_state(req, m, RQ_COMPLETION_SUSP,
- (req->rq_state & RQ_NET_MASK) ? RQ_NET_DONE : 0);
+ if (req->net_rq_state[idx] & RQ_NET_MASK)
+ mod_rq_state(req, m, peer_device, 0, RQ_NET_DONE);
break;
case DATA_RECEIVED:
- D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
- mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK|RQ_NET_DONE);
+ D_ASSERT(device, req->net_rq_state[idx] & RQ_NET_PENDING);
+ mod_rq_state(req, m, peer_device, RQ_NET_PENDING, RQ_NET_OK|RQ_NET_DONE);
break;
- case QUEUE_AS_DRBD_BARRIER:
- start_new_tl_epoch(connection);
- mod_rq_state(req, m, 0, RQ_NET_OK|RQ_NET_DONE);
+ case BARRIER_SENT:
+ mod_rq_state(req, m, peer_device, 0, RQ_NET_OK|RQ_NET_DONE);
break;
}
-
- return rv;
}
/* we may do a local read if:
* - we are consistent (of course),
* - or we are generally inconsistent,
- * BUT we are still/already IN SYNC for this area.
+ * BUT we are still/already IN SYNC with all peers for this area.
* since size may be bigger than BM_BLOCK_SIZE,
* we may need to check several bits.
*/
static bool drbd_may_do_local_read(struct drbd_device *device, sector_t sector, int size)
{
+ struct drbd_md *md = &device->ldev->md;
+ struct drbd_bitmap *bm;
+ unsigned int node_id;
+ unsigned int n_checked = 0;
+
unsigned long sbnr, ebnr;
sector_t esector, nr_sectors;
- if (device->state.disk == D_UP_TO_DATE)
+ if (device->disk_state[NOW] == D_UP_TO_DATE)
return true;
- if (device->state.disk != D_INCONSISTENT)
+ if (device->disk_state[NOW] != D_INCONSISTENT)
return false;
esector = sector + (size >> 9) - 1;
nr_sectors = get_capacity(device->vdisk);
D_ASSERT(device, sector < nr_sectors);
D_ASSERT(device, esector < nr_sectors);
- sbnr = BM_SECT_TO_BIT(sector);
- ebnr = BM_SECT_TO_BIT(esector);
+ bm = device->bitmap;
+ if (!bm)
+ return true;
+
+ sbnr = bm_sect_to_bit(bm, sector);
+ ebnr = bm_sect_to_bit(bm, esector);
+
+ for (node_id = 0; node_id < DRBD_NODE_ID_MAX; node_id++) {
+ struct drbd_peer_md *peer_md = &md->peers[node_id];
+
+ /* Skip bitmap indexes which are not assigned to a peer. */
+ if (!(peer_md->flags & MDF_HAVE_BITMAP))
+ continue;
- return drbd_bm_count_bits(device, sbnr, ebnr) == 0;
+ if (drbd_bm_count_bits(device, peer_md->bitmap_index, sbnr, ebnr))
+ return false;
+ ++n_checked;
+ }
+ if (n_checked == 0) {
+ drbd_err_ratelimit(device, "No valid bitmap slots found to check!\n");
+ return false;
+ }
+ return true;
}
-static bool remote_due_to_read_balancing(struct drbd_device *device, sector_t sector,
+/* TODO improve for more than one peer.
+ * also take into account the drbd protocol. */
+static bool remote_due_to_read_balancing(struct drbd_device *device,
+ struct drbd_peer_device *peer_device, sector_t sector,
enum drbd_read_balancing rbm)
{
int stripe_shift;
switch (rbm) {
case RB_CONGESTED_REMOTE:
+ /* originally, this used the bdi congestion framework,
+ * but that was removed in linux 5.18.
+ * so just never report the lower device as congested. */
return false;
case RB_LEAST_PENDING:
return atomic_read(&device->local_cnt) >
- atomic_read(&device->ap_pending_cnt) + atomic_read(&device->rs_pending_cnt);
+ atomic_read(&peer_device->ap_pending_cnt) + atomic_read(&peer_device->rs_pending_cnt);
case RB_32K_STRIPING: /* stripe_shift = 15 */
case RB_64K_STRIPING:
case RB_128K_STRIPING:
@@ -942,63 +1527,32 @@ static bool remote_due_to_read_balancing(struct drbd_device *device, sector_t se
}
}
-/*
- * complete_conflicting_writes - wait for any conflicting write requests
- *
- * The write_requests tree contains all active write requests which we
- * currently know about. Wait for any requests to complete which conflict with
- * the new one.
- *
- * Only way out: remove the conflicting intervals from the tree.
- */
-static void complete_conflicting_writes(struct drbd_request *req)
-{
- DEFINE_WAIT(wait);
- struct drbd_device *device = req->device;
- struct drbd_interval *i;
- sector_t sector = req->i.sector;
- int size = req->i.size;
-
- for (;;) {
- drbd_for_each_overlap(i, &device->write_requests, sector, size) {
- /* Ignore, if already completed to upper layers. */
- if (i->completed)
- continue;
- /* Handle the first found overlap. After the schedule
- * we have to restart the tree walk. */
- break;
- }
- if (!i) /* if any */
- break;
-
- /* Indicate to wake up device->misc_wait on progress. */
- prepare_to_wait(&device->misc_wait, &wait, TASK_UNINTERRUPTIBLE);
- i->waiting = true;
- spin_unlock_irq(&device->resource->req_lock);
- schedule();
- spin_lock_irq(&device->resource->req_lock);
- }
- finish_wait(&device->misc_wait, &wait);
-}
-
-/* called within req_lock */
-static void maybe_pull_ahead(struct drbd_device *device)
+static void __maybe_pull_ahead(struct drbd_device *device, struct drbd_connection *connection)
{
- struct drbd_connection *connection = first_peer_device(device)->connection;
struct net_conf *nc;
bool congested = false;
enum drbd_on_congestion on_congestion;
+ u32 cong_fill = 0, cong_extents = 0;
+ struct drbd_peer_device *peer_device = conn_peer_device(connection, device->vnr);
- rcu_read_lock();
- nc = rcu_dereference(connection->net_conf);
- on_congestion = nc ? nc->on_congestion : OC_BLOCK;
- rcu_read_unlock();
- if (on_congestion == OC_BLOCK ||
- connection->agreed_pro_version < 96)
+ lockdep_assert_held(&device->resource->state_rwlock);
+
+ if (connection->agreed_pro_version < 96)
+ return;
+
+ nc = rcu_dereference(connection->transport.net_conf);
+ if (nc) {
+ on_congestion = nc->on_congestion;
+ cong_fill = nc->cong_fill;
+ cong_extents = nc->cong_extents;
+ } else {
+ on_congestion = OC_BLOCK;
+ }
+ if (on_congestion == OC_BLOCK)
return;
- if (on_congestion == OC_PULL_AHEAD && device->state.conn == C_AHEAD)
- return; /* nothing to do ... */
+ if (!drbd_should_do_remote(peer_device, NOW))
+ return; /* Ignore congestion if we are not replicating writes */
/* If I don't even have good local storage, we can not reasonably try
* to pull ahead of the peer. We also need the local reference to make
@@ -1007,44 +1561,125 @@ static void maybe_pull_ahead(struct drbd_device *device)
if (!get_ldev_if_state(device, D_UP_TO_DATE))
return;
- if (nc->cong_fill &&
- atomic_read(&device->ap_in_flight) >= nc->cong_fill) {
- drbd_info(device, "Congestion-fill threshold reached\n");
- congested = true;
+ if (test_and_set_bit(HANDLING_CONGESTION, &peer_device->flags))
+ goto out;
+
+ /* if an other volume already found that we are congested, short circuit. */
+ congested = test_bit(CONN_CONGESTED, &connection->flags);
+
+ if (!congested && cong_fill) {
+ int n = atomic_read(&connection->ap_in_flight) +
+ atomic_read(&connection->rs_in_flight);
+ if (n >= cong_fill) {
+ drbd_info(device, "Congestion-fill threshold reached (%d >= %d)\n", n, cong_fill);
+ congested = true;
+ }
}
- if (device->act_log->used >= nc->cong_extents) {
- drbd_info(device, "Congestion-extents threshold reached\n");
+ if (!congested && device->act_log->used >= cong_extents) {
+ drbd_info(device, "Congestion-extents threshold reached (%d >= %d)\n",
+ device->act_log->used, cong_extents);
congested = true;
}
if (congested) {
- /* start a new epoch for non-mirrored writes */
- start_new_tl_epoch(first_peer_device(device)->connection);
-
- if (on_congestion == OC_PULL_AHEAD)
- _drbd_set_state(_NS(device, conn, C_AHEAD), 0, NULL);
- else /*nc->on_congestion == OC_DISCONNECT */
- _drbd_set_state(_NS(device, conn, C_DISCONNECTING), 0, NULL);
+ set_bit(CONN_CONGESTED, &connection->flags);
+ drbd_peer_device_post_work(peer_device, HANDLE_CONGESTION);
+ } else {
+ clear_bit(HANDLING_CONGESTION, &peer_device->flags);
}
+out:
put_ldev(device);
}
-/* If this returns false, and req->private_bio is still set,
- * this should be submitted locally.
+static void maybe_pull_ahead(struct drbd_device *device)
+{
+ struct drbd_connection *connection;
+
+ rcu_read_lock();
+ for_each_connection_rcu(connection, device->resource)
+ if (connection->cstate[NOW] == C_CONNECTED)
+ __maybe_pull_ahead(device, connection);
+ rcu_read_unlock();
+}
+
+bool drbd_should_do_remote(struct drbd_peer_device *peer_device, enum which_state which)
+{
+ enum drbd_disk_state peer_disk_state = peer_device->disk_state[which];
+ enum drbd_repl_state repl_state = peer_device->repl_state[which];
+ bool replication = peer_device->replication[which];
+
+ return peer_disk_state == D_UP_TO_DATE ||
+ (peer_disk_state == D_INCONSISTENT && replication &&
+ (repl_state == L_ESTABLISHED ||
+ (repl_state >= L_WF_BITMAP_T && repl_state < L_AHEAD)));
+ /* Before proto 96 that was >= CONNECTED instead of >= L_WF_BITMAP_T.
+ That is equivalent since before 96 IO was frozen in the L_WF_BITMAP*
+ states. */
+}
+
+static bool drbd_should_send_out_of_sync(struct drbd_peer_device *peer_device)
+{
+ enum drbd_disk_state peer_disk_state = peer_device->disk_state[NOW];
+ enum drbd_repl_state repl_state = peer_device->repl_state[NOW];
+ bool replication = peer_device->replication[NOW];
+
+ return repl_state == L_AHEAD ||
+ repl_state == L_WF_BITMAP_S ||
+ (repl_state >= L_ESTABLISHED &&
+ (peer_disk_state == D_OUTDATED ||
+ (peer_disk_state == D_INCONSISTENT && !replication)));
+
+ /* proto 96 check omitted, there was no L_AHEAD back then,
+ * peer disk was never Outdated while connection was established,
+ * and IO was frozen during bitmap exchange */
+}
+
+/* Prefer to read from protcol C peers, then B, last A */
+static u64 calc_nodes_to_read_from(struct drbd_device *device)
+{
+ struct drbd_peer_device *peer_device;
+ u64 candidates[DRBD_PROT_C] = {};
+ int wp;
+
+ rcu_read_lock();
+ for_each_peer_device_rcu(peer_device, device) {
+ struct net_conf *nc;
+
+ if (peer_device->disk_state[NOW] != D_UP_TO_DATE)
+ continue;
+ nc = rcu_dereference(peer_device->connection->transport.net_conf);
+ if (!nc || !nc->allow_remote_read)
+ continue;
+ wp = nc->wire_protocol;
+ candidates[wp - 1] |= NODE_MASK(peer_device->node_id);
+ }
+ rcu_read_unlock();
+
+ for (wp = DRBD_PROT_C; wp >= DRBD_PROT_A; wp--) {
+ if (candidates[wp - 1])
+ return candidates[wp - 1];
+ }
+ return 0;
+}
+
+/* If this returns NULL, and req->private_bio is still set,
+ * the request should be submitted locally.
*
- * If it returns false, but req->private_bio is not set,
+ * If it returns NULL, but req->private_bio is not set,
* we do not have access to good data :(
*
* Otherwise, this destroys req->private_bio, if any,
- * and returns true.
+ * and returns the peer device which should be asked for data.
*/
-static bool do_remote_read(struct drbd_request *req)
+static struct drbd_peer_device *find_peer_device_for_read(struct drbd_request *req)
{
+ struct drbd_peer_device *peer_device;
struct drbd_device *device = req->device;
- enum drbd_read_balancing rbm;
+ enum drbd_read_balancing rbm = RB_PREFER_REMOTE;
if (req->private_bio) {
+ /* ldev_safe: have private_bio */
if (!drbd_may_do_local_read(device,
req->i.sector, req->i.size)) {
bio_put(req->private_bio);
@@ -1053,90 +1688,123 @@ static bool do_remote_read(struct drbd_request *req)
}
}
- if (device->state.pdsk != D_UP_TO_DATE)
- return false;
-
- if (req->private_bio == NULL)
- return true;
-
- /* TODO: improve read balancing decisions, take into account drbd
- * protocol, pending requests etc. */
-
- rcu_read_lock();
- rbm = rcu_dereference(device->ldev->disk_conf)->read_balancing;
- rcu_read_unlock();
-
- if (rbm == RB_PREFER_LOCAL && req->private_bio)
- return false; /* submit locally */
+ if (device->disk_state[NOW] > D_DISKLESS) {
+ rcu_read_lock();
+ /* ldev_safe: checked disk_state while holding state_rwlock */
+ rbm = rcu_dereference(device->ldev->disk_conf)->read_balancing;
+ rcu_read_unlock();
+ if (rbm == RB_PREFER_LOCAL && req->private_bio) {
+ return NULL; /* submit locally */
+ }
+ }
- if (remote_due_to_read_balancing(device, req->i.sector, rbm)) {
- if (req->private_bio) {
- bio_put(req->private_bio);
- req->private_bio = NULL;
- put_ldev(device);
+ /* TODO: improve read balancing decisions, allow user to configure node weights */
+ while (true) {
+ if (!device->read_nodes)
+ device->read_nodes = calc_nodes_to_read_from(device);
+ if (device->read_nodes) {
+ int peer_node_id = __ffs64(device->read_nodes);
+ device->read_nodes &= ~NODE_MASK(peer_node_id);
+ peer_device = peer_device_by_node_id(device, peer_node_id);
+ if (!peer_device)
+ continue;
+ if (peer_device->disk_state[NOW] != D_UP_TO_DATE)
+ continue;
+ if (req->private_bio &&
+ !remote_due_to_read_balancing(device, peer_device, req->i.sector, rbm))
+ peer_device = NULL;
+ } else {
+ peer_device = NULL;
}
- return true;
+ break;
}
- return false;
+ if (peer_device && req->private_bio) {
+ bio_put(req->private_bio);
+ req->private_bio = NULL;
+ put_ldev(device);
+ }
+ return peer_device;
}
-bool drbd_should_do_remote(union drbd_dev_state s)
+static int drbd_process_empty_flush(struct drbd_request *req)
{
- return s.pdsk == D_UP_TO_DATE ||
- (s.pdsk >= D_INCONSISTENT &&
- s.conn >= C_WF_BITMAP_T &&
- s.conn < C_AHEAD);
- /* Before proto 96 that was >= CONNECTED instead of >= C_WF_BITMAP_T.
- That is equivalent since before 96 IO was frozen in the C_WF_BITMAP*
- states. */
-}
+ struct drbd_device *device = req->device;
+ struct drbd_peer_device *peer_device;
+ int count = 0;
+
+ for_each_peer_device(peer_device, device) {
+ /* When a flush is submitted, the expectation is that the data
+ * is written somewhere in a usable form. Hence only
+ * D_UP_TO_DATE peers are included and not all peers that
+ * receive the data. */
+ if (peer_device->disk_state[NOW] == D_UP_TO_DATE) {
+ ++count;
+
+ /* An empty flush indicates that all previously
+ * completed requests should be written out to stable
+ * storage. Request completion already triggers a
+ * barrier to be sent and the current epoch closed. The
+ * barrier causes the data to be written out unless
+ * that is configured not to be necessary.
+ *
+ * Hence there is nothing more to be done to cause the
+ * writing out to persistent storage which was
+ * requested. We just mark the request so that we know
+ * that a flush has effectively occurred on this peer
+ * so that we can complete it successfully.
+ *
+ * We _should_ wait for any outstanding barriers to
+ * protocol C peers to be acked before completing this
+ * request, so that we are sure that the previously
+ * completed requests have really been written out
+ * there too. However, DRBD has never yet implemented
+ * this. */
+ _req_mod(req, BARRIER_SENT, peer_device);
+ }
+ }
-static bool drbd_should_send_out_of_sync(union drbd_dev_state s)
-{
- return s.conn == C_AHEAD || s.conn == C_WF_BITMAP_S;
- /* pdsk = D_INCONSISTENT as a consequence. Protocol 96 check not necessary
- since we enter state C_AHEAD only if proto >= 96 */
+ return count;
}
-/* returns number of connections (== 1, for drbd 8.4)
- * expected to actually write this data,
+/* returns the number of connections expected to actually write this data,
* which does NOT include those that we are L_AHEAD for. */
static int drbd_process_write_request(struct drbd_request *req)
{
struct drbd_device *device = req->device;
- struct drbd_peer_device *peer_device = first_peer_device(device);
+ struct drbd_peer_device *peer_device;
int remote, send_oos;
+ int count = 0;
+
+ for_each_peer_device(peer_device, device) {
+ remote = drbd_should_do_remote(peer_device, NOW);
+ send_oos = drbd_should_send_out_of_sync(peer_device);
- remote = drbd_should_do_remote(device->state);
- send_oos = drbd_should_send_out_of_sync(device->state);
+ if (!remote && !send_oos)
+ continue;
- /* Need to replicate writes. Unless it is an empty flush,
- * which is better mapped to a DRBD P_BARRIER packet,
- * also for drbd wire protocol compatibility reasons.
- * If this was a flush, just start a new epoch.
- * Unless the current epoch was empty anyways, or we are not currently
- * replicating, in which case there is no point. */
- if (unlikely(req->i.size == 0)) {
- /* The only size==0 bios we expect are empty flushes. */
- D_ASSERT(device, req->master_bio->bi_opf & REQ_PREFLUSH);
- if (remote)
- _req_mod(req, QUEUE_AS_DRBD_BARRIER, peer_device);
- return remote;
- }
+ D_ASSERT(device, !(remote && send_oos));
- if (!remote && !send_oos)
- return 0;
+ if (remote) {
+ ++count;
+ _req_mod(req, NEW_NET_WRITE, peer_device);
+ } else
+ _req_mod(req, NEW_NET_OOS, peer_device);
+ }
- D_ASSERT(device, !(remote && send_oos));
+ return count;
+}
- if (remote) {
- _req_mod(req, TO_BE_SENT, peer_device);
- _req_mod(req, QUEUE_FOR_NET_WRITE, peer_device);
- } else if (drbd_set_out_of_sync(peer_device, req->i.sector, req->i.size))
- _req_mod(req, QUEUE_FOR_SEND_OOS, peer_device);
+static void drbd_request_ready_for_net(struct drbd_request *req)
+{
+ struct drbd_device *device = req->device;
+ struct drbd_peer_device *peer_device;
- return remote;
+ for_each_peer_device(peer_device, device) {
+ /* Do not mark RQ_NET_PENDING_OOS requests ready yet */
+ if (req->net_rq_state[peer_device->node_id] & RQ_NET_PENDING)
+ _req_mod(req, READY_FOR_NET, peer_device);
+ }
}
static void drbd_process_discard_or_zeroes_req(struct drbd_request *req, int flags)
@@ -1162,45 +1830,67 @@ drbd_submit_req_private_bio(struct drbd_request *req)
else
type = DRBD_FAULT_DT_RD;
+ /* ldev_safe: req->private_bio implies an ldev reference is held */
+ bio_set_dev(bio, device->ldev->backing_bdev);
+
/* State may have changed since we grabbed our reference on the
- * ->ldev member. Double check, and short-circuit to endio.
+ * device->ldev member. Double check, and short-circuit to endio.
* In case the last activity log transaction failed to get on
* stable storage, and this is a WRITE, we may not even submit
* this bio. */
if (get_ldev(device)) {
- if (drbd_insert_fault(device, type))
- bio_io_error(bio);
- else if (bio_op(bio) == REQ_OP_WRITE_ZEROES)
+ if (drbd_insert_fault(device, type)) {
+ bio->bi_status = BLK_STS_IOERR;
+ bio_endio(bio);
+ } else if (bio_op(bio) == REQ_OP_WRITE_ZEROES) {
drbd_process_discard_or_zeroes_req(req, EE_ZEROOUT |
((bio->bi_opf & REQ_NOUNMAP) ? 0 : EE_TRIM));
- else if (bio_op(bio) == REQ_OP_DISCARD)
+ } else if (bio_op(bio) == REQ_OP_DISCARD) {
drbd_process_discard_or_zeroes_req(req, EE_TRIM);
- else
+ } else {
submit_bio_noacct(bio);
+ }
put_ldev(device);
- } else
- bio_io_error(bio);
-}
+ } else {
+ bio->bi_status = BLK_STS_IOERR;
+ bio_endio(bio);
+ }
+ }
static void drbd_queue_write(struct drbd_device *device, struct drbd_request *req)
{
- spin_lock_irq(&device->resource->req_lock);
- list_add_tail(&req->tl_requests, &device->submit.writes);
+ if (req->private_bio)
+ atomic_inc(&device->ap_actlog_cnt);
+ spin_lock_irq(&device->pending_completion_lock);
list_add_tail(&req->req_pending_master_completion,
&device->pending_master_completion[1 /* WRITE */]);
- spin_unlock_irq(&device->resource->req_lock);
+ spin_unlock_irq(&device->pending_completion_lock);
+ spin_lock(&device->submit.lock);
+ list_add_tail(&req->list, &device->submit.writes);
+ spin_unlock(&device->submit.lock);
queue_work(device->submit.wq, &device->submit.worker);
/* do_submit() may sleep internally on al_wait, too */
wake_up(&device->al_wait);
}
-/* returns the new drbd_request pointer, if the caller is expected to
- * drbd_send_and_submit() it (to save latency), or NULL if we queued the
- * request on the submitter thread.
+static void drbd_req_in_actlog(struct drbd_request *req)
+{
+ req->local_rq_state |= RQ_IN_ACT_LOG;
+ ktime_get_accounting(req->in_actlog_kt);
+ atomic_sub(interval_to_al_extents(&req->i), &req->device->wait_for_actlog_ecnt);
+}
+
+/* returns the new drbd_request pointer, if the caller is expected to submit it
+ * (to save latency), or NULL if we queued the request on the submitter thread.
* Returns ERR_PTR(-ENOMEM) if we cannot allocate a drbd_request.
*/
+#ifndef CONFIG_DRBD_TIMING_STATS
+#define drbd_request_prepare(d, b, k, j) drbd_request_prepare(d, b, j)
+#endif
static struct drbd_request *
-drbd_request_prepare(struct drbd_device *device, struct bio *bio)
+drbd_request_prepare(struct drbd_device *device, struct bio *bio,
+ ktime_t start_kt,
+ unsigned long start_jif)
{
const int rw = bio_data_dir(bio);
struct drbd_request *req;
@@ -1208,44 +1898,66 @@ drbd_request_prepare(struct drbd_device *device, struct bio *bio)
/* allocate outside of all locks; */
req = drbd_req_new(device, bio);
if (!req) {
- dec_ap_bio(device);
- /* only pass the error to the upper layers.
- * if user cannot handle io errors, that's not our business. */
drbd_err(device, "could not kmalloc() req\n");
- bio->bi_status = BLK_STS_RESOURCE;
- bio_endio(bio);
- return ERR_PTR(-ENOMEM);
+ goto no_mem;
}
/* Update disk stats */
req->start_jif = bio_start_io_acct(req->master_bio);
if (get_ldev(device)) {
- req->private_bio = bio_alloc_clone(device->ldev->backing_bdev,
- bio, GFP_NOIO,
- &drbd_io_bio_set);
+ req->private_bio = bio_alloc_clone(device->ldev->backing_bdev, bio, GFP_NOIO, &drbd_io_bio_set);
+ if (!req->private_bio) {
+ drbd_err(device, "could not bio_alloc_clone() req->private_bio\n");
+ kfree(req);
+ put_ldev(device);
+ goto no_mem;
+ }
req->private_bio->bi_private = req;
req->private_bio->bi_end_io = drbd_request_endio;
}
+ ktime_get_accounting_assign(req->start_kt, start_kt);
+
+ if (rw != WRITE || req->i.size == 0)
+ return req;
+
+ /* Let the activity log know we are about to use it...
+ * FIXME
+ * Needs to slow down to not congest on the activity log, in case we
+ * have multiple primaries and the peer sends huge scattered epochs.
+ * See also how peer_requests are handled
+ * in receive_Data() { ... drbd_wait_for_activity_log_extents(); ... }
+ */
+ if (req->private_bio)
+ atomic_add(interval_to_al_extents(&req->i), &device->wait_for_actlog_ecnt);
+
/* process discards always from our submitter thread */
- if (bio_op(bio) == REQ_OP_WRITE_ZEROES ||
- bio_op(bio) == REQ_OP_DISCARD)
+ if ((bio_op(bio) == REQ_OP_WRITE_ZEROES) ||
+ (bio_op(bio) == REQ_OP_DISCARD))
goto queue_for_submitter_thread;
- if (rw == WRITE && req->private_bio && req->i.size
- && !test_bit(AL_SUSPENDED, &device->flags)) {
+ if (req->private_bio && !test_bit(AL_SUSPENDED, &device->flags)) {
+ /* ldev_safe: have private_bio */
if (!drbd_al_begin_io_fastpath(device, &req->i))
goto queue_for_submitter_thread;
- req->rq_state |= RQ_IN_ACT_LOG;
- req->in_actlog_jif = jiffies;
+ drbd_req_in_actlog(req);
}
return req;
queue_for_submitter_thread:
- atomic_inc(&device->ap_actlog_cnt);
+ ktime_aggregate_delta(device, req->start_kt, before_queue_kt);
drbd_queue_write(device, req);
return NULL;
+
+ no_mem:
+ dec_ap_bio(device, rw);
+ /* only pass the error to the upper layers.
+ * if user cannot handle io errors, that's not our business.
+ */
+ bio->bi_status = BLK_STS_RESOURCE;
+ bio_endio(bio);
+ return ERR_PTR(-ENOMEM);
}
/* Require at least one path to current data.
@@ -1260,8 +1972,17 @@ drbd_request_prepare(struct drbd_device *device, struct bio *bio)
*/
static bool may_do_writes(struct drbd_device *device)
{
- const union drbd_dev_state s = device->state;
- return s.disk == D_UP_TO_DATE || s.pdsk == D_UP_TO_DATE;
+ struct drbd_peer_device *peer_device;
+
+ if (device->disk_state[NOW] == D_UP_TO_DATE)
+ return true;
+
+ for_each_peer_device(peer_device, device) {
+ if (peer_device->disk_state[NOW] == D_UP_TO_DATE)
+ return true;
+ }
+
+ return false;
}
struct drbd_plug_cb {
@@ -1273,21 +1994,25 @@ struct drbd_plug_cb {
static void drbd_unplug(struct blk_plug_cb *cb, bool from_schedule)
{
struct drbd_plug_cb *plug = container_of(cb, struct drbd_plug_cb, cb);
- struct drbd_resource *resource = plug->cb.data;
struct drbd_request *req = plug->most_recent_req;
+ struct drbd_resource *resource;
kfree(cb);
if (!req)
return;
- spin_lock_irq(&resource->req_lock);
+ resource = req->device->resource;
+
+ read_lock_irq(&resource->state_rwlock);
/* In case the sender did not process it yet, raise the flag to
* have it followed with P_UNPLUG_REMOTE just after. */
- req->rq_state |= RQ_UNPLUG;
+ spin_lock(&req->rq_lock);
+ req->local_rq_state |= RQ_UNPLUG;
+ spin_unlock(&req->rq_lock);
/* but also queue a generic unplug */
drbd_queue_unplug(req->device);
kref_put(&req->kref, drbd_req_destroy);
- spin_unlock_irq(&resource->req_lock);
+ read_unlock_irq(&resource->state_rwlock);
}
static struct drbd_plug_cb* drbd_check_plugged(struct drbd_resource *resource)
@@ -1307,40 +2032,34 @@ static struct drbd_plug_cb* drbd_check_plugged(struct drbd_resource *resource)
static void drbd_update_plug(struct drbd_plug_cb *plug, struct drbd_request *req)
{
struct drbd_request *tmp = plug->most_recent_req;
- /* Will be sent to some peer.
- * Remember to tag it with UNPLUG_REMOTE on unplug */
+ /* Will be sent to some peer. */
kref_get(&req->kref);
plug->most_recent_req = req;
if (tmp)
kref_put(&tmp->kref, drbd_req_destroy);
}
-static void drbd_send_and_submit(struct drbd_device *device, struct drbd_request *req)
+static void drbd_send_and_submit(struct drbd_request *req)
{
+ struct drbd_device *device = req->device;
struct drbd_resource *resource = device->resource;
- struct drbd_peer_device *peer_device = first_peer_device(device);
+ struct drbd_peer_device *peer_device = NULL; /* for read */
const int rw = bio_data_dir(req->master_bio);
struct bio_and_error m = { NULL, };
bool no_remote = false;
bool submit_private_bio = false;
- spin_lock_irq(&resource->req_lock);
- if (rw == WRITE) {
- /* This may temporarily give up the req_lock,
- * but will re-aquire it before it returns here.
- * Needs to be before the check on drbd_suspended() */
- complete_conflicting_writes(req);
- /* no more giving up req_lock from now on! */
+ read_lock_irq(&resource->state_rwlock);
+ if (rw == WRITE) {
/* check for congestion, and potentially stop sending
* full data updates, but start sending "dirty bits" only. */
maybe_pull_ahead(device);
}
-
if (drbd_suspended(device)) {
/* push back and retry: */
- req->rq_state |= RQ_POSTPONED;
+ req->local_rq_state |= RQ_POSTPONED;
if (req->private_bio) {
bio_put(req->private_bio);
req->private_bio = NULL;
@@ -1349,44 +2068,87 @@ static void drbd_send_and_submit(struct drbd_device *device, struct drbd_request
goto out;
}
- /* We fail READ early, if we can not serve it.
- * We must do this before req is registered on any lists.
- * Otherwise, drbd_req_complete() will queue failed READ for retry. */
- if (rw != WRITE) {
- if (!do_remote_read(req) && !req->private_bio)
+ if (rw == WRITE) {
+ if (!may_do_writes(device)) {
+ if (req->private_bio) {
+ bio_put(req->private_bio);
+ req->private_bio = NULL;
+ put_ldev(device);
+ }
+ goto nodata;
+ }
+ } else {
+ /* We fail READ early, if we can not serve it.
+ * We must do this before req is registered on any lists.
+ * Otherwise, drbd_req_complete() will queue failed READ for retry. */
+ peer_device = find_peer_device_for_read(req);
+ if (!peer_device && !req->private_bio)
goto nodata;
}
- /* which transfer log epoch does this belong to? */
- req->epoch = atomic_read(&first_peer_device(device)->connection->current_tle_nr);
+ spin_lock(&resource->tl_update_lock); /* local irq already disabled */
+ if (rw == WRITE) {
+ /* Update dagtag_sector before determining current_tle_nr so
+ * that senders can detect if there are requests currently
+ * being submitted. Updates are protected by tl_update_lock,
+ * but reads are not, so WRITE_ONCE(). */
+ WRITE_ONCE(resource->dagtag_sector, resource->dagtag_sector + (req->i.size >> 9));
+ /* Ensure that the written value is visible to the senders. */
+ smp_wmb();
+ }
+ req->dagtag_sector = resource->dagtag_sector;
- /* no point in adding empty flushes to the transfer log,
- * they are mapped to drbd barriers already. */
- if (likely(req->i.size!=0)) {
- if (rw == WRITE)
- first_peer_device(device)->connection->current_tle_writes++;
+ spin_lock(&resource->current_tle_lock);
+ /* which transfer log epoch does this belong to? */
+ req->epoch = atomic_read(&resource->current_tle_nr);
+ if (rw == WRITE && likely(req->i.size != 0))
+ resource->current_tle_writes++;
+ spin_unlock(&resource->current_tle_lock);
- list_add_tail(&req->tl_requests, &first_peer_device(device)->connection->transfer_log);
- }
+ /* A size==0 bio can only be an empty flush, which is mapped to a DRBD
+ * P_BARRIER packet. */
+ if (unlikely(req->i.size == 0)) {
+ /* The only size==0 bios we expect are empty flushes. */
+ D_ASSERT(device, req->master_bio->bi_opf & REQ_PREFLUSH);
- if (rw == WRITE) {
- if (req->private_bio && !may_do_writes(device)) {
- bio_put(req->private_bio);
- req->private_bio = NULL;
- put_ldev(device);
- goto nodata;
- }
- if (!drbd_process_write_request(req))
+ if (!drbd_process_empty_flush(req))
no_remote = true;
} else {
- /* We either have a private_bio, or we can read from remote.
- * Otherwise we had done the goto nodata above. */
- if (req->private_bio == NULL) {
- _req_mod(req, TO_BE_SENT, peer_device);
- _req_mod(req, QUEUE_FOR_NET_READ, peer_device);
- } else
- no_remote = true;
+ if (rw == WRITE) {
+ struct drbd_request *prev_write = resource->tl_previous_write;
+ resource->tl_previous_write = req;
+
+ if (prev_write) {
+ if (!test_bit(INTERVAL_DONE, &prev_write->i.flags))
+ refcount_inc(&req->done_ref);
+ refcount_inc(&req->oos_send_ref);
+ prev_write->next_write = req;
+ }
+
+ if (!drbd_process_write_request(req))
+ no_remote = true;
+ } else {
+ if (peer_device)
+ _req_mod(req, NEW_NET_READ, peer_device);
+ else
+ no_remote = true;
+ }
+
+ /* req may now be accessed by other threads - do not modify
+ * "immutable" fields after this point */
+ list_add_tail_rcu(&req->tl_requests, &resource->transfer_log);
+
+ /* Do this after adding to the transfer log so that the
+ * caching pointer req_not_net_done is set if
+ * necessary. */
+ drbd_request_ready_for_net(req);
}
+ spin_unlock(&resource->tl_update_lock);
+
+ if (rw == WRITE)
+ wake_all_senders(resource);
+ else if (peer_device)
+ wake_up(&peer_device->connection->sender_work.q_wait);
if (no_remote == false) {
struct drbd_plug_cb *plug = drbd_check_plugged(resource);
@@ -1396,29 +2158,38 @@ static void drbd_send_and_submit(struct drbd_device *device, struct drbd_request
/* If it took the fast path in drbd_request_prepare, add it here.
* The slow path has added it already. */
+ spin_lock(&device->pending_completion_lock); /* local irq already disabled */
if (list_empty(&req->req_pending_master_completion))
list_add_tail(&req->req_pending_master_completion,
&device->pending_master_completion[rw == WRITE]);
if (req->private_bio) {
- /* needs to be marked within the same spinlock */
+ /* pre_submit_jif is used in request_timer_fn() */
req->pre_submit_jif = jiffies;
+ ktime_get_accounting(req->pre_submit_kt);
list_add_tail(&req->req_pending_local,
&device->pending_completion[rw == WRITE]);
_req_mod(req, TO_BE_SUBMITTED, NULL);
- /* but we need to give up the spinlock to submit */
+ /* needs to be marked within the same spinlock
+ * but we need to give up the spinlock to submit */
submit_private_bio = true;
- } else if (no_remote) {
+ spin_unlock(&device->pending_completion_lock);
+ } else {
+ spin_unlock(&device->pending_completion_lock);
+ if (no_remote) {
nodata:
- if (drbd_ratelimit())
- drbd_err(device, "IO ERROR: neither local nor remote data, sector %llu+%u\n",
- (unsigned long long)req->i.sector, req->i.size >> 9);
- /* A write may have been queued for send_oos, however.
- * So we can not simply free it, we must go through drbd_req_put_completion_ref() */
+ drbd_err_ratelimit(req->device,
+ "IO ERROR: neither local nor remote data, sector %llu+%u\n",
+ (unsigned long long)req->i.sector, req->i.size >> 9);
+ /* A write may have been queued for send_oos, however.
+ * So we can not simply free it, we must go through
+ * drbd_req_put_completion_ref()
+ */
+ }
}
out:
drbd_req_put_completion_ref(req, &m, 1);
- spin_unlock_irq(&resource->req_lock);
+ read_unlock_irq(&resource->state_rwlock);
/* Even though above is a kref_put(), this is safe.
* As long as we still need to submit our private bio,
@@ -1428,114 +2199,396 @@ static void drbd_send_and_submit(struct drbd_device *device, struct drbd_request
* That's why we cannot check on req->private_bio. */
if (submit_private_bio)
drbd_submit_req_private_bio(req);
+
if (m.bio)
complete_master_bio(device, &m);
}
-void __drbd_make_request(struct drbd_device *device, struct bio *bio)
+/* Insert the request into the tree of writes. Pass it through to be submitted
+ * if possible. Otherwise it will be submitted asynchronously via
+ * drbd_release_conflicts once the conflict has been resolved. */
+static void drbd_conflict_submit_write(struct drbd_request *req)
+{
+ struct drbd_device *device = req->device;
+ struct drbd_interval *conflict;
+
+ spin_lock_irq(&device->interval_lock);
+ clear_bit(INTERVAL_SUBMIT_CONFLICT_QUEUED, &req->i.flags);
+ conflict = drbd_find_conflict(device, &req->i, 0);
+ if (drbd_interval_empty(&req->i))
+ drbd_insert_interval(&device->requests, &req->i);
+ if (!conflict) {
+ set_bit(INTERVAL_SUBMITTED, &req->i.flags);
+ } else if (drbd_interval_is_local(conflict)) {
+ struct drbd_request *conflicting_req =
+ container_of(conflict, struct drbd_request, i);
+
+ if (conflicting_req->local_rq_state & RQ_POSTPONED) {
+ req->local_rq_state |= RQ_POSTPONED;
+
+ /*
+ * Remove interval from tree to prevent req from being
+ * queued when conflicts are released.
+ */
+ drbd_remove_interval(&device->requests, &req->i);
+ }
+ }
+ spin_unlock_irq(&device->interval_lock);
+
+ /*
+ * If there is a conflict, the request will be submitted once the
+ * conflict has cleared.
+ */
+ if (!conflict) {
+ drbd_send_and_submit(req);
+ } else if (req->local_rq_state & RQ_POSTPONED) {
+ if (req->private_bio) {
+ bio_put(req->private_bio);
+ req->private_bio = NULL;
+ put_ldev(device);
+ }
+ drbd_req_put_completion_ref(req, NULL, 1);
+ }
+}
+
+static bool inc_ap_bio_cond(struct drbd_device *device, int rw)
+{
+ int ap_bio_cnt;
+ bool rv;
+
+ read_lock_irq(&device->resource->state_rwlock);
+ rv = may_inc_ap_bio(device);
+ read_unlock_irq(&device->resource->state_rwlock);
+ if (!rv)
+ return false;
+
+ /* check need for new current uuid _AFTER_ ensuring IO is not suspended via may_inc_ap_bio */
+ if (test_bit(NEW_CUR_UUID, &device->flags)) {
+ if (!test_and_set_bit(WRITING_NEW_CUR_UUID, &device->flags))
+ drbd_device_post_work(device, MAKE_NEW_CUR_UUID);
+
+ return false;
+ }
+
+ do {
+ unsigned int nr_requests = device->resource->res_opts.nr_requests;
+
+ ap_bio_cnt = atomic_read(&device->ap_bio_cnt[rw]);
+ if (ap_bio_cnt >= nr_requests)
+ return false;
+ } while (atomic_cmpxchg(&device->ap_bio_cnt[rw], ap_bio_cnt, ap_bio_cnt + 1) != ap_bio_cnt);
+
+ return true;
+}
+
+static void inc_ap_bio(struct drbd_device *device, int rw)
+{
+ /* we wait here
+ * as long as the device is suspended
+ * until the bitmap is no longer on the fly during connection
+ * handshake as long as we would exceed the max_buffer limit.
+ *
+ * to avoid races with the reconnect code,
+ * we need to atomic_inc within the spinlock. */
+
+ wait_event(device->misc_wait, inc_ap_bio_cond(device, rw));
+}
+
+void __drbd_make_request(struct drbd_device *device, struct bio *bio,
+ ktime_t start_kt,
+ unsigned long start_jif)
{
- struct drbd_request *req = drbd_request_prepare(device, bio);
+ const int rw = bio_data_dir(bio);
+ struct drbd_request *req;
+
+ inc_ap_bio(device, bio_data_dir(bio));
+ req = drbd_request_prepare(device, bio, start_kt, start_jif);
if (IS_ERR_OR_NULL(req))
return;
- drbd_send_and_submit(device, req);
+
+ if (rw == WRITE)
+ drbd_conflict_submit_write(req);
+ else
+ drbd_send_and_submit(req);
+}
+
+/* Work function to submit requests once they are released after conflicts. The
+ * queued requests are processed and, if no other conflict is found, submitted. */
+void drbd_do_submit_conflict(struct work_struct *ws)
+{
+ struct drbd_device *device = container_of(ws, struct drbd_device, submit_conflict.worker);
+ struct drbd_peer_request *peer_req, *peer_req_tmp;
+ struct drbd_request *req, *tmp;
+ LIST_HEAD(resync_writes);
+ LIST_HEAD(resync_reads);
+ LIST_HEAD(writes);
+ LIST_HEAD(peer_writes);
+
+ spin_lock_irq(&device->submit_conflict.lock);
+ list_splice_init(&device->submit_conflict.resync_writes, &resync_writes);
+ list_splice_init(&device->submit_conflict.resync_reads, &resync_reads);
+ list_splice_init(&device->submit_conflict.writes, &writes);
+ list_splice_init(&device->submit_conflict.peer_writes, &peer_writes);
+ spin_unlock_irq(&device->submit_conflict.lock);
+
+ /* Delete the list entries when iterating them so that they can be re-used
+ * for adding them to the conflict lists again once the
+ * submit_conflict_queued flag has been cleared. */
+
+ list_for_each_entry_safe(peer_req, peer_req_tmp, &resync_writes, w.list) {
+ list_del_init(&peer_req->w.list);
+ if (!test_bit(INTERVAL_READY_TO_SEND, &peer_req->i.flags))
+ drbd_conflict_send_resync_request(peer_req);
+ else
+ drbd_conflict_submit_resync_request(peer_req);
+ }
+
+ list_for_each_entry_safe(peer_req, peer_req_tmp, &resync_reads, w.list) {
+ list_del_init(&peer_req->w.list);
+ drbd_conflict_submit_peer_read(peer_req);
+ }
+
+ list_for_each_entry_safe(req, tmp, &writes, list) {
+ list_del_init(&req->list);
+ drbd_conflict_submit_write(req);
+ }
+
+ list_for_each_entry_safe(peer_req, peer_req_tmp, &peer_writes, w.list) {
+ list_del_init(&peer_req->w.list);
+ /* ldev_safe: queued peer requests hold their own ldev references */
+ drbd_conflict_submit_peer_write(peer_req);
+ }
+}
+
+/* helpers for do_submit */
+
+struct incoming_pending {
+ /* from drbd_submit_bio() or receive_Data() */
+ struct list_head incoming;
+ /* for non-blocking fill-up # of updates in the transaction */
+ struct list_head more_incoming;
+ /* to be submitted after next AL-transaction commit */
+ struct list_head pending;
+ /* need cleanup */
+ struct list_head cleanup;
+};
+
+struct waiting_for_act_log {
+ struct incoming_pending requests;
+ struct incoming_pending peer_requests;
+};
+
+static void ipb_init(struct incoming_pending *ipb)
+{
+ INIT_LIST_HEAD(&ipb->incoming);
+ INIT_LIST_HEAD(&ipb->more_incoming);
+ INIT_LIST_HEAD(&ipb->pending);
+ INIT_LIST_HEAD(&ipb->cleanup);
+}
+
+static void wfa_init(struct waiting_for_act_log *wfa)
+{
+ ipb_init(&wfa->requests);
+ ipb_init(&wfa->peer_requests);
+}
+
+#define wfa_lists_empty(_wfa, name) \
+ (list_empty(&(_wfa)->requests.name) && list_empty(&(_wfa)->peer_requests.name))
+#define wfa_splice_tail_init(_wfa, from, to) do { \
+ list_splice_tail_init(&(_wfa)->requests.from, &(_wfa)->requests.to); \
+ list_splice_tail_init(&(_wfa)->peer_requests.from, &(_wfa)->peer_requests.to); \
+ } while (0)
+
+static void __drbd_submit_peer_request(struct drbd_peer_request *peer_req)
+{
+ struct drbd_peer_device *peer_device = peer_req->peer_device;
+ struct drbd_device *device = peer_device->device;
+ int err;
+
+ peer_req->flags |= EE_IN_ACTLOG;
+ atomic_sub(interval_to_al_extents(&peer_req->i), &device->wait_for_actlog_ecnt);
+ atomic_dec(&device->wait_for_actlog);
+ list_del_init(&peer_req->w.list);
+
+ err = drbd_submit_peer_request(peer_req);
+
+ if (err)
+ drbd_cleanup_after_failed_submit_peer_write(peer_req);
}
-static void submit_fast_path(struct drbd_device *device, struct list_head *incoming)
+static void submit_fast_path(struct drbd_device *device, struct waiting_for_act_log *wfa)
{
struct blk_plug plug;
struct drbd_request *req, *tmp;
+ struct drbd_peer_request *pr, *pr_tmp;
blk_start_plug(&plug);
- list_for_each_entry_safe(req, tmp, incoming, tl_requests) {
+ list_for_each_entry_safe(pr, pr_tmp, &wfa->peer_requests.incoming, w.list) {
+ if (!drbd_al_begin_io_fastpath(pr->peer_device->device, &pr->i))
+ continue;
+
+ __drbd_submit_peer_request(pr);
+ }
+ list_for_each_entry_safe(req, tmp, &wfa->requests.incoming, list) {
const int rw = bio_data_dir(req->master_bio);
- if (rw == WRITE /* rw != WRITE should not even end up here! */
- && req->private_bio && req->i.size
- && !test_bit(AL_SUSPENDED, &device->flags)) {
+ if (rw == WRITE && req->private_bio && req->i.size
+ && !test_bit(AL_SUSPENDED, &device->flags)) {
if (!drbd_al_begin_io_fastpath(device, &req->i))
continue;
- req->rq_state |= RQ_IN_ACT_LOG;
- req->in_actlog_jif = jiffies;
+ drbd_req_in_actlog(req);
atomic_dec(&device->ap_actlog_cnt);
}
- list_del_init(&req->tl_requests);
- drbd_send_and_submit(device, req);
+ list_del_init(&req->list);
+ drbd_conflict_submit_write(req);
}
blk_finish_plug(&plug);
}
+static struct drbd_request *wfa_next_request(struct waiting_for_act_log *wfa)
+{
+ struct list_head *lh = !list_empty(&wfa->requests.more_incoming) ?
+ &wfa->requests.more_incoming : &wfa->requests.incoming;
+ return list_first_entry_or_null(lh, struct drbd_request, list);
+}
+
+static struct drbd_peer_request *wfa_next_peer_request(struct waiting_for_act_log *wfa)
+{
+ struct list_head *lh = !list_empty(&wfa->peer_requests.more_incoming) ?
+ &wfa->peer_requests.more_incoming : &wfa->peer_requests.incoming;
+ return list_first_entry_or_null(lh, struct drbd_peer_request, w.list);
+}
+
static bool prepare_al_transaction_nonblock(struct drbd_device *device,
- struct list_head *incoming,
- struct list_head *pending,
- struct list_head *later)
+ struct waiting_for_act_log *wfa)
{
+ struct drbd_peer_request *peer_req;
struct drbd_request *req;
- int wake = 0;
+ bool made_progress = false;
int err;
spin_lock_irq(&device->al_lock);
- while ((req = list_first_entry_or_null(incoming, struct drbd_request, tl_requests))) {
+
+ /* Don't even try, if someone has it locked right now. */
+ if (test_bit(__LC_LOCKED, &device->act_log->flags))
+ goto out;
+
+ while ((peer_req = wfa_next_peer_request(wfa))) {
+ if (peer_req->peer_device->connection->cstate[NOW] < C_CONNECTED) {
+ list_move_tail(&peer_req->w.list, &wfa->peer_requests.cleanup);
+ made_progress = true;
+ continue;
+ }
+ err = drbd_al_begin_io_nonblock(device, &peer_req->i);
+ if (err) {
+ if (err != -ENOBUFS && drbd_ratelimit())
+ drbd_err(device, "Unexpected error %d from drbd_al_begin_io_nonblock\n", err);
+ break;
+ }
+ list_move_tail(&peer_req->w.list, &wfa->peer_requests.pending);
+ made_progress = true;
+ }
+ while ((req = wfa_next_request(wfa))) {
+ ktime_aggregate_delta(device, req->start_kt, before_al_begin_io_kt);
err = drbd_al_begin_io_nonblock(device, &req->i);
- if (err == -ENOBUFS)
+ if (err) {
+ if (err != -ENOBUFS && drbd_ratelimit())
+ drbd_err(device, "Unexpected error %d from drbd_al_begin_io_nonblock\n", err);
break;
- if (err == -EBUSY)
- wake = 1;
- if (err)
- list_move_tail(&req->tl_requests, later);
- else
- list_move_tail(&req->tl_requests, pending);
+ }
+ list_move_tail(&req->list, &wfa->requests.pending);
+ made_progress = true;
}
+ out:
spin_unlock_irq(&device->al_lock);
- if (wake)
- wake_up(&device->al_wait);
- return !list_empty(pending);
+ return made_progress;
}
-static void send_and_submit_pending(struct drbd_device *device, struct list_head *pending)
+static void send_and_submit_pending(struct drbd_device *device, struct waiting_for_act_log *wfa)
{
struct blk_plug plug;
- struct drbd_request *req;
+ struct drbd_request *req, *tmp;
+ struct drbd_peer_request *pr, *pr_tmp;
blk_start_plug(&plug);
- while ((req = list_first_entry_or_null(pending, struct drbd_request, tl_requests))) {
- req->rq_state |= RQ_IN_ACT_LOG;
- req->in_actlog_jif = jiffies;
+ list_for_each_entry_safe(pr, pr_tmp, &wfa->peer_requests.pending, w.list) {
+ __drbd_submit_peer_request(pr);
+ }
+ list_for_each_entry_safe(req, tmp, &wfa->requests.pending, list) {
+ drbd_req_in_actlog(req);
atomic_dec(&device->ap_actlog_cnt);
- list_del_init(&req->tl_requests);
- drbd_send_and_submit(device, req);
+ list_del_init(&req->list);
+ drbd_conflict_submit_write(req);
}
blk_finish_plug(&plug);
}
+/* more: for non-blocking fill-up # of updates in the transaction */
+static bool grab_new_incoming_requests(struct drbd_device *device, struct waiting_for_act_log *wfa, bool more)
+{
+ /* grab new incoming requests */
+ struct list_head *reqs = more ? &wfa->requests.more_incoming : &wfa->requests.incoming;
+ struct list_head *peer_reqs = more ? &wfa->peer_requests.more_incoming : &wfa->peer_requests.incoming;
+ bool found_new = false;
+
+ spin_lock(&device->submit.lock);
+ found_new = !list_empty(&device->submit.writes);
+ list_splice_tail_init(&device->submit.writes, reqs);
+ found_new |= !list_empty(&device->submit.peer_writes);
+ list_splice_tail_init(&device->submit.peer_writes, peer_reqs);
+ spin_unlock(&device->submit.lock);
+
+ return found_new;
+}
+
void do_submit(struct work_struct *ws)
{
struct drbd_device *device = container_of(ws, struct drbd_device, submit.worker);
- LIST_HEAD(incoming); /* from drbd_make_request() */
- LIST_HEAD(pending); /* to be submitted after next AL-transaction commit */
- LIST_HEAD(busy); /* blocked by resync requests */
+ struct waiting_for_act_log wfa;
+ bool made_progress;
- /* grab new incoming requests */
- spin_lock_irq(&device->resource->req_lock);
- list_splice_tail_init(&device->submit.writes, &incoming);
- spin_unlock_irq(&device->resource->req_lock);
+ wfa_init(&wfa);
+
+ grab_new_incoming_requests(device, &wfa, false);
for (;;) {
DEFINE_WAIT(wait);
- /* move used-to-be-busy back to front of incoming */
- list_splice_init(&busy, &incoming);
- submit_fast_path(device, &incoming);
- if (list_empty(&incoming))
+ /* ldev_safe: queued requests acquired ldev in drbd_request_prepare() */
+ submit_fast_path(device, &wfa);
+ if (wfa_lists_empty(&wfa, incoming))
break;
for (;;) {
+ /*
+ * We put ourselves on device->al_wait, then check if
+ * we can need to actually sleep and wait for someone
+ * else to make progress.
+ *
+ * We need to sleep if we cannot activate enough
+ * activity log extents for even one single request.
+ * That would mean that all (peer-)requests in our
+ * incoming lists target "cold" activity log extents,
+ * all activity log extent slots are have on-going
+ * in-flight IO (are "hot"), and no idle or free slot
+ * is available.
+ *
+ * prepare_to_wait() can internally cause a wake_up()
+ * as well, though, so this may appear to busy-loop
+ * a couple times, but should settle down quickly.
+ *
+ * When application requests make sufficient progress,
+ * some refcount on some extent will eventually drop to
+ * zero, we will be woken up, and can try to move that
+ * now idle extent to "cold", and recycle its slot for
+ * one of the extents we'd like to become hot.
+ */
prepare_to_wait(&device->al_wait, &wait, TASK_UNINTERRUPTIBLE);
- list_splice_init(&busy, &incoming);
- prepare_al_transaction_nonblock(device, &incoming, &pending, &busy);
- if (!list_empty(&pending))
+ made_progress = prepare_al_transaction_nonblock(device, &wfa);
+ if (made_progress)
break;
schedule();
@@ -1551,15 +2604,12 @@ void do_submit(struct work_struct *ws)
* effectively blocking all new requests until we made
* at least _some_ progress with what we currently have.
*/
- if (!list_empty(&incoming))
+ if (!wfa_lists_empty(&wfa, incoming))
continue;
- /* Nothing moved to pending, but nothing left
- * on incoming: all moved to busy!
- * Grab new and iterate. */
- spin_lock_irq(&device->resource->req_lock);
- list_splice_tail_init(&device->submit.writes, &incoming);
- spin_unlock_irq(&device->resource->req_lock);
+ /* Nothing moved to pending, but nothing left on
+ * incoming. Grab new and iterate. */
+ grab_new_incoming_requests(device, &wfa, false);
}
finish_wait(&device->al_wait, &wait);
@@ -1567,81 +2617,216 @@ void do_submit(struct work_struct *ws)
* had been processed, skip ahead to commit, and iterate
* without splicing in more incoming requests from upper layers.
*
- * Else, if all incoming have been processed,
- * they have become either "pending" (to be submitted after
- * next transaction commit) or "busy" (blocked by resync).
+ * Else, if all incoming have been processed, they have become
+ * "pending" (to be submitted after next transaction commit).
*
* Maybe more was queued, while we prepared the transaction?
* Try to stuff those into this transaction as well.
* Be strictly non-blocking here,
* we already have something to commit.
*
- * Commit if we don't make any more progres.
+ * Commit as soon as we don't make any more progress.
*/
- while (list_empty(&incoming)) {
- LIST_HEAD(more_pending);
- LIST_HEAD(more_incoming);
- bool made_progress;
-
+ while (wfa_lists_empty(&wfa, incoming)) {
/* It is ok to look outside the lock,
* it's only an optimization anyways */
- if (list_empty(&device->submit.writes))
+ if (list_empty(&device->submit.writes) &&
+ list_empty(&device->submit.peer_writes))
break;
- spin_lock_irq(&device->resource->req_lock);
- list_splice_tail_init(&device->submit.writes, &more_incoming);
- spin_unlock_irq(&device->resource->req_lock);
-
- if (list_empty(&more_incoming))
+ if (!grab_new_incoming_requests(device, &wfa, true))
break;
- made_progress = prepare_al_transaction_nonblock(device, &more_incoming, &more_pending, &busy);
+ made_progress = prepare_al_transaction_nonblock(device, &wfa);
- list_splice_tail_init(&more_pending, &pending);
- list_splice_tail_init(&more_incoming, &incoming);
+ wfa_splice_tail_init(&wfa, more_incoming, incoming);
if (!made_progress)
break;
}
+ if (!list_empty(&wfa.peer_requests.cleanup))
+ drbd_cleanup_peer_requests_wfa(device, &wfa.peer_requests.cleanup);
+ /* ldev_safe: queued requests acquired ldev in drbd_request_prepare() */
drbd_al_begin_io_commit(device);
- send_and_submit_pending(device, &pending);
+
+ send_and_submit_pending(device, &wfa);
+ }
+}
+
+static bool drbd_reject_write_early(struct drbd_device *device, struct bio *bio)
+{
+ struct drbd_resource *resource = device->resource;
+
+ /* If you "mount -o ro", then later "mount -o remount,rw", you can end
+ * up with a DRBD "Secondary" receiving WRITE requests from the VFS.
+ * We cannot have that. */
+
+ if (bio_data_dir(bio) == READ)
+ return false;
+
+ if (resource->role[NOW] != R_PRIMARY) {
+ /* You can fsync() on an O_RDONLY fd. Only be noisy
+ * if there is data. Ratelimit on per device "unspec"
+ * ratelimit state before kmalloc / adding the specific
+ * openers hint.
+ */
+ if (bio_has_data(bio) && drbd_device_ratelimit(device, GENERIC)) {
+ char *buf = kmalloc(128, __GFP_NORETRY);
+
+ if (buf)
+ youngest_and_oldest_opener_to_str(device, buf, 128);
+ drbd_err(device,
+ "Rejected WRITE request, not in Primary role.%s\n", buf ?: "");
+ kfree(buf);
+ }
+ return true;
+ } else if (device->open_cnt == 0) {
+ drbd_err_ratelimit(device, "WRITE request, but open_cnt == 0!\n");
+ } else if (!device->writable && bio_has_data(bio)) {
+ /*
+ * If the resource was (temporarily, auto) promoted,
+ * a remount,rw may have succeeded without marking the device
+ * open_cnt as "writable". Once we let writes through, we need
+ * _all_ openers to release(), before we attempt to auto-demote
+ * again, so we mark it writable here. Grab the open_release
+ * mutex to protect against races with new openers.
+ */
+ mutex_lock(&resource->open_release);
+ drbd_info(device, "open_cnt:%d, implicitly promoted to writable\n",
+ device->open_cnt);
+ device->writable = true;
+ mutex_unlock(&resource->open_release);
+ }
+ return false;
+}
+
+/* Check if bio is "bad", likely to be rejected by lower layers or peers:
+ * Must not be too large, must not be unaligned.
+ */
+static bool bio_bad(struct drbd_device *device, struct bio *bio)
+{
+ unsigned int bss_mask = queue_logical_block_size(device->rq_queue) / SECTOR_SIZE - 1;
+ unsigned int bs_mask = queue_logical_block_size(device->rq_queue) - 1;
+ unsigned long long sector = bio->bi_iter.bi_sector;
+ unsigned int size = bio->bi_iter.bi_size;
+
+ if (size > DRBD_MAX_BATCH_BIO_SIZE || (size & bs_mask) || (sector & bss_mask)) {
+ char comm[TASK_COMM_LEN];
+
+ get_task_comm(comm, current);
+ drbd_warn(device, "bad bio: %llu +%u 0x%x submitted by %s[%u]\n",
+ sector, size, bio->bi_opf, comm, task_pid_nr(current));
+ return true;
}
+
+ return false;
}
+/* drbd_submit_bio() - entry point for data into DRBD
+ *
+ * Request handling flow:
+ *
+ * drbd_submit_bio
+ * |
+ * v wait for AL
+ * do_retry -----------------------> __drbd_make_request --------> drbd_queue_write
+ * ^ | |
+ * | | ...
+ * | | |
+ * | | v AL extent active
+ * | drbd_do_submit_conflict --------+ do_submit ----------------+
+ * | ^ | | |
+ * ... | | v v
+ * | ... | send_and_submit_pending submit_fast_path
+ * | | v | |
+ * | +----------- drbd_conflict_submit_write <-------+--------------------+
+ * | conflict |
+ * | v
+ * drbd_restart_request <----------- drbd_send_and_submit
+ * RQ_POSTPONED |
+ * v
+ * Request state machine
+ */
void drbd_submit_bio(struct bio *bio)
{
struct drbd_device *device = bio->bi_bdev->bd_disk->private_data;
+#ifdef CONFIG_DRBD_TIMING_STATS
+ ktime_t start_kt;
+#endif
+ unsigned long start_jif;
+
+ if (drbd_reject_write_early(device, bio)) {
+ bio->bi_status = BLK_STS_IOERR;
+ bio_endio(bio);
+ return;
+ }
bio = bio_split_to_limits(bio);
if (!bio)
return;
- /*
- * what we "blindly" assume:
+ if (device->cached_err_io || bio_bad(device, bio)) {
+ bio->bi_status = BLK_STS_IOERR;
+ bio_endio(bio);
+ return;
+ }
+
+ /* This is both an optimization: READ of size 0, nothing to do
+ * and a workaround: (older) ZFS explodes on size zero reads, see
+ * https://github.com/zfsonlinux/zfs/issues/8379
+ * Actually don't do anything for size zero bios.
+ * Add a "WARN_ONCE", so we can tell the caller to stop doing this.
*/
- D_ASSERT(device, IS_ALIGNED(bio->bi_iter.bi_size, 512));
+ if (bio_op(bio) == REQ_OP_READ && bio->bi_iter.bi_size == 0) {
+ WARN_ONCE(1, "size zero read from upper layers");
+ bio_endio(bio);
+ return;
+ }
+
+ ktime_get_accounting(start_kt);
+ start_jif = jiffies;
+
+ __drbd_make_request(device, bio, start_kt, start_jif);
+}
+
+static unsigned long time_min_in_future(unsigned long now,
+ unsigned long t1, unsigned long t2)
+{
+ bool t1_in_future = time_after(t1, now);
+ bool t2_in_future = time_after(t2, now);
+
+ /* Ensure that we never return a time in the past. */
+ t1 = t1_in_future ? t1 : now;
+ t2 = t2_in_future ? t2 : now;
- inc_ap_bio(device);
- __drbd_make_request(device, bio);
+ if (!t1_in_future)
+ return t2;
+
+ if (!t2_in_future)
+ return t1;
+
+ return time_after(t1, t2) ? t2 : t1;
}
static bool net_timeout_reached(struct drbd_request *net_req,
- struct drbd_connection *connection,
+ struct drbd_peer_device *peer_device,
unsigned long now, unsigned long ent,
unsigned int ko_count, unsigned int timeout)
{
- struct drbd_device *device = net_req->device;
+ struct drbd_connection *connection = peer_device->connection;
+ int peer_node_id = peer_device->node_id;
+ unsigned long pre_send_jif = net_req->pre_send_jif[peer_node_id];
- if (!time_after(now, net_req->pre_send_jif + ent))
+ if (!time_after(now, pre_send_jif + ent))
return false;
if (time_in_range(now, connection->last_reconnect_jif, connection->last_reconnect_jif + ent))
return false;
- if (net_req->rq_state & RQ_NET_PENDING) {
- drbd_warn(device, "Remote failed to finish a request within %ums > ko-count (%u) * timeout (%u * 0.1s)\n",
- jiffies_to_msecs(now - net_req->pre_send_jif), ko_count, timeout);
+ if (net_req->net_rq_state[peer_node_id] & RQ_NET_PENDING) {
+ drbd_warn(peer_device, "Remote failed to finish a request within %ums > ko-count (%u) * timeout (%u * 0.1s)\n",
+ jiffies_to_msecs(now - pre_send_jif), ko_count, timeout);
return true;
}
@@ -1650,9 +2835,12 @@ static bool net_timeout_reached(struct drbd_request *net_req,
* Check if we sent the barrier already. We should not blame the peer
* for being unresponsive, if we did not even ask it yet. */
if (net_req->epoch == connection->send.current_epoch_nr) {
- drbd_warn(device,
- "We did not send a P_BARRIER for %ums > ko-count (%u) * timeout (%u * 0.1s); drbd kernel thread blocked?\n",
- jiffies_to_msecs(now - net_req->pre_send_jif), ko_count, timeout);
+ /* It is OK for the barrier to be delayed for a long time for a
+ * suspended request. */
+ if (!(net_req->local_rq_state & RQ_COMPLETION_SUSP))
+ drbd_warn(peer_device,
+ "We did not send a P_BARRIER for %ums > ko-count (%u) * timeout (%u * 0.1s); drbd kernel thread blocked?\n",
+ jiffies_to_msecs(now - pre_send_jif), ko_count, timeout);
return false;
}
@@ -1673,7 +2861,7 @@ static bool net_timeout_reached(struct drbd_request *net_req,
* barrier packet is relevant enough.
*/
if (time_after(now, connection->send.last_sent_barrier_jif + ent)) {
- drbd_warn(device, "Remote failed to answer a P_BARRIER (sent at %lu jif; now=%lu jif) within %ums > ko-count (%u) * timeout (%u * 0.1s)\n",
+ drbd_warn(peer_device, "Remote failed to answer a P_BARRIER (sent at %lu jif; now=%lu jif) within %ums > ko-count (%u) * timeout (%u * 0.1s)\n",
connection->send.last_sent_barrier_jif, now,
jiffies_to_msecs(now - connection->send.last_sent_barrier_jif), ko_count, timeout);
return true;
@@ -1690,7 +2878,7 @@ static bool net_timeout_reached(struct drbd_request *net_req,
* - the connection was established (resp. disk was attached)
* for longer than the timeout already.
* Note that for 32bit jiffies and very stable connections/disks,
- * we may have a wrap around, which is catched by
+ * we may have a wrap around, which is caught by
* !time_in_range(now, last_..._jif, last_..._jif + timeout).
*
* Side effect: once per 32bit wrap-around interval, which means every
@@ -1700,92 +2888,200 @@ static bool net_timeout_reached(struct drbd_request *net_req,
void request_timer_fn(struct timer_list *t)
{
- struct drbd_device *device = timer_container_of(device, t,
- request_timer);
- struct drbd_connection *connection = first_peer_device(device)->connection;
- struct drbd_request *req_read, *req_write, *req_peer; /* oldest request */
- struct net_conf *nc;
- unsigned long oldest_submit_jif;
- unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */
- unsigned long now;
- unsigned int ko_count = 0, timeout = 0;
+ struct drbd_device *device = timer_container_of(device, t, request_timer);
+ struct drbd_resource *resource = device->resource;
+ struct drbd_connection *connection;
+ struct drbd_request *req_read, *req_write;
+ unsigned long oldest_submit_jif, irq_flags;
+ unsigned long disk_timeout = 0, effective_timeout = 0, now = jiffies, next_trigger_time = now;
+ bool restart_timer = false, io_error = false;
+ unsigned long timeout_peers = 0;
+ int node_id;
rcu_read_lock();
- nc = rcu_dereference(connection->net_conf);
- if (nc && device->state.conn >= C_WF_REPORT_PARAMS) {
- ko_count = nc->ko_count;
- timeout = nc->timeout;
- }
-
if (get_ldev(device)) { /* implicit state.disk >= D_INCONSISTENT */
- dt = rcu_dereference(device->ldev->disk_conf)->disk_timeout * HZ / 10;
+ disk_timeout = rcu_dereference(device->ldev->disk_conf)->disk_timeout * HZ / 10;
put_ldev(device);
}
rcu_read_unlock();
+ /* FIXME right now, this basically does a full transfer log walk *every time* */
+ read_lock_irq(&resource->state_rwlock);
+ if (disk_timeout) {
+ unsigned long write_pre_submit_jif = 0, read_pre_submit_jif = 0;
+
+ spin_lock(&device->pending_completion_lock); /* local irq already disabled */
+ req_read = list_first_entry_or_null(&device->pending_completion[0], struct drbd_request, req_pending_local);
+ req_write = list_first_entry_or_null(&device->pending_completion[1], struct drbd_request, req_pending_local);
+ spin_unlock(&device->pending_completion_lock);
+
+ if (req_write)
+ write_pre_submit_jif = req_write->pre_submit_jif;
+ if (req_read)
+ read_pre_submit_jif = req_read->pre_submit_jif;
+ oldest_submit_jif =
+ (req_write && req_read)
+ ? (time_before(write_pre_submit_jif, read_pre_submit_jif)
+ ? write_pre_submit_jif : read_pre_submit_jif)
+ : req_write ? write_pre_submit_jif
+ : req_read ? read_pre_submit_jif : now;
+
+ if (device->disk_state[NOW] > D_FAILED) {
+ effective_timeout = min_not_zero(effective_timeout, disk_timeout);
+ next_trigger_time = time_min_in_future(now,
+ next_trigger_time, oldest_submit_jif + disk_timeout);
+ restart_timer = true;
+ }
- ent = timeout * HZ/10 * ko_count;
- et = min_not_zero(dt, ent);
+ if (time_after(now, oldest_submit_jif + disk_timeout) &&
+ !time_in_range(now, device->last_reattach_jif, device->last_reattach_jif + disk_timeout))
+ io_error = true;
+ }
+ for_each_connection(connection, resource) {
+ struct drbd_peer_device *peer_device = conn_peer_device(connection, device->vnr);
+ struct net_conf *nc;
+ struct drbd_request *req;
+ unsigned long effective_net_timeout = 0;
+ unsigned long pre_send_jif = now;
+ unsigned int ko_count = 0, timeout = 0;
- if (!et)
- return; /* Recurring timer stopped */
+ rcu_read_lock();
+ nc = rcu_dereference(connection->transport.net_conf);
+ if (nc) {
+ /* effective timeout = ko_count * timeout */
+ if (connection->cstate[NOW] == C_CONNECTED) {
+ ko_count = nc->ko_count;
+ timeout = nc->timeout;
+ effective_net_timeout = timeout * HZ/10 * ko_count;
+ }
+ }
+ rcu_read_unlock();
- now = jiffies;
- nt = now + et;
+ /* This connection is not established,
+ * or has the effective timeout disabled.
+ * no timer restart needed (for this connection). */
+ if (!effective_net_timeout)
+ continue;
- spin_lock_irq(&device->resource->req_lock);
- req_read = list_first_entry_or_null(&device->pending_completion[0], struct drbd_request, req_pending_local);
- req_write = list_first_entry_or_null(&device->pending_completion[1], struct drbd_request, req_pending_local);
+ /* maybe the oldest request waiting for the peer is in fact still
+ * blocking in tcp sendmsg. That's ok, though, that's handled via the
+ * socket send timeout, requesting a ping, and bumping ko-count in
+ * drbd_stream_send_timed_out().
+ */
- /* maybe the oldest request waiting for the peer is in fact still
- * blocking in tcp sendmsg. That's ok, though, that's handled via the
- * socket send timeout, requesting a ping, and bumping ko-count in
- * we_should_drop_the_connection().
- */
+ /* check the oldest request we did successfully sent,
+ * but which is still waiting for an ACK. */
+ req = connection->req_ack_pending;
+
+ /* If we don't have such request (e.g. protocol A)
+ * check the oldest request which is still waiting on its epoch
+ * closing barrier ack. */
+ if (!req) {
+ req = connection->req_not_net_done;
+
+ /* If we did not send the request yet then pre_send_jif
+ * is not set. Treat this the same as when there are no
+ * requests pending. */
+ if (req && !(req->net_rq_state[connection->peer_node_id] & RQ_NET_SENT))
+ req = NULL;
+ }
- /* check the oldest request we did successfully sent,
- * but which is still waiting for an ACK. */
- req_peer = connection->req_ack_pending;
+ if (req)
+ pre_send_jif = req->pre_send_jif[connection->peer_node_id];
+
+ effective_timeout = min_not_zero(effective_timeout, effective_net_timeout);
+ next_trigger_time = time_min_in_future(now,
+ next_trigger_time, pre_send_jif + effective_net_timeout);
+ /* Restart the timer, even if there are no pending requests at all.
+ * We currently do not re-arm from the submit path. */
+ restart_timer = true;
+
+ /* We have one timer per "device",
+ * but the "oldest" request is per "connection".
+ * Evaluate the oldest peer request only in one timer! */
+ if (req == NULL || req->device != device)
+ continue;
+
+ if (net_timeout_reached(req, peer_device, now, effective_net_timeout, ko_count, timeout)) {
+ dynamic_drbd_dbg(peer_device, "Request at %llus+%u timed out\n",
+ (unsigned long long) req->i.sector,
+ req->i.size);
+ timeout_peers |= NODE_MASK(connection->peer_node_id);
+ }
+ }
+ read_unlock_irq(&resource->state_rwlock);
- /* if we don't have such request (e.g. protocoll A)
- * check the oldest requests which is still waiting on its epoch
- * closing barrier ack. */
- if (!req_peer)
- req_peer = connection->req_not_net_done;
+ if (io_error) {
+ drbd_warn(device, "Local backing device failed to meet the disk-timeout\n");
+ drbd_handle_io_error(device, DRBD_FORCE_DETACH);
+ }
- /* evaluate the oldest peer request only in one timer! */
- if (req_peer && req_peer->device != device)
- req_peer = NULL;
+ BUILD_BUG_ON(sizeof(timeout_peers) * 8 < DRBD_NODE_ID_MAX);
+ for_each_set_bit(node_id, &timeout_peers, DRBD_NODE_ID_MAX) {
+ connection = drbd_get_connection_by_node_id(resource, node_id);
+ if (!connection)
+ continue;
+ begin_state_change(resource, &irq_flags, CS_VERBOSE | CS_HARD);
+ __change_cstate(connection, C_TIMEOUT);
+ end_state_change(resource, &irq_flags, "timeout");
+ kref_put(&connection->kref, drbd_destroy_connection);
+ }
- /* do we have something to evaluate? */
- if (req_peer == NULL && req_write == NULL && req_read == NULL)
- goto out;
+ if (restart_timer) {
+ next_trigger_time = time_min_in_future(now, next_trigger_time, now + effective_timeout);
+ mod_timer(&device->request_timer, next_trigger_time);
+ }
+}
- oldest_submit_jif =
- (req_write && req_read)
- ? ( time_before(req_write->pre_submit_jif, req_read->pre_submit_jif)
- ? req_write->pre_submit_jif : req_read->pre_submit_jif )
- : req_write ? req_write->pre_submit_jif
- : req_read ? req_read->pre_submit_jif : now;
+/**
+ * drbd_handle_io_error_: Handle the on_io_error setting, should be called from all io completion handlers
+ * @device: DRBD device.
+ * @df: Detach flags indicating the kind of IO that failed.
+ * @where: Calling function name.
+ */
+void drbd_handle_io_error_(struct drbd_device *device,
+ enum drbd_force_detach_flags df, const char *where)
+{
+ unsigned long flags;
+ enum drbd_io_error_p ep;
- if (ent && req_peer && net_timeout_reached(req_peer, connection, now, ent, ko_count, timeout))
- _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_VERBOSE | CS_HARD);
+ write_lock_irqsave(&device->resource->state_rwlock, flags);
- if (dt && oldest_submit_jif != now &&
- time_after(now, oldest_submit_jif + dt) &&
- !time_in_range(now, device->last_reattach_jif, device->last_reattach_jif + dt)) {
- drbd_warn(device, "Local backing device failed to meet the disk-timeout\n");
- __drbd_chk_io_error(device, DRBD_FORCE_DETACH);
+ rcu_read_lock();
+ /* ldev_safe: called from endio handlers where ldev is still held */
+ ep = rcu_dereference(device->ldev->disk_conf)->on_io_error;
+ rcu_read_unlock();
+ switch (ep) {
+ case EP_PASS_ON: /* FIXME would this be better named "Ignore"? */
+ if (df == DRBD_READ_ERROR || df == DRBD_WRITE_ERROR) {
+ if (drbd_device_ratelimit(device, BACKEND))
+ drbd_err(device, "Local IO failed in %s.\n", where);
+ if (device->disk_state[NOW] > D_INCONSISTENT) {
+ begin_state_change_locked(device->resource, CS_HARD);
+ __change_disk_state(device, D_INCONSISTENT);
+ end_state_change_locked(device->resource, "local-io-error");
+ }
+ break;
+ }
+ fallthrough; /* for DRBD_META_IO_ERROR or DRBD_FORCE_DETACH */
+ case EP_DETACH:
+ case EP_CALL_HELPER:
+ /* Force-detach is not really an IO error, but rather a
+ * desperate measure to try to deal with a completely
+ * unresponsive lower level IO stack.
+ * Still it should be treated as a WRITE error.
+ */
+ if (df == DRBD_FORCE_DETACH)
+ set_bit(FORCE_DETACH, &device->flags);
+ if (device->disk_state[NOW] > D_FAILED) {
+ begin_state_change_locked(device->resource, CS_HARD);
+ __change_disk_state(device, D_FAILED);
+ end_state_change_locked(device->resource, "local-io-error");
+ drbd_err(device,
+ "Local IO failed in %s. Detaching...\n", where);
+ }
+ break;
}
- /* Reschedule timer for the nearest not already expired timeout.
- * Fallback to now + min(effective network timeout, disk timeout). */
- ent = (ent && req_peer && time_before(now, req_peer->pre_send_jif + ent))
- ? req_peer->pre_send_jif + ent : now + et;
- dt = (dt && oldest_submit_jif != now && time_before(now, oldest_submit_jif + dt))
- ? oldest_submit_jif + dt : now + et;
- nt = time_before(ent, dt) ? ent : dt;
-out:
- spin_unlock_irq(&device->resource->req_lock);
- mod_timer(&device->request_timer, nt);
+ write_unlock_irqrestore(&device->resource->state_rwlock, flags);
}
--
2.53.0