[PATCH AUTOSEL 4.19 477/671] tipc: reduce risk of wakeup queue starvation

From: Sasha Levin
Date: Thu Jan 16 2020 - 13:46:48 EST


From: Jon Maloy <jon.maloy@xxxxxxxxxxxx>

[ Upstream commit 7c5b42055964f587e55bd87ef334c3a27e95d144 ]

In commit 365ad353c256 ("tipc: reduce risk of user starvation during
link congestion") we allowed senders to add exactly one list of extra
buffers to the link backlog queues during link congestion (aka
"oversubscription"). However, the criteria for when to stop adding
wakeup messages to the input queue when the overload abates is
inaccurate, and may cause starvation problems during very high load.

Currently, we stop adding wakeup messages after 10 total failed attempts
where we find that there is no space left in the backlog queue for a
certain importance level. The counter for this is accumulated across all
levels, which may lead the algorithm to leave the loop prematurely,
although there may still be plenty of space available at some levels.
The result is sometimes that messages near the wakeup queue tail are not
added to the input queue as they should be.

We now introduce a more exact algorithm, where we keep adding wakeup
messages to a level as long as the backlog queue has free slots for
the corresponding level, and stop at the moment there are no more such
slots or when there are no more wakeup messages to dequeue.

Fixes: 365ad35 ("tipc: reduce risk of user starvation during link congestion")
Reported-by: Tung Nguyen <tung.q.nguyen@xxxxxxxxxxxxxx>
Acked-by: Ying Xue <ying.xue@xxxxxxxxxxxxx>
Signed-off-by: Jon Maloy <jon.maloy@xxxxxxxxxxxx>
Signed-off-by: David S. Miller <davem@xxxxxxxxxxxxx>
Signed-off-by: Sasha Levin <sashal@xxxxxxxxxx>
---
net/tipc/link.c | 29 +++++++++++++++++++++--------
1 file changed, 21 insertions(+), 8 deletions(-)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index 0fbf8ea18ce0..cc9a0485536b 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -830,18 +830,31 @@ static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr)
*/
static void link_prepare_wakeup(struct tipc_link *l)
{
+ struct sk_buff_head *wakeupq = &l->wakeupq;
+ struct sk_buff_head *inputq = l->inputq;
struct sk_buff *skb, *tmp;
- int imp, i = 0;
+ struct sk_buff_head tmpq;
+ int avail[5] = {0,};
+ int imp = 0;
+
+ __skb_queue_head_init(&tmpq);

- skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
+ for (; imp <= TIPC_SYSTEM_IMPORTANCE; imp++)
+ avail[imp] = l->backlog[imp].limit - l->backlog[imp].len;
+
+ skb_queue_walk_safe(wakeupq, skb, tmp) {
imp = TIPC_SKB_CB(skb)->chain_imp;
- if (l->backlog[imp].len < l->backlog[imp].limit) {
- skb_unlink(skb, &l->wakeupq);
- skb_queue_tail(l->inputq, skb);
- } else if (i++ > 10) {
- break;
- }
+ if (avail[imp] <= 0)
+ continue;
+ avail[imp]--;
+ __skb_unlink(skb, wakeupq);
+ __skb_queue_tail(&tmpq, skb);
}
+
+ spin_lock_bh(&inputq->lock);
+ skb_queue_splice_tail(&tmpq, inputq);
+ spin_unlock_bh(&inputq->lock);
+
}

void tipc_link_reset(struct tipc_link *l)
--
2.20.1