[PATCH net-next 18/36] rxrpc: Split the receive code

From: David Howells
Date: Thu Dec 01 2022 - 19:20:47 EST


Split the code that handles packet reception in softirq mode as a prelude
to moving all the packet processing beyond routing to the appropriate call
and setting up of a new call out into process context.

Signed-off-by: David Howells <dhowells@xxxxxxxxxx>
cc: Marc Dionne <marc.dionne@xxxxxxxxxxxx>
cc: linux-afs@xxxxxxxxxxxxxxxxxxx
---

net/rxrpc/Makefile | 1
net/rxrpc/ar-internal.h | 7 +
net/rxrpc/input.c | 372 +----------------------------------------------
net/rxrpc/io_thread.c | 370 +++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 384 insertions(+), 366 deletions(-)
create mode 100644 net/rxrpc/io_thread.c

diff --git a/net/rxrpc/Makefile b/net/rxrpc/Makefile
index 79687477d93c..e76d3459d78e 100644
--- a/net/rxrpc/Makefile
+++ b/net/rxrpc/Makefile
@@ -16,6 +16,7 @@ rxrpc-y := \
conn_service.o \
input.o \
insecure.o \
+ io_thread.o \
key.o \
local_event.o \
local_object.o \
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 41a57c145f2b..523cc9c5ab12 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -946,6 +946,13 @@ void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
/*
* input.c
*/
+void rxrpc_input_call_packet(struct rxrpc_call *, struct sk_buff *);
+void rxrpc_input_implicit_end_call(struct rxrpc_sock *, struct rxrpc_connection *,
+ struct rxrpc_call *);
+
+/*
+ * io_thread.c
+ */
int rxrpc_input_packet(struct sock *, struct sk_buff *);

/*
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index ab8b7a1be935..f4f6f3c62d03 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: GPL-2.0-or-later
-/* RxRPC packet reception
+/* Processing of received RxRPC packets
*
- * Copyright (C) 2007, 2016 Red Hat, Inc. All Rights Reserved.
+ * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
* Written by David Howells (dhowells@xxxxxxxxxx)
*/

