subchannel.ts 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012
  1. /*
  2. * Copyright 2019 gRPC authors.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. *
  16. */
  17. import * as http2 from 'http2';
  18. import { ChannelCredentials } from './channel-credentials';
  19. import { Metadata } from './metadata';
  20. import { Call, Http2CallStream, WriteObject } from './call-stream';
  21. import { ChannelOptions } from './channel-options';
  22. import { PeerCertificate, checkServerIdentity, TLSSocket, CipherNameAndProtocol } from 'tls';
  23. import { ConnectivityState } from './connectivity-state';
  24. import { BackoffTimeout, BackoffOptions } from './backoff-timeout';
  25. import { getDefaultAuthority } from './resolver';
  26. import * as logging from './logging';
  27. import { LogVerbosity, Status } from './constants';
  28. import { getProxiedConnection, ProxyConnectionResult } from './http_proxy';
  29. import * as net from 'net';
  30. import { GrpcUri, parseUri, splitHostPort, uriToString } from './uri-parser';
  31. import { ConnectionOptions } from 'tls';
  32. import { FilterFactory, Filter, BaseFilter } from './filter';
  33. import {
  34. stringToSubchannelAddress,
  35. SubchannelAddress,
  36. subchannelAddressToString,
  37. } from './subchannel-address';
  38. import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, SocketInfo, SocketRef, unregisterChannelzRef, registerChannelzSocket, TlsInfo } from './channelz';
  39. import { ConnectivityStateListener } from './subchannel-interface';
  40. const clientVersion = require('../../package.json').version;
  /** Tracer name passed to `logging.trace` for general subchannel output. */
  const TRACER_NAME = 'subchannel';
  /** Tracer name passed to `logging.trace` for flow-control output. */
  const FLOW_CONTROL_TRACER_NAME = 'subchannel_flowctrl';
  /* Connection/backoff tuning values. None of these five are referenced in the
   * portion of the file visible here; presumably they are reconnect defaults
   * -- confirm against their users before changing them. */
  const MIN_CONNECT_TIMEOUT_MS = 20 * 1000;
  const INITIAL_BACKOFF_MS = 1 * 1000;
  const BACKOFF_MULTIPLIER = 1.6;
  const MAX_BACKOFF_MS = 120 * 1000;
  const BACKOFF_JITTER = 0.2;
  /* setInterval and setTimeout only accept signed 32 bit integers, and JS has
   * no named constant for the largest one, so it is spelled out here
   * (2^31 - 1). Used as the default and as the upper bound of the keepalive
   * ping interval. */
  const KEEPALIVE_MAX_TIME_MS = 0x7fffffff;
  /** Default time to wait for a keepalive ping acknowledgement. */
  const KEEPALIVE_TIMEOUT_MS = 20 * 1000;
  /**
   * Callbacks used to record per-message statistics for a call made on a
   * subchannel. Both methods take no arguments and return nothing.
   * NOTE(review): the implementer and the caller are outside the visible
   * portion of this file -- confirm who invokes these.
   */
  export interface SubchannelCallStatsTracker {
    /** Record that one message was sent. */
    addMessageSent(): void;
    /** Record that one message was received. */
    addMessageReceived(): void;
  }
  /* Local aliases for the HTTP/2 header-name constants exported by Node's
   * http2 module. */
  const HTTP2_HEADER_AUTHORITY = http2.constants.HTTP2_HEADER_AUTHORITY;
  const HTTP2_HEADER_CONTENT_TYPE = http2.constants.HTTP2_HEADER_CONTENT_TYPE;
  const HTTP2_HEADER_METHOD = http2.constants.HTTP2_HEADER_METHOD;
  const HTTP2_HEADER_PATH = http2.constants.HTTP2_HEADER_PATH;
  const HTTP2_HEADER_TE = http2.constants.HTTP2_HEADER_TE;
  const HTTP2_HEADER_USER_AGENT = http2.constants.HTTP2_HEADER_USER_AGENT;
  /**
   * Draw a pseudo-random number uniformly distributed over the half-open
   * interval [min, max).
   * @param min Inclusive lower bound of the interval
   * @param max Exclusive upper bound of the interval
   * @returns A value `v` with `min <= v < max` (exactly `min` when the
   * interval is empty)
   */
  function uniformRandom(min: number, max: number) {
    const width = max - min;
    const sample = Math.random();
    return sample * width + min;
  }
  /* Opaque GOAWAY payload compared against incoming GOAWAY frames to detect
   * that the server rejected the connection for excessive keepalive pings. */
  const tooManyPingsData = Buffer.from('too_many_pings', 'ascii');
  74. export class Subchannel {
  /**
   * The subchannel's current connectivity state. Invariant: `session` === `null`
   * if and only if `connectivityState` is IDLE or TRANSIENT_FAILURE.
   */
  private connectivityState: ConnectivityState = ConnectivityState.IDLE;
  /**
   * The underlying http2 session used to make requests.
   */
  private session: http2.ClientHttp2Session | null = null;
  /**
   * Indicates that the subchannel should transition from TRANSIENT_FAILURE to
   * CONNECTING instead of IDLE when the backoff timeout ends.
   */
  private continueConnecting = false;
  /**
   * A list of listener functions that will be called whenever the connectivity
   * state changes. Will be modified by `addConnectivityStateListener` and
   * `removeConnectivityStateListener`
   */
  private stateListeners: ConnectivityStateListener[] = [];
  /**
   * A list of listener functions that will be called when the underlying
   * socket disconnects. Used for ending active calls with an UNAVAILABLE
   * status.
   */
  private disconnectListeners: Set<() => void> = new Set();
  /**
   * Reconnect backoff timer. It is created in the constructor with a callback
   * that invokes `handleBackoffTimer`.
   */
  private backoffTimeout: BackoffTimeout;
  /**
   * The complete user agent string constructed using channel args.
   */
  private userAgent: string;
  /**
   * The amount of time in between sending pings
   */
  private keepaliveTimeMs: number = KEEPALIVE_MAX_TIME_MS;
  /**
   * The amount of time to wait for an acknowledgement after sending a ping
   */
  private keepaliveTimeoutMs: number = KEEPALIVE_TIMEOUT_MS;
  /**
   * Timer reference for timeout that indicates when to send the next ping
   */
  private keepaliveIntervalId: NodeJS.Timer;
  /**
   * Timer reference tracking when the most recent ping will be considered lost
   */
  private keepaliveTimeoutId: NodeJS.Timer;
  /**
   * Indicates whether keepalive pings should be sent without any active calls
   */
  private keepaliveWithoutCalls = false;
  /**
   * Tracks calls with references to this subchannel
   */
  private callRefcount = 0;
  /**
   * Tracks channels and subchannel pools with references to this subchannel
   */
  private refcount = 0;
  /**
   * A string representation of the subchannel address, for logging/tracing
   */
  private subchannelAddressString: string;
  // Channelz info
  /**
   * Whether channelz bookkeeping is active. The constructor sets this to false
   * when the 'grpc.enable_channelz' option is 0.
   */
  private readonly channelzEnabled: boolean = true;
  /** Reference returned by `registerChannelzSubchannel` in the constructor. */
  private channelzRef: SubchannelRef;
  /** Channelz event trace; receives creation, state-change and shutdown entries. */
  private channelzTrace: ChannelzTrace;
  /** Call counters reported as `callTracker` in the channelz subchannel info. */
  private callTracker = new ChannelzCallTracker();
  /** Tracks child channelz refs (the session's socket ref is added/removed here). */
  private childrenTracker = new ChannelzChildrenTracker();
  // Channelz socket info
  /**
   * Channelz ref for the current session's socket, registered in
   * `createSession` and cleared in `resetChannelzSocketInfo`.
   */
  private channelzSocketRef: SocketRef | null = null;
  /**
   * Name of the remote server, if it is not the same as the subchannel
   * address, i.e. if connecting through an HTTP CONNECT proxy.
   */
  private remoteName: string | null = null;
  /** Stream counters reported in the channelz socket info; replaced on reset. */
  private streamTracker = new ChannelzCallTracker();
  /** Number of keepalive pings sent; incremented in `sendPing` when channelz is enabled. */
  private keepalivesSent = 0;
  /* Message counters and timestamps reported in the channelz socket info.
   * They are zeroed/nulled by `resetChannelzSocketInfo`; the code that updates
   * them is outside the visible portion of this file. */
  private messagesSent = 0;
  private messagesReceived = 0;
  private lastMessageSentTimestamp: Date | null = null;
  private lastMessageReceivedTimestamp: Date | null = null;
  /**
   * A class representing a connection to a single backend.
   * @param channelTarget The target string for the channel as a whole
   * @param subchannelAddress The address for the backend that this subchannel
   * will connect to
   * @param options The channel options, plus any specific subchannel options
   * for this subchannel
   * @param credentials The channel credentials used to establish this
   * connection
   */
  constructor(
    private channelTarget: GrpcUri,
    private subchannelAddress: SubchannelAddress,
    private options: ChannelOptions,
    private credentials: ChannelCredentials
  ) {
    // Build user-agent string.
    this.userAgent = [
      options['grpc.primary_user_agent'],
      `grpc-node-js/${clientVersion}`,
      options['grpc.secondary_user_agent'],
    ]
      .filter((e) => e) // remove falsey values first
      .join(' ');
    /* Use `??` rather than an `in` check: an option key that is present but
     * whose value is undefined must not replace the default, because an
     * undefined delay handed to setInterval/setTimeout is treated as the
     * minimum delay and would produce a continuous stream of pings. */
    this.keepaliveTimeMs =
      options['grpc.keepalive_time_ms'] ?? KEEPALIVE_MAX_TIME_MS;
    this.keepaliveTimeoutMs =
      options['grpc.keepalive_timeout_ms'] ?? KEEPALIVE_TIMEOUT_MS;
    // Only the exact value 1 enables keepalive pings without active calls.
    this.keepaliveWithoutCalls =
      options['grpc.keepalive_permit_without_calls'] === 1;
    /* Give both timer fields a real (already cleared) timer object so they
     * are never undefined when clearTimeout/clearInterval is later called. */
    this.keepaliveIntervalId = setTimeout(() => {}, 0);
    clearTimeout(this.keepaliveIntervalId);
    this.keepaliveTimeoutId = setTimeout(() => {}, 0);
    clearTimeout(this.keepaliveTimeoutId);
    const backoffOptions: BackoffOptions = {
      initialDelay: options['grpc.initial_reconnect_backoff_ms'],
      maxDelay: options['grpc.max_reconnect_backoff_ms'],
    };
    this.backoffTimeout = new BackoffTimeout(() => {
      this.handleBackoffTimer();
    }, backoffOptions);
    this.subchannelAddressString = subchannelAddressToString(subchannelAddress);
    // Channelz is on by default; only an explicit 0 turns it off.
    if (options['grpc.enable_channelz'] === 0) {
      this.channelzEnabled = false;
    }
    this.channelzTrace = new ChannelzTrace();
    this.channelzRef = registerChannelzSubchannel(
      this.subchannelAddressString,
      () => this.getChannelzInfo(),
      this.channelzEnabled
    );
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace('CT_INFO', 'Subchannel created');
    }
    this.trace(
      'Subchannel constructed with options ' +
        JSON.stringify(options, undefined, 2)
    );
  }
  /**
   * Build the channelz snapshot for this subchannel. Registered as the info
   * callback when the subchannel's channelz ref is created.
   * @returns The current state, trace, call tracker, child lists and target
   */
  private getChannelzInfo(): SubchannelInfo {
    const snapshot: SubchannelInfo = {
      state: this.connectivityState,
      trace: this.channelzTrace,
      callTracker: this.callTracker,
      children: this.childrenTracker.getChildLists(),
      target: this.subchannelAddressString,
    };
    return snapshot;
  }
  /**
   * Build the channelz snapshot for the socket behind the current session.
   * @returns The socket info, or null when there is no session
   */
  private getChannelzSocketInfo(): SocketInfo | null {
    const activeSession = this.session;
    if (activeSession === null) {
      return null;
    }
    const socket = activeSession.socket;
    // Addresses are only reported when the socket actually exposes them.
    const remoteAddress = socket.remoteAddress
      ? stringToSubchannelAddress(socket.remoteAddress, socket.remotePort)
      : null;
    const localAddress = socket.localAddress
      ? stringToSubchannelAddress(socket.localAddress, socket.localPort)
      : null;
    let security: TlsInfo | null = null;
    if (activeSession.encrypted) {
      const secureSocket: TLSSocket = socket as TLSSocket;
      const cipher: CipherNameAndProtocol & { standardName?: string } =
        secureSocket.getCipher();
      const ownCertificate = secureSocket.getCertificate();
      const serverCertificate = secureSocket.getPeerCertificate();
      security = {
        cipherSuiteStandardName: cipher.standardName ?? null,
        // The non-standard name is only reported when no standard one exists.
        cipherSuiteOtherName: cipher.standardName ? null : cipher.name,
        localCertificate:
          ownCertificate && 'raw' in ownCertificate
            ? ownCertificate.raw
            : null,
        remoteCertificate:
          serverCertificate && 'raw' in serverCertificate
            ? serverCertificate.raw
            : null,
      };
    }
    return {
      remoteAddress: remoteAddress,
      localAddress: localAddress,
      security: security,
      remoteName: this.remoteName,
      streamsStarted: this.streamTracker.callsStarted,
      streamsSucceeded: this.streamTracker.callsSucceeded,
      streamsFailed: this.streamTracker.callsFailed,
      messagesSent: this.messagesSent,
      messagesReceived: this.messagesReceived,
      keepAlivesSent: this.keepalivesSent,
      lastLocalStreamCreatedTimestamp: this.streamTracker.lastCallStartedTimestamp,
      lastRemoteStreamCreatedTimestamp: null,
      lastMessageSentTimestamp: this.lastMessageSentTimestamp,
      lastMessageReceivedTimestamp: this.lastMessageReceivedTimestamp,
      localFlowControlWindow: activeSession.state.localWindowSize ?? null,
      remoteFlowControlWindow: activeSession.state.remoteWindowSize ?? null,
    };
  }
  /**
   * Drop the channelz socket registration and zero every per-socket statistic.
   * Does nothing at all when channelz is disabled.
   */
  private resetChannelzSocketInfo() {
    if (this.channelzEnabled) {
      const socketRef = this.channelzSocketRef;
      if (socketRef) {
        unregisterChannelzRef(socketRef);
        this.childrenTracker.unrefChild(socketRef);
        this.channelzSocketRef = null;
      }
      this.remoteName = null;
      this.streamTracker = new ChannelzCallTracker();
      this.keepalivesSent = 0;
      this.messagesSent = 0;
      this.messagesReceived = 0;
      this.lastMessageSentTimestamp = null;
      this.lastMessageReceivedTimestamp = null;
    }
  }
  /**
   * Emit a DEBUG trace line on the general subchannel tracer, prefixed with
   * the channelz id and the subchannel address.
   * @param text The message to log
   */
  private trace(text: string): void {
    const line = `(${this.channelzRef.id}) ${this.subchannelAddressString} ${text}`;
    logging.trace(LogVerbosity.DEBUG, TRACER_NAME, line);
  }
  /**
   * Emit a DEBUG trace line on the 'subchannel_refcount' tracer, prefixed
   * with the channelz id and the subchannel address.
   * @param text The message to log
   */
  private refTrace(text: string): void {
    const line = `(${this.channelzRef.id}) ${this.subchannelAddressString} ${text}`;
    logging.trace(LogVerbosity.DEBUG, 'subchannel_refcount', line);
  }
  /**
   * Emit a DEBUG trace line on the flow-control tracer, prefixed with the
   * channelz id and the subchannel address.
   * @param text The message to log
   */
  private flowControlTrace(text: string): void {
    const line = `(${this.channelzRef.id}) ${this.subchannelAddressString} ${text}`;
    logging.trace(LogVerbosity.DEBUG, FLOW_CONTROL_TRACER_NAME, line);
  }
  /**
   * Emit a DEBUG trace line on the 'subchannel_internals' tracer, prefixed
   * with the channelz id and the subchannel address.
   * @param text The message to log
   */
  private internalsTrace(text: string): void {
    const line = `(${this.channelzRef.id}) ${this.subchannelAddressString} ${text}`;
    logging.trace(LogVerbosity.DEBUG, 'subchannel_internals', line);
  }
  /**
   * Emit a DEBUG trace line on the 'keepalive' tracer, prefixed with the
   * channelz id and the subchannel address.
   * @param text The message to log
   */
  private keepaliveTrace(text: string): void {
    const line = `(${this.channelzRef.id}) ${this.subchannelAddressString} ${text}`;
    logging.trace(LogVerbosity.DEBUG, 'keepalive', line);
  }
  /**
   * Backoff timer callback: leave TRANSIENT_FAILURE, going to CONNECTING when
   * another attempt was requested (`continueConnecting`) and to IDLE otherwise.
   * Has no effect if the subchannel is not in TRANSIENT_FAILURE.
   */
  private handleBackoffTimer() {
    const nextState = this.continueConnecting
      ? ConnectivityState.CONNECTING
      : ConnectivityState.IDLE;
    this.transitionToState([ConnectivityState.TRANSIENT_FAILURE], nextState);
  }
  /**
   * Arm the backoff timer for a single run.
   */
  private startBackoff() {
    const { backoffTimeout } = this;
    backoffTimeout.runOnce();
  }
  /**
   * Stop the backoff timer and then reset it, in that order.
   */
  private stopBackoff() {
    const { backoffTimeout } = this;
    backoffTimeout.stop();
    backoffTimeout.reset();
  }
  /**
   * Send one keepalive ping on the current session and arm a timeout that
   * treats the connection as lost if no acknowledgement arrives within
   * `keepaliveTimeoutMs`.
   */
  private sendPing() {
    if (this.channelzEnabled) {
      this.keepalivesSent += 1;
    }
    this.keepaliveTrace('Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms');
    this.keepaliveTimeoutId = setTimeout(() => {
      this.keepaliveTrace('Ping timeout passed without response');
      this.handleDisconnect();
    }, this.keepaliveTimeoutMs);
    // Do not let the pending ping timeout keep the process alive.
    this.keepaliveTimeoutId.unref?.();
    try {
      this.session!.ping((err: Error | null) => {
        if (err) {
          /* The ping did not complete; say so instead of claiming that a
           * response arrived. */
          this.keepaliveTrace('Ping failed with error ' + err.message);
        } else {
          this.keepaliveTrace('Received ping response');
        }
        clearTimeout(this.keepaliveTimeoutId);
      });
    } catch (e) {
      /* No ping was sent, so the timeout armed above has nothing to wait for.
       * Clear it here: the transition below only cleans up when the
       * subchannel is READY, and in any other state the stale timer would
       * later run the disconnect handling for a ping that never went out. */
      clearTimeout(this.keepaliveTimeoutId);
      /* If we fail to send a ping, the connection is no longer functional, so
       * we should discard it. */
      this.transitionToState(
        [ConnectivityState.READY],
        ConnectivityState.TRANSIENT_FAILURE
      );
    }
  }
  /**
   * Begin sending keepalive pings every `keepaliveTimeMs` milliseconds.
   * The first ping is not sent immediately because whatever caused pings to
   * start should itself involve some network activity.
   */
  private startKeepalivePings() {
    this.keepaliveIntervalId = setInterval(
      () => this.sendPing(),
      this.keepaliveTimeMs
    );
    // The ping interval alone must not keep the process alive.
    this.keepaliveIntervalId.unref?.();
  }
  /**
   * Cancel both keepalive timers: the outstanding ping-acknowledgement timeout
   * and the ping interval. Because the pending timeout is discarded, only call
   * this when the connection is being terminated, not when it stays in use.
   */
  private stopKeepalivePings() {
    clearTimeout(this.keepaliveTimeoutId);
    clearInterval(this.keepaliveIntervalId);
  }
  /**
   * Create the HTTP/2 session for this subchannel, register its socket with
   * channelz, and attach the event handlers that drive connectivity state.
   * @param proxyConnectionResult Result of the proxy negotiation. When it
   * carries a `socket`, the session is built on that socket; when it carries a
   * `realTarget`, that target is used for the authority and reported as the
   * remote name.
   */
  private createSession(proxyConnectionResult: ProxyConnectionResult) {
    if (proxyConnectionResult.realTarget) {
      /* realTarget is a parsed URI object, so it has to be rendered with
       * uriToString; concatenating it directly would log "[object Object]". */
      const realTargetName = uriToString(proxyConnectionResult.realTarget);
      this.remoteName = realTargetName;
      this.trace('creating HTTP/2 session through proxy to ' + realTargetName);
    } else {
      this.remoteName = null;
      this.trace('creating HTTP/2 session');
    }
    const targetAuthority = getDefaultAuthority(
      proxyConnectionResult.realTarget ?? this.channelTarget
    );
    let connectionOptions: http2.SecureClientSessionOptions =
      this.credentials._getConnectionOptions() || {};
    connectionOptions.maxSendHeaderBlockLength = Number.MAX_SAFE_INTEGER;
    if ('grpc-node.max_session_memory' in this.options) {
      connectionOptions.maxSessionMemory = this.options[
        'grpc-node.max_session_memory'
      ];
    } else {
      /* By default, set a very large max session memory limit, to effectively
       * disable enforcement of the limit. Some testing indicates that Node's
       * behavior degrades badly when this limit is reached, so we solve that
       * by disabling the check entirely. */
      connectionOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER;
    }
    let addressScheme = 'http://';
    if ('secureContext' in connectionOptions) {
      addressScheme = 'https://';
      // If provided, the value of grpc.ssl_target_name_override should be used
      // to override the target hostname when checking server identity.
      // This option is used for testing only.
      if (this.options['grpc.ssl_target_name_override']) {
        const sslTargetNameOverride = this.options[
          'grpc.ssl_target_name_override'
        ]!;
        connectionOptions.checkServerIdentity = (
          host: string,
          cert: PeerCertificate
        ): Error | undefined => {
          return checkServerIdentity(sslTargetNameOverride, cert);
        };
        connectionOptions.servername = sslTargetNameOverride;
      } else {
        const authorityHostname =
          splitHostPort(targetAuthority)?.host ?? 'localhost';
        // We want to always set servername to support SNI
        connectionOptions.servername = authorityHostname;
      }
      if (proxyConnectionResult.socket) {
        /* This is part of the workaround for
         * https://github.com/nodejs/node/issues/32922. Without that bug,
         * proxyConnectionResult.socket would always be a plaintext socket and
         * this would say
         * connectionOptions.socket = proxyConnectionResult.socket; */
        connectionOptions.createConnection = (authority, option) => {
          return proxyConnectionResult.socket!;
        };
      }
    } else {
      /* In all but the most recent versions of Node, http2.connect does not use
       * the options when establishing plaintext connections, so we need to
       * establish that connection explicitly. */
      connectionOptions.createConnection = (authority, option) => {
        if (proxyConnectionResult.socket) {
          return proxyConnectionResult.socket;
        } else {
          /* No proxy socket was supplied, so open a plain connection straight
           * to the subchannel address. */
          return net.connect(this.subchannelAddress);
        }
      };
    }
    connectionOptions = {
      ...connectionOptions,
      ...this.subchannelAddress,
    };
    /* http2.connect uses the options here:
     * https://github.com/nodejs/node/blob/70c32a6d190e2b5d7b9ff9d5b6a459d14e8b7d59/lib/internal/http2/core.js#L3028-L3036
     * The spread operator overrides earlier values with later ones, so any port
     * or host values in the options will be used rather than any values extracted
     * from the first argument. In addition, the path overrides the host and port,
     * as documented for plaintext connections here:
     * https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener
     * and for TLS connections here:
     * https://nodejs.org/api/tls.html#tls_tls_connect_options_callback. In
     * earlier versions of Node, http2.connect passes these options to
     * tls.connect but not net.connect, so in the insecure case we still need
     * to set the createConnection option above to create the connection
     * explicitly. We cannot do that in the TLS case because http2.connect
     * passes necessary additional options to tls.connect.
     * The first argument just needs to be parseable as a URL and the scheme
     * determines whether the connection will be established over TLS or not.
     */
    const session = http2.connect(
      addressScheme + targetAuthority,
      connectionOptions
    );
    this.session = session;
    this.channelzSocketRef = registerChannelzSocket(
      this.subchannelAddressString,
      () => this.getChannelzSocketInfo()!,
      this.channelzEnabled
    );
    if (this.channelzEnabled) {
      this.childrenTracker.refChild(this.channelzSocketRef);
    }
    // The session is unref'd here and ref'd again by callRef while calls exist.
    session.unref();
    /* For all of these events, check if the session at the time of the event
     * is the same one currently attached to this subchannel, to ensure that
     * old events from previous connection attempts cannot cause invalid state
     * transitions. */
    session.once('connect', () => {
      if (this.session === session) {
        this.transitionToState(
          [ConnectivityState.CONNECTING],
          ConnectivityState.READY
        );
      }
    });
    session.once('close', () => {
      if (this.session === session) {
        this.trace('connection closed');
        this.transitionToState(
          [ConnectivityState.CONNECTING],
          ConnectivityState.TRANSIENT_FAILURE
        );
        /* Transitioning directly to IDLE here should be OK because we are not
         * doing any backoff, because a connection was established at some
         * point */
        this.transitionToState(
          [ConnectivityState.READY],
          ConnectivityState.IDLE
        );
      }
    });
    session.once(
      'goaway',
      (errorCode: number, lastStreamID: number, opaqueData: Buffer) => {
        if (this.session === session) {
          /* See the last paragraph of
           * https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md#basic-keepalive */
          if (
            errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM &&
            opaqueData.equals(tooManyPingsData)
          ) {
            // Double the ping interval, capped at the largest timer delay.
            this.keepaliveTimeMs = Math.min(
              2 * this.keepaliveTimeMs,
              KEEPALIVE_MAX_TIME_MS
            );
            logging.log(
              LogVerbosity.ERROR,
              `Connection to ${uriToString(this.channelTarget)} at ${
                this.subchannelAddressString
              } rejected by server because of excess pings. Increasing ping interval to ${
                this.keepaliveTimeMs
              } ms`
            );
          }
          this.trace(
            'connection closed by GOAWAY with code ' +
              errorCode
          );
          this.transitionToState(
            [ConnectivityState.CONNECTING, ConnectivityState.READY],
            ConnectivityState.IDLE
          );
        }
      }
    );
    session.once('error', (error) => {
      /* Do nothing here. Any error should also trigger a close event, which is
       * where we want to handle that. */
      this.trace(
        'connection closed with error ' +
          (error as Error).message
      );
    });
    // Settings are only serialized when the subchannel tracer is enabled.
    if (logging.isTracerEnabled(TRACER_NAME)) {
      session.on('remoteSettings', (settings: http2.Settings) => {
        this.trace(
          'new settings received' +
            (this.session !== session ? ' on the old connection' : '') +
            ': ' +
            JSON.stringify(settings)
        );
      });
      session.on('localSettings', (settings: http2.Settings) => {
        this.trace(
          'local settings acknowledged by remote' +
            (this.session !== session ? ' on the old connection' : '') +
            ': ' +
            JSON.stringify(settings)
        );
      });
    }
  }
  /**
   * Kick off a connection attempt: obtain a (possibly proxied) connection and
   * create the HTTP/2 session on success, or move from CONNECTING to
   * TRANSIENT_FAILURE if the proxy connection is rejected.
   */
  private startConnectingInternal() {
    /* Pass connection options through to the proxy so that it's able to
     * upgrade its connection to support tls if needed.
     * This is a workaround for https://github.com/nodejs/node/issues/32922
     * See https://github.com/grpc/grpc-node/pull/1369 for more info. */
    const tlsOptions: ConnectionOptions =
      this.credentials._getConnectionOptions() || {};
    if ('secureContext' in tlsOptions) {
      tlsOptions.ALPNProtocols = ['h2'];
      // If provided, the value of grpc.ssl_target_name_override should be used
      // to override the target hostname when checking server identity.
      // This option is used for testing only.
      const nameOverride = this.options['grpc.ssl_target_name_override'];
      if (nameOverride) {
        tlsOptions.checkServerIdentity = (
          host: string,
          cert: PeerCertificate
        ): Error | undefined => checkServerIdentity(nameOverride, cert);
        tlsOptions.servername = nameOverride;
      } else if ('grpc.http_connect_target' in this.options) {
        /* This is more or less how servername will be set in createSession
         * if a connection is successfully established through the proxy.
         * If the proxy is not used, these connectionOptions are discarded
         * anyway */
        const connectTarget = parseUri(
          this.options['grpc.http_connect_target'] as string
        );
        const authority = getDefaultAuthority(
          connectTarget ?? { path: 'localhost' }
        );
        tlsOptions.servername = splitHostPort(authority)?.host ?? authority;
      }
    }
    const onConnected = (result: ProxyConnectionResult) => {
      this.createSession(result);
    };
    const onRejected = () => {
      this.transitionToState(
        [ConnectivityState.CONNECTING],
        ConnectivityState.TRANSIENT_FAILURE
      );
    };
    getProxiedConnection(this.subchannelAddress, this.options, tlsOptions).then(
      onConnected,
      onRejected
    );
  }
  /**
   * React to a lost connection: move from READY to TRANSIENT_FAILURE (if
   * currently READY) and then invoke every registered disconnect listener,
   * whether or not the state actually changed.
   */
  private handleDisconnect() {
    this.transitionToState(
      [ConnectivityState.READY],
      ConnectivityState.TRANSIENT_FAILURE
    );
    this.disconnectListeners.forEach((notifyDisconnect) => {
      notifyDisconnect();
    });
  }
  /**
   * Move the subchannel to `newState`, provided its current state is one of
   * `oldStates`; otherwise leave everything untouched. On a successful
   * transition the per-state entry actions run first and the connectivity
   * state listeners are notified afterwards.
   * @param oldStates The set of states to transition from
   * @param newState The state to transition to
   * @returns True if the state changed, false otherwise
   */
  private transitionToState(
    oldStates: ConnectivityState[],
    newState: ConnectivityState
  ): boolean {
    if (!oldStates.includes(this.connectivityState)) {
      return false;
    }
    const previousState = this.connectivityState;
    const transitionText =
      ConnectivityState[previousState] + ' -> ' + ConnectivityState[newState];
    this.trace(transitionText);
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace('CT_INFO', transitionText);
    }
    this.connectivityState = newState;
    // Shared teardown for the two states that have no session.
    const discardSession = () => {
      this.session?.close();
      this.session = null;
      this.resetChannelzSocketInfo();
      this.stopKeepalivePings();
    };
    switch (newState) {
      case ConnectivityState.READY: {
        this.stopBackoff();
        const readySession = this.session!;
        // Only react to the socket closing if it still backs the live session.
        readySession.socket.once('close', () => {
          if (this.session === readySession) {
            this.handleDisconnect();
          }
        });
        if (this.keepaliveWithoutCalls) {
          this.startKeepalivePings();
        }
        break;
      }
      case ConnectivityState.CONNECTING: {
        this.startBackoff();
        this.startConnectingInternal();
        this.continueConnecting = false;
        break;
      }
      case ConnectivityState.TRANSIENT_FAILURE: {
        discardSession();
        /* If the backoff timer has already ended by the time we get to the
         * TRANSIENT_FAILURE state, we want to immediately transition out of
         * TRANSIENT_FAILURE as though the backoff timer is ending right now */
        if (!this.backoffTimeout.isRunning()) {
          process.nextTick(() => {
            this.handleBackoffTimer();
          });
        }
        break;
      }
      case ConnectivityState.IDLE: {
        discardSession();
        break;
      }
      default:
        throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
    }
    /* Iterate over a shallow copy of the stateListeners array in case a
     * listener is removed during this iteration */
    this.stateListeners.slice().forEach((listener) => {
      listener(this, previousState, newState);
    });
    return true;
  }
  /**
   * Shut the subchannel down once nothing references it any more, i.e. when
   * both the call refcount and the channel/pool refcount are zero.
   */
  private checkBothRefcounts() {
    /* While any call, channel, or subchannel pool still holds a reference,
     * the subchannel may be used again, so there is nothing to do. */
    const stillReferenced = this.callRefcount !== 0 || this.refcount !== 0;
    if (stillReferenced) {
      return;
    }
    if (this.channelzEnabled) {
      this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
    }
    this.transitionToState(
      [ConnectivityState.CONNECTING, ConnectivityState.READY],
      ConnectivityState.IDLE
    );
    if (this.channelzEnabled) {
      unregisterChannelzRef(this.channelzRef);
    }
  }
  /**
   * Record that one more call is using this subchannel. When the call
   * refcount goes from 0 to 1, the session (if there is one) and the backoff
   * timer are ref'd, and keepalive pings are started unless
   * `keepaliveWithoutCalls` is set.
   */
  callRef() {
    const countBefore = this.callRefcount;
    this.refTrace(`callRefcount ${countBefore} -> ${countBefore + 1}`);
    const isFirstCall = countBefore === 0;
    if (isFirstCall) {
      if (this.session) {
        this.session.ref();
      }
      this.backoffTimeout.ref();
      if (!this.keepaliveWithoutCalls) {
        this.startKeepalivePings();
      }
    }
    this.callRefcount += 1;
  }
  /**
   * Record that one call has stopped using this subchannel. When the call
   * refcount reaches 0, the session (if there is one) and the backoff timer
   * are unref'd, the keepalive interval is cleared unless
   * `keepaliveWithoutCalls` is set, and both refcounts are checked to see
   * whether the subchannel can be shut down.
   */
  callUnref() {
    const countBefore = this.callRefcount;
    this.refTrace(`callRefcount ${countBefore} -> ${countBefore - 1}`);
    this.callRefcount -= 1;
    if (this.callRefcount !== 0) {
      return;
    }
    if (this.session) {
      this.session.unref();
    }
    this.backoffTimeout.unref();
    if (!this.keepaliveWithoutCalls) {
      clearInterval(this.keepaliveIntervalId);
    }
    this.checkBothRefcounts();
  }
  /**
   * Add one reference to this subchannel (tracked in `refcount`, separately
   * from the per-call `callRefcount`).
   */
  ref() {
    const countBefore = this.refcount;
    this.refTrace(`refcount ${countBefore} -> ${countBefore + 1}`);
    this.refcount += 1;
  }
  /**
   * Drop one reference to this subchannel, then check whether both refcounts
   * have reached zero so that the subchannel can be shut down.
   */
  unref() {
    const countBefore = this.refcount;
    this.refTrace(`refcount ${countBefore} -> ${countBefore - 1}`);
    this.refcount -= 1;
    this.checkBothRefcounts();
  }
  /**
   * Drop the last remaining reference, but only if `refcount` is exactly 1.
   * @returns `true` if the subchannel was unref'd, `false` if nothing was done
   */
  unrefIfOneRef(): boolean {
    const holdsOnlyOneRef = this.refcount === 1;
    if (holdsOnlyOneRef) {
      this.unref();
    }
    return holdsOnlyOneRef;
  }
  /**
   * Open an HTTP/2 stream on the current session, sending `metadata` plus the
   * fixed gRPC request headers, and attach that stream to `callStream`. Must
   * only be called while the subchannel's connectivity state is READY.
   * @param metadata Call metadata, converted into the initial HTTP/2 headers
   * @param callStream The call that supplies the host and method and that
   *     the new stream is attached to
   * @param extraFilters Passed through unchanged to `attachHttp2Stream`
   * @throws Rethrows whatever `session.request` throws, after moving the
   *     subchannel from READY to TRANSIENT_FAILURE
   */
  startCallStream(
    metadata: Metadata,
    callStream: Http2CallStream,
    extraFilters: Filter[]
  ) {
    const requestHeaders = metadata.toHttp2Headers();
    requestHeaders[HTTP2_HEADER_AUTHORITY] = callStream.getHost();
    requestHeaders[HTTP2_HEADER_USER_AGENT] = this.userAgent;
    requestHeaders[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc';
    requestHeaders[HTTP2_HEADER_METHOD] = 'POST';
    requestHeaders[HTTP2_HEADER_PATH] = callStream.getMethod();
    requestHeaders[HTTP2_HEADER_TE] = 'trailers';

    let stream: http2.ClientHttp2Stream;
    try {
      stream = this.session!.request(requestHeaders);
    } catch (requestError) {
      /* In theory, when session.request throws because the session has become
       * unusable (e.g. after a goaway), the subchannel should soon see the
       * matching close or goaway event and leave READY on its own. There are
       * reports that this does not always happen
       * (https://github.com/googleapis/nodejs-firestore/issues/1023#issuecomment-653204096),
       * so as defense in depth the session is discarded right here. */
      this.transitionToState(
        [ConnectivityState.READY],
        ConnectivityState.TRANSIENT_FAILURE
      );
      throw requestError;
    }

    // One "\t\tname: value\n" line per header, for the trace message only.
    const headerDump = Object.keys(requestHeaders)
      .map(name => `\t\t${name}: ${requestHeaders[name]}\n`)
      .join('');
    logging.trace(
      LogVerbosity.DEBUG,
      'call_stream',
      `Starting stream [${callStream.getCallNumber()}] on subchannel ` +
        `(${this.channelzRef.id}) ${this.subchannelAddressString}` +
        ` with headers\n${headerDump}`
    );

    const sessionState = this.session!.state;
    this.flowControlTrace(
      `local window size: ${sessionState.localWindowSize}` +
        ` remote window size: ${sessionState.remoteWindowSize}`
    );

    /* Remember which session this stream belongs to, so the stream-end
     * watcher below can tell whether the session has been replaced. */
    const streamSession = this.session;
    this.internalsTrace(
      `session.closed=${streamSession!.closed}` +
        ` session.destroyed=${streamSession!.destroyed}` +
        ` session.socket.destroyed=${streamSession!.socket.destroyed}`
    );

    // Without channelz there is nothing to count, so default to no-ops.
    let statsTracker: SubchannelCallStatsTracker = {
      addMessageSent: () => {},
      addMessageReceived: () => {},
    };
    if (this.channelzEnabled) {
      this.callTracker.addCallStarted();
      callStream.addStatusWatcher(status => {
        if (status.code === Status.OK) {
          this.callTracker.addCallSucceeded();
          return;
        }
        this.callTracker.addCallFailed();
      });
      this.streamTracker.addCallStarted();
      callStream.addStreamEndWatcher(success => {
        // Only count the stream if its session is still the current one.
        if (streamSession !== this.session) {
          return;
        }
        if (success) {
          this.streamTracker.addCallSucceeded();
        } else {
          this.streamTracker.addCallFailed();
        }
      });
      statsTracker = {
        addMessageSent: () => {
          this.messagesSent += 1;
          this.lastMessageSentTimestamp = new Date();
        },
        addMessageReceived: () => {
          this.messagesReceived += 1;
        },
      };
    }
    callStream.attachHttp2Stream(stream, this, extraFilters, statsTracker);
  }
  /**
   * If the subchannel is currently IDLE, start connecting and switch to the
   * CONNECTING state. If the subchannel is currently in TRANSIENT_FAILURE,
   * set `continueConnecting` to indicate that it should go back to connecting
   * after the backoff timer ends. Otherwise, do nothing.
   */
  startConnecting() {
    /* The transition only succeeds from IDLE; a falsy result means the
     * subchannel was in some other state and nothing has changed yet. */
    const leftIdle = this.transitionToState(
      [ConnectivityState.IDLE],
      ConnectivityState.CONNECTING
    );
    if (leftIdle) {
      return;
    }
    if (this.connectivityState === ConnectivityState.TRANSIENT_FAILURE) {
      this.continueConnecting = true;
    }
  }
  /**
   * Report the connectivity state this subchannel is in right now.
   * @returns The current value of `connectivityState`
   */
  getConnectivityState() {
    const { connectivityState } = this;
    return connectivityState;
  }
  /**
   * Register a function to be called whenever the subchannel's connectivity
   * state changes. The listener is appended to `stateListeners`; no check for
   * duplicates is made.
   * @param listener The function to call on each state change
   */
  addConnectivityStateListener(listener: ConnectivityStateListener) {
    const { stateListeners } = this;
    stateListeners.push(listener);
  }
  /**
   * Unregister a listener previously added with
   * `addConnectivityStateListener`. Only the first matching entry is removed;
   * an unknown listener is ignored.
   * @param listener A reference to a function previously passed to
   *     `addConnectivityStateListener`
   */
  removeConnectivityStateListener(listener: ConnectivityStateListener) {
    const position = this.stateListeners.indexOf(listener);
    if (position < 0) {
      return;
    }
    this.stateListeners.splice(position, 1);
  }
  /**
   * Add a callback to the subchannel's collection of disconnect listeners.
   * @param listener Zero-argument callback to register
   */
  addDisconnectListener(listener: () => void) {
    const { disconnectListeners } = this;
    disconnectListeners.add(listener);
  }
  /**
   * Remove a callback from the subchannel's collection of disconnect
   * listeners.
   * @param listener A callback previously passed to `addDisconnectListener`
   */
  removeDisconnectListener(listener: () => void) {
    const { disconnectListeners } = this;
    disconnectListeners.delete(listener);
  }
  /**
   * Reset the backoff timeout and, if the subchannel is currently in
   * TRANSIENT_FAILURE (i.e. in backoff), immediately move it to CONNECTING.
   */
  resetBackoff() {
    this.backoffTimeout.reset();
    const backingOffStates = [ConnectivityState.TRANSIENT_FAILURE];
    this.transitionToState(backingOffStates, ConnectivityState.CONNECTING);
  }
  /**
   * Get the string form of this subchannel's address.
   * @returns The value of `subchannelAddressString`
   */
  getAddress(): string {
    const { subchannelAddressString } = this;
    return subchannelAddressString;
  }
  /**
   * Get the channelz reference held by this subchannel.
   * @returns The value of `channelzRef`
   */
  getChannelzRef(): SubchannelRef {
    const { channelzRef } = this;
    return channelzRef;
  }
  /**
   * Get the underlying subchannel object. For this class that is the
   * instance itself.
   * @returns `this`
   */
  getRealSubchannel(): this {
    const self = this;
    return self;
  }
  952. }