Web 流 API
History
稳定性:2 - 稳定
WHATWG 流标准 的实现。
WHATWG 流标准(或"web 流”)定义了一个用于处理流式数据的 API。它类似于 Node.js 流 API,但出现得较晚,并已成为许多 JavaScript 环境中流式数据的“标准”API。
主要有三种类型的对象:
ReadableStream- 代表流式数据的来源。WritableStream- 代表流式数据的目的地。TransformStream- 代表转换流式数据的算法。
此示例创建一个简单的 ReadableStream,每秒永久推送一次当前的 performance.now() 时间戳。使用异步迭代器从流中读取数据。
import {
ReadableStream,
} from 'node:stream/web';
import {
setInterval as every,
} from 'node:timers/promises';
import {
performance,
} from 'node:perf_hooks';
const SECOND = 1000;
const stream = new ReadableStream({
async start(controller) {
for await (const _ of every(SECOND))
controller.enqueue(performance.now());
},
});
for await (const value of stream)
console.log(value);Node.js 流可以通过 stream.Readable、stream.Writable 和 stream.Duplex 对象上存在的 toWeb 和 fromWeb 方法转换为 web 流,反之亦然。
有关更多详细信息,请参阅相关文档:
stream.Readable.toWebstream.Readable.fromWebstream.Writable.toWebstream.Writable.fromWebstream.Duplex.toWebstream.Duplex.fromWeb
类:ReadableStream
History
此类现在暴露于全局对象上。
new ReadableStream(underlyingSource ?, strategy?): void<Object><Function>ReadableStream
内部队列未满时重复调用。操作可以是同步或异步的。如果是异步的,则在先前返回的 promise 被兑现之前,不会再次调用该函数。undefined
的 promise。<string>'bytes'
或
undefined
。<number>type
等于
'bytes'
时使用。当设置为非零值时,视图缓冲区会自动分配给
ReadableByteStreamController.byobRequest
。当未设置时,必须使用流的内部队列通过默认读取器
ReadableStreamDefaultReader
传输数据。- 类型:
<boolean>如果此<ReadableStream>存在活动读取器,则设置为true。
readableStream.locked 属性默认为 false,当存在活动读取器消费流的数据时切换为 true。
readableStream.cancel(reason?): void<any>undefined
的 promise。readableStream.getReader(options?): voidimport { ReadableStream } from 'node:stream/web';
const stream = new ReadableStream();
const reader = stream.getReader();
console.log(await reader.read());导致 readableStream.locked 为 true。
readableStream.pipeThrough(transform, options?): void<Object><ReadableStream>ReadableStream
,
transform.writable
将从此
ReadableStream
接收的潜在修改后的数据推送到此流。<WritableStream>WritableStream
,此
ReadableStream
的数据将写入此流。<Object><boolean>true
时,此
ReadableStream
中的错误不会导致
transform.writable
被中止。<boolean>true
时,目标
transform.writable
中的错误不会导致此
ReadableStream
被取消。<boolean>true
时,关闭此
ReadableStream
不会导致
transform.writable
被关闭。<AbortSignal><AbortController>
取消数据传输。<ReadableStream>
来自
transform.readable
。将此 <ReadableStream> 连接到 transform 参数中提供的 <ReadableStream> 和 <WritableStream> 对,使得来自此 <ReadableStream> 的数据被写入 transform.writable,可能被转换,然后推送到 transform.readable。一旦管道配置完成,将返回 transform.readable。
在管道操作活跃时,导致 readableStream.locked 为 true。
import {
ReadableStream,
TransformStream,
} from 'node:stream/web';
const stream = new ReadableStream({
start(controller) {
controller.enqueue('a');
},
});
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
const transformedStream = stream.pipeThrough(transform);
for await (const chunk of transformedStream)
console.log(chunk);
// 打印:AreadableStream.pipeTo(destination, options?): void<WritableStream><WritableStream>
,此
ReadableStream
的数据将写入此流。<Object><boolean>true
时,此
ReadableStream
中的错误不会导致
destination
被中止。<boolean>true
时,
destination
中的错误不会导致此
ReadableStream
被取消。<boolean>true
时,关闭此
ReadableStream
不会导致
destination
被关闭。<AbortSignal><AbortController>
取消数据传输。undefined
的 promise在管道操作活跃时,导致 readableStream.locked 为 true。
readableStream.tee(): void返回一对新的 <ReadableStream> 实例,此 ReadableStream 的数据将转发到这两个实例。每个实例将接收相同的数据。
导致 readableStream.locked 为 true。
readableStream.values(options?): void创建并返回一个可用于消费此 ReadableStream 数据的异步迭代器。
在异步迭代器活跃时,导致 readableStream.locked 为 true。
import { Buffer } from 'node:buffer';
const stream = new ReadableStream(getSomeSource());
for await (const chunk of stream.values({ preventCancel: true }))
console.log(Buffer.from(chunk).toString());<ReadableStream> 对象支持使用 for await 语法的异步迭代器协议。
import { Buffer } from 'node:buffer';
const stream = new ReadableStream(getSomeSource());
for await (const chunk of stream)
console.log(Buffer.from(chunk).toString());异步迭代器将消费 <ReadableStream> 直到其终止。
默认情况下,如果异步迭代器提前退出(通过 break、return 或 throw),<ReadableStream> 将被关闭。为了防止 <ReadableStream> 自动关闭,请使用 readableStream.values() 方法获取异步迭代器并将 preventCancel 选项设置为 true。
<ReadableStream> 不得被锁定(即,它不得有现有的活动读取器)。在异步迭代期间,<ReadableStream> 将被锁定。
<ReadableStream> 实例可以使用 <MessagePort> 进行传输。
const stream = new ReadableStream(getReadableSourceSomehow());
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
data.getReader().read().then((chunk) => {
console.log(chunk);
});
};
port2.postMessage(stream, [stream]);ReadableStream.from(iterable): voiditerable{Iterable} 实现Symbol.asyncIterator或Symbol.iterator可迭代协议的对象。
一个实用方法,从可迭代对象创建一个新的 <ReadableStream>。
import { ReadableStream } from 'node:stream/web';
async function* asyncIterableGenerator() {
yield 'a';
yield 'b';
yield 'c';
}
const stream = ReadableStream.from(asyncIterableGenerator());
for await (const chunk of stream)
console.log(chunk); // 打印:'a', 'b', 'c'要将生成的 <ReadableStream> 管道传输到 <WritableStream>,{Iterable} 应产生一系列 {Buffer}、<TypedArray> 或 <DataView> 对象。
import { ReadableStream } from 'node:stream/web';
import { Buffer } from 'node:buffer';
async function* asyncIterableGenerator() {
yield Buffer.from('a');
yield Buffer.from('b');
yield Buffer.from('c');
}
const stream = ReadableStream.from(asyncIterableGenerator());
await stream.pipeTo(createWritableStreamSomehow());类:ReadableStreamDefaultReader
History
此类现在暴露于全局对象上。
默认情况下,调用 readableStream.getReader() 而不带参数将返回 ReadableStreamDefaultReader 的实例。默认读取器将流经流的数据块视为不透明值,这允许 <ReadableStream> 与通常任何 JavaScript 值一起工作。
new ReadableStreamDefaultReader(stream): void<ReadableStream>创建一个新的 <ReadableStreamDefaultReader>,锁定到给定的 <ReadableStream>。
readableStreamDefaultReader.cancel(reason?): void<any>undefined
的 promise。取消 <ReadableStream> 并返回一个 promise,当底层流被取消时该 promise 被兑现。
- 类型:
<Promise>当关联的<ReadableStream>关闭时兑现为undefined,如果流出错或读取器的锁在流完成关闭之前被释放,则被拒绝。
read(): void- 返回:一个兑现为一个对象的 promise:
Attributes
从底层 <ReadableStream> 请求下一个数据块,并返回一个 promise,一旦数据可用,该 promise 将被兑现。
readableStreamDefaultReader.releaseLock(): void释放此读取器对底层 <ReadableStream> 的锁。
类:ReadableStreamBYOBReader
History
此类现在暴露于全局对象上。
ReadableStreamBYOBReader 是面向字节的 <ReadableStream>(即在创建 ReadableStream 时将 underlyingSource.type 设置为 'bytes' 的流)的替代消费者。
BYOB 是"bring your own buffer"(自带缓冲区)的缩写。这是一种模式,允许更有效地读取面向字节的数据,避免额外的复制。
import {
open,
} from 'node:fs/promises';
import {
ReadableStream,
} from 'node:stream/web';
import { Buffer } from 'node:buffer';
class Source {
type = 'bytes';
autoAllocateChunkSize = 1024;
async start(controller) {
this.file = await open(new URL(import.meta.url));
this.controller = controller;
}
async pull(controller) {
const view = controller.byobRequest?.view;
const {
bytesRead,
} = await this.file.read({
buffer: view,
offset: view.byteOffset,
length: view.byteLength,
});
if (bytesRead === 0) {
await this.file.close();
this.controller.close();
}
controller.byobRequest.respond(bytesRead);
}
}
const stream = new ReadableStream(new Source());
async function read(stream) {
const reader = stream.getReader({ mode: 'byob' });
const chunks = [];
let result;
do {
result = await reader.read(Buffer.alloc(100));
if (result.value !== undefined)
chunks.push(Buffer.from(result.value));
} while (!result.done);
return Buffer.concat(chunks);
}
const data = await read(stream);
console.log(Buffer.from(data).toString());new ReadableStreamBYOBReader(stream): void<ReadableStream>创建一个新的 ReadableStreamBYOBReader,锁定到给定的 <ReadableStream>。
readableStreamBYOBReader.cancel(reason?): void<any>undefined
的 promise。取消 <ReadableStream> 并返回一个 promise,当底层流被取消时该 promise 被兑现。
- 类型:
<Promise>当关联的<ReadableStream>关闭时兑现为undefined,如果流出错或读取器的锁在流完成关闭之前被释放,则被拒绝。
read(view, options?): void<TypedArray>
|
<DataView><TypedArray>
|
<DataView><boolean>从底层 <ReadableStream> 请求下一个数据块,并返回一个 promise,一旦数据可用,该 promise 将被兑现。
不要将池化的 {Buffer} 对象实例传递到此方法。
池化的 Buffer 对象是使用 Buffer.allocUnsafe() 或 Buffer.from() 创建的,或者通常由各种 node:fs 模块回调返回。这些类型的 Buffer 使用共享的底层 <ArrayBuffer> 对象,该对象包含所有池化 Buffer 实例的所有数据。当将 Buffer、<TypedArray> 或 <DataView> 传递到 readableStreamBYOBReader.read() 时,视图的底层 ArrayBuffer 被_分离_,使该 ArrayBuffer 上可能存在的所有现有视图无效。这可能会给您的应用程序带来灾难性后果。
readableStreamBYOBReader.releaseLock(): void释放此读取器对底层 <ReadableStream> 的锁。
类:ReadableStreamDefaultController
History
每个 <ReadableStream> 都有一个控制器,负责流的队列的内部状态和管理。 ReadableStreamDefaultController 是非面向字节的 ReadableStream 的默认控制器实现。
readableStreamDefaultController.close(): void关闭与此控制器关联的 <ReadableStream>。
- 类型:
<number>
返回填充 <ReadableStream> 队列所需的剩余数据量。
readableStreamDefaultController.enqueue(chunk?): void<any>将新的数据块附加到 <ReadableStream> 的队列。
readableStreamDefaultController.error(error?): void<any>发出一个错误信号,导致 <ReadableStream> 出错并关闭。
类:ReadableByteStreamController
History
支持处理来自已释放读取器的 BYOB 拉取请求。
每个 <ReadableStream> 都有一个控制器,负责流的队列的内部状态和管理。 ReadableByteStreamController 用于面向字节的 ReadableStream。
readableByteStreamController.close(): void关闭与此控制器关联的 <ReadableStream>。
- 类型:
<number>
返回填充 <ReadableStream> 队列所需的剩余数据量。
readableByteStreamController.enqueue(chunk): void<TypedArray>
|
<DataView>将新的数据块附加到 <ReadableStream> 的队列。
readableByteStreamController.error(error?): void<any>发出一个错误信号,导致 <ReadableStream> 出错并关闭。
类:ReadableStreamBYOBRequest
History
此类现在暴露于全局对象上。
当在面向字节的流中使用 ReadableByteStreamController,以及使用 ReadableStreamBYOBReader 时,
readableByteStreamController.byobRequest 属性提供对 ReadableStreamBYOBRequest 实例的访问,
该实例代表当前的读取请求。该对象用于访问为读取请求提供的 ArrayBuffer/TypedArray 以进行填充,
并提供用于信号数据已提供的方法。
readableStreamBYOBRequest.respond(bytesWritten): void<number>信号表明已将 bytesWritten 数量的字节写入 readableStreamBYOBRequest.view。
readableStreamBYOBRequest.respondWithNewView(view): void<TypedArray>
|
<DataView>信号表明请求已 fulfilled,字节已写入新的 Buffer、TypedArray 或 DataView。
- 类型:
<TypedArray>|<DataView>
类:WritableStream
History
此类现在暴露于全局对象上。
WritableStream 是发送流数据的目的地。
import {
WritableStream,
} from 'node:stream/web';
const stream = new WritableStream({
write(chunk) {
console.log(chunk);
},
});
await stream.getWriter().write('Hello World');new WritableStream(underlyingSink?, strategy?): void<Object><Function>WritableStream
时立即调用。<WritableStreamDefaultController>undefined
或一个兑现值为
undefined
的 promise。<Function>WritableStream
时调用。<Function>WritableStream
关闭时调用。<any>type
选项保留供将来使用,
必须
为 undefined。writableStream.abort(reason?): void<any>undefined
的 promise。突然终止 WritableStream。所有排队的写入将被取消,其关联的 promise 将被拒绝。
writableStream.close(): void- 返回:一个兑现值为
undefined的 promise。
当不再期望额外写入时,关闭 WritableStream。
writableStream.getWriter(): void创建并返回一个新的写入器实例,可用于将数据写入 WritableStream。
- 类型:
<boolean>
writableStream.locked 属性默认为 false,当有活动写入器附加到此 WritableStream 时切换为 true。
<WritableStream> 实例可以使用 <MessagePort> 进行传输。
const stream = new WritableStream(getWritableSinkSomehow());
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
data.getWriter().write('hello');
};
port2.postMessage(stream, [stream]);类:WritableStreamDefaultWriter
History
此类现在暴露于全局对象上。
new WritableStreamDefaultWriter(stream): void<WritableStream>创建一个新的 WritableStreamDefaultWriter,锁定到给定的 WritableStream。
writableStreamDefaultWriter.abort(reason?): void<any>undefined
的 promise。突然终止 WritableStream。所有排队的写入将被取消,其关联的 promise 将被拒绝。
writableStreamDefaultWriter.close(): void- 返回:一个兑现值为
undefined的 promise。
当不再期望额外写入时,关闭 WritableStream。
- 类型:
<Promise>当关联的<WritableStream>关闭时兑现为undefined,如果流出错或写入器的锁在流完成关闭之前被释放,则被拒绝。
- 类型:
<number>
填充 <WritableStream> 队列所需的数据量。
- 类型:
<Promise>当写入器准备好使用时兑现为undefined。
writableStreamDefaultWriter.releaseLock(): void释放此写入器对底层 <ReadableStream> 的锁。
writableStreamDefaultWriter.write(chunk?): void<any>undefined
的 promise。将新的数据块附加到 <WritableStream> 的队列。
类:WritableStreamDefaultController
History
此类现在暴露于全局对象上。
WritableStreamDefaultController 管理 <WritableStream> 的内部状态。
writableStreamDefaultController.error(error?): void<any>由用户代码调用,以信号表明在处理 WritableStream 数据时发生了错误。调用时,<WritableStream> 将被中止,当前待处理的写入将被取消。
- 类型:
<AbortSignal>一个AbortSignal,可用于在<WritableStream>被中止时取消待处理的写入或关闭操作。
类:TransformStream
History
此类现在暴露于全局对象上。
TransformStream 由一个 <ReadableStream> 和一个 <WritableStream> 组成,它们连接在一起,使得写入 WritableStream 的数据被接收,并可能被转换,然后推送到 ReadableStream 的队列。
import {
TransformStream,
} from 'node:stream/web';
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
await Promise.all([
transform.writable.getWriter().write('A'),
transform.readable.getReader().read(),
]);new TransformStream(transformer?, writableStrategy?, readableStrategy?): void<Object><Function>TransformStream
时立即调用。<TransformStreamDefaultController>undefined
或一个兑现值为
undefined
的 promise<Function>transformStream.writable
的数据块,然后将其转发到
transformStream.readable
。<Function>TransformStream
的可写侧关闭之前立即调用,信号表明转换过程结束。<TransformStreamDefaultController>undefined
的 promise。<any>readableType
选项保留供将来使用,
必须
为
undefined
。<any>writableType
选项保留供将来使用,
必须
为
undefined
。<TransformStream> 实例可以使用 <MessagePort> 进行传输。
const stream = new TransformStream();
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
const { writable, readable } = data;
// ...
};
port2.postMessage(stream, [stream]);类:TransformStreamDefaultController
History
此类现在暴露于全局对象上。
TransformStreamDefaultController 管理 TransformStream 的内部状态。
- 类型:
<number>
填充可读侧队列所需的数据量。
transformStreamDefaultController.enqueue(chunk?): void<any>将数据块附加到可读侧的队列。
transformStreamDefaultController.error(reason?): void<any>向可读侧和可写侧发出信号,表明在处理转换数据时发生了错误,导致两侧突然关闭。
transformStreamDefaultController.terminate(): void关闭传输的可读侧,并导致可写侧因错误而突然关闭。
类:ByteLengthQueuingStrategy
History
此类现在暴露于全局对象上。
new ByteLengthQueuingStrategy(init): void- 类型:
<number>
- 类型:
<Function>Attributes
类:CountQueuingStrategy
History
此类现在暴露于全局对象上。
new CountQueuingStrategy(init): void- 类型:
<number>
- 类型:
<Function>Attributes
类:TextEncoderStream
History
此类现在暴露于全局对象上。
new TextEncoderStream(): void创建一个新的 TextEncoderStream 实例。
- 类型:
<string>
TextEncoderStream 实例支持的编码。
类:TextDecoderStream
History
此类现在暴露于全局对象上。
new TextDecoderStream(encoding?, options?): void创建一个新的 TextDecoderStream 实例。
- 类型:
<string>
TextDecoderStream 实例支持的编码。
- 类型:
<boolean>
如果解码错误导致抛出 TypeError,则值将为 true。
- 类型:
<boolean>
如果解码结果将包含字节顺序标记,则值将为 true。
类:CompressionStream
History
此类现在暴露于全局对象上。
CompressionStream Constructor
History
format 现在接受 brotli 值。
format 现在接受 deflate-raw 值。
new CompressionStream(format): void<string>'deflate'
、
'deflate-raw'
、
'gzip'
或
'brotli'
之一。类:DecompressionStream
History
此类现在暴露于全局对象上。
DecompressionStream Constructor
History
format 现在接受 brotli 值。
format 现在接受 deflate-raw 值。
new DecompressionStream(format): void<string>'deflate'
、
'deflate-raw'
、
'gzip'
或
'brotli'
之一。实用消费者
History
实用消费者函数提供用于消费流的常见选项。
它们通过以下方式访问:
import {
arrayBuffer,
blob,
buffer,
json,
text,
} from 'node:stream/consumers';streamConsumers.arrayBuffer(stream): void<ReadableStream>
|
<stream.Readable>
|
<AsyncIterator><Promise>
兑现为一个包含流全部内容的
ArrayBuffer
。import { arrayBuffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { TextEncoder } from 'node:util';
const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');
const readable = Readable.from(dataArray);
const data = await arrayBuffer(readable);
console.log(`from readable: ${data.byteLength}`);
// 打印:from readable: 76streamConsumers.blob(stream): void<ReadableStream>
|
<stream.Readable>
|
<AsyncIterator>import { blob } from 'node:stream/consumers';
const dataBlob = new Blob(['hello world from consumers!']);
const readable = dataBlob.stream();
const data = await blob(readable);
console.log(`from readable: ${data.size}`);
// 打印:from readable: 27streamConsumers.buffer(stream): void<ReadableStream>
|
<stream.Readable>
|
<AsyncIterator><Promise>
兑现为一个包含流全部内容的 {Buffer}。import { buffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { Buffer } from 'node:buffer';
const dataBuffer = Buffer.from('hello world from consumers!');
const readable = Readable.from(dataBuffer);
const data = await buffer(readable);
console.log(`from readable: ${data.length}`);
// 打印:from readable: 27streamConsumers.bytes(stream): void<ReadableStream>
|
<stream.Readable>
|
<AsyncIterator><Promise>
兑现为一个包含流全部内容的
<Uint8Array>
。import { bytes } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { Buffer } from 'node:buffer';
const dataBuffer = Buffer.from('hello world from consumers!');
const readable = Readable.from(dataBuffer);
const data = await bytes(readable);
console.log(`from readable: ${data.length}`);
// 打印:from readable: 27streamConsumers.json(stream): void<ReadableStream>
|
<stream.Readable>
|
<AsyncIterator><Promise>
兑现为流的内容,解析为 UTF-8 编码字符串,然后通过
JSON.parse()
。import { json } from 'node:stream/consumers';
import { Readable } from 'node:stream';
const items = Array.from(
{
length: 100,
},
() => ({
message: 'hello world from consumers!',
}),
);
const readable = Readable.from(JSON.stringify(items));
const data = await json(readable);
console.log(`from readable: ${data.length}`);
// 打印:from readable: 100streamConsumers.text(stream): void<ReadableStream>
|
<stream.Readable>
|
<AsyncIterator><Promise>
兑现为流的内容,解析为 UTF-8 编码字符串。import { text } from 'node:stream/consumers';
import { Readable } from 'node:stream';
const readable = Readable.from('Hello world from consumers!');
const data = await text(readable);
console.log(`from readable: ${data.length}`);
// 打印:from readable: 27