From 372bb75a66def213be09a474cd9aed8d0f5a458b Mon Sep 17 00:00:00 2001 From: Orestis Markou Date: Mon, 26 Mar 2018 19:05:34 +0200 Subject: [PATCH 1/4] ensure that only registered cluster workers are asked to report metrics --- lib/cluster.js | 49 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/lib/cluster.js b/lib/cluster.js index 84118c36..b2f42733 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -10,11 +10,13 @@ const cluster = require('cluster'); const Registry = require('./registry'); +const Gauge = require('./gauge'); const util = require('./util'); const aggregators = require('./metricAggregators').aggregators; const GET_METRICS_REQ = 'prom-client:getMetricsReq'; const GET_METRICS_RES = 'prom-client:getMetricsRes'; +const REG_METRICS_WORKER = 'prom-client:registerMetricsWorker'; let registries = [Registry.globalRegistry]; let requestCtr = 0; // Concurrency control @@ -25,7 +27,22 @@ class AggregatorRegistry extends Registry { constructor() { super(); addListeners(); - } + this.aliveWorkers = new Set(); + let that = this; + cluster.on('message', function(worker, message) { + if (arguments.length === 2) { + // pre-Node.js v6.0 + message = worker; + worker = undefined; + } + if (message.type === REG_METRICS_WORKER) { + let workerId = message.workerId; + console.log("metrics registering workerId", workerId); + that.aliveWorkers.add(workerId); + console.log(that.aliveWorkers); + } + }); + } /** * Gets aggregated metrics for all workers. The optional callback and @@ -36,9 +53,10 @@ class AggregatorRegistry extends Registry { */ clusterMetrics(callback) { const requestId = requestCtr++; + let that = this; return new Promise((resolve, reject) => { - const nWorkers = Object.keys(cluster.workers).length; + const nWorkers = that.aliveWorkers.size; function done(err, result) { // Don't resolve/reject the promise if a callback is provided @@ -56,6 +74,7 @@ class AggregatorRegistry extends Registry { const request = { responses: [], + workerCount: nWorkers, pending: nWorkers, done, errorTimeout: setTimeout(() => { @@ -71,7 +90,18 @@ class AggregatorRegistry extends Registry { type: GET_METRICS_REQ, requestId }; - for (const id in cluster.workers) cluster.workers[id].send(message); + let failedWorkers = new Set(); + for (const id of that.aliveWorkers) { + let worker = cluster.workers[id]; + if (worker === undefined || worker.isDead() || !worker.isConnected()) { + failedWorkers.add(id); + request.pending --; + request.workerCount --; + } else { + worker.send(message) + } + } + that.aliveWorkers = new Set([...that.aliveWorkers].filter(x => !failedWorkers.has(x))); }); } @@ -167,10 +197,16 @@ function addListeners() { if (request.failed) return; // Callback already run with Error. const registry = AggregatorRegistry.aggregate(request.responses); + let g = new Gauge({ + name: 'nodejs_prom_client_cluster_workers', + help: 'Number of connected cluster workers reporting to prometheus', + registers: [registry] + }) + g.set(request.workerCount); const promString = registry.metrics(); request.done(null, promString); } - } + } }); } } @@ -186,6 +222,11 @@ if (cluster.isWorker) { }); } }); + + process.send({ + type: REG_METRICS_WORKER, + workerId: cluster.worker.id + }) } module.exports = AggregatorRegistry; From 7634cdfb8688305150d77c92d225a3247550c450 Mon Sep 17 00:00:00 2001 From: Orestis Markou Date: Tue, 27 Mar 2018 10:31:49 +0200 Subject: [PATCH 2/4] convert spaces to tabs, remove console.logs --- lib/cluster.js | 78 +++++++++++++++++++++++++------------------------- 1 file changed, 39 insertions(+), 39 deletions(-) diff --git a/lib/cluster.js b/lib/cluster.js index b2f42733..7721b9f9 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -27,36 +27,34 @@ class AggregatorRegistry extends Registry { constructor() { super(); addListeners(); - this.aliveWorkers = new Set(); - let that = this; + this.aliveWorkers = new Set(); + const that = this; cluster.on('message', function(worker, message) { if (arguments.length === 2) { // pre-Node.js v6.0 message = worker; worker = undefined; } - if (message.type === REG_METRICS_WORKER) { - let workerId = message.workerId; - console.log("metrics registering workerId", workerId); - that.aliveWorkers.add(workerId); - console.log(that.aliveWorkers); - } - }); - } + if (message.type === REG_METRICS_WORKER) { + const workerId = message.workerId; + that.aliveWorkers.add(workerId); + } + }); + } /** * Gets aggregated metrics for all workers. The optional callback and * returned Promise resolve with the same value; either may be used. * @param {Function?} callback (err, metrics) => any * @return {Promise} Promise that resolves with the aggregated - * metrics. + * metrics. */ clusterMetrics(callback) { const requestId = requestCtr++; - let that = this; + const that = this; return new Promise((resolve, reject) => { - const nWorkers = that.aliveWorkers.size; + const nWorkers = that.aliveWorkers.size; function done(err, result) { // Don't resolve/reject the promise if a callback is provided @@ -74,7 +72,7 @@ class AggregatorRegistry extends Registry { const request = { responses: [], - workerCount: nWorkers, + workerCount: nWorkers, pending: nWorkers, done, errorTimeout: setTimeout(() => { @@ -90,18 +88,20 @@ class AggregatorRegistry extends Registry { type: GET_METRICS_REQ, requestId }; - let failedWorkers = new Set(); - for (const id of that.aliveWorkers) { - let worker = cluster.workers[id]; - if (worker === undefined || worker.isDead() || !worker.isConnected()) { - failedWorkers.add(id); - request.pending --; - request.workerCount --; - } else { - worker.send(message) - } - } - that.aliveWorkers = new Set([...that.aliveWorkers].filter(x => !failedWorkers.has(x))); + const failedWorkers = new Set(); + for (const id of that.aliveWorkers) { + const worker = cluster.workers[id]; + if (worker === undefined || worker.isDead() || !worker.isConnected()) { + failedWorkers.add(id); + request.pending--; + request.workerCount--; + } else { + worker.send(message); + } + } + that.aliveWorkers = new Set( + [...that.aliveWorkers].filter(x => !failedWorkers.has(x)) + ); }); } @@ -111,7 +111,7 @@ class AggregatorRegistry extends Registry { * the method specified by their `aggregator` property, or by summation if * `aggregator` is undefined. * @param {Array} metricsArr Array of metrics, each of which created by - * `registry.getMetricsAsJSON()`. + * `registry.getMetricsAsJSON()`. * @return {Registry} aggregated registry. */ static aggregate(metricsArr) { @@ -152,7 +152,7 @@ class AggregatorRegistry extends Registry { * Sets the registry or registries to be aggregated. Call from workers to * use a registry/registries other than the default global registry. * @param {Array|Registry} regs Registry or registries to be - * aggregated. + * aggregated. * @return {void} */ static setRegistries(regs) { @@ -197,16 +197,16 @@ function addListeners() { if (request.failed) return; // Callback already run with Error. const registry = AggregatorRegistry.aggregate(request.responses); - let g = new Gauge({ - name: 'nodejs_prom_client_cluster_workers', - help: 'Number of connected cluster workers reporting to prometheus', - registers: [registry] - }) - g.set(request.workerCount); + const g = new Gauge({ + name: 'nodejs_prom_client_cluster_workers', + help: 'Number of connected cluster workers reporting to prometheus', + registers: [registry] + }); + g.set(request.workerCount); const promString = registry.metrics(); request.done(null, promString); } - } + } }); } } @@ -223,10 +223,10 @@ if (cluster.isWorker) { } }); - process.send({ - type: REG_METRICS_WORKER, - workerId: cluster.worker.id - }) + process.send({ + type: REG_METRICS_WORKER, + workerId: cluster.worker.id + }); } module.exports = AggregatorRegistry; From 564bc132f7d06ca3fec0f0993739e21ead230099 Mon Sep 17 00:00:00 2001 From: Orestis Markou Date: Tue, 27 Mar 2018 11:07:26 +0200 Subject: [PATCH 3/4] move new coordinated cluster behaviour behind a constructor flag in AggregatorRegistry --- lib/cluster.js | 44 ++++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/lib/cluster.js b/lib/cluster.js index 7721b9f9..fb98b744 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -21,25 +21,21 @@ const REG_METRICS_WORKER = 'prom-client:registerMetricsWorker'; let registries = [Registry.globalRegistry]; let requestCtr = 0; // Concurrency control let listenersAdded = false; +const coordinatedWorkers = new Set(); const requests = new Map(); // Pending requests for workers' local metrics. class AggregatorRegistry extends Registry { - constructor() { + /** + Create an AggregatorRegistry instance. Accepts an optional `options` object. + + Options are: + coordinated If false (default), request metrics from all cluster workers. If true, request metrics only from workers that have required prom-client. + * @param {object?} options object + */ + constructor({ coordinated } = {}) { super(); + this.coordinated = coordinated || false; addListeners(); - this.aliveWorkers = new Set(); - const that = this; - cluster.on('message', function(worker, message) { - if (arguments.length === 2) { - // pre-Node.js v6.0 - message = worker; - worker = undefined; - } - if (message.type === REG_METRICS_WORKER) { - const workerId = message.workerId; - that.aliveWorkers.add(workerId); - } - }); } /** @@ -51,10 +47,11 @@ class AggregatorRegistry extends Registry { */ clusterMetrics(callback) { const requestId = requestCtr++; - const that = this; return new Promise((resolve, reject) => { - const nWorkers = that.aliveWorkers.size; + const nWorkers = this.coordinated + ? coordinatedWorkers.size + : Object.keys(cluster.workers).length; function done(err, result) { // Don't resolve/reject the promise if a callback is provided @@ -88,20 +85,19 @@ class AggregatorRegistry extends Registry { type: GET_METRICS_REQ, requestId }; - const failedWorkers = new Set(); - for (const id of that.aliveWorkers) { + const workers = this.coordinated + ? coordinatedWorkers + : Object.keys(cluster.workers); + for (const id of workers) { const worker = cluster.workers[id]; if (worker === undefined || worker.isDead() || !worker.isConnected()) { - failedWorkers.add(id); request.pending--; request.workerCount--; + coordinatedWorkers.delete(id); // Set is safe to mutate while iterating } else { worker.send(message); } } - that.aliveWorkers = new Set( - [...that.aliveWorkers].filter(x => !failedWorkers.has(x)) - ); }); } @@ -205,6 +201,10 @@ function addListeners() { g.set(request.workerCount); const promString = registry.metrics(); request.done(null, promString); + } else if (message.type === REG_METRICS_WORKER) { + //setup coordinated workers + const workerId = message.workerId; + coordinatedWorkers.add(workerId); } } }); From 3ae56aeed098a3b5d32eb5f26b874593a7305cf6 Mon Sep 17 00:00:00 2001 From: Orestis Markou Date: Tue, 27 Mar 2018 11:17:17 +0200 Subject: [PATCH 4/4] fix misplaced if statement --- lib/cluster.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/cluster.js b/lib/cluster.js index fb98b744..f9ce4382 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -201,11 +201,11 @@ function addListeners() { g.set(request.workerCount); const promString = registry.metrics(); request.done(null, promString); - } else if (message.type === REG_METRICS_WORKER) { - //setup coordinated workers - const workerId = message.workerId; - coordinatedWorkers.add(workerId); } + } else if (message.type === REG_METRICS_WORKER) { + //setup coordinated workers + const workerId = message.workerId; + coordinatedWorkers.add(workerId); } }); }