[PATCH 3/4] staging: lustre: lnet: convert selftest to use workqueues

From: NeilBrown
Date: Sun Dec 17 2017 - 20:27:45 EST


Instead of the cfs workitem library, use workqueues.

As lnet wants to provide a cpu mask of allowed cpus, it
needs to be a WQ_UNBOUND work queue so that tasks can
run on cpus other than where they were submitted.

apply_workqueue_atts needs to be exported for lustre to use it.

Signed-off-by: NeilBrown <neilb@xxxxxxxx>
---
drivers/staging/lustre/lnet/selftest/framework.c | 10 +---
drivers/staging/lustre/lnet/selftest/module.c | 39 ++++++++------
drivers/staging/lustre/lnet/selftest/rpc.c | 61 +++++++++-------------
drivers/staging/lustre/lnet/selftest/selftest.h | 40 ++++++--------
kernel/workqueue.c | 1
5 files changed, 69 insertions(+), 82 deletions(-)

diff --git a/drivers/staging/lustre/lnet/selftest/framework.c b/drivers/staging/lustre/lnet/selftest/framework.c
index 2e1126552e18..c7697f66f663 100644
--- a/drivers/staging/lustre/lnet/selftest/framework.c
+++ b/drivers/staging/lustre/lnet/selftest/framework.c
@@ -941,15 +941,13 @@ sfw_create_test_rpc(struct sfw_test_unit *tsu, struct lnet_process_id peer,
return 0;
}

-static int
+static void
sfw_run_test(struct swi_workitem *wi)
{
struct sfw_test_unit *tsu = container_of(wi, struct sfw_test_unit, tsu_worker);
struct sfw_test_instance *tsi = tsu->tsu_instance;
struct srpc_client_rpc *rpc = NULL;

- LASSERT(wi == &tsu->tsu_worker);
-
if (tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc)) {
LASSERT(!rpc);
goto test_done;
@@ -975,7 +973,7 @@ sfw_run_test(struct swi_workitem *wi)
rpc->crpc_timeout = rpc_timeout;
srpc_post_rpc(rpc);
spin_unlock(&rpc->crpc_lock);
- return 0;
+ return;

test_done:
/*
@@ -985,9 +983,7 @@ sfw_run_test(struct swi_workitem *wi)
* - my batch is still active; no one can run it again now.
* Cancel pending schedules and prevent future schedule attempts:
*/
- swi_exit_workitem(wi);
sfw_test_unit_done(tsu);
- return 1;
}

