A resilient, drift-free unified ingestion pipeline (Problem 1) and revenue metrics service (Problem 2) built using Express, Prisma, and PostgreSQL.

    ┌──────────────────────────────────┐
    │ Render cron jobs                 │
    │ cron-fetch (*/15 * * * *)        │
    │ cron-process (*/5 * * * *)       │
    │ cron-daily-summary (0 9 * * *)   │
    └──────────────────────────────────┘
               │
               ▼
    ┌────────────────────┐   ┌─────────────────┐   ┌──────────────────┐
    │ ConnectorFactory   │──▶│ ProducerJob     │──▶│ ingest_outbox    │
    │ Stripe / HubSpot / │   │ fetch → enqueue │   │ (Postgres)       │
    │ GCal               │   │ cursor advance  │   └──────────────────┘
    └────────────────────┘   └─────────────────┘             │
               ▲                      │                      ▼
               │                      ▼            ┌──────────────────┐
    StaleCursor│               CursorService       │ OutboxProcessor  │
       recovery│               RunReportService    │ normalize        │
               │               NotifierService     │ upsert           │
               │                                   │ DLQ on poison    │
               │                                   └──────────────────┘
               │                                             │
               │                                             ▼
               │                                   ┌──────────────────┐
               │                                   │ payments         │
               └───────────────────────────────────┤ contacts         │
                                                   │ events           │
                                                   │ (Supabase pg)    │
                                                   └──────────────────┘
                                                             │
                                                             ▼
                                          ┌─────────────────────────────────────┐
                                          │ Express HTTP API                    │
                                          │ POST /trigger/:source/:mode (admin) │
                                          │ POST /webhooks/stripe               │
                                          │ GET /runs, GET /runs/:runId         │
                                          │ GET /metrics/revenue/summary        │
                                          │ GET /metrics/revenue/daily          │
                                          │ GET /metrics/revenue/weekly         │
                                          │ GET /healthz, /readyz               │
                                          └─────────────────────────────────────┘

Instead of grouping by date in the database (for example with `DATE_TRUNC`, which is PostgreSQL-specific, has no direct equivalent in SQLite or MySQL, and is timezone-sensitive for `timestamptz` values), we fetch:
- The total sum and count from the database, using Prisma's `aggregate` query (`_sum` and `_count`).
- The individual payment rows matching the exact same filters.

We then group the rows (daily/weekly) in memory in TypeScript and assert that the sum of the breakdown buckets exactly matches the database aggregate total. If any drift is detected (e.g. concurrent inserts changing the totals between the two queries), the service throws a `Critical Metric Drift Detected` error instead of returning mismatched numbers.
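The following is a minimal sketch of the in-memory bucketing and drift check. It assumes a simplified row shape, and the names `PaymentRow`, `bucketize`, `utcWeekStart`, `assertNoDrift`, and `MetricDriftError` are hypothetical. In the service, the aggregate would come from Prisma's `aggregate` (`_sum`/`_count`), and the rows would come from a query using the same filters.

```typescript
// Hypothetical row shape; the real Prisma model and field names may differ.
interface PaymentRow {
  amountCents: bigint;
  collectedAt: Date;
}

interface Bucket {
  key: string; // YYYY-MM-DD: the UTC day, or the Monday that starts the UTC week
  amountCents: bigint;
  count: number;
}

class MetricDriftError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "MetricDriftError";
  }
}

// UTC calendar day of a timestamp.
function utcDay(d: Date): string {
  return d.toISOString().split("T")[0];
}

// UTC date of the Monday that starts the week containing `d`.
function utcWeekStart(d: Date): string {
  const daysSinceMonday = (d.getUTCDay() + 6) % 7; // getUTCDay(): Sunday = 0, Monday = 1
  const monday = new Date(Date.UTC(d.getUTCFullYear(), d.getUTCMonth(), d.getUTCDate() - daysSinceMonday));
  return utcDay(monday);
}

// Groups rows in application code; no database date functions are involved.
function bucketize(rows: PaymentRow[], keyOf: (d: Date) => string): Bucket[] {
  const buckets = new Map<string, Bucket>();
  for (const row of rows) {
    const key = keyOf(row.collectedAt);
    const bucket = buckets.get(key) ?? { key, amountCents: 0n, count: 0 };
    bucket.amountCents += row.amountCents;
    bucket.count += 1;
    buckets.set(key, bucket);
  }
  return Array.from(buckets.values()).sort((a, b) => (a.key < b.key ? -1 : a.key > b.key ? 1 : 0));
}

// Compares the buckets against the _sum/_count aggregate fetched from the database.
function assertNoDrift(buckets: Bucket[], aggregate: { sumCents: bigint; count: number }): void {
  const sum = buckets.reduce((acc, b) => acc + b.amountCents, 0n);
  const count = buckets.reduce((acc, b) => acc + b.count, 0);
  if (sum !== aggregate.sumCents || count !== aggregate.count) {
    throw new MetricDriftError(
      `Critical Metric Drift Detected: buckets=${sum}/${count}, aggregate=${aggregate.sumCents}/${aggregate.count}`,
    );
  }
}
```

Grouping in application code keeps the bucketing rules (UTC days, Monday week starts) identical on every database engine.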
Fetching from external APIs (Stripe/HubSpot/GCal) is slow and network-unstable. During a fetch loop, we never publish directly to a message queue or to the final entity tables. Instead, we first write raw payloads to the `sync_ingest_outbox` table in the database. Even if a message broker such as RabbitMQ or the downstream services are completely down, ingestion succeeds and processing is retried safely later.
We advance sync cursors only after the outbox enqueue transaction has committed in PostgreSQL. If the process crashes during a fetch, the cursor is not advanced, and the next run fetches the same page again. Duplicates are filtered at the outbox insert layer: the insert uses `skipDuplicates: true` against a unique index, so re-fetched records are ignored.
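The ordering can be illustrated with a small in-memory simulation. `InMemoryStore`, `enqueueSkipDuplicates`, and `syncPage` are hypothetical stand-ins: the Map keyed by `(source, entity, externalId)` plays the role of the unique index, and `enqueueSkipDuplicates` mimics Prisma's `createMany({ data, skipDuplicates: true })`. The sketch models the worst case, a crash after the enqueue has committed but before the cursor has moved.

```typescript
// Hypothetical in-memory stand-ins for sync_ingest_outbox and sync_cursor.
interface OutboxRow {
  source: string;
  entity: string;
  externalId: string;
  payload: unknown;
}

interface Page {
  records: { id: string; data: unknown }[];
  nextCursor: string | null;
}

class InMemoryStore {
  // Keyed by the unique index (source, entity, externalId).
  outbox = new Map<string, OutboxRow>();
  cursors = new Map<string, string | null>();
}

// Mirrors createMany({ data, skipDuplicates: true }): rows whose key already exists are ignored.
function enqueueSkipDuplicates(store: InMemoryStore, rows: OutboxRow[]): number {
  let inserted = 0;
  for (const row of rows) {
    const key = `${row.source}:${row.entity}:${row.externalId}`;
    if (!store.outbox.has(key)) {
      store.outbox.set(key, row);
      inserted++;
    }
  }
  return inserted;
}

// One producer step: enqueue the page, then (and only then) advance the cursor.
// `crashBeforeCursor` simulates the process dying between the two writes.
function syncPage(store: InMemoryStore, source: string, entity: string, page: Page, crashBeforeCursor = false): number {
  const inserted = enqueueSkipDuplicates(
    store,
    page.records.map((r) => ({ source, entity, externalId: r.id, payload: r.data })),
  );
  if (crashBeforeCursor) throw new Error("simulated crash after enqueue, before cursor advance");
  store.cursors.set(`${source}:${entity}`, page.nextCursor);
  return inserted;
}
```

On the rerun, the same page is enqueued again, nothing new is inserted, and only then does the cursor advance.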
If external APIs fail or are rate-limited, the sync status is reported as `degraded`, but the HTTP server's `/readyz` endpoint still returns `200 OK` as long as the core database is reachable. Returning `503` for external sync errors would cause deployment platforms (such as Render health checks or Kubernetes probes) to pull the instance out of rotation or restart it, turning a third-party API outage into local API downtime.
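The readiness decision can be written as a pure function. This sketch assumes checks shaped like the `/readyz` response documented in this README. `buildReadiness` is hypothetical, and the `503`/`"error"` values for an unreachable database are assumptions, since the README only specifies the `200 OK` case.

```typescript
// Hypothetical check shape, modeled on the documented /readyz response.
interface Check {
  ok: boolean;
  latencyMs?: number;
  lastSync?: string;
}

interface Readiness {
  httpStatus: number;
  body: { status: "ok" | "error"; syncStatus: "healthy" | "degraded"; checks: Record<string, Check> };
}

// Only the database check decides the HTTP status; upstream API checks only affect syncStatus.
function buildReadiness(checks: { db: Check } & Record<string, Check>): Readiness {
  const upstreamHealthy = Object.entries(checks)
    .filter(([name]) => name !== "db")
    .every(([, check]) => check.ok);
  return {
    httpStatus: checks.db.ok ? 200 : 503, // 503 on DB failure is an assumption
    body: {
      status: checks.db.ok ? "ok" : "error",
      syncStatus: upstreamHealthy ? "healthy" : "degraded",
      checks,
    },
  };
}
```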
To run live API checks using Postman:
- Import the Collection: in Postman, click Import and paste the raw URL `https://raw.githubusercontent.com/kunalrawat425/backend-assignment/master/postman/buffalo.postman_collection.json` (or select the local file `postman/buffalo.postman_collection.json` from your cloned project directory).
- Configure Variables: under the Collection settings (Variables tab), set the following variables:
  - `BASE_URL`: server endpoint (e.g. `http://localhost:3000` for local, or `https://backend-assignment-7in3.onrender.com` for your Render live URL).
  - `API_KEY`: set to `f5d96a7ebcd7fbe4f691c28c894d0a1b` (or your production API key).
  - `ADMIN_API_KEY`: set to `9a7c3b2f5d1e4c7b8e0a1f2c3d4e5f6a` (or your production admin key).
- Run Requests: run individual routes, or use the Collection Runner to test liveness, readiness, sync triggers, and the drift-free revenue views in sequence.
- Node.js: v20.x or higher
- pnpm: v9.x (preferred) or npm
- PostgreSQL: local Docker instance or Supabase database URL
- Clone the repository and install dependencies: `pnpm install`
- Configure environment variables: create a `.env` file from the template with `cp .env.example .env`, then open `.env` and fill in the PostgreSQL connection string, Stripe keys, and API tokens.
- Generate Prisma client bindings: `pnpm db:generate`
- Start a PostgreSQL database (optional): if you don't have an external Postgres database, start one via Docker before running migrations: `docker compose up -d`
- Apply database migrations: `pnpm db:migrate:dev`
- Run in development mode (watcher): `pnpm dev`
- Build the TypeScript source to JavaScript: `pnpm build`
- Start the production build locally: `pnpm start`
- Run all tests (unit and integration): `pnpm test`
- Run only the integration tests: `pnpm test:integration`
- Run the automated curl test script (verifies live endpoints sequentially): `npx tsx run-curl-tests.ts`
- Run the E2E verification scenario (full happy path plus mock syncs): `npx tsx run-e2e-scenario.ts`
- Run the brutal chaos scenarios (simulates DB disconnects, double locks, and DLQ drops): `npx tsx run-brutal-scenarios.ts`
- Enforce the Single Source of Truth static analysis check: `pnpm check:single-revenue`
All endpoints (except `/healthz` and `/readyz`) require authentication headers:
- Admin endpoints require `X-Admin-Api-Key: <ADMIN_API_KEY>`
- General endpoints require `X-Api-Key: <API_KEY>`
- Mutating `POST` requests require `Idempotency-Key: <ANY_UNIQUE_STRING>`
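The header rules above can be expressed as a single check function. This is a minimal sketch in which `checkRequest` and the `401`/`400` status codes are hypothetical choices. The `isAdminRoute` flag stands in for however the router marks admin endpoints, and header names are lower-case, as Node's HTTP parser delivers them.

```typescript
// Header names are lower-case, as Node's HTTP parser delivers them.
type HeaderMap = Record<string, string | undefined>;

interface AuthConfig {
  apiKey: string;
  adminApiKey: string;
}

interface Rejection {
  status: number;
  reason: string;
}

const PUBLIC_PATHS = new Set(["/healthz", "/readyz"]);

// Returns a rejection, or null when the request may proceed.
// A production version would compare keys in constant time (e.g. crypto.timingSafeEqual).
function checkRequest(method: string, path: string, isAdminRoute: boolean, headers: HeaderMap, cfg: AuthConfig): Rejection | null {
  if (PUBLIC_PATHS.has(path)) return null;
  if (isAdminRoute) {
    if (headers["x-admin-api-key"] !== cfg.adminApiKey) return { status: 401, reason: "invalid X-Admin-Api-Key" };
  } else if (headers["x-api-key"] !== cfg.apiKey) {
    return { status: 401, reason: "invalid X-Api-Key" };
  }
  if (method === "POST" && !headers["idempotency-key"]) {
    return { status: 400, reason: "missing Idempotency-Key" };
  }
  return null;
}
```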
All three revenue views always agree: the daily and weekly responses also return the global `totalRevenueCents` aggregate, so the client always has a single source of truth.
`GET /metrics/revenue/summary` returns the total revenue (sum) and count of collected payments.
- Query Params:
  - `startDate` (optional, `YYYY-MM-DD`)
  - `endDate` (optional, `YYYY-MM-DD`)
  - `source` (optional, e.g. `stripe` or `hubspot`)
- Request:
  `curl "http://localhost:3000/metrics/revenue/summary?startDate=2026-06-15&endDate=2026-06-18" -H "X-Api-Key: f5d96a7ebcd7fbe4f691c28c894d0a1b"`
- Response:
  `{ "totalRevenueCents": "82800", "currency": "USD", "count": 9, "startDate": "2026-06-15", "endDate": "2026-06-18", "source": null }`
`GET /metrics/revenue/daily` returns daily revenue buckets alongside the global aggregate total.
- Request:
  `curl "http://localhost:3000/metrics/revenue/daily" -H "X-Api-Key: f5d96a7ebcd7fbe4f691c28c894d0a1b"`
- Response:
  `{ "totalRevenueCents": "82800", "currency": "USD", "breakdown": [ { "date": "2026-06-15", "amountCents": "54900", "count": 2 }, { "date": "2026-06-16", "amountCents": "9900", "count": 1 }, { "date": "2026-06-17", "amountCents": "18000", "count": 6 } ], "startDate": null, "endDate": null, "source": null }`
`GET /metrics/revenue/weekly` returns weekly revenue buckets (keyed by the week's starting Monday) alongside the global aggregate total.
- Request:
  `curl "http://localhost:3000/metrics/revenue/weekly" -H "X-Api-Key: f5d96a7ebcd7fbe4f691c28c894d0a1b"`
- Response:
  `{ "totalRevenueCents": "82800", "currency": "USD", "breakdown": [ { "weekStartDate": "2026-06-15", "amountCents": "82800", "count": 9 } ], "startDate": null, "endDate": null, "source": null }`
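Because amounts are serialized as strings, a client can verify the "views always agree" property with `BigInt` arithmetic. The sketch below assumes all three endpoints are called with the same filters. `viewsAgree` is a hypothetical helper, and the test data is taken from the sample responses above.

```typescript
// Minimal view of the documented responses; amounts are strings because they are serialized BigInts.
interface SummaryResponse {
  totalRevenueCents: string;
  count: number;
}

interface BreakdownResponse {
  totalRevenueCents: string;
  breakdown: { amountCents: string; count: number }[];
}

// True when a daily/weekly response is internally consistent and matches the summary.
function viewsAgree(summary: SummaryResponse, view: BreakdownResponse): boolean {
  // Sum as bigint, never as number, so large totals stay exact.
  const sum = view.breakdown.reduce((acc, b) => acc + BigInt(b.amountCents), 0n);
  const count = view.breakdown.reduce((acc, b) => acc + b.count, 0);
  return (
    view.totalRevenueCents === summary.totalRevenueCents &&
    sum === BigInt(summary.totalRevenueCents) &&
    count === summary.count
  );
}
```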
`GET /healthz` indicates whether the Node.js process is alive.
- Response:
  `{ "status": "ok", "uptimeS": 120 }`
`GET /readyz` verifies database health and reports the sync status derived from the run history.
- Response:
  `{ "status": "ok", "syncStatus": "healthy", "checks": { "db": { "ok": true, "latencyMs": 14 }, "stripe": { "ok": true, "lastSync": "2026-06-17T16:08:00Z" } }, "uptimeS": 120 }`
`POST /trigger/:source/:mode` (admin) triggers a background ingest job for a specific source (`stripe`, `hubspot`, or `gcal`). The mode can be `incremental` or `full`.
- Request:
  `curl -X POST "http://localhost:3000/trigger/stripe/incremental" -H "X-Admin-Api-Key: 9a7c3b2f5d1e4c7b8e0a1f2c3d4e5f6a" -H "Idempotency-Key: my-unique-uuid-key"`
- Response:
  `{ "runId": "7bc36996-fc4d-499e-80d3-ab9527efd92e", "source": "stripe", "mode": "incremental", "status": "accepted" }`
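The README does not specify how `Idempotency-Key` is enforced. The following is a minimal sketch of one common approach, using a hypothetical `TriggerService` that remembers the first result per key. It stores keys in memory only; a real implementation would persist them (for example in Postgres) so that replays survive restarts.

```typescript
const SOURCES = new Set(["stripe", "hubspot", "gcal"]);
const MODES = new Set(["incremental", "full"]);

interface TriggerResult {
  runId: string;
  source: string;
  mode: string;
  status: "accepted";
}

// Hypothetical idempotency layer for POST /trigger/:source/:mode.
class TriggerService {
  private readonly seen = new Map<string, TriggerResult>();
  private readonly newRunId: () => string; // e.g. crypto.randomUUID in production

  constructor(newRunId: () => string) {
    this.newRunId = newRunId;
  }

  trigger(idempotencyKey: string, source: string, mode: string): TriggerResult {
    if (!SOURCES.has(source) || !MODES.has(mode)) throw new Error(`unsupported trigger ${source}/${mode}`);
    const previous = this.seen.get(idempotencyKey);
    if (previous) {
      // Replaying a key for the same request returns the original run instead of starting a new one.
      if (previous.source !== source || previous.mode !== mode) {
        throw new Error("Idempotency-Key reused for a different request");
      }
      return previous;
    }
    const result: TriggerResult = { runId: this.newRunId(), source, mode, status: "accepted" };
    this.seen.set(idempotencyKey, result);
    return result;
  }
}
```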
If a cursor becomes stale (e.g. the Stripe API credentials change, Google Calendar invalidates a sync token and responds with `410 Gone`, or the HubSpot cursor format changes), the connector throws a `StaleCursorError`. The producer catches it, marks the sync report as stale, resets the cursor state, and immediately runs a full backfill sync to restore data consistency.
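The following minimal sketch shows that recovery path. It uses a synchronous, single-page connector, and the outbox enqueue is omitted. The `Connector` interface, `runSync`, and `RunReport` are hypothetical; only the `StaleCursorError` name comes from this README.

```typescript
class StaleCursorError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "StaleCursorError";
  }
}

interface PageResult {
  records: unknown[];
  nextCursor: string;
}

// Hypothetical connector surface; the real BaseConnector API may differ.
interface Connector {
  fetchIncremental(cursor: string): PageResult;
  fetchFull(): PageResult;
}

interface RunReport {
  mode: "incremental" | "full";
  stale: boolean;
  records: number;
}

// Enqueueing to the outbox is omitted; only the cursor/backfill decision is shown.
function runSync(connector: Connector, cursors: Map<string, string>, key: string): RunReport {
  const cursor = cursors.get(key);
  let stale = false;
  if (cursor !== undefined) {
    try {
      const page = connector.fetchIncremental(cursor);
      cursors.set(key, page.nextCursor);
      return { mode: "incremental", stale, records: page.records.length };
    } catch (err) {
      if (!(err instanceof StaleCursorError)) throw err; // other failures: cursor stays where it was
      stale = true;
      cursors.delete(key); // reset cursor state before backfilling
    }
  }
  // First run, or recovery from a stale cursor: full backfill.
  const full = connector.fetchFull();
  cursors.set(key, full.nextCursor);
  return { mode: "full", stale, records: full.records.length };
}
```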
Advisory locks (PostgreSQL `pg_try_advisory_xact_lock`) are acquired per source and per entity inside the transaction. If two cron instances or API calls fire simultaneously, the second one fails to acquire the advisory lock and exits immediately and gracefully, preventing duplicate database writes and CPU spikes.
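`pg_try_advisory_xact_lock` accepts either one `bigint` key or two `integer` keys, so string identifiers such as `stripe` / `payments` must first be turned into numbers. The sketch below derives the two keys with a 32-bit FNV-1a hash; this is an assumption, and the project may derive its keys differently. It also includes a toy lock table that models the non-blocking, transaction-scoped semantics. `fnv1a32`, `advisoryLockKeys`, and `XactLockTable` are hypothetical.

```typescript
// 32-bit FNV-1a over UTF-16 code units (identical to bytes for ASCII names), as a signed int32.
function fnv1a32(text: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  return hash | 0;
}

// Two int4 keys for pg_try_advisory_xact_lock(integer, integer).
function advisoryLockKeys(source: string, entity: string): [number, number] {
  return [fnv1a32(source), fnv1a32(entity)];
}

// Inside a Prisma interactive transaction this could be used roughly as:
//   const [k1, k2] = advisoryLockKeys("stripe", "payments");
//   const [{ locked }] = await tx.$queryRaw<{ locked: boolean }[]>`
//     SELECT pg_try_advisory_xact_lock(${k1}::int, ${k2}::int) AS locked`;
//   if (!locked) return; // another worker holds it: exit without doing work

// Toy model of try-lock semantics across two sessions (Postgres advisory locks are
// re-entrant within a single session, which this model ignores).
class XactLockTable {
  private readonly held = new Set<string>();

  tryLock(keys: [number, number]): boolean {
    const id = keys.join(":");
    if (this.held.has(id)) return false; // never waits, unlike pg_advisory_xact_lock
    this.held.add(id);
    return true;
  }

  // Transaction-level advisory locks are released automatically at COMMIT or ROLLBACK.
  endTransaction(keys: [number, number]): void {
    this.held.delete(keys.join(":"));
  }
}
```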
Prisma queries are wrapped in a retry handler (`withDbRetry`) that intercepts transient errors (such as `P1001` when the database server is unreachable, `P2024` when the connection pool times out, or socket hang-ups like `ECONNRESET`) and retries them up to 5 times with exponential backoff before failing the job.
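The following is a minimal sketch of such a retry wrapper, using "full jitter" backoff. It assumes errors carry their code in a `code` or `errorCode` property: Prisma's `PrismaClientKnownRequestError` uses `code`, `PrismaClientInitializationError` uses `errorCode`, and Node socket errors use `code`. The option names and default delays are assumptions, not the project's actual `withDbRetry` signature.

```typescript
// Prisma P1001 (database unreachable), P2024 (connection-pool timeout) and Node's ECONNRESET.
const TRANSIENT_CODES = new Set(["P1001", "P2024", "ECONNRESET"]);

function isTransient(err: unknown): boolean {
  if (typeof err !== "object" || err === null) return false;
  const { code, errorCode } = err as { code?: unknown; errorCode?: unknown };
  return [code, errorCode].some((c) => typeof c === "string" && TRANSIENT_CODES.has(c));
}

interface RetryOptions {
  maxRetries?: number; // retries after the first attempt
  baseDelayMs?: number;
  maxDelayMs?: number;
  sleep?: (ms: number) => Promise<void>; // injectable for tests
}

async function withDbRetry<T>(op: () => Promise<T>, opts: RetryOptions = {}): Promise<T> {
  const maxRetries = opts.maxRetries ?? 5;
  const baseDelayMs = opts.baseDelayMs ?? 100;
  const maxDelayMs = opts.maxDelayMs ?? 5000;
  const sleep = opts.sleep ?? ((ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms)));

  for (let attempt = 0; ; attempt++) {
    try {
      return await op();
    } catch (err) {
      // Non-transient errors and exhausted retries propagate unchanged.
      if (!isTransient(err) || attempt >= maxRetries) throw err;
      // Exponential backoff with full jitter: wait a random time in [0, min(max, base * 2^attempt)).
      const ceiling = Math.min(maxDelayMs, baseDelayMs * 2 ** attempt);
      await sleep(Math.random() * ceiling);
    }
  }
}
```

With `maxRetries = 5`, an operation runs at most six times: the first attempt plus five retries.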
If an individual raw payload repeatedly fails to normalize (e.g. because HubSpot returned a contact payload with corrupted phone formats that fail Zod validation), the processor increments its attempt counter. After 5 failures, it moves the payload to `sync_dlq_log` together with the exact validation error message and marks the outbox row as `FAILED`, unblocking the rest of the queue.
Raw status names from external systems (such as `requires_capture` or `partially_refunded`) that are not explicitly listed in the mapped status allow-list default to `UNKNOWN`. A status counts as collected revenue only after its mapping is added in code, so unvetted amounts never contaminate the metric aggregates.
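The following minimal sketch shows the allow-list idea. The mapping entries are taken from this README (`succeeded`, `closedwon`, `completed` to `COLLECTED`), while `mapStatus`, `countsAsRevenue`, and the two-value `UnifiedStatus` type are hypothetical simplifications.

```typescript
// Only COLLECTED counts as revenue; other unified statuses are omitted from this sketch.
type UnifiedStatus = "COLLECTED" | "UNKNOWN";
type StatusMap = Readonly<Record<string, UnifiedStatus>>;

// Entries taken from the mappings described in this README; everything else is unmapped.
const STRIPE_STATUS_MAP: StatusMap = { succeeded: "COLLECTED" };
const HUBSPOT_STAGE_MAP: StatusMap = { closedwon: "COLLECTED", completed: "COLLECTED" };

function mapStatus(map: StatusMap, raw: string, warn: (msg: string) => void = console.warn): UnifiedStatus {
  // hasOwnProperty keeps inherited keys such as "constructor" from looking mapped.
  if (Object.prototype.hasOwnProperty.call(map, raw)) return map[raw];
  warn(`Unmapped status "${raw}" defaulted to UNKNOWN`);
  return "UNKNOWN";
}

function countsAsRevenue(status: UnifiedStatus): boolean {
  return status === "COLLECTED";
}
```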
When `job:process` processes a batch of records:
- Row Claiming: the script claims rows using `SELECT ... FOR UPDATE SKIP LOCKED` inside a PostgreSQL transaction.
- Crash Scenario (SIGKILL, Out-of-Memory, Server Panic):
  - State in Outbox: if the process crashes mid-batch, the database connection is abruptly severed. PostgreSQL detects the dead session and automatically releases the row locks. Since the rows are marked `consumed` only at the very end of the batch transaction, their status remains `pending`.
  - State in Target Tables: some destination rows (e.g. `payments`) might already have been updated in the DB before the crash.
  - Re-run Behavior: when the consumer script runs again, it claims the exact same `pending` rows.
  - Idempotency Guarantee: because all database writes are idempotent upserts (e.g. `ON CONFLICT (source, externalId) DO UPDATE`), reprocessing a partially written batch simply overwrites the records with the same data, avoiding double inserts and duplicate revenue tracking.
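The idempotency argument can be checked with a toy model. In this model, a Map keyed by `(source, externalId)` stands in for the unique constraint, and `set` behaves like `ON CONFLICT ... DO UPDATE`. `PaymentsTable` and `processBatch` are hypothetical, and the claim SQL in the comment uses assumed column names.

```typescript
// The claim step itself would be SQL along these lines (column names are assumptions):
//   SELECT id, payload FROM sync_ingest_outbox
//   WHERE status = 'pending' ORDER BY id LIMIT 50
//   FOR UPDATE SKIP LOCKED;

interface UnifiedPayment {
  source: string;
  externalId: string;
  amountCents: bigint;
}

// Stand-in for the payments table with a unique constraint on (source, externalId).
class PaymentsTable {
  private readonly rows = new Map<string, UnifiedPayment>();

  // Same effect as INSERT ... ON CONFLICT (source, externalId) DO UPDATE: last write wins, no new row.
  upsert(p: UnifiedPayment): void {
    this.rows.set(`${p.source}:${p.externalId}`, { ...p });
  }

  get size(): number {
    return this.rows.size;
  }

  totalCents(): bigint {
    let total = 0n;
    for (const p of this.rows.values()) total += p.amountCents;
    return total;
  }
}

// Applies one claimed batch. `crashAt` simulates the process dying before row `crashAt` is written;
// the claimed outbox rows would then still be `pending` and get re-claimed as a whole.
function processBatch(table: PaymentsTable, batch: UnifiedPayment[], crashAt?: number): void {
  batch.forEach((p, i) => {
    if (i === crashAt) throw new Error("simulated SIGKILL mid-batch");
    table.upsert(p);
  });
}
```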
When `job:fetch` fetches paginated records from Stripe, HubSpot, or Google Calendar:
- Paging Mechanics: the job fetches page N from the external API, enqueues the records into `sync_ingest_outbox`, and then saves `next_cursor` to the `sync_cursor` table.
- Crash Scenario: the job crashes partway through fetching a page (e.g. the local server loses its internet connection).
  - State in Database: the database rolls back the active transaction, and the cursor is not advanced to `next_cursor`.
  - Re-run Behavior: upon restart, the producer reads the old cursor and requests page N again.
  - Outbox Deduping: since the outbox has a unique index on `(source, entity, external_id)`, the insert (Prisma's `createMany` with `skipDuplicates: true`) ignores records that were already enqueued. No duplicate payloads accumulate in the outbox.
- The Failure: the database connection drops in the middle of a transaction, comes back for 2 seconds, and drops again.
- System Behavior: instead of failing immediately, the system wraps all database operations in the retry runner `withDbRetry`, which handles transient Prisma and network errors (`P1001`, `P2024`, `ECONNRESET`). It retries up to 5 times using exponential backoff with jitter. If the database is still offline after 5 retries, the job terminates cleanly, preserving transaction boundaries.
- The Failure: Stripe or HubSpot changes its payload schema unexpectedly, or sends corrupted data that fails Zod validation inside our normalizers.
- System Behavior:
  - The consumer process attempts to process the item.
  - The normalizer throws a Zod validation error, and the transaction rolls back.
  - The consumer increments the `attempts` counter for that specific outbox row in the database.
  - On retry, if it fails again, the same steps repeat.
  - Once the attempt counter hits 5, the processor catches the error, writes a detailed error report along with the full raw payload to the `sync_dlq_log` (Dead Letter Queue) table, and marks the outbox row as `FAILED`.
  - The queue remains unblocked, and subsequent healthy payloads are processed.
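The attempt-counting and dead-lettering steps above can be sketched as follows, collapsed into one function call per processing run. `processItem`, `normalizePayment` (a stand-in for a Zod schema that accepts only USD, mirroring the chaos test's `EUR` poison pill), and the in-memory DLQ array are hypothetical. The `pending`/`consumed`/`FAILED` statuses follow this README.

```typescript
const MAX_ATTEMPTS = 5;

type OutboxStatus = "pending" | "consumed" | "FAILED";

interface OutboxItem {
  id: number;
  payload: unknown;
  attempts: number;
  status: OutboxStatus;
}

interface DlqEntry {
  outboxId: number;
  error: string;
  payload: unknown;
}

// One processing run for one item: bumps `attempts` on failure and dead-letters at MAX_ATTEMPTS.
function processItem(item: OutboxItem, normalize: (payload: unknown) => void, dlq: DlqEntry[]): void {
  try {
    normalize(item.payload);
    item.status = "consumed";
  } catch (err) {
    item.attempts += 1;
    if (item.attempts >= MAX_ATTEMPTS) {
      dlq.push({ outboxId: item.id, error: err instanceof Error ? err.message : String(err), payload: item.payload });
      item.status = "FAILED"; // no longer pending, so it is never claimed again
    }
  }
}

// Hypothetical stand-in for a Zod schema: only USD payments are accepted.
function normalizePayment(payload: unknown): void {
  const currency = (payload as { currency?: unknown }).currency;
  if (currency !== "USD") throw new Error(`unsupported currency '${String(currency)}'`);
}
```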
- The Failure: the database server runs out of disk space, so all inserts fail.
- System Behavior: both the ingestion (`job:fetch`) and processing (`job:process`) scripts crash immediately because they cannot write cursors or outbox statuses.
- Recovery: since all writes are transaction-safe, no partial or corrupted state is saved. Once disk space is freed, rerunning the scripts resumes work from the last healthy cursors without any manual data cleanup.
We have mapped the integration files, components, and key configurations for each external vendor:
- Environment Variables (`.env`):
  - `STRIPE_ENABLED`: enables or disables the Stripe connector.
  - `STRIPE_API_KEY`: Stripe API key, e.g. a test secret key `sk_test_...` (restricted keys use the `rk_` prefix instead); sent as a Bearer token.
  - `STRIPE_WEBHOOK_SECRET`: webhook signing secret `whsec_...`, used for HMAC signature verification.
- Authentication Method: token-based Bearer authentication via the official Stripe Node library.
- Component Map:
  - Connector Class: stripe.connector.ts (fetches incremental payments using `created` range filters, plus full dumps).
  - Normalizer Class: stripe.normalizer.ts (converts charges and refunds to the unified payment model).
  - Status Map: stripe.map.ts & stripe.refund.map.ts (map raw payment statuses such as `succeeded` to the unified `COLLECTED` status).
  - Webhook Ingress: webhook.controller.ts (routes incoming webhooks into transaction-safe outbox slots).
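The official library verifies webhooks with `stripe.webhooks.constructEvent(rawBody, signatureHeader, STRIPE_WEBHOOK_SECRET)`. For reference, the following sketch shows what that check amounts to under Stripe's documented manual-verification scheme:
- HMAC-SHA256 over `<timestamp>.<raw body>`, compared against the `v1` signatures in the `Stripe-Signature` header.
- A timestamp tolerance, set to 300 seconds here to match the library default.

`verifyStripeSignature` is hypothetical and assumes Node.js. The body must be the raw, unparsed request payload.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Checks a Stripe-Signature header ("t=<unix seconds>,v1=<hex HMAC>[,v1=...]") against the raw body.
function verifyStripeSignature(
  rawBody: string,
  header: string,
  secret: string,
  nowSec: number,
  toleranceSec = 300,
): boolean {
  let timestamp: string | undefined;
  const signatures: string[] = [];
  for (const part of header.split(",")) {
    const [key, value] = part.split("=", 2);
    if (key === "t") timestamp = value;
    else if (key === "v1" && value) signatures.push(value);
  }
  if (!timestamp || signatures.length === 0) return false;
  const age = Math.abs(nowSec - Number(timestamp));
  if (!Number.isFinite(age) || age > toleranceSec) return false; // rejects replays of old events
  // Stripe signs "<timestamp>.<raw body>" with HMAC-SHA256 using the endpoint's whsec_... secret.
  const expected = createHmac("sha256", secret).update(`${timestamp}.${rawBody}`, "utf8").digest();
  return signatures.some((sig) => {
    const given = Buffer.from(sig, "hex");
    // Constant-time comparison; lengths must match before calling timingSafeEqual.
    return given.length === expected.length && timingSafeEqual(given, expected);
  });
}
```

In production, `nowSec` would be `Math.floor(Date.now() / 1000)`.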
- Environment Variables (`.env`):
  - `HUBSPOT_ENABLED`: enables or disables the HubSpot connector.
  - `HUBSPOT_ACCESS_TOKEN`: Private App access token, format `pat-na1-...` or `pat-na2-...` (Bearer authentication).
- Authentication Method: Private App token authentication using the standard `Authorization` HTTP header against the HubSpot CRM API v3.
- Component Map:
  - Connector Class: hubspot.connector.ts (polls CRM deals/contacts sequentially, querying for updates since the last cursor timestamp).
  - Normalizer Class: hubspot.normalizer.ts (converts CRM Deal properties to `UnifiedPayment` and Contacts to `UnifiedContact`).
  - Status Map: hubspot.map.ts (maps CRM pipeline stages such as `closedwon` or `completed` to the unified `COLLECTED` status).
- Environment Variables (`.env`):
  - `GCAL_ENABLED`: enables or disables the Google Calendar connector.
  - `GOOGLE_CLIENT_EMAIL`: service account email address (of the form `<name>@<project-id>.iam.gserviceaccount.com`).
  - `GOOGLE_PRIVATE_KEY`: service account RSA private key (PEM contents).
  - `GOOGLE_CALENDAR_ID`: target calendar identifier (defaults to `primary`).
- Authentication Method: JWT-based OAuth 2.0 service account client from the Google Node APIs library (`googleapis`).
- Component Map:
  - Connector Class: gcal.connector.ts (performs incremental listing using `syncToken` cursors, and full listing using pagination parameters).
  - Normalizer Class: gcal.normalizer.ts (maps calendar events to the `UnifiedEvent` model).
  - Status Map: none (calendar events do not contribute to payment metrics).
What happens when parts of the system go down?
| Down Component | Ingestion Sync Impact | Revenue Metrics Impact | Recovery Mechanism |
|---|---|---|---|
| Upstream APIs (Stripe, HubSpot, GCal) | Isolated failure: failed sync reports are created; healthy APIs continue running. | None: cached and existing database metrics are still fully readable. | Cursors are NOT advanced. The pipeline retries on the next cron run from the last successful cursor state. |
| Database (PostgreSQL/Supabase) | Paused sync: ingestion and outbox processing pause safely. | Service outage: HTTP metrics API calls fail with a 500 error. | `withDbRetry` retries queries with exponential backoff. Ingestion resumes from the last cursors when the DB recovers. |
| Outbox Consumer (processor job) | Data buffered: fetches write raw events to `sync_ingest_outbox` successfully, but final tables don't update. | Stale metrics: metrics remain accessible but don't reflect newly ingested data. | The queue drains sequentially when the processor restarts. Atomic transactions ensure no messages are lost. |
| Express HTTP Server (web app container) | None: scheduled fetch and processing jobs run independently as CLI cron tasks (`job:fetch` / `job:process`). | API outage: clients cannot retrieve metrics via HTTP. | Cron scripts keep running in isolated processes; the HTTP server auto-restarts via the Render health check. |
- Reliability: decouples network API requests from database processing using the Transactional Outbox Pattern. This eliminates the "dual-write" problem, where a database update succeeds but queue publication fails.
- Code Flexibility: employs the Strategy Pattern via `ConnectorFactory` and `BaseConnector`. Integrating a new payment provider (e.g. PayPal) only requires subclassing `BaseConnector` and adding its custom schema mapper.
- Fault Tolerance: automatic fallback to a full backfill if a cursor is invalidated (`StaleCursorError`). Malformed payloads are automatically isolated in `sync_dlq_log` after 5 failed attempts, preventing queue blockage.
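The following minimal sketch shows that Strategy/Factory split, with stubbed connectors. Apart from the `ConnectorFactory` and `BaseConnector` names, everything in it is hypothetical: the method name `fetchIncremental`, the `register`/`create` statics, and the PayPal example in the test. The real `BaseConnector` contract may differ.

```typescript
interface FetchResult {
  records: unknown[];
  nextCursor: string | null;
}

// Strategy contract: every provider implements the same fetch interface.
abstract class BaseConnector {
  abstract readonly source: string;
  abstract fetchIncremental(cursor: string | null): Promise<FetchResult>;
}

class StripeConnector extends BaseConnector {
  readonly source = "stripe";
  async fetchIncremental(cursor: string | null): Promise<FetchResult> {
    return { records: [], nextCursor: cursor }; // stub: real code would page through the Stripe API
  }
}

class HubSpotConnector extends BaseConnector {
  readonly source = "hubspot";
  async fetchIncremental(cursor: string | null): Promise<FetchResult> {
    return { records: [], nextCursor: cursor }; // stub: real code would query the CRM API since `cursor`
  }
}

// Callers ask for a source name and never import concrete connectors.
class ConnectorFactory {
  private static readonly registry = new Map<string, () => BaseConnector>([
    ["stripe", () => new StripeConnector()],
    ["hubspot", () => new HubSpotConnector()],
  ]);

  // Adding a provider means registering one more subclass.
  static register(source: string, create: () => BaseConnector): void {
    ConnectorFactory.registry.set(source, create);
  }

  static create(source: string): BaseConnector {
    const create = ConnectorFactory.registry.get(source);
    if (!create) throw new Error(`No connector registered for "${source}"`);
    return create();
  }
}
```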
- Reliability: exposes a unified query builder (`computeRevenue`) as the Single Source of Truth (SSOT). Daily, weekly, and summary metrics run with the same query constraints, so they never drift apart.
- Code Flexibility: aggregations are calculated in memory rather than with engine-specific SQL functions. This keeps the service database-engine agnostic (tests run on SQLite, production on PostgreSQL/Supabase).
- Fault Tolerance: unmapped payment statuses default to `UNKNOWN` instead of throwing errors. They are logged as warnings and omitted from revenue calculations until explicitly mapped, preventing accounting leakage.
Our multi-tier testing strategy (unit, integration, and chaos scenarios) targets the following failure modes under brutal workloads:
- Memory Exhaustion (OOM): Pulling massive datasets from upstream APIs into large in-memory arrays during deep sync backfills.
- Network & Connection Flapping: Socket timeouts, rate limit blocks, or transient TCP packet drops while connecting to Stripe, HubSpot, Google Calendar, or the Supabase database.
- Concurrency Lockups & Race Conditions: Multiple workers writing/updating the same entity tables concurrently, leading to database deadlocks.
- Validation Failures & Schema Drift: Upstream APIs releasing unexpected payload shifts, causing JSON schema validations (Zod) to crash standard processors.
We run automated scripts that intentionally trigger crashes and verify recovery:
- Database Connection Flapping Check: simulates packet loss during database queries. Asserts that `withDbRetry` intercepts the exception, backs off exponentially, and resolves the query on the 3rd attempt without crashing.
- Advisory Lock Double-Firing Check: simulates two concurrent transactions trying to acquire the same Postgres advisory lock for `stripe:payments`. Asserts that the second transaction fails to acquire the lock immediately, preventing double syncs.
- Poison Pill Ingestion Check: inserts an invalid payment payload (unsupported currency `EUR`) with `attempts` set to 4. Asserts that the consumer catches the failure on the 5th attempt, writes a detailed error report to the `sync_dlq_log` (DLQ) table, and marks the outbox row as `FAILED` to unblock other messages.
- Batching and Pagination: pagination with a page size of 50 prevents OOM errors when processing millions of Stripe/HubSpot rows.
- Outbox Claim Limits: the outbox consumer claims entries in batches of `OUTBOX_BATCH_SIZE = 50` inside a loop instead of loading all pending rows, keeping the memory footprint constant.
- Database Connection Pooling: Prisma Client is configured with a connection limit (`connection_limit=5`) to prevent connection exhaustion.
- Throttling: Bottleneck rate-limiting queues keep requests within upstream API thresholds.
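The claim loop can be sketched generically. In this minimal sketch, `claim` would be the `FOR UPDATE SKIP LOCKED ... LIMIT` query in the real consumer (an assumption), and `drainInBatches` is a hypothetical helper.

```typescript
const OUTBOX_BATCH_SIZE = 50;

// Claims and handles at most `batchSize` rows at a time, so memory use does not grow with the backlog.
function drainInBatches<T>(
  claim: (limit: number) => T[],
  handle: (batch: T[]) => void,
  batchSize = OUTBOX_BATCH_SIZE,
): number {
  let batches = 0;
  for (;;) {
    const batch = claim(batchSize);
    if (batch.length === 0) break;
    handle(batch);
    batches++;
    if (batch.length < batchSize) break; // a short batch means the backlog is drained
  }
  return batches;
}
```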
- Precision Loss on Large Integers: if HubSpot deal values or Stripe charge amounts exceed JavaScript's maximum safe integer (`Number.MAX_SAFE_INTEGER`, 2^53 - 1), precision is lost. We use `BigInt` (mapped to Postgres `bigint` and serialized as strings in API JSON) to keep amounts exact to the cent.
- Timezone Discrepancies: if a third-party source sends timestamps without explicit offsets or in local-time formats, daily/weekly aggregations could shift metrics by one day. The system derives all date keys in UTC (`toISOString().split('T')[0]`) to keep aggregation consistent across machines.
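Both points can be demonstrated in a few lines. One subtlety is worth noting: `toISOString()` normalizes timestamps that carry an offset, but JavaScript parses an ISO date-time *without* an offset as local time. The hypothetical `toUtcDateKey` below therefore appends `Z` in that case, under the assumption that offset-less upstream timestamps are meant as UTC.

```typescript
// Number cannot represent every integer above Number.MAX_SAFE_INTEGER (2^53 - 1); bigint can.
const asNumber = Number.MAX_SAFE_INTEGER + 2; // 9007199254740993 rounds to 9007199254740992
const asBigInt = BigInt(Number.MAX_SAFE_INTEGER) + 2n; // exactly 9007199254740993n

// JSON.stringify throws a TypeError on bigint values, so cents are sent as strings.
function centsToJson(cents: bigint): string {
  return cents.toString();
}

// ISO-8601 offsets in the form JavaScript's Date parser expects: "Z" or "+HH:MM"/"-HH:MM".
const HAS_OFFSET = /(?:Z|[+-]\d{2}:\d{2})$/i;

// Returns the UTC calendar day (YYYY-MM-DD) of an ISO-8601 timestamp.
function toUtcDateKey(raw: string): string {
  // Assumption: a date-time without an offset is meant as UTC. Left alone, JavaScript would
  // parse it in the machine's local time zone. Date-only strings are already parsed as UTC.
  const normalized = raw.includes("T") && !HAS_OFFSET.test(raw) ? `${raw}Z` : raw;
  const date = new Date(normalized);
  if (Number.isNaN(date.getTime())) throw new RangeError(`Invalid timestamp: ${raw}`);
  return date.toISOString().split("T")[0];
}
```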
- PostgreSQL Outbox instead of RabbitMQ: RabbitMQ introduces broker downtime, message loss on unacknowledged connections, and out-of-order writes. An outbox table in Postgres lets the ingestion queue share the same atomic transaction context as the target tables.
- Static Analysis CI Guard (`check-single-revenue-impl.sh`): a custom shell scanner script runs in CI and fails the build if code duplicates status-based revenue queries outside the canonical `RevenueService.ts`, enforcing the SSOT property automatically.
- Advisory Locks: the cron jobs acquire database-level advisory transaction locks (`pg_try_advisory_xact_lock`). If multiple instances run simultaneously, the extra ones skip the run without throwing errors or locking table rows.
- Stripe Node SDK: https://docs.stripe.com/api?lang=node
- HubSpot CRM API v3: https://developers.hubspot.com/docs/api/crm/deals
- Google Calendar API: https://developers.google.com/calendar/api/v3/reference/events/list
- Prisma migrations: https://www.prisma.io/docs/concepts/components/prisma-migrate
- Postgres advisory locks: https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
- Outbox pattern: https://microservices.io/patterns/data/transactional-outbox.html
- Render free tier specifics: https://render.com/docs/free
- Supabase pgbouncer config: https://supabase.com/docs/guides/database/connecting-to-postgres#connection-pooler
This project was built collaboratively with Claude (Anthropic). Chat share link: <add before submission>.