Skip to content

runmq/nestjs

Repository files navigation

nestjs-runmq

NestJS module for RunMQ — decorator-based message processors, an injectable publisher service, and automatic lifecycle management.

Installation

npm install nestjs-runmq runmq

Quick Start

1. Register the Module

// app.module.ts
import { Module } from '@nestjs/common';
import { RunMQModule } from 'nestjs-runmq';

@Module({
  imports: [
    RunMQModule.forRoot({
      url: 'amqp://guest:guest@localhost:5672',
      reconnectDelay: 3000,
      maxReconnectAttempts: 5,
      management: {
        url: 'http://localhost:15672',
        username: 'guest',
        password: 'guest',
      },
    }),
  ],
})
export class AppModule {}

RunMQ connects automatically when the app starts and disconnects cleanly on shutdown.


2. Create a Processor

Decorate a class with @Processor() and mark the handler method with @ProcessMessage():

// email.processor.ts
import { Injectable } from '@nestjs/common';
import { Processor, ProcessMessage, RunMQMessageContent } from 'nestjs-runmq';

@Processor({
  topic: 'user.created',
  name: 'emailService',
  consumersCount: 2,
  attempts: 3,
  attemptsDelay: 2000,
})
@Injectable()
export class EmailProcessor {
  @ProcessMessage()
  async handle(message: RunMQMessageContent<{ email: string; name: string }>) {
    console.log(`Sending welcome email to ${message.message.email}`);
  }
}

Register it as a provider in any module:

@Module({
  providers: [EmailProcessor],
})
export class EmailModule {}

The processor is discovered and registered automatically on startup — no manual wiring needed.


3. Publish Messages

Inject RunMQPublisherService anywhere in your app:

// user.service.ts
import { Injectable } from '@nestjs/common';
import { RunMQPublisherService } from 'nestjs-runmq';

@Injectable()
export class UserService {
  constructor(private readonly publisher: RunMQPublisherService) {}

  async createUser(email: string, name: string) {
    // ... save user
    this.publisher.publish('user.created', { email, name });
  }
}

Configuration

Static (forRoot)

RunMQModule.forRoot({
  url: 'amqp://guest:guest@localhost:5672',
  reconnectDelay: 5000,        // Optional, default: 5000ms
  maxReconnectAttempts: 5,     // Optional, default: 5
  management: {                // Optional, enables policy-based TTL
    url: 'http://localhost:15672',
    username: 'guest',
    password: 'guest',
  },
})

Async (forRootAsync)

Load configuration at runtime — e.g., from environment variables via @nestjs/config:

// app.module.ts
import { ConfigModule, ConfigService } from '@nestjs/config';

RunMQModule.forRootAsync({
  imports: [ConfigModule],
  inject: [ConfigService],
  useFactory: (config: ConfigService) => ({
    url: config.get('RABBITMQ_URL'),
    reconnectDelay: config.get('RABBITMQ_RECONNECT_DELAY', 5000),
    maxReconnectAttempts: config.get('RABBITMQ_MAX_RECONNECT_ATTEMPTS', 5),
    management: {
      url: config.get('RABBITMQ_MANAGEMENT_URL'),
      username: config.get('RABBITMQ_MANAGEMENT_USER', 'guest'),
      password: config.get('RABBITMQ_MANAGEMENT_PASS', 'guest'),
    },
  }),
})

useClass

import { Injectable } from '@nestjs/common';
import { RunMQOptionsFactory, RunMQModuleOptions } from 'nestjs-runmq';

@Injectable()
export class RabbitMQConfig implements RunMQOptionsFactory {
  createRunMQOptions(): RunMQModuleOptions {
    return {
      url: process.env.RABBITMQ_URL ?? 'amqp://guest:guest@localhost:5672',
      reconnectDelay: 5000,
      maxReconnectAttempts: 5,
      management: {
        url: process.env.RABBITMQ_MANAGEMENT_URL ?? 'http://localhost:15672',
        username: process.env.RABBITMQ_MANAGEMENT_USER ?? 'guest',
        password: process.env.RABBITMQ_MANAGEMENT_PASS ?? 'guest',
      },
    };
  }
}

RunMQModule.forRootAsync({ useClass: RabbitMQConfig })

useExisting

