Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Significant speedup of Kafka producer #236

Merged
merged 8 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP
  • Loading branch information
sternakt committed Apr 13, 2023
commit 0a5507213f2f8528f85edcd9b2bea06cae401d4d
6 changes: 0 additions & 6 deletions fastkafka/_application/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,12 +546,6 @@ async def _create_producer( # type: ignore
f"_create_producer() : created producer using the config: '{sanitize_kafka_config(**config)}'"
)

if not iscoroutinefunction(callback):
producer = AIOKafkaProducerManager(
producer,
**filter_using_signature(AIOKafkaProducerManager, **override_config),
)

await producer.start()

producers_list.append(producer)
Expand Down
3 changes: 3 additions & 0 deletions fastkafka/_components/aiokafka_consumer_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ async def process_message_callback(
logger.warning(
f"_aiokafka_consumer_loop(): Unexpected exception '{e}' caught and ignored for messages: {msgs}"
)
logger.info(
f"_aiokafka_consumer_loop(): Consumer loop shutting down, waiting for send_stream to drain..."
)

# %% ../../nbs/011_ConsumerLoop.ipynb 24
def sanitize_kafka_config(**kwargs: Any) -> Dict[str, Any]:
Expand Down
14 changes: 9 additions & 5 deletions fastkafka/_components/aiokafka_producer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from aiokafka import AIOKafkaProducer
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream

from .logger import get_logger
from .logger import get_logger, cached_log

# %% ../../nbs/012_ProducerManager.ipynb 4
logger = get_logger(__name__)
Expand Down Expand Up @@ -83,19 +83,23 @@ async def stop(self) -> None:
await self.producer.stop()
logger.info("AIOKafkaProducerManager.stop(): Finished")

async def send_with_throttle(self, data, stream):
async def _send_with_throttle(self, *, topic, msg, key, stream):
while not self.shutting_down:
try:
stream.send_nowait(data)
break
except anyio.WouldBlock:
logger.log_and_timeout(
"Send stream full and blocking, throttling...", level=logging.INFO
cached_log(
logger,
f"Send stream full and blocking for {topic=}, throttling...",
level=logging.WARNING,
)
await asyncio.sleep(0.01)

def send(self, topic: str, msg: bytes, key: Optional[bytes] = None) -> None:
if not self.shutting_down:
asyncio.create_task(
self.send_with_throttle((topic, msg, key), self.send_stream)
self._send_with_throttle(
topic=topic, msg=msg, key=key, stream=self.send_stream
)
)
35 changes: 13 additions & 22 deletions fastkafka/_components/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

# %% auto 0
__all__ = ['should_supress_timestamps', 'logger_spaces_added', 'supress_timestamps', 'get_default_logger_configuration',
'get_logger', 'set_level', 'true_after']
'get_logger', 'set_level', 'true_after', 'cached_log']

# %% ../../nbs/Logger.ipynb 2
import logging
Expand Down Expand Up @@ -135,32 +135,23 @@ def set_level(level: int) -> None:
logger.setLevel(level)

# %% ../../nbs/Logger.ipynb 18
def true_after(seconds: float) -> Callable[[], bool]:
def true_after(seconds: Union[int, float]) -> Callable[[], bool]:
"""Function returning True after a given number of seconds"""
t = datetime.now()

def _true_after(seconds: float = seconds, t: datetime = t) -> bool:
def _true_after(seconds: Union[int, float] = seconds, t: datetime = t) -> bool:
return (datetime.now() - t) > timedelta(seconds=seconds)

return _true_after

# %% ../../nbs/Logger.ipynb 19
def cached_log(
self: logging.Logger, msg: str, level: int, timeout: Union[int, float] = 5
):
if not hasattr(self, "_timeouted_msgs"):
self._timeouted_msgs = {}

@patch
def log_and_timeout(self: logging.Logger, msg: str, level: int, timeout: int = 5):
try:
self.timeouted_msgs
pass
except AttributeError:
self.timeouted_msgs = {}

if msg not in self.timeouted_msgs or self.timeouted_msgs[msg]():
self.timeouted_msgs[msg] = true_after(timeout)

if level == logging.DEBUG:
self.debug(msg)
if level == logging.INFO:
self.info(msg)
if level == logging.WARNING:
self.warning(msg)
if level == logging.ERROR:
self.error(msg)
if msg not in self._timeouted_msgs or self._timeouted_msgs[msg]():
self._timeouted_msgs[msg] = true_after(timeout)

self.log(level, msg)
16 changes: 14 additions & 2 deletions fastkafka/_components/producer_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# %% ../../nbs/013_ProducerDecorator.ipynb 1
import functools
import json
import asyncio
from asyncio import iscoroutinefunction # do not use the version from inspect
from collections import namedtuple
from dataclasses import dataclass
Expand Down Expand Up @@ -60,6 +61,11 @@ def producer_decorator(
) -> ProduceCallable:
"""todo: write documentation"""

loop = asyncio.get_event_loop()

def release_callback(fut):
pass

@functools.wraps(func)
async def _produce_async(
*args: List[Any],
Expand All @@ -73,20 +79,26 @@ async def _produce_async(
fut = await producer.send(
topic, _to_json_utf8(wrapped_val.message), key=wrapped_val.key
)
msg = await fut
fut.add_done_callback(release_callback)
return return_val

@functools.wraps(func)
def _produce_sync(
*args: List[Any],
producer_store: Dict[str, Any] = producer_store,
f: Callable[..., ProduceReturnTypes] = func, # type: ignore
loop=loop,
**kwargs: Any
) -> ProduceReturnTypes:
return_val = f(*args, **kwargs)
wrapped_val = _wrap_in_event(return_val)
_, producer, _ = producer_store[topic]
producer.send(topic, _to_json_utf8(wrapped_val.message), key=wrapped_val.key)
fut = loop.run_until_complete(
producer.send(
topic, _to_json_utf8(wrapped_val.message), key=wrapped_val.key
)
)
fut.add_done_callback(release_callback)
return return_val

return _produce_async if iscoroutinefunction(func) else _produce_sync
10 changes: 5 additions & 5 deletions fastkafka/_modidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@
'fastkafka/_components/aiokafka_producer_manager.py'),
'fastkafka._components.aiokafka_producer_manager.AIOKafkaProducerManager.__init__': ( 'producermanager.html#aiokafkaproducermanager.__init__',
'fastkafka/_components/aiokafka_producer_manager.py'),
'fastkafka._components.aiokafka_producer_manager.AIOKafkaProducerManager._send_with_throttle': ( 'producermanager.html#aiokafkaproducermanager._send_with_throttle',
'fastkafka/_components/aiokafka_producer_manager.py'),
'fastkafka._components.aiokafka_producer_manager.AIOKafkaProducerManager.send': ( 'producermanager.html#aiokafkaproducermanager.send',
'fastkafka/_components/aiokafka_producer_manager.py'),
'fastkafka._components.aiokafka_producer_manager.AIOKafkaProducerManager.send_with_throttle': ( 'producermanager.html#aiokafkaproducermanager.send_with_throttle',
'fastkafka/_components/aiokafka_producer_manager.py'),
'fastkafka._components.aiokafka_producer_manager.AIOKafkaProducerManager.start': ( 'producermanager.html#aiokafkaproducermanager.start',
'fastkafka/_components/aiokafka_producer_manager.py'),
'fastkafka._components.aiokafka_producer_manager.AIOKafkaProducerManager.stop': ( 'producermanager.html#aiokafkaproducermanager.stop',
Expand Down Expand Up @@ -199,12 +199,12 @@
'fastkafka/_components/helpers.py'),
'fastkafka._components.helpers.in_notebook': ( 'internal_helpers.html#in_notebook',
'fastkafka/_components/helpers.py')},
'fastkafka._components.logger': { 'fastkafka._components.logger.get_default_logger_configuration': ( 'logger.html#get_default_logger_configuration',
'fastkafka._components.logger': { 'fastkafka._components.logger.cached_log': ( 'logger.html#cached_log',
'fastkafka/_components/logger.py'),
'fastkafka._components.logger.get_default_logger_configuration': ( 'logger.html#get_default_logger_configuration',
'fastkafka/_components/logger.py'),
'fastkafka._components.logger.get_logger': ( 'logger.html#get_logger',
'fastkafka/_components/logger.py'),
'fastkafka._components.logger.logging.Logger.log_and_timeout': ( 'logger.html#logging.logger.log_and_timeout',
'fastkafka/_components/logger.py'),
'fastkafka._components.logger.set_level': ( 'logger.html#set_level',
'fastkafka/_components/logger.py'),
'fastkafka._components.logger.supress_timestamps': ( 'logger.html#supress_timestamps',
Expand Down
19 changes: 17 additions & 2 deletions nbs/011_ConsumerLoop.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,10 @@
" except Exception as e:\n",
" logger.warning(\n",
" f\"_aiokafka_consumer_loop(): Unexpected exception '{e}' caught and ignored for messages: {msgs}\"\n",
" )"
" )\n",
" logger.info(\n",
" f\"_aiokafka_consumer_loop(): Consumer loop shutting down, waiting for send_stream to drain...\"\n",
" )"
]
},
{
Expand Down Expand Up @@ -1000,9 +1003,21 @@
],
"metadata": {
"kernelspec": {
"display_name": "python3",
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.6"
}
},
"nbformat": 4,
Expand Down
Loading