Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 109 additions & 30 deletions agents/runner/agent_runner.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,132 @@
"""Mock agent runner — simulates agent execution without real API calls."""
"""
Mock agent runner — simulates agent execution without real API calls.
"""

from pathlib import Path
import json
from typing import Optional

import time
import uuid
from pathlib import Path
from typing import Optional, Any

TRACES_DIR = Path(__file__).parent.parent / "traces"

# Mock Tools (no real APIs)
def mock_fetch_emails(limit: int = 5) -> dict:
return {
"emails": [
{"from": "[email protected]", "subject": "Q3 Budget Review", "snippet": "Please review the attached budget..."},
{"from": "[email protected]", "subject": "Team standup notes", "snippet": "Action items from today's standup..."},
{"from": "[email protected]", "subject": "Invoice #4821", "snippet": "Please find attached invoice..."},
][:limit]
}

def mock_summarize_text(text: str) -> dict:
summary = text[:120].strip() + ("..." if len(text) > 120 else "")
return {"summary": summary, "word_count": len(text.split())}

# Tool dispatch table
TOOL_DISPATCH = {
"fetch_emails": mock_fetch_emails,
"summarize_text": mock_summarize_text,
}

# email_summarizer

def email_chain(user_input: dict, prev: dict) -> list[tuple[str, dict]]:
return [
("fetch_emails", {"limit": user_input.get("limit", 5)}),
("summarize_text", {
"text": " | ".join(
e["snippet"] for e in prev.get("fetch_emails", {}).get("emails", [])
)
}),
]

AGENT_CHAINS = {
"email_summarizer": email_chain,
}

# Final Output Builder
def build_final_output(agent_id: str, outputs: dict) -> str:
if agent_id == "email_summarizer":
emails = outputs["fetch_emails"]["emails"]
summary = outputs["summarize_text"]["summary"]
return f"Summarized {len(emails)} emails: {summary}"
return f"Agent '{agent_id}' executed successfully."

# Runner
def run_agent(agent_id: str, user_input: dict) -> dict:
"""Simulate running an agent with the given input.

TODO: Implement full simulation flow:
1. Load agent manifest
2. Identify required tools
3. Call mock tools (return canned responses)
4. Build trace steps
5. Generate final output
6. Save run history

For MVP skeleton, return a placeholder response.
"""
return {
run_id = str(uuid.uuid4())[:8]
start = time.time()
trace = []
outputs = {}

if agent_id not in AGENT_CHAINS:
return {
"agent_id": agent_id,
"status": "error",
"message": f"Agent '{agent_id}' not found.",
"input_received": user_input,
}

chain_builder = AGENT_CHAINS[agent_id]
tool_steps = chain_builder(user_input, {})

for i, (tool_id, _) in enumerate(tool_steps):
refreshed_steps = chain_builder(user_input, outputs)
_, kwargs = refreshed_steps[i]

tool_fn = TOOL_DISPATCH[tool_id]
t0 = time.time()
output = tool_fn(**kwargs)
latency = int((time.time() - t0) * 1000) + 15

outputs[tool_id] = output
trace.append({
"step": i + 1,
"tool_id": tool_id,
"input": kwargs,
"output": output,
"status": "success",
"latency_ms": latency,
})

final_output = build_final_output(agent_id, outputs)
duration = int((time.time() - start) * 1000)

result = {
"run_id": run_id,
"agent_id": agent_id,
"status": "simulated",
"message": "TODO: Implement agent runner simulation",
"input_received": user_input,
"user_input": user_input,
"final_output": final_output,
"status": "success",
"trace": trace,
"duration_ms": duration,
}

_save_trace(result)
return result

# Trace Loader
def get_trace_for_agent(agent_id: str) -> Optional[dict]:
"""Load a pre-built mock trace for an agent if one exists.

TODO: Support loading traces by run_id
TODO: Support listing all traces for an agent
"""
trace_map = {
"email_summarizer": "email_summarizer_trace.json",
"github_issue_triage": "github_issue_triage_trace.json",
"meeting_notes": "meeting_notes_trace.json",
}

trace_file = trace_map.get(agent_id)
if not trace_file:
return None

trace_path = TRACES_DIR / trace_file
if not trace_path.exists():
path = TRACES_DIR / trace_file
if not path.exists():
return None

with open(trace_path, encoding="utf-8") as f:
with open(path, encoding="utf-8") as f:
return json.load(f)

# Save Trace
def _save_trace(result: dict) -> None:
TRACES_DIR.mkdir(parents=True, exist_ok=True)
path = TRACES_DIR / f"{result['agent_id']}_{result['run_id']}.json"
path.write_text(json.dumps(result, indent=2), encoding="utf-8")