Skip to content

Commit

Permalink
Fixed topics with dots causing failure of tester instantiation (airta…
Browse files Browse the repository at this point in the history
…i#306)

* Fix tester crashing when mirroring topics with dots

* Polish

* r0.6.1 init

* version upgraded
  • Loading branch information
sternakt committed May 19, 2023
1 parent 4b47d6a commit e60eec0
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Deploy nbdev-mkdocs generated documentation to GitHub Pages

on:
push:
branches: [ "main", "master" ]
branches: [ "main", "master"]
workflow_dispatch:
jobs:
deploy:
Expand Down
2 changes: 1 addition & 1 deletion docusaurus/sidebars.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion fastkafka/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.6.0"
__version__ = "0.7.0dev"
# AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/010_Application_export.ipynb.

# %% auto 0
Expand Down
8 changes: 4 additions & 4 deletions fastkafka/_application/tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ async def skeleton_func(msg: BaseModel) -> None:
sig = inspect.signature(skeleton_func)

# adjust name
mirror_func.__name__ = "on_" + topic
mirror_func.__name__ = "on_" + topic.replace(".", "_")

# adjust arg and return val
sig = sig.replace(
Expand Down Expand Up @@ -189,7 +189,7 @@ async def skeleton_func(msg: BaseModel) -> BaseModel:
sig = inspect.signature(skeleton_func)

# adjust name
mirror_func.__name__ = "to_" + topic
mirror_func.__name__ = "to_" + topic.replace(".", "_")

# adjust arg and return val
sig = sig.replace(
Expand All @@ -205,9 +205,9 @@ def create_mirrors(self: Tester) -> None:
for app in self.apps:
for topic, (consumer_f, _, _, _) in app._consumers_store.items():
mirror_f = mirror_consumer(topic, consumer_f)
mirror_f = self.produces()(mirror_f) # type: ignore
mirror_f = self.produces(topic=topic)(mirror_f) # type: ignore
setattr(self, mirror_f.__name__, mirror_f)
for topic, (producer_f, _, _) in app._producers_store.items():
mirror_f = mirror_producer(topic, producer_f)
mirror_f = self.consumes()(mirror_f) # type: ignore
mirror_f = self.consumes(topic=topic)(mirror_f) # type: ignore
setattr(self, mirror_f.__name__, mirror_f)
126 changes: 105 additions & 21 deletions nbs/016_Tester.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,9 @@
"[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'preprocessed_signals'})\n",
"[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'preprocessed_signals'}\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.\n",
"[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'preprocessed_signals': 1}. \n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:9788', 'auto_offset_reset': 'latest', 'max_poll_records': 100}\n",
"[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'preprocessed_signals': 1}. \n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.\n",
"[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'predictions'})\n",
"[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'predictions'}\n",
Expand All @@ -451,10 +451,10 @@
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.\n",
"[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 249907...\n",
"[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 249907 terminated.\n",
"[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 249547...\n",
"[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 249547 terminated.\n",
"[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 371147...\n",
"[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 371147 terminated.\n",
"[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 370787...\n",
"[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 370787 terminated.\n",
"ok\n"
]
}
Expand Down Expand Up @@ -507,7 +507,7 @@
" sig = inspect.signature(skeleton_func)\n",
"\n",
" # adjust name\n",
" mirror_func.__name__ = \"on_\" + topic\n",
" mirror_func.__name__ = \"on_\" + topic.replace(\".\", \"_\")\n",
"\n",
" # adjust arg and return val\n",
" sig = sig.replace(\n",
Expand Down Expand Up @@ -574,7 +574,7 @@
"\n",
"for topic, (producer_f, _, _) in app._producers_store.items():\n",
" mirror = mirror_producer(topic, producer_f)\n",
" assert mirror.__name__ == \"on_\" + topic\n",
" assert mirror.__name__ == \"on_\" + topic.replace(\".\", \"_\")\n",
" assert inspect.signature(mirror).parameters[\"msg\"].annotation.__name__ == \"TestMsg\""
]
},
Expand All @@ -592,18 +592,20 @@
" msg_type = inspect.signature(consumer_f).parameters[\"msg\"]\n",
"\n",
" msg_type_unwrapped = unwrap_list_type(msg_type)\n",
" \n",
"\n",
" async def skeleton_func(msg: BaseModel) -> BaseModel:\n",
" return msg\n",
"\n",
" mirror_func = skeleton_func\n",
" sig = inspect.signature(skeleton_func)\n",
"\n",
" # adjust name\n",
" mirror_func.__name__ = \"to_\" + topic\n",
" mirror_func.__name__ = \"to_\" + topic.replace(\".\", \"_\")\n",
"\n",
" # adjust arg and return val\n",
" sig = sig.replace(parameters=[msg_type], return_annotation=msg_type_unwrapped.annotation)\n",
" sig = sig.replace(\n",
" parameters=[msg_type], return_annotation=msg_type_unwrapped.annotation\n",
" )\n",
"\n",
" mirror_func.__signature__ = sig # type: ignore\n",
" return mirror_func"
Expand Down Expand Up @@ -647,14 +649,88 @@
" for app in self.apps:\n",
" for topic, (consumer_f, _, _, _) in app._consumers_store.items():\n",
" mirror_f = mirror_consumer(topic, consumer_f)\n",
" mirror_f = self.produces()(mirror_f) # type: ignore\n",
" mirror_f = self.produces(topic=topic)(mirror_f) # type: ignore\n",
" setattr(self, mirror_f.__name__, mirror_f)\n",
" for topic, (producer_f, _, _) in app._producers_store.items():\n",
" mirror_f = mirror_producer(topic, producer_f)\n",
" mirror_f = self.consumes()(mirror_f) # type: ignore\n",
" mirror_f = self.consumes(topic=topic)(mirror_f) # type: ignore\n",
" setattr(self, mirror_f.__name__, mirror_f)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7ac99fc1",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._start() called\n",
"[INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._patch_consumers_and_producers(): Patching consumers and producers!\n",
"[INFO] fastkafka._testing.in_memory_broker: InMemoryBroker starting\n",
"[INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'\n",
"[INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()\n",
"[INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'\n",
"[INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': 'localhost:9092', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n",
"[INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.\n",
"[INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called\n",
"[INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['this.should_work.now']\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': 'localhost:9092', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n",
"[INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.\n",
"[INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called\n",
"[INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['some.dots.my_topic']\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.\n",
"Sending prediction: msg='prediction'\n",
"[INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.\n",
"[INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called\n",
"[INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.\n",
"[INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called\n",
"[INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._stop() called\n",
"[INFO] fastkafka._testing.in_memory_broker: InMemoryBroker stopping\n",
"ok\n"
]
}
],
"source": [
"# Test mirroring with \".\" in topic names\n",
"\n",
"\n",
"class TestMsg(BaseModel):\n",
" msg: str = Field(...)\n",
"\n",
"\n",
"second_app = FastKafka(kafka_brokers=dict(localhost=dict(url=\"localhost\", port=9092)))\n",
"\n",
"\n",
"@second_app.consumes(topic=\"this.should_work.now\")\n",
"async def on_preprocessed_signals(msg: TestMsg):\n",
" await to_predictions(TestMsg(msg=\"prediction\"))\n",
"\n",
"\n",
"@second_app.produces(topic=\"some.dots.my_topic\")\n",
"async def to_predictions(prediction: TestMsg) -> TestMsg:\n",
" print(f\"Sending prediction: {prediction}\")\n",
" return [prediction]\n",
"\n",
"\n",
"async with Tester(second_app) as tester:\n",
" await tester.to_this_should_work_now(TestMsg(msg=\"signal\"))\n",
" await tester.awaited_mocks.on_some_dots_my_topic.assert_called(timeout=5)\n",
"print(\"ok\")"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand Down Expand Up @@ -938,11 +1014,11 @@
"[INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:9092', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n",
"[INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.\n",
"[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'preprocessed_signals'})\n",
"[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'preprocessed_signals'}\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.\n",
"[INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:9092', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n",
"[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'preprocessed_signals': 1}. \n",
Expand All @@ -958,16 +1034,16 @@
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:9092', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n",
"[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'preprocessed_signals': 1}. \n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.\n",
"[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'topic1'})\n",
"[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'topic1'}\n",
"[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'predictions'})\n",
"[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'predictions'}\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.\n",
"[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'topic2'})\n",
"[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'topic2'}\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.\n",
"[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'predictions'})\n",
"[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'predictions'}\n",
"[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'topic1'})\n",
"[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'topic1'}\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.\n",
"[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'predictions': 1}. \n",
"[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'topic2': 1}. \n",
Expand All @@ -984,10 +1060,10 @@
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.\n",
"[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.\n",
"[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 247517...\n",
"[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 247517 terminated.\n",
"[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 247156...\n",
"[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 247156 terminated.\n",
"[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 372674...\n",
"[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 372674 terminated.\n",
"[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 372314...\n",
"[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 372314 terminated.\n",
"ok\n"
]
}
Expand Down Expand Up @@ -1019,6 +1095,14 @@
" await tester.awaited_mocks.on_predictions.assert_called(timeout=5)\n",
"print(\"ok\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e2cdbf5c",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
Expand Down
2 changes: 1 addition & 1 deletion settings.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
### Python library ###
repo = fastkafka
lib_name = %(repo)s
version = 0.6.0
version = 0.7.0dev
min_python = 3.8
license = apache2

Expand Down

0 comments on commit e60eec0

Please sign in to comment.