[PATCH net-next v2 18/39] rxrpc: Implement progressive transmission queue struct

From: David Howells
Date: Wed Dec 04 2024 - 02:52:54 EST


We need to scan the buffers in the transmission queue occasionally when
processing ACKs, but the transmission queue is currently a linked list of
transmission buffers which, when we eventually expand the Tx window to 8192
packets, will be very slow to walk.

Instead, pull the fields we need to examine a lot (last sent time,
retransmitted flag) into a new struct rxrpc_txqueue, each instance of which
holds an array of BITS_PER_LONG (32 or 64) packets.

The transmission queue is then a list of these structs, each pointing to a
contiguous set of packets. Scanning is then a lot faster as the flags and
timestamps are concentrated in the CPU dcache.

The transmission timestamps are stored as a number of microseconds from a
base ktime to reduce memory requirements. Each offset is an unsigned int, so
it wraps after about 71 minutes; this should be fine provided we manage to
transmit an entire buffer within an hour.

This will make implementing RACK-TLP [RFC8985] easier as it will be less
costly to scan the transmission buffers.

Signed-off-by: David Howells <dhowells@xxxxxxxxxx>
cc: Marc Dionne <marc.dionne@xxxxxxxxxxxx>
cc: "David S. Miller" <davem@xxxxxxxxxxxxx>
cc: Eric Dumazet <edumazet@xxxxxxxxxx>
cc: Jakub Kicinski <kuba@xxxxxxxxxx>
cc: Paolo Abeni <pabeni@xxxxxxxxxx>
cc: linux-afs@xxxxxxxxxxxxxxxxxxx
cc: netdev@xxxxxxxxxxxxxxx
---
include/trace/events/rxrpc.h | 98 ++++++++++++++---
net/rxrpc/ar-internal.h | 47 ++++++--
net/rxrpc/call_event.c | 204 ++++++++++++++++++++++-------------
net/rxrpc/call_object.c | 38 ++++---
net/rxrpc/input.c | 72 ++++++++++---
net/rxrpc/output.c | 163 ++++++++++++++--------------
net/rxrpc/sendmsg.c | 69 +++++++++---
net/rxrpc/txbuf.c | 41 +------
8 files changed, 467 insertions(+), 265 deletions(-)

diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h
index 28fa7be31ff8..e6cf9ec940aa 100644
--- a/include/trace/events/rxrpc.h
+++ b/include/trace/events/rxrpc.h
@@ -297,7 +297,6 @@

#define rxrpc_txqueue_traces \
EM(rxrpc_txqueue_await_reply, "AWR") \
- EM(rxrpc_txqueue_dequeue, "DEQ") \
EM(rxrpc_txqueue_end, "END") \
EM(rxrpc_txqueue_queue, "QUE") \
EM(rxrpc_txqueue_queue_last, "QLS") \
@@ -482,6 +481,19 @@
EM(rxrpc_txbuf_see_send_more, "SEE SEND+ ") \
E_(rxrpc_txbuf_see_unacked, "SEE UNACKED")

+#define rxrpc_tq_traces \
+ EM(rxrpc_tq_alloc, "ALLOC") \
+ EM(rxrpc_tq_cleaned, "CLEAN") \
+ EM(rxrpc_tq_decant, "DCNT ") \
+ EM(rxrpc_tq_decant_advance, "DCNT>") \
+ EM(rxrpc_tq_queue, "QUEUE") \
+ EM(rxrpc_tq_queue_dup, "QUE!!") \
+ EM(rxrpc_tq_rotate, "ROT ") \
+ EM(rxrpc_tq_rotate_and_free, "ROT-F") \
+ EM(rxrpc_tq_rotate_and_keep, "ROT-K") \
+ EM(rxrpc_tq_transmit, "XMIT ") \
+ E_(rxrpc_tq_transmit_advance, "XMIT>")
+
#define rxrpc_pmtud_reduce_traces \
EM(rxrpc_pmtud_reduce_ack, "Ack ") \
EM(rxrpc_pmtud_reduce_icmp, "Icmp ") \
@@ -518,6 +530,7 @@ enum rxrpc_rtt_tx_trace { rxrpc_rtt_tx_traces } __mode(byte);
enum rxrpc_sack_trace { rxrpc_sack_traces } __mode(byte);
enum rxrpc_skb_trace { rxrpc_skb_traces } __mode(byte);
enum rxrpc_timer_trace { rxrpc_timer_traces } __mode(byte);
+enum rxrpc_tq_trace { rxrpc_tq_traces } __mode(byte);
enum rxrpc_tx_point { rxrpc_tx_points } __mode(byte);
enum rxrpc_txbuf_trace { rxrpc_txbuf_traces } __mode(byte);
enum rxrpc_txqueue_trace { rxrpc_txqueue_traces } __mode(byte);
@@ -554,6 +567,7 @@ rxrpc_rtt_tx_traces;
rxrpc_sack_traces;
rxrpc_skb_traces;
rxrpc_timer_traces;
+rxrpc_tq_traces;
rxrpc_tx_points;
rxrpc_txbuf_traces;
rxrpc_txqueue_traces;
@@ -881,7 +895,7 @@ TRACE_EVENT(rxrpc_txqueue,
__field(rxrpc_seq_t, acks_hard_ack)
__field(rxrpc_seq_t, tx_bottom)
__field(rxrpc_seq_t, tx_top)
- __field(rxrpc_seq_t, tx_prepared)
+ __field(rxrpc_seq_t, send_top)
__field(int, tx_winsize)
),

@@ -891,7 +905,7 @@ TRACE_EVENT(rxrpc_txqueue,
__entry->acks_hard_ack = call->acks_hard_ack;
__entry->tx_bottom = call->tx_bottom;
__entry->tx_top = call->tx_top;
- __entry->tx_prepared = call->tx_prepared;
+ __entry->send_top = call->send_top;
__entry->tx_winsize = call->tx_winsize;
),

@@ -902,14 +916,14 @@ TRACE_EVENT(rxrpc_txqueue,
__entry->acks_hard_ack,
__entry->tx_top - __entry->tx_bottom,
__entry->tx_top - __entry->acks_hard_ack,
- __entry->tx_prepared - __entry->tx_bottom,
+ __entry->send_top - __entry->tx_top,
__entry->tx_winsize)
);

TRACE_EVENT(rxrpc_transmit,
- TP_PROTO(struct rxrpc_call *call, int space),
+ TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t send_top, int space),

- TP_ARGS(call, space),
+ TP_ARGS(call, send_top, space),

