On this page

工作线程

History
Source Code: lib/worker_threads.js

稳定性:2 - 稳定

node:worker_threads 模块允许使用并行执行 JavaScript 的线程。要访问它:

工作器(线程)适用于执行 CPU 密集型的 JavaScript 操作。它们对 I/O 密集型工作帮助不大。Node.js 内置的异步 I/O 操作比工作器更高效。

child_processcluster 不同,worker_threads 可以共享内存。它们通过传输 ArrayBuffer 实例或共享 SharedArrayBuffer 实例来实现这一点。

import {
  Worker,
  isMainThread,
  parentPort,
  workerData,
} from 'node:worker_threads';

if (!isMainThread) {
  const { parse } = await import('some-js-parsing-library');
  const script = workerData;
  parentPort.postMessage(parse(script));
}

export default function parseJSAsync(script) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(new URL(import.meta.url), {
      workerData: script,
    });
    worker.on('message', resolve);
    worker.once('error', reject);
    worker.once('exit', (code) => {
      if (code !== 0)
        reject(new Error(`Worker stopped with exit code ${code}`));
    });
  });
};

上述示例为每个 parseJSAsync() 调用生成一个 Worker 线程。实际上,对于此类任务应使用 Worker 池。否则,创建 Worker 的开销可能会超过其收益。

实现 Worker 池时,使用 AsyncResource API 通知诊断工具(例如提供异步堆栈跟踪)关于任务与其结果之间的关联。有关示例实现,请参阅 async_hooks 文档中的 "使用 AsyncResource 用于 Worker 线程池"

Worker 线程默认继承非进程特定的选项。参阅 Worker 构造函数选项 了解如何自定义工作线程选项,特别是 argvexecArgv 选项。

M

worker_threads.getEnvironmentData

History
worker_threads.getEnvironmentData(key): void
Attributes
key:<any>
任何任意的、可克隆的 JavaScript 值,可用作 <Map> 键。
返回: <any>

在工作线程内,worker.getEnvironmentData() 返回传递给生成线程的 worker.setEnvironmentData() 的数据克隆。每个新 Worker 都会自动接收环境数据的自己的副本。

import {
  Worker,
  isMainThread,
  setEnvironmentData,
  getEnvironmentData,
} from 'node:worker_threads';

if (isMainThread) {
  setEnvironmentData('Hello', 'World!');
  const worker = new Worker(new URL(import.meta.url));
} else {
  console.log(getEnvironmentData('Hello'));  // 打印 'World!'.
}
P

worker_threads.isInternalThread

History

如果此代码在内部 Worker 线程(例如加载器线程)内运行,则为 true

P

worker_threads.isMainThread

History

如果此代码不在 Worker 线程内运行,则为 true

import { Worker, isMainThread } from 'node:worker_threads';

if (isMainThread) {
  // 这会在 Worker 实例中重新加载当前文件。
  new Worker(new URL(import.meta.url));
} else {
  console.log('Inside Worker!');
  console.log(isMainThread);  // 打印 'false'.
}
M

worker_threads.markAsUntransferable

History
worker_threads.markAsUntransferable(object): void
Attributes
object:<any>
任何任意的 JavaScript 值。

将对象标记为不可传输。如果 object 出现在 port.postMessage() 调用的传输列表中,则会抛出错误。如果 object 是原始值,则此操作无效。

特别是,这对于可以被克隆而不是传输,并且被发送方其他对象使用的对象是有意义的。例如,Node.js 用它来标记用于 BufferArrayBuffer。在此类数组缓冲区实例上禁止使用 ArrayBuffer.prototype.transfer()

此操作无法撤销。

import { MessageChannel, markAsUntransferable } from 'node:worker_threads';

const pooledBuffer = new ArrayBuffer(8);
const typedArray1 = new Uint8Array(pooledBuffer);
const typedArray2 = new Float64Array(pooledBuffer);

markAsUntransferable(pooledBuffer);

const { port1 } = new MessageChannel();
try {
  // 这将抛出一个错误,因为 pooledBuffer 不可传输。
  port1.postMessage(typedArray1, [ typedArray1.buffer ]);
} catch (error) {
  // error.name === 'DataCloneError'
}

// 以下行打印 typedArray1 的内容 -- 它仍然拥有
// 其内存且未被传输。如果没有
// `markAsUntransferable()`,这将打印一个空的 Uint8Array 并且
// postMessage 调用会成功。
// typedArray2 也完好无损。
console.log(typedArray1);
console.log(typedArray2);

浏览器中没有与此 API 等效的功能。

M

worker_threads.isMarkedAsUntransferable

History
worker_threads.isMarkedAsUntransferable(object): void
Attributes
object:<any>
任何 JavaScript 值。
返回: <boolean>

检查对象是否被 markAsUntransferable() 标记为不可传输。

import { markAsUntransferable, isMarkedAsUntransferable } from 'node:worker_threads';

const pooledBuffer = new ArrayBuffer(8);
markAsUntransferable(pooledBuffer);

