#!/usr/bin/env python3
"""Summarize or compare pack_profile.txt reports from the client-side M2 pack profiler.

Report layout, as understood by :func:`load_report`:

* ``key=value`` metadata lines before the first ``[section]`` header;
* a ``[phase_markers]`` section with ``phase<TAB>value`` rows;
* table sections (see ``TABLE_SECTIONS``) with
  ``key<TAB>metric=value<TAB>metric=value...`` rows.

Passing exactly two reports prints a side-by-side comparison; any other
number of reports prints one summary per report.
"""

from __future__ import annotations

import argparse
import math
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Iterable

# Sections whose rows are parsed into ``{row_key: {metric: value}}`` tables.
TABLE_SECTIONS = {
    "mounts_by_format",
    "mounts_by_pack",
    "loads_by_format",
    "loads_by_phase_format",
    "top_phase_pack_loads",
    "top_extension_loads",
    "loader_stages",
}


@dataclass
class Report:
    """Parsed contents of one profiler report file."""

    label: str  # display name used in headings and column headers
    path: Path  # file the report was read from
    metadata: dict[str, str] = field(default_factory=dict)  # key=value lines before any section
    phase_markers: list[tuple[str, float]] = field(default_factory=list)  # rows of [phase_markers]
    tables: dict[str, dict[str, dict[str, float]]] = field(default_factory=dict)  # section -> row -> metrics

    def uptime_ms(self) -> float:
        """Return the ``uptime_ms`` metadata value, or 0.0 when absent or unparseable."""
        return parse_float(self.metadata.get("uptime_ms", "0"))

    def current_phase(self) -> str:
        """Return the ``current_phase`` metadata value, or ``"-"`` when absent."""
        return self.metadata.get("current_phase", "-")

    def reason(self) -> str:
        """Return the ``reason`` metadata value, or ``"-"`` when absent."""
        return self.metadata.get("reason", "-")

    def section(self, name: str) -> dict[str, dict[str, float]]:
        """Return the parsed table for section *name*, or an empty dict."""
        return self.tables.get(name, {})


def parse_args() -> argparse.Namespace:
    """Parse the command line (``REPORT...`` positionals and ``--top``)."""
    parser = argparse.ArgumentParser(
        description=(
            "Summarize or compare pack_profile.txt reports generated by the "
            "client-side M2 pack profiler."
        ),
    )
    parser.add_argument(
        "reports",
        nargs="+",
        metavar="REPORT",
        help=(
            "Either /path/to/pack_profile.txt or label=/path/to/pack_profile.txt. "
            # Only the two-report case is compared (see summarize_many_reports).
            "Pass exactly two reports for a side-by-side comparison; any other "
            "number of reports prints one summary per report."
        ),
    )
    parser.add_argument(
        "--top",
        type=int,
        default=6,
        help="How many hotspot rows to show in top-pack, top-extension and stage sections.",
    )
    return parser.parse_args()


def parse_float(value: str) -> float:
    """Parse *value* as a finite float.

    Unparseable text and non-finite values ("nan", "inf") yield 0.0, so that
    downstream integer rounding and sorting never see NaN or infinity.
    """
    try:
        number = float(value.strip())
    except ValueError:
        return 0.0
    return number if math.isfinite(number) else 0.0


def parse_metric_value(value: str) -> float:
    """Parse a metric value, dropping a trailing ``" ms"`` unit when present."""
    text = value.strip()
    if text.endswith(" ms"):
        text = text[:-3]
    return parse_float(text)


def parse_report_arg(value: str) -> tuple[str, Path]:
    """Split a ``REPORT`` argument into ``(label, path)``.

    ``label=path`` is honoured when both sides are non-empty; otherwise the
    whole value is the path and its stem becomes the label.
    """
    label, separator, path_value = value.partition("=")
    if separator and label and path_value:
        return label, Path(path_value)
    path = Path(value)
    return path.stem, path


