diff --git a/agent-service/.env.example b/agent-service/.env.example index 605f157ca0c..23d4ac446a9 100644 --- a/agent-service/.env.example +++ b/agent-service/.env.example @@ -18,4 +18,12 @@ LLM_ENDPOINT=http://localhost:9096 # Texera backend services TEXERA_DASHBOARD_SERVICE_ENDPOINT=http://localhost:8080 WORKFLOW_COMPILING_SERVICE_ENDPOINT=http://localhost:9090 -WORKFLOW_EXECUTION_SERVICE_ENDPOINT=http://localhost:8085 \ No newline at end of file +WORKFLOW_EXECUTION_SERVICE_ENDPOINT=http://localhost:8085 + +# Access control. When AGENT_AUTH_REQUIRED is true, every request must carry a +# valid user JWT (Authorization: Bearer for HTTP, ?access-token= for the +# WebSocket) and agents are isolated to their owning user. AUTH_JWT_SECRET is +# the HS256 secret shared with the rest of Texera and is used to verify token +# signatures. Defaults below preserve the previous permissive behavior. +AGENT_AUTH_REQUIRED=false +AUTH_JWT_SECRET= \ No newline at end of file diff --git a/agent-service/src/api/auth-api.test.ts b/agent-service/src/api/auth-api.test.ts new file mode 100644 index 00000000000..695b11144af --- /dev/null +++ b/agent-service/src/api/auth-api.test.ts @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { createHmac } from "crypto"; +import { + createAuthHeaders, + extractUserFromToken, + getUidFromToken, + isAuthRequired, + validateToken, + verifyToken, +} from "./auth-api"; + +const SECRET = "unit-test-secret-key"; + +function b64url(input: string | Buffer): string { + return Buffer.from(input).toString("base64").replace(/=/g, "").replace(/\+/g, "-").replace(/\//g, "_"); +} + +function signJwt(payload: Record, opts: { secret?: string; alg?: string } = {}): string { + const header = b64url(JSON.stringify({ alg: opts.alg ?? "HS256", typ: "JWT" })); + const body = b64url(JSON.stringify(payload)); + const sig = b64url( + createHmac("sha256", opts.secret ?? SECRET) + .update(`${header}.${body}`) + .digest() + ); + return `${header}.${body}.${sig}`; +} + +function futureExp(): number { + return Math.floor(Date.now() / 1000) + 3600; +} + +const prevSecret = process.env.AUTH_JWT_SECRET; +const prevRequired = process.env.AGENT_AUTH_REQUIRED; + +beforeEach(() => { + delete process.env.AUTH_JWT_SECRET; + delete process.env.AGENT_AUTH_REQUIRED; +}); + +afterEach(() => { + if (prevSecret === undefined) delete process.env.AUTH_JWT_SECRET; + else process.env.AUTH_JWT_SECRET = prevSecret; + if (prevRequired === undefined) delete process.env.AGENT_AUTH_REQUIRED; + else process.env.AGENT_AUTH_REQUIRED = prevRequired; +}); + +describe("extractUserFromToken / getUidFromToken", () => { + test("maps claims to UserInfo", () => { + const token = signJwt({ sub: "alice", userId: 9, email: "a@b.c", role: "ADMIN", exp: futureExp() }); + expect(extractUserFromToken(token)).toEqual({ uid: 9, name: "alice", email: "a@b.c", role: "ADMIN" }); + expect(getUidFromToken(token)).toBe(9); + }); + + test("getUidFromToken returns undefined for a malformed token", () => { + expect(getUidFromToken("not-a-jwt")).toBeUndefined(); + }); +}); + +describe("isAuthRequired", () => { + test("defaults to false", () => { + expect(isAuthRequired()).toBe(false); + }); + + test("is true for 'true' or '1'", () => { + process.env.AGENT_AUTH_REQUIRED = "true"; + expect(isAuthRequired()).toBe(true); + process.env.AGENT_AUTH_REQUIRED = "1"; + expect(isAuthRequired()).toBe(true); + }); +}); + +describe("verifyToken", () => { + beforeEach(() => { + process.env.AUTH_JWT_SECRET = SECRET; + }); + + test("accepts a correctly signed, unexpired token", () => { + expect(verifyToken(signJwt({ sub: "u", userId: 1, exp: futureExp() }))).toBe(true); + }); + + test("rejects a token signed with the wrong secret", () => { + expect(verifyToken(signJwt({ sub: "u", userId: 1, exp: futureExp() }, { secret: "other" }))).toBe(false); + }); + + test("rejects an expired token", () => { + const exp = Math.floor(Date.now() / 1000) - 3600; + expect(verifyToken(signJwt({ sub: "u", userId: 1, exp }))).toBe(false); + }); + + test("rejects a token missing the subject claim", () => { + expect(verifyToken(signJwt({ userId: 1, exp: futureExp() }))).toBe(false); + }); + + test("rejects a non-HS256 algorithm", () => { + expect(verifyToken(signJwt({ sub: "u", userId: 1, exp: futureExp() }, { alg: "none" }))).toBe(false); + }); + + test("rejects a structurally invalid token", () => { + expect(verifyToken("a.b")).toBe(false); + }); + + test("returns false when no secret is configured", () => { + delete process.env.AUTH_JWT_SECRET; + expect(verifyToken(signJwt({ sub: "u", userId: 1, exp: futureExp() }))).toBe(false); + }); +}); + +describe("validateToken", () => { + test("enforced mode requires a valid signature", () => { + process.env.AUTH_JWT_SECRET = SECRET; + process.env.AGENT_AUTH_REQUIRED = "true"; + expect(validateToken(signJwt({ sub: "u", userId: 1, exp: futureExp() }))).toBe(true); + expect(validateToken(signJwt({ sub: "u", userId: 1, exp: futureExp() }, { secret: "x" }))).toBe(false); + }); + + test("permissive mode accepts any unexpired, decodable token", () => { + // No enforcement: a token signed with an arbitrary secret is still accepted + // as long as it is well-formed and unexpired (prior behavior). + expect(validateToken(signJwt({ sub: "u", userId: 1, exp: futureExp() }, { secret: "anything" }))).toBe(true); + }); + + test("permissive mode rejects an expired token", () => { + const exp = Math.floor(Date.now() / 1000) - 3600; + expect(validateToken(signJwt({ sub: "u", userId: 1, exp }))).toBe(false); + }); + + test("permissive mode rejects a malformed token", () => { + expect(validateToken("nonsense")).toBe(false); + }); +}); + +describe("createAuthHeaders", () => { + test("builds a Bearer header with JSON content type", () => { + expect(createAuthHeaders("t.o.k")).toEqual({ + Authorization: "Bearer t.o.k", + "Content-Type": "application/json", + }); + }); +}); diff --git a/agent-service/src/api/auth-api.ts b/agent-service/src/api/auth-api.ts index 087f93ac46f..0fa65cadcf8 100644 --- a/agent-service/src/api/auth-api.ts +++ b/agent-service/src/api/auth-api.ts @@ -17,22 +17,69 @@ * under the License. */ +import { createHmac, timingSafeEqual } from "crypto"; import type { UserInfo } from "../types/agent"; +import { createLogger } from "../logger"; export type { UserInfo } from "../types/agent"; -function decodeJWT(token: string): any { +const log = createLogger("Auth"); + +// Matches the backend JwtAuth (org.apache.texera.auth): HMAC-SHA256 over +// `${header}.${payload}` keyed by AUTH_JWT_SECRET (UTF-8), with a 30s clock skew. +const JWT_ALGORITHM = "HS256"; +const CLOCK_SKEW_SECONDS = 30; + +// Auth settings are read from the environment at call time (not cached) so that +// they can be toggled per request/per test without rebuilding the app. +function getJwtSecret(): string { + return process.env.AUTH_JWT_SECRET ?? ""; +} + +/** + * Whether the agent service enforces authentication and per-user isolation. + * + * Opt-in via AGENT_AUTH_REQUIRED so the feature can be deployed and the client + * updated before enforcement is switched on. When disabled the service keeps + * its previous permissive behavior. + */ +export function isAuthRequired(): boolean { + const v = process.env.AGENT_AUTH_REQUIRED; + return v === "true" || v === "1"; +} + +function base64UrlToBuffer(segment: string): Buffer { + return Buffer.from(segment.replace(/-/g, "+").replace(/_/g, "/"), "base64"); +} + +function base64UrlEncode(buf: Buffer): string { + return buf.toString("base64").replace(/=/g, "").replace(/\+/g, "-").replace(/\//g, "_"); +} + +interface JwtParts { + header: any; + payload: any; +} + +function decodeJwtParts(token: string): JwtParts { + const parts = token.split("."); + if (parts.length !== 3) { + throw new Error("Invalid JWT format"); + } try { - const parts = token.split("."); - if (parts.length !== 3) { - throw new Error("Invalid JWT format"); - } - return JSON.parse(Buffer.from(parts[1], "base64").toString("utf-8")); + return { + header: JSON.parse(base64UrlToBuffer(parts[0]).toString("utf-8")), + payload: JSON.parse(base64UrlToBuffer(parts[1]).toString("utf-8")), + }; } catch (error) { throw new Error(`Failed to decode JWT: ${error}`); } } +function decodeJWT(token: string): any { + return decodeJwtParts(token).payload; +} + export function extractUserFromToken(token: string): UserInfo { const payload = decodeJWT(token); return { @@ -43,18 +90,68 @@ export function extractUserFromToken(token: string): UserInfo { }; } -function isTokenExpired(token: string): boolean { +export function getUidFromToken(token: string): number | undefined { try { - const payload = decodeJWT(token); - if (!payload.exp) return false; - return Date.now() >= payload.exp * 1000; + const uid = decodeJWT(token).userId; + return typeof uid === "number" ? uid : undefined; } catch { - return true; + return undefined; } } +function isExpired(payload: any): boolean { + if (typeof payload.exp !== "number") return true; + return Math.floor(Date.now() / 1000) > payload.exp + CLOCK_SKEW_SECONDS; +} + +/** + * Cryptographically verify an HS256 JWT against AUTH_JWT_SECRET and check its + * expiry and required claims. Returns false (never throws) on any failure. + */ +export function verifyToken(token: string): boolean { + const secret = getJwtSecret(); + if (!secret) { + log.warn("token verification requested but AUTH_JWT_SECRET is not set"); + return false; + } + + const parts = token.split("."); + if (parts.length !== 3) return false; + + let parsed: JwtParts; + try { + parsed = decodeJwtParts(token); + } catch { + return false; + } + + if (parsed.header?.alg !== JWT_ALGORITHM) return false; + // The backend requires both a subject and an expiration time. + if (!parsed.payload?.sub || typeof parsed.payload?.exp !== "number") return false; + if (isExpired(parsed.payload)) return false; + + const expected = base64UrlEncode(createHmac("sha256", secret).update(`${parts[0]}.${parts[1]}`).digest()); + const provided = parts[2]; + if (expected.length !== provided.length) return false; + return timingSafeEqual(Buffer.from(expected), Buffer.from(provided)); +} + +/** + * Accept a token for use. When enforcement is on this requires a valid HS256 + * signature and a live expiry; otherwise it falls back to an expiry-only check + * to preserve the service's prior behavior. + */ export function validateToken(token: string): boolean { - return !isTokenExpired(token); + if (isAuthRequired()) { + return verifyToken(token); + } + try { + const payload = decodeJWT(token); + if (typeof payload.exp !== "number") return true; + return !isExpired(payload); + } catch { + return false; + } } export function createAuthHeaders(token: string): Record { diff --git a/agent-service/src/server.test.ts b/agent-service/src/server.test.ts index 0f618e599c2..79e6bbcc371 100644 --- a/agent-service/src/server.test.ts +++ b/agent-service/src/server.test.ts @@ -17,7 +17,8 @@ * under the License. */ -import { beforeEach, describe, expect, test } from "bun:test"; +import { afterAll, beforeAll, beforeEach, describe, expect, test } from "bun:test"; +import { createHmac } from "crypto"; import { buildApp, _resetAgentStoreForTests } from "./server"; import { env } from "./config/env"; @@ -86,20 +87,20 @@ describe(`POST ${API}/agents`, () => { state: string; delegate: unknown; }>(res); - expect(agent.id).toMatch(/^agent-\d+$/); + expect(agent.id).toMatch(/^agent-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/); expect(agent.name).toBe("Tester"); expect(agent.modelType).toBe("test-model"); expect(agent.state).toBe("AVAILABLE"); expect(agent.delegate).toBeUndefined(); }); - test("auto-numbers agent ids monotonically", async () => { + test("assigns a unique, non-guessable id to each agent", async () => { const a = await readJson<{ id: string }>(await postJson(`${API}/agents`, { modelType: "m" })); const b = await readJson<{ id: string }>(await postJson(`${API}/agents`, { modelType: "m" })); - const aNum = Number(a.id.split("-")[1]); - const bNum = Number(b.id.split("-")[1]); - expect(bNum).toBe(aNum + 1); + expect(a.id).not.toBe(b.id); + // UUID-based ids are not enumerable, unlike the previous sequential counter. + expect(a.id).toMatch(/^agent-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/); }); test("rejects invalid token", async () => { @@ -221,3 +222,110 @@ describe(`PATCH ${API}/agents/:id/settings`, () => { expect(reread.toolTimeoutSeconds).toBe(30); }); }); + +describe("access control (AGENT_AUTH_REQUIRED)", () => { + const SECRET = "test-secret-key-for-agent-service-access-control"; + const prevSecret = process.env.AUTH_JWT_SECRET; + const prevRequired = process.env.AGENT_AUTH_REQUIRED; + + function b64url(input: string | Buffer): string { + return Buffer.from(input).toString("base64").replace(/=/g, "").replace(/\+/g, "-").replace(/\//g, "_"); + } + + function signJwt(payload: Record, secret = SECRET): string { + const header = b64url(JSON.stringify({ alg: "HS256", typ: "JWT" })); + const body = b64url(JSON.stringify(payload)); + const sig = b64url(createHmac("sha256", secret).update(`${header}.${body}`).digest()); + return `${header}.${body}.${sig}`; + } + + function tokenFor(uid: number, secret = SECRET): string { + return signJwt({ sub: `user-${uid}`, userId: uid, exp: Math.floor(Date.now() / 1000) + 3600 }, secret); + } + + function authGet(path: string, token?: string): Promise { + const headers: Record = {}; + if (token) headers.Authorization = `Bearer ${token}`; + return app.handle(new Request(url(path), { headers })); + } + + async function createOwnedAgent(uid: number): Promise { + const res = await postJson(`${API}/agents`, { modelType: "m", userToken: tokenFor(uid) }); + expect(res.status).toBe(200); + return (await readJson<{ id: string }>(res)).id; + } + + beforeAll(() => { + process.env.AUTH_JWT_SECRET = SECRET; + process.env.AGENT_AUTH_REQUIRED = "true"; + }); + + afterAll(() => { + if (prevSecret === undefined) delete process.env.AUTH_JWT_SECRET; + else process.env.AUTH_JWT_SECRET = prevSecret; + if (prevRequired === undefined) delete process.env.AGENT_AUTH_REQUIRED; + else process.env.AGENT_AUTH_REQUIRED = prevRequired; + }); + + test("rejects agent creation without a token", async () => { + const res = await postJson(`${API}/agents`, { modelType: "m" }); + expect(res.status).toBe(401); + }); + + test("rejects a forged token (bad signature) at creation", async () => { + const forged = tokenFor(1, "the-wrong-secret"); + const res = await postJson(`${API}/agents`, { modelType: "m", userToken: forged }); + expect(res.status).toBe(401); + }); + + test("rejects an expired token at creation", async () => { + const expired = signJwt({ sub: "user-1", userId: 1, exp: Math.floor(Date.now() / 1000) - 3600 }); + const res = await postJson(`${API}/agents`, { modelType: "m", userToken: expired }); + expect(res.status).toBe(401); + }); + + test("an owner can read its own agent", async () => { + const id = await createOwnedAgent(1); + const res = await authGet(`${API}/agents/${id}`, tokenFor(1)); + expect(res.status).toBe(200); + }); + + test("a different user cannot read someone else's agent (403)", async () => { + const id = await createOwnedAgent(1); + const res = await authGet(`${API}/agents/${id}`, tokenFor(2)); + expect(res.status).toBe(403); + }); + + test("a request without a token is rejected (401)", async () => { + const id = await createOwnedAgent(1); + const res = await authGet(`${API}/agents/${id}`); + expect(res.status).toBe(401); + }); + + test("a control route is also guarded (stop -> 403 for non-owner)", async () => { + const id = await createOwnedAgent(1); + const res = await app.handle( + new Request(url(`${API}/agents/${id}/stop`), { + method: "POST", + headers: { "Content-Type": "application/json", Authorization: `Bearer ${tokenFor(2)}` }, + body: "{}", + }) + ); + expect(res.status).toBe(403); + }); + + test("listing is scoped to the caller's own agents", async () => { + const mine = await createOwnedAgent(1); + await createOwnedAgent(2); + + const res = await authGet(`${API}/agents`, tokenFor(1)); + expect(res.status).toBe(200); + const body = await readJson<{ agents: { id: string }[] }>(res); + expect(body.agents.map(a => a.id)).toEqual([mine]); + }); + + test("listing without a token is rejected (401)", async () => { + const res = await authGet(`${API}/agents`); + expect(res.status).toBe(401); + }); +}); diff --git a/agent-service/src/server.ts b/agent-service/src/server.ts index d5eeae82c9b..4dc00dec458 100644 --- a/agent-service/src/server.ts +++ b/agent-service/src/server.ts @@ -20,10 +20,11 @@ import { Elysia, t } from "elysia"; import { cors } from "@elysiajs/cors"; import { createOpenAI } from "@ai-sdk/openai"; +import { randomUUID } from "crypto"; import { TexeraAgent } from "./agent/texera-agent"; import { getVisibleResultHeaders } from "./agent/tools/tools-utility"; import { getBackendConfig } from "./api/backend-api"; -import { extractUserFromToken, validateToken } from "./api/auth-api"; +import { extractUserFromToken, validateToken, isAuthRequired, getUidFromToken } from "./api/auth-api"; import { retrieveWorkflow } from "./api/workflow-api"; import { WorkflowSystemMetadata } from "./agent/util/workflow-system-metadata"; import { env } from "./config/env"; @@ -42,16 +43,55 @@ import type { import { OperatorResultSerializationMode } from "./types/agent"; const agentStore = new Map(); -let agentCounter = 0; + +// agentId -> owning user's uid. Recorded at creation time from the delegate +// token, independently of whether a workflow was loaded, so ownership does not +// depend on the backend being reachable. +const agentOwners = new Map(); + +// Bearer token from the Authorization header (HTTP) or the access-token query +// parameter (WebSocket, since browsers cannot set headers on the WS handshake). +function extractBearerToken( + headers: Record | undefined, + query: Record | undefined +): string | undefined { + const auth = headers?.authorization; + if (auth && auth.startsWith("Bearer ")) { + const token = auth.slice("Bearer ".length).trim(); + if (token) return token; + } + const q = query?.["access-token"]; + return typeof q === "string" && q.length > 0 ? q : undefined; +} + +// Enforces authentication and per-user isolation. A no-op when AGENT_AUTH_REQUIRED +// is off, preserving the service's prior permissive behavior. Throws errors the +// router's onError maps to 401/403. +function authorizeAgentAccess(agentId: string, token: string | undefined): void { + if (!isAuthRequired()) return; + if (!token || !validateToken(token)) { + throw new Error("Unauthorized"); + } + const ownerUid = agentOwners.get(agentId); + // Ownerless agents (created before enforcement was enabled) are accessible to + // any authenticated user; owned agents only to their owner. + if (ownerUid !== undefined && getUidFromToken(token) !== ownerUid) { + throw new Error("Forbidden"); + } +} async function createAgentInstance( modelType: string, customName?: string, delegateConfig?: AgentDelegateConfig ): Promise<{ agentId: string; agent: TexeraAgent }> { - const agentId = `agent-${++agentCounter}`; + const agentId = `agent-${randomUUID()}`; const config = getBackendConfig(); + if (delegateConfig?.userInfo?.uid !== undefined) { + agentOwners.set(agentId, delegateConfig.userInfo.uid); + } + const openai = createOpenAI({ baseURL: `${config.modelsEndpoint}/api`, apiKey: env.LLM_API_KEY, @@ -153,6 +193,14 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) set.status = 401; return { error: "Invalid or expired token" }; } + if (errorMessage === "Unauthorized") { + set.status = 401; + return { error: "Unauthorized" }; + } + if (errorMessage === "Forbidden") { + set.status = 403; + return { error: "Forbidden" }; + } if (errorMessage === "modelType is required") { set.status = 400; return { error: "modelType is required" }; @@ -160,9 +208,33 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) set.status = 500; return { error: errorMessage || "Internal server error" }; }) - .get("/", () => { - const agentList = Array.from(agentStore.entries()).map(([id, agent]) => getAgentInfo(id, agent)); - return { agents: agentList }; + // Enforce ownership for every /:id route in one place. List and create carry + // no :id and are authorized in their own handlers. + .onBeforeHandle(({ params, headers, query }) => { + const id = (params as Record)?.id; + if (!id) return; + if (!agentStore.has(id)) throw new Error("Agent not found"); + authorizeAgentAccess(id, extractBearerToken(headers as any, query as any)); + }) + .get("/", ({ headers, query }) => { + const entries = Array.from(agentStore.entries()); + + if (!isAuthRequired()) { + return { agents: entries.map(([id, agent]) => getAgentInfo(id, agent)) }; + } + + const token = extractBearerToken(headers as any, query as any); + if (!token || !validateToken(token)) { + throw new Error("Unauthorized"); + } + // Scope the listing to the caller's own agents (plus any ownerless agents + // created before enforcement was enabled). + const uid = getUidFromToken(token); + const visible = entries.filter(([id]) => { + const ownerUid = agentOwners.get(id); + return ownerUid === undefined || ownerUid === uid; + }); + return { agents: visible.map(([id, agent]) => getAgentInfo(id, agent)) }; }) .post( @@ -174,6 +246,11 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) throw new Error("modelType is required"); } + // When enforcement is on, every agent must have an owner, so a token is required. + if (isAuthRequired() && !userToken) { + throw new Error("Unauthorized"); + } + let delegateConfig: AgentDelegateConfig | undefined; if (userToken) { if (!validateToken(userToken)) { @@ -257,6 +334,7 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) agent.destroy(); agentStore.delete(id); + agentOwners.delete(id); return { deleted: true }; }) @@ -495,6 +573,19 @@ export function buildApp() { return; } + // Browsers cannot set headers on a WS handshake, so the token is read + // from the access-token query parameter (consistent with the other + // Texera websocket clients). + try { + const token = extractBearerToken((ws.data as any).headers, (ws.data as any).query); + authorizeAgentAccess(agentId, token); + } catch (error) { + const message = error instanceof Error ? error.message : "Unauthorized"; + ws.send(JSON.stringify({ type: "error", error: message })); + ws.close(); + return; + } + agent.addWebsocket(ws); const initMessage: WsOutgoingMessage = { @@ -595,7 +686,7 @@ export function buildApp() { // Reset module-level state. Used by tests to start each case from a clean store. export function _resetAgentStoreForTests(): void { agentStore.clear(); - agentCounter = 0; + agentOwners.clear(); } function printStartupMessage(app: ReturnType) { diff --git a/frontend/src/app/workspace/service/agent/agent.service.ts b/frontend/src/app/workspace/service/agent/agent.service.ts index 2009734030b..2ca2df8b01f 100644 --- a/frontend/src/app/workspace/service/agent/agent.service.ts +++ b/frontend/src/app/workspace/service/agent/agent.service.ts @@ -236,10 +236,16 @@ export class AgentService { /** * Build HTTP headers for agent-service requests. - * Includes X-Agent-Workflow-Id for consistent hash routing in k8s. + * Includes the user's bearer token (used for access control when the agent + * service has AGENT_AUTH_REQUIRED enabled) and X-Agent-Workflow-Id for + * consistent hash routing in k8s. */ private agentHeaders(agentId?: string): { headers: HttpHeaders } { let headers = new HttpHeaders(); + const token = AuthService.getAccessToken(); + if (token) { + headers = headers.set("Authorization", `Bearer ${token}`); + } if (agentId) { const wid = this.agentStateTracking.get(agentId)?.workflowId; if (wid !== undefined) { @@ -256,7 +262,7 @@ export class AgentService { */ private syncAgentsWithBackend(): void { this.http - .get(`${this.AGENT_API_BASE}/agents`) + .get(`${this.AGENT_API_BASE}/agents`, this.agentHeaders()) .pipe(catchError(() => of({ agents: [] }))) .subscribe(response => { const backendAgentIds = new Set(response.agents.map(a => a.id)); @@ -408,9 +414,13 @@ export class AgentService { * Start WebSocket connection for real-time ReActSteps updates */ private startStatePolling(agentId: string, tracking: AgentStateTracking): void { - // Build WebSocket URL + // Build WebSocket URL. Browsers cannot set headers on the WS handshake, so + // the bearer token is passed as the access-token query parameter (matching + // the other Texera websocket clients) for access control. const wsProtocol = window.location.protocol === "https:" ? "wss:" : "ws:"; - const wsUrl = `${wsProtocol}//${window.location.host}${this.AGENT_API_BASE}/agents/${agentId}/react`; + const token = AuthService.getAccessToken(); + const tokenParam = token ? `?access-token=${encodeURIComponent(token)}` : ""; + const wsUrl = `${wsProtocol}//${window.location.host}${this.AGENT_API_BASE}/agents/${agentId}/react${tokenParam}`; const ws = new WebSocket(wsUrl); tracking.websocket = ws; @@ -792,7 +802,7 @@ export class AgentService { * Also syncs local cache with backend - removes any stale agents that no longer exist on the backend. */ public getAllAgents(): Observable { - return this.http.get(`${this.AGENT_API_BASE}/agents`).pipe( + return this.http.get(`${this.AGENT_API_BASE}/agents`, this.agentHeaders()).pipe( map(response => { const agents = response.agents.map(a => ({ id: a.id,