isMarkedAsUntransferable(pooledBuffer);  // 返回 true。

浏览器中没有与此 API 等效的功能。

M

worker_threads.markAsUncloneable

History
worker_threads.markAsUncloneable(object): void
Attributes
object:<any>
任何任意的 JavaScript 值。

将对象标记为不可克隆。如果 objectport.postMessage() 调用中用作 message,则会抛出错误。如果 object 是原始值,则此操作无效。

这对 ArrayBuffer 或任何 Buffer 类对象没有影响。

此操作无法撤销。

import { markAsUncloneable } from 'node:worker_threads';

const anyObject = { foo: 'bar' };
markAsUncloneable(anyObject);
const { port1 } = new MessageChannel();
try {
  // 这将抛出一个错误,因为 anyObject 不可克隆。
  port1.postMessage(anyObject);
} catch (error) {
  // error.name === 'DataCloneError'
}

浏览器中没有与此 API 等效的功能。

M

worker_threads.moveMessagePortToContext

History
worker_threads.moveMessagePortToContext(port, contextifiedSandbox): void
Attributes
要传输的消息端口。
contextifiedSandbox:<Object>
一个 [contextified][] 对象,由  vm.createContext() 方法返回。
返回: <MessagePort>

MessagePort 传输到不同的 vm 上下文。原始 port 对象变为不可用,返回的 MessagePort 实例取代其位置。

返回的 MessagePort 是目标上下文中的对象,并继承自其全局 Object 类。传递给 port.onmessage() 监听器的对象也在目标上下文中创建,并继承自其全局 Object 类。

但是,创建的 MessagePort 不再继承自 <EventTarget>,只能使用 port.onmessage() 来接收事件。

P

worker_threads.parentPort

History

如果此线程是一个 Worker,则这是一个 MessagePort, 允许与主线程通信。使用 parentPort.postMessage() 发送的消息在主线程中 通过 worker.on('message') 可用,而使用 worker.postMessage() 从主线程 发送的消息在此线程中通过 parentPort.on('message') 可用。

import { Worker, isMainThread, parentPort } from 'node:worker_threads';

if (isMainThread) {
  const worker = new Worker(new URL(import.meta.url));
  worker.once('message', (message) => {
    console.log(message);  // 打印 'Hello, world!'。
  });
  worker.postMessage('Hello, world!');
} else {
  // 当收到来自主线程的消息时,将其发回:
  parentPort.once('message', (message) => {
    parentPort.postMessage(message);
  });
}
M

worker_threads.postMessageToThread

History
worker_threads.postMessageToThread(threadId, value, transferList?, timeout?): void

稳定性:1.1 - 积极开发中

Attributes
threadId:<number>
目标线程 ID。如果线程 ID 无效,将抛出  ERR_WORKER_MESSAGING_FAILED 错误。如果目标线程 ID 是当前线程 ID, 将抛出 ERR_WORKER_MESSAGING_SAME_THREAD 错误。
value:<any>
要发送的值。
transferList:<Object[]>
如果  value 中传递了一个或多个 MessagePort 类似对象, 则这些项需要 transferList ,否则将抛出 ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST 。 参见 port.postMessage() 获取更多信息。
timeout:<number>
等待消息传递的时间(毫秒)。 默认为  undefined ,表示永远等待。如果操作超时, 将抛出 ERR_WORKER_MESSAGING_TIMEOUT 错误。
返回: <Promise> 如果消息被目标线程成功处理,则履行的 promise。

向另一个 worker 发送一个值,由其线程 ID 标识。

如果目标线程没有 workerMessage 事件的监听器,则操作将抛出 ERR_WORKER_MESSAGING_FAILED 错误。

如果目标线程在处理 workerMessage 事件时抛出错误,则操作将抛出 ERR_WORKER_MESSAGING_ERRORED 错误。

当目标线程不是当前线程的直接父线程或子线程时,应使用此方法。 如果两个线程是父子关系,请使用 require('node:worker_threads').parentPort.postMessage()worker.postMessage() 让线程通信。

下面的示例展示了 postMessageToThread 的使用:它创建了 10 个嵌套线程, 最后一个将尝试与主线程通信。

import process from 'node:process';
import {
  postMessageToThread,
  threadId,
  workerData,
  Worker,
} from 'node:worker_threads';

const channel = new BroadcastChannel('sync');
const level = workerData?.level ?? 0;

if (level < 10) {
  const worker = new Worker(new URL(import.meta.url), {
    workerData: { level: level + 1 },
  });
}

if (level === 0) {
  process.on('workerMessage', (value, source) => {
    console.log(`${source} -> ${threadId}:`, value);
    postMessageToThread(source, { message: 'pong' });
  });
} else if (level === 10) {
  process.on('workerMessage', (value, source) => {
    console.log(`${source} -> ${threadId}:`, value);
    channel.postMessage('done');
    channel.close();
  });

  await postMessageToThread(0, { message: 'ping' });
}

