Skip to content

Commit

Permalink
Fix memory leak in task manager task runner (elastic#193612)
Browse files Browse the repository at this point in the history
In this PR, I'm fixing a memory leak that was introduced in
elastic#190093 where every task runner
class object wouldn't free up in memory because it subscribed to the
`pollIntervalConfiguration$` observable. To fix this, I moved the
observable up a class into `TaskPollingLifecycle` which only gets
created once on plugin start and then pass down the pollInterval value
via a function call the task runner class can call.
  • Loading branch information
mikecote authored Sep 20, 2024
1 parent 1a192bc commit cf6e8b5
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 14 deletions.
9 changes: 6 additions & 3 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
private taskClaiming: TaskClaiming;
private bufferedStore: BufferedTaskStore;
private readonly executionContext: ExecutionContextStart;
private readonly pollIntervalConfiguration$: Observable<number>;

private logger: Logger;
public pool: TaskPool;
Expand All @@ -95,6 +94,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven

private usageCounter?: UsageCounter;
private config: TaskManagerConfig;
private currentPollInterval: number;

/**
* Initializes the task manager, preventing any further addition of middleware,
Expand Down Expand Up @@ -123,7 +123,10 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
this.executionContext = executionContext;
this.usageCounter = usageCounter;
this.config = config;
this.pollIntervalConfiguration$ = pollIntervalConfiguration$;
this.currentPollInterval = config.poll_interval;
pollIntervalConfiguration$.subscribe((pollInterval) => {
this.currentPollInterval = pollInterval;
});

const emitEvent = (event: TaskLifecycleEvent) => this.events$.next(event);

Expand Down Expand Up @@ -225,7 +228,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
config: this.config,
allowReadingInvalidState: this.config.allow_reading_invalid_state,
strategy: this.config.claim_strategy,
pollIntervalConfiguration$: this.pollIntervalConfiguration$,
getPollInterval: () => this.currentPollInterval,
});
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import _ from 'lodash';
import sinon from 'sinon';
import { secondsFromNow } from '../lib/intervals';
import { asOk, asErr } from '../lib/result_type';
import { BehaviorSubject } from 'rxjs';
import {
createTaskRunError,
TaskErrorSource,
Expand Down Expand Up @@ -2502,7 +2501,7 @@ describe('TaskManagerRunner', () => {
}),
allowReadingInvalidState: opts.allowReadingInvalidState || false,
strategy: opts.strategy ?? CLAIM_STRATEGY_UPDATE_BY_QUERY,
pollIntervalConfiguration$: new BehaviorSubject(500),
getPollInterval: () => 500,
});

if (stage === TaskRunningStage.READY_TO_RUN) {
Expand Down
14 changes: 5 additions & 9 deletions x-pack/plugins/task_manager/server/task_running/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
* rescheduling, middleware application, etc.
*/

import { Observable } from 'rxjs';
import apm from 'elastic-apm-node';
import { v4 as uuidv4 } from 'uuid';
import { withSpan } from '@kbn/apm-utils';
Expand Down Expand Up @@ -113,7 +112,7 @@ type Opts = {
config: TaskManagerConfig;
allowReadingInvalidState: boolean;
strategy: string;
pollIntervalConfiguration$: Observable<number>;
getPollInterval: () => number;
} & Pick<Middleware, 'beforeRun' | 'beforeMarkRunning'>;

export enum TaskRunResult {
Expand Down Expand Up @@ -166,7 +165,7 @@ export class TaskManagerRunner implements TaskRunner {
private config: TaskManagerConfig;
private readonly taskValidator: TaskValidator;
private readonly claimStrategy: string;
private currentPollInterval: number;
private getPollInterval: () => number;

/**
* Creates an instance of TaskManagerRunner.
Expand All @@ -192,7 +191,7 @@ export class TaskManagerRunner implements TaskRunner {
config,
allowReadingInvalidState,
strategy,
pollIntervalConfiguration$,
getPollInterval,
}: Opts) {
this.instance = asPending(sanitizeInstance(instance));
this.definitions = definitions;
Expand All @@ -212,10 +211,7 @@ export class TaskManagerRunner implements TaskRunner {
allowReadingInvalidState,
});
this.claimStrategy = strategy;
this.currentPollInterval = config.poll_interval;
pollIntervalConfiguration$.subscribe((pollInterval) => {
this.currentPollInterval = pollInterval;
});
this.getPollInterval = getPollInterval;
}

/**
Expand Down Expand Up @@ -656,7 +652,7 @@ export class TaskManagerRunner implements TaskRunner {
startedAt: this.instance.task.startedAt,
schedule: updatedTaskSchedule,
},
this.currentPollInterval
this.getPollInterval()
),
state,
schedule: updatedTaskSchedule,
Expand Down

0 comments on commit cf6e8b5

Please sign in to comment.