SoFunction
Updated on 2025-03-03

Brief discussion: Understanding stream

Stream is an abstract interface based on EventEmitter, and also a high-level encapsulation of Buffer, used for processing streaming data. The stream module provides various APIs so that we can use streams very easily.

There are four types of streams, as shown below:

  • Readable, a readable stream
  • Writable, a writable stream
  • Duplex, a stream that is both readable and writable
  • Transform, an extension of Duplex that can modify data as it is written and then read back

1. Readable readable stream

A readable stream has two modes: paused and flowing.

In flowing mode, data is read from the underlying system automatically and emitted via 'data' events; in paused mode, the read() method must be called explicitly to read data and trigger 'data' events.

All readable streams start out in paused mode, and can be switched to flowing mode in the following ways:

  • Listening for 'data' events
  • Calling the resume() method
  • Calling the pipe() method to send the data to a writable stream (Writable)

Similarly, there are two ways to switch back to paused mode:

  • If there is no pipe target, just call the pause() method.
  • If there are pipe targets, remove all 'data' listeners and call the unpipe() method.

A Readable object has a _readableState object from which you can tell which mode the stream is currently in, as shown below:

  • readable._readableState.flowing = null: there is no consumer, and the stream generates no data
  • readable._readableState.flowing = true: the stream is in flowing mode
  • readable._readableState.flowing = false: the stream is in paused mode

Why use a stream to read data

For small files, it is more convenient to read the data with fs.readFile(); but when you need to read a large file, such as one several gigabytes in size, this method consumes a great deal of memory and can even crash the program. In such cases it is more appropriate to use a stream: reading in segments does not cause the memory "bursting" problem.

data event

The 'data' event is emitted when the stream hands a chunk of data to the consumer. This may happen when the stream switches into flowing mode, or when read() is called and a valid chunk is available. Use it as follows:

const fs = require('fs');

const rs = fs.createReadStream('./file.txt'); // placeholder path; the original filename was lost
var chunkArr = [],
  chunkLen = 0;
rs.on('data',(chunk)=>{
  chunkArr.push(chunk);
  chunkLen += chunk.length;
});
rs.on('end',()=>{
  console.log(Buffer.concat(chunkArr,chunkLen).toString());
});

readable event

Triggered when data is available to be read from the stream, in two cases: new data has become available, or the end of the stream has been reached. In the former case the read() method returns the available data; in the latter it returns null. Use it as follows:

const fs = require('fs');

const rs = fs.createReadStream('./file.txt'); // placeholder path; the original filename was lost
var chunkArr = [],
  chunkLen = 0;

rs.on('readable',()=>{
  var chunk = null;
  // need to check whether the end of the stream has been reached
  while((chunk = rs.read()) !== null){
    chunkArr.push(chunk);
    chunkLen += chunk.length;
  }
});
rs.on('end',()=>{
  console.log(Buffer.concat(chunkArr,chunkLen).toString());
});

pause and resume methods

The pause() method puts the stream into paused mode and stops 'data' events from firing; the resume() method puts the stream into flowing mode and resumes 'data' events, and it can also be used to consume all of the data, as shown below:

const fs = require('fs');

const rs = fs.createReadStream('./Download.png');
rs.on('data',(chunk)=>{
  console.log(`Received ${chunk.length} bytes of data...`);
  rs.pause();
  console.log('Data reception is paused for 1 second.');
  setTimeout(()=>{
    rs.resume();
  },1000);
});
rs.on('end',()=>{
  console.log('Data received.');
});

pipe(destination[, options]) method

The pipe() method binds a writable stream to the readable stream, automatically switches the readable stream into flowing mode, outputs all of its data to the writable stream, and manages the flow of data so that no data is lost. Use it as follows:

const fs = require('fs');

const rs = fs.createReadStream('./file.txt'); // placeholder path; the original filename was lost
rs.pipe(process.stdout); // the original pipe destination was lost; stdout is assumed

Several ways of consuming a readable stream have been introduced above, but for any one readable stream it is best to use only one of them; the pipe() method is recommended.

2. Writable writable stream

All writable streams are created based on the stream.Writable class; once created, data can be written to the stream.

write(chunk[, encoding][, callback]) method

The write() method writes data to the writable stream. Its parameters are:

  • chunk, a string or Buffer
  • encoding, the encoding of chunk if chunk is a string
  • callback, a callback invoked when this chunk of data has been flushed to the underlying system