TP_STRUCT__entry(
__field(unsigned int, call)
@@ -925,12 +939,12 @@ TRACE_EVENT(rxrpc_transmit,

TP_fast_assign(
__entry->call = call->debug_id;
- __entry->seq = call->tx_bottom;
+ __entry->seq = call->tx_top + 1;
__entry->space = space;
__entry->tx_winsize = call->tx_winsize;
__entry->cong_cwnd = call->cong_cwnd;
__entry->cong_extra = call->cong_extra;
- __entry->prepared = call->tx_prepared - call->tx_bottom;
+ __entry->prepared = send_top - call->tx_bottom;
__entry->in_flight = call->tx_top - call->acks_hard_ack;
__entry->pmtud_jumbo = call->peer->pmtud_jumbo;
),
@@ -947,6 +961,32 @@ TRACE_EVENT(rxrpc_transmit,
__entry->pmtud_jumbo)
);

+TRACE_EVENT(rxrpc_tx_rotate,
+ TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t seq, rxrpc_seq_t to),
+
+ TP_ARGS(call, seq, to),
+
+ TP_STRUCT__entry(
+ __field(unsigned int, call)
+ __field(rxrpc_seq_t, seq)
+ __field(rxrpc_seq_t, to)
+ __field(rxrpc_seq_t, top)
+ ),
+
+ TP_fast_assign(
+ __entry->call = call->debug_id;
+ __entry->seq = seq;
+ __entry->to = to;
+ __entry->top = call->tx_top;
+ ),
+
+ TP_printk("c=%08x q=%08x-%08x-%08x",
+ __entry->call,
+ __entry->seq,
+ __entry->to,
+ __entry->top)
+ );
+
TRACE_EVENT(rxrpc_rx_data,
TP_PROTO(unsigned int call, rxrpc_seq_t seq,
rxrpc_serial_t serial, u8 flags),
@@ -1621,10 +1661,11 @@ TRACE_EVENT(rxrpc_drop_ack,
);

TRACE_EVENT(rxrpc_retransmit,
- TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t seq,
- rxrpc_serial_t serial, ktime_t expiry),
+ TP_PROTO(struct rxrpc_call *call,
+ struct rxrpc_send_data_req *req,
+ struct rxrpc_txbuf *txb, ktime_t expiry),

- TP_ARGS(call, seq, serial, expiry),
+ TP_ARGS(call, req, txb, expiry),

TP_STRUCT__entry(
__field(unsigned int, call)
@@ -1635,8 +1676,8 @@ TRACE_EVENT(rxrpc_retransmit,

TP_fast_assign(
__entry->call = call->debug_id;
- __entry->seq = seq;
- __entry->serial = serial;
+ __entry->seq = req->seq;
+ __entry->serial = txb->serial;
__entry->expiry = expiry;
),

@@ -1714,9 +1755,9 @@ TRACE_EVENT(rxrpc_reset_cwnd,
__entry->cwnd = call->cong_cwnd;
__entry->extra = call->cong_extra;
__entry->hard_ack = call->acks_hard_ack;
- __entry->prepared = call->tx_prepared - call->tx_bottom;
+ __entry->prepared = call->send_top - call->tx_bottom;
__entry->since_last_tx = ktime_sub(now, call->tx_last_sent);
- __entry->has_data = !list_empty(&call->tx_sendmsg);
+ __entry->has_data = call->tx_bottom != call->tx_top;
),

TP_printk("c=%08x q=%08x %s cw=%u+%u pr=%u tm=%llu d=%u",
@@ -2024,6 +2065,33 @@ TRACE_EVENT(rxrpc_txbuf,
__entry->ref)
);

+TRACE_EVENT(rxrpc_tq,
+ TP_PROTO(struct rxrpc_call *call, struct rxrpc_txqueue *tq,
+ rxrpc_seq_t seq, enum rxrpc_tq_trace trace),
+
+ TP_ARGS(call, tq, seq, trace),
+
+ TP_STRUCT__entry(
+ __field(unsigned int, call_debug_id)
+ __field(rxrpc_seq_t, qbase)
+ __field(rxrpc_seq_t, seq)
+ __field(enum rxrpc_tq_trace, trace)
+ ),
+
+ TP_fast_assign(
+ __entry->call_debug_id = call->debug_id;
+ __entry->qbase = tq ? tq->qbase : call->tx_qbase;
+ __entry->seq = seq;
+ __entry->trace = trace;
+ ),
+
+ TP_printk("c=%08x bq=%08x q=%08x %s",
+ __entry->call_debug_id,
+ __entry->qbase,
+ __entry->seq,
+ __print_symbolic(__entry->trace, rxrpc_tq_traces))
+ );
+
TRACE_EVENT(rxrpc_poke_call,
TP_PROTO(struct rxrpc_call *call, bool busy,
enum rxrpc_call_poke_trace what),
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 84efa21f176c..bcce4862b0b7 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -30,6 +30,7 @@ struct rxrpc_crypt {
struct key_preparsed_payload;
struct rxrpc_connection;
struct rxrpc_txbuf;
+struct rxrpc_txqueue;

/*
* Mark applied to socket buffers in skb->mark. skb->priority is used
@@ -691,13 +692,17 @@ struct rxrpc_call {
unsigned short rx_pkt_offset; /* Current recvmsg packet offset */
unsigned short rx_pkt_len; /* Current recvmsg packet len */

+ /* Sendmsg data tracking. */
+ rxrpc_seq_t send_top; /* Highest Tx slot filled by sendmsg. */
+ struct rxrpc_txqueue *send_queue; /* Queue that sendmsg is writing into */
+
/* Transmitted data tracking. */
spinlock_t tx_lock; /* Transmit queue lock */
- struct list_head tx_sendmsg; /* Sendmsg prepared packets */
- struct list_head tx_buffer; /* Buffer of transmissible packets */
+ struct rxrpc_txqueue *tx_queue; /* Start of transmission buffers */
+ struct rxrpc_txqueue *tx_qtail; /* End of transmission buffers */
+ rxrpc_seq_t tx_qbase; /* First slot in tx_queue */
rxrpc_seq_t tx_bottom; /* First packet in buffer */
rxrpc_seq_t tx_transmitted; /* Highest packet transmitted */
- rxrpc_seq_t tx_prepared; /* Highest Tx slot prepared. */
rxrpc_seq_t tx_top; /* Highest Tx slot allocated. */
u16 tx_backoff; /* Delay to insert due to Tx failure (ms) */
u8 tx_winsize; /* Maximum size of Tx window */
@@ -815,9 +820,6 @@ struct rxrpc_send_params {
* Buffer of data to be output as a packet.
*/
struct rxrpc_txbuf {
- struct list_head call_link; /* Link in call->tx_sendmsg/tx_buffer */
- struct list_head tx_link; /* Link in live Enc queue or Tx queue */
- ktime_t last_sent; /* Time at which last transmitted */
refcount_t ref;
rxrpc_seq_t seq; /* Sequence number of this packet */
rxrpc_serial_t serial; /* Last serial number transmitted with */
@@ -849,6 +851,36 @@ static inline bool rxrpc_sending_to_client(const struct rxrpc_txbuf *txb)
return !rxrpc_sending_to_server(txb);
}

+/*
+ * Transmit queue element, including RACK [RFC8985] per-segment metadata. The
+ * transmission timestamp is in usec from the base.
+ */
+struct rxrpc_txqueue {
+ /* Start with the members we want to prefetch. */
+ struct rxrpc_txqueue *next;
+ ktime_t xmit_ts_base;
+ rxrpc_seq_t qbase;
+
+ /* The arrays we want to pack into as few cache lines as possible. */
+ struct {
+#define RXRPC_NR_TXQUEUE BITS_PER_LONG
+#define RXRPC_TXQ_MASK (RXRPC_NR_TXQUEUE - 1)
+ struct rxrpc_txbuf *bufs[RXRPC_NR_TXQUEUE];
+ unsigned int segment_xmit_ts[RXRPC_NR_TXQUEUE];
+ } ____cacheline_aligned;
+};
+
+/*
+ * Data transmission request.
+ */
+struct rxrpc_send_data_req {
+ ktime_t now; /* Current time */
+ struct rxrpc_txqueue *tq; /* Tx queue segment holding first DATA */
+ rxrpc_seq_t seq; /* Sequence of first data */
+ int n; /* Number of DATA packets to glue into jumbo */
+ bool did_send; /* T if did actually send */
+};
+
#include <trace/events/rxrpc.h>

/*
@@ -905,7 +937,6 @@ void rxrpc_propose_ping(struct rxrpc_call *call, u32 serial,
enum rxrpc_propose_ack_trace why);
void rxrpc_propose_delay_ACK(struct rxrpc_call *, rxrpc_serial_t,
enum rxrpc_propose_ack_trace);
-void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *);
void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb);

bool rxrpc_input_call_event(struct rxrpc_call *call);
@@ -1191,10 +1222,10 @@ void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason,
rxrpc_serial_t serial, enum rxrpc_propose_ack_trace why);
void rxrpc_send_probe_for_pmtud(struct rxrpc_call *call);
int rxrpc_send_abort_packet(struct rxrpc_call *);
+void rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req);
void rxrpc_send_conn_abort(struct rxrpc_connection *conn);
void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb);
void rxrpc_send_keepalive(struct rxrpc_peer *);
-void rxrpc_transmit_data(struct rxrpc_call *call, struct rxrpc_txbuf *txb, int n);

/*
* peer_event.c
diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c
index ef47de3f41c6..90e3d9395675 100644
--- a/net/rxrpc/call_event.c
+++ b/net/rxrpc/call_event.c
@@ -62,57 +62,85 @@ static void rxrpc_congestion_timeout(struct rxrpc_call *call)
set_bit(RXRPC_CALL_RETRANS_TIMEOUT, &call->flags);
}

+/*
+ * Retransmit one or more packets.
+ */
+static void rxrpc_retransmit_data(struct rxrpc_call *call,
+ struct rxrpc_send_data_req *req,
+ ktime_t rto)
+{
+ struct rxrpc_txqueue *tq = req->tq;
+ unsigned int ix = req->seq & RXRPC_TXQ_MASK;
+ struct rxrpc_txbuf *txb = tq->bufs[ix];
+ ktime_t xmit_ts, resend_at;
+
+ _enter("%x,%x,%x,%x", tq->qbase, req->seq, ix, txb->debug_id);
+
+ xmit_ts = ktime_add_us(tq->xmit_ts_base, tq->segment_xmit_ts[ix]);
+ resend_at = ktime_add(xmit_ts, rto);
+ trace_rxrpc_retransmit(call, req, txb,
+ ktime_sub(resend_at, req->now));
+
+ txb->flags |= RXRPC_TXBUF_RESENT;
+ rxrpc_send_data_packet(call, req);
+ rxrpc_inc_stat(call->rxnet, stat_tx_data_retrans);
+
+ req->tq = NULL;
+ req->n = 0;
+ req->did_send = true;
+ req->now = ktime_get_real();
+}
+
/*
* Perform retransmission of NAK'd and unack'd packets.
*/
void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
{
+ struct rxrpc_send_data_req req = {
+ .now = ktime_get_real(),
+ };
struct rxrpc_ackpacket *ack = NULL;
struct rxrpc_skb_priv *sp;
+ struct rxrpc_txqueue *tq;
struct rxrpc_txbuf *txb;
- rxrpc_seq_t transmitted = call->tx_transmitted;
+ rxrpc_seq_t transmitted = call->tx_transmitted, seq;
ktime_t next_resend = KTIME_MAX, rto = ns_to_ktime(call->peer->rto_us * NSEC_PER_USEC);
- ktime_t resend_at = KTIME_MAX, now, delay;
+ ktime_t resend_at = KTIME_MAX, delay;
bool unacked = false, did_send = false;
- unsigned int i;
+ unsigned int qix;

_enter("{%d,%d}", call->acks_hard_ack, call->tx_top);

- now = ktime_get_real();
-
- if (list_empty(&call->tx_buffer))
+ if (call->tx_bottom == call->tx_top)
goto no_resend;

trace_rxrpc_resend(call, ack_skb);
- txb = list_first_entry(&call->tx_buffer, struct rxrpc_txbuf, call_link);
+ tq = call->tx_queue;
+ seq = call->tx_bottom;

- /* Scan the soft ACK table without dropping the lock and resend any
- * explicitly NAK'd packets.
- */
+ /* Scan the soft ACK table and resend any explicitly NAK'd packets. */
if (ack_skb) {
sp = rxrpc_skb(ack_skb);
ack = (void *)ack_skb->data + sizeof(struct rxrpc_wire_header);

- for (i = 0; i < sp->ack.nr_acks; i++) {
- rxrpc_seq_t seq;
+ for (int i = 0; i < sp->ack.nr_acks; i++) {
+ rxrpc_seq_t aseq;

if (ack->acks[i] & 1)
continue;
- seq = sp->ack.first_ack + i;
- if (after(txb->seq, transmitted))
- break;
- if (after(txb->seq, seq))
- continue; /* A new hard ACK probably came in */
- list_for_each_entry_from(txb, &call->tx_buffer, call_link) {
- if (txb->seq == seq)
- goto found_txb;
- }
- goto no_further_resend;
+ aseq = sp->ack.first_ack + i;
+ while (after_eq(aseq, tq->qbase + RXRPC_NR_TXQUEUE))
+ tq = tq->next;
+ seq = aseq;
+ qix = seq - tq->qbase;
+ txb = tq->bufs[qix];
+ if (after(seq, transmitted))
+ goto no_further_resend;

- found_txb:
- resend_at = ktime_add(txb->last_sent, rto);
+ resend_at = ktime_add_us(tq->xmit_ts_base, tq->segment_xmit_ts[qix]);
+ resend_at = ktime_add(resend_at, rto);
if (after(txb->serial, call->acks_highest_serial)) {
- if (ktime_after(resend_at, now) &&
+ if (ktime_after(resend_at, req.now) &&
ktime_before(resend_at, next_resend))
next_resend = resend_at;
continue; /* Ack point not yet reached */
@@ -120,17 +148,13 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)

rxrpc_see_txbuf(txb, rxrpc_txbuf_see_unacked);

- trace_rxrpc_retransmit(call, txb->seq, txb->serial,
- ktime_sub(resend_at, now));
-
- txb->flags |= RXRPC_TXBUF_RESENT;
- rxrpc_transmit_data(call, txb, 1);
- did_send = true;
- now = ktime_get_real();
+ req.tq = tq;
+ req.seq = seq;
+ req.n = 1;
+ rxrpc_retransmit_data(call, &req, rto);

- if (list_is_last(&txb->call_link, &call->tx_buffer))
+ if (after_eq(seq, call->tx_top))
goto no_further_resend;
- txb = list_next_entry(txb, call_link);
}
}

@@ -139,35 +163,43 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
* ACK'd or NACK'd in due course, so don't worry about it here; here we
* need to consider retransmitting anything beyond that point.
*/
- if (after_eq(call->acks_prev_seq, call->tx_transmitted))
+ seq = call->acks_prev_seq;
+ if (after_eq(seq, call->tx_transmitted))
goto no_further_resend;
+ seq++;

- list_for_each_entry_from(txb, &call->tx_buffer, call_link) {
- resend_at = ktime_add(txb->last_sent, rto);
+ while (after_eq(seq, tq->qbase + RXRPC_NR_TXQUEUE))
+ tq = tq->next;

- if (before_eq(txb->seq, call->acks_prev_seq))
+ while (before_eq(seq, call->tx_transmitted)) {
+ qix = seq - tq->qbase;
+ if (qix >= RXRPC_NR_TXQUEUE) {
+ tq = tq->next;
continue;
- if (after(txb->seq, call->tx_transmitted))
- break; /* Not transmitted yet */
+ }
+ txb = tq->bufs[qix];
+ resend_at = ktime_add_us(tq->xmit_ts_base, tq->segment_xmit_ts[qix]);
+ resend_at = ktime_add(resend_at, rto);

if (ack && ack->reason == RXRPC_ACK_PING_RESPONSE &&
before(txb->serial, ntohl(ack->serial)))
goto do_resend; /* Wasn't accounted for by a more recent ping. */

- if (ktime_after(resend_at, now)) {
+ if (ktime_after(resend_at, req.now)) {
if (ktime_before(resend_at, next_resend))
next_resend = resend_at;
+ seq++;
continue;
}

do_resend:
unacked = true;

- txb->flags |= RXRPC_TXBUF_RESENT;
- rxrpc_transmit_data(call, txb, 1);
- did_send = true;
- rxrpc_inc_stat(call->rxnet, stat_tx_data_retrans);
- now = ktime_get_real();
+ req.tq = tq;
+ req.seq = seq;
+ req.n = 1;
+ rxrpc_retransmit_data(call, &req, rto);
+ seq++;
}

no_further_resend:
@@ -175,7 +207,8 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
if (resend_at < KTIME_MAX) {
delay = rxrpc_get_rto_backoff(call->peer, did_send);
resend_at = ktime_add(resend_at, delay);
- trace_rxrpc_timer_set(call, resend_at - now, rxrpc_timer_trace_resend_reset);
+ trace_rxrpc_timer_set(call, resend_at - req.now,
+ rxrpc_timer_trace_resend_reset);
}
call->resend_at = resend_at;

@@ -186,11 +219,11 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
* that an ACK got lost somewhere. Send a ping to find out instead of
* retransmitting data.
*/
- if (!did_send) {
+ if (!req.did_send) {
ktime_t next_ping = ktime_add_us(call->acks_latest_ts,
call->peer->srtt_us >> 3);

- if (ktime_sub(next_ping, now) <= 0)
+ if (ktime_sub(next_ping, req.now) <= 0)
rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
rxrpc_propose_ack_ping_for_0_retrans);
}
@@ -240,47 +273,68 @@ static unsigned int rxrpc_tx_window_space(struct rxrpc_call *call)
}

/*
- * Decant some if the sendmsg prepared queue into the transmission buffer.
+ * Transmit some as-yet untransmitted data.
*/
-static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
+static void rxrpc_transmit_fresh_data(struct rxrpc_call *call)
{
int space = rxrpc_tx_window_space(call);

if (!test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
- if (list_empty(&call->tx_sendmsg))
+ if (call->send_top == call->tx_top)
return;
rxrpc_expose_client_call(call);
}

while (space > 0) {
- struct rxrpc_txbuf *head = NULL, *txb;
- int count = 0, limit = min(space, 1);
-
- if (list_empty(&call->tx_sendmsg))
+ struct rxrpc_send_data_req req = {
+ .now = ktime_get_real(),
+ .seq = call->tx_transmitted + 1,
+ .n = 0,
+ };
+ struct rxrpc_txqueue *tq;
+ struct rxrpc_txbuf *txb;
+ rxrpc_seq_t send_top, seq;
+ int limit = min(space, 1);
+
+ /* Order send_top before the contents of the new txbufs and
+ * txqueue pointers
+ */
+ send_top = smp_load_acquire(&call->send_top);
+ if (call->tx_top == send_top)
break;

- trace_rxrpc_transmit(call, space);
+ trace_rxrpc_transmit(call, send_top, space);
+
+ tq = call->tx_qtail;
+ seq = call->tx_top;
+ trace_rxrpc_tq(call, tq, seq, rxrpc_tq_decant);

- spin_lock(&call->tx_lock);
do {
- txb = list_first_entry(&call->tx_sendmsg,
- struct rxrpc_txbuf, call_link);
- if (!head)
- head = txb;
- list_move_tail(&txb->call_link, &call->tx_buffer);
- count++;
+ int ix;
+
+ seq++;
+ ix = seq & RXRPC_TXQ_MASK;
+ if (!ix) {
+ tq = tq->next;
+ trace_rxrpc_tq(call, tq, seq, rxrpc_tq_decant_advance);
+ }
+ if (!req.tq)
+ req.tq = tq;
+ txb = tq->bufs[ix];
+ req.n++;
if (!txb->jumboable)
break;
- } while (count < limit && !list_empty(&call->tx_sendmsg));
+ } while (req.n < limit && before(seq, send_top));

- spin_unlock(&call->tx_lock);
-
- call->tx_top = txb->seq;
- if (txb->flags & RXRPC_LAST_PACKET)
+ if (txb->flags & RXRPC_LAST_PACKET) {
rxrpc_close_tx_phase(call);
+ tq = NULL;
+ }
+ call->tx_qtail = tq;
+ call->tx_top = seq;

- space -= count;
- rxrpc_transmit_data(call, head, count);
+ space -= req.n;
+ rxrpc_send_data_packet(call, &req);
}
}

@@ -288,7 +342,7 @@ static void rxrpc_transmit_some_data(struct rxrpc_call *call)
{
switch (__rxrpc_call_state(call)) {
case RXRPC_CALL_SERVER_ACK_REQUEST:
- if (list_empty(&call->tx_sendmsg))
+ if (call->tx_bottom == READ_ONCE(call->send_top))
return;
rxrpc_begin_service_reply(call);
fallthrough;
@@ -297,11 +351,11 @@ static void rxrpc_transmit_some_data(struct rxrpc_call *call)
case RXRPC_CALL_CLIENT_SEND_REQUEST:
if (!rxrpc_tx_window_space(call))
return;
- if (list_empty(&call->tx_sendmsg)) {
+ if (call->tx_bottom == READ_ONCE(call->send_top)) {
rxrpc_inc_stat(call->rxnet, stat_tx_data_underflow);
return;
}
- rxrpc_decant_prepared_tx(call);
+ rxrpc_transmit_fresh_data(call);
break;
default:
return;
@@ -503,8 +557,6 @@ bool rxrpc_input_call_event(struct rxrpc_call *call)
call->peer->pmtud_pending)
rxrpc_send_probe_for_pmtud(call);
}
- if (call->acks_hard_ack != call->tx_bottom)
- rxrpc_shrink_call_tx_buffer(call);
_leave("");
return true;

diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index c026f16f891e..a9682b31a4f9 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -146,8 +146,6 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
INIT_LIST_HEAD(&call->recvmsg_link);
INIT_LIST_HEAD(&call->sock_link);
INIT_LIST_HEAD(&call->attend_link);
- INIT_LIST_HEAD(&call->tx_sendmsg);
- INIT_LIST_HEAD(&call->tx_buffer);
skb_queue_head_init(&call->rx_queue);
skb_queue_head_init(&call->recvmsg_queue);
skb_queue_head_init(&call->rx_oos_queue);
@@ -532,9 +530,26 @@ void rxrpc_get_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
}

/*
- * Clean up the Rx skb ring.
+ * Clean up the transmission buffers.
*/
-static void rxrpc_cleanup_ring(struct rxrpc_call *call)
+static void rxrpc_cleanup_tx_buffers(struct rxrpc_call *call)
+{
+ struct rxrpc_txqueue *tq, *next;
+
+ for (tq = call->tx_queue; tq; tq = next) {
+ next = tq->next;
+ for (int i = 0; i < RXRPC_NR_TXQUEUE; i++)
+ if (tq->bufs[i])
+ rxrpc_put_txbuf(tq->bufs[i], rxrpc_txbuf_put_cleaned);
+ trace_rxrpc_tq(call, tq, 0, rxrpc_tq_cleaned);
+ kfree(tq);
+ }
+}
+
+/*
+ * Clean up the receive buffers.
+ */
+static void rxrpc_cleanup_rx_buffers(struct rxrpc_call *call)
{
rxrpc_purge_queue(&call->recvmsg_queue);
rxrpc_purge_queue(&call->rx_queue);
@@ -673,23 +688,12 @@ static void rxrpc_rcu_free_call(struct rcu_head *rcu)
static void rxrpc_destroy_call(struct work_struct *work)
{
struct rxrpc_call *call = container_of(work, struct rxrpc_call, destroyer);
- struct rxrpc_txbuf *txb;

del_timer_sync(&call->timer);

rxrpc_free_skb(call->cong_last_nack, rxrpc_skb_put_last_nack);
- rxrpc_cleanup_ring(call);
- while ((txb = list_first_entry_or_null(&call->tx_sendmsg,
- struct rxrpc_txbuf, call_link))) {
- list_del(&txb->call_link);
- rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
- }
- while ((txb = list_first_entry_or_null(&call->tx_buffer,
- struct rxrpc_txbuf, call_link))) {
- list_del(&txb->call_link);
- rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
- }
-
+ rxrpc_cleanup_tx_buffers(call);
+ rxrpc_cleanup_rx_buffers(call);
rxrpc_put_txbuf(call->tx_pending, rxrpc_txbuf_put_cleaned);
rxrpc_put_connection(call->conn, rxrpc_conn_put_call);
rxrpc_deactivate_bundle(call->bundle);
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 96fe005c5e81..cfdd23042d4c 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -214,24 +214,71 @@ void rxrpc_congestion_degrade(struct rxrpc_call *call)
static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
struct rxrpc_ack_summary *summary)
{
- struct rxrpc_txbuf *txb;
+ struct rxrpc_txqueue *tq = call->tx_queue;
+ rxrpc_seq_t seq = call->tx_bottom + 1;
bool rot_last = false;

- list_for_each_entry_rcu(txb, &call->tx_buffer, call_link, false) {
- if (before_eq(txb->seq, call->acks_hard_ack))
- continue;
- if (txb->flags & RXRPC_LAST_PACKET) {
+ _enter("%x,%x,%x", call->tx_bottom, call->acks_hard_ack, to);
+
+ trace_rxrpc_tx_rotate(call, seq, to);
+ trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate);
+
+ /* We may have a left over fully-consumed buffer at the front that we
+ * couldn't drop before (rotate_and_keep below).
+ */
+ if (seq == call->tx_qbase + RXRPC_NR_TXQUEUE) {
+ call->tx_qbase += RXRPC_NR_TXQUEUE;
+ call->tx_queue = tq->next;
+ trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_free);
+ kfree(tq);
+ tq = call->tx_queue;
+ }
+
+ do {
+ unsigned int ix = seq - call->tx_qbase;
+
+ _debug("tq=%x seq=%x i=%d f=%x", tq->qbase, seq, ix, tq->bufs[ix]->flags);
+ if (tq->bufs[ix]->flags & RXRPC_LAST_PACKET) {
set_bit(RXRPC_CALL_TX_LAST, &call->flags);
rot_last = true;
}
- if (txb->seq == to)
- break;
- }
+ rxrpc_put_txbuf(tq->bufs[ix], rxrpc_txbuf_put_rotated);
+ tq->bufs[ix] = NULL;
+
+ WRITE_ONCE(call->tx_bottom, seq);
+ WRITE_ONCE(call->acks_hard_ack, seq);
+ trace_rxrpc_txqueue(call, (rot_last ?
+ rxrpc_txqueue_rotate_last :
+ rxrpc_txqueue_rotate));

- if (rot_last)
+ seq++;
+ if (!(seq & RXRPC_TXQ_MASK)) {
+ prefetch(tq->next);
+ if (tq != call->tx_qtail) {
+ call->tx_qbase += RXRPC_NR_TXQUEUE;
+ call->tx_queue = tq->next;
+ trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_free);
+ kfree(tq);
+ tq = call->tx_queue;
+ } else {
+ trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_keep);
+ tq = NULL;
+ break;
+ }
+ }
+
+ } while (before_eq(seq, to));
+
+ if (rot_last) {
set_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags);
+ if (tq) {
+ trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_free);
+ kfree(tq);
+ call->tx_queue = NULL;
+ }
+ }

- _enter("%x,%x,%x,%d", to, call->acks_hard_ack, call->tx_top, rot_last);
+ _debug("%x,%x,%x,%d", to, call->acks_hard_ack, call->tx_top, rot_last);

if (call->acks_lowest_nak == call->acks_hard_ack) {
call->acks_lowest_nak = to;
@@ -240,11 +287,6 @@ static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
call->acks_lowest_nak = to;
}

- smp_store_release(&call->acks_hard_ack, to);
-
- trace_rxrpc_txqueue(call, (rot_last ?
- rxrpc_txqueue_rotate_last :
- rxrpc_txqueue_rotate));
wake_up(&call->waitq);
return rot_last;
}
diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index 400c3389d492..c2044d593237 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -375,10 +375,10 @@ int rxrpc_send_abort_packet(struct rxrpc_call *call)
/*
* Prepare a (sub)packet for transmission.
*/
-static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc_txbuf *txb,
- rxrpc_serial_t serial,
- int subpkt, int nr_subpkts,
- ktime_t now)
+static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call,
+ struct rxrpc_send_data_req *req,
+ struct rxrpc_txbuf *txb,
+ rxrpc_serial_t serial, int subpkt)
{
struct rxrpc_wire_header *whdr = txb->kvec[0].iov_base;
struct rxrpc_jumbo_header *jumbo = (void *)(whdr + 1) - sizeof(*jumbo);
@@ -386,7 +386,7 @@ static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc
struct rxrpc_connection *conn = call->conn;
struct kvec *kv = &call->local->kvec[subpkt];
size_t len = txb->pkt_len;
- bool last, more;
+ bool last;
u8 flags;

_enter("%x,%zd", txb->seq, len);
@@ -401,14 +401,11 @@ static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc
flags = txb->flags & RXRPC_TXBUF_WIRE_FLAGS;
last = txb->flags & RXRPC_LAST_PACKET;

- if (subpkt < nr_subpkts - 1) {
+ if (subpkt < req->n - 1) {
len = RXRPC_JUMBO_DATALEN;
goto dont_set_request_ack;
}

- more = (!list_is_last(&txb->call_link, &call->tx_buffer) ||
- !list_empty(&call->tx_sendmsg));
-
/* If our RTT cache needs working on, request an ACK. Also request
* ACKs if a DATA packet appears to have been lost.
*
@@ -430,7 +427,7 @@ static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc
why = rxrpc_reqack_more_rtt;
else if (ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000), ktime_get_real()))
why = rxrpc_reqack_old_rtt;
- else if (!last && !more)
+ else if (!last && !after(READ_ONCE(call->send_top), txb->seq))
why = rxrpc_reqack_app_stall;
else
goto dont_set_request_ack;
@@ -439,13 +436,13 @@ static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc
trace_rxrpc_req_ack(call->debug_id, txb->seq, why);
if (why != rxrpc_reqack_no_srv_last) {
flags |= RXRPC_REQUEST_ACK;
- rxrpc_begin_rtt_probe(call, serial, now, rxrpc_rtt_tx_data);
- call->peer->rtt_last_req = now;
+ rxrpc_begin_rtt_probe(call, serial, req->now, rxrpc_rtt_tx_data);
+ call->peer->rtt_last_req = req->now;
}
dont_set_request_ack:

/* The jumbo header overlays the wire header in the txbuf. */
- if (subpkt < nr_subpkts - 1)
+ if (subpkt < req->n - 1)
flags |= RXRPC_JUMBO_PACKET;
else
flags &= ~RXRPC_JUMBO_PACKET;
@@ -469,62 +466,100 @@ static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc
return len;
}

+/*
+ * Prepare a transmission queue object for initial transmission. Returns the
+ * number of microseconds since the transmission queue base timestamp.
+ */
+static unsigned int rxrpc_prepare_txqueue(struct rxrpc_txqueue *tq,
+ struct rxrpc_send_data_req *req)
+{
+ if (!tq)
+ return 0;
+ if (tq->xmit_ts_base == KTIME_MIN) {
+ tq->xmit_ts_base = req->now;
+ return 0;
+ }
+ return ktime_to_us(ktime_sub(req->now, tq->xmit_ts_base));
+}
+
/*
* Prepare a (jumbo) packet for transmission.
*/
-static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *head, int n)
+static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req)
{
- struct rxrpc_txbuf *txb = head;
+ struct rxrpc_txqueue *tq = req->tq;
rxrpc_serial_t serial;
- ktime_t now = ktime_get_real();
+ unsigned int xmit_ts;
+ rxrpc_seq_t seq = req->seq;
size_t len = 0;

+ trace_rxrpc_tq(call, tq, seq, rxrpc_tq_transmit);
+
/* Each transmission of a Tx packet needs a new serial number */
- serial = rxrpc_get_next_serials(call->conn, n);
+ serial = rxrpc_get_next_serials(call->conn, req->n);

- for (int i = 0; i < n; i++) {
- txb->last_sent = now;
- len += rxrpc_prepare_data_subpacket(call, txb, serial, i, n, now);
- serial++;
- txb = list_next_entry(txb, call_link);
- }
+ call->tx_last_sent = req->now;
+ xmit_ts = rxrpc_prepare_txqueue(tq, req);
+ prefetch(tq->next);

- /* Set timeouts */
- if (call->peer->rtt_count > 1) {
- ktime_t delay = rxrpc_get_rto_backoff(call->peer, false);
+ for (int i = 0;;) {
+ int ix = seq & RXRPC_TXQ_MASK;
+ struct rxrpc_txbuf *txb = tq->bufs[seq & RXRPC_TXQ_MASK];

- call->ack_lost_at = ktime_add(now, delay);
- trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_lost_ack);
+ _debug("prep[%u] tq=%x q=%x", i, tq->qbase, seq);
+ tq->segment_xmit_ts[ix] = xmit_ts;
+ len += rxrpc_prepare_data_subpacket(call, req, txb, serial, i);
+ serial++;
+ seq++;
+ i++;
+ if (i >= req->n)
+ break;
+ if (!(seq & RXRPC_TXQ_MASK)) {
+ tq = tq->next;
+ trace_rxrpc_tq(call, tq, seq, rxrpc_tq_transmit_advance);
+ xmit_ts = rxrpc_prepare_txqueue(tq, req);
+ }
}

+ /* Set timeouts */
if (!test_and_set_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags)) {
ktime_t delay = ms_to_ktime(READ_ONCE(call->next_rx_timo));

- call->expect_rx_by = ktime_add(now, delay);
+ call->expect_rx_by = ktime_add(req->now, delay);
trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_expect_rx);
}
+ if (call->resend_at == KTIME_MAX) {
+ ktime_t delay = rxrpc_get_rto_backoff(call->peer, false);
+
+ call->resend_at = ktime_add(req->now, delay);
+ trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_resend);
+ }

