[PATCH 09/10] staging: lustre: Dynamic LNet Configuration (DLC) dynamic routing

From: James Simmons
Date: Mon Feb 15 2016 - 10:26:33 EST


From: Amir Shehata <amir.shehata@xxxxxxxxx>

This is the second patch of a set of patches that enables DLC.

This patch adds the following features to LNET. Currently these
features are not driven by user space.
- Enabling Routing on Demand. The default number of router
buffers is allocated.
- Disable Routing on demand. Unused router buffers are freed
immediately, and router buffers still in use are freed once they
are no longer in use. The next time routing is enabled, the
default router buffer values are used. It has been decided that
user-set router buffer values should be remembered and re-set
by user space scripts.
- Increase the number of router buffers on demand, by allocating
new ones.
- Decrease the number of router buffers. Excess buffers are freed
if they are not in use. Otherwise they are freed once they are
no longer in use.

Signed-off-by: Amir Shehata <amir.shehata@xxxxxxxxx>
Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-2456
Change-Id: Id07d4ad424d8f5ba72475d4149380afe2ac54e77
Reviewed-on: http://review.whamcloud.com/9831
Reviewed-by: James Simmons <uja.ornl@xxxxxxxxx>
Reviewed-by: Doug Oucharek <doug.s.oucharek@xxxxxxxxx>
Reviewed-by: Liang Zhen <liang.zhen@xxxxxxxxx>
Reviewed-by: Oleg Drokin <oleg.drokin@xxxxxxxxx>
---
.../staging/lustre/include/linux/lnet/lib-lnet.h | 8 +-
.../staging/lustre/include/linux/lnet/lib-types.h | 8 +-
drivers/staging/lustre/lnet/lnet/api-ni.c | 4 +-
drivers/staging/lustre/lnet/lnet/lib-move.c | 89 +++++--
drivers/staging/lustre/lnet/lnet/router.c | 277 +++++++++++++++-----
5 files changed, 304 insertions(+), 82 deletions(-)

