Skip to content

Commit

Permalink
Merge pull request #8 from aserper/en_array_of_alerts
Browse files Browse the repository at this point in the history
Support array of alerts
  • Loading branch information
aserper committed Mar 28, 2024
2 parents b2d0b6f + f596634 commit 30e63a8
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 19 deletions.
2 changes: 1 addition & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def main():
print("Event is None.")
else:
print("Processing event...")
messageManager.postMessage(eventData)
messageManager.postMessage(eventData["alerts"])
print("Event processed completed successfully.")

except KeyboardInterrupt:
Expand Down
2 changes: 1 addition & 1 deletion message_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def __buildAlertList(self, alerts):
for alert in alerts:
alertText = self.__buildAlert(alert)
if text:
text = f"{text}\n\n"
text = f"{text}\n"
text = f"{text}{alertText}"
return text

Expand Down
42 changes: 26 additions & 16 deletions message_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,38 +26,48 @@ def __init__(self):

# Retrieves a static map with a polygon path of the alert location and saves it as a file
# Returns True if map file saved successfully, False otherwise
def getStaticMap(self, eventData):
def buildStaticMap(self, alerts):
if not self.polygons:
return False

polygon = self.polygons.get(str(eventData["taCityId"]), None)
if polygon is None:
return False
overlays = []
for alert in alerts:
overlay = self.buildPolygonOverlay(alert)
if overlay is not None:
overlays.append(overlay)

lat = eventData["lat"]
lon = eventData["lon"]
# Encode polygon to polyline format
polylineEncoded = polyline.encode(polygon, 5)
# URL encode the polyline
URLEncoded = urllib.parse.quote(polylineEncoded.encode('utf-8'))
overlay = f"path+{self.strokeColor}+{self.strokeFill}({URLEncoded})"
url = f"https://api.mapbox.com/styles/v1/mapbox/{self.styleId}/static/{overlay}/{lon},{lat},10,0/400x400@2x?access_token={self.accessToken}"
overlaysString = ','.join(overlays)
url = f"https://api.mapbox.com/styles/v1/mapbox/{self.styleId}/static/{overlaysString}/auto/400x400@2x?padding=100&access_token={self.accessToken}"

try:
# Retrieve map and save to a file
with open(self.mapFile, 'wb') as f:
f.write(requests.get(url).content)
return True
except Exception as e:
print(f"getStaticMap() - Error writing file: {e}")
print(f"buildStaticMap() - Error writing file: {e}")
return False

def postMessage(self, eventData):
# Returns a URLEncoded polyline overlay for the alert
# location's polygon
def buildPolygonOverlay(self, alert):
polygon = self.polygons.get(str(alert["taCityId"]), None)
if polygon is None:
return None

# Encode polygon to polyline format
polylineEncoded = polyline.encode(polygon, 5)
# URL encode the polyline
URLEncoded = urllib.parse.quote(polylineEncoded.encode('utf-8'))
overlay = f"path+{self.strokeColor}+{self.strokeFill}({URLEncoded})"
return overlay

def postMessage(self, alerts):
print("Building alert message...")
content = AlertMessageBuilder().buildAlerts(eventData)
content = AlertMessageBuilder().buildAlerts(alerts)
print(content)

hasMap = self.getStaticMap(eventData)
hasMap = self.buildStaticMap(alerts)
file = self.mapFile if hasMap else None

MastodonBot().sendMessage(content, file)
Expand Down
2 changes: 1 addition & 1 deletion rocket_alert_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
class RocketAlertAPI:
def __init__(self):
self.baseURL = "https://agg.rocketalert.live/api/v1/alerts"
self.baseURL = "https://agg.rocketalert.live/api/v2/alerts"
self.customHeaderValue = os.environ['CUSTOM_HEADER_KEY']
self.customHeaderKey = "x-ra-agg-secret"
self.headers = {
Expand Down

0 comments on commit 30e63a8

Please sign in to comment.