Nfsv4Connection.js 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.Nfsv4Connection = void 0;
  4. const tslib_1 = require("tslib");
  5. const Reader_1 = require("@jsonjoy.com/buffers/lib/Reader");
  6. const Nfsv4Decoder_1 = require("../Nfsv4Decoder");
  7. const Nfsv4FullEncoder_1 = require("../Nfsv4FullEncoder");
  8. const rm_1 = require("../../../rm");
  9. const rpc_1 = require("../../../rpc");
  10. const msg = tslib_1.__importStar(require("../messages"));
  11. const constants_1 = require("../constants");
  12. const Nfsv4CompoundProcCtx_1 = require("./Nfsv4CompoundProcCtx");
  // Shared RPC opaque-auth value built from 0 and the shared empty reader; it is
  // passed as the auth argument whenever this module encodes an accepted reply.
  13. const EMPTY_AUTH = new rpc_1.RpcOpaqueAuth(0, constants_1.EMPTY_READER);
  /**
   * Serves NFSv4 RPC traffic for a single duplex stream.
   *
   * Incoming bytes are pushed into a record-marking decoder, each complete
   * record is decoded into an RPC message, and RPC calls are dispatched by
   * procedure number. Replies are encoded into one shared writer and flushed
   * to the duplex through {@link Nfsv4Connection#write}.
   */
  class Nfsv4Connection {
    /**
     * @param {object} opts
     * @param {object} opts.duplex - Stream to serve; must emit `data`,
     *   `timeout`, `close` and `error`, and support `write`/`cork`/`uncork`/`destroy`.
     * @param {object} opts.ops - Stored as `this.ops` (not used by this class itself).
     * @param {object} [opts.logger] - Object with `log`/`error`; defaults to `console`.
     * @param {boolean} [opts.debug] - Enables verbose per-call logging.
     */
    constructor(opts) {
      this.closed = false;
      this.maxIncomingMessage = 2 * 1024 * 1024;
      this.maxBackpressure = 2 * 1024 * 1024;
      this.lastXid = 0;
      this.__uncorkTimer = null;
      this.debug = !!opts.debug;
      this.logger = opts.logger || console;
      const duplex = (this.duplex = opts.duplex);
      this.ops = opts.ops;
      this.rmDecoder = new rm_1.RmRecordDecoder();
      this.rpcDecoder = new rpc_1.RpcMessageDecoder();
      this.nfsDecoder = new Nfsv4Decoder_1.Nfsv4Decoder();
      const nfsEncoder = (this.nfsEncoder = new Nfsv4FullEncoder_1.Nfsv4FullEncoder());
      // All encoders share the full encoder's writer, so one flush() yields a whole reply.
      this.writer = nfsEncoder.writer;
      this.rmEncoder = nfsEncoder.rmEncoder;
      this.rpcEncoder = nfsEncoder.rpcEncoder;
      duplex.on('data', this.onData.bind(this));
      duplex.on('timeout', () => this.close());
      duplex.on('close', () => this.close());
      duplex.on('error', (err) => {
        this.logger.error('SOCKET ERROR:', err);
        this.close();
      });
    }

    /**
     * Feeds a chunk of incoming bytes to the record decoder and dispatches
     * every complete, non-empty record as an RPC message.
     *
     * The connection is closed when a record does not decode to a message or
     * when decoding/dispatch throws: this runs inside a stream `data` listener,
     * so an escaping exception would otherwise be uncaught.
     *
     * @param {Uint8Array} data - Raw bytes received from the duplex.
     */
    onData(data) {
      const { rmDecoder, rpcDecoder } = this;
      try {
        rmDecoder.push(data);
        // Stop dispatching as soon as a handler has closed the connection.
        for (let record = rmDecoder.readRecord(); record && !this.closed; record = rmDecoder.readRecord()) {
          if (!record.size()) {
            continue;
          }
          const rpcMessage = rpcDecoder.decodeMessage(record);
          if (!rpcMessage) {
            this.close();
            return;
          }
          this.onRpcMessage(rpcMessage);
        }
      } catch (err) {
        this.logger.error('NFS CONNECTION error:', err);
        this.close();
      }
    }

    /**
     * Routes a decoded RPC message. Only call messages are supported.
     *
     * @param {object} msg - Decoded RPC message.
     * @throws {Error} When given an accepted or rejected reply message.
     */
    onRpcMessage(msg) {
      if (msg instanceof rpc_1.RpcCallMessage) {
        // Remembered so closeWithError() can address its reply.
        this.lastXid = msg.xid;
        this.onRpcCallMessage(msg);
      }
      else if (msg instanceof rpc_1.RpcAcceptedReplyMessage) {
        throw new Error('Not implemented RpcAcceptedReplyMessage');
      }
      else if (msg instanceof rpc_1.RpcRejectedReplyMessage) {
        throw new Error('Not implemented RpcRejectedReplyMessage');
      }
    }

    /**
     * Executes an RPC call: procedure 1 is COMPOUND, procedure 0 is NULL, and
     * any other procedure is answered with accept_stat 3 (PROC_UNAVAIL in
     * RFC 5531) so the caller is not left waiting for a reply.
     *
     * @param {object} procedure - RPC call message (`xid`, `proc`, `params`).
     */
    onRpcCallMessage(procedure) {
      const { debug, logger, writer, rmEncoder } = this;
      const { xid, proc } = procedure;
      switch (proc) {
        case 1: {
          if (debug) {
            logger.log(`\n<COMPOUND{${xid}}>`);
          }
          if (!(procedure.params instanceof Reader_1.Reader)) {
            return;
          }
          const compound = this.nfsDecoder.decodeCompoundRequest(procedure.params);
          if (!(compound instanceof msg.Nfsv4CompoundRequest)) {
            // 4 is accept_stat GARBAGE_ARGS in RFC 5531.
            this.closeWithError(4);
            break;
          }
          new Nfsv4CompoundProcCtx_1.Nfsv4CompoundProcCtx(this, compound)
            .exec()
            .then((procResponse) => {
              if (debug) {
                logger.log(`</COMPOUND{${xid}}>`);
              }
              this.nfsEncoder.writeAcceptedCompoundReply(xid, EMPTY_AUTH, procResponse);
              this.write(writer.flush());
            })
            .catch((err) => {
              logger.error('NFS COMPOUND error:', xid, err);
              // Drop whatever a failed encode left in the shared writer so it
              // cannot be prepended to this or a later reply.
              writer.flush();
              // 10006 is NFS4ERR_SERVERFAULT in RFC 7530.
              this.nfsEncoder.writeRejectedReply(xid, 10006);
              // The encoded reject must be flushed, otherwise it is never sent.
              this.write(writer.flush());
            });
          break;
        }
        case 0: {
          if (debug) {
            logger.log('NULL', procedure);
          }
          const state = rmEncoder.startRecord();
          this.rpcEncoder.writeAcceptedReply(xid, EMPTY_AUTH, 0);
          rmEncoder.endRecord(state);
          this.write(writer.flush());
          break;
        }
        default: {
          if (debug) {
            logger.error(`Unknown procedure: ${proc}`);
          }
          const state = rmEncoder.startRecord();
          this.rpcEncoder.writeAcceptedReply(xid, EMPTY_AUTH, 3);
          rmEncoder.endRecord(state);
          this.write(writer.flush());
        }
      }
    }

    /**
     * Replies to the most recent call with the given accept status, then
     * closes the connection. No reply is written when no call has been seen
     * yet (`lastXid` is 0) or when the connection is already closed.
     *
     * @param {number} error - RPC accept_stat value to report.
     */
    closeWithError(error) {
      const xid = this.lastXid;
      if (this.debug) {
        this.logger.log(`Closing with error: RpcAcceptStat = ${error}, xid = ${xid}`);
      }
      if (xid && !this.closed) {
        const state = this.rmEncoder.startRecord();
        this.rpcEncoder.writeAcceptedReply(xid, EMPTY_AUTH, error);
        this.rmEncoder.endRecord(state);
        // NOTE(review): close() destroys the duplex right after this write;
        // confirm the reply is actually delivered before teardown.
        this.duplex.write(this.writer.flush());
      }
      this.close();
    }

    /**
     * Tears the connection down once: cancels a pending uncork, detaches all
     * duplex listeners and destroys the duplex. Later calls are no-ops.
     */
    close() {
      if (this.closed) {
        return;
      }
      this.closed = true;
      clearImmediate(this.__uncorkTimer);
      this.__uncorkTimer = null;
      const duplex = this.duplex;
      duplex.removeAllListeners();
      if (!duplex.destroyed) {
        duplex.destroy();
      }
    }

    /**
     * Writes an encoded reply to the duplex. Writes issued in the same tick
     * are corked together and released by a single `setImmediate` uncork.
     * When the duplex already buffers more than `maxBackpressure` bytes, the
     * connection is closed with accept_stat 5 (SYSTEM_ERR in RFC 5531).
     *
     * @param {Uint8Array} buf - Encoded bytes to send.
     */
    write(buf) {
      if (this.closed) {
        return;
      }
      const duplex = this.duplex;
      if (duplex.writableLength > this.maxBackpressure) {
        this.closeWithError(5);
        return;
      }
      const uncorkScheduled = Boolean(this.__uncorkTimer);
      if (!uncorkScheduled) {
        duplex.cork();
      }
      duplex.write(buf);
      if (!uncorkScheduled) {
        this.__uncorkTimer = setImmediate(() => {
          this.__uncorkTimer = null;
          duplex.uncork();
        });
      }
    }

    /** Placeholder; intentionally does nothing. */
    send() { }
  }
  159. exports.Nfsv4Connection = Nfsv4Connection;
  160. //# sourceMappingURL=Nfsv4Connection.js.map