processAsyncTree.js 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. /*
  2. MIT License http://www.opensource.org/licenses/mit-license.php
  3. Author Tobias Koppers @sokra
  4. */
  5. "use strict";
  6. /**
  7. * Walks a dynamically expanding async work tree with bounded concurrency.
  8. * Each processed item may enqueue more items through `push`, allowing callers
  9. * to model breadth-first or depth-first discovery without managing the queue
  10. * themselves.
  11. * @template T
  12. * @template {Error} E
  13. * @param {Iterable<T>} items initial items
  14. * @param {number} concurrency number of items running in parallel
  15. * @param {(item: T, push: (item: T) => void, callback: (err?: E) => void) => void} processor worker which pushes more items
  16. * @param {(err?: E) => void} callback all items processed
  17. * @returns {void}
  18. */
  19. const processAsyncTree = (items, concurrency, processor, callback) => {
  20. const queue = [...items];
  21. if (queue.length === 0) return callback();
  22. let processing = 0;
  23. let finished = false;
  24. let processScheduled = true;
  25. /**
  26. * Enqueues a newly discovered item and schedules queue processing when the
  27. * current concurrency budget allows more work to start.
  28. * @param {T} item item
  29. */
  30. const push = (item) => {
  31. queue.push(item);
  32. if (!processScheduled && processing < concurrency) {
  33. processScheduled = true;
  34. process.nextTick(processQueue);
  35. }
  36. };
  37. /**
  38. * Handles completion of a single processor call, propagating the first
  39. * error and scheduling more queued work when possible.
  40. * @param {E | null | undefined} err error
  41. */
  42. const processorCallback = (err) => {
  43. processing--;
  44. if (err && !finished) {
  45. finished = true;
  46. callback(err);
  47. return;
  48. }
  49. if (!processScheduled) {
  50. processScheduled = true;
  51. process.nextTick(processQueue);
  52. }
  53. };
  54. const processQueue = () => {
  55. if (finished) return;
  56. while (processing < concurrency && queue.length > 0) {
  57. processing++;
  58. const item = /** @type {T} */ (queue.pop());
  59. processor(item, push, processorCallback);
  60. }
  61. processScheduled = false;
  62. if (queue.length === 0 && processing === 0 && !finished) {
  63. finished = true;
  64. callback();
  65. }
  66. };
  67. processQueue();
  68. };
  69. module.exports = processAsyncTree;