Skip to content

Commit

Permalink
Merge branch 'master' into cv2-4851-error-capturing
Browse files Browse the repository at this point in the history
  • Loading branch information
DGaffney committed Jul 4, 2024
2 parents 346e596 + b1084d2 commit 8f56c9b
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 22 deletions.
75 changes: 75 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,78 @@ services:
condition: service_healthy
redis:
condition: service_healthy
video:
build: .
platform: linux/amd64
volumes:
- "./:/app"
env_file:
- ./.env_file
environment:
ROLE: worker
MODEL_NAME: video.Model
depends_on:
elasticmq:
condition: service_healthy
redis:
condition: service_healthy
mean_tokens:
build: .
platform: linux/amd64
volumes:
- "./:/app"
env_file:
- ./.env_file
environment:
ROLE: worker
MODEL_NAME: mean_tokens.Model
depends_on:
elasticmq:
condition: service_healthy
redis:
condition: service_healthy
fasttext:
build: .
platform: linux/amd64
volumes:
- "./:/app"
env_file:
- ./.env_file
environment:
ROLE: worker
MODEL_NAME: fasttext.Model
depends_on:
elasticmq:
condition: service_healthy
redis:
condition: service_healthy
fptg:
build: .
platform: linux/amd64
volumes:
- "./:/app"
env_file:
- ./.env_file
environment:
ROLE: worker
MODEL_NAME: fptg.Model
depends_on:
elasticmq:
condition: service_healthy
redis:
condition: service_healthy
indian_sbert:
build: .
platform: linux/amd64
volumes:
- "./:/app"
env_file:
- ./.env_file
environment:
ROLE: worker
MODEL_NAME: indian_sbert.Model
depends_on:
elasticmq:
condition: service_healthy
redis:
condition: service_healthy
34 changes: 26 additions & 8 deletions lib/queue/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from lib import schemas
from lib.logger import logger
from lib.queue.queue import Queue


class QueueProcessor(Queue):
@classmethod
def create(cls, model_name: str = None, batch_size: int = 10):
Expand All @@ -16,18 +18,22 @@ def create(cls, model_name: str = None, batch_size: int = 10):
input_queue_name = Queue.get_output_queue_name(model_name)
logger.info(f"Starting queue with: ('{input_queue_name}', {batch_size})")
return QueueProcessor(input_queue_name, batch_size)

def __init__(self, input_queue_name: str, output_queue_name: str = None, batch_size: int = 1):

def __init__(
self, input_queue_name: str, output_queue_name: str = None, batch_size: int = 1
):
"""
Start a specific queue - must pass input_queue_name - optionally pass output_queue_name, batch_size.
"""
super().__init__()
self.input_queue_name = input_queue_name
self.input_queues = self.restrict_queues_to_suffix(self.get_or_create_queues(input_queue_name), Queue.get_queue_suffix())
self.input_queues = self.restrict_queues_to_suffix(
self.get_or_create_queues(input_queue_name), Queue.get_queue_suffix()
)
self.all_queues = self.store_queue_map(self.input_queues)
logger.info(f"Processor listening to queues of {self.all_queues}")
self.batch_size = batch_size

def send_callbacks(self) -> List[schemas.Message]:
"""
Main routine. Given a model, in a loop, read tasks from input_queue_name at batch_size depth,
Expand All @@ -37,11 +43,14 @@ def send_callbacks(self) -> List[schemas.Message]:
messages_with_queues = self.receive_messages(self.batch_size)
if messages_with_queues:
logger.info(f"About to respond to: ({messages_with_queues})")
bodies = [schemas.parse_message(json.loads(message.body)) for message, queue in messages_with_queues]
bodies = [
schemas.parse_message(json.loads(message.body))
for message, queue in messages_with_queues
]
for body in bodies:
self.send_callback(body)
self.delete_messages(messages_with_queues)

def send_callback(self, message):
"""
Rescue against failures when attempting to respond (i.e. fingerprint) from models.
Expand All @@ -50,6 +59,15 @@ def send_callback(self, message):
logger.info(f"Message for callback is: {message}")
try:
callback_url = message.body.callback_url
requests.post(callback_url, json=message.dict())
response = requests.post(
callback_url,
json=message.dict(),
# headers={"Content-Type": "application/json"},
)
# check for error with the callback
if response.ok != True:
logger.error(f"Callback error responding to {callback_url} :{response}")
except Exception as e:
logger.error(f"Callback fail! Failed with {e} on {callback_url} with message of {message}")
logger.error(
f"Callback fail! Failed with {e} on {callback_url} with message of {message}"
)
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ opentelemetry-api==1.24.0
opentelemetry-exporter-otlp-proto-http==1.24.0
opentelemetry-sdk==1.24.0
redis==5.0.6
pact-python==2.2.0
pact-python==2.2.0
numpy==1.26.4
protobuf==3.20.0
29 changes: 16 additions & 13 deletions start_all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@
# Start the first process in the background
uvicorn main:app --host 0.0.0.0 --port ${PRESTO_PORT} --reload &

# Start the second process in the foreground
NUM_WORKERS=${NUM_WORKERS:-1} # Default to 1 worker if not specified
# Check if ROLE is set to "worker" and start workers if true
if [ "$ROLE" = "worker" ]; then
NUM_WORKERS=${NUM_WORKERS:-1} # Default to 1 worker if not specified

for i in $(seq 1 $NUM_WORKERS)
do
(
while true; do
echo "Starting run_worker.py instance $i..."
python run_worker.py worker-$i
echo "run_worker.py instance $i exited. Restarting..."
sleep 30 # Prevent potential rapid restart loop
done
) &. # run workers as background processes
done
for i in $(seq 1 $NUM_WORKERS)
do
(
while true; do
echo "Starting run_worker.py instance $i..."
python run_worker.py worker-$i
echo "run_worker.py instance $i exited. Restarting..."
sleep 30 # Prevent potential rapid restart loop
done
) & # run workers as background processes
done
fi

# Start the second process in the foreground
python run_processor.py

0 comments on commit 8f56c9b

Please sign in to comment.