-
Notifications
You must be signed in to change notification settings - Fork 375
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ensure that only registered cluster workers are asked to report metrics #182
base: master
Are you sure you want to change the base?
Changes from all commits
372bb75
7634cdf
564bc13
3ae56ae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,20 +10,31 @@ | |
|
||
const cluster = require('cluster'); | ||
const Registry = require('./registry'); | ||
const Gauge = require('./gauge'); | ||
const util = require('./util'); | ||
const aggregators = require('./metricAggregators').aggregators; | ||
|
||
const GET_METRICS_REQ = 'prom-client:getMetricsReq'; | ||
const GET_METRICS_RES = 'prom-client:getMetricsRes'; | ||
const REG_METRICS_WORKER = 'prom-client:registerMetricsWorker'; | ||
|
||
let registries = [Registry.globalRegistry]; | ||
let requestCtr = 0; // Concurrency control | ||
let listenersAdded = false; | ||
const coordinatedWorkers = new Set(); | ||
const requests = new Map(); // Pending requests for workers' local metrics. | ||
|
||
class AggregatorRegistry extends Registry { | ||
constructor() { | ||
/** | ||
Create an AggregatorRegistry instance. Accepts an optional `options` object. | ||
|
||
Options are: | ||
coordinated If false (default), request metrics from all cluster workers. If true, request metrics only from workers that have required prom-client. | ||
* @param {object?} options object | ||
*/ | ||
constructor({ coordinated } = {}) { | ||
super(); | ||
this.coordinated = coordinated || false; | ||
addListeners(); | ||
} | ||
|
||
|
@@ -32,13 +43,15 @@ class AggregatorRegistry extends Registry { | |
* returned Promise resolve with the same value; either may be used. | ||
* @param {Function?} callback (err, metrics) => any | ||
* @return {Promise<string>} Promise that resolves with the aggregated | ||
* metrics. | ||
* metrics. | ||
*/ | ||
clusterMetrics(callback) { | ||
const requestId = requestCtr++; | ||
|
||
return new Promise((resolve, reject) => { | ||
const nWorkers = Object.keys(cluster.workers).length; | ||
const nWorkers = this.coordinated | ||
? coordinatedWorkers.size | ||
: Object.keys(cluster.workers).length; | ||
|
||
function done(err, result) { | ||
// Don't resolve/reject the promise if a callback is provided | ||
|
@@ -56,6 +69,7 @@ class AggregatorRegistry extends Registry { | |
|
||
const request = { | ||
responses: [], | ||
workerCount: nWorkers, | ||
pending: nWorkers, | ||
done, | ||
errorTimeout: setTimeout(() => { | ||
|
@@ -71,7 +85,19 @@ class AggregatorRegistry extends Registry { | |
type: GET_METRICS_REQ, | ||
requestId | ||
}; | ||
for (const id in cluster.workers) cluster.workers[id].send(message); | ||
const workers = this.coordinated | ||
? coordinatedWorkers | ||
: Object.keys(cluster.workers); | ||
for (const id of workers) { | ||
const worker = cluster.workers[id]; | ||
if (worker === undefined || worker.isDead() || !worker.isConnected()) { | ||
request.pending--; | ||
request.workerCount--; | ||
coordinatedWorkers.delete(id); // Set is safe to mutate while iterating | ||
} else { | ||
worker.send(message); | ||
} | ||
} | ||
}); | ||
} | ||
|
||
|
@@ -81,7 +107,7 @@ class AggregatorRegistry extends Registry { | |
* the method specified by their `aggregator` property, or by summation if | ||
* `aggregator` is undefined. | ||
* @param {Array} metricsArr Array of metrics, each of which created by | ||
* `registry.getMetricsAsJSON()`. | ||
* `registry.getMetricsAsJSON()`. | ||
* @return {Registry} aggregated registry. | ||
*/ | ||
static aggregate(metricsArr) { | ||
|
@@ -122,7 +148,7 @@ class AggregatorRegistry extends Registry { | |
* Sets the registry or registries to be aggregated. Call from workers to | ||
* use a registry/registries other than the default global registry. | ||
* @param {Array<Registry>|Registry} regs Registry or registries to be | ||
* aggregated. | ||
* aggregated. | ||
* @return {void} | ||
*/ | ||
static setRegistries(regs) { | ||
|
@@ -167,9 +193,19 @@ function addListeners() { | |
if (request.failed) return; // Callback already run with Error. | ||
|
||
const registry = AggregatorRegistry.aggregate(request.responses); | ||
const g = new Gauge({ | ||
name: 'nodejs_prom_client_cluster_workers', | ||
help: 'Number of connected cluster workers reporting to prometheus', | ||
registers: [registry] | ||
}); | ||
g.set(request.workerCount); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Noted, however this is very specific to the coordinated option - perhaps it should be exposed only when coordinated is true. Perhaps a similar metric could be added to the default metrics collected that counts all the cluster workers? |
||
const promString = registry.metrics(); | ||
request.done(null, promString); | ||
} | ||
} else if (message.type === REG_METRICS_WORKER) { | ||
//setup coordinated workers | ||
const workerId = message.workerId; | ||
coordinatedWorkers.add(workerId); | ||
} | ||
}); | ||
} | ||
|
@@ -186,6 +222,11 @@ if (cluster.isWorker) { | |
}); | ||
} | ||
}); | ||
|
||
process.send({ | ||
type: REG_METRICS_WORKER, | ||
workerId: cluster.worker.id | ||
}); | ||
} | ||
|
||
module.exports = AggregatorRegistry; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the only reason this needs to be an option because setting it to
true
makes the order of forking vs.new AggregatorRegistry()
matter? I'd rather go for an implementation that isn't sensitive to that, e.g. the worker repeatedly attempts to register with the master until the master acks the registration.Having a heterogeneous pool of workers (not all of them setting up prom-client) is unusual, but I think the behavior achieved when this is
true
is what should happen by default.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the only reason to make it an option is that it would break backwards compatibility.
Given that a metrics client should be as unobtrusive as possible, I would be wary of adding a prolonged discovery phase. I would certainly prefer in my consuming codebase to keep things simple and accept that ordering matters.
In the case of the homogenous cluster, the previous implementation cleanly sidesteps a lot of the issues with “garbage collecting” the workers and nobody else had complained so far :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW, we will be breaking backwards compatibility pretty hard soon ish (see #177, #178 and #180), so don't let semver stop you from writing the code you'd like to write :D