def load_report(arg_value: str) -> Report:
    """Read and parse the report named by *arg_value*.

    Raises:
        FileNotFoundError: if the resolved path is not an existing file.
    """
    label, path = parse_report_arg(arg_value)
    if not path.is_file():
        raise FileNotFoundError(f"report not found: {path}")

    report = Report(label=label, path=path)
    current_section: str | None = None

    for raw_line in path.read_text(encoding="utf-8", errors="replace").splitlines():
        line = raw_line.strip()
        if not line or line == "PACK PROFILE REPORT":
            continue

        if line.startswith("[") and line.endswith("]"):
            current_section = line[1:-1]
            if current_section in TABLE_SECTIONS:
                report.tables.setdefault(current_section, {})
            continue

        if current_section is None:
            # Everything before the first section header is key=value metadata.
            if "=" in line:
                key, value = line.split("=", 1)
                report.metadata[key.strip()] = value.strip()
            continue

        if current_section == "phase_markers":
            if "\t" in line:
                phase, value = line.split("\t", 1)
                report.phase_markers.append((phase.strip(), parse_metric_value(value)))
            continue

        if current_section not in TABLE_SECTIONS:
            continue

        cells = [cell.strip() for cell in line.split("\t") if cell.strip()]
        if not cells:
            continue
        metrics: dict[str, float] = {}
        for cell in cells[1:]:
            if "=" not in cell:
                continue
            metric_key, metric_value = cell.split("=", 1)
            metrics[metric_key.strip()] = parse_metric_value(metric_value)
        report.tables[current_section][cells[0]] = metrics

    return report


def aggregate_phase_loads(report: Report) -> dict[str, dict[str, float]]:
    """Sum ``loads_by_phase_format`` rows per phase.

    Row keys are split at the first ``|``; the part before it is the phase.
    """
    metric_names = ("count", "fail", "bytes", "time_ms")
    aggregated: dict[str, dict[str, float]] = {}
    for key, metrics in report.section("loads_by_phase_format").items():
        phase_key = key.partition("|")[0].strip()
        bucket = aggregated.setdefault(phase_key, dict.fromkeys(metric_names, 0.0))
        for metric_name in metric_names:
            bucket[metric_name] += metrics.get(metric_name, 0.0)
    return aggregated


def sum_section_metrics(section: dict[str, dict[str, float]]) -> dict[str, float]:
    """Return per-metric totals across every row of *section*."""
    totals: dict[str, float] = {}
    for metrics in section.values():
        for key, value in metrics.items():
            totals[key] = totals.get(key, 0.0) + value
    return totals


def sum_selected_section_metrics(
    section: dict[str, dict[str, float]],
    include_keys: Iterable[str] | None = None,
    exclude_keys: Iterable[str] | None = None,
) -> dict[str, float]:
    """Return per-metric totals over a filtered subset of rows.

    An empty or missing *include_keys* means "all rows"; rows named in
    *exclude_keys* are always dropped.
    """
    include_set = set(include_keys or [])
    exclude_set = set(exclude_keys or [])
    filtered = {
        key: metrics
        for key, metrics in section.items()
        if (not include_set or key in include_set) and key not in exclude_set
    }
    return sum_section_metrics(filtered)


def sort_rows_by_metric(
    section: dict[str, dict[str, float]],
    metric: str,
    limit: int,
) -> list[tuple[str, dict[str, float]]]:
    """Return the first *limit* rows ordered by descending *metric*, then by key."""
    ranked = sorted(section.items(), key=lambda item: (-item[1].get(metric, 0.0), item[0]))
    return ranked[:limit]


def format_ms(value: float) -> str:
    """Format a millisecond value with three decimals."""
    return f"{value:.3f}"


def format_delta(value: float) -> str:
    """Format a signed difference with three decimals."""
    return f"{value:+.3f}"


def format_percent(value: float | None) -> str:
    """Format a signed percentage, or ``"-"`` when it is undefined."""
    if value is None or math.isinf(value) or math.isnan(value):
        return "-"
    return f"{value:+.1f}%"


