[PATCH net-next 23/35] rxrpc: Implement a mechanism to send an event notification to a call
From: David Howells
Date: Wed Nov 30 2022 - 12:01:45 EST
Provide a means by which an event notification can be sent to a call such
that the I/O thread can process it rather than it being done in a separate
workqueue. This will allow a lot of locking to be removed.
Signed-off-by: David Howells <dhowells@xxxxxxxxxx>
cc: Marc Dionne <marc.dionne@xxxxxxxxxxxx>
cc: linux-afs@xxxxxxxxxxxxxxxxxxx
---
include/trace/events/rxrpc.h | 52 ++++++++++++++++++++++++++++++++++++++++++
net/rxrpc/ar-internal.h | 5 +++-
net/rxrpc/call_object.c | 24 +++++++++++++++++++
net/rxrpc/input.c | 3 +-
net/rxrpc/io_thread.c | 20 +++++++++++++++-
net/rxrpc/local_object.c | 1 +
6 files changed, 100 insertions(+), 5 deletions(-)
diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h
index 44a9be9836f9..0b12d96c7921 100644
--- a/include/trace/events/rxrpc.h
+++ b/include/trace/events/rxrpc.h
@@ -16,6 +16,13 @@
/*
* Declare tracing information enums and their string mappings for display.
*/
+#define rxrpc_call_poke_traces \
+ EM(rxrpc_call_poke_error, "Error") \
+ EM(rxrpc_call_poke_idle, "Idle") \
+ EM(rxrpc_call_poke_start, "Start") \
+ EM(rxrpc_call_poke_timer, "Timer") \
+ E_(rxrpc_call_poke_timer_now, "Timer-now")
+
#define rxrpc_skb_traces \
EM(rxrpc_skb_eaten_by_unshare, "ETN unshare ") \
EM(rxrpc_skb_eaten_by_unshare_nomem, "ETN unshar-nm") \
@@ -150,6 +157,7 @@
EM(rxrpc_call_get_input, "GET input ") \
EM(rxrpc_call_get_kernel_service, "GET krnl-srv") \
EM(rxrpc_call_get_notify_socket, "GET notify ") \
+ EM(rxrpc_call_get_poke, "GET poke ") \
EM(rxrpc_call_get_recvmsg, "GET recvmsg ") \
EM(rxrpc_call_get_release_sock, "GET rel-sock") \
EM(rxrpc_call_get_sendmsg, "GET sendmsg ") \
@@ -160,6 +168,7 @@
EM(rxrpc_call_put_discard_prealloc, "PUT disc-pre") \
EM(rxrpc_call_put_input, "PUT input ") \
EM(rxrpc_call_put_kernel, "PUT kernel ") \
+ EM(rxrpc_call_put_poke, "PUT poke ") \
EM(rxrpc_call_put_recvmsg, "PUT recvmsg ") \
EM(rxrpc_call_put_release_sock, "PUT rls-sock") \
EM(rxrpc_call_put_release_sock_tba, "PUT rls-sk-a") \
@@ -378,6 +387,7 @@
#define E_(a, b) a
enum rxrpc_bundle_trace { rxrpc_bundle_traces } __mode(byte);
+enum rxrpc_call_poke_trace { rxrpc_call_poke_traces } __mode(byte);
enum rxrpc_call_trace { rxrpc_call_traces } __mode(byte);
enum rxrpc_client_trace { rxrpc_client_traces } __mode(byte);
enum rxrpc_congest_change { rxrpc_congest_changes } __mode(byte);
@@ -408,6 +418,7 @@ enum rxrpc_txqueue_trace { rxrpc_txqueue_traces } __mode(byte);
#define E_(a, b) TRACE_DEFINE_ENUM(a);
rxrpc_bundle_traces;
+rxrpc_call_poke_traces;
rxrpc_call_traces;
rxrpc_client_traces;
rxrpc_congest_changes;
@@ -1747,6 +1758,47 @@ TRACE_EVENT(rxrpc_txbuf,
__entry->ref)
);
+TRACE_EVENT(rxrpc_poke_call,
+ TP_PROTO(struct rxrpc_call *call, bool busy,
+ enum rxrpc_call_poke_trace what),
+
+ TP_ARGS(call, busy, what),
+
+ TP_STRUCT__entry(
+ __field(unsigned int, call_debug_id )
+ __field(bool, busy )
+ __field(enum rxrpc_call_poke_trace, what )
+ ),
+
+ TP_fast_assign(
+ __entry->call_debug_id = call->debug_id;
+ __entry->busy = busy;
+ __entry->what = what;
+ ),
+
+ TP_printk("c=%08x %s%s",
+ __entry->call_debug_id,
+ __print_symbolic(__entry->what, rxrpc_call_poke_traces),
+ __entry->busy ? "!" : "")
+ );
+
+TRACE_EVENT(rxrpc_call_poked,
+ TP_PROTO(struct rxrpc_call *call),
+
+ TP_ARGS(call),
+
+ TP_STRUCT__entry(
+ __field(unsigned int, call_debug_id )
+ ),
+
+ TP_fast_assign(
+ __entry->call_debug_id = call->debug_id;
+ ),
+
+ TP_printk("c=%08x",
+ __entry->call_debug_id)
+ );
+
#undef EM
#undef E_
#endif /* _TRACE_RXRPC_H */
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 654e9dab107c..a80655fa9dfb 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -292,6 +292,7 @@ struct rxrpc_local {
struct sk_buff_head reject_queue; /* packets awaiting rejection */
struct sk_buff_head event_queue; /* endpoint event packets awaiting processing */
struct sk_buff_head rx_queue; /* Received packets */
+ struct list_head call_attend_q; /* Calls requiring immediate attention */
struct rb_root client_bundles; /* Client connection bundles by socket params */
spinlock_t client_bundles_lock; /* Lock for client_bundles */
spinlock_t lock; /* access lock */
@@ -616,6 +617,7 @@ struct rxrpc_call {
struct list_head recvmsg_link; /* Link in rx->recvmsg_q */
struct list_head sock_link; /* Link in rx->sock_calls */
struct rb_node sock_node; /* Node in rx->calls */
+ struct list_head attend_link; /* Link in local->call_attend_q */
struct rxrpc_txbuf *tx_pending; /* Tx buffer being filled */
wait_queue_head_t waitq; /* Wait queue for channel or Tx */
s64 tx_total_len; /* Total length left to be transmitted (or -1) */
@@ -843,6 +845,7 @@ extern const char *const rxrpc_call_states[];
extern const char *const rxrpc_call_completions[];
extern struct kmem_cache *rxrpc_call_jar;
+void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what);
struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *, unsigned long);
struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *, gfp_t, unsigned int);
struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *,
@@ -951,7 +954,7 @@ void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
/*
* input.c
*/
-void rxrpc_input_call_packet(struct rxrpc_call *, struct sk_buff *);
+void rxrpc_input_call_event(struct rxrpc_call *, struct sk_buff *);
void rxrpc_input_implicit_end_call(struct rxrpc_sock *, struct rxrpc_connection *,
struct rxrpc_call *);
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index f6d1b3a6f8c6..997641e3d1c8 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -45,6 +45,29 @@ static struct semaphore rxrpc_call_limiter =
static struct semaphore rxrpc_kernel_call_limiter =
__SEMAPHORE_INITIALIZER(rxrpc_kernel_call_limiter, 1000);
+void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what)
+{
+ struct rxrpc_local *local;
+ struct rxrpc_peer *peer = call->peer;
+ bool busy;
+
+ if (WARN_ON_ONCE(!peer))
+ return;
+ local = peer->local;
+
+ if (call->state < RXRPC_CALL_COMPLETE) {
+ spin_lock_bh(&local->lock);
+ busy = !list_empty(&call->attend_link);
+ trace_rxrpc_poke_call(call, busy, what);
+ if (!busy) {
+ rxrpc_get_call(call, rxrpc_call_get_poke);
+ list_add_tail(&call->attend_link, &local->call_attend_q);
+ }
+ spin_unlock_bh(&local->lock);
+ rxrpc_wake_up_io_thread(local);
+ }
+}
+
static void rxrpc_call_timer_expired(struct timer_list *t)
{
struct rxrpc_call *call = from_timer(call, t, timer);
@@ -137,6 +160,7 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
INIT_LIST_HEAD(&call->accept_link);
INIT_LIST_HEAD(&call->recvmsg_link);
INIT_LIST_HEAD(&call->sock_link);
+ INIT_LIST_HEAD(&call->attend_link);
INIT_LIST_HEAD(&call->tx_buffer);
skb_queue_head_init(&call->recvmsg_queue);
skb_queue_head_init(&call->rx_oos_queue);
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 13c52145a926..036f02371051 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -1017,8 +1017,7 @@ static void rxrpc_input_abort(struct rxrpc_call *call, struct sk_buff *skb)
/*
* Process an incoming call packet.
*/
-void rxrpc_input_call_packet(struct rxrpc_call *call,
- struct sk_buff *skb)
+void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
unsigned long timo;
diff --git a/net/rxrpc/io_thread.c b/net/rxrpc/io_thread.c
index 416c6101cf78..cc249bc6b8cd 100644
--- a/net/rxrpc/io_thread.c
+++ b/net/rxrpc/io_thread.c
@@ -366,7 +366,7 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff *skb)
/* Process a call packet; this either discards or passes on the ref
* elsewhere.
*/
- rxrpc_input_call_packet(call, skb);
+ rxrpc_input_call_event(call, skb);
goto out;
discard:
@@ -413,6 +413,7 @@ int rxrpc_io_thread(void *data)
{
struct sk_buff_head rx_queue;
struct rxrpc_local *local = data;
+ struct rxrpc_call *call;
struct sk_buff *skb;
skb_queue_head_init(&rx_queue);
@@ -422,6 +423,20 @@ int rxrpc_io_thread(void *data)
for (;;) {
rxrpc_inc_stat(local->rxnet, stat_io_loop);
+ /* Deal with calls that want immediate attention. */
+ if ((call = list_first_entry_or_null(&local->call_attend_q,
+ struct rxrpc_call,
+ attend_link))) {
+ spin_lock_bh(&local->lock);
+ list_del_init(&call->attend_link);
+ spin_unlock_bh(&local->lock);
+
+ trace_rxrpc_call_poked(call);
+ rxrpc_input_call_event(call, NULL);
+ rxrpc_put_call(call, rxrpc_call_put_poke);
+ continue;
+ }
+
/* Process received packets and errors. */
if ((skb = __skb_dequeue(&rx_queue))) {
switch (skb->mark) {
@@ -450,7 +465,8 @@ int rxrpc_io_thread(void *data)
}
set_current_state(TASK_INTERRUPTIBLE);
- if (!skb_queue_empty(&local->rx_queue)) {
+ if (!skb_queue_empty(&local->rx_queue) ||
+ !list_empty(&local->call_attend_q)) {
__set_current_state(TASK_RUNNING);
continue;
}
diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c
index 6b4d77219f36..03f491cc23ef 100644
--- a/net/rxrpc/local_object.c
+++ b/net/rxrpc/local_object.c
@@ -104,6 +104,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
skb_queue_head_init(&local->reject_queue);
skb_queue_head_init(&local->event_queue);
skb_queue_head_init(&local->rx_queue);
+ INIT_LIST_HEAD(&local->call_attend_q);
local->client_bundles = RB_ROOT;
spin_lock_init(&local->client_bundles_lock);
spin_lock_init(&local->lock);