Node.js中你不可不精的Stream(流)
杨阳本尊 人气:0一、什么是Stream(流)
流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface)。 stream 模块提供了基础的API。使用这些API可以很容易地来构建实现流接口的对象。例如, HTTP 请求 和 process.stdout 就都是流的实例。
流可以是可读的、可写的,或是可读写的。注意,所有的流都是 EventEmitter 的实例。
二、流的类型
Node.js 中有四种基本的流类型:
- Readable - 可读的流 (例如 fs.createReadStream())。
- Writable - 可写的流 (例如 fs.createWriteStream())。
- Duplex - 可读写的流(双工流) (例如 net.Socket)。
- Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate())。
var Stream = require('stream') //stream 模块引入方式 var Readable = Stream.Readable //可读的流 var Writable = Stream.Writable //可写的流 var Duplex = Stream.Duplex //可读写的流 var Transform = Stream.Transform //在读写过程中可以修改和变换数据的 Duplex 流
Node.js中关于流的操作被封装到了Stream模块中,这个模块也被多个核心模块所引用。例如在fs.createReadStream()和fs.createWriteStream()的源码实现里,都调用了Stream模块提供的抽象接口来实现对流数据的操作。
三、为什么使用Stream?
我们通过两个例子,了解一下为什么要使用Stream。
Exp1:
下面是一个读取文件内容的例子:
const fs = require('fs') fs.readFile(file, function (err, content) { //读出来的content是Buffer console.log(content) console.log(content.toString()) })
但如果文件内容较大,譬如在500M时,执行上述代码的输出为:
<Buffer 64 74 09 75 61 09 63 6f 75 6e 74 0a 0a 64 74 09 75 61 09 63 6f 75 6e 74 0a 32 30 31 35 31 32 30 38 09 4d 6f 7a 69 6c 6c 61 2f 35 2e 30 20 28 63 6f 6d ... > buffer.js:382 throw new Error('toString failed'); ^ Error: toString failed at Buffer.toString (buffer.js:382:11)
报错的原因是content这个Buffer对象的长度过大,导致toString方法失败。
可见,这种一次获取全部内容的做法,不适合操作大文件。
可以考虑使用流来读取文件内容。
var fs = require('fs') fs.createReadStream(bigFile).pipe(process.stdout)
fs.createReadStream创建一个可读流,连接了源头(上游,文件)和消耗方(下游,标准输出)。
执行上面代码时,流会逐次调用fs.read(ReadStream这个类的源码里有一个_read方法,这个_read方法在内部调用了fs.read来实现对文件的读取),将文件中的内容分批取出传给下游。
在文件看来,它的内容被分块地连续取走了。
在下游看来,它收到的是一个先后到达的数据序列。
如果不需要一次操作全部内容,它可以处理完一个数据便丢掉。
在流看来,任一时刻它都只存储了文件中的一部分数据,只是内容在变化而已。
这种情况就像是用水管去取池子中的水。
每当用掉一点水,水管便会从池子中再取出一点。
无论水池有多大,都只存储了与水管容积等量的水。
Exp2:
下面是一个在线看视频的例子,假定我们通过HTTP请求返回视频内容给用户
const http = require('http'); const fs = require('fs'); http.createServer((req, res) => { fs.readFile(videoPath, (err, data) => { res.end(data); }); }).listen(8080);
但这样有两个明显的问题
- 视频文件需要全部读取完,才能返回给用户,这样等待时间会很长。
- 视频文件一次全放入内存中,内存吃不消。
用流可以将视频文件一点一点读到内存中,再一点一点返回给用户,读一部分,写一部分。(利用了 HTTP 协议的 Transfer-Encoding: chunked 分段传输特性),用户体验得到优化,同时对内存的开销明显下降。
const http = require('http'); const fs = require('fs'); http.createServer((req, res) => { fs.createReadStream(videoPath).pipe(res); }).listen(8080);
通过上述两个例子,我们知道,在大数据情况下必须使用流式处理。
四、可读流(Readable Stream)
可读流(Readable streams)是对提供数据的源头(source)的抽象。
常见的可读流:
- HTTP responses, on the client
- HTTP requests, on the server
- fs read streams
- TCP sockets //sockets是一个双工流,即可读可写的流
- process.stdin //标准输入
所有的 Readable Stream 都实现了 stream.Readable 类定义的接口。
可读流的两种模式(flowing 和 paused)
- 在 flowing 模式下,可读流自动从系统底层读取数据,并通过 EventEmitter 接口的事件尽快将数据提供给应用(所有的流都是 EventEmitter 的实例)。
- 在 paused 模式下,必须显式调用 stream.read()方法来从流中读取数据片段。
创建流的Readable流,默认是非流动模式(paused模式),默认不会读取数据。所有初始工作模式为paused的Readable流,可以通过下面三种途径切换为flowing模式:
- 监听'data'事件
- 调用stream.resume()方法
- 调用stream.pipe()方法将数据发送到Writable
fs.createReadStream(path[, options])源码实现
//文件名 ReadStream.js let fs = require('fs');//读取文件 let EventEmitter = require('events'); class ReadStream extends EventEmitter {//流操作都是基于事件的 constructor(path, options = {}) { super(); //需要的参数 this.path = path;//读取文件的路径 this.highWaterMark = options.highWaterMark || 64 * 1024;//缓冲区大小,默认64KB this.autoClose = options.autoClose || true;//是否需要自动关闭文件描述符,默认为true this.start = options.start || 0; //options 可以包括 start 和 end 值,使其可以从文件读取一定范围的字节而不是整个文件 this.pos = this.start; // 从文件的那个位置开始读取内容,pos会随着读取的位置而改变 this.end = options.end || null; // null表示没传递 this.encoding = options.encoding || null; this.flags = options.flags || 'r';//以何种方式操作文件 // 参数的问题 this.flowing = null; // 默认为非流动模式 // 建一个buffer存放读出来的数据 this.buffer = Buffer.alloc(this.highWaterMark); this.open(); // {newListener:[fn]} // 次方法默认同步调用的 this.on('newListener', (type) => { // 等待着 它监听data事件 if (type === 'data') {//当监听到data事件时,把流设置为流动模式 this.flowing = true; this.read();// 开始读取 客户已经监听了data事件 } }) } pause(){//将流从flowing模式切换为paused模式 this.flowing = false; } resume(){//将流从paused模式切换为flowing模式 this.flowing =true; this.read();//将流从paused模式切换为flowing模式后,继续读取文件内容 } read(){ // 默认第一次调用read方法时还没有获取fd,文件的打开是异步的,所以不能直接读 if(typeof this.fd !== 'number'){ //如果fd不是number类型,证明文件还没有打开,此时需要监听一次open事件,因为文件一打开,就会触发open事件,这个在this.open()里写了 return this.once('open',() => this.read()); // 等待着触发open事件后fd肯定拿到了,拿到以后再去执行read方法 } // 当获取到fd时 开始读取文件了 // 第一次应该读2个 第二次应该读2个 // 第二次pos的值是4 end是4 // 读取文件里一共4有个数为123 4,我们读取里面的123 4 let howMuchToRead = this.end?Math.min(this.end-this.pos+1,this.highWaterMark): this.highWaterMark;//规定每次读取多少个字节 fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (error, byteRead) => { // byteRead为真实的读到了几个字节的内容 // 读取完毕 this.pos += byteRead; // 读出来两个,pos位置就往后移两位 // this.buffer默认就是三个 let b = this.encoding ? this.buffer.slice(0, byteRead).toString(this.encoding) : this.buffer.slice(0, byteRead);//对读出来的内容进行编码 this.emit('data', b);//触发data事件,将读到的内容输出给用户 if ((byteRead === this.highWaterMark)&&this.flowing){ return this.read(); // 继续读 } // 这里就是没有更多的逻辑了 if (byteRead < this.highWaterMark){ // 没有更多了 this.emit('end'); // 读取完毕 this.destroy(); // 销毁即可 } }); } // 打开文件用的 destroy() { if (typeof this.fd != 'number') { return this.emit('close'); } //如果文件还没打开,直接触发close事件 fs.close(this.fd, () => { // 如果文件打开过了 那就关闭文件并且触发close事件 this.emit('close'); }); } open() { fs.open(this.path, this.flags, (err, fd) => { //fd是文件描述符,它标识的就是当前this.path这个文件,从3开始(number类型) if (err) { if (this.autoClose) { // 如果需要自动关闭我再去销毁fd this.destroy(); // 销毁(关闭文件,触发关闭事件) } this.emit('error', err); // 如果有错误触发error事件 return; } this.fd = fd; // 保存文件描述符 this.emit('open', this.fd); // 文件被打开了,触发文件被打开的方法 }); } pipe(dest){//管道流的实现 pipe()方法是ReadStream下的方法,它里面的参数是WritableStream this.on('data',(data)=>{ let flag = dest.write(data); if(!flag){//这个flag就是每次调用ws.write()后返回的读状态值 this.pause();// 已经不能继续写了,等他写完了再恢复 } }); dest.on('drain',()=>{//当读取缓存区清空后 console.log('写一下停一下') this.resume();//继续往dest写入数据 }); } } module.exports = ReadStream;//导出可读流
使用fs.createReadStream()
// 流:有序的有方向的,可以自己控制速率 // 读:读是将内容读取到内存中 // 写:写是将内存或者文件的内容写入到文件内 // 读取的时候默认读 默认一次读取64k,encoding 读取出来的内容默认都是buffer //let fs = require('fs'); //let rs = fs.createReadStream({...});//原生实现可读流 let ReadStream = require('./ReadStream'); let rs = new ReadStream('./2.txt', { highWaterMark: 3, // 字节 flags:'r',//读文件 autoClose:true, // 默认读取完毕后自动关闭文件描述符 start:0, //end:3,// 流是闭合区间 包start也包end encoding:'utf8' }); // 默认创建一个流 是非流动模式(上述源码中有写的),默认不会读取数据 // 如果我们需要接收数据,那我们要监听data事件,这样数据会自动的流出来 rs.on('error',function (err) {// 通常,这会在底层系统内部出错从而不能产生数据,或当流的实现试图传递错误数据时发生。 console.log(err) }); rs.on('open',function () {//文件被打开了,获取到了fd。内部会自动的触发这个事件 rs.emit('data'); console.log('文件打开了'); }); rs.on('data',function (data) {//有数据流出来了 console.log(data); rs.pause(); // 暂停触发on('data')事件,将流动模式又转化成了非流动模式 }); setTimeout(()=>{rs.resume()},3000);//三秒钟之后再将非流动模式转化为流动模式 rs.on('end',function () {// 读取完毕 console.log('读取完毕了'); }); rs.on('close',function () {//close 事件将在流或其底层资源(比如一个文件)关闭后触发。close 事件触发后,该流将不会再触发任何事件。 //console.log('关闭') });
四、可写流(Writable Stream)
可写流是对数据流向设备的抽象,用来消费上游流过来的数据,通过可写流程序可以把数据写入设备,常见的是本地磁盘文件或者 TCP、HTTP 等网络响应。
常见的可写流:
- HTTP requests, on the client
- HTTP responses, on the server
- fs write streams
- zlib streams
- crypto streams
- TCP sockets
- child process stdin
- process.stdout, process.stderr
所有 Writable 流都实现了 stream.Writable 类定义的接口。
可写流的使用
调用可写流实例的 write() 方法就可以把数据写入可写流
const fs = require('fs'); const rs = fs.createReadStream(sourcePath); const ws = fs.createWriteStream(destPath); rs.setEncoding('utf-8'); // 设置编码格式 rs.on('data', chunk => { ws.write(chunk); // 写入数据 });
监听了可读流的data事件就会使可读流进入流动模式,我们在回调事件里调用了可写流的 write() 方法,这样数据就被写入了可写流抽象的设备destPath中。
write() 方法有三个参数
- chunk {String| Buffer},表示要写入的数据
- encoding 当写入的数据是字符串的时候可以设置编码
- callback 数据被写入之后的回调函数
drain事件
如果调用 stream.write(chunk)方法返回false,表示当前缓存区已满,流将在适当的时机(缓存区清空后)触发drain事件。
const fs = require('fs'); const rs = fs.createReadStream(sourcePath); const ws = fs.createWriteStream(destPath); rs.setEncoding('utf-8'); // 设置编码格式 rs.on('data', chunk => { let flag = ws.write(chunk); // 写入数据 if (!flag) { // 如果缓存区已满暂停读取 rs.pause(); } }); ws.on('drain', () => { rs.resume(); // 缓存区已清空 继续读取写入 });
fs.createWriteStream(path[, options])源码实现
// 文件 WriteStream.js let fs = require('fs'); let EventEmitter = require('events'); class WriteStream extends EventEmitter { constructor(path, options = {}) { super(); this.path = path; this.flags = options.flags || 'w'; this.encoding = options.encoding || 'utf8'; this.start = options.start || 0; this.pos = this.start; this.mode = options.mode || 0o666; this.autoClose = options.autoClose || true; this.highWaterMark = options.highWaterMark || 16 * 1024; this.open(); // fd 异步的 //触发一个open事件,当触发open事件后fd肯定就存在了 // 写文件的时候 需要的参数有哪些 // 第一次写入是真的往文件里写 this.writing = false; // 默认第一次就不是正在写入 // 用简单的数组来模拟一下缓存 this.cache = []; // 维护一个变量,表示缓存的长度 this.len = 0; // 是否触发drain事件 this.needDrain = false; } clearBuffer() { let buffer = this.cache.shift(); if (buffer) { // 如果缓存里有 this._write(buffer.chunk, buffer.encoding, () => this.clearBuffer()); } else {// 如果缓存里没有了 if (this.needDrain) { // 需要触发drain事件 this.writing = false; // 告诉下次直接写就可以了 不需要写到内存中了 this.needDrain = false; this.emit('drain'); } } } _write(chunk, encoding, clearBuffer) { // 因为write方法是同步调用的此时fd还没有获取到,所以等待获取到再执行write操作 if (typeof this.fd != 'number') { return this.once('open', () => this._write(chunk, encoding, clearBuffer)); } fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, byteWritten) => { this.pos += byteWritten; this.len -= byteWritten; // 每次写入后就要在内存中减少一下 clearBuffer(); // 第一次就写完了 }) } write(chunk, encoding = this.encoding) { // 客户调用的是write方法去写入内容 // 要判断 chunk必须是buffer或者字符串 为了统一,如果传递的是字符串也要转成buffer chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding); this.len += chunk.length; // 维护缓存的长度 3 let ret = this.len < this.highWaterMark; if (!ret) { this.needDrain = true; // 表示需要触发drain事件 } if (this.writing) { // 表示正在写入,应该放到内存中 this.cache.push({ chunk, encoding, }); } else { // 第一次 this.writing = true; this._write(chunk, encoding, () => this.clearBuffer()); // 专门实现写的方法 } return ret; // 能不能继续写了,false表示下次的写的时候就要占用更多内存了 } destroy() { if (typeof this.fd != 'number') { this.emit('close'); } else { fs.close(this.fd, () => { this.emit('close'); }); } } open() { fs.open(this.path, this.flags, this.mode, (err, fd) => { if (err) { this.emit('error', err); if (this.autoClose) { this.destroy(); // 如果自动关闭就销毁文件描述符 } return; } this.fd = fd; this.emit('open', this.fd); }); } } module.exports = WriteStream;
使用fs.createWriteStream()
// 可写流有缓存区的概念 // 1.第一次写入是真的向文件里写,第二次在写入的时候是放到了缓存区里 // 2.写入时会返回一个boolean类型,返回为false时表示缓存区满了,不要再写入了 // 3.当内存和正在写入的内容消耗完后,会触发一个drain事件 //let fs = require('fs'); //let rs = fs.createWriteStream({...});//原生实现可写流 let WS = require('./WriteStream') let ws = new WS('./2.txt', { flags: 'w', // 写入文件,默认文件不存在会创建 highWaterMark: 1, // 设置当前缓存区的大小 encoding: 'utf8', // 文件里存放的都是二进制 start: 0, autoClose: true, // 自动关闭文件描述符 mode: 0o666, // 可读可写 }); // drain的触发时机,只有当highWaterMark填满时,才可能触发drain // 当嘴里的和地下的都吃完了,就会触发drain方法 let i = 9; function write() { let flag = true; while (flag && i >= 0) { i--; flag = ws.write('111'); // 987 // 654 // 321 // 0 console.log(flag) } } write(); ws.on('drain', function () { console.log('dry'); write(); });
总结
stream(流)分为可读流(flowing mode和paused mode)、可写流、可读写流,Node.js 提供了多种流对象。 例如, HTTP 请求 和 process.stdout 就都是流的实例。stream 模块提供了基础的 API 。使用这些 API 可以很容易地来构建实现流接口的对象。它们底层都调用了stream模块并进行封装。
好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对的支持。
加载全部内容