Savory/rabbitMQ

@danet/rabbitmq

RabbitMQ queue/messaging transport for Danet, built on amqplib.

It brings the DI-driven, decorator-based listener model of Danet's built-in KV Queue and Events modules to a real RabbitMQ broker: inject RabbitMQ to publish, and decorate methods with @OnRabbitMQMessage to consume.

Requirements

  • A reachable RabbitMQ broker (e.g. docker run -d -p 5672:5672 rabbitmq:3-management).
  • @danet/core >= 2.11.0.

Installation

deno add jsr:@danet/rabbitmq

Or import it directly with the jsr: specifier:

import { RabbitMQModule } from 'jsr:@danet/rabbitmq';

Usage

Import the module into your root module with forRoot, passing at least an AMQP URL in the url option:

import { Module } from '@danet/core';
import { RabbitMQModule } from '@danet/rabbitmq';
import { OrderService } from './order.service.ts';

@Module({
	imports: [
		RabbitMQModule.forRoot({
			url: 'amqp://guest:guest@localhost:5672',
			prefetch: 10, // optional QoS
		}),
	],
	injectables: [OrderService],
})
export class AppModule {}
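
Credentials or virtual host names that contain reserved characters (such as @ or /) have to be percent-encoded inside the AMQP URL. A minimal sketch of assembling the url value from separate settings follows; buildAmqpUrl and AmqpParts are hypothetical names for illustration and are not part of @danet/rabbitmq.

```typescript
// Hypothetical helper: not part of @danet/rabbitmq or amqplib.
interface AmqpParts {
	user: string;
	password: string;
	host: string;
	port?: number; // falls back to the default AMQP port, 5672
	vhost?: string; // omitted from the URL when undefined
}

function buildAmqpUrl(parts: AmqpParts): string {
	// Percent-encode each component so reserved characters survive URL parsing.
	const auth = `${encodeURIComponent(parts.user)}:${
		encodeURIComponent(parts.password)
	}`;
	const vhost = parts.vhost === undefined
		? ''
		: `/${encodeURIComponent(parts.vhost)}`;
	return `amqp://${auth}@${parts.host}:${parts.port ?? 5672}${vhost}`;
}

// Same URL as in the forRoot example above.
console.log(buildAmqpUrl({ user: 'guest', password: 'guest', host: 'localhost' }));
```

The result can then be passed as the url option to RabbitMQModule.forRoot.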

Publishing

Inject RabbitMQ and call sendMessage(queue, payload):

import { Injectable } from '@danet/core';
import { RabbitMQ } from '@danet/rabbitmq';

@Injectable()
export class OrderService {
	constructor(private rabbit: RabbitMQ) {}

	create() {
		this.rabbit.sendMessage('order.created', { orderId: 1 });
	}
}

Consuming

Decorate a method with @OnRabbitMQMessage(queue). The method receives the JSON-decoded payload. When it resolves, the message is acked; when it throws, the message is nacked without requeue, so a dead-letter policy can take over.

import { Injectable } from '@danet/core';
import { OnRabbitMQMessage } from '@danet/rabbitmq';

@Injectable()
export class OrderHandler {
	@OnRabbitMQMessage('order.created')
	handleOrderCreated(payload: { orderId: number }) {
		// process the message
	}
}

Exchanges (optional)

By default a listener consumes a durable queue directly. To bind that queue to an exchange, pass options:

@OnRabbitMQMessage('order.shipped.email', {
	exchange: 'orders',
	exchangeType: 'topic',
	routingKey: 'order.shipped',
})
notifyByEmail(payload: OrderShipped) {}

Publish to an exchange with publish:

this.rabbit.publish('orders', 'order.shipped', { orderId: 1 }, 'topic');
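
With a topic exchange, the broker compares each message's routing key with the pattern a queue was bound with: words are separated by dots, * matches exactly one word, and # matches zero or more words. The sketch below illustrates that rule so you can reason about which listeners a given key reaches. topicMatches is a hypothetical helper, not part of @danet/rabbitmq or amqplib; the broker does the real matching.

```typescript
// Hypothetical helper illustrating AMQP topic matching; the broker does this for you.
function topicMatches(pattern: string, routingKey: string): boolean {
	const p = pattern.split('.');
	const k = routingKey.split('.');

	// match(i, j): does pattern word i onward match key word j onward?
	const match = (i: number, j: number): boolean => {
		if (i === p.length) return j === k.length;
		if (p[i] === '#') {
			// '#' may absorb zero or more words; try every possible split.
			for (let n = j; n <= k.length; n++) {
				if (match(i + 1, n)) return true;
			}
			return false;
		}
		if (j === k.length) return false;
		// '*' stands for exactly one word; anything else must be equal.
		return (p[i] === '*' || p[i] === k[j]) && match(i + 1, j + 1);
	};

	return match(0, 0);
}

console.log(topicMatches('order.shipped', 'order.shipped')); // true
console.log(topicMatches('order.*', 'order.shipped.email')); // false
console.log(topicMatches('order.#', 'order.shipped.email')); // true
```

A listener bound with routingKey: 'order.*' would therefore receive order.shipped and order.created, while 'order.shipped' receives only exact matches.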

Sending messages from a non-Danet app

Unlike Danet's KV Queue, where Deno.kv has a single global queue and messages are wrapped as { type, data }, RabbitMQ has native queues and exchanges, so no wrapper is needed. The message body is the raw JSON of your payload:

{ "orderId": 1 }

@OnRabbitMQMessage('order.created') simply consumes the order.created queue and runs JSON.parse on the body. Messages from any RabbitMQ producer, in any language, that publishes JSON to the same queue or exchange will be consumed.
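
A minimal sketch of the body format an external producer would need to emit, assuming UTF-8 encoded JSON. encodeBody and decodeBody are hypothetical helpers for illustration, not functions exported by @danet/rabbitmq.

```typescript
// Hypothetical helpers: they only illustrate the wire format described above.
function encodeBody(payload: unknown): Uint8Array {
	// The body is the payload's JSON text as UTF-8 bytes, with no envelope.
	return new TextEncoder().encode(JSON.stringify(payload));
}

function decodeBody<T>(body: Uint8Array): T {
	// Mirrors what a consumer does: decode the bytes, then JSON.parse.
	return JSON.parse(new TextDecoder().decode(body)) as T;
}

const body = encodeBody({ orderId: 1 });
console.log(new TextDecoder().decode(body)); // {"orderId":1}
console.log(decodeBody<{ orderId: number }>(body).orderId); // 1
```

In a Node producer using amqplib directly, Buffer.from(JSON.stringify(payload)) yields the same bytes and can be passed as the content argument of channel.sendToQueue(queue, content).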

Acknowledgements

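Messages are acknowledged automatically: ack when the handler succeeds, nack (no requeue) when it throws. To turn this off, pass consumeOptions: { noAck: true } on the decorator. Note that in amqplib, noAck: true tells the broker not to expect acknowledgements at all: a message is dequeued as soon as it is delivered, so a failed handler can no longer nack it or route it to a dead-letter exchange.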
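
The default ack-on-success, nack-on-throw behaviour can be sketched as follows. This illustrates the documented semantics and is not the library's actual implementation: dispatch, RawMessage and AckChannel are hypothetical names, with ack and nack shaped after amqplib's channel methods. Treating a JSON parse failure like a handler failure is an assumption of this sketch.

```typescript
// Hypothetical minimal shapes; real amqplib messages and channels carry more fields.
interface RawMessage {
	content: Uint8Array;
}

interface AckChannel {
	ack(msg: RawMessage): void;
	nack(msg: RawMessage, allUpTo: boolean, requeue: boolean): void;
}

type Handler = (payload: unknown) => unknown | Promise<unknown>;

async function dispatch(
	channel: AckChannel,
	msg: RawMessage,
	handler: Handler,
): Promise<void> {
	try {
		// Assumption: an unparseable body is handled like a thrown handler.
		const payload = JSON.parse(new TextDecoder().decode(msg.content));
		await handler(payload);
	} catch {
		// requeue = false: the message is dropped, or dead-lettered if the
		// queue has a dead-letter exchange configured.
		channel.nack(msg, false, false);
		return;
	}
	channel.ack(msg);
}
```

Because requeue is false, a handler that always fails does not cause an endless redelivery loop; the message only survives if a dead-letter exchange is configured on the queue.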

Notes

The test task runs with --no-check because amqplib is a dynamically loaded npm module. The exported package API is fully typed.