channel.onmessage = channel.close;
worker_threads.receiveMessageOnPort(port): void
Attributes

从给定的 MessagePort 接收单条消息。如果没有可用消息, 则返回 undefined,否则返回一个包含单个 message 属性的对象, 该属性包含消息负载,对应于 MessagePort 队列中最旧的消息。

import { MessageChannel, receiveMessageOnPort } from 'node:worker_threads';
const { port1, port2 } = new MessageChannel();
port1.postMessage({ hello: 'world' });

console.log(receiveMessageOnPort(port2));
// 打印:{ message: { hello: 'world' } }
console.log(receiveMessageOnPort(port2));
// 打印:undefined

使用此函数时,不会发出 'message' 事件,也不会调用 onmessage 监听器。

P

worker_threads.resourceLimits

History

提供此 Worker 线程内 JS 引擎资源约束的集合。 如果将 resourceLimits 选项传递给 Worker 构造函数, 则与此值匹配。

如果在主线程中使用此选项,其值为一个空对象。

P

worker_threads.SHARE_ENV

History

一个特殊值,可作为 Worker 构造函数的 env 选项传递, 以指示当前线程和 Worker 线程应共享对同一组环境变量的读写访问权限。

import process from 'node:process';
import { Worker, SHARE_ENV } from 'node:worker_threads';
new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
  .once('exit', () => {
    console.log(process.env.SET_IN_WORKER);  // 打印 'foo'。
  });
M

worker_threads.setEnvironmentData

History
worker_threads.setEnvironmentData(key, value?): void
Attributes
key:<any>
任何可用作 <Map> 键的任意可克隆 JavaScript 值。
value:<any>
任何将被克隆并自动传递给所有新  Worker 实例的任意可克隆 JavaScript 值。如果 value 传递 为 undefined ,则之前为 key 设置的任何值将被删除。

worker.setEnvironmentData() API 设置当前线程和从当前上下文 生成的所有新 Worker 实例中 worker.getEnvironmentData() 的内容。

P

worker_threads.threadId

History

当前线程的整数标识符。在相应的 worker 对象上 (如果有的话),它可作为 worker.threadId 使用。 此值在单个进程内的每个 Worker 实例中都是唯一的。

P

worker_threads.threadName

History
Attributes

当前线程的字符串标识符,如果线程未运行则为 null。 在相应的 worker 对象上(如果有的话),它可作为 worker.threadName 使用。

P

worker_threads.workerData

History

任意 JavaScript 值,包含传递给此线程的 Worker 构造函数的数据的克隆。

数据的克隆方式如同使用 postMessage(), 根据 HTML 结构化克隆算法

import { Worker, isMainThread, workerData } from 'node:worker_threads';

if (isMainThread) {
  const worker = new Worker(new URL(import.meta.url), { workerData: 'Hello, world!' });
} else {
  console.log(workerData);  // 打印 'Hello, world!'。
}
P

worker_threads.locks

History

稳定性:1 - 实验性

Attributes

一个 LockManager 实例,可用于协调访问同一进程内多个线程之间可能共享的资源。该 API 镜像了 浏览器 LockManager 的语义。

类:Lock

History

Lock 接口提供有关通过 locks.request() 授予的锁的信息。

P

lock.name

History
Attributes

锁的名称。

P

lock.mode

History
Attributes

锁的模式。sharedexclusive

类:LockManager

History

LockManager 接口提供请求和内省锁的方法。要获取 LockManager 实例,请使用

此实现与 浏览器 LockManager API 匹配。

M

locks.request

History
locks.request(name, options?, callback): void
Attributes
options:<Object>
'exclusive''shared'默认值: 'exclusive'
ifAvailable:<boolean>
如果为  true ,则仅当锁未被持有时才授予请求。如果无法授予, callback 将 使用 null 调用,而不是 Lock 实例。 默认值: false
steal:<boolean>
如果为  true ,则释放任何具有相同名称的现有锁,并立即授予请求,抢占任何排队的 请求。 默认值: false
可用于中止 待处理(但尚未授予)锁请求的信号。
callback:<Function>
一旦锁被授予则调用(如果  ifAvailabletrue 且锁不可用,则立即使用 null 调用)。当函数返回时,锁会自动释放,或者——如果函数返回 一个 promise——当该 promise 解决时。
返回: <Promise> 一旦锁被释放则解决。
import { locks } from 'node:worker_threads';

await locks.request('my_resource', async (lock) => {
  // 锁已获取。
});
// 锁在此处已释放。
M

locks.query

History
locks.query(): void

解决为一个 LockManagerSnapshot,描述当前进程当前持有和待处理的锁。

import { locks } from 'node:worker_threads';

const snapshot = await locks.query();
for (const lock of snapshot.held) {
  console.log(`held lock: name ${lock.name}, mode ${lock.mode}`);
}
for (const pending of snapshot.pending) {
  console.log(`pending lock: name ${pending.name}, mode ${pending.mode}`);
}

