Source code for lrs.monitoring.tracker

"""State tracking for LRS agents.

Maintains a rolling history of agent states for analysis and visualization."""

from typing import List, Dict, Any, Optional
from collections import deque
from dataclasses import dataclass
from datetime import datetime
import json


[docs] @dataclass class StateSnapshot: """ Snapshot of agent state at a specific point in time. Attributes: timestamp: When this snapshot was taken precision: Precision values at all levels prediction_errors: Recent prediction errors tool_history: Tool execution history adaptation_count: Number of adaptations so far belief_state: Current beliefs """ timestamp: datetime precision: Dict[str, float] prediction_errors: List[float] tool_history: List[Dict[str, Any]] adaptation_count: int belief_state: Dict[str, Any]
[docs] class LRSStateTracker: """ Tracks agent state history for monitoring and analysis. Maintains a rolling window of state snapshots with configurable size. Used by the dashboard and for post-execution analysis. Examples: >>> tracker = LRSStateTracker(max_history=100) >>> >>> # Track state during execution >>> for step in agent_execution: ... tracker.track_state(step) >>> >>> # Analyze precision trajectory >>> precision_history = tracker.get_precision_trajectory('execution') >>> print(f"Average precision: {sum(precision_history) / len(precision_history)}") >>> >>> # Get adaptation events >>> adaptations = tracker.get_adaptation_events() >>> print(f"Total adaptations: {len(adaptations)}") """
[docs] def __init__(self, max_history: int = 100): """ Initialize state tracker. Args: max_history: Maximum number of states to keep in history """ self.max_history = max_history self.history: deque = deque(maxlen=max_history) self.adaptation_events: List[Dict[str, Any]] = []
[docs] def track_state(self, state: Dict[str, Any]): """ Track a new state snapshot. Args: state: Current agent state (LRSState dict) Examples: >>> tracker.track_state({ ... 'precision': {'execution': 0.7, 'planning': 0.6}, ... 'tool_history': [...], ... 'belief_state': {...} ... }) """ # Extract relevant information precision = state.get('precision', {}) tool_history = state.get('tool_history', []) adaptation_count = state.get('adaptation_count', 0) belief_state = state.get('belief_state', {}) # Extract recent prediction errors recent_errors = [] if tool_history: recent_errors = [ entry.get('prediction_error', 0.0) for entry in tool_history[-10:] # Last 10 ] # Create snapshot snapshot = StateSnapshot( timestamp=datetime.now(), precision=precision.copy(), prediction_errors=recent_errors, tool_history=tool_history.copy(), adaptation_count=adaptation_count, belief_state=belief_state.copy() ) # Add to history self.history.append(snapshot) # Check for adaptation events if len(self.history) > 1: prev_adaptations = self.history[-2].adaptation_count curr_adaptations = adaptation_count if curr_adaptations > prev_adaptations: # New adaptation occurred self._record_adaptation_event(state)
def _record_adaptation_event(self, state: Dict[str, Any]): """Record an adaptation event with context""" tool_history = state.get('tool_history', []) precision = state.get('precision', {}) # Find the tool that triggered adaptation trigger_tool = None trigger_error = None if tool_history: latest = tool_history[-1] if latest.get('prediction_error', 0) > 0.7: trigger_tool = latest.get('tool') trigger_error = latest.get('prediction_error') event = { 'timestamp': datetime.now().isoformat(), 'trigger_tool': trigger_tool, 'trigger_error': trigger_error, 'precision_before': self.history[-2].precision if len(self.history) > 1 else {}, 'precision_after': precision, 'adaptation_number': state.get('adaptation_count', 0) } self.adaptation_events.append(event)
[docs] def get_precision_trajectory(self, level: str = 'execution') -> List[float]: """ Get precision trajectory for a specific level. Args: level: Precision level ('abstract', 'planning', or 'execution') Returns: List of precision values over time Examples: >>> trajectory = tracker.get_precision_trajectory('execution') >>> import matplotlib.pyplot as plt >>> plt.plot(trajectory) >>> plt.show() """ return [ snapshot.precision.get(level, 0.5) for snapshot in self.history ]
[docs] def get_all_precision_trajectories(self) -> Dict[str, List[float]]: """ Get precision trajectories for all levels. Returns: Dict mapping level names to precision trajectories """ return { 'abstract': self.get_precision_trajectory('abstract'), 'planning': self.get_precision_trajectory('planning'), 'execution': self.get_precision_trajectory('execution') }
[docs] def get_prediction_errors(self) -> List[float]: """ Get all prediction errors from history. Returns: Flat list of all prediction errors """ errors = [] for snapshot in self.history: errors.extend(snapshot.prediction_errors) return errors
[docs] def get_adaptation_events(self) -> List[Dict[str, Any]]: """ Get all recorded adaptation events. Returns: List of adaptation event dicts """ return self.adaptation_events.copy()
[docs] def get_tool_usage_stats(self) -> Dict[str, Dict[str, Any]]: """ Calculate tool usage statistics. Returns: Dict mapping tool names to stats (calls, successes, avg_error) Examples: >>> stats = tracker.get_tool_usage_stats() >>> for tool, data in stats.items(): ... print(f"{tool}: {data['success_rate']:.1%} success rate") """ tool_stats = {} for snapshot in self.history: for entry in snapshot.tool_history: tool_name = entry.get('tool') if not tool_name: continue if tool_name not in tool_stats: tool_stats[tool_name] = { 'calls': 0, 'successes': 0, 'failures': 0, 'total_error': 0.0, 'errors': [] } stats = tool_stats[tool_name] stats['calls'] += 1 if entry.get('success'): stats['successes'] += 1 else: stats['failures'] += 1 error = entry.get('prediction_error', 0.0) stats['total_error'] += error stats['errors'].append(error) # Calculate derived stats for tool_name, stats in tool_stats.items(): if stats['calls'] > 0: stats['success_rate'] = stats['successes'] / stats['calls'] stats['avg_error'] = stats['total_error'] / stats['calls'] else: stats['success_rate'] = 0.0 stats['avg_error'] = 0.0 return tool_stats
[docs] def get_current_state(self) -> Optional[StateSnapshot]: """ Get most recent state snapshot. Returns: Latest StateSnapshot or None if no history """ if self.history: return self.history[-1] return None
[docs] def export_history(self, filepath: str): """ Export history to JSON file. Args: filepath: Output file path Examples: >>> tracker.export_history('agent_history.json') """ data = { 'snapshots': [ { 'timestamp': snapshot.timestamp.isoformat(), 'precision': snapshot.precision, 'prediction_errors': snapshot.prediction_errors, 'tool_history': snapshot.tool_history, 'adaptation_count': snapshot.adaptation_count, 'belief_state': snapshot.belief_state } for snapshot in self.history ], 'adaptation_events': self.adaptation_events } with open(filepath, 'w') as f: json.dump(data, f, indent=2)
[docs] def clear(self): """Clear all tracked history""" self.history.clear() self.adaptation_events.clear()
[docs] def get_summary(self) -> Dict[str, Any]: """ Get summary statistics of tracked execution. Returns: Dict with summary metrics Examples: >>> summary = tracker.get_summary() >>> print(f"Total steps: {summary['total_steps']}") >>> print(f"Adaptations: {summary['total_adaptations']}") """ if not self.history: return { 'total_steps': 0, 'total_adaptations': 0, 'avg_precision': 0.0, 'final_precision': {} } precision_trajectories = self.get_all_precision_trajectories() # Calculate average precision across all levels all_precisions = [] for trajectory in precision_trajectories.values(): all_precisions.extend(trajectory) avg_precision = sum(all_precisions) / len(all_precisions) if all_precisions else 0.0 return { 'total_steps': len(self.history), 'total_adaptations': len(self.adaptation_events), 'avg_precision': avg_precision, 'final_precision': self.history[-1].precision, 'tool_usage': self.get_tool_usage_stats() }