[RFC PATCH 3/8] rxrpc: Allow multiple AF_RXRPC sockets to be bound together to form queues
From: David Howells
Date: Thu Jun 23 2022 - 09:29:51 EST
Allow one rxrpc socket to be bound onto another to form a queue. This is
done by allocating a socket and setting it up, then allocating more sockets
and using a sockopt to bind them together:
fd1 = socket(AF_RXRPC, SOCK_DGRAM, IPPROTO_IPV6);
bind(fd1, &address);
listen(fd1, depth);
fd2 = socket(AF_RXRPC, SOCK_DGRAM, IPPROTO_IPV6);
setsockopt(fd2, SOL_RXRPC, RXRPC_BIND_CHANNEL, &fd1, sizeof(fd1));
From this point:
(1) Each channel must be charged with user call IDs separately. Each
channel has a separate call ID space. A call ID on one channel cannot
be used to send a message on another channel. The same call ID on
different channels refers to different calls.
(2) An incoming call will get bound to the next channel that does a
recvmsg() on an empty queue. All further incoming packets relating to
that call will go to that channel exclusively.
(3) An outgoing client call made on a particular channel will be bound to
that channel.
(4) If a channel is closed, all calls bound to that channel will be
aborted.
(5) Unaccepted incoming calls are held in a queue that is common to all
channels and whose depth is set by listen(). Each time recvmsg() is
called on a channel, if that channel has at least one charge available,
it will pop an incoming call from that queue, bind the next charge to
it, attach it to the socket and push it onto the tail of the recvmsg
queue.
This can be used both as a mechanism to distribute calls across a thread
pool and as a mechanism to control the arrival of new calls on any
particular channel. New calls can and will only be collected if the
channel is charged.
Signed-off-by: David Howells <dhowells@xxxxxxxxxx>
---
include/uapi/linux/rxrpc.h | 1 +
net/rxrpc/af_rxrpc.c | 79 +++++++++++++++++++++++++++++++++++++++++++-
net/rxrpc/call_accept.c | 5 ---
net/rxrpc/call_object.c | 2 +
4 files changed, 80 insertions(+), 7 deletions(-)
diff --git a/include/uapi/linux/rxrpc.h b/include/uapi/linux/rxrpc.h
index 8f8dc7a937a4..811923643751 100644
--- a/include/uapi/linux/rxrpc.h
+++ b/include/uapi/linux/rxrpc.h
@@ -36,6 +36,7 @@ struct sockaddr_rxrpc {
#define RXRPC_MIN_SECURITY_LEVEL 4 /* minimum security level */
#define RXRPC_UPGRADEABLE_SERVICE 5 /* Upgrade service[0] -> service[1] */
#define RXRPC_SUPPORTED_CMSG 6 /* Get highest supported control message type */
+#define RXRPC_BIND_CHANNEL 7 /* Bind a socket as an additional recvmsg channel */
/*
* RxRPC control messages
diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index 703e10969d2f..6b89a5a969e0 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -733,6 +733,71 @@ int rxrpc_sock_set_upgradeable_service(struct sock *sk, unsigned int val[2])
}
EXPORT_SYMBOL(rxrpc_sock_set_upgradeable_service);
+/*
+ * Bind this socket to another socket that's already set up and listening to
+ * use this as an additional channel for receiving new service calls.
+ *
+ * @rx2: The socket being turned into an extra channel.  It must still be
+ *	 unbound, have no service and not be exclusive.
+ * @fd:  File descriptor of the listening AF_RXRPC socket to join.
+ *
+ * On success, @rx2 shares the service record, address, minimum security
+ * level and local endpoint of @fd's socket.  It gets its own user call ID
+ * backlog and is placed in the RXRPC_SERVER_LISTENING state.
+ *
+ * Return: 0 on success.  On failure:
+ *  -EISCONN if @rx2 is not unbound or @fd's socket is not listening;
+ *  -EINVAL if @rx2 already has a service or is exclusive, or if @fd is not
+ *   an AF_RXRPC socket, is @rx2 itself, or differs in address family or
+ *   network namespace;
+ *  -ENOMEM if the call ID backlog cannot be allocated;
+ *  or the error reported by sockfd_lookup().
+ */
+static int rxrpc_bind_channel(struct rxrpc_sock *rx2, int fd)
+{
+	struct rxrpc_service *b;
+	struct rxrpc_sock *rx1;
+	struct socket *sock1;
+	unsigned long *call_id_backlog;
+	int ret;
+
+	if (rx2->sk.sk_state != RXRPC_UNBOUND)
+		return -EISCONN;
+	if (rx2->service || rx2->exclusive)
+		return -EINVAL;
+
+	sock1 = sockfd_lookup(fd, &ret);
+	if (!sock1)
+		return ret;
+
+	/* The fd is supplied by userspace and may name any kind of socket.
+	 * Make sure it really is an AF_RXRPC socket before treating its sk as
+	 * an rxrpc_sock.  Otherwise the field reads, locking and refcount
+	 * increments below would operate on another protocol's socket.
+	 */
+	ret = -EINVAL;
+	if (sock1->sk->sk_family != AF_RXRPC)
+		goto error;
+	rx1 = rxrpc_sk(sock1->sk);
+
+	/* Binding a socket onto itself must be refused; it would also try to
+	 * take its own socket lock again in lock_sock_nested() below.
+	 */
+	if (rx1 == rx2 || rx2->family != rx1->family ||
+	    sock_net(&rx2->sk) != sock_net(&rx1->sk))
+		goto error;
+
+	/* Early rejection before allocating; rechecked under rx1's lock. */
+	ret = -EISCONN;
+	if (rx1->sk.sk_state != RXRPC_SERVER_LISTENING)
+		goto error;
+
+	/* Each channel has its own backlog of user call IDs. */
+	ret = -ENOMEM;
+	call_id_backlog = kcalloc(RXRPC_BACKLOG_MAX,
+				  sizeof(call_id_backlog[0]),
+				  GFP_KERNEL);
+	if (!call_id_backlog)
+		goto error;
+
+	/* Lockdep subclass 1: presumably rx2's socket lock is already held by
+	 * the setsockopt caller, so rx1 is locked as a nested lock (confirm).
+	 */
+	lock_sock_nested(&rx1->sk, 1);
+
+	ret = -EISCONN;
+	if (rx1->sk.sk_state != RXRPC_SERVER_LISTENING)
+		goto error_unlock;
+
+	/* Share rx1's service record and local endpoint with rx2, taking a
+	 * reference and an active count on each on rx2's behalf.
+	 */
+	b = rx1->service;
+	refcount_inc(&b->ref);
+	refcount_inc(&b->active);
+	rx2->service = b;
+	rx2->srx = rx1->srx;
+	rx2->call_id_backlog = call_id_backlog;
+	rx2->min_sec_level = rx1->min_sec_level;
+	rx2->local = rxrpc_get_local(rx1->local);
+	atomic_inc(&rx1->local->active_users);
+	rx2->sk.sk_state = RXRPC_SERVER_LISTENING;
+	call_id_backlog = NULL;	/* Now owned by rx2; don't free below. */
+	ret = 0;
+
+error_unlock:
+	release_sock(&rx1->sk);
+	kfree(call_id_backlog);
+error:
+	fput(sock1->file);
+	return ret;
+}
+
/*
* set RxRPC socket options
*/
@@ -742,7 +807,7 @@ static int rxrpc_setsockopt(struct socket *sock, int level, int optname,
struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
unsigned int min_sec_level;
u16 service_upgrade[2];
- int ret;
+ int ret, fd;
_enter(",%d,%d,,%d", level, optname, optlen);
@@ -817,6 +882,18 @@ static int rxrpc_setsockopt(struct socket *sock, int level, int optname,
goto error;
goto success;
+ case RXRPC_BIND_CHANNEL:
+ ret = -EINVAL;
+ if (optlen != sizeof(fd))
+ goto error;
+ ret = -EFAULT;
+ if (copy_from_sockptr(&fd, optval, sizeof(fd)) != 0)
+ goto error;
+ ret = rxrpc_bind_channel(rx, fd);
+ if (ret < 0)
+ goto error;
+ goto success;
+
default:
break;
}
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c
index 3cba4dacb8d4..68760a0657a1 100644
--- a/net/rxrpc/call_accept.c
+++ b/net/rxrpc/call_accept.c
@@ -296,8 +296,6 @@ void rxrpc_deactivate_service(struct rxrpc_sock *rx)
if (!refcount_dec_and_test(&rx->service->active))
return;
- kdebug("-- deactivate --");
-
/* Now that active is 0, make sure that there aren't any incoming calls
* being set up before we clear the preallocation buffers.
*/
@@ -335,12 +333,9 @@ void rxrpc_deactivate_service(struct rxrpc_sock *rx)
head = b->call_backlog_head;
tail = b->call_backlog_tail;
- kdebug("backlog %x %x", head, tail);
while (CIRC_CNT(head, tail, size) > 0) {
struct rxrpc_call *call = b->call_backlog[tail];
- kdebug("discard c=%08x", call->debug_id);
-
trace_rxrpc_call(call->debug_id, rxrpc_call_discard,
refcount_read(&call->ref),
NULL, (const void *)call->user_call_ID);
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index e90b205a6c0f..4ee98ac689f9 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -712,7 +712,7 @@ void rxrpc_cleanup_call(struct rxrpc_call *call)
ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
if (WARN_ON(!test_bit(RXRPC_CALL_RELEASED, &call->flags))) {
- kdebug("### UNRELEASED c=%08x", call->debug_id);
+ pr_warn("### UNRELEASED c=%08x", call->debug_id);
return;
}