
Things to note when writing stream-based utilities in Node.js

Streams in Node.js are very powerful. They provide support for processing potentially huge files and abstract away data processing and delivery in many scenarios. Because they are so convenient to use, in practice we often write utility functions/libraries on top of them. However, if we are careless about certain characteristics of streams, the resulting functions/libraries may not achieve the desired effect in some situations, or may bury hidden landmines. This article offers two tips worth keeping in mind when writing stream-based utilities.

1. Beware of EventEmitter memory leaks

In a function that may be called multiple times, if you need to add event listeners to a stream to perform certain operations, you have to be alert to the memory leaks caused by repeatedly adding listeners:

'use strict';
const fs = require('fs');
const co = require('co');

function getSomeDataFromStream (stream) {
 let data = stream.read();
 if (data) return Promise.resolve(data);

 if (!stream.readable) return Promise.resolve(null);

 return new Promise((resolve, reject) => {
  stream.once('readable', () => resolve(stream.read()));
  stream.once('error', reject);
  stream.once('end', resolve);
 })
}

let stream = fs.createReadStream('/Path/to/a/big/file');

co(function *() {
 let chunk;
 while ((chunk = yield getSomeDataFromStream(stream)) !== null) {
  console.log(chunk);
 }
}).catch(console.error);

In the above code, the getSomeDataFromStream function reads a chunk of data, or settles the Promise when the stream emits an error or end event, by listening for the stream's readable, error and end events. However, when executing this code we will soon see a warning in the console: (node) warning: possible EventEmitter memory leak detected. 11 error listeners added. Use emitter.setMaxListeners() to increase limit. Every call to the function adds an extra readable listener, error listener and end listener to the incoming stream, and the listeners added by earlier calls are never removed. To avoid this potential memory leak, we must ensure that after each call finishes, all the listeners added by that call are removed, keeping the function free of side effects:

function getSomeDataFromStream (stream) {
 let data = stream.read();
 if (data) return Promise.resolve(data);

 if (!stream.readable) return Promise.resolve(null);

 return new Promise((resolve, reject) => {
  stream.once('readable', onData);
  stream.once('error', onError);
  stream.once('end', onEnd);

  function onData () {
   done();
   resolve(stream.read());
  }

  function onError (err) {
   done();
   reject(err);
  }

  function onEnd () {
   done();
   // Resolve with null so the consuming loop knows the stream is finished
   resolve(null);
  }

  function done () {
   stream.removeListener('readable', onData);
   stream.removeListener('error', onError);
   stream.removeListener('end', onEnd);
  }
 })
}
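
As a quick sanity check (a minimal sketch, reusing fs, co and getSomeDataFromStream from above), we can confirm that listeners no longer accumulate between calls by inspecting the stream's listener counts with the standard EventEmitter listenerCount method:

co(function *() {
 let s = fs.createReadStream('/Path/to/a/big/file');
 yield getSomeDataFromStream(s);
 // done() has removed every listener added by that call, so the
 // counts stay at 0 instead of growing by one per call.
 console.log(s.listenerCount('readable'), s.listenerCount('error'), s.listenerCount('end'));
 yield getSomeDataFromStream(s);
 console.log(s.listenerCount('readable'), s.listenerCount('error'), s.listenerCount('end'));
}).catch(console.error);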

2. Ensure that the utility function's callback is only called after the data has been processed

Utility functions often expose a callback parameter to the caller, to be invoked with a given value once all the data in the stream has been processed. The usual practice is to hang the callback on the stream's end event, but if the per-chunk processing is a time-consuming asynchronous operation, the callback may be called before all the data has been processed:

'use strict';
const fs = require('fs');

let stream = fs.createReadStream('/Path/to/a/big/file');

function processSomeData (stream, callback) {
 stream.on('data', (data) => {
  // Some asynchronous, time-consuming operation on the data
  setTimeout(() => console.log(data), 2000);
 });

 stream.on('end', () => {
  // ...
  callback()
 })
}

processSomeData(stream, () => console.log('end'));

Here the callback may be called before all the data has been processed, because the stream's end event fires as soon as the data in the stream has been read out, not when it has finished being processed. So we additionally need to check whether the data has finished processing:

function processSomeData (stream, callback) {
 let count = 0;
 let finished = 0;
 let isEnd = false;

 stream.on('data', (data) => {
  count++;
  // Some asynchronous, time-consuming operation on the data
  setTimeout(() => {
   console.log(data);
   finished++;
   check();
  }, 2000);
 });

 stream.on('end', () => {
  isEnd = true;
  // ...
  check();
 })

 function check () {
  if (count === finished && isEnd) callback()
 }
}

In this way, the callback only fires after all the data has been processed.
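
As a usage sketch (assuming the same file path as before), the fixed version prints each chunk after its simulated 2-second processing delay, and only then prints 'end':

let stream2 = fs.createReadStream('/Path/to/a/big/file');
// Each chunk is logged after its 2-second delay; 'end' is printed
// last, once count === finished and the stream has ended.
processSomeData(stream2, () => console.log('end'));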