Skip to content

Commit

Permalink
Merge pull request #7 from aserper/add_static_map_to_posts
Browse files Browse the repository at this point in the history
Add static map to posts
  • Loading branch information
aserper committed Mar 23, 2024
2 parents 9dc9abd + e454489 commit 53048f3
Show file tree
Hide file tree
Showing 5 changed files with 120,280 additions and 13 deletions.
4 changes: 3 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
from message_manager import MessageManager

def main():
messageManager = MessageManager()
print("Connecting to server and starting listening to events...")

while True:
try:
with RocketAlertAPI().listenToServerEvents() as response:
Expand All @@ -23,7 +25,7 @@ def main():
print("Event is None.")
else:
print("Processing event...")
MessageManager().postMessage(eventData)
messageManager.postMessage(eventData)
print("Event processed completed successfully.")

except KeyboardInterrupt:
Expand Down
9 changes: 7 additions & 2 deletions mastodon_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def __init__(self):
self.clientSecret = os.environ["MASTO_CLIENTSECRET"]
self.api_baseurl = os.environ["MASTO_BASEURL"]

def sendMessage(self, content):
def sendMessage(self, content, file):
if len(content) > MAX_CHARACTERS:
content = self.truncateToMaxMessageSize(content)

Expand All @@ -26,8 +26,13 @@ def sendMessage(self, content):

if not isinstance(content, (list)):
content = [content]

for message in content:
mastodon.status_post(message)
if file is None:
mastodon.status_post(message)
else:
mastodon.status_post(message, media_ids=mastodon.media_post(media_file=file))


# Splits a message string whose length > MAX_CHARACTERS into a list of
# messages, the length of each of which < MAX_CHARACTERS
Expand Down
63 changes: 57 additions & 6 deletions message_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,70 @@
from mastodon_bot import MastodonBot
from twitter_bot import TwitterBot
from message_builder import AlertMessageBuilder
import os
import polyline
import json
import requests
import urllib

class MessageManager:
def postMessage(self, content):
def __init__(self):
self.styleId = "dark-v11"
self.accessToken = os.environ["MAPBOX_TOKEN"]
self.mapFile = "tmp_static_map.png"
self.strokeColor = "ff0000"
self.strokeFill = "bb1b1b"

try:
file = open("polygons.json")
self.polygons = json.load(file)
file.close()
except Exception as e:
self.polygons = None

# Retrieves a static map with a polygon path of the alert location and saves it as a file
# Returns True if map file saved successfully, False otherwise
def getStaticMap(self, eventData):
if not self.polygons:
return False

polygon = self.polygons.get(str(eventData["taCityId"]), None)
if polygon is None:
return False

lat = eventData["lat"]
lon = eventData["lon"]
# Encode polygon to polyline format
polylineEncoded = polyline.encode(polygon, 5)
# URL encode the polyline
URLEncoded = urllib.parse.quote(polylineEncoded.encode('utf-8'))
overlay = f"path+{self.strokeColor}+{self.strokeFill}({URLEncoded})"
url = f"https://api.mapbox.com/styles/v1/mapbox/{self.styleId}/static/{overlay}/{lon},{lat},10,0/400x400@2x?access_token={self.accessToken}"

try:
# Retrieve map and save to a file
with open(self.mapFile, 'wb') as f:
f.write(requests.get(url).content)
return True
except Exception as e:
print(f"getStaticMap() - Error writing file: {e}")
return False

def postMessage(self, eventData):
print("Building alert message...")
content = AlertMessageBuilder().buildAlerts(content)
content = AlertMessageBuilder().buildAlerts(eventData)
print(content)

MastodonBot().sendMessage(content)
hasMap = self.getStaticMap(eventData)
file = self.mapFile if hasMap else None

MastodonBot().sendMessage(content, file)
print("Message posted to Mastodon.")

telegtamFooter = "[RocketAlert.live](https://RocketAlert.live)"
TelegramBot().sendMessage(f"{content}{telegtamFooter}")
TelegramBot().sendMessage(f"{content}{telegtamFooter}", file)
print("Message posted to Telegram.")

TwitterBot().sendMessage(content)
print("Message posted to Twitter.")
# Disabling posting to Twitter for now as current tier limit doesn't support our use case.
# TwitterBot().sendMessage(content)
# print("Message posted to Twitter.")
Loading

0 comments on commit 53048f3

Please sign in to comment.