[PATCH net 1/9] rxrpc: Pass the input handler's data skb reference to the Rx ring
From: David Howells
Date: Thu Aug 22 2019 - 09:45:59 EST
Pass the reference held on a DATA skb in the rxrpc input handler into the
Rx ring rather than getting an additional ref for this and then dropping
the original ref at the end.
Signed-off-by: David Howells <dhowells@xxxxxxxxxx>
---
net/rxrpc/input.c | 48 ++++++++++++++++++++++++++++++++++--------------
1 file changed, 34 insertions(+), 14 deletions(-)
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index dd47d465d1d3..5789ec5ad54f 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -416,7 +416,8 @@ static void rxrpc_input_dup_data(struct rxrpc_call *call, rxrpc_seq_t seq,
}
/*
- * Process a DATA packet, adding the packet to the Rx ring.
+ * Process a DATA packet, adding the packet to the Rx ring. The caller's
+ * packet ref must be passed on or discarded.
*/
static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
{
@@ -425,20 +426,22 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
unsigned int offset = sizeof(struct rxrpc_wire_header);
unsigned int ix;
rxrpc_serial_t serial = sp->hdr.serial, ack_serial = 0;
- rxrpc_seq_t seq = sp->hdr.seq, hard_ack;
+ rxrpc_seq_t seq0 = sp->hdr.seq, seq, hard_ack;
bool immediate_ack = false, jumbo_bad = false, queued;
u16 len;
- u8 ack = 0, flags, annotation = 0;
+ u8 ack = 0, flags, jumbo_flags, annotation = 0;
_enter("{%u,%u},{%u,%u}",
call->rx_hard_ack, call->rx_top, skb->len, seq);
_proto("Rx DATA %%%u { #%u f=%02x }",
- sp->hdr.serial, seq, sp->hdr.flags);
+ sp->hdr.serial, seq0, sp->hdr.flags);
state = READ_ONCE(call->state);
- if (state >= RXRPC_CALL_COMPLETE)
+ if (state >= RXRPC_CALL_COMPLETE) {
+ rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
return;
+ }
if (call->state == RXRPC_CALL_SERVER_RECV_REQUEST) {
unsigned long timo = READ_ONCE(call->next_req_timo);
@@ -463,7 +466,8 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
!rxrpc_receiving_reply(call))
goto unlock;
- call->ackr_prev_seq = seq;
+ call->ackr_prev_seq = seq0;
+ seq = seq0;
hard_ack = READ_ONCE(call->rx_hard_ack);
if (after(seq, hard_ack + call->rx_winsize)) {
@@ -533,12 +537,25 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
* Barriers against rxrpc_recvmsg_data() and rxrpc_rotate_rx_window()
* and also rxrpc_fill_out_ack().
*/
- rxrpc_get_skb(skb, rxrpc_skb_rx_got);
+ if (flags & RXRPC_JUMBO_PACKET) {
+ rxrpc_get_skb(skb, rxrpc_skb_rx_got);
+ if (skb_copy_bits(skb, offset, &jumbo_flags, 1) < 0) {
+ rxrpc_proto_abort("XJF", call, seq);
+ goto unlock;
+ }
+ }
call->rxtx_annotations[ix] = annotation;
smp_wmb();
call->rxtx_buffer[ix] = skb;
if (after(seq, call->rx_top)) {
smp_store_release(&call->rx_top, seq);
+ if (!(flags & RXRPC_JUMBO_PACKET)) {
+ /* From this point on, we're not allowed to touch the
+ * packet any longer as its ref now belongs to the Rx
+ * ring.
+ */
+ skb = NULL;
+ }
} else if (before(seq, call->rx_top)) {
/* Send an immediate ACK if we fill in a hole */
if (!ack) {
@@ -567,10 +584,7 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
skip:
offset += len;
if (flags & RXRPC_JUMBO_PACKET) {
- if (skb_copy_bits(skb, offset, &flags, 1) < 0) {
- rxrpc_proto_abort("XJF", call, seq);
- goto unlock;
- }
+ flags = jumbo_flags;
offset += sizeof(struct rxrpc_jumbo_header);
seq++;
serial++;
@@ -606,13 +620,14 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
false, true,
rxrpc_propose_ack_input_data);
- if (sp->hdr.seq == READ_ONCE(call->rx_hard_ack) + 1) {
+ if (seq0 == READ_ONCE(call->rx_hard_ack) + 1) {
trace_rxrpc_notify_socket(call->debug_id, serial);
rxrpc_notify_socket(call);
}
unlock:
spin_unlock(&call->input_lock);
+ rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
_leave(" [queued]");
}
@@ -1021,7 +1036,7 @@ static void rxrpc_input_call_packet(struct rxrpc_call *call,
switch (sp->hdr.type) {
case RXRPC_PACKET_TYPE_DATA:
rxrpc_input_data(call, skb);
- break;
+ goto no_free;
case RXRPC_PACKET_TYPE_ACK:
rxrpc_input_ack(call, skb);
@@ -1048,6 +1063,8 @@ static void rxrpc_input_call_packet(struct rxrpc_call *call,
break;
}
+ rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
+no_free:
_leave("");
}
@@ -1373,8 +1390,11 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
mutex_unlock(&call->user_mutex);
}
+ /* Process a call packet; this either discards or passes on the ref
+ * elsewhere.
+ */
rxrpc_input_call_packet(call, skb);
- goto discard;
+ goto out;
discard:
rxrpc_free_skb(skb, rxrpc_skb_rx_freed);