[PATCH RFC bpf-next 32/52] bpf, cpumap: switch to GRO from netif_receive_skb_list()

From: Alexander Lobakin
Date: Tue Jun 28 2022 - 15:56:41 EST


cpumap has its own BH context based on kthread. It has a sane batch
size of 8 frames per one cycle.
GRO can be used on its own, so adjust the cpumap calls to the
upper stack to use the GRO API instead of netif_receive_skb_list(), which
processes skbs in batches, but doesn't involve the GRO layer at all.
It is most beneficial when the NIC which the frames come from is XDP
generic metadata-enabled, but in plenty of tests GRO performs better
than list-based receiving even given that it has to calculate full frame
checksums on the CPU.
As GRO passes the skbs to the upper stack in batches of
@gro_normal_batch, i.e. 8 by default, and @skb->dev points to the
device where the frame comes from, it is enough to disable the GRO
netdev feature on it to completely restore the original behaviour:
untouched frames will be bulked and passed to the upper stack
by 8, as it was with netif_receive_skb_list().

Signed-off-by: Alexander Lobakin <alexandr.lobakin@xxxxxxxxx>
---
kernel/bpf/cpumap.c | 43 ++++++++++++++++++++++++++++++++++++++-----
1 file changed, 38 insertions(+), 5 deletions(-)

diff --git a/kernel/bpf/cpumap.c b/kernel/bpf/cpumap.c
index f4860ac756cd..2d0edf8f6a05 100644
--- a/kernel/bpf/cpumap.c
+++ b/kernel/bpf/cpumap.c
@@ -29,8 +29,8 @@
#include <trace/events/xdp.h>
#include <linux/btf_ids.h>

-#include <linux/netdevice.h> /* netif_receive_skb_list */
-#include <linux/etherdevice.h> /* eth_type_trans */
+#include <linux/netdevice.h>
+#include <net/gro.h>

/* General idea: XDP packets getting XDP redirected to another CPU,
* will maximum be stored/queued for one driver ->poll() call. It is
@@ -67,6 +67,8 @@ struct bpf_cpu_map_entry {
struct bpf_cpumap_val value;
struct bpf_prog *prog;

+ struct gro_node gro;
+
atomic_t refcnt; /* Control when this struct can be free'ed */
struct rcu_head rcu;

@@ -162,6 +164,7 @@ static void put_cpu_map_entry(struct bpf_cpu_map_entry *rcpu)
if (atomic_dec_and_test(&rcpu->refcnt)) {
if (rcpu->prog)
bpf_prog_put(rcpu->prog);
+ gro_cleanup(&rcpu->gro);
/* The queue should be empty at this point */
__cpu_map_ring_cleanup(rcpu->queue);
ptr_ring_cleanup(rcpu->queue, NULL);
@@ -295,6 +298,33 @@ static int cpu_map_bpf_prog_run(struct bpf_cpu_map_entry *rcpu, void **frames,
return nframes;
}

+static void cpu_map_gro_flush(struct bpf_cpu_map_entry *rcpu,
+ struct list_head *list)
+{
+ bool new = !list_empty(list);
+
+ if (likely(new))
+ gro_receive_skb_list(&rcpu->gro, list);
+
+ if (rcpu->gro.bitmask) {
+ bool flush_old = HZ >= 1000;
+
+ /* A false @flush_old means a full flush. If the ring is not
+ * empty, there'll be a new iteration soon, so a full flush
+ * is needed only when a tick is long (> 1 ms, HZ < 1000).
+ * If the ring is empty, do a full flush to not hold GRO
+ * packets in the stack for too long.
+ * This mirrors how NAPI decides whether to perform a full
+ * flush (in batches of up to 64 frames, though).
+ */
+ if (__ptr_ring_empty(rcpu->queue))
+ flush_old = false;
+
+ __gro_flush(&rcpu->gro, flush_old);
+ }
+
+ gro_normal_list(&rcpu->gro);
+}

static int cpu_map_kthread_run(void *data)
{
@@ -384,7 +414,7 @@ static int cpu_map_kthread_run(void *data)

list_add_tail(&skb->list, &list);
}
- netif_receive_skb_list(&list);
+ cpu_map_gro_flush(rcpu, &list);

/* Feedback loop via tracepoint */
trace_xdp_cpumap_kthread(rcpu->map_id, n, kmem_alloc_drops,
@@ -460,8 +490,10 @@ __cpu_map_entry_alloc(struct bpf_map *map, struct bpf_cpumap_val *value,
rcpu->map_id = map->id;
rcpu->value.qsize = value->qsize;

+ gro_init(&rcpu->gro, NULL);
+
if (fd > 0 && __cpu_map_load_bpf_program(rcpu, map, fd))
- goto free_ptr_ring;
+ goto free_gro;

/* Setup kthread */
rcpu->kthread = kthread_create_on_node(cpu_map_kthread_run, rcpu, numa,
@@ -482,7 +514,8 @@ __cpu_map_entry_alloc(struct bpf_map *map, struct bpf_cpumap_val *value,
free_prog:
if (rcpu->prog)
bpf_prog_put(rcpu->prog);
-free_ptr_ring:
+free_gro:
+ gro_cleanup(&rcpu->gro);
ptr_ring_cleanup(rcpu->queue, NULL);
free_queue:
kfree(rcpu->queue);
--
2.36.1