client.ts 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710
  1. /*
  2. * Copyright 2019 gRPC authors.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. *
  16. */
  17. import {
  18. ClientDuplexStream,
  19. ClientDuplexStreamImpl,
  20. ClientReadableStream,
  21. ClientReadableStreamImpl,
  22. ClientUnaryCall,
  23. ClientUnaryCallImpl,
  24. ClientWritableStream,
  25. ClientWritableStreamImpl,
  26. ServiceError,
  27. callErrorFromStatus,
  28. SurfaceCall,
  29. } from './call';
  30. import { CallCredentials } from './call-credentials';
  31. import { Deadline, StatusObject } from './call-stream';
  32. import { Channel, ChannelImplementation } from './channel';
  33. import { ConnectivityState } from './connectivity-state';
  34. import { ChannelCredentials } from './channel-credentials';
  35. import { ChannelOptions } from './channel-options';
  36. import { Status } from './constants';
  37. import { Metadata } from './metadata';
  38. import { ClientMethodDefinition } from './make-client';
  39. import {
  40. getInterceptingCall,
  41. Interceptor,
  42. InterceptorProvider,
  43. InterceptorArguments,
  44. InterceptingCallInterface,
  45. } from './client-interceptors';
  46. import {
  47. ServerUnaryCall,
  48. ServerReadableStream,
  49. ServerWritableStream,
  50. ServerDuplexStream,
  51. } from './server-call';
  // Module-private symbols used as the property keys under which each Client
  // instance stores its channel, its client-level interceptors, its
  // client-level interceptor providers, and its optional call invocation
  // transformer. The symbols are not exported from this module.
  52. const CHANNEL_SYMBOL = Symbol();
  53. const INTERCEPTOR_SYMBOL = Symbol();
  54. const INTERCEPTOR_PROVIDER_SYMBOL = Symbol();
  55. const CALL_INVOCATION_TRANSFORMER_SYMBOL = Symbol();
  /**
   * Type guard used when sorting out optional trailing arguments: reports
   * whether `arg` is the completion callback rather than a Metadata object, a
   * CallOptions object, or an omitted argument.
   *
   * @param arg One of the optional positional arguments of a request method.
   * @returns `true` exactly when `typeof arg` is `'function'`.
   */
  function isFunction<ResponseType>(
    arg: Metadata | CallOptions | UnaryCallback<ResponseType> | undefined
  ): arg is UnaryCallback<ResponseType> {
    const kind = typeof arg;
    return kind === 'function';
  }
  /**
   * Completion callback for request methods that yield a single response
   * message (makeUnaryRequest and makeClientStreamRequest in this file).
   *
   * It is called with `(null, response)` when the call ends with an OK status
   * and a message was received, and with a ServiceError as the only argument
   * otherwise.
   */
  61. export interface UnaryCallback<ResponseType> {
  62. (err: ServiceError | null, value?: ResponseType): void;
  63. }
  64. /* eslint-disable @typescript-eslint/no-explicit-any */
  /**
   * Per-call options accepted by the request methods of Client. The whole
   * object is handed to getInterceptingCall when the call is created.
   */
  65. export interface CallOptions {
  // Not read directly in this file; forwarded with the rest of the options.
  66. deadline?: Deadline;
  // Not read directly in this file; forwarded with the rest of the options.
  67. host?: string;
  // A server-side call object. Not read directly in this file; presumably the
  // call this client call is made on behalf of -- TODO confirm.
  68. parent?:
  69. | ServerUnaryCall<any, any>
  70. | ServerReadableStream<any, any>
  71. | ServerWritableStream<any, any>
  72. | ServerDuplexStream<any, any>;
  // Not read directly in this file; forwarded with the rest of the options.
  73. propagate_flags?: number;
  // When set, applied to the created call via setCredentials before it starts.
  74. credentials?: CallCredentials;
  // Call-level interceptors; passed to getInterceptingCall as callInterceptors
  // (an empty list is used when omitted).
  75. interceptors?: Interceptor[];
  // Call-level interceptor providers; passed to getInterceptingCall as
  // callInterceptorProviders (an empty list is used when omitted).
  76. interceptor_providers?: InterceptorProvider[];
  77. }
  78. /* eslint-enable @typescript-eslint/no-explicit-any */
  /**
   * Everything a request method has assembled for one call at the point where
   * the optional CallInvocationTransformer runs. Each request method in this
   * file builds one of these, passes it through the transformer when one is
   * configured, and then uses the (possibly replaced) values to create and
   * start the call.
   */
  79. export interface CallProperties<RequestType, ResponseType> {
  // The single request message; populated by makeUnaryRequest and
  // makeServerStreamRequest only.
  80. argument?: RequestType;
  // Metadata passed to call.start().
  81. metadata: Metadata;
  // The call/stream object that is returned to the caller of the request method.
  82. call: SurfaceCall;
  // Channel passed to getInterceptingCall.
  83. channel: Channel;
  // Path, streaming flags and (de)serializers of the method being invoked.
  84. methodDefinition: ClientMethodDefinition<RequestType, ResponseType>;
  // Per-call options (credentials, interceptors, ...).
  85. callOptions: CallOptions;
  // Completion callback; populated by makeUnaryRequest and
  // makeClientStreamRequest only.
  86. callback?: UnaryCallback<ResponseType>;
  87. }
  /**
   * Hook supplied through ClientOptions.callInvocationTransformer. When
   * configured, every request method passes its CallProperties through this
   * function and continues with whatever CallProperties it returns.
   */
  88. export interface CallInvocationTransformer {
  89. (callProperties: CallProperties<any, any>): CallProperties<any, any>; // eslint-disable-line @typescript-eslint/no-explicit-any
  90. }
  /**
   * Options accepted by the Client constructor: any subset of ChannelOptions
   * plus the client-specific fields below. The constructor removes
   * `interceptors`, `interceptor_providers` and `callInvocationTransformer`
   * from its private copy before the remaining options reach a channel factory
   * or ChannelImplementation.
   */
  91. export type ClientOptions = Partial<ChannelOptions> & {
  // When truthy, used as the client's channel as-is; the constructor's address
  // and credentials are then not used to create a channel.
  92. channelOverride?: Channel;
  // When set (and channelOverride is not), called with the address, the
  // credentials and the remaining options to create the client's channel.
  93. channelFactoryOverride?: (
  94. address: string,
  95. credentials: ChannelCredentials,
  96. options: ClientOptions
  97. ) => Channel;
  // Client-level interceptors. Supplying a non-empty list here together with a
  // non-empty interceptor_providers list makes the constructor throw.
  98. interceptors?: Interceptor[];
  // Client-level interceptor providers (see the restriction above).
  99. interceptor_providers?: InterceptorProvider[];
  // Optional hook applied to the CallProperties of every call made by the client.
  100. callInvocationTransformer?: CallInvocationTransformer;
  101. };
  /**
   * Returns the stack trace of `error` with its first line (the line that
   * repeats the error's name and message) removed, leaving only the frames.
   *
   * `Error.prototype.stack` is a non-standard, optional property, so it may be
   * absent (other engines, errors whose stack was deleted or overwritten). The
   * previous implementation asserted it with `!` and threw a TypeError in that
   * case; an error without a string stack now yields an empty string.
   *
   * @param error Error whose stack frames should be extracted.
   * @returns The stack without its first line, or `''` when no stack exists.
   */
  function getErrorStackString(error: Error): string {
    const stack = error.stack;
    if (typeof stack !== 'string') {
      return '';
    }
    return stack.split('\n').slice(1).join('\n');
  }
  105. /**
  106. * A generic gRPC client. Primarily useful as a base class for all generated
  107. * clients.
  108. */
  109. export class Client {
  110. private readonly [CHANNEL_SYMBOL]: Channel;
  111. private readonly [INTERCEPTOR_SYMBOL]: Interceptor[];
  112. private readonly [INTERCEPTOR_PROVIDER_SYMBOL]: InterceptorProvider[];
  113. private readonly [CALL_INVOCATION_TRANSFORMER_SYMBOL]?: CallInvocationTransformer;
  /**
   * Creates a client and the channel it will use.
   *
   * The client-specific entries (`interceptors`, `interceptor_providers`,
   * `callInvocationTransformer`) are taken out of a private copy of `options`;
   * what remains is handed to the channel factory or to ChannelImplementation.
   *
   * @param address Target address passed to the channel factory or
   *     ChannelImplementation (unused when `channelOverride` is given).
   * @param credentials Channel credentials passed along with `address`.
   * @param options Channel options plus client-specific settings.
   * @throws Error when both `interceptors` and `interceptor_providers` are
   *     non-empty.
   */
  constructor(
    address: string,
    credentials: ChannelCredentials,
    options: ClientOptions = {}
  ) {
    // Shallow copy: the deletions below must never touch the caller's object.
    const channelOptions: ClientOptions = Object.assign({}, options);

    const clientInterceptors = channelOptions.interceptors ?? [];
    delete channelOptions.interceptors;
    const clientInterceptorProviders =
      channelOptions.interceptor_providers ?? [];
    delete channelOptions.interceptor_providers;

    const hasInterceptors = clientInterceptors.length > 0;
    const hasProviders = clientInterceptorProviders.length > 0;
    if (hasInterceptors && hasProviders) {
      throw new Error(
        'Both interceptors and interceptor_providers were passed as options ' +
          'to the client constructor. Only one of these is allowed.'
      );
    }
    this[INTERCEPTOR_SYMBOL] = clientInterceptors;
    this[INTERCEPTOR_PROVIDER_SYMBOL] = clientInterceptorProviders;

    this[CALL_INVOCATION_TRANSFORMER_SYMBOL] =
      channelOptions.callInvocationTransformer;
    delete channelOptions.callInvocationTransformer;

    // Channel selection: explicit channel > factory > default implementation.
    const { channelOverride, channelFactoryOverride } = channelOptions;
    let channel: Channel;
    if (channelOverride) {
      channel = channelOverride;
    } else if (channelFactoryOverride) {
      // The factory itself is not part of the options it receives.
      delete channelOptions.channelFactoryOverride;
      channel = channelFactoryOverride(address, credentials, channelOptions);
    } else {
      channel = new ChannelImplementation(address, credentials, channelOptions);
    }
    this[CHANNEL_SYMBOL] = channel;
  }
  /** Closes the channel this client was constructed with or created. */
  close(): void {
    const channel = this[CHANNEL_SYMBOL];
    channel.close();
  }
  /**
   * Gives access to the channel used by this client.
   *
   * @returns The channel chosen or created by the constructor.
   */
  getChannel(): Channel {
    const channel = this[CHANNEL_SYMBOL];
    return channel;
  }
  /**
   * Waits until the client's channel reports ConnectivityState.READY.
   *
   * The first check is deferred with setImmediate. Each check reads the
   * channel state with `getConnectivityState(true)`; while the channel is not
   * READY the method re-arms itself through `watchConnectivityState`.
   *
   * @param deadline Deadline passed to every watchConnectivityState call.
   * @param callback Called with no argument once the channel is READY, or with
   *     an Error when the watcher reports an error or a channel method throws.
   */
  waitForReady(deadline: Deadline, callback: (error?: Error) => void): void {
    const poll = (watchError?: Error): void => {
      // Any error reported by the state watcher is surfaced as a deadline miss.
      if (watchError) {
        callback(new Error('Failed to connect before the deadline'));
        return;
      }

      let currentState;
      try {
        currentState = this[CHANNEL_SYMBOL].getConnectivityState(true);
      } catch {
        callback(new Error('The channel has been closed'));
        return;
      }

      if (currentState === ConnectivityState.READY) {
        callback();
        return;
      }

      // Not ready yet: run this check again on the next state change.
      try {
        this[CHANNEL_SYMBOL].watchConnectivityState(
          currentState,
          deadline,
          poll
        );
      } catch {
        callback(new Error('The channel has been closed'));
      }
    };

    setImmediate(poll);
  }
  /**
   * Normalizes the optional trailing arguments of the single-response request
   * methods. Accepted shapes are `(callback)`, `(metadata, callback)`,
   * `(options, callback)` and `(metadata, options, callback)`.
   *
   * @param arg1 Metadata, call options, or the callback.
   * @param arg2 Call options or the callback.
   * @param arg3 The callback.
   * @returns Metadata (a fresh instance when omitted), options (an empty
   *     object when omitted) and the callback.
   * @throws Error when the arguments match none of the accepted shapes.
   */
  private checkOptionalUnaryResponseArguments<ResponseType>(
    arg1: Metadata | CallOptions | UnaryCallback<ResponseType>,
    arg2?: CallOptions | UnaryCallback<ResponseType>,
    arg3?: UnaryCallback<ResponseType>
  ): {
    metadata: Metadata;
    options: CallOptions;
    callback: UnaryCallback<ResponseType>;
  } {
    // Shape: (callback)
    if (isFunction(arg1)) {
      return { metadata: new Metadata(), options: {}, callback: arg1 };
    }

    // Shape: (metadata, callback) or (options, callback)
    if (isFunction(arg2)) {
      return arg1 instanceof Metadata
        ? { metadata: arg1, options: {}, callback: arg2 }
        : { metadata: new Metadata(), options: arg1, callback: arg2 };
    }

    // Shape: (metadata, options, callback) -- anything else is rejected.
    if (
      !(arg1 instanceof Metadata) ||
      !(arg2 instanceof Object) ||
      !isFunction(arg3)
    ) {
      throw new Error('Incorrect arguments passed');
    }
    return { metadata: arg1, options: arg2, callback: arg3 };
  }
  /**
   * Starts a unary call: one request message is sent and one response message
   * is expected.
   *
   * @param method Path of the method, used as `path` of the method definition.
   * @param serialize Request serializer stored in the method definition.
   * @param deserialize Response deserializer stored in the method definition.
   * @param argument The request message to send.
   * @param metadata Metadata, call options, or the callback.
   * @param options Call options or the callback.
   * @param callback Called once when the call's status arrives: with
   *     `(null, response)` for an OK status with a response, otherwise with a
   *     ServiceError.
   * @returns The call object, which also emits 'metadata' and 'status'.
   * @throws Error when the optional arguments do not form an accepted shape.
   */
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata: Metadata,
    options: CallOptions,
    callback: UnaryCallback<ResponseType>
  ): ClientUnaryCall;
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata: Metadata,
    callback: UnaryCallback<ResponseType>
  ): ClientUnaryCall;
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    options: CallOptions,
    callback: UnaryCallback<ResponseType>
  ): ClientUnaryCall;
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    callback: UnaryCallback<ResponseType>
  ): ClientUnaryCall;
  makeUnaryRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata: Metadata | CallOptions | UnaryCallback<ResponseType>,
    options?: CallOptions | UnaryCallback<ResponseType>,
    callback?: UnaryCallback<ResponseType>
  ): ClientUnaryCall {
    const normalized = this.checkOptionalUnaryResponseArguments<ResponseType>(
      metadata,
      options,
      callback
    );
    const definition: ClientMethodDefinition<RequestType, ResponseType> = {
      path: method,
      requestStream: false,
      responseStream: false,
      requestSerialize: serialize,
      responseDeserialize: deserialize,
    };
    let callProperties: CallProperties<RequestType, ResponseType> = {
      argument,
      metadata: normalized.metadata,
      call: new ClientUnaryCallImpl(),
      channel: this[CHANNEL_SYMBOL],
      methodDefinition: definition,
      callOptions: normalized.options,
      callback: normalized.callback,
    };
    // Let the configured transformer (if any) replace any of the properties.
    if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
      callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
        callProperties
      ) as CallProperties<RequestType, ResponseType>;
    }

    const unaryCall: ClientUnaryCall = callProperties.call;
    const interceptorSetup: InterceptorArguments = {
      clientInterceptors: this[INTERCEPTOR_SYMBOL],
      clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
      callInterceptors: callProperties.callOptions.interceptors ?? [],
      callInterceptorProviders:
        callProperties.callOptions.interceptor_providers ?? [],
    };
    const call: InterceptingCallInterface = getInterceptingCall(
      interceptorSetup,
      callProperties.methodDefinition,
      callProperties.callOptions,
      callProperties.channel
    );
    /* The surface call object has to exist before the transformer runs, while
     * the underlying call can only be created afterwards, so the two are wired
     * together here -- before the surface object is used for anything. The
     * type system cannot enforce that ordering. */
    unaryCall.call = call;
    if (callProperties.callOptions.credentials) {
      call.setCredentials(callProperties.callOptions.credentials);
    }

    let responseMessage: ResponseType | null = null;
    let receivedStatus = false;
    // Created here so error stacks can point at the code that made the call.
    const callerStackError = new Error();
    call.start(callProperties.metadata, {
      onReceiveMetadata: (received: Metadata): void => {
        unaryCall.emit('metadata', received);
      },
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      onReceiveMessage: (message: any): void => {
        // A second message cancels the call; the newest message is still kept.
        if (responseMessage !== null) {
          call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
        }
        responseMessage = message;
      },
      onReceiveStatus: (status: StatusObject): void => {
        // Only the first status is acted upon.
        if (receivedStatus) {
          return;
        }
        receivedStatus = true;
        if (status.code !== Status.OK) {
          const callerStack = getErrorStackString(callerStackError);
          callProperties.callback!(callErrorFromStatus(status, callerStack));
        } else if (responseMessage === null) {
          // OK status without any response message is reported as an error.
          const callerStack = getErrorStackString(callerStackError);
          callProperties.callback!(
            callErrorFromStatus(
              {
                code: Status.INTERNAL,
                details: 'No message received',
                metadata: status.metadata,
              },
              callerStack
            )
          );
        } else {
          callProperties.callback!(null, responseMessage);
        }
        unaryCall.emit('status', status);
      },
    });
    // Note: the message sent is the `argument` parameter itself, not
    // `callProperties.argument`.
    call.sendMessage(argument);
    call.halfClose();
    return unaryCall;
  }
  /**
   * Starts a client-streaming call: the caller writes request messages to the
   * returned stream and a single response message is expected.
   *
   * @param method Path of the method, used as `path` of the method definition.
   * @param serialize Request serializer; also given to the writable stream.
   * @param deserialize Response deserializer stored in the method definition.
   * @param metadata Metadata, call options, or the callback.
   * @param options Call options or the callback.
   * @param callback Called once when the call's status arrives: with
   *     `(null, response)` for an OK status with a response, otherwise with a
   *     ServiceError.
   * @returns The writable stream, which also emits 'metadata' and 'status'.
   * @throws Error when the optional arguments do not form an accepted shape.
   */
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata: Metadata,
    options: CallOptions,
    callback: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType>;
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata: Metadata,
    callback: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType>;
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    options: CallOptions,
    callback: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType>;
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    callback: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType>;
  makeClientStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata: Metadata | CallOptions | UnaryCallback<ResponseType>,
    options?: CallOptions | UnaryCallback<ResponseType>,
    callback?: UnaryCallback<ResponseType>
  ): ClientWritableStream<RequestType> {
    const normalized = this.checkOptionalUnaryResponseArguments<ResponseType>(
      metadata,
      options,
      callback
    );
    const definition: ClientMethodDefinition<RequestType, ResponseType> = {
      path: method,
      requestStream: true,
      responseStream: false,
      requestSerialize: serialize,
      responseDeserialize: deserialize,
    };
    let callProperties: CallProperties<RequestType, ResponseType> = {
      metadata: normalized.metadata,
      call: new ClientWritableStreamImpl<RequestType>(serialize),
      channel: this[CHANNEL_SYMBOL],
      methodDefinition: definition,
      callOptions: normalized.options,
      callback: normalized.callback,
    };
    // Let the configured transformer (if any) replace any of the properties.
    if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
      callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
        callProperties
      ) as CallProperties<RequestType, ResponseType>;
    }

    const requestStream = callProperties.call as ClientWritableStream<RequestType>;
    const interceptorSetup: InterceptorArguments = {
      clientInterceptors: this[INTERCEPTOR_SYMBOL],
      clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
      callInterceptors: callProperties.callOptions.interceptors ?? [],
      callInterceptorProviders:
        callProperties.callOptions.interceptor_providers ?? [],
    };
    const call: InterceptingCallInterface = getInterceptingCall(
      interceptorSetup,
      callProperties.methodDefinition,
      callProperties.callOptions,
      callProperties.channel
    );
    /* The stream object has to exist before the transformer runs, while the
     * underlying call can only be created afterwards, so the two are wired
     * together here -- before the stream is used for anything. The type system
     * cannot enforce that ordering. */
    requestStream.call = call;
    if (callProperties.callOptions.credentials) {
      call.setCredentials(callProperties.callOptions.credentials);
    }

    let responseMessage: ResponseType | null = null;
    let receivedStatus = false;
    // Created here so error stacks can point at the code that made the call.
    const callerStackError = new Error();
    call.start(callProperties.metadata, {
      onReceiveMetadata: (received: Metadata): void => {
        requestStream.emit('metadata', received);
      },
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      onReceiveMessage: (message: any): void => {
        // A second message cancels the call; the newest message is still kept.
        if (responseMessage !== null) {
          call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
        }
        responseMessage = message;
      },
      onReceiveStatus: (status: StatusObject): void => {
        // Only the first status is acted upon.
        if (receivedStatus) {
          return;
        }
        receivedStatus = true;
        if (status.code !== Status.OK) {
          const callerStack = getErrorStackString(callerStackError);
          callProperties.callback!(callErrorFromStatus(status, callerStack));
        } else if (responseMessage === null) {
          // OK status without any response message is reported as an error.
          const callerStack = getErrorStackString(callerStackError);
          callProperties.callback!(
            callErrorFromStatus(
              {
                code: Status.INTERNAL,
                details: 'No message received',
                metadata: status.metadata,
              },
              callerStack
            )
          );
        } else {
          callProperties.callback!(null, responseMessage);
        }
        requestStream.emit('status', status);
      },
    });
    return requestStream;
  }
  /**
   * Normalizes the optional trailing arguments of the streaming-response
   * request methods, which accept `(metadata?, options?)` or `(options?)`.
   *
   * @param arg1 Metadata or call options.
   * @param arg2 Call options; only considered when `arg1` is a Metadata.
   * @returns Metadata (a fresh instance unless `arg1` is one) and options (an
   *     empty object when the relevant argument is falsy).
   */
  private checkMetadataAndOptions(
    arg1?: Metadata | CallOptions,
    arg2?: CallOptions
  ): { metadata: Metadata; options: CallOptions } {
    if (arg1 instanceof Metadata) {
      return { metadata: arg1, options: arg2 || {} };
    }
    // No metadata given: whatever came first (if anything) is the options.
    const options: CallOptions = arg1 || {};
    return { metadata: new Metadata(), options };
  }
  /**
   * Starts a server-streaming call: one request message is sent and the
   * response messages are pushed into the returned readable stream.
   *
   * @param method Path of the method, used as `path` of the method definition.
   * @param serialize Request serializer stored in the method definition.
   * @param deserialize Response deserializer; also given to the stream.
   * @param argument The request message to send.
   * @param metadata Metadata or call options.
   * @param options Call options (when metadata is given first).
   * @returns The readable stream. It emits 'metadata' and 'status', and
   *     'error' when the final status is not OK.
   */
  makeServerStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata: Metadata,
    options?: CallOptions
  ): ClientReadableStream<ResponseType>;
  makeServerStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    options?: CallOptions
  ): ClientReadableStream<ResponseType>;
  makeServerStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    argument: RequestType,
    metadata?: Metadata | CallOptions,
    options?: CallOptions
  ): ClientReadableStream<ResponseType> {
    const normalized = this.checkMetadataAndOptions(metadata, options);
    const definition: ClientMethodDefinition<RequestType, ResponseType> = {
      path: method,
      requestStream: false,
      responseStream: true,
      requestSerialize: serialize,
      responseDeserialize: deserialize,
    };
    let callProperties: CallProperties<RequestType, ResponseType> = {
      argument,
      metadata: normalized.metadata,
      call: new ClientReadableStreamImpl<ResponseType>(deserialize),
      channel: this[CHANNEL_SYMBOL],
      methodDefinition: definition,
      callOptions: normalized.options,
    };
    // Let the configured transformer (if any) replace any of the properties.
    if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
      callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
        callProperties
      ) as CallProperties<RequestType, ResponseType>;
    }

    const responseStream = callProperties.call as ClientReadableStream<ResponseType>;
    const interceptorSetup: InterceptorArguments = {
      clientInterceptors: this[INTERCEPTOR_SYMBOL],
      clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
      callInterceptors: callProperties.callOptions.interceptors ?? [],
      callInterceptorProviders:
        callProperties.callOptions.interceptor_providers ?? [],
    };
    const call: InterceptingCallInterface = getInterceptingCall(
      interceptorSetup,
      callProperties.methodDefinition,
      callProperties.callOptions,
      callProperties.channel
    );
    /* The stream object has to exist before the transformer runs, while the
     * underlying call can only be created afterwards, so the two are wired
     * together here -- before the stream is used for anything. The type system
     * cannot enforce that ordering. */
    responseStream.call = call;
    if (callProperties.callOptions.credentials) {
      call.setCredentials(callProperties.callOptions.credentials);
    }

    let receivedStatus = false;
    // Created here so error stacks can point at the code that made the call.
    const callerStackError = new Error();
    call.start(callProperties.metadata, {
      onReceiveMetadata: (received: Metadata): void => {
        responseStream.emit('metadata', received);
      },
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      onReceiveMessage: (message: any): void => {
        responseStream.push(message);
      },
      onReceiveStatus: (status: StatusObject): void => {
        // Only the first status is acted upon.
        if (receivedStatus) {
          return;
        }
        receivedStatus = true;
        // End the readable side first, then report failure and final status.
        responseStream.push(null);
        const failed = status.code !== Status.OK;
        if (failed) {
          const callerStack = getErrorStackString(callerStackError);
          responseStream.emit('error', callErrorFromStatus(status, callerStack));
        }
        responseStream.emit('status', status);
      },
    });
    // Note: the message sent is the `argument` parameter itself, not
    // `callProperties.argument`.
    call.sendMessage(argument);
    call.halfClose();
    return responseStream;
  }
  /**
   * Starts a bidirectional-streaming call: the caller writes request messages
   * to the returned duplex stream and response messages are pushed into it.
   *
   * @param method Path of the method, used as `path` of the method definition.
   * @param serialize Request serializer; also given to the duplex stream.
   * @param deserialize Response deserializer; also given to the duplex stream.
   * @param metadata Metadata or call options.
   * @param options Call options (when metadata is given first).
   * @returns The duplex stream. It emits 'metadata' and 'status', and 'error'
   *     when the final status is not OK.
   */
  makeBidiStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata: Metadata,
    options?: CallOptions
  ): ClientDuplexStream<RequestType, ResponseType>;
  makeBidiStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    options?: CallOptions
  ): ClientDuplexStream<RequestType, ResponseType>;
  makeBidiStreamRequest<RequestType, ResponseType>(
    method: string,
    serialize: (value: RequestType) => Buffer,
    deserialize: (value: Buffer) => ResponseType,
    metadata?: Metadata | CallOptions,
    options?: CallOptions
  ): ClientDuplexStream<RequestType, ResponseType> {
    const normalized = this.checkMetadataAndOptions(metadata, options);
    const definition: ClientMethodDefinition<RequestType, ResponseType> = {
      path: method,
      requestStream: true,
      responseStream: true,
      requestSerialize: serialize,
      responseDeserialize: deserialize,
    };
    let callProperties: CallProperties<RequestType, ResponseType> = {
      metadata: normalized.metadata,
      call: new ClientDuplexStreamImpl<RequestType, ResponseType>(
        serialize,
        deserialize
      ),
      channel: this[CHANNEL_SYMBOL],
      methodDefinition: definition,
      callOptions: normalized.options,
    };
    // Let the configured transformer (if any) replace any of the properties.
    if (this[CALL_INVOCATION_TRANSFORMER_SYMBOL]) {
      callProperties = this[CALL_INVOCATION_TRANSFORMER_SYMBOL]!(
        callProperties
      ) as CallProperties<RequestType, ResponseType>;
    }

    const duplexStream = callProperties.call as ClientDuplexStream<
      RequestType,
      ResponseType
    >;
    const interceptorSetup: InterceptorArguments = {
      clientInterceptors: this[INTERCEPTOR_SYMBOL],
      clientInterceptorProviders: this[INTERCEPTOR_PROVIDER_SYMBOL],
      callInterceptors: callProperties.callOptions.interceptors ?? [],
      callInterceptorProviders:
        callProperties.callOptions.interceptor_providers ?? [],
    };
    const call: InterceptingCallInterface = getInterceptingCall(
      interceptorSetup,
      callProperties.methodDefinition,
      callProperties.callOptions,
      callProperties.channel
    );
    /* The stream object has to exist before the transformer runs, while the
     * underlying call can only be created afterwards, so the two are wired
     * together here -- before the stream is used for anything. The type system
     * cannot enforce that ordering. */
    duplexStream.call = call;
    if (callProperties.callOptions.credentials) {
      call.setCredentials(callProperties.callOptions.credentials);
    }

    let receivedStatus = false;
    // Created here so error stacks can point at the code that made the call.
    const callerStackError = new Error();
    call.start(callProperties.metadata, {
      onReceiveMetadata: (received: Metadata): void => {
        duplexStream.emit('metadata', received);
      },
      onReceiveMessage: (message: Buffer): void => {
        duplexStream.push(message);
      },
      onReceiveStatus: (status: StatusObject): void => {
        // Only the first status is acted upon.
        if (receivedStatus) {
          return;
        }
        receivedStatus = true;
        // End the readable side first, then report failure and final status.
        duplexStream.push(null);
        const failed = status.code !== Status.OK;
        if (failed) {
          const callerStack = getErrorStackString(callerStackError);
          duplexStream.emit('error', callErrorFromStatus(status, callerStack));
        }
        duplexStream.emit('status', status);
      },
    });
    return duplexStream;
  }
  691. }