[PATCH v1 50/58] perf rwtop: Port rwtop to use python module

From: Ian Rogers

Date: Sun Apr 19 2026 - 20:09:55 EST


Port the legacy Perl script rwtop.pl to a python script using the perf
module in tools/perf/python.

The new script uses a class-based architecture and leverages the
perf.session API for event processing.

It periodically displays system-wide r/w call activity, broken down by
PID, refreshed every interval.

Complications:
- Implemented periodic display based on event timestamps
(sample.sample_time) instead of relying on SIGALRM, making it robust
for file-based processing.
- Used ANSI escape codes (\x1b[H\x1b[2J) to clear the terminal.
- Fixed unused imports and indentation issues identified by pylint.
- pylint warns about the module name not being snake_case, but it is
kept for consistency with the original script name.

Assisted-by: Gemini:gemini-3.1-pro-preview
Signed-off-by: Ian Rogers <irogers@xxxxxxxxxx>
---
tools/perf/python/rwtop.py | 179 +++++++++++++++++++++++++++++++++++++
1 file changed, 179 insertions(+)
create mode 100755 tools/perf/python/rwtop.py

diff --git a/tools/perf/python/rwtop.py b/tools/perf/python/rwtop.py
new file mode 100755
index 000000000000..e895b34b7114
--- /dev/null
+++ b/tools/perf/python/rwtop.py
@@ -0,0 +1,179 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0-only
+"""Periodically displays system-wide r/w call activity, broken down by pid."""
+
+import argparse
+from collections import defaultdict
+import sys
+from typing import Optional, Dict, Any
+import perf
+
+class RwTop:
+ """Periodically displays system-wide r/w call activity."""
+ def __init__(self, interval: int = 3, nlines: int = 20) -> None:
+ self.interval_ns = interval * 1000000000
+ self.nlines = nlines
+ self.reads: Dict[int, Dict[str, Any]] = defaultdict(
+ lambda: {
+ "bytes_requested": 0,
+ "bytes_read": 0,
+ "total_reads": 0,
+ "comm": "",
+ "errors": defaultdict(int),
+ }
+ )
+ self.writes: Dict[int, Dict[str, Any]] = defaultdict(
+ lambda: {
+ "bytes_written": 0,
+ "total_writes": 0,
+ "comm": "",
+ "errors": defaultdict(int),
+ }
+ )
+ self.unhandled: Dict[str, int] = defaultdict(int)
+ self.session: Optional[perf.session] = None
+ self.last_print_time: int = 0
+
+ def process_event(self, sample: perf.sample_event) -> None:
+ """Process events."""
+ event_name = str(sample.evsel)
+ pid = sample.sample_pid
+ sample_time = sample.sample_time
+
+ if self.last_print_time == 0:
+ self.last_print_time = sample_time
+
+ # Check if interval has passed
+ if sample_time - self.last_print_time >= self.interval_ns:
+ self.print_totals()
+ self.last_print_time = sample_time
+
+ assert self.session is not None
+ try:
+ comm = self.session.process(pid).comm()
+ except Exception: # pylint: disable=broad-except
+ comm = "unknown"
+
+ if "sys_enter_read" in event_name:
+ self._handle_sys_enter_read(sample, pid, comm)
+ elif "sys_exit_read" in event_name:
+ self._handle_sys_exit_read(sample, pid)
+ elif "sys_enter_write" in event_name:
+ self._handle_sys_enter_write(sample, pid, comm)
+ elif "sys_exit_write" in event_name:
+ self._handle_sys_exit_write(sample, pid)
+ else:
+ self.unhandled[event_name] += 1
+
+ def _handle_sys_enter_read(self, sample: perf.sample_event, pid: int, comm: str) -> None:
+ try:
+ count = sample.count
+ self.reads[pid]["bytes_requested"] += count
+ self.reads[pid]["total_reads"] += 1
+ self.reads[pid]["comm"] = comm
+ except AttributeError:
+ pass
+
+ def _handle_sys_exit_read(self, sample: perf.sample_event, pid: int) -> None:
+ try:
+ ret = sample.ret
+ if ret > 0:
+ self.reads[pid]["bytes_read"] += ret
+ else:
+ self.reads[pid]["errors"][ret] += 1
+ except AttributeError:
+ pass
+
+ def _handle_sys_enter_write(self, sample: perf.sample_event, pid: int, comm: str) -> None:
+ try:
+ count = sample.count
+ self.writes[pid]["bytes_written"] += count
+ self.writes[pid]["total_writes"] += 1
+ self.writes[pid]["comm"] = comm
+ except AttributeError:
+ pass
+
+ def _handle_sys_exit_write(self, sample: perf.sample_event, pid: int) -> None:
+ try:
+ ret = sample.ret
+ if ret <= 0:
+ self.writes[pid]["errors"][ret] += 1
+ except AttributeError:
+ pass
+
+ def print_totals(self) -> None:
+ """Print summary tables."""
+ # Clear terminal using ANSI escape codes
+ print("\x1b[H\x1b[2J", end="")
+
+ print("read counts by pid:\n")
+ print(
+ f"{'pid':>6s} {'comm':<20s} {'# reads':>10s} "
+ f"{'bytes_req':>10s} {'bytes_read':>10s}"
+ )
+ print(f"{'-'*6} {'-'*20} {'-'*10} {'-'*10} {'-'*10}")
+
+ count = 0
+ for pid, data in sorted(self.reads.items(),
+ key=lambda kv: kv[1]["bytes_read"], reverse=True):
+ print(
+ f"{pid:6d} {data['comm']:<20s} {data['total_reads']:10d} "
+ f"{data['bytes_requested']:10d} {data['bytes_read']:10d}"
+ )
+ count += 1
+ if count >= self.nlines:
+ break
+
+ print("\nwrite counts by pid:\n")
+ print(f"{'pid':>6s} {'comm':<20s} {'# writes':>10s} {'bytes_written':>13s}")
+ print(f"{'-'*6} {'-'*20} {'-'*10} {'-'*13}")
+
+ count = 0
+ for pid, data in sorted(self.writes.items(),
+ key=lambda kv: kv[1]["bytes_written"], reverse=True):
+ print(
+ f"{pid:6d} {data['comm']:<20s} "
+ f"{data['total_writes']:10d} {data['bytes_written']:13d}"
+ )
+ count += 1
+ if count >= self.nlines:
+ break
+
+ # Reset counts
+ self.reads.clear()
+ self.writes.clear()
+
+ def run(self, input_file: str) -> None:
+ """Run the session."""
+ self.session = perf.session(perf.data(input_file), sample=self.process_event)
+ self.session.process_events()
+
+ # Print final totals if there are any left
+ if self.reads or self.writes:
+ self.print_totals()
+
+ if self.unhandled:
+ print("\nunhandled events:\n")
+ print(f"{'event':<40s} {'count':>10s}")
+ print(f"{'-'*40} {'-'*10}")
+ for event_name, count in self.unhandled.items():
+ print(f"{event_name:<40s} {count:10d}")
+
+def main() -> None:
+ """Main function."""
+ parser = argparse.ArgumentParser(description="Trace r/w activity by PID")
+ parser.add_argument(
+ "interval", type=int, nargs="?", default=3, help="Refresh interval in seconds"
+ )
+ parser.add_argument("-i", "--input", default="perf.data", help="Input file")
+ args = parser.parse_args()
+
+ analyzer = RwTop(args.interval)
+ try:
+ analyzer.run(args.input)
+ except IOError as e:
+ print(e, file=sys.stderr)
+ sys.exit(1)
+
+if __name__ == "__main__":
+ main()
--
2.54.0.rc1.513.gad8abe7a5a-goog