Add benchmarking (airtai#206)
* Add benchmarking code

* Run isort and black

* Add guide to benchmark fastkafka app

* Rerun notebook to update test results

* Update decorator signature

* Fix mypy issues

* Update guide for benchmarking

* Cleanup log format and update benchmark guide
kumaranvpl committed Apr 7, 2023
1 parent f99c148 commit 3269c58
Showing 19 changed files with 1,285 additions and 194 deletions.
97 changes: 85 additions & 12 deletions fastkafka/_application/app.py
@@ -11,12 +11,13 @@
import types
from asyncio import iscoroutinefunction # do not use the version from inspect
from collections import namedtuple
from contextlib import AbstractAsyncContextManager
from datetime import datetime, timedelta
from functools import wraps
from inspect import signature
from pathlib import Path
from typing import *
from unittest.mock import AsyncMock, MagicMock
from contextlib import AbstractAsyncContextManager

import anyio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
@@ -40,6 +41,7 @@
KafkaServiceInfo,
export_async_spec,
)
from .._components.benchmarking import _benchmark
from .._components.logger import get_logger
from .._components.meta import delegates, export, filter_using_signature, patch
from .._components.producer_decorator import ProduceCallable, producer_decorator
@@ -133,6 +135,12 @@ def _get_contact_info(
return ContactInfo(name=name, url=url, email=email)

# %% ../../nbs/015_FastKafka.ipynb 18
I = TypeVar("I", bound=BaseModel)
O = TypeVar("O", BaseModel, Awaitable[BaseModel])

F = TypeVar("F", bound=Callable)

# %% ../../nbs/015_FastKafka.ipynb 19
@export("fastkafka")
class FastKafka:
@delegates(_get_kafka_config)
@@ -219,6 +227,8 @@ def __init__(
Union[AIOKafkaProducer, AIOKafkaProducerManager]
] = []

self.benchmark_results: Dict[str, Dict[str, Any]] = {}

# background tasks
self._scheduled_bg_tasks: List[Callable[..., Coroutine[Any, Any, Any]]] = []
self._bg_task_group_generator: Optional[anyio.abc.TaskGroup] = None
@@ -304,6 +314,14 @@ def produces( # type: ignore
) -> ProduceCallable:
raise NotImplementedError

def benchmark(
self,
interval: Union[int, timedelta] = 1,
*,
sliding_window_size: Optional[int] = None,
) -> Callable[[F], F]:
raise NotImplementedError

def run_in_background(
self,
) -> Callable[[], Any]:
@@ -339,7 +357,7 @@ async def _shutdown_producers(self) -> None:
async def _shutdown_bg_tasks(self) -> None:
raise NotImplementedError

# %% ../../nbs/015_FastKafka.ipynb 24
# %% ../../nbs/015_FastKafka.ipynb 25
@patch
@delegates(AIOKafkaConsumer)
def consumes(
@@ -388,7 +406,7 @@ def _decorator(

return _decorator

# %% ../../nbs/015_FastKafka.ipynb 26
# %% ../../nbs/015_FastKafka.ipynb 27
@patch
@delegates(AIOKafkaProducer)
def produces(
@@ -435,14 +453,14 @@ def _decorator(

return _decorator

# %% ../../nbs/015_FastKafka.ipynb 28
# %% ../../nbs/015_FastKafka.ipynb 29
@patch
def get_topics(self: FastKafka) -> Iterable[str]:
produce_topics = set(self._producers_store.keys())
consume_topics = set(self._consumers_store.keys())
return consume_topics.union(produce_topics)

# %% ../../nbs/015_FastKafka.ipynb 30
# %% ../../nbs/015_FastKafka.ipynb 31
@patch
def run_in_background(
self: FastKafka,
@@ -479,7 +497,7 @@ def _decorator(

return _decorator

# %% ../../nbs/015_FastKafka.ipynb 34
# %% ../../nbs/015_FastKafka.ipynb 35
@patch
def _populate_consumers(
self: FastKafka,
@@ -509,7 +527,7 @@ async def _shutdown_consumers(
if self._kafka_consumer_tasks:
await asyncio.wait(self._kafka_consumer_tasks)

# %% ../../nbs/015_FastKafka.ipynb 36
# %% ../../nbs/015_FastKafka.ipynb 37
# TODO: Add passing of vars
async def _create_producer( # type: ignore
*,
@@ -606,7 +624,7 @@ async def _shutdown_producers(self: FastKafka) -> None:
}
)

# %% ../../nbs/015_FastKafka.ipynb 38
# %% ../../nbs/015_FastKafka.ipynb 39
@patch
async def _populate_bg_tasks(
self: FastKafka,
@@ -642,7 +660,7 @@ async def _shutdown_bg_tasks(
f"_shutdown_bg_tasks() : Execution finished for background task '{task.get_name()}'"
)

# %% ../../nbs/015_FastKafka.ipynb 40
# %% ../../nbs/015_FastKafka.ipynb 41
@patch
async def _start(self: FastKafka) -> None:
def is_shutting_down_f(self: FastKafka = self) -> bool:
@@ -667,7 +685,7 @@ async def _stop(self: FastKafka) -> None:
self._is_shutting_down = False
self._is_started = False

# %% ../../nbs/015_FastKafka.ipynb 46
# %% ../../nbs/015_FastKafka.ipynb 47
@patch
def create_docs(self: FastKafka) -> None:
export_async_spec(
@@ -682,7 +700,7 @@ def create_docs(self: FastKafka) -> None:
asyncapi_path=self._asyncapi_path,
)

# %% ../../nbs/015_FastKafka.ipynb 50
# %% ../../nbs/015_FastKafka.ipynb 51
class AwaitedMock:
@staticmethod
def _await_for(f: Callable[..., Any]) -> Callable[..., Any]:
@@ -718,7 +736,7 @@ def __init__(self, o: Any):
if inspect.ismethod(f):
setattr(self, name, self._await_for(f))

# %% ../../nbs/015_FastKafka.ipynb 51
# %% ../../nbs/015_FastKafka.ipynb 52
@patch
def create_mocks(self: FastKafka) -> None:
"""Creates self.mocks as a named tuple mapping a new function obtained by calling the original functions and a mock"""
@@ -784,3 +802,58 @@ def sync_inner(
for name, (f, producer, kwargs) in self._producers_store.items()
}
)

# %% ../../nbs/015_FastKafka.ipynb 58
@patch
def benchmark(
self: FastKafka,
interval: Union[int, timedelta] = 1,
*,
sliding_window_size: Optional[int] = None,
) -> Callable[[Callable[[I], Optional[O]]], Callable[[I], Optional[O]]]:
"""Decorator to benchmark produces/consumes functions
Args:
interval: Period used to calculate throughput. An int value is
    interpreted as seconds; a timedelta is used as is.
    Default: 1 (one second).
sliding_window_size: The size of the sliding window used to calculate
    average throughput. Default: None (average throughput is not
    calculated).
"""

def _decorator(func: Callable[[I], Optional[O]]) -> Callable[[I], Optional[O]]:
func_name = f"{func.__module__}.{func.__qualname__}"

@wraps(func)
def wrapper(
*args: I,
**kwargs: I,
) -> Optional[O]:
_benchmark(
interval=interval,
sliding_window_size=sliding_window_size,
func_name=func_name,
benchmark_results=self.benchmark_results,
)
return func(*args, **kwargs)

@wraps(func)
async def async_wrapper(
*args: I,
**kwargs: I,
) -> Optional[O]:
_benchmark(
interval=interval,
sliding_window_size=sliding_window_size,
func_name=func_name,
benchmark_results=self.benchmark_results,
)
return await func(*args, **kwargs) # type: ignore

if inspect.iscoroutinefunction(func):
return async_wrapper # type: ignore
else:
return wrapper

return _decorator
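
The new `benchmark` decorator is meant to be stacked with `@consumes`/`@produces` handlers so that every invocation is counted. A minimal usage sketch (the broker address, topic name, and `TestMsg` model below are illustrative placeholders, not part of this diff):

```python
# Hypothetical usage of the new benchmark decorator; broker address,
# topic name, and message model are assumptions for illustration.
from pydantic import BaseModel, Field

from fastkafka import FastKafka

kafka_brokers = {"localhost": {"url": "localhost", "port": 9092}}
app = FastKafka(kafka_brokers=kafka_brokers)


class TestMsg(BaseModel):
    msg: str = Field(...)


# benchmark is applied first (closer to the function) so that @consumes
# registers the already-wrapped handler in the consumers store.
@app.consumes()
@app.benchmark(interval=1, sliding_window_size=5)
async def on_test_topic(msg: TestMsg) -> None:
    pass
```

Each processed message increments a counter in `app.benchmark_results`; roughly once per `interval` the current throughput (and, with `sliding_window_size=5`, the average over the last five measurements) is logged.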
2 changes: 1 addition & 1 deletion fastkafka/_application/tester.py
@@ -14,8 +14,8 @@
from .app import FastKafka
from .._components.meta import delegates, patch
from .._testing.apache_kafka_broker import ApacheKafkaBroker
from .._testing.local_redpanda_broker import LocalRedpandaBroker
from .._testing.in_memory_broker import InMemoryBroker
from .._testing.local_redpanda_broker import LocalRedpandaBroker

# %% ../../nbs/016_Tester.ipynb 6
class Tester(FastKafka):
77 changes: 77 additions & 0 deletions fastkafka/_components/benchmarking.py
@@ -0,0 +1,77 @@
# AUTOGENERATED! DO NOT EDIT! File to edit: ../../nbs/017_Benchmarking.ipynb.

# %% auto 0
__all__ = ['logger']

# %% ../../nbs/017_Benchmarking.ipynb 1
from collections import deque
from datetime import datetime, timedelta
from functools import wraps
from statistics import mean, stdev
from typing import *

from .logger import get_logger

# %% ../../nbs/017_Benchmarking.ipynb 4
logger = get_logger("fastkafka.benchmark")

# %% ../../nbs/017_Benchmarking.ipynb 5
def _benchmark(
interval: Union[int, timedelta] = 1,
*,
sliding_window_size: Optional[int] = None,
func_name: str,
benchmark_results: Dict[str, Dict[str, Any]],
) -> None:
"""Used to record the benchmark results(throughput, average throughput, standard deviation) of a given function
Args:
interval: the time interval after which the benchmark results are logged.
sliding_window_size: the maximum number of benchmark results to use to calculate average throughput and standard deviation.
func_name: the name of the function to be benchmarked.
benchmark_results: a dictionary containing the benchmark results of all functions.
"""
if isinstance(interval, int):
interval = timedelta(seconds=interval)
if func_name not in benchmark_results:
benchmark_results[func_name] = {
"count": 0,
"last_count": 0,
"start": None,
"last_start": None,
"history": [],
}
if sliding_window_size is not None:
benchmark_results[func_name]["history"] = deque(maxlen=sliding_window_size)

benchmark_results[func_name]["count"] += 1

if benchmark_results[func_name]["count"] == 1:
benchmark_results[func_name]["start"] = benchmark_results[func_name][
"last_start"
] = datetime.utcnow()

diff = datetime.utcnow() - benchmark_results[func_name]["last_start"]
if diff >= interval:
throughput = (
benchmark_results[func_name]["count"]
- benchmark_results[func_name]["last_count"]
) / (diff / timedelta(seconds=1))
log_msg = f"Throughput = {throughput:5,.0f}"

if sliding_window_size is not None:
benchmark_results[func_name]["history"].append(throughput)

log_msg += f", Avg throughput = {mean(benchmark_results[func_name]['history']):5,.0f}"
# if len(benchmark_results[func_name]["history"]) > 1:
# log_msg += f", Standard deviation of throughput is {stdev(benchmark_results[func_name]['history']):5,.0f}"
log_msg = (
log_msg
+ f" - For {func_name}(interval={interval.seconds},{sliding_window_size=})"
)
logger.info(log_msg)

benchmark_results[func_name]["last_start"] = datetime.utcnow()
benchmark_results[func_name]["last_count"] = benchmark_results[func_name][
"count"
]
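
Since `_benchmark` is a plain helper keyed by `func_name`, it can also be exercised outside the decorator. A minimal sketch based only on the signature above (the loop and names are illustrative, not from the diff):

```python
# Illustrative direct use of the private _benchmark helper.
from typing import Any, Dict

from fastkafka._components.benchmarking import _benchmark

benchmark_results: Dict[str, Dict[str, Any]] = {}

# Simulate a hot code path: one _benchmark call per processed item.
for _ in range(1_000_000):
    _benchmark(
        interval=1,               # log roughly once per second
        sliding_window_size=5,    # average over the last 5 throughput samples
        func_name="demo.handler",
        benchmark_results=benchmark_results,
    )
```

Given the log format in the code, each report should look roughly like `Throughput = 93,000, Avg throughput = 92,500 - For demo.handler(interval=1,sliding_window_size=5)` (numbers illustrative).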
4 changes: 4 additions & 0 deletions fastkafka/_modidx.py
@@ -37,6 +37,8 @@
'fastkafka/_application/app.py'),
'fastkafka._application.app.FastKafka._stop': ( 'fastkafka.html#fastkafka._stop',
'fastkafka/_application/app.py'),
'fastkafka._application.app.FastKafka.benchmark': ( 'fastkafka.html#fastkafka.benchmark',
'fastkafka/_application/app.py'),
'fastkafka._application.app.FastKafka.consumes': ( 'fastkafka.html#fastkafka.consumes',
'fastkafka/_application/app.py'),
'fastkafka._application.app.FastKafka.create_docs': ( 'fastkafka.html#fastkafka.create_docs',
@@ -177,6 +179,8 @@
'fastkafka/_components/asyncapi.py'),
'fastkafka._components.asyncapi.yaml_file_cmp': ( 'asyncapi.html#yaml_file_cmp',
'fastkafka/_components/asyncapi.py')},
'fastkafka._components.benchmarking': { 'fastkafka._components.benchmarking._benchmark': ( 'benchmarking.html#_benchmark',
'fastkafka/_components/benchmarking.py')},
'fastkafka._components.docs_dependencies': { 'fastkafka._components.docs_dependencies._check_npm': ( 'docs_dependencies.html#_check_npm',
'fastkafka/_components/docs_dependencies.py'),
'fastkafka._components.docs_dependencies._check_npm_with_local': ( 'docs_dependencies.html#_check_npm_with_local',
25 changes: 12 additions & 13 deletions fastkafka/_testing/in_memory_broker.py
@@ -4,30 +4,29 @@
__all__ = ['logger', 'create_consumer_record', 'ConsumerMetadata', 'InMemoryBroker', 'InMemoryConsumer', 'InMemoryProducer']

# %% ../../nbs/001_InMemoryBroker.ipynb 1
import asyncio
import copy
import inspect
import uuid
from collections import namedtuple
from dataclasses import dataclass
from contextlib import contextmanager
import inspect
import asyncio
import copy

from dataclasses import dataclass
from typing import *
import fastkafka._application.app
import fastkafka._components.aiokafka_consumer_loop
import fastkafka._components.aiokafka_producer_manager

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from aiokafka.structs import ConsumerRecord, TopicPartition, RecordMetadata
from aiokafka.structs import ConsumerRecord, RecordMetadata, TopicPartition

import fastkafka._application.app
import fastkafka._components.aiokafka_consumer_loop
import fastkafka._components.aiokafka_producer_manager
from .._components.logger import get_logger
from fastkafka._components.meta import (
_get_default_kwargs_from_sig,
classcontextmanager,
copy_func,
patch,
delegates,
classcontextmanager,
_get_default_kwargs_from_sig,
patch,
)
from .._components.logger import get_logger

# %% ../../nbs/001_InMemoryBroker.ipynb 3
logger = get_logger(__name__)