#!/usr/bin/env python2.7
# This script slurps down tweets from the Twitter sample stream and puts them
# in files, unparsed, with a configurable number of tweets per file. Logic
# such as parsing is the responsibility of downstream programs.
#
# If the program receives SIGTERM or SIGINT (the latter generated by Control-C
# while running in a terminal), it will shut down gracefully.
#
# This script produces a fairly large amount of data; as of December 2011,
# here are some rough per-day estimates. Note that Twitter volume is both
# spiky and exponentially increasing (with 2-3 doublings per year). The
# following assumes 2350 bytes/tweet, or 370 after compression (gzip -9),
# which is what I measured in a brief test.
#
#                                      Compressed?
#    Level        Sample   Tweets     no       yes
#    ----------   ------   ------   ------   ------
#    gardenhose    10%      30M      70GB     10GB
#    spritzer       1%       3M       7GB      1GB
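#
#   (Worked example: 30M tweets/day * 2350 bytes/tweet is roughly 70.5 GB/day
#   raw, and 30M * 370 bytes is roughly 11.1 GB/day compressed -- i.e., the
#   gardenhose row above, after rounding.)
#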
# FIXME/TODO:
#
# * It might be worthwhile to use dictionary config -- ATM it's kind of
# awkward.
#
# * Log memory use during heartbeat?
#
# * Log both the IP connected to as well as the IP resolved by an independent
# process (in order to check for DNS TTL problems).
#
# * Warn when exponential backoff gets large, and remove rate limiting on
# connection attempts.
#
# * Change to .gz compression of HTTP stream to save bandwidth.
#
# BUGS/QUIRKS:
#
# * Tweet counts may not be exactly correct, as the stream includes things
# which are not tweets and we do not parse anything.
#
# * If somehow two tweet files are opened in the same second, the second will
# overwrite the first.
#
# * Reconnection rate limiting doesn't include the reconnect delay.
#
# * Consider more exotic compressors; e.g., PPMd in 7z saves us about 45%.
#
# * While Twitter suggests three different classes of reconnection delay
# (immediate, linear backoff, exponential backoff), we just do exponential
# for simplicity and because the network between here and Twitter can be
# flaky. In the latter case, we want to give things time to settle instead
# of burning up our connection limit with too-fast reconnects.
import argparse
import collections
import errno
import gzip
import hashlib
import os
import os.path
import re
import signal
import sys
import time
import daemon
import tweetstream as ts # need my hacked version of tweetstream
import u
### Constants ###
SECONDS_PER_DAY = 86400
### Setup ###
ap = argparse.ArgumentParser(
description="Save unparsed tweets from streaming API into files.")
ap.add_argument("--config",
required=True,
help="location of config file",
metavar="FILE")
ap.add_argument("--daemon",
action="store_true",
help="run as a daemon process")
ap.add_argument("--daemon-debug",
action="store_true",
help="don't close stdout and stderr when daemonizing")
ap.add_argument("--verbose",
action="store_true",
help="be more verbose on console")
args = u.parse_args(ap)
c = u.configure(args.config)
l = None  # set up logging later, after (possibly) daemonizing
# where to put the tweet files
dumppath = u.path_configured(c.get("coll", "tweets_dir"))
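# For orientation, here is a hypothetical [coll] config fragment showing the
# keys this script reads (key names come from the c.get*() calls below; the
# values are illustrative guesses, not shipped defaults):
#
#   [coll]
#   tweets_dir = /data/tweets
#   tweets_per_file = 100000
#   seconds_per_heartbeat = 1024    # must be a power of 2
#   no_proxy = true
#   source_ip =                     # empty: use the default interface
#   socket_timeout = 90
#   keywords_file =                 # empty: statuses/sample, else filter
#   keywords_limit = 400
#   connect_limit = 5
#   connect_limit_interval = 3600
#   connect_ok_duration = 60
#   reconnect_delay_base = 1.0
#   reconnect_delay_mult = 2.0
#   reconnect_delay_max = 240.0
#   consumer_key = ...              # Twitter OAuth credentials
#   consumer_secret = ...
#   access_token = ...
#   access_secret = ...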
# The signal handlers set this to true; the main loop then checks for this.
# (Signal handling setup is done in main_real() because DaemonContext mucks
# with the signal handling, and I don't want to do it redundantly.)
g_shutdown = False
# The HTTP libraries read these environment variables to decide whether to use
# a proxy, so clear them if needed.
if (c.getboolean("coll", "no_proxy")):
   for ev in ("HTTP_PROXY", "http_proxy", "https_proxy"):
      os.environ.pop(ev, None)  # pop(), not del, so unset variables are OK
# FIXME: Read this code, then vomit. When you're done cleaning your workspace,
# read on for an explanation. urllib2, which tweetstream uses to connect to
# Twitter, doesn't provide any hook for us to specify the source IP address,
# so we are forced to use this monkey patch instead. We wrap the real
# httplib.HTTPSConnection class (called by urllib2) with one that has a
# default source_address and swap it in for the real version. Inspired by
# http://stackoverflow.com/questions/1150332/source-interface-with-python-and-urllib2,
# which does the same thing at a different level.
source_ip = c.get("coll", "source_ip")
if (source_ip):
import httplib
HTTPSConnection_real = httplib.HTTPSConnection
   class HTTPSConnection_monkey(HTTPSConnection_real):
      def __init__(self, *a, **kw):
         HTTPSConnection_real.__init__(self, *a,
                                       source_address=(source_ip, 0), **kw)
httplib.HTTPSConnection = HTTPSConnection_monkey
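   # (The swap works because urllib2 looks up httplib.HTTPSConnection at
   # request time, not import time, so connections opened after this point
   # get the patched class.)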
### Program ###
def handle_sigterm(signum, frame):
global g_shutdown
g_shutdown = True
class ShutdownException(Exception):
"Raised when a shutdown command has been received and it is time to quit."
pass
class Tweet_Collector(object):
def __init__(self):
# FIXME: config checking here is kind of lame... can we put it elsewhere?
self.stream = None
self.keywords = None
self.tweets_total = 0
self.tweets_per_file = c.getint("coll", "tweets_per_file")
self.tweets_since_file = None
self.bytes_since_file = None
self.last_file_time = None
self.fp = None
self.filebase = None
self.s_per_heartbeat_conf = c.getint("coll", "seconds_per_heartbeat")
if (not u.is_power_2(self.s_per_heartbeat_conf)):
u.abort("seconds_per_heartbeat must be a power of 2; %d is not"
% (self.s_per_heartbeat_conf))
self.tweets_since_heartbeat = None
self.last_heartbeat_time = None
self.connect_log = \
collections.deque(maxlen=c.getint("coll", "connect_limit"))
self.reconnect_delay = c.getfloat("coll", "reconnect_delay_base")
# FIXME: This is causing more problems than it solves; disable for now.
# if (((c.getfloat("coll", "reconnect_delay_max")
# + c.getfloat("coll", "connect_ok_duration"))
# * c.getint("coll", "connect_limit"))
# > (c.getfloat("coll", "connect_limit_interval") * 0.8)):
# u.abort("connection limits might be unreachable; check your config")
def main(self):
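      # Run the collector, daemonizing first if --daemon was given.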
if (args.daemon):
d = daemon.DaemonContext()
         d.umask = 0007
         if (args.daemon_debug):
            d.stderr = sys.stderr
            d.stdout = sys.stdout
with d:
self.main_real()
else:
self.main_real()
def main_real(self):
global l
l = u.logging_init("twcol")
try:
# See note above with g_shutdown for why these are here.
signal.signal(signal.SIGTERM, handle_sigterm)
signal.signal(signal.SIGINT, handle_sigterm)
l.info("starting, my pid is %d" % (os.getpid()))
if (source_ip):
l.info("collecting on IP %s" % source_ip)
l.info("tweet files in %s" % (dumppath))
if (args.daemon_debug):
u.abort("running with --daemon-debug will cause you trouble later")
self.stream_init()
while True:
try:
self.collect()
except ShutdownException:
break
except ts.ReconnectError, x:
l.warning("connection failed: %s" % (str(x)))
if (not self.reconnect_barrier()):
u.abort("reconnect vetoed")
l.debug("reconnecting")
l.info("done")
except Exception:
u.abort("unhandled exception:", exc_info=True)
def collect(self):
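      # One collection session: connect to Twitter, then write tweets into
      # successive files until shutdown is requested or the connection dies.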
self.connect_log.append(time.time()) # log attempts, not success
self.stream._init_conn()
l.info("connected to Twitter")
now = time.time()
self.s_per_heartbeat = None
self.heartbeat_reset(now)
      self.file_open(now)  # opens self.fp
try:
for tweet in self.stream:
if (g_shutdown):
l.info("shutdown request received")
# We want the raw tweet, i.e., the sequence of bytes that came out
# of the socket. In Python 2.7, this is the "str" type.
assert isinstance(tweet, str)
byte_ct = len(tweet)
self.fp.write(tweet)
self.heartbeat_maybe()
self.file_rotate_maybe(byte_ct, stopping=g_shutdown)
if (g_shutdown):
raise ShutdownException
# error injection
#if (u.rand.random() < 0.01):
# self.stream.close()
# raise ts.ReconnectExponentiallyError("test")
      except (ts.ConnectionError):
         # The connection failed somehow. Close out this collection session
         # directly (not via file_rotate_maybe(), which would count the last
         # tweet a second time); we'll let the caller deal with reconnection
         # issues if needed.
         self.file_close(time.time())
         raise
def file_close(self, now):
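      # Close the current tweet file and write a companion .stats file of
      # throughput figures for the interval it covered (plus a .keywords
      # dump if we are filtering).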
self.fp.close()
l.debug("closed %s" % (self.filebase))
seconds_ct = now - self.last_file_time
tweets_per_second = self.tweets_since_file / seconds_ct
tweets_per_day = tweets_per_second * SECONDS_PER_DAY
bytes_raw_per_second = self.bytes_since_file / seconds_ct
bytes_raw_per_day = bytes_raw_per_second * SECONDS_PER_DAY
bytes_comp = os.path.getsize("%s/%s.json.gz" % (dumppath, self.filebase))
bytes_comp_per_second = bytes_comp / seconds_ct
bytes_comp_per_day = bytes_comp_per_second * SECONDS_PER_DAY
info_fp = open("%s/%s.stats" % (dumppath, self.filebase), "w")
def p(msg):
l.debug(" " + msg)
print >>info_fp, msg
      p("seconds               %13.1f %9s"
        % (seconds_ct, u.fmt_seconds(seconds_ct)))
      p("tweets                %11d   %9s"
        % (self.tweets_since_file, u.fmt_si(self.tweets_since_file)))
      p("tweets_per_second     %13.1f"
        % (tweets_per_second))
      p("tweets_per_day        %13.1f %9s"
        % (tweets_per_day, u.fmt_si(tweets_per_day)))
      p("bytes_raw             %11d   %9s"
        % (self.bytes_since_file, u.fmt_bytes(self.bytes_since_file)))
      p("bytes_raw_per_second  %13.1f %9s"
        % (bytes_raw_per_second, u.fmt_bytes(bytes_raw_per_second)))
      p("bytes_raw_per_day     %13.1f %9s"
        % (bytes_raw_per_day, u.fmt_bytes(bytes_raw_per_day)))
      p("bytes_comp            %11d   %9s"
        % (bytes_comp, u.fmt_bytes(bytes_comp)))
      p("bytes_comp_per_second %13.1f %9s"
        % (bytes_comp_per_second, u.fmt_bytes(bytes_comp_per_second)))
      p("bytes_comp_per_day    %13.1f %9s"
        % (bytes_comp_per_day, u.fmt_bytes(bytes_comp_per_day)))
      info_fp.close()
      if (self.keywords):
         keywords_fp = open("%s/%s.keywords" % (dumppath, self.filebase), "w")
         keywords_fp.write(self.keywords.dump())
         keywords_fp.close()
def file_open(self, now):
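      # Open a new gzipped tweet file, named for the current time, in a
      # per-month subdirectory (created if needed); reset per-file counters.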
# FIXME: This should be UTC, but I'm not bothering to fix it now. If
# changed, should rename all historic files as well.
subdir = time.strftime('%Y-%m', time.localtime(now))
filebase = time.strftime('%Y%m%d_%H%M%S', time.localtime(now))
try:
os.makedirs('%s/%s' % (dumppath, subdir))
l.info('created subdirectory %s' % (subdir))
except OSError, x:
if (x.errno != errno.EEXIST):
raise
self.filebase = '%s/%s' % (subdir, filebase)
self.fp = gzip.open('%s/%s.json.gz' % (dumppath, self.filebase), 'wb', 9)
l.debug('opened %s' % (self.filebase))
self.tweets_since_file = 0
self.bytes_since_file = 0
self.last_file_time = now
def file_rotate_maybe(self, byte_ct, stopping=False):
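      # Account for one more tweet; when tweets_per_file is reached (or we
      # are stopping), close the current file and, unless stopping, open a
      # fresh one.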
self.bytes_since_file += byte_ct
self.tweets_since_file += 1
if (stopping or self.tweets_since_file >= self.tweets_per_file):
assert (stopping or self.tweets_since_file == self.tweets_per_file)
now = time.time()
self.file_close(now)
if (not stopping):
self.file_open(now)
def heartbeat_maybe(self):
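      # Count this tweet; if the current heartbeat interval has elapsed, log
      # a progress line and reset the heartbeat bookkeeping.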
self.tweets_total += 1
self.tweets_since_heartbeat += 1
now = time.time()
seconds_ct = now - self.last_heartbeat_time
if (seconds_ct >= self.s_per_heartbeat):
tweets_per_second = self.tweets_since_heartbeat / seconds_ct
tweets_per_day = tweets_per_second * SECONDS_PER_DAY
l.debug("%d tweets, %d in last %s (%s/s, %s/day)"
% (self.tweets_total,
self.tweets_since_heartbeat,
u.fmt_seconds(seconds_ct),
u.fmt_si(tweets_per_second),
u.fmt_si(tweets_per_day)))
self.heartbeat_reset(now)
def heartbeat_reset(self, now):
self.tweets_since_heartbeat = 0
self.last_heartbeat_time = now
# The reasoning here: we want more frequent heartbeats when the
# connection is just starting up. So, we'll start with a heartbeat every
# 1 second and then double the interval after each beat until we reach
# the configured interval.
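      # (E.g., with seconds_per_heartbeat = 1024, the intervals run 1, 2, 4,
      # ... 512, 1024. This is also why the config value must be a power of
      # 2: repeated doubling then lands exactly on it, which keeps the
      # assert below true.)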
if (self.s_per_heartbeat is None):
self.s_per_heartbeat = 1
elif (self.s_per_heartbeat < self.s_per_heartbeat_conf):
self.s_per_heartbeat *= 2
assert (1 <= self.s_per_heartbeat <= self.s_per_heartbeat_conf)
def reconnect_barrier(self):
"""This function declares an intention to reconnect. Sleep for an
appropriate time, then return True if the reconnection is permissible
or False otherwise. The role of this function is to enforce
connection frequency and other limits. If the reconnect is rejected,
a reason is logged."""
now = time.time()
      # Check whether the reconnect would violate the frequency limit (and
      # also log some stuff about the check). Recall that connect_log is a
      # limited-length deque; specifically, its first item is the timestamp
      # of the nth most recent connection. If that was within the limit time
      # interval, then too many connections occurred within the interval and
      # we should veto the proposed one.
count = len(self.connect_log)
limit_interval = c.getfloat("coll", "connect_limit_interval")
total_interval = now - self.connect_log[0]
l.info("%d connections in last %s"
% (count, u.fmt_seconds(total_interval)))
if (count >= c.getint("coll", "connect_limit")
and total_interval < limit_interval):
l.error("connection frequency is too high")
return False
# No reason to veto; sleep and then return.
last_interval = now - self.connect_log[-1]
l.info("last connect attempt %s ago" % (u.fmt_seconds(last_interval)))
if (last_interval >= c.getfloat("coll", "connect_ok_duration")):
l.debug("last connection was stable; resetting reconnect delay")
self.reconnect_delay = c.getfloat("coll", "reconnect_delay_base")
l.info("reconnect ok; sleeping %s"
% (u.fmt_seconds(self.reconnect_delay)))
if (self.reconnect_delay == c.getfloat("coll", "reconnect_delay_max")):
l.warning("maximum delay before reconnect reached")
time.sleep(self.reconnect_delay)
if (g_shutdown):
l.info("shutdown request received during sleep, vetoing reconnect")
return False
self.reconnect_delay = min((self.reconnect_delay
* c.getfloat("coll", "reconnect_delay_mult")),
c.getfloat("coll", "reconnect_delay_max"))
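      # (Example: with hypothetical config values reconnect_delay_base = 1.0,
      # reconnect_delay_mult = 2.0, and reconnect_delay_max = 240.0,
      # successive failed reconnects sleep 1, 2, 4, ... 128, 240, 240, ...
      # seconds.)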
return True
def stream_init(self):
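      # Build the stream object: statuses/sample if no keywords file is
      # configured, statuses/filter tracking the keywords otherwise.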
keywords_file = c.get("coll", "keywords_file")
if (not keywords_file):
self.keywords = None
self.stream = ts.SampleStream(
c.get("coll", "consumer_key"),
c.get("coll", "consumer_secret"),
c.get("coll", "access_token"),
c.get("coll", "access_secret"),
timeout=c.getint("coll", "socket_timeout"),
raw=True)
l.info("initialized statuses/sample stream")
else:
self.keywords = Keywords(u.path_configured(keywords_file))
self.stream = ts.FilterStream(
c.get("coll", "consumer_key"),
c.get("coll", "consumer_secret"),
c.get("coll", "access_token"),
c.get("coll", "access_secret"),
timeout=c.getint("coll", "socket_timeout"),
raw=True,
track=iter(self.keywords))
l.info("initialized statuses/filter stream with %d of %d keywords"
% (len(self.keywords), c.getint("coll", "keywords_limit")))
class Keywords(object):
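   # A set of lowercase track keywords parsed from a text file; iterable and
   # sized, with a dump() that prepends a checksummed header.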
   __slots__ = ("keywords",)
def __init__(self, filename):
self.keywords = self.parse(filename)
def __iter__(self):
return self.keywords.__iter__()
def __len__(self):
return len(self.keywords)
def dump(self):
s = "\n".join(sorted(self.keywords)) + "\n"
return ('# { "keyword_count" : %d, "md5sum" : "%s" }\n%s'
% (len(self), hashlib.md5(s).hexdigest(), s))
def parse(self, filename):
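      # Hypothetical input illustrating the format parsed below ("#" starts
      # a comment, blank lines are ignored, one keyword per line, case
      # folded):
      #
      #   # sports
      #   Football
      #   baseball
      #
      # ...yields the keyword set: football, baseball.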
text = open(filename).read()
      # Remove comments. (We use the inline (?m) flag because re.sub() takes
      # count, not flags, as its fourth positional argument, so passing
      # re.MULTILINE there positionally silently does the wrong thing.)
keywords = set()
for line in text.split("\n"):
kw = line.strip().lower()
if (kw):
keywords.add(kw)
return keywords
if (__name__ == "__main__"):
Tweet_Collector().main()