-
Notifications
You must be signed in to change notification settings - Fork 141
/
input_gae.js
96 lines (89 loc) · 2.85 KB
/
input_gae.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
var base_input = require('../lib/base_input'),
http = require('http'),
https = require('https'),
util = require('util'),
url = require('url'),
logger = require('log4node');
/**
 * Input plugin that polls a Google App Engine log servlet.
 *
 * Runs the BaseInput constructor on this instance, then registers the plugin
 * configuration through `mergeConfig` (not defined in this file — presumably
 * provided by BaseInput; confirm in ../lib/base_input).
 *
 * Configuration declared here:
 *  - required parameter: `key`
 *  - optional parameters: `ssl`, `polling`, `servlet_name`,
 *    `access_logs_type`, `access_logs_field_name`, `type`
 *  - defaults: `ssl` = false, `polling` = 60, `servlet_name` = 'logs'
 *  - `start_hook` is set to this instance's `start` method.
 */
function InputGae() {
  base_input.BaseInput.call(this);

  var optional_params = [
    'ssl',
    'polling',
    'servlet_name',
    'access_logs_type',
    'access_logs_field_name',
    'type',
  ];

  var default_values = {
    'ssl': false,
    'polling': 60,
    'servlet_name': 'logs',
  };

  var config = {
    name: 'Gae',
    host_field: 'host',
    port_field: 'port',
    required_params: ['key'],
    optional_params: optional_params,
    default_values: default_values,
    start_hook: this.start,
  };

  this.mergeConfig(config);
}
// Link InputGae's prototype chain to BaseInput so instances inherit its methods.
util.inherits(InputGae, base_input.BaseInput);
/**
 * Begin polling the log servlet.
 *
 * Selects the HTTP or HTTPS client according to `this.ssl`, builds the servlet
 * base URL from `host`, `port` and `servlet_name`, records the current time
 * (in milliseconds) as the first start timestamp, schedules a poll every
 * `this.polling` seconds, triggers an immediate first poll and finally
 * invokes `callback`.
 *
 * @param {Function} callback - called with no arguments once polling is set up.
 */
InputGae.prototype.start = function(callback) {
  var self = this;
  var scheme;

  if (self.ssl) {
    self.proto = https;
    scheme = 'https';
  }
  else {
    self.proto = http;
    scheme = 'http';
  }

  self.base_url = scheme + '://' + self.host + ':' + self.port + '/' + self.servlet_name;
  self.current_timestamp = Date.now();

  logger.info('Start polling log from Google App Engine to', self.base_url, 'polling period', self.polling);

  // `polling` is expressed in seconds; setInterval expects milliseconds.
  var period_ms = self.polling * 1000;
  self.interval = setInterval(function() {
    self.poll();
  }, period_ms);

  // Do not wait for the first interval tick before fetching logs.
  self.poll();
  callback();
};
/**
 * Run one polling round against the log servlet.
 *
 * Sends a GET request to `base_url` with the `start_timestamp` and `log_key`
 * query parameters, splits the response body on newlines, parses each
 * non-empty line as JSON and emits it as a 'data' event. A line that cannot
 * be parsed (or whose handling throws) is reported through an 'error' event.
 *
 * Type tagging applied to every parsed entry:
 *  - if `access_logs_type` and `access_logs_field_name` are configured and the
 *    entry has a truthy value in that field, `type` is set to `access_logs_type`;
 *  - otherwise, if `type` is configured and the entry has no `type`, the
 *    configured `type` is used.
 *
 * Only one request runs at a time: calls made while a previous round is
 * still in progress return immediately.
 */
InputGae.prototype.poll = function() {
  if (this.in_progress) {
    return;
  }
  this.in_progress = true;

  var self = this;
  var released = false;

  // Release the in-progress guard exactly once per request, so that a late
  // 'close' event from this response cannot unlock a newer request.
  function release() {
    if (!released) {
      released = true;
      self.in_progress = false;
    }
  }

  // Parse one line of the response and emit the resulting entry.
  function handleLine(line) {
    if (line === '') {
      return;
    }
    try {
      var entry = JSON.parse(line);
      if (self.access_logs_type && self.access_logs_field_name && entry[self.access_logs_field_name]) {
        entry.type = self.access_logs_type;
      }
      if (self.type && !entry.type) {
        entry.type = self.type;
      }
      self.emit('data', entry);
    }
    catch (e) {
      self.emit('error', e);
    }
  }

  // Query values are percent-encoded so that a key containing reserved
  // characters (&, =, space, ...) cannot corrupt the query string.
  var options = url.parse(
    this.base_url +
    '?start_timestamp=' + encodeURIComponent(this.current_timestamp) +
    '&log_key=' + encodeURIComponent(this.key)
  );
  // NOTE(review): TLS certificate verification is disabled here. Kept as-is
  // for backward compatibility, but it leaves HTTPS polling open to
  // man-in-the-middle attacks; consider making this configurable.
  options.rejectUnauthorized = false;

  var req = this.proto.get(options, function(res) {
    if (res.statusCode !== 200) {
      // Drain the unused body so the underlying socket can be released.
      res.resume();
      self.emit('error', new Error('Google app engine return wrong return code ' + res.statusCode));
      release();
      return;
    }

    // Keep the previous timestamp when the header is absent, instead of
    // sending `start_timestamp=undefined` on the next round.
    var end_timestamp = res.headers['x-log-end-timestamp'];
    if (end_timestamp !== undefined && end_timestamp !== '') {
      self.current_timestamp = end_timestamp;
    }

    // Let the stream decode UTF-8 so that multi-byte characters split across
    // two chunks are reassembled rather than corrupted.
    res.setEncoding('utf8');

    var pending = '';
    res.on('data', function(chunk) {
      var lines = (pending + chunk).split('\n');
      // The last element is an incomplete line (or '' after a newline).
      pending = lines.pop();
      lines.forEach(handleLine);
    });
    res.on('end', function() {
      // A complete body may lack a trailing newline: flush what is left.
      if (pending !== '') {
        var last_line = pending;
        pending = '';
        handleLine(last_line);
      }
      release();
    });
    res.on('error', function(err) {
      self.emit('error', err);
      release();
    });
    // An aborted response closes without 'end'; unlock polling in that case
    // too. The partial line left in `pending` is deliberately not parsed.
    res.on('close', release);
  });

  req.on('error', function(err) {
    self.emit('error', err);
    release();
  });
};
/**
 * Stop the periodic polling.
 *
 * Cancels the interval timer created by `start`, logs the endpoint being
 * closed and then invokes `callback`. A request that is already in flight
 * is not cancelled here.
 *
 * @param {Function} callback - called with no arguments once the timer is cleared.
 */
InputGae.prototype.close = function(callback) {
  clearInterval(this.interval);

  var endpoint = this.host + ':' + this.port;
  logger.info('Closing Google App Engine poller to ', endpoint);

  callback();
};
exports.create = function() {
return new InputGae();
};