BroadcastChannel 的实例允许与所有绑定到相同通道名称的其他 BroadcastChannel 实例进行异步一对多通信。

import {
  isMainThread,
  BroadcastChannel,
  Worker,
} from 'node:worker_threads';

const bc = new BroadcastChannel('hello');

if (isMainThread) {
  let c = 0;
  bc.onmessage = (event) => {
    console.log(event.data);
    if (++c === 10) bc.close();
  };
  for (let n = 0; n < 10; n++)
    new Worker(new URL(import.meta.url));
} else {
  bc.postMessage('hello from every worker');
  bc.close();
}
C

BroadcastChannel Constructor

History
new BroadcastChannel(name): void
Attributes
name:<any>
要连接的通道名称。允许使用任何可以使用  `${name}` 转换为字符串的 JavaScript 值。
M

broadcastChannel.close

History
broadcastChannel.close(): void

关闭 BroadcastChannel 连接。

P

broadcastChannel.onmessage

History
  • 类型:<Function> 当收到消息时,使用单个 MessageEvent 参数调用。
P

broadcastChannel.onmessageerror

History
  • 类型:<Function> 当收到的消息无法被反序列化时调用。
M

broadcastChannel.postMessage

History
broadcastChannel.postMessage(message): void
Attributes
message:<any>
任何可克隆的 JavaScript 值。
M

broadcastChannel.ref

History
broadcastChannel.ref(): void

unref() 的反义词。对之前 unref() 过的 BroadcastChannel 调用 ref() 并不会让程序退出,如果它是唯一留下的活动句柄(默认行为)。如果端口已被 ref(),再次调用 ref() 无效。

M

broadcastChannel.unref

History
broadcastChannel.unref(): void

在 BroadcastChannel 上调用 unref() 允许线程退出,如果这是事件系统中唯一活动的句柄。如果 BroadcastChannel 已经 unref(),再次调用 unref() 无效。

类:MessageChannel

History

worker.MessageChannel 类的实例表示一个异步的双向通信通道。 MessageChannel 没有自己的方法。new MessageChannel() 生成一个带有 port1port2 属性的对象,这些属性引用链接的 MessagePort 实例。

import { MessageChannel } from 'node:worker_threads';

const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log('received', message));
port2.postMessage({ foo: 'bar' });
// 打印:received { foo: 'bar' } 来自 `port1.on('message')` 监听器

worker.MessagePort 类的实例表示异步双向通信通道的一端。它可用于在不同的 Worker 之间传输结构化数据、内存区域和其他 MessagePort

此实现与 浏览器 MessagePort 匹配。

事件:'close'

History

当通道的任意一侧断开连接时,会发出 'close' 事件。

import { MessageChannel } from 'node:worker_threads';
const { port1, port2 } = new MessageChannel();

// 打印:
//   foobar
//   closed!
port2.on('message', (message) => console.log(message));
port2.once('close', () => console.log('closed!'));

port1.postMessage('foobar');
port1.close();

事件:'message'

History
Attributes
value:<any>
传输的值

对于任何传入的消息,都会发出 'message' 事件,包含 port.postMessage() 的克隆输入。

此事件的监听器接收传递给 postMessage()value 参数的克隆,没有进一步的参数。

事件:'messageerror'

History
Attributes
error:<Error>
一个 Error 对象

当反序列化消息失败时,会发出 'messageerror' 事件。

目前,当在接收端实例化发布的 JS 对象发生错误时,会发出此事件。这种情况很少见,但可能会发生,例如,当在 vm.Context 中接收到某些 Node.js API 对象时(目前在该上下文中 Node.js API 不可用)。

M

port.close

History
port.close(): void

禁用连接任一侧的进一步消息发送。 当不再通过此 MessagePort 进行通信时,可以调用此方法。

'close' 事件 会在属于该通道的两个 MessagePort 实例上发出。

port.postMessage(value, transferList?): void
Attributes
value:<any>
transferList:<Object[]>

将 JavaScript 值发送到此通道的接收端。 value 以与 HTML 结构化克隆算法 兼容的方式进行传输。

特别是,与 JSON 的显著区别是:

  • value 可能包含循环引用。
  • value 可能包含内置 JS 类型的实例,例如 RegExpBigIntMapSet 等。
  • value 可能包含类型化数组,使用 ArrayBufferSharedArrayBuffer
  • value 可能包含 WebAssembly.Module 实例。
  • value 可能不包含原生(C++ 支持)对象,除了:
    Attributes
    {FileHandle},
    {Histogram},
    {KeyObject},
    {X509Certificate}。
import { MessageChannel } from 'node:worker_threads';
const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => console.log(message));

const circularData = {};
circularData.foo = circularData;
// 打印:{ foo: [Circular] }
port2.postMessage(circularData);

transferList 可以是 <ArrayBuffer>MessagePortFileHandle 对象的列表。 传输后,它们在通道的发送端不再可用(即使它们不包含在 value 中)。与 子进程 不同,目前不支持传输网络套接字等句柄。

