Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Offload manual executions to workers #11284

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions packages/cli/src/push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { PushPayload, PushType } from '@n8n/api-types';
import type { Application } from 'express';
import { ServerResponse } from 'http';
import type { Server } from 'http';
import { InstanceSettings } from 'n8n-core';
import type { Socket } from 'net';
import { Container, Service } from 'typedi';
import { parse as parseUrl } from 'url';
Expand Down Expand Up @@ -39,7 +40,10 @@ export class Push extends TypedEmitter<PushEvents> {

private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush);

constructor(private readonly orchestrationService: OrchestrationService) {
constructor(
private readonly orchestrationService: OrchestrationService,
private readonly instanceSettings: InstanceSettings,
) {
super();

if (useWebSockets) this.backend.on('message', (msg) => this.emit('message', msg));
Expand Down Expand Up @@ -82,13 +86,23 @@ export class Push extends TypedEmitter<PushEvents> {
}

send<Type extends PushType>(type: Type, data: PushPayload<Type>, pushRef: string) {
/**
* Multi-main setup: In a manual webhook execution, the main process that
* handles a webhook might not be the same as the main process that created
* the webhook. If so, the handler process commands the creator process to
* relay the former's execution lifecycle events to the creator's frontend.
*/
if (this.orchestrationService.isMultiMainSetupEnabled && !this.backend.hasPushRef(pushRef)) {
const isWorker = this.instanceSettings.instanceType === 'worker';

const isMainOnMultiMain =
this.instanceSettings.instanceType === 'main' &&
this.orchestrationService.isMultiMainSetupEnabled;

if (isWorker || (isMainOnMultiMain && !this.backend.hasPushRef(pushRef))) {
/**
* Worker: Since a worker has no connection to the UI, send a command via
* pubsub so that the main process who holds that session can relay
* execution lifecycle events to it.
*
* Multi-main setup: In a manual webhook execution, the main process that
* handles a webhook might not be the same as the main process that created
* the webhook. If so, the handler process commands the creator process to
* relay the former's execution lifecycle events to the creator's frontend.
*/
const payload = { type, args: data, pushRef };
void this.orchestrationService.publish('relay-execution-lifecycle-event', payload);
return;
Expand Down
12 changes: 11 additions & 1 deletion packages/cli/src/scaling/job-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,23 @@ export class JobProcessor {
executionTimeoutTimestamp,
);

const { pushRef } = job.data;

additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerExecuter(
execution.mode,
job.data.executionId,
execution.workflowData,
{ retryOf: execution.retryOf as string },
{ retryOf: execution.retryOf as string, pushRef },
);

if (pushRef) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unnecessary-type-assertion
additionalData.sendDataToUI = WorkflowExecuteAdditionalData.sendDataToUI.bind({
sessionId: pushRef,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
}) as any;
}

additionalData.hooks.hookFunctions.sendResponse = [
async (response: IExecuteResponsePromiseData): Promise<void> => {
const msg: RespondToWebhookMessage = {
Expand Down
1 change: 1 addition & 0 deletions packages/cli/src/scaling/scaling.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export type JobId = Job['id'];
export type JobData = {
executionId: string;
loadStaticData: boolean;
pushRef?: string;
};

export type JobResult = {
Expand Down
13 changes: 12 additions & 1 deletion packages/cli/src/workflow-execute-additional-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import type { PushType } from '@n8n/api-types';
import { GlobalConfig } from '@n8n/config';
import { WorkflowExecute } from 'n8n-core';
import { InstanceSettings, WorkflowExecute } from 'n8n-core';
import {
ApplicationError,
ErrorReporterProxy as ErrorReporter,
Expand Down Expand Up @@ -1078,6 +1078,17 @@ export function getWorkflowHooksWorkerExecuter(
hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}

if (mode === 'manual' && Container.get(InstanceSettings).instanceType === 'worker') {
const pushHooks = hookFunctionsPush();
for (const key of Object.keys(pushHooks)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
// eslint-disable-next-line prefer-spread
hookFunctions[key].push.apply(hookFunctions[key], pushHooks[key]);
}
}

return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
}

Expand Down
3 changes: 2 additions & 1 deletion packages/cli/src/workflow-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ export class WorkflowRunner {
this.activeExecutions.attachResponsePromise(executionId, responsePromise);
}

if (this.executionsMode === 'queue' && data.executionMode !== 'manual') {
if (this.executionsMode === 'queue') {
// Do not run "manual" executions in bull because sending events to the
// frontend would not be possible
await this.enqueueExecution(executionId, data, loadStaticData, realtime);
Expand Down Expand Up @@ -369,6 +369,7 @@ export class WorkflowRunner {
const jobData: JobData = {
executionId,
loadStaticData: !!loadStaticData,
pushRef: data.pushRef,
};

if (!this.scalingService) {
Expand Down
Loading