什么是流?如何理解流?下面本篇文章就來帶大家深入了解一下Node中的流(Stream),希望對大家有所幫助!

作者最近在開發中經常使用 pipe 函數,只知道這是流的管道,卻不知道他是如何工作的,所以抱著一探究竟的心理干脆就從流開始學起,隨便將看過的知識和源碼整理成一篇文章分享給大家。
流(Stream)在 Nodejs 中是個十分基礎的概念,很多基礎模塊都是基于流實現的,扮演著十分重要的角色。同時流也是是一個十分難以理解的概念,這主要是相關的文檔比較缺少,對于 NodeJs 初學者來說,理解流往往需要花很多時間理解,才能真正掌握這個概念,所幸的是,對于大部分 NodeJs 使用者來說,僅僅是用來開發 Web 應用,對流的不充分認識并不影響使用。但是,理解流能夠對 NodeJs 中的其他模塊有更好的理解,同時在某些情況下,使用流來處理數據會有更好的效果。【相關教程推薦:nodejs視頻教程】
如何理解流
-
對于流的使用者來說,可以將流看作一個數組,我們只需要關注從中獲取(消費)和寫入(生產)就可以了。
-
對于流的開發者(使用stream模塊創建一個新實例),關注的是如何實現流中的一些方法,通常關注兩點,目標資源是誰和如何操作目標資源,確定了后就需要根據流的不同狀態和事件來對目標資源進行操作
緩存池
NodeJs 中所有的流都有緩沖池,緩沖池存在的目的是增加流的效率,當數據的生產和消費都需要時間時,我們可以在下一次消費前提前生產數據存放到緩沖池。但是緩沖池并不是時刻都處于使用狀態,例如緩存池為空時,數據生產后就不會放入緩存池而是直接消費。 。
如果數據生產的速度大于數據的消費速度,多余的數據會在某個地方等待。如果數據的生產速度小于進程數據的消費速度,那么數據會在某個地方累計到一定的數量,然后在進行消費。(開發者無法控制數據的生產和消費速度,只能盡量在何時的時機生產數據或者消費數據)
那個數據等待,累計數據,然后發生出去的地方。就是緩沖池。緩沖池通常位于電腦的RAM(內存)中。
舉一個常見的緩沖區的例子,我們在觀看在線視頻的時候,如果你的網速很快,緩沖區總是會被立即填充,然后發送給系統播放,然后立即緩沖下一段視頻。觀看的過程中,不會有卡頓。如果網速很慢,則會看到loading,表示緩沖區正在被填充,當填充完成后數據被發送給系統,才能看到這段視頻。
NodeJs 流的緩存池是一個 Buffer 鏈表,每一次想緩存池中加入數據都會重新創建一個 Buffer 節點插入到鏈表尾部。
EventEmitter
NodeJs 中對 Stream 是一個實現了 EventEmitter 的抽象接口,所以我會先簡單的介紹一下 EventEmitter。
EventEmitter 是一個實現事件發布訂閱功能的類,其中常用的幾個方法(on, once, off, emit)相信大家都耳熟能詳了,就不一一介紹了。
const { EventEmitter } = require('events') const eventEmitter = new EventEmitter() // 為 eventA 事件綁定處理函數 eventEmitter.on('eventA', () => { console.log('eventA active 1'); }); // 為 eventB 事件綁定處理函數 eventEmitter.on('eventB', () => { console.log('eventB active 1'); }); eventEmitter.once('eventA', () => { console.log('eventA active 2'); }); // 觸發 eventA eventEmitter.emit('eventA') // eventA active 1 // eventA active 2
值得注意的是, EventEmitter 有兩個叫做 newListener 和 removeListener 的事件,當你向一個事件對象中添加任何事件監聽函數后,都會觸發 newListener(eventEmitter.emit('newListener')),當一個處理函數被移除時同理會觸發 removeListener。
還需要注意的是, once 綁定的處理函數只會執行一次,removeListener 將在其執行前被觸發,這意味著 once 綁定的監聽函數是先被移除才被觸發的。
const { EventEmitter } = require('events') const eventEmitter = new EventEmitter() eventEmitter.on('newListener', (event, listener)=>{ console.log('newListener', event, listener) }) eventEmitter.on('removeListener', (event, listener) => { console.log('removeListener', event, listener) }) //newListener removeListener[Function(anonymous)] eventEmitter.on('eventA', () => { console.log('eventA active 1'); }); //newListener eventA [Function (anonymous)] function listenerB() { console.log('eventB active 1'); } eventEmitter.on('eventB', listenerB); // newListener eventB [Function (anonymous)] eventEmitter.once('eventA', () => { console.log('eventA active 2'); }); // newListener eventA [Function (anonymous)] eventEmitter.emit('eventA') // eventA active 1 // removeListener eventA [Function: bound onceWrapper] { listener: [Function (anonymous)] } // eventA active 2 eventEmitter.off('eventB', listenerB) // removeListener eventB[Function: listenerB]
不過這對于我們后面的內容來說并不重要。
Stream
Stream 是在 Node.js 中處理流數據的抽象接口。Stream 并不是一個實際的接口,而是對所有流的一種統稱。實際的接口有 ReadableStream、 WritableStream、ReadWriteStream 這幾個。
interface ReadableStream extends EventEmitter { readable: boolean; read(size?: number): string | Buffer; setEncoding(encoding: BufferEncoding): this; pause(): this; resume(): this; isPaused(): boolean; pipe<T extends WritableStream>(destination: T, options?: { end?: boolean | undefined; }): T; unpipe(destination?: WritableStream): this; unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void; wrap(oldStream: ReadableStream): this; [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>; } interface WritableStream extends EventEmitter { writable: boolean; write(buffer: Uint8Array | string, cb?: (err?: Error | null) => void): boolean; write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean; end(cb?: () => void): this; end(data: string | Uint8Array, cb?: () => void): this; end(str: string, encoding?: BufferEncoding, cb?: () => void): this; } interface ReadWriteStream extends ReadableStream, WritableStream { }
可以看出 ReadableStream 和 WritableStream 都是繼承 EventEmitter 類的接口(ts中接口是可以繼承類的,因為他們只是在進行類型的合并)。
上面這些接口對應的實現類分別是 Readable、Writable 和 Duplex
NodeJs中的流有4種:
- Readable 可讀流(實現ReadableStream)
- Writable 可寫流(實現WritableStream)
- Duplex 可讀可寫流(繼承Readable后實現WritableStream)
- Transform 轉換流(繼承Duplex)
背壓問題
磁盤寫入數據的速度是遠低于內存的,我們想象內存和磁盤之間有一個“管道”,“管道”中是“流”,內存的數據流入管道是非常快的,當管道塞滿時,內存中就會產生數據背壓,數據積壓在內存中,占用資源。

NodeJs Stream 的解決辦法是為每一個流的 緩存池(就是圖中寫入隊列)設置一個浮標值,當其中數據量達到這個浮標值后,往緩存池再次 push 數據時就會返回 false,表示當前流中緩存池內容已經達到浮標值,不希望再有數據寫入了,這時我們應該立即停止數據的生產,防止緩存池過大產生背壓。
Readable
可讀流(Readable)是流的一種類型,他有兩種模式三種狀態
兩種讀取模式:
-
流動模式:數據會從底層系統讀取寫入到緩沖區,當緩沖區被寫滿后自動通過 EventEmitter 盡快的將數據傳遞給所注冊的事件處理程序中
-
暫停模式:在這種模式下將不會主動觸發 EventEmitter 傳輸數據,必須顯示的調用
Readable.read()方法來從緩沖區中讀取數據,read 會觸發響應到 EventEmitter 事件。
三種狀態:
-
readableFlowing === null(初始狀態)
-
readableFlowing === false(暫停模式)
-
readableFlowing === true(流動模式)
初始時流的 readable.readableFlowing 為 null
添加data事件后變為 true 。調用 pause()、unpipe()、或接收到背壓或者添加 readable 事件,則 readableFlowing 會被設為 false ,在這個狀態下,為 data 事件綁定監聽器不會使 readableFlowing 切換到 true。
調用 resume() 可以讓可讀流的 readableFlowing 切換到 true
移除所有的 readable 事件是使 readableFlowing 變為 null 的唯一方法。
| 事件名 | 說明 |
|---|---|
| readable | 當緩沖區有新的可讀取數據時觸發(每一個想緩存池插入節點都會觸發) |
| data | 每一次消費數據后都會觸發,參數是本次消費的數據 |
| close | 流關閉時觸發 |
| error | 流發生錯誤時觸發 |
| 方法名 | 說明 |
|---|---|
| read(size) | 消費長度為size的數據,返回null表示當前數據不足size,否則返回本次消費的數據。size不傳遞時表示消費緩存池中所有數據 |
const fs = require('fs'); const readStreams = fs.createReadStream('./EventEmitter.js', { highWaterMark: 100// 緩存池浮標值 }) readStreams.on('readable', () => { console.log('緩沖區滿了') readStreams.read()// 消費緩存池的所有數據,返回結果并且觸發data事件 }) readStreams.on('data', (data) => { console.log('data') })
https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527
當 size 為 0 會觸發 readable 事件。
當緩存池中的數據長度達到浮標值 highWaterMark 后,就不會在主動請求生產數據,而是等待數據被消費后在生產數據
暫停狀態的流如果不調用 read 來消費數據時,后續也不會在觸發 data 和 readable,當調用 read 消費時會先判斷本次消費后剩余的數據長度是否低于 浮標值,如果低于 浮標值 就會在消費前請求生產數據。這樣在 read 后的邏輯執行完成后新的數據大概率也已經生產完成,然后再次觸發 readable,這種提前生產下一次消費的數據存放在緩存池的機制也是緩存流為什么快的原因
流動狀態下的流有兩種情況
- 生產速度慢于消費速度時:這種情況下每一個生產數據后一般緩存池中都不會有剩余數據,直接將本次生產的數據傳遞給 data 事件即可(因為沒有進入緩存池,所以也不用調用 read 來消費),然后立即開始生產新數據,待上一次數據消費完后新數據才生產好,再次觸發 data ,一只到流結束。
- 生產速度快于消費速度時:此時每一次生產完數據后一般緩存池都還存在未消費的數據,這種情況一般會在消費數據時開始生產下一次消費的數據,待舊數據消費完后新數據已經生產完并且放入緩存池
他們的區別僅僅在于數據生產后緩存池是否還存在數據,如果存在數據則將生產的數據 push 到緩存池等待消費,如果不存在則直接將數據交給 data 而不加入緩存池。
值得注意的是當一個緩存池中存在數據的流從暫停模式進入的流動模式時,會先循環調用 read 來消費數據只到返回 null
暫停模式

暫停模式下,一個可讀流讀創建時,模式是暫停模式,創建后會自動調用 _read 方法,把數據從數據源 push 到緩沖池中,直到緩沖池中的數據達到了浮標值。每當數據到達浮標值時,可讀流會觸發一個 " readable " 事件,告訴消費者有數據已經準備好了,可以繼續消費。
一般來說, 'readable' 事件表明流有新的動態:要么有新的數據,要么到達流的盡頭。所以,數據源的數據被讀完前,也會觸發一次 'readable' 事件;
消費者 " readable " 事件的處理函數中,通過 stream.read(size) 主動消費緩沖池中的數據。
const { Readable } = require('stream') let count = 1000 const myReadable = new Readable({ highWaterMark: 300, // 參數的 read 方法會作為流的 _read 方法,用于獲取源數據 read(size) { // 假設我們的源數據上 1000 個1 let chunk = null // 讀取數據的過程一般是異步的,例如IO操作 setTimeout(() => { if (count > 0) { let chunkLength = Math.min(count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) } }) // 每一次成功 push 數據到緩存池后都會觸發 readable myReadable.on('readable', () => { const chunk = myReadable.read()//消費當前緩存池中所有數據 console.log(chunk.toString()) })
值得注意的是, 如果 read(size) 的 size 大于浮標值,會重新計算新的浮標值,新浮標值是size的下一個二次冪(size <= 2^n,n取最小值)
// hwm 不會大于 1GB. const MAX_HWM = 0x40000000; function computeNewHighWaterMark(n) { if (n >= MAX_HWM) { // 1GB限制 n = MAX_HWM; } else { //取下一個2最高冪,以防止過度增加hwm n--; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n++; } return n; }
流動模式

所有可讀流開始的時候都是暫停模式,可以通過以下方法可以切換至流動模式:
- 添加 "
data" 事件句柄; - 調用 “
resume”方法; - 使用 "
pipe" 方法把數據發送到可寫流
流動模式下,緩沖池里面的數據會自動輸出到消費端進行消費,同時,每次輸出數據后,會自動回調 _read 方法,把數據源的數據放到緩沖池中,如果此時緩存池中不存在數據則會直接吧數據傳遞給 data 事件,不會經過緩存池;直到流動模式切換至其他暫停模式,或者數據源的數據被讀取完了( push(null) );
可讀流可以通過以下方式切換回暫停模式:
- 如果沒有管道目標,則調用
stream.pause()。 - 如果有管道目標,則移除所有管道目標。調用
stream.unpipe()可以移除多個管道目標。
const { Readable } = require('stream') let count = 1000 const myReadable = new Readable({ highWaterMark: 300, read(size) { let chunk = null setTimeout(() => { if (count > 0) { let chunkLength = Math.min(count, size) chunk = '1'.repeat(chunkLength) count -= chunkLength } this.push(chunk) }, 500) } }) myReadable.on('data', data => { console.log(data.toString()) })
Writable
相對可讀流來說,可寫流要簡單一些。

當生產者調用 write(chunk) 時,內部會根據一些狀態(corked,writing等)選擇是否緩存到緩沖隊列中或者調用 _write,每次寫完數據后,會嘗試清空緩存隊列中的數據。如果緩沖隊列中的數據大小超出了浮標值(highWaterMark),消費者調用 write(chunk) 后會返回 false,這時候生產者應該停止繼續寫入。
那么什么時候可以繼續寫入呢?當緩沖中的數據都被成功 _write 之后,清空了緩沖隊列后會觸發 drain 事件,這時候生產者可以繼續寫入數據。
當生產者需要結束寫入數據時,需要調用 stream.end 方法通知可寫流結束。
const { Writable, Duplex } = require('stream') let fileContent = '' const myWritable = new Writable({ highWaterMark: 10, write(chunk, encoding, callback) {// 會作為_write方法 setTimeout(()=>{ fileContent += chunk callback()// 寫入結束后調用 }, 500) } }) myWritable.on('close', ()=>{ console.log('close', fileContent) }) myWritable.write('123123')// true myWritable.write('123123')// false myWritable.end()
注意,在緩存池中數據到達浮標值后,此時緩存池中可能存在多個節點,在清空緩存池的過程中(循環調用_read),并不會向可讀流一樣盡量一次消費長度為浮標值的數據,而是每次消費一個緩沖區節點,即使這個緩沖區長度于浮標值不一致也是如此
const { Writable } = require('stream') let fileContent = '' const myWritable = new Writable({ highWaterMark: 10, write(chunk, encoding, callback) { setTimeout(()=>{ fileContent += chunk console.log('消費', chunk.toString()) callback()// 寫入結束后調用 }, 100) } }) myWritable.on('close', ()=>{ console.log('close', fileContent) }) let count = 0 function productionData(){ let flag = true while (count <= 20 && flag){ flag = myWritable.write(count.toString()) count++ } if(count > 20){ myWritable.end() } } productionData() myWritable.on('drain', productionData)
上述是一個浮標值為 10 的可寫流,現在數據源是一個 0——20 到連續的數字字符串,productionData 用于寫入數據。
-
首先第一次調用
myWritable.write("0")時,因為緩存池不存在數據,所以"0"不進入緩存池,而是直接交給_wirte,myWritable.write("0")返回值為true -
當執行
myWritable.write("1")時,因為_wirte的callback還未調用,表明上一次數據還未寫入完,位置保證數據寫入的有序性,只能創建一個緩沖區將"1"加入緩存池中。后面2-9都是如此 -
當執行
myWritable.write("10")時,此時緩沖區長度為9(1-9),還未到達浮標值,"10"繼續作為一個緩沖區加入緩存池中,此時緩存池長度變為11,所以myWritable.write("1")返回false,這意味著緩沖區的數據已經足夠,我們需要等待drain事件通知時再生產數據。 -
100ms過后,
_write("0", encoding, callback)的callback被調用,表明"0"已經寫入完成。然后會檢查緩存池中是否存在數據,如果存在則會先調用_read消費緩存池的頭節點("1"),然后繼續重復這個過程直到緩存池為空后觸發drain事件,再次執行productionData -
調用
myWritable.write("11"),觸發第1步開始的過程,直到流結束。
Duplex
在理解了可讀流與可寫流后,雙工流就好理解了,雙工流事實上是繼承了可讀流然后實現了可寫流(源碼是這么寫的,但是應該說是同時實現了可讀流和可寫流更加好)。

Duplex 流需要同時實現下面兩個方法
-
實現 _read() 方法,為可讀流生產數據
-
實現 _write() 方法,為可寫流消費數據
上面兩個方法如何實現在上面可寫流可讀流的部分已經介紹過了,這里需要注意的是,雙工流是存在兩個獨立的緩存池分別提供給兩個流,他們的數據源也不一樣
以 NodeJs 的標準輸入輸出流為例:
- 當我們在控制臺輸入數據時會觸發其 data 事件,這證明他有可讀流的功能,每一次用戶鍵入回車相當于調用可讀的 push 方法推送生產的數據。
- 當我們調用其 write 方法時也可以向控制臺輸出內容,但是不會觸發 data 事件,這說明他有可寫流的功能,而且有獨立的緩沖區,_write 方法的實現內容就是讓控制臺展示文字。
// 每當用戶在控制臺輸入數據(_read),就會觸發data事件,這是可讀流的特性 process.stdin.on('data', data=>{ process.stdin.write(data); }) // 每隔一秒向標準輸入流生產數據(這是可寫流的特性,會直接輸出到控制臺上),不會觸發data setInterval(()=>{ process.stdin.write('不是用戶控制臺輸入的數據') }, 1000)
Transform

可以將 Duplex 流視為具有可寫流的可讀流。兩者都是獨立的,每個都有獨立的內部緩沖區。讀寫事件獨立發生。
Duplex Stream ------------------| Read <----- External Source You ------------------| Write -----> External Sink ------------------|
Transform 流是雙工的,其中讀寫以因果關系進行。雙工流的端點通過某種轉換鏈接。讀取要求發生寫入。
Transform Stream --------------|-------------- You Write ----> ----> Read You --------------|--------------
對于創建 Transform 流,最重要的是要實現 _transform 方法而不是 _write 或者 _read。 _transform 中對可寫流寫入的數據做處理(消費)然后為可讀流生產數據。
轉換流還經常會實現一個 `_flush` 方法,他會在流結束前被調用,一般用于對流的末尾追加一些東西,例如壓縮文件時的一些壓縮信息就是在這里加上的
const { write } = require('fs') const { Transform, PassThrough } = require('stream') const reurce = '1312123213124341234213423428354816273513461891468186499126412' const transform = new Transform({ highWaterMark: 10, transform(chunk ,encoding, callback){// 轉換數據,調用push將轉換結果加入緩存池 this.push(chunk.toString().replace('1', '@')) callback() }, flush(callback){// end觸發前執行 this.push('<<<') callback() } }) // write 不斷寫入數據 let count = 0 transform.write('>>>') function productionData() { let flag = true while (count <= 20 && flag) { flag = transform.write(count.toString()) count++ } if (count > 20) { transform.end() } } productionData() transform.on('drain', productionData) let result = '' transform.on('data', data=>{ result += data.toString() }) transform.on('end', ()=>{ console.log(result) // >>>0@23456789@0@1@2@3@4@5@6@7@8@920<<< })
Pipe
管道是將上一個程序的輸出作為下一個程序的輸入,這是管道在 Linux 中管道的作用。NodeJs 中的管道其實也類似,它管道用于連接兩個流,上游的流的輸出會作為下游的流的輸入。

管道 sourec.pipe(dest, options) 要求 sourec 是可讀的,dest是可寫的。其返回值是 dest。
對于處于管道中間的流既是下一個流的上游也是上一個流的下游,所以其需要時一個可讀可寫的雙工流,一般我們會使用轉換流來作為管道中間的流。
https://github1s.com/nodejs/node/blob/v17.0.0/lib/internal/streams/legacy.js#L16-L33
Stream.prototype.pipe = function(dest, options) { const source = this; function ondata(chunk) { if (dest.writable && dest.write(chunk) === false && source.pause) { source.pause(); } } source.on('data', ondata); function ondrain() { if (source.readable && source.resume) { source.resume(); } } dest.on('drain', ondrain); // ...后面的代碼省略 }
pipe 的實現非常清晰,當上游的流發出 data 事件時會調用下游流的 write 方法寫入數據,然后立即調用 source.pause() 使得上游變為暫停狀態,這主要是為了防止背壓。
當下游的流將數據消費完成后會調用 source.resume() 使上游再次變為流動狀態。
我們實現一個將 data 文件中所有 1 替換為 @ 然后輸出到 result 文件到管道。
const { Transform } = require('stream') const { createReadStream, createWriteStream } = require('fs') // 一個位于管道中的轉換流 function createTransformStream(){ return new Transform({ transform(chunk, encoding, callback){ this.push(chunk.toString().replace(/1/g, '@')) callback() } }) } createReadStream('./data') .pipe(createTransformStream()) .pipe(createWriteStream('./result'))
在管道中只存在兩個流時,其功能和轉換流有點類似,都是將一個可讀流與一個可寫流串聯起來,但是管道可以串聯多個流。
原文地址:https://juejin.cn/post/7077511716564631566
作者:月夕
站長資訊網