如果 value 包含 <SharedArrayBuffer> 实例,则它们可从任一线程访问。它们不能列在 transferList 中。

value 可能仍然包含不在 transferList 中的 ArrayBuffer 实例;在这种情况下,底层内存会被复制而不是移动。

import { MessageChannel } from 'node:worker_threads';
const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => console.log(message));

const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]);
// 这会发布 `uint8Array` 的副本:
port2.postMessage(uint8Array);
// 这不会复制数据,但会使 `uint8Array` 不可用:
port2.postMessage(uint8Array, [ uint8Array.buffer ]);

// `sharedUint8Array` 的内存可从原始副本和 `.on('message')` 接收的副本中访问:
const sharedUint8Array = new Uint8Array(new SharedArrayBuffer(4));
port2.postMessage(sharedUint8Array);

// 这将新创建的消息端口传输给接收者。
// 例如,这可用于在作为同一父线程子线程的多个 `Worker` 线程之间创建通信通道。
const otherChannel = new MessageChannel();
port2.postMessage({ port: otherChannel.port1 }, [ otherChannel.port1 ]);

消息对象会立即被克隆,并且可以在发布后修改而不会产生副作用。

有关此 API 背后的序列化和反序列化机制的更多信息,请参阅 node:v8 模块的序列化 API

所有 <TypedArray> 实例都是底层 <ArrayBuffer> 的视图。也就是说,实际上是 ArrayBuffer 存储原始数据,而 TypedArrayBuffer 对象提供了一种查看和操作数据的方式。为同一个 ArrayBuffer 实例创建多个视图是可能且常见的。在使用传输列表传输 ArrayBuffer 时必须非常小心,因为这样做会导致共享该 ArrayBuffer 的所有 TypedArrayBuffer 实例变得不可用。

const ab = new ArrayBuffer(10);

const u1 = new Uint8Array(ab);
const u2 = new Uint16Array(ab);

console.log(u2.length);  // 打印 5

port.postMessage(u1, [u1.buffer]);

console.log(u2.length);  // 打印 0

具体对于 Buffer 实例,底层的 ArrayBuffer 是可以传输还是克隆完全取决于实例是如何创建的,这通常无法可靠地确定。

可以使用 markAsUntransferable() 标记 ArrayBuffer,以指示它应该始终被克隆而从不被传输。

根据 Buffer 实例的创建方式,它可能拥有也可能不拥有其底层的 ArrayBuffer。除非已知 Buffer 实例拥有 ArrayBuffer,否则不得传输 ArrayBuffer。特别是,对于从内部 Buffer 池创建的 Buffer(例如使用 Buffer.from()Buffer.allocUnsafe()),无法传输它们,它们总是被克隆,这会发送整个 Buffer 池的副本。这种行为可能会带来意外的高内存使用和可能的安全隐患。

有关 Buffer 池化的更多详细信息,请参阅 Buffer.allocUnsafe()

使用 Buffer.alloc()Buffer.allocUnsafeSlow() 创建的 Buffer 实例的 ArrayBuffer 始终可以传输,但这样做会使这些 ArrayBuffer 的所有其他现有视图不可用。

因为对象克隆使用 HTML 结构化克隆算法,所以不可枚举的属性、属性访问器和对象原型不会被保留。特别是,{Buffer} 对象在接收端将被读取为普通的 <Uint8Array>,JavaScript 类的实例将被克隆为普通的 JavaScript 对象。

const b = Symbol('b');

class Foo {
  #a = 1;
  constructor() {
    this[b] = 2;
    this.c = 3;
  }

  get d() { return 4; }
}

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => console.log(data);

port2.postMessage(new Foo());

// 打印:{ c: 3 }

此限制扩展到许多内置对象,例如全局 URL 对象:

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => console.log(data);

port2.postMessage(new URL('https://example.org'));

// 打印:{ }
M

port.hasRef

History
port.hasRef(): void

如果为 true,MessagePort 对象将保持 Node.js 事件循环活动。

M

port.ref

History
port.ref(): void

unref() 的反义词。对之前 unref() 过的端口调用 ref() 并不会让程序退出,如果它是唯一留下的活动句柄(默认行为)。如果端口已被 ref(),再次调用 ref() 无效。

如果使用 .on('message') 附加或移除监听器,端口会根据是否存在事件监听器自动 ref()unref()

M

port.start

History
port.start(): void

开始在此 MessagePort 上接收消息。当将此端口用作事件发射器时,一旦附加了 'message' 监听器,就会自动调用此方法。

此方法存在是为了与 Web MessagePort API 保持一致。在 Node.js 中,它仅用于在没有事件监听器时忽略消息。Node.js 在处理 .onmessage 方面也有所不同。设置它会自动调用 .start(),但取消设置它会让消息排队,直到设置新的处理程序或端口被丢弃。

M

port.unref

History
port.unref(): void

在端口上调用 unref() 允许线程退出,如果这是事件系统中唯一的活动句柄。如果端口已经 unref(),再次调用 unref() 无效。

