forked from Ameobea/tickgrinder
-
Notifications
You must be signed in to change notification settings - Fork 0
/
manager.js
148 lines (125 loc) · 4.39 KB
/
manager.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
"use strict";
/*jslint node: true */
var express = require("express");
var path = require("path");
var bodyParser = require("body-parser");
var http = require("http");
var ws = require("nodejs-websocket");
var redis = require("redis");
var conf = require("./conf");
// Alias for this module's `exports` object; `start` is attached to it below.
var manager = exports;
// Identifier of this manager instance; assigned from process.argv[2] inside `manager.start`.
var uuid;
/// Generates a new V4 UUID in hyphenated form (8-4-4-4-12 lowercase hex digits).
///
/// The first digit of the third group is always `4` (the version) and the first
/// digit of the fourth group is always one of `8`, `9`, `a`, `b` (the variant),
/// so the result is a well-formed version-4 UUID string.
///
/// NOTE: randomness comes from `Math.random()`, which is not cryptographically
/// secure; do not use these ids as secrets.
function v4() {
  // Returns four random lowercase hex digits.  Adding 1 before scaling forces a
  // leading "1" digit so shorter values keep their zero padding; `substring(1)`
  // then strips that leading digit.
  function s4() {
    return Math.floor((1 + Math.random()) * 0x10000)
      .toString(16)
      .substring(1);
  }

  // Third group: version nibble followed by three random digits.
  var versionGroup = '4' + s4().substring(1);

  // Fourth group: the top two bits must be `10`, i.e. the first digit is 8..b.
  var variantDigit = (8 + Math.floor(Math.random() * 4)).toString(16);
  var variantGroup = variantDigit + s4().substring(1);

  return s4() + s4() + '-' + s4() + '-' + versionGroup + '-' +
    variantGroup + '-' + s4() + s4() + s4();
}
/// Starts the manager.
///
/// This brings up three things:
///  * an Express app listening on `port` (all interfaces) that serves the
///    `./routes/index` and `./routes/data` routers plus static `/sources` files,
///  * a websocket server on `conf.websocket_port` that rebroadcasts every text
///    message to all connected clients and forwards command messages to Redis,
///  * a Redis subscriber that relays every message from the subscribed channels
///    to all websocket clients and answers commands via `getResponse`.
///
/// The instance uuid is read from `process.argv[2]`; if it is missing the
/// process prints a usage message and exits with a non-zero status.
manager.start = function(port){
  var app = express();

  var index = require('./routes/index');
  var data = require("./routes/data");

  app.engine('html', require('ejs').renderFile);
  app.set('views', path.join(__dirname, 'views'));
  app.set('view engine', 'ejs');

  app.use(bodyParser.json());
  app.use(bodyParser.urlencoded({extended: true}));

  app.listen(port, "0.0.0.0");
  console.log("Manager webserver started!");

  app.use("/", index);
  app.use("/data", data);
  app.use("/sources", express.static(__dirname + "/sources"));

  var pubClient = getRedisClient();

  var socketServer = ws.createServer(function(conn){
    // An 'error' event without a listener is thrown by the EventEmitter, so a
    // single misbehaving client would otherwise take the whole manager down.
    conn.on('error', function(err){
      console.log(`Websocket connection had some sort of error: ${err}`);
    });

    conn.on('text', function(txtMsg){ //broadcast to all
      socketServer.connections.forEach(function(connection){
        connection.sendText(txtMsg);
      });

      try {
        var parsed = JSON.parse(txtMsg);
        // only messages shaped like {channel, message: {cmd}} are forwarded to Redis
        if(parsed.channel && parsed.message && parsed.message.cmd){
          pubClient.publish(parsed.channel, JSON.stringify(parsed.message));
        }
      } catch(e) {
        // Best effort: text that is not a JSON command has already been
        // broadcast above and is simply not forwarded to Redis.
      }
    });
  });

  // Registered once on the server (not once per connection) so that it is in
  // place before the first client connects and listeners do not pile up.
  socketServer.on('error', function(err){
    console.log(`Websocket server had some sort of error: ${err}`);
  });

  socketServer.listen(parseInt(conf.websocket_port, 10), "0.0.0.0");

  // usage: node manager.js uuid
  uuid = process.argv[2];

  if(!uuid) {
    console.error("Usage: node manager.js uuid");
    // a missing argument is a failure, so report it with a non-zero status
    process.exit(1);
  } else {
    console.error(`MM now listening for commands on ${conf.redis_control_channel} and ${uuid}`);
  }

  // Create two Redis clients - one for subscribing and one for publishing
  var subClient = getRedisClient();
  subClient.subscribe(uuid);
  subClient.subscribe(conf.redis_control_channel);
  subClient.subscribe(conf.redis_responses_channel);
  subClient.subscribe(conf.redis_log_channel);

  subClient.on("message", (channel, message_str)=>{
    // convert the {"Enum"}s to plain strings
    var normalized = message_str.replace(/{("\w*")}/g, "$1");

    var wr_msg;
    try {
      wr_msg = JSON.parse(normalized);
    } catch(e) {
      // a malformed payload on a shared channel must not crash the manager
      console.error(`Unable to parse message received on ${channel}: ${message_str}`);
      return;
    }

    // broadcast to websockets
    socketServer.connections.forEach(function(connection){
      var ws_msg = {channel: channel, message: wr_msg};
      connection.sendText(JSON.stringify(ws_msg));
    });

    // answer every command except Log commands; the `wr_msg &&` guard covers a
    // payload that parsed to null
    if(wr_msg && wr_msg.cmd && !wr_msg.cmd.Log){
      var response = getResponse(wr_msg.cmd);
      var wr_res = {uuid: wr_msg.uuid, res: response};
      pubClient.publish(conf.redis_responses_channel, JSON.stringify(wr_res));
    }
  });

  // signal to the platform that we're up and running
  setTimeout(function(){
    pubClient.publish(conf.redis_control_channel, JSON.stringify({uuid: v4(), cmd: {Ready: {instance_type: "MM", uuid: uuid}}}));
  }, conf.cs_timeout);

  // Express identifies error-handling middleware by its four-argument
  // signature, so `next` has to stay in the parameter list even though unused.
  app.use(function(err, req, res, next) {
    res.status(err.status || 500);
    console.log(err.stack);
    res.render('error', {
      message: err.message,
      error: err
    });
  });

  // anything that no route or static file handled ends up here
  app.use(function(req, res, next) {
    res.status(404).send('Resource not found');
  });
};
// Start the manager as soon as this module is loaded, serving HTTP on conf.mm_port.
manager.start(conf.mm_port);
/// Returns a new Redis client based on the settings in conf
///
/// `conf.redis_host` is read as `scheme://host:port` (for example
/// `redis://localhost:6379`).  The `scheme://` prefix is optional, so a bare
/// `host:port` value is accepted as well.
function getRedisClient() {
  // When a scheme is present the address is the part right after "://";
  // otherwise the whole string is the address.
  var pieces = conf.redis_host.split("://");
  var address = pieces.length > 1 ? pieces[1] : pieces[0];

  var spl = address.split(":");
  return redis.createClient({
    host: spl[0],
    port: parseInt(spl[1], 10),
  });
}
/// Turns a received command into the Response object that should be sent back.
///
/// Recognized commands are the exact strings "Ping", "Kill" and "Type":
///  * "Ping" -> `{Pong: {args}}` where `args` are the process arguments after
///    the first two entries of `process.argv`,
///  * "Kill" -> an `Info` response; the process exits 3 seconds later,
///  * "Type" -> an `Info` response carrying "MM".
/// Anything else (including non-string commands) yields an `Error` response.
function getResponse(command) {
  if (command === "Ping") {
    // `slice` copies, so process.argv itself is left untouched
    return {Pong: {args: process.argv.slice(2)}};
  }

  if (command === "Kill") {
    // delay the exit so the response below can still be delivered
    setTimeout(function() {
      console.log("MM is very tired...");
      process.exit(0);
    }, 3000);
    return {Info: {info: "Shutting down in 3 seconds..."}};
  }

  if (command === "Type") {
    return {Info: {info: "MM"}};
  }

  return {Error: {status: "Command not recognized."}};
}