call-stream.js 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709
  1. "use strict";
  2. /*
  3. * Copyright 2019 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. Object.defineProperty(exports, "__esModule", { value: true });
  19. exports.Http2CallStream = exports.InterceptingListenerImpl = exports.isInterceptingListener = void 0;
  20. const http2 = require("http2");
  21. const os = require("os");
  22. const constants_1 = require("./constants");
  23. const metadata_1 = require("./metadata");
  24. const stream_decoder_1 = require("./stream-decoder");
  25. const logging = require("./logging");
  26. const constants_2 = require("./constants");
  27. const TRACER_NAME = 'call_stream';
  28. const { HTTP2_HEADER_STATUS, HTTP2_HEADER_CONTENT_TYPE, NGHTTP2_CANCEL, } = http2.constants;
  29. /**
  30. * Should do approximately the same thing as util.getSystemErrorName but the
  31. * TypeScript types don't have that function for some reason so I just made my
  32. * own.
  33. * @param errno
  34. */
  35. function getSystemErrorName(errno) {
  36. for (const [name, num] of Object.entries(os.constants.errno)) {
  37. if (num === errno) {
  38. return name;
  39. }
  40. }
  41. return 'Unknown system error ' + errno;
  42. }
  43. function getMinDeadline(deadlineList) {
  44. let minValue = Infinity;
  45. for (const deadline of deadlineList) {
  46. const deadlineMsecs = deadline instanceof Date ? deadline.getTime() : deadline;
  47. if (deadlineMsecs < minValue) {
  48. minValue = deadlineMsecs;
  49. }
  50. }
  51. return minValue;
  52. }
  53. function isInterceptingListener(listener) {
  54. return (listener.onReceiveMetadata !== undefined &&
  55. listener.onReceiveMetadata.length === 1);
  56. }
  57. exports.isInterceptingListener = isInterceptingListener;
  58. class InterceptingListenerImpl {
  59. constructor(listener, nextListener) {
  60. this.listener = listener;
  61. this.nextListener = nextListener;
  62. this.processingMetadata = false;
  63. this.hasPendingMessage = false;
  64. this.processingMessage = false;
  65. this.pendingStatus = null;
  66. }
  67. processPendingMessage() {
  68. if (this.hasPendingMessage) {
  69. this.nextListener.onReceiveMessage(this.pendingMessage);
  70. this.pendingMessage = null;
  71. this.hasPendingMessage = false;
  72. }
  73. }
  74. processPendingStatus() {
  75. if (this.pendingStatus) {
  76. this.nextListener.onReceiveStatus(this.pendingStatus);
  77. }
  78. }
  79. onReceiveMetadata(metadata) {
  80. this.processingMetadata = true;
  81. this.listener.onReceiveMetadata(metadata, (metadata) => {
  82. this.processingMetadata = false;
  83. this.nextListener.onReceiveMetadata(metadata);
  84. this.processPendingMessage();
  85. this.processPendingStatus();
  86. });
  87. }
  88. // eslint-disable-next-line @typescript-eslint/no-explicit-any
  89. onReceiveMessage(message) {
  90. /* If this listener processes messages asynchronously, the last message may
  91. * be reordered with respect to the status */
  92. this.processingMessage = true;
  93. this.listener.onReceiveMessage(message, (msg) => {
  94. this.processingMessage = false;
  95. if (this.processingMetadata) {
  96. this.pendingMessage = msg;
  97. this.hasPendingMessage = true;
  98. }
  99. else {
  100. this.nextListener.onReceiveMessage(msg);
  101. this.processPendingStatus();
  102. }
  103. });
  104. }
  105. onReceiveStatus(status) {
  106. this.listener.onReceiveStatus(status, (processedStatus) => {
  107. if (this.processingMetadata || this.processingMessage) {
  108. this.pendingStatus = processedStatus;
  109. }
  110. else {
  111. this.nextListener.onReceiveStatus(processedStatus);
  112. }
  113. });
  114. }
  115. }
  116. exports.InterceptingListenerImpl = InterceptingListenerImpl;
  117. class Http2CallStream {
  118. constructor(methodName, channel, options, filterStackFactory, channelCallCredentials, callNumber) {
  119. this.methodName = methodName;
  120. this.channel = channel;
  121. this.options = options;
  122. this.channelCallCredentials = channelCallCredentials;
  123. this.callNumber = callNumber;
  124. this.http2Stream = null;
  125. this.pendingRead = false;
  126. this.isWriteFilterPending = false;
  127. this.pendingWrite = null;
  128. this.pendingWriteCallback = null;
  129. this.writesClosed = false;
  130. this.decoder = new stream_decoder_1.StreamDecoder();
  131. this.isReadFilterPending = false;
  132. this.canPush = false;
  133. /**
  134. * Indicates that an 'end' event has come from the http2 stream, so there
  135. * will be no more data events.
  136. */
  137. this.readsClosed = false;
  138. this.statusOutput = false;
  139. this.unpushedReadMessages = [];
  140. this.unfilteredReadMessages = [];
  141. // Status code mapped from :status. To be used if grpc-status is not received
  142. this.mappedStatusCode = constants_1.Status.UNKNOWN;
  143. // This is populated (non-null) if and only if the call has ended
  144. this.finalStatus = null;
  145. this.subchannel = null;
  146. this.listener = null;
  147. this.internalError = null;
  148. this.configDeadline = Infinity;
  149. this.statusWatchers = [];
  150. this.streamEndWatchers = [];
  151. this.callStatsTracker = null;
  152. this.filterStack = filterStackFactory.createFilter(this);
  153. this.credentials = channelCallCredentials;
  154. this.disconnectListener = () => {
  155. this.endCall({
  156. code: constants_1.Status.UNAVAILABLE,
  157. details: 'Connection dropped',
  158. metadata: new metadata_1.Metadata(),
  159. });
  160. };
  161. if (this.options.parentCall &&
  162. this.options.flags & constants_1.Propagate.CANCELLATION) {
  163. this.options.parentCall.on('cancelled', () => {
  164. this.cancelWithStatus(constants_1.Status.CANCELLED, 'Cancelled by parent call');
  165. });
  166. }
  167. }
  168. outputStatus() {
  169. var _a;
  170. /* Precondition: this.finalStatus !== null */
  171. if (this.listener && !this.statusOutput) {
  172. this.statusOutput = true;
  173. const filteredStatus = this.filterStack.receiveTrailers(this.finalStatus);
  174. this.trace('ended with status: code=' +
  175. filteredStatus.code +
  176. ' details="' +
  177. filteredStatus.details +
  178. '"');
  179. this.statusWatchers.forEach(watcher => watcher(filteredStatus));
  180. /* We delay the actual action of bubbling up the status to insulate the
  181. * cleanup code in this class from any errors that may be thrown in the
  182. * upper layers as a result of bubbling up the status. In particular,
  183. * if the status is not OK, the "error" event may be emitted
  184. * synchronously at the top level, which will result in a thrown error if
  185. * the user does not handle that event. */
  186. process.nextTick(() => {
  187. var _a;
  188. (_a = this.listener) === null || _a === void 0 ? void 0 : _a.onReceiveStatus(filteredStatus);
  189. });
  190. /* Leave the http2 stream in flowing state to drain incoming messages, to
  191. * ensure that the stream closure completes. The call stream already does
  192. * not push more messages after the status is output, so the messages go
  193. * nowhere either way. */
  194. (_a = this.http2Stream) === null || _a === void 0 ? void 0 : _a.resume();
  195. if (this.subchannel) {
  196. this.subchannel.callUnref();
  197. this.subchannel.removeDisconnectListener(this.disconnectListener);
  198. }
  199. }
  200. }
  201. trace(text) {
  202. logging.trace(constants_2.LogVerbosity.DEBUG, TRACER_NAME, '[' + this.callNumber + '] ' + text);
  203. }
  204. /**
  205. * On first call, emits a 'status' event with the given StatusObject.
  206. * Subsequent calls are no-ops.
  207. * @param status The status of the call.
  208. */
  209. endCall(status) {
  210. /* If the status is OK and a new status comes in (e.g. from a
  211. * deserialization failure), that new status takes priority */
  212. if (this.finalStatus === null || this.finalStatus.code === constants_1.Status.OK) {
  213. this.finalStatus = status;
  214. this.maybeOutputStatus();
  215. }
  216. this.destroyHttp2Stream();
  217. }
  218. maybeOutputStatus() {
  219. if (this.finalStatus !== null) {
  220. /* The combination check of readsClosed and that the two message buffer
  221. * arrays are empty checks that there all incoming data has been fully
  222. * processed */
  223. if (this.finalStatus.code !== constants_1.Status.OK ||
  224. (this.readsClosed &&
  225. this.unpushedReadMessages.length === 0 &&
  226. this.unfilteredReadMessages.length === 0 &&
  227. !this.isReadFilterPending)) {
  228. this.outputStatus();
  229. }
  230. }
  231. }
  232. push(message) {
  233. this.trace('pushing to reader message of length ' +
  234. (message instanceof Buffer ? message.length : null));
  235. this.canPush = false;
  236. process.nextTick(() => {
  237. var _a;
  238. /* If we have already output the status any later messages should be
  239. * ignored, and can cause out-of-order operation errors higher up in the
  240. * stack. Checking as late as possible here to avoid any race conditions.
  241. */
  242. if (this.statusOutput) {
  243. return;
  244. }
  245. (_a = this.listener) === null || _a === void 0 ? void 0 : _a.onReceiveMessage(message);
  246. this.maybeOutputStatus();
  247. });
  248. }
  249. handleFilterError(error) {
  250. this.cancelWithStatus(constants_1.Status.INTERNAL, error.message);
  251. }
  252. handleFilteredRead(message) {
  253. /* If we the call has already ended with an error, we don't want to do
  254. * anything with this message. Dropping it on the floor is correct
  255. * behavior */
  256. if (this.finalStatus !== null && this.finalStatus.code !== constants_1.Status.OK) {
  257. this.maybeOutputStatus();
  258. return;
  259. }
  260. this.isReadFilterPending = false;
  261. if (this.canPush) {
  262. this.http2Stream.pause();
  263. this.push(message);
  264. }
  265. else {
  266. this.trace('unpushedReadMessages.push message of length ' + message.length);
  267. this.unpushedReadMessages.push(message);
  268. }
  269. if (this.unfilteredReadMessages.length > 0) {
  270. /* nextMessage is guaranteed not to be undefined because
  271. unfilteredReadMessages is non-empty */
  272. const nextMessage = this.unfilteredReadMessages.shift();
  273. this.filterReceivedMessage(nextMessage);
  274. }
  275. }
  276. filterReceivedMessage(framedMessage) {
  277. /* If we the call has already ended with an error, we don't want to do
  278. * anything with this message. Dropping it on the floor is correct
  279. * behavior */
  280. if (this.finalStatus !== null && this.finalStatus.code !== constants_1.Status.OK) {
  281. this.maybeOutputStatus();
  282. return;
  283. }
  284. this.trace('filterReceivedMessage of length ' + framedMessage.length);
  285. this.isReadFilterPending = true;
  286. this.filterStack
  287. .receiveMessage(Promise.resolve(framedMessage))
  288. .then(this.handleFilteredRead.bind(this), this.handleFilterError.bind(this));
  289. }
  290. tryPush(messageBytes) {
  291. if (this.isReadFilterPending) {
  292. this.trace('unfilteredReadMessages.push message of length ' +
  293. (messageBytes && messageBytes.length));
  294. this.unfilteredReadMessages.push(messageBytes);
  295. }
  296. else {
  297. this.filterReceivedMessage(messageBytes);
  298. }
  299. }
  300. handleTrailers(headers) {
  301. this.streamEndWatchers.forEach(watcher => watcher(true));
  302. let headersString = '';
  303. for (const header of Object.keys(headers)) {
  304. headersString += '\t\t' + header + ': ' + headers[header] + '\n';
  305. }
  306. this.trace('Received server trailers:\n' + headersString);
  307. let metadata;
  308. try {
  309. metadata = metadata_1.Metadata.fromHttp2Headers(headers);
  310. }
  311. catch (e) {
  312. metadata = new metadata_1.Metadata();
  313. }
  314. const metadataMap = metadata.getMap();
  315. let code = this.mappedStatusCode;
  316. if (code === constants_1.Status.UNKNOWN &&
  317. typeof metadataMap['grpc-status'] === 'string') {
  318. const receivedStatus = Number(metadataMap['grpc-status']);
  319. if (receivedStatus in constants_1.Status) {
  320. code = receivedStatus;
  321. this.trace('received status code ' + receivedStatus + ' from server');
  322. }
  323. metadata.remove('grpc-status');
  324. }
  325. let details = '';
  326. if (typeof metadataMap['grpc-message'] === 'string') {
  327. try {
  328. details = decodeURI(metadataMap['grpc-message']);
  329. }
  330. catch (e) {
  331. details = metadataMap['grpc-message'];
  332. }
  333. metadata.remove('grpc-message');
  334. this.trace('received status details string "' + details + '" from server');
  335. }
  336. const status = { code, details, metadata };
  337. // This is a no-op if the call was already ended when handling headers.
  338. this.endCall(status);
  339. }
  340. writeMessageToStream(message, callback) {
  341. var _a;
  342. (_a = this.callStatsTracker) === null || _a === void 0 ? void 0 : _a.addMessageSent();
  343. this.http2Stream.write(message, callback);
  344. }
  345. attachHttp2Stream(stream, subchannel, extraFilters, callStatsTracker) {
  346. this.filterStack.push(extraFilters);
  347. if (this.finalStatus !== null) {
  348. stream.close(NGHTTP2_CANCEL);
  349. }
  350. else {
  351. this.trace('attachHttp2Stream from subchannel ' + subchannel.getAddress());
  352. this.http2Stream = stream;
  353. this.subchannel = subchannel;
  354. this.callStatsTracker = callStatsTracker;
  355. subchannel.addDisconnectListener(this.disconnectListener);
  356. subchannel.callRef();
  357. stream.on('response', (headers, flags) => {
  358. var _a;
  359. let headersString = '';
  360. for (const header of Object.keys(headers)) {
  361. headersString += '\t\t' + header + ': ' + headers[header] + '\n';
  362. }
  363. this.trace('Received server headers:\n' + headersString);
  364. switch (headers[':status']) {
  365. // TODO(murgatroid99): handle 100 and 101
  366. case 400:
  367. this.mappedStatusCode = constants_1.Status.INTERNAL;
  368. break;
  369. case 401:
  370. this.mappedStatusCode = constants_1.Status.UNAUTHENTICATED;
  371. break;
  372. case 403:
  373. this.mappedStatusCode = constants_1.Status.PERMISSION_DENIED;
  374. break;
  375. case 404:
  376. this.mappedStatusCode = constants_1.Status.UNIMPLEMENTED;
  377. break;
  378. case 429:
  379. case 502:
  380. case 503:
  381. case 504:
  382. this.mappedStatusCode = constants_1.Status.UNAVAILABLE;
  383. break;
  384. default:
  385. this.mappedStatusCode = constants_1.Status.UNKNOWN;
  386. }
  387. if (flags & http2.constants.NGHTTP2_FLAG_END_STREAM) {
  388. this.handleTrailers(headers);
  389. }
  390. else {
  391. let metadata;
  392. try {
  393. metadata = metadata_1.Metadata.fromHttp2Headers(headers);
  394. }
  395. catch (error) {
  396. this.endCall({
  397. code: constants_1.Status.UNKNOWN,
  398. details: error.message,
  399. metadata: new metadata_1.Metadata(),
  400. });
  401. return;
  402. }
  403. try {
  404. const finalMetadata = this.filterStack.receiveMetadata(metadata);
  405. (_a = this.listener) === null || _a === void 0 ? void 0 : _a.onReceiveMetadata(finalMetadata);
  406. }
  407. catch (error) {
  408. this.endCall({
  409. code: constants_1.Status.UNKNOWN,
  410. details: error.message,
  411. metadata: new metadata_1.Metadata(),
  412. });
  413. }
  414. }
  415. });
  416. stream.on('trailers', (headers) => {
  417. this.handleTrailers(headers);
  418. });
  419. stream.on('data', (data) => {
  420. /* If the status has already been output, allow the http2 stream to
  421. * drain without processing the data. */
  422. if (this.statusOutput) {
  423. return;
  424. }
  425. this.trace('receive HTTP/2 data frame of length ' + data.length);
  426. const messages = this.decoder.write(data);
  427. for (const message of messages) {
  428. this.trace('parsed message of length ' + message.length);
  429. this.callStatsTracker.addMessageReceived();
  430. this.tryPush(message);
  431. }
  432. });
  433. stream.on('end', () => {
  434. this.readsClosed = true;
  435. this.maybeOutputStatus();
  436. });
  437. stream.on('close', () => {
  438. /* Use process.next tick to ensure that this code happens after any
  439. * "error" event that may be emitted at about the same time, so that
  440. * we can bubble up the error message from that event. */
  441. process.nextTick(() => {
  442. var _a;
  443. this.trace('HTTP/2 stream closed with code ' + stream.rstCode);
  444. /* If we have a final status with an OK status code, that means that
  445. * we have received all of the messages and we have processed the
  446. * trailers and the call completed successfully, so it doesn't matter
  447. * how the stream ends after that */
  448. if (((_a = this.finalStatus) === null || _a === void 0 ? void 0 : _a.code) === constants_1.Status.OK) {
  449. return;
  450. }
  451. let code;
  452. let details = '';
  453. switch (stream.rstCode) {
  454. case http2.constants.NGHTTP2_NO_ERROR:
  455. /* If we get a NO_ERROR code and we already have a status, the
  456. * stream completed properly and we just haven't fully processed
  457. * it yet */
  458. if (this.finalStatus !== null) {
  459. return;
  460. }
  461. code = constants_1.Status.INTERNAL;
  462. details = `Received RST_STREAM with code ${stream.rstCode}`;
  463. break;
  464. case http2.constants.NGHTTP2_REFUSED_STREAM:
  465. code = constants_1.Status.UNAVAILABLE;
  466. details = 'Stream refused by server';
  467. break;
  468. case http2.constants.NGHTTP2_CANCEL:
  469. code = constants_1.Status.CANCELLED;
  470. details = 'Call cancelled';
  471. break;
  472. case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM:
  473. code = constants_1.Status.RESOURCE_EXHAUSTED;
  474. details = 'Bandwidth exhausted or memory limit exceeded';
  475. break;
  476. case http2.constants.NGHTTP2_INADEQUATE_SECURITY:
  477. code = constants_1.Status.PERMISSION_DENIED;
  478. details = 'Protocol not secure enough';
  479. break;
  480. case http2.constants.NGHTTP2_INTERNAL_ERROR:
  481. code = constants_1.Status.INTERNAL;
  482. if (this.internalError === null) {
  483. /* This error code was previously handled in the default case, and
  484. * there are several instances of it online, so I wanted to
  485. * preserve the original error message so that people find existing
  486. * information in searches, but also include the more recognizable
  487. * "Internal server error" message. */
  488. details = `Received RST_STREAM with code ${stream.rstCode} (Internal server error)`;
  489. }
  490. else {
  491. if (this.internalError.code === 'ECONNRESET' || this.internalError.code === 'ETIMEDOUT') {
  492. code = constants_1.Status.UNAVAILABLE;
  493. details = this.internalError.message;
  494. }
  495. else {
  496. /* The "Received RST_STREAM with code ..." error is preserved
  497. * here for continuity with errors reported online, but the
  498. * error message at the end will probably be more relevant in
  499. * most cases. */
  500. details = `Received RST_STREAM with code ${stream.rstCode} triggered by internal client error: ${this.internalError.message}`;
  501. }
  502. }
  503. break;
  504. default:
  505. code = constants_1.Status.INTERNAL;
  506. details = `Received RST_STREAM with code ${stream.rstCode}`;
  507. }
  508. // This is a no-op if trailers were received at all.
  509. // This is OK, because status codes emitted here correspond to more
  510. // catastrophic issues that prevent us from receiving trailers in the
  511. // first place.
  512. this.endCall({ code, details, metadata: new metadata_1.Metadata() });
  513. });
  514. });
  515. stream.on('error', (err) => {
  516. /* We need an error handler here to stop "Uncaught Error" exceptions
  517. * from bubbling up. However, errors here should all correspond to
  518. * "close" events, where we will handle the error more granularly */
  519. /* Specifically looking for stream errors that were *not* constructed
  520. * from a RST_STREAM response here:
  521. * https://github.com/nodejs/node/blob/8b8620d580314050175983402dfddf2674e8e22a/lib/internal/http2/core.js#L2267
  522. */
  523. if (err.code !== 'ERR_HTTP2_STREAM_ERROR') {
  524. this.trace('Node error event: message=' +
  525. err.message +
  526. ' code=' +
  527. err.code +
  528. ' errno=' +
  529. getSystemErrorName(err.errno) +
  530. ' syscall=' +
  531. err.syscall);
  532. this.internalError = err;
  533. }
  534. this.streamEndWatchers.forEach(watcher => watcher(false));
  535. });
  536. if (this.pendingWrite) {
  537. if (!this.pendingWriteCallback) {
  538. throw new Error('Invalid state in write handling code');
  539. }
  540. this.trace('sending data chunk of length ' +
  541. this.pendingWrite.length +
  542. ' (deferred)');
  543. try {
  544. this.writeMessageToStream(this.pendingWrite, this.pendingWriteCallback);
  545. }
  546. catch (error) {
  547. this.endCall({
  548. code: constants_1.Status.UNAVAILABLE,
  549. details: `Write failed with error ${error.message}`,
  550. metadata: new metadata_1.Metadata()
  551. });
  552. }
  553. }
  554. this.maybeCloseWrites();
  555. }
  556. }
  557. start(metadata, listener) {
  558. this.trace('Sending metadata');
  559. this.listener = listener;
  560. this.channel._startCallStream(this, metadata);
  561. this.maybeOutputStatus();
  562. }
  563. destroyHttp2Stream() {
  564. var _a;
  565. // The http2 stream could already have been destroyed if cancelWithStatus
  566. // is called in response to an internal http2 error.
  567. if (this.http2Stream !== null && !this.http2Stream.destroyed) {
  568. /* If the call has ended with an OK status, communicate that when closing
  569. * the stream, partly to avoid a situation in which we detect an error
  570. * RST_STREAM as a result after we have the status */
  571. let code;
  572. if (((_a = this.finalStatus) === null || _a === void 0 ? void 0 : _a.code) === constants_1.Status.OK) {
  573. code = http2.constants.NGHTTP2_NO_ERROR;
  574. }
  575. else {
  576. code = http2.constants.NGHTTP2_CANCEL;
  577. }
  578. this.trace('close http2 stream with code ' + code);
  579. this.http2Stream.close(code);
  580. }
  581. }
  582. cancelWithStatus(status, details) {
  583. this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"');
  584. this.endCall({ code: status, details, metadata: new metadata_1.Metadata() });
  585. }
  586. getDeadline() {
  587. const deadlineList = [this.options.deadline];
  588. if (this.options.parentCall && this.options.flags & constants_1.Propagate.DEADLINE) {
  589. deadlineList.push(this.options.parentCall.getDeadline());
  590. }
  591. if (this.configDeadline) {
  592. deadlineList.push(this.configDeadline);
  593. }
  594. return getMinDeadline(deadlineList);
  595. }
  596. getCredentials() {
  597. return this.credentials;
  598. }
  599. setCredentials(credentials) {
  600. this.credentials = this.channelCallCredentials.compose(credentials);
  601. }
  602. getStatus() {
  603. return this.finalStatus;
  604. }
  605. getPeer() {
  606. var _a, _b;
  607. return (_b = (_a = this.subchannel) === null || _a === void 0 ? void 0 : _a.getAddress()) !== null && _b !== void 0 ? _b : this.channel.getTarget();
  608. }
  609. getMethod() {
  610. return this.methodName;
  611. }
  612. getHost() {
  613. return this.options.host;
  614. }
  615. setConfigDeadline(configDeadline) {
  616. this.configDeadline = configDeadline;
  617. }
  618. addStatusWatcher(watcher) {
  619. this.statusWatchers.push(watcher);
  620. }
  621. addStreamEndWatcher(watcher) {
  622. this.streamEndWatchers.push(watcher);
  623. }
  624. addFilters(extraFilters) {
  625. this.filterStack.push(extraFilters);
  626. }
  627. getCallNumber() {
  628. return this.callNumber;
  629. }
  630. startRead() {
  631. /* If the stream has ended with an error, we should not emit any more
  632. * messages and we should communicate that the stream has ended */
  633. if (this.finalStatus !== null && this.finalStatus.code !== constants_1.Status.OK) {
  634. this.readsClosed = true;
  635. this.maybeOutputStatus();
  636. return;
  637. }
  638. this.canPush = true;
  639. if (this.http2Stream === null) {
  640. this.pendingRead = true;
  641. }
  642. else {
  643. if (this.unpushedReadMessages.length > 0) {
  644. const nextMessage = this.unpushedReadMessages.shift();
  645. this.push(nextMessage);
  646. return;
  647. }
  648. /* Only resume reading from the http2Stream if we don't have any pending
  649. * messages to emit */
  650. this.http2Stream.resume();
  651. }
  652. }
  653. maybeCloseWrites() {
  654. if (this.writesClosed &&
  655. !this.isWriteFilterPending &&
  656. this.http2Stream !== null) {
  657. this.trace('calling end() on HTTP/2 stream');
  658. this.http2Stream.end();
  659. }
  660. }
  661. sendMessageWithContext(context, message) {
  662. this.trace('write() called with message of length ' + message.length);
  663. const writeObj = {
  664. message,
  665. flags: context.flags,
  666. };
  667. const cb = (error) => {
  668. var _a, _b;
  669. let code = constants_1.Status.UNAVAILABLE;
  670. if (((_a = error) === null || _a === void 0 ? void 0 : _a.code) === 'ERR_STREAM_WRITE_AFTER_END') {
  671. code = constants_1.Status.INTERNAL;
  672. }
  673. if (error) {
  674. this.cancelWithStatus(code, `Write error: ${error.message}`);
  675. }
  676. (_b = context.callback) === null || _b === void 0 ? void 0 : _b.call(context);
  677. };
  678. this.isWriteFilterPending = true;
  679. this.filterStack.sendMessage(Promise.resolve(writeObj)).then((message) => {
  680. this.isWriteFilterPending = false;
  681. if (this.http2Stream === null) {
  682. this.trace('deferring writing data chunk of length ' + message.message.length);
  683. this.pendingWrite = message.message;
  684. this.pendingWriteCallback = cb;
  685. }
  686. else {
  687. this.trace('sending data chunk of length ' + message.message.length);
  688. try {
  689. this.writeMessageToStream(message.message, cb);
  690. }
  691. catch (error) {
  692. this.endCall({
  693. code: constants_1.Status.UNAVAILABLE,
  694. details: `Write failed with error ${error.message}`,
  695. metadata: new metadata_1.Metadata()
  696. });
  697. }
  698. this.maybeCloseWrites();
  699. }
  700. }, this.handleFilterError.bind(this));
  701. }
  702. halfClose() {
  703. this.trace('end() called');
  704. this.writesClosed = true;
  705. this.maybeCloseWrites();
  706. }
  707. }
  708. exports.Http2CallStream = Http2CallStream;
  709. //# sourceMappingURL=call-stream.js.map