#!/usr/bin/python
# docstring used for argparse too, which doesn't know how to remove the indent.
'''
This script parses a day file containing tweets, extracts the ones with a
geotag, splits those into tokens, and saves the individual (token or n-gram,
geotag) pairs into a SpatiaLite database.'''
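# Example invocation (filenames here are hypothetical; note that DB_FILE is
# given before --input so it is not consumed by that option, per the epilogue
# below):
#
#   $ geotags-extract --init geotags.db --input tweets-2012-01-01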
help_epilogue = '''
DB_FILE should not immediately follow --input; otherwise, it will be consumed
by the latter.
A note on --bigpages: Internal documentation for our large parallel
filesystems notes that the "highest bandwidths come from large I/O request
sizes, larger than 100KB". This suggests that "PRAGMA page_size = 65536" (the
maximum) may improve performance. However, brief tests on my workstation's
ext4 filesystem show reduced performance (roughly 2x) for the queries I
tested. Thus, my advice is to build without --bigpages and change the page
size later (if needed) when you are specifically working on the parallel
filesystems. See <http://www.sqlite.org/pragma.html#pragma_page_size>.'''
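# A rough sketch of changing the page size after the fact (untested here;
# SQLite applies a new page_size to an existing database only when it is
# rebuilt, e.g. by VACUUM):
#
#   $ sqlite3 DB_FILE "PRAGMA page_size = 65536; VACUUM;"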
# The full dataset is accumulated in memory before being saved, in order to
# accommodate some level of parallelism when writing into the same database
# file.
#
# FIXME/BUGS:
#
# * This script is not idempotent. Running it twice on the same input day file
#   means you get everything in that file twice. (Re-loading a file could be
#   implemented by looking at the max/min tweet IDs in a day file and deleting
#   those tokens before re-loading them.)
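#
#   A possible sketch of that re-load logic (hypothetical; min_id and max_id
#   would come from a first pass over the day file, and this targets the
#   tweet table created below):
#
#     db.sql("DELETE FROM tweet WHERE tweet_id BETWEEN ? AND ?",
#            (min_id, max_id))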
import argparse
import os
from django.contrib.gis import geos
import db_glue
import geo.base
import geo.srs
import tweet
import u
SCHEMA_VERSION = 5
### Setup ###
ap = argparse.ArgumentParser(description=__doc__,
                             epilog=help_epilogue,
                             formatter_class=argparse.RawTextHelpFormatter)
ap._optionals.title = 'help' # see http://bugs.python.org/issue9694
gr = ap.add_argument_group('mode (one or both of)')
gr.add_argument('--init',
                action='store_true',
                help='initialize db before reading tweets')
gr.add_argument('--input',
                metavar='FILES',
                nargs='+',
                help='tweet day files to read')
gr = ap.add_argument_group('output')
gr.add_argument('database_file',
                metavar='DB_FILE',
                help='database file to save tokens into')
gr = ap.add_argument_group('initialization')
gr.add_argument('--bigpages',
                action='store_true',
                help='use 64KiB pages')
gr = ap.add_argument_group('misc')
gr.add_argument('--heartbeat',
                metavar='N',
                type=int,
                help='print a debug heartbeat every N geotagged tweets')
gr.add_argument('--limit',
                metavar='N',
                type=int,
                help='load this many geotagged tweets per file and then stop')
gr.add_argument('--verbose',
                action='store_true',
                help='give more verbose output')
args = u.parse_args(ap)
if (not (args.init or args.input)):
    ap.error('at least one of --init and --input is required')
if (not args.init and args.bigpages):
    ap.error('--bigpages is only valid with --init')
l = u.logging_init('twgex')
### Main ###
def main():
    l.info('starting')
    if (args.init and os.path.exists(args.database_file)):
        l.info('deleting %s per --init' % (args.database_file))
        os.unlink(args.database_file)
    db = db_glue.DB(args.database_file, create=(args.init))
    l.debug('connected to database %s' % (args.database_file))
    if (args.init):
        db_init(db)
    if (args.input):
        assert (db.metadata_get('schema_version') == str(SCHEMA_VERSION))
        for filename in args.input:
            load_day_file(db, filename)
    l.info('done')
def db_init(db):
    l.info('initializing %s' % (args.database_file))
    if (not db.is_empty()):
        u.abort('database is not empty, cannot initialize')
    if (args.bigpages):
        db.sql("PRAGMA page_size = 65536")
    db_init_spatial_metadata(db)
    db.sql("""CREATE TABLE metadata (
                key    TEXT PRIMARY KEY NOT NULL,
                value  TEXT NOT NULL)""")
    db.metadata_set('schema_version', SCHEMA_VERSION)
    db.sql("""CREATE TABLE tweet (
                tweet_id          INTEGER NOT NULL,
                created_at        TIMESTAMP NOT NULL,
                day               INTEGER NOT NULL,
                hour              INTEGER NOT NULL,
                text              TEXT NOT NULL,
                user_screen_name  TEXT NOT NULL,
                user_description  TEXT,
                user_lang         TEXT NOT NULL,
                user_location     TEXT,
                user_time_zone    TEXT)""")
    db.sql("SELECT AddGeometryColumn('tweet', 'geom', ?, 'POINT', 'XY')",
           (u.WGS84_SRID,))
    indexes_create(db)
def db_init_spatial_metadata(db):
    db.sql("SELECT InitSpatialMetadata()")
    # add some more spatial reference systems that we need; see
    # http://www.gaia-gis.it/gaia-sins/spatialite-cookbook/html/metadata.html
    def insert_srs(srid, ref_sys_name, proj4text):
        db.insert('spatial_ref_sys', { 'srid': srid,
                                       'auth_name': 'LOCAL',
                                       'auth_srid': srid,
                                       'ref_sys_name': ref_sys_name,
                                       'proj4text': proj4text })
    for (srid, srs) in geo.srs.CUSTOM_SRS.iteritems():
        insert_srs(srid, srs[0], srs[1])
def load_day_file(db, filename):
    # open day file
    reader = tweet.Reader(filename)
    l.debug('opened %s' % (filename))
    # loop through tweets
    tweet_ct = 0
    tweet_inserts = list()
    for tw in reader:
        if (tw.geotagged_p()):
            assert (tw.created_at.utcoffset().total_seconds() == 0)  # require UTC
            tweet_inserts.append(tw.to_dict())
            tweet_ct += 1
            if (args.heartbeat is not None and tweet_ct % args.heartbeat == 0):
                l.debug('%d geotagged tweets' % (tweet_ct))
            if (args.limit is not None and tweet_ct >= args.limit):
                break
    # save tweets
    l.debug('inserting')
    for i in tweet_inserts:
        db.insert('tweet', i)
    l.debug('committing')
    db.commit()
    l.info('loaded %d geotagged tweets from %s' % (tweet_ct, filename))
### Supporting functions ###
def indexes_create(db):
    db.sql("CREATE INDEX tweet_created_at_idx ON tweet(created_at)")
def indexes_delete(db):
    raise NotImplementedError('unimplemented')
### Make it a script ###
if (__name__ == '__main__'):
    main()