def format_bytes(value: float) -> str:
    """Format a byte count using binary units up to GiB."""
    units = ("B", "KiB", "MiB", "GiB")
    size = float(value)
    unit_index = 0
    while abs(size) >= 1024.0 and unit_index < len(units) - 1:
        size /= 1024.0
        unit_index += 1
    if unit_index == 0:
        return f"{int(round(size))} {units[unit_index]}"
    return f"{size:.2f} {units[unit_index]}"


def format_count(value: float) -> str:
    """Format a value as a rounded integer."""
    return str(int(round(value)))


def format_metric(metric: str, value: float) -> str:
    """Format *value* using the convention implied by the metric's name."""
    if metric in {"bytes", "in", "out"}:
        return format_bytes(value)
    if metric in {"count", "fail", "entries"}:
        return format_count(value)
    if metric.endswith("_ms"):
        return format_ms(value)
    if metric.endswith("_us"):
        return f"{value:.1f}"
    if abs(value - round(value)) < 0.000001:
        return format_count(value)
    return f"{value:.3f}"


# (metric name, formatter) column sets shared by the report tables.
_LOAD_COLUMNS = (("count", format_count), ("time_ms", format_ms), ("bytes", format_bytes))
_LOAD_FAIL_COLUMNS = _LOAD_COLUMNS + (("fail", format_count),)
_MOUNT_COLUMNS = (("count", format_count), ("time_ms", format_ms), ("entries", format_count))
_MOUNT_FAIL_COLUMNS = _MOUNT_COLUMNS + (("fail", format_count),)


def render_table(headers: list[str], rows: list[list[str]], numeric_columns: set[int] | None = None) -> str:
    """Render *rows* under *headers* as a fixed-width text table.

    Columns listed in *numeric_columns* are right-aligned; the rest are
    left-aligned. The second output line is a dashed divider.
    """
    numeric = numeric_columns or set()
    widths = [len(header) for header in headers]
    for row in rows:
        for index, cell in enumerate(row):
            widths[index] = max(widths[index], len(cell))

    def render_line(cells: list[str]) -> str:
        return " ".join(
            cell.rjust(widths[index]) if index in numeric else cell.ljust(widths[index])
            for index, cell in enumerate(cells)
        )

    lines = [render_line(headers), " ".join("-" * width for width in widths)]
    lines.extend(render_line(row) for row in rows)
    return "\n".join(lines)


def relative_delta_percent(base: float, candidate: float) -> float | None:
    """Return the change from *base* to *candidate* in percent, or None for a ~zero base."""
    if abs(base) < 0.000001:
        return None
    return ((candidate - base) / base) * 100.0


def _append_table(
    lines: list[str],
    title: str,
    headers: list[str],
    rows: list[list[str]],
    numeric_columns: set[int],
) -> None:
    """Append a titled table plus a blank line to *lines*; do nothing when *rows* is empty."""
    if rows:
        lines.extend([title, render_table(headers, rows, numeric_columns=numeric_columns), ""])


def _metric_cells(
    metrics: dict[str, float],
    columns: Iterable[tuple[str, Callable[[float], str]]],
) -> list[str]:
    """Format the metrics named in *columns*, treating missing ones as 0.0."""
    return [formatter(metrics.get(name, 0.0)) for name, formatter in columns]


def _ranked_rows(
    section: dict[str, dict[str, float]],
    top: int,
    columns: Iterable[tuple[str, Callable[[float], str]]],
) -> list[list[str]]:
    """Build table rows for the *top* rows of *section* with the largest ``time_ms``."""
    columns = tuple(columns)
    return [
        [key, *_metric_cells(metrics, columns)]
        for key, metrics in sort_rows_by_metric(section, "time_ms", top)
    ]


def _average_us(metrics: dict[str, float]) -> float:
    """Return ``time_ms`` per ``count`` in microseconds, or 0.0 when the count is zero."""
    count = metrics.get("count", 0.0)
    return (metrics.get("time_ms", 0.0) * 1000.0 / count) if count else 0.0


