Handling CPU-intensive operations in a REST API can be tricky. If our endpoint takes too long to respond, the request might time out. In this article, we look into queues to help us resolve this issue.
A queue is a very useful part of backend architecture. With it, we can implement asynchronous and distributed processing. A queue is a data structure modeled on a real-world queue. A publisher posts messages to the queue, and a consumer takes a message and processes it. Once a consumer handles a message, no other consumer can process it.
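To make the publisher and consumer roles concrete, here is a minimal in-memory sketch. It is not how Bull works internally, and the InMemoryQueue class and its method names are made up for illustration. It only shows that each message is handed to exactly one consumer.

```typescript
// A message stored in the queue (hypothetical shape, for illustration only).
interface Message<T> {
  id: number;
  payload: T;
}

class InMemoryQueue<T> {
  private readonly messages: Message<T>[] = [];
  private nextId = 1;

  // The publisher posts a message at the end of the queue.
  publish(payload: T): number {
    const id = this.nextId++;
    this.messages.push({ id, payload });
    return id;
  }

  // A consumer takes the oldest message. Since it is removed from the queue,
  // no other consumer can receive it.
  consume(): Message<T> | undefined {
    return this.messages.shift();
  }

  get size(): number {
    return this.messages.length;
  }
}

const queue = new InMemoryQueue<string>();
queue.publish('optimize avatar.png');
queue.publish('optimize banner.png');

const firstConsumerMessage = queue.consume();
const secondConsumerMessage = queue.consume();
// The queue is empty now, so a third consumer gets nothing.
const thirdConsumerMessage = queue.consume();
```

A real queue such as Bull adds persistence, retries, and delivery across processes on top of this basic idea.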
With NestJS, we have access to the @nestjs/bull package. It wraps the Bull library, which provides queue functionality based on Redis. Redis is a fast and reliable key-value store that keeps data in memory. Because Redis runs separately from our Node.js application, we don’t lose the data saved in Redis even if we restart the application.
Setting up Bull and Redis
Since Bull uses Redis to manage queues, we need to set it up. So far, within this series, we’ve used Docker Compose to help us with our architecture. Thankfully, it is straightforward to set up Redis with Docker.
docker-compose.yml
```yaml
version: "3"
services:
  redis:
    image: "redis:alpine"
    ports:
      - "6379:6379"
  # ...
```
By default, Redis listens on port 6379.
Connecting to Redis requires us to define two additional environment variables: the port and the host.
app.module.ts
```typescript
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import * as Joi from '@hapi/joi';

@Module({
  imports: [
    ConfigModule.forRoot({
      validationSchema: Joi.object({
        REDIS_HOST: Joi.string().required(),
        REDIS_PORT: Joi.number().required(),
        // ...
      })
    }),
    // ...
  ],
  controllers: [],
  providers: [],
})
export class AppModule {}
```
.env
```
REDIS_HOST=localhost
REDIS_PORT=6379
# ...
```
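Environment variables always arrive as strings, so the port has to be converted to a number before it is used. The following sketch shows that conversion with plain TypeScript. The readRedisOptions helper is hypothetical and is not part of @nestjs/config or Bull. Its checks are also stricter than the Joi schema above, which only requires a number.

```typescript
interface RedisConnectionOptions {
  host: string;
  port: number;
}

// Hypothetical helper: reads the two variables and fails early if they are invalid.
function readRedisOptions(
  env: Record<string, string | undefined>,
): RedisConnectionOptions {
  const host = env.REDIS_HOST;
  // Number(undefined) is NaN, so a missing port is rejected by the check below.
  const port = Number(env.REDIS_PORT);

  if (!host) {
    throw new Error('REDIS_HOST is required');
  }
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error('REDIS_PORT must be a valid port number');
  }
  return { host, port };
}

const redisOptions = readRedisOptions({
  REDIS_HOST: 'localhost',
  REDIS_PORT: '6379',
});
```

In a NestJS application, we would rely on the ConfigModule validation instead of a helper like this one.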
We also need to install the necessary dependencies.
```bash
npm install @nestjs/bull @types/bull bull
```
Once we’ve got all of the above configured, we can establish a connection with Redis.
app.module.ts
```typescript
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    BullModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => ({
        redis: {
          host: configService.get('REDIS_HOST'),
          port: Number(configService.get('REDIS_PORT')),
        },
      }),
      inject: [ConfigService],
    }),
    // ...
  ],
  controllers: [],
  providers: [],
})
export class AppModule {}
```
Thanks to calling BullModule.forRootAsync, we can use Bull across all of our modules.
We can pass more options besides the redis object when configuring Bull. For the full list, check out the documentation.
Managing queues with Bull
Let’s create a queue that can help optimize multiple PNG images for us. We will start with defining a module.
optimize.module.ts
```typescript
import { Module } from '@nestjs/common';
import { OptimizeController } from './optimize.controller';
import { BullModule } from '@nestjs/bull';
import { ImageProcessor } from './image.processor';

@Module({
  imports: [
    BullModule.registerQueue({
      name: 'image',
    })
  ],
  providers: [ImageProcessor],
  exports: [],
  controllers: [OptimizeController]
})
export class OptimizeModule {}
```
Above, we register our queue using BullModule.registerQueue. Thanks to doing so, we can use it in our OptimizeController.
optimize.controller.ts
```typescript
import {
  Controller,
  Post,
  UploadedFiles,
  UseInterceptors,
} from '@nestjs/common';
import { AnyFilesInterceptor } from '@nestjs/platform-express';
import { Express } from 'express';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';

@Controller('optimize')
export class OptimizeController {
  constructor(
    @InjectQueue('image') private readonly imageQueue: Queue,
  ) {}

  @Post('image')
  @UseInterceptors(AnyFilesInterceptor())
  async processImage(@UploadedFiles() files: Express.Multer.File[]) {
    const job = await this.imageQueue.add('optimize', {
      files
    });

    return {
      jobId: job.id
    }
  }
}
```
Above, we follow the NestJS documentation on how to upload multiple files with Multer. To do that, we need the AnyFilesInterceptor and the @UploadedFiles() decorator.
Once we have the files, we need to add a job to our queue using the add() method. We pass two arguments to it: the name of the job that we later refer to and the data it needs.
In the above endpoint, we respond with the id of the job. This will allow the user to ask for the return value of the job later.
Consuming the queue
Now we need to define a consumer. With it, we can process jobs added to the queue.
To optimize images, we use the imagemin library. Since we expect the user to upload multiple images, we compress the result to a .zip file using the adm-zip package.
```bash
npm install imagemin @types/imagemin imagemin-pngquant adm-zip @types/adm-zip
```
image.processor.ts
```typescript
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import * as AdmZip from 'adm-zip';
import { buffer } from 'imagemin';
import imageminPngquant from 'imagemin-pngquant';
import { Express } from 'express';

@Processor('image')
export class ImageProcessor {
  @Process('optimize')
  async handleOptimization(job: Job) {
    const files: Express.Multer.File[] = job.data.files;

    const optimizationPromises: Promise<Buffer>[] = files.map(file => {
      const fileBuffer = Buffer.from(file.buffer);
      return buffer(fileBuffer, {
        plugins: [
          imageminPngquant({
            quality: [0.6, 0.8]
          })
        ]
      })
    })

    const optimizedImages = await Promise.all(optimizationPromises);

    const zip = new AdmZip();

    optimizedImages.forEach((image, index) => {
      const fileData = files[index];
      zip.addFile(fileData.originalname, image);
    })

    return zip.toBuffer();
  }
}
```
To give the user more feedback, we could update the progress of the job by calling the job.progress(number) method each time we finish optimizing some of the images.
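One possible way to do that is to wrap the optimization promises in a helper that reports a percentage whenever one of them finishes. The sketch below is an assumption about how this could look. The withProgress function is hypothetical and not part of Bull. It accepts a plain callback so that it can be tried without Redis.

```typescript
// Hypothetical helper: calls `report` with a percentage each time a task finishes.
async function withProgress<T>(
  tasks: Promise<T>[],
  report: (percent: number) => void,
): Promise<T[]> {
  let finished = 0;

  const tracked = tasks.map(async (task) => {
    const result = await task;
    finished += 1;
    report(Math.round((finished / tasks.length) * 100));
    return result;
  });

  // The results keep the order of the original tasks.
  return Promise.all(tracked);
}

// A small demonstration with promises that are already resolved.
const reportedProgress: number[] = [];
const progressDemo = withProgress(
  [
    Promise.resolve('a'),
    Promise.resolve('b'),
    Promise.resolve('c'),
    Promise.resolve('d'),
  ],
  (percent) => reportedProgress.push(percent),
);
```

In the processor, we could then call withProgress(optimizationPromises, (percent) => job.progress(percent)) instead of Promise.all(optimizationPromises).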
Above, we manipulate buffers. A Node.js buffer represents a fixed-length sequence of bytes. If you want to know more about buffers, check out Node.js TypeScript #3. Explaining the Buffer.
We call the Buffer.from(file.buffer) function because file.buffer stopped being an instance of the Buffer class when it was serialized and saved in the Redis store.
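We can observe this behavior without Redis. The sketch below assumes that the job data goes through JSON serialization on its way to the store and simulates that round trip with JSON.stringify and JSON.parse. The variable names are made up for illustration.

```typescript
import { Buffer } from 'node:buffer';

const originalBuffer = Buffer.from('PNG data');

// Simulate saving the job data and reading it back.
const restoredData = JSON.parse(JSON.stringify({ buffer: originalBuffer }));

// After the round trip, the value is a plain object that looks like
// { type: 'Buffer', data: [80, 78, 71, ...] }, not a Buffer instance.
const isStillBuffer = Buffer.isBuffer(restoredData.buffer);

// Buffer.from() understands that shape and rebuilds a real Buffer.
const rebuiltBuffer = Buffer.from(restoredData.buffer);
const matchesOriginal = rebuiltBuffer.equals(originalBuffer);
```

This is why the processor rebuilds every uploaded file with Buffer.from before passing it to imagemin.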
Returning the result of the job
The crucial part of our handleOptimization method is that it returns a buffer. Thanks to that, Bull saves our optimized images to Redis, and we can refer to them later.
To do that, let’s create a new endpoint that takes the job’s id as a parameter.
optimize.controller.ts
```typescript
import {
  Controller,
  Get,
  Param,
  Res,
} from '@nestjs/common';
import { Response } from 'express';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import { Readable } from 'stream';

@Controller('optimize')
export class OptimizeController {
  constructor(
    @InjectQueue('image') private readonly imageQueue: Queue,
  ) {}

  // ...

  @Get('image/:id')
  async getJobResult(@Res() response: Response, @Param('id') id: string) {
    const job = await this.imageQueue.getJob(id);

    if (!job) {
      return response.sendStatus(404);
    }

    const isCompleted = await job.isCompleted();

    if (!isCompleted) {
      return response.sendStatus(202);
    }

    const result = Buffer.from(job.returnvalue);

    const stream = Readable.from(result);

    stream.pipe(response);
  }
}
```
If we updated the progress of the job in the consumer, we could include it in the response when the job is not yet complete.
Above, we use the imageQueue.getJob() method to get the job with a given id. Since we’ve used the @Res() decorator, we put NestJS into the library-specific mode for the getJobResult handler. Because of that, we are responsible for managing the response manually, for example, with the response.sendStatus method.
If the job with the specified id exists but hasn’t yet been completed, we respond with the 202 Accepted status. It indicates that we’ve accepted the request and are processing it, but we haven’t yet completed it.
If the job is completed, we create a readable stream from the buffer and send it to the user.
If you want to know more about streams, check out Node.js TypeScript #4. Paused and flowing modes of a readable stream.
If you want to use Postman to download the result, use the “Send and download” button.
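The decision made in the getJobResult handler can be summed up as a small pure function. This is only a sketch. The JobSnapshot and JobResponse types and the decideResponse function are hypothetical and simplify the real Bull job object. The optional progress field illustrates how the 202 response could carry the progress if the consumer reported it.

```typescript
// Hypothetical shape: only the parts of a job that the decision depends on.
interface JobSnapshot {
  isCompleted: boolean;
  progress?: number;
}

type JobResponse =
  | { status: 404 }
  | { status: 202; progress?: number }
  | { status: 200 };

function decideResponse(job: JobSnapshot | null): JobResponse {
  if (!job) {
    // There is no job with the given id.
    return { status: 404 };
  }
  if (!job.isCompleted) {
    // The request was accepted, but the job is still being processed.
    return { status: 202, progress: job.progress };
  }
  // The job is finished, so the result can be sent to the user.
  return { status: 200 };
}

const missingJobResponse = decideResponse(null);
const pendingJobResponse = decideResponse({ isCompleted: false, progress: 50 });
const finishedJobResponse = decideResponse({ isCompleted: true });
```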
Running jobs in separate processes
Our job processors can run in separate processes for better performance.
optimize.module.ts
```typescript
import { Module } from '@nestjs/common';
import { OptimizeController } from './optimize.controller';
import { BullModule } from '@nestjs/bull';
import { join } from 'path';

@Module({
  imports: [
    BullModule.registerQueue({
      name: 'image',
      processors: [{
        name: 'optimize',
        path: join(__dirname, 'image.processor.js')
      }],
    })
  ],
  providers: [],
  exports: [],
  controllers: [OptimizeController]
})
export class OptimizeModule {}
```
Since we execute our image processor in a forked process, dependency injection isn’t available. If we needed any dependencies, we would have to initialize them ourselves.
image.processor.ts
```typescript
import * as AdmZip from 'adm-zip';
import { buffer } from 'imagemin';
import imageminPngquant from 'imagemin-pngquant';
import { Express } from 'express';
import { Job, DoneCallback } from 'bull';

async function imageProcessor(job: Job, doneCallback: DoneCallback) {
  try {
    const files: Express.Multer.File[] = job.data.files;

    const optimizationPromises: Promise<Buffer>[] = files.map(file => {
      const fileBuffer = Buffer.from(file.buffer);
      return buffer(fileBuffer, {
        plugins: [
          imageminPngquant({
            quality: [0.6, 0.8]
          })
        ]
      })
    })

    const optimizedImages = await Promise.all(optimizationPromises);

    const zip = new AdmZip();

    optimizedImages.forEach((image, index) => {
      const fileData = files[index];
      zip.addFile(fileData.originalname, image);
    })

    doneCallback(null, zip.toBuffer());
  } catch (error) {
    // Report the failure through the callback instead of leaving a rejected promise unhandled
    doneCallback(error as Error);
  }
}

export default imageProcessor;
```
If you want to know more about child processes, read Node.js TypeScript #10. Is Node.js single-threaded? Creating child processes.
Summary
In this article, we’ve learned the basics of managing queues with NestJS and Bull. To do that, we’ve implemented an example in which we optimize multiple images at once. Thanks to doing that through the queue, we can better manage our resources. We can also avoid timeouts on CPU-intensive tasks and run them in separate processes.