As the previous two articles showed, getting data from a Readable into a Writable means reading it into memory yourself and then writing it out. In other words, every transfer needs boilerplate like this:
readable.on('readable', () => {
  let chunk
  // read() returns null once the internal buffer is empty
  while ((chunk = readable.read()) !== null) {
    writable.write(chunk)
  }
})
For convenience, Node.js provides the pipe() method, which lets us transfer data gracefully:
readable.pipe(writable)
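As a minimal, self-contained sketch of that one-liner in use: the source and sink below are hypothetical in-memory streams invented for illustration (they are not from the article), and `received` and `done` are likewise assumed names.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical source: a Readable that yields three string chunks.
const readable = Readable.from(['a', 'b', 'c']);

// Hypothetical sink: collects everything written to it.
const received = [];
const writable = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    callback();
  },
});

// Resolves with the collected chunks once the Writable has flushed everything.
const done = new Promise((resolve, reject) => {
  writable.on('finish', () => resolve(received));
  writable.on('error', reject);
});

readable.pipe(writable);

done.then((chunks) => console.log(chunks.join('')));
```

No manual read() or write() call appears anywhere; pipe() also ends the Writable once the Readable is exhausted, which is what lets the finish event fire here.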
Now let's see how it is implemented
pipe
Everything starts with a call to the Readable's pipe() method.
// lib/_stream_readable.js
Readable.prototype.pipe = function(dest, pipeOpts) {
  var src = this;
  var state = this._readableState;

  // Record the Writable
  switch (state.pipesCount) {
    case 0:
      state.pipes = dest;
      break;
    case 1:
      state.pipes = [state.pipes, dest];
      break;
    default:
      state.pipes.push(dest);
      break;
  }
  state.pipesCount += 1;

  // ...
  src.once('end', endFn);
  dest.on('unpipe', onunpipe);
  // ...
  dest.on('drain', ondrain);
  // ...
  src.on('data', ondata);
  // ...

  // Ensure that onerror runs before any other error listener
  prependListener(dest, 'error', onerror);
  // ...
  dest.once('close', onclose);
  // ...
  dest.once('finish', onfinish);
  // ...

  // Emit the pipe event on the Writable
  dest.emit('pipe', src);

  // Switch the Readable to flowing mode
  if (!state.flowing) {
    debug('pipe resume');
    src.resume();
  }

  return dest;
};
When pipe() runs, it first records the Writable in state.pipes, then binds the related event listeners, and finally, if the Readable is not already in flowing mode, calls resume() to switch it into flowing mode.
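These side effects can be observed from the outside. The sketch below uses two hypothetical no-op streams and the public `readableFlowing` property; the variable names are illustrative assumptions, and the behavior shown is what a current Node.js release is expected to do rather than the exact source version quoted above.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical no-op streams, only used to observe pipe()'s side effects.
const src = new Readable({ read() {} });
const dest = new Writable({
  write(chunk, encoding, callback) { callback(); },
});

// pipe() emits the pipe event on the destination, passing the source.
let pipedFrom = null;
dest.on('pipe', (source) => { pipedFrom = source; });

const flowingBefore = src.readableFlowing; // no consumer attached yet
const returned = src.pipe(dest);
const flowingAfter = src.readableFlowing;  // pipe() has called resume()

console.log(flowingBefore, flowingAfter, pipedFrom === src, returned === dest);
```

Because pipe() returns dest, calls can be chained when dest is itself readable, for example a Transform stream.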
Passing data
After the Readable obtains data from its data source, it emits the data event, which runs ondata().
The relevant code for ondata():
// lib/_stream_readable.js
// Guard against increasing awaitDrain more than once per write; otherwise
// awaitDrain can never return to zero and the Readable gets stuck
// See /nodejs/node/issues/7278 for details
var increasedAwaitDrain = false;
function ondata(chunk) {
  debug('ondata');
  increasedAwaitDrain = false;
  var ret = dest.write(chunk);
  if (false === ret && !increasedAwaitDrain) {
    // If src.unpipe(dest) was called inside dest.write(), dest is no longer
    // a destination: do not count it, or awaitDrain would never reach zero
    // and the Readable would get stuck
    if (((state.pipesCount === 1 && state.pipes === dest) ||
         (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)) &&
        !cleanedUp) {
      debug('false write response, pause', src._readableState.awaitDrain);
      src._readableState.awaitDrain++;
      increasedAwaitDrain = true;
    }
    // Enter paused mode
    src.pause();
  }
}
Inside ondata(chunk), the data is written to the Writable via dest.write(chunk).
During that call, _write() may itself call src.push(chunk) or unpipe the destination. Without the guards above, awaitDrain could be incremented several times for a single pending drain and never return to zero, leaving the Readable stuck.
When dest.write(chunk) returns false, the Writable cannot accept more data for now, so the Readable enters paused mode until every destination it is waiting on has emitted drain.
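The signal that ondata() reacts to can be reproduced with a Writable alone. This is a small sketch under assumed settings: the 4-byte `highWaterMark` and the `setImmediate` delay are arbitrary choices made to force backpressure, and `first`, `second`, and `drained` are hypothetical names.

```javascript
const { Writable } = require('stream');

// Hypothetical slow sink with a tiny 4-byte buffer limit.
const dest = new Writable({
  highWaterMark: 4,
  write(chunk, encoding, callback) {
    setImmediate(callback); // pretend the underlying resource is slow
  },
});

const first = dest.write('ab');    // 2 bytes pending, below the limit
const second = dest.write('cdef'); // 6 bytes pending, limit reached

// Resolves when the pending data has been flushed and writing may continue.
const drained = new Promise((resolve) => {
  dest.once('drain', () => resolve(true));
});

console.log(first, second);
drained.then(() => console.log('drain'));
```

The second write still succeeds; the false return value is only advice to stop writing, and pipe() is what turns that advice into an actual pause of the Readable.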
When the Writable emits the drain event, ondrain() runs:
// lib/_stream_readable.js
var ondrain = pipeOnDrain(src);

function pipeOnDrain(src) {
  return function() {
    var state = src._readableState;
    debug('pipeOnDrain', state.awaitDrain);
    if (state.awaitDrain)
      state.awaitDrain--;
    // awaitDrain === 0, and there is a data listener
    if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
      state.flowing = true;
      flow(src);
    }
  };
}
Each drain event decrements awaitDrain. Once awaitDrain reaches 0, flow(src) is called to put the Readable back into flowing mode.
At this point, the entire data transfer loop has been established, and the data will flow into Writable continuously along the loop until all data is written.
unpipe
Whether writing completes normally or fails with an error, unpipe() is eventually executed.
// lib/_stream_readable.js
// ...
function unpipe() {
  debug('unpipe');
  src.unpipe(dest);
}
// ...

Readable.prototype.unpipe = function(dest) {
  var state = this._readableState;
  var unpipeInfo = { hasUnpiped: false };

  // No destinations: nothing to do
  if (state.pipesCount === 0)
    return this;

  // Exactly one destination
  if (state.pipesCount === 1) {
    if (dest && dest !== state.pipes)
      return this;

    // No dest specified: unpipe the only one
    if (!dest)
      dest = state.pipes;

    state.pipes = null;
    state.pipesCount = 0;
    state.flowing = false;
    if (dest)
      dest.emit('unpipe', this, unpipeInfo);
    return this;
  }

  // No dest specified: unpipe all destinations
  if (!dest) {
    var dests = state.pipes;
    var len = state.pipesCount;
    state.pipes = null;
    state.pipesCount = 0;
    state.flowing = false;

    for (var i = 0; i < len; i++)
      dests[i].emit('unpipe', this, unpipeInfo);
    return this;
  }

  // Find the specified Writable and unpipe it
  var index = state.pipes.indexOf(dest);
  if (index === -1)
    return this;

  state.pipes.splice(index, 1);
  state.pipesCount -= 1;
  if (state.pipesCount === 1)
    state.pipes = state.pipes[0];

  dest.emit('unpipe', this, unpipeInfo);

  return this;
};
Readable.prototype.unpipe() chooses its strategy based on state.pipesCount and the dest argument. Whenever a destination is actually removed, the unpipe event is emitted on that dest.
The unpipe event in turn runs onunpipe(), which cleans up the related data.
// lib/_stream_readable.js
function onunpipe(readable, unpipeInfo) {
  debug('onunpipe');
  if (readable === src) {
    if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
      unpipeInfo.hasUnpiped = true;
      // Clean up related data
      cleanup();
    }
  }
}
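The effect of this cleanup is visible through the public API. The following sketch uses hypothetical no-op streams and assumes that the data listener counted here is the ondata() handler that pipe() attached; the variable names are illustrative.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical no-op streams, only used to observe unpipe()'s side effects.
const src = new Readable({ read() {} });
const dest = new Writable({
  write(chunk, encoding, callback) { callback(); },
});

// unpipe() emits the unpipe event on the destination, passing the source.
let unpipedFrom = null;
dest.on('unpipe', (source) => { unpipedFrom = source; });

src.pipe(dest);
const listenersWhilePiped = src.listenerCount('data'); // ondata is attached

src.unpipe(dest);
const listenersAfterUnpipe = src.listenerCount('data'); // cleanup() removed it

console.log(unpipedFrom === src, listenersWhilePiped, listenersAfterUnpipe);
```

After unpipe() the two streams are fully detached, so the same Readable can be piped somewhere else without the old destination receiving further data.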
End
Throughout the pipe process, the Readable is the active party: it drives data transfer, unpipe, and error handling. The Writable is the passive party, whose main contribution is emitting the drain event.
To summarize the process of pipe:
- First call readable.pipe(writable) to connect the readable to the writable
- When the readable has data, it emits the data event and ondata() writes the chunk to the writable
- If writable.write(chunk) returns false, enter paused mode and wait for the drain event
- Once every pending drain event has fired, re-enter flowing mode and continue writing
- Whether the data is fully written or the transfer is interrupted, unpipe() is called in the end
- unpipe() calls Readable.prototype.unpipe(), which emits the unpipe event on dest and cleans up the related data
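The steps above can be sketched as a heavily simplified pipe. `simplePipe` is a hypothetical helper written for illustration, not Node.js code: it assumes a single destination, does no error handling, and has no awaitDrain counter, all of which the real pipe() takes care of. The streams in the usage part are also invented.

```javascript
const { Readable, Writable } = require('stream');

// simplePipe is a hypothetical helper, not part of Node.js. It handles one
// destination and ignores errors, which the real pipe() does not.
function simplePipe(src, dest) {
  function ondata(chunk) {
    // write() returns false when dest is saturated: stop reading until drain
    if (dest.write(chunk) === false) {
      src.pause();
      dest.once('drain', () => src.resume());
    }
  }
  src.on('data', ondata); // attaching a data listener starts flowing mode
  src.once('end', () => {
    src.removeListener('data', ondata); // rough counterpart of unpipe()
    dest.end();
  });
  return dest;
}

// Usage with hypothetical streams: a 3-chunk source and a slow sink whose
// 1-byte buffer limit makes every write() return false.
const received = [];
const sink = new Writable({
  highWaterMark: 1,
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setImmediate(callback);
  },
});

const finished = new Promise((resolve) => {
  sink.on('finish', () => resolve(received));
});
simplePipe(Readable.from(['x', 'y', 'z']), sink);
finished.then((chunks) => console.log(chunks.join('')));
```

Even with the sink pushing back on every chunk, the data arrives complete and in order, which is the property the pause and drain handshake exists to guarantee.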
References:
/nodejs/node/blob/master/lib/_stream_readable.js
/nodejs/node/blob/master/lib/_stream_writable.js