- rxrpc_set_keepalive(call, now);
+ rxrpc_set_keepalive(call, req->now);
return len;
}

/*
- * send a packet through the transport endpoint
+ * Send one or more packets through the transport endpoint
*/
-static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb, int n)
+void rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req)
{
struct rxrpc_connection *conn = call->conn;
enum rxrpc_tx_point frag;
+ struct rxrpc_txqueue *tq = req->tq;
+ struct rxrpc_txbuf *txb;
struct msghdr msg;
+ rxrpc_seq_t seq = req->seq;
size_t len;
bool new_call = test_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags);
int ret;

- _enter("%x,{%d}", txb->seq, txb->pkt_len);
+ _enter("%x,%x-%x", tq->qbase, seq, seq + req->n - 1);

- len = rxrpc_prepare_data_packet(call, txb, n);
+ len = rxrpc_prepare_data_packet(call, req);
+ txb = tq->bufs[seq & RXRPC_TXQ_MASK];

- iov_iter_kvec(&msg.msg_iter, WRITE, call->local->kvec, n, len);
+ iov_iter_kvec(&msg.msg_iter, WRITE, call->local->kvec, req->n, len);

msg.msg_name = &call->peer->srx.transport;
msg.msg_namelen = call->peer->srx.transport_len;
@@ -535,7 +570,7 @@ static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *t
/* Send the packet with the don't fragment bit set unless we think it's
* too big or if this is a retransmission.
*/
- if (txb->seq == call->tx_transmitted + 1 &&
+ if (seq == call->tx_transmitted + 1 &&
len >= sizeof(struct rxrpc_wire_header) + call->peer->max_data) {
rxrpc_local_dont_fragment(conn->local, false);
frag = rxrpc_tx_point_call_data_frag;
@@ -548,8 +583,8 @@ static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *t
* retransmission algorithm doesn't try to resend what we haven't sent
* yet.
*/
- if (txb->seq == call->tx_transmitted + 1)
- call->tx_transmitted = txb->seq + n - 1;
+ if (seq == call->tx_transmitted + 1)
+ call->tx_transmitted = seq + req->n - 1;

if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
static int lose;
@@ -586,19 +621,21 @@ static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *t
rxrpc_tx_backoff(call, ret);

if (ret < 0) {
- /* Cancel the call if the initial transmission fails,
- * particularly if that's due to network routing issues that
- * aren't going away anytime soon. The layer above can arrange
- * the retransmission.
+ /* Cancel the call if the initial transmission fails or if we
+ * hit an error due to network routing issues that aren't going
+ * away anytime soon. The layer above can arrange the
+ * retransmission.
*/
- if (new_call)
+ if (new_call ||
+ ret == -ENETUNREACH ||
+ ret == -EHOSTUNREACH ||
+ ret == -ECONNREFUSED)
rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
RX_USER_ABORT, ret);
}

done:
_leave(" = %d [%u]", ret, call->peer->max_data);
- return ret;
}

