diff --git a/README.md b/README.md index ec10357..c443372 100644 --- a/README.md +++ b/README.md @@ -274,7 +274,7 @@ Then invite the app to the channel and pair in that channel/thread with `relay p | create delegation task (opt-in shared rooms) | `/delegate ` | `relay delegate ` | `/relay delegate ` | | control delegation task | `/task@ [task-id]` (or `/task [task-id]` in private/other clients) | `relay task [task-id]` | `/relay task [task-id]` | -`quiet`, `normal`, `verbose`, and `completion-only` are valid progress modes. Progress mode controls non-terminal progress noise: quiet suppresses progress updates, completion-only sends final results plus safe compaction notifications, normal sends coalesced milestone progress, and verbose additionally includes safe visible model/tool snapshots at a shorter interval. Progress updates are deduplicated/coalesced, and Telegram live progress is edited in place where possible instead of posting every raw Pi stream event. Terminal notifications still deliver the final assistant answer when it fits safe platform limits, splitting by paragraphs within platform limits and falling back to a Markdown document when an adapter supports files and the output is too large for a reasonable chat burst. +`quiet`, `normal`, `verbose`, and `completion-only` are valid progress modes. Progress mode controls non-terminal progress noise: quiet suppresses progress updates, completion-only sends final results plus safe compaction notifications, normal sends coalesced milestone progress, and verbose additionally includes safe visible model/tool snapshots at a shorter interval. Progress updates are deduplicated/coalesced, and supported messengers update the same live progress message in place where possible instead of posting every raw Pi stream event. Terminal notifications still deliver the final assistant answer when it fits safe platform limits, splitting by paragraphs within platform limits and falling back to a Markdown document when an adapter supports files and the output is too large for a reasonable chat burst. Remote `/disconnect` is scoped to the requesting chat/conversation only: it revokes that Telegram, Discord, or Slack binding and suppresses future session output/buttons there, without disconnecting other messengers that remain paired to the same Pi session. Local `/relay disconnect` is broader and disconnects the current session from all paired messenger bindings. diff --git a/extensions/relay/adapters/discord/adapter.ts b/extensions/relay/adapters/discord/adapter.ts index 50a8f61..65d2525 100644 --- a/extensions/relay/adapters/discord/adapter.ts +++ b/extensions/relay/adapters/discord/adapter.ts @@ -8,6 +8,7 @@ import type { ChannelInboundFile, ChannelInboundHandler, ChannelInboundMessage, + ChannelLiveProgressRef, ChannelOutboundFile, ChannelOutboundPayload, ChannelRouteAddress, @@ -21,13 +22,24 @@ import { DEFAULT_CONVERTIBLE_INBOUND_IMAGE_MIME_TYPES, acceptedInboundImageForma export interface DiscordApiOperations { connect?(handler: (event: DiscordGatewayEvent) => Promise): Promise; disconnect?(): Promise; - sendMessage(payload: DiscordSendMessagePayload): Promise; + sendMessage(payload: DiscordSendMessagePayload): Promise; + editMessage?: (payload: DiscordEditMessagePayload) => Promise; sendFile(payload: DiscordSendFilePayload): Promise; sendTyping(channelId: string): Promise; answerInteraction(interactionId: string, interactionToken: string | undefined, options?: { text?: string; alert?: boolean }): Promise; downloadFile?(url: string): Promise; } +export interface DiscordSendMessageResult { + messageId?: string; +} + +export interface DiscordEditMessagePayload { + channelId: string; + messageId: string; + content: string; +} + export interface DiscordGatewayEvent { type: "message" | "interaction"; payload: DiscordMessagePayload | DiscordInteractionPayload; @@ -166,6 +178,20 @@ export class DiscordChannelAdapter implements ChannelAdapter { await this.sendDocument(address, file, options); } + async sendLiveProgress(address: ChannelRouteAddress, text: string): Promise { + const result = await this.api.sendMessage({ channelId: address.conversationId, content: escapeDiscordPlainText(text) }); + return result && typeof result === "object" && typeof result.messageId === "string" && result.messageId.length > 0 + ? { messageId: result.messageId } + : undefined; + } + + async updateLiveProgress(address: ChannelRouteAddress, ref: ChannelLiveProgressRef, text: string): Promise { + if (!this.api.editMessage) { + throw new Error("Discord message updates are not supported by this adapter configuration."); + } + await this.api.editMessage({ channelId: address.conversationId, messageId: ref.messageId, content: escapeDiscordPlainText(text) }); + } + async sendActivity(address: ChannelRouteAddress, _activity: "typing" | "uploading" | "recording" = "typing"): Promise { await this.api.sendTyping(address.conversationId); } diff --git a/extensions/relay/adapters/discord/live-client.ts b/extensions/relay/adapters/discord/live-client.ts index 481b64f..05a17e3 100644 --- a/extensions/relay/adapters/discord/live-client.ts +++ b/extensions/relay/adapters/discord/live-client.ts @@ -20,9 +20,11 @@ import type { DiscordButtonComponent, DiscordGatewayEvent, DiscordInteractionPayload, + DiscordEditMessagePayload, DiscordMessagePayload, DiscordSendFilePayload, DiscordSendMessagePayload, + DiscordSendMessageResult, } from "./adapter.js"; import { redactSecrets } from "../../config/setup.js"; import type { DiscordRelayConfig } from "../../core/types.js"; @@ -119,12 +121,23 @@ export class DiscordLiveOperations implements DiscordApiOperations { this.client.destroy(); } - async sendMessage(payload: DiscordSendMessagePayload): Promise { + async sendMessage(payload: DiscordSendMessagePayload): Promise { const channel = await this.fetchTextChannel(payload.channelId); const options: MessageCreateOptions = { content: payload.content || " ", allowedMentions: { parse: [] } }; const components = discordActionRows(payload.components ?? []); if (components.length > 0) options.components = components; - await channel.send(options); + const message = await channel.send(options); + return message?.id ? { messageId: message.id } : {}; + } + + async editMessage(payload: DiscordEditMessagePayload): Promise { + const channel = await this.fetchTextChannel(payload.channelId); + const fetchableMessages = channel.messages; + if (!fetchableMessages?.fetch) { + throw new Error(`Discord channel ${payload.channelId} does not support message fetching for edits.`); + } + const message = await fetchableMessages.fetch(payload.messageId); + await message.edit({ content: payload.content || " ", allowedMentions: { parse: [] } }); } async sendFile(payload: DiscordSendFilePayload): Promise { @@ -314,8 +327,19 @@ function discordButtonStyle(style: DiscordButtonComponent["style"]): ButtonStyle } interface SendableDiscordTextChannel { - send(options: MessageCreateOptions): Promise; + send(options: MessageCreateOptions): Promise; sendTyping(): Promise; + messages?: { + fetch(messageId: string): Promise; + }; +} + +interface DiscordSentMessageLike { + id?: string; +} + +interface DiscordEditableMessageLike { + edit(options: { content: string; allowedMentions: { parse: [] } }): Promise; } interface DiscordApplicationCommandManagerLike { diff --git a/extensions/relay/adapters/discord/runtime.ts b/extensions/relay/adapters/discord/runtime.ts index c5c8070..a7be116 100644 --- a/extensions/relay/adapters/discord/runtime.ts +++ b/extensions/relay/adapters/discord/runtime.ts @@ -10,6 +10,7 @@ import { delegationTaskActionButtons, parseDelegationInvocation, renderDelegatio import { formatFullOutput, formatLatestImageEmptyMessage, formatRelayRecentActivity, formatRelayStatusForRoute, formatSessionSelectorError, formatSummaryOutput } from "../../formatting/presenters.js"; import { formatSessionList, resolveSessionSelector, resolveSessionTargetArgs, type SessionListEntry } from "../../core/session-selection.js"; import { displayProgressMode, formatProgressUpdate, normalizeProgressMode, progressIntervalMsFor, progressModeFor, shouldSendProgressActivity } from "../../notifications/progress.js"; +import { deliverLiveProgress, type LiveProgressDeliveryState } from "../../notifications/progress-delivery.js"; import { sendFinalOutputWithFallback } from "../../core/final-output.js"; import { formatRelayLifecycleNotification, type RelayLifecycleEventKind } from "../../notifications/lifecycle.js"; import { abortRouteSafely, compactRouteSafely, deliverRoutePrompt, latestRouteImagesSafely, probeRouteAvailability, routeActionDisplayMessage, routeIdleState, routeImageByPathSafely, routeModelState, routeSkillCommandsSafely, routeWorkspaceRootSafely, unavailableRouteMessage } from "../../core/route-actions.js"; @@ -73,7 +74,7 @@ export class DiscordRuntime { private readonly activeSessionByConversationUser = new Map(); private readonly recentBindingBySessionKey = new Map(); private readonly typingStates = new Map }>(); - private readonly progressStates = new Map }>(); + private readonly progressStates = new Map(); private readonly invalidPairingAttempts = new Map(); private readonly activeDelegationTaskBySessionKey = new Map(); private started = false; @@ -1508,9 +1509,32 @@ export class DiscordRuntime { } if (!this.adapter) return; const text = formatProgressUpdate(pending, this.config, { header: false }); - if (!text) return; + if (!text || !this.adapter) return; state.lastSentAt = Date.now(); - await this.adapter.sendText(bindingAddress(binding), text); + const address = bindingAddress(binding); + const adapter: { + sendText: DiscordChannelAdapter["sendText"]; + sendLiveProgress?: DiscordChannelAdapter["sendLiveProgress"]; + updateLiveProgress?: DiscordChannelAdapter["updateLiveProgress"]; + } = this.adapter; + const sendLiveProgress = adapter.sendLiveProgress + ? async (nextText: string) => { + const ref = await adapter.sendLiveProgress?.(address, nextText); + return ref?.messageId; + } + : undefined; + const updateLiveProgress = state.liveMessageRef && adapter.updateLiveProgress + ? (ref: string, nextText: string) => adapter.updateLiveProgress?.(address, { messageId: ref }, nextText) ?? Promise.reject(new Error("Discord live progress updates are unavailable.")) + : undefined; + await deliverLiveProgress( + state, + text, + { + sendLiveProgress, + updateLiveProgress, + sendProgressSnapshot: (nextText) => adapter.sendText(address, nextText), + }, + ); } private startTypingActivity(route: SessionRoute, address: ChannelRouteAddress): void { diff --git a/extensions/relay/adapters/slack/adapter.ts b/extensions/relay/adapters/slack/adapter.ts index 081998a..88b84fe 100644 --- a/extensions/relay/adapters/slack/adapter.ts +++ b/extensions/relay/adapters/slack/adapter.ts @@ -9,6 +9,7 @@ import type { ChannelInboundFile, ChannelInboundHandler, ChannelInboundMessage, + ChannelLiveProgressRef, ChannelOutboundFile, ChannelOutboundPayload, ChannelRouteAddress, @@ -23,7 +24,8 @@ export interface SlackApiOperations { startSocketMode?(handler: (event: SlackEnvelope) => Promise): Promise; stopSocketMode?(): Promise; authTest?(): Promise; - postMessage(payload: SlackPostMessagePayload): Promise; + postMessage(payload: SlackPostMessagePayload): Promise; + updateMessage?(payload: SlackUpdateMessagePayload): Promise; uploadFile(payload: SlackUploadFilePayload): Promise; addReaction?(payload: SlackReactionPayload): Promise; removeReaction?(payload: SlackReactionPayload): Promise; @@ -32,6 +34,17 @@ export interface SlackApiOperations { downloadFile?(url: string): Promise; } +export interface SlackPostMessageResult { + ts?: string; +} + +export interface SlackUpdateMessagePayload { + channel: string; + ts: string; + text: string; + blocks?: SlackBlock[]; +} + export interface SlackReactionPayload { channel: string; timestamp: string; @@ -217,6 +230,18 @@ export class SlackChannelAdapter implements ChannelAdapter { if (options?.buttons) await this.sendButtonPrompt(address, options.buttons); } + async sendLiveProgress(address: ChannelRouteAddress, text: string): Promise { + const result = await this.api.postMessage({ channel: address.conversationId, text, threadTs: slackThreadTs(address) }); + return typeof result === "object" && result?.ts ? { messageId: result.ts } : undefined; + } + + async updateLiveProgress(address: ChannelRouteAddress, ref: ChannelLiveProgressRef, text: string): Promise { + if (!this.api.updateMessage) { + throw new Error("Slack message updates are not supported by this adapter configuration."); + } + await this.api.updateMessage({ channel: address.conversationId, ts: ref.messageId, text }); + } + async sendImage(address: ChannelRouteAddress, file: ChannelOutboundFile, options?: { caption?: string; buttons?: ChannelButtonLayout }): Promise { assertCanSendOutboundFile(this, file, "image"); await this.sendDocument(address, file, options); diff --git a/extensions/relay/adapters/slack/live-client.ts b/extensions/relay/adapters/slack/live-client.ts index 5f5f49e..e5d6916 100644 --- a/extensions/relay/adapters/slack/live-client.ts +++ b/extensions/relay/adapters/slack/live-client.ts @@ -1,5 +1,15 @@ import { appendFileSync } from "node:fs"; -import type { SlackApiOperations, SlackAuthTestResult, SlackEnvelope, SlackPostEphemeralPayload, SlackPostMessagePayload, SlackReactionPayload, SlackUploadFilePayload } from "./adapter.js"; +import type { + SlackApiOperations, + SlackAuthTestResult, + SlackEnvelope, + SlackPostEphemeralPayload, + SlackPostMessagePayload, + SlackPostMessageResult, + SlackReactionPayload, + SlackUpdateMessagePayload, + SlackUploadFilePayload, +} from "./adapter.js"; import { redactSecrets } from "../../config/setup.js"; import type { SlackRelayConfig } from "../../core/types.js"; @@ -87,8 +97,14 @@ export class SlackLiveOperations implements SlackApiOperations { }; } - async postMessage(payload: SlackPostMessagePayload): Promise { - await this.callSlackApi("chat.postMessage", this.botToken, { channel: payload.channel, text: payload.text, thread_ts: payload.threadTs, blocks: payload.blocks ? slackBlocks(payload.blocks) : undefined }); + async postMessage(payload: SlackPostMessagePayload): Promise { + const response = await this.callSlackApi("chat.postMessage", this.botToken, { channel: payload.channel, text: payload.text, thread_ts: payload.threadTs, blocks: payload.blocks ? slackBlocks(payload.blocks) : undefined }); + const ts = stringField(response, "ts"); + return ts ? { ts } : {}; + } + + async updateMessage(payload: SlackUpdateMessagePayload): Promise { + await this.callSlackApi("chat.update", this.botToken, { channel: payload.channel, ts: payload.ts, text: payload.text, blocks: payload.blocks ? slackBlocks(payload.blocks) : undefined }); } async uploadFile(payload: SlackUploadFilePayload): Promise { diff --git a/extensions/relay/adapters/slack/runtime.ts b/extensions/relay/adapters/slack/runtime.ts index 09199b3..3ff7e79 100644 --- a/extensions/relay/adapters/slack/runtime.ts +++ b/extensions/relay/adapters/slack/runtime.ts @@ -10,6 +10,7 @@ import { delegationTaskActionButtons, parseDelegationInvocation, renderDelegatio import { formatFullOutput, formatLatestImageEmptyMessage, formatRelayRecentActivity, formatRelayStatusForRoute, formatSessionSelectorError, formatSummaryOutput, sessionEntryForRoute } from "../../formatting/presenters.js"; import { formatSessionList, resolveSessionSelector, resolveSessionTargetArgs, type SessionListEntry } from "../../core/session-selection.js"; import { displayProgressMode, formatProgressUpdate, normalizeProgressMode, progressIntervalMsFor, progressModeFor, shouldSendProgressActivity } from "../../notifications/progress.js"; +import { deliverLiveProgress, type LiveProgressDeliveryState } from "../../notifications/progress-delivery.js"; import { sendFinalOutputWithFallback } from "../../core/final-output.js"; import { deliverWorkspaceFileToRequester, formatRequesterFileDeliveryResult, parseRemoteSendFileArgs, type RelayFileDeliveryRequester } from "../../core/requester-file-delivery.js"; import { abortRouteSafely, compactRouteSafely, deliverRoutePrompt, latestRouteImagesSafely, probeRouteAvailability, routeActionDisplayMessage, routeIdleState, routeImageByPathSafely, routeModelState, routeSkillCommandsSafely, routeWorkspaceRootSafely, unavailableRouteMessage } from "../../core/route-actions.js"; @@ -82,7 +83,7 @@ export class SlackRuntime { private readonly seenEventKeys = new Map(); private readonly consumedResponseUrls = new Map(); private readonly thinkingReactions = new Map(); - private readonly progressStates = new Map; timer?: ReturnType; lastSentAt?: number }>(); + private readonly progressStates = new Map(); private readonly activeDelegationTaskBySessionKey = new Map(); private botIdentity?: SlackAuthTestResult; private started = false; @@ -1541,7 +1542,30 @@ export class SlackRuntime { const text = formatProgressUpdate(pending, this.config, { header: false }); if (!text || !this.adapter) return; state.lastSentAt = Date.now(); - await this.adapter.sendText(bindingAddress(binding), text); + const address = bindingAddress(binding); + const adapter: { + sendText: SlackChannelAdapter["sendText"]; + sendLiveProgress?: SlackChannelAdapter["sendLiveProgress"]; + updateLiveProgress?: SlackChannelAdapter["updateLiveProgress"]; + } = this.adapter; + const sendLiveProgress = adapter.sendLiveProgress + ? async (nextText: string) => { + const ref = await adapter.sendLiveProgress?.(address, nextText); + return ref?.messageId; + } + : undefined; + const updateLiveProgress = state.liveMessageRef && adapter.updateLiveProgress + ? (ref: string, nextText: string) => adapter.updateLiveProgress?.(address, { messageId: ref }, nextText) ?? Promise.reject(new Error("Slack live progress updates are unavailable.")) + : undefined; + await deliverLiveProgress( + state, + text, + { + sendLiveProgress, + updateLiveProgress, + sendProgressSnapshot: (nextText) => adapter.sendText(address, nextText), + }, + ); } private async startThinkingReaction(route: SessionRoute, message: ChannelInboundMessage): Promise { diff --git a/extensions/relay/adapters/telegram/runtime.ts b/extensions/relay/adapters/telegram/runtime.ts index 432b6e2..5f27176 100644 --- a/extensions/relay/adapters/telegram/runtime.ts +++ b/extensions/relay/adapters/telegram/runtime.ts @@ -89,6 +89,7 @@ import { recentActivityLimit, shouldSendProgressActivity, } from "../../notifications/progress.js"; +import { deliverLiveProgress, type LiveProgressDeliveryState } from "../../notifications/progress-delivery.js"; import { buildImagePromptContent, createTurnId, @@ -107,14 +108,7 @@ const TELEGRAM_ACTIVITY_INITIAL_REFRESH_MS = 1_200; const TELEGRAM_ACTIVITY_REFRESH_MS = 4_000; const CUSTOM_ANSWER_EXPIRY_MS = 10 * 60_000; -type TelegramProgressDeliveryState = { - lastEventId?: string; - pending: NonNullable; - timer?: ReturnType; - lastSentAt?: number; - liveMessageId?: number; - lastText?: string; -}; +type TelegramProgressDeliveryState = LiveProgressDeliveryState; const ANSWER_AMBIGUITY_EXPIRY_MS = 5 * 60_000; interface TelegramGroupCommandTarget { @@ -729,37 +723,32 @@ export class InProcessTunnelRuntime implements TunnelRuntime { } state.lastSentAt = Date.now(); const messageText = `${this.sourcePrefixForRoute(route)}${text}`; - await this.deliverProgressSnapshot(chatId, state, messageText); - } - - private async deliverProgressSnapshot(chatId: number, state: TelegramProgressDeliveryState, messageText: string): Promise { - // Live progress is best-effort: prefer edit-in-place, then editable send, then a plain snapshot. - if (state.lastText === messageText) return; - const editableApi = this.api as TelegramApiClient & { sendEditablePlainText?: (chatId: number, text: string) => Promise; editPlainText?: (chatId: number, messageId: number, text: string) => Promise }; - if (state.liveMessageId && editableApi.editPlainText) { - try { - await editableApi.editPlainText(chatId, state.liveMessageId, messageText); - state.lastText = messageText; - return; - } catch { - state.liveMessageId = undefined; - } - } - if (editableApi.sendEditablePlainText) { - try { - state.liveMessageId = await editableApi.sendEditablePlainText(chatId, messageText); - state.lastText = messageText; - return; - } catch { - state.liveMessageId = undefined; - } - } - try { - await this.api.sendPlainText(chatId, messageText); - state.lastText = messageText; - } catch { - state.liveMessageId = undefined; - } + const editableApi = this.api as TelegramApiClient & { + sendEditablePlainText?: (chatId: number, text: string) => Promise; + editPlainText?: (chatId: number, messageId: number, text: string) => Promise; + }; + await deliverLiveProgress( + state, + messageText, + { + updateLiveProgress: state.liveMessageRef && editableApi.editPlainText + ? async (ref, nextText) => { + const messageId = Number(ref); + if (!Number.isSafeInteger(messageId)) { + throw new Error("Telegram live message id is not numeric."); + } + await editableApi.editPlainText(chatId, messageId, nextText); + } + : undefined, + sendLiveProgress: editableApi.sendEditablePlainText + ? async (nextText) => { + const messageId = await editableApi.sendEditablePlainText(chatId, nextText); + return Number.isSafeInteger(messageId) ? String(messageId) : undefined; + } + : undefined, + sendProgressSnapshot: (nextText) => this.api.sendPlainText(chatId, nextText), + }, + ); } private async acquireLock(): Promise { diff --git a/extensions/relay/notifications/progress-delivery.ts b/extensions/relay/notifications/progress-delivery.ts new file mode 100644 index 0000000..4176953 --- /dev/null +++ b/extensions/relay/notifications/progress-delivery.ts @@ -0,0 +1,52 @@ +import type { ProgressActivityEntry } from "../core/types.js"; + +export interface LiveProgressDeliveryState { + lastEventId?: string; + pending: ProgressActivityEntry[]; + timer?: ReturnType; + lastSentAt?: number; + lastText?: string; + liveMessageRef?: Ref; +} + +export interface LiveProgressDeliveryActions { + sendLiveProgress?: (text: string) => Promise; + updateLiveProgress?: (ref: Ref, text: string) => Promise; + sendProgressSnapshot: (text: string) => Promise; +} + +export async function deliverLiveProgress( + state: LiveProgressDeliveryState, + text: string, + actions: LiveProgressDeliveryActions, +): Promise { + if (state.lastText === text) return; + + if (state.liveMessageRef && actions.updateLiveProgress) { + try { + await actions.updateLiveProgress(state.liveMessageRef, text); + state.lastText = text; + return; + } catch { + state.liveMessageRef = undefined; + } + } + + if (actions.sendLiveProgress) { + try { + const ref = await actions.sendLiveProgress(text); + state.liveMessageRef = typeof ref === "string" && ref.length > 0 ? ref : undefined; + state.lastText = text; + return; + } catch { + state.liveMessageRef = undefined; + } + } + + try { + await actions.sendProgressSnapshot(text); + state.lastText = text; + } catch { + state.liveMessageRef = undefined; + } +} diff --git a/openspec/changes/update-live-progress-across-messengers/.openspec.yaml b/openspec/changes/update-live-progress-across-messengers/.openspec.yaml new file mode 100644 index 0000000..c86b1d7 --- /dev/null +++ b/openspec/changes/update-live-progress-across-messengers/.openspec.yaml @@ -0,0 +1,2 @@ +schema: spec-driven +created: 2026-06-14 diff --git a/openspec/changes/update-live-progress-across-messengers/design.md b/openspec/changes/update-live-progress-across-messengers/design.md new file mode 100644 index 0000000..598c25b --- /dev/null +++ b/openspec/changes/update-live-progress-across-messengers/design.md @@ -0,0 +1,96 @@ +## Context + +The live-progress coalescing work introduced a shared progress model and Telegram edit-in-place delivery. Telegram direct and broker runtimes now maintain a live message reference and update it when possible, while Slack and Discord still send a new coalesced snapshot for each eligible progress flush. This means the user experience remains inconsistent: Telegram can show one evolving progress card, but Slack and Discord can still accumulate progress messages. + +Slack and Discord both support updating bot-owned messages, but PiRelay's current operation contracts do not expose the required message references. Slack `postMessage` returns `ts` and `chat.update` can update the message. Discord message create returns an `id`, and the bot can edit its own channel message with that id. + +## Goals / Non-Goals + +**Goals:** + +- Provide edit/update-in-place live progress for Slack and Discord where platform APIs support it. +- Keep progress delivery semantics consistent across Telegram, Slack, Discord, and broker paths. +- Use a single clear fallback invariant: update existing live progress, send a live/editable progress message, send a plain snapshot, then swallow final failure. +- Preserve per-binding progress mode filtering, coalescing, rate limiting, destination scoping, and binding authority checks. +- Keep final assistant output separate from live progress updates. +- Add parity tests for update success, update failure fallback, live-send failure fallback, unsupported update fallback, and filtered-empty cleanup. + +**Non-Goals:** + +- Do not merge live progress into terminal completion/failure/abort messages. +- Do not add progress updates for `quiet` or ordinary tool progress for `completion-only`. +- Do not require every future adapter to support editing; unsupported adapters must keep safe snapshot fallback. +- Do not persist live progress message references in tunnel state. +- Do not add external dependencies. + +## Decisions + +### 1. Treat live-progress editing as an optional channel capability + +Slack and Discord should implement the existing optional `ChannelAdapter.sendLiveProgress()` and `ChannelAdapter.updateLiveProgress()` capability, returning a `ChannelLiveProgressRef` when a live message reference is available. Unsupported adapters continue to use plain `sendText()` snapshots. + +Rationale: the channel adapter contract already models optional live progress editing. Reusing it avoids creating messenger-specific runtime APIs for every platform. + +Alternative considered: copy Telegram runtime-specific methods into Slack and Discord runtimes. Rejected because that spreads the same fallback state machine across more places. + +### 2. Extend platform operations to return message references + +Slack `postMessage` should return a small result containing `ts` when available. Discord `sendMessage` should return a small result containing the created message `id` when available. Live operations add update/edit methods: + +- Slack: `updateMessage({ channel, ts, text, blocks? })` +- Discord: `editMessage({ channelId, messageId, content, components? })` + +Rationale: runtimes cannot update a message unless the adapter returns a platform reference from the original send. + +Alternative considered: parse references from inbound events or store synthetic references only in tests. Rejected because live outbound messages need real platform refs. + +### 3. Use a shared TypeScript helper for live progress delivery when practical + +Introduce a pure or near-pure helper for TypeScript runtimes that owns the fallback ladder: + +```text +if text is unchanged: return +if ref exists: try update; on failure clear ref +try sendLive; on success store ref and text +try sendPlain; on success store text without ref +if all fail: clear ref and return +``` + +Broker can keep a small JavaScript mirror or call a JS-compatible helper if practical. + +Rationale: repeated Copilot feedback showed the fallback state machine is easy to get subtly wrong. Centralizing it reduces drift. + +Alternative considered: leave per-runtime implementations. Rejected as likely to repeat the same bugs across Slack and Discord. + +### 4. Keep refs scoped to destination state only + +Live progress refs remain in memory in the per-destination progress state. They are cleared on terminal route state, route unregister, binding mismatch, paused/revoked/moved state, filtered-empty pending progress, update failure, and runtime stop. + +Rationale: message ids/timestamps are not secrets, but persisting them is unnecessary and can create stale cross-turn behavior. + +### 5. Use platform-specific formatting only at the adapter edge + +The shared progress formatter produces safe bounded text. Adapters may transform it into Slack blocks or Discord components only when equivalent text remains available as fallback. + +Rationale: progress should remain safe and readable across clients and logs. + +## Risks / Trade-offs + +- **Risk: Slack `chat.update` may fail for deleted/old/non-bot messages** → Clear the ref and fall back to a new live/plain snapshot. +- **Risk: Discord message editing may fail due to permissions or message deletion** → Clear the ref and fall back to a new live/plain snapshot. +- **Risk: Edits may not notify users** → This is intended for progress; terminal completion still sends a separate message. +- **Risk: Operation signature changes ripple through tests/fakes** → Keep return shapes optional and backward-compatible where possible. +- **Risk: Shared helper over-abstracts platform behavior** → Keep helper focused on fallback state only; adapters still own platform API calls and formatting. +- **Risk: Progress card can outlive a turn** → Clear refs on terminal state, unregister, and binding authority changes. + +## Migration Plan + +- No persisted state migration is required. +- Existing Slack/Discord progress behavior remains safe while live-update support is absent or disabled. +- Rollback is safe: runtimes can fall back to `sendText()` snapshots if update methods are removed or fail. + +## Open Questions + +- Should Slack progress use plain text only or Block Kit formatting? Initial implementation should prefer plain text to minimize update complexity. +- Should Discord progress include components? Initial implementation should avoid components for progress to simplify edits. +- Should compaction progress edit from started to completed, or remain separate durable messages? Initial behavior should follow the same progress mode and live card policy, with terminal/final output still separate. diff --git a/openspec/changes/update-live-progress-across-messengers/proposal.md b/openspec/changes/update-live-progress-across-messengers/proposal.md new file mode 100644 index 0000000..6d78552 --- /dev/null +++ b/openspec/changes/update-live-progress-across-messengers/proposal.md @@ -0,0 +1,33 @@ +## Why + +PiRelay now coalesces live progress and Telegram can edit a single live progress message, but Slack and Discord still receive repeated progress snapshots. This creates inconsistent UX across messengers and leaves the original chat-noise problem only partially solved. + +## What Changes + +- Extend Slack and Discord live operations to return message references from outbound bot messages and to update bot-owned messages when the platform supports it. +- Implement `sendLiveProgress` / `updateLiveProgress` capability for Slack and Discord channel adapters using Slack `chat.update` and Discord message edit APIs. +- Update Slack and Discord runtimes to keep per-destination live progress references and update one live progress card instead of posting a new progress snapshot for every flush. +- Preserve the same fallback invariant across messengers: try edit-in-place, then send a live/editable progress message, then send a plain snapshot, and swallow final failures because progress is best-effort. +- Keep terminal completion/failure/abort output separate from live progress messages. +- Preserve authorization, binding authority, paused/revoked/moved checks, destination scoping, rate limiting, progress-mode filtering, and secret-safe formatting. + +## Capabilities + +### New Capabilities + + + +### Modified Capabilities + +- `messenger-relay-sessions`: Shared progress delivery semantics require edit/update-in-place where a live messenger platform supports bot-message updates. +- `relay-channel-adapters`: Channel adapter contracts and parity expectations cover optional live progress references and update fallback behavior for Telegram, Slack, Discord, and future adapters. +- `slack-runtime-client`: Slack live operations support bot-message references and `chat.update` for live progress updates. +- `discord-runtime-client`: Discord live operations support bot-message references and bot-message edit for live progress updates. + +## Impact + +- Affected adapter contracts: Slack and Discord operation interfaces gain optional message-reference return/update operations; existing tests and fakes must be updated. +- Affected runtimes: Slack and Discord progress state stores live message refs and uses edit/update fallback behavior. +- Affected live clients: Slack adds `chat.update`; Discord adds message edit and returns message ids from send. +- Affected tests: shared progress helper tests, Slack runtime/client tests, Discord runtime/client tests, adapter parity tests, and failure-fallback tests. +- No new runtime dependencies are expected. diff --git a/openspec/changes/update-live-progress-across-messengers/specs/discord-runtime-client/spec.md b/openspec/changes/update-live-progress-across-messengers/specs/discord-runtime-client/spec.md new file mode 100644 index 0000000..b9b10c9 --- /dev/null +++ b/openspec/changes/update-live-progress-across-messengers/specs/discord-runtime-client/spec.md @@ -0,0 +1,19 @@ +## ADDED Requirements + +### Requirement: Discord live client supports progress message edits +The Discord live client SHALL expose bot-message references and edit operations needed for live progress edit-in-place behavior while preserving safe fallback when Discord rejects an edit. + +#### Scenario: Discord send message returns id reference +- **WHEN** PiRelay sends a Discord bot message for live progress +- **THEN** the Discord live client returns the created message id when Discord provides it +- **AND** that id can be used as a live progress reference scoped to the Discord channel destination + +#### Scenario: Discord edits progress message +- **WHEN** PiRelay has a Discord live progress reference for a bot-owned message and progress text changes +- **THEN** the Discord live client edits the expected channel message using the platform message id +- **AND** it keeps equivalent safe text content even if richer formatting or components are later added + +#### Scenario: Discord edit failure is recoverable +- **WHEN** Discord rejects a live progress edit because the message is deleted, inaccessible, not bot-owned, or permissions changed +- **THEN** PiRelay clears the stale Discord progress reference and falls back to a new live progress message or plain snapshot +- **AND** it does not mark Discord runtime unhealthy solely because a best-effort progress edit failed diff --git a/openspec/changes/update-live-progress-across-messengers/specs/messenger-relay-sessions/spec.md b/openspec/changes/update-live-progress-across-messengers/specs/messenger-relay-sessions/spec.md new file mode 100644 index 0000000..ddacabb --- /dev/null +++ b/openspec/changes/update-live-progress-across-messengers/specs/messenger-relay-sessions/spec.md @@ -0,0 +1,29 @@ +## ADDED Requirements + +### Requirement: Live progress updates use in-place delivery where supported +The system SHALL deliver non-terminal live progress as an updated per-destination progress message when the active messenger adapter supports bot-message updates, while preserving safe snapshot fallback for unsupported or failed update paths. + +#### Scenario: Supported adapter updates existing progress +- **WHEN** a paired running session emits multiple eligible progress updates for a binding whose messenger adapter supports live progress updates +- **THEN** PiRelay updates the existing live progress message for that destination rather than sending a new message for each progress flush +- **AND** the progress content remains coalesced, rate-limited, redacted, and bounded by configured progress limits + +#### Scenario: Unsupported adapter sends snapshots +- **WHEN** a paired running session emits eligible progress updates for a binding whose messenger adapter does not support live progress updates +- **THEN** PiRelay sends bounded coalesced progress snapshots according to the binding's progress mode +- **AND** it does not treat missing edit capability as an adapter failure + +#### Scenario: Update failure falls back safely +- **WHEN** updating an existing live progress message fails because the message was deleted, expired, inaccessible, or rejected by the platform +- **THEN** PiRelay clears that live progress reference and falls back to sending a new live progress message or plain snapshot +- **AND** final failure to deliver progress is swallowed because non-terminal progress is best-effort + +#### Scenario: Terminal output remains separate +- **WHEN** a Pi turn completes, fails, or aborts after live progress updates were sent or edited +- **THEN** PiRelay sends terminal output or notification according to final-output policy as a separate messenger result +- **AND** it does not merge final assistant output into the live progress card + +#### Scenario: Progress modes still apply independently +- **WHEN** one binding is normal, another is verbose, another is completion-only, and another is quiet +- **THEN** live progress update/edit behavior respects each binding's existing progress-mode eligibility independently +- **AND** quiet receives no live progress while completion-only receives no ordinary live progress diff --git a/openspec/changes/update-live-progress-across-messengers/specs/relay-channel-adapters/spec.md b/openspec/changes/update-live-progress-across-messengers/specs/relay-channel-adapters/spec.md new file mode 100644 index 0000000..fe65153 --- /dev/null +++ b/openspec/changes/update-live-progress-across-messengers/specs/relay-channel-adapters/spec.md @@ -0,0 +1,24 @@ +## ADDED Requirements + +### Requirement: Channel adapters expose optional live progress update capability +The system SHALL model live progress message creation and update as optional channel adapter capabilities with safe fallback to ordinary text messages when a platform lacks update support or an update operation fails. + +#### Scenario: Adapter returns live progress reference +- **WHEN** a channel adapter supports live progress and sends a live progress message +- **THEN** it returns a non-secret message reference sufficient to update that bot-owned message later +- **AND** the reference is scoped to the destination and is not persisted as a long-term binding secret + +#### Scenario: Adapter updates live progress reference +- **WHEN** a channel adapter receives a valid live progress reference for a bot-owned message in the expected destination +- **THEN** it updates that message with the new safe progress text using platform-specific APIs +- **AND** it falls back to ordinary text delivery or reports a recoverable failure when the update cannot be performed + +#### Scenario: Adapter fallback invariant is consistent +- **WHEN** live progress delivery is attempted through Telegram, Slack, Discord, or a future adapter +- **THEN** the delivery path tries update-in-place first when a reference exists, then sends a new live/editable progress message when supported, then sends a plain text snapshot +- **AND** if every attempt fails, the failure is contained as best-effort progress and does not fail the Pi turn or mark the messenger runtime unhealthy + +#### Scenario: Binding authority is checked before protected progress delivery +- **WHEN** live progress is about to be sent or updated through any adapter +- **THEN** PiRelay verifies the current binding authority, destination identity, paused/revoked/moved state, and route liveness for that destination +- **AND** refuses to send or update protected progress when authority is unavailable or denied diff --git a/openspec/changes/update-live-progress-across-messengers/specs/slack-runtime-client/spec.md b/openspec/changes/update-live-progress-across-messengers/specs/slack-runtime-client/spec.md new file mode 100644 index 0000000..b1ca168 --- /dev/null +++ b/openspec/changes/update-live-progress-across-messengers/specs/slack-runtime-client/spec.md @@ -0,0 +1,19 @@ +## ADDED Requirements + +### Requirement: Slack live client supports progress message updates +The Slack live client SHALL expose bot-message references and update operations needed for live progress edit-in-place behavior while preserving safe fallback when Slack rejects an update. + +#### Scenario: Slack post message returns timestamp reference +- **WHEN** PiRelay posts a Slack bot message for live progress +- **THEN** the Slack live client returns the message timestamp `ts` when Slack provides it +- **AND** that timestamp can be used as a live progress reference scoped to the Slack channel or thread destination + +#### Scenario: Slack updates progress message +- **WHEN** PiRelay has a Slack live progress reference for a bot-owned message and progress text changes +- **THEN** the Slack live client calls Slack `chat.update` for the expected channel and timestamp +- **AND** it preserves equivalent safe text content even if richer formatting is later added + +#### Scenario: Slack update failure is recoverable +- **WHEN** Slack rejects a live progress update because the message is deleted, too old, not bot-owned, or otherwise inaccessible +- **THEN** PiRelay clears the stale Slack progress reference and falls back to a new live progress message or plain snapshot +- **AND** it does not mark Slack runtime unhealthy solely because a best-effort progress update failed diff --git a/openspec/changes/update-live-progress-across-messengers/tasks.md b/openspec/changes/update-live-progress-across-messengers/tasks.md new file mode 100644 index 0000000..cf5cf50 --- /dev/null +++ b/openspec/changes/update-live-progress-across-messengers/tasks.md @@ -0,0 +1,37 @@ +## 1. Shared Live Progress Delivery Model + +- [x] 1.1 Add or refine shared live-progress delivery state types for text, live message refs, timers, last-sent timestamps, and pending entries. +- [x] 1.2 Implement a focused TypeScript helper for the fallback ladder: update existing ref, send live/editable message, send plain snapshot, swallow final failure. +- [x] 1.3 Ensure the helper clears stale refs on update/live-send failures and does not suppress future eligible progress after fallback. +- [x] 1.4 Keep terminal completion/failure/abort delivery separate from live progress state and clear live progress refs on terminal/unregister paths. +- [x] 1.5 Add helper unit tests for unchanged text, update success, update failure, live-send success, live-send failure fallback, plain-send failure, and stale-ref clearing. + +## 2. Slack Live Progress Support + +- [x] 2.1 Extend `SlackApiOperations.postMessage` to return an optional Slack message `ts` while keeping existing fake/live callers compatible. +- [x] 2.2 Add `SlackApiOperations.updateMessage` and implement it in `SlackLiveOperations` using Slack `chat.update`. +- [x] 2.3 Implement `sendLiveProgress` and `updateLiveProgress` in `SlackChannelAdapter` with safe fallback to text snapshots. +- [x] 2.4 Update `SlackRuntime` progress state and `flushProgress` to update a live progress card instead of always posting new snapshots. +- [x] 2.5 Add Slack adapter/runtime/live-client tests for send ref capture, update success, update failure fallback, unsupported update fallback, filtered-empty cleanup, and progress-mode filtering. + +## 3. Discord Live Progress Support + +- [x] 3.1 Extend `DiscordApiOperations.sendMessage` to return an optional Discord message id while keeping existing fake/live callers compatible. +- [x] 3.2 Add `DiscordApiOperations.editMessage` and implement it in `DiscordLiveOperations` using Discord message edit API. +- [x] 3.3 Implement `sendLiveProgress` and `updateLiveProgress` in `DiscordChannelAdapter` with safe fallback to text snapshots. +- [x] 3.4 Update `DiscordRuntime` progress state and `flushProgress` to edit a live progress card instead of always posting new snapshots. +- [x] 3.5 Add Discord adapter/runtime/live-client tests for send ref capture, edit success, edit failure fallback, unsupported edit fallback, filtered-empty cleanup, typing coexistence, and progress-mode filtering. + +## 4. Cross-Messenger Parity and Safety + +- [x] 4.1 Verify Telegram direct behavior still passes through the shared fallback invariant or remains behaviorally equivalent with dedicated tests. +- [x] 4.2 Verify Telegram broker remains behaviorally equivalent, including editable-send failure fallback and edit-path outbox tests. +- [x] 4.3 Verify Slack, Discord, Telegram direct, and Telegram broker all preserve authorization, paused/revoked/moved binding authority, destination scoping, and stale route checks before sending/updating progress. +- [x] 4.4 Verify ordinary progress remains suppressed in completion-only and quiet modes, while eligible compaction progress follows the live progress policy. +- [x] 4.5 Add or update README/help text to explain that supported messengers update a live progress card instead of posting repeated progress messages. + +## 5. Validation + +- [x] 5.1 Run `npm run typecheck`. +- [x] 5.2 Run `npm test`. +- [x] 5.3 Run `openspec validate update-live-progress-across-messengers --strict`. diff --git a/tests/discord-adapter.test.ts b/tests/discord-adapter.test.ts index b119aa3..797ac43 100644 --- a/tests/discord-adapter.test.ts +++ b/tests/discord-adapter.test.ts @@ -57,6 +57,39 @@ describe("DiscordChannelAdapter", () => { expect(event!.conversation.kind).toBe("group"); }); + it("captures Discord live progress refs and updates them by message id", async () => { + const sendMessage = vi.fn(async () => ({ messageId: "456" })); + const editMessage = vi.fn(async () => undefined); + const sendFile = vi.fn(async (_payload: unknown) => undefined); + const sendTyping = vi.fn(async (_channelId: string) => undefined); + const answerInteraction = vi.fn(async (_interactionId: string, _interactionToken: string | undefined, _options?: unknown) => undefined); + const downloadFile = vi.fn(async (_url: string) => new Uint8Array([9])); + const adapter = new DiscordChannelAdapter(config, { sendMessage, sendFile, sendTyping, answerInteraction, downloadFile, editMessage }); + const address = { channel: "discord", conversationId: "dm1", userId: "u1" } as const; + + const ref = await adapter.sendLiveProgress(address, "Compiling"); + await expect(adapter.updateLiveProgress(address, { messageId: ref?.messageId ?? "" }, "Still compiling" )).resolves.toBeUndefined(); + + expect(ref).toEqual({ messageId: "456" }); + expect(sendMessage).toHaveBeenCalledWith({ channelId: "dm1", content: "Compiling" }); + expect(editMessage).toHaveBeenCalledWith({ channelId: "dm1", messageId: "456", content: "Still compiling" }); + + const noOpAdapter = new DiscordChannelAdapter(config, { sendMessage: async () => undefined, sendFile, sendTyping, answerInteraction, downloadFile }); + expect(await noOpAdapter.sendLiveProgress(address, "x")).toBeUndefined(); + }); + + it("throws a consistent error when Discord message updates are unavailable", async () => { + const sendMessage = vi.fn(async (_payload: unknown) => undefined); + const sendFile = vi.fn(async (_payload: unknown) => undefined); + const sendTyping = vi.fn(async (_channelId: string) => undefined); + const answerInteraction = vi.fn(async (_interactionId: string, _interactionToken: string | undefined, _options?: unknown) => undefined); + const downloadFile = vi.fn(async (_url: string) => new Uint8Array([9])); + const adapter = new DiscordChannelAdapter(config, { sendMessage, sendFile, sendTyping, answerInteraction, downloadFile }); + const address = { channel: "discord", conversationId: "dm1", userId: "u1" } as const; + + await expect(adapter.updateLiveProgress(address, { messageId: "456" }, "Still compiling")).rejects.toThrow("Discord message updates are not supported by this adapter configuration."); + }); + it("chunks text, maps buttons, and sends files through injected operations", async () => { const sendMessage = vi.fn(async (_payload: unknown) => undefined); const sendFile = vi.fn(async (_payload: unknown) => undefined); diff --git a/tests/discord-live-client.test.ts b/tests/discord-live-client.test.ts index 9a2d8f2..79710de 100644 --- a/tests/discord-live-client.test.ts +++ b/tests/discord-live-client.test.ts @@ -183,6 +183,32 @@ describe("discord live client helpers", () => { expect(create).not.toHaveBeenCalled(); }); + it("returns a message id when sending Discord messages", async () => { + const send = vi.fn(async (_options: unknown) => ({ id: "m-99", channelId: "c1" })); + const client = mockDiscordClient(); + vi.mocked(client.channels.fetch).mockResolvedValue({ send, sendTyping: vi.fn(async () => undefined), messages: { fetch: vi.fn() } } as never); + const operations = new DiscordLiveOperations({ token: "discord-token", client }); + + const result = await operations.sendMessage({ channelId: "c1", content: "hello" }); + + expect(result).toEqual({ messageId: "m-99" }); + expect(send).toHaveBeenCalledWith(expect.objectContaining({ content: "hello", allowedMentions: { parse: [] } })); + }); + + it("updates Discord messages by fetching and editing", async () => { + const edit = vi.fn(async () => undefined); + const fetch = vi.fn(async () => ({ edit })); + const send = vi.fn(async () => ({ id: "m-100", channelId: "c1" })); + const client = mockDiscordClient(); + vi.mocked(client.channels.fetch).mockResolvedValue({ send, sendTyping: vi.fn(async () => undefined), messages: { fetch } } as never); + const operations = new DiscordLiveOperations({ token: "discord-token", client }); + + await operations.editMessage({ channelId: "c1", messageId: "m-100", content: "updated" }); + + expect(fetch).toHaveBeenCalledWith("m-100"); + expect(edit).toHaveBeenCalledWith({ content: "updated", allowedMentions: { parse: [] } }); + }); + it("detects placeholders only in placeholder-style token forms", () => { expect(discordSubcommandTakesArgs("/status")).toBe(false); expect(discordSubcommandTakesArgs("/send ")).toBe(true); diff --git a/tests/discord-runtime.test.ts b/tests/discord-runtime.test.ts index 37b66ce..130605e 100644 --- a/tests/discord-runtime.test.ts +++ b/tests/discord-runtime.test.ts @@ -2,7 +2,7 @@ import { mkdtemp, rm, writeFile } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { afterEach, describe, expect, it, vi } from "vitest"; -import type { DiscordApiOperations, DiscordAttachmentPayload, DiscordGatewayEvent, DiscordMentionPayload, DiscordSendFilePayload, DiscordSendMessagePayload } from "../extensions/relay/adapters/discord/adapter.js"; +import type { DiscordApiOperations, DiscordAttachmentPayload, DiscordEditMessagePayload, DiscordGatewayEvent, DiscordMentionPayload, DiscordSendFilePayload, DiscordSendMessagePayload, DiscordSendMessageResult } from "../extensions/relay/adapters/discord/adapter.js"; import { createDiscordRuntime, DiscordRuntime, getOrCreateDiscordRuntime } from "../extensions/relay/adapters/discord/runtime.js"; import { routeUnavailableError } from "../extensions/relay/core/route-actions.js"; import { TunnelStateStore } from "../extensions/relay/state/tunnel-store.js"; @@ -15,9 +15,15 @@ const ONE_BY_ONE_GIF = Buffer.from("R0lGODlhAQABAIABAP///wAAACH5BAEKAAEALAAAAAAB class FakeDiscordOperations implements DiscordApiOperations { handler?: (event: DiscordGatewayEvent) => Promise; readonly messages: DiscordSendMessagePayload[] = []; + readonly edits: DiscordEditMessagePayload[] = []; readonly files: DiscordSendFilePayload[] = []; readonly typing: string[] = []; readonly answers: Array<{ interactionId: string; text?: string; alert?: boolean }> = []; + readonly messageIds: string[] = []; + messageIdPrefix = "111"; + messageIndex = 0; + onSendMessage?: (payload: DiscordSendMessagePayload) => Promise; + onEditMessage?: (payload: DiscordEditMessagePayload) => Promise; downloadBytes = new Uint8Array([1, 2, 3]); readonly downloadUrls: string[] = []; @@ -32,8 +38,21 @@ class FakeDiscordOperations implements DiscordApiOperations { this.handler = undefined; } - async sendMessage(payload: DiscordSendMessagePayload): Promise { + async sendMessage(payload: DiscordSendMessagePayload): Promise { this.messages.push(payload); + const id = `${this.messageIdPrefix}.${++this.messageIndex}`; + this.messageIds.push(id); + if (this.onSendMessage) { + await this.onSendMessage(payload); + } + return { messageId: id }; + } + + async editMessage(payload: DiscordEditMessagePayload): Promise { + this.edits.push(payload); + if (this.onEditMessage) { + await this.onEditMessage(payload); + } } async sendFile(payload: DiscordSendFilePayload): Promise { @@ -206,7 +225,7 @@ describe("DiscordRuntime", () => { sessionLabel: session.sessionLabel, boundAt: new Date().toISOString(), lastSeenAt: new Date().toISOString(), - metadata: { progressMode: "normal" }, + metadata: { progressMode: "normal", }, }); await runtime.start(); @@ -223,6 +242,116 @@ describe("DiscordRuntime", () => { expect(ops.messages[0]?.content).not.toContain("Drafting response"); }); + it("reuses a Discord live progress message when updates are supported", async () => { + const cfg = await config(); + cfg.progressIntervalMs = 1; + const ops = new FakeDiscordOperations(); + const runtime = new DiscordRuntime(cfg, { operations: ops }); + const session = route().route; + session.notification.lastStatus = "running"; + const store = new TunnelStateStore(cfg.stateDir); + await store.upsertChannelBinding({ + channel: "discord", + instanceId: "default", + conversationId: "dm1", + userId: "u1", + sessionKey: session.sessionKey, + sessionId: session.sessionId, + sessionLabel: session.sessionLabel, + boundAt: new Date().toISOString(), + lastSeenAt: new Date().toISOString(), + metadata: { progressMode: "normal" }, + }); + + await runtime.start(); + session.notification.progressEvent = { id: "discord-live-progress-1", kind: "tool", text: "Compile", at: Date.now() }; + await runtime.registerRoute(session); + await vi.waitFor(() => expect(ops.messages).toHaveLength(1)); + + const firstMessageId = ops.messageIds.at(-1); + session.notification.progressEvent = { id: "discord-live-progress-2", kind: "tool", text: "Compile tests", at: Date.now() }; + await runtime.registerRoute(session); + await vi.waitFor(() => expect(ops.edits).toHaveLength(1)); + + expect(ops.messages).toHaveLength(1); + expect(ops.edits[0]).toMatchObject({ channelId: "dm1", messageId: firstMessageId, content: expect.stringContaining("Compile tests") }); + }); + + it("falls back to a new Discord progress snapshot when edit updates fail", async () => { + const cfg = await config(); + cfg.progressIntervalMs = 1; + const ops = new FakeDiscordOperations(); + ops.onEditMessage = async () => { + throw new Error("edit failed"); + }; + const runtime = new DiscordRuntime(cfg, { operations: ops }); + const session = route().route; + session.notification.lastStatus = "running"; + const store = new TunnelStateStore(cfg.stateDir); + await store.upsertChannelBinding({ + channel: "discord", + instanceId: "default", + conversationId: "dm1", + userId: "u1", + sessionKey: session.sessionKey, + sessionId: session.sessionId, + sessionLabel: session.sessionLabel, + boundAt: new Date().toISOString(), + lastSeenAt: new Date().toISOString(), + metadata: { progressMode: "normal" }, + }); + + await runtime.start(); + session.notification.progressEvent = { id: "discord-fallback-progress-1", kind: "tool", text: "Compile", at: Date.now() }; + await runtime.registerRoute(session); + await vi.waitFor(() => expect(ops.messages).toHaveLength(1)); + + session.notification.progressEvent = { id: "discord-fallback-progress-2", kind: "tool", text: "Packaging", at: Date.now() }; + await runtime.registerRoute(session); + await vi.waitFor(() => expect(ops.messages.length).toBe(2)); + + expect(ops.edits).toHaveLength(1); + expect(ops.messages.at(-1)?.content).toContain("Packaging"); + }); + + it("does not edit a stale Discord live progress message after binding is revoked", async () => { + vi.useFakeTimers(); + const cfg = await config(); + cfg.progressIntervalMs = 50; + const ops = new FakeDiscordOperations(); + const runtime = new DiscordRuntime(cfg, { operations: ops }); + const session = route().route; + session.notification.lastStatus = "running"; + const store = new TunnelStateStore(cfg.stateDir); + await store.upsertChannelBinding({ + channel: "discord", + instanceId: "default", + conversationId: "dm1", + userId: "u1", + sessionKey: session.sessionKey, + sessionId: session.sessionId, + sessionLabel: session.sessionLabel, + boundAt: new Date().toISOString(), + lastSeenAt: new Date().toISOString(), + metadata: { progressMode: "normal" }, + }); + + await runtime.start(); + session.notification.progressEvent = { id: "discord-progress-before-revoke", kind: "tool", text: "Compiling", at: Date.now() }; + await runtime.registerRoute(session); + await vi.runOnlyPendingTimersAsync(); + await vi.waitFor(() => expect(ops.messages).toHaveLength(1)); + + session.notification.progressEvent = { id: "discord-progress-after-revoke", kind: "tool", text: "Should not edit", at: Date.now() + 1 }; + await runtime.registerRoute(session); + await store.revokeChannelBinding("discord", session.sessionKey); + await vi.advanceTimersByTimeAsync(50); + + expect(ops.messages).toHaveLength(1); + expect(ops.edits).toHaveLength(0); + await vi.waitFor(() => expect((runtime as unknown as { progressStates: Map }).progressStates.size).toBe(0)); + }); + it("clears Discord progress state when pending progress becomes suppressed", async () => { vi.useFakeTimers(); const cfg = await config(); @@ -261,7 +390,7 @@ describe("DiscordRuntime", () => { metadata: { progressMode: "quiet" }, }); await vi.runOnlyPendingTimersAsync(); - await vi.waitFor(() => expect((runtime as any).progressStates.size).toBe(0)); + await vi.waitFor(() => expect((runtime as unknown as { progressStates: Map }).progressStates.size).toBe(0)); expect(ops.messages).toHaveLength(0); }); diff --git a/tests/progress.test.ts b/tests/progress.test.ts index d70ad37..5eab82b 100644 --- a/tests/progress.test.ts +++ b/tests/progress.test.ts @@ -14,6 +14,8 @@ import { shouldSendNonTerminalProgress, shouldSendProgressActivity, } from "../extensions/relay/notifications/progress.js"; +import { deliverLiveProgress } from "../extensions/relay/notifications/progress-delivery.js"; +import type { LiveProgressDeliveryState } from "../extensions/relay/notifications/progress-delivery.js"; import type { SessionNotificationState, TelegramBindingMetadata, TelegramTunnelConfig } from "../extensions/relay/core/types.js"; const config: Pick = { @@ -120,3 +122,120 @@ describe("progress helpers", () => { expect(formatRecentActivity(undefined)).toContain("No recent activity"); }); }); + +describe("deliverLiveProgress helper", () => { + it("suppresses unchanged progress snapshots", async () => { + const updates: string[] = []; + const edits: string[] = []; + const snapshots: string[] = []; + const state: LiveProgressDeliveryState = { pending: [] }; + state.lastText = "Already sent"; + + await deliverLiveProgress(state, "Already sent", { + sendLiveProgress: async (text) => { + updates.push(text); + return "x"; + }, + updateLiveProgress: async (ref, text) => { + edits.push(`${ref}:${text}`); + }, + sendProgressSnapshot: async (text) => { + snapshots.push(text); + }, + }); + + expect(updates).toHaveLength(0); + expect(edits).toHaveLength(0); + expect(snapshots).toHaveLength(0); + }); + + it("updates a live message when supported", async () => { + const updates: string[] = []; + const state: LiveProgressDeliveryState = { pending: [], liveMessageRef: "msg-1", lastText: undefined }; + + await deliverLiveProgress(state, "Running tests", { + sendLiveProgress: async (text) => { + updates.push(`send:${text}`); + return "msg-2"; + }, + updateLiveProgress: async (ref, text) => { + updates.push(`update:${ref}:${text}`); + }, + sendProgressSnapshot: async (text) => { + updates.push(`snapshot:${text}`); + }, + }); + + expect(updates).toEqual(["update:msg-1:Running tests"]); + expect(state.liveMessageRef).toBe("msg-1"); + expect(state.lastText).toBe("Running tests"); + }); + + it("falls back to new live progress when live edit fails", async () => { + const updates: string[] = []; + const state: LiveProgressDeliveryState = { pending: [], liveMessageRef: "msg-1", lastText: undefined }; + + await deliverLiveProgress(state, "Still running", { + sendLiveProgress: async (text) => { + updates.push(`send:${text}`); + return "msg-2"; + }, + updateLiveProgress: async () => { + updates.push("update-failed"); + throw new Error("edit failed"); + }, + sendProgressSnapshot: async (text) => { + updates.push(`snapshot:${text}`); + }, + }); + + expect(updates[0]).toBe("update-failed"); + expect(updates[1]).toBe("send:Still running"); + expect(state.liveMessageRef).toBe("msg-2"); + expect(state.lastText).toBe("Still running"); + expect(updates).toHaveLength(2); + }); + + it("falls back to plain snapshot when live path fails", async () => { + const updates: string[] = []; + const state: LiveProgressDeliveryState = { pending: [] }; + + await deliverLiveProgress(state, "Fallback snapshot", { + sendLiveProgress: async () => { + updates.push("send-live"); + throw new Error("live blocked"); + }, + sendProgressSnapshot: async (text) => { + updates.push(`snapshot:${text}`); + }, + }); + + expect(updates).toEqual(["send-live", "snapshot:Fallback snapshot"]); + expect(state.lastText).toBe("Fallback snapshot"); + expect(state.liveMessageRef).toBeUndefined(); + }); + + it("clears stale live refs and swallows snapshot failures", async () => { + const updates: string[] = []; + const state: LiveProgressDeliveryState = { pending: [], liveMessageRef: "stale", lastText: "old" }; + + await deliverLiveProgress(state, "Still running", { + updateLiveProgress: async () => { + updates.push("update"); + throw new Error("stale update"); + }, + sendLiveProgress: async () => { + updates.push("send-live"); + throw new Error("send blocked"); + }, + sendProgressSnapshot: async (text) => { + updates.push(`snapshot:${text}`); + throw new Error("snapshot blocked"); + }, + }); + + expect(updates).toEqual(["update", "send-live", "snapshot:Still running"]); + expect(state.liveMessageRef).toBeUndefined(); + expect(state.lastText).toBe("old"); + }); +}); diff --git a/tests/slack-adapter.test.ts b/tests/slack-adapter.test.ts index d1dd7ae..66b106c 100644 --- a/tests/slack-adapter.test.ts +++ b/tests/slack-adapter.test.ts @@ -229,6 +229,27 @@ describe("SlackChannelAdapter", () => { expect(slackPairingCommand("abc")).toBe("relay pair abc"); }); + it("captures Slack live progress refs and updates by message id", async () => { + const postMessage = vi.fn(async () => ({ ts: "123.45" })); + const updateMessage = vi.fn(async (_payload: unknown) => undefined); + const adapter = new SlackChannelAdapter(config, { postMessage, updateMessage, uploadFile: async () => undefined, postEphemeral: async () => undefined }); + const address = { channel: "slack", conversationId: "D1", userId: "U1" } as const; + + const ref = await adapter.sendLiveProgress(address, "Running tests"); + await expect(adapter.updateLiveProgress(address, { messageId: ref?.messageId ?? "" }, "Still running" )).resolves.toBeUndefined(); + + expect(ref).toEqual({ messageId: "123.45" }); + expect(postMessage).toHaveBeenCalledWith(expect.objectContaining({ channel: "D1", text: "Running tests" })); + expect(updateMessage).toHaveBeenCalledWith(expect.objectContaining({ channel: "D1", ts: "123.45", text: "Still running" })); + }); + + it("throws a consistent error when Slack message updates are unavailable", async () => { + const adapter = new SlackChannelAdapter(config, { postMessage: async () => undefined, uploadFile: async () => undefined, postEphemeral: async () => undefined }); + const address = { channel: "slack", conversationId: "D1", userId: "U1" } as const; + + await expect(adapter.updateLiveProgress(address, { messageId: "123.4" }, "New status")).rejects.toThrow("Slack message updates are not supported by this adapter configuration."); + }); + it("normalizes Slack shared-room mentions", () => { expect(slackMentionedUserIds("hi <@U123> and <@U456>")).toEqual(["U123", "U456"]); expect(slackMessageSharedRoomAddressing("hi <@U123>", "U123")).toEqual({ kind: "local" }); diff --git a/tests/slack-runtime.test.ts b/tests/slack-runtime.test.ts index a52821c..5047d22 100644 --- a/tests/slack-runtime.test.ts +++ b/tests/slack-runtime.test.ts @@ -2,7 +2,7 @@ import { mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { afterEach, describe, expect, it, vi } from "vitest"; -import type { SlackApiOperations, SlackAuthTestResult, SlackEnvelope, SlackPostEphemeralPayload, SlackPostMessagePayload, SlackReactionPayload, SlackUploadFilePayload } from "../extensions/relay/adapters/slack/adapter.js"; +import type { SlackApiOperations, SlackAuthTestResult, SlackEnvelope, SlackPostEphemeralPayload, SlackPostMessagePayload, SlackReactionPayload, SlackPostMessageResult, SlackUpdateMessagePayload, SlackUploadFilePayload } from "../extensions/relay/adapters/slack/adapter.js"; import { SlackLiveOperations } from "../extensions/relay/adapters/slack/live-client.js"; import { SlackRuntime } from "../extensions/relay/adapters/slack/runtime.js"; import { routeUnavailableError } from "../extensions/relay/core/route-actions.js"; @@ -51,9 +51,14 @@ class FakeWebSocket { class FakeSlackOperations implements SlackApiOperations { handler?: (event: SlackEnvelope) => Promise; readonly posts: SlackPostMessagePayload[] = []; + readonly updates: SlackUpdateMessagePayload[] = []; readonly ephemeral: SlackPostEphemeralPayload[] = []; readonly responses: Array<{ url: string; text: string }> = []; responseError?: Error; + updateMessageError?: Error; + readonly postMessageTsPrefix = "100"; + postMessageIndex = 0; + onUpdateMessage?: (payload: SlackUpdateMessagePayload) => Promise; addReaction?: (payload: SlackReactionPayload) => Promise; removeReaction?: (payload: SlackReactionPayload) => Promise; @@ -69,8 +74,20 @@ class FakeSlackOperations implements SlackApiOperations { return { teamId: "T1", userId: "U_BOT", botId: "B1", appId: "A1" }; } - async postMessage(payload: SlackPostMessagePayload): Promise { + async postMessage(payload: SlackPostMessagePayload): Promise { this.posts.push(payload); + this.postMessageIndex += 1; + return { ts: `${this.postMessageTsPrefix}.${this.postMessageIndex}` }; + } + + async updateMessage(payload: SlackUpdateMessagePayload): Promise { + this.updates.push(payload); + if (this.updateMessageError) { + throw this.updateMessageError; + } + if (this.onUpdateMessage) { + await this.onUpdateMessage(payload); + } } readonly uploads: SlackUploadFilePayload[] = []; @@ -793,6 +810,115 @@ describe("SlackRuntime foundations", () => { expect(operations.posts.at(-1)?.text).not.toContain("Pi progress"); }); + it("reuses a Slack live progress message when updates are supported", async () => { + const operations = new FakeSlackOperations(); + const runtimeConfig = await config(); + runtimeConfig.verboseProgressIntervalMs = 1; + const testRoute = route(); + testRoute.notification.lastStatus = "running"; + const store = new TunnelStateStore(runtimeConfig.stateDir); + await store.upsertChannelBinding({ + channel: "slack", + instanceId: "default", + conversationId: "D1", + userId: "U_DRIVER", + sessionKey: testRoute.sessionKey, + sessionId: testRoute.sessionId, + sessionLabel: testRoute.sessionLabel, + boundAt: new Date().toISOString(), + lastSeenAt: new Date().toISOString(), + metadata: { progressMode: "verbose", threadTs: "parent-progress" }, + }); + const runtime = new SlackRuntime(runtimeConfig, { operations }); + + testRoute.notification.progressEvent = { id: "progress-live-1", kind: "tool", text: "Compiling", at: Date.now() }; + await runtime.registerRoute(testRoute); + await waitForSlackRuntimeCondition(() => operations.posts.length > 0); + + expect(operations.posts.at(-1)).toMatchObject({ channel: "D1", threadTs: "parent-progress" }); + testRoute.notification.progressEvent = { id: "progress-live-2", kind: "tool", text: "Running tests", at: Date.now() }; + await runtime.registerRoute(testRoute); + await waitForSlackRuntimeCondition(() => operations.updates.length > 0); + + expect(operations.posts).toHaveLength(1); + expect(operations.updates).toHaveLength(1); + expect(operations.updates[0]).toMatchObject({ channel: "D1", ts: "100.1", text: expect.stringContaining("Running tests") }); + }); + + it("falls back to a new Slack progress snapshot when edit updates fail", async () => { + const operations = new FakeSlackOperations(); + operations.onUpdateMessage = async () => { + throw new Error("edit failed"); + }; + const runtimeConfig = await config(); + runtimeConfig.verboseProgressIntervalMs = 1; + const testRoute = route(); + testRoute.notification.lastStatus = "running"; + const store = new TunnelStateStore(runtimeConfig.stateDir); + await store.upsertChannelBinding({ + channel: "slack", + instanceId: "default", + conversationId: "D1", + userId: "U_DRIVER", + sessionKey: testRoute.sessionKey, + sessionId: testRoute.sessionId, + sessionLabel: testRoute.sessionLabel, + boundAt: new Date().toISOString(), + lastSeenAt: new Date().toISOString(), + metadata: { progressMode: "verbose", threadTs: "parent-progress" }, + }); + const runtime = new SlackRuntime(runtimeConfig, { operations }); + + testRoute.notification.progressEvent = { id: "progress-failed-update-1", kind: "tool", text: "Compiling", at: Date.now() }; + await runtime.registerRoute(testRoute); + await waitForSlackRuntimeCondition(() => operations.posts.length > 0); + testRoute.notification.progressEvent = { id: "progress-failed-update-2", kind: "tool", text: "Packaging", at: Date.now() + 1 }; + await runtime.registerRoute(testRoute); + await waitForSlackRuntimeCondition(() => operations.posts.length === 2); + + expect(operations.posts).toHaveLength(2); + expect(operations.posts.at(-1)?.text).toContain("Packaging"); + expect(operations.updates).toHaveLength(1); + }); + + it("does not edit a stale Slack live progress message after binding moves", async () => { + vi.useFakeTimers(); + const operations = new FakeSlackOperations(); + const runtimeConfig = await config(); + runtimeConfig.progressIntervalMs = 50; + const testRoute = route(); + testRoute.notification.lastStatus = "running"; + const store = new TunnelStateStore(runtimeConfig.stateDir); + const baseBinding = { + channel: "slack" as const, + instanceId: "default", + conversationId: "D1", + userId: "U_DRIVER", + sessionKey: testRoute.sessionKey, + sessionId: testRoute.sessionId, + sessionLabel: testRoute.sessionLabel, + boundAt: new Date().toISOString(), + lastSeenAt: new Date().toISOString(), + metadata: { progressMode: "normal" }, + }; + await store.upsertChannelBinding(baseBinding); + const runtime = new SlackRuntime(runtimeConfig, { operations }); + + testRoute.notification.progressEvent = { id: "progress-before-move", kind: "tool", text: "Compiling", at: Date.now() }; + await runtime.registerRoute(testRoute); + await vi.runOnlyPendingTimersAsync(); + await vi.waitFor(() => expect(operations.posts).toHaveLength(1)); + + testRoute.notification.progressEvent = { id: "progress-after-move", kind: "tool", text: "Should not edit", at: Date.now() + 1 }; + await runtime.registerRoute(testRoute); + await store.upsertChannelBinding({ ...baseBinding, conversationId: "D2" }); + await vi.advanceTimersByTimeAsync(50); + + expect(operations.posts).toHaveLength(1); + expect(operations.updates).toHaveLength(0); + await vi.waitFor(() => expect((runtime as unknown as { progressStates: Map }).progressStates.size).toBe(0)); + }); + it("uploads latest, explicit images, and requester-scoped Slack files", async () => { const operations = new FakeSlackOperations(); const runtimeConfig = await config();