sender.js 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607
  1. /* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Duplex" }] */
  2. 'use strict';
  3. const { Duplex } = require('stream');
  4. const { randomFillSync } = require('crypto');
  5. const {
  6. types: { isUint8Array }
  7. } = require('util');
  8. const PerMessageDeflate = require('./permessage-deflate');
  9. const { EMPTY_BUFFER, kWebSocket, NOOP } = require('./constants');
  10. const { isBlob, isValidStatusCode } = require('./validation');
  11. const { mask: applyMask, toBuffer } = require('./buffer-util');
  12. const kByteLength = Symbol('kByteLength');
  13. const maskBuffer = Buffer.alloc(4);
  14. const RANDOM_POOL_SIZE = 8 * 1024;
  15. let randomPool;
  16. let randomPoolPointer = RANDOM_POOL_SIZE;
  17. const DEFAULT = 0;
  18. const DEFLATING = 1;
  19. const GET_BLOB_DATA = 2;
  20. /**
  21. * HyBi Sender implementation.
  22. */
  23. class Sender {
  24. /**
  25. * Creates a Sender instance.
  26. *
  27. * @param {Duplex} socket The connection socket
  28. * @param {Object} [extensions] An object containing the negotiated extensions
  29. * @param {Function} [generateMask] The function used to generate the masking
  30. * key
  31. */
  32. constructor(socket, extensions, generateMask) {
  33. this._extensions = extensions || {};
  34. if (generateMask) {
  35. this._generateMask = generateMask;
  36. this._maskBuffer = Buffer.alloc(4);
  37. }
  38. this._socket = socket;
  39. this._firstFragment = true;
  40. this._compress = false;
  41. this._bufferedBytes = 0;
  42. this._queue = [];
  43. this._state = DEFAULT;
  44. this.onerror = NOOP;
  45. this[kWebSocket] = undefined;
  46. }
  47. /**
  48. * Frames a piece of data according to the HyBi WebSocket protocol.
  49. *
  50. * @param {(Buffer|String)} data The data to frame
  51. * @param {Object} options Options object
  52. * @param {Boolean} [options.fin=false] Specifies whether or not to set the
  53. * FIN bit
  54. * @param {Function} [options.generateMask] The function used to generate the
  55. * masking key
  56. * @param {Boolean} [options.mask=false] Specifies whether or not to mask
  57. * `data`
  58. * @param {Buffer} [options.maskBuffer] The buffer used to store the masking
  59. * key
  60. * @param {Number} options.opcode The opcode
  61. * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
  62. * modified
  63. * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
  64. * RSV1 bit
  65. * @return {(Buffer|String)[]} The framed data
  66. * @public
  67. */
  68. static frame(data, options) {
  69. let mask;
  70. let merge = false;
  71. let offset = 2;
  72. let skipMasking = false;
  73. if (options.mask) {
  74. mask = options.maskBuffer || maskBuffer;
  75. if (options.generateMask) {
  76. options.generateMask(mask);
  77. } else {
  78. if (randomPoolPointer === RANDOM_POOL_SIZE) {
  79. /* istanbul ignore else */
  80. if (randomPool === undefined) {
  81. //
  82. // This is lazily initialized because server-sent frames must not
  83. // be masked so it may never be used.
  84. //
  85. randomPool = Buffer.alloc(RANDOM_POOL_SIZE);
  86. }
  87. randomFillSync(randomPool, 0, RANDOM_POOL_SIZE);
  88. randomPoolPointer = 0;
  89. }
  90. mask[0] = randomPool[randomPoolPointer++];
  91. mask[1] = randomPool[randomPoolPointer++];
  92. mask[2] = randomPool[randomPoolPointer++];
  93. mask[3] = randomPool[randomPoolPointer++];
  94. }
  95. skipMasking = (mask[0] | mask[1] | mask[2] | mask[3]) === 0;
  96. offset = 6;
  97. }
  98. let dataLength;
  99. if (typeof data === 'string') {
  100. if (
  101. (!options.mask || skipMasking) &&
  102. options[kByteLength] !== undefined
  103. ) {
  104. dataLength = options[kByteLength];
  105. } else {
  106. data = Buffer.from(data);
  107. dataLength = data.length;
  108. }
  109. } else {
  110. dataLength = data.length;
  111. merge = options.mask && options.readOnly && !skipMasking;
  112. }
  113. let payloadLength = dataLength;
  114. if (dataLength >= 65536) {
  115. offset += 8;
  116. payloadLength = 127;
  117. } else if (dataLength > 125) {
  118. offset += 2;
  119. payloadLength = 126;
  120. }
  121. const target = Buffer.allocUnsafe(merge ? dataLength + offset : offset);
  122. target[0] = options.fin ? options.opcode | 0x80 : options.opcode;
  123. if (options.rsv1) target[0] |= 0x40;
  124. target[1] = payloadLength;
  125. if (payloadLength === 126) {
  126. target.writeUInt16BE(dataLength, 2);
  127. } else if (payloadLength === 127) {
  128. target[2] = target[3] = 0;
  129. target.writeUIntBE(dataLength, 4, 6);
  130. }
  131. if (!options.mask) return [target, data];
  132. target[1] |= 0x80;
  133. target[offset - 4] = mask[0];
  134. target[offset - 3] = mask[1];
  135. target[offset - 2] = mask[2];
  136. target[offset - 1] = mask[3];
  137. if (skipMasking) return [target, data];
  138. if (merge) {
  139. applyMask(data, mask, target, offset, dataLength);
  140. return [target];
  141. }
  142. applyMask(data, mask, data, 0, dataLength);
  143. return [target, data];
  144. }
  145. /**
  146. * Sends a close message to the other peer.
  147. *
  148. * @param {Number} [code] The status code component of the body
  149. * @param {(String|Buffer)} [data] The message component of the body
  150. * @param {Boolean} [mask=false] Specifies whether or not to mask the message
  151. * @param {Function} [cb] Callback
  152. * @public
  153. */
  154. close(code, data, mask, cb) {
  155. let buf;
  156. if (code === undefined) {
  157. buf = EMPTY_BUFFER;
  158. } else if (typeof code !== 'number' || !isValidStatusCode(code)) {
  159. throw new TypeError('First argument must be a valid error code number');
  160. } else if (data === undefined || !data.length) {
  161. buf = Buffer.allocUnsafe(2);
  162. buf.writeUInt16BE(code, 0);
  163. } else {
  164. const length = Buffer.byteLength(data);
  165. if (length > 123) {
  166. throw new RangeError('The message must not be greater than 123 bytes');
  167. }
  168. buf = Buffer.allocUnsafe(2 + length);
  169. buf.writeUInt16BE(code, 0);
  170. if (typeof data === 'string') {
  171. buf.write(data, 2);
  172. } else if (isUint8Array(data)) {
  173. buf.set(data, 2);
  174. } else {
  175. throw new TypeError('Second argument must be a string or a Uint8Array');
  176. }
  177. }
  178. const options = {
  179. [kByteLength]: buf.length,
  180. fin: true,
  181. generateMask: this._generateMask,
  182. mask,
  183. maskBuffer: this._maskBuffer,
  184. opcode: 0x08,
  185. readOnly: false,
  186. rsv1: false
  187. };
  188. if (this._state !== DEFAULT) {
  189. this.enqueue([this.dispatch, buf, false, options, cb]);
  190. } else {
  191. this.sendFrame(Sender.frame(buf, options), cb);
  192. }
  193. }
  194. /**
  195. * Sends a ping message to the other peer.
  196. *
  197. * @param {*} data The message to send
  198. * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
  199. * @param {Function} [cb] Callback
  200. * @public
  201. */
  202. ping(data, mask, cb) {
  203. let byteLength;
  204. let readOnly;
  205. if (typeof data === 'string') {
  206. byteLength = Buffer.byteLength(data);
  207. readOnly = false;
  208. } else if (isBlob(data)) {
  209. byteLength = data.size;
  210. readOnly = false;
  211. } else {
  212. data = toBuffer(data);
  213. byteLength = data.length;
  214. readOnly = toBuffer.readOnly;
  215. }
  216. if (byteLength > 125) {
  217. throw new RangeError('The data size must not be greater than 125 bytes');
  218. }
  219. const options = {
  220. [kByteLength]: byteLength,
  221. fin: true,
  222. generateMask: this._generateMask,
  223. mask,
  224. maskBuffer: this._maskBuffer,
  225. opcode: 0x09,
  226. readOnly,
  227. rsv1: false
  228. };
  229. if (isBlob(data)) {
  230. if (this._state !== DEFAULT) {
  231. this.enqueue([this.getBlobData, data, false, options, cb]);
  232. } else {
  233. this.getBlobData(data, false, options, cb);
  234. }
  235. } else if (this._state !== DEFAULT) {
  236. this.enqueue([this.dispatch, data, false, options, cb]);
  237. } else {
  238. this.sendFrame(Sender.frame(data, options), cb);
  239. }
  240. }
  241. /**
  242. * Sends a pong message to the other peer.
  243. *
  244. * @param {*} data The message to send
  245. * @param {Boolean} [mask=false] Specifies whether or not to mask `data`
  246. * @param {Function} [cb] Callback
  247. * @public
  248. */
  249. pong(data, mask, cb) {
  250. let byteLength;
  251. let readOnly;
  252. if (typeof data === 'string') {
  253. byteLength = Buffer.byteLength(data);
  254. readOnly = false;
  255. } else if (isBlob(data)) {
  256. byteLength = data.size;
  257. readOnly = false;
  258. } else {
  259. data = toBuffer(data);
  260. byteLength = data.length;
  261. readOnly = toBuffer.readOnly;
  262. }
  263. if (byteLength > 125) {
  264. throw new RangeError('The data size must not be greater than 125 bytes');
  265. }
  266. const options = {
  267. [kByteLength]: byteLength,
  268. fin: true,
  269. generateMask: this._generateMask,
  270. mask,
  271. maskBuffer: this._maskBuffer,
  272. opcode: 0x0a,
  273. readOnly,
  274. rsv1: false
  275. };
  276. if (isBlob(data)) {
  277. if (this._state !== DEFAULT) {
  278. this.enqueue([this.getBlobData, data, false, options, cb]);
  279. } else {
  280. this.getBlobData(data, false, options, cb);
  281. }
  282. } else if (this._state !== DEFAULT) {
  283. this.enqueue([this.dispatch, data, false, options, cb]);
  284. } else {
  285. this.sendFrame(Sender.frame(data, options), cb);
  286. }
  287. }
  288. /**
  289. * Sends a data message to the other peer.
  290. *
  291. * @param {*} data The message to send
  292. * @param {Object} options Options object
  293. * @param {Boolean} [options.binary=false] Specifies whether `data` is binary
  294. * or text
  295. * @param {Boolean} [options.compress=false] Specifies whether or not to
  296. * compress `data`
  297. * @param {Boolean} [options.fin=false] Specifies whether the fragment is the
  298. * last one
  299. * @param {Boolean} [options.mask=false] Specifies whether or not to mask
  300. * `data`
  301. * @param {Function} [cb] Callback
  302. * @public
  303. */
  304. send(data, options, cb) {
  305. const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
  306. let opcode = options.binary ? 2 : 1;
  307. let rsv1 = options.compress;
  308. let byteLength;
  309. let readOnly;
  310. if (typeof data === 'string') {
  311. byteLength = Buffer.byteLength(data);
  312. readOnly = false;
  313. } else if (isBlob(data)) {
  314. byteLength = data.size;
  315. readOnly = false;
  316. } else {
  317. data = toBuffer(data);
  318. byteLength = data.length;
  319. readOnly = toBuffer.readOnly;
  320. }
  321. if (this._firstFragment) {
  322. this._firstFragment = false;
  323. if (
  324. rsv1 &&
  325. perMessageDeflate &&
  326. perMessageDeflate.params[
  327. perMessageDeflate._isServer
  328. ? 'server_no_context_takeover'
  329. : 'client_no_context_takeover'
  330. ]
  331. ) {
  332. rsv1 = byteLength >= perMessageDeflate._threshold;
  333. }
  334. this._compress = rsv1;
  335. } else {
  336. rsv1 = false;
  337. opcode = 0;
  338. }
  339. if (options.fin) this._firstFragment = true;
  340. const opts = {
  341. [kByteLength]: byteLength,
  342. fin: options.fin,
  343. generateMask: this._generateMask,
  344. mask: options.mask,
  345. maskBuffer: this._maskBuffer,
  346. opcode,
  347. readOnly,
  348. rsv1
  349. };
  350. if (isBlob(data)) {
  351. if (this._state !== DEFAULT) {
  352. this.enqueue([this.getBlobData, data, this._compress, opts, cb]);
  353. } else {
  354. this.getBlobData(data, this._compress, opts, cb);
  355. }
  356. } else if (this._state !== DEFAULT) {
  357. this.enqueue([this.dispatch, data, this._compress, opts, cb]);
  358. } else {
  359. this.dispatch(data, this._compress, opts, cb);
  360. }
  361. }
  362. /**
  363. * Gets the contents of a blob as binary data.
  364. *
  365. * @param {Blob} blob The blob
  366. * @param {Boolean} [compress=false] Specifies whether or not to compress
  367. * the data
  368. * @param {Object} options Options object
  369. * @param {Boolean} [options.fin=false] Specifies whether or not to set the
  370. * FIN bit
  371. * @param {Function} [options.generateMask] The function used to generate the
  372. * masking key
  373. * @param {Boolean} [options.mask=false] Specifies whether or not to mask
  374. * `data`
  375. * @param {Buffer} [options.maskBuffer] The buffer used to store the masking
  376. * key
  377. * @param {Number} options.opcode The opcode
  378. * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
  379. * modified
  380. * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
  381. * RSV1 bit
  382. * @param {Function} [cb] Callback
  383. * @private
  384. */
  385. getBlobData(blob, compress, options, cb) {
  386. this._bufferedBytes += options[kByteLength];
  387. this._state = GET_BLOB_DATA;
  388. blob
  389. .arrayBuffer()
  390. .then((arrayBuffer) => {
  391. if (this._socket.destroyed) {
  392. const err = new Error(
  393. 'The socket was closed while the blob was being read'
  394. );
  395. //
  396. // `callCallbacks` is called in the next tick to ensure that errors
  397. // that might be thrown in the callbacks behave like errors thrown
  398. // outside the promise chain.
  399. //
  400. process.nextTick(callCallbacks, this, err, cb);
  401. return;
  402. }
  403. this._bufferedBytes -= options[kByteLength];
  404. const data = toBuffer(arrayBuffer);
  405. if (!compress) {
  406. this._state = DEFAULT;
  407. this.sendFrame(Sender.frame(data, options), cb);
  408. this.dequeue();
  409. } else {
  410. this.dispatch(data, compress, options, cb);
  411. }
  412. })
  413. .catch((err) => {
  414. //
  415. // `onError` is called in the next tick for the same reason that
  416. // `callCallbacks` above is.
  417. //
  418. process.nextTick(onError, this, err, cb);
  419. });
  420. }
  421. /**
  422. * Dispatches a message.
  423. *
  424. * @param {(Buffer|String)} data The message to send
  425. * @param {Boolean} [compress=false] Specifies whether or not to compress
  426. * `data`
  427. * @param {Object} options Options object
  428. * @param {Boolean} [options.fin=false] Specifies whether or not to set the
  429. * FIN bit
  430. * @param {Function} [options.generateMask] The function used to generate the
  431. * masking key
  432. * @param {Boolean} [options.mask=false] Specifies whether or not to mask
  433. * `data`
  434. * @param {Buffer} [options.maskBuffer] The buffer used to store the masking
  435. * key
  436. * @param {Number} options.opcode The opcode
  437. * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
  438. * modified
  439. * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
  440. * RSV1 bit
  441. * @param {Function} [cb] Callback
  442. * @private
  443. */
  444. dispatch(data, compress, options, cb) {
  445. if (!compress) {
  446. this.sendFrame(Sender.frame(data, options), cb);
  447. return;
  448. }
  449. const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
  450. this._bufferedBytes += options[kByteLength];
  451. this._state = DEFLATING;
  452. perMessageDeflate.compress(data, options.fin, (_, buf) => {
  453. if (this._socket.destroyed) {
  454. const err = new Error(
  455. 'The socket was closed while data was being compressed'
  456. );
  457. callCallbacks(this, err, cb);
  458. return;
  459. }
  460. this._bufferedBytes -= options[kByteLength];
  461. this._state = DEFAULT;
  462. options.readOnly = false;
  463. this.sendFrame(Sender.frame(buf, options), cb);
  464. this.dequeue();
  465. });
  466. }
  467. /**
  468. * Executes queued send operations.
  469. *
  470. * @private
  471. */
  472. dequeue() {
  473. while (this._state === DEFAULT && this._queue.length) {
  474. const params = this._queue.shift();
  475. this._bufferedBytes -= params[3][kByteLength];
  476. Reflect.apply(params[0], this, params.slice(1));
  477. }
  478. }
  479. /**
  480. * Enqueues a send operation.
  481. *
  482. * @param {Array} params Send operation parameters.
  483. * @private
  484. */
  485. enqueue(params) {
  486. this._bufferedBytes += params[3][kByteLength];
  487. this._queue.push(params);
  488. }
  489. /**
  490. * Sends a frame.
  491. *
  492. * @param {(Buffer | String)[]} list The frame to send
  493. * @param {Function} [cb] Callback
  494. * @private
  495. */
  496. sendFrame(list, cb) {
  497. if (list.length === 2) {
  498. this._socket.cork();
  499. this._socket.write(list[0]);
  500. this._socket.write(list[1], cb);
  501. this._socket.uncork();
  502. } else {
  503. this._socket.write(list[0], cb);
  504. }
  505. }
  506. }
  507. module.exports = Sender;
  508. /**
  509. * Calls queued callbacks with an error.
  510. *
  511. * @param {Sender} sender The `Sender` instance
  512. * @param {Error} err The error to call the callbacks with
  513. * @param {Function} [cb] The first callback
  514. * @private
  515. */
  516. function callCallbacks(sender, err, cb) {
  517. if (typeof cb === 'function') cb(err);
  518. for (let i = 0; i < sender._queue.length; i++) {
  519. const params = sender._queue[i];
  520. const callback = params[params.length - 1];
  521. if (typeof callback === 'function') callback(err);
  522. }
  523. }
  524. /**
  525. * Handles a `Sender` error.
  526. *
  527. * @param {Sender} sender The `Sender` instance
  528. * @param {Error} err The error
  529. * @param {Function} [cb] The first pending callback
  530. * @private
  531. */
  532. function onError(sender, err, cb) {
  533. callCallbacks(sender, err, cb);
  534. sender.onerror(err);
  535. }