// dispatcher.js (20 KB)
  "use strict";

  // Compiled CommonJS module preamble: mark the module as a transpiled ES module
  // and pre-declare the export that is assigned after the class definition.
  Object.defineProperty(exports, "__esModule", {
    value: true
  });
  exports.Dispatcher = void 0;
  const _ipc = require("../common/ipc");
  const _utils = require("playwright-core/lib/utils");
  const _workerHost = require("./workerHost");
  /**
   * Copyright Microsoft Corporation. All rights reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   * http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  /**
   * Runs a queue of test groups ("jobs") on a fixed number of worker slots.
   *
   * A free slot takes the job at the head of the queue. A slot whose live worker
   * already has the job's `workerHash` is preferred; otherwise any free slot is
   * used and its worker is restarted if the hash does not match.
   */
  class Dispatcher {
    /**
     * @param config Full runner configuration; `config.config.workers` and
     *   `config.projects` are read by this class.
     * @param reporter Reporter that receives stdout/stderr/error events.
     * @param failureTracker Tracker consulted for the "max failures" condition.
     */
    constructor(config, reporter, failureTracker) {
      this._workerSlots = [];
      this._queue = [];
      // workerHash -> number of jobs that are queued or currently running.
      this._queuedOrRunningHashCount = new Map();
      this._finished = new _utils.ManualPromise();
      this._isStopped = true;
      this._allTests = [];
      this._config = config;
      this._reporter = reporter;
      this._failureTracker = failureTracker;
      this._extraEnvByProjectId = new Map();
      this._producedEnvByProjectId = new Map();
    }

    /**
     * Takes the next queued job (if any) and runs it in a free worker slot, then
     * re-checks the "finished" condition and tries to schedule another job.
     */
    async _scheduleJob() {
      // 1. Find a job to run.
      if (this._isStopped || !this._queue.length) return;
      const job = this._queue[0];

      // 2. Find a worker with the same hash, or just some free worker.
      let index = this._workerSlots.findIndex(w => !w.busy && w.worker && w.worker.hash() === job.workerHash && !w.worker.didSendStop());
      if (index === -1) index = this._workerSlots.findIndex(w => !w.busy);
      // No workers available, bail out.
      if (index === -1) return;

      // 3. Claim both the job and the worker, run the job and release the worker.
      this._queue.shift();
      this._workerSlots[index].busy = true;
      await this._startJobInWorker(index, job);
      this._workerSlots[index].busy = false;

      // 4. Check the "finished" condition.
      this._checkFinished();

      // 5. We got a free worker - perhaps we can immediately start another job?
      void this._scheduleJob();
    }

    /**
     * Runs a single job in the given slot, (re)starting the slot's worker when
     * necessary, and re-queues leftover tests / retries reported by the job.
     *
     * @param index Index of the claimed worker slot.
     * @param job Test group to run.
     */
    async _startJobInWorker(index, job) {
      const stopCallback = () => this.stop().catch(() => {});
      const jobDispatcher = new JobDispatcher(job, this._reporter, this._failureTracker, stopCallback);
      if (jobDispatcher.skipWholeJob()) {
        // The job completed without ever reaching a worker, so it is no longer
        // "queued or running". Without this the counter stays inflated and
        // _isWorkerRedundant() keeps workers alive for jobs that do not exist.
        this._updateCounterForWorkerHash(job.workerHash, -1);
        return;
      }

      let worker = this._workerSlots[index].worker;

      // 1. Restart the worker if it has the wrong hash or is being stopped already.
      if (worker && (worker.hash() !== job.workerHash || worker.didSendStop())) {
        await worker.stop();
        worker = undefined;
        // Check stopped signal after async hop.
        if (this._isStopped) return;
      }
      this._workerSlots[index].jobDispatcher = jobDispatcher;

      // 2. Start the worker if it is down.
      let startError;
      if (!worker) {
        worker = this._createWorker(job, index, (0, _ipc.serializeConfig)(this._config));
        this._workerSlots[index].worker = worker;
        worker.on('exit', () => this._workerSlots[index].worker = undefined);
        startError = await worker.start();
        // Check stopped signal after async hop.
        if (this._isStopped) return;
      }

      // 3. Run the job. A start error is routed through the job's exit handling.
      if (startError) jobDispatcher.onExit(startError);
      else jobDispatcher.runInWorker(worker);
      const result = await jobDispatcher.jobResult;
      this._workerSlots[index].jobDispatcher = undefined;
      this._updateCounterForWorkerHash(job.workerHash, -1);

      // 4. When worker encounters error, we stop it and create a new one.
      //    We also do not keep the worker alive if it cannot serve any more jobs.
      if (result.didFail) void worker.stop(true /* didFail */);
      else if (this._isWorkerRedundant(worker)) void worker.stop();

      // 5. Possibly schedule a new job with leftover tests and/or retries.
      if (!this._isStopped && result.newJob) {
        this._queue.unshift(result.newJob);
        this._updateCounterForWorkerHash(job.workerHash, +1);
      }
    }

    /**
     * Resolves `_finished` once there is no more work to do (queue empty, or the
     * dispatcher is stopped) and no slot is busy.
     */
    _checkFinished() {
      if (this._finished.isDone()) return;
      // Check that we have no more work to do.
      if (this._queue.length && !this._isStopped) return;
      // Make sure all workers have finished the current job.
      if (this._workerSlots.some(w => w.busy)) return;
      this._finished.resolve();
    }

    /**
     * @param worker Worker to examine.
     * @returns {boolean} True when there are more live (not stopping) workers with
     *   this worker's hash than queued-or-running jobs that need that hash.
     */
    _isWorkerRedundant(worker) {
      const hash = worker.hash();
      let workersWithSameHash = 0;
      for (const slot of this._workerSlots) {
        if (slot.worker && !slot.worker.didSendStop() && slot.worker.hash() === hash) workersWithSameHash++;
      }
      // A hash that was never counted means no job needs it; treat it as zero.
      return workersWithSameHash > (this._queuedOrRunningHashCount.get(hash) || 0);
    }

    /**
     * Adds `delta` to the queued-or-running job counter of the given worker hash.
     */
    _updateCounterForWorkerHash(hash, delta) {
      this._queuedOrRunningHashCount.set(hash, delta + (this._queuedOrRunningHashCount.get(hash) || 0));
    }

    /**
     * Runs all given test groups and resolves when everything has finished or the
     * dispatcher has been stopped.
     *
     * @param testGroups Array of jobs; each has `tests` and `workerHash`.
     * @param extraEnvByProjectId Map from project id to extra environment for workers.
     */
    async run(testGroups, extraEnvByProjectId) {
      this._extraEnvByProjectId = extraEnvByProjectId;
      // Work on a private copy: the queue is consumed with shift()/unshift() and
      // must not drain the caller's array.
      this._queue = [...testGroups];
      this._allTests = testGroups.map(g => g.tests).flat();
      for (const group of testGroups) this._updateCounterForWorkerHash(group.workerHash, +1);
      this._isStopped = false;
      this._workerSlots = [];

      // 0. Stop right away if we have reached max failures.
      if (this._failureTracker.hasReachedMaxFailures()) void this.stop();

      // 1. Allocate workers.
      for (let i = 0; i < this._config.config.workers; i++) this._workerSlots.push({
        busy: false
      });

      // 2. Schedule enough jobs.
      for (let i = 0; i < this._workerSlots.length; i++) void this._scheduleJob();
      this._checkFinished();

      // 3. More jobs are scheduled when the worker becomes free.
      // 4. Wait for all jobs to finish.
      await this._finished;
    }

    /**
     * Creates (but does not start) a worker host for the given test group and
     * wires its stdio, teardown-error and exit events.
     *
     * @param testGroup Job the worker is created for.
     * @param parallelIndex Index of the slot the worker will occupy.
     * @param loaderData Serialized configuration passed to the worker.
     * @returns The new worker host.
     */
    _createWorker(testGroup, parallelIndex, loaderData) {
      const projectConfig = this._config.projects.find(p => p.id === testGroup.projectId);
      const outputDir = projectConfig.project.outputDir;
      const worker = new _workerHost.WorkerHost(testGroup, parallelIndex, loaderData, this._extraEnvByProjectId.get(testGroup.projectId) || {}, outputDir);

      // Attributes an output chunk to the test currently running in this slot, if any.
      const handleOutput = params => {
        const chunk = chunkFromParams(params);
        if (worker.didFail()) {
          // Note: we keep reading stdio from workers that are currently stopping after failure,
          // to debug teardown issues. However, we avoid spoiling the test result from
          // the next retry.
          return {
            chunk
          };
        }
        const jobDispatcher = this._workerSlots[parallelIndex].jobDispatcher;
        const currentlyRunning = jobDispatcher ? jobDispatcher.currentlyRunning() : undefined;
        if (!currentlyRunning) return {
          chunk
        };
        return {
          chunk,
          test: currentlyRunning.test,
          result: currentlyRunning.result
        };
      };

      worker.on('stdOut', params => {
        const {
          chunk,
          test,
          result
        } = handleOutput(params);
        if (result) result.stdout.push(chunk);
        this._reporter.onStdOut(chunk, test, result);
      });
      worker.on('stdErr', params => {
        const {
          chunk,
          test,
          result
        } = handleOutput(params);
        if (result) result.stderr.push(chunk);
        this._reporter.onStdErr(chunk, test, result);
      });
      worker.on('teardownErrors', params => {
        this._failureTracker.onWorkerError();
        for (const error of params.fatalErrors) this._reporter.onError(error);
      });
      worker.on('exit', () => {
        // Merge the environment produced by this worker into the per-project record.
        const producedEnv = this._producedEnvByProjectId.get(testGroup.projectId) || {};
        this._producedEnvByProjectId.set(testGroup.projectId, {
          ...producedEnv,
          ...worker.producedEnv()
        });
      });
      return worker;
    }

    /**
     * @returns {Map} Map from project id to the environment collected from exited workers.
     */
    producedEnvByProjectId() {
      return this._producedEnvByProjectId;
    }

    /**
     * Marks the dispatcher as stopped and stops every worker currently held by a
     * slot. Does nothing when already stopped.
     */
    async stop() {
      if (this._isStopped) return;
      this._isStopped = true;
      await Promise.all(this._workerSlots.map(({
        worker
      }) => worker ? worker.stop() : undefined));
      this._checkFinished();
    }

    /**
     * Reports a finished test to the reporter and the failure tracker, and stops
     * the dispatcher once max failures is reached.
     * Not called from within this class in the visible code.
     */
    _reportTestEnd(test, result) {
      this._reporter.onTestEnd(test, result);
      this._failureTracker.onTestEnd(test, result);
      if (this._failureTracker.hasReachedMaxFailures()) this.stop().catch(e => {});
    }
  }
  // Public export of the dispatcher class.
  exports.Dispatcher = Dispatcher;
  /**
   * Drives a single job (one test group) through a worker: translates worker
   * events into reporter calls, tracks failed / remaining tests and finally
   * resolves `jobResult` with `{ didFail, newJob? }`.
   */
  class JobDispatcher {
    /**
     * @param _job Test group to run; `tests` and `requireFile` are read here.
     * @param _reporter Reporter receiving test/step events.
     * @param _failureTracker Tracker consulted for the "max failures" condition.
     * @param _stopCallback Invoked once max failures has been reached.
     */
    constructor(_job, _reporter, _failureTracker, _stopCallback) {
      this.jobResult = new _utils.ManualPromise();
      this._listeners = [];
      this._failedTests = new Set();
      // Tests of this job that have not finished yet, keyed by test id.
      this._remainingByTestId = new Map(_job.tests.map(test => [test.id, test]));
      // Per-test run data ({ test, result, steps }) for tests that have begun.
      this._dataByTestId = new Map();
      this._parallelIndex = 0;
      this._workerIndex = 0;
      this._currentlyRunning = void 0;
      this._job = _job;
      this._reporter = _reporter;
      this._failureTracker = _failureTracker;
      this._stopCallback = _stopCallback;
    }

    /**
     * Handles the worker's "testBegin" event: appends a new result to the test
     * and reports the start.
     */
    _onTestBegin(params) {
      const test = this._remainingByTestId.get(params.testId);
      if (!test) {
        // TODO: this should never be the case, report an internal error?
        return;
      }
      const result = test._appendTestResult();
      this._dataByTestId.set(test.id, {
        test,
        result,
        steps: new Map()
      });
      result.parallelIndex = this._parallelIndex;
      result.workerIndex = this._workerIndex;
      result.startTime = new Date(params.startWallTime);
      this._reporter.onTestBegin(test, result);
      this._currentlyRunning = {
        test,
        result
      };
    }

    /**
     * Handles the worker's "testEnd" event: finalizes the result, records a
     * failure when the status is unexpected and reports the end.
     * The incoming `params` object is not modified.
     */
    _onTestEnd(params) {
      // Do not show more than one error to avoid confusion, but report
      // as interrupted to indicate that we did actually start the test.
      const interrupted = this._failureTracker.hasReachedMaxFailures();
      const data = this._dataByTestId.get(params.testId);
      if (!data) {
        // TODO: this should never be the case, report an internal error?
        return;
      }
      this._dataByTestId.delete(params.testId);
      this._remainingByTestId.delete(params.testId);
      const {
        result,
        test
      } = data;
      result.duration = params.duration;
      result.errors = interrupted ? [] : params.errors;
      result.error = result.errors[0];
      result.status = interrupted ? 'interrupted' : params.status;
      test.expectedStatus = params.expectedStatus;
      test.annotations = params.annotations;
      test.timeout = params.timeout;
      const isFailure = result.status !== 'skipped' && result.status !== test.expectedStatus;
      if (isFailure) this._failedTests.add(test);
      this._reportTestEnd(test, result);
      this._currentlyRunning = undefined;
    }

    /**
     * Handles the worker's "stepBegin" event: creates the step object, links it
     * to its parent step (or to the result) and reports it.
     */
    _onStepBegin(params) {
      const data = this._dataByTestId.get(params.testId);
      if (!data) {
        // The test has finished, but steps are still coming. Just ignore them.
        return;
      }
      const {
        result,
        steps,
        test
      } = data;
      const parentStep = params.parentStepId ? steps.get(params.parentStepId) : undefined;
      const step = {
        title: params.title,
        titlePath: () => {
          const parentPath = (parentStep ? parentStep.titlePath() : undefined) || [];
          return [...parentPath, params.title];
        },
        parent: parentStep,
        category: params.category,
        startTime: new Date(params.wallTime),
        duration: -1,
        steps: [],
        location: params.location
      };
      steps.set(params.stepId, step);
      (parentStep || result).steps.push(step);
      this._reporter.onStepBegin(test, result, step);
    }

    /**
     * Handles the worker's "stepEnd" event: computes the duration, attaches the
     * error (if any) and reports the step end.
     */
    _onStepEnd(params) {
      const data = this._dataByTestId.get(params.testId);
      if (!data) {
        // The test has finished, but steps are still coming. Just ignore them.
        return;
      }
      const {
        result,
        steps,
        test
      } = data;
      const step = steps.get(params.stepId);
      if (!step) {
        this._reporter.onStdErr(`Internal error: step end without step begin: ${params.stepId}`, test, result);
        return;
      }
      step.duration = params.wallTime - step.startTime.getTime();
      if (params.error) step.error = params.error;
      steps.delete(params.stepId);
      this._reporter.onStepEnd(test, result, step);
    }

    /**
     * Handles the worker's "attach" event: appends the attachment to the result
     * of the running test, decoding a base64 `body` when one is present.
     */
    _onAttach(params) {
      const data = this._dataByTestId.get(params.testId);
      if (!data) {
        // The test has finished, but attachments are still coming. Just ignore them.
        return;
      }
      const attachment = {
        name: params.name,
        path: params.path,
        contentType: params.contentType,
        body: params.body !== undefined ? Buffer.from(params.body, 'base64') : undefined
      };
      data.result.attachments.push(attachment);
    }

    /**
     * Ends the given test with the given errors: 'failed' when there is at least
     * one error, 'skipped' otherwise. The test is recorded as failed either way.
     */
    _failTestWithErrors(test, errors) {
      const runData = this._dataByTestId.get(test.id);
      // There might be a single test that has started but has not finished yet.
      let result;
      if (runData) {
        result = runData.result;
      } else {
        result = test._appendTestResult();
        this._reporter.onTestBegin(test, result);
      }
      result.errors = [...errors];
      result.error = result.errors[0];
      result.status = errors.length ? 'failed' : 'skipped';
      this._reportTestEnd(test, result);
      this._failedTests.add(test);
    }

    /**
     * Removes every remaining test whose id is in `testIds`. Unless max failures
     * has been reached, the first such test is failed with `errors` and the rest
     * are reported as skipped. Errors that could not be attributed to any test
     * are reported as worker errors.
     *
     * @param {Set} testIds Ids of the tests to remove.
     * @param {Array} errors Errors to attach to the first removed test.
     */
    _massSkipTestsFromRemaining(testIds, errors) {
      let unreportedErrors = errors;
      // Deleting the current entry while iterating a Map is well-defined.
      for (const test of this._remainingByTestId.values()) {
        if (!testIds.has(test.id)) continue;
        if (!this._failureTracker.hasReachedMaxFailures()) {
          this._failTestWithErrors(test, unreportedErrors);
          unreportedErrors = []; // Only report errors for the first test.
        }
        this._remainingByTestId.delete(test.id);
      }
      if (unreportedErrors.length) {
        // We had fatal errors after all tests have passed - most likely in some teardown.
        // Let's just fail the test run.
        this._failureTracker.onWorkerError();
        for (const error of unreportedErrors) this._reporter.onError(error);
      }
    }

    /**
     * Handles the end of the job (worker "done" event or worker exit): fails or
     * skips the tests that cannot continue, collects retry candidates and
     * resolves `jobResult`.
     */
    _onDone(params) {
      // We won't file remaining if:
      // - there are no remaining
      // - we are here not because something failed
      // - no unrecoverable worker error
      if (!this._remainingByTestId.size && !this._failedTests.size && !params.fatalErrors.length && !params.skipTestsDueToSetupFailure.length && !params.fatalUnknownTestIds && !params.unexpectedExitError) {
        this._finished({
          didFail: false
        });
        return;
      }

      for (const testId of params.fatalUnknownTestIds || []) {
        const test = this._remainingByTestId.get(testId);
        if (test) {
          this._remainingByTestId.delete(testId);
          this._failTestWithErrors(test, [{
            message: `Test not found in the worker process. Make sure test title does not change.`
          }]);
        }
      }

      if (params.fatalErrors.length) {
        // In case of fatal errors, report first remaining test as failing with these errors,
        // and all others as skipped.
        this._massSkipTestsFromRemaining(new Set(this._remainingByTestId.keys()), params.fatalErrors);
      }

      // Handle tests that should be skipped because of the setup failure.
      this._massSkipTestsFromRemaining(new Set(params.skipTestsDueToSetupFailure), []);

      if (params.unexpectedExitError) {
        // When worker exits during a test, we blame the test itself.
        //
        // The most common situation when worker exits while not running a test is:
        //   worker failed to require the test file (at the start) because of an exception in one of imports.
        // In this case, "skip" all remaining tests, to avoid running into the same exception over and over.
        if (this._currentlyRunning) this._massSkipTestsFromRemaining(new Set([this._currentlyRunning.test.id]), [params.unexpectedExitError]);
        else this._massSkipTestsFromRemaining(new Set(this._remainingByTestId.keys()), [params.unexpectedExitError]);
      }

      const retryCandidates = new Set();
      const serialSuitesWithFailures = new Set();
      for (const failedTest of this._failedTests) {
        retryCandidates.add(failedTest);
        // Walk up to the root, remembering the outermost ancestor in 'serial' mode.
        let outermostSerialSuite;
        for (let parent = failedTest.parent; parent; parent = parent.parent) {
          if (parent._parallelMode === 'serial') outermostSerialSuite = parent;
        }
        if (outermostSerialSuite) serialSuitesWithFailures.add(outermostSerialSuite);
      }

      // If we have failed tests that belong to a serial suite,
      // we should skip all future tests from the same serial suite.
      const testsBelongingToSomeSerialSuiteWithFailures = [...this._remainingByTestId.values()].filter(test => {
        let parent = test.parent;
        while (parent && !serialSuitesWithFailures.has(parent)) parent = parent.parent;
        return !!parent;
      });
      this._massSkipTestsFromRemaining(new Set(testsBelongingToSomeSerialSuiteWithFailures.map(test => test.id)), []);

      for (const serialSuite of serialSuitesWithFailures) {
        // Add all tests from failed serial suites for possible retry.
        // These will only be retried together, because they have the same
        // "retries" setting and the same number of previous runs.
        serialSuite.allTests().forEach(test => retryCandidates.add(test));
      }

      const remaining = [...this._remainingByTestId.values()];
      for (const test of retryCandidates) {
        if (test.results.length < test.retries + 1) remaining.push(test);
      }

      // This job is over, we will schedule another one.
      const newJob = remaining.length ? {
        ...this._job,
        tests: remaining
      } : undefined;
      this._finished({
        didFail: true,
        newJob
      });
    }

    /**
     * Handles the worker exiting (or failing to start): finishes the job,
     * attaching an "exited unexpectedly" error when `data.unexpectedly` is set.
     */
    onExit(data) {
      const unexpectedExitError = data.unexpectedly ? {
        message: `Error: worker process exited unexpectedly (code=${data.code}, signal=${data.signal})`
      } : undefined;
      this._onDone({
        skipTestsDueToSetupFailure: [],
        fatalErrors: [],
        unexpectedExitError
      });
    }

    /**
     * Detaches all worker listeners and resolves `jobResult` with `result`.
     */
    _finished(result) {
      _utils.eventsHelper.removeEventListeners(this._listeners);
      this.jobResult.resolve(result);
    }

    /**
     * Sends the job to the given worker and subscribes to its events.
     */
    runInWorker(worker) {
      this._parallelIndex = worker.parallelIndex;
      this._workerIndex = worker.workerIndex;
      const runPayload = {
        file: this._job.requireFile,
        entries: this._job.tests.map(test => ({
          testId: test.id,
          // The retry number equals the count of results recorded so far.
          retry: test.results.length
        }))
      };
      worker.runTestGroup(runPayload);
      const listen = (eventName, handler) => _utils.eventsHelper.addEventListener(worker, eventName, handler.bind(this));
      this._listeners = [
        listen('testBegin', this._onTestBegin),
        listen('testEnd', this._onTestEnd),
        listen('stepBegin', this._onStepBegin),
        listen('stepEnd', this._onStepEnd),
        listen('attach', this._onAttach),
        listen('done', this._onDone),
        listen('exit', this.onExit)
      ];
    }

    /**
     * Reports the whole job as skipped without involving a worker when every
     * test expects to be skipped and max failures has not been reached.
     *
     * @returns {boolean} True when the job was reported here and must not be run.
     */
    skipWholeJob() {
      // If all the tests in a group are skipped, we report them immediately
      // without sending anything to a worker. This avoids creating unnecessary worker processes.
      //
      // However, if there is at least one non-skipped test in a group, we'll send
      // the whole group to the worker process and report tests in the natural order,
      // with skipped tests mixed in-between non-skipped. This makes
      // for a better reporter experience.
      const allTestsSkipped = this._job.tests.every(test => test.expectedStatus === 'skipped');
      if (!allTestsSkipped || this._failureTracker.hasReachedMaxFailures()) return false;
      for (const test of this._job.tests) {
        const result = test._appendTestResult();
        this._reporter.onTestBegin(test, result);
        result.status = 'skipped';
        this._reportTestEnd(test, result);
      }
      return true;
    }

    /**
     * @returns The `{ test, result }` pair of the test that has begun and not yet
     *   ended, or undefined.
     */
    currentlyRunning() {
      return this._currentlyRunning;
    }

    /**
     * Reports a finished test to the reporter and the failure tracker, and
     * invokes the stop callback once max failures has been reached.
     */
    _reportTestEnd(test, result) {
      this._reporter.onTestEnd(test, result);
      this._failureTracker.onTestEnd(test, result);
      if (this._failureTracker.hasReachedMaxFailures()) this._stopCallback();
    }
  }
  /**
   * Extracts an output chunk from worker stdio event params.
   *
   * @param params Object with either a string `text` or a base64-encoded `buffer`.
   * @returns {string|Buffer} `params.text` when it is a string, otherwise the
   *   decoded bytes of `params.buffer`.
   */
  function chunkFromParams(params) {
    if (typeof params.text === 'string') return params.text;
    return Buffer.from(params.buffer, 'base64');
  }
  508. }