def _delta_cells(base: float, candidate: float) -> list[str]:
    """Return the formatted absolute and relative change from *base* to *candidate*."""
    return [
        format_delta(candidate - base),
        format_percent(relative_delta_percent(base, candidate)),
    ]


def _names_by_peak_time(
    left_section: dict[str, dict[str, float]],
    right_section: dict[str, dict[str, float]],
) -> list[str]:
    """Return the union of row names, slowest first (by the larger ``time_ms``), then by name."""

    def peak(name: str) -> float:
        return max(
            left_section.get(name, {}).get("time_ms", 0.0),
            right_section.get(name, {}).get("time_ms", 0.0),
        )

    # Rank on the raw values: ranking on 3-decimal display strings would turn
    # sub-microsecond differences into ties.
    return sorted(set(left_section) | set(right_section), key=lambda name: (-peak(name), name))


def summarize_report(report: Report, top: int) -> str:
    """Render a single-report summary; hotspot tables are limited to *top* rows."""
    lines = [
        f"== {report.label} ==",
        f"path: {report.path}",
        (
            f"reason={report.reason()} phase={report.current_phase()} "
            f"uptime_ms={format_ms(report.uptime_ms())}"
        ),
        "",
    ]

    load_totals = sum_section_metrics(report.section("loads_by_format"))
    mount_totals = sum_section_metrics(report.section("mounts_by_format"))
    _append_table(
        lines,
        "Overview",
        ["loads", "load_ms", "load_bytes", "load_fail", "mounts", "mount_ms"],
        [[
            *_metric_cells(load_totals, _LOAD_FAIL_COLUMNS),
            format_count(mount_totals.get("count", 0.0)),
            format_ms(mount_totals.get("time_ms", 0.0)),
        ]],
        {0, 1, 2, 3, 4, 5},
    )

    _append_table(
        lines,
        "Phase Markers",
        ["phase", "ms"],
        [[phase, format_ms(value)] for phase, value in report.phase_markers],
        {1},
    )
    _append_table(
        lines,
        "Loads By Format",
        ["format", "count", "time_ms", "bytes", "fail"],
        _ranked_rows(report.section("loads_by_format"), top, _LOAD_FAIL_COLUMNS),
        {1, 2, 3, 4},
    )
    _append_table(
        lines,
        "Mounts By Format",
        ["format", "count", "time_ms", "entries", "fail"],
        _ranked_rows(report.section("mounts_by_format"), top, _MOUNT_FAIL_COLUMNS),
        {1, 2, 3, 4},
    )
    _append_table(
        lines,
        "Load Time By Phase",
        ["phase", "count", "time_ms", "bytes", "fail"],
        _ranked_rows(aggregate_phase_loads(report), top, _LOAD_FAIL_COLUMNS),
        {1, 2, 3, 4},
    )

    stage_rows = [
        [
            key,
            format_count(metrics.get("count", 0.0)),
            format_ms(metrics.get("time_ms", 0.0)),
            f"{_average_us(metrics):.1f}",
            format_bytes(metrics.get("in", 0.0)),
            format_bytes(metrics.get("out", 0.0)),
        ]
        for key, metrics in sort_rows_by_metric(report.section("loader_stages"), "time_ms", top)
    ]
    _append_table(
        lines,
        "Top Loader Stages",
        ["stage", "count", "time_ms", "avg_us", "in", "out"],
        stage_rows,
        {1, 2, 3, 4, 5},
    )

    _append_table(
        lines,
        "Top Phase Pack Loads",
        ["phase | pack", "count", "time_ms", "bytes"],
        _ranked_rows(report.section("top_phase_pack_loads"), top, _LOAD_COLUMNS),
        {1, 2, 3},
    )
    _append_table(
        lines,
        "Top Extensions",
        ["extension", "count", "time_ms", "bytes"],
        _ranked_rows(report.section("top_extension_loads"), top, _LOAD_COLUMNS),
        {1, 2, 3},
    )

    return "\n".join(lines).rstrip()