/*
@@ -773,41 +810,3 @@ void rxrpc_send_keepalive(struct rxrpc_peer *peer)
peer->last_tx_at = ktime_get_seconds();
_leave("");
}
-
-/*
- * Schedule an instant Tx resend.
- */
-static inline void rxrpc_instant_resend(struct rxrpc_call *call,
- struct rxrpc_txbuf *txb)
-{
- if (!__rxrpc_call_is_complete(call))
- kdebug("resend");
-}
-
-/*
- * Transmit a packet, possibly gluing several subpackets together.
- */
-void rxrpc_transmit_data(struct rxrpc_call *call, struct rxrpc_txbuf *txb, int n)
-{
- int ret;
-
- ret = rxrpc_send_data_packet(call, txb, n);
- if (ret < 0) {
- switch (ret) {
- case -ENETUNREACH:
- case -EHOSTUNREACH:
- case -ECONNREFUSED:
- rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
- 0, ret);
- break;
- default:
- _debug("need instant resend %d", ret);
- rxrpc_instant_resend(call, txb);
- }
- } else {
- ktime_t delay = ns_to_ktime(call->peer->rto_us * NSEC_PER_USEC);
-
- call->resend_at = ktime_add(ktime_get_real(), delay);
- trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_resend_tx);
- }
-}
diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c
index 467c9402882e..85b35b11755d 100644
--- a/net/rxrpc/sendmsg.c
+++ b/net/rxrpc/sendmsg.c
@@ -98,7 +98,7 @@ static bool rxrpc_check_tx_space(struct rxrpc_call *call, rxrpc_seq_t *_tx_win)

