[PATCH 5.10 247/252] SUNRPC: More fixes for backlog congestion
From: Greg Kroah-Hartman
Date: Mon May 31 2021 - 11:10:18 EST
From: Trond Myklebust <trond.myklebust@xxxxxxxxxxxxxxx>
commit e86be3a04bc4aeaf12f93af35f08f8d4385bcd98 upstream.
Ensure that we fix the XPRT_CONGESTED starvation issue for RDMA as well
as socket-based transports.
Ensure we always initialise the request after waking up from the backlog
list.
Fixes: e877a88d1f06 ("SUNRPC in case of backlog, hand free slots directly to waiting task")
Signed-off-by: Trond Myklebust <trond.myklebust@xxxxxxxxxxxxxxx>
Signed-off-by: Greg Kroah-Hartman <gregkh@xxxxxxxxxxxxxxxxxxx>
---
include/linux/sunrpc/xprt.h | 2 +
net/sunrpc/xprt.c | 58 +++++++++++++++++++---------------------
net/sunrpc/xprtrdma/transport.c | 12 ++++----
net/sunrpc/xprtrdma/verbs.c | 18 ++++++++++--
net/sunrpc/xprtrdma/xprt_rdma.h | 1
5 files changed, 52 insertions(+), 39 deletions(-)
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -367,6 +367,8 @@ struct rpc_xprt * xprt_alloc(struct net
unsigned int num_prealloc,
unsigned int max_req);
void xprt_free(struct rpc_xprt *);
+void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task);
+bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req);
static inline int
xprt_enable_swap(struct rpc_xprt *xprt)
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1575,11 +1575,18 @@ xprt_transmit(struct rpc_task *task)
spin_unlock(&xprt->queue_lock);
}
-static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
+static void xprt_complete_request_init(struct rpc_task *task)
+{
+ if (task->tk_rqstp)
+ xprt_request_init(task);
+}
+
+void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
{
set_bit(XPRT_CONGESTED, &xprt->state);
- rpc_sleep_on(&xprt->backlog, task, NULL);
+ rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init);
}
+EXPORT_SYMBOL_GPL(xprt_add_backlog);
static bool __xprt_set_rq(struct rpc_task *task, void *data)
{
@@ -1587,14 +1594,13 @@ static bool __xprt_set_rq(struct rpc_tas
if (task->tk_rqstp == NULL) {
memset(req, 0, sizeof(*req)); /* mark unused */
- task->tk_status = -EAGAIN;
task->tk_rqstp = req;
return true;
}
return false;
}
-static bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
+bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) {
clear_bit(XPRT_CONGESTED, &xprt->state);
@@ -1602,6 +1608,7 @@ static bool xprt_wake_up_backlog(struct
}
return true;
}
+EXPORT_SYMBOL_GPL(xprt_wake_up_backlog);
static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
{
@@ -1611,7 +1618,7 @@ static bool xprt_throttle_congested(stru
goto out;
spin_lock(&xprt->reserve_lock);
if (test_bit(XPRT_CONGESTED, &xprt->state)) {
- rpc_sleep_on(&xprt->backlog, task, NULL);
+ xprt_add_backlog(xprt, task);
ret = true;
}
spin_unlock(&xprt->reserve_lock);
@@ -1780,10 +1787,6 @@ xprt_request_init(struct rpc_task *task)
struct rpc_xprt *xprt = task->tk_xprt;
struct rpc_rqst *req = task->tk_rqstp;
- if (req->rq_task)
- /* Already initialized */
- return;
-
req->rq_task = task;
req->rq_xprt = xprt;
req->rq_buffer = NULL;
@@ -1844,10 +1847,8 @@ void xprt_retry_reserve(struct rpc_task
struct rpc_xprt *xprt = task->tk_xprt;
task->tk_status = 0;
- if (task->tk_rqstp != NULL) {
- xprt_request_init(task);
+ if (task->tk_rqstp != NULL)
return;
- }
task->tk_status = -EAGAIN;
xprt_do_reserve(xprt, task);
@@ -1872,24 +1873,21 @@ void xprt_release(struct rpc_task *task)
}
xprt = req->rq_xprt;
- if (xprt) {
- xprt_request_dequeue_xprt(task);
- spin_lock(&xprt->transport_lock);
- xprt->ops->release_xprt(xprt, task);
- if (xprt->ops->release_request)
- xprt->ops->release_request(task);
- xprt_schedule_autodisconnect(xprt);
- spin_unlock(&xprt->transport_lock);
- if (req->rq_buffer)
- xprt->ops->buf_free(task);
- xdr_free_bvec(&req->rq_rcv_buf);
- xdr_free_bvec(&req->rq_snd_buf);
- if (req->rq_cred != NULL)
- put_rpccred(req->rq_cred);
- if (req->rq_release_snd_buf)
- req->rq_release_snd_buf(req);
- } else
- xprt = task->tk_xprt;
+ xprt_request_dequeue_xprt(task);
+ spin_lock(&xprt->transport_lock);
+ xprt->ops->release_xprt(xprt, task);
+ if (xprt->ops->release_request)
+ xprt->ops->release_request(task);
+ xprt_schedule_autodisconnect(xprt);
+ spin_unlock(&xprt->transport_lock);
+ if (req->rq_buffer)
+ xprt->ops->buf_free(task);
+ xdr_free_bvec(&req->rq_rcv_buf);
+ xdr_free_bvec(&req->rq_snd_buf);
+ if (req->rq_cred != NULL)
+ put_rpccred(req->rq_cred);
+ if (req->rq_release_snd_buf)
+ req->rq_release_snd_buf(req);
task->tk_rqstp = NULL;
if (likely(!bc_prealloc(req)))
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -520,9 +520,8 @@ xprt_rdma_alloc_slot(struct rpc_xprt *xp
return;
out_sleep:
- set_bit(XPRT_CONGESTED, &xprt->state);
- rpc_sleep_on(&xprt->backlog, task, NULL);
task->tk_status = -EAGAIN;
+ xprt_add_backlog(xprt, task);
}
/**
@@ -537,10 +536,11 @@ xprt_rdma_free_slot(struct rpc_xprt *xpr
struct rpcrdma_xprt *r_xprt =
container_of(xprt, struct rpcrdma_xprt, rx_xprt);
- memset(rqst, 0, sizeof(*rqst));
- rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
- if (unlikely(!rpc_wake_up_next(&xprt->backlog)))
- clear_bit(XPRT_CONGESTED, &xprt->state);
+ rpcrdma_reply_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
+ if (!xprt_wake_up_backlog(xprt, rqst)) {
+ memset(rqst, 0, sizeof(*rqst));
+ rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
+ }
}
static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt,
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1198,6 +1198,20 @@ void rpcrdma_mr_put(struct rpcrdma_mr *m
}
/**
+ * rpcrdma_reply_put - Put reply buffers back into pool
+ * @buffers: buffer pool
+ * @req: request whose attached reply buffer (rl_reply), if any, is returned
+ *
+ */
+void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
+{
+ if (req->rl_reply) {
+ rpcrdma_rep_put(buffers, req->rl_reply);
+ req->rl_reply = NULL;
+ }
+}
+
+/**
* rpcrdma_buffer_get - Get a request buffer
* @buffers: Buffer pool from which to obtain a buffer
*
@@ -1225,9 +1239,7 @@ rpcrdma_buffer_get(struct rpcrdma_buffer
*/
void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
- if (req->rl_reply)
- rpcrdma_rep_put(buffers, req->rl_reply);
- req->rl_reply = NULL;
+ rpcrdma_reply_put(buffers, req);
spin_lock(&buffers->rb_lock);
list_add(&req->rl_list, &buffers->rb_send_bufs);
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -472,6 +472,7 @@ void rpcrdma_mrs_refresh(struct rpcrdma_
struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers,
struct rpcrdma_req *req);
+void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req);
void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size,