- 1. Node.js TypeScript #1. Modules, process arguments, basics of the File System
- 2. Node.js TypeScript #2. The synchronous nature of the EventEmitter
- 3. Node.js TypeScript #3. Explaining the Buffer
- 4. Node.js TypeScript #4. Paused and flowing modes of a readable stream
- 5. Node.js TypeScript #5. Writable streams, pipes, and the process streams
- 6. Node.js TypeScript #6. Sending HTTP requests, understanding multipart/form-data
- 7. Node.js TypeScript #7. Creating a server and receiving requests
- 8. Node.js TypeScript #8. Implementing HTTPS with our own OpenSSL certificate
- 9. Node.js TypeScript #9. The Event Loop in Node.js
- 10. Node.js TypeScript #10. Is Node.js single-threaded? Creating child processes
- 11. Node.js TypeScript #11. Harnessing the power of many processes using a cluster
- 12. Node.js TypeScript #12. Introduction to Worker Threads with TypeScript
- 13. Node.js TypeScript #13. Sending data between Worker Threads
- 14. Node.js TypeScript #14. Measuring processes & worker threads with Performance Hooks
- 15. Node.js TypeScript #15. Benefits of the HTTP/2 protocol
Streams have been present in programming for a long time now, and it looks like they are here to stay. They also play a big role in Node.js: with them, we can read and write data in an efficient way. You can encounter streams, for example, when working with files or dealing with HTTP requests. In this article, we cover the first of a few types of streams: the readable stream.
Node.js TypeScript Readable Streams
Streams are a way to deal with collections of data that might not be available all at once. Thanks to that, the data does not have to fit in memory, which makes streams efficient when working with large amounts of data. Aside from that, you can start processing data as soon as you have a part of it, instead of waiting until all of it is available.
In one of our previous examples we read a file:
```typescript
import * as fs from 'fs';
import * as util from 'util';

const readFile = util.promisify(fs.readFile);

readFile('./file.txt', { encoding: 'utf8' })
  .then((content) => {
    console.log(content);
  })
  .catch(error => console.log(error));
```
The above is not very efficient, because this solution waits for the whole file to load into memory before performing any actions. A way to improve on that is to create a readable stream using fs.createReadStream.
Every stream is an instance of the EventEmitter that we covered in the second part of this series. Thanks to that, we can listen for any data coming in using the EventEmitter API:
```typescript
import * as fs from 'fs';

const stream = fs.createReadStream('./file.txt');

stream.on('data', (chunk) => {
  console.log('New chunk of data:', chunk);
});
```
This way, the file gets split into multiple chunks. The stream emits the ‘data’ event every time a chunk of data becomes available.
```
New chunk of data: <Buffer 4c 6f 72 65 6d 20 69 70 73 75 ... >
(...)
```
As you can see, every chunk is an instance of a Buffer. The bigger the file, the more chunks we receive.
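The size of the chunks is not arbitrary: fs.createReadStream reads up to highWaterMark bytes at a time, which defaults to 64 KiB. As a minimal sketch, assuming the same ./file.txt as above, we can lower highWaterMark and count how many chunks we receive:

```typescript
import * as fs from 'fs';

// highWaterMark caps the size of each chunk; fs.createReadStream
// uses 64 KiB by default
const stream = fs.createReadStream('./file.txt', { highWaterMark: 1024 });

let chunks = 0;

stream.on('data', () => {
  chunks += 1;
});

stream.on('end', () => {
  console.log(`Received ${chunks} chunks`);
});
```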
To stringify the buffers, we have a few options. We can call toString directly on the buffers or use the StringDecoder, just like in the previous part of the series, where we cover the Buffer.
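As a quick sketch of both options, assuming ./file.txt contains UTF-8 encoded text:

```typescript
import * as fs from 'fs';
import { StringDecoder } from 'string_decoder';

const decoder = new StringDecoder('utf8');
const stream = fs.createReadStream('./file.txt');

stream.on('data', (chunk: Buffer) => {
  // toString decodes the whole chunk in one go
  console.log(chunk.toString('utf8'));
  // the StringDecoder holds back incomplete multi-byte characters
  // until the next chunk arrives
  console.log(decoder.write(chunk));
});
```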
Aside from that, we can also specify the encoding in the arguments of the createReadStream function.
```typescript
import * as fs from 'fs';

const stream = fs.createReadStream(
  './file.txt',
  { encoding: 'utf-8' },
);

stream.on('data', (chunk) => {
  console.log('New chunk of data:', chunk);
});
```
```
New chunk of data: Lorem ipsum dolor sit amet, consectetur adipiscing elit.
(...)
```
Why is the ‘data’ event emitted? Modes of a readable stream
In the example above, the stream starts emitting chunks of data because we attach a listener callback to the ‘data’ event. Even if we attach the callback some time after creating the stream, we still receive all of the data.
```typescript
import * as fs from 'fs';

const stream = fs.createReadStream('./file.txt');

setTimeout(() => {
  stream.on('data', (chunk) => {
    console.log('New chunk of data:', chunk);
  });
}, 2000);
```
```
New chunk of data: <Buffer 4c 6f 72 65 6d 20 69 70 73 75 ... >
(...)
```
To understand this better, we need to take a look at the modes of a readable stream. A readable stream can be in one of two modes:
- paused
- flowing
All readable streams start in paused mode by default. One of the ways of switching a stream into the flowing mode is to attach a ‘data’ event listener.
A way to switch a readable stream into the flowing mode manually is to call the stream.resume method.
```typescript
import * as fs from 'fs';

const stream = fs.createReadStream('./small.txt');

stream.resume();

setTimeout(() => {
  stream.on('data', (data) => {
    console.log(data);
  });
}, 2000);
```
Here we switch the stream into the flowing mode two seconds before we start listening for data. If we turn a readable stream into the flowing mode without handlers ready to consume the incoming chunks, the data is lost, and that is exactly what happens in the example above.
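One way to avoid losing the data is to switch the stream back into paused mode before the listener attaches. A minimal sketch, assuming the same ./small.txt:

```typescript
import * as fs from 'fs';

const stream = fs.createReadStream('./small.txt');

stream.resume();
// switching back to paused mode keeps the chunks buffered
// instead of dropping them
stream.pause();

setTimeout(() => {
  stream.on('data', (data) => {
    console.log(data.toString());
  });
  // after an explicit pause(), attaching a 'data' listener alone
  // does not restart the flow, so we resume explicitly
  stream.resume();
}, 2000);
```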
Readable stream under the hood
After getting familiar with readable streams created by fs.createReadStream, let's create our own readable stream to better illustrate how it works.
```typescript
import { Readable } from 'stream';

const stream = new Readable();

stream.push('Hello');
stream.push('World!');
stream.push(null);

stream.on('data', (chunk) => {
  console.log(chunk.toString());
});
```
```
Hello
World!
```
The push method causes the data to be added to an internal queue that consumers of the stream can read from. Pushing null signals that the stream is done outputting data. A thing to notice is that, in the example above, we push the data before attaching the ‘data’ event listener. We still receive all the chunks, because when we first create the stream, it is in paused mode. Thanks to that, we don't lose the data.
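Pushing everything up front is not the only option. Below is a minimal sketch with a hypothetical Counter class (a name of our own, just for illustration) that supplies its data on demand by implementing the underscored _read method that Readable subclasses are expected to provide:

```typescript
import { Readable } from 'stream';

// a hypothetical stream that produces the numbers 1 to 3 on demand
class Counter extends Readable {
  private current = 1;

  // called by the stream whenever its internal queue runs low
  _read() {
    if (this.current > 3) {
      this.push(null); // no more data
    } else {
      this.push(String(this.current));
      this.current += 1;
    }
  }
}

new Counter().on('data', (chunk) => {
  console.log(chunk.toString()); // 1, 2, 3
});
```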
read function and the ‘readable’ event
The read function pulls data from the internal queue of a stream. It is called automatically when a readable stream is in the flowing mode, until the internal queue is empty. We can observe that by wrapping the read function with a console.log:
```typescript
import { Readable } from 'stream';

const stream = new Readable();

const read = stream.read.bind(stream);
stream.read = function () {
  console.log('read() called');
  return read();
};

stream.push('Hello');
stream.push('World!');
stream.push(null);

stream.on('data', (chunk) => {
  console.log(chunk.toString());
});
```
When we run it, we can see that the read function is called multiple times once the stream starts flowing.
```
read() called
Hello
read() called
World!
read() called
read() called
read() called
read() called
```
We can also call it on a readable stream that is in paused mode. To do so, we first need to wait for the stream to emit a ‘readable’ event, indicating that data is available to be read.
```typescript
import { Readable } from 'stream';

const stream = new Readable();

stream.push('Hello');
stream.push('World!');
stream.push(null);

stream.on('readable', () => {
  let data;
  while (null !== (data = stream.read())) {
    console.log('Received:', data.toString());
  }
});
```
The stream also emits the ‘readable’ event when it reaches the end of the data, just before the ‘end’ event.
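A minimal sketch to observe that ordering, assuming the same kind of stream as above; the final ‘readable’ fires with nothing left in the queue, followed by ‘end’:

```typescript
import { Readable } from 'stream';

const stream = new Readable();

stream.push('Hello');
stream.push(null);

stream.on('readable', () => {
  console.log('readable event');
  let data;
  while (null !== (data = stream.read())) {
    console.log('Received:', data.toString());
  }
});

stream.on('end', () => {
  console.log('end event');
});

// expected output:
// readable event
// Received: Hello
// readable event
// end event
```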
Summary
In this article, we covered what streams are and how we can use them. While in this part of the series we focused on readable streams, in the upcoming parts we cover writable streams, pipes, and more, so stay tuned!