Re: [PATCH v5] Bluetooth: compute LE flow credits based on recvbuf space

From: Luiz Augusto von Dentz
Date: Wed Apr 10 2024 - 10:40:30 EST


Hi Sebastian,

On Mon, Apr 8, 2024 at 9:13 AM Sebastian Urban <surban@xxxxxxxxxx> wrote:
>
> Previously LE flow credits were returned to the
> sender even if the socket's receive buffer was
> full. This meant that no back-pressure
> was applied to the sender, thus it continued to
> send data, resulting in data loss without any
> error being reported. Furthermore, the amount
> of credits was essentially fixed to a small amount,
> leading to reduced performance.
>
> This is fixed by computing the number of returned
> LE flow credits based on the available space in the
> receive buffer of an L2CAP socket. Consequently,
> if the receive buffer is full, no credits are returned
> until the buffer is read and thus cleared by user-space.
>
> Since the computation of available
> receive buffer space can only be performed
> approximately, i.e. sk_buff overhead is ignored,
> and the receive buffer size may be changed by
> user-space after flow credits have been sent,
> superfluous received data is temporarily stored within
> l2cap_pinfo. This is necessary because Bluetooth LE
> provides no retransmission mechanism once the
> data has been acked by the physical layer.
>
> Signed-off-by: Sebastian Urban <surban@xxxxxxxxxx>
> ---
> include/net/bluetooth/l2cap.h | 10 +++++-
> net/bluetooth/l2cap_core.c | 51 ++++++++++++++++++++++----
> net/bluetooth/l2cap_sock.c | 67 +++++++++++++++++++++++++----------
> 3 files changed, 103 insertions(+), 25 deletions(-)
>
> diff --git a/include/net/bluetooth/l2cap.h b/include/net/bluetooth/l2cap.h
> index 3f4057ced971..bc6ff40ebc9f 100644
> --- a/include/net/bluetooth/l2cap.h
> +++ b/include/net/bluetooth/l2cap.h
> @@ -547,6 +547,8 @@ struct l2cap_chan {
>
> __u16 tx_credits;
> __u16 rx_credits;
> + int rx_avail;
> + int rx_staged;

I'd probably use size_t for the two fields above, and add some comments
on what they refer to, e.g. rx_staged is what has been received already,
right? Couldn't we use chan->sdu->len instead?

>
> __u8 tx_state;
> __u8 rx_state;
> @@ -682,10 +684,15 @@ struct l2cap_user {
> /* ----- L2CAP socket info ----- */
> #define l2cap_pi(sk) ((struct l2cap_pinfo *) sk)
>
> +struct l2cap_rx_busy {
> + struct list_head list;
> + struct sk_buff *skb;
> +};
> +
> struct l2cap_pinfo {
> struct bt_sock bt;
> struct l2cap_chan *chan;
> - struct sk_buff *rx_busy_skb;
> + struct list_head rx_busy;
> };
>
> enum {
> @@ -944,6 +951,7 @@ int l2cap_chan_reconfigure(struct l2cap_chan *chan, __u16 mtu);
> int l2cap_chan_send(struct l2cap_chan *chan, struct msghdr *msg, size_t len,
> const struct sockcm_cookie *sockc);
> void l2cap_chan_busy(struct l2cap_chan *chan, int busy);
> +void l2cap_chan_rx_avail(struct l2cap_chan *chan, int rx_avail);
> int l2cap_chan_check_security(struct l2cap_chan *chan, bool initiator);
> void l2cap_chan_set_defaults(struct l2cap_chan *chan);
> int l2cap_ertm_init(struct l2cap_chan *chan);
> diff --git a/net/bluetooth/l2cap_core.c b/net/bluetooth/l2cap_core.c
> index b0970462a689..7bad232d40ba 100644
> --- a/net/bluetooth/l2cap_core.c
> +++ b/net/bluetooth/l2cap_core.c
> @@ -454,6 +454,9 @@ struct l2cap_chan *l2cap_chan_create(void)
> /* Set default lock nesting level */
> atomic_set(&chan->nesting, L2CAP_NESTING_NORMAL);
>
> + /* Available receive buffer space is initially unknown */
> + chan->rx_avail = -1;
> +
> write_lock(&chan_list_lock);
> list_add(&chan->global_l, &chan_list);
> write_unlock(&chan_list_lock);
> @@ -535,6 +538,26 @@ void l2cap_chan_set_defaults(struct l2cap_chan *chan)
> }
> EXPORT_SYMBOL_GPL(l2cap_chan_set_defaults);
>
> +static __u16 l2cap_le_rx_credits(struct l2cap_chan *chan)
> +{
> + if (chan->mps == 0)
> + return 0;
> +
> + /* If we don't know the available space in the receiver buffer, give
> + * enough credits for a full packet.
> + */
> + if (chan->rx_avail == -1)
> + return (chan->imtu / chan->mps) + 1;
> +
> + /* If we know how much space is available in the receive buffer, give
> + * out as many credits as would fill the buffer.
> + */
> + if (chan->rx_avail <= chan->rx_staged)
> + return 0;

Missing blank line before the return.

> + return min_t(int, U16_MAX,
> + (chan->rx_avail - chan->rx_staged) / chan->mps);

We probably need to use DIV_ROUND_UP here, since the division can return
0. Or is that intentional, meaning that we cannot store another full
MPS? I think I'd still give a credit, since throughput may suffer if no
credits are given when just a few more bytes are needed, and in any
case the extra bytes would be stored in the rx_busy list if they exceed
rx_avail.

> +}
> +
> static void l2cap_le_flowctl_init(struct l2cap_chan *chan, u16 tx_credits)
> {
> chan->sdu = NULL;
> @@ -543,8 +566,7 @@ static void l2cap_le_flowctl_init(struct l2cap_chan *chan, u16 tx_credits)
> chan->tx_credits = tx_credits;
> /* Derive MPS from connection MTU to stop HCI fragmentation */
> chan->mps = min_t(u16, chan->imtu, chan->conn->mtu - L2CAP_HDR_SIZE);
> - /* Give enough credits for a full packet */
> - chan->rx_credits = (chan->imtu / chan->mps) + 1;
> + chan->rx_credits = l2cap_le_rx_credits(chan);
>
> skb_queue_head_init(&chan->tx_q);
> }
> @@ -556,7 +578,7 @@ static void l2cap_ecred_init(struct l2cap_chan *chan, u16 tx_credits)
> /* L2CAP implementations shall support a minimum MPS of 64 octets */
> if (chan->mps < L2CAP_ECRED_MIN_MPS) {
> chan->mps = L2CAP_ECRED_MIN_MPS;
> - chan->rx_credits = (chan->imtu / chan->mps) + 1;
> + chan->rx_credits = l2cap_le_rx_credits(chan);
> }
> }
>
> @@ -6520,9 +6542,7 @@ static void l2cap_chan_le_send_credits(struct l2cap_chan *chan)
> {
> struct l2cap_conn *conn = chan->conn;
> struct l2cap_le_credits pkt;
> - u16 return_credits;
> -
> - return_credits = (chan->imtu / chan->mps) + 1;
> + u16 return_credits = l2cap_le_rx_credits(chan);
>
> if (chan->rx_credits >= return_credits)
> return;
> @@ -6541,6 +6561,16 @@ static void l2cap_chan_le_send_credits(struct l2cap_chan *chan)
> l2cap_send_cmd(conn, chan->ident, L2CAP_LE_CREDITS, sizeof(pkt), &pkt);
> }
>
> +void l2cap_chan_rx_avail(struct l2cap_chan *chan, int rx_avail)
> +{
> + BT_DBG("chan %p has %d bytes avail for rx", chan, rx_avail);

We should probably check whether the value has changed, though. Or is
this only meant to be called when the buffer changes?

> + chan->rx_avail = rx_avail;
> +
> + if (chan->state == BT_CONNECTED)
> + l2cap_chan_le_send_credits(chan);
> +}
> +
> static int l2cap_ecred_recv(struct l2cap_chan *chan, struct sk_buff *skb)
> {
> int err;
> @@ -6550,6 +6580,14 @@ static int l2cap_ecred_recv(struct l2cap_chan *chan, struct sk_buff *skb)
> /* Wait recv to confirm reception before updating the credits */
> err = chan->ops->recv(chan, skb);
>
> + chan->rx_staged = 0;
> +
> + if (err < 0 && chan->rx_avail != -1) {
> + BT_ERR("Queueing received LE L2CAP data failed");
> + l2cap_send_disconn_req(chan, ECONNRESET);
> + return err;
> + }
> +
> /* Update credits whenever an SDU is received */
> l2cap_chan_le_send_credits(chan);
>
> @@ -6571,6 +6609,7 @@ static int l2cap_ecred_data_rcv(struct l2cap_chan *chan, struct sk_buff *skb)
> return -ENOBUFS;
> }
>
> + chan->rx_staged += skb->len;
> chan->rx_credits--;
> BT_DBG("rx_credits %u -> %u", chan->rx_credits + 1, chan->rx_credits);
>
> diff --git a/net/bluetooth/l2cap_sock.c b/net/bluetooth/l2cap_sock.c
> index 7846a068bf60..46603605cb69 100644
> --- a/net/bluetooth/l2cap_sock.c
> +++ b/net/bluetooth/l2cap_sock.c
> @@ -1157,6 +1157,7 @@ static int l2cap_sock_recvmsg(struct socket *sock, struct msghdr *msg,
> {
> struct sock *sk = sock->sk;
> struct l2cap_pinfo *pi = l2cap_pi(sk);
> + int avail;
> int err;
>
> if (unlikely(flags & MSG_ERRQUEUE))
> @@ -1192,28 +1193,34 @@ static int l2cap_sock_recvmsg(struct socket *sock, struct msghdr *msg,
> else
> err = bt_sock_recvmsg(sock, msg, len, flags);
>
> - if (pi->chan->mode != L2CAP_MODE_ERTM)
> + if (pi->chan->mode != L2CAP_MODE_ERTM &&
> + pi->chan->mode != L2CAP_MODE_LE_FLOWCTL &&
> + pi->chan->mode != L2CAP_MODE_EXT_FLOWCTL)
> return err;
>
> - /* Attempt to put pending rx data in the socket buffer */
> -
> lock_sock(sk);
>
> - if (!test_bit(CONN_LOCAL_BUSY, &pi->chan->conn_state))
> - goto done;
> + avail = max(0, sk->sk_rcvbuf - atomic_read(&sk->sk_rmem_alloc));
> + l2cap_chan_rx_avail(pi->chan, avail);
>
> - if (pi->rx_busy_skb) {
> - if (!__sock_queue_rcv_skb(sk, pi->rx_busy_skb))
> - pi->rx_busy_skb = NULL;
> - else
> + /* Attempt to put pending rx data in the socket buffer */
> + while (!list_empty(&pi->rx_busy)) {
> + struct l2cap_rx_busy *rx_busy =
> + list_first_entry(&pi->rx_busy,
> + struct l2cap_rx_busy,
> + list);
> + if (__sock_queue_rcv_skb(sk, rx_busy->skb) < 0)
> goto done;
> + list_del(&rx_busy->list);
> + kfree(rx_busy);
> }
>
> /* Restore data flow when half of the receive buffer is
> * available. This avoids resending large numbers of
> * frames.
> */
> - if (atomic_read(&sk->sk_rmem_alloc) <= sk->sk_rcvbuf >> 1)
> + if (test_bit(CONN_LOCAL_BUSY, &pi->chan->conn_state) &&
> + atomic_read(&sk->sk_rmem_alloc) <= sk->sk_rcvbuf >> 1)
> l2cap_chan_busy(pi->chan, 0);
>
> done:
> @@ -1474,17 +1481,21 @@ static struct l2cap_chan *l2cap_sock_new_connection_cb(struct l2cap_chan *chan)
> static int l2cap_sock_recv_cb(struct l2cap_chan *chan, struct sk_buff *skb)
> {
> struct sock *sk = chan->data;
> + struct l2cap_pinfo *pi = l2cap_pi(sk);
> + int avail;
> int err;
>
> lock_sock(sk);
>
> - if (l2cap_pi(sk)->rx_busy_skb) {
> + if (chan->mode == L2CAP_MODE_ERTM && !list_empty(&pi->rx_busy)) {
> err = -ENOMEM;
> goto done;
> }
>
> if (chan->mode != L2CAP_MODE_ERTM &&
> - chan->mode != L2CAP_MODE_STREAMING) {
> + chan->mode != L2CAP_MODE_STREAMING &&
> + chan->mode != L2CAP_MODE_LE_FLOWCTL &&
> + chan->mode != L2CAP_MODE_EXT_FLOWCTL) {
> /* Even if no filter is attached, we could potentially
> * get errors from security modules, etc.
> */
> @@ -1495,7 +1506,10 @@ static int l2cap_sock_recv_cb(struct l2cap_chan *chan, struct sk_buff *skb)
>
> err = __sock_queue_rcv_skb(sk, skb);
>
> - /* For ERTM, handle one skb that doesn't fit into the recv
> + avail = max(0, sk->sk_rcvbuf - atomic_read(&sk->sk_rmem_alloc));
> + l2cap_chan_rx_avail(chan, avail);
> +
> + /* For ERTM and LE, handle a skb that doesn't fit into the recv
> * buffer. This is important to do because the data frames
> * have already been acked, so the skb cannot be discarded.
> *
> @@ -1504,8 +1518,18 @@ static int l2cap_sock_recv_cb(struct l2cap_chan *chan, struct sk_buff *skb)
> * acked and reassembled until there is buffer space
> * available.
> */
> - if (err < 0 && chan->mode == L2CAP_MODE_ERTM) {
> - l2cap_pi(sk)->rx_busy_skb = skb;
> + if (err < 0 &&
> + (chan->mode == L2CAP_MODE_ERTM ||
> + chan->mode == L2CAP_MODE_LE_FLOWCTL ||
> + chan->mode == L2CAP_MODE_EXT_FLOWCTL)) {
> + struct l2cap_rx_busy *rx_busy =
> + kmalloc(sizeof(*rx_busy), GFP_KERNEL);
> + if (!rx_busy) {
> + err = -ENOMEM;
> + goto done;
> + }
> + rx_busy->skb = skb;
> + list_add_tail(&rx_busy->list, &pi->rx_busy);
> l2cap_chan_busy(chan, 1);
> err = 0;
> }
> @@ -1731,6 +1755,8 @@ static const struct l2cap_ops l2cap_chan_ops = {
>
> static void l2cap_sock_destruct(struct sock *sk)
> {
> + struct l2cap_rx_busy *rx_busy, *next;
> +
> BT_DBG("sk %p", sk);
>
> if (l2cap_pi(sk)->chan) {
> @@ -1738,9 +1764,10 @@ static void l2cap_sock_destruct(struct sock *sk)
> l2cap_chan_put(l2cap_pi(sk)->chan);
> }
>
> - if (l2cap_pi(sk)->rx_busy_skb) {
> - kfree_skb(l2cap_pi(sk)->rx_busy_skb);
> - l2cap_pi(sk)->rx_busy_skb = NULL;
> + list_for_each_entry_safe(rx_busy, next, &l2cap_pi(sk)->rx_busy, list) {
> + kfree_skb(rx_busy->skb);
> + list_del(&rx_busy->list);
> + kfree(rx_busy);
> }
>
> skb_queue_purge(&sk->sk_receive_queue);
> @@ -1845,6 +1872,8 @@ static struct sock *l2cap_sock_alloc(struct net *net, struct socket *sock,
> sk->sk_destruct = l2cap_sock_destruct;
> sk->sk_sndtimeo = L2CAP_CONN_TIMEOUT;
>
> + INIT_LIST_HEAD(&l2cap_pi(sk)->rx_busy);
> +
> chan = l2cap_chan_create();
> if (!chan) {
> sk_free(sk);
> @@ -1853,6 +1882,8 @@ static struct sock *l2cap_sock_alloc(struct net *net, struct socket *sock,
>
> l2cap_chan_hold(chan);
>
> + l2cap_chan_rx_avail(chan, sk->sk_rcvbuf);
> +
> l2cap_pi(sk)->chan = chan;
>
> return sk;
> --
> 2.34.1
>


--
Luiz Augusto von Dentz