The method returns a Boolean: false if the chunk to be written was buffered and the buffer size has reached the highWaterMark threshold, otherwise true.

Use as follows:

const fs = require('fs');

const ws = fs.createWriteStream('./file.txt'); // placeholder path; the original filename was lost
ws.write('nihao','utf8',()=>{console.log('this chunk is flushed.');});
ws.end('done.');

Backpressure mechanism

If the writable stream cannot write as fast as the readable stream reads, the data added by the write() method is buffered, and the buffer gradually grows, consuming a large amount of memory. What we want is to read the next piece of data only after the previous one has been consumed, so that memory use stays at a constant level. This can be done by using the return value of write() to judge the state of the writable stream's buffer together with the 'drain' event, switching the readable stream's mode at the right time, as shown below:

const fs = require('fs');
const path = require('path');

function copy(src,dest){
  src = path.resolve(src);
  dest = path.resolve(dest);
  const rs = fs.createReadStream(src);
  const ws = fs.createWriteStream(dest);
  console.log('Copying...');
  const stime = +new Date();
  rs.on('data',(chunk)=>{
    if(false === ws.write(chunk)){
      rs.pause();
    }
  });
  ws.on('drain',()=>{
    rs.resume();
  });
  rs.on('end',()=>{
    const etime = +new Date();
    console.log(`Completed, time used: ${(etime-stime)/1000} seconds`);
    ws.end();
  });
}
copy('./CSS Authoritative Guide Edition 3.pdf','./copy.pdf'); // destination filename is a placeholder; the original was lost

drain event

If the write() method returns false, the 'drain' event will be emitted once the buffer has been flushed; the backpressure mechanism above already makes use of this event.

finish event

The event is emitted after the end() method has been called and all buffered data has been written to the underlying system, as shown below:

const fs = require('fs');

const ws = fs.createWriteStream('./file.txt'); // placeholder path; the original filename was lost
const alphabetStr = 'abcdefghijklmnopqrstuvwxyz';
ws.on('finish',()=>{
  console.log('done.');
});
for(let letter of alphabetStr.split('')){
  ws.write(letter);
}
ws.end();// must be called

end([chunk][, encoding][, callback]) method

After the end() method has been called, the write() method can no longer be called to write data; doing so raises an error.

3. Duplex read-write stream

Duplex streams implement both the Readable and Writable interfaces at the same time, being both readable and writable streams. For example, 'zlib streams', 'crypto streams', and 'TCP sockets' are all Duplex streams.

4. Transform stream

The difference from a Duplex stream is that a Transform stream automatically transforms the data coming in on the writable side and makes the result available on the readable side. For example, 'zlib streams' and 'crypto streams' are Transform streams.

5. Implementing the four kinds of streams

The API provided by the stream module lets us implement streams very simply. The module is loaded with require('stream'); we only need to inherit from one of the four stream classes (stream.Readable, stream.Writable, stream.Duplex, or stream.Transform) and implement its interface. The interfaces that need to be implemented are as follows:

| Use-case | Class | Method(s) to implement |
| ------------- | ------------- | ----- |
| Reading only | Readable | _read |
| Writing only | Writable | _write, _writev |
| Reading and writing | Duplex | _read, _write, _writev |
| Operate on written data, then read the result | Transform | _transform, _flush |

Readable stream implementation

As shown above, we just need to inherit from the Readable class and implement the _read interface, as shown below:

const Readable = require('stream').Readable;
const util = require('util');
const alphabetArr = 'abcdefghijklmnopqrstuvwxyz'.split('');

/*function AbReadable(){
  if(!(this instanceof AbReadable)){
    return new AbReadable();
  }
  Readable.call(this);
}
util.inherits(AbReadable,Readable);
AbReadable.prototype._read = function(){
  if(!alphabetArr.length){
    this.push(null);
  }else{
    this.push(alphabetArr.shift());
  }
};

const abReadable = new AbReadable();
abReadable.pipe(process.stdout);*/

/*class AbReadable extends Readable{
  constructor(){
    super();
  }
  _read(){
    if(!alphabetArr.length){
      this.push(null);
    }else{
      this.push(alphabetArr.shift());
    }
  }
}
const abReadable = new AbReadable();
abReadable.pipe(process.stdout);*/

/*const abReadable = new Readable({
  read(){
    if(!alphabetArr.length){
      this.push(null);
    }else{
      this.push(alphabetArr.shift());
    }
  }
});
abReadable.pipe(process.stdout);*/

