Re: [RFC PATCH net-next 10/11] net: enetc: add RX support for zero-copy XDP sockets

From: Maciej Fijalkowski
Date: Wed Feb 08 2023 - 11:37:03 EST


On Mon, Feb 06, 2023 at 12:08:36PM +0200, Vladimir Oltean wrote:
> Add support for filling an RX ring with buffers coming from an XSK umem.
> Although enetc has up to 8 RX rings, we still use one of the 2 per-CPU
> RX rings for XSK.
>
> To set up an XSK pool on one of the RX queues, we use the
> reconfiguration procedure which temporarily stops the rings.
>
> Since the RX procedure in the NAPI poll function is completely different
> (both the API for creating an xdp_buff, as well as refilling the ring
> with memory from user space), create a separate enetc_clean_rx_ring_xsk()
> function which gets called when we have both an XSK pool and an XDK
> program on this RX queue.
>
> Signed-off-by: Vladimir Oltean <vladimir.oltean@xxxxxxx>
> ---
> drivers/net/ethernet/freescale/enetc/enetc.c | 377 +++++++++++++++++-
> drivers/net/ethernet/freescale/enetc/enetc.h | 3 +
> .../net/ethernet/freescale/enetc/enetc_pf.c | 1 +
> 3 files changed, 373 insertions(+), 8 deletions(-)
>
> diff --git a/drivers/net/ethernet/freescale/enetc/enetc.c b/drivers/net/ethernet/freescale/enetc/enetc.c
> index dee432cacf85..3990c006c011 100644
> --- a/drivers/net/ethernet/freescale/enetc/enetc.c
> +++ b/drivers/net/ethernet/freescale/enetc/enetc.c
> @@ -10,6 +10,7 @@
> #include <net/ip6_checksum.h>
> #include <net/pkt_sched.h>
> #include <net/tso.h>
> +#include <net/xdp_sock_drv.h>
>
> u32 enetc_port_mac_rd(struct enetc_si *si, u32 reg)
> {
> @@ -103,6 +104,9 @@ static void enetc_free_rx_swbd(struct enetc_bdr *rx_ring,
> rx_swbd->dir);
> __free_page(rx_swbd->page);
> rx_swbd->page = NULL;
> + } else if (rx_swbd->xsk_buff) {
> + xsk_buff_free(rx_swbd->xsk_buff);
> + rx_swbd->xsk_buff = NULL;
> }
> }
>
> @@ -979,6 +983,44 @@ static int enetc_refill_rx_ring(struct enetc_bdr *rx_ring, const int buff_cnt)
> return j;
> }
>
> +static int enetc_refill_rx_ring_xsk(struct enetc_bdr *rx_ring, int buff_cnt)
> +{
> + struct xsk_buff_pool *pool = rx_ring->xdp.xsk_pool;
> + struct enetc_rx_swbd *rx_swbd;
> + struct xdp_buff *xsk_buff;
> + union enetc_rx_bd *rxbd;
> + int i, j;
> +
> + i = rx_ring->next_to_use;
> + rxbd = enetc_rxbd(rx_ring, i);
> +
> + for (j = 0; j < buff_cnt; j++) {
> + xsk_buff = xsk_buff_alloc(pool); // TODO use _batch?

yes, use batch:P

> + if (!xsk_buff)
> + break;
> +
> + rx_swbd = &rx_ring->rx_swbd[i];
> + rx_swbd->xsk_buff = xsk_buff;
> + rx_swbd->dma = xsk_buff_xdp_get_dma(xsk_buff);
> +
> + /* update RxBD */
> + rxbd->w.addr = cpu_to_le64(rx_swbd->dma);
> + /* clear 'R" as well */
> + rxbd->r.lstatus = 0;
> +
> + enetc_rxbd_next(rx_ring, &rxbd, &i);
> + }
> +
> + if (likely(j)) {
> + rx_ring->next_to_use = i;
> +
> + /* update ENETC's consumer index */
> + enetc_wr_reg_hot(rx_ring->rcir, rx_ring->next_to_use);
> + }
> +
> + return j;
> +}
> +
> #ifdef CONFIG_FSL_ENETC_PTP_CLOCK
> static void enetc_get_rx_tstamp(struct net_device *ndev,
> union enetc_rx_bd *rxbd,
> @@ -1128,6 +1170,18 @@ static void enetc_add_rx_buff_to_skb(struct enetc_bdr *rx_ring, int i,
> enetc_flip_rx_buff(rx_ring, rx_swbd);
> }
>
> +static void enetc_put_rx_swbd(struct enetc_bdr *rx_ring, int i)
> +{
> + struct enetc_rx_swbd *rx_swbd = &rx_ring->rx_swbd[i];
> +
> + if (rx_swbd->xsk_buff) {
> + xsk_buff_free(rx_swbd->xsk_buff);
> + rx_swbd->xsk_buff = NULL;
> + } else {
> + enetc_put_rx_buff(rx_ring, rx_swbd);
> + }
> +}
> +
> static bool enetc_check_bd_errors_and_consume(struct enetc_bdr *rx_ring,
> u32 bd_status,
> union enetc_rx_bd **rxbd, int *i,
> @@ -1136,7 +1190,7 @@ static bool enetc_check_bd_errors_and_consume(struct enetc_bdr *rx_ring,
> if (likely(!(bd_status & ENETC_RXBD_LSTATUS(ENETC_RXBD_ERR_MASK))))
> return false;
>
> - enetc_put_rx_buff(rx_ring, &rx_ring->rx_swbd[*i]);
> + enetc_put_rx_swbd(rx_ring, *i);
> (*buffs_missing)++;
> enetc_rxbd_next(rx_ring, rxbd, i);
>
> @@ -1144,7 +1198,7 @@ static bool enetc_check_bd_errors_and_consume(struct enetc_bdr *rx_ring,
> dma_rmb();
> bd_status = le32_to_cpu((*rxbd)->r.lstatus);
>
> - enetc_put_rx_buff(rx_ring, &rx_ring->rx_swbd[*i]);
> + enetc_put_rx_swbd(rx_ring, *i);
> (*buffs_missing)++;
> enetc_rxbd_next(rx_ring, rxbd, i);
> }
> @@ -1484,6 +1538,43 @@ static void enetc_build_xdp_buff(struct enetc_bdr *rx_ring, u32 bd_status,
> }
> }
>
> +static struct xdp_buff *enetc_build_xsk_buff(struct xsk_buff_pool *pool,
> + struct enetc_bdr *rx_ring,
> + u32 bd_status,
> + union enetc_rx_bd **rxbd, int *i,
> + int *buffs_missing, int *rx_byte_cnt)
> +{
> + struct enetc_rx_swbd *rx_swbd = &rx_ring->rx_swbd[*i];
> + u16 size = le16_to_cpu((*rxbd)->r.buf_len);
> + struct xdp_buff *xsk_buff;
> +
> + /* Multi-buffer frames are not supported in XSK mode */

Nice! I realized we need to forbid that on ice now.

> + if (unlikely(!(bd_status & ENETC_RXBD_LSTATUS_F))) {
> + while (!(bd_status & ENETC_RXBD_LSTATUS_F)) {
> + enetc_put_rx_swbd(rx_ring, *i);
> +
> + (*buffs_missing)++;
> + enetc_rxbd_next(rx_ring, rxbd, i);
> + dma_rmb();
> + bd_status = le32_to_cpu((*rxbd)->r.lstatus);
> + }
> +
> + return NULL;
> + }
> +
> + xsk_buff = rx_swbd->xsk_buff;
> + xsk_buff_set_size(xsk_buff, size);
> + xsk_buff_dma_sync_for_cpu(xsk_buff, pool);
> +
> + rx_swbd->xsk_buff = NULL;
> +
> + (*buffs_missing)++;
> + (*rx_byte_cnt) += size;
> + enetc_rxbd_next(rx_ring, rxbd, i);
> +
> + return xsk_buff;
> +}
> +
> /* Convert RX buffer descriptors to TX buffer descriptors. These will be
> * recycled back into the RX ring in enetc_clean_tx_ring.
> */
> @@ -1659,11 +1750,136 @@ static int enetc_clean_rx_ring_xdp(struct enetc_bdr *rx_ring,
> return rx_frm_cnt;
> }
>
> +static void enetc_xsk_buff_to_skb(struct xdp_buff *xsk_buff,
> + struct enetc_bdr *rx_ring,
> + union enetc_rx_bd *rxbd,
> + struct napi_struct *napi)
> +{
> + size_t len = xdp_get_buff_len(xsk_buff);
> + struct sk_buff *skb;
> +
> + skb = napi_alloc_skb(napi, len);
> + if (unlikely(!skb)) {
> + rx_ring->stats.rx_alloc_errs++;
> + goto out;
> + }
> +
> + skb_put_data(skb, xsk_buff->data, len);
> +
> + enetc_get_offloads(rx_ring, rxbd, skb);
> +
> + skb_record_rx_queue(skb, rx_ring->index);
> + skb->protocol = eth_type_trans(skb, rx_ring->ndev);
> +
> + rx_ring->stats.packets += skb->len;
> + rx_ring->stats.bytes++;
> +
> + napi_gro_receive(napi, skb);
> +out:
> + xsk_buff_free(xsk_buff);
> +}
> +
> +static int enetc_clean_rx_ring_xsk(struct enetc_bdr *rx_ring,
> + struct napi_struct *napi, int work_limit,
> + struct bpf_prog *prog,
> + struct xsk_buff_pool *pool)
> +{
> + struct net_device *ndev = rx_ring->ndev;
> + union enetc_rx_bd *rxbd, *orig_rxbd;
> + int rx_frm_cnt = 0, rx_byte_cnt = 0;
> + int xdp_redirect_frm_cnt = 0;
> + struct xdp_buff *xsk_buff;
> + int buffs_missing, err, i;
> + bool wakeup_xsk = false;
> + u32 bd_status, xdp_act;
> +
> + buffs_missing = enetc_bd_unused(rx_ring);
> + /* next descriptor to process */
> + i = rx_ring->next_to_clean;
> +
> + while (likely(rx_frm_cnt < work_limit)) {
> + if (buffs_missing >= ENETC_RXBD_BUNDLE) {
> + buffs_missing -= enetc_refill_rx_ring_xsk(rx_ring,
> + buffs_missing);
> + wakeup_xsk |= (buffs_missing != 0);
> + }
> +
> + rxbd = enetc_rxbd(rx_ring, i);
> + bd_status = le32_to_cpu(rxbd->r.lstatus);
> + if (!bd_status)
> + break;
> +
> + enetc_wr_reg_hot(rx_ring->idr, BIT(rx_ring->index));
> + dma_rmb(); /* for reading other rxbd fields */
> +
> + if (enetc_check_bd_errors_and_consume(rx_ring, bd_status,
> + &rxbd, &i,
> + &buffs_missing))
> + continue;
> +
> + orig_rxbd = rxbd;
> +
> + xsk_buff = enetc_build_xsk_buff(pool, rx_ring, bd_status,
> + &rxbd, &i, &buffs_missing,
> + &rx_byte_cnt);
> + if (!xsk_buff)
> + continue;
> +
> + xdp_act = bpf_prog_run_xdp(prog, xsk_buff);
> + switch (xdp_act) {
> + default:
> + bpf_warn_invalid_xdp_action(ndev, prog, xdp_act);
> + fallthrough;
> + case XDP_ABORTED:
> + trace_xdp_exception(ndev, prog, xdp_act);
> + fallthrough;
> + case XDP_DROP:
> + xsk_buff_free(xsk_buff);
> + break;
> + case XDP_PASS:
> + enetc_xsk_buff_to_skb(xsk_buff, rx_ring, orig_rxbd,
> + napi);
> + break;
> + case XDP_REDIRECT:
> + err = xdp_do_redirect(ndev, xsk_buff, prog);
> + if (unlikely(err)) {
> + if (err == -ENOBUFS)
> + wakeup_xsk = true;
> + xsk_buff_free(xsk_buff);
> + rx_ring->stats.xdp_redirect_failures++;
> + } else {
> + xdp_redirect_frm_cnt++;
> + rx_ring->stats.xdp_redirect++;
> + }

no XDP_TX support? I don't see it being added on next patch.

> + }
> +
> + rx_frm_cnt++;
> + }
> +
> + rx_ring->next_to_clean = i;
> +
> + rx_ring->stats.packets += rx_frm_cnt;
> + rx_ring->stats.bytes += rx_byte_cnt;
> +
> + if (xdp_redirect_frm_cnt)
> + xdp_do_flush_map();
> +
> + if (xsk_uses_need_wakeup(pool)) {
> + if (wakeup_xsk)
> + xsk_set_rx_need_wakeup(pool);
> + else
> + xsk_clear_rx_need_wakeup(pool);
> + }
> +
> + return rx_frm_cnt;
> +}
> +
> static int enetc_poll(struct napi_struct *napi, int budget)
> {
> struct enetc_int_vector
> *v = container_of(napi, struct enetc_int_vector, napi);
> struct enetc_bdr *rx_ring = &v->rx_ring;
> + struct xsk_buff_pool *pool;
> struct bpf_prog *prog;
> bool complete = true;
> int work_done;
> @@ -1676,10 +1892,15 @@ static int enetc_poll(struct napi_struct *napi, int budget)
> complete = false;
>
> prog = rx_ring->xdp.prog;
> - if (prog)

(...)