def compare_two_reports(left: Report, right: Report, top: int) -> str:
    """Render a side-by-side comparison; deltas are ``right - left``.

    *top* limits the "Loader Stages" table and the per-report
    "Top Phase Pack Loads" tables; the other tables list every row.
    """
    lines = [f"== {left.label} vs {right.label} ==", ""]

    overview_rows = []
    for report in (left, right):
        load_totals = sum_section_metrics(report.section("loads_by_format"))
        mount_totals = sum_section_metrics(report.section("mounts_by_format"))
        overview_rows.append([
            report.label,
            format_ms(report.uptime_ms()),
            *_metric_cells(load_totals, _LOAD_COLUMNS),
            format_count(mount_totals.get("count", 0.0)),
            format_ms(mount_totals.get("time_ms", 0.0)),
        ])
    _append_table(
        lines,
        "Overview",
        ["report", "uptime_ms", "loads", "load_ms", "load_bytes", "mounts", "mount_ms"],
        overview_rows,
        {1, 2, 3, 4, 5, 6},
    )

    # "Packed" load totals leave out the "disk" format row.
    left_packed_loads = sum_selected_section_metrics(left.section("loads_by_format"), exclude_keys={"disk"})
    right_packed_loads = sum_selected_section_metrics(right.section("loads_by_format"), exclude_keys={"disk"})
    _append_table(
        lines,
        "Packed Load Totals",
        [
            f"{left.label}_count",
            f"{left.label}_ms",
            f"{left.label}_bytes",
            f"{right.label}_count",
            f"{right.label}_ms",
            f"{right.label}_bytes",
            "delta_ms",
            "delta_pct",
        ],
        [[
            *_metric_cells(left_packed_loads, _LOAD_COLUMNS),
            *_metric_cells(right_packed_loads, _LOAD_COLUMNS),
            *_delta_cells(left_packed_loads.get("time_ms", 0.0), right_packed_loads.get("time_ms", 0.0)),
        ]],
        {0, 1, 2, 3, 4, 5, 6, 7},
    )

    left_packed_mounts = sum_selected_section_metrics(left.section("mounts_by_format"))
    right_packed_mounts = sum_selected_section_metrics(right.section("mounts_by_format"))
    _append_table(
        lines,
        "Packed Mount Totals",
        [
            f"{left.label}_count",
            f"{left.label}_ms",
            f"{left.label}_entries",
            f"{right.label}_count",
            f"{right.label}_ms",
            f"{right.label}_entries",
            "delta_ms",
            "delta_pct",
        ],
        [[
            *_metric_cells(left_packed_mounts, _MOUNT_COLUMNS),
            *_metric_cells(right_packed_mounts, _MOUNT_COLUMNS),
            *_delta_cells(left_packed_mounts.get("time_ms", 0.0), right_packed_mounts.get("time_ms", 0.0)),
        ]],
        {0, 1, 2, 3, 4, 5, 6, 7},
    )

    left_phases = dict(left.phase_markers)
    right_phases = dict(right.phase_markers)
    # Left phases first, then phases only the right report has. Going through
    # dict keys gives each phase exactly one row even if a report lists it twice.
    ordered_phases = list(dict.fromkeys([*left_phases, *right_phases]))
    phase_rows = []
    for phase in ordered_phases:
        left_value = left_phases.get(phase)
        right_value = right_phases.get(phase)
        phase_rows.append([
            phase,
            "-" if left_value is None else format_ms(left_value),
            "-" if right_value is None else format_ms(right_value),
            *_delta_cells(left_value or 0.0, right_value or 0.0),
        ])
    _append_table(
        lines,
        "Phase Markers",
        ["phase", left.label, right.label, "delta_ms", "delta_pct"],
        phase_rows,
        {1, 2, 3, 4},
    )

    left_phase_loads = aggregate_phase_loads(left)
    right_phase_loads = aggregate_phase_loads(right)
    phase_load_rows = []
    for phase in _names_by_peak_time(left_phase_loads, right_phase_loads):
        left_metrics = left_phase_loads.get(phase, {})
        right_metrics = right_phase_loads.get(phase, {})
        left_time = left_metrics.get("time_ms", 0.0)
        right_time = right_metrics.get("time_ms", 0.0)
        phase_load_rows.append([
            phase,
            format_ms(left_time),
            format_ms(right_time),
            *_delta_cells(left_time, right_time),
            format_count(left_metrics.get("count", 0.0)),
            format_count(right_metrics.get("count", 0.0)),
        ])
    _append_table(
        lines,
        "Load Time By Phase",
        [
            "phase",
            f"{left.label}_ms",
            f"{right.label}_ms",
            "delta_ms",
            "delta_pct",
            f"{left.label}_count",
            f"{right.label}_count",
        ],
        phase_load_rows,
        {1, 2, 3, 4, 5, 6},
    )

    for title, section_name, columns, third_suffix in (
        ("Loads By Format", "loads_by_format", _LOAD_COLUMNS, "bytes"),
        ("Mounts By Format", "mounts_by_format", _MOUNT_COLUMNS, "entries"),
    ):
        left_section = left.section(section_name)
        right_section = right.section(section_name)
        format_rows = []
        for format_name in _names_by_peak_time(left_section, right_section):
            left_metrics = left_section.get(format_name, {})
            right_metrics = right_section.get(format_name, {})
            format_rows.append([
                format_name,
                *_metric_cells(left_metrics, columns),
                *_metric_cells(right_metrics, columns),
                *_delta_cells(left_metrics.get("time_ms", 0.0), right_metrics.get("time_ms", 0.0)),
            ])
        _append_table(
            lines,
            title,
            [
                "format",
                f"{left.label}_count",
                f"{left.label}_ms",
                f"{left.label}_{third_suffix}",
                f"{right.label}_count",
                f"{right.label}_ms",
                f"{right.label}_{third_suffix}",
                "delta_ms",
                "delta_pct",
            ],
            format_rows,
            {1, 2, 3, 4, 5, 6, 7, 8},
        )

    left_stages = left.section("loader_stages")
    right_stages = right.section("loader_stages")
    stage_rows = []
    for stage_name in _names_by_peak_time(left_stages, right_stages):
        left_metrics = left_stages.get(stage_name, {})
        right_metrics = right_stages.get(stage_name, {})
        left_time = left_metrics.get("time_ms", 0.0)
        right_time = right_metrics.get("time_ms", 0.0)
        stage_rows.append([
            stage_name,
            format_ms(left_time),
            f"{_average_us(left_metrics):.1f}",
            format_ms(right_time),
            f"{_average_us(right_metrics):.1f}",
            *_delta_cells(left_time, right_time),
        ])
    _append_table(
        lines,
        "Loader Stages",
        [
            "stage",
            f"{left.label}_ms",
            f"{left.label}_avg_us",
            f"{right.label}_ms",
            f"{right.label}_avg_us",
            "delta_ms",
            "delta_pct",
        ],
        stage_rows[:top],
        {1, 2, 3, 4, 5, 6},
    )

    for report in (left, right):
        _append_table(
            lines,
            f"Top Phase Pack Loads: {report.label}",
            ["phase | pack", "count", "time_ms", "bytes"],
            _ranked_rows(report.section("top_phase_pack_loads"), top, _LOAD_COLUMNS),
            {1, 2, 3},
        )

    return "\n".join(lines).rstrip()


def summarize_many_reports(reports: list[Report], top: int) -> str:
    """Compare exactly two reports; otherwise print one summary per report."""
    if len(reports) == 2:
        return compare_two_reports(reports[0], reports[1], top)
    return "\n\n".join(summarize_report(report, top) for report in reports)


def main() -> int:
    """CLI entry point; returns the process exit status."""
    args = parse_args()
    try:
        reports = [load_report(value) for value in args.reports]
    except OSError as exc:
        # Covers the explicit FileNotFoundError from load_report as well as
        # read failures such as PermissionError.
        print(str(exc), file=sys.stderr)
        return 1
    # --top values below 1 are clamped so every hotspot table shows at least one row.
    print(summarize_many_reports(reports, max(args.top, 1)))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())