"""Module for downloading and analyzing the shooting victims database."""
import gzip
import tempfile
from dataclasses import dataclass
from typing import Literal, Optional
import boto3
import carto2gpd
import geopandas as gpd
import numpy as np
import pandas as pd
import requests
import simplejson as json
from dotenv import find_dotenv, load_dotenv
from loguru import logger
from pydantic import BaseModel, Field, validator
from shapely.geometry import Point
from . import DATA_DIR, EPSG
from .courts import CourtInfoByIncident
from .geo import *
from .streets import StreetHotSpots
from .utils import validate_data_schema
class Geometry(Point):
"""
Shapely point geometry.
"""
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def validate(cls, v):
if not isinstance(v, Point):
raise TypeError("shapely point object required")
return v
@classmethod
def __modify_schema__(cls, field_schema, field):
pass
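
# Example (a sketch): fields annotated with Geometry accept shapely Point
# objects and reject anything else.
#
#   Geometry.validate(Point(0, 0))  # returns the point unchanged
#   Geometry.validate((0, 0))       # raises TypeError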
def upload_to_s3(data, filename):
"""Upload data to a public AWS s3 bucket."""
# Load the credentials
load_dotenv(find_dotenv())
# Initialize the s3 resource
s3 = boto3.client("s3")
# Compress JSON
json_str = data.to_json() + "\n"
json_bytes = json_str.encode("utf-8")
with tempfile.TemporaryDirectory() as tmpdir:
tmpfile = f"{tmpdir}/{filename}"
with gzip.open(tmpfile, "w") as fout:
fout.write(json_bytes)
# Upload to s3
BUCKET = "gun-violence-dashboard"
s3.upload_file(
tmpfile,
BUCKET,
filename,
ExtraArgs={
"ContentType": "application/json",
"ContentEncoding": "gzip",
"ACL": "public-read",
},
)
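
# Example usage (a sketch): upload one year of processed data to the public
# bucket. Assumes AWS credentials are discoverable via the .env file; the
# filename here is illustrative.
#
#   gdf = gpd.read_file(DATA_DIR / "processed" / "shootings_2021.json")
#   upload_to_s3(gdf, "shootings_2021.json")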
def carto2gpd_post(url, table_name, where=None, fields=None):
"""Query carto API with a post call"""
# Get the fields
if fields is None:
fields = "*"
else:
if "the_geom" not in fields:
fields.append("the_geom")
fields = ",".join(fields)
# Build the query
query = f"SELECT {fields} FROM {table_name}"
if where:
query += f" WHERE {where}"
# Make the request
params = dict(q=query, format="geojson", skipfields=["cartodb_id"])
r = requests.post(url, data=params)
    if r.status_code == 200:
        return gpd.GeoDataFrame.from_features(r.json(), crs="EPSG:4326")
    else:
        raise ValueError(f"Error querying CARTO API: status code {r.status_code}")
def add_geographic_info(df):
"""Add geographic info."""
# Get a fresh copy
df = df.copy().reset_index(drop=True)
# The original length
original_length = len(df)
# Check city limits
city_limits = get_city_limits().to_crs(df.crs)
outside_limits = ~df.geometry.within(city_limits.squeeze().geometry)
missing = outside_limits.sum()
# Set missing geometry to null
logger.info(f"{missing} shootings outside city limits")
if missing > 0:
df.loc[outside_limits, "geometry"] = np.nan
    # Try to replace any missing geometries from criminal incidents;
    # only query when there is something to replace, since an empty
    # dc_key list would produce invalid SQL
    matches = 0
    if missing > 0:
        dc_key_list = ", ".join(
            df.loc[df.geometry.isnull(), "dc_key"].apply(lambda x: f"'{x}'")
        )

        # Query with a POST request
        url = "https://phl.carto.com/api/v2/sql"
        table_name = "incidents_part1_part2"
        where = f"dc_key IN ( {dc_key_list} )"
        incidents = carto2gpd_post(
            url, table_name, where=where, fields=["dc_key"]
        ).to_crs(df.crs)
        incidents["dc_key"] = incidents["dc_key"].astype(str)

        # Did we get any matches?
        matches = len(incidents)
        logger.info(f"Found {matches} matches for {missing} missing geometries")
# Merge
if matches > 0:
missing_sel = df.geometry.isnull()
missing = df.loc[missing_sel]
df2 = missing.drop(columns=["geometry"]).merge(
incidents[["dc_key", "geometry"]].drop_duplicates(subset=["dc_key"]),
on="dc_key",
how="left",
)
df = pd.concat([df.loc[~missing_sel], df2]).reset_index(drop=True)
def _add_geo_info(data, geo):
out = gpd.sjoin(data, geo, how="left", predicate="within")
# NOTE: sometimes this will match multiple geo boundaries
# REMOVE THEM
duplicated = out.index.duplicated()
if duplicated.sum():
out = out.loc[~duplicated]
return out.drop(labels=["index_right"], axis=1)
# Add geographic columns
geo_funcs = [
get_zip_codes,
get_police_districts,
get_council_districts,
get_neighborhoods,
get_school_catchments,
get_pa_house_districts,
get_pa_senate_districts,
]
for geo_func in geo_funcs:
df = df.pipe(_add_geo_info, geo_func().to_crs(df.crs))
    # If the spatial joins came up empty, null out the geometry as well
    # (it is converted to an empty Point downstream in get())
    df.loc[df["neighborhood"].isnull(), "geometry"] = np.nan
# Check the length
if len(df) != original_length:
raise ValueError("Length of data has changed; this shouldn't happen!")
return df
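
# Example usage (a sketch): tag each shooting with the boundaries that
# contain it, e.g. ZIP code, police district, and neighborhood.
#
#   df = add_geographic_info(df)
#   df["police_district"].value_counts()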
def load_existing_shootings_data():
"""Load existing shootings data."""
files = sorted((DATA_DIR / "processed").glob("shootings_20*.json"))
return pd.concat([gpd.read_file(f) for f in files])
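
# Example usage (a sketch): load all previously processed annual files into
# a single GeoDataFrame, e.g. to compare row counts against a fresh download.
#
#   old_df = load_existing_shootings_data()
#   len(old_df)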
class ShootingVictimsSchema(BaseModel):
"""Schema for the shooting victims dataset."""
dc_key: str = Field(
title="Incident number",
description="The unique incident number assigned by the Police Department.",
)
race: Literal["B", "H", "W", "A", "Other/Unknown"] = Field(
title="Race/Ethnicity",
description=(
"The race/ethnicity of the shooting victim. "
"Allowed values include: 'B' = Black, Non-Hispanic, 'H' = Hispanic, "
"'W' = White, Non-Hispanic, 'A' = Asian, and 'Other/Unknown'"
),
)
sex: Literal["M", "F"] = Field(
title="Sex", description="The sex of the shooting victim."
)
fatal: Literal[True, False] = Field(
title="Fatal?", description="Whether the incident was fatal."
)
date: str = Field(
title="Date",
description="The datetime of the incident in the format 'Y/m/d H:M:S'",
)
    age_group: Literal[
        "Younger than 18", "18 to 30", "31 to 45", "Older than 45", "Unknown"
    ] = Field(title="Age group", description="The victim's age group (or unknown).")
has_court_case: Literal[True, False] = Field(
title="Associated Court Case?",
description="Does the incident number have an associated court case?",
)
# Not all ages are known
age: Optional[float] = Field(
default=None,
title="Age",
description="The victim's age; missing in some cases.",
)
# Optional geographic add-ons
geometry: Optional[Geometry] = Field(
default=None,
description="The lat/lng point location of the shooting incident; missing in some cases.",
)
street_name: Optional[str] = Field(
default=None,
title="Street name",
description="The name of the street the incident occurred on, if available.",
)
block_number: Optional[float] = Field(
default=None,
title="Block number",
description="The block number where the incident occurred, if available.",
)
zip_code: Optional[str] = Field(
default=None,
title="ZIP Code",
description="The ZIP code where the incident occurred, if available.",
)
council_district: Optional[str] = Field(
default=None,
title="Council district",
description="The council district where the incident occurred, if available.",
)
police_district: Optional[str] = Field(
default=None,
title="Police district",
description="The police district where the incident occurred, if available.",
)
neighborhood: Optional[str] = Field(
default=None,
title="Neighborhood name",
description="The name of the neighborhood where the incident occurred, if available.",
)
school_name: Optional[str] = Field(
default=None,
title="School catchment",
description="The elementary school catchment where the incident occurred, if available.",
)
house_district: Optional[str] = Field(
default=None,
title="PA House district",
description="The PA House district where the incident occurred, if available.",
)
    senate_district: Optional[str] = Field(
        default=None,
        title="PA Senate district",
        description="The PA Senate district where the incident occurred, if available.",
    )
    segment_id: Optional[str] = Field(
        default=None,
        title="Street segment ID",
        description="The ID of the street segment where the incident occurred, if available.",
    )
@validator("dc_key")
def verify_dc_key(cls, v):
if not isinstance(v, str):
assert not np.isnan(v), "cannot be NaN"
else:
assert not v.endswith(".0"), "bad string formatting"
return v
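
# Example (a sketch): validate a single record against the schema. The field
# values here are illustrative, not real data.
#
#   ShootingVictimsSchema(
#       dc_key="201101012345",
#       race="B",
#       sex="M",
#       fatal=False,
#       date="2021/01/01 00:00:00",
#       age_group="18 to 30",
#       has_court_case=False,
#   )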
@dataclass
class ShootingVictimsData:
"""Class for downloading and analyzing the shooting victims
database from Open Data Philly."""
debug: bool = False
ignore_checks: bool = False
ENDPOINT: str = "https://phl.carto.com/api/v2/sql"
TABLE_NAME: str = "shootings"
@validate_data_schema(ShootingVictimsSchema)
def get(self) -> gpd.GeoDataFrame:
"""Download and return the formatted data."""
if self.debug:
logger.debug("Downloading shooting victims database")
# Raw data from carto
df = carto2gpd.get(self.ENDPOINT, self.TABLE_NAME)
# Verify DC key first
missing_dc_keys = df["dc_key"].isnull()
if missing_dc_keys.sum() and not self.ignore_checks:
n = missing_dc_keys.sum()
raise ValueError(f"Found {n} rows with missing DC keys")
# Format
df = (
df.assign(
time=lambda df: df.time.replace("<Null>", np.nan).fillna("00:00:00"),
date=lambda df: pd.to_datetime(
df.date_.str.slice(0, 10).str.cat(df.time, sep=" ")
),
dc_key=lambda df: df.dc_key.astype(float).astype(int).astype(str),
year=lambda df: df.date.dt.year,
race=lambda df: df.race.fillna("Other/Unknown"),
age=lambda df: df.age.astype(float),
age_group=lambda df: np.select(
[
df.age <= 17,
(df.age > 17) & (df.age <= 30),
(df.age > 30) & (df.age <= 45),
(df.age > 45),
],
["Younger than 18", "18 to 30", "31 to 45", "Older than 45"],
default="Unknown",
),
                fatal=lambda df: df.fatal.eq(1),
)
.assign(
race=lambda df: df.race.where(df.latino != 1, other="H"),
)
.drop(labels=["point_x", "point_y", "date_", "time", "objectid"], axis=1)
.sort_values("date", ascending=False)
.reset_index(drop=True)
.assign(
date=lambda df: df.date.dt.strftime("%Y/%m/%d %H:%M:%S")
) # Convert date back to string
.to_crs(epsg=EPSG)
)
# Add the other category for race/ethnicity
main_race_categories = ["H", "W", "B", "A"]
sel = df.race.isin(main_race_categories)
df.loc[~sel, "race"] = "Other/Unknown"
# CHECKS
if not self.ignore_checks:
old_df = load_existing_shootings_data()
TOLERANCE = 100
# Check for too many rows
if len(df) - len(old_df) > TOLERANCE:
logger.info(f"Length of new data: {len(df)}")
logger.info(f"Length of old data: {len(old_df)}")
raise ValueError(
"New data seems to have too many rows...please manually confirm new data is correct."
)
# Check for too few rows
TOLERANCE = 10
if len(old_df) - len(df) > TOLERANCE:
logger.info(f"Length of new data: {len(df)}")
logger.info(f"Length of old data: {len(old_df)}")
raise ValueError(
"New data seems to have too few rows...please manually confirm new data is correct."
)
# Add geographic info
df = add_geographic_info(df)
# Handle NaN/None
df = df.assign(
geometry=lambda df: df.geometry.fillna(Point()),
)
# Value-added info for hot spots and court info
hotspots = StreetHotSpots(debug=self.debug)
courts = CourtInfoByIncident(debug=self.debug)
df = (
df.pipe(hotspots.merge)
.pipe(courts.merge)
.assign(segment_id=lambda df: df.segment_id.replace("", np.nan))
)
# Trim to the schema fields
fields = ShootingVictimsSchema.__fields__.keys()
df = df[fields]
return df
def save(self, data):
"""Save annual, processed data files."""
# Get the years from the date
years = pd.to_datetime(data["date"]).dt.year
# Get unique years
# IMPORTANT: this must be int so it is JSON serializable
unique_years = [int(year) for year in sorted(np.unique(years), reverse=True)]
        with (DATA_DIR / "processed" / "data_years.json").open("w") as f:
            json.dump(unique_years, f)
# Save each year's data to separate file
for year in unique_years:
if self.debug:
logger.debug(f"Saving {year} shootings as a GeoJSON file")
# Get data for this year
# Save in EPSG = 4326
data_yr = data.loc[years == year].to_crs(epsg=4326)
data_yr.to_file(
DATA_DIR / "processed" / f"shootings_{year}.json",
driver="GeoJSON",
index=False,
)
# Save to s3
upload_to_s3(data_yr, f"shootings_{year}.json")
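
# Example usage (a sketch): download, validate, and save the latest data.
#
#   data = ShootingVictimsData(debug=True)
#   df = data.get()
#   data.save(df)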