Skip to content

Commit

Permalink
Merge pull request #86 from meedan/cv2-4639-task-timing
Browse files Browse the repository at this point in the history
CV2-4639 add metrics for task timing
  • Loading branch information
DGaffney authored May 28, 2024
2 parents 7869707 + d9fc706 commit fdbaff9
Show file tree
Hide file tree
Showing 9 changed files with 254 additions and 45 deletions.
11 changes: 0 additions & 11 deletions .env_file

This file was deleted.

5 changes: 5 additions & 0 deletions .env_file.test
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,8 @@ S3_ENDPOINT=http://minio:9000
AWS_DEFAULT_REGION=us-east-1
AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
OTEL_SERVICE_NAME=my-service-name
OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
OTEL_EXPORTER_OTLP_ENDPOINT="https://api.honeycomb.io"
OTEL_EXPORTER_OTLP_HEADERS="x-honeycomb-team=XXX"
HONEYCOMB_API_ENDPOINT="https://api.honeycomb.io"
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
*.cpython-39.pyc
*.pyc
.env_file
6 changes: 6 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ services:
- AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION}
- S3_ENDPOINT=${S3_ENDPOINT}
- QUEUE_SUFFIX=${QUEUE_SUFFIX}
- OTEL_SERVICE_NAME=${OTEL_SERVICE_NAME}
- OTEL_EXPORTER_OTLP_PROTOCOL=${OTEL_EXPORTER_OTLP_PROTOCOL}
- OTEL_EXPORTER_OTLP_ENDPOINT=${OTEL_EXPORTER_OTLP_ENDPOINT}
- OTEL_EXPORTER_OTLP_HEADERS=${OTEL_EXPORTER_OTLP_HEADERS}
- HONEYCOMB_API_KEY=${HONEYCOMB_API_KEY}
- HONEYCOMB_API_ENDPOINT=${HONEYCOMB_API_ENDPOINT}
env_file:
- ./.env_file
depends_on:
Expand Down
51 changes: 43 additions & 8 deletions lib/queue/worker.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import pdb
import os
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import json
from typing import List, Tuple
from typing import List, Tuple, Any
from lib import schemas
from lib.logger import logger
from lib.queue.queue import Queue, MAX_RETRIES
from lib.model.model import Model
from lib.sentry import capture_custom_message
from lib.helpers import get_environment_setting
from lib.telemetry import OpenTelemetryExporter

TIMEOUT_SECONDS = int(os.getenv("WORK_TIMEOUT_SECONDS", "60"))
OPEN_TELEMETRY_EXPORTER = OpenTelemetryExporter(service_name="QueueWorkerService", local_debug=False)

class QueueWorker(Queue):
@classmethod
Expand Down Expand Up @@ -60,11 +64,11 @@ def safely_respond(self, model: Model) -> List[schemas.Message]:
if not messages_with_queues:
return []
messages = self.extract_messages(messages_with_queues, model)
responses, success = self.execute_with_timeout(model.respond, messages, timeout_seconds=TIMEOUT_SECONDS)
responses, success = self.execute_with_timeout(model, messages, timeout_seconds=TIMEOUT_SECONDS)
if success:
self.delete_processed_messages(messages_with_queues)
else:
self.increment_message_error_counts(messages_with_queues) # Add the new functionality here
self.increment_message_error_counts(messages_with_queues)
return responses

@staticmethod
Expand All @@ -82,31 +86,62 @@ def extract_messages(messages_with_queues: List[Tuple], model: Model) -> List[sc
for message, queue in messages_with_queues]

