API with NestJS #19. Using RabbitMQ to communicate with microservices
NestJS subscribes to the idea that a microservice is an application that uses a different transport layer than HTTP. Therefore, in the previous part of this series, we used TCP.
However, with NestJS, we have a broad selection of transporters that transmit messages across microservices. Each one of them is different, and it is worth exploring them before making a choice. Conveniently, Nest provides an abstraction for each transport layer, so it is easy to switch them without making significant changes to our code.
Introduction to RabbitMQ
RabbitMQ is, at its core, a message broker that implements the Advanced Message Queuing Protocol (AMQP).
One of the most important concepts to understand is the producer (also called a publisher), whose job is to send the messages. The second important thing is the consumer that waits to receive messages.
Between the producer and the consumer is a queue. When the producer sends the message, it lands in the queue. The producer sends the messages through the exchange, which is a message routing agent.
We can bind the exchange to many different queues. When sending a message, the publisher can attach a routing key, which the exchange uses to deliver the message to the intended queues.
Finally, the consumer picks up the message from the queue and handles it.
Summing up the above:
- the producer sends a message to the exchange,
- the exchange receives the message and routes it to the desired queue,
- the consumer takes the message from the queue and consumes it.
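The three steps above can be sketched with a small in-memory model. This is a toy illustration only, not the RabbitMQ API: the ToyExchange class, the queue names, and the subscriber.added routing key are all made up for this example.

```typescript
// Toy in-memory model of the producer -> exchange -> queue -> consumer flow.
// This is NOT the RabbitMQ API; every name here is hypothetical.

type ToyMessage = { routingKey: string; body: string };

class ToyExchange {
  // routing key -> names of the queues bound with that key
  private bindings = new Map<string, string[]>();
  private queues = new Map<string, ToyMessage[]>();

  // Bind a queue to this exchange under a routing key.
  bind(queueName: string, routingKey: string): void {
    if (!this.queues.has(queueName)) {
      this.queues.set(queueName, []);
    }
    const bound = this.bindings.get(routingKey) ?? [];
    bound.push(queueName);
    this.bindings.set(routingKey, bound);
  }

  // The producer hands the message to the exchange, which routes it
  // to every queue bound with a matching routing key.
  publish(message: ToyMessage): void {
    for (const queueName of this.bindings.get(message.routingKey) ?? []) {
      this.queues.get(queueName)!.push(message);
    }
  }

  // The consumer takes the oldest message from a queue, if there is one.
  consume(queueName: string): ToyMessage | undefined {
    return this.queues.get(queueName)?.shift();
  }
}

const exchange = new ToyExchange();
exchange.bind('email-subscribers', 'subscriber.added');
exchange.bind('audit-log', 'subscriber.added');

exchange.publish({ routingKey: 'subscriber.added', body: 'john@smith.com' });

const received = exchange.consume('email-subscribers');
console.log(received?.body); // john@smith.com
```

Because both queues are bound with the same key, the second queue holds its own copy of the message until its consumer picks it up.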
Advantages of using RabbitMQ with microservices
With a message queue, we can send a message from one service to another without knowing if it can handle it. Therefore, we can separate our microservices more and make them less dependent on each other. CloudAMQP, in its article, also mentions that it helped them a lot in keeping the architecture flexible and fixing bugs. When a bug is found, a faulty microservice can be stopped and restarted after it is resolved. The queue of messages might pile up quite a bit, but no data is lost.
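To make the "no data is lost" point concrete, here is a minimal simulation of a consumer that is stopped for a fix and then restarted. It is a toy model under the assumption that the queue simply buffers messages; the publish and drain helpers are hypothetical and do not come from any library.

```typescript
// Toy simulation (not the RabbitMQ API): messages wait in a queue
// while the consuming service is stopped for a bug fix.

const pending: string[] = [];
const handled: string[] = [];
let consumerRunning = false;

// Deliver waiting messages in order, but only while the consumer runs.
function drain(): void {
  while (consumerRunning && pending.length > 0) {
    handled.push(pending.shift()!);
  }
}

// The producer does not need to know whether the consumer is up.
function publish(message: string): void {
  pending.push(message);
  drain();
}

publish('subscriber-1');
publish('subscriber-2');
console.log(pending.length); // 2, nothing was lost while the consumer was down

// The fixed microservice comes back and works through the backlog.
consumerRunning = true;
drain();
console.log(handled); // [ 'subscriber-1', 'subscriber-2' ]
```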
Running RabbitMQ with Docker Compose
So far in this series, we’ve been using Docker Compose to work with Postgres and Elasticsearch. We can also easily use it to run an instance of RabbitMQ. Let’s add it to our docker-compose.yml file:
docker-compose.yml
```yaml
version: "3"
services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    hostname: rabbitmq
    volumes:
      - /var/lib/rabbitmq
    ports:
      - "5672:5672"
      - "15672:15672"
    env_file:
      - ./rabbitmq.env

  # ...
```
Above, you can see that we reference the rabbitmq.env file. We use it to set the default username and password. Let’s create it and add it to .gitignore along with the .env file.
rabbitmq.env
```
RABBITMQ_DEFAULT_USER=admin
RABBITMQ_DEFAULT_PASS=admin
```
.gitignore
```
.env
rabbitmq.env
# ...
```
In our configuration, we can see two ports exposed:
- 5672 is the main RabbitMQ port that our NestJS application will use
- 15672 is the port of the management plugin. It provides an interface that lets us manage and monitor our RabbitMQ instance
Using RabbitMQ with NestJS
In this article, we add RabbitMQ to the microservice built in the previous part of this series. Its job is to store email subscriptions.
Using RabbitMQ with microservices in NestJS is straightforward, thanks to the transporter built into the NestJS framework. We need to install some additional dependencies, though.
```bash
npm install amqplib amqp-connection-manager
```
We also need additional environment variables in every part of our system that wants to connect to RabbitMQ.
```
RABBITMQ_USER=admin
RABBITMQ_PASSWORD=admin
RABBITMQ_HOST=localhost:5672
RABBITMQ_QUEUE_NAME=email-subscribers
```
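These variables end up in a connection URL of the form amqp://user:password@host. Below is a minimal sketch of a helper that builds it. The buildAmqpUrl function and the RabbitMqEnv interface are hypothetical and not part of NestJS or amqplib. The sketch assumes that credentials should be percent-encoded, since characters such as "@" or "/" in a password would otherwise change how the URL is parsed.

```typescript
// Sketch: building the connection URL from the environment variables above.
// buildAmqpUrl is a hypothetical helper, not part of NestJS or amqplib.

interface RabbitMqEnv {
  RABBITMQ_USER?: string;
  RABBITMQ_PASSWORD?: string;
  RABBITMQ_HOST?: string;
}

function buildAmqpUrl(env: RabbitMqEnv): string {
  const { RABBITMQ_USER, RABBITMQ_PASSWORD, RABBITMQ_HOST } = env;
  if (!RABBITMQ_USER || !RABBITMQ_PASSWORD || !RABBITMQ_HOST) {
    // Fail early instead of connecting with "undefined" in the URL.
    throw new Error('Missing RabbitMQ connection settings');
  }
  // Percent-encode the credentials so that special characters
  // in a password do not break the URL.
  const user = encodeURIComponent(RABBITMQ_USER);
  const password = encodeURIComponent(RABBITMQ_PASSWORD);
  return `amqp://${user}:${password}@${RABBITMQ_HOST}`;
}

const amqpUrl = buildAmqpUrl({
  RABBITMQ_USER: 'admin',
  RABBITMQ_PASSWORD: 'p@ss/word',
  RABBITMQ_HOST: 'localhost:5672',
});
console.log(amqpUrl); // amqp://admin:p%40ss%2Fword@localhost:5672
```

Throwing on a missing variable is a design choice for this sketch; it surfaces a misconfigured environment at startup rather than as a failed connection later.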
Creating a microservice
Let’s start by configuring our microservice to use the above configuration.
main.ts
```typescript
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { ConfigService } from '@nestjs/config';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  const configService = app.get(ConfigService);

  // Read the RabbitMQ settings from the environment variables
  const user = configService.get('RABBITMQ_USER');
  const password = configService.get('RABBITMQ_PASSWORD');
  const host = configService.get('RABBITMQ_HOST');
  const queueName = configService.get('RABBITMQ_QUEUE_NAME');

  await app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.RMQ,
    options: {
      urls: [`amqp://${user}:${password}@${host}`],
      queue: queueName,
      queueOptions: {
        durable: true,
      },
    },
  });

  // Wait until the RabbitMQ listener is up before finishing the bootstrap
  await app.startAllMicroservices();
}
bootstrap();
```
Queues in RabbitMQ can be durable or transient. The metadata of a durable queue is stored on disk, so the queue is restored when the broker starts. A transient queue is deleted when the broker boots, so it does not survive a restart, and the messages that were not consumed yet are lost with it.
According to the RabbitMQ documentation, the performance does not differ in most cases.
After running our application, let’s visit http://localhost:15672. Here, we can see an interface that lets us interact with our RabbitMQ instance. The credentials are the same as we specified in the rabbitmq.env file.
When we visit the Queues tab, we can see our brand new queue that was automatically created.
The rest of the code from the previous part of this series still works. Our controllers and services do not need to change.
Connecting to the microservice within the client
The other side of the connection is the client. It also needs to use environment variables and connect to RabbitMQ.
subscribers.module.ts
```typescript
import { Module } from '@nestjs/common';
import SubscribersController from './subscribers.controller';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { ClientProxyFactory, Transport } from '@nestjs/microservices';

@Module({
  imports: [ConfigModule],
  controllers: [SubscribersController],
  providers: [
    {
      provide: 'SUBSCRIBERS_SERVICE',
      useFactory: (configService: ConfigService) => {
        const user = configService.get('RABBITMQ_USER');
        const password = configService.get('RABBITMQ_PASSWORD');
        const host = configService.get('RABBITMQ_HOST');
        const queueName = configService.get('RABBITMQ_QUEUE_NAME');

        return ClientProxyFactory.create({
          transport: Transport.RMQ,
          options: {
            urls: [`amqp://${user}:${password}@${host}`],
            queue: queueName,
            queueOptions: {
              durable: true,
            },
          },
        });
      },
      inject: [ConfigService],
    },
  ],
})
export class SubscribersModule {}
```
Thanks to the above, we have a functioning connection with our microservice.
```typescript
import {
  Body,
  Controller,
  Post,
  UseGuards,
  UseInterceptors,
  ClassSerializerInterceptor,
  Inject,
} from '@nestjs/common';
import JwtAuthenticationGuard from '../authentication/jwt-authentication.guard';
import CreateSubscriberDto from './dto/createSubscriber.dto';
import { ClientProxy } from '@nestjs/microservices';

@Controller('subscribers')
@UseInterceptors(ClassSerializerInterceptor)
export default class SubscribersController {
  constructor(
    @Inject('SUBSCRIBERS_SERVICE') private subscribersService: ClientProxy,
  ) {}

  @Post()
  @UseGuards(JwtAuthenticationGuard)
  async createPost(@Body() subscriber: CreateSubscriberDto) {
    return this.subscribersService.send({ cmd: 'add-subscriber' }, subscriber);
  }

  // ...
}
```
Replying to messages
When we start using our microservice, we notice that it can send a reply. This might make us wonder how that happens. On the surface, we have a publisher that simply sends messages to the consumer.
If we take a look under the hood of NestJS, we can notice that it uses the amq.rabbitmq.reply-to queue. The framework sets it as a reply queue in the options passed to the amqplib library we’ve installed before. In the documentation of the Direct Reply-to feature, we can see that amq.rabbitmq.reply-to is a pseudo-queue that we can’t see in the RabbitMQ management interface.
This feature allows us to implement a request/reply pattern where a microservice can effortlessly respond.
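The general idea behind request/reply over queues can be sketched as follows. This is a toy in-memory model, not the RabbitMQ or NestJS API. It assumes the common approach in which every request carries a correlation id and the name of a reply queue, so the requester can match each reply to the request it sent. All names below are hypothetical.

```typescript
// Toy model of the request/reply pattern (not the RabbitMQ or NestJS API).
// Each request carries a correlation id and the name of a reply queue.

interface ToyRequest {
  correlationId: string;
  replyTo: string;
  payload: { email: string };
}

interface ToyReply {
  correlationId: string;
  payload: { id: number; email: string };
}

const requestQueue: ToyRequest[] = [];
const replyQueues = new Map<string, ToyReply[]>([['reply-queue', []]]);

// Requester side: remember which correlation ids still wait for a reply.
const waiting = new Map<string, (reply: ToyReply) => void>();
const results: ToyReply['payload'][] = [];

function sendRequest(correlationId: string, email: string): void {
  waiting.set(correlationId, (reply) => results.push(reply.payload));
  requestQueue.push({ correlationId, replyTo: 'reply-queue', payload: { email } });
}

// Consumer side: handle one request and publish the reply to replyTo.
let nextId = 1;
function handleNextRequest(): void {
  const request = requestQueue.shift();
  if (!request) return;
  replyQueues.get(request.replyTo)?.push({
    correlationId: request.correlationId,
    payload: { id: nextId++, email: request.payload.email },
  });
}

// Requester side: read replies and hand each one to the matching request.
function readReplies(): void {
  const replies = replyQueues.get('reply-queue') ?? [];
  while (replies.length > 0) {
    const reply = replies.shift()!;
    waiting.get(reply.correlationId)?.(reply);
    waiting.delete(reply.correlationId);
  }
}

sendRequest('a1', 'john@smith.com');
handleNextRequest();
readReplies();
console.log(results); // [ { id: 1, email: 'john@smith.com' } ]
```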
Message acknowledgment
The important thing is to ensure that a message never gets lost. Since our messages are not guaranteed to reach our consumers or be successfully processed, we need a confirmation mechanism. Fortunately, RabbitMQ supports consumer acknowledgments and publisher confirms. This topic is covered in detail in the RabbitMQ documentation.
The consumer sends back an acknowledgment, stating that it received and processed the message. If the consumer fails to consume the message fully, RabbitMQ will re-queue it.
By default, NestJS handles acknowledgments automatically. We can do that manually, though. To do that, we need to pass the noAck: false flag when creating a microservice.
```typescript
await app.connectMicroservice<MicroserviceOptions>({
  transport: Transport.RMQ,
  options: {
    urls: [`amqp://${user}:${password}@${host}`],
    queue: queueName,
    noAck: false,
    queueOptions: {
      durable: true,
    },
  },
});
```
To manually acknowledge a message, we need to access the context of the currently processed message.
```typescript
import { Ctx, MessagePattern, Payload, RmqContext } from '@nestjs/microservices';

// ...

@MessagePattern({ cmd: 'add-subscriber' })
async addSubscriber(@Payload() subscriber: CreateSubscriberDto, @Ctx() context: RmqContext) {
  const newSubscriber = await this.subscribersService.addSubscriber(subscriber);

  // Acknowledge the message only after the subscriber has been saved
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();
  channel.ack(originalMsg);

  return newSubscriber;
}
```
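The handler above acknowledges the message on success, but it does not say what should happen when processing fails. One possible approach, sketched below, is to reject the message explicitly. The ChannelLike interface and the processWithAck helper are hypothetical; the two methods mirror the ack(message) and nack(message, allUpTo, requeue) calls available on an amqplib channel, and a fake channel stands in for a real one here.

```typescript
// Sketch of "ack on success, reject on failure".
// ChannelLike and processWithAck are hypothetical names for this example.

interface ChannelLike<M> {
  ack(message: M): void;
  nack(message: M, allUpTo?: boolean, requeue?: boolean): void;
}

async function processWithAck<M, R>(
  channel: ChannelLike<M>,
  message: M,
  handler: () => Promise<R>,
): Promise<R> {
  try {
    const result = await handler();
    // Confirm the message only after the work has finished.
    channel.ack(message);
    return result;
  } catch (error) {
    // Reject this single message without putting it back in the queue,
    // so a message that always fails cannot loop forever.
    channel.nack(message, false, false);
    throw error;
  }
}

// Usage with a fake channel that records what happened.
const calls: string[] = [];
const fakeChannel: ChannelLike<string> = {
  ack: (message) => { calls.push(`ack:${message}`); },
  nack: (message, _allUpTo, requeue) => { calls.push(`nack:${message}:${requeue}`); },
};

async function demo(): Promise<void> {
  await processWithAck(fakeChannel, 'first', async () => 'saved');
  await processWithAck(fakeChannel, 'second', async () => {
    throw new Error('database is down');
  }).catch(() => undefined);
  console.log(calls); // [ 'ack:first', 'nack:second:false' ]
}

const demoFinished = demo();
```

Passing requeue as false is a choice made for this sketch. Requeueing instead gives the message another chance, at the risk of retrying a message that can never succeed.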
There is more data available in the context. Let’s check out the RmqContext type:
```typescript
export declare class RmqContext extends BaseRpcContext<RmqContextArgs> {
  constructor(args: RmqContextArgs);
  /**
   * Returns the original message (with properties, fields, and content).
   */
  getMessage(): Record<string, any>;
  /**
   * Returns the reference to the original RMQ channel.
   */
  getChannelRef(): any;
  /**
   * Returns the name of the pattern.
   */
  getPattern(): string;
}
```
Summary
In this article, we’ve implemented a microservice using RabbitMQ. We’ve gone through all of the basics of establishing communication with RabbitMQ and discussed the advantages of such an approach. We’ve also gone a little deeper and looked into the internals of NestJS to understand how it uses RabbitMQ under the hood a little better. This definitely gave us quite a bit of an overview of why we might want to use RabbitMQ and how.
This is very good stuff. can you please provide the git repo link for the whole code example? This will be very helpful
https://github.com/mwanago/nestjs-typescript
what’s the code for the microservice – rabbitMQ?
Hi, I want to send messages to fanout exchange. If exchange doesn’t exist, need to create. But when I configure exchange, Transport.RMQ not working.
```typescript
{
  provide: 'EXAMPLE_SERVICE',
  useFactory: (configService: ConfigService) => {
    const user = configService.get('RABBITMQ_USER');
    const password = configService.get('RABBITMQ_PASSWORD');
    const host = configService.get('RABBITMQ_HOST');
    const queueName = configService.get('RABBITMQ_QUEUE_NAME');
    const exchangeName = configService.get('RABBITMQ_EXCHANGE_NAME');
    return ClientProxyFactory.create({
      transport: Transport.RMQ,
      options: {
        urls: [
          `amqp://${user}:${password}@${host}`
        ],
        exchange: {
          name: exchangeName,
          type: 'fanout',
          echangeOpts: {
            durable: true,
          },
        },
      },
    });
  },
  inject: [ConfigService],
}
```
Thank you so much! Can you provide some more information about how we can send delayed messages over the queue? Or can you give any leads about that? Best!