From 1fdca13229531555318f3d56057bb00e574882b6 Mon Sep 17 00:00:00 2001 From: Bob Bai Date: Thu, 28 May 2026 11:44:56 -0700 Subject: [PATCH] feat(agent-service): persist agent state to disk and rehydrate on startup Agents lived only in a process-local Map, so a restart/crash/redeploy lost every agent and its conversation. This adds opt-in disk persistence: - TexeraAgent.toSnapshot()/restoreFromSnapshot() capture and restore the ReAct step tree, HEAD, settings, delegate metadata, and workflow content (the user token and result caches are intentionally excluded). - AgentSnapshotStore writes one JSON file per agent (atomic temp+rename), with debounced coalescing, and loads/skips-corrupt on startup. - The server persists on create/clear/checkout/settings-change and after each WS turn, removes on delete, and rehydrates all agents on startup. Enabled by setting AGENT_STATE_DIR; empty keeps the previous in-memory-only behavior. Closes #5267 --- agent-service/.env.example | 8 +- .../src/agent/texera-agent-snapshot.test.ts | 169 ++++++++++++++++++ agent-service/src/agent/texera-agent.ts | 107 ++++++++++- .../persistence/agent-snapshot-store.test.ts | 160 +++++++++++++++++ .../src/persistence/agent-snapshot-store.ts | 137 ++++++++++++++ agent-service/src/server.test.ts | 98 +++++++++- agent-service/src/server.ts | 103 +++++++++-- agent-service/src/types/agent.ts | 43 +++++ 8 files changed, 810 insertions(+), 15 deletions(-) create mode 100644 agent-service/src/agent/texera-agent-snapshot.test.ts create mode 100644 agent-service/src/persistence/agent-snapshot-store.test.ts create mode 100644 agent-service/src/persistence/agent-snapshot-store.ts diff --git a/agent-service/.env.example b/agent-service/.env.example index 605f157ca0c..36df06a35ed 100644 --- a/agent-service/.env.example +++ b/agent-service/.env.example @@ -18,4 +18,10 @@ LLM_ENDPOINT=http://localhost:9096 # Texera backend services TEXERA_DASHBOARD_SERVICE_ENDPOINT=http://localhost:8080 
WORKFLOW_COMPILING_SERVICE_ENDPOINT=http://localhost:9090 -WORKFLOW_EXECUTION_SERVICE_ENDPOINT=http://localhost:8085 \ No newline at end of file +WORKFLOW_EXECUTION_SERVICE_ENDPOINT=http://localhost:8085 + +# Directory where agent state (conversation, workflow, settings) is persisted as +# one JSON file per agent. Agents are rehydrated from here on startup, so they +# survive restarts. Leave empty to disable persistence (agents live only in +# memory, the previous behavior). Mount a volume here in containerized deploys. +AGENT_STATE_DIR= \ No newline at end of file diff --git a/agent-service/src/agent/texera-agent-snapshot.test.ts b/agent-service/src/agent/texera-agent-snapshot.test.ts new file mode 100644 index 00000000000..52f0a3d36ae --- /dev/null +++ b/agent-service/src/agent/texera-agent-snapshot.test.ts @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import { describe, expect, test } from "bun:test"; +import { TexeraAgent } from "./texera-agent"; +import type { LanguageModel } from "ai"; +import { INITIAL_STEP_ID, OperatorResultSerializationMode } from "../types/agent"; +import type { AgentSnapshot } from "../types/agent"; +import type { WorkflowContent } from "../types/workflow"; + +function newAgent(createdAt?: Date): TexeraAgent { + return new TexeraAgent({ + model: {} as LanguageModel, + modelType: "test-model", + agentId: "agent-x", + agentName: "Bob", + createdAt, + }); +} + +const sampleWorkflow: WorkflowContent = { + operators: [ + { + operatorID: "op1", + operatorType: "Filter", + operatorVersion: "1.0", + operatorProperties: { predicate: "x > 1" }, + inputPorts: [{ portID: "input-0", displayName: "in" }], + outputPorts: [{ portID: "output-0", displayName: "out" }], + showAdvanced: false, + }, + ], + operatorPositions: { op1: { x: 10, y: 20 } }, + links: [], + commentBoxes: [], + settings: { dataTransferBatchSize: 400 }, +}; + +function sampleSnapshot(): AgentSnapshot { + return { + version: 1, + agentId: "agent-x", + agentName: "Bob", + modelType: "test-model", + createdAt: "2024-01-01T00:00:00.000Z", + head: "s1", + stepCounter: 3, + messageCounter: 2, + settings: { + disabledTools: ["executeOperator"], + maxOperatorResultCharLimit: 1500, + maxOperatorResultCellCharLimit: 800, + operatorResultSerializationMode: OperatorResultSerializationMode.TSV, + toolTimeoutMs: 120000, + executionTimeoutMs: 180000, + maxSteps: 42, + allowedOperatorTypes: ["Filter", "CSVFileScan"], + }, + delegate: { + userInfo: { uid: 1, name: "user", email: "u@example.com", role: "REGULAR" }, + workflowId: 7, + workflowName: "wf", + computingUnitId: 3, + }, + steps: [ + { + id: INITIAL_STEP_ID, + messageId: "initial", + stepId: -1, + timestamp: 0, + role: "user", + content: "", + isBegin: true, + isEnd: true, + }, + { + id: "s1", + parentId: INITIAL_STEP_ID, + messageId: "m1", + stepId: 0, + timestamp: 123, + role: 
"user", + content: "build a filter", + isBegin: true, + isEnd: true, + }, + ], + messageGroups: { m1: ["s1"] }, + workflowContent: sampleWorkflow, + }; +} + +describe("TexeraAgent.toSnapshot", () => { + test("captures a fresh agent: version, empty history, initial head", () => { + const snap = newAgent().toSnapshot(); + expect(snap.version).toBe(1); + expect(snap.agentId).toBe("agent-x"); + expect(snap.head).toBe(INITIAL_STEP_ID); + // Only the sentinel initial step exists. + expect(snap.steps).toHaveLength(1); + expect(snap.steps[0].id).toBe(INITIAL_STEP_ID); + expect(snap.messageGroups).toEqual({}); + expect(snap.delegate).toBeUndefined(); + }); + + test("preserves createdAt as an ISO string", () => { + const snap = newAgent(new Date("2024-01-01T00:00:00.000Z")).toSnapshot(); + expect(snap.createdAt).toBe("2024-01-01T00:00:00.000Z"); + }); +}); + +describe("TexeraAgent.restoreFromSnapshot", () => { + test("restores conversation, settings, workflow, and delegate", () => { + const snap = sampleSnapshot(); + const agent = newAgent(new Date(snap.createdAt)); + agent.restoreFromSnapshot(snap); + + expect(agent.getHead()).toBe("s1"); + expect(agent.getReActSteps().map(s => s.id)).toEqual(["s1"]); + expect(agent.getAllSteps().map(s => s.id)).toEqual(["s1"]); // excludes the initial sentinel + + const settings = agent.getSettings(); + expect(settings.maxSteps).toBe(42); + expect(settings.maxOperatorResultCharLimit).toBe(1500); + expect(Array.from(settings.disabledTools)).toEqual(["executeOperator"]); + + expect( + agent + .getWorkflowState() + .getWorkflowContent() + .operators.map(o => o.operatorID) + ).toEqual(["op1"]); + + // Delegate metadata is restored, but the user token is not persisted. 
+ expect(agent.getDelegateConfig()?.workflowId).toBe(7); + expect(agent.getDelegateConfig()?.userToken).toBe(""); + }); + + test("round-trips: toSnapshot(restore(s)) deep-equals s", () => { + const snap = sampleSnapshot(); + const agent = newAgent(new Date(snap.createdAt)); + agent.restoreFromSnapshot(snap); + + // Survives a JSON round-trip as it would on disk. + const roundTripped = JSON.parse(JSON.stringify(agent.toSnapshot())) as AgentSnapshot; + expect(roundTripped).toEqual(snap); + }); + + test("rejects an unsupported snapshot version", () => { + const snap = { ...sampleSnapshot(), version: 2 as unknown as 1 }; + expect(() => newAgent().restoreFromSnapshot(snap)).toThrow(/Unsupported agent snapshot version/); + }); +}); diff --git a/agent-service/src/agent/texera-agent.ts b/agent-service/src/agent/texera-agent.ts index 37eb12d8688..d8608b6ed3c 100644 --- a/agent-service/src/agent/texera-agent.ts +++ b/agent-service/src/agent/texera-agent.ts @@ -24,7 +24,7 @@ import { WorkflowState } from "./workflow-state"; import { WorkflowSystemMetadata } from "./util/workflow-system-metadata"; import { WorkflowResultState } from "./workflow-result-state"; import { formatOperatorResult } from "./tools/result-formatting"; -import type { AgentSettings, ReActStep, TokenUsage, UserInfo } from "../types/agent"; +import type { AgentSettings, AgentSnapshot, ReActStep, TokenUsage, UserInfo } from "../types/agent"; import { AgentState as AgentStateEnum, DEFAULT_AGENT_SETTINGS, @@ -60,6 +60,8 @@ export interface TexeraAgentConfig { agentId: string; agentName?: string; systemPrompt?: string; + // Preserved across restarts when an agent is reconstructed from a snapshot. + createdAt?: Date; } export interface AgentMessageResult { @@ -129,7 +131,7 @@ export class TexeraAgent { this.agentId = config.agentId; this.agentName = config.agentName || `Agent-${config.agentId}`; this.modelType = config.modelType; - this.createdAt = new Date(); + this.createdAt = config.createdAt ?? 
new Date(); this.model = config.model; this.systemPrompt = config.systemPrompt || ""; this.log = createLogger("TexeraAgent", { agentId: this.agentId }); @@ -823,6 +825,107 @@ export class TexeraAgent { return relevantSteps; } + /** + * Produce a durable, JSON-serializable snapshot of this agent. The user token + * and execution-result caches are intentionally excluded (see AgentSnapshot). + */ + toSnapshot(): AgentSnapshot { + const messageGroups: Record = {}; + for (const [messageId, steps] of this.reActStepsByMessageId) { + messageGroups[messageId] = steps.map(s => s.id); + } + + return { + version: 1, + agentId: this.agentId, + agentName: this.agentName, + modelType: this.modelType, + createdAt: this.createdAt.toISOString(), + head: this.head, + stepCounter: this.stepCounter, + messageCounter: this.messageCounter, + settings: { + disabledTools: Array.from(this.settings.disabledTools), + maxOperatorResultCharLimit: this.settings.maxOperatorResultCharLimit, + maxOperatorResultCellCharLimit: this.settings.maxOperatorResultCellCharLimit, + operatorResultSerializationMode: this.settings.operatorResultSerializationMode, + toolTimeoutMs: this.settings.toolTimeoutMs, + executionTimeoutMs: this.settings.executionTimeoutMs, + maxSteps: this.settings.maxSteps, + allowedOperatorTypes: this.settings.allowedOperatorTypes, + }, + delegate: this.delegateConfig + ? { + userInfo: this.delegateConfig.userInfo, + workflowId: this.delegateConfig.workflowId, + workflowName: this.delegateConfig.workflowName, + computingUnitId: this.delegateConfig.computingUnitId, + } + : undefined, + steps: Array.from(this.stepsById.values()), + messageGroups, + workflowContent: this.workflowState.getWorkflowContent(), + }; + } + + /** + * Restore conversation, workflow, settings, and delegate metadata from a + * snapshot. Must be called on a freshly constructed agent (one built with the + * snapshot's createdAt). 
The restored delegate carries no user token, so the + * caller must re-attach one before the agent can execute or auto-persist. + */ + restoreFromSnapshot(snapshot: AgentSnapshot): void { + if (snapshot.version !== 1) { + throw new Error(`Unsupported agent snapshot version: ${snapshot.version}`); + } + + this.stepCounter = snapshot.stepCounter; + this.messageCounter = snapshot.messageCounter; + + this.settings = { + ...DEFAULT_AGENT_SETTINGS, + ...snapshot.settings, + disabledTools: new Set(snapshot.settings.disabledTools), + systemPrompt: this.systemPrompt, + }; + + this.stepsById = new Map(snapshot.steps.map(step => [step.id, step])); + this.reActStepsByMessageId = new Map(); + for (const [messageId, stepIds] of Object.entries(snapshot.messageGroups)) { + const steps = stepIds.map(id => this.stepsById.get(id)).filter((s): s is ReActStep => s !== undefined); + this.reActStepsByMessageId.set(messageId, steps); + } + + // Ensure the initial sentinel step always exists so HEAD traversal/checkout works. 
+ if (!this.stepsById.has(INITIAL_STEP_ID)) { + this.stepsById.set(INITIAL_STEP_ID, { + id: INITIAL_STEP_ID, + messageId: "initial", + stepId: -1, + timestamp: Date.now(), + role: "user", + content: "", + isBegin: true, + isEnd: true, + }); + } + this.head = snapshot.head; + + if (snapshot.delegate) { + this.delegateConfig = { + userToken: "", + userInfo: snapshot.delegate.userInfo, + workflowId: snapshot.delegate.workflowId, + workflowName: snapshot.delegate.workflowName, + computingUnitId: snapshot.delegate.computingUnitId, + }; + } + + this.workflowState.setWorkflowContent(snapshot.workflowContent); + this.rebuildSystemPrompt(); + this.tools = this.createTools(); + } + destroy(): void { if (this.workflowChangeSubscription) { this.workflowChangeSubscription.unsubscribe(); diff --git a/agent-service/src/persistence/agent-snapshot-store.test.ts b/agent-service/src/persistence/agent-snapshot-store.test.ts new file mode 100644 index 00000000000..17360f08ed3 --- /dev/null +++ b/agent-service/src/persistence/agent-snapshot-store.test.ts @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { mkdtemp, rm, writeFile, readdir } from "fs/promises"; +import { tmpdir } from "os"; +import { join } from "path"; +import { AgentSnapshotStore } from "./agent-snapshot-store"; +import { OperatorResultSerializationMode } from "../types/agent"; +import type { AgentSnapshot } from "../types/agent"; + +let dir: string; + +beforeEach(async () => { + dir = await mkdtemp(join(tmpdir(), "agent-snap-")); +}); + +afterEach(async () => { + await rm(dir, { recursive: true, force: true }); +}); + +function snapshot(agentId: string): AgentSnapshot { + return { + version: 1, + agentId, + agentName: "Bob", + modelType: "m", + createdAt: "2024-01-01T00:00:00.000Z", + head: "step-initial", + stepCounter: 0, + messageCounter: 0, + settings: { + disabledTools: [], + maxOperatorResultCharLimit: 2000, + maxOperatorResultCellCharLimit: 2000, + operatorResultSerializationMode: OperatorResultSerializationMode.TSV, + toolTimeoutMs: 240000, + executionTimeoutMs: 240000, + maxSteps: 100, + allowedOperatorTypes: [], + }, + steps: [], + messageGroups: {}, + workflowContent: { + operators: [], + operatorPositions: {}, + links: [], + commentBoxes: [], + settings: { dataTransferBatchSize: 400 }, + }, + }; +} + +describe("AgentSnapshotStore - save/load", () => { + test("save then load returns an equal snapshot", async () => { + const store = new AgentSnapshotStore(dir); + const snap = snapshot("agent-1"); + await store.save(snap); + expect(await store.load("agent-1")).toEqual(snap); + }); + + test("load returns null for an unknown agent", async () => { + const store = new AgentSnapshotStore(dir); + expect(await store.load("missing")).toBeNull(); + }); + + test("creates the target directory on first save", async () => { + const nested = join(dir, "does", "not", "exist"); + const store = new AgentSnapshotStore(nested); + await store.save(snapshot("agent-1")); + expect(await 
store.load("agent-1")).not.toBeNull(); + }); + + test("save overwrites a previous snapshot for the same agent", async () => { + const store = new AgentSnapshotStore(dir); + await store.save(snapshot("agent-1")); + const updated = { ...snapshot("agent-1"), stepCounter: 5 }; + await store.save(updated); + expect((await store.load("agent-1"))?.stepCounter).toBe(5); + expect(await readdir(dir)).toHaveLength(1); // not a second file + }); +}); + +describe("AgentSnapshotStore - loadAll", () => { + test("returns every saved snapshot", async () => { + const store = new AgentSnapshotStore(dir); + await store.save(snapshot("agent-1")); + await store.save(snapshot("agent-2")); + const all = await store.loadAll(); + expect(all.map(s => s.agentId).sort()).toEqual(["agent-1", "agent-2"]); + }); + + test("returns empty when the directory does not exist", async () => { + const store = new AgentSnapshotStore(join(dir, "nope")); + expect(await store.loadAll()).toEqual([]); + }); + + test("skips corrupt and unsupported files", async () => { + const store = new AgentSnapshotStore(dir); + await store.save(snapshot("good")); + await writeFile(join(dir, "broken.agent.json"), "{not json", "utf-8"); + await writeFile(join(dir, "wrongversion.agent.json"), JSON.stringify({ agentId: "x", version: 99 }), "utf-8"); + await writeFile(join(dir, "ignored.txt"), "ignore me", "utf-8"); + + const all = await store.loadAll(); + expect(all.map(s => s.agentId)).toEqual(["good"]); + }); +}); + +describe("AgentSnapshotStore - remove", () => { + test("removes a saved snapshot", async () => { + const store = new AgentSnapshotStore(dir); + await store.save(snapshot("agent-1")); + await store.remove("agent-1"); + expect(await store.load("agent-1")).toBeNull(); + }); + + test("removing a missing agent does not throw", async () => { + const store = new AgentSnapshotStore(dir); + await store.remove("missing"); + expect(await store.load("missing")).toBeNull(); + }); +}); + +describe("AgentSnapshotStore - 
debounced scheduleSave", () => { + test("coalesces rapid updates and flush writes the latest", async () => { + const store = new AgentSnapshotStore(dir, { debounceMs: 1000 }); + store.scheduleSave({ ...snapshot("agent-1"), stepCounter: 1 }); + store.scheduleSave({ ...snapshot("agent-1"), stepCounter: 2 }); + store.scheduleSave({ ...snapshot("agent-1"), stepCounter: 3 }); + + await store.flush(); + + const loaded = await store.load("agent-1"); + expect(loaded?.stepCounter).toBe(3); + expect(await readdir(dir)).toHaveLength(1); + }); + + test("flush with nothing pending is a no-op", async () => { + const store = new AgentSnapshotStore(dir); + await store.flush(); + expect(await store.loadAll()).toEqual([]); + }); +}); diff --git a/agent-service/src/persistence/agent-snapshot-store.ts b/agent-service/src/persistence/agent-snapshot-store.ts new file mode 100644 index 00000000000..25b71d19a39 --- /dev/null +++ b/agent-service/src/persistence/agent-snapshot-store.ts @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
import { mkdir, readdir, readFile, rename, rm, writeFile } from "fs/promises";
import { join } from "path";
import type { AgentSnapshot } from "../types/agent";
import { createLogger } from "../logger";

const log = createLogger("SnapshotStore");

const DEFAULT_DEBOUNCE_MS = 500;
const FILE_SUFFIX = ".agent.json";

/** True when `err` is a filesystem error whose code is ENOENT (path does not exist). */
function isMissingFileError(err: unknown): boolean {
  return typeof err === "object" && err !== null && (err as { code?: unknown }).code === "ENOENT";
}

/**
 * Minimal structural check applied to parsed JSON before it is handed out as a
 * snapshot: it must be an object with a non-empty string `agentId` and
 * `version === 1`. Other fields are not inspected here.
 */
function isSupportedSnapshot(value: unknown): value is AgentSnapshot {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as { agentId?: unknown; version?: unknown };
  return typeof candidate.agentId === "string" && candidate.agentId.length > 0 && candidate.version === 1;
}

/**
 * Disk-backed store of agent snapshots, one JSON file per agent.
 *
 * Writes are atomic (write to a temp file then rename) and can be coalesced via
 * `scheduleSave` so a burst of updates results in a single write. Disk
 * operations that target the same file are run one after another, so a write
 * that is still in progress can neither interleave with a newer write nor
 * recreate a file that `remove` was asked to delete. `loadAll` rehydrates
 * surviving agents on startup and skips any unreadable/invalid file.
 */
export class AgentSnapshotStore {
  private readonly dir: string;
  private readonly debounceMs: number;
  /** Latest not-yet-written snapshot per agent, with the timer that will write it. */
  private readonly pending = new Map<string, { snapshot: AgentSnapshot; timer: ReturnType<typeof setTimeout> }>();
  /** Tail of the chain of disk operations per target file; these promises never reject. */
  private readonly inFlight = new Map<string, Promise<void>>();

  /**
   * @param dir directory holding the snapshot files; created on first save.
   * @param options.debounceMs delay used by `scheduleSave` (defaults to 500 ms).
   */
  constructor(dir: string, options: { debounceMs?: number } = {}) {
    this.dir = dir;
    this.debounceMs = options.debounceMs ?? DEFAULT_DEBOUNCE_MS;
  }

  // agentIds are server-generated, but sanitize anyway to keep writes inside dir.
  private fileFor(agentId: string): string {
    const safe = agentId.replace(/[^a-zA-Z0-9_-]/g, "_");
    return join(this.dir, `${safe}${FILE_SUFFIX}`);
  }

  /**
   * Run `operation` after every previously queued operation on `file` has
   * settled. The returned promise settles with the operation's own outcome; a
   * failure does not block operations queued behind it.
   */
  private enqueue(file: string, operation: () => Promise<void>): Promise<void> {
    const previous = this.inFlight.get(file) ?? Promise.resolve();
    const result = previous.then(operation);
    const tail = result.catch(() => undefined);
    this.inFlight.set(file, tail);
    // Drop the bookkeeping entry once this operation is the last one queued.
    void tail.then(() => {
      if (this.inFlight.get(file) === tail) this.inFlight.delete(file);
    });
    return result;
  }

  /** Read and validate one snapshot file; returns null (after logging) if it cannot be used. */
  private async readSnapshot(path: string, label: string): Promise<AgentSnapshot | null> {
    try {
      const parsed: unknown = JSON.parse(await readFile(path, "utf-8"));
      if (isSupportedSnapshot(parsed)) return parsed;
      log.warn({ file: label }, "skipping snapshot with missing id or unsupported version");
    } catch (err) {
      // A missing file is the normal "nothing persisted" case and is not worth a warning.
      if (!isMissingFileError(err)) {
        log.warn({ file: label, err }, "skipping unreadable agent snapshot");
      }
    }
    return null;
  }

  /**
   * Atomically write `snapshot` to its file, replacing any previous version.
   * The snapshot is serialized when this method is called, even if the disk
   * write has to wait behind an earlier operation on the same file.
   *
   * @throws if the directory cannot be created or the file cannot be written;
   *         the temp file is removed on a best-effort basis in that case.
   */
  async save(snapshot: AgentSnapshot): Promise<void> {
    const file = this.fileFor(snapshot.agentId);
    const payload = JSON.stringify(snapshot);
    await this.enqueue(file, async () => {
      await mkdir(this.dir, { recursive: true });
      const tmp = `${file}.${process.pid}.tmp`;
      try {
        await writeFile(tmp, payload, "utf-8");
        await rename(tmp, file);
      } catch (err) {
        await rm(tmp, { force: true }).catch(() => undefined);
        throw err;
      }
    });
  }

  /** Debounced save; coalesces rapid updates for the same agent into one write. */
  scheduleSave(snapshot: AgentSnapshot): void {
    const existing = this.pending.get(snapshot.agentId);
    if (existing) clearTimeout(existing.timer);

    const timer = setTimeout(() => {
      const entry = this.pending.get(snapshot.agentId);
      if (!entry) return;
      this.pending.delete(snapshot.agentId);
      void this.save(entry.snapshot).catch(err =>
        log.error({ err, agentId: snapshot.agentId }, "failed to persist agent snapshot")
      );
    }, this.debounceMs);

    // A pending snapshot write should not keep the process alive on its own.
    if (typeof timer.unref === "function") timer.unref();
    this.pending.set(snapshot.agentId, { snapshot, timer });
  }

  /**
   * Load one agent's snapshot.
   *
   * @returns the snapshot, or null when no file exists or the file is
   *          unreadable, not valid JSON, or not a supported snapshot.
   */
  async load(agentId: string): Promise<AgentSnapshot | null> {
    const file = this.fileFor(agentId);
    return this.readSnapshot(file, file);
  }

  /**
   * Load every usable snapshot in the directory. Files without the snapshot
   * suffix are ignored; unreadable or unsupported files are logged and skipped.
   *
   * @returns the snapshots found, or an empty array if the directory is missing
   *          or cannot be listed.
   */
  async loadAll(): Promise<AgentSnapshot[]> {
    let files: string[];
    try {
      files = await readdir(this.dir);
    } catch (err) {
      // directory does not exist yet -> nothing persisted
      if (!isMissingFileError(err)) {
        log.warn({ dir: this.dir, err }, "could not list agent snapshot directory");
      }
      return [];
    }

    const snapshots: AgentSnapshot[] = [];
    for (const file of files) {
      if (!file.endsWith(FILE_SUFFIX)) continue;
      const snapshot = await this.readSnapshot(join(this.dir, file), file);
      if (snapshot) snapshots.push(snapshot);
    }
    return snapshots;
  }

  /**
   * Cancel any debounced write for the agent and delete its file. The deletion
   * is queued behind writes already in progress for that file, so such a write
   * cannot bring the file back afterwards. Removing a missing agent is a no-op.
   */
  async remove(agentId: string): Promise<void> {
    const existing = this.pending.get(agentId);
    if (existing) {
      clearTimeout(existing.timer);
      this.pending.delete(agentId);
    }
    const file = this.fileFor(agentId);
    await this.enqueue(file, () => rm(file, { force: true }));
  }

  /**
   * Persist all pending debounced writes immediately (shutdown / tests) and
   * wait for writes that were already in progress. Write failures are logged,
   * not thrown.
   */
  async flush(): Promise<void> {
    const entries = Array.from(this.pending.values());
    this.pending.clear();
    const flushed = entries.map(({ snapshot, timer }) => {
      clearTimeout(timer);
      return this.save(snapshot).catch(err =>
        log.error({ err, agentId: snapshot.agentId }, "failed to flush agent snapshot")
      );
    });
    // Includes writes started by timers that fired before flush was called.
    const running = Array.from(this.inFlight.values());
    await Promise.all([...flushed, ...running]);
  }
}
+ _resetAgentStoreForTests(); + }); + + afterEach(async () => { + if (prevDir === undefined) delete process.env.AGENT_STATE_DIR; + else process.env.AGENT_STATE_DIR = prevDir; + _resetAgentStoreForTests(); + await rm(dir, { recursive: true, force: true }); + }); + + function diskSnapshot(agentId: string): AgentSnapshot { + return { + version: 1, + agentId, + agentName: "Restored", + modelType: "m", + createdAt: "2024-01-01T00:00:00.000Z", + head: "step-initial", + stepCounter: 0, + messageCounter: 0, + settings: { + disabledTools: [], + maxOperatorResultCharLimit: 2000, + maxOperatorResultCellCharLimit: 2000, + operatorResultSerializationMode: OperatorResultSerializationMode.TSV, + toolTimeoutMs: 240000, + executionTimeoutMs: 240000, + maxSteps: 100, + allowedOperatorTypes: [], + }, + steps: [], + messageGroups: {}, + workflowContent: { + operators: [], + operatorPositions: {}, + links: [], + commentBoxes: [], + settings: { dataTransferBatchSize: 400 }, + }, + }; + } + + test("a created agent is written to disk", async () => { + const created = await postJson(`${API}/agents`, { modelType: "m", name: "Persisted" }); + const { id } = await readJson<{ id: string }>(created); + + // Force the debounced write to complete, then read with an independent store. 
+ await _getSnapshotStoreForTests()!.flush(); + const onDisk = await new AgentSnapshotStore(dir).load(id); + + expect(onDisk).not.toBeNull(); + expect(onDisk?.agentId).toBe(id); + expect(onDisk?.agentName).toBe("Persisted"); + }); + + test("deleting an agent removes its snapshot file", async () => { + const created = await postJson(`${API}/agents`, { modelType: "m" }); + const { id } = await readJson<{ id: string }>(created); + await _getSnapshotStoreForTests()!.flush(); + + await del(`${API}/agents/${id}`); + + expect(await new AgentSnapshotStore(dir).load(id)).toBeNull(); + }); + + test("rehydrateAgents restores a persisted agent so it is served again", async () => { + // Simulate a prior process: write a snapshot straight to disk. + await new AgentSnapshotStore(dir).save(diskSnapshot("agent-persisted-1")); + + const restored = await rehydrateAgents(new AgentSnapshotStore(dir), () => ({}) as any); + expect(restored).toBe(1); + + const res = await getJson(`${API}/agents/agent-persisted-1`); + expect(res.status).toBe(200); + const body = await readJson<{ id: string; name: string }>(res); + expect(body.id).toBe("agent-persisted-1"); + expect(body.name).toBe("Restored"); + }); +}); diff --git a/agent-service/src/server.ts b/agent-service/src/server.ts index d5eeae82c9b..d04a92b17f1 100644 --- a/agent-service/src/server.ts +++ b/agent-service/src/server.ts @@ -20,12 +20,14 @@ import { Elysia, t } from "elysia"; import { cors } from "@elysiajs/cors"; import { createOpenAI } from "@ai-sdk/openai"; +import type { LanguageModel } from "ai"; import { TexeraAgent } from "./agent/texera-agent"; import { getVisibleResultHeaders } from "./agent/tools/tools-utility"; import { getBackendConfig } from "./api/backend-api"; import { extractUserFromToken, validateToken } from "./api/auth-api"; import { retrieveWorkflow } from "./api/workflow-api"; import { WorkflowSystemMetadata } from "./agent/util/workflow-system-metadata"; +import { AgentSnapshotStore } from 
"./persistence/agent-snapshot-store"; import { env } from "./config/env"; import { createLogger } from "./logger"; @@ -44,23 +46,79 @@ import { OperatorResultSerializationMode } from "./types/agent"; const agentStore = new Map(); let agentCounter = 0; +// Lazily resolved from AGENT_STATE_DIR (read live so it can be configured or +// overridden in tests without rebuilding the app). null = persistence disabled. +let snapshotStore: AgentSnapshotStore | null | undefined; + +function getSnapshotStore(): AgentSnapshotStore | null { + if (snapshotStore === undefined) { + const dir = process.env.AGENT_STATE_DIR; + snapshotStore = dir ? new AgentSnapshotStore(dir) : null; + if (dir) log.info({ dir }, "agent state persistence enabled"); + } + return snapshotStore; +} + +// Persist the agent's current snapshot (debounced). No-op when persistence is off. +function persistAgent(agentId: string): void { + const store = getSnapshotStore(); + const agent = agentStore.get(agentId); + if (store && agent) { + store.scheduleSave(agent.toSnapshot()); + } +} + +function buildChatModel(modelType: string): LanguageModel { + const config = getBackendConfig(); + // Reasoning effort variants are configured as separate model entries in litellm-config.yaml + // with extra_body to inject reasoning_effort, bypassing LiteLLM's param validation. + const openai = createOpenAI({ + baseURL: `${config.modelsEndpoint}/api`, + apiKey: env.LLM_API_KEY, + }); + return openai.chat(modelType); +} + +/** + * Reconstruct persisted agents into the in-memory store on startup. Returns the + * number successfully rehydrated. Exposed for tests. 
/**
 * Reconstruct persisted agents into the in-memory store on startup.
 *
 * Every snapshot returned by `store.loadAll()` is turned back into a
 * `TexeraAgent` (constructed with the snapshot's createdAt, initialized, then
 * restored) and registered in `agentStore`. A snapshot that fails to restore is
 * logged and skipped; the others are still processed.
 *
 * The `agentCounter` used to mint new ids (`agent-<n>`) is advanced past every
 * persisted id of that form, including ones that failed to restore. Without
 * this the counter would restart at 0 and the next created agent would reuse
 * `agent-1`, replacing the restored agent in memory and overwriting its file.
 *
 * @param store snapshot store to read from.
 * @param buildModel factory for the language model of each agent; replaceable in tests.
 * @returns the number of agents successfully rehydrated. Exposed for tests.
 */
export async function rehydrateAgents(
  store: AgentSnapshotStore,
  buildModel: (modelType: string) => LanguageModel = buildChatModel
): Promise<number> {
  const generatedId = /^agent-(\d+)$/;
  const snapshots = await store.loadAll();
  let restored = 0;
  for (const snapshot of snapshots) {
    // Reserve the id before attempting the restore so a failed restore still
    // protects the file on disk from being overwritten by a new agent.
    const sequence = Number(generatedId.exec(snapshot.agentId)?.[1]);
    if (Number.isSafeInteger(sequence) && sequence > agentCounter) {
      agentCounter = sequence;
    }

    try {
      const agent = new TexeraAgent({
        model: buildModel(snapshot.modelType),
        modelType: snapshot.modelType,
        agentId: snapshot.agentId,
        agentName: snapshot.agentName,
        createdAt: new Date(snapshot.createdAt),
      });
      await agent.initialize();
      agent.restoreFromSnapshot(snapshot);
      agentStore.set(snapshot.agentId, agent);
      restored++;
    } catch (err) {
      log.error({ err, agentId: snapshot.agentId }, "failed to rehydrate agent");
    }
  }
  log.info({ restored, total: snapshots.length }, "rehydrated agents from disk");
  return restored;
}
const agent = new TexeraAgent({ - model: openai.chat(modelType), + model: buildChatModel(modelType), modelType, agentId, agentName: customName || "Bob", @@ -214,6 +272,7 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) }); } + persistAgent(agentId); return getAgentInfo(agentId, agent); }, { @@ -248,7 +307,7 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) }; }) - .delete("/:id", ({ params: { id }, set }) => { + .delete("/:id", async ({ params: { id }, set }) => { const agent = agentStore.get(id); if (!agent) { set.status = 404; @@ -257,6 +316,7 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) agent.destroy(); agentStore.delete(id); + await getSnapshotStore()?.remove(id); return { deleted: true }; }) @@ -298,6 +358,7 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) .post("/:id/clear", ({ params: { id } }) => { const agent = getAgent(id); agent.clearHistory(); + persistAgent(id); return { status: "cleared" }; }) @@ -320,6 +381,7 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) operatorResults: getOperatorResultSummaries(agent), }); + persistAgent(id); return { status: "checked out", headId: stepId, @@ -377,6 +439,7 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) allowedOperatorTypes: settings.allowedOperatorTypes, }); + persistAgent(id); const agentSettings = agent.getSettings(); return { maxOperatorResultCharLimit: agentSettings.maxOperatorResultCharLimit, @@ -566,6 +629,9 @@ export function buildApp() { operatorResults: getOperatorResultSummaries(agent), }); + // Persist the conversation/workflow produced by this turn. 
+ persistAgent(agentId); + wsLog.info({ agentId, steps: result.messages.length }, "agent run complete"); } catch (error: any) { agent.setStepCallback(null); @@ -596,6 +662,13 @@ export function buildApp() { export function _resetAgentStoreForTests(): void { agentStore.clear(); agentCounter = 0; + snapshotStore = undefined; +} + +// Re-read AGENT_STATE_DIR on the next access; lets tests point persistence at a +// fresh temp directory. +export function _getSnapshotStoreForTests(): AgentSnapshotStore | null { + return getSnapshotStore(); } function printStartupMessage(app: ReturnType) { @@ -652,6 +725,16 @@ async function initializeServices() { export async function start() { await initializeServices(); + + const store = getSnapshotStore(); + if (store) { + try { + await rehydrateAgents(store); + } catch (error) { + log.error({ err: error }, "failed to rehydrate persisted agents"); + } + } + const app = buildApp().listen(env.PORT); printStartupMessage(app); return app; diff --git a/agent-service/src/types/agent.ts b/agent-service/src/types/agent.ts index 765f5a7cb46..7dacf5a08e7 100644 --- a/agent-service/src/types/agent.ts +++ b/agent-service/src/types/agent.ts @@ -163,3 +163,46 @@ export interface UpdateAgentSettingsRequest { maxSteps?: number; allowedOperatorTypes?: string[]; } + +// JSON-serializable form of AgentSettings (Set -> array; systemPrompt is +// derived from metadata at restore time and therefore not persisted). +export interface SerializedAgentSettings { + disabledTools: string[]; + maxOperatorResultCharLimit: number; + maxOperatorResultCellCharLimit: number; + operatorResultSerializationMode: OperatorResultSerializationMode; + toolTimeoutMs: number; + executionTimeoutMs: number; + maxSteps: number; + allowedOperatorTypes: string[]; +} + +/** + * Durable, JSON-serializable snapshot of a TexeraAgent. 
/**
 * Durable, JSON-serializable snapshot of a TexeraAgent.
 *
 * Captures the conversation (ReAct step tree + HEAD), the workflow being
 * edited, settings, and delegate metadata so an agent can be reconstructed
 * after a process restart. The user token is deliberately omitted (it is
 * short-lived and security-sensitive); execution-result caches are also
 * omitted as they can be recomputed.
 */
export interface AgentSnapshot {
  /** Format version; 1 is the only version currently defined. */
  version: 1;
  agentId: string;
  agentName: string;
  modelType: string;
  /** Creation time as an ISO-8601 string. */
  createdAt: string;
  /** Id of the step the conversation HEAD points at. */
  head: string;
  stepCounter: number;
  messageCounter: number;
  settings: SerializedAgentSettings;
  /** Delegate metadata without the user token; absent when the agent has no delegate. */
  delegate?: {
    userInfo?: UserInfo;
    workflowId: number;
    workflowName?: string;
    computingUnitId?: number;
  };
  /** Every step in the tree, in no guaranteed order. */
  steps: ReActStep[];
  /** Message id -> ordered ids of the steps belonging to that message. */
  messageGroups: Record<string, string[]>;
  workflowContent: WorkflowContent;
}