Skip to content

Commit

Permalink
fix: respect ignored exceptions (airtai#1735)
Browse files Browse the repository at this point in the history
* fix: respect ignored exceptions

* docs: generate API References

---------

Co-authored-by: Lancetnik <Lancetnik@users.noreply.github.com>
Co-authored-by: Davor Runje <davor@airt.ai>
  • Loading branch information
3 people committed Aug 29, 2024
1 parent 71a8b62 commit af63e30
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 20 deletions.
1 change: 1 addition & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ search:
- exception
- [BaseExceptionMiddleware](api/faststream/broker/middlewares/exception/BaseExceptionMiddleware.md)
- [ExceptionMiddleware](api/faststream/broker/middlewares/exception/ExceptionMiddleware.md)
- [ignore_handler](api/faststream/broker/middlewares/exception/ignore_handler.md)
- logging
- [CriticalLogMiddleware](api/faststream/broker/middlewares/logging/CriticalLogMiddleware.md)
- proto
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.broker.middlewares.exception.ignore_handler
76 changes: 58 additions & 18 deletions faststream/broker/middlewares/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
Callable,
ContextManager,
Dict,
List,
NoReturn,
Optional,
Tuple,
Type,
Union,
overload,
Expand All @@ -14,6 +17,7 @@
from typing_extensions import Literal, TypeAlias

from faststream.broker.middlewares.base import BaseMiddleware
from faststream.exceptions import IgnoredException
from faststream.utils import apply_types, context
from faststream.utils.functions import sync_fake_context, to_async

Expand All @@ -29,9 +33,17 @@

CastedGeneralExceptionHandler: TypeAlias = Callable[..., Awaitable[None]]
CastedPublishingExceptionHandler: TypeAlias = Callable[..., Awaitable["Any"]]
CastedHandlers: TypeAlias = Dict[Type[Exception], CastedGeneralExceptionHandler]
CastedPublishingHandlers: TypeAlias = Dict[
Type[Exception], CastedPublishingExceptionHandler
CastedHandlers: TypeAlias = List[
Tuple[
Type[Exception],
CastedGeneralExceptionHandler,
]
]
CastedPublishingHandlers: TypeAlias = List[
Tuple[
Type[Exception],
CastedPublishingExceptionHandler,
]
]


Expand All @@ -57,7 +69,7 @@ async def consume_scope(
except Exception as exc:
exc_type = type(exc)

for handler_type, handler in self._publish_handlers.items():
for handler_type, handler in self._publish_handlers:
if issubclass(exc_type, handler_type):
return await handler(exc)

Expand All @@ -70,7 +82,7 @@ async def after_processed(
exc_tb: Optional["TracebackType"] = None,
) -> Optional[bool]:
if exc_type:
for handler_type, handler in self._handlers.items():
for handler_type, handler in self._handlers:
if issubclass(exc_type, handler_type):
# TODO: remove it after context will be moved to middleware
# In case parser/decoder error occurred
Expand Down Expand Up @@ -98,20 +110,34 @@ class ExceptionMiddleware:

def __init__(
self,
handlers: Optional[Dict[Type[Exception], GeneralExceptionHandler]] = None,
handlers: Optional[
Dict[
Type[Exception],
GeneralExceptionHandler,
]
] = None,
publish_handlers: Optional[
Dict[Type[Exception], PublishingExceptionHandler]
Dict[
Type[Exception],
PublishingExceptionHandler,
]
] = None,
) -> None:
self._handlers = {
exc_type: apply_types(to_async(handler))
for exc_type, handler in (handlers or {}).items()
}

self._publish_handlers = {
exc_type: apply_types(to_async(handler))
for exc_type, handler in (publish_handlers or {}).items()
}
self._handlers: CastedHandlers = [
(IgnoredException, ignore_handler),
*(
(exc_type, apply_types(to_async(handler)))
for exc_type, handler in (handlers or {}).items()
),
]

self._publish_handlers: CastedPublishingHandlers = [
(IgnoredException, ignore_handler),
*(
(exc_type, apply_types(to_async(handler)))
for exc_type, handler in (publish_handlers or {}).items()
),
]

@overload
def add_handler(
Expand Down Expand Up @@ -140,7 +166,12 @@ def add_handler(
def pub_wrapper(
func: PublishingExceptionHandler,
) -> PublishingExceptionHandler:
self._publish_handlers[exc] = apply_types(to_async(func))
self._publish_handlers.append(
(
exc,
apply_types(to_async(func)),
)
)
return func

return pub_wrapper
Expand All @@ -150,7 +181,12 @@ def pub_wrapper(
def default_wrapper(
func: GeneralExceptionHandler,
) -> GeneralExceptionHandler:
self._handlers[exc] = apply_types(to_async(func))
self._handlers.append(
(
exc,
apply_types(to_async(func)),
)
)
return func

return default_wrapper
Expand All @@ -162,3 +198,7 @@ def __call__(self, msg: Optional[Any]) -> BaseExceptionMiddleware:
publish_handlers=self._publish_handlers,
msg=msg,
)


async def ignore_handler(exception: IgnoredException) -> NoReturn:
raise exception
39 changes: 37 additions & 2 deletions tests/brokers/base/middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,39 @@ async def subscriber2(msg=Context("message")):
assert event.is_set()
assert mock.call_count == 0

async def test_exception_middleware_do_not_catch_skip_msg(
self, event: asyncio.Event, queue: str, mock: Mock, raw_broker
):
mid = ExceptionMiddleware()

@mid.add_handler(Exception)
async def value_error_handler(exc):
mock()

broker = self.broker_class(middlewares=(mid,))
args, kwargs = self.get_subscriber_params(queue)

@broker.subscriber(*args, **kwargs)
async def subscriber(m):
event.set()
raise SkipMessage

broker = self.patch_broker(raw_broker, broker)

async with broker:
await broker.start()
await asyncio.wait(
(
asyncio.create_task(broker.publish("", queue)),
asyncio.create_task(event.wait()),
),
timeout=self.timeout,
)
await asyncio.sleep(0.001)

assert event.is_set()
assert mock.call_count == 0

async def test_exception_middleware_reraise(
self, event: asyncio.Event, queue: str, mock: Mock, raw_broker
):
Expand Down Expand Up @@ -629,7 +662,7 @@ async def value_error_handler(exc):

mid2 = ExceptionMiddleware(handlers={ValueError: value_error_handler})

assert mid1._handlers.keys() == mid2._handlers.keys()
assert [x[0] for x in mid1._handlers] == [x[0] for x in mid2._handlers]

async def test_exception_middleware_init_publish_handler_same(self):
mid1 = ExceptionMiddleware()
Expand All @@ -640,7 +673,9 @@ async def value_error_handler(exc):

mid2 = ExceptionMiddleware(publish_handlers={ValueError: value_error_handler})

assert mid1._publish_handlers.keys() == mid2._publish_handlers.keys()
assert [x[0] for x in mid1._publish_handlers] == [
x[0] for x in mid2._publish_handlers
]

async def test_exception_middleware_decoder_error(
self, event: asyncio.Event, queue: str, mock: Mock, raw_broker
Expand Down

0 comments on commit af63e30

Please sign in to comment.