[PATCH net-next v3 3/8] bpf: cpumap: switch to GRO from netif_receive_skb_list()

From: Alexander Lobakin
Date: Wed Jan 15 2025 - 10:20:55 EST


cpumap has its own BH context based on kthread. It has a sane batch
size of 8 frames per one cycle.
GRO can be used here on its own. Adjust cpumap calls to the upper stack
to use GRO API instead of netif_receive_skb_list() which processes skbs
by batches, but doesn't involve GRO layer at all.
In plenty of tests, GRO performs better than listed receiving even
given that it has to calculate full frame checksums on the CPU.
As GRO passes the skbs to the upper stack in the batches of
@gro_normal_batch, i.e. 8 by default, and skb->dev points to the
device where the frame comes from, it is enough to disable GRO
netdev feature on it to completely restore the original behaviour:
untouched frames will be being bulked and passed to the upper stack
by 8, as it was with netif_receive_skb_list().

Signed-off-by: Alexander Lobakin <aleksander.lobakin@xxxxxxxxx>
Tested-by: Daniel Xu <dxu@xxxxxxxxx>
---
kernel/bpf/cpumap.c | 45 ++++++++++++++++++++++++++++++++++++++++++---
1 file changed, 42 insertions(+), 3 deletions(-)

diff --git a/kernel/bpf/cpumap.c b/kernel/bpf/cpumap.c
index 774accbd4a22..10d062dddb6f 100644
--- a/kernel/bpf/cpumap.c
+++ b/kernel/bpf/cpumap.c
@@ -33,8 +33,8 @@
#include <trace/events/xdp.h>
#include <linux/btf_ids.h>

-#include <linux/netdevice.h> /* netif_receive_skb_list */
-#include <linux/etherdevice.h> /* eth_type_trans */
+#include <linux/netdevice.h>
+#include <net/gro.h>

/* General idea: XDP packets getting XDP redirected to another CPU,
* will maximum be stored/queued for one driver ->poll() call. It is
@@ -68,6 +68,7 @@ struct bpf_cpu_map_entry {

struct bpf_cpumap_val value;
struct bpf_prog *prog;
+ struct gro_node gro;

struct completion kthread_running;
struct rcu_work free_work;
@@ -261,10 +262,36 @@ static int cpu_map_bpf_prog_run(struct bpf_cpu_map_entry *rcpu, void **frames,
return nframes;
}

+static void cpu_map_gro_receive(struct bpf_cpu_map_entry *rcpu,
+ struct list_head *list)
+{
+ struct sk_buff *skb, *tmp;
+
+ list_for_each_entry_safe(skb, tmp, list, list) {
+ skb_list_del_init(skb);
+ gro_receive_skb(&rcpu->gro, skb);
+ }
+}
+
+static void cpu_map_gro_flush(struct bpf_cpu_map_entry *rcpu, bool empty)
+{
+ /*
+ * If the ring is not empty, there'll be a new iteration soon, and we
+ * only need to do a full flush if a tick is long (> 1 ms).
+ * If the ring is empty, to not hold GRO packets in the stack for too
+ * long, do a full flush.
+ * This is equivalent to how NAPI decides whether to perform a full
+ * flush.
+ */
+ gro_flush(&rcpu->gro, !empty && HZ >= 1000);
+ gro_normal_list(&rcpu->gro);
+}
+
static int cpu_map_kthread_run(void *data)
{
struct bpf_cpu_map_entry *rcpu = data;
unsigned long last_qs = jiffies;
+ u32 packets = 0;

complete(&rcpu->kthread_running);
set_current_state(TASK_INTERRUPTIBLE);
@@ -282,6 +309,7 @@ static int cpu_map_kthread_run(void *data)
void *frames[CPUMAP_BATCH];
void *skbs[CPUMAP_BATCH];
LIST_HEAD(list);
+ bool empty;

/* Release CPU reschedule checks */
if (__ptr_ring_empty(rcpu->queue)) {
@@ -361,7 +389,15 @@ static int cpu_map_kthread_run(void *data)
trace_xdp_cpumap_kthread(rcpu->map_id, n, kmem_alloc_drops,
sched, &stats);

- netif_receive_skb_list(&list);
+ cpu_map_gro_receive(rcpu, &list);
+
+ /* Flush either every 64 packets or in case of empty ring */
+ empty = __ptr_ring_empty(rcpu->queue);
+ if (packets += n >= NAPI_POLL_WEIGHT || empty) {
+ cpu_map_gro_flush(rcpu, empty);
+ packets = 0;
+ }
+
local_bh_enable(); /* resched point, may call do_softirq() */
}
__set_current_state(TASK_RUNNING);
@@ -430,6 +466,7 @@ __cpu_map_entry_alloc(struct bpf_map *map, struct bpf_cpumap_val *value,
rcpu->cpu = cpu;
rcpu->map_id = map->id;
rcpu->value.qsize = value->qsize;
+ gro_init(&rcpu->gro);

if (fd > 0 && __cpu_map_load_bpf_program(rcpu, map, fd))
goto free_ptr_ring;
@@ -458,6 +495,7 @@ __cpu_map_entry_alloc(struct bpf_map *map, struct bpf_cpumap_val *value,
if (rcpu->prog)
bpf_prog_put(rcpu->prog);
free_ptr_ring:
+ gro_cleanup(&rcpu->gro);
ptr_ring_cleanup(rcpu->queue, NULL);
free_queue:
kfree(rcpu->queue);
@@ -487,6 +525,7 @@ static void __cpu_map_entry_free(struct work_struct *work)

if (rcpu->prog)
bpf_prog_put(rcpu->prog);
+ gro_cleanup(&rcpu->gro);
/* The queue should be empty at this point */
__cpu_map_ring_cleanup(rcpu->queue);
ptr_ring_cleanup(rcpu->queue, NULL);
--
2.48.0