static int
@@ -1017,7 +1013,7 @@ sfw_run_batch(struct sfw_batch *tsb)
tsu->tsu_loop = tsi->tsi_loop;
wi = &tsu->tsu_worker;
swi_init_workitem(wi, sfw_run_test,
- lst_sched_test[lnet_cpt_of_nid(tsu->tsu_dest.nid)]);
+ lst_test_wq[lnet_cpt_of_nid(tsu->tsu_dest.nid)]);
swi_schedule_workitem(wi);
}
}
diff --git a/drivers/staging/lustre/lnet/selftest/module.c b/drivers/staging/lustre/lnet/selftest/module.c
index ba4b6145c953..aa6bfd5baf2f 100644
--- a/drivers/staging/lustre/lnet/selftest/module.c
+++ b/drivers/staging/lustre/lnet/selftest/module.c
@@ -47,8 +47,8 @@ enum {

static int lst_init_step = LST_INIT_NONE;

-struct cfs_wi_sched *lst_sched_serial;
-struct cfs_wi_sched **lst_sched_test;
+struct workqueue_struct *lst_serial_wq;
+struct workqueue_struct **lst_test_wq;

static void
lnet_selftest_exit(void)
@@ -68,16 +68,16 @@ lnet_selftest_exit(void)
case LST_INIT_WI_TEST:
for (i = 0;
i < cfs_cpt_number(lnet_cpt_table()); i++) {
- if (!lst_sched_test[i])
+ if (!lst_test_wq[i])
continue;
- cfs_wi_sched_destroy(lst_sched_test[i]);
+ destroy_workqueue(lst_test_wq[i]);
}
- kvfree(lst_sched_test);
- lst_sched_test = NULL;
+ kvfree(lst_test_wq);
+ lst_test_wq = NULL;
/* fall through */
case LST_INIT_WI_SERIAL:
- cfs_wi_sched_destroy(lst_sched_serial);
- lst_sched_serial = NULL;
+ destroy_workqueue(lst_serial_wq);
+ lst_serial_wq = NULL;
case LST_INIT_NONE:
break;
default:
@@ -92,33 +92,40 @@ lnet_selftest_init(void)
int rc;
int i;

- rc = cfs_wi_sched_create("lst_s", lnet_cpt_table(), CFS_CPT_ANY,
- 1, &lst_sched_serial);
- if (rc) {
+ lst_serial_wq = alloc_ordered_workqueue("lst_s", 0);
+ if (!lst_serial_wq) {
CERROR("Failed to create serial WI scheduler for LST\n");
return rc;
}
lst_init_step = LST_INIT_WI_SERIAL;

nscheds = cfs_cpt_number(lnet_cpt_table());
- lst_sched_test = kvmalloc_array(nscheds, sizeof(lst_sched_test[0]),
+ lst_test_wq = kvmalloc_array(nscheds, sizeof(lst_test_wq[0]),
GFP_KERNEL | __GFP_ZERO);
- if (!lst_sched_test)
+ if (!lst_test_wq)
goto error;

lst_init_step = LST_INIT_WI_TEST;
for (i = 0; i < nscheds; i++) {
int nthrs = cfs_cpt_weight(lnet_cpt_table(), i);
+ struct workqueue_attrs attrs;

/* reserve at least one CPU for LND */
nthrs = max(nthrs - 1, 1);
- rc = cfs_wi_sched_create("lst_t", lnet_cpt_table(), i,
- nthrs, &lst_sched_test[i]);
- if (rc) {
+ lst_test_wq[i] = alloc_workqueue("lst_t", WQ_UNBOUND, nthrs);
+ if (!lst_test_wq[i]) {
CWARN("Failed to create CPU partition affinity WI scheduler %d for LST\n",
i);
goto error;
}
+ attrs.nice = 0;
+ #ifdef CONFIG_CPUMASK_OFFSTACK
+ attrs.cpumask = lnet_cpt_table()->ctb_parts[i].cpt_cpumask;
+ #else
+ cpumask_copy(attrs.cpumask, lnet_cpt_table()->ctb_parts[i].cpt_cpumask);
+ #endif
+ attrs.no_numa = false;
+ apply_workqueue_attrs(lst_test_wq[i], &attrs);
}

rc = srpc_startup();
diff --git a/drivers/staging/lustre/lnet/selftest/rpc.c b/drivers/staging/lustre/lnet/selftest/rpc.c
index 4ebb5a1107be..b515138dca2c 100644
--- a/drivers/staging/lustre/lnet/selftest/rpc.c
+++ b/drivers/staging/lustre/lnet/selftest/rpc.c
@@ -68,7 +68,7 @@ srpc_serv_portal(int svc_id)
}

/* forward ref's */
-int srpc_handle_rpc(struct swi_workitem *wi);
+void srpc_handle_rpc(struct swi_workitem *wi);

void srpc_get_counters(struct srpc_counters *cnt)
{
@@ -178,7 +178,7 @@ srpc_init_server_rpc(struct srpc_server_rpc *rpc,
memset(rpc, 0, sizeof(*rpc));
swi_init_workitem(&rpc->srpc_wi, srpc_handle_rpc,
srpc_serv_is_framework(scd->scd_svc) ?
- lst_sched_serial : lst_sched_test[scd->scd_cpt]);
+ lst_serial_wq : lst_test_wq[scd->scd_cpt]);

rpc->srpc_ev.ev_fired = 1; /* no event expected now */

@@ -242,7 +242,7 @@ srpc_service_nrpcs(struct srpc_service *svc)
max(nrpcs, SFW_FRWK_WI_MIN) : max(nrpcs, SFW_TEST_WI_MIN);
}

-int srpc_add_buffer(struct swi_workitem *wi);
+void srpc_add_buffer(struct swi_workitem *wi);

static int
srpc_service_init(struct srpc_service *svc)
@@ -277,11 +277,11 @@ srpc_service_init(struct srpc_service *svc)
scd->scd_ev.ev_type = SRPC_REQUEST_RCVD;

/*
- * NB: don't use lst_sched_serial for adding buffer,
+ * NB: don't use lst_serial_wq for adding buffer,
* see details in srpc_service_add_buffers()
*/
swi_init_workitem(&scd->scd_buf_wi,
- srpc_add_buffer, lst_sched_test[i]);
+ srpc_add_buffer, lst_test_wq[i]);

if (i && srpc_serv_is_framework(svc)) {
/*
@@ -514,7 +514,7 @@ __must_hold(&scd->scd_lock)
return rc;
}

-int
+void
srpc_add_buffer(struct swi_workitem *wi)
{
struct srpc_service_cd *scd = container_of(wi, struct srpc_service_cd, scd_buf_wi);
@@ -573,7 +573,6 @@ srpc_add_buffer(struct swi_workitem *wi)
}

spin_unlock(&scd->scd_lock);
- return 0;
}

int
@@ -605,15 +604,15 @@ srpc_service_add_buffers(struct srpc_service *sv, int nbuffer)
spin_lock(&scd->scd_lock);
/*
* NB: srpc_service_add_buffers() can be called inside
- * thread context of lst_sched_serial, and we don't normally
+ * thread context of lst_serial_wq, and we don't normally
* allow to sleep inside thread context of WI scheduler
* because it will block current scheduler thread from doing
* anything else, even worse, it could deadlock if it's
* waiting on result from another WI of the same scheduler.
* However, it's safe at here because scd_buf_wi is scheduled
- * by thread in a different WI scheduler (lst_sched_test),
+ * by thread in a different WI scheduler (lst_test_wq),
* so we don't have any risk of deadlock, though this could
- * block all WIs pending on lst_sched_serial for a moment
+ * block all WIs pending on lst_serial_wq for a moment
* which is not good but not fatal.
*/
lst_wait_until(scd->scd_buf_err ||
@@ -660,11 +659,9 @@ srpc_finish_service(struct srpc_service *sv)
LASSERT(sv->sv_shuttingdown); /* srpc_shutdown_service called */

cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
+ swi_cancel_workitem(&scd->scd_buf_wi);
+
spin_lock(&scd->scd_lock);
- if (!swi_deschedule_workitem(&scd->scd_buf_wi)) {
- spin_unlock(&scd->scd_lock);
- return 0;
- }

if (scd->scd_buf_nposted > 0) {
CDEBUG(D_NET, "waiting for %d posted buffers to unlink\n",
@@ -680,11 +677,9 @@ srpc_finish_service(struct srpc_service *sv)

rpc = list_entry(scd->scd_rpc_active.next,
struct srpc_server_rpc, srpc_list);
- CNETERR("Active RPC %p on shutdown: sv %s, peer %s, wi %s scheduled %d running %d, ev fired %d type %d status %d lnet %d\n",
+ CNETERR("Active RPC %p on shutdown: sv %s, peer %s, wi %s, ev fired %d type %d status %d lnet %d\n",
rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
swi_state2str(rpc->srpc_wi.swi_state),
- rpc->srpc_wi.swi_workitem.wi_scheduled,
- rpc->srpc_wi.swi_workitem.wi_running,
rpc->srpc_ev.ev_fired, rpc->srpc_ev.ev_type,
rpc->srpc_ev.ev_status, rpc->srpc_ev.ev_lnet);
spin_unlock(&scd->scd_lock);
@@ -947,7 +942,6 @@ srpc_server_rpc_done(struct srpc_server_rpc *rpc, int status)
* Cancel pending schedules and prevent future schedule attempts:
*/
LASSERT(rpc->srpc_ev.ev_fired);
- swi_exit_workitem(&rpc->srpc_wi);

if (!sv->sv_shuttingdown && !list_empty(&scd->scd_buf_blocked)) {
buffer = list_entry(scd->scd_buf_blocked.next,
@@ -965,7 +959,7 @@ srpc_server_rpc_done(struct srpc_server_rpc *rpc, int status)
}

/* handles an incoming RPC */
-int
+void
srpc_handle_rpc(struct swi_workitem *wi)
{
struct srpc_server_rpc *rpc = container_of(wi, struct srpc_server_rpc, srpc_wi);
@@ -987,9 +981,8 @@ srpc_handle_rpc(struct swi_workitem *wi)

if (ev->ev_fired) { /* no more event, OK to finish */
srpc_server_rpc_done(rpc, -ESHUTDOWN);
- return 1;
}
- return 0;
+ return;
}

spin_unlock(&scd->scd_lock);
@@ -1007,7 +1000,7 @@ srpc_handle_rpc(struct swi_workitem *wi)
if (!msg->msg_magic) {
/* moaned already in srpc_lnet_ev_handler */
srpc_server_rpc_done(rpc, EBADMSG);
- return 1;
+ return;
}

srpc_unpack_msg_hdr(msg);
@@ -1023,7 +1016,7 @@ srpc_handle_rpc(struct swi_workitem *wi)
LASSERT(!reply->status || !rpc->srpc_bulk);
if (rc) {
srpc_server_rpc_done(rpc, rc);
- return 1;
+ return;
}
}

@@ -1032,7 +1025,7 @@ srpc_handle_rpc(struct swi_workitem *wi)
if (rpc->srpc_bulk) {
rc = srpc_do_bulk(rpc);
if (!rc)
- return 0; /* wait for bulk */
+ return; /* wait for bulk */

LASSERT(ev->ev_fired);
ev->ev_status = rc;
@@ -1050,16 +1043,16 @@ srpc_handle_rpc(struct swi_workitem *wi)

if (rc) {
srpc_server_rpc_done(rpc, rc);
- return 1;
+ return;
}
}

wi->swi_state = SWI_STATE_REPLY_SUBMITTED;
rc = srpc_send_reply(rpc);
if (!rc)
- return 0; /* wait for reply */
+ return; /* wait for reply */
srpc_server_rpc_done(rpc, rc);
- return 1;
+ return;

case SWI_STATE_REPLY_SUBMITTED:
if (!ev->ev_fired) {
@@ -1072,10 +1065,8 @@ srpc_handle_rpc(struct swi_workitem *wi)

wi->swi_state = SWI_STATE_DONE;
srpc_server_rpc_done(rpc, ev->ev_status);
- return 1;
+ return;
}
-
- return 0;
}

static void
@@ -1170,7 +1161,6 @@ srpc_client_rpc_done(struct srpc_client_rpc *rpc, int status)
* Cancel pending schedules and prevent future schedule attempts:
*/
LASSERT(!srpc_event_pending(rpc));
- swi_exit_workitem(wi);

spin_unlock(&rpc->crpc_lock);

@@ -1178,7 +1168,7 @@ srpc_client_rpc_done(struct srpc_client_rpc *rpc, int status)
}

/* sends an outgoing RPC */
-int
+void
srpc_send_rpc(struct swi_workitem *wi)
{
int rc = 0;
@@ -1214,7 +1204,7 @@ srpc_send_rpc(struct swi_workitem *wi)
rc = srpc_prepare_reply(rpc);
if (rc) {
srpc_client_rpc_done(rpc, rc);
- return 1;
+ return;
}

rc = srpc_prepare_bulk(rpc);
@@ -1291,7 +1281,7 @@ srpc_send_rpc(struct swi_workitem *wi)

wi->swi_state = SWI_STATE_DONE;
srpc_client_rpc_done(rpc, rc);
- return 1;
+ return;
}

if (rc) {
@@ -1308,10 +1298,9 @@ srpc_send_rpc(struct swi_workitem *wi)

if (!srpc_event_pending(rpc)) {
srpc_client_rpc_done(rpc, -EINTR);
- return 1;
+ return;
}
}
- return 0;
}

struct srpc_client_rpc *
diff --git a/drivers/staging/lustre/lnet/selftest/selftest.h b/drivers/staging/lustre/lnet/selftest/selftest.h
index 465417263ef1..ad04534f000c 100644
--- a/drivers/staging/lustre/lnet/selftest/selftest.h
+++ b/drivers/staging/lustre/lnet/selftest/selftest.h
@@ -169,11 +169,11 @@ struct srpc_buffer {
};

struct swi_workitem;
-typedef int (*swi_action_t) (struct swi_workitem *);
+typedef void (*swi_action_t) (struct swi_workitem *);

struct swi_workitem {
- struct cfs_wi_sched *swi_sched;
- struct cfs_workitem swi_workitem;
+ struct workqueue_struct *swi_wq;
+ struct work_struct swi_work;
swi_action_t swi_action;
int swi_state;
};
@@ -444,7 +444,7 @@ void srpc_free_bulk(struct srpc_bulk *bk);
struct srpc_bulk *srpc_alloc_bulk(int cpt, unsigned int off,
unsigned int bulk_npg, unsigned int bulk_len,
int sink);
-int srpc_send_rpc(struct swi_workitem *wi);
+void srpc_send_rpc(struct swi_workitem *wi);
int srpc_send_reply(struct srpc_server_rpc *rpc);
int srpc_add_service(struct srpc_service *sv);
int srpc_remove_service(struct srpc_service *sv);
@@ -456,8 +456,8 @@ void srpc_service_remove_buffers(struct srpc_service *sv, int nbuffer);
void srpc_get_counters(struct srpc_counters *cnt);
void srpc_set_counters(const struct srpc_counters *cnt);

-extern struct cfs_wi_sched *lst_sched_serial;
-extern struct cfs_wi_sched **lst_sched_test;
+extern struct workqueue_struct *lst_serial_wq;
+extern struct workqueue_struct **lst_test_wq;

static inline int
srpc_serv_is_framework(struct srpc_service *svc)
@@ -465,42 +465,36 @@ srpc_serv_is_framework(struct srpc_service *svc)
return svc->sv_id < SRPC_FRAMEWORK_SERVICE_MAX_ID;
}

-static inline int
-swi_wi_action(struct cfs_workitem *wi)
+static void
+swi_wi_action(struct work_struct *wi)
{
struct swi_workitem *swi;

- swi = container_of(wi, struct swi_workitem, swi_workitem);
+ swi = container_of(wi, struct swi_workitem, swi_work);

- return swi->swi_action(swi);
+ swi->swi_action(swi);
}

static inline void
swi_init_workitem(struct swi_workitem *swi,
- swi_action_t action, struct cfs_wi_sched *sched)
+ swi_action_t action, struct workqueue_struct *wq)
{
- swi->swi_sched = sched;
+ swi->swi_wq = wq;
swi->swi_action = action;
swi->swi_state = SWI_STATE_NEWBORN;
- cfs_wi_init(&swi->swi_workitem, swi_wi_action);
+ INIT_WORK(&swi->swi_work, swi_wi_action);
}

static inline void
swi_schedule_workitem(struct swi_workitem *wi)
{
- cfs_wi_schedule(wi->swi_sched, &wi->swi_workitem);
-}
-
-static inline void
-swi_exit_workitem(struct swi_workitem *swi)
-{
- cfs_wi_exit(swi->swi_sched, &swi->swi_workitem);
+ queue_work(wi->swi_wq, &wi->swi_work);
}

static inline int
-swi_deschedule_workitem(struct swi_workitem *swi)
+swi_cancel_workitem(struct swi_workitem *swi)
{
- return cfs_wi_deschedule(swi->swi_sched, &swi->swi_workitem);
+ return cancel_work_sync(&swi->swi_work);
}

int sfw_startup(void);
@@ -534,7 +528,7 @@ srpc_init_client_rpc(struct srpc_client_rpc *rpc, struct lnet_process_id peer,

INIT_LIST_HEAD(&rpc->crpc_list);
swi_init_workitem(&rpc->crpc_wi, srpc_send_rpc,
- lst_sched_test[lnet_cpt_of_nid(peer.nid)]);
+ lst_test_wq[lnet_cpt_of_nid(peer.nid)]);
spin_lock_init(&rpc->crpc_lock);
atomic_set(&rpc->crpc_refcount, 1); /* 1 ref for caller */

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 8fdb710bfdd7..024fd99bf7ea 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -3806,6 +3806,7 @@ int apply_workqueue_attrs(struct workqueue_struct *wq,

return ret;
}
+EXPORT_SYMBOL_GPL(apply_workqueue_attrs);

/**
* wq_update_unbound_numa - update NUMA affinity of a wq for CPU hot[un]plug