Source code for lrs.core.lens
"""
ToolLens: Categorical abstraction for tools.
A lens is a bidirectional morphism:
- get: Execute the tool (forward)
- set: Update belief state (backward)
Lenses compose via the >> operator, creating pipelines with automatic
error propagation.
"""
from typing import Any, Dict, Optional
from dataclasses import dataclass
from abc import ABC, abstractmethod
[docs]
@dataclass
class ExecutionResult:
"""
Result of executing a tool.
Attributes:
success: Whether execution succeeded
value: Return value (None if failed)
error: Error message (None if succeeded)
prediction_error: How surprising this outcome was [0, 1]
Examples:
>>> # Successful execution
>>> result = ExecutionResult(
... success=True,
... value="Data fetched",
... error=None,
... prediction_error=0.1 # Expected success
... )
>>>
>>> # Failed execution
>>> result = ExecutionResult(
... success=False,
... value=None,
... error="API timeout",
... prediction_error=0.9 # Unexpected failure
... )
"""
success: bool
value: Optional[Any]
error: Optional[str]
prediction_error: float
def __post_init__(self):
"""Validate prediction error is in [0, 1]"""
if not 0.0 <= self.prediction_error <= 1.0:
raise ValueError(f"prediction_error must be in [0, 1], got {self.prediction_error}")
[docs]
class ToolLens(ABC):
"""
Abstract base class for tools as lenses.
A lens has two operations:
1. get(state) → ExecutionResult: Execute the tool
2. set(state, observation) → state: Update belief state
Lenses compose via >> operator:
lens_a >> lens_b >> lens_c
This creates a pipeline where:
- Data flows forward through get operations
- Belief updates flow backward through set operations
- Errors propagate automatically
Attributes:
name: Tool identifier
input_schema: JSON schema for inputs
output_schema: JSON schema for outputs
call_count: Number of times get() has been called
failure_count: Number of times get() has failed
Examples:
>>> class FetchTool(ToolLens):
... def get(self, state):
... data = fetch(state['url'])
... return ExecutionResult(True, data, None, 0.1)
...
... def set(self, state, observation):
... return {**state, 'data': observation}
>>>
>>> class ParseTool(ToolLens):
... def get(self, state):
... parsed = json.loads(state['data'])
... return ExecutionResult(True, parsed, None, 0.05)
...
... def set(self, state, observation):
... return {**state, 'parsed': observation}
>>>
>>> # Compose
>>> pipeline = FetchTool() >> ParseTool()
>>> result = pipeline.get({'url': 'api.com/data'})
"""
[docs]
def __init__(
self,
name: str,
input_schema: Dict[str, Any],
output_schema: Dict[str, Any]
):
"""
Initialize tool lens.
Args:
name: Unique tool identifier
input_schema: JSON schema for expected inputs
output_schema: JSON schema for expected outputs
"""
self.name = name
self.input_schema = input_schema
self.output_schema = output_schema
self.call_count = 0
self.failure_count = 0
[docs]
@abstractmethod
def get(self, state: Dict[str, Any]) -> ExecutionResult:
"""
Execute the tool (forward operation).
Args:
state: Current agent state
Returns:
ExecutionResult with value and prediction error
Note:
Implementations should update call_count and failure_count
"""
pass
[docs]
@abstractmethod
def set(self, state: Dict[str, Any], observation: Any) -> Dict[str, Any]:
"""
Update belief state with observation (backward operation).
Args:
state: Current state
observation: Tool output
Returns:
Updated state
"""
pass
def __rshift__(self, other: 'ToolLens') -> 'ComposedLens':
"""
Compose this lens with another: self >> other
Args:
other: Lens to compose with
Returns:
ComposedLens representing the pipeline
Examples:
>>> pipeline = fetch_tool >> parse_tool >> validate_tool
"""
return ComposedLens(self, other)
@property
def success_rate(self) -> float:
"""Calculate success rate from history"""
if self.call_count == 0:
return 0.5 # Neutral prior
return 1.0 - (self.failure_count / self.call_count)
[docs]
class ComposedLens(ToolLens):
"""
Composition of two lenses.
Created via >> operator. Handles:
- Forward data flow (left.get then right.get)
- Backward belief update (right.set then left.set)
- Error short-circuiting (stop on first failure)
Attributes:
left: First lens in composition
right: Second lens in composition
"""
[docs]
def __init__(self, left: ToolLens, right: ToolLens):
"""
Create composed lens.
Args:
left: First lens
right: Second lens
"""
super().__init__(
name=f"{left.name}>>{right.name}",
input_schema=left.input_schema,
output_schema=right.output_schema
)
self.left = left
self.right = right
[docs]
def get(self, state: Dict[str, Any]) -> ExecutionResult:
"""
Execute composed lens (left then right).
If left fails, short-circuit and return left's error.
Otherwise, execute right with left's output.
Args:
state: Input state
Returns:
ExecutionResult from final lens (or first failure)
"""
self.call_count += 1
# Execute left lens
left_result = self.left.get(state)
if not left_result.success:
# Short-circuit on failure
self.failure_count += 1
return left_result
# Update state with left's output
intermediate_state = self.left.set(state, left_result.value)
# Execute right lens
right_result = self.right.get(intermediate_state)
if not right_result.success:
self.failure_count += 1
return right_result
[docs]
def set(self, state: Dict[str, Any], observation: Any) -> Dict[str, Any]:
"""
Update state (right then left, backward flow).
Args:
state: Current state
observation: Final observation
Returns:
Fully updated state
"""
# Update from right (final observation)
state = self.right.set(state, observation)
# Update from left (intermediate state preserved)
state = self.left.set(state, state)
return state