@staticmethod
def execute_with_timeout(func, args, timeout_seconds: int) -> List[schemas.Message]:
def execute_with_timeout(model, args, timeout_seconds: int) -> List[schemas.Message]:
"""
Executes a given hasher/fingerprinter with a specified timeout. If the hasher/fingerprinter execution time exceeds the timeout,
logs an error and returns an empty list.
Parameters:
- func (callable): The function to execute.
- model (callable): The model to execute a respond request upon.
- args (any): The arguments to pass to the function.
- timeout_seconds (int): The maximum number of seconds to wait for the function to complete.
Returns:
- List[schemas.Message]: The result of the function if it completes within the timeout, otherwise an empty list.
"""
start_time = time.time()
try:
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(func, args)
return future.result(timeout=timeout_seconds), True
future = executor.submit(model.respond, args)
result = future.result(timeout=timeout_seconds)
execution_time = time.time() - start_time
QueueWorker.log_execution_time(model.model_name, execution_time)
QueueWorker.log_execution_status(model.model_name, "successful_message_response")
return result, True
except TimeoutError:
error_message = "Model respond timeout exceeded."
QueueWorker.log_and_handle_error(error_message)
QueueWorker.log_execution_status(model.model_name, "timeout_message_response")
return [], False
except Exception as e:
QueueWorker.log_and_handle_error(str(e))
QueueWorker.log_execution_status(model.model_name, "error_message_response")
return [], False

@staticmethod
def log_execution_time(func_name: str, execution_time: float):
"""
Logs the execution time of a function to OpenTelemetry.
Parameters:
- func_name (str): The name of the function that was executed.
- execution_time (float): The time taken to execute the function.
"""
logger.debug(f"Function {func_name} executed in {execution_time:.2f} seconds.")
OPEN_TELEMETRY_EXPORTER.log_execution_time(func_name, execution_time)

@staticmethod
def log_execution_status(func_name: str, logging_metric: str):
"""
Logs the execution of a function to CloudWatch.
Parameters:
- func_name (str): The name of the function that was executed.
- logging_metric (func): The function to log the message status to - log as success, timeout, or error
"""
logger.info(f"Function {func_name} executed, passing to {logging_metric}")
OPEN_TELEMETRY_EXPORTER.log_execution_status(func_name, logging_metric)

