[PATCH net-next 3/4] selftests: net: move common xdp.py functions into lib
From: Chris J Arges
Date: Fri Feb 27 2026 - 12:46:40 EST
Move a few helper functions that can be useful to other Python tests
that manipulate XDP programs into the shared net/lib/py/bpf.py module.
Also convert xdp.py to use the new shared helpers.
Signed-off-by: Chris J Arges <carges@xxxxxxxxxxxxxx>
---
.../selftests/drivers/net/lib/py/__init__.py | 2 +
tools/testing/selftests/drivers/net/xdp.py | 95 +++++--------------
.../testing/selftests/net/lib/py/__init__.py | 2 +
tools/testing/selftests/net/lib/py/bpf.py | 68 +++++++++++++
4 files changed, 95 insertions(+), 72 deletions(-)
create mode 100644 tools/testing/selftests/net/lib/py/bpf.py
diff --git a/tools/testing/selftests/drivers/net/lib/py/__init__.py b/tools/testing/selftests/drivers/net/lib/py/__init__.py
index 8b75faa9af6d..29c4913cd434 100644
--- a/tools/testing/selftests/drivers/net/lib/py/__init__.py
+++ b/tools/testing/selftests/drivers/net/lib/py/__init__.py
@@ -23,6 +23,7 @@ try:
from net.lib.py import CmdExitFailure
from net.lib.py import bkg, cmd, bpftool, bpftrace, defer, ethtool, \
fd_read_timeout, ip, rand_port, wait_port_listen, wait_file
+ from net.lib.py import bpf_map_set, bpf_map_dump, bpf_prog_map_ids
from net.lib.py import KsftSkipEx, KsftFailEx, KsftXfailEx
from net.lib.py import ksft_disruptive, ksft_exit, ksft_pr, ksft_run, \
ksft_setup, ksft_variants, KsftNamedVariant
@@ -36,6 +37,7 @@ try:
"bkg", "cmd", "bpftool", "bpftrace", "defer", "ethtool",
"fd_read_timeout", "ip", "rand_port",
"wait_port_listen", "wait_file",
+ "bpf_map_set", "bpf_map_dump", "bpf_prog_map_ids",
"KsftSkipEx", "KsftFailEx", "KsftXfailEx",
"ksft_disruptive", "ksft_exit", "ksft_pr", "ksft_run",
"ksft_setup", "ksft_variants", "KsftNamedVariant",
diff --git a/tools/testing/selftests/drivers/net/xdp.py b/tools/testing/selftests/drivers/net/xdp.py
index e54df158dfe9..10d821156db1 100755
--- a/tools/testing/selftests/drivers/net/xdp.py
+++ b/tools/testing/selftests/drivers/net/xdp.py
@@ -16,7 +16,8 @@ from lib.py import KsftNamedVariant, ksft_variants
from lib.py import KsftFailEx, NetDrvEpEnv
from lib.py import EthtoolFamily, NetdevFamily, NlError
from lib.py import bkg, cmd, rand_port, wait_port_listen
-from lib.py import ip, bpftool, defer
+from lib.py import ip, defer
+from lib.py import bpf_map_set, bpf_map_dump, bpf_prog_map_ids
class TestConfig(Enum):
@@ -122,47 +123,11 @@ def _load_xdp_prog(cfg, bpf_info):
xdp_info = ip(f"-d link show dev {cfg.ifname}", json=True)[0]
prog_info["id"] = xdp_info["xdp"]["prog"]["id"]
prog_info["name"] = xdp_info["xdp"]["prog"]["name"]
- prog_id = prog_info["id"]
-
- map_ids = bpftool(f"prog show id {prog_id}", json=True)["map_ids"]
- prog_info["maps"] = {}
- for map_id in map_ids:
- name = bpftool(f"map show id {map_id}", json=True)["name"]
- prog_info["maps"][name] = map_id
+ prog_info["maps"] = bpf_prog_map_ids(prog_info["id"])
return prog_info
-def format_hex_bytes(value):
- """
- Helper function that converts an integer into a formatted hexadecimal byte string.
-
- Args:
- value: An integer representing the number to be converted.
-
- Returns:
- A string representing hexadecimal equivalent of value, with bytes separated by spaces.
- """
- hex_str = value.to_bytes(4, byteorder='little', signed=True)
- return ' '.join(f'{byte:02x}' for byte in hex_str)
-
-
-def _set_xdp_map(map_name, key, value):
- """
- Updates an XDP map with a given key-value pair using bpftool.
-
- Args:
- map_name: The name of the XDP map to update.
- key: The key to update in the map, formatted as a hexadecimal string.
- value: The value to associate with the key, formatted as a hexadecimal string.
- """
- key_formatted = format_hex_bytes(key)
- value_formatted = format_hex_bytes(value)
- bpftool(
- f"map update name {map_name} key hex {key_formatted} value hex {value_formatted}"
- )
-
-
def _get_stats(xdp_map_id):
"""
Retrieves and formats statistics from an XDP map.
@@ -177,25 +142,11 @@ def _get_stats(xdp_map_id):
Raises:
KsftFailEx: If the stats retrieval fails.
"""
- stats_dump = bpftool(f"map dump id {xdp_map_id}", json=True)
- if not stats_dump:
+ stats = bpf_map_dump(xdp_map_id)
+ if not stats:
raise KsftFailEx(f"Failed to get stats for map {xdp_map_id}")
- stats_formatted = {}
- for key in range(0, 5):
- val = stats_dump[key]["formatted"]["value"]
- if stats_dump[key]["formatted"]["key"] == XDPStats.RX.value:
- stats_formatted[XDPStats.RX.value] = val
- elif stats_dump[key]["formatted"]["key"] == XDPStats.PASS.value:
- stats_formatted[XDPStats.PASS.value] = val
- elif stats_dump[key]["formatted"]["key"] == XDPStats.DROP.value:
- stats_formatted[XDPStats.DROP.value] = val
- elif stats_dump[key]["formatted"]["key"] == XDPStats.TX.value:
- stats_formatted[XDPStats.TX.value] = val
- elif stats_dump[key]["formatted"]["key"] == XDPStats.ABORT.value:
- stats_formatted[XDPStats.ABORT.value] = val
-
- return stats_formatted
+ return stats
def _test_pass(cfg, bpf_info, msg_sz):
@@ -211,8 +162,8 @@ def _test_pass(cfg, bpf_info, msg_sz):
prog_info = _load_xdp_prog(cfg, bpf_info)
port = rand_port()
- _set_xdp_map("map_xdp_setup", TestConfig.MODE.value, XDPAction.PASS.value)
- _set_xdp_map("map_xdp_setup", TestConfig.PORT.value, port)
+ bpf_map_set("map_xdp_setup", TestConfig.MODE.value, XDPAction.PASS.value)
+ bpf_map_set("map_xdp_setup", TestConfig.PORT.value, port)
ksft_eq(_test_udp(cfg, port, msg_sz), True, "UDP packet exchange failed")
stats = _get_stats(prog_info["maps"]["map_xdp_stats"])
@@ -258,8 +209,8 @@ def _test_drop(cfg, bpf_info, msg_sz):
prog_info = _load_xdp_prog(cfg, bpf_info)
port = rand_port()
- _set_xdp_map("map_xdp_setup", TestConfig.MODE.value, XDPAction.DROP.value)
- _set_xdp_map("map_xdp_setup", TestConfig.PORT.value, port)
+ bpf_map_set("map_xdp_setup", TestConfig.MODE.value, XDPAction.DROP.value)
+ bpf_map_set("map_xdp_setup", TestConfig.PORT.value, port)
ksft_eq(_test_udp(cfg, port, msg_sz), False, "UDP packet exchange should fail")
stats = _get_stats(prog_info["maps"]["map_xdp_stats"])
@@ -305,8 +256,8 @@ def _test_xdp_native_tx(cfg, bpf_info, payload_lens):
prog_info = _load_xdp_prog(cfg, bpf_info)
port = rand_port()
- _set_xdp_map("map_xdp_setup", TestConfig.MODE.value, XDPAction.TX.value)
- _set_xdp_map("map_xdp_setup", TestConfig.PORT.value, port)
+ bpf_map_set("map_xdp_setup", TestConfig.MODE.value, XDPAction.TX.value)
+ bpf_map_set("map_xdp_setup", TestConfig.PORT.value, port)
expected_pkts = 0
for payload_len in payload_lens:
@@ -454,15 +405,15 @@ def _test_xdp_native_tail_adjst(cfg, pkt_sz_lst, offset_lst):
prog_info = _load_xdp_prog(cfg, bpf_info)
# Configure the XDP map for tail adjustment
- _set_xdp_map("map_xdp_setup", TestConfig.MODE.value, XDPAction.TAIL_ADJST.value)
- _set_xdp_map("map_xdp_setup", TestConfig.PORT.value, port)
+ bpf_map_set("map_xdp_setup", TestConfig.MODE.value, XDPAction.TAIL_ADJST.value)
+ bpf_map_set("map_xdp_setup", TestConfig.PORT.value, port)
for offset in offset_lst:
tag = format(random.randint(65, 90), "02x")
- _set_xdp_map("map_xdp_setup", TestConfig.ADJST_OFFSET.value, offset)
+ bpf_map_set("map_xdp_setup", TestConfig.ADJST_OFFSET.value, offset)
if offset > 0:
- _set_xdp_map("map_xdp_setup", TestConfig.ADJST_TAG.value, int(tag, 16))
+ bpf_map_set("map_xdp_setup", TestConfig.ADJST_TAG.value, int(tag, 16))
for pkt_sz in pkt_sz_lst:
test_str = "".join(random.choice(string.ascii_lowercase) for _ in range(pkt_sz))
@@ -574,8 +525,8 @@ def _test_xdp_native_head_adjst(cfg, prog, pkt_sz_lst, offset_lst):
prog_info = _load_xdp_prog(cfg, BPFProgInfo(prog, "xdp_native.bpf.o", "xdp.frags", 9000))
port = rand_port()
- _set_xdp_map("map_xdp_setup", TestConfig.MODE.value, XDPAction.HEAD_ADJST.value)
- _set_xdp_map("map_xdp_setup", TestConfig.PORT.value, port)
+ bpf_map_set("map_xdp_setup", TestConfig.MODE.value, XDPAction.HEAD_ADJST.value)
+ bpf_map_set("map_xdp_setup", TestConfig.PORT.value, port)
hds_thresh = get_hds_thresh(cfg)
for offset in offset_lst:
@@ -595,11 +546,11 @@ def _test_xdp_native_head_adjst(cfg, prog, pkt_sz_lst, offset_lst):
test_str = ''.join(random.choice(string.ascii_lowercase) for _ in range(pkt_sz))
tag = format(random.randint(65, 90), '02x')
- _set_xdp_map("map_xdp_setup",
+ bpf_map_set("map_xdp_setup",
TestConfig.ADJST_OFFSET.value,
offset)
- _set_xdp_map("map_xdp_setup", TestConfig.ADJST_TAG.value, int(tag, 16))
- _set_xdp_map("map_xdp_setup", TestConfig.ADJST_OFFSET.value, offset)
+ bpf_map_set("map_xdp_setup", TestConfig.ADJST_TAG.value, int(tag, 16))
+ bpf_map_set("map_xdp_setup", TestConfig.ADJST_OFFSET.value, offset)
recvd_str = _exchg_udp(cfg, port, test_str)
@@ -691,8 +642,8 @@ def test_xdp_native_qstats(cfg, act):
prog_info = _load_xdp_prog(cfg, bpf_info)
port = rand_port()
- _set_xdp_map("map_xdp_setup", TestConfig.MODE.value, act.value)
- _set_xdp_map("map_xdp_setup", TestConfig.PORT.value, port)
+ bpf_map_set("map_xdp_setup", TestConfig.MODE.value, act.value)
+ bpf_map_set("map_xdp_setup", TestConfig.PORT.value, port)
# Discard the input, but we need a listener to avoid ICMP errors
rx_udp = f"socat -{cfg.addr_ipver} -T 2 -u UDP-RECV:{port},reuseport " + \
diff --git a/tools/testing/selftests/net/lib/py/__init__.py b/tools/testing/selftests/net/lib/py/__init__.py
index f528b67639de..c1aa9d0f8dcf 100644
--- a/tools/testing/selftests/net/lib/py/__init__.py
+++ b/tools/testing/selftests/net/lib/py/__init__.py
@@ -14,6 +14,7 @@ from .netns import NetNS, NetNSEnter
from .nsim import NetdevSim, NetdevSimDev
from .utils import CmdExitFailure, fd_read_timeout, cmd, bkg, defer, \
bpftool, ip, ethtool, bpftrace, rand_port, wait_port_listen, wait_file, tool
+from .bpf import bpf_map_set, bpf_map_dump, bpf_prog_map_ids
from .ynl import NlError, YnlFamily, EthtoolFamily, NetdevFamily, RtnlFamily, RtnlAddrFamily
from .ynl import NetshaperFamily, DevlinkFamily, PSPFamily
@@ -27,6 +28,7 @@ __all__ = ["KSRC",
"CmdExitFailure", "fd_read_timeout", "cmd", "bkg", "defer",
"bpftool", "ip", "ethtool", "bpftrace", "rand_port",
"wait_port_listen", "wait_file", "tool",
+ "bpf_map_set", "bpf_map_dump", "bpf_prog_map_ids",
"NetdevSim", "NetdevSimDev",
"NetshaperFamily", "DevlinkFamily", "PSPFamily", "NlError",
"YnlFamily", "EthtoolFamily", "NetdevFamily", "RtnlFamily",
diff --git a/tools/testing/selftests/net/lib/py/bpf.py b/tools/testing/selftests/net/lib/py/bpf.py
new file mode 100644
index 000000000000..96b29d41c34b
--- /dev/null
+++ b/tools/testing/selftests/net/lib/py/bpf.py
@@ -0,0 +1,68 @@
+# SPDX-License-Identifier: GPL-2.0
+
+"""
+BPF helper utilities for kernel selftests.
+
+Provides common operations for interacting with BPF maps and programs
+via bpftool, used by XDP and other BPF-based test files.
+"""
+
+from .utils import bpftool
+
+def format_hex_bytes(value):
+ """
+ Convert an integer into a space-separated hex byte string (as used for bpftool "hex" args).
+
+ Args:
+ value: Integer to encode as 4 bytes, little-endian, signed; out-of-range raises OverflowError.
+
+ Returns:
+ The four bytes as two-digit lowercase hex separated by spaces (e.g. 1 -> "01 00 00 00").
+ """
+ hex_str = value.to_bytes(4, byteorder='little', signed=True)
+ return ' '.join(f'{byte:02x}' for byte in hex_str)
+
+
+def bpf_map_set(map_name, key, value):
+ """
+ Updates one entry of a BPF map, selected by map name, using bpftool.
+
+ Args:
+ map_name: The name of the BPF map to update.
+ key: Integer key; encoded with format_hex_bytes() before being passed to bpftool.
+ value: Integer value; encoded with format_hex_bytes() before being passed to bpftool.
+ """
+ key_formatted = format_hex_bytes(key)
+ value_formatted = format_hex_bytes(value)
+ bpftool(
+ f"map update name {map_name} key hex {key_formatted} value hex {value_formatted}"
+ )
+
+def bpf_map_dump(map_id):
+ """Dump all entries of a BPF map, selected by map ID, using bpftool.
+
+ Args:
+ map_id: Numeric map ID (as returned by bpftool prog show).
+
+ Returns:
+ A dict mapping each entry's "formatted" key to its "formatted" value (empty if no entries).
+ """
+ raw = bpftool(f"map dump id {map_id}", json=True)
+ return {e["formatted"]["key"]: e["formatted"]["value"] for e in raw}
+
+
+def bpf_prog_map_ids(prog_id):
+ """Build the map name-to-ID mapping for a loaded BPF program using bpftool.
+
+ Args:
+ prog_id: Numeric program ID, as passed to "bpftool prog show id".
+
+ Returns:
+ A dict mapping each map's name to its map ID; if two maps share a name, the later ID wins.
+ """
+ map_ids = bpftool(f"prog show id {prog_id}", json=True)["map_ids"]
+ maps = {}
+ for mid in map_ids:
+ name = bpftool(f"map show id {mid}", json=True)["name"]
+ maps[name] = mid
+ return maps
--
2.43.0