@@ -1029,7 +1029,7 @@ static void rxrpc_input_abort(struct rxrpc_call *call, struct sk_buff *skb)
/*
* Process an incoming call packet.
*/
-static void rxrpc_input_call_packet(struct rxrpc_call *call,
+void rxrpc_input_call_packet(struct rxrpc_call *call,
struct sk_buff *skb)
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
@@ -1086,9 +1086,9 @@ static void rxrpc_input_call_packet(struct rxrpc_call *call,
*
* TODO: If callNumber > call_id + 1, renegotiate security.
*/
-static void rxrpc_input_implicit_end_call(struct rxrpc_sock *rx,
- struct rxrpc_connection *conn,
- struct rxrpc_call *call)
+void rxrpc_input_implicit_end_call(struct rxrpc_sock *rx,
+ struct rxrpc_connection *conn,
+ struct rxrpc_call *call)
{
switch (READ_ONCE(call->state)) {
case RXRPC_CALL_SERVER_AWAIT_ACK:
@@ -1109,363 +1109,3 @@ static void rxrpc_input_implicit_end_call(struct rxrpc_sock *rx,
__rxrpc_disconnect_call(conn, call);
spin_unlock(&rx->incoming_lock);
}
-
-/*
- * post connection-level events to the connection
- * - this includes challenges, responses, some aborts and call terminal packet
- * retransmission.
- */
-static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
- struct sk_buff *skb)
-{
- _enter("%p,%p", conn, skb);
-
- skb_queue_tail(&conn->rx_queue, skb);
- rxrpc_queue_conn(conn, rxrpc_conn_queue_rx_work);
-}
-
-/*
- * post endpoint-level events to the local endpoint
- * - this includes debug and version messages
- */
-static void rxrpc_post_packet_to_local(struct rxrpc_local *local,
- struct sk_buff *skb)
-{
- _enter("%p,%p", local, skb);
-
- if (rxrpc_get_local_maybe(local, rxrpc_local_get_queue)) {
- skb_queue_tail(&local->event_queue, skb);
- rxrpc_queue_local(local);
- } else {
- rxrpc_free_skb(skb, rxrpc_skb_put_input);
- }
-}
-
-/*
- * put a packet up for transport-level abort
- */
-static void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
-{
- if (rxrpc_get_local_maybe(local, rxrpc_local_get_queue)) {
- skb_queue_tail(&local->reject_queue, skb);
- rxrpc_queue_local(local);
- } else {
- rxrpc_free_skb(skb, rxrpc_skb_put_input);
- }
-}
-
-/*
- * Extract the wire header from a packet and translate the byte order.
- */
-static noinline
-int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb)
-{
- struct rxrpc_wire_header whdr;
-
- /* dig out the RxRPC connection details */
- if (skb_copy_bits(skb, 0, &whdr, sizeof(whdr)) < 0) {
- trace_rxrpc_rx_eproto(NULL, sp->hdr.serial,
- tracepoint_string("bad_hdr"));
- return -EBADMSG;
- }
-
- memset(sp, 0, sizeof(*sp));
- sp->hdr.epoch = ntohl(whdr.epoch);
- sp->hdr.cid = ntohl(whdr.cid);
- sp->hdr.callNumber = ntohl(whdr.callNumber);
- sp->hdr.seq = ntohl(whdr.seq);
- sp->hdr.serial = ntohl(whdr.serial);
- sp->hdr.flags = whdr.flags;
- sp->hdr.type = whdr.type;
- sp->hdr.userStatus = whdr.userStatus;
- sp->hdr.securityIndex = whdr.securityIndex;
- sp->hdr._rsvd = ntohs(whdr._rsvd);
- sp->hdr.serviceId = ntohs(whdr.serviceId);
- return 0;
-}
-
-/*
- * Extract the abort code from an ABORT packet and stash it in skb->priority.
- */
-static bool rxrpc_extract_abort(struct sk_buff *skb)
-{
- __be32 wtmp;
-
- if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header),
- &wtmp, sizeof(wtmp)) < 0)
- return false;
- skb->priority = ntohl(wtmp);
- return true;
-}
-
-/*
- * handle data received on the local endpoint
- * - may be called in interrupt context
- *
- * [!] Note that as this is called from the encap_rcv hook, the socket is not
- * held locked by the caller and nothing prevents sk_user_data on the UDP from
- * being cleared in the middle of processing this function.
- *
- * Called with the RCU read lock held from the IP layer via UDP.
- */
-int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
-{
- struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk);
- struct rxrpc_connection *conn;
- struct rxrpc_channel *chan;
- struct rxrpc_call *call = NULL;
- struct rxrpc_skb_priv *sp;
- struct rxrpc_peer *peer = NULL;
- struct rxrpc_sock *rx = NULL;
- unsigned int channel;
-
- _enter("%p", udp_sk);
-
- if (unlikely(!local)) {
- kfree_skb(skb);
- return 0;
- }
- if (skb->tstamp == 0)
- skb->tstamp = ktime_get_real();
-
- rxrpc_new_skb(skb, rxrpc_skb_new_encap_rcv);
-
- skb_pull(skb, sizeof(struct udphdr));
-
- /* The UDP protocol already released all skb resources;
- * we are free to add our own data there.
- */
- sp = rxrpc_skb(skb);
-
- /* dig out the RxRPC connection details */
- if (rxrpc_extract_header(sp, skb) < 0)
- goto bad_message;
-
- if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
- static int lose;
- if ((lose++ & 7) == 7) {
- trace_rxrpc_rx_lose(sp);
- rxrpc_free_skb(skb, rxrpc_skb_put_lose);
- return 0;
- }
- }
-
- if (skb->tstamp == 0)
- skb->tstamp = ktime_get_real();
- trace_rxrpc_rx_packet(sp);
-
- switch (sp->hdr.type) {
- case RXRPC_PACKET_TYPE_VERSION:
- if (rxrpc_to_client(sp))
- goto discard;
- rxrpc_post_packet_to_local(local, skb);
- goto out;
-
- case RXRPC_PACKET_TYPE_BUSY:
- if (rxrpc_to_server(sp))
- goto discard;
- fallthrough;
- case RXRPC_PACKET_TYPE_ACK:
- case RXRPC_PACKET_TYPE_ACKALL:
- if (sp->hdr.callNumber == 0)
- goto bad_message;
- break;
- case RXRPC_PACKET_TYPE_ABORT:
- if (!rxrpc_extract_abort(skb))
- return true; /* Just discard if malformed */
- break;
-
- case RXRPC_PACKET_TYPE_DATA:
- if (sp->hdr.callNumber == 0 ||
- sp->hdr.seq == 0)
- goto bad_message;
-
- /* Unshare the packet so that it can be modified for in-place
- * decryption.
- */
- if (sp->hdr.securityIndex != 0) {
- struct sk_buff *nskb = skb_unshare(skb, GFP_ATOMIC);
- if (!nskb) {
- rxrpc_eaten_skb(skb, rxrpc_skb_eaten_by_unshare_nomem);
- goto out;
- }
-
- if (nskb != skb) {
- rxrpc_eaten_skb(skb, rxrpc_skb_eaten_by_unshare);
- skb = nskb;
- rxrpc_new_skb(skb, rxrpc_skb_new_unshared);
- sp = rxrpc_skb(skb);
- }
- }
- break;
-
- case RXRPC_PACKET_TYPE_CHALLENGE:
- if (rxrpc_to_server(sp))
- goto discard;
- break;
- case RXRPC_PACKET_TYPE_RESPONSE:
- if (rxrpc_to_client(sp))
- goto discard;
- break;
-
- /* Packet types 9-11 should just be ignored. */
- case RXRPC_PACKET_TYPE_PARAMS:
- case RXRPC_PACKET_TYPE_10:
- case RXRPC_PACKET_TYPE_11:
- goto discard;
-
- default:
- goto bad_message;
- }
-
- if (sp->hdr.serviceId == 0)
- goto bad_message;
-
- if (rxrpc_to_server(sp)) {
- /* Weed out packets to services we're not offering. Packets
- * that would begin a call are explicitly rejected and the rest
- * are just discarded.
- */
- rx = rcu_dereference(local->service);
- if (!rx || (sp->hdr.serviceId != rx->srx.srx_service &&
- sp->hdr.serviceId != rx->second_service)) {
- if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA &&
- sp->hdr.seq == 1)
- goto unsupported_service;
- goto discard;
- }
- }
-
- conn = rxrpc_find_connection_rcu(local, skb, &peer);
- if (conn) {
- if (sp->hdr.securityIndex != conn->security_ix)
- goto wrong_security;
-
- if (sp->hdr.serviceId != conn->service_id) {
- int old_id;
-
- if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags))
- goto reupgrade;
- old_id = cmpxchg(&conn->service_id, conn->orig_service_id,
- sp->hdr.serviceId);
-
- if (old_id != conn->orig_service_id &&
- old_id != sp->hdr.serviceId)
- goto reupgrade;
- }
-
- if (sp->hdr.callNumber == 0) {
- /* Connection-level packet */
- _debug("CONN %p {%d}", conn, conn->debug_id);
- rxrpc_post_packet_to_conn(conn, skb);
- goto out;
- }
-
- if ((int)sp->hdr.serial - (int)conn->hi_serial > 0)
- conn->hi_serial = sp->hdr.serial;
-
- /* Call-bound packets are routed by connection channel. */
- channel = sp->hdr.cid & RXRPC_CHANNELMASK;
- chan = &conn->channels[channel];
-
- /* Ignore really old calls */
- if (sp->hdr.callNumber < chan->last_call)
- goto discard;
-
- if (sp->hdr.callNumber == chan->last_call) {
- if (chan->call ||
- sp->hdr.type == RXRPC_PACKET_TYPE_ABORT)
- goto discard;
-
- /* For the previous service call, if completed
- * successfully, we discard all further packets.
- */
- if (rxrpc_conn_is_service(conn) &&
- chan->last_type == RXRPC_PACKET_TYPE_ACK)
- goto discard;
-
- /* But otherwise we need to retransmit the final packet
- * from data cached in the connection record.
- */
- if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA)
- trace_rxrpc_rx_data(chan->call_debug_id,
- sp->hdr.seq,
- sp->hdr.serial,
- sp->hdr.flags);
- rxrpc_post_packet_to_conn(conn, skb);
- goto out;
- }
-
- call = rcu_dereference(chan->call);
-
- if (sp->hdr.callNumber > chan->call_id) {
- if (rxrpc_to_client(sp))
- goto reject_packet;
- if (call)
- rxrpc_input_implicit_end_call(rx, conn, call);
- call = NULL;
- }
-
- if (call) {
- if (sp->hdr.serviceId != call->service_id)
- call->service_id = sp->hdr.serviceId;
- if ((int)sp->hdr.serial - (int)call->rx_serial > 0)
- call->rx_serial = sp->hdr.serial;
- if (!test_bit(RXRPC_CALL_RX_HEARD, &call->flags))
- set_bit(RXRPC_CALL_RX_HEARD, &call->flags);
- }
- }
-
- if (!call || refcount_read(&call->ref) == 0) {
- if (rxrpc_to_client(sp) ||
- sp->hdr.type != RXRPC_PACKET_TYPE_DATA)
- goto bad_message;
- if (sp->hdr.seq != 1)
- goto discard;
- call = rxrpc_new_incoming_call(local, rx, skb);
- if (!call)
- goto reject_packet;
- }
-
- /* Process a call packet; this either discards or passes on the ref
- * elsewhere.
- */
- rxrpc_input_call_packet(call, skb);
- goto out;
-
-discard:
- rxrpc_free_skb(skb, rxrpc_skb_put_input);
-out:
- trace_rxrpc_rx_done(0, 0);
- return 0;
-
-wrong_security:
- trace_rxrpc_abort(0, "SEC", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
- RXKADINCONSISTENCY, EBADMSG);
- skb->priority = RXKADINCONSISTENCY;
- goto post_abort;
-
-unsupported_service:
- trace_rxrpc_abort(0, "INV", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
- RX_INVALID_OPERATION, EOPNOTSUPP);
- skb->priority = RX_INVALID_OPERATION;
- goto post_abort;
-
-reupgrade:
- trace_rxrpc_abort(0, "UPG", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
- RX_PROTOCOL_ERROR, EBADMSG);
- goto protocol_error;
-
-bad_message:
- trace_rxrpc_abort(0, "BAD", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
- RX_PROTOCOL_ERROR, EBADMSG);
-protocol_error:
- skb->priority = RX_PROTOCOL_ERROR;
-post_abort:
- skb->mark = RXRPC_SKB_MARK_REJECT_ABORT;
-reject_packet:
- trace_rxrpc_rx_done(skb->mark, skb->priority);
- rxrpc_reject_packet(local, skb);
- _leave(" [badmsg]");
- return 0;
-}
diff --git a/net/rxrpc/io_thread.c b/net/rxrpc/io_thread.c
new file mode 100644
index 000000000000..d2aaad5afa1d
--- /dev/null
+++ b/net/rxrpc/io_thread.c
@@ -0,0 +1,370 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/* RxRPC packet reception
+ *
+ * Copyright (C) 2007, 2016 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@xxxxxxxxxx)
+ */
+
+#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
+
+#include "ar-internal.h"
+
+/*
+ * post connection-level events to the connection
+ * - this includes challenges, responses, some aborts and call terminal packet
+ * retransmission.
+ */
+static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
+ struct sk_buff *skb)
+{
+ _enter("%p,%p", conn, skb);
+
+ skb_queue_tail(&conn->rx_queue, skb);
+ rxrpc_queue_conn(conn, rxrpc_conn_queue_rx_work);
+}
+
+/*
+ * post endpoint-level events to the local endpoint
+ * - this includes debug and version messages
+ */
+static void rxrpc_post_packet_to_local(struct rxrpc_local *local,
+ struct sk_buff *skb)
+{
+ _enter("%p,%p", local, skb);
+
+ if (rxrpc_get_local_maybe(local, rxrpc_local_get_queue)) {
+ skb_queue_tail(&local->event_queue, skb);
+ rxrpc_queue_local(local);
+ } else {
+ rxrpc_free_skb(skb, rxrpc_skb_put_input);
+ }
+}
+
+/*
+ * put a packet up for transport-level abort
+ */
+static void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
+{
+ if (rxrpc_get_local_maybe(local, rxrpc_local_get_queue)) {
+ skb_queue_tail(&local->reject_queue, skb);
+ rxrpc_queue_local(local);
+ } else {
+ rxrpc_free_skb(skb, rxrpc_skb_put_input);
+ }
+}
+
+/*
+ * Extract the wire header from a packet and translate the byte order.
+ */
+static noinline
+int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb)
+{
+ struct rxrpc_wire_header whdr;
+
+ /* dig out the RxRPC connection details */
+ if (skb_copy_bits(skb, 0, &whdr, sizeof(whdr)) < 0) {
+ trace_rxrpc_rx_eproto(NULL, sp->hdr.serial,
+ tracepoint_string("bad_hdr"));
+ return -EBADMSG;
+ }
+
+ memset(sp, 0, sizeof(*sp));
+ sp->hdr.epoch = ntohl(whdr.epoch);
+ sp->hdr.cid = ntohl(whdr.cid);
+ sp->hdr.callNumber = ntohl(whdr.callNumber);
+ sp->hdr.seq = ntohl(whdr.seq);
+ sp->hdr.serial = ntohl(whdr.serial);
+ sp->hdr.flags = whdr.flags;
+ sp->hdr.type = whdr.type;
+ sp->hdr.userStatus = whdr.userStatus;
+ sp->hdr.securityIndex = whdr.securityIndex;
+ sp->hdr._rsvd = ntohs(whdr._rsvd);
+ sp->hdr.serviceId = ntohs(whdr.serviceId);
+ return 0;
+}
+
+/*
+ * Extract the abort code from an ABORT packet and stash it in skb->priority.
+ */
+static bool rxrpc_extract_abort(struct sk_buff *skb)
+{
+ __be32 wtmp;
+
+ if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header),
+ &wtmp, sizeof(wtmp)) < 0)
+ return false;
+ skb->priority = ntohl(wtmp);
+ return true;
+}
+
+/*
+ * handle data received on the local endpoint
+ * - may be called in interrupt context
+ *
+ * [!] Note that as this is called from the encap_rcv hook, the socket is not
+ * held locked by the caller and nothing prevents sk_user_data on the UDP from
+ * being cleared in the middle of processing this function.
+ *
+ * Called with the RCU read lock held from the IP layer via UDP.
+ */
+int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
+{
+ struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk);
+ struct rxrpc_connection *conn;
+ struct rxrpc_channel *chan;
+ struct rxrpc_call *call = NULL;
+ struct rxrpc_skb_priv *sp;
+ struct rxrpc_peer *peer = NULL;
+ struct rxrpc_sock *rx = NULL;
+ unsigned int channel;
+
+ _enter("%p", udp_sk);
+
+ if (unlikely(!local)) {
+ kfree_skb(skb);
+ return 0;
+ }
+ if (skb->tstamp == 0)
+ skb->tstamp = ktime_get_real();
+
+ rxrpc_new_skb(skb, rxrpc_skb_new_encap_rcv);
+
+ skb_pull(skb, sizeof(struct udphdr));
+
+ /* The UDP protocol already released all skb resources;
+ * we are free to add our own data there.
+ */
+ sp = rxrpc_skb(skb);
+
+ /* dig out the RxRPC connection details */
+ if (rxrpc_extract_header(sp, skb) < 0)
+ goto bad_message;
+
+ if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
+ static int lose;
+ if ((lose++ & 7) == 7) {
+ trace_rxrpc_rx_lose(sp);
+ rxrpc_free_skb(skb, rxrpc_skb_put_lose);
+ return 0;
+ }
+ }
+
+ if (skb->tstamp == 0)
+ skb->tstamp = ktime_get_real();
+ trace_rxrpc_rx_packet(sp);
+
+ switch (sp->hdr.type) {
+ case RXRPC_PACKET_TYPE_VERSION:
+ if (rxrpc_to_client(sp))
+ goto discard;
+ rxrpc_post_packet_to_local(local, skb);
+ goto out;
+
+ case RXRPC_PACKET_TYPE_BUSY:
+ if (rxrpc_to_server(sp))
+ goto discard;
+ fallthrough;
+ case RXRPC_PACKET_TYPE_ACK:
+ case RXRPC_PACKET_TYPE_ACKALL:
+ if (sp->hdr.callNumber == 0)
+ goto bad_message;
+ break;
+ case RXRPC_PACKET_TYPE_ABORT:
+ if (!rxrpc_extract_abort(skb))
+ return true; /* Just discard if malformed */
+ break;
+
+ case RXRPC_PACKET_TYPE_DATA:
+ if (sp->hdr.callNumber == 0 ||
+ sp->hdr.seq == 0)
+ goto bad_message;
+
+ /* Unshare the packet so that it can be modified for in-place
+ * decryption.
+ */
+ if (sp->hdr.securityIndex != 0) {
+ struct sk_buff *nskb = skb_unshare(skb, GFP_ATOMIC);
+ if (!nskb) {
+ rxrpc_eaten_skb(skb, rxrpc_skb_eaten_by_unshare_nomem);
+ goto out;
+ }
+
+ if (nskb != skb) {
+ rxrpc_eaten_skb(skb, rxrpc_skb_eaten_by_unshare);
+ skb = nskb;
+ rxrpc_new_skb(skb, rxrpc_skb_new_unshared);
+ sp = rxrpc_skb(skb);
+ }
+ }
+ break;
+
+ case RXRPC_PACKET_TYPE_CHALLENGE:
+ if (rxrpc_to_server(sp))
+ goto discard;
+ break;
+ case RXRPC_PACKET_TYPE_RESPONSE:
+ if (rxrpc_to_client(sp))
+ goto discard;
+ break;
+
+ /* Packet types 9-11 should just be ignored. */
+ case RXRPC_PACKET_TYPE_PARAMS:
+ case RXRPC_PACKET_TYPE_10:
+ case RXRPC_PACKET_TYPE_11:
+ goto discard;
+
+ default:
+ goto bad_message;
+ }
+
+ if (sp->hdr.serviceId == 0)
+ goto bad_message;
+
+ if (rxrpc_to_server(sp)) {
+ /* Weed out packets to services we're not offering. Packets
+ * that would begin a call are explicitly rejected and the rest
+ * are just discarded.
+ */
+ rx = rcu_dereference(local->service);
+ if (!rx || (sp->hdr.serviceId != rx->srx.srx_service &&
+ sp->hdr.serviceId != rx->second_service)) {
+ if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA &&
+ sp->hdr.seq == 1)
+ goto unsupported_service;
+ goto discard;
+ }
+ }
+
+ conn = rxrpc_find_connection_rcu(local, skb, &peer);
+ if (conn) {
+ if (sp->hdr.securityIndex != conn->security_ix)
+ goto wrong_security;
+
+ if (sp->hdr.serviceId != conn->service_id) {
+ int old_id;
+
+ if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags))
+ goto reupgrade;
+ old_id = cmpxchg(&conn->service_id, conn->orig_service_id,
+ sp->hdr.serviceId);
+
+ if (old_id != conn->orig_service_id &&
+ old_id != sp->hdr.serviceId)
+ goto reupgrade;
+ }
+
+ if (sp->hdr.callNumber == 0) {
+ /* Connection-level packet */
+ _debug("CONN %p {%d}", conn, conn->debug_id);
+ rxrpc_post_packet_to_conn(conn, skb);
+ goto out;
+ }
+
+ if ((int)sp->hdr.serial - (int)conn->hi_serial > 0)
+ conn->hi_serial = sp->hdr.serial;
+
+ /* Call-bound packets are routed by connection channel. */
+ channel = sp->hdr.cid & RXRPC_CHANNELMASK;
+ chan = &conn->channels[channel];
+
+ /* Ignore really old calls */
+ if (sp->hdr.callNumber < chan->last_call)
+ goto discard;
+
+ if (sp->hdr.callNumber == chan->last_call) {
+ if (chan->call ||
+ sp->hdr.type == RXRPC_PACKET_TYPE_ABORT)
+ goto discard;
+
+ /* For the previous service call, if completed
+ * successfully, we discard all further packets.
+ */
+ if (rxrpc_conn_is_service(conn) &&
+ chan->last_type == RXRPC_PACKET_TYPE_ACK)
+ goto discard;
+
+ /* But otherwise we need to retransmit the final packet
+ * from data cached in the connection record.
+ */
+ if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA)
+ trace_rxrpc_rx_data(chan->call_debug_id,
+ sp->hdr.seq,
+ sp->hdr.serial,
+ sp->hdr.flags);
+ rxrpc_post_packet_to_conn(conn, skb);
+ goto out;
+ }
+
+ call = rcu_dereference(chan->call);
+
+ if (sp->hdr.callNumber > chan->call_id) {
+ if (rxrpc_to_client(sp))
+ goto reject_packet;
+ if (call)
+ rxrpc_input_implicit_end_call(rx, conn, call);
+ call = NULL;
+ }
+
+ if (call) {
+ if (sp->hdr.serviceId != call->service_id)
+ call->service_id = sp->hdr.serviceId;
+ if ((int)sp->hdr.serial - (int)call->rx_serial > 0)
+ call->rx_serial = sp->hdr.serial;
+ if (!test_bit(RXRPC_CALL_RX_HEARD, &call->flags))
+ set_bit(RXRPC_CALL_RX_HEARD, &call->flags);
+ }
+ }
+
+ if (!call || refcount_read(&call->ref) == 0) {
+ if (rxrpc_to_client(sp) ||
+ sp->hdr.type != RXRPC_PACKET_TYPE_DATA)
+ goto bad_message;
+ if (sp->hdr.seq != 1)
+ goto discard;
+ call = rxrpc_new_incoming_call(local, rx, skb);
+ if (!call)
+ goto reject_packet;
+ }
+
+ /* Process a call packet; this either discards or passes on the ref
+ * elsewhere.
+ */
+ rxrpc_input_call_packet(call, skb);
+ goto out;
+
+discard:
+ rxrpc_free_skb(skb, rxrpc_skb_put_input);
+out:
+ trace_rxrpc_rx_done(0, 0);
+ return 0;
+
+wrong_security:
+ trace_rxrpc_abort(0, "SEC", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
+ RXKADINCONSISTENCY, EBADMSG);
+ skb->priority = RXKADINCONSISTENCY;
+ goto post_abort;
+
+unsupported_service:
+ trace_rxrpc_abort(0, "INV", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
+ RX_INVALID_OPERATION, EOPNOTSUPP);
+ skb->priority = RX_INVALID_OPERATION;
+ goto post_abort;
+
+reupgrade:
+ trace_rxrpc_abort(0, "UPG", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
+ RX_PROTOCOL_ERROR, EBADMSG);
+ goto protocol_error;
+
+bad_message:
+ trace_rxrpc_abort(0, "BAD", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
+ RX_PROTOCOL_ERROR, EBADMSG);
+protocol_error:
+ skb->priority = RX_PROTOCOL_ERROR;
+post_abort:
+ skb->mark = RXRPC_SKB_MARK_REJECT_ABORT;
+reject_packet:
+ trace_rxrpc_rx_done(skb->mark, skb->priority);
+ rxrpc_reject_packet(local, skb);
+ _leave(" [badmsg]");
+ return 0;
+}