You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
129 lines
4.9 KiB
129 lines
4.9 KiB
from collections import deque
|
|
import time
|
|
|
|
|
|
class Telemetry:
|
|
"""Handles timing and performance monitoring for different code sections."""
|
|
|
|
def __init__(self, window_size: int = 100):
|
|
self.window_size = window_size
|
|
self._current_timers = {}
|
|
self._last_results = {}
|
|
self._history = {} # Stores deques for history
|
|
|
|
def start_timer(self, name: str):
|
|
"""Starts a timer for a given operation."""
|
|
self._current_timers[name] = time.perf_counter()
|
|
|
|
def stop_timer(self, name: str) -> float:
|
|
"""Stops a timer and records the duration."""
|
|
if name not in self._current_timers:
|
|
print(f"Warning: Telemetry Timer '{name}' stopped without being started.")
|
|
return 0.0
|
|
|
|
start_time = self._current_timers.pop(name)
|
|
duration = time.perf_counter() - start_time
|
|
self.record_value(name, duration)
|
|
return duration
|
|
|
|
class Timer:
|
|
"""Context manager for timing operations."""
|
|
|
|
def __init__(self, telemetry, name: str):
|
|
self.telemetry = telemetry
|
|
self.name = name
|
|
|
|
def __enter__(self):
|
|
self.telemetry.start_timer(self.name)
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
self.telemetry.stop_timer(self.name)
|
|
return False # Don't suppress exceptions
|
|
|
|
def timer(self, name: str):
|
|
"""Returns a context manager for timing an operation.
|
|
|
|
Example usage:
|
|
with telemetry.timer("operation_name"):
|
|
# Code to time goes here
|
|
"""
|
|
return self.Timer(self, name)
|
|
|
|
def record_value(self, name: str, value: float):
|
|
"""Records a pre-calculated value (e.g., duration, count)."""
|
|
self._last_results[name] = value
|
|
if name not in self._history:
|
|
self._history[name] = deque(maxlen=self.window_size)
|
|
self._history[name].append(value)
|
|
|
|
def get_last_timing(self) -> dict[str, float]:
|
|
"""Returns the timing results from the most recent cycle and clears internal state."""
|
|
results = self._last_results.copy()
|
|
# Don't clear _last_results here, let the logger decide when it's done
|
|
return results
|
|
|
|
def clear_last_timing(self):
|
|
"""Clears the timing results for the next iteration."""
|
|
self._last_results.clear()
|
|
|
|
def get_average(self, name: str) -> float | None:
|
|
"""Calculates the moving average for a recorded metric."""
|
|
if name not in self._history or not self._history[name]:
|
|
return None
|
|
return sum(self._history[name]) / len(self._history[name])
|
|
|
|
def get_history(self, name: str) -> deque[float] | None:
|
|
"""Returns the historical data for a metric."""
|
|
return self._history.get(name)
|
|
|
|
def log_timing_info(
|
|
self,
|
|
context: str = "",
|
|
threshold: float = 0.001,
|
|
log_averages: bool = True,
|
|
):
|
|
"""Logs timing information based on thresholds and averages."""
|
|
current_iteration_data = self.get_last_timing() # Get the latest data
|
|
significant_timings = {k: v for k, v in current_iteration_data.items() if v > threshold}
|
|
|
|
should_log = bool(significant_timings) # Log if any timing exceeds threshold
|
|
|
|
# Check averages condition
|
|
avg_data = {}
|
|
if log_averages:
|
|
for name in self._history:
|
|
avg = self.get_average(name)
|
|
if avg is not None:
|
|
avg_data[f"{name}_avg"] = avg
|
|
if avg_data: # Log if average data exists
|
|
should_log = True
|
|
|
|
# If nothing to log, return early
|
|
if not should_log:
|
|
self.clear_last_timing() # Clear data since it wasn't logged
|
|
return
|
|
|
|
log_lines = [f"\n{context} Timing breakdown:" if context else "\nTiming breakdown:"]
|
|
|
|
# Log current iteration significant timings
|
|
if significant_timings:
|
|
log_lines.append(f" Current Iteration (> {threshold*1000:.1f}ms):")
|
|
for name, duration in sorted(significant_timings.items()):
|
|
log_lines.append(f" {name}: {duration * 1000:.2f}ms")
|
|
elif current_iteration_data: # Log total time even if below threshold
|
|
total_key = next((k for k in current_iteration_data if "total" in k), None)
|
|
if total_key:
|
|
log_lines.append(
|
|
f" Current Iteration Total: {current_iteration_data[total_key] * 1000:.2f}ms"
|
|
)
|
|
|
|
# Log averages if requested and available
|
|
if log_averages and avg_data:
|
|
log_lines.append(f" Moving Averages (last {self.window_size} iters):")
|
|
for name, avg in sorted(avg_data.items()):
|
|
log_lines.append(f" {name}: {avg * 1000:.2f}ms")
|
|
|
|
# Print the collected log lines
|
|
print("\n".join(log_lines))
|
|
self.clear_last_timing() # Clear data now that it has been logged/processed
|