if (_tx_win)
*_tx_win = tx_bottom;
- return call->tx_prepared - tx_bottom < 256;
+ return call->send_top - tx_bottom < 256;
}

/*
@@ -242,36 +242,74 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
struct rxrpc_txbuf *txb,
rxrpc_notify_end_tx_t notify_end_tx)
{
+ struct rxrpc_txqueue *sq = call->send_queue;
rxrpc_seq_t seq = txb->seq;
bool poke, last = txb->flags & RXRPC_LAST_PACKET;
-
+ int ix = seq & RXRPC_TXQ_MASK;
rxrpc_inc_stat(call->rxnet, stat_tx_data);

- ASSERTCMP(txb->seq, ==, call->tx_prepared + 1);
-
- /* We have to set the timestamp before queueing as the retransmit
- * algorithm can see the packet as soon as we queue it.
- */
- txb->last_sent = ktime_get_real();
+ ASSERTCMP(txb->seq, ==, call->send_top + 1);

if (last)
trace_rxrpc_txqueue(call, rxrpc_txqueue_queue_last);
else
trace_rxrpc_txqueue(call, rxrpc_txqueue_queue);

+ if (WARN_ON_ONCE(sq->bufs[ix]))
+ trace_rxrpc_tq(call, sq, seq, rxrpc_tq_queue_dup);
+ else
+ trace_rxrpc_tq(call, sq, seq, rxrpc_tq_queue);
+
/* Add the packet to the call's output buffer */
spin_lock(&call->tx_lock);
- poke = list_empty(&call->tx_sendmsg);
- list_add_tail(&txb->call_link, &call->tx_sendmsg);
- call->tx_prepared = seq;
- if (last)
+ poke = (READ_ONCE(call->tx_bottom) == call->send_top);
+ sq->bufs[ix] = txb;
+ /* Order send_top after the queue->next pointer and txb content. */
+ smp_store_release(&call->send_top, seq);
+ if (last) {
rxrpc_notify_end_tx(rx, call, notify_end_tx);
+ call->send_queue = NULL;
+ }
spin_unlock(&call->tx_lock);

