"""
LangGraph integration for Lambda-Reflexive Synthesis agents.
Provides drop-in replacement for standard ReAct agents with active inference dynamics.
Usage:
from lrs.integration.langgraph import create_lrs_agent
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-sonnet-4-20250514")
tools = [...] # Your tools
agent = create_lrs_agent(llm, tools)
result = agent.invoke({"messages": [{"role": "user", "content": "Task"}]})
"""
from typing import Dict, List, Annotated, Literal, Optional, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, END
from datetime import datetime
import operator
from lrs.core.precision import HierarchicalPrecision, PrecisionParameters
from lrs.core.free_energy import (
calculate_expected_free_energy,
precision_weighted_selection,
PolicyEvaluation,
)
from lrs.core.lens import ToolLens
from lrs.core.registry import ToolRegistry
# ============================================================================
# State Schema
# ============================================================================
# ============================================================================
# Graph Builder
# ============================================================================
[docs]
class LRSGraphBuilder:
"""
Constructs a LangGraph with active inference dynamics.
Architecture:
propose_policies → evaluate_G → select_policy → execute_tool →
update_precision → [decision gate] → {continue | replan | end}
Attributes:
llm: Language model for policy proposal generation
registry: ToolRegistry with available tools
precision_manager: HierarchicalPrecision tracker
preferences: Goal preferences for pragmatic value calculation
"""
[docs]
def __init__(
self,
llm,
registry: ToolRegistry,
preferences: Optional[Dict[str, float]] = None,
precision_config: Optional[Dict[str, PrecisionParameters]] = None,
use_llm_proposals: bool = True,
):
"""
Initialize graph builder.
Args:
llm: Language model (must have .invoke() or .generate() method)
registry: Tool registry with available tools
preferences: Goal preferences for G calculation.
Example: {'data_retrieved': 3.0, 'error': -5.0}
precision_config: Optional custom precision parameters per level
use_llm_proposals: Whether to use LLM for policy generation (for backward compatibility)
"""
self.llm = llm
self.registry = registry
self.use_llm_proposals = use_llm_proposals
self.preferences = preferences or {"success": 2.0, "error": -5.0, "execution_time": -0.1}
# Initialize hierarchical precision
if precision_config:
self.hp = HierarchicalPrecision(levels=precision_config)
else:
self.hp = HierarchicalPrecision()
[docs]
def build(self) -> StateGraph:
"""
Construct the complete LRS graph.
Returns:
Compiled StateGraph ready for execution
"""
workflow = StateGraph(LRSState)
# Add nodes
workflow.add_node("initialize", self._initialize_state)
workflow.add_node("generate_policies", self._generate_policies)
workflow.add_node("evaluate_G", self._evaluate_free_energy)
workflow.add_node("select_policy", self._select_policy)
workflow.add_node("execute_tool", self._execute_tool)
workflow.add_node("update_precision", self._update_precision)
workflow.add_node("check_goal", self._check_goal_satisfaction)
# Define flow
workflow.set_entry_point("initialize")
workflow.add_edge("initialize", "generate_policies")
workflow.add_edge("generate_policies", "evaluate_G")
workflow.add_edge("evaluate_G", "select_policy")
workflow.add_edge("select_policy", "execute_tool")
workflow.add_edge("execute_tool", "update_precision")
workflow.add_edge("update_precision", "check_goal")
# Conditional branching based on precision and goal state
workflow.add_conditional_edges(
"check_goal",
self._decision_gate,
{
"success": END,
"continue": "execute_tool",
"replan": "generate_policies",
"fail": END,
},
)
return workflow.compile()
# ========================================================================
# Node Implementations
# ========================================================================
def _initialize_state(self, state: LRSState) -> LRSState:
"""
Initialize agent state from user message.
Extracts goal, sets initial precision, prepares belief state.
"""
# Extract goal from messages
if state.get("messages"):
latest_message = state["messages"][-1]
goal = latest_message.get("content", "No goal specified")
else:
goal = "No goal specified"
# Initialize state
state["goal"] = goal
state["precision"] = self.hp.get_all()
state["precision_history"] = [self.hp.get_all()]
state["current_hbn_level"] = "abstract"
state["adaptation_count"] = 0
state["adaptation_events"] = []
state["tool_history"] = []
state["current_policy_index"] = 0
state["belief_state"] = {"goal": goal, "goal_satisfied": False}
state["preferences"] = self.preferences
return state
def _generate_policies(self, state: LRSState) -> LRSState:
"""
Generate optimized candidate policies using intelligent generation.
"""
state["current_hbn_level"] = "planning"
max_depth = 2 if state["precision"]["planning"] > 0.6 else 3
candidates = self._generate_intelligent_candidates(max_depth=max_depth, state=state)
state["candidate_policies"] = candidates
return state
def _generate_policy_candidates(self, max_depth: int) -> List[List[ToolLens]]:
"""
Generate all valid tool sequences up to max_depth.
TODO: Replace with LLM-guided generation for production.
"""
policies = []
def build_tree(current: List[ToolLens], depth: int):
if depth == 0:
if current:
policies.append(current)
return
for tool in self.registry.tools.values():
# Avoid immediate repetition
if not current or tool != current[-1]:
build_tree(current + [tool], depth - 1)
build_tree([], max_depth)
return policies
def _evaluate_free_energy(self, state: LRSState) -> LRSState:
"""
Calculate Expected Free Energy for all candidate policies.
Core active inference calculation: G = Epistemic - Pragmatic
"""
evaluations = []
for policy in state["candidate_policies"]:
eval_result = calculate_expected_free_energy(
policy=policy, registry=self.registry, preferences=state["preferences"]
)
evaluations.append(
PolicyEvaluation(
epistemic_value=0.0,
pragmatic_value=0.0,
total_G=eval_result,
expected_success_prob=0.5,
components={"policy": [t.name for t in policy]},
)
)
state["policy_evaluations"] = evaluations
return state
def _select_policy(self, state: LRSState) -> LRSState:
"""
Select policy via precision-weighted softmax over G values.
High precision → exploit (choose lowest G)
Low precision → explore (flatten distribution)
"""
selected_idx = precision_weighted_selection(
evaluations=state["policy_evaluations"], precision=state["precision"]["planning"]
)
selected_policy = state["candidate_policies"][selected_idx]
state["selected_policy"] = selected_policy
state["current_policy_index"] = 0 # Reset for execution
return state
def _execute_tool(self, state: LRSState) -> LRSState:
"""
Execute next tool in selected policy.
Updates belief state and records prediction error.
"""
state["current_hbn_level"] = "execution"
if not state.get("selected_policy"):
return state
policy = state["selected_policy"]
idx = state["current_policy_index"]
if idx >= len(policy):
# Policy exhausted
return state
# Execute tool
tool = policy[idx]
result = tool.get(state["belief_state"])
# Record execution
execution_record = {
"timestamp": datetime.now().isoformat(),
"tool": tool.name,
"success": result.success,
"prediction_error": result.prediction_error,
"error_message": result.error,
}
state["tool_history"].append(execution_record)
# Update belief state
if result.success:
state["belief_state"] = tool.set(state["belief_state"], result.value)
# Advance policy index
state["current_policy_index"] = idx + 1
return state
def _update_precision(self, state: LRSState) -> LRSState:
"""
Update hierarchical precision based on prediction error.
Implements Bayesian belief revision via Beta distribution updates.
"""
if not state["tool_history"]:
return state
latest_execution = state["tool_history"][-1]
prediction_error = latest_execution["prediction_error"]
# Update hierarchical precision
updated_precisions = self.hp.update(level="execution", prediction_error=prediction_error)
# Sync to state
state["precision"].update(updated_precisions)
state["precision_history"].append(self.hp.get_all())
# Record adaptation events
if prediction_error > 0.7:
state["adaptation_count"] += 1
state["adaptation_events"].append(
{
"timestamp": datetime.now().isoformat(),
"tool": latest_execution["tool"],
"error": prediction_error,
"precision_before": state["precision_history"][-2]["planning"]
if len(state["precision_history"]) > 1
else None,
"precision_after": state["precision"]["planning"],
}
)
return state
def _check_goal_satisfaction(self, state: LRSState) -> LRSState:
"""
Check if goal has been satisfied.
In production, this would use more sophisticated goal checking.
"""
# Simple heuristic: goal satisfied if no errors in last 2 executions
if len(state["tool_history"]) >= 2:
recent_success = all(exec["success"] for exec in state["tool_history"][-2:])
state["belief_state"]["goal_satisfied"] = recent_success
return state
# ========================================================================
# Decision Gate
# ========================================================================
def _decision_gate(self, state: LRSState) -> str:
"""
Determine next action based on goal satisfaction and precision.
Returns:
"success": Goal achieved, end execution
"continue": Continue current policy
"replan": Precision dropped, generate new policies
"fail": Systemic failure, cannot proceed
"""
# Check for goal satisfaction
if state["belief_state"].get("goal_satisfied", False):
return "success"
# Check for systemic failure (all levels have very low precision)
if all(p < 0.2 for p in state["precision"].values()):
return "fail"
# Check if current policy is exhausted
if state["current_policy_index"] >= len(state.get("selected_policy", [])):
# Policy done but goal not satisfied → replan
return "replan"
# Check if precision collapsed (adaptation needed)
if state["precision"]["planning"] < 0.4:
return "replan"
# Continue with current policy
return "continue"
# ============================================================================
# Factory Function (Public API)
# ============================================================================
[docs]
def create_lrs_agent(
llm, tools: List[ToolLens], preferences: Optional[Dict[str, float]] = None, **kwargs
) -> StateGraph:
"""
Create an LRS-powered agent as drop-in replacement for create_react_agent.
Args:
llm: Language model (Anthropic, OpenAI, etc.)
tools: List of ToolLens objects or LangChain tools
preferences: Goal preferences for pragmatic value calculation
**kwargs: Additional configuration (precision_threshold, etc.)
Returns:
Compiled StateGraph with active inference dynamics
Examples:
>>> from lrs import create_lrs_agent
>>> from langchain_anthropic import ChatAnthropic
>>>
>>> llm = ChatAnthropic(model="claude-sonnet-4-20250514")
>>> tools = [ShellTool(), PythonREPLTool()]
>>>
>>> agent = create_lrs_agent(llm, tools, preferences={'success': 5.0})
>>>
>>> result = agent.invoke({
... "messages": [{"role": "user", "content": "List files in /tmp"}]
... })
"""
# Create tool registry
registry = ToolRegistry()
for tool in tools:
registry.register(tool)
# Build graph
builder = LRSGraphBuilder(llm=llm, registry=registry, preferences=preferences)
return builder.build()
# ============================================================================
# Monitoring Integration
# ============================================================================
[docs]
def create_monitored_lrs_agent(
llm, tools: List[ToolLens], tracker: "LRSStateTracker", **kwargs
) -> StateGraph:
"""
Create LRS agent with integrated monitoring.
Automatically streams state updates to dashboard tracker.
Args:
llm: Language model
tools: Tool lenses
tracker: LRSStateTracker instance for monitoring
**kwargs: Additional configuration
Returns:
Compiled StateGraph with monitoring hooks
"""
agent = create_lrs_agent(llm, tools, **kwargs)
# Wrap invoke to capture state
original_invoke = agent.invoke
def monitored_invoke(input_state, **invoke_kwargs):
result = original_invoke(input_state, **invoke_kwargs)
# Update tracker with final state
tracker.update(result)
return result
agent.invoke = monitored_invoke
return agent