如果使用 .on('message') 附加或移除监听器,端口会根据是否存在事件监听器自动 ref()unref()

类:Worker

History
  • 继承:{EventEmitter}

Worker 类代表一个独立的 JavaScript 执行线程。 大多数 Node.js API 在其中可用。

Worker 环境内的显著差异包括:

可以在其他 Worker 内部创建 Worker 实例。

Web Workersnode:cluster 模块 一样,双向通信 可以通过线程间消息传递实现。在内部,Worker 拥有一对内置的 MessagePort, 它们在 Worker 创建时已经相互关联。虽然父端 的 MessagePort 对象没有直接暴露,但其功能通过 父线程 Worker 对象上的 worker.postMessage()worker.on('message') 事件 暴露。

要创建自定义消息通道(鼓励使用自定义通道而不是默认 全局通道,因为它有助于关注点分离),用户可以在任一线程上创建 一个 MessageChannel 对象,并通过 预先存在的通道(例如全局通道)将该 MessageChannel 上的 其中一个 MessagePort 传递给另一个线程。

参见 port.postMessage() 以获取有关消息如何传递的更多信息, 以及哪些类型的 JavaScript 值可以成功通过 线程屏障传输。

import assert from 'node:assert';
import {
  Worker, MessageChannel, MessagePort, isMainThread, parentPort,
} from 'node:worker_threads';
if (isMainThread) {
  const worker = new Worker(new URL(import.meta.url));
  const subChannel = new MessageChannel();
  worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
  subChannel.port2.on('message', (value) => {
    console.log('received:', value);
  });
} else {
  parentPort.once('message', (value) => {
    assert(value.hereIsYourPort instanceof MessagePort);
    value.hereIsYourPort.postMessage('the worker is sending this');
    value.hereIsYourPort.close();
  });
}
new Worker(filename, options?): void
Attributes
filename:<string> | <URL>
Worker 主脚本或模块的路径。必须是 绝对路径或相对路径(即相对于当前 工作目录),以  ./../ 开头,或者是使用 file:data: 协议的 WHATWG URL 对象。 当使用 data: URL 时,数据根据 MIME 类型使用 ECMAScript 模块加载器 进行解释。 如果 options.evaltrue ,则这是一个包含 JavaScript 代码 的字符串,而不是路径。
options:<Object>
argv:<any[]>
将被字符串化并追加到 worker 中的  process.argv 的参数列表。这与 workerData 非常相似,但这些值在全局 process.argv 上可用,就像它们 作为 CLI 选项传递给脚本一样。
如果设置,指定 Worker 线程内部  process.env 的初始值。作为一个特殊值, worker.SHARE_ENV 可用于 指定父线程和子线程应共享它们的 环境变量;在这种情况下,对一个线程的 process.env 对象的更改也会影响另一个线程。 默认: process.env
如果为  true 且第一个参数是 string ,则将 构造函数的第一个参数解释为一旦 worker 上线就执行的脚本。
execArgv:<string[]>
传递给 worker 的 node CLI 选项列表。 V8 选项(如  --max-old-space-size )和影响 进程的选项(如 --title )不受支持。如果设置,这将作为 worker 内部的 process.execArgv 提供。默认情况下,选项 从父线程继承。
stdin:<boolean>
如果设置为  true ,则 worker.stdin 提供一个可写流,其内容在 Worker 内部显示为 process.stdin 。默认情况下,不提供数据。
stdout:<boolean>
如果设置为  true ,则 worker.stdout 不会自动管道传输到父级的 process.stdout
stderr:<boolean>
如果设置为  true ,则 worker.stderr 不会自动管道传输到父级的 process.stderr
workerData:<any>
任何被克隆并作为  require('node:worker_threads').workerData 可用的 JavaScript 值。克隆 如 HTML 结构化克隆算法 中所述,如果对象无法被克隆(例如因为它包含 function ),则会抛出错误。
trackUnmanagedFds:<boolean>
如果设置为  true ,则 Worker 跟踪通过 fs.open()fs.close() 管理的原始文件描述符,并在 Worker 退出时关闭它们,类似于通过 FileHandle API 管理的网络套接字或文件描述符等其他 资源。此选项会自动被所有 嵌套 Worker 继承。 默认: true
transferList:<Object[]>
如果一个或多个  MessagePort 类对象 在 workerData 中传递,则这些 项需要 transferList ,否则将抛出 ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST 。 参见 port.postMessage() 以获取更多信息。
resourceLimits:<Object>
新 JS 引擎实例的一组可选资源限制。达到这些限制会导致  Worker 实例终止。这些限制仅影响 JS 引擎,不影响外部数据, 包括 ArrayBuffer 。即使设置了这些限制,如果进程遇到全局内存不足情况, 仍可能中止。
maxOldGenerationSizeMb:<number>
主堆的最大大小,单位为 MB。如果设置了命令行参数  --max-old-space-size ,它将 覆盖此设置。
maxYoungGenerationSizeMb:<number>
最近创建的对象的堆空间的最大大小。如果设置了命令行参数  --max-semi-space-size ,它将覆盖此设置。
codeRangeSizeMb:<number>
用于生成代码的预分配内存范围的大小。
stackSizeMb:<number>
线程的默认最大栈大小。 小值可能导致 Worker 实例不可用。 默认: 4
一个可选的  name ,用于替换线程名称 和工作线程标题,以便调试/识别, 使最终标题为 [worker ${id}] ${name} 。 此参数有一个最大允许大小,取决于操作 系统。如果提供的名称超过限制,它将被截断。