if (poke)
rxrpc_poke_call(call, rxrpc_call_poke_start);
}

+/*
+ * Allocate a new txqueue unit and add it to the transmission queue.
+ */
+static int rxrpc_alloc_txqueue(struct sock *sk, struct rxrpc_call *call)
+{
+ struct rxrpc_txqueue *tq;
+
+ tq = kzalloc(sizeof(*tq), sk->sk_allocation);
+ if (!tq)
+ return -ENOMEM;
+
+ tq->xmit_ts_base = KTIME_MIN;
+ for (int i = 0; i < RXRPC_NR_TXQUEUE; i++)
+ tq->segment_xmit_ts[i] = UINT_MAX;
+
+ if (call->send_queue) {
+ tq->qbase = call->send_top + 1;
+ call->send_queue->next = tq;
+ call->send_queue = tq;
+ } else if (WARN_ON(call->tx_queue)) {
+ kfree(tq);
+ return -ENOMEM;
+ } else {
+ tq->qbase = 0;
+ call->tx_qbase = 0;
+ call->send_queue = tq;
+ call->tx_qtail = tq;
+ call->tx_queue = tq;
+ }
+
+ trace_rxrpc_tq(call, tq, call->send_top, rxrpc_tq_alloc);
+ return 0;
+}
+
/*
* send data through a socket
* - must be called in process context
@@ -346,6 +384,13 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
if (!rxrpc_check_tx_space(call, NULL))
goto wait_for_space;

+ /* See if we need to begin/extend the Tx queue. */
+ if (!call->send_queue || !((call->send_top + 1) & RXRPC_TXQ_MASK)) {
+ ret = rxrpc_alloc_txqueue(sk, call);
+ if (ret < 0)
+ goto maybe_error;
+ }
+
/* Work out the maximum size of a packet. Assume that
* the security header is going to be in the padded
* region (enc blocksize), but the trailer is not.
diff --git a/net/rxrpc/txbuf.c b/net/rxrpc/txbuf.c
index 0cc8f49d69a9..067223c8c35f 100644
--- a/net/rxrpc/txbuf.c
+++ b/net/rxrpc/txbuf.c
@@ -43,17 +43,14 @@ struct rxrpc_txbuf *rxrpc_alloc_data_txbuf(struct rxrpc_call *call, size_t data_

whdr = buf + hoff;

- INIT_LIST_HEAD(&txb->call_link);
- INIT_LIST_HEAD(&txb->tx_link);
refcount_set(&txb->ref, 1);
- txb->last_sent = KTIME_MIN;
txb->call_debug_id = call->debug_id;
txb->debug_id = atomic_inc_return(&rxrpc_txbuf_debug_ids);
txb->alloc_size = data_size;
txb->space = data_size;
txb->offset = sizeof(*whdr);
txb->flags = call->conn->out_clientflag;
- txb->seq = call->tx_prepared + 1;
+ txb->seq = call->send_top + 1;
txb->nr_kvec = 1;
txb->kvec[0].iov_base = whdr;
txb->kvec[0].iov_len = sizeof(*whdr);
@@ -114,8 +111,6 @@ struct rxrpc_txbuf *rxrpc_alloc_ack_txbuf(struct rxrpc_call *call, size_t sack_s
filler = buf + sizeof(*whdr) + sizeof(*ack) + 1;
trailer = buf + sizeof(*whdr) + sizeof(*ack) + 1 + 3;

- INIT_LIST_HEAD(&txb->call_link);
- INIT_LIST_HEAD(&txb->tx_link);
refcount_set(&txb->ref, 1);
txb->call_debug_id = call->debug_id;
txb->debug_id = atomic_inc_return(&rxrpc_txbuf_debug_ids);
@@ -200,37 +195,3 @@ void rxrpc_put_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
rxrpc_free_txbuf(txb);
}
}
-
-/*
- * Shrink the transmit buffer.
- */
-void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call)
-{
- struct rxrpc_txbuf *txb;
- rxrpc_seq_t hard_ack = smp_load_acquire(&call->acks_hard_ack);
- bool wake = false;
-
- _enter("%x/%x/%x", call->tx_bottom, call->acks_hard_ack, call->tx_top);
-
- while ((txb = list_first_entry_or_null(&call->tx_buffer,
- struct rxrpc_txbuf, call_link))) {
- hard_ack = call->acks_hard_ack;
- if (before(hard_ack, txb->seq))
- break;
-
- if (txb->seq != call->tx_bottom + 1)
- rxrpc_see_txbuf(txb, rxrpc_txbuf_see_out_of_step);
- ASSERTCMP(txb->seq, ==, call->tx_bottom + 1);
- WRITE_ONCE(call->tx_bottom, call->tx_bottom + 1);
- list_del_rcu(&txb->call_link);
-
- trace_rxrpc_txqueue(call, rxrpc_txqueue_dequeue);
-
- rxrpc_put_txbuf(txb, rxrpc_txbuf_put_rotated);
- if (after(call->acks_hard_ack, call->tx_bottom + 128))
- wake = true;
- }
-
- if (wake)
- wake_up(&call->waitq);
-}