@staticmethod
def log_and_handle_error(error):
"""
Expand Down Expand Up @@ -145,4 +180,4 @@ def increment_message_error_counts(self, messages_with_queues: List[Tuple]):
updated_message = schemas.parse_message(message_body)
updated_message.retry_count = retry_count
queue.delete_messages(Entries=[self.delete_message_entry(message)])
self.push_message(self.input_queue_name, updated_message)
self.push_message(self.input_queue_name, updated_message)
85 changes: 85 additions & 0 deletions lib/telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import pdb
import os
from opentelemetry import metrics
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader, ConsoleMetricExporter
from opentelemetry.metrics import NoOpMeterProvider
import logging

HONEYCOMB_HOST = os.getenv('HONEYCOMB_API_ENDPOINT', 'https://api.honeycomb.io')
ENV = os.getenv("DEPLOY_ENV", "development")

class OpenTelemetryExporter:
"""
Provides a basic implementation of OpenTelemetry metrics configured to provide
simple counters to services so they can log metrics to a service like Honeycomb.
"""

def __init__(self, service_name: str, local_debug=False) -> None:

self.service_name = service_name
resource = Resource(
attributes={
SERVICE_NAME: self.service_name,
"env.label": ENV,
}
)

if local_debug:
# write metrics to console instead of sending them
reader = PeriodicExportingMetricReader(
ConsoleMetricExporter(),
export_interval_millis=int(os.getenv("METRICS_REPORTING_INTERVAL", "60000")),
)
meter_provider = MeterProvider(resource=resource, metric_readers=[reader])
else:
honeycomb_api_key = os.getenv("HONEYCOMB_API_KEY")
honeycomb_dataset = os.getenv("HONEYCOMB_DATASET", "presto")
if not honeycomb_api_key:
logging.warning("Metrics telemetry is not enabled because no HONEYCOMB_API_KEY found, running in no-op mode")
meter_provider = NoOpMeterProvider()
else:
logging.debug(f"Metrics telemetry will be sent to {HONEYCOMB_HOST}")
reader = PeriodicExportingMetricReader(
OTLPMetricExporter(
endpoint=f"{HONEYCOMB_HOST}/v1/metrics",
headers={
"X-Honeycomb-Team": honeycomb_api_key,
"X-Honeycomb-Dataset": honeycomb_dataset,
},
),
export_interval_millis=int(os.getenv("METRICS_REPORTING_INTERVAL", "60000")),
)
meter_provider = MeterProvider(resource=resource, metric_readers=[reader])
metrics.set_meter_provider(meter_provider)
self.meter = metrics.get_meter(service_name)
self.execution_time_gauge = self.meter.create_gauge(
name="execution_time",
unit="s",
description="Time taken for function execution"
)
self.successful_message_response = self.meter.create_counter(
name="successful_message_response",
unit="s",
description="Successful Message Response"
)
self.timeout_message_response = self.meter.create_counter(
name="timeout_message_response",
unit="s",
description="Timed Out Message Response"
)
self.error_message_response = self.meter.create_counter(
name="error_message_response",
unit="s",
description="Errored Message Response"
)

def log_execution_time(self, func_name: str, execution_time: float):
env_name = os.getenv("DEPLOY_ENV", "development")
self.execution_time_gauge.set(execution_time, {"function_name": func_name, "env": env_name})

def log_execution_status(self, func_name: str, function_name: str):
env_name = os.getenv("DEPLOY_ENV", "development")
getattr(self, function_name).add(1, {"function_name": func_name, "env": env_name})
5 changes: 4 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,7 @@ langcodes==3.3.0
requests==2.31.0
pytest==7.4.0
sentry-sdk==1.30.0
yake==0.4.8
yake==0.4.8
opentelemetry-api==1.24.0
opentelemetry-exporter-otlp-proto-http==1.24.0
opentelemetry-sdk==1.24.0
68 changes: 43 additions & 25 deletions test/lib/queue/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,33 @@
import unittest
from unittest.mock import MagicMock, patch
import numpy as np

import time
from typing import Union, List
from lib.model.generic_transformer import GenericTransformerModel
from lib.queue.queue import Queue
from lib.queue.worker import QueueWorker
from lib import schemas
from test.lib.queue.fake_sqs_message import FakeSQSMessage
from concurrent.futures import TimeoutError
class TestModelTimeout:
def __init__(self):
self.model_name = "timeout.TestModelTimeout"

def respond(self, messages: Union[List[schemas.Message], schemas.Message]) -> List[schemas.Message]:
raise TimeoutError

class TestModelNoTimeout:
def __init__(self):
self.model_name = "timeout.TestModelNoTimeout"

def respond(self, messages: Union[List[schemas.Message], schemas.Message]) -> List[schemas.Message]:
return ["response"]

class TestQueueWorker(unittest.TestCase):
@patch('lib.queue.queue.boto3.resource')
@patch('lib.helpers.get_environment_setting', return_value='us-west-1')
def setUp(self, mock_get_env_setting, mock_boto_resource):#, mock_restrict_queues_by_suffix):
@patch('lib.telemetry.OpenTelemetryExporter.log_execution_time')
def setUp(self, mock_log_execution_time, mock_get_env_setting, mock_boto_resource):
self.model = GenericTransformerModel(None)
self.model.model_name = "generic"
self.mock_model = MagicMock()
Expand All @@ -36,7 +51,7 @@ def setUp(self, mock_get_env_setting, mock_boto_resource):#, mock_restrict_queue
self.mock_sqs_resource.queues.filter.return_value = [self.mock_input_queue, self.mock_output_queue, self.mock_dlq_queue]
mock_boto_resource.return_value = self.mock_sqs_resource

# Initialize the SQSQueue instance
# Initialize the QueueWorker instance
self.queue = QueueWorker(self.queue_name_input, self.queue_name_output, self.queue_name_dlq)

def test_get_output_queue_name(self):
Expand All @@ -45,8 +60,31 @@ def test_get_output_queue_name(self):
def test_get_dead_letter_queue_name(self):
self.assertEqual(self.queue.get_dead_letter_queue_name().replace(".fifo", ""), (self.queue.get_input_queue_name()+'_dlq').replace(".fifo", ""))

@patch('lib.queue.worker.QueueWorker.log_and_handle_error')
@patch('lib.queue.worker.time.time', side_effect=[0, 1])
def test_execute_with_timeout_failure(self, mock_time, mock_log_error):
responses, success = self.queue.execute_with_timeout(TestModelTimeout(), [], timeout_seconds=1)
self.assertEqual(responses, [])
self.assertFalse(success)
mock_log_error.assert_called_once_with("Model respond timeout exceeded.")

@patch('lib.queue.worker.QueueWorker.log_and_handle_error')
@patch('lib.queue.worker.time.time', side_effect=[0, 0.5])
@patch('lib.queue.worker.QueueWorker.log_execution_time')
@patch('lib.queue.worker.QueueWorker.log_execution_status')
def test_execute_with_timeout_success(self, mock_log_execution_status, mock_log_execution_time, mock_time, mock_log_error):
responses, success = self.queue.execute_with_timeout(TestModelNoTimeout(), [], timeout_seconds=1)
self.assertEqual(responses in [[], ["response"]], True)
self.assertTrue(success)
mock_log_error.assert_not_called()
mock_log_execution_time.assert_called_once_with('timeout.TestModelNoTimeout', 0.5)
mock_log_execution_status.assert_called_once_with('timeout.TestModelNoTimeout', 'successful_message_response')

def test_process(self):
self.queue.receive_messages = MagicMock(return_value=[(FakeSQSMessage(receipt_handle="blah", body=json.dumps({"body": {"id": 1, "callback_url": "http://example.com", "text": "This is a test"}})), self.mock_input_queue)])
self.queue.receive_messages = MagicMock(return_value=[(FakeSQSMessage(receipt_handle="blah", body=json.dumps({
"body": {"id": 1, "callback_url": "http://example.com", "text": "This is a test"},
"model_name": "generic"
})), self.mock_input_queue)])
self.queue.input_queue = MagicMock(return_value=None)
self.model.model = self.mock_model
self.model.model.encode = MagicMock(return_value=np.array([[4, 5, 6], [7, 8, 9]]))
Expand Down Expand Up @@ -175,29 +213,9 @@ def test_extract_messages(self):
self.assertEqual(extracted_messages[1].body.text, "Test message 2")
self.assertEqual(extracted_messages[0].model_name, "generic")

@patch('lib.queue.worker.QueueWorker.log_and_handle_error')
def test_execute_with_timeout_success(self, mock_log_error):
def test_func(args):
return ["response"]

responses, success = QueueWorker.execute_with_timeout(test_func, [], timeout_seconds=1)
self.assertEqual(responses, ["response"])
self.assertTrue(success)
mock_log_error.assert_not_called()

@patch('lib.queue.worker.QueueWorker.log_and_handle_error')
def test_execute_with_timeout_failure(self, mock_log_error):
def test_func(args):
raise TimeoutError

responses, success = QueueWorker.execute_with_timeout(test_func, [], timeout_seconds=1)
self.assertEqual(responses, [])
self.assertFalse(success)
mock_log_error.assert_called_once_with("Model respond timeout exceeded.")

@patch('lib.queue.worker.logger.error')
def test_log_and_handle_error(self, mock_logger_error):
QueueWorker.log_and_handle_error("Test error")
self.queue.log_and_handle_error("Test error")
mock_logger_error.assert_called_once_with("Test error")

@patch('lib.queue.worker.QueueWorker.delete_messages')
Expand Down
Loading

0 comments on commit fdbaff9

Please sign in to comment.