Use Nodejs clustering to take advantage of multi-core hardware #68626

Closed
5 of 21 tasks
rudolf opened this issue Jun 9, 2020 · 19 comments
Labels
enhancement (New value added to drive a business result), impact:low (Addressing this issue will have a low level of impact on the quality/strength of our product), loe:small (Small Level of Effort), Meta, performance, Team:Core (Core services & architecture: plugins, logging, config, saved objects, http, ES client, i18n, etc)

Comments

@rudolf
Contributor

rudolf commented Jun 9, 2020

Kibana uses a single Node process/thread to serve HTTP traffic. This means Kibana cannot take advantage of multi-core hardware, making it expensive to scale out Kibana.

This should be an optional way to run Kibana, since users running Kibana inside docker containers might instead choose to use their container orchestration to run one container (with a single Kibana process) per host CPU.

Link to RFC: #94057
POC PR: #93380

Phase 0: Investigation

Phase 1: Initial implementation

Phase 2: Beta

  • Enable clustering in production as an opt-in beta feature
  • Evaluate & implement telemetry metrics, if any

Phase 3: GA

  • Enable clustering by default
  • Finalize full user-facing documentation
@rudolf rudolf added the Team:Core Core services & architecture: plugins, logging, config, saved objects, http, ES client, i18n, etc label Jun 9, 2020
@elasticmachine
Contributor

Pinging @elastic/kibana-platform (Team:Platform)

@pgayvallet
Contributor

Is this under Platform's or Operations' scope?

@joshdover
Contributor

This is Platform. Operations is continuing to focus more on Build & CI rather than server performance or runtime.

@pgayvallet
Contributor

So what exactly do we want here? Would we just spawn multiple forks that are exact copies of the master process and let the node cluster implementation share/dispatch http calls on the listening port to them, or do we need something more finely grained? Are there some tasks that should only be performed from the master?

An example coming to mind would be telemetry: would each fork send telemetry data on its own, or should the forks send their data to the master over IPC so it can be aggregated and sent as a single request from the master?

@rudolf
Contributor Author

rudolf commented Jun 15, 2020

I don't think we need anything fancy, just letting node cluster load balance across several processes. Then we need to make sure everything keeps working the way it should, e.g. that multiple processes logging to a single file keeps working.

For plugins, the only kind of work I can think of that might cause problems is if there's a process that relies on kibana.uuid being unique per process. We should probably audit things with task queues like reporting, task manager, alerting, telemetry.

We could expose a cluster.isMaster flag to all plugins so that plugins could optimize the work they do. Saved object migrations would be another thing that would be better to run only on the master.

@kobelb
Contributor

kobelb commented Jun 16, 2020

This potentially complicates the concurrency controls that Reporting takes advantage of via esqueue currently and task-manager in the future.

When a report is run, it spawns a headless browser that opens Kibana and takes a screenshot. This headless browser consumes quite a bit of memory and CPU from the server running Kibana. As such, Reporting has limits in place that only allow a single instance of the browser per Kibana process to be run at a time. When there are multiple Kibana processes running on the same machine, this becomes more complicated. Allowing Reporting to only run on the "master" seems reasonable enough.

Are we specifying --max-old-space-size for any of the Kibana distributables or OS packages? AFAIK, this argument will be passed to all child processes, so this might behave unexpectedly.

@mshustov
Contributor

mshustov commented Nov 5, 2020

In the 7.12 release, the team is going to investigate the basic architecture and possible breaking changes.

@pgayvallet
Contributor

We need to take logging into account here (especially rotating logs #56291)

  • should every 'instance' of the local cluster write to a distinct file?
  • should they all write to the same file?
    • in that case, log rotation should only be performed from a given 'master', and the other instances need to be notified when the rolling occurs so they can reload their file descriptors, as was done for legacy logging.
    • do we want 'child' instances to communicate with the 'master' so that only one instance effectively writes to disk? (this would require a significant change in the logging architecture)

@pgayvallet
Contributor

Summary of the preliminary investigation (see the POC PR: #93380)

Architecture summary

In 'classic' mode, the Kibana server is started in the main NodeJS process.

[Diagram: the Kibana server running in the main NodeJS process]

In clustering mode, the main NodeJS process would only start the coordinator, which would then fork workers using the cluster API. NodeJS's underlying socket implementation allows multiple processes to listen on the same port, balancing HTTP traffic between the workers for us.

[Diagram: the coordinator process forking and supervising worker processes]

The coordinator's sole responsibility is to orchestrate the workers. It would not be a 'super' worker that both handles the job of a regular worker and manages the other workers.
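
For illustration, a minimal sketch of this coordinator/worker split using Node's cluster module could look like the following. The worker count is an arbitrary placeholder, and a bare HTTP server stands in for the actual Kibana server; this is not the real implementation.

import cluster from 'cluster';
import http from 'http';

const WORKER_COUNT = 2; // hypothetical; could be derived from os.cpus().length

if (cluster.isPrimary) {
  // Coordinator: only forks and supervises the workers, it never serves traffic itself.
  for (let i = 0; i < WORKER_COUNT; i++) {
    cluster.fork();
  }
  cluster.on('exit', (worker, code) => {
    console.log(`worker ${worker.process.pid} exited with code ${code}`);
  });
} else {
  // Worker: in Kibana this would be the full server; Node's cluster module
  // distributes incoming connections on the shared port across the workers.
  http
    .createServer((req, res) => {
      res.end(`handled by worker ${process.pid}\n`);
    })
    .listen(5601);
}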

Cross-worker communication

We will be adding a new clustering service to core that will expose the necessary cluster APIs.

For some of our changes (such as the /status API, see below), we will need some kind of cross-worker communication. Such communication will need to pass through the coordinator, which will also serve as an 'event bus'.

The base API for such communication will be exposed from the clustering service.

export interface ClusteringServiceSetup {
  // [...]
  broadcast: (type: string, payload?: ClusterMessagePayload, options?: BroadcastOptions) => void;
  addMessageHandler: (type: string, handler: MessageHandler) => MessageHandlerUnsubscribeFn;
}

Note: to reduce cluster/non-cluster mode divergence, in non-clustering mode these APIs would just be no-ops. This avoids forcing (most) code to check which mode Kibana is running in before calling them.

Note: we could eventually use an Observable pattern instead of a handler-based one for addMessageHandler.
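
As a rough sketch of what that Observable flavour could look like (the ClusterMessage shape, the getMessage$ name, and the Subject wiring are illustrative assumptions, not an actual API):

import { Observable, Subject } from 'rxjs';
import { filter, map } from 'rxjs/operators';

interface ClusterMessage {
  type: string;
  payload?: unknown;
}

// Fed from the IPC channel with the coordinator, e.g. via process.on('message', ...)
const messages$ = new Subject<ClusterMessage>();

// Observable equivalent of addMessageHandler: subscribing/unsubscribing
// replaces the handler/unsubscribe function pair.
function getMessage$(type: string): Observable<unknown> {
  return messages$.pipe(
    filter((message) => message.type === type),
    map((message) => message.payload)
  );
}

const subscription = getMessage$('migration-complete').subscribe((payload) => {
  // handle the payload
});
subscription.unsubscribe();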

Executing code on a single worker

In some scenarios, we would like to have some part of the code executed from a single process. The SO migration would be a good example: we don't need each worker to try to perform the migration; we'd prefer to have one perform (or attempt) the migration while the others wait for it. Due to the architecture, we can't have the coordinator perform such single-process jobs, as it doesn't actually run a Kibana server.

There are various ways to address such a use case. What seems to be the best compromise right now is the concept of a 'main worker'. The coordinator would arbitrarily elect a worker as the 'main' one. The clustering service would then expose an API to let workers identify themselves as main or not.

Note: in non-clustering mode, isMainWorker would always return true, to reduce cluster/non-cluster mode divergence.

export interface ClusteringServiceSetup {
  // [...]
  isMainWorker: () => boolean;
}

To take the example of SO migration, the KibanaMigrator.runMigrations implementation could change to something like the following (a naive implementation: the returned promise is not handled properly, and the message handler should probably be registered earlier to be sure we do not miss the message):

runMigration() {
  if (clustering.isMainWorker()) {
    this.runMigrationsInternal().then((result) => {
      applyMigrationState(result);
      // persist: true delivers the message even to handlers that subscribe after it was sent
      clustering.broadcast('migration-complete', { payload: result }, { persist: true });
    });
  } else {
    const unsubscribe = clustering.addMessageHandler('migration-complete', ({ payload: result }) => {
      applyMigrationState(result);
      unsubscribe();
    });
  }
}

Sharing state between workers

Not sure if we will really need that, or if IPC broadcast will be sufficient. If we do need shared state, we will probably have to use syscall libraries to share buffers, such as mmap-io, and expose a higher-level API for that from core.

Performance testing

Tests were performed on a local development machine with an 8-core CPU (2.4 GHz 8-Core Intel Core i9, 32 GB 2400 MHz DDR4).

Non-cluster mode

[Chart: response time percentiles, non-cluster mode]

Cluster, 2 workers

[Chart: response time percentiles, 2-worker cluster]

Cluster, 4 workers

[Chart: response time percentiles, 4-worker cluster]

  • Between non-clustered and 2-worker cluster mode, we observe a 20-25% gain in the 50th percentile response time. Gains for the 75th and 95th percentiles are between 10% and 40%.

  • Between 2-worker and 4-worker cluster mode, the gain on the 50th percentile is negligible, but the 75th and 95th percentiles are significantly better with 4 workers, sometimes up to a 100% gain (a factor-of-2 ratio).

There is currently no easy way to test the performance improvements this could provide on Cloud. On Cloud, Kibana is running in a containerised environment using CPU CFS quota and CPU shares. If we want to investigate perf improvements on Cloud further, our only option currently is to set up a similar-ish environment locally (which wasn't done during the initial investigation).

Technical impacts in Core

Handling multi-process logs

We need to decide how we will be handling multiple processes outputting logs.

Example of log output in a 2 workers cluster:

[2021-03-02T10:23:41.834+01:00][INFO ][plugins-service] Plugin initialization disabled.
[2021-03-02T10:23:41.840+01:00][INFO ][plugins-service] Plugin initialization disabled.
[2021-03-02T10:23:41.900+01:00][WARN ][savedobjects-service] Skipping Saved Object migrations on startup. Note: Individual documents will still be migrated when read or written.
[2021-03-02T10:23:41.903+01:00][WARN ][savedobjects-service] Skipping Saved Object migrations on startup. Note: Individual documents will still be migrated when read or written.

The workers' logs are interleaved and, most importantly, there is no way to see which process a log entry is coming from. We will need to address that.

Identified options currently are:

  • Have a distinct logging configuration for each worker

We could do that by automatically adding a per-process suffix to the appropriate appenders' configuration. Note that this doesn't solve the problem for the console appender, which probably makes it a no-go.

  • Add the process info to the log pattern and output it with the log message

We could add the process name information to the log messages, and add a new conversion to be able to display it with the pattern layout, such as %process for example.

The default pattern could evolve to (ideally, only when clustering is enabled):

[%date][%level][%process][%logger] %message

This will require some changes in the logging system implementation, as currently the BaseLogger has no logic to enhance the log record with context information before forwarding it to its appenders, but this looks like the correct solution.
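
A rough sketch of what such record enrichment could look like (the LogRecord shape and the KIBANA_WORKER_NAME environment variable are assumptions for illustration, not the actual logging system types):

interface LogRecord {
  timestamp: Date;
  level: string;
  context: string;
  message: string;
  process?: string;
}

// Hypothetical: the coordinator could set this variable when forking each worker.
const processName = process.env.KIBANA_WORKER_NAME ?? 'coordinator';

// The BaseLogger (or an equivalent layer) would enrich each record before
// forwarding it to the appenders, so that a %process conversion can render it.
function enrichWithProcess(record: LogRecord): LogRecord {
  return { ...record, process: processName };
}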

The rolling file appender

The rolling process of the rolling-file appender is going to be problematic in clustered mode, as it will cause concurrency issues during the rolling. We need to find a way to make this rolling process cluster-proof.

Identified options are:

  • have the rolling file appenders coordinate themselves when rolling

By using a broadcast-message-based mutex mechanism, the appenders could acquire a 'lock' to roll a specific file, and release the lock once the rotation is done (see the sketch at the end of this section).

  • have the coordinator process perform the rolling

Another option would be to have the coordinator perform the rotation instead. When a rolling is required, the appender would send a message to the coordinator, which would perform the rolling and notify the workers once the process is complete. Note that this option is even more complicated than the previous one, as it forces us to move the rolling implementation outside of the appender, without any significant upsides.

  • centralize the logging system in the coordinator (and have the workers send messages to the coordinator when using the logging system)

We could go further and change the way the logging system works in clustering mode by having the coordinator centralize the logging system. The workers' logger implementations would just send messages to the coordinator. While this may be a correct design, the main downside is that the logging implementation would be totally different in cluster and non-cluster mode, and it is overall way more work than the other options.

While no option is trivial, I feel like option 1) is still the most pragmatic one. Option 3) is probably a better design, but represents more work. It would probably be fine if all our appenders were impacted, but as only the rolling-file one is, I feel like going with 1) would be ok.
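
A hedged sketch of how option 1's lock could be built on the broadcast / addMessageHandler APIs described earlier. The message types, the payload shape, and the assumption that the coordinator grants the lock to one requesting worker at a time are all illustrative, not an actual implementation.

async function rollWithLock(
  clustering: ClusteringServiceSetup,
  fileName: string,
  performRoll: () => Promise<void>
): Promise<void> {
  const workerId = process.pid; // illustrative worker identity

  // Wait until the coordinator (acting as the event bus) grants this worker the lock.
  await new Promise<void>((resolve) => {
    const unsubscribe = clustering.addMessageHandler('rolling-lock-granted', ({ payload }) => {
      if (payload?.fileName === fileName && payload?.workerId === workerId) {
        unsubscribe();
        resolve();
      }
    });
    clustering.broadcast('rolling-lock-request', { fileName, workerId });
  });

  try {
    await performRoll(); // rotate the file while holding the lock
  } finally {
    clustering.broadcast('rolling-lock-release', { fileName, workerId });
  }
}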

The status API

In clustering mode, the workers will all have an individual status. One could have a connectivity issue with ES while the other ones are green. Hitting the /status endpoint will reach a random (and different each time) worker, meaning that it would not be possible to know the status of the cluster as a whole.

We will need to add a centralized status service in the coordinator. Also, as the /status endpoint cannot be served from the coordinator, we will also need to have the workers retrieve the global status from the coordinator to serve the status endpoint.

We may also want to have the /status endpoint display each individual worker's status in addition to the global status, which may be a breaking change in the /status API response format.
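
A possible shape for that worker-side lookup, sketched on top of the same messaging APIs (the message types and the ClusterStatus shape are assumptions; the aggregation itself would live in a centralized status service owned by the coordinator):

interface ClusterStatus {
  overall: 'green' | 'yellow' | 'red';
  workers: Record<string, 'green' | 'yellow' | 'red'>;
}

// Called by whichever worker serves /status: it asks the coordinator for the
// aggregated status and resolves with the first report it receives.
function getClusterStatus(clustering: ClusteringServiceSetup): Promise<ClusterStatus> {
  return new Promise((resolve) => {
    const unsubscribe = clustering.addMessageHandler('status-report', ({ payload }) => {
      unsubscribe();
      resolve(payload as ClusterStatus);
    });
    clustering.broadcast('status-request');
  });
}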

PID file

Without changes, each worker is going to try to write and read the same PID file. This also breaks PID file usage entirely, as the PID stored in the file will be an arbitrary worker's PID instead of the coordinator (main process) PID.

In clustering mode, we will need to have the coordinator handle the PID file logic, and disable PID file handling in the workers' environment service.
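
A minimal sketch of that behaviour, assuming the coordinator is the cluster primary (the path handling and cleanup details are illustrative):

import cluster from 'cluster';
import { writeFile } from 'fs/promises';
import { unlinkSync } from 'fs';

async function writePidFile(pidFilePath: string): Promise<void> {
  if (!cluster.isPrimary) {
    return; // workers skip PID file handling entirely
  }
  // The stored PID is the coordinator's, i.e. the process users should signal.
  await writeFile(pidFilePath, `${process.pid}\n`, 'utf8');
  process.once('exit', () => {
    try {
      unlinkSync(pidFilePath); // best-effort cleanup on shutdown
    } catch {
      // ignore: the file may already be gone
    }
  });
}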

SavedObjects migration

In the current state, all workers are going to try to perform the migration. Ideally, we would have only one process perform the migration, and the other ones just wait for a ready signal. We can’t easily have the coordinator do it, so we would probably have to leverage the ‘main worker’ concept to do so.

The SO migration v2 is supposed to be resilient to concurrent attempts though, as we already support multi-instance Kibana deployments, so this can probably be considered an improvement (cc @rudolf)

Open questions / things to solve

Memory consumption

In cluster mode, node options such as max-old-space-size will be used by all processes. E.g. using --max-old-space-size=1024 in a 2-worker cluster would result in a maximum memory usage of 3gb (1 coordinator + 2 workers). Is this something we will need to document somewhere?

Workers error handling

Usually when using cluster, the coordinator is supposed to recreate workers when they terminate unexpectedly. However, given Kibana's architecture, some failures are not recoverable (workers failing because of config validation, failed migration...). Should we try to distinguish recoverable from non-recoverable errors, or are we fine with terminating the main Kibana process when any worker terminates unexpectedly?

Data folder

The data folder (path.data) is currently the same for all workers. We still have to identify with the teams if this is going to be a problem, in which case we would have to create and use distinct data folders per worker.

One easy solution would be, when clustering is enabled, to create a sub folder under path.data for each worker. If we need to do so, should this be considered a breaking change, or do we assume the usage of the data folder is an implementation detail and not part of our public ‘API’?

instanceUUID

The same instance UUID (server.uuid / {dataFolder}/uuid) is currently used by all the workers. We still need to identify with the teams if this is going to be a problem, in which case we would need a distinct instance uuid per worker.

Note that this could be a breaking change, as the single server.uuid configuration property would not be enough. We may want to introduce a new workerId variable and associated API to expose to plugins?

The Dev CLI

In development mode, we are already spawning processes from the main process: The Kibana server running in the main process actually just kickstarts core to get the configuration and services required to instantiate the CliDevMode (from the legacy service), which itself spawns a child process to run the actual Kibana server.

Even if not technically blocking, it would greatly simplify parts of the worker and coordinator logic if we were able to finally extract the dev cli so that it no longer depends on core or needs to instantiate a 'temporary' server to access the legacy service. Note that extracting and refactoring the dev cli is going to be required anyway to remove the last bits of legacy, as it currently relies on the legacy configuration to run (and is running from the legacy service).

Technical impact on plugins

Identifying things that may break

  • Concurrent access to the same resources

Is there, for example, some part of the code that is accessing and writing files from the data folder (or anywhere else) and makes the assumption that it is the sole process actually writing to that file?

  • Using instanceUUID as a unique Kibana process identifier

Are there, for example, schedulers that are using the instanceUUID as a single process id, as opposed to a single Kibana instance id? Are there situations where having the same instance UUID for all the workers is going to be a problem?

  • Things needing to run only once per Kibana instance

Is there any part of the code that needs to be executed only once in a multi-worker mode, such as initialization code or starting schedulers? An example would be reporting's queueFactory polling: as we want to run only a single headless browser at a time per Kibana instance, only one worker should have polling enabled (see the sketch below).
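
As an illustration, gating such once-per-instance work behind the 'main worker' API could look like the following (setupQueuePolling and startPolling are hypothetical stand-ins for the plugin's own logic, not Kibana's actual reporting code):

function setupQueuePolling(clustering: ClusteringServiceSetup, startPolling: () => void) {
  if (!clustering.isMainWorker()) {
    // Non-main workers keep serving HTTP traffic but never poll for queued
    // reports, so only one headless browser runs per Kibana instance.
    return;
  }
  startPolling();
}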

Identified required changes

Probably not exhaustive for now. (plugin owners, we need your help here!)

  • Reporting

We will probably want to restrict to a single headless browser per Kibana instance (see #68626 (comment)). For that, we will have to change the logic in createQueueFactory to only have the 'main' worker poll for reporting tasks.

  • Telemetry

Sending the data to the remote telemetry server should probably only be performed from a single worker, see FetcherTask

Note that it seems that sending the data multiple times doesn’t have any real consequences, so this should be considered non-blocking and only an improvement.

  • TaskManager

Do we want the task scheduler to be running on a single worker? (see TaskClaiming, TaskScheduling, TaskPollingLifecycle)

  • Alerting

Do we need the alerting task runner (TaskRunner, TaskRunnerFactory) to be running on a single worker?

Summary of the user-facing breaking changes

  • status API

If we want to have the status API return each individual worker's status, we will need to change the output of the status API in clustering mode. Note that the new format for /api/status is still behind a v8format flag, meaning that if we do these changes before 8.0, we won't be introducing any breaking change later.

  • instanceUUID

Depending on our decision regarding the instanceUUID problem, we may have to change the server.uuid configuration property when clustering mode is enabled, which would be a breaking change.

  • distinct logging configuration

If we decide to have each worker output logs to distinct files, we would have to change the logging configuration to add a prefix/suffix to each log file, which would be a breaking change. Note that this option is probably not the one we'll choose.

  • data folder

We may need to have each worker use a distinct data folder. Should this be considered a breaking change?

@Bamieh
Member

Bamieh commented Mar 8, 2021

@pgayvallet Thank you for this awesome write up!

For telemetry we have two main areas affected by enabling clustering:

  1. Server side fetcher: The telemetry/server/fetcher.ts will attempt to send the telemetry usage multiple times, once per day from each process. We do store state in the SavedObjects store of the last time the usage was sent, to prevent sending it multiple times (although race conditions might occur).

  2. Tasks storing telemetry data: We have tasks across several plugins storing data in savedobjects specifically for telemetry. Under clustering these tasks will be registered multiple times.

Fixing the points above is an optimization rather than a blocker. We already handle receiving multiple reports and storing the data multiple times.

@afharo
Member

afharo commented Mar 8, 2021

I'll add to @Bamieh's comment another use case for telemetry: rollups (I don't know if it was considered in point 2.) :)

@afharo
Member

afharo commented Mar 8, 2021

Regarding routing, I'd like to add another use case: from time to time, we reconsider adding support for websockets in Kibana. When running an API in cluster mode, it requires some tweaks to the HTTP server, one of them being handling sticky sessions.

This changes the way the HTTP server is bootstrapped: the coordinator opens the port and adds a piece of logic to pick the worker that should handle the incoming request:

/** Coordinator */
const workers = [...]

// Open up the server and send sockets to child. Use pauseOnConnect to prevent
// the sockets from being read before they are sent to the child process.
const server = require('net').createServer({ pauseOnConnect: true });
server.on('connection', (socket) => {
  const worker = findStickySessionWorker();  
  worker.send('socket', socket);
});
server.listen(1337);

/** Worker */
process.on('message', (m, socket) => {
  if (m === 'socket') {
    if (socket) {
      // Check that the client socket exists.
      // It is possible for the socket to be closed between the time it is
      // sent and the time it is received in the child process.
      socket.end(`Request handled with ${process.argv[2]} priority`);
    }
  }
});

This could be a solution for the /status requests if we can serve those straight from the coordinator. Although I'm not entirely sure, because the snippet above doesn't read the socket before passing it through to the workers, and it may add some overhead to each request, potentially reducing the performance boost we are looking for.

@pmuellr
Member

pmuellr commented Mar 8, 2021

I'm not sure alerting itself is affected, but presumably task manager is, and so alerting (and other things) are indirectly affected via task manager. I'll just focus on task manager here.

Currently task manager does "claims" for jobs to run based on the server uuid. I think this would still work with workers - each task manager in the worker would be doing "claims" for the same server uuid, which I think is basically the same as setting your max_workers to "current max_workers * number of workers". We'd want to try to stagger the times these run, otherwise they would likely be stepping all over each other, causing a lot of unnecessary "already claimed" hits when we poll for available work.

Probably the "real" story for this should be a little more involved. We probably only want one worker doing the task claims, and then when it gets tasks to run, sends them to a worker to run. But this seems like we'd need to have some separate management of those worker-based tasks, since we want to distribute the load.

That implies having some introspection on when workers die, and when new workers start up to replace them. Or is that not going to happen? We start n workers at startup, and if any "die", we terminate Kibana?

@pgayvallet
Contributor

I'm not sure alerting itself is affected, but presumably task manager is, and so alerting (and other things) are indirectly affected via task manager. I'll just focus on task manager here.

It seems all things registering tasks with the task manager are affected too, as the tasks are going to be registered multiple times?

We probably only want one worker doing the task claims, and then when it gets tasks to run, sends them to a worker to run.

Yea, mid/long term, this is the kind of architecture we'd like to go to. Shorter term, just executing the tasks on a single worker is probably good enough

That implies having some introspection on when workers die, and when new workers start up to replace them. Or is that not going to happen? We start n workers at startup, and if any "die", we terminate Kibana?

This is still open to discussion, see the Workers error handling section.

@pmuellr
Member

pmuellr commented Mar 8, 2021

It seems all things registering task to the task manager are affected too, as the tasks are going to be registered multiple times?

For cases where tasks are scheduled at plugin startup, we have an API to ensure the task only gets scheduled once (one task document created) - ensureScheduled(). We need that for multi-Kibana deployments anyway. Other task usage, like scheduling an action to run when an alert is triggered, uses a different flow where a new task document is always created. But those aren't a problem, because only one Kibana process will be processing the alert and figuring out the action needs to be scheduled. So I don't think that is a problem.

Yea, mid/long term, this is the kind of architecture we'd like to go to. Shorter term, just executing the tasks on a single worker is probably good enough

We actually have an issue open to allow configuring task manager with zero task workers, basically for diagnostic reasons, because lots of stuff will stop working if no tasks will ever run. Perhaps we could use this to run task manager "normally" in the "main" process, but not run it in the workers by setting task workers to zero. Perhaps the "main" process would end up being reserved for one-off things that can't/shouldn't run in worker processes.

@pgayvallet
Contributor

pgayvallet commented Mar 9, 2021

Perhaps the "main" process would end up being reserved for one-off things that can't/shouldn't run in worker processes.

The 'main' process is the coordinator. It doesn't load plugins, nor even start an actual Kibana server, so this would not be possible. This is why the 'main worker' concept was introduced.

We could eventually change our Plugin and core services API to allow loading distinct code on the coordinator and the workers, but this seems overcomplicated and hard to 'backport' to non-cluster mode compared to the 'main worker' trick that is just always true on non-cluster mode.

@pgayvallet
Contributor

After a discussion with @kobelb, we decided to change the format to an RFC to have it go through an architecture review.

The discussion should continue on #94057

@AlexP-Elastic

@rudolf

Sync with Cloud team to understand the impact on cloud. Does each Kibana instance only get one core? Should we disable clustering on cloud, or is there any benefit to having two processes to take advantage of hyperthreading?

The way it works in cloud is that you get access to all cores BUT you have a CFS quota that is between 308,433 microseconds (per 100K microsecond slices) "fully boosted" and 46,264 microseconds ("boost" depleted, 1GB RAM instance; 69,397 microseconds 2GB; 92,529 microseconds 4GB; 185,059 microseconds 8GB). The boosting is a bit complex, but it is likely you can assume a Kibana instance will normally have the boost quota vs the base one.

So I think you will get at least some benefit?

If you want to benchmark to understand better, you can just run the Kibana docker image with --cpu-period="100000", --cpu-quota="<value from above>", and --memory=<corresponding instance size>

@lukeelmers
Member

Closing as not planned for now, since our main reason for pursuing it in the near term was to isolate background tasks, which we instead achieved by introducing node.roles and running a duplicate Kibana process. We can re-open if we feel this is becoming a higher priority in the future.

@lukeelmers lukeelmers closed this as not planned on Oct 31, 2023