事件:'error'

History
Attributes
err:<any>

如果工作线程抛出未捕获的异常,则会发出 'error' 事件。在这种情况下,worker 将被终止。

事件:'exit'

History
Attributes
exitCode:<integer>

一旦 worker 停止,就会发出 'exit' 事件。如果 worker 通过调用 process.exit() 退出,exitCode 参数是 传递的退出代码。如果 worker 被终止,exitCode 参数是 1

这是任何 Worker 实例发出的最后一个事件。

事件:'message'

History
Attributes
value:<any>
传输的值

当工作线程调用了 require('node:worker_threads').parentPort.postMessage() 时,会发出 'message' 事件。 参见 port.on('message') 事件以获取更多详情。

所有从工作线程发送的消息都在 Worker 对象上发出 'exit' 事件 之前发出。

事件:'messageerror'

History
Attributes
error:<Error>
一个 Error 对象

当反序列化消息失败时,会发出 'messageerror' 事件。

事件:'online'

History

当工作线程开始执行 JavaScript 代码时,会发出 'online' 事件。

M

worker.cpuUsage

History
worker.cpuUsage(prev?): void

此方法返回一个 Promise,该 Promise 将解析为与 process.threadCpuUsage() 相同的对象, 如果 worker 不再运行,则拒绝并抛出 ERR_WORKER_NOT_RUNNING 错误。 此方法允许从实际线程外部观察统计信息。

M

worker.getHeapSnapshot

History
worker.getHeapSnapshot(options?): void
Attributes
options:<Object>
exposeInternals:<boolean>
如果为 true,则在堆快照中暴露内部信息。  默认: false
exposeNumericValues:<boolean>
如果为 true,则在 人工字段中暴露数值。 默认: false
返回: <Promise> 一个包含 V8 堆快照的可读流的 Promise

返回一个可读流,用于获取 Worker 当前状态的 V8 快照。 参见 v8.getHeapSnapshot() 以获取更多详情。

如果 Worker 线程不再运行(这可能发生在 'exit' 事件 发出之前),返回的 Promise 将立即 被 ERR_WORKER_NOT_RUNNING 错误拒绝。

M

worker.getHeapStatistics

History
worker.getHeapStatistics(): void

此方法返回一个 Promise,该 Promise 将解析为与 v8.getHeapStatistics() 相同的对象, 如果 worker 不再运行,则拒绝并抛出 ERR_WORKER_NOT_RUNNING 错误。 此方法允许从实际线程外部观察统计信息。

P

worker.performance

History

一个可用于查询 worker 实例性能信息的对象。

M

performance.eventLoopUtilization

History
performance.eventLoopUtilization(utilization1?, utilization2?): void
Attributes
utilization1:<Object>
之前调用  eventLoopUtilization() 的结果。
utilization2:<Object>
在  utilization1 之前调用 eventLoopUtilization() 的结果。
返回: <Object>
active:<number>
utilization:<number>

perf_hooks eventLoopUtilization() 相同的调用,但返回的是 worker 实例的值。

一个区别是,与主线程不同,worker 内的引导是在 事件循环内完成的。因此,一旦 worker 的脚本开始执行, 事件循环利用率立即可用。

idle 时间不增加并不表示 worker 卡在引导过程中。以下示例显示 worker 的整个 生命周期从未积累任何 idle 时间,但仍能够处理 消息。

import { Worker, isMainThread, parentPort } from 'node:worker_threads';

if (isMainThread) {
  const worker = new Worker(new URL(import.meta.url));
  setInterval(() => {
    worker.postMessage('hi');
    console.log(worker.performance.eventLoopUtilization());
  }, 100).unref();
} else {
  parentPort.on('message', () => console.log('msg')).unref();
  (function r(n) {
    if (--n < 0) return;
    const t = Date.now();
    while (Date.now() - t < 300);
    setImmediate(r, n);
  })(10);
}

Worker 的事件循环利用率仅在发出 'online' 事件 后可用,如果在此之前调用,或在 'exit' 事件 之后调用,则所有属性的值为 0

M

worker.postMessage

History
worker.postMessage(value, transferList?): void
Attributes
value:<any>
transferList:<Object[]>

发送消息给 worker,该消息通过 require('node:worker_threads').parentPort.on('message') 接收。 参见 port.postMessage() 以获取更多详情。

M

worker.ref

History
worker.ref(): void