RunMQModule.forRootAsync({ useExisting: RabbitMQConfig })

Decorators

@Processor(options)

Class-level decorator. Marks a class as a RunMQ message processor.

Option Type Required Default Description
topic string Yes Topic to subscribe to
name string Yes Unique processor name (creates an isolated queue)
consumersCount number No 1 Concurrent consumers
attempts number No 1 Max retry attempts
attemptsDelay number No 1000 Milliseconds between retries
messageSchema MessageSchema No Optional JSON schema validation
usePoliciesForDelay boolean No false Use RabbitMQ policies for delay queues (recommended)

@ProcessMessage()

Method-level decorator. Marks which method handles incoming messages. Exactly one method per @Processor class must be decorated.

Method signature: (message: RunMQMessageContent<T>) => Promise<void>

@InjectRunMQ()

Parameter decorator. Injects the raw RunMQ instance for advanced use cases:

import { Injectable } from '@nestjs/common';
import { InjectRunMQ } from 'nestjs-runmq';
import { RunMQ } from 'runmq';

@Injectable()
export class HealthService {
  constructor(@InjectRunMQ() private readonly runmq: RunMQ) {}

  check() {
    return { rabbitmq: this.runmq.isActive() };
  }
}

Injectable Services

RunMQPublisherService

Method Signature Description
publish (topic: string, message: Record<string, any>, correlationId?: string) => void Publishes a message to the given topic

Throws 'RunMQ is not connected' if called before the connection is established.


Error Handling

Scenario Behavior
RabbitMQ unreachable at startup Logged via NestJS Logger, error is re-thrown
publish() before connection Throws Error('RunMQ is not connected')
Duplicate @Processor name Throws at startup: Duplicate processor name: {name}
No @ProcessMessage in a @Processor class Throws at startup: No @ProcessMessage handler found in {ClassName}
Multiple @ProcessMessage in one class Throws at startup: Multiple @ProcessMessage handlers in {ClassName}

Re-exported Types

The following types from runmq are re-exported for convenience:

import {
  RunMQMessageContent,
  RunMQMessageMetaContent,
  RunMQConnectionConfig,
  RunMQProcessorConfiguration,
  MessageSchema,
  SchemaType,
  SchemaFailureStrategy,
  RabbitMQManagementConfig,
  RunMQLogger,
  RunMQ,
} from 'nestjs-runmq';

Full Example

// app.module.ts
import { Module } from '@nestjs/common';
import { RunMQModule } from 'nestjs-runmq';
import { EmailModule } from './email/email.module';
import { UserModule } from './user/user.module';

@Module({
  imports: [
    RunMQModule.forRoot({
      url: 'amqp://guest:guest@localhost:5672',
      reconnectDelay: 5000,
      maxReconnectAttempts: 5,
      management: {
        url: 'http://localhost:15672',
        username: 'guest',
        password: 'guest',
      },
    }),
    EmailModule,
    UserModule,
  ],
})
export class AppModule {}
// email/email.processor.ts
import { Injectable } from '@nestjs/common';
import { Processor, ProcessMessage, RunMQMessageContent } from 'nestjs-runmq';

@Processor({ topic: 'user.created', name: 'emailService', consumersCount: 2, attempts: 3 })
@Injectable()
export class EmailProcessor {
  @ProcessMessage()
  async handle(message: RunMQMessageContent<{ email: string; name: string }>) {
    console.log(`Sending welcome email to ${message.message.email}`);
  }
}
// email/email.module.ts
import { Module } from '@nestjs/common';
import { EmailProcessor } from './email.processor';

@Module({ providers: [EmailProcessor] })
export class EmailModule {}
// user/user.service.ts
import { Injectable } from '@nestjs/common';
import { RunMQPublisherService } from 'nestjs-runmq';

@Injectable()
export class UserService {
  constructor(private readonly publisher: RunMQPublisherService) {}

  async createUser(email: string, name: string) {
    this.publisher.publish('user.created', { email, name });
  }
}

Dashboard

Monitor your queues, processors, and messages in real time with RunMQ Pulse.


License

MIT

About

NestJS module for RunMQ - a reliable, high-performance message queue library for Node.js, built on top of RabbitMQ

Topics

Resources

Stars

Watchers

Forks

Packages

 
 
 

Contributors