diff --git a/migrations/20260623120000-create-mail-bucket-entries.js b/migrations/20260623120000-create-mail-bucket-entries.js new file mode 100644 index 0000000..86080ad --- /dev/null +++ b/migrations/20260623120000-create-mail-bucket-entries.js @@ -0,0 +1,53 @@ +'use strict'; + +const TABLE_NAME = 'mail_bucket_entries'; + +/** @type {import('sequelize-cli').Migration} */ +module.exports = { + async up(queryInterface, Sequelize) { + await queryInterface.createTable(TABLE_NAME, { + id: { + type: Sequelize.UUID, + defaultValue: Sequelize.UUIDV4, + primaryKey: true, + allowNull: false, + }, + mail_address_id: { + type: Sequelize.UUID, + allowNull: false, + references: { model: 'mail_addresses', key: 'id' }, + onUpdate: 'CASCADE', + onDelete: 'CASCADE', + }, + entry_key: { + type: Sequelize.STRING(255), + allowNull: false, + unique: true, + }, + bridge_entry_id: { + type: Sequelize.STRING(24), + allowNull: false, + }, + size: { + type: Sequelize.BIGINT, + allowNull: false, + }, + created_at: { + type: Sequelize.DATE, + allowNull: false, + defaultValue: Sequelize.fn('now'), + }, + updated_at: { + type: Sequelize.DATE, + allowNull: false, + defaultValue: Sequelize.fn('now'), + }, + }); + + await queryInterface.addIndex(TABLE_NAME, ['mail_address_id']); + }, + + async down(queryInterface) { + await queryInterface.dropTable(TABLE_NAME); + }, +}; diff --git a/src/modules/account/repositories/address.repository.spec.ts b/src/modules/account/repositories/address.repository.spec.ts index b85c944..ea2bdd9 100644 --- a/src/modules/account/repositories/address.repository.spec.ts +++ b/src/modules/account/repositories/address.repository.spec.ts @@ -134,6 +134,7 @@ describe('AddressRepository', () => { it('when a provider link resolves to an account, then returns userUuid and networkBucketId', async () => { const link = { address: { + id: 'address-1', networkBucketId: 'bucket-1', account: { userId: 'user-uuid-1' }, }, @@ -154,6 +155,7 @@ describe('AddressRepository', () => { ], }); expect(result).toEqual({ + mailAddressId: 'address-1', userUuid: 'user-uuid-1', networkBucketId: 'bucket-1', }); @@ -172,6 +174,7 @@ describe('AddressRepository', () => { describe('findBucketContextByAddress', () => { it('when the address resolves to an account, then returns userUuid and networkBucketId', async () => { const model = { + id: 'address-1', networkBucketId: 'bucket-1', account: { userId: 'user-uuid-1' }, } as unknown as MailAddressModel; @@ -185,6 +188,7 @@ describe('AddressRepository', () => { include: [{ model: MailAccountModel, required: true }], }); expect(result).toEqual({ + mailAddressId: 'address-1', userUuid: 'user-uuid-1', networkBucketId: 'bucket-1', }); diff --git a/src/modules/account/repositories/address.repository.ts b/src/modules/account/repositories/address.repository.ts index ec11742..9fcbd9f 100644 --- a/src/modules/account/repositories/address.repository.ts +++ b/src/modules/account/repositories/address.repository.ts @@ -13,6 +13,7 @@ import { MailProviderAccountModel } from '../models/mail-provider-account.model. const MAX_BATCH_LOOKUP = 50; export interface ProviderAccountBucketContext { + mailAddressId: string; userUuid: string; networkBucketId: string | null; } @@ -110,6 +111,7 @@ export class AddressRepository { if (!model?.account) return null; return { + mailAddressId: model.id, userUuid: model.account.userId, networkBucketId: model.networkBucketId, }; @@ -132,6 +134,7 @@ export class AddressRepository { if (!link?.address?.account) return null; return { + mailAddressId: link.address.id, userUuid: link.address.account.userId, networkBucketId: link.address.networkBucketId, }; diff --git a/src/modules/email/email.module.ts b/src/modules/email/email.module.ts index 876af55..b4c93b0 100644 --- a/src/modules/email/email.module.ts +++ b/src/modules/email/email.module.ts @@ -1,5 +1,5 @@ import { Module } from '@nestjs/common'; -import { BridgeModule } from '../infrastructure/bridge/bridge.module.js'; +import { MailUsageModule } from '../usage/mail-usage.module.js'; import { JmapModule } from '../infrastructure/jmap/jmap.module.js'; import { SmtpModule } from '../infrastructure/smtp/smtp.module.js'; import { ProvisioningModule } from '../provisioning/provisioning.module.js'; @@ -8,7 +8,7 @@ import { EmailService } from './email.service.js'; import { Reflector } from '@nestjs/core'; @Module({ - imports: [JmapModule, SmtpModule, ProvisioningModule, BridgeModule], + imports: [JmapModule, SmtpModule, ProvisioningModule, MailUsageModule], controllers: [EmailController], providers: [EmailService, Reflector], }) diff --git a/src/modules/email/email.service.spec.ts b/src/modules/email/email.service.spec.ts index a547dc6..46043f9 100644 --- a/src/modules/email/email.service.spec.ts +++ b/src/modules/email/email.service.spec.ts @@ -8,7 +8,7 @@ import { Readable } from 'node:stream'; import { EmailService } from './email.service.js'; import { MailProvider } from './mail-provider.port.js'; import { AccountService } from '../account/account.service.js'; -import { BridgeClient } from '../infrastructure/bridge/bridge.service.js'; +import { MailUsageService } from '../usage/mail-usage.service.js'; import { StalwartSmtpService } from '../infrastructure/smtp/stalwart-smtp.service.js'; import { newMailbox, @@ -39,7 +39,7 @@ describe('EmailService', () => { let accountService: DeepMocked; let smtp: DeepMocked; let configService: DeepMocked; - let bridge: DeepMocked; + let usage: DeepMocked; const userEmail = 'test@example.com'; beforeEach(async () => { @@ -55,7 +55,7 @@ describe('EmailService', () => { accountService = module.get>(AccountService); smtp = module.get>(StalwartSmtpService); configService = module.get>(ConfigService); - bridge = module.get>(BridgeClient); + usage = module.get>(MailUsageService); }); describe('getMailboxes', () => { @@ -464,12 +464,13 @@ describe('EmailService', () => { await service.deleteEmail(userEmail, 'email-id'); expect(provider.deleteEmail).toHaveBeenCalledWith(userEmail, 'email-id'); - expect(bridge.deleteBucketEntry).not.toHaveBeenCalled(); + expect(usage.releaseStoredMessage).not.toHaveBeenCalled(); }); it('when the message is permanently destroyed, then releases the quota entry on the address bucket', async () => { provider.deleteEmail.mockResolvedValue({ deletedEntryKey: '42:7' }); accountService.findBucketContextByAddress.mockResolvedValue({ + mailAddressId: 'address-1', userUuid: 'user-1', networkBucketId: 'bucket-1', }); @@ -479,32 +480,34 @@ describe('EmailService', () => { expect(accountService.findBucketContextByAddress).toHaveBeenCalledWith( userEmail, ); - expect(bridge.deleteBucketEntry).toHaveBeenCalledWith( - 'user-1', - 'bucket-1', - '42:7', - ); + expect(usage.releaseStoredMessage).toHaveBeenCalledWith({ + userUuid: 'user-1', + bucketId: 'bucket-1', + entryKey: '42:7', + }); }); it('when the destroyed address has no network bucket, then no quota entry is released', async () => { provider.deleteEmail.mockResolvedValue({ deletedEntryKey: '42:7' }); accountService.findBucketContextByAddress.mockResolvedValue({ + mailAddressId: 'address-1', userUuid: 'user-1', networkBucketId: null, }); await service.deleteEmail(userEmail, 'email-id'); - expect(bridge.deleteBucketEntry).not.toHaveBeenCalled(); + expect(usage.releaseStoredMessage).not.toHaveBeenCalled(); }); it('when releasing the quota entry fails, then the deletion still succeeds', async () => { provider.deleteEmail.mockResolvedValue({ deletedEntryKey: '42:7' }); accountService.findBucketContextByAddress.mockResolvedValue({ + mailAddressId: 'address-1', userUuid: 'user-1', networkBucketId: 'bucket-1', }); - bridge.deleteBucketEntry.mockRejectedValue(new Error('Bridge down')); + usage.releaseStoredMessage.mockRejectedValue(new Error('Bridge down')); await expect( service.deleteEmail(userEmail, 'email-id'), diff --git a/src/modules/email/email.service.ts b/src/modules/email/email.service.ts index a9879d3..aeb7939 100644 --- a/src/modules/email/email.service.ts +++ b/src/modules/email/email.service.ts @@ -6,7 +6,7 @@ import { } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { AccountService } from '../account/account.service.js'; -import { BridgeClient } from '../infrastructure/bridge/bridge.service.js'; +import { MailUsageService } from '../usage/mail-usage.service.js'; import { MailProvider } from './mail-provider.port.js'; import type { DraftEmailDto, @@ -60,7 +60,7 @@ export class EmailService { private readonly accountService: AccountService, private readonly smtp: StalwartSmtpService, private readonly configService: ConfigService, - private readonly bridge: BridgeClient, + private readonly usage: MailUsageService, ) {} getMailboxes(userEmail: string): Promise { @@ -276,11 +276,11 @@ export class EmailService { } try { - await this.bridge.deleteBucketEntry( - context.userUuid, - context.networkBucketId, + await this.usage.releaseStoredMessage({ + userUuid: context.userUuid, + bucketId: context.networkBucketId, entryKey, - ); + }); } catch (error) { this.logger.warn( `Failed to release quota entry '${entryKey}' for '${userEmail}': ${(error as Error).message}`, diff --git a/src/modules/infrastructure/bridge/bridge.service.spec.ts b/src/modules/infrastructure/bridge/bridge.service.spec.ts index 9910091..277a9c8 100644 --- a/src/modules/infrastructure/bridge/bridge.service.spec.ts +++ b/src/modules/infrastructure/bridge/bridge.service.spec.ts @@ -143,7 +143,7 @@ describe('BridgeClient', () => { }); describe('createBucketEntry', () => { - it('when Bridge returns 200, then signs a token, POSTs key and size, and returns the entry', async () => { + it('when Bridge returns 200, then signs a token, POSTs only the size, and returns the entry', async () => { const entry = { id: 'entry-1', maxSpaceBytes: 1000, @@ -155,19 +155,14 @@ describe('BridgeClient', () => { body: { text: () => Promise.resolve(JSON.stringify(entry)) }, }); - const result = await service.createBucketEntry( - 'user-1', - 'bucket-1', - '42:7', - 240, - ); + const result = await service.createBucketEntry('user-1', 'bucket-1', 240); expect(result).toStrictEqual(entry); expect(httpRequest).toHaveBeenCalledWith( expect.objectContaining({ method: 'POST', path: '/v2/gateway/users/user-1/buckets/bucket-1/entries', - body: JSON.stringify({ key: '42:7', size: 240 }), + body: JSON.stringify({ size: 240 }), headers: expect.objectContaining({ authorization: 'Bearer signed-jwt', }) as unknown, @@ -183,7 +178,7 @@ describe('BridgeClient', () => { }); const error: unknown = await service - .createBucketEntry('user-1', 'bucket-1', '42:7', 240) + .createBucketEntry('user-1', 'bucket-1', 240) .catch((e: unknown) => e); expect(error).toBeInstanceOf(BridgeApiError); @@ -196,19 +191,25 @@ describe('BridgeClient', () => { }); describe('deleteBucketEntry', () => { - it('when Bridge returns 200, then signs a token and DELETEs the url-encoded entry key', async () => { + it('when Bridge returns 200, then signs a token, DELETEs by entry id, and returns the snapshot', async () => { + const snapshot = { maxSpaceBytes: 1000, totalUsedSpaceBytes: 0 }; jwtService.sign.mockReturnValue('signed-jwt'); httpRequest.mockResolvedValue({ statusCode: 200, - body: { text: () => Promise.resolve('') }, + body: { text: () => Promise.resolve(JSON.stringify(snapshot)) }, }); - await service.deleteBucketEntry('user-1', 'bucket-1', '42:7'); + const result = await service.deleteBucketEntry( + 'user-1', + 'bucket-1', + 'entry-1', + ); + expect(result).toStrictEqual(snapshot); expect(httpRequest).toHaveBeenCalledWith( expect.objectContaining({ method: 'DELETE', - path: '/v2/gateway/users/user-1/buckets/bucket-1/entries/42%3A7', + path: '/v2/gateway/users/user-1/buckets/bucket-1/entries/entry-1', headers: expect.objectContaining({ authorization: 'Bearer signed-jwt', }) as unknown, @@ -224,7 +225,7 @@ describe('BridgeClient', () => { }); const error: unknown = await service - .deleteBucketEntry('user-1', 'bucket-1', '42:7') + .deleteBucketEntry('user-1', 'bucket-1', 'entry-1') .catch((e: unknown) => e); expect(error).toBeInstanceOf(BridgeApiError); diff --git a/src/modules/infrastructure/bridge/bridge.service.ts b/src/modules/infrastructure/bridge/bridge.service.ts index 1068606..03335d4 100644 --- a/src/modules/infrastructure/bridge/bridge.service.ts +++ b/src/modules/infrastructure/bridge/bridge.service.ts @@ -7,7 +7,11 @@ import { import { ConfigService } from '@nestjs/config'; import { JwtService } from '@nestjs/jwt'; import { Client } from 'undici'; -import type { BucketEntry, MailBucket } from './bridge.types.js'; +import type { + BucketEntry, + MailBucket, + UserSpaceSnapshot, +} from './bridge.types.js'; @Injectable() export class BridgeClient implements OnModuleInit, OnModuleDestroy { @@ -101,7 +105,6 @@ export class BridgeClient implements OnModuleInit, OnModuleDestroy { async createBucketEntry( userUuid: string, bucketId: string, - key: string, size: number, ): Promise { const token = this.signGatewayToken(userUuid); @@ -114,14 +117,14 @@ export class BridgeClient implements OnModuleInit, OnModuleDestroy { accept: 'application/json', authorization: `Bearer ${token}`, }, - body: JSON.stringify({ key, size }), + body: JSON.stringify({ size }), }); const text = await body.text(); if (statusCode !== 200) { throw new BridgeApiError( - `Failed to create bucket entry '${key}' on bucket '${bucketId}' for user '${userUuid}': HTTP ${statusCode}`, + `Failed to create bucket entry on bucket '${bucketId}' for user '${userUuid}': HTTP ${statusCode}`, statusCode, text, ); @@ -133,13 +136,13 @@ export class BridgeClient implements OnModuleInit, OnModuleDestroy { async deleteBucketEntry( userUuid: string, bucketId: string, - key: string, - ): Promise { + entryId: string, + ): Promise { const token = this.signGatewayToken(userUuid); const { statusCode, body } = await this.httpClient.request({ method: 'DELETE', - path: `${this.basePath}/v2/gateway/users/${encodeURIComponent(userUuid)}/buckets/${encodeURIComponent(bucketId)}/entries/${encodeURIComponent(key)}`, + path: `${this.basePath}/v2/gateway/users/${encodeURIComponent(userUuid)}/buckets/${encodeURIComponent(bucketId)}/entries/${encodeURIComponent(entryId)}`, headers: { accept: 'application/json', authorization: `Bearer ${token}`, @@ -150,11 +153,13 @@ export class BridgeClient implements OnModuleInit, OnModuleDestroy { if (statusCode !== 200) { throw new BridgeApiError( - `Failed to delete bucket entry '${key}' on bucket '${bucketId}' for user '${userUuid}': HTTP ${statusCode}`, + `Failed to delete bucket entry '${entryId}' on bucket '${bucketId}' for user '${userUuid}': HTTP ${statusCode}`, statusCode, text, ); } + + return JSON.parse(text) as UserSpaceSnapshot; } private signGatewayToken(userUuid: string): string { diff --git a/src/modules/stalwart-events/stalwart-events.module.ts b/src/modules/stalwart-events/stalwart-events.module.ts index 34d3b1d..e8a70a1 100644 --- a/src/modules/stalwart-events/stalwart-events.module.ts +++ b/src/modules/stalwart-events/stalwart-events.module.ts @@ -1,12 +1,12 @@ import { Module } from '@nestjs/common'; import { AccountModule } from '../account/account.module.js'; -import { BridgeModule } from '../infrastructure/bridge/bridge.module.js'; +import { MailUsageModule } from '../usage/mail-usage.module.js'; import { StalwartEventsController } from './stalwart-events.controller.js'; import { StalwartEventsAuthGuard } from './stalwart-events-auth.guard.js'; import { StalwartEventsService } from './stalwart-events.service.js'; @Module({ - imports: [AccountModule, BridgeModule], + imports: [AccountModule, MailUsageModule], controllers: [StalwartEventsController], providers: [StalwartEventsAuthGuard, StalwartEventsService], }) diff --git a/src/modules/stalwart-events/stalwart-events.service.spec.ts b/src/modules/stalwart-events/stalwart-events.service.spec.ts index 5db044e..56154a9 100644 --- a/src/modules/stalwart-events/stalwart-events.service.spec.ts +++ b/src/modules/stalwart-events/stalwart-events.service.spec.ts @@ -2,7 +2,7 @@ import { describe, it, expect, beforeEach } from 'vitest'; import { Test, type TestingModule } from '@nestjs/testing'; import { createMock, type DeepMocked } from '@golevelup/ts-vitest'; import { AccountService } from '../account/account.service.js'; -import { BridgeClient } from '../infrastructure/bridge/bridge.service.js'; +import { MailUsageService } from '../usage/mail-usage.service.js'; import { StalwartEventsService } from './stalwart-events.service.js'; import type { StalwartEvent, @@ -34,7 +34,7 @@ function ingestEvent( describe('StalwartEventsService', () => { let service: StalwartEventsService; let accounts: DeepMocked; - let bridge: DeepMocked; + let usage: DeepMocked; beforeEach(async () => { const module: TestingModule = await Test.createTestingModule({ @@ -45,32 +45,29 @@ describe('StalwartEventsService', () => { service = module.get(StalwartEventsService); accounts = module.get(AccountService); - bridge = module.get(BridgeClient); + usage = module.get(MailUsageService); }); describe('handleBatch', () => { - it('when an ingest event resolves to a bucket, then creates a bucket entry keyed by accountId:documentId', async () => { + it('when an ingest event resolves to a bucket, then tracks the stored message keyed by accountId:documentId', async () => { accounts.findBucketContextByProviderInternalId.mockResolvedValue({ + mailAddressId: 'address-1', userUuid: 'user-1', networkBucketId: 'bucket-1', }); - bridge.createBucketEntry.mockResolvedValue({ - id: 'entry-1', - maxSpaceBytes: 1000, - totalUsedSpaceBytes: 240, - }); await service.handleBatch({ events: [ingestEvent()] }); expect( accounts.findBucketContextByProviderInternalId, ).toHaveBeenCalledWith('42'); - expect(bridge.createBucketEntry).toHaveBeenCalledWith( - 'user-1', - 'bucket-1', - '42:7', - 240, - ); + expect(usage.trackStoredMessage).toHaveBeenCalledWith({ + mailAddressId: 'address-1', + userUuid: 'user-1', + bucketId: 'bucket-1', + entryKey: '42:7', + size: 240, + }); }); it('when the event type is a duplicate, then it is skipped', async () => { @@ -81,7 +78,7 @@ describe('StalwartEventsService', () => { expect( accounts.findBucketContextByProviderInternalId, ).not.toHaveBeenCalled(); - expect(bridge.createBucketEntry).not.toHaveBeenCalled(); + expect(usage.trackStoredMessage).not.toHaveBeenCalled(); }); it('when the event type is not a message-ingest, then it is skipped', async () => { @@ -95,38 +92,35 @@ describe('StalwartEventsService', () => { expect( accounts.findBucketContextByProviderInternalId, ).not.toHaveBeenCalled(); - expect(bridge.createBucketEntry).not.toHaveBeenCalled(); + expect(usage.trackStoredMessage).not.toHaveBeenCalled(); }); - it('when no account resolves for the event, then no bucket entry is created', async () => { + it('when no account resolves for the event, then no message is tracked', async () => { accounts.findBucketContextByProviderInternalId.mockResolvedValue(null); await service.handleBatch({ events: [ingestEvent()] }); - expect(bridge.createBucketEntry).not.toHaveBeenCalled(); + expect(usage.trackStoredMessage).not.toHaveBeenCalled(); }); - it('when the resolved address has no network bucket, then no bucket entry is created', async () => { + it('when the resolved address has no network bucket, then no message is tracked', async () => { accounts.findBucketContextByProviderInternalId.mockResolvedValue({ + mailAddressId: 'address-1', userUuid: 'user-1', networkBucketId: null, }); await service.handleBatch({ events: [ingestEvent()] }); - expect(bridge.createBucketEntry).not.toHaveBeenCalled(); + expect(usage.trackStoredMessage).not.toHaveBeenCalled(); }); - it('when the batch has several events, then each ingest event is processed', async () => { + it('when the batch has several events, then each ingest event is tracked', async () => { accounts.findBucketContextByProviderInternalId.mockResolvedValue({ + mailAddressId: 'address-1', userUuid: 'user-1', networkBucketId: 'bucket-1', }); - bridge.createBucketEntry.mockResolvedValue({ - id: 'entry-1', - maxSpaceBytes: 1000, - totalUsedSpaceBytes: 240, - }); await service.handleBatch({ events: [ @@ -135,18 +129,12 @@ describe('StalwartEventsService', () => { ], }); - expect(bridge.createBucketEntry).toHaveBeenCalledTimes(2); - expect(bridge.createBucketEntry).toHaveBeenCalledWith( - 'user-1', - 'bucket-1', - '42:7', - 240, + expect(usage.trackStoredMessage).toHaveBeenCalledTimes(2); + expect(usage.trackStoredMessage).toHaveBeenCalledWith( + expect.objectContaining({ entryKey: '42:7' }), ); - expect(bridge.createBucketEntry).toHaveBeenCalledWith( - 'user-1', - 'bucket-1', - '42:8', - 240, + expect(usage.trackStoredMessage).toHaveBeenCalledWith( + expect.objectContaining({ entryKey: '42:8' }), ); }); }); diff --git a/src/modules/stalwart-events/stalwart-events.service.ts b/src/modules/stalwart-events/stalwart-events.service.ts index 615ffa4..0c842fb 100644 --- a/src/modules/stalwart-events/stalwart-events.service.ts +++ b/src/modules/stalwart-events/stalwart-events.service.ts @@ -1,6 +1,6 @@ import { Injectable, Logger } from '@nestjs/common'; import { AccountService } from '../account/account.service.js'; -import { BridgeClient } from '../infrastructure/bridge/bridge.service.js'; +import { MailUsageService } from '../usage/mail-usage.service.js'; import type { StalwartEvent, StalwartWebhookPayload, @@ -12,7 +12,7 @@ export class StalwartEventsService { constructor( private readonly accounts: AccountService, - private readonly bridge: BridgeClient, + private readonly usage: MailUsageService, ) {} async handleBatch(payload: StalwartWebhookPayload): Promise { @@ -57,16 +57,12 @@ export class StalwartEventsService { return; } - const { totalUsedSpaceBytes } = await this.bridge.createBucketEntry( - context.userUuid, - context.networkBucketId, + await this.usage.trackStoredMessage({ + mailAddressId: context.mailAddressId, + userUuid: context.userUuid, + bucketId: context.networkBucketId, entryKey, size, - ); - - this.logger.log( - { entryKey, size, totalUsedSpaceBytes, type: event.type }, - 'Created bucket entry for ingested message', - ); + }); } } diff --git a/src/modules/usage/domain/mail-bucket-entry.domain.ts b/src/modules/usage/domain/mail-bucket-entry.domain.ts new file mode 100644 index 0000000..5f177f8 --- /dev/null +++ b/src/modules/usage/domain/mail-bucket-entry.domain.ts @@ -0,0 +1,27 @@ +export interface MailBucketEntryAttributes { + id: string; + mailAddressId: string; + entryKey: string; + bridgeEntryId: string; + size: number; + createdAt: Date; + updatedAt: Date; +} + +export class MailBucketEntry { + readonly id!: string; + readonly mailAddressId!: string; + readonly entryKey!: string; + readonly bridgeEntryId!: string; + readonly size!: number; + readonly createdAt!: Date; + readonly updatedAt!: Date; + + private constructor(attributes: MailBucketEntryAttributes) { + Object.assign(this, attributes); + } + + static build(attributes: MailBucketEntryAttributes): MailBucketEntry { + return new MailBucketEntry(attributes); + } +} diff --git a/src/modules/usage/mail-usage.module.ts b/src/modules/usage/mail-usage.module.ts new file mode 100644 index 0000000..79abfc8 --- /dev/null +++ b/src/modules/usage/mail-usage.module.ts @@ -0,0 +1,13 @@ +import { Module } from '@nestjs/common'; +import { SequelizeModule } from '@nestjs/sequelize'; +import { BridgeModule } from '../infrastructure/bridge/bridge.module.js'; +import { MailBucketEntryModel } from './models/mail-bucket-entry.model.js'; +import { MailBucketEntryRepository } from './repositories/mail-bucket-entry.repository.js'; +import { MailUsageService } from './mail-usage.service.js'; + +@Module({ + imports: [SequelizeModule.forFeature([MailBucketEntryModel]), BridgeModule], + providers: [MailBucketEntryRepository, MailUsageService], + exports: [MailUsageService], +}) +export class MailUsageModule {} diff --git a/src/modules/usage/mail-usage.service.spec.ts b/src/modules/usage/mail-usage.service.spec.ts new file mode 100644 index 0000000..f9258cb --- /dev/null +++ b/src/modules/usage/mail-usage.service.spec.ts @@ -0,0 +1,162 @@ +import { describe, it, expect, beforeEach } from 'vitest'; +import { Test, type TestingModule } from '@nestjs/testing'; +import { createMock, type DeepMocked } from '@golevelup/ts-vitest'; +import { BridgeClient } from '../infrastructure/bridge/bridge.service.js'; +import { MailBucketEntry } from './domain/mail-bucket-entry.domain.js'; +import { + DuplicateEntryKeyError, + MailBucketEntryRepository, +} from './repositories/mail-bucket-entry.repository.js'; +import { MailUsageService } from './mail-usage.service.js'; + +function entry(overrides: Partial = {}): MailBucketEntry { + return MailBucketEntry.build({ + id: 'row-1', + mailAddressId: 'address-1', + entryKey: '42:7', + bridgeEntryId: 'entry-1', + size: 240, + createdAt: new Date(), + updatedAt: new Date(), + ...overrides, + }); +} + +const trackParams = { + mailAddressId: 'address-1', + userUuid: 'user-1', + bucketId: 'bucket-1', + entryKey: '42:7', + size: 240, +}; + +describe('MailUsageService', () => { + let service: MailUsageService; + let entries: DeepMocked; + let bridge: DeepMocked; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [MailUsageService], + }) + .useMocker(() => createMock()) + .compile(); + + service = module.get(MailUsageService); + entries = module.get(MailBucketEntryRepository); + bridge = module.get(BridgeClient); + }); + + describe('trackStoredMessage', () => { + it('when the entry is new, then mints a bridge entry and persists the pointer', async () => { + entries.findByEntryKey.mockResolvedValue(null); + bridge.createBucketEntry.mockResolvedValue({ + id: 'entry-1', + maxSpaceBytes: 1000, + totalUsedSpaceBytes: 240, + }); + + await service.trackStoredMessage(trackParams); + + expect(bridge.createBucketEntry).toHaveBeenCalledWith( + 'user-1', + 'bucket-1', + 240, + ); + expect(entries.create).toHaveBeenCalledWith({ + mailAddressId: 'address-1', + entryKey: '42:7', + bridgeEntryId: 'entry-1', + size: 240, + }); + }); + + it('when the entry is already tracked, then mints nothing (idempotent)', async () => { + entries.findByEntryKey.mockResolvedValue(entry()); + + await service.trackStoredMessage(trackParams); + + expect(bridge.createBucketEntry).not.toHaveBeenCalled(); + expect(entries.create).not.toHaveBeenCalled(); + }); + + it('when a concurrent delivery wins the race, then rolls back the minted bridge entry', async () => { + entries.findByEntryKey.mockResolvedValue(null); + bridge.createBucketEntry.mockResolvedValue({ + id: 'entry-1', + maxSpaceBytes: 1000, + totalUsedSpaceBytes: 240, + }); + entries.create.mockRejectedValue(new DuplicateEntryKeyError('42:7')); + + await service.trackStoredMessage(trackParams); + + expect(bridge.deleteBucketEntry).toHaveBeenCalledWith( + 'user-1', + 'bucket-1', + 'entry-1', + ); + }); + + it('when persistence fails for another reason, then propagates and does not roll back', async () => { + entries.findByEntryKey.mockResolvedValue(null); + bridge.createBucketEntry.mockResolvedValue({ + id: 'entry-1', + maxSpaceBytes: 1000, + totalUsedSpaceBytes: 240, + }); + entries.create.mockRejectedValue(new Error('DB down')); + + await expect(service.trackStoredMessage(trackParams)).rejects.toThrow( + 'DB down', + ); + expect(bridge.deleteBucketEntry).not.toHaveBeenCalled(); + }); + }); + + describe('releaseStoredMessage', () => { + it('when the pointer exists, then deletes the bridge entry by id and drops the pointer', async () => { + entries.findByEntryKey.mockResolvedValue(entry()); + + await service.releaseStoredMessage({ + userUuid: 'user-1', + bucketId: 'bucket-1', + entryKey: '42:7', + }); + + expect(bridge.deleteBucketEntry).toHaveBeenCalledWith( + 'user-1', + 'bucket-1', + 'entry-1', + ); + expect(entries.deleteByEntryKey).toHaveBeenCalledWith('42:7'); + }); + + it('when no pointer exists, then is a no-op', async () => { + entries.findByEntryKey.mockResolvedValue(null); + + await service.releaseStoredMessage({ + userUuid: 'user-1', + bucketId: 'bucket-1', + entryKey: '42:7', + }); + + expect(bridge.deleteBucketEntry).not.toHaveBeenCalled(); + expect(entries.deleteByEntryKey).not.toHaveBeenCalled(); + }); + + it('when the bridge delete fails, then the pointer is kept for retry', async () => { + entries.findByEntryKey.mockResolvedValue(entry()); + bridge.deleteBucketEntry.mockRejectedValue(new Error('Bridge down')); + + await expect( + service.releaseStoredMessage({ + userUuid: 'user-1', + bucketId: 'bucket-1', + entryKey: '42:7', + }), + ).rejects.toThrow('Bridge down'); + expect(entries.deleteByEntryKey).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/src/modules/usage/mail-usage.service.ts b/src/modules/usage/mail-usage.service.ts new file mode 100644 index 0000000..4338e5a --- /dev/null +++ b/src/modules/usage/mail-usage.service.ts @@ -0,0 +1,94 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { BridgeClient } from '../infrastructure/bridge/bridge.service.js'; +import { + DuplicateEntryKeyError, + MailBucketEntryRepository, +} from './repositories/mail-bucket-entry.repository.js'; + +export interface TrackStoredMessageParams { + mailAddressId: string; + userUuid: string; + bucketId: string; + entryKey: string; + size: number; +} + +export interface ReleaseStoredMessageParams { + userUuid: string; + bucketId: string; + entryKey: string; +} + +@Injectable() +export class MailUsageService { + private readonly logger = new Logger(MailUsageService.name); + + constructor( + private readonly entries: MailBucketEntryRepository, + private readonly bridge: BridgeClient, + ) {} + + async trackStoredMessage(params: TrackStoredMessageParams): Promise { + const { mailAddressId, userUuid, bucketId, entryKey, size } = params; + + const existing = await this.entries.findByEntryKey(entryKey); + if (existing) { + this.logger.debug({ entryKey }, 'Message already tracked; skipping'); + return; + } + + const { id: bridgeEntryId, totalUsedSpaceBytes } = + await this.bridge.createBucketEntry(userUuid, bucketId, size); + + try { + await this.entries.create({ + mailAddressId, + entryKey, + bridgeEntryId, + size, + }); + } catch (error) { + if (error instanceof DuplicateEntryKeyError) { + this.logger.debug( + { entryKey, bridgeEntryId }, + 'Concurrent tracking detected; rolling back minted bucket entry', + ); + await this.bridge.deleteBucketEntry(userUuid, bucketId, bridgeEntryId); + return; + } + throw error; + } + + this.logger.log( + { entryKey, bridgeEntryId, size, totalUsedSpaceBytes }, + 'Tracked stored message', + ); + } + + async releaseStoredMessage( + params: ReleaseStoredMessageParams, + ): Promise { + const { userUuid, bucketId, entryKey } = params; + + const existing = await this.entries.findByEntryKey(entryKey); + if (!existing) { + this.logger.debug( + { entryKey }, + 'No tracked entry for message; skipping release', + ); + return; + } + + await this.bridge.deleteBucketEntry( + userUuid, + bucketId, + existing.bridgeEntryId, + ); + await this.entries.deleteByEntryKey(entryKey); + + this.logger.log( + { entryKey, bridgeEntryId: existing.bridgeEntryId }, + 'Released stored message', + ); + } +} diff --git a/src/modules/usage/models/mail-bucket-entry.model.ts b/src/modules/usage/models/mail-bucket-entry.model.ts new file mode 100644 index 0000000..5a56bb4 --- /dev/null +++ b/src/modules/usage/models/mail-bucket-entry.model.ts @@ -0,0 +1,46 @@ +import { + AllowNull, + BelongsTo, + Column, + DataType, + Default, + ForeignKey, + Model, + PrimaryKey, + Table, + Unique, +} from 'sequelize-typescript'; +import { MailAddressModel } from '../../account/models/mail-address.model.js'; + +@Table({ + underscored: true, + timestamps: true, + tableName: 'mail_bucket_entries', +}) +export class MailBucketEntryModel extends Model { + @PrimaryKey + @Default(DataType.UUIDV4) + @Column(DataType.UUID) + declare id: string; + + @AllowNull(false) + @ForeignKey(() => MailAddressModel) + @Column(DataType.UUID) + declare mailAddressId: string; + + @AllowNull(false) + @Unique + @Column(DataType.STRING(255)) + declare entryKey: string; + + @AllowNull(false) + @Column(DataType.STRING(24)) + declare bridgeEntryId: string; + + @AllowNull(false) + @Column(DataType.BIGINT) + declare size: string; + + @BelongsTo(() => MailAddressModel) + declare address: MailAddressModel; +} diff --git a/src/modules/usage/repositories/mail-bucket-entry.repository.spec.ts b/src/modules/usage/repositories/mail-bucket-entry.repository.spec.ts new file mode 100644 index 0000000..8208c80 --- /dev/null +++ b/src/modules/usage/repositories/mail-bucket-entry.repository.spec.ts @@ -0,0 +1,125 @@ +import { describe, it, expect, beforeEach } from 'vitest'; +import { Test, type TestingModule } from '@nestjs/testing'; +import { getModelToken } from '@nestjs/sequelize'; +import { createMock, type DeepMocked } from '@golevelup/ts-vitest'; +import { UniqueConstraintError } from 'sequelize'; +import { + DuplicateEntryKeyError, + MailBucketEntryRepository, +} from './mail-bucket-entry.repository.js'; +import { MailBucketEntryModel } from '../models/mail-bucket-entry.model.js'; + +describe('MailBucketEntryRepository', () => { + let repository: MailBucketEntryRepository; + let entryModel: DeepMocked; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [MailBucketEntryRepository], + }) + .useMocker(() => createMock()) + .compile(); + + repository = module.get(MailBucketEntryRepository); + entryModel = module.get(getModelToken(MailBucketEntryModel)); + }); + + describe('create', () => { + it('when persisting, then returns the entry with size coerced to a number', async () => { + const now = new Date(); + entryModel.create.mockResolvedValue({ + id: 'row-1', + mailAddressId: 'address-1', + entryKey: '42:7', + bridgeEntryId: 'entry-1', + size: '240', + createdAt: now, + updatedAt: now, + } as unknown as MailBucketEntryModel); + + const result = await repository.create({ + mailAddressId: 'address-1', + entryKey: '42:7', + bridgeEntryId: 'entry-1', + size: 240, + }); + + expect(entryModel.create).toHaveBeenCalledWith({ + mailAddressId: 'address-1', + entryKey: '42:7', + bridgeEntryId: 'entry-1', + size: 240, + }); + expect(result.size).toBe(240); + expect(result.entryKey).toBe('42:7'); + }); + + it('when the entry key already exists, then throws DuplicateEntryKeyError', async () => { + entryModel.create.mockRejectedValue( + new UniqueConstraintError({ errors: [] }), + ); + + await expect( + repository.create({ + mailAddressId: 'address-1', + entryKey: '42:7', + bridgeEntryId: 'entry-1', + size: 240, + }), + ).rejects.toBeInstanceOf(DuplicateEntryKeyError); + }); + + it('when persistence fails for another reason, then rethrows the original error', async () => { + entryModel.create.mockRejectedValue(new Error('DB down')); + + await expect( + repository.create({ + mailAddressId: 'address-1', + entryKey: '42:7', + bridgeEntryId: 'entry-1', + size: 240, + }), + ).rejects.toThrow('DB down'); + }); + }); + + describe('findByEntryKey', () => { + it('when a row exists, then returns the domain entry', async () => { + const now = new Date(); + entryModel.findOne.mockResolvedValue({ + id: 'row-1', + mailAddressId: 'address-1', + entryKey: '42:7', + bridgeEntryId: 'entry-1', + size: '240', + createdAt: now, + updatedAt: now, + } as unknown as MailBucketEntryModel); + + const result = await repository.findByEntryKey('42:7'); + + expect(entryModel.findOne).toHaveBeenCalledWith({ + where: { entryKey: '42:7' }, + }); + expect(result?.bridgeEntryId).toBe('entry-1'); + }); + + it('when no row matches, then returns null', async () => { + entryModel.findOne.mockResolvedValue(null); + + const result = await repository.findByEntryKey('42:7'); + + expect(result).toBeNull(); + }); + }); + + describe('deleteByEntryKey', () => { + it('when called, then destroys the row matching the entry key', async () => { + await repository.deleteByEntryKey('42:7'); + + expect(entryModel.destroy).toHaveBeenCalledWith({ + where: { entryKey: '42:7' }, + }); + }); + }); +}); diff --git a/src/modules/usage/repositories/mail-bucket-entry.repository.ts b/src/modules/usage/repositories/mail-bucket-entry.repository.ts new file mode 100644 index 0000000..7f63e7d --- /dev/null +++ b/src/modules/usage/repositories/mail-bucket-entry.repository.ts @@ -0,0 +1,64 @@ +import { Injectable } from '@nestjs/common'; +import { InjectModel } from '@nestjs/sequelize'; +import { UniqueConstraintError } from 'sequelize'; +import { + MailBucketEntry, + type MailBucketEntryAttributes, +} from '../domain/mail-bucket-entry.domain.js'; +import { MailBucketEntryModel } from '../models/mail-bucket-entry.model.js'; + +export interface CreateMailBucketEntryParams { + mailAddressId: string; + entryKey: string; + bridgeEntryId: string; + size: number; +} + +export class DuplicateEntryKeyError extends Error { + constructor(entryKey: string) { + super(`Bucket entry already tracked for key '${entryKey}'`); + this.name = 'DuplicateEntryKeyError'; + } +} + +@Injectable() +export class MailBucketEntryRepository { + constructor( + @InjectModel(MailBucketEntryModel) + private readonly entryModel: typeof MailBucketEntryModel, + ) {} + + async create(params: CreateMailBucketEntryParams): Promise { + try { + const model = await this.entryModel.create({ ...params }); + return this.toDomain(model); + } catch (error) { + if (error instanceof UniqueConstraintError) { + throw new DuplicateEntryKeyError(params.entryKey); + } + throw error; + } + } + + async findByEntryKey(entryKey: string): Promise { + const model = await this.entryModel.findOne({ where: { entryKey } }); + return model ? this.toDomain(model) : null; + } + + async deleteByEntryKey(entryKey: string): Promise { + await this.entryModel.destroy({ where: { entryKey } }); + } + + private toDomain(model: MailBucketEntryModel): MailBucketEntry { + const attrs: MailBucketEntryAttributes = { + id: model.id, + mailAddressId: model.mailAddressId, + entryKey: model.entryKey, + bridgeEntryId: model.bridgeEntryId, + size: Number(model.size), + createdAt: model.createdAt as Date, + updatedAt: model.updatedAt as Date, + }; + return MailBucketEntry.build(attrs); + } +}