AsyncQueue.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. /*
  2. MIT License http://www.opensource.org/licenses/mit-license.php
  3. Author Tobias Koppers @sokra
  4. */
  5. "use strict";
  6. const { AsyncSeriesHook, SyncHook } = require("tapable");
  7. const { makeWebpackError } = require("../HookWebpackError");
  8. const WebpackError = require("../WebpackError");
  9. const ArrayQueue = require("./ArrayQueue");
  // Lifecycle states stored in AsyncQueueEntry#state. An entry is created as
  // QUEUED, becomes PROCESSING when the queue picks it up and DONE once its
  // outcome has been recorded (entries failed by stop() go straight to DONE).
  const QUEUED_STATE = 0;
  const PROCESSING_STATE = 1;
  const DONE_STATE = 2;
  // Module-wide nesting counter for synchronous result delivery. While it is
  // above 3, callbacks are handed to process.nextTick instead of being called
  // inline — presumably to keep the call stack shallow; confirm before tuning.
  let inHandleResult = 0;
  /**
   * Node-style completion callback: called with an error, or with a result.
   * @template T
   * @callback Callback
   * @param {(WebpackError | null)=} err
   * @param {(T | null)=} result
   * @returns {void}
   */
  /**
   * Book-keeping record the queue keeps for one item: the item itself, where
   * it is in its lifecycle, who is waiting for it and, once finished, the
   * outcome that is replayed to later callers.
   * @template T
   * @template K
   * @template R
   */
  class AsyncQueueEntry {
    /**
     * Creates a fresh entry in the queued state.
     * @param {T} item the item this entry tracks
     * @param {Callback<R>} callback the first callback waiting for the outcome
     */
    constructor(item, callback) {
      // Properties are created in a fixed order so every entry has the same shape.
      this.item = item;

      /** @type {typeof QUEUED_STATE | typeof PROCESSING_STATE | typeof DONE_STATE} */
      this.state = QUEUED_STATE;

      // The primary waiter; further waiters are collected lazily in `callbacks`.
      /** @type {Callback<R> | undefined} */
      this.callback = callback;
      /** @type {Callback<R>[] | undefined} */
      this.callbacks = undefined;

      // Outcome slots, filled in when the entry reaches the done state.
      /** @type {R | null | undefined} */
      this.result = undefined;
      /** @type {WebpackError | null | undefined} */
      this.error = undefined;
    }
  }
  /**
   * Maps an item to the key under which the queue tracks (and deduplicates) it.
   * @template T, K
   * @typedef {(item: T) => K} getKey
   */
  /**
   * Performs the actual work for one item and reports the outcome through the callback.
   * @template T, R
   * @typedef {(item: T, callback: Callback<R>) => void} Processor
   */
  /**
   * Deduplicating work queue with bounded parallelism.
   *
   * Items are identified by the key returned from `getKey`. Adding an item whose
   * key is already tracked never starts a second run: the new callback is either
   * attached to the pending entry or receives the stored outcome. A queue created
   * with a `parent` registers itself on the parent's root queue, shares that
   * root's task counter and parallelism, and is only served once the root's own
   * queue has nothing left to start.
   * @template T
   * @template K
   * @template R
   */
  class AsyncQueue {
    /**
     * Creates an instance of AsyncQueue.
     * @param {object} options options object
     * @param {string=} options.name name of the queue
     * @param {number=} options.parallelism how many items should be processed at once
     * @param {string=} options.context context of execution
     * @param {AsyncQueue<EXPECTED_ANY, EXPECTED_ANY, EXPECTED_ANY>=} options.parent parent queue, which will have priority over this queue and with shared parallelism
     * @param {getKey<T, K>=} options.getKey extract key from item
     * @param {Processor<T, R>} options.processor async function to process items
     */
    constructor({ name, context, parallelism, parent, processor, getKey }) {
      this._name = name;
      this._context = context || "normal";
      // `||` on purpose: a parallelism of 0 also falls back to 1, since the
      // processing loop below would never start anything with a budget of 0.
      this._parallelism = parallelism || 1;
      this._processor = processor;
      // Without a custom getKey the item itself is used as its key.
      this._getKey =
        getKey ||
        /** @type {getKey<T, K>} */ ((item) => /** @type {T & K} */ (item));
      /** @type {Map<K, AsyncQueueEntry<T, K, R>>} */
      this._entries = new Map();
      /** @type {ArrayQueue<AsyncQueueEntry<T, K, R>>} */
      this._queued = new ArrayQueue();
      /** @type {AsyncQueue<T, K, R>[] | undefined} */
      this._children = undefined;
      this._activeTasks = 0;
      this._willEnsureProcessing = false;
      this._needProcessing = false;
      this._stopped = false;
      /** @type {AsyncQueue<T, K, R>} */
      this._root = parent ? parent._root : this;
      if (parent) {
        // Children are always registered on the root, so nested parents end up
        // in one flat list that the root walks in registration order.
        if (this._root._children === undefined) {
          this._root._children = [this];
        } else {
          this._root._children.push(this);
        }
      }
      this.hooks = {
        /** @type {AsyncSeriesHook<[T]>} */
        beforeAdd: new AsyncSeriesHook(["item"]),
        /** @type {SyncHook<[T]>} */
        added: new SyncHook(["item"]),
        /** @type {AsyncSeriesHook<[T]>} */
        beforeStart: new AsyncSeriesHook(["item"]),
        /** @type {SyncHook<[T]>} */
        started: new SyncHook(["item"]),
        /** @type {SyncHook<[T, WebpackError | null | undefined, R | null | undefined]>} */
        result: new SyncHook(["item", "error", "result"])
      };
      // Bound once so the method can be handed to setImmediate directly.
      this._ensureProcessing = this._ensureProcessing.bind(this);
    }

    /**
     * Returns context of execution.
     * @returns {string} context of execution
     */
    getContext() {
      return this._context;
    }

    /**
     * Updates context using the provided value.
     * @param {string} value context of execution
     */
    setContext(value) {
      this._context = value;
    }

    /**
     * Schedules an item for processing, or joins an item with the same key that
     * is already tracked.
     *
     * The callback receives a "Queue was stopped" error when the queue has been
     * stopped, the `beforeAdd` hook error when that hook fails, the stored
     * outcome when the item is already done, and otherwise the outcome of the
     * run once it completes.
     * @param {T} item an item
     * @param {Callback<R>} callback callback function
     * @returns {void}
     */
    add(item, callback) {
      if (this._stopped) return callback(new WebpackError("Queue was stopped"));
      this.hooks.beforeAdd.callAsync(item, (err) => {
        if (err) {
          callback(
            makeWebpackError(err, `AsyncQueue(${this._name}).hooks.beforeAdd`)
          );
          return;
        }
        const key = this._getKey(item);
        const entry = this._entries.get(key);
        if (entry !== undefined) {
          if (entry.state === DONE_STATE) {
            // Replay the stored outcome. `finally` keeps the nesting counter
            // balanced even when the callback throws synchronously.
            try {
              if (inHandleResult++ > 3) {
                process.nextTick(() => callback(entry.error, entry.result));
              } else {
                callback(entry.error, entry.result);
              }
            } finally {
              inHandleResult--;
            }
          } else if (entry.callbacks === undefined) {
            entry.callbacks = [callback];
          } else {
            entry.callbacks.push(callback);
          }
          return;
        }
        const newEntry = new AsyncQueueEntry(item, callback);
        if (this._stopped) {
          // The queue was stopped while beforeAdd was running: fail the entry
          // without storing it. The increment offsets the decrement that
          // _handleResult performs.
          this.hooks.added.call(item);
          this._root._activeTasks++;
          process.nextTick(() =>
            this._handleResult(newEntry, new WebpackError("Queue was stopped"))
          );
        } else {
          this._entries.set(key, newEntry);
          this._queued.enqueue(newEntry);
          const root = this._root;
          root._needProcessing = true;
          if (root._willEnsureProcessing === false) {
            root._willEnsureProcessing = true;
            setImmediate(root._ensureProcessing);
          }
          this.hooks.added.call(item);
        }
      });
    }

    /**
     * Forgets the entry tracked for the item, so that a later `add` creates a
     * fresh entry. A still-queued entry is also removed from the pending queue;
     * its callbacks are dropped without being invoked. Unknown items are ignored.
     * @param {T} item an item
     * @returns {void}
     */
    invalidate(item) {
      const key = this._getKey(item);
      const entry = this._entries.get(key);
      // Nothing is tracked under this key, so there is nothing to forget.
      if (entry === undefined) return;
      this._entries.delete(key);
      if (entry.state === QUEUED_STATE) {
        this._queued.delete(entry);
      }
    }

    /**
     * Waits for an already started item
     * @param {T} item an item
     * @param {Callback<R>} callback callback function
     * @returns {void}
     */
    waitFor(item, callback) {
      const key = this._getKey(item);
      const entry = this._entries.get(key);
      if (entry === undefined) {
        return callback(
          new WebpackError(
            "waitFor can only be called for an already started item"
          )
        );
      }
      if (entry.state === DONE_STATE) {
        // Unlike `add`, a finished outcome is always delivered asynchronously.
        process.nextTick(() => callback(entry.error, entry.result));
      } else if (entry.callbacks === undefined) {
        entry.callbacks = [callback];
      } else {
        entry.callbacks.push(callback);
      }
    }

    /**
     * Stops this queue: later `add` calls fail immediately, and every entry that
     * is still waiting is removed and completed with a "Queue was stopped" error.
     * Entries that are already being processed are not touched here.
     * @returns {void}
     */
    stop() {
      this._stopped = true;
      // Swap the pending queue out first so the entries are detached before
      // their callbacks run.
      const queue = this._queued;
      this._queued = new ArrayQueue();
      const root = this._root;
      for (const entry of queue) {
        this._entries.delete(
          this._getKey(/** @type {AsyncQueueEntry<T, K, R>} */ (entry).item)
        );
        // Offsets the decrement that _handleResult performs for every entry.
        root._activeTasks++;
        this._handleResult(
          /** @type {AsyncQueueEntry<T, K, R>} */ (entry),
          new WebpackError("Queue was stopped")
        );
      }
    }

    /**
     * Increase parallelism.
     * @returns {void}
     */
    increaseParallelism() {
      const root = this._root;
      root._parallelism++;
      /* istanbul ignore next */
      if (root._willEnsureProcessing === false && root._needProcessing) {
        root._willEnsureProcessing = true;
        setImmediate(root._ensureProcessing);
      }
    }

    /**
     * Decrease parallelism.
     * @returns {void}
     */
    decreaseParallelism() {
      const root = this._root;
      root._parallelism--;
    }

    /**
     * Checks whether this async queue is processing.
     * @param {T} item an item
     * @returns {boolean} true, if the item is currently being processed
     */
    isProcessing(item) {
      const key = this._getKey(item);
      const entry = this._entries.get(key);
      return entry !== undefined && entry.state === PROCESSING_STATE;
    }

    /**
     * Checks whether this async queue is queued.
     * @param {T} item an item
     * @returns {boolean} true, if the item is currently queued
     */
    isQueued(item) {
      const key = this._getKey(item);
      const entry = this._entries.get(key);
      return entry !== undefined && entry.state === QUEUED_STATE;
    }

    /**
     * Checks whether this async queue is done.
     * @param {T} item an item
     * @returns {boolean} true, if processing of the item has finished
     */
    isDone(item) {
      const key = this._getKey(item);
      const entry = this._entries.get(key);
      return entry !== undefined && entry.state === DONE_STATE;
    }

    /**
     * Starts as many pending entries as the parallelism budget allows: first
     * from this queue, then from each registered child in registration order.
     * Scheduled on the root queue via setImmediate.
     * @returns {void}
     */
    _ensureProcessing() {
      while (this._activeTasks < this._parallelism) {
        const entry = this._queued.dequeue();
        if (entry === undefined) break;
        this._activeTasks++;
        entry.state = PROCESSING_STATE;
        this._startProcessing(entry);
      }
      this._willEnsureProcessing = false;
      // Own work is left over: keep _needProcessing set so that a finishing
      // task schedules another pass.
      if (this._queued.length > 0) return;
      if (this._children !== undefined) {
        for (const child of this._children) {
          while (this._activeTasks < this._parallelism) {
            const entry = child._queued.dequeue();
            if (entry === undefined) break;
            this._activeTasks++;
            entry.state = PROCESSING_STATE;
            child._startProcessing(entry);
          }
          if (child._queued.length > 0) return;
        }
      }
      // Everything was started; only clear the flag if no new pass was
      // requested while the children were being served.
      if (!this._willEnsureProcessing) this._needProcessing = false;
    }

    /**
     * Runs the `beforeStart` hook and then the processor for the entry, routing
     * the outcome (or a synchronous processor exception) to `_handleResult`.
     * @param {AsyncQueueEntry<T, K, R>} entry the entry
     * @returns {void}
     */
    _startProcessing(entry) {
      this.hooks.beforeStart.callAsync(entry.item, (err) => {
        if (err) {
          this._handleResult(
            entry,
            makeWebpackError(err, `AsyncQueue(${this._name}).hooks.beforeStart`)
          );
          return;
        }
        let inCallback = false;
        try {
          this._processor(entry.item, (e, r) => {
            inCallback = true;
            this._handleResult(entry, e, r);
          });
        } catch (err) {
          // An exception raised after the processor reported its outcome comes
          // from result handling, not from the processor: let it propagate
          // instead of completing the entry a second time.
          if (inCallback) throw err;
          this._handleResult(entry, /** @type {WebpackError} */ (err), null);
        }
        this.hooks.started.call(entry.item);
      });
    }

    /**
     * Records the outcome on the entry, releases its slot on the root queue and
     * notifies every callback waiting for the entry.
     * @param {AsyncQueueEntry<T, K, R>} entry the entry
     * @param {(WebpackError | null)=} err error, if any
     * @param {(R | null)=} result result, if any
     * @returns {void}
     */
    _handleResult(entry, err, result) {
      this.hooks.result.callAsync(entry.item, err, result, (hookError) => {
        // A failing result hook takes precedence over the processing error.
        const error = hookError
          ? makeWebpackError(hookError, `AsyncQueue(${this._name}).hooks.result`)
          : err;

        // Detach the waiters before notifying them, so a callback that
        // re-enters the queue sees the entry as done.
        const callback = /** @type {Callback<R>} */ (entry.callback);
        const callbacks = entry.callbacks;
        entry.state = DONE_STATE;
        entry.callback = undefined;
        entry.callbacks = undefined;
        entry.result = result;
        entry.error = error;

        const root = this._root;
        root._activeTasks--;
        if (root._willEnsureProcessing === false && root._needProcessing) {
          root._willEnsureProcessing = true;
          setImmediate(root._ensureProcessing);
        }

        const notify = () => {
          callback(error, result);
          if (callbacks !== undefined) {
            for (const waiter of callbacks) {
              waiter(error, result);
            }
          }
        };

        // Deliver inline while nesting is shallow, otherwise defer to the next
        // tick. `finally` keeps the shared counter balanced when a callback
        // throws synchronously.
        try {
          if (inHandleResult++ > 3) {
            process.nextTick(notify);
          } else {
            notify();
          }
        } finally {
          inHandleResult--;
        }
      });
    }

    /**
     * Resets this queue's own state: forgets all tracked entries, empties the
     * pending queue, zeroes this instance's task counter and scheduling flags
     * and clears the stopped flag. Parallelism, hooks and the parent/child
     * links are left as they are; pending callbacks are not invoked.
     * @returns {void}
     */
    clear() {
      this._entries.clear();
      this._queued.clear();
      this._activeTasks = 0;
      this._willEnsureProcessing = false;
      this._needProcessing = false;
      this._stopped = false;
    }
  }
  // The AsyncQueue class is the module's only export.
  module.exports = AsyncQueue;