const abReadable = new Readable();
abReadable._read = function(){
  if (!alphabetArr.length) {
    this.push(null);
  } else {
    this.push(alphabetArr.shift());
  }
};
abReadable.pipe(process.stdout); // pipe destination reconstructed; stdout is a common choice

The above code shows four ways of creating a Readable stream; each must implement the _read() method and use the push() method, whose purpose is to add the given data to the internal read queue.

Writable stream implementation

We just need to inherit from the Writable class and implement the _write or _writev interface, as shown below (only two of the creation styles are shown):

const Writable = require('stream').Writable;

/*class MyWritable extends Writable{
  constructor(){
    super();
  }
  _write(chunk,encoding,callback){
    process.stdout.write(chunk); // output destination reconstructed; stdout is assumed
    callback();
  }
}
const myWritable = new MyWritable();*/
const myWritable = new Writable({
  write(chunk,encoding,callback){
    process.stdout.write(chunk); // output destination reconstructed; stdout is assumed
    callback();
  }
});
myWritable.on('finish',()=>{
  console.log('done');
});
myWritable.write('a');
myWritable.write('b');
myWritable.write('c');
myWritable.end();

Duplex stream implementation

To implement a Duplex stream, you need to inherit from the Duplex class and implement the _read and _write interfaces, as shown below:

const Duplex = require('stream').Duplex;

class MyDuplex extends Duplex{
  constructor(){
    super();
    this.source = []; // internal buffer; the property name was lost and is reconstructed
  }
  _read(){
    if (!this.source.length) {
      this.push(null);
    } else {
      this.push(this.source.shift());
    }
  }
  _write(chunk,encoding,cb){
    this.source.push(chunk);
    cb();
  }
}

const myDuplex = new MyDuplex();
myDuplex.on('finish',()=>{
  console.log('write done.')
});
myDuplex.on('end',()=>{
  console.log('read done.')
});
myDuplex.write('\na\n');
myDuplex.write('c\n');
myDuplex.write('b\n');
myDuplex.end();
myDuplex.pipe(process.stdout); // consume the readable side so 'end' fires; stdout is assumed

The above code implements the _read() method, so the object can be used as a readable stream, and implements the _write() method, so it can also be used as a writable stream.

Transform stream implementation

To implement a Transform stream, you need to inherit from the Transform class and implement the _transform interface, as shown below:

const Transform = require('stream').Transform;

class MyTransform extends Transform{
  constructor(){
    super();
  }
  _transform(chunk, encoding, callback){
    chunk = (chunk+'').toUpperCase();
    callback(null,chunk);
  }
}
const myTransform = new MyTransform();
myTransform.write('hello world!');
myTransform.end();
myTransform.pipe(process.stdout); // consume the readable side; stdout is assumed

In the above code, the first argument passed to the callback in the _transform() method is either an error or null, and the second argument is automatically forwarded to the push() method, so the method can also be written as follows:

_transform(chunk, encoding, callback){
  chunk = (chunk+'').toUpperCase();
  this.push(chunk);
  callback();
}

Object Mode stream implementation

We know that the data in a stream is of Buffer type by default: data entering a readable stream is converted into a Buffer before being consumed, and when a writable stream is written to, the underlying calls also convert the data into a Buffer. However, setting the objectMode option of the constructor to true makes the stream deliver the data as-is, as shown below:

const Readable = require('stream').Readable;

const rs = new Readable();
rs.push('a');
rs.push('b');
rs.push(null);
rs.on('data',(chunk)=>{console.log(chunk);});//<Buffer 61> and <Buffer 62>

const rs1 = new Readable({objectMode:true});
rs1.push('a');
rs1.push('b');
rs1.push(null);
rs1.on('data',(chunk)=>{console.log(chunk);});//a and b

The following uses a Transform stream to implement a simple CSS minification tool, as shown below:

const fs = require('fs');
const Transform = require('stream').Transform;

function minify(src,dest){
  const transform = new Transform({
    transform(chunk,encoding,cb){
      cb(null,(chunk.toString()).replace(/[\s\r\n\t]/g,''));
    }
  });
  fs.createReadStream(src,{encoding:'utf8'}).pipe(transform).pipe(fs.createWriteStream(dest));
}
minify('./a.css','./b.css'); // placeholder paths; the original filenames were lost

The above is all the content of this article. I hope it will be helpful to everyone's study and I hope everyone will support me more.