From 7ca328f1708cb80747e8aacf8fd560114bac9ee9 Mon Sep 17 00:00:00 2001 From: ahmad-ajmal Date: Wed, 20 May 2026 09:18:44 +0100 Subject: [PATCH] fix #259: pass prior tool outputs to actions by $ref instead of inlining --- .gitignore | 1 + agent_core/__init__.py | 4 + agent_core/core/impl/action/__init__.py | 7 + agent_core/core/impl/action/executor.py | 30 ++- agent_core/core/impl/action/manager.py | 78 ++++++- agent_core/core/impl/action/output_store.py | 199 +++++++++++++++++ agent_core/core/impl/action/ref_resolver.py | 225 ++++++++++++++++++++ agent_core/core/impl/action/router.py | 70 ++++++ agent_core/core/prompts/action.py | 72 +++++++ app/agent_base.py | 8 +- app/data/action/run_python.py | 39 +++- app/ui_layer/events/transformer.py | 10 +- 12 files changed, 725 insertions(+), 18 deletions(-) create mode 100644 agent_core/core/impl/action/output_store.py create mode 100644 agent_core/core/impl/action/ref_resolver.py diff --git a/.gitignore b/.gitignore index aea71a6e..19dfb32e 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ chroma_db_memory/ core/chroma_db_actions/ **/logs/ agent_file_system/workspace/ +agent_file_system/action_outputs/ craftbot/external_tools/ craftbot/generated_task_document/ **/__pycache__/ diff --git a/agent_core/__init__.py b/agent_core/__init__.py index b7badbdc..26288676 100644 --- a/agent_core/__init__.py +++ b/agent_core/__init__.py @@ -148,6 +148,8 @@ from agent_core.core.impl.action import ( ActionExecutor, ActionLibrary, + ActionOutputRecord, + ActionOutputStore, ActionRouter, ActionManager, set_gui_execute_hook, @@ -360,6 +362,8 @@ # Implementations "ActionExecutor", "ActionLibrary", + "ActionOutputRecord", + "ActionOutputStore", "ActionRouter", "ActionManager", "set_gui_execute_hook", diff --git a/agent_core/core/impl/action/__init__.py b/agent_core/core/impl/action/__init__.py index a29b0a0c..3cf2ef70 100644 --- a/agent_core/core/impl/action/__init__.py +++ b/agent_core/core/impl/action/__init__.py @@ -14,6 +14,10 @@ 
set_gui_execute_hook, ) from agent_core.core.impl.action.library import ActionLibrary +from agent_core.core.impl.action.output_store import ( + ActionOutputRecord, + ActionOutputStore, +) from agent_core.core.impl.action.router import ActionRouter, _is_visible_in_mode from agent_core.core.impl.action.manager import ( ActionManager, @@ -31,6 +35,9 @@ "set_gui_execute_hook", # Library "ActionLibrary", + # Output store + "ActionOutputRecord", + "ActionOutputStore", # Router "ActionRouter", "_is_visible_in_mode", diff --git a/agent_core/core/impl/action/executor.py b/agent_core/core/impl/action/executor.py index 1498413e..c7dd3f32 100644 --- a/agent_core/core/impl/action/executor.py +++ b/agent_core/core/impl/action/executor.py @@ -413,16 +413,32 @@ def _atomic_action_venv_process( timeout=timeout, ) - return { - "stdout": proc.stdout.strip(), - "stderr": proc.stderr.strip(), - "returncode": proc.returncode, - } + stdout = proc.stdout.strip() + stderr = proc.stderr.strip() + + if proc.returncode != 0: + err = stderr or f"Action exited with code {proc.returncode}" + return {"status": "error", "message": err} + + # The sandbox script prints ``json.dumps(result)`` to stdout, so + # the action's logical output is the inner dict — not the raw + # subprocess wrapper. Parse it here so downstream consumers + # (event stream, ActionOutputStore, $ref navigation) see the + # action's real shape (e.g. ``{"status", "stdout", "stderr"}``) + # rather than ``{"stdout": "<json text>", ...}``. Mirrors the + # behaviour of ``_atomic_action_internal_subprocess``. 
+ if not stdout: + return {"status": "success", "output": ""} + + try: + return json.loads(stdout) + except json.JSONDecodeError: + return {"status": "success", "output": stdout} except subprocess.TimeoutExpired: - return {"stdout": "", "stderr": "Execution timed out", "returncode": -1} + return {"status": "error", "message": "Execution timed out"} except Exception as e: - return {"stdout": "", "stderr": f"Execution failed: {e}", "returncode": -1} + return {"status": "error", "message": f"Execution failed: {e}"} finally: _restore_worker_stdio(saved_stdout, saved_stderr) diff --git a/agent_core/core/impl/action/manager.py b/agent_core/core/impl/action/manager.py index b038c61c..44fc578b 100644 --- a/agent_core/core/impl/action/manager.py +++ b/agent_core/core/impl/action/manager.py @@ -28,6 +28,11 @@ from agent_core.core.protocols.context import ContextEngineProtocol from agent_core.core.protocols.state import StateManagerProtocol from agent_core.core.impl.action.executor import ActionExecutor +from agent_core.core.impl.action.output_store import ActionOutputStore, make_key +from agent_core.core.impl.action.ref_resolver import ( + render_output_for_event_stream, + resolve_refs, +) from agent_core.utils.logger import logger # ============================================================================ @@ -113,6 +118,7 @@ def __init__( on_action_start: Optional[OnActionStartHook] = None, on_action_end: Optional[OnActionEndHook] = None, get_parent_id: Optional[GetParentIdHook] = None, + action_output_store: Optional[ActionOutputStore] = None, ): """ Build an ActionManager that can execute and track actions. @@ -127,6 +133,11 @@ def __init__( on_action_start: Optional hook called when action starts. on_action_end: Optional hook called when action ends. get_parent_id: Optional hook to resolve parent_id from task context. + action_output_store: Deterministic per-session archive of action + outputs. 
When supplied, every invocation is persisted, the + LLM's ``$ref`` markers in parameters are resolved against it + before the handler runs, and big outputs collapse to a shape + summary in the event stream. """ self.action_library = action_library self.llm_interface = llm_interface @@ -135,6 +146,7 @@ def __init__( self.context_engine = context_engine self.state_manager = state_manager self.executor = ActionExecutor() + self.action_output_store = action_output_store # Track in-flight actions self._inflight: Dict[str, Dict] = {} @@ -227,6 +239,10 @@ async def execute_action( logger.debug(f"[INPUT DATA] {input_data}") run_id = str(uuid.uuid4()) started_at = datetime.utcnow().isoformat() + # Stable reference key the LLM uses in any future ``$ref`` for this + # invocation. Computed up-front so action_start, action_end, and the + # archive all stamp the same value, removing the LLM's need to guess. + action_key = make_key(action.name, run_id) # Resolve parent_id using hook if available if not parent_id and self._get_parent_id: @@ -257,11 +273,22 @@ async def execute_action( # Log to event stream # Only pass session_id when is_running_task=True (task stream exists) # When no task exists, use global stream by not passing task_id - pretty_input = _to_pretty_json(input_data) + # + # Strip ``_``-prefixed plumbing keys (e.g. ``_session_id``) from what + # we log. They are internal channels used to hand context to the + # action sandbox; surfacing them in the event stream would leak + # internal state into every downstream action-selection prompt. + pretty_input = _to_pretty_json( + {k: v for k, v in input_data.items() if not k.startswith("_")} + ) self._log_event_stream( is_gui_task=is_gui_task, event_type="action_start", - event=f"Running action {action.name} with input: {pretty_input}.", + event=( + f"Running action {action_key} with input: {pretty_input}. 
" + f"(Reference this run later via " + f'{{"$ref": "{action_key}", "path": "..."}})' + ), display_message=f"Running {action.display_name}", action_name=action.name, # Always pass session_id when present so the event_stream_manager can route @@ -273,6 +300,20 @@ async def execute_action( session_id=session_id, ) + # Resolve ``$ref`` markers the LLM may have placed in parameters. + # The event-stream log above intentionally records the *unresolved* + # input so the agent's history shows the compact references that were + # actually emitted; only the handler-side ``input_data`` carries the + # materialised values. + if self.action_output_store and session_id: + try: + input_data = resolve_refs(input_data, self.action_output_store, session_id) + except Exception as exc: + logger.warning( + f"[ACTION] Failed to resolve $ref markers for {action.name} " + f"(session_id={session_id}): {exc}" + ) + logger.debug(f"Starting execution of action {action.name}...") try: @@ -352,17 +393,46 @@ async def execute_action( # 3. Persist final state # ──────────────────────────────────────────────────────────────── + # Always-on archive: one file per invocation at a deterministic path. + # Big outputs collapse to a shape summary + ``$ref`` instructions in + # the event stream; the full payload lives on disk for the LLM to + # navigate via ``$ref`` on subsequent steps. 
+ archive_record = None + archive_path = None + if self.action_output_store and session_id: + archive_record = self.action_output_store.record( + session_id=session_id, + action_name=action.name, + run_id=run_id, + outputs=outputs if isinstance(outputs, dict) else {"value": outputs}, + started_at=started_at, + ended_at=ended_at, + status=status, + ) + if archive_record is not None: + archive_path = str( + self.action_output_store.record_path(session_id, action.name, run_id) + ) + logger.info(f"Action {action.name} completed with status: {status}.") # Log to event stream # Only pass session_id when is_running_task=True (task stream exists) output_has_error = outputs and outputs.get("status") == "error" display_status = "failed" if (status == "error" or output_has_error) else "completed" - pretty_output = _to_pretty_json(outputs) + pretty_output = render_output_for_event_stream( + outputs, + file_path=archive_path, + record_key=archive_record.key if archive_record else None, + ) self._log_event_stream( is_gui_task=is_gui_task, event_type="action_end", - event=f"Action {action.name} completed with output: {pretty_output}.", + event=( + f"Action {action_key} completed with output: {pretty_output}. " + f"(Reference this output via " + f'{{"$ref": "{action_key}", "path": "..."}})' + ), display_message=f"{action.display_name} → {display_status}", action_name=action.name, # Always pass session_id when present so the event_stream_manager can route diff --git a/agent_core/core/impl/action/output_store.py b/agent_core/core/impl/action/output_store.py new file mode 100644 index 00000000..9b3761bc --- /dev/null +++ b/agent_core/core/impl/action/output_store.py @@ -0,0 +1,199 @@ +# -*- coding: utf-8 -*- +""" +ActionOutputStore — deterministic per-action persistence. + +Every action invocation writes exactly one JSON file at a stable, predictable +path. 
The store is intentionally minimal: write-only ``record(...)`` for the +manager, ``load(...)`` for the ref resolver, no caching, no in-process +manifest, no compaction. Determinism is the point. + +Layout: + agent_file_system/action_outputs/ + {session_id}/ + {action_name}__{run_id}.json + +Each file: + { + "key": "<action_name>#<short_run_id>", + "session_id": "...", + "action_name": "...", + "run_id": "...", + "short_run_id": "<6 hex>", + "started_at": "...", + "ended_at": "...", + "status": "success" | "error" | ..., + "outputs": <action output dict> + } + +Reads happen via ``load(session_id, key)`` — the manager's ref resolver uses +this when an action parameter contains ``{"$ref": "<key>", "path": "..."}``. +""" + +from __future__ import annotations + +import json +import re +import uuid +from dataclasses import asdict, dataclass +from pathlib import Path +from typing import Any, Dict, Optional + +from agent_core.utils.logger import logger + + +_FILENAME_SAFE = re.compile(r"[^A-Za-z0-9._-]") +SHORT_RUN_ID_LEN = 6 + + +def _safe_filename(name: str) -> str: + return _FILENAME_SAFE.sub("_", name)[:120] or "_" + + +def make_short_run_id(run_id: str) -> str: + """Stable 6-hex prefix of a UUID-style run id; used in keys and filenames.""" + cleaned = (run_id or "").replace("-", "") + if cleaned: + return cleaned[:SHORT_RUN_ID_LEN] + return uuid.uuid4().hex[:SHORT_RUN_ID_LEN] + + +def make_key(action_name: str, run_id: str) -> str: + """Compose the stable reference key used in ``$ref`` markers and stored as each record's ``key``.""" + return f"{action_name}#{make_short_run_id(run_id)}" + + +@dataclass(frozen=True) +class ActionOutputRecord: + """Immutable on-disk record of a single action invocation.""" + + key: str + session_id: str + action_name: str + run_id: str + short_run_id: str + started_at: str + ended_at: str + status: str + outputs: Dict[str, Any] + + +class ActionOutputStore: + """Per-session deterministic archive of action outputs.""" + + def __init__(self, root: Path): + self.root = Path(root) + + # 
------------------------------------------------------------------ + # Write + # ------------------------------------------------------------------ + + def record( + self, + *, + session_id: str, + action_name: str, + run_id: str, + outputs: Dict[str, Any], + started_at: str, + ended_at: str, + status: str, + ) -> Optional[ActionOutputRecord]: + """Persist a single action invocation. Returns the record, or ``None``.""" + if not session_id: + return None + + try: + short_run_id = make_short_run_id(run_id) + record = ActionOutputRecord( + key=f"{action_name}#{short_run_id}", + session_id=session_id, + action_name=action_name, + run_id=run_id, + short_run_id=short_run_id, + started_at=started_at, + ended_at=ended_at, + status=status, + outputs=outputs if isinstance(outputs, dict) else {"value": outputs}, + ) + + session_dir = self._session_dir(session_id) + session_dir.mkdir(parents=True, exist_ok=True) + + record_path = session_dir / self._record_filename(action_name, run_id) + record_path.write_text( + json.dumps(asdict(record), indent=2, ensure_ascii=False, default=str), + encoding="utf-8", + ) + return record + except Exception as exc: + logger.warning( + f"[ActionOutputStore] Failed to record {action_name} " + f"(run_id={run_id}, session_id={session_id}): {exc}" + ) + return None + + # ------------------------------------------------------------------ + # Read + # ------------------------------------------------------------------ + + def load(self, session_id: str, key: str) -> Optional[ActionOutputRecord]: + """Load a record by its ``{action_name}#{short_run_id}`` key.""" + if not session_id or not key or "#" not in key: + return None + + action_name, short_run_id = key.rsplit("#", 1) + session_dir = self._session_dir(session_id) + if not session_dir.exists(): + return None + + # Filenames are ``{action_name}__{full_run_id}.json``; the key only has + # the short_run_id, so scan the directory for a matching prefix. 
+ safe_action = _safe_filename(action_name) + prefix = f"{safe_action}__" + match: Optional[Path] = None + for entry in session_dir.iterdir(): + if not entry.name.startswith(prefix) or not entry.name.endswith(".json"): + continue + run_id_part = entry.name[len(prefix):-len(".json")] + if make_short_run_id(run_id_part) == short_run_id: + match = entry + break + if match is None: + return None + + try: + data = json.loads(match.read_text(encoding="utf-8")) + except Exception as exc: + logger.warning(f"[ActionOutputStore] Failed to load {match}: {exc}") + return None + + try: + return ActionOutputRecord( + key=data["key"], + session_id=data.get("session_id", session_id), + action_name=data["action_name"], + run_id=data["run_id"], + short_run_id=data["short_run_id"], + started_at=data["started_at"], + ended_at=data["ended_at"], + status=data["status"], + outputs=data.get("outputs", {}), + ) + except KeyError as exc: + logger.warning(f"[ActionOutputStore] Record {match} missing field: {exc}") + return None + + def record_path(self, session_id: str, action_name: str, run_id: str) -> Path: + """Return the on-disk path a record would be (or was) written to.""" + return self._session_dir(session_id) / self._record_filename(action_name, run_id) + + # ------------------------------------------------------------------ + # Internals + # ------------------------------------------------------------------ + + def _session_dir(self, session_id: str) -> Path: + return self.root / _safe_filename(session_id) + + @staticmethod + def _record_filename(action_name: str, run_id: str) -> str: + return f"{_safe_filename(action_name)}__{run_id}.json" diff --git a/agent_core/core/impl/action/ref_resolver.py b/agent_core/core/impl/action/ref_resolver.py new file mode 100644 index 00000000..923bd1fc --- /dev/null +++ b/agent_core/core/impl/action/ref_resolver.py @@ -0,0 +1,225 @@ +# -*- coding: utf-8 -*- +""" +Reference resolution + shape summarisation for action I/O. 
+ +The LLM emits compact ``{"$ref": "key", "path": "..."}`` markers in action +parameters; the manager replaces them with the actual values from +``ActionOutputStore`` before the action handler runs. This keeps large prior +outputs out of the LLM's response (and therefore out of every subsequent +prompt's event stream). + +The shape summariser is the companion: it renders an action's output as a +purely structural skeleton (types, key names, list lengths) for the event +stream — no content values that the LLM would re-ship on every later prompt. +""" + +from __future__ import annotations + +import json +import re +from typing import Any, Dict, List, Optional + +from agent_core.core.impl.action.output_store import ActionOutputStore +from agent_core.utils.logger import logger + + +# ────────────────────────────────────────────────────────────────────────── +# Reference resolution +# ────────────────────────────────────────────────────────────────────────── + +_PATH_SEGMENT = re.compile(r"([^.\[\]]+)|\[(\d+)\]") + + +def _split_path(path: str) -> List[Any]: + """Tokenise a dotted/indexed path. 
``a.b[2].c`` → ``['a', 'b', 2, 'c']``.""" + tokens: List[Any] = [] + for match in _PATH_SEGMENT.finditer(path): + name, index = match.group(1), match.group(2) + if name is not None: + tokens.append(name) + elif index is not None: + tokens.append(int(index)) + return tokens + + +def navigate(value: Any, path: Optional[str]) -> Any: + """Walk a dotted/indexed path through nested dicts and lists.""" + if not path: + return value + current = value + for token in _split_path(path): + if isinstance(token, int): + if not isinstance(current, list) or token >= len(current): + raise KeyError(f"index {token} out of range") + current = current[token] + else: + if not isinstance(current, dict) or token not in current: + raise KeyError(f"key {token!r} not found") + current = current[token] + return current + + +def _is_ref(value: Any) -> bool: + return ( + isinstance(value, dict) + and "$ref" in value + and isinstance(value["$ref"], str) + ) + + +def _resolve_one( + ref: Dict[str, Any], + store: ActionOutputStore, + session_id: str, +) -> Any: + """Resolve a single ``$ref`` marker. Returns the resolved value, or a + structured error placeholder the action handler can surface cleanly.""" + key = ref.get("$ref") + path = ref.get("path") + + record = store.load(session_id, key) if key else None + if record is None: + logger.warning( + f"[ref_resolver] Unable to resolve $ref={key!r} (session={session_id})" + ) + return {"$ref_error": "key not found", "$ref": key, "path": path} + + try: + return navigate(record.outputs, path) + except KeyError as exc: + logger.warning( + f"[ref_resolver] Path {path!r} failed for $ref={key!r}: {exc}" + ) + return {"$ref_error": str(exc), "$ref": key, "path": path} + + +def resolve_refs( + value: Any, + store: ActionOutputStore, + session_id: str, +) -> Any: + """Recursively replace every ``$ref`` marker in ``value``. + + Returns a new structure with refs substituted; the original is untouched. + Non-dict/non-list values pass through. 
Refs nested inside resolved + payloads are *not* re-resolved — one level of substitution only. + """ + if not session_id or store is None: + return value + if _is_ref(value): + return _resolve_one(value, store, session_id) + if isinstance(value, dict): + return {k: resolve_refs(v, store, session_id) for k, v in value.items()} + if isinstance(value, list): + return [resolve_refs(item, store, session_id) for item in value] + return value + + +# ────────────────────────────────────────────────────────────────────────── +# Shape summary +# ────────────────────────────────────────────────────────────────────────── + +# Caps to keep summaries bounded even for pathological inputs. +_MAX_LINES = 24 +_MAX_KEYS_PER_DICT = 12 +_SCALAR_INLINE_CHARS = 60 # show actual value for short scalars + + +def summarize_shape(value: Any) -> str: + """Render a purely structural summary of ``value``. + + No content from arbitrary string leaves above ``_SCALAR_INLINE_CHARS``, + no list items — only types, key names, and list lengths. Designed so the + same big payload can be summarised the same way every time it appears in + the event stream. 
+ """ + lines: List[str] = [] + + def emit(path: str, summary: str) -> bool: + if len(lines) >= _MAX_LINES: + return False + lines.append(f"{path}: {summary}" if path else summary) + return True + + def walk(path: str, node: Any) -> None: + if len(lines) >= _MAX_LINES: + return + + if isinstance(node, dict): + keys = list(node.keys()) + shown = keys[:_MAX_KEYS_PER_DICT] + extra = len(keys) - len(shown) + header = f"dict ({len(keys)} keys: {', '.join(map(str, shown))}{'…' if extra > 0 else ''})" + emit(path, header) + for k in shown: + walk(_child_path(path, str(k)), node[k]) + return + + if isinstance(node, list): + emit(path, f"list[{len(node)}]") + return + + if isinstance(node, str): + if len(node) <= _SCALAR_INLINE_CHARS: + emit(path, f"str = {node!r}") + else: + emit(path, f"str (len={len(node)})") + return + + if isinstance(node, (int, float, bool)) or node is None: + emit(path, f"{type(node).__name__} = {node!r}") + return + + emit(path, type(node).__name__) + + walk("", value) + if len(lines) >= _MAX_LINES: + lines.append("… (summary truncated)") + return "\n".join(lines) + + +def _child_path(parent: str, name: str) -> str: + return f"{parent}.{name}" if parent else name + + +# ────────────────────────────────────────────────────────────────────────── +# Inline-vs-shape decision +# ────────────────────────────────────────────────────────────────────────── + +# Below this byte budget we keep the full output inline in the event stream; +# above it we emit shape + file path. Small enough that any Discord-size dump +# externalises, large enough that small acks (todo updates, send-message +# receipts) keep their full content where it's actually useful. +EVENT_STREAM_INLINE_BUDGET = 2000 + + +def render_output_for_event_stream( + outputs: Any, + *, + file_path: Optional[str], + record_key: Optional[str], +) -> str: + """Return the string the event stream should show for an action's output. + + Small outputs are inlined verbatim as pretty JSON. 
Large outputs collapse + to a deterministic shape summary plus the file path where the full + payload lives (so the agent can ``$ref`` into it). + """ + try: + pretty = json.dumps(outputs, indent=2, ensure_ascii=False, default=str) + except (TypeError, ValueError): + pretty = str(outputs) + + if len(pretty) <= EVENT_STREAM_INLINE_BUDGET: + return pretty + + shape = summarize_shape(outputs) + footer_parts = [] + if record_key: + footer_parts.append( + f"Reference: {{\"$ref\": \"{record_key}\", \"path\": \"\"}}" + ) + if file_path: + footer_parts.append(f"Full output: {file_path}") + footer = "\n".join(footer_parts) + return f"[shape only]\n{shape}\n{footer}".rstrip() diff --git a/agent_core/core/impl/action/router.py b/agent_core/core/impl/action/router.py index 1bd5d11a..a6b89001 100644 --- a/agent_core/core/impl/action/router.py +++ b/agent_core/core/impl/action/router.py @@ -750,6 +750,14 @@ def _parse_action_decision(self, raw: str) -> Tuple[Optional[Dict[str, Any]], Op parsed = ast.literal_eval(normalized) except Exception as eval_error: logger.error(f"Unable to parse action decision: {repr(normalized)}") + # Classify truncation-shaped failures so the retry feedback can + # tell the LLM what actually went wrong (output cut off, likely + # because a large value was inlined) instead of asking for a + # "corrected JSON object" that will be truncated the same way. + if self._looks_truncated(json_error, normalized): + return None, ( + f"truncated: {json_error}; literal_eval error: {eval_error}" + ) return None, f"json error: {json_error}; literal_eval error: {eval_error}" if not isinstance(parsed, dict): @@ -758,6 +766,37 @@ def _parse_action_decision(self, raw: str) -> Tuple[Optional[Dict[str, Any]], Op return parsed, None + @staticmethod + def _looks_truncated(json_error: json.JSONDecodeError, normalized: str) -> bool: + """Heuristic: did the LLM's JSON response get cut off mid-output? 
+ + Two error shapes are treated as truncation: + + - ``Unterminated string`` — parsing reached EOF inside a string literal. + ``json_error.pos`` points to where the string *opened*, not where it + died (which is the end of the input by definition), so the position + isn't a useful tail check here. Always classify as truncation. + + - ``Expecting value`` / ``Expecting ',' delimiter`` / + ``Expecting property name`` — these can fire mid-payload too, so + additionally require that the failure landed at (or essentially at) + the very end of the response. + """ + msg = (json_error.msg or "").lower() + if "unterminated" in msg: + return True + + expecting_msgs = ( + "expecting value", + "expecting ',' delimiter", + "expecting property name", + ) + if not any(m in msg for m in expecting_msgs): + return False + + tail_window = 4 + return json_error.pos >= max(0, len(normalized) - tail_window) + def _augment_prompt_with_feedback( self, base_prompt: str, @@ -765,6 +804,37 @@ def _augment_prompt_with_feedback( raw_response: str, error_message: str, ) -> str: + # Truncation needs a different nudge than a malformed JSON shape: the + # LLM didn't pick the wrong format, the response itself was cut off by + # the output-token cap — almost always because a large value (typically + # prior tool output) was inlined into a parameter. Telling it to "fix + # the JSON" again would just reproduce the same truncation, burning the + # retry budget. Instead, point it at the externalized file pointers + # that already exist in the event stream. + if isinstance(error_message, str) and error_message.startswith("truncated:"): + feedback_block = ( + f"\n\nPrevious attempt {attempt} was TRUNCATED by the model's " + "output-token limit before the JSON could close " + f"({error_message}).\n" + "This almost always happens when a large value was inlined into " + "an action parameter (e.g. prior tool output pasted into the " + "`code` field of run_python).\n" + "DO NOT re-emit the same large payload. 
Earlier action outputs " + "that exceeded the inline threshold were saved to files — the " + "event stream shows their paths in messages of the form " + "\"Action X completed. The output is too long therefore is " + "saved in ...\". Use `stream_read` / `grep_files` to " + "read them, or `open(path)` from inside `run_python`. Then " + "return ONLY the corrected JSON object with action_name and " + "parameters fields.\n\n" + "RAW RESPONSE (note the abrupt end):\n" + f"{raw_response}\n" + "--- End of RAW RESPONSE ---\n" + "Respond now with a shorter JSON object that references files " + "instead of inlining their contents." + ) + return base_prompt + feedback_block + feedback_block = ( f"\n\nPrevious attempt {attempt} failed to parse because: {error_message}. " "Review your last reply above (shown in the RAW RESPONSE section) and return a corrected response. " diff --git a/agent_core/core/prompts/action.py b/agent_core/core/prompts/action.py index 793d22f9..b5f00aa7 100644 --- a/agent_core/core/prompts/action.py +++ b/agent_core/core/prompts/action.py @@ -309,6 +309,51 @@ 3. task_update_todos + send_message is a good combination - use them together when updating progress and notifying the user. + +Reference Passing (`$ref`): +Every action's full output is archived on disk at a stable key +`{{action_name}}#{{short_run_id}}`. Whenever an `action_end` event in the stream +shows `[shape only]` instead of the literal output, the full payload is one +`$ref` away — NEVER reconstruct or paste large prior outputs into a parameter. + +Any action parameter may be a reference object instead of a literal value: + {{"$ref": "#", "path": ""}} + +The manager resolves the reference (loads the archive file, navigates the +path) before the action handler runs, so the handler receives the real value +— a dict, list, string, anything. The reference itself stays small in your +response, which keeps the per-turn JSON well under any output-token cap. 
+ +Examples: + +1) Feed a prior tool result into run_python without re-emitting it: + {{ + "action_name": "run_python", + "parameters": {{ + "code": "for c in channels: print(c['name'])", + "channels": {{"$ref": "get_discord_channels#a3f1c2", + "path": "result.result.all_channels"}} + }} + }} + +2) Have send-message deliver text built by an earlier run_python call, + without that text ever appearing in your JSON response: + {{ + "action_name": "send_message", + "parameters": {{ + "message": {{"$ref": "run_python#7b2e91", "path": "stdout"}} + }} + }} + +Path syntax: dotted keys plus square-bracket indices. `result.items[0].id`. +Omit `path` (or pass empty) to receive the whole `outputs` object of the +referenced action. + +When in doubt, prefer `$ref` over inlining prior data. The agent never needs +to "remember" the contents of an output it has already produced — the +archive holds the canonical copy. + + Before selecting an action, you MUST reason through these steps: 1. Identify the current todo from the [todos] event (marked [>] in_progress or first [ ] pending). @@ -522,6 +567,33 @@ 3. task_update_todos + send_message is a good combination - use them together when updating progress and notifying the user. + +Reference Passing (`$ref`): +Every action's full output is archived on disk at a stable key +`{{action_name}}#{{short_run_id}}`. Whenever an `action_end` event in the +stream shows `[shape only]` instead of the literal output, the full payload +is one `$ref` away — NEVER reconstruct or paste large prior outputs into a +parameter. + +Any action parameter may be a reference object instead of a literal value: + {{"$ref": "#", "path": ""}} + +The manager resolves the reference before the handler runs. 
+ +Example — feed a prior tool result into run_python without inlining it: + {{ + "action_name": "run_python", + "parameters": {{ + "code": "for c in channels: print(c['name'])", + "channels": {{"$ref": "get_discord_channels#a3f1c2", + "path": "result.result.all_channels"}} + }} + }} + +Path syntax: dotted keys plus square-bracket indices. Omit `path` to receive +the whole `outputs` object. + + Before selecting an action, quickly reason through: 1. What is the goal of this simple task? diff --git a/app/agent_base.py b/app/agent_base.py index ee3df6a0..f0c7e31e 100644 --- a/app/agent_base.py +++ b/app/agent_base.py @@ -32,7 +32,7 @@ from dataclasses import dataclass from typing import Any, Awaitable, Callable, Dict, List, Optional -from agent_core import ActionLibrary, ActionManager, ActionRouter +from agent_core import ActionLibrary, ActionManager, ActionOutputStore, ActionRouter from agent_core import settings_manager, config_watcher from app.config import ( @@ -226,8 +226,12 @@ def __init__( self.context_engine = ContextEngine(state_manager=self.state_manager) self.context_engine.set_role_info_hook(self._generate_role_info_prompt) + self.action_output_store = ActionOutputStore( + AGENT_FILE_SYSTEM_PATH / "action_outputs" + ) self.action_manager = ActionManager( - self.action_library, self.llm, self.db_interface, self.event_stream_manager, self.context_engine, self.state_manager + self.action_library, self.llm, self.db_interface, self.event_stream_manager, self.context_engine, self.state_manager, + action_output_store=self.action_output_store, ) self.action_router = ActionRouter(self.action_library, self.llm, self.context_engine) diff --git a/app/data/action/run_python.py b/app/data/action/run_python.py index b37fa591..4c6d5bd8 100644 --- a/app/data/action/run_python.py +++ b/app/data/action/run_python.py @@ -2,7 +2,23 @@ @action( name="run_python", - description="Execute a Python code snippet in an isolated environment. Missing packages are auto-installed. 
Use print() to return results.", + description=( + "Execute a Python code snippet in an isolated environment. Missing " + "packages are auto-installed. Use print() to return results.\n\n" + "PASSING PRIOR DATA: any additional parameters you supply alongside " + "`code` are exposed in the script as Python variables of the same " + "name. Use a `$ref` parameter to pull a slice of an earlier action's " + "output without pasting it into `code`. Example:\n" + " parameters = {\n" + " \"code\": \"for c in channels: print(c['name'])\",\n" + " \"channels\": {\"$ref\": \"get_discord_channels#a3f1c2\", " + "\"path\": \"result.result.all_channels\"}\n" + " }\n" + "The manager resolves the ref before the script runs; inside the " + "script `channels` is already a Python list. Never paste prior tool " + "output as a string literal into `code` — it triggers JSON " + "truncation and parsing failures." + ), execution_mode="sandboxed", mode="CLI", default=True, @@ -11,7 +27,12 @@ "code": { "type": "string", "example": "print('Hello World')", - "description": "Python code to execute. Use print() to output results." + "description": ( + "Python code to execute. Use print() to output results. " + "Reference any sibling parameters by name inside the code; " + "they are pre-populated (including any `$ref`-resolved " + "values) before execution." + ), } }, output_schema={ @@ -47,6 +68,18 @@ def create_and_run_python_script(input_data: dict) -> dict: if not code: return {"status": "error", "stdout": "", "stderr": "", "message": "No code provided"} + # Every sibling parameter (anything besides ``code`` and the ``_``-prefixed + # plumbing keys the ActionManager injects) becomes a Python variable of + # the same name inside the script. ``$ref`` markers are already resolved + # by the manager, so the values seen here are real data. 
+ exec_globals = {"__builtins__": __builtins__} + for k, v in input_data.items(): + if k == "code" or k.startswith("_"): + continue + if not isinstance(k, str) or not k.isidentifier(): + continue + exec_globals[k] = v + # Capture stdout/stderr stdout_buf = io.StringIO() stderr_buf = io.StringIO() @@ -68,7 +101,7 @@ def install_package(pkg): # Simple exec with retry for missing modules for attempt in range(3): try: - exec(code, {"__builtins__": __builtins__}) + exec(code, exec_globals) break except ModuleNotFoundError as e: match = re.search(r"No module named ['\"]([^'\"]+)['\"]", str(e)) diff --git a/app/ui_layer/events/transformer.py b/app/ui_layer/events/transformer.py index 3bca0d10..3db12308 100644 --- a/app/ui_layer/events/transformer.py +++ b/app/ui_layer/events/transformer.py @@ -39,10 +39,16 @@ class EventTransformer: "task start", "task_start", } - # Event kinds that should be hidden from chat (reasoning, internal events) + # Event kinds that should be hidden from chat (reasoning, internal events). + # Matched via substring containment against the lowercased event kind, so + # entries here must NOT appear as substrings of any legitimate kind. In + # particular, ``plan`` is intentionally absent — it would substring-match + # ``platform`` in kinds like ``"agent message to platform: ..."`` and + # silently filter every platform-routed chat message. ``planning`` covers + # the intended use case without the collision. HIDDEN_EVENT_KINDS = { "reasoning", "thinking", "thought", "internal", - "plan", "planning", "consider", "analysis", + "planning", "consider", "analysis", "reflection", "debug", "trace", "context", "memory", "observation", "reasoning_step", }