unref() 的反操作,对之前 unref() 过的 worker 调用 ref() 不会 让程序退出,如果它是唯一活动的句柄(默认 行为)。如果 worker 是 ref() 过的,再次调用 ref() 没有效果。

P

worker.resourceLimits

History

提供此 Worker 线程的 JS 引擎资源约束集。 如果将 resourceLimits 选项传递给了 Worker 构造函数, 则与此值匹配。

如果 worker 已停止,返回值为一个空对象。

M

worker.startCpuProfile

History
worker.startCpuProfile(): void

启动 CPU 性能分析,然后返回一个 Promise,该 Promise 兑现为一个错误 或一个 CPUProfileHandle 对象。此 API 支持 await using 语法。

const { Worker } = require('node:worker_threads');

const worker = new Worker(`
  const { parentPort } = require('worker_threads');
  parentPort.on('message', () => {});
  `, { eval: true });

worker.on('online', async () => {
  const handle = await worker.startCpuProfile();
  const profile = await handle.stop();
  console.log(profile);
  worker.terminate();
});

await using 示例。

const { Worker } = require('node:worker_threads');

const w = new Worker(`
  const { parentPort } = require('node:worker_threads');
  parentPort.on('message', () => {});
  `, { eval: true });

w.on('online', async () => {
  // 返回时自动停止性能分析,且性能分析将被丢弃
  await using handle = await w.startCpuProfile();
});
M

worker.startHeapProfile

History
worker.startHeapProfile(): void

启动堆性能分析,然后返回一个 Promise,该 Promise 兑现为一个错误 或一个 HeapProfileHandle 对象。此 API 支持 await using 语法。

const { Worker } = require('node:worker_threads');

const worker = new Worker(`
  const { parentPort } = require('worker_threads');
  parentPort.on('message', () => {});
  `, { eval: true });

worker.on('online', async () => {
  const handle = await worker.startHeapProfile();
  const profile = await handle.stop();
  console.log(profile);
  worker.terminate();
});

await using 示例。

const { Worker } = require('node:worker_threads');

const w = new Worker(`
  const { parentPort } = require('node:worker_threads');
  parentPort.on('message', () => {});
  `, { eval: true });

w.on('online', async () => {
  // 返回时自动停止性能分析,且性能分析将被丢弃
  await using handle = await w.startHeapProfile();
});
P

worker.stderr

History

这是一个可读流,包含在 worker 线程内写入 process.stderr 的数据。如果未将 stderr: true 传递给 Worker 构造函数,则数据会管道传输到父线程的 process.stderr 流。

P

worker.stdin

History

如果将 stdin: true 传递给了 Worker 构造函数,这是一个 可写流。写入此流的数据将在 worker 线程中作为 process.stdin 可用。

P

worker.stdout

History

这是一个可读流,包含在 worker 线程内写入 process.stdout 的数据。如果未将 stdout: true 传递给 Worker 构造函数,则数据会管道传输到父线程的 process.stdout 流。

worker.terminate(): void

尽快停止 worker 线程中的所有 JavaScript 执行。 返回一个 Promise,用于获取退出代码,当 'exit' 事件 发出时兑现。

P

worker.threadId

History

引用线程的整数标识符。在 worker 线程内部, 它作为 require('node:worker_threads').threadId 可用。 此值在单个进程内的每个 Worker 实例中是唯一的。

P

worker.threadName

History
Attributes

引用线程的字符串标识符,如果线程未运行则为 null。 在 worker 线程内部,它作为 require('node:worker_threads').threadName 可用。

M

worker.unref

History
worker.unref(): void

对 worker 调用 unref() 允许线程退出,如果这是事件系统中唯一 活动的句柄。如果 worker 已经 unref() 过,再次调用 unref() 没有效果。

M

worker[Symbol.asyncDispose]

History
worker[Symbol.asyncDispose](): void

当处置作用域退出时调用 worker.terminate()

async function example() {
  await using worker = new Worker('for (;;) {}', { eval: true });
  // 当作用域退出时,Worker 会自动终止。
}

Worker 通过 <MessagePort> 利用消息传递来实现与 stdio 的交互。这意味着源自 Workerstdio 输出可能会被接收端阻塞 Node.js 事件循环的同步代码所阻塞。

import {
  Worker,
  isMainThread,
} from 'node:worker_threads';

if (isMainThread) {
  new Worker(new URL(import.meta.url));
  for (let n = 0; n < 1e10; n++) {
    // 循环以模拟工作。
  }
} else {
  // 此输出将被主线程中的 for 循环阻塞。
  console.log('foo');
}

从预加载脚本(使用 -r 命令行标志加载和运行的脚本)启动工作线程时要小心。除非显式设置了 execArgv 选项,否则新的 Worker 线程会自动继承正在运行的进程的命令行标志,并将预加载与主线程相同的预加载脚本。如果预加载脚本无条件地启动工作线程,则每个生成的线程都会再生成另一个线程,直到应用程序崩溃。