[PATCH RFC bpf-next 33/52] bpf, cpumap: add option to set a timeout for deferred flush
From: Alexander Lobakin
Date: Tue Jun 28 2022 - 15:54:59 EST
GRO efficiency depends a lot on the batch size. With a batch size of 8,
it is less efficient than e.g. NAPI with its batch size of 64.
To reduce the share of full flushes without holding GRO packets for
too long, use the GRO hrtimer to wake up the kthread even if there are
no new frames in the ptr_ring. The timeout value is passed from
userspace inside the corresponding &bpf_cpumap_val when the map entry
is created, in nanoseconds.
When the timeout is 0/unset, the behaviour is the same as it was
prior to the change.
Signed-off-by: Alexander Lobakin <alexandr.lobakin@xxxxxxxxx>
---
include/uapi/linux/bpf.h | 1 +
kernel/bpf/cpumap.c | 39 +++++++++++++++++++++++++++++-----
tools/include/uapi/linux/bpf.h | 1 +
3 files changed, 36 insertions(+), 5 deletions(-)
diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
index 1caaec1de625..097719ee2172 100644
--- a/include/uapi/linux/bpf.h
+++ b/include/uapi/linux/bpf.h
@@ -5989,6 +5989,7 @@ struct bpf_cpumap_val {
int fd; /* prog fd on map write */
__u32 id; /* prog id on map read */
} bpf_prog;
+ __u64 timeout; /* timeout to wait for new packets, in ns */
};
enum sk_action {
diff --git a/kernel/bpf/cpumap.c b/kernel/bpf/cpumap.c
index 2d0edf8f6a05..145f49de0931 100644
--- a/kernel/bpf/cpumap.c
+++ b/kernel/bpf/cpumap.c
@@ -95,7 +95,8 @@ static struct bpf_map *cpu_map_alloc(union bpf_attr *attr)
/* check sanity of attributes */
if (attr->max_entries == 0 || attr->key_size != 4 ||
(value_size != offsetofend(struct bpf_cpumap_val, qsize) &&
- value_size != offsetofend(struct bpf_cpumap_val, bpf_prog.fd)) ||
+ value_size != offsetofend(struct bpf_cpumap_val, bpf_prog.fd) &&
+ value_size != offsetofend(struct bpf_cpumap_val, timeout)) ||
attr->map_flags & ~BPF_F_NUMA_NODE)
return ERR_PTR(-EINVAL);
@@ -312,18 +313,42 @@ static void cpu_map_gro_flush(struct bpf_cpu_map_entry *rcpu,
/* If the ring is not empty, there'll be a new iteration
* soon, and we only need to do a full flush if a tick is
* long (> 1 ms).
- * If the ring is empty, to not hold GRO packets in the
- * stack for too long, do a full flush.
+ * If the ring is empty, and there were some new packets
+ * processed, either do a partial flush and spin up a timer
+ * to flush the rest if the timeout is set, or do a full
+ * flush otherwise.
+ * No new packets with non-zero gro_bitmask can mean that we
+ * probably came from the timer call and/or there's [almost]
+ * no activity here right now. To not hold GRO packets in
+ * the stack for too long, do a full flush.
* This is equivalent to how NAPI decides whether to perform
* a full flush (by batches of up to 64 frames tho).
*/
if (__ptr_ring_empty(rcpu->queue))
- flush_old = false;
+ flush_old = new ? !!rcpu->value.timeout : false;
__gro_flush(&rcpu->gro, flush_old);
}
gro_normal_list(&rcpu->gro);
+
+ /* Non-zero gro_bitmask at this point means that we have some packets
+ * held in the GRO engine after a partial flush. If we have a timeout
+ * set up, and there are no signs of a new kthread iteration, launch
+ * a timer to flush them as well.
+ */
+ if (rcpu->gro.bitmask && __ptr_ring_empty(rcpu->queue))
+ gro_timer_start(&rcpu->gro, rcpu->value.timeout);
+}
+
+static enum hrtimer_restart cpu_map_gro_watchdog(struct hrtimer *timer)
+{
+ const struct bpf_cpu_map_entry *rcpu;
+
+ rcpu = container_of(timer, typeof(*rcpu), gro.timer);
+ wake_up_process(rcpu->kthread);
+
+ return HRTIMER_NORESTART;
}
static int cpu_map_kthread_run(void *data)
@@ -489,8 +514,9 @@ __cpu_map_entry_alloc(struct bpf_map *map, struct bpf_cpumap_val *value,
rcpu->cpu = cpu;
rcpu->map_id = map->id;
rcpu->value.qsize = value->qsize;
+ rcpu->value.timeout = value->timeout;
- gro_init(&rcpu->gro, NULL);
+ gro_init(&rcpu->gro, cpu_map_gro_watchdog);
if (fd > 0 && __cpu_map_load_bpf_program(rcpu, map, fd))
goto free_gro;
@@ -606,6 +632,9 @@ static int cpu_map_update_elem(struct bpf_map *map, void *key, void *value,
return -EEXIST;
if (unlikely(cpumap_value.qsize > 16384)) /* sanity limit on qsize */
return -EOVERFLOW;
+ /* Don't allow timeout longer than 1 ms -- 1 tick on HZ == 1000 */
+ if (unlikely(cpumap_value.timeout > 1 * NSEC_PER_MSEC))
+ return -ERANGE;
/* Make sure CPU is a valid possible cpu */
if (key_cpu >= nr_cpumask_bits || !cpu_possible(key_cpu))
diff --git a/tools/include/uapi/linux/bpf.h b/tools/include/uapi/linux/bpf.h
index 436b925adfb3..a3579cdb0225 100644
--- a/tools/include/uapi/linux/bpf.h
+++ b/tools/include/uapi/linux/bpf.h
@@ -5989,6 +5989,7 @@ struct bpf_cpumap_val {
int fd; /* prog fd on map write */
__u32 id; /* prog id on map read */
} bpf_prog;
+ __u64 timeout; /* timeout to wait for new packets, in ns */
};
enum sk_action {
--
2.36.1