Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions backend/prisma/migrations/20260221132622_init/migration.sql
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
-- CreateTable
CREATE TABLE "User" (
"id" TEXT NOT NULL PRIMARY KEY,
"id" TEXT NOT NULL,
"publicKey" TEXT NOT NULL,
"createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" DATETIME NOT NULL
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,

CONSTRAINT "User_pkey" PRIMARY KEY ("id"),
CONSTRAINT "User_publicKey_key" UNIQUE ("publicKey")
);

-- CreateTable
CREATE TABLE "Stream" (
"id" TEXT NOT NULL PRIMARY KEY,
"id" TEXT NOT NULL,
"streamId" INTEGER NOT NULL,
"sender" TEXT NOT NULL,
"recipient" TEXT NOT NULL,
Expand All @@ -19,35 +22,34 @@ CREATE TABLE "Stream" (
"startTime" INTEGER NOT NULL,
"lastUpdateTime" INTEGER NOT NULL,
"isActive" BOOLEAN NOT NULL DEFAULT true,
"createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" DATETIME NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,

CONSTRAINT "Stream_pkey" PRIMARY KEY ("id"),
CONSTRAINT "Stream_streamId_key" UNIQUE ("streamId"),
CONSTRAINT "Stream_sender_fkey" FOREIGN KEY ("sender") REFERENCES "User" ("publicKey") ON DELETE RESTRICT ON UPDATE CASCADE,
CONSTRAINT "Stream_recipient_fkey" FOREIGN KEY ("recipient") REFERENCES "User" ("publicKey") ON DELETE RESTRICT ON UPDATE CASCADE
);

-- CreateTable
CREATE TABLE "StreamEvent" (
"id" TEXT NOT NULL PRIMARY KEY,
"id" TEXT NOT NULL,
"streamId" INTEGER NOT NULL,
"eventType" TEXT NOT NULL,
"amount" TEXT,
"transactionHash" TEXT NOT NULL,
"ledgerSequence" INTEGER NOT NULL,
"timestamp" INTEGER NOT NULL,
"metadata" TEXT,
"createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,

CONSTRAINT "StreamEvent_pkey" PRIMARY KEY ("id"),
CONSTRAINT "StreamEvent_streamId_fkey" FOREIGN KEY ("streamId") REFERENCES "Stream" ("streamId") ON DELETE RESTRICT ON UPDATE CASCADE
);

-- CreateIndex
CREATE UNIQUE INDEX "User_publicKey_key" ON "User"("publicKey");

-- CreateIndex
CREATE INDEX "User_publicKey_idx" ON "User"("publicKey");

-- CreateIndex
CREATE UNIQUE INDEX "Stream_streamId_key" ON "Stream"("streamId");

-- CreateIndex
CREATE INDEX "Stream_sender_idx" ON "Stream"("sender");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- AlterTable
ALTER TABLE "Stream" ADD COLUMN "endTime" BIGINT,
ALTER COLUMN "startTime" SET DATA TYPE BIGINT,
ALTER COLUMN "lastUpdateTime" SET DATA TYPE BIGINT,
ALTER COLUMN "pausedAt" SET DATA TYPE BIGINT;

-- AlterTable
ALTER TABLE "StreamEvent" ALTER COLUMN "timestamp" SET DATA TYPE BIGINT;

-- CreateIndex
CREATE INDEX "StreamEvent_createdAt_idx" ON "StreamEvent"("createdAt");

-- CreateIndex
CREATE INDEX "StreamEvent_streamId_createdAt_idx" ON "StreamEvent"("streamId", "createdAt");
10 changes: 5 additions & 5 deletions backend/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ model Stream {
ratePerSecond String // Rate as string to preserve precision (i128)
depositedAmount String // Total deposited amount (i128)
withdrawnAmount String // Total withdrawn amount (i128)
startTime Int // Unix timestamp when stream started
lastUpdateTime Int // Unix timestamp of last update
endTime Int? // Unix timestamp when stream ends
startTime BigInt // Unix timestamp when stream started
lastUpdateTime BigInt // Unix timestamp of last update
endTime BigInt? // Unix timestamp when stream ends
isActive Boolean @default(true)
isPaused Boolean @default(false)
pausedAt Int? // Unix timestamp when paused
pausedAt BigInt? // Unix timestamp when paused
totalPausedDuration Int @default(0) // Accumulated paused duration in seconds
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
Expand Down Expand Up @@ -72,7 +72,7 @@ model StreamEvent {
amount String? // Amount involved in the event (for top-ups, withdrawals)
transactionHash String // Stellar transaction hash
ledgerSequence Int // Ledger sequence number
timestamp Int // Unix timestamp
timestamp BigInt // Unix timestamp
metadata String? // JSON string for additional event data
createdAt DateTime @default(now())

Expand Down
6 changes: 6 additions & 0 deletions backend/src/app.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import express, { type Request, type Response, type NextFunction } from 'express';
import cors from 'cors';

// Globally handle BigInt serialization in JSON responses
(BigInt.prototype as any).toJSON = function () {
return Number(this);
};

import swaggerUi from 'swagger-ui-express';
import { swaggerSpec } from './config/swagger.js';
import { apiVersionMiddleware, type VersionedRequest } from './middleware/api-version.middleware.js';
Expand Down
48 changes: 31 additions & 17 deletions backend/src/controllers/stream.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,15 @@ export const createStream = async (req: Request, res: Response) => {
const { streamId, sender, recipient, tokenAddress, ratePerSecond, depositedAmount, startTime } = req.body;

const parsedStreamId = Number.parseInt(streamId, 10);
const parsedStartTime = Number.parseInt(startTime, 10);
const parsedStartTime = BigInt(startTime);
const parsedRatePerSecond = BigInt(ratePerSecond);
const parsedDepositedAmount = BigInt(depositedAmount);

if (!Number.isFinite(parsedStreamId)) {
return res.status(400).json({ error: 'Invalid streamId: must be a valid integer' });
}

if (!Number.isFinite(parsedStartTime) || parsedStartTime < 0) {
if (parsedStartTime < 0n) {
return res.status(400).json({ error: 'Invalid startTime: must be a non-negative integer' });
}

Expand All @@ -89,13 +89,13 @@ export const createStream = async (req: Request, res: Response) => {
return res.status(400).json({ error: 'Invalid depositedAmount: must be greater than zero' });
}

const endTime = parsedStartTime + Number(parsedDepositedAmount / parsedRatePerSecond);
const endTime = parsedStartTime + (parsedDepositedAmount / parsedRatePerSecond);

const stream = await prisma.stream.upsert({
where: { streamId: parsedStreamId },
update: {
isActive: true,
lastUpdateTime: Math.floor(Date.now() / 1000)
lastUpdateTime: BigInt(Math.floor(Date.now() / 1000))
},
create: {
streamId: parsedStreamId,
Expand Down Expand Up @@ -632,21 +632,28 @@ export const pauseStream = async (req: Request, res: Response) => {
where: { streamId: parsedStreamId },
data: {
isPaused: true,
pausedAt: now,
lastUpdateTime: now,
pausedAt: BigInt(now),
lastUpdateTime: BigInt(now),
},
});

// Create a PAUSED event
await prisma.streamEvent.create({
data: {
// Create or update a PAUSED event
await prisma.streamEvent.upsert({
where: {
transactionHash_eventType: {
transactionHash: result.txHash,
eventType: 'PAUSED',
},
},
create: {
streamId: parsedStreamId,
eventType: 'PAUSED',
transactionHash: result.txHash,
ledgerSequence: 0, // Will be updated by event indexer
timestamp: now,
timestamp: BigInt(now),
metadata: JSON.stringify({ pausedBy: authReq.user.publicKey }),
},
update: {},
});

logger.info(`Stream ${parsedStreamId} paused by ${authReq.user.publicKey}`);
Expand Down Expand Up @@ -722,8 +729,8 @@ export const resumeStream = async (req: Request, res: Response) => {

// Calculate pause duration and update the database
const now = Math.floor(Date.now() / 1000);
const pausedAt = stream.pausedAt ?? now;
const pauseDuration = Math.max(0, now - pausedAt);
const pausedAt = stream.pausedAt ?? BigInt(now);
const pauseDuration = Math.max(0, now - Number(pausedAt));
const totalPausedDuration = (stream.totalPausedDuration ?? 0) + pauseDuration;

const updatedStream = await prisma.stream.update({
Expand All @@ -732,23 +739,30 @@ export const resumeStream = async (req: Request, res: Response) => {
isPaused: false,
pausedAt: null,
totalPausedDuration,
lastUpdateTime: now,
lastUpdateTime: BigInt(now),
},
});

// Create a RESUMED event
await prisma.streamEvent.create({
data: {
// Create or update a RESUMED event
await prisma.streamEvent.upsert({
where: {
transactionHash_eventType: {
transactionHash: result.txHash,
eventType: 'RESUMED',
},
},
create: {
streamId: parsedStreamId,
eventType: 'RESUMED',
transactionHash: result.txHash,
ledgerSequence: 0, // Will be updated by event indexer
timestamp: now,
timestamp: BigInt(now),
metadata: JSON.stringify({
resumedBy: authReq.user.publicKey,
pauseDuration,
}),
},
update: {},
});

logger.info(`Stream ${parsedStreamId} resumed by ${authReq.user.publicKey}`);
Expand Down
17 changes: 12 additions & 5 deletions backend/src/routes/v1/streams/withdraw.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,22 +117,29 @@ export const withdrawHandler = async (req: AuthenticatedRequest, res: Response)
where: { streamId: parsedStreamId },
data: {
withdrawnAmount: nextWithdrawnAmount,
lastUpdateTime: now,
lastUpdateTime: BigInt(now),
isActive: isCompleted ? false : stream.isActive,
},
});

// Create a WITHDRAWN event
await prisma.streamEvent.create({
data: {
// Create or update a WITHDRAWN event
await prisma.streamEvent.upsert({
where: {
transactionHash_eventType: {
transactionHash: result.txHash,
eventType: 'WITHDRAWN',
},
},
create: {
streamId: parsedStreamId,
eventType: 'WITHDRAWN',
amount: claimable.claimableAmount,
transactionHash: result.txHash,
ledgerSequence: 0,
timestamp: now,
timestamp: BigInt(now),
metadata: JSON.stringify({ withdrawnBy: req.user.publicKey }),
},
update: {},
});

logger.info(`Stream ${parsedStreamId} withdrawn by ${req.user.publicKey}`);
Expand Down
18 changes: 9 additions & 9 deletions backend/src/services/claimable.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ export interface ClaimableStreamState {
ratePerSecond: string;
depositedAmount: string;
withdrawnAmount: string;
startTime: number;
lastUpdateTime: number;
startTime: bigint | number;
lastUpdateTime: bigint | number;
isActive: boolean;
isPaused: boolean;
pausedAt: number | null;
pausedAt: bigint | number | null;
totalPausedDuration: number;
updatedAt?: Date;
}
Expand Down Expand Up @@ -62,11 +62,11 @@ function getStateFingerprint(stream: ClaimableStreamState): string {
stream.ratePerSecond,
stream.depositedAmount,
stream.withdrawnAmount,
stream.startTime,
stream.lastUpdateTime,
stream.startTime.toString(),
stream.lastUpdateTime.toString(),
stream.isActive ? '1' : '0',
stream.isPaused ? '1' : '0',
stream.pausedAt ?? 'null',
stream.pausedAt?.toString() ?? 'null',
stream.totalPausedDuration,
].join(':');

Expand Down Expand Up @@ -114,14 +114,14 @@ export class ClaimableAmountService {
};
}

const anchorTime = BigInt(Math.max(0, stream.lastUpdateTime));
const nowTs = BigInt(Math.max(0, calculatedAt));
const anchorTime = BigInt(stream.lastUpdateTime) > 0n ? BigInt(stream.lastUpdateTime) : 0n;
const nowTs = BigInt(calculatedAt) > 0n ? BigInt(calculatedAt) : 0n;
let elapsed = nowTs > anchorTime ? nowTs - anchorTime : 0n;

// Paused duration is handled by the contract updating lastUpdateTime on resume,
// but we still account for it if it's currently paused.
if (stream.isPaused && stream.pausedAt !== null) {
const currentPauseStart = BigInt(Math.max(0, stream.pausedAt));
const currentPauseStart = BigInt(stream.pausedAt) > 0n ? BigInt(stream.pausedAt) : 0n;
if (nowTs > currentPauseStart) {
const currentPauseDuration = nowTs - currentPauseStart;
elapsed = elapsed > currentPauseDuration ? elapsed - currentPauseDuration : 0n;
Expand Down
Loading
Loading