1
0

worker_pool.js 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. //https://nodejs.org/api/async_context.html#using-asyncresource-for-a-worker-thread-pool
  2. import { AsyncResource } from 'node:async_hooks';
  3. import { EventEmitter } from 'node:events';
  4. import path from 'node:path';
  5. import { Worker } from 'node:worker_threads';
  // Base directory used by `addNewWorker` to locate `task_processor.js`.
  // `path.resolve()` called with no arguments yields the current working
  // directory of the process.
  // NOTE(review): this is therefore not necessarily the directory containing
  // this module — confirm the process is always started from the directory
  // that holds `task_processor.js`.
  6. const __dirname = path.resolve();
  // Symbol used as the property key under which a busy worker holds the
  // `WorkerPoolTaskInfo` of the task it is currently running.
  7. const kTaskInfo = Symbol('kTaskInfo');
  // Symbol naming the internal pool event emitted whenever a worker is added to
  // `freeWorkers`; the pool listens for it to dispatch the next queued task.
  8. const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
  /**
   * Single-use async resource wrapping the completion callback of one task.
   *
   * The callback is later invoked through `runInAsyncScope`, i.e. in the async
   * context associated with this resource, which is created when the instance
   * is constructed.
   */
  class WorkerPoolTaskInfo extends AsyncResource {
    /**
     * @param {(err: Error | null, result: unknown) => void} callback
     *   Function to call once the task has finished or failed.
     */
    constructor(callback) {
      super('WorkerPoolTaskInfo');
      this.callback = callback;
    }

    /**
     * Reports the outcome of the task to the stored callback and then marks
     * this resource as destroyed.
     *
     * @param {Error | null} err - Failure reason, or `null` on success.
     * @param {unknown} result - Task result, or `null` on failure.
     * @throws Whatever the callback throws; the error is not swallowed.
     */
    done(err, result) {
      try {
        this.runInAsyncScope(this.callback, null, err, result);
      } finally {
        // `TaskInfo`s are used only once. Destroy the resource even when the
        // callback throws so that the destroy notification is never skipped.
        this.emitDestroy();
      }
    }
  }
  /**
   * Pool of worker threads that each run `task_processor.js`.
   *
   * Tasks submitted through `runTask` are posted to a free worker when one is
   * available and queued otherwise. A worker that fails with an uncaught
   * exception is dropped from the pool and replaced by a fresh one.
   *
   * Emits `'error'` when a worker fails while it is not running any task.
   */
  class WorkerPool extends EventEmitter {
    /**
     * @param {number} numThreads - Number of worker threads to start.
     */
    constructor(numThreads) {
      super();
      this.numThreads = numThreads;
      this.workers = [];
      this.freeWorkers = [];
      this.tasks = [];

      for (let i = 0; i < numThreads; i++) {
        this.addNewWorker();
      }

      // Any time the kWorkerFreedEvent is emitted, dispatch
      // the next task pending in the queue, if any.
      this.on(kWorkerFreedEvent, () => {
        if (this.tasks.length > 0) {
          const { task, callback } = this.tasks.shift();
          this.runTask(task, callback);
        }
      });
    }

    /**
     * Starts one worker thread, wires up its `'message'` and `'error'`
     * handlers, and registers it as free.
     */
    addNewWorker() {
      const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));

      worker.on('message', (result) => {
        // In case of success: Call the callback that was passed to `runTask`,
        // remove the `TaskInfo` associated with the Worker, and mark it as free
        // again.
        worker[kTaskInfo].done(null, result);
        worker[kTaskInfo] = null;
        this.freeWorkers.push(worker);
        this.emit(kWorkerFreedEvent);
      });

      worker.on('error', (err) => {
        // In case of an uncaught exception: Call the callback that was passed to
        // `runTask` with the error. A worker without a pending task reports the
        // error on the pool itself instead.
        const taskInfo = worker[kTaskInfo];
        if (taskInfo) {
          worker[kTaskInfo] = null;
          taskInfo.done(err, null);
        } else {
          this.emit('error', err);
        }

        // Remove the failed worker from BOTH lists: an idle worker is still in
        // `freeWorkers`, and leaving it there would let `runTask` post a task
        // to a dead thread. The `-1` guard keeps `splice` from removing an
        // unrelated entry when the worker is not in the list.
        for (const list of [this.workers, this.freeWorkers]) {
          const index = list.indexOf(worker);
          if (index !== -1) {
            list.splice(index, 1);
          }
        }

        // Start a new Worker to replace the current one.
        this.addNewWorker();
      });

      this.workers.push(worker);
      this.freeWorkers.push(worker);
      this.emit(kWorkerFreedEvent);
    }

    /**
     * Runs `task` on a free worker, or queues it until a worker becomes free.
     *
     * @param {unknown} task - Value posted to the worker via `postMessage`.
     * @param {(err: Error | null, result: unknown) => void} callback
     *   Called with the worker's reply, or with the error if the worker fails.
     */
    runTask(task, callback) {
      if (this.freeWorkers.length === 0) {
        // No free threads, wait until a worker thread becomes free.
        this.tasks.push({ task, callback });
        return;
      }

      const worker = this.freeWorkers.pop();
      worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
      worker.postMessage(task);
    }

    /**
     * Terminates every worker in the pool.
     *
     * @returns {Promise<unknown[]>} Promise combining the values returned by
     *   each worker's `terminate()` call, so callers may await shutdown.
     *   Ignoring the return value behaves as before.
     */
    close() {
      return Promise.all(this.workers.map((worker) => worker.terminate()));
    }
  }
  78. export default WorkerPool;