diff --git a/.github/dependabot.yml b/.github/dependabot.yml index f06452da..a77c2d77 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -2,6 +2,12 @@ version: 2 updates: # ── npm: root workspace ──────────────────────────────────────────────────── + # flowfi is a single npm workspace (frontend + backend hoisted into one root + # package-lock.json). Dependabot must update from the workspace root so the + # root lockfile CI runs `npm ci` against stays in sync. Per-directory entries + # for /frontend and /backend only touched their package.json without updating + # the root lockfile, so every PR they opened failed `npm ci` with + # "lock file's X does not satisfy Y". One root entry covers all workspaces. - package-ecosystem: "npm" directory: "/" schedule: @@ -14,32 +20,6 @@ updates: - "minor" - "patch" - # ── npm: frontend ────────────────────────────────────────────────────────── - - package-ecosystem: "npm" - directory: "/frontend" - schedule: - interval: "weekly" - day: "monday" - open-pull-requests-limit: 10 - groups: - minor-and-patch: - update-types: - - "minor" - - "patch" - - # ── npm: backend ─────────────────────────────────────────────────────────── - - package-ecosystem: "npm" - directory: "/backend" - schedule: - interval: "weekly" - day: "monday" - open-pull-requests-limit: 10 - groups: - minor-and-patch: - update-types: - - "minor" - - "patch" - # ── Cargo: contracts ─────────────────────────────────────────────────────── - package-ecosystem: "cargo" directory: "/contracts" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f01e6e76..d9e8b1a4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -42,17 +42,6 @@ jobs: run: npm run test:coverage working-directory: frontend - - name: Upload frontend coverage to Codecov - uses: codecov/codecov-action@v5 - with: - files: frontend/coverage/lcov.info - flags: frontend - name: frontend-coverage - fail_ci_if_error: false - verbose: true - env: - CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - - name: Build run: npm run build working-directory: frontend diff --git a/backend/.env.example b/backend/.env.example index f754bc8d..5035a28b 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -1,6 +1,16 @@ # Database DATABASE_URL="postgresql://user:password@localhost:5433/flowfi?schema=public" +# PostgreSQL pool settings +# Maximum database connections per backend process (default: 10) +PG_POOL_MAX=10 +# How long an idle connection stays open before being closed (milliseconds, default: 30000) +PG_IDLE_TIMEOUT_MS=30000 +# How long to wait when establishing a new database connection (milliseconds, default: 5000) +PG_CONNECTION_TIMEOUT_MS=5000 +# Maximum time a PostgreSQL statement may run before cancellation (milliseconds, default: 30000) +PG_STATEMENT_TIMEOUT_MS=30000 + # Server PORT=3001 NODE_ENV=development diff --git a/backend/Dockerfile b/backend/Dockerfile index 671fb1d2..215f82e3 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -6,6 +6,7 @@ COPY package*.json ./ RUN npm install COPY tsconfig.json ./ +COPY prisma.config.ts ./ COPY src ./src COPY prisma ./prisma @@ -21,8 +22,10 @@ COPY package*.json ./ RUN npm install --omit=dev COPY --from=builder /app/dist ./dist -COPY --from=builder /app/src/generated ./src/generated +COPY --from=builder /app/src/generated ./dist/generated +COPY --from=builder /app/prisma ./prisma +COPY --from=builder /app/prisma.config.ts ./prisma.config.ts EXPOSE 3001 -CMD ["npm", "start"] +CMD ["npm", "start"] \ No newline at end of file diff --git a/backend/package.json b/backend/package.json index 9d65dc78..42c27d07 100644 --- a/backend/package.json +++ b/backend/package.json @@ -32,7 +32,6 @@ "express-rate-limit": "^8.5.2", "ioredis": "^5.11.1", "pg": "^8.21.0", - "stellar-sdk": "^13.3.0", "swagger-jsdoc": "^6.3.0", "swagger-ui-express": "^5.0.1", "winston": "^3.11.0", diff --git a/backend/prisma/migrations/20260630000000_add_stream_token_address_index/migration.sql b/backend/prisma/migrations/20260630000000_add_stream_token_address_index/migration.sql new file mode 100644 index 00000000..54ce5daf --- /dev/null +++ b/backend/prisma/migrations/20260630000000_add_stream_token_address_index/migration.sql @@ -0,0 +1,2 @@ +-- CreateIndex +CREATE INDEX IF NOT EXISTS "Stream_tokenAddress_idx" ON "Stream"("tokenAddress"); diff --git a/backend/prisma/schema.prisma b/backend/prisma/schema.prisma index bfece5c2..ce3f39be 100644 --- a/backend/prisma/schema.prisma +++ b/backend/prisma/schema.prisma @@ -51,6 +51,7 @@ model Stream { @@index([sender]) @@index([recipient]) + @@index([tokenAddress]) @@index([streamId]) @@index([isActive]) @@index([isPaused]) diff --git a/backend/prisma/seed.ts b/backend/prisma/seed.ts index d7ebc71c..ccf856aa 100644 --- a/backend/prisma/seed.ts +++ b/backend/prisma/seed.ts @@ -1,9 +1,8 @@ -import pg from 'pg'; import { PrismaPg } from '@prisma/adapter-pg'; import { PrismaClient } from '../src/generated/prisma/index.js'; +import { createPgPool } from '../src/lib/pg-pool.js'; -const connectionString = process.env.DATABASE_URL; -const pool = new pg.Pool({ connectionString }); +const pool = createPgPool(); const adapter = new PrismaPg(pool); const prisma = new PrismaClient({ adapter }); diff --git a/backend/src/app.ts b/backend/src/app.ts index a2bfe853..b69de979 100644 --- a/backend/src/app.ts +++ b/backend/src/app.ts @@ -1,26 +1,33 @@ -import express, { type Request, type Response, type NextFunction } from 'express'; -import cors from 'cors'; -import swaggerUi from 'swagger-ui-express'; -import { swaggerSpec } from './config/swagger.js'; -import { apiVersionMiddleware, type VersionedRequest } from './middleware/api-version.middleware.js'; -import { sandboxMiddleware } from './middleware/sandbox.middleware.js'; -import { globalRateLimiter } from './middleware/rate-limiter.middleware.js'; -import { requestIdMiddleware } from './middleware/requestId.js'; -import v1Routes from './routes/v1/index.js'; - -import healthRoutes from './routes/health.routes.js'; +import express, { + type Request, + type Response, + type NextFunction, +} from "express"; +import cors from "cors"; +import swaggerUi from "swagger-ui-express"; +import { swaggerSpec } from "./config/swagger.js"; +import { + apiVersionMiddleware, + type VersionedRequest, +} from "./middleware/api-version.middleware.js"; +import { sandboxMiddleware } from "./middleware/sandbox.middleware.js"; +import { globalRateLimiter } from "./middleware/rate-limiter.middleware.js"; +import { requestIdMiddleware } from "./middleware/requestId.js"; +import v1Routes from "./routes/v1/index.js"; + +import healthRoutes from "./routes/health.routes.js"; const app = express(); -const isProduction = process.env.NODE_ENV === 'production'; -const rawCors = process.env.CORS_ALLOWED_ORIGINS ?? ''; +const isProduction = process.env.NODE_ENV === "production"; +const rawCors = process.env.CORS_ALLOWED_ORIGINS ?? ""; const allowedOrigins = rawCors - .split(',') - .map((origin) => origin.trim()) - .filter(Boolean); + .split(",") + .map((origin) => origin.trim()) + .filter(Boolean); // Default in development to only localhost:3000 (frontend dev server) if (!process.env.CORS_ALLOWED_ORIGINS && !isProduction) { - allowedOrigins.push('http://localhost:3000'); + allowedOrigins.push("http://localhost:3000"); } // Apply global rate limiter first @@ -29,52 +36,60 @@ app.use(globalRateLimiter); // Request ID tracing app.use(requestIdMiddleware); -app.disable('x-powered-by'); +app.disable("x-powered-by"); // Helmet-equivalent core headers without external dependency. // Strict CSP applied globally; the /api-docs route overrides it below for Swagger UI. app.use((req: Request, res: Response, next: NextFunction) => { - res.setHeader('X-Content-Type-Options', 'nosniff'); - res.setHeader('X-Frame-Options', 'DENY'); - res.setHeader('Referrer-Policy', 'no-referrer'); - res.setHeader('X-DNS-Prefetch-Control', 'off'); - res.setHeader('X-Download-Options', 'noopen'); - res.setHeader('X-Permitted-Cross-Domain-Policies', 'none'); - res.setHeader('Content-Security-Policy', "default-src 'self'; script-src 'self'; style-src 'self'; img-src 'self' data:; frame-ancestors 'none'; object-src 'none'"); - res.setHeader('Cross-Origin-Opener-Policy', 'same-origin'); - res.setHeader('Cross-Origin-Resource-Policy', 'same-origin'); - if (process.env.NODE_ENV === 'production') { - res.setHeader('Strict-Transport-Security', 'max-age=31536000; includeSubDomains'); - } - next(); + res.setHeader("X-Content-Type-Options", "nosniff"); + res.setHeader("X-Frame-Options", "DENY"); + res.setHeader("Referrer-Policy", "no-referrer"); + res.setHeader("X-DNS-Prefetch-Control", "off"); + res.setHeader("X-Download-Options", "noopen"); + res.setHeader("X-Permitted-Cross-Domain-Policies", "none"); + res.setHeader( + "Content-Security-Policy", + "default-src 'self'; script-src 'self'; style-src 'self'; img-src 'self' data:; frame-ancestors 'none'; object-src 'none'", + ); + res.setHeader("Cross-Origin-Opener-Policy", "same-origin"); + res.setHeader("Cross-Origin-Resource-Policy", "same-origin"); + if (process.env.NODE_ENV === "production") { + res.setHeader( + "Strict-Transport-Security", + "max-age=31536000; includeSubDomains", + ); + } + next(); }); -app.use(cors({ +app.use( + cors({ origin(origin, callback) { - // Allow non-browser clients (no Origin header) - if (!origin) { - callback(null, true); - return; - } - - if (allowedOrigins.includes(origin)) { - callback(null, true); - return; - } - - // Not allowed - callback(new Error('CORS origin not allowed')); + // Allow non-browser clients (no Origin header) + if (!origin) { + callback(null, true); + return; + } + + if (allowedOrigins.includes(origin)) { + callback(null, true); + return; + } + + // Not allowed + callback(new Error("CORS origin not allowed")); }, credentials: true, -})); + }), +); // Convert CORS errors into 403 responses so callers get a clear status code app.use((err: unknown, req: Request, res: Response, next: NextFunction) => { - if (err instanceof Error && err.message === 'CORS origin not allowed') { - res.status(403).json({ error: 'CORS origin not allowed' }); - return; - } - next(err); + if (err instanceof Error && err.message === "CORS origin not allowed") { + res.status(403).json({ error: "CORS origin not allowed" }); + return; + } + next(err); }); app.use(express.json()); @@ -83,18 +98,26 @@ app.use(sandboxMiddleware); // Swagger UI setup // Override CSP for /api-docs only: Swagger UI requires inline scripts/styles. -app.use('/api-docs', (req: Request, res: Response, next: NextFunction) => { - res.setHeader('Content-Security-Policy', "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'; img-src 'self' data:; frame-ancestors 'none'; object-src 'none'"); +app.use( + "/api-docs", + (req: Request, res: Response, next: NextFunction) => { + res.setHeader( + "Content-Security-Policy", + "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'; img-src 'self' data:; frame-ancestors 'none'; object-src 'none'", + ); next(); -}, swaggerUi.serve, swaggerUi.setup(swaggerSpec, { - customCss: '.swagger-ui .topbar { display: none }', - customSiteTitle: 'FlowFi API Documentation', -})); + }, + swaggerUi.serve, + swaggerUi.setup(swaggerSpec, { + customCss: ".swagger-ui .topbar { display: none }", + customSiteTitle: "FlowFi API Documentation", + }), +); // Serve raw OpenAPI spec as JSON -app.get('/api-docs.json', (req: Request, res: Response) => { - res.setHeader('Content-Type', 'application/json'); - res.send(swaggerSpec); +app.get("/api-docs.json", (req: Request, res: Response) => { + res.setHeader("Content-Type", "application/json"); + res.send(swaggerSpec); }); // API Versioning @@ -105,16 +128,16 @@ app.use(apiVersionMiddleware); // After versioning middleware, /v1/streams becomes /streams, so we mount v1Routes at root // But only handle requests that had a version prefix (apiVersion is set) app.use((req: Request, res: Response, next: NextFunction) => { - const versionedReq = req as VersionedRequest; - if (versionedReq.apiVersion) { - // This was a versioned request, route to v1 handlers - return v1Routes(req, res, next); - } - return next(); // Not versioned, continue to deprecated handlers + const versionedReq = req as VersionedRequest; + if (versionedReq.apiVersion) { + // This was a versioned request, route to v1 handlers + return v1Routes(req, res, next); + } + return next(); // Not versioned, continue to deprecated handlers }); // Health check routes -app.use('/health', healthRoutes); +app.use("/health", healthRoutes); /** * @openapi @@ -128,11 +151,11 @@ app.use('/health', healthRoutes); * 200: * description: API is running successfully */ -app.get('/', (req: Request, res: Response) => { - res.send('FlowFi Backend is running'); +app.get("/", (req: Request, res: Response) => { + res.send("FlowFi Backend is running"); }); -import { errorHandler } from './middleware/error.middleware.js'; +import { errorHandler } from "./middleware/error.middleware.js"; app.use(errorHandler); diff --git a/backend/src/controllers/stream.controller.ts b/backend/src/controllers/stream.controller.ts index 5476fb6e..0f88b551 100644 --- a/backend/src/controllers/stream.controller.ts +++ b/backend/src/controllers/stream.controller.ts @@ -67,9 +67,26 @@ function sumStringI128(values: string[]): string { export const createStream = async (req: Request, res: Response) => { try { const { streamId, sender, recipient, tokenAddress, ratePerSecond, depositedAmount, startTime } = req.body; + const callerAddress = (req as AuthenticatedRequest).user?.publicKey; - const parsedStreamId = Number.parseInt(streamId, 10); - const parsedStartTime = Number.parseInt(startTime, 10); + if (callerAddress && callerAddress !== sender) { + return res.status(403).json({ + error: 'Forbidden', + message: 'Only the stream sender can create or reactivate the stream' + }); + } + + if ( + streamId == null + || startTime == null + || ratePerSecond == null + || depositedAmount == null + ) { + return res.status(400).json({ error: 'Missing required numeric fields in request body' }); + } + + const parsedStreamId = Number.parseInt(String(streamId), 10); + const parsedStartTime = Number.parseInt(String(startTime), 10); const parsedRatePerSecond = BigInt(ratePerSecond); const parsedDepositedAmount = BigInt(depositedAmount); @@ -113,8 +130,8 @@ export const createStream = async (req: Request, res: Response) => { return res.status(201).json(stream); } catch (error) { - if (error instanceof RangeError) { - logger.error('Range error in createStream:', error); + if (error instanceof RangeError || error instanceof SyntaxError || error instanceof TypeError) { + logger.error('Numeric parsing error in createStream:', error); return res.status(400).json({ error: 'Invalid numeric values in request body' }); } logger.error('Error creating/upserting stream:', error); @@ -316,7 +333,11 @@ export const getStreamEvents = async (req: Request, res: Response) => { const [events, total] = await Promise.all([ prisma.streamEvent.findMany({ where: whereClause, - orderBy: { timestamp: order }, + // `timestamp` is not unique (events in the same block/ledger can + // share a timestamp), so it can't be the sole sort key for cursor + // pagination. Add `id` as a unique tiebreaker so ordering (and + // therefore cursor pagination) is stable across pages. + orderBy: [{ timestamp: order }, { id: order }], take: limit, ...(cursor ? { cursor: { id: cursor }, skip: 1 } @@ -770,4 +791,4 @@ export const resumeStream = async (req: Request, res: Response) => { logger.error('Error resuming stream:', error); return res.status(500).json({ error: 'Internal server error' }); } -}; +}; \ No newline at end of file diff --git a/backend/src/lib/pg-pool.ts b/backend/src/lib/pg-pool.ts new file mode 100644 index 00000000..fcb098f2 --- /dev/null +++ b/backend/src/lib/pg-pool.ts @@ -0,0 +1,21 @@ +import pg from 'pg'; + +const parsePositiveIntegerEnv = (name: string, defaultValue: number): number => { + const rawValue = process.env[name]; + + if (!rawValue) return defaultValue; + + const parsedValue = Number.parseInt(rawValue, 10); + + return Number.isInteger(parsedValue) && parsedValue > 0 ? parsedValue : defaultValue; +}; + +export const createPgPoolConfig = (): pg.PoolConfig => ({ + connectionString: process.env.DATABASE_URL, + max: parsePositiveIntegerEnv('PG_POOL_MAX', 10), + idleTimeoutMillis: parsePositiveIntegerEnv('PG_IDLE_TIMEOUT_MS', 30_000), + connectionTimeoutMillis: parsePositiveIntegerEnv('PG_CONNECTION_TIMEOUT_MS', 5_000), + statement_timeout: parsePositiveIntegerEnv('PG_STATEMENT_TIMEOUT_MS', 30_000), +}); + +export const createPgPool = () => new pg.Pool(createPgPoolConfig()); diff --git a/backend/src/lib/prisma.ts b/backend/src/lib/prisma.ts index 0ea5168a..fd4ddfe2 100644 --- a/backend/src/lib/prisma.ts +++ b/backend/src/lib/prisma.ts @@ -1,16 +1,15 @@ import pg from 'pg'; import { PrismaPg } from '@prisma/adapter-pg'; import { PrismaClient } from '../generated/prisma/index.js'; +import { createPgPool } from './pg-pool.js'; const globalForPrisma = global as unknown as { prisma?: PrismaClient; pool?: pg.Pool; }; -const connectionString = process.env.DATABASE_URL; - if (!globalForPrisma.pool) { - globalForPrisma.pool = new pg.Pool({ connectionString }); + globalForPrisma.pool = createPgPool(); } const adapter = new PrismaPg(globalForPrisma.pool); diff --git a/backend/src/middleware/auth.ts b/backend/src/middleware/auth.ts index b9694d12..082d447d 100644 --- a/backend/src/middleware/auth.ts +++ b/backend/src/middleware/auth.ts @@ -98,13 +98,31 @@ export function verifyJwt(token: string): { publicKey: string } | null { try { const [header, body, sig] = token.split('.'); if (!header || !body || !sig) return null; + + // Verify signature const expected = crypto .createHmac('sha256', JWT_SECRET) .update(`${header}.${body}`) .digest(); - if (!crypto.timingSafeEqual(Buffer.from(sig, 'base64url'), expected)) return null; + + let providedSig: Buffer; + try { + providedSig = Buffer.from(sig, 'base64url'); + } catch { + return null; + } + + // Use timingSafeEqual to prevent timing attacks + if (providedSig.length !== expected.length || !crypto.timingSafeEqual(providedSig, expected)) { + return null; + } + + // Verify expiration const payload = JSON.parse(Buffer.from(body, 'base64url').toString()); - if (payload.exp < Math.floor(Date.now() / 1000)) return null; + if (!payload.exp || payload.exp < Math.floor(Date.now() / 1000)) { + return null; + } + return { publicKey: payload.sub }; } catch { return null; diff --git a/backend/src/routes/v1/admin.routes.ts b/backend/src/routes/v1/admin.routes.ts index 253f1124..f6427c74 100644 --- a/backend/src/routes/v1/admin.routes.ts +++ b/backend/src/routes/v1/admin.routes.ts @@ -222,7 +222,7 @@ router.post('/indexer/reset', async (req: Request, res: Response) => { * /v1/admin/indexer/replay: * post: * tags: [Admin] - * summary: Replay events from a given ledger (idempotent) + * summary: Replay events from a given ledger (StreamEvent rows deduplicated; stream mutations not idempotent — see indexerService.ts JSDoc) * security: [{ adminAuth: [] }] * parameters: * - in: query diff --git a/backend/src/services/indexerService.ts b/backend/src/services/indexerService.ts index 4dc6a2d3..d386be4f 100644 --- a/backend/src/services/indexerService.ts +++ b/backend/src/services/indexerService.ts @@ -38,8 +38,13 @@ export async function resetIndexer(toLedger: number): Promise { /** * Replay events from a given ledger by resetting state and triggering a poll. - * Deduplication in the worker (transactionHash + eventType + ledger) ensures - * no duplicate StreamEvent rows are created. + * The @@unique([transactionHash, eventType]) constraint on StreamEvent + * guarantees no duplicate StreamEvent rows are created on replay. + * + * CAVEAT: This dedup does NOT apply to stream state mutations. + * Stream.withdrawnAmount (handleTokensWithdrawn, soroban-event-worker.ts:635) + * is incremented unconditionally on every replay, so replay is NOT fully + * idempotent. See issue #808 for the withdrawnAmount idempotency fix. */ export async function replayFromLedger(fromLedger: number): Promise { await resetIndexer(fromLedger); diff --git a/backend/src/services/sorobanService.ts b/backend/src/services/sorobanService.ts index 833d0af0..0c36f12d 100644 --- a/backend/src/services/sorobanService.ts +++ b/backend/src/services/sorobanService.ts @@ -4,9 +4,29 @@ import logger from '../logger.js'; const RPC_URL = process.env.SOROBAN_RPC_URL ?? 'https://soroban-testnet.stellar.org'; const CONTRACT_ID = process.env.STREAM_CONTRACT_ID ?? ''; const KEEPER_SECRET = process.env.KEEPER_SECRET_KEY ?? ''; -/** DB data older than this is considered stale and triggers an RPC fallback. */ +/** + * DB data older than this is considered stale and triggers an RPC fallback. + * 30 s ≈ avg Stellar ledger close time (~5 s) × 6 ledgers — a reasonable + * window to tolerate indexer lag without hammering the RPC on every request. + */ const STALE_THRESHOLD_MS = 30_000; +/** Stroops charged on read-only simulation transactions (no real resource cost). */ +const SIMULATION_FEE = '100'; + +/** Stroops charged on real contract-invocation transactions submitted to the network. */ +const SUBMIT_FEE = '1000'; + +/** Transaction validity window in seconds (applied via setTimeout). */ +const TX_TIMEOUT_SECONDS = 30; + +/** + * Throw-away source account used when building simulation-only transactions. + * Any valid Ed25519 public key works here; the account never needs to exist on-chain + * because simulation transactions are never submitted. + */ +const SIMULATION_PLACEHOLDER_ACCOUNT = 'GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN'; + const server = new rpc.Server(RPC_URL, { allowHttp: true }); export interface ChainStream { @@ -51,12 +71,9 @@ async function simulateContractCall(method: string, args: xdr.ScVal[]): Promise< const op = contract.call(method, ...args); const tx = new TransactionBuilder( - new Account( - 'GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN', - '0' - ), + new Account(SIMULATION_PLACEHOLDER_ACCOUNT, '0'), { - fee: '100', + fee: SIMULATION_FEE, networkPassphrase: process.env.STELLAR_NETWORK === 'mainnet' ? Networks.PUBLIC @@ -64,7 +81,7 @@ async function simulateContractCall(method: string, args: xdr.ScVal[]): Promise< } ) .addOperation(op) - .setTimeout(30) + .setTimeout(TX_TIMEOUT_SECONDS) .build(); const result = await server.simulateTransaction(tx); @@ -87,14 +104,14 @@ async function submitContractCall(method: string, args: xdr.ScVal[], senderSecre const op = contract.call(method, ...args); const tx = new TransactionBuilder(account, { - fee: '1000', + fee: SUBMIT_FEE, networkPassphrase: process.env.STELLAR_NETWORK === 'mainnet' ? Networks.PUBLIC : Networks.TESTNET, }) .addOperation(op) - .setTimeout(30) + .setTimeout(TX_TIMEOUT_SECONDS) .build(); // Simulate first to get foot print and resource info diff --git a/backend/src/workers/soroban-event-worker.ts b/backend/src/workers/soroban-event-worker.ts index e4e65f83..02f9faaf 100644 --- a/backend/src/workers/soroban-event-worker.ts +++ b/backend/src/workers/soroban-event-worker.ts @@ -1,8 +1,9 @@ -import { rpc, xdr, StrKey } from '@stellar/stellar-sdk'; -import { prisma, Prisma } from '../lib/prisma.js'; -import { INDEXER_STATE_ID } from '../lib/indexer-state.js'; -import { sseService } from '../services/sse.service.js'; -import logger from '../logger.js'; +import { rpc, xdr, StrKey } from "@stellar/stellar-sdk"; +import { prisma } from "../lib/prisma.js"; +import { INDEXER_STATE_ID } from "../lib/indexer-state.js"; +import { sseService } from "../services/sse.service.js"; +import logger from "../logger.js"; +import { Prisma } from "../generated/prisma/index.js"; // ─── Config ────────────────────────────────────────────────────────────────── @@ -44,10 +45,7 @@ export function decodeI128(val: xdr.ScVal): string { */ export function decodeAddress(val: xdr.ScVal): string { const addr = val.address(); - if ( - addr.switch().value === - xdr.ScAddressType.scAddressTypeAccount().value - ) { + if (addr.switch().value === xdr.ScAddressType.scAddressTypeAccount().value) { return StrKey.encodeEd25519PublicKey(addr.accountId().ed25519()); } // addr.contractId() returns a Hash (Opaque[]); cast to Uint8Array for encodeContract @@ -83,16 +81,13 @@ export class SorobanEventWorker { constructor() { const rpcUrl = - process.env.SOROBAN_RPC_URL ?? 'https://soroban-testnet.stellar.org'; - this.contractId = process.env.STREAM_CONTRACT_ID ?? ''; + process.env.SOROBAN_RPC_URL ?? "https://soroban-testnet.stellar.org"; + this.contractId = process.env.STREAM_CONTRACT_ID ?? ""; this.pollIntervalMs = parseInt( - process.env.INDEXER_POLL_INTERVAL_MS ?? '5000', - 10, - ); - this.startLedger = parseInt( - process.env.INDEXER_START_LEDGER ?? '0', + process.env.INDEXER_POLL_INTERVAL_MS ?? "5000", 10, ); + this.startLedger = parseInt(process.env.INDEXER_START_LEDGER ?? "0", 10); this.server = new rpc.Server(rpcUrl, { allowHttp: true }); } @@ -103,13 +98,13 @@ export class SorobanEventWorker { async start(): Promise { if (!this.contractId) { logger.warn( - '[SorobanWorker] STREAM_CONTRACT_ID is not set — event indexing disabled.', + "[SorobanWorker] STREAM_CONTRACT_ID is not set — event indexing disabled.", ); return; } this.isRunning = true; - logger.info('[SorobanWorker] Starting Soroban event indexer…'); + logger.info("[SorobanWorker] Starting Soroban event indexer…"); await this.poll(); } @@ -120,7 +115,7 @@ export class SorobanEventWorker { clearTimeout(this.pollTimer); this.pollTimer = undefined; } - logger.info('[SorobanWorker] Stopped.'); + logger.info("[SorobanWorker] Stopped."); } /** Wait for the currently-running poll batch to finish (no-op if idle). */ @@ -135,7 +130,7 @@ export class SorobanEventWorker { try { await this.fetchAndProcessEvents(); } catch (err) { - logger.error('[SorobanWorker] Manual poll error:', err); + logger.error("[SorobanWorker] Manual poll error:", err); } } @@ -146,8 +141,11 @@ export class SorobanEventWorker { this.pollTimer = setTimeout(() => this.poll(), this.pollIntervalMs); } - private async ensureSystemStream(tx: Prisma.TransactionClient): Promise { - const systemUser = 'GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAWHF'; + private async ensureSystemStream( + tx: Prisma.TransactionClient, + ): Promise { + const systemUser = + "GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAWHF"; await tx.user.upsert({ where: { publicKey: systemUser }, create: { publicKey: systemUser }, @@ -159,10 +157,11 @@ export class SorobanEventWorker { streamId: 0, sender: systemUser, recipient: systemUser, - tokenAddress: 'CDAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAHF', - ratePerSecond: '0', - depositedAmount: '0', - withdrawnAmount: '0', + tokenAddress: + "CDAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAHF", + ratePerSecond: "0", + depositedAmount: "0", + withdrawnAmount: "0", startTime: 0, lastUpdateTime: 0, endTime: 0, @@ -174,7 +173,7 @@ export class SorobanEventWorker { private async poll(): Promise { this.activeBatch = this.fetchAndProcessEvents().catch((err) => { - logger.error('[SorobanWorker] Unhandled error during poll:', err); + logger.error("[SorobanWorker] Unhandled error during poll:", err); }); try { await this.activeBatch; @@ -203,19 +202,21 @@ export class SorobanEventWorker { const baseFilter = { filters: [ { - type: 'contract' as const, + type: "contract" as const, contractIds: [this.contractId], }, ], limit: 100, - } satisfies Omit[0], 'startLedger' | 'cursor'>; + } satisfies Omit< + Parameters[0], + "startLedger" | "cursor" + >; // Prefer cursor-based pagination after the first poll so we never // re-process events. - const params: Parameters[0] = - state.lastCursor - ? { ...baseFilter, cursor: state.lastCursor } - : { ...baseFilter, startLedger: state.lastLedger || this.startLedger }; + const params: Parameters[0] = state.lastCursor + ? { ...baseFilter, cursor: state.lastCursor } + : { ...baseFilter, startLedger: state.lastLedger || this.startLedger }; const response = await this.server.getEvents(params); @@ -228,10 +229,10 @@ export class SorobanEventWorker { // This ensures that subsequent events (like 'fee_collected') that depend on // the stream existing in the DB can find it. const sortedEvents = [...response.events].sort((a, b) => { - const aType = a.topic[0] ? decodeSymbol(a.topic[0]) : ''; - const bType = b.topic[0] ? decodeSymbol(b.topic[0]) : ''; - if (aType === 'stream_created' && bType !== 'stream_created') return -1; - if (bType === 'stream_created' && aType !== 'stream_created') return 1; + const aType = a.topic[0] ? decodeSymbol(a.topic[0]) : ""; + const bType = b.topic[0] ? decodeSymbol(b.topic[0]) : ""; + if (aType === "stream_created" && bType !== "stream_created") return -1; + if (bType === "stream_created" && aType !== "stream_created") return 1; return 0; }); @@ -275,9 +276,7 @@ export class SorobanEventWorker { * Dispatch a single contract event to the appropriate handler based on the * first topic symbol. */ - public async processEvent( - event: rpc.Api.EventResponse, - ): Promise { + public async processEvent(event: rpc.Api.EventResponse): Promise { if (!event.topic || event.topic.length < 1) return; const topic0: xdr.ScVal | undefined = event.topic[0]; @@ -285,8 +284,11 @@ export class SorobanEventWorker { const eventName = decodeSymbol(topic0); - if (eventName === 'fee_config_updated' || eventName === 'admin_transferred') { - if (eventName === 'fee_config_updated') { + if ( + eventName === "fee_config_updated" || + eventName === "admin_transferred" + ) { + if (eventName === "fee_config_updated") { await this.handleFeeConfigUpdated(event); } else { await this.handleAdminTransferred(event); @@ -299,28 +301,28 @@ export class SorobanEventWorker { if (!topic1) return; switch (eventName) { - case 'stream_created': + case "stream_created": await this.handleStreamCreated(event, topic1); break; - case 'stream_topped_up': + case "stream_topped_up": await this.handleStreamToppedUp(event, topic1); break; - case 'tokens_withdrawn': + case "tokens_withdrawn": await this.handleTokensWithdrawn(event, topic1); break; - case 'stream_paused': + case "stream_paused": await this.handleStreamPaused(event, topic1); break; - case 'stream_resumed': + case "stream_resumed": await this.handleStreamResumed(event, topic1); break; - case 'stream_cancelled': + case "stream_cancelled": await this.handleStreamCancelled(event, topic1); break; - case 'stream_completed': + case "stream_completed": await this.handleStreamCompleted(event, topic1); break; - case 'fee_collected': + case "fee_collected": await this.handleFeeCollected(event, topic1); break; default: @@ -335,30 +337,35 @@ export class SorobanEventWorker { const body = decodeMap(event.value); if ( - !body['admin'] || - !body['old_treasury'] || - !body['new_treasury'] || - body['old_fee_rate_bps'] === undefined || - body['new_fee_rate_bps'] === undefined + !body["admin"] || + !body["old_treasury"] || + !body["new_treasury"] || + body["old_fee_rate_bps"] === undefined || + body["new_fee_rate_bps"] === undefined ) { - throw new Error('FeeConfigUpdated: missing body fields'); + throw new Error("FeeConfigUpdated: missing body fields"); } - const admin = decodeAddress(body['admin']); - const oldTreasury = decodeAddress(body['old_treasury']); - const newTreasury = decodeAddress(body['new_treasury']); - const oldFeeRateBps = decodeU32(body['old_fee_rate_bps']); - const newFeeRateBps = decodeU32(body['new_fee_rate_bps']); + const admin = decodeAddress(body["admin"]); + const oldTreasury = decodeAddress(body["old_treasury"]); + const newTreasury = decodeAddress(body["new_treasury"]); + const oldFeeRateBps = decodeU32(body["old_fee_rate_bps"]); + const newFeeRateBps = decodeU32(body["new_fee_rate_bps"]); const timestamp = Math.floor(Date.now() / 1000); await prisma.$transaction(async (tx: Prisma.TransactionClient) => { await this.ensureSystemStream(tx); await tx.streamEvent.upsert({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'FEE_CONFIG_UPDATED' } }, + where: { + transactionHash_eventType: { + transactionHash: event.txHash, + eventType: "FEE_CONFIG_UPDATED", + }, + }, create: { streamId: 0, - eventType: 'FEE_CONFIG_UPDATED', + eventType: "FEE_CONFIG_UPDATED", transactionHash: event.txHash, ledgerSequence: event.ledger, timestamp, @@ -374,7 +381,7 @@ export class SorobanEventWorker { }); }); - sseService.broadcastToAdmin('stream.fee_config_updated', { + sseService.broadcastToAdmin("stream.fee_config_updated", { admin, oldTreasury, newTreasury, @@ -391,22 +398,27 @@ export class SorobanEventWorker { ): Promise { const body = decodeMap(event.value); - if (!body['previous_admin'] || !body['new_admin']) { - throw new Error('AdminTransferred: missing body fields'); + if (!body["previous_admin"] || !body["new_admin"]) { + throw new Error("AdminTransferred: missing body fields"); } - const previousAdmin = decodeAddress(body['previous_admin']); - const newAdmin = decodeAddress(body['new_admin']); + const previousAdmin = decodeAddress(body["previous_admin"]); + const newAdmin = decodeAddress(body["new_admin"]); const timestamp = Math.floor(Date.now() / 1000); await prisma.$transaction(async (tx: Prisma.TransactionClient) => { await this.ensureSystemStream(tx); await tx.streamEvent.upsert({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'ADMIN_TRANSFERRED' } }, + where: { + transactionHash_eventType: { + transactionHash: event.txHash, + eventType: "ADMIN_TRANSFERRED", + }, + }, create: { streamId: 0, - eventType: 'ADMIN_TRANSFERRED', + eventType: "ADMIN_TRANSFERRED", transactionHash: event.txHash, ledgerSequence: event.ledger, timestamp, @@ -420,7 +432,7 @@ export class SorobanEventWorker { }); }); - sseService.broadcastToAdmin('stream.admin_transferred', { + sseService.broadcastToAdmin("stream.admin_transferred", { previousAdmin, newAdmin, transactionHash: event.txHash, @@ -439,22 +451,22 @@ export class SorobanEventWorker { const body = decodeMap(event.value); if ( - !body['sender'] || - !body['recipient'] || - !body['token_address'] || - !body['rate_per_second'] || - !body['deposited_amount'] || - !body['start_time'] + !body["sender"] || + !body["recipient"] || + !body["token_address"] || + !body["rate_per_second"] || + !body["deposited_amount"] || + !body["start_time"] ) { throw new Error(`StreamCreated #${streamId}: missing body fields`); } - const sender = decodeAddress(body['sender']); - const recipient = decodeAddress(body['recipient']); - const tokenAddress = decodeAddress(body['token_address']); - const ratePerSecond = decodeI128(body['rate_per_second']); - const depositedAmount = decodeI128(body['deposited_amount']); - const startTime = Number(decodeU64(body['start_time'])); + const sender = decodeAddress(body["sender"]); + const recipient = decodeAddress(body["recipient"]); + const tokenAddress = decodeAddress(body["token_address"]); + const ratePerSecond = decodeI128(body["rate_per_second"]); + const depositedAmount = decodeI128(body["deposited_amount"]); + const startTime = Number(decodeU64(body["start_time"])); const ratePerSecondBigInt = BigInt(ratePerSecond); const endTime = @@ -483,7 +495,7 @@ export class SorobanEventWorker { tokenAddress, ratePerSecond, depositedAmount, - withdrawnAmount: '0', + withdrawnAmount: "0", startTime, endTime, lastUpdateTime: startTime, @@ -501,17 +513,29 @@ export class SorobanEventWorker { }); const existingEvent = await tx.streamEvent.findUnique({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'CREATED' } }, + where: { + transactionHash_eventType: { + transactionHash: event.txHash, + eventType: "CREATED", + }, + }, select: { id: true }, }); if (existingEvent) { - logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=CREATED`); + logger.warn( + `[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=CREATED`, + ); } else { await tx.streamEvent.upsert({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'CREATED' } }, + where: { + transactionHash_eventType: { + transactionHash: event.txHash, + eventType: "CREATED", + }, + }, create: { streamId, - eventType: 'CREATED', + eventType: "CREATED", amount: depositedAmount, transactionHash: event.txHash, ledgerSequence: event.ledger, @@ -523,7 +547,7 @@ export class SorobanEventWorker { } }); - sseService.broadcastToStream(String(streamId), 'stream.created', { + sseService.broadcastToStream(String(streamId), "stream.created", { streamId, sender, recipient, @@ -543,18 +567,22 @@ export class SorobanEventWorker { const streamId = Number(decodeU64(streamIdTopic)); const body = decodeMap(event.value); - if (!body['amount'] || !body['new_deposited_amount']) { + if (!body["amount"] || !body["new_deposited_amount"]) { throw new Error(`StreamToppedUp #${streamId}: missing body fields`); } - const amount = decodeI128(body['amount']); - const newDepositedAmount = decodeI128(body['new_deposited_amount']); + const amount = decodeI128(body["amount"]); + const newDepositedAmount = decodeI128(body["new_deposited_amount"]); const timestamp = Math.floor(Date.now() / 1000); await prisma.$transaction(async (tx: Prisma.TransactionClient) => { const stream = await tx.stream.findUniqueOrThrow({ where: { streamId }, - select: { ratePerSecond: true, startTime: true, totalPausedDuration: true } + select: { + ratePerSecond: true, + startTime: true, + totalPausedDuration: true, + }, }); const ratePerSecondBigInt = BigInt(stream.ratePerSecond); @@ -562,8 +590,8 @@ export class SorobanEventWorker { ratePerSecondBigInt === 0n ? null : stream.startTime + - Number(BigInt(newDepositedAmount) / ratePerSecondBigInt) + - stream.totalPausedDuration; + Number(BigInt(newDepositedAmount) / ratePerSecondBigInt) + + stream.totalPausedDuration; await tx.stream.update({ where: { streamId }, @@ -575,17 +603,29 @@ export class SorobanEventWorker { }); const existingEvent = await tx.streamEvent.findUnique({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'TOPPED_UP' } }, + where: { + transactionHash_eventType: { + transactionHash: event.txHash, + eventType: "TOPPED_UP", + }, + }, select: { id: true }, }); if (existingEvent) { - logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=TOPPED_UP`); + logger.warn( + `[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=TOPPED_UP`, + ); } else { await tx.streamEvent.upsert({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'TOPPED_UP' } }, + where: { + transactionHash_eventType: { + transactionHash: event.txHash, + eventType: "TOPPED_UP", + }, + }, create: { streamId, - eventType: 'TOPPED_UP', + eventType: "TOPPED_UP", amount, transactionHash: event.txHash, ledgerSequence: event.ledger, @@ -597,7 +637,7 @@ export class SorobanEventWorker { } }); - sseService.broadcastToStream(String(streamId), 'stream.topped_up', { + sseService.broadcastToStream(String(streamId), "stream.topped_up", { streamId, amount, newDepositedAmount, @@ -614,13 +654,13 @@ export class SorobanEventWorker { const streamId = Number(decodeU64(streamIdTopic)); const body = decodeMap(event.value); - if (!body['recipient'] || !body['amount'] || !body['timestamp']) { + if (!body["recipient"] || !body["amount"] || !body["timestamp"]) { throw new Error(`TokensWithdrawn #${streamId}: missing body fields`); } - const recipient = decodeAddress(body['recipient']); - const amount = decodeI128(body['amount']); - const timestamp = Number(decodeU64(body['timestamp'])); + const recipient = decodeAddress(body["recipient"]); + const amount = decodeI128(body["amount"]); + const timestamp = Number(decodeU64(body["timestamp"])); await prisma.$transaction(async (tx: Prisma.TransactionClient) => { const stream = await tx.stream.findUniqueOrThrow({ @@ -641,17 +681,29 @@ export class SorobanEventWorker { }); const existingEvent = await tx.streamEvent.findUnique({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } }, + where: { + transactionHash_eventType: { + transactionHash: event.txHash, + eventType: "WITHDRAWN", + }, + }, select: { id: true }, }); if (existingEvent) { - logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=WITHDRAWN`); + logger.warn( + `[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=WITHDRAWN`, + ); } else { await tx.streamEvent.upsert({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } }, + where: { + transactionHash_eventType: { + transactionHash: event.txHash, + eventType: "WITHDRAWN", + }, + }, create: { streamId, - eventType: 'WITHDRAWN', + eventType: "WITHDRAWN", amount, transactionHash: event.txHash, ledgerSequence: event.ledger, @@ -663,7 +715,7 @@ export class SorobanEventWorker { } }); - sseService.broadcastToStream(String(streamId), 'stream.withdrawn', { + sseService.broadcastToStream(String(streamId), "stream.withdrawn", { streamId, recipient, amount, @@ -680,12 +732,12 @@ export class SorobanEventWorker { const streamId = Number(decodeU64(streamIdTopic)); const body = decodeMap(event.value); - if (!body['amount_withdrawn'] || !body['refunded_amount']) { + if (!body["amount_withdrawn"] || !body["refunded_amount"]) { throw new Error(`StreamCancelled #${streamId}: missing body fields`); } - const amountWithdrawn = decodeI128(body['amount_withdrawn']); - const refundedAmount = decodeI128(body['refunded_amount']); + const amountWithdrawn = decodeI128(body["amount_withdrawn"]); + const refundedAmount = decodeI128(body["refunded_amount"]); const timestamp = Math.floor(Date.now() / 1000); await prisma.$transaction(async (tx: Prisma.TransactionClient) => { @@ -699,17 +751,29 @@ export class SorobanEventWorker { }); const existingEvent = await tx.streamEvent.findUnique({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'CANCELLED' } }, + where: { + transactionHash_eventType: { + transactionHash: event.txHash, + eventType: "CANCELLED", + }, + }, select: { id: true }, }); if (existingEvent) { - logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=CANCELLED`); + logger.warn( + `[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=CANCELLED`, + ); } else { await tx.streamEvent.upsert({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'CANCELLED' } }, + where: { + transactionHash_eventType: { + transactionHash: event.txHash, + eventType: "CANCELLED", + }, + }, create: { streamId, - eventType: 'CANCELLED', + eventType: "CANCELLED", amount: refundedAmount, transactionHash: event.txHash, ledgerSequence: event.ledger, @@ -721,7 +785,7 @@ export class SorobanEventWorker { } }); - sseService.broadcastToStream(String(streamId), 'stream.cancelled', { + sseService.broadcastToStream(String(streamId), "stream.cancelled", { streamId, refundedAmount, amountWithdrawn, @@ -738,12 +802,12 @@ export class SorobanEventWorker { const streamId = Number(decodeU64(streamIdTopic)); const body = decodeMap(event.value); - if (!body['recipient'] || !body['total_withdrawn']) { + if (!body["recipient"] || !body["total_withdrawn"]) { throw new Error(`StreamCompleted #${streamId}: missing body fields`); } - const recipient = decodeAddress(body['recipient']); - const totalWithdrawn = decodeI128(body['total_withdrawn']); + const recipient = decodeAddress(body["recipient"]); + const totalWithdrawn = decodeI128(body["total_withdrawn"]); const timestamp = Math.floor(Date.now() / 1000); await prisma.$transaction(async (tx: Prisma.TransactionClient) => { @@ -757,17 +821,29 @@ export class SorobanEventWorker { }); const existingEvent = await tx.streamEvent.findUnique({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'COMPLETED' } }, + where: { + transactionHash_eventType: { + transactionHash: event.txHash, + eventType: "COMPLETED", + }, + }, select: { id: true }, }); if (existingEvent) { - logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=COMPLETED`); + logger.warn( + `[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=COMPLETED`, + ); } else { await tx.streamEvent.upsert({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'COMPLETED' } }, + where: { + transactionHash_eventType: { + transactionHash: event.txHash, + eventType: "COMPLETED", + }, + }, create: { streamId, - eventType: 'COMPLETED', + eventType: "COMPLETED", amount: totalWithdrawn, transactionHash: event.txHash, ledgerSequence: event.ledger, @@ -779,7 +855,7 @@ export class SorobanEventWorker { } }); - sseService.broadcastToStream(String(streamId), 'stream.completed', { + sseService.broadcastToStream(String(streamId), "stream.completed", { streamId, recipient, totalWithdrawn, @@ -796,27 +872,39 @@ export class SorobanEventWorker { const streamId = Number(decodeU64(streamIdTopic)); const body = decodeMap(event.value); - if (!body['treasury'] || !body['fee_amount'] || !body['token']) { + if (!body["treasury"] || !body["fee_amount"] || !body["token"]) { throw new Error(`FeeCollected #${streamId}: missing body fields`); } - const treasury = decodeAddress(body['treasury']); - const feeAmount = decodeI128(body['fee_amount']); - const token = decodeAddress(body['token']); + const treasury = decodeAddress(body["treasury"]); + const feeAmount = decodeI128(body["fee_amount"]); + const token = decodeAddress(body["token"]); const timestamp = Math.floor(Date.now() / 1000); const existingEvent = await prisma.streamEvent.findUnique({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'FEE_COLLECTED' } }, + where: { + transactionHash_eventType: { + transactionHash: event.txHash, + eventType: "FEE_COLLECTED", + }, + }, select: { id: true }, }); if (existingEvent) { - logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=FEE_COLLECTED`); + logger.warn( + `[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=FEE_COLLECTED`, + ); } else { await prisma.streamEvent.upsert({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'FEE_COLLECTED' } }, + where: { + transactionHash_eventType: { + transactionHash: event.txHash, + eventType: "FEE_COLLECTED", + }, + }, create: { streamId, - eventType: 'FEE_COLLECTED', + eventType: "FEE_COLLECTED", amount: feeAmount, transactionHash: event.txHash, ledgerSequence: event.ledger, @@ -828,7 +916,7 @@ export class SorobanEventWorker { } // Broadcast to admin channel for treasury reporting - sseService.broadcastToAdmin('stream.fee_collected', { + sseService.broadcastToAdmin("stream.fee_collected", { streamId, treasury, feeAmount, @@ -845,12 +933,12 @@ export class SorobanEventWorker { const streamId = Number(decodeU64(streamIdTopic)); const body = decodeMap(event.value); - if (!body['sender'] || !body['paused_at']) { + if (!body["sender"] || !body["paused_at"]) { throw new Error(`StreamPaused #${streamId}: missing body fields`); } - const sender = decodeAddress(body['sender']); - const pausedAt = Number(decodeU64(body['paused_at'])); + const sender = decodeAddress(body["sender"]); + const pausedAt = Number(decodeU64(body["paused_at"])); const timestamp = Math.floor(Date.now() / 1000); await prisma.$transaction(async (tx: Prisma.TransactionClient) => { @@ -864,17 +952,29 @@ export class SorobanEventWorker { }); const existingEvent = await tx.streamEvent.findUnique({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'PAUSED' } }, + where: { + transactionHash_eventType: { + transactionHash: event.txHash, + eventType: "PAUSED", + }, + }, select: { id: true }, }); if (existingEvent) { - logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=PAUSED`); + logger.warn( + `[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=PAUSED`, + ); } else { await tx.streamEvent.upsert({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'PAUSED' } }, + where: { + transactionHash_eventType: { + transactionHash: event.txHash, + eventType: "PAUSED", + }, + }, create: { streamId, - eventType: 'PAUSED', + eventType: "PAUSED", transactionHash: event.txHash, ledgerSequence: event.ledger, timestamp, @@ -885,7 +985,7 @@ export class SorobanEventWorker { } }); - sseService.broadcastToStream(String(streamId), 'stream.paused', { + sseService.broadcastToStream(String(streamId), "stream.paused", { streamId, sender, pausedAt, @@ -902,12 +1002,12 @@ export class SorobanEventWorker { const streamId = Number(decodeU64(streamIdTopic)); const body = decodeMap(event.value); - if (!body['sender'] || !body['new_end_time']) { + if (!body["sender"] || !body["new_end_time"]) { throw new Error(`StreamResumed #${streamId}: missing body fields`); } - const sender = decodeAddress(body['sender']); - const newEndTime = Number(decodeU64(body['new_end_time'])); + const sender = decodeAddress(body["sender"]); + const newEndTime = Number(decodeU64(body["new_end_time"])); const timestamp = Math.floor(Date.now() / 1000); await prisma.$transaction(async (tx: Prisma.TransactionClient) => { @@ -923,7 +1023,8 @@ export class SorobanEventWorker { additionalPausedDuration = timestamp - currentStream.pausedAt; } - const newTotalPausedDuration = currentStream.totalPausedDuration + additionalPausedDuration; + const newTotalPausedDuration = + currentStream.totalPausedDuration + additionalPausedDuration; await tx.stream.update({ where: { streamId }, @@ -937,17 +1038,29 @@ export class SorobanEventWorker { }); const existingEvent = await tx.streamEvent.findUnique({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'RESUMED' } }, + where: { + transactionHash_eventType: { + transactionHash: event.txHash, + eventType: "RESUMED", + }, + }, select: { id: true }, }); if (existingEvent) { - logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=RESUMED`); + logger.warn( + `[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=RESUMED`, + ); } else { await tx.streamEvent.upsert({ - where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'RESUMED' } }, + where: { + transactionHash_eventType: { + transactionHash: event.txHash, + eventType: "RESUMED", + }, + }, create: { streamId, - eventType: 'RESUMED', + eventType: "RESUMED", transactionHash: event.txHash, ledgerSequence: event.ledger, timestamp, @@ -963,7 +1076,7 @@ export class SorobanEventWorker { } }); - sseService.broadcastToStream(String(streamId), 'stream.resumed', { + sseService.broadcastToStream(String(streamId), "stream.resumed", { streamId, sender, newEndTime, diff --git a/backend/tests/auth.test.ts b/backend/tests/auth.test.ts index 9687d38b..03c29ac7 100644 --- a/backend/tests/auth.test.ts +++ b/backend/tests/auth.test.ts @@ -3,6 +3,7 @@ import request from 'supertest'; import * as crypto from 'crypto'; import * as StellarSdk from '@stellar/stellar-sdk'; import express from 'express'; +import { rateLimit } from 'express-rate-limit'; import app from '../src/app.js'; import { __authChallengeTestUtils, requireAdmin, signJwt } from '../src/middleware/auth.js'; @@ -287,6 +288,14 @@ describe('Authentication & Middleware Tests', () => { beforeEach(() => { adminApp = express(); adminApp.use(express.json()); + const adminLimiter = rateLimit({ + windowMs: 60_000, + max: 100, + standardHeaders: true, + legacyHeaders: false, + message: { error: 'Too many requests' }, + }); + adminApp.use('/test-admin', adminLimiter); adminApp.get('/test-admin', requireAdmin, (_req: any, res: any) => { res.status(200).json({ success: true }); }); diff --git a/backend/tests/deprecated.test.ts b/backend/tests/deprecated.test.ts deleted file mode 100644 index 99d11648..00000000 --- a/backend/tests/deprecated.test.ts +++ /dev/null @@ -1,30 +0,0 @@ -import { describe, it, expect } from 'vitest'; -import request from 'supertest'; -import app from '../src/app.js'; - -// This file tests deprecated endpoints WITHOUT any Prisma mocking -// so the real route handlers respond directly without interference. - -describe('Deprecated route responses', () => { - it('POST /streams returns 410 Gone', async () => { - const response = await request(app) - .post('/streams') - .send({}) - .set('Accept', 'application/json'); - - expect(response.status).toBe(410); - expect(response.body.deprecated).toBe(true); - expect(response.body.migration).toMatchObject({ old: '/streams', new: '/v1/streams' }); - }); - - it('POST /events returns 410 Gone', async () => { - const response = await request(app) - .post('/events') - .send({}) - .set('Accept', 'application/json'); - - expect(response.status).toBe(410); - expect(response.body.deprecated).toBe(true); - expect(response.body.migration).toMatchObject({ old: '/events', new: '/v1/events' }); - }); -}); diff --git a/backend/tests/stream.controller.test.ts b/backend/tests/stream.controller.test.ts index ef37fe1b..0bf669f6 100644 --- a/backend/tests/stream.controller.test.ts +++ b/backend/tests/stream.controller.test.ts @@ -1,11 +1,24 @@ -import { describe, it, expect, vi, beforeEach } from 'vitest'; -import { createStream, listStreams, getStream, getStreamClaimableAmount, pauseStream, resumeStream } from '../src/controllers/stream.controller.js'; -import { prisma } from '../src/lib/prisma.js'; -import { claimableAmountService } from '../src/services/claimable.service.js'; -import * as sorobanService from '../src/services/sorobanService.js'; -import type { Request, Response } from 'express'; - -vi.mock('../src/lib/prisma.js', () => ({ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { + createStream, + listStreams, + getStream, + getStreamClaimableAmount, + pauseStream, + resumeStream, +} from "../src/controllers/stream.controller.js"; +import { prisma } from "../src/lib/prisma.js"; +import { claimableAmountService } from "../src/services/claimable.service.js"; +import * as sorobanService from "../src/services/sorobanService.js"; +import type { Request, Response } from "express"; + +type TestRequest = Partial & { + user?: { + publicKey: string; + }; +}; + +vi.mock("../src/lib/prisma.js", () => ({ prisma: { stream: { upsert: vi.fn(), @@ -20,20 +33,20 @@ vi.mock('../src/lib/prisma.js', () => ({ }, })); -vi.mock('../src/services/claimable.service.js', () => ({ +vi.mock("../src/services/claimable.service.js", () => ({ claimableAmountService: { getClaimableAmount: vi.fn(), }, })); -vi.mock('../src/services/sorobanService.js', () => ({ +vi.mock("../src/services/sorobanService.js", () => ({ isStale: vi.fn(), getStreamFromChain: vi.fn(), pauseStream: vi.fn(), resumeStream: vi.fn(), })); -vi.mock('../src/logger.js', () => ({ +vi.mock("../src/logger.js", () => ({ default: { info: vi.fn(), error: vi.fn(), @@ -41,8 +54,8 @@ vi.mock('../src/logger.js', () => ({ }, })); -describe('Stream Controller', () => { - let req: Partial; +describe("Stream Controller", () => { + let req: TestRequest; let res: Partial; beforeEach(() => { @@ -51,16 +64,19 @@ describe('Stream Controller', () => { (sorobanService.getStreamFromChain as any).mockResolvedValue(null); req = { body: { - streamId: '123', - sender: 'GSENDER', - recipient: 'GRECIPIENT', - tokenAddress: 'T1', - ratePerSecond: '10', - depositedAmount: '1000', - startTime: '1622505600', + streamId: "123", + sender: "GSENDER", + recipient: "GRECIPIENT", + tokenAddress: "T1", + ratePerSecond: "10", + depositedAmount: "1000", + startTime: "1622505600", }, query: {}, params: {}, + user: { + publicKey: "GSENDER", + }, }; res = { status: vi.fn().mockReturnThis(), @@ -68,8 +84,8 @@ describe('Stream Controller', () => { }; }); - describe('createStream', () => { - it('should create a stream successfully', async () => { + describe("createStream", () => { + it("should create a stream successfully", async () => { (prisma.stream.upsert as any).mockResolvedValue({ streamId: 123 }); await createStream(req as Request, res as Response); @@ -78,35 +94,60 @@ describe('Stream Controller', () => { expect(prisma.stream.upsert).toHaveBeenCalled(); }); - it('should return 400 for invalid streamId', async () => { - req.body.streamId = 'abc'; + it("should return 400 for invalid streamId", async () => { + req.body.streamId = "abc"; + await createStream(req as Request, res as Response); + expect(res.status).toHaveBeenCalledWith(400); + }); + + it("should return 400 for non-positive ratePerSecond", async () => { + req.body.ratePerSecond = "0"; + await createStream(req as Request, res as Response); + expect(res.status).toHaveBeenCalledWith(400); + }); + + it("should return 400 for malformed numeric fields", async () => { + req.body.ratePerSecond = "abc"; await createStream(req as Request, res as Response); expect(res.status).toHaveBeenCalledWith(400); }); - it('should return 400 for non-positive ratePerSecond', async () => { - req.body.ratePerSecond = '0'; + it("should return 400 when a required numeric field is missing", async () => { + delete req.body.depositedAmount; await createStream(req as Request, res as Response); expect(res.status).toHaveBeenCalledWith(400); }); + + it("should return 403 when caller does not match sender on upsert", async () => { + (req as any).user = { publicKey: "GNOTSENDER" }; + await createStream(req as Request, res as Response); + expect(res.status).toHaveBeenCalledWith(403); + expect(prisma.stream.upsert).not.toHaveBeenCalled(); + }); }); - describe('listStreams', () => { - it('should list streams with pagination', async () => { - req.query = { address: 'GD2XP6FNWL6IWULVMPNA2RV2T7GLCJHK3RH75GBCY7TSVIWDITJN4FXJ', limit: '10', offset: '0' }; + describe("listStreams", () => { + it("should list streams with pagination", async () => { + req.query = { + address: "GD2XP6FNWL6IWULVMPNA2RV2T7GLCJHK3RH75GBCY7TSVIWDITJN4FXJ", + limit: "10", + offset: "0", + }; (prisma.stream.findMany as any).mockResolvedValue([]); (prisma.stream.count as any).mockResolvedValue(0); await listStreams(req as Request, res as Response); expect(res.status).toHaveBeenCalledWith(200); - expect(res.json).toHaveBeenCalledWith(expect.objectContaining({ total: 0 })); + expect(res.json).toHaveBeenCalledWith( + expect.objectContaining({ total: 0 }), + ); }); }); - describe('getStream', () => { - it('should return 404 if stream not found', async () => { - req.params = { streamId: '999' }; + describe("getStream", () => { + it("should return 404 if stream not found", async () => { + req.params = { streamId: "999" }; (prisma.stream.findUnique as any).mockResolvedValue(null); await getStream(req as Request, res as Response); @@ -114,38 +155,60 @@ describe('Stream Controller', () => { expect(res.status).toHaveBeenCalledWith(404); }); - it('should return stream if found', async () => { - req.params = { streamId: '123' }; - (prisma.stream.findUnique as any).mockResolvedValue({ streamId: 123, updatedAt: new Date() }); + it("should return stream if found", async () => { + req.params = { streamId: "123" }; + (prisma.stream.findUnique as any).mockResolvedValue({ + streamId: 123, + updatedAt: new Date(), + }); await getStream(req as Request, res as Response); expect(res.status).toHaveBeenCalledWith(200); - expect(res.json).toHaveBeenCalledWith(expect.objectContaining({ streamId: 123 })); + expect(res.json).toHaveBeenCalledWith( + expect.objectContaining({ streamId: 123 }), + ); }); }); - describe('getStreamClaimableAmount', () => { - it('should return claimable amount', async () => { - req.params = { streamId: '123' }; - (prisma.stream.findUnique as any).mockResolvedValue({ streamId: 123, updatedAt: new Date() }); - (claimableAmountService.getClaimableAmount as any).mockReturnValue({ claimableAmount: '100' }); + describe("getStreamClaimableAmount", () => { + it("should return claimable amount", async () => { + req.params = { streamId: "123" }; + (prisma.stream.findUnique as any).mockResolvedValue({ + streamId: 123, + updatedAt: new Date(), + }); + (claimableAmountService.getClaimableAmount as any).mockReturnValue({ + claimableAmount: "100", + }); await getStreamClaimableAmount(req as Request, res as Response); expect(res.status).toHaveBeenCalledWith(200); - expect(res.json).toHaveBeenCalledWith(expect.objectContaining({ claimableAmount: '100' })); + expect(res.json).toHaveBeenCalledWith( + expect.objectContaining({ claimableAmount: "100" }), + ); }); }); - describe('pauseStream', () => { - it('should pause stream', async () => { - req.params = { streamId: '123' }; - req.body = { secret: 'S123' }; - (req as any).user = { publicKey: 'GUSER1' }; - (prisma.stream.findUnique as any).mockResolvedValue({ streamId: 123, sender: 'GUSER1', isPaused: false, isActive: true }); - (sorobanService.pauseStream as any).mockResolvedValue({ txHash: 'tx123' }); - (prisma.stream.update as any).mockResolvedValue({ streamId: 123, isPaused: true }); + describe("pauseStream", () => { + it("should pause stream", async () => { + req.params = { streamId: "123" }; + req.body = { secret: "S123" }; + (req as any).user = { publicKey: "GUSER1" }; + (prisma.stream.findUnique as any).mockResolvedValue({ + streamId: 123, + sender: "GUSER1", + isPaused: false, + isActive: true, + }); + (sorobanService.pauseStream as any).mockResolvedValue({ + txHash: "tx123", + }); + (prisma.stream.update as any).mockResolvedValue({ + streamId: 123, + isPaused: true, + }); await pauseStream(req as Request, res as Response); @@ -153,14 +216,25 @@ describe('Stream Controller', () => { }); }); - describe('resumeStream', () => { - it('should resume stream', async () => { - req.params = { streamId: '123' }; - req.body = { secret: 'S123' }; - (req as any).user = { publicKey: 'GUSER1' }; - (prisma.stream.findUnique as any).mockResolvedValue({ streamId: 123, sender: 'GUSER1', isPaused: true, isActive: true, pausedAt: Math.floor(Date.now() / 1000) }); - (sorobanService.resumeStream as any).mockResolvedValue({ txHash: 'tx123' }); - (prisma.stream.update as any).mockResolvedValue({ streamId: 123, isPaused: false }); + describe("resumeStream", () => { + it("should resume stream", async () => { + req.params = { streamId: "123" }; + req.body = { secret: "S123" }; + (req as any).user = { publicKey: "GUSER1" }; + (prisma.stream.findUnique as any).mockResolvedValue({ + streamId: 123, + sender: "GUSER1", + isPaused: true, + isActive: true, + pausedAt: Math.floor(Date.now() / 1000), + }); + (sorobanService.resumeStream as any).mockResolvedValue({ + txHash: "tx123", + }); + (prisma.stream.update as any).mockResolvedValue({ + streamId: 123, + isPaused: false, + }); await resumeStream(req as Request, res as Response); diff --git a/backend/tests/stream.test.ts b/backend/tests/stream.test.ts index f5d9903a..cad05ac1 100644 --- a/backend/tests/stream.test.ts +++ b/backend/tests/stream.test.ts @@ -59,7 +59,7 @@ describe('POST /v1/streams', () => { const mockStream = { id: 'uuid-123', streamId: 1, - sender: 'GABC123XYZ456DEF789GHI012JKL345MNO678PQR901STU234VWX567YZA', + sender: 'GTEST_USER_PUBLIC_KEY', recipient: 'GDEF456ABC789GHI012JKL345MNO678PQR901STU234VWX567YZA123BCD', tokenAddress: 'CBCD789EFG012HIJ345KLM678NOP901QRS234TUV567WXY890ZAB123CDE', ratePerSecond: '100', @@ -80,7 +80,7 @@ describe('POST /v1/streams', () => { const validData = { streamId: '1', - sender: 'GABC123XYZ456DEF789GHI012JKL345MNO678PQR901STU234VWX567YZA', + sender: 'GTEST_USER_PUBLIC_KEY', recipient: 'GDEF456ABC789GHI012JKL345MNO678PQR901STU234VWX567YZA123BCD', tokenAddress: 'CBCD789EFG012HIJ345KLM678NOP901QRS234TUV567WXY890ZAB123CDE', ratePerSecond: '100', @@ -101,6 +101,68 @@ describe('POST /v1/streams', () => { }); }); + it('should return 400 for malformed numeric fields', async () => { + const invalidData = { + streamId: '2', + sender: 'GTEST_USER_PUBLIC_KEY', + recipient: 'GDEF456ABC789GHI012JKL345MNO678PQR901STU234VWX567YZA123BCD', + tokenAddress: 'CBCD789EFG012HIJ345KLM678NOP901QRS234TUV567WXY890ZAB123CDE', + ratePerSecond: 'not-a-number', + depositedAmount: 'also-not-a-number', + startTime: '1700000000', + }; + + const response = await request(app) + .post('/v1/streams') + .send(invalidData) + .set('Accept', 'application/json'); + + expect(response.status).toBe(400); + expect(response.body).toHaveProperty('error'); + expect(prisma.stream.upsert).not.toHaveBeenCalled(); + }); + + it('should return 400 when a required numeric field is missing', async () => { + const invalidData = { + streamId: '2', + sender: 'GTEST_USER_PUBLIC_KEY', + recipient: 'GDEF456ABC789GHI012JKL345MNO678PQR901STU234VWX567YZA123BCD', + tokenAddress: 'CBCD789EFG012HIJ345KLM678NOP901QRS234TUV567WXY890ZAB123CDE', + ratePerSecond: '100', + startTime: '1700000000', + }; + + const response = await request(app) + .post('/v1/streams') + .send(invalidData) + .set('Accept', 'application/json'); + + expect(response.status).toBe(400); + expect(response.body).toHaveProperty('error'); + expect(prisma.stream.upsert).not.toHaveBeenCalled(); + }); + + it('should return 403 when an authenticated non-owner attempts to reactivate a stream via upsert', async () => { + const validData = { + streamId: '2', + sender: 'GDIFFERENT_SENDER_PUBLIC_KEY', + recipient: 'GDEF456ABC789GHI012JKL345MNO678PQR901STU234VWX567YZA123BCD', + tokenAddress: 'CBCD789EFG012HIJ345KLM678NOP901QRS234TUV567WXY890ZAB123CDE', + ratePerSecond: '100', + depositedAmount: '86400', + startTime: '1700000000', + }; + + const response = await request(app) + .post('/v1/streams') + .send(validData) + .set('Accept', 'application/json'); + + expect(response.status).toBe(403); + expect(response.body).toHaveProperty('error', 'Forbidden'); + expect(prisma.stream.upsert).not.toHaveBeenCalled(); + }); + it('should return 500 when stream creation fails (DB error)', async () => { (prisma.stream.upsert as ReturnType).mockRejectedValue( new Error('DB connection failed') @@ -108,7 +170,7 @@ describe('POST /v1/streams', () => { const validData = { streamId: '2', - sender: 'GABC123XYZ456DEF789GHI012JKL345MNO678PQR901STU234VWX567YZA', + sender: 'GTEST_USER_PUBLIC_KEY', recipient: 'GDEF456ABC789GHI012JKL345MNO678PQR901STU234VWX567YZA123BCD', tokenAddress: 'CBCD789EFG012HIJ345KLM678NOP901QRS234TUV567WXY890ZAB123CDE', ratePerSecond: '100', @@ -174,9 +236,9 @@ describe('GET /v1/users/:address/summary', () => { it('returns accurate outgoing/incoming aggregates and claimable sum', async () => { vi.mocked(prisma.stream.findMany) .mockResolvedValueOnce([ - { - id: '1', - createdAt: new Date(), + { + id: '1', + createdAt: new Date(), updatedAt: new Date(), streamId: 1, sender: 'GSENDER', @@ -184,18 +246,18 @@ describe('GET /v1/users/:address/summary', () => { tokenAddress: 'TOKEN', ratePerSecond: '10', depositedAmount: '100', - withdrawnAmount: '30', + withdrawnAmount: '30', startTime: 1000, lastUpdateTime: 2000, isPaused: false, endTime: null, pausedAt: null, totalPausedDuration: 0, - isActive: true + isActive: true }, - { - id: '2', - createdAt: new Date(), + { + id: '2', + createdAt: new Date(), updatedAt: new Date(), streamId: 2, sender: 'GSENDER2', @@ -203,14 +265,14 @@ describe('GET /v1/users/:address/summary', () => { tokenAddress: 'TOKEN2', ratePerSecond: '20', depositedAmount: '200', - withdrawnAmount: '20', + withdrawnAmount: '20', startTime: 1000, lastUpdateTime: 2000, isPaused: false, endTime: null, pausedAt: null, totalPausedDuration: 0, - isActive: false + isActive: false }, ]) .mockResolvedValueOnce([ @@ -271,7 +333,7 @@ describe('GET /v1/users/:address/summary', () => { it('caches summary results for repeated requests within TTL', async () => { vi.mocked(prisma.stream.findMany) - .mockResolvedValueOnce([{ + .mockResolvedValueOnce([{ id: '5', createdAt: new Date(), updatedAt: new Date(), @@ -288,7 +350,7 @@ describe('GET /v1/users/:address/summary', () => { endTime: null, pausedAt: null, totalPausedDuration: 0, - isActive: true + isActive: true }]) .mockResolvedValueOnce([]); diff --git a/contracts/stream_contract/src/errors.rs b/contracts/stream_contract/src/errors.rs index 65567860..7d471c2e 100644 --- a/contracts/stream_contract/src/errors.rs +++ b/contracts/stream_contract/src/errors.rs @@ -29,4 +29,6 @@ pub enum StreamError { InvalidTokenAddress = 10, /// `amount / duration` rounds to zero — the stream would lock tokens but never accrue. InvalidRate = 11, + /// Operation requires an active stream, but the stream is currently paused. + StreamPaused = 12, } diff --git a/contracts/stream_contract/src/lib.rs b/contracts/stream_contract/src/lib.rs index d0d2dd5d..960feffe 100644 --- a/contracts/stream_contract/src/lib.rs +++ b/contracts/stream_contract/src/lib.rs @@ -431,7 +431,7 @@ impl StreamContract { // Validate stream is active and not paused Self::validate_stream_active(&stream)?; if stream.paused { - return Err(StreamError::StreamInactive); + return Err(StreamError::StreamPaused); } let now = env.ledger().timestamp(); diff --git a/contracts/stream_contract/src/test.rs b/contracts/stream_contract/src/test.rs index 9c20d170..afb15ca1 100644 --- a/contracts/stream_contract/src/test.rs +++ b/contracts/stream_contract/src/test.rs @@ -1568,7 +1568,7 @@ fn test_withdraw_on_paused_stream_fails() { assert_eq!( client.try_withdraw(&recipient, &id), - Err(Ok(StreamError::StreamInactive)) + Err(Ok(StreamError::StreamPaused)) ); } @@ -2206,7 +2206,7 @@ fn test_withdraw_on_paused_stream_returns_stream_inactive() { // Withdraw must be rejected while paused. let result = client.try_withdraw(&recipient, &id); - assert_eq!(result, Err(Ok(StreamError::StreamInactive))); + assert_eq!(result, Err(Ok(StreamError::StreamPaused))); } #[test] diff --git a/frontend/src/components/dashboard/dashboard-view.tsx b/frontend/src/components/dashboard/dashboard-view.tsx index 3bbc810c..e0a921e3 100644 --- a/frontend/src/components/dashboard/dashboard-view.tsx +++ b/frontend/src/components/dashboard/dashboard-view.tsx @@ -15,7 +15,6 @@ import toast from "react-hot-toast"; * - Error state: "Failed to load streams" with a retry button */ -import { Skeleton } from "@/components/ui/Skeleton"; import { getDashboardAnalytics, fetchDashboardData, @@ -108,8 +107,11 @@ const SIDEBAR_ITEMS: SidebarItem[] = [ /** Shimmer card used as a placeholder while data loads */ function SkeletonCard({ className = "" }: { className?: string }) { return ( -