flinch_trace_tools (copy/paste kit)
===================================

This is a boring measurement kit for the "is there a ~724ms pause and is it
compute or waiting?" argument. You get:

- nvml_trace.py: samples NVML at ~10ms to a CSV, and can record MARK labels via a FIFO
- analyze_trace.py: computes duration + rough GPU energy between two MARK labels

------------------------------------------------------------
nvml_trace.py
------------------------------------------------------------
Save as: nvml_trace.py

#!/usr/bin/env python3
"""
nvml_trace.py — minimal NVML (+ optional Intel RAPL) sampler that writes CSV.

Goal: produce a trace.csv that can answer:
- did GPU power/util drop during the alleged pause (waiting: queue/network/backpressure)?
- or stay high (actual compute: extra pass/safety sidecar/batching pathology)?

Features:
- Samples NVIDIA GPU power/util/mem util/SM clock/temp via NVML (pynvml).
- Optionally logs Intel RAPL package energy counter (energy_uj) if present (Linux).
- Optionally logs stage markers via a FIFO, e.g.:
      echo infer_start > /tmp/flinch_marks
"""

import argparse
import csv
import os
import stat
import threading
import time

# --- NVML ---
try:
    import pynvml  # pip install nvidia-ml-py3
    pynvml.nvmlInit()
    NVML_OK = True
except Exception:
    NVML_OK = False


def t_mono_ns() -> int:
    # monotonic; good for span measurement
    return time.perf_counter_ns()


def t_wall_ns() -> int:
    # wall clock; useful for correlating with other logs
    return time.time_ns()


def read_rapl_energy_uj(path: str):
    try:
        with open(path, "r") as f:
            return int(f.read().strip())
    except Exception:
        return ""


def ensure_fifo(path: str):
    if os.path.exists(path):
        st = os.stat(path)
        if not stat.S_ISFIFO(st.st_mode):
            raise RuntimeError(f"{path} exists but is not a FIFO")
    else:
        os.mkfifo(path)


class Tracer:
    def __init__(self, out_csv, gpu_index=0, interval_ms=10, rapl_path=None, marks_fifo=None):
        self.out_csv = out_csv
        self.gpu_index = gpu_index
        self.interval_s = interval_ms / 1000.0
        self.rapl_path = rapl_path
        self.marks_fifo = marks_fifo
        self._stop = threading.Event()
        self._lock = threading.Lock()
        self._fh = None
        self._w = None
        self.handle = None

    def _open_csv(self):
        new_file = (not os.path.exists(self.out_csv)) or (os.path.getsize(self.out_csv) == 0)
        self._fh = open(self.out_csv, "a", newline="")
        self._w = csv.writer(self._fh)
        if new_file:
            self._w.writerow([
                "t_mono_ns",
                "t_wall_ns",
                "row_type",        # SAMPLE or MARK
                "label",           # only for MARK
                "gpu_power_W",
                "gpu_util_pct",
                "gpu_mem_util_pct",
                "gpu_sm_clock_MHz",
                "gpu_temp_C",
                "rapl_energy_uj",
            ])
        self._fh.flush()

    def _close_csv(self):
        try:
            if self._fh:
                self._fh.flush()
                self._fh.close()
        finally:
            self._fh = None
            self._w = None

    def _write(self, row):
        with self._lock:
            self._w.writerow(row)
            self._fh.flush()

    def start(self):
        self._open_csv()
        if NVML_OK:
            self.handle = pynvml.nvmlDeviceGetHandleByIndex(self.gpu_index)
        if self.marks_fifo:
            ensure_fifo(self.marks_fifo)
            threading.Thread(target=self._mark_loop, daemon=True).start()
        threading.Thread(target=self._sample_loop, daemon=True).start()

    def stop(self):
        self._stop.set()
        time.sleep(self.interval_s * 2)
        self._close_csv()

    def mark(self, label: str):
        self._write([t_mono_ns(), t_wall_ns(), "MARK", label, "", "", "", "", "", ""])

    def _mark_loop(self):
        # Reopen the FIFO after each writer disconnects; keep going.
        while not self._stop.is_set():
            try:
                with open(self.marks_fifo, "r") as f:
                    for line in f:
                        if self._stop.is_set():
                            break
                        label = line.strip()
                        if label:
                            self.mark(label)
            except Exception:
                time.sleep(0.05)

    def _sample_loop(self):
        while not self._stop.is_set():
            tm = t_mono_ns()
            tw = t_wall_ns()
            rapl = read_rapl_energy_uj(self.rapl_path) if self.rapl_path else ""
            if self.handle is not None:
                try:
                    p_mw = pynvml.nvmlDeviceGetPowerUsage(self.handle)  # milliwatts
                    util = pynvml.nvmlDeviceGetUtilizationRates(self.handle)
                    clk = pynvml.nvmlDeviceGetClockInfo(self.handle, pynvml.NVML_CLOCK_SM)
                    temp = pynvml.nvmlDeviceGetTemperature(self.handle, pynvml.NVML_TEMPERATURE_GPU)
                    row = [
                        tm, tw, "SAMPLE", "",
                        p_mw / 1000.0,
                        util.gpu,
                        util.memory,
                        clk,
                        temp,
                        rapl,
                    ]
                except Exception:
                    row = [tm, tw, "SAMPLE", "", "", "", "", "", "", rapl]
            else:
                row = [tm, tw, "SAMPLE", "", "", "", "", "", "", rapl]
            self._write(row)
            time.sleep(self.interval_s)


def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--out", required=True)
    ap.add_argument("--gpu", type=int, default=0)
    ap.add_argument("--interval-ms", type=int, default=10)
    ap.add_argument("--seconds", type=float, default=30.0)
    ap.add_argument("--marks-fifo", default="", help="e.g. /tmp/flinch_marks (optional)")
    ap.add_argument("--rapl", default="/sys/class/powercap/intel-rapl/intel-rapl:0/energy_uj")
    ap.add_argument("--no-rapl", action="store_true")
    args = ap.parse_args()

    rapl_path = None if args.no_rapl else args.rapl
    marks_fifo = args.marks_fifo or None

    tr = Tracer(
        out_csv=args.out,
        gpu_index=args.gpu,
        interval_ms=args.interval_ms,
        rapl_path=rapl_path,
        marks_fifo=marks_fifo,
    )
    tr.start()
    tr.mark("begin")
    time.sleep(args.seconds)
    tr.mark("end")
    tr.stop()


if __name__ == "__main__":
    main()
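Optional sanity check (a sketch, not part of the kit): if the tracer thread gets descheduled, a hole in the SAMPLE rows can look like a pause that never happened. Something like the following, saved as e.g. gap_check.py, prints the spacing between consecutive SAMPLE rows so you can rule that out before arguing about the workload:

#!/usr/bin/env python3
# gap_check.py: sanity-check the sampler's own cadence (illustrative sketch).
# If max_gap_ms is much larger than the --interval-ms you asked for, be
# suspicious of any "pause" that overlaps that region of the trace.
import csv
import sys

ts = []
with open(sys.argv[1], newline="") as f:
    for r in csv.DictReader(f):
        if r["row_type"] == "SAMPLE":
            ts.append(int(r["t_mono_ns"]))

gaps_ms = sorted((b - a) / 1e6 for a, b in zip(ts, ts[1:]))
if gaps_ms:
    print(f"samples={len(ts)} "
          f"median_gap_ms={gaps_ms[len(gaps_ms) // 2]:.1f} "
          f"max_gap_ms={gaps_ms[-1]:.1f}")
else:
    print("not enough SAMPLE rows")

Run it as: python gap_check.py trace.csv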
------------------------------------------------------------
analyze_trace.py
------------------------------------------------------------
Save as: analyze_trace.py

#!/usr/bin/env python3
"""
analyze_trace.py — compute duration and approximate GPU energy between two MARK labels.

Energy integration: trapezoid rule over gpu_power_W samples within the window.
Rough, but enough to tell "waiting" vs "compute" most of the time.
"""

import argparse
import csv


def parse_float(x):
    try:
        return float(x)
    except Exception:
        return None


def load_rows(path):
    with open(path, "r", newline="") as f:
        return list(csv.DictReader(f))


def find_mark(rows, label):
    for r in rows:
        if r.get("row_type") == "MARK" and r.get("label") == label:
            return int(r["t_mono_ns"])
    return None


def integrate_gpu_energy(rows, t0_ns, t1_ns):
    samples = []
    for r in rows:
        if r.get("row_type") != "SAMPLE":
            continue
        t = int(r["t_mono_ns"])
        if t < t0_ns or t > t1_ns:
            continue
        p = parse_float(r.get("gpu_power_W", ""))
        u = parse_float(r.get("gpu_util_pct", ""))
        if p is None:
            continue
        samples.append((t, p, u))
    samples.sort(key=lambda x: x[0])
    if len(samples) < 2:
        return None
    e_j = 0.0
    util_sum = 0.0
    util_n = 0
    for (ta, pa, ua), (tb, pb, ub) in zip(samples[:-1], samples[1:]):
        dt = (tb - ta) / 1e9
        e_j += 0.5 * (pa + pb) * dt
        if ua is not None:
            util_sum += ua
            util_n += 1
    avg_util = (util_sum / util_n) if util_n else None
    return e_j, avg_util, len(samples)


def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--csv", required=True)
    ap.add_argument("--start", required=True)
    ap.add_argument("--end", required=True)
    args = ap.parse_args()

    rows = load_rows(args.csv)
    t0 = find_mark(rows, args.start)
    t1 = find_mark(rows, args.end)
    if t0 is None or t1 is None:
        raise SystemExit(f"Missing MARK(s). start={t0} end={t1}")
    if t1 <= t0:
        raise SystemExit("End MARK must be after start MARK")

    dur_s = (t1 - t0) / 1e9
    print(f"duration_s={dur_s:.6f}")

    out = integrate_gpu_energy(rows, t0, t1)
    if out is None:
        print("gpu_energy_J≈(n/a)  # not enough gpu_power_W samples in window")
        return
    e_j, avg_util, n = out
    print(f"gpu_energy_J≈{e_j:.3f}  # trapezoid over {n} samples")
    if avg_util is not None:
        print(f"avg_gpu_util_pct≈{avg_util:.2f}")


if __name__ == "__main__":
    main()
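To turn the analyzer's two numbers into an argument, average power over the window is just gpu_energy_J divided by duration_s, and comparing that to your GPU's idle and loaded draw usually settles waiting vs. compute. A minimal sketch; idle_w and busy_w below are assumptions, replace them with values measured on your own box (nvidia-smi at idle vs. under a known-busy workload):

#!/usr/bin/env python3
# classify_window.py: rough waiting-vs-compute call from analyze_trace.py's output
# (illustrative sketch). idle_w and busy_w are placeholders, not real thresholds.
import sys


def classify(energy_j: float, duration_s: float, idle_w: float = 60.0, busy_w: float = 250.0) -> str:
    avg_w = energy_j / duration_s  # mean GPU power over the window
    if avg_w < 1.5 * idle_w:
        return f"avg {avg_w:.0f} W -> looks like waiting (queue/network/backpressure)"
    if avg_w > 0.7 * busy_w:
        return f"avg {avg_w:.0f} W -> looks like compute (extra pass/sidecar/batching)"
    return f"avg {avg_w:.0f} W -> ambiguous; look at gpu_util_pct over the window"


if __name__ == "__main__":
    print(classify(float(sys.argv[1]), float(sys.argv[2])))

Run it as: python classify_window.py <gpu_energy_J> <duration_s>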
start={t0} end={t1}") if t1 <= t0: raise SystemExit("End MARK must be after start MARK") dur_s = (t1 - t0) / 1e9 print(f"duration_s={dur_s:.6f}") out = integrate_gpu_energy(rows, t0, t1) if out is None: print("gpu_energy_J≈(n/a) # not enough gpu_power_W samples in window") return e_j, avg_util, n = out print(f"gpu_energy_J≈{e_j:.3f} # trapezoid over {n} samples") if avg_util is not None: print(f"avg_gpu_util_pct≈{avg_util:.2f}") if __name__ == "__main__": main() ------------------------------------------------------------ Usage (quick) ------------------------------------------------------------ 1) Install NVML Python bindings: pip install nvidia-ml-py3 2) Run tracer: python nvml_trace.py --out trace.csv --gpu 0 --interval-ms 10 --seconds 60 --marks-fifo /tmp/flinch_marks 3) From your client/server/sidecar (anything), drop stage markers: echo infer_start > /tmp/flinch_marks echo first_token > /tmp/flinch_marks 4) Analyze: python analyze_trace.py --csv trace.csv --start infer_start --end first_token Notes: - If power/util drops during the pause window → probably waiting/queueing/network/backpressure. - If power/util stays high → probably compute (extra pass, safety sidecar, batching pathology, etc). - Hosted APIs won’t give you enqueue/dequeue, so don’t pretend RTT is meaningful.