diff --git a/drivers/staging/lustre/include/linux/lnet/lib-lnet.h b/drivers/staging/lustre/include/linux/lnet/lib-lnet.h
index 77d8e37..3a1cf61 100644
--- a/drivers/staging/lustre/include/linux/lnet/lib-lnet.h
+++ b/drivers/staging/lustre/include/linux/lnet/lib-lnet.h
@@ -461,7 +461,11 @@ int lnet_get_route(int idx, __u32 *net, __u32 *hops,
void lnet_router_debugfs_init(void);
void lnet_router_debugfs_fini(void);
int lnet_rtrpools_alloc(int im_a_router);
-void lnet_rtrpools_free(void);
+void lnet_destroy_rtrbuf(lnet_rtrbuf_t *rb, int npages);
+int lnet_rtrpools_adjust(int tiny, int small, int large);
+int lnet_rtrpools_enable(void);
+void lnet_rtrpools_disable(void);
+void lnet_rtrpools_free(int keep_pools);
lnet_remotenet_t *lnet_find_net_locked(__u32 net);

int lnet_islocalnid(lnet_nid_t nid);
@@ -481,6 +485,8 @@ void lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
int lnet_send(lnet_nid_t nid, lnet_msg_t *msg, lnet_nid_t rtr_nid);
void lnet_return_tx_credits_locked(lnet_msg_t *msg);
void lnet_return_rx_credits_locked(lnet_msg_t *msg);
+void lnet_schedule_blocked_locked(lnet_rtrbufpool_t *rbp);
+void lnet_drop_routed_msgs_locked(struct list_head *list, int cpt);

/* portals functions */
/* portals attributes */
diff --git a/drivers/staging/lustre/include/linux/lnet/lib-types.h b/drivers/staging/lustre/include/linux/lnet/lib-types.h
index be650d4..b0ba9d8 100644
--- a/drivers/staging/lustre/include/linux/lnet/lib-types.h
+++ b/drivers/staging/lustre/include/linux/lnet/lib-types.h
@@ -285,6 +285,7 @@ typedef struct lnet_ni {
#define LNET_PING_FEAT_INVAL (0) /* no feature */
#define LNET_PING_FEAT_BASE (1 << 0) /* just a ping */
#define LNET_PING_FEAT_NI_STATUS (1 << 1) /* return NI status */
+#define LNET_PING_FEAT_RTE_DISABLED (1 << 2) /* Routing disabled */

#define LNET_PING_FEAT_MASK (LNET_PING_FEAT_BASE | \
LNET_PING_FEAT_NI_STATUS)
@@ -410,7 +411,12 @@ typedef struct {

#define LNET_PEER_HASHSIZE 503 /* prime! */

-#define LNET_NRBPOOLS 3 /* # different router buffer pools */
+#define LNET_TINY_BUF_IDX 0
+#define LNET_SMALL_BUF_IDX 1
+#define LNET_LARGE_BUF_IDX 2
+
+/* # different router buffer pools */
+#define LNET_NRBPOOLS (LNET_LARGE_BUF_IDX + 1)

enum {
/* Didn't match anything */
diff --git a/drivers/staging/lustre/lnet/lnet/api-ni.c b/drivers/staging/lustre/lnet/lnet/api-ni.c
index cd68ca7..06046b2 100644
--- a/drivers/staging/lustre/lnet/lnet/api-ni.c
+++ b/drivers/staging/lustre/lnet/lnet/api-ni.c
@@ -638,7 +638,7 @@ lnet_unprepare(void)

lnet_msg_containers_destroy();
lnet_peer_tables_destroy();
- lnet_rtrpools_free();
+ lnet_rtrpools_free(0);

if (the_lnet.ln_counters) {
cfs_percpt_free(the_lnet.ln_counters);
@@ -1501,6 +1501,8 @@ lnet_create_ping_info(void)
pinfo->pi_pid = the_lnet.ln_pid;
pinfo->pi_magic = LNET_PROTO_PING_MAGIC;
pinfo->pi_features = LNET_PING_FEAT_NI_STATUS;
+ if (!the_lnet.ln_routing)
+ pinfo->pi_features |= LNET_PING_FEAT_RTE_DISABLED;

for (i = 0; i < n; i++) {
lnet_ni_status_t *ns = &pinfo->pi_ni[i];
diff --git a/drivers/staging/lustre/lnet/lnet/lib-move.c b/drivers/staging/lustre/lnet/lnet/lib-move.c
index cc8c2c5..f2b1116 100644
--- a/drivers/staging/lustre/lnet/lnet/lib-move.c
+++ b/drivers/staging/lustre/lnet/lnet/lib-move.c
@@ -945,9 +945,6 @@ lnet_post_routed_recv_locked(lnet_msg_t *msg, int do_recv)
rbp = lnet_msg2bufpool(msg);

if (!msg->msg_rtrcredit) {
- LASSERT((rbp->rbp_credits < 0) ==
- !list_empty(&rbp->rbp_msgs));
-
msg->msg_rtrcredit = 1;
rbp->rbp_credits--;
if (rbp->rbp_credits < rbp->rbp_mincredits)
@@ -1039,6 +1036,43 @@ lnet_return_tx_credits_locked(lnet_msg_t *msg)
}

void
+lnet_schedule_blocked_locked(lnet_rtrbufpool_t *rbp)
+{
+ lnet_msg_t *msg;
+
+ if (list_empty(&rbp->rbp_msgs))
+ return;
+ msg = list_entry(rbp->rbp_msgs.next,
+ lnet_msg_t, msg_list);
+ list_del(&msg->msg_list);
+
+ (void)lnet_post_routed_recv_locked(msg, 1);
+}
+
+void
+lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
+{
+ struct list_head drop;
+ lnet_msg_t *msg;
+ lnet_msg_t *tmp;
+
+ INIT_LIST_HEAD(&drop);
+
+ list_splice_init(list, &drop);
+
+ lnet_net_unlock(cpt);
+
+ list_for_each_entry_safe(msg, tmp, &drop, msg_list) {
+ lnet_ni_recv(msg->msg_rxpeer->lp_ni, msg->msg_private, NULL,
+ 0, 0, 0, msg->msg_hdr.payload_length);
+ list_del_init(&msg->msg_list);
+ lnet_finalize(NULL, msg, -ECANCELED);
+ }
+
+ lnet_net_lock(cpt);
+}
+
+void
lnet_return_rx_credits_locked(lnet_msg_t *msg)
{
lnet_peer_t *rxpeer = msg->msg_rxpeer;
@@ -1058,27 +1092,41 @@ lnet_return_rx_credits_locked(lnet_msg_t *msg)

rb = list_entry(msg->msg_kiov, lnet_rtrbuf_t, rb_kiov[0]);
rbp = rb->rb_pool;
- LASSERT(rbp == lnet_msg2bufpool(msg));

msg->msg_kiov = NULL;
msg->msg_rtrcredit = 0;

- LASSERT((rbp->rbp_credits < 0) ==
- !list_empty(&rbp->rbp_msgs));
+ LASSERT(rbp == lnet_msg2bufpool(msg));
+
LASSERT((rbp->rbp_credits > 0) ==
!list_empty(&rbp->rbp_bufs));

- list_add(&rb->rb_list, &rbp->rbp_bufs);
- rbp->rbp_credits++;
- if (rbp->rbp_credits <= 0) {
- msg2 = list_entry(rbp->rbp_msgs.next,
- lnet_msg_t, msg_list);
- list_del(&msg2->msg_list);
+ /*
+ * If routing is now turned off, we just drop this buffer and
+ * don't bother trying to return credits.
+ */
+ if (!the_lnet.ln_routing) {
+ lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
+ goto routing_off;
+ }

- (void) lnet_post_routed_recv_locked(msg2, 1);
+ /*
+ * It is possible that a user has lowered the desired number of
+ * buffers in this pool. Make sure we never put back
+ * more buffers than the stated number.
+ */
+ if (rbp->rbp_credits >= rbp->rbp_nbuffers) {
+ /* Discard this buffer so we don't have too many. */
+ lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
+ } else {
+ list_add(&rb->rb_list, &rbp->rbp_bufs);
+ rbp->rbp_credits++;
+ if (rbp->rbp_credits <= 0)
+ lnet_schedule_blocked_locked(rbp);
}
}

+routing_off:
if (msg->msg_peerrtrcredit) {
/* give back peer router credits */
msg->msg_peerrtrcredit = 0;
@@ -1087,7 +1135,14 @@ lnet_return_rx_credits_locked(lnet_msg_t *msg)
!list_empty(&rxpeer->lp_rtrq));

rxpeer->lp_rtrcredits++;
- if (rxpeer->lp_rtrcredits <= 0) {
+ /*
+ * drop all messages which are queued to be routed on that
+ * peer.
+ */
+ if (!the_lnet.ln_routing) {
+ lnet_drop_routed_msgs_locked(&rxpeer->lp_rtrq,
+ msg->msg_rx_cpt);
+ } else if (rxpeer->lp_rtrcredits <= 0) {
msg2 = list_entry(rxpeer->lp_rtrq.next,
lnet_msg_t, msg_list);
list_del(&msg2->msg_list);
@@ -1646,6 +1701,9 @@ lnet_parse_forward_locked(lnet_ni_t *ni, lnet_msg_t *msg)
{
int rc = 0;

+ if (!the_lnet.ln_routing)
+ return -ECANCELED;
+
if (msg->msg_rxpeer->lp_rtrcredits <= 0 ||
lnet_msg2bufpool(msg)->rbp_credits <= 0) {
if (!ni->ni_lnd->lnd_eager_recv) {
@@ -1799,9 +1857,8 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,

if (the_lnet.ln_routing &&
ni->ni_last_alive != ktime_get_real_seconds()) {
- lnet_ni_lock(ni);
-
/* NB: so far here is the only place to set NI status to "up */
+ lnet_ni_lock(ni);
ni->ni_last_alive = ktime_get_real_seconds();
if (ni->ni_status &&
ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)
diff --git a/drivers/staging/lustre/lnet/lnet/router.c b/drivers/staging/lustre/lnet/lnet/router.c
index 67566ca..e3a6611 100644
--- a/drivers/staging/lustre/lnet/lnet/router.c
+++ b/drivers/staging/lustre/lnet/lnet/router.c
@@ -28,8 +28,11 @@
#define LNET_NRB_TINY (LNET_NRB_TINY_MIN * 4)
#define LNET_NRB_SMALL_MIN 4096 /* min value for each CPT */
#define LNET_NRB_SMALL (LNET_NRB_SMALL_MIN * 4)
+#define LNET_NRB_SMALL_PAGES 1
#define LNET_NRB_LARGE_MIN 256 /* min value for each CPT */
#define LNET_NRB_LARGE (LNET_NRB_LARGE_MIN * 4)
+#define LNET_NRB_LARGE_PAGES ((LNET_MTU + PAGE_CACHE_SIZE - 1) >> \
+ PAGE_CACHE_SHIFT)

static char *forwarding = "";
module_param(forwarding, charp, 0444);
@@ -570,7 +573,8 @@ lnet_get_route(int idx, __u32 *net, __u32 *hops,
*hops = route->lr_hops;
*priority = route->lr_priority;
*gateway = route->lr_gateway->lp_nid;
- *alive = route->lr_gateway->lp_alive;
+ *alive = route->lr_gateway->lp_alive &&
+ !route->lr_downis;
lnet_net_unlock(cpt);
return 0;
}
@@ -608,7 +612,7 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
{
lnet_ping_info_t *info = rcd->rcd_pinginfo;
struct lnet_peer *gw = rcd->rcd_gateway;
- lnet_route_t *rtr;
+ lnet_route_t *rte;

if (!gw->lp_alive)
return;
@@ -634,11 +638,16 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
if (!(gw->lp_ping_feats & LNET_PING_FEAT_NI_STATUS))
return; /* can't carry NI status info */

- list_for_each_entry(rtr, &gw->lp_routes, lr_gwlist) {
+ list_for_each_entry(rte, &gw->lp_routes, lr_gwlist) {
int down = 0;
int up = 0;
int i;

+ if (gw->lp_ping_feats & LNET_PING_FEAT_RTE_DISABLED) {
+ rte->lr_downis = 1;
+ continue;
+ }
+
for (i = 0; i < info->pi_nnis && i < LNET_MAX_RTR_NIS; i++) {
lnet_ni_status_t *stat = &info->pi_ni[i];
lnet_nid_t nid = stat->ns_nid;
@@ -659,7 +668,7 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
}

if (stat->ns_status == LNET_NI_STATUS_UP) {
- if (LNET_NIDNET(nid) == rtr->lr_net) {
+ if (LNET_NIDNET(nid) == rte->lr_net) {
up = 1;
break;
}
@@ -673,10 +682,10 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
}

if (up) { /* ignore downed NIs if NI for dest network is up */
- rtr->lr_downis = 0;
+ rte->lr_downis = 0;
continue;
}
- rtr->lr_downis = down;
+ rte->lr_downis = down;
}
}

@@ -1226,7 +1235,7 @@ rescan:
return 0;
}

-static void
+void
lnet_destroy_rtrbuf(lnet_rtrbuf_t *rb, int npages)
{
int sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
@@ -1273,67 +1282,103 @@ lnet_new_rtrbuf(lnet_rtrbufpool_t *rbp, int cpt)
}

static void
-lnet_rtrpool_free_bufs(lnet_rtrbufpool_t *rbp)
+lnet_rtrpool_free_bufs(lnet_rtrbufpool_t *rbp, int cpt)
{
int npages = rbp->rbp_npages;
- int nbuffers = 0;
+ struct list_head tmp;
lnet_rtrbuf_t *rb;

if (!rbp->rbp_nbuffers) /* not initialized or already freed */
return;

- LASSERT(list_empty(&rbp->rbp_msgs));
- LASSERT(rbp->rbp_credits == rbp->rbp_nbuffers);
+ INIT_LIST_HEAD(&tmp);

- while (!list_empty(&rbp->rbp_bufs)) {
- LASSERT(rbp->rbp_credits > 0);
+ lnet_net_lock(cpt);
+ lnet_drop_routed_msgs_locked(&rbp->rbp_msgs, cpt);
+ list_splice_init(&rbp->rbp_bufs, &tmp);
+ rbp->rbp_nbuffers = 0;
+ rbp->rbp_credits = 0;
+ rbp->rbp_mincredits = 0;
+ lnet_net_unlock(cpt);

- rb = list_entry(rbp->rbp_bufs.next,
- lnet_rtrbuf_t, rb_list);
+ /* Free buffers on the free list. */
+ while (!list_empty(&tmp)) {
+ rb = list_entry(tmp.next, lnet_rtrbuf_t, rb_list);
list_del(&rb->rb_list);
lnet_destroy_rtrbuf(rb, npages);
- nbuffers++;
}
-
- LASSERT(rbp->rbp_nbuffers == nbuffers);
- LASSERT(rbp->rbp_credits == nbuffers);
-
- rbp->rbp_nbuffers = 0;
- rbp->rbp_credits = 0;
}

static int
-lnet_rtrpool_alloc_bufs(lnet_rtrbufpool_t *rbp, int nbufs, int cpt)
+lnet_rtrpool_adjust_bufs(lnet_rtrbufpool_t *rbp, int nbufs, int cpt)
{
+ struct list_head rb_list;
lnet_rtrbuf_t *rb;
- int i;
+ int num_rb;
+ int num_buffers = 0;
+ int npages = rbp->rbp_npages;

- if (rbp->rbp_nbuffers) {
- LASSERT(rbp->rbp_nbuffers == nbufs);
+ /*
+ * If we are called for less buffers than already in the pool, we
+ * just lower the nbuffers number and excess buffers will be
+ * thrown away as they are returned to the free list. Credits
+ * then get adjusted as well.
+ */
+ if (nbufs <= rbp->rbp_nbuffers) {
+ lnet_net_lock(cpt);
+ rbp->rbp_nbuffers = nbufs;
+ lnet_net_unlock(cpt);
return 0;
}

- for (i = 0; i < nbufs; i++) {
- rb = lnet_new_rtrbuf(rbp, cpt);
+ INIT_LIST_HEAD(&rb_list);
+
+ /*
+ * allocate the buffers on a local list first. If all buffers are
+ * allocated successfully then join this list to the rbp buffer
+ * list. If not then free all allocated buffers.
+ */
+ num_rb = rbp->rbp_nbuffers;

+ while (num_rb < nbufs) {
+ rb = lnet_new_rtrbuf(rbp, cpt);
if (!rb) {
- CERROR("Failed to allocate %d router bufs of %d pages\n",
- nbufs, rbp->rbp_npages);
- return -ENOMEM;
+ CERROR("Failed to allocate %d route bufs of %d pages\n",
+ nbufs, npages);
+ goto failed;
}

- rbp->rbp_nbuffers++;
- rbp->rbp_credits++;
- rbp->rbp_mincredits++;
- list_add(&rb->rb_list, &rbp->rbp_bufs);
-
- /* No allocation "under fire" */
- /* Otherwise we'd need code to schedule blocked msgs etc */
- LASSERT(!the_lnet.ln_routing);
+ list_add(&rb->rb_list, &rb_list);
+ num_buffers++;
+ num_rb++;
}

- LASSERT(rbp->rbp_credits == nbufs);
+ lnet_net_lock(cpt);
+
+ list_splice_tail(&rb_list, &rbp->rbp_bufs);
+ rbp->rbp_nbuffers += num_buffers;
+ rbp->rbp_credits += num_buffers;
+ rbp->rbp_mincredits = rbp->rbp_credits;
+ /*
+ * We need to schedule blocked msg using the newly
+ * added buffers.
+ */
+ while (!list_empty(&rbp->rbp_bufs) &&
+ !list_empty(&rbp->rbp_msgs))
+ lnet_schedule_blocked_locked(rbp);
+
+ lnet_net_unlock(cpt);
+
return 0;
+
+failed:
+ while (!list_empty(&rb_list)) {
+ rb = list_entry(rb_list.next, lnet_rtrbuf_t, rb_list);
+ list_del(&rb->rb_list);
+ lnet_destroy_rtrbuf(rb, npages);
+ }
+
+ return -ENOMEM;
}

static void
@@ -1348,7 +1393,7 @@ lnet_rtrpool_init(lnet_rtrbufpool_t *rbp, int npages)
}

void
-lnet_rtrpools_free(void)
+lnet_rtrpools_free(int keep_pools)
{
lnet_rtrbufpool_t *rtrp;
int i;
@@ -1357,17 +1402,19 @@ lnet_rtrpools_free(void)
return;

cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
- lnet_rtrpool_free_bufs(&rtrp[0]);
- lnet_rtrpool_free_bufs(&rtrp[1]);
- lnet_rtrpool_free_bufs(&rtrp[2]);
+ lnet_rtrpool_free_bufs(&rtrp[LNET_TINY_BUF_IDX], i);
+ lnet_rtrpool_free_bufs(&rtrp[LNET_SMALL_BUF_IDX], i);
+ lnet_rtrpool_free_bufs(&rtrp[LNET_LARGE_BUF_IDX], i);
}

- cfs_percpt_free(the_lnet.ln_rtrpools);
- the_lnet.ln_rtrpools = NULL;
+ if (!keep_pools) {
+ cfs_percpt_free(the_lnet.ln_rtrpools);
+ the_lnet.ln_rtrpools = NULL;
+ }
}

static int
-lnet_nrb_tiny_calculate(int npages)
+lnet_nrb_tiny_calculate(void)
{
int nrbs = LNET_NRB_TINY;

@@ -1386,7 +1433,7 @@ lnet_nrb_tiny_calculate(int npages)
}

static int
-lnet_nrb_small_calculate(int npages)
+lnet_nrb_small_calculate(void)
{
int nrbs = LNET_NRB_SMALL;

@@ -1405,7 +1452,7 @@ lnet_nrb_small_calculate(int npages)
}

static int
-lnet_nrb_large_calculate(int npages)
+lnet_nrb_large_calculate(void)
{
int nrbs = LNET_NRB_LARGE;

@@ -1427,16 +1474,12 @@ int
lnet_rtrpools_alloc(int im_a_router)
{
lnet_rtrbufpool_t *rtrp;
- int large_pages;
- int small_pages = 1;
int nrb_tiny;
int nrb_small;
int nrb_large;
int rc;
int i;

- large_pages = (LNET_MTU + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
-
if (!strcmp(forwarding, "")) {
/* not set either way */
if (!im_a_router)
@@ -1451,15 +1494,15 @@ lnet_rtrpools_alloc(int im_a_router)
return -EINVAL;
}

- nrb_tiny = lnet_nrb_tiny_calculate(0);
+ nrb_tiny = lnet_nrb_tiny_calculate();
if (nrb_tiny < 0)
return -EINVAL;

- nrb_small = lnet_nrb_small_calculate(small_pages);
+ nrb_small = lnet_nrb_small_calculate();
if (nrb_small < 0)
return -EINVAL;

- nrb_large = lnet_nrb_large_calculate(large_pages);
+ nrb_large = lnet_nrb_large_calculate();
if (nrb_large < 0)
return -EINVAL;

@@ -1473,18 +1516,23 @@ lnet_rtrpools_alloc(int im_a_router)
}

cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
- lnet_rtrpool_init(&rtrp[0], 0);
- rc = lnet_rtrpool_alloc_bufs(&rtrp[0], nrb_tiny, i);
+ lnet_rtrpool_init(&rtrp[LNET_TINY_BUF_IDX], 0);
+ rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_TINY_BUF_IDX],
+ nrb_tiny, i);
if (rc)
goto failed;

- lnet_rtrpool_init(&rtrp[1], small_pages);
- rc = lnet_rtrpool_alloc_bufs(&rtrp[1], nrb_small, i);
+ lnet_rtrpool_init(&rtrp[LNET_SMALL_BUF_IDX],
+ LNET_NRB_SMALL_PAGES);
+ rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_SMALL_BUF_IDX],
+ nrb_small, i);
if (rc)
goto failed;

- lnet_rtrpool_init(&rtrp[2], large_pages);
- rc = lnet_rtrpool_alloc_bufs(&rtrp[2], nrb_large, i);
+ lnet_rtrpool_init(&rtrp[LNET_LARGE_BUF_IDX],
+ LNET_NRB_LARGE_PAGES);
+ rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_LARGE_BUF_IDX],
+ nrb_large, i);
if (rc)
goto failed;
}
@@ -1496,11 +1544,114 @@ lnet_rtrpools_alloc(int im_a_router)
return 0;

failed:
- lnet_rtrpools_free();
+ lnet_rtrpools_free(0);
return rc;
}

int
+lnet_rtrpools_adjust(int tiny, int small, int large)
+{
+ int nrb = 0;
+ int rc = 0;
+ int i;
+ lnet_rtrbufpool_t *rtrp;
+
+ /*
+ * this function doesn't revert the changes if adding new buffers
+ * failed. It's up to the user space caller to revert the
+ * changes.
+ */
+
+ if (!the_lnet.ln_routing)
+ return 0;
+
+ /*
+ * If the provided values for each buffer pool are different than the
+ * configured values, we need to take action.
+ */
+ if (tiny >= 0 && tiny != tiny_router_buffers) {
+ tiny_router_buffers = tiny;
+ nrb = lnet_nrb_tiny_calculate();
+ cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
+ rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_TINY_BUF_IDX],
+ nrb, i);
+ if (rc)
+ return rc;
+ }
+ }
+ if (small >= 0 && small != small_router_buffers) {
+ small_router_buffers = small;
+ nrb = lnet_nrb_small_calculate();
+ cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
+ rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_SMALL_BUF_IDX],
+ nrb, i);
+ if (rc)
+ return rc;
+ }
+ }
+ if (large >= 0 && large != large_router_buffers) {
+ large_router_buffers = large;
+ nrb = lnet_nrb_large_calculate();
+ cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
+ rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_LARGE_BUF_IDX],
+ nrb, i);
+ if (rc)
+ return rc;
+ }
+ }
+
+ return 0;
+}
+
+int
+lnet_rtrpools_enable(void)
+{
+ int rc;
+
+ if (the_lnet.ln_routing)
+ return 0;
+
+ if (!the_lnet.ln_rtrpools)
+ /*
+ * If routing is turned off, and we have never
+ * initialized the pools before, just call the
+ * standard buffer pool allocation routine as
+ * if we are just configuring this for the first
+ * time.
+ */
+ return lnet_rtrpools_alloc(1);
+
+ rc = lnet_rtrpools_adjust(0, 0, 0);
+ if (rc)
+ return rc;
+
+ lnet_net_lock(LNET_LOCK_EX);
+ the_lnet.ln_routing = 1;
+
+ the_lnet.ln_ping_info->pi_features &= ~LNET_PING_FEAT_RTE_DISABLED;
+ lnet_net_unlock(LNET_LOCK_EX);
+
+ return 0;
+}
+
+void
+lnet_rtrpools_disable(void)
+{
+ if (!the_lnet.ln_routing)
+ return;
+
+ lnet_net_lock(LNET_LOCK_EX);
+ the_lnet.ln_routing = 0;
+ the_lnet.ln_ping_info->pi_features |= LNET_PING_FEAT_RTE_DISABLED;
+
+ tiny_router_buffers = 0;
+ small_router_buffers = 0;
+ large_router_buffers = 0;
+ lnet_net_unlock(LNET_LOCK_EX);
+ lnet_rtrpools_free(1);
+}
+
+int
lnet_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive, unsigned long when)
{
struct lnet_peer *lp = NULL;
--
1.7.1