Skip to content

Commit

Permalink
feat: add broker.request method (airtai#1649)
Browse files Browse the repository at this point in the history
* feat: add broker.request method

* feat: kafka request support

* feat: confluent request support

* merge main

* feat: confluent request tests

* docs: generate API References

* tests: fix broken tests

* tests: refactor confluent test client

* docs: update rpc examples

* chore: deprecate message.decoded_body

* refactor: FastAPI 0.5.0 compatibility

* docs: remove useless API

* refactor: correct Consumer Protocol

* fix: correct Confluent FakeConsumer

* Ignore override

* Proofread docs

* Remove unused ignores

* Add ignore redundant-cast

* fix: correct merge

* lint: fix precommit

* fix: decoded_body public field compatibility

* fix: request respects consume middleware

* fix: request respects consume middleware for all brokers

* chore: bump version

* docs: generate API References

* fix: request respects global middlewares scope

---------

Co-authored-by: Lancetnik <Lancetnik@users.noreply.github.com>
Co-authored-by: Kumaran Rajendhiran <kumaran@airt.ai>
  • Loading branch information
3 people committed Aug 24, 2024
1 parent 5e52cf3 commit 8ec5a42
Show file tree
Hide file tree
Showing 84 changed files with 2,910 additions and 605 deletions.
13 changes: 10 additions & 3 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,6 @@ search:
- [BrokerUsecase](api/faststream/broker/core/usecase/BrokerUsecase.md)
- fastapi
- [StreamMessage](api/faststream/broker/fastapi/StreamMessage.md)
- [StreamRoute](api/faststream/broker/fastapi/StreamRoute.md)
- [StreamRouter](api/faststream/broker/fastapi/StreamRouter.md)
- context
- [Context](api/faststream/broker/fastapi/context/Context.md)
Expand All @@ -365,8 +364,9 @@ search:
- [get_fastapi_native_dependant](api/faststream/broker/fastapi/get_dependant/get_fastapi_native_dependant.md)
- route
- [StreamMessage](api/faststream/broker/fastapi/route/StreamMessage.md)
- [StreamRoute](api/faststream/broker/fastapi/route/StreamRoute.md)
- [build_faststream_to_fastapi_parser](api/faststream/broker/fastapi/route/build_faststream_to_fastapi_parser.md)
- [make_fastapi_execution](api/faststream/broker/fastapi/route/make_fastapi_execution.md)
- [wrap_callable_to_fastapi_compatible](api/faststream/broker/fastapi/route/wrap_callable_to_fastapi_compatible.md)
- router
- [StreamRouter](api/faststream/broker/fastapi/router/StreamRouter.md)
- message
Expand Down Expand Up @@ -565,11 +565,13 @@ search:
- [HandlerException](api/faststream/exceptions/HandlerException.md)
- [IgnoredException](api/faststream/exceptions/IgnoredException.md)
- [NackMessage](api/faststream/exceptions/NackMessage.md)
- [OperationForbiddenError](api/faststream/exceptions/OperationForbiddenError.md)
- [RejectMessage](api/faststream/exceptions/RejectMessage.md)
- [SetupError](api/faststream/exceptions/SetupError.md)
- [SkipMessage](api/faststream/exceptions/SkipMessage.md)
- [StopApplication](api/faststream/exceptions/StopApplication.md)
- [StopConsume](api/faststream/exceptions/StopConsume.md)
- [SubscriberNotFound](api/faststream/exceptions/SubscriberNotFound.md)
- [ValidationError](api/faststream/exceptions/ValidationError.md)
- kafka
- [KafkaBroker](api/faststream/kafka/KafkaBroker.md)
Expand Down Expand Up @@ -844,6 +846,7 @@ search:
- usecase
- [LogicPublisher](api/faststream/rabbit/publisher/usecase/LogicPublisher.md)
- [PublishKwargs](api/faststream/rabbit/publisher/usecase/PublishKwargs.md)
- [RequestPublishKwargs](api/faststream/rabbit/publisher/usecase/RequestPublishKwargs.md)
- response
- [RabbitResponse](api/faststream/rabbit/response/RabbitResponse.md)
- router
Expand Down Expand Up @@ -990,8 +993,12 @@ search:
- [LogicSubscriber](api/faststream/redis/subscriber/usecase/LogicSubscriber.md)
- [StreamSubscriber](api/faststream/redis/subscriber/usecase/StreamSubscriber.md)
- testing
- [ChannelVisitor](api/faststream/redis/testing/ChannelVisitor.md)
- [FakeProducer](api/faststream/redis/testing/FakeProducer.md)
- [ListVisitor](api/faststream/redis/testing/ListVisitor.md)
- [StreamVisitor](api/faststream/redis/testing/StreamVisitor.md)
- [TestRedisBroker](api/faststream/redis/testing/TestRedisBroker.md)
- [Visitor](api/faststream/redis/testing/Visitor.md)
- [build_message](api/faststream/redis/testing/build_message.md)
- security
- [BaseSecurity](api/faststream/security/BaseSecurity.md)
Expand All @@ -1006,7 +1013,6 @@ search:
- [TestApp](api/faststream/testing/app/TestApp.md)
- broker
- [TestBroker](api/faststream/testing/broker/TestBroker.md)
- [call_handler](api/faststream/testing/broker/call_handler.md)
- [patch_broker_calls](api/faststream/testing/broker/patch_broker_calls.md)
- types
- [LoggerProto](api/faststream/types/LoggerProto.md)
Expand Down Expand Up @@ -1046,6 +1052,7 @@ search:
- [call_or_await](api/faststream/utils/functions/call_or_await.md)
- [drop_response_type](api/faststream/utils/functions/drop_response_type.md)
- [fake_context](api/faststream/utils/functions/fake_context.md)
- [return_input](api/faststream/utils/functions/return_input.md)
- [sync_fake_context](api/faststream/utils/functions/sync_fake_context.md)
- [timeout_scope](api/faststream/utils/functions/timeout_scope.md)
- [to_async](api/faststream/utils/functions/to_async.md)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.broker.fastapi.route.build_faststream_to_fastapi_parser
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.broker.fastapi.route.wrap_callable_to_fastapi_compatible
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/exceptions/OperationForbiddenError.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.exceptions.OperationForbiddenError
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ search:
boost: 0.5
---

::: faststream.broker.fastapi.route.StreamRoute
::: faststream.exceptions.SubscriberNotFound
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.rabbit.publisher.usecase.RequestPublishKwargs
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/redis/testing/ChannelVisitor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.redis.testing.ChannelVisitor
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ search:
boost: 0.5
---

::: faststream.broker.fastapi.StreamRoute
::: faststream.redis.testing.ListVisitor
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ search:
boost: 0.5
---

::: faststream.testing.broker.call_handler
::: faststream.redis.testing.StreamVisitor
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/redis/testing/Visitor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.redis.testing.Visitor
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/utils/functions/return_input.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.utils.functions.return_input
12 changes: 4 additions & 8 deletions docs/docs/en/nats/rpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,15 @@ Just send a message like a regular one and get a response synchronously.

It is very close to the common **requests** syntax:

```python hl_lines="1 4"
msg = await broker.publish(
```python hl_lines="3"
from faststream.nats import NatsMessage

msg: NatsMessage = await broker.request(
"Hi!",
subject="test",
rpc=True,
)
```

Also, you have two extra options to control this behavior:

* `#!python rpc_timeout: Optional[float] = 30.0` - controls how long you are waiting for a response.
* `#!python raise_timeout: bool = False` - by default, a timeout request returns `None`, but if you need to raise a `TimeoutException` directly, you can specify this option.

## Reply-To

Also, if you want to create a permanent request-reply data flow, probably, you should create a permanent subject to consume responses.
Expand Down
3 changes: 3 additions & 0 deletions docs/docs/en/rabbit/ack.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ async def base_handler(body: str):
...
```

!!! tip
**FastStream** identifies the message by its `message_id`. To make this option work, you should manually set this field on the producer side (if your library doesn't set it automatically).

!!! bug
At the moment, attempts are counted only by the current consumer. If the message goes to another consumer, it will have its own counter.
Subsequently, this logic will be reworked.
Expand Down
12 changes: 4 additions & 8 deletions docs/docs/en/rabbit/rpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,15 @@ Just send a message like a regular one and get a response synchronously.

It is very close to common **requests** syntax:

```python hl_lines="1 4"
msg = await broker.publish(
```python hl_lines="3"
from faststream.rabbit import RabbitMessage

msg: RabbitMessage = await broker.request(
"Hi!",
queue="test",
rpc=True,
)
```

Also, you have two extra options to control this behavior:

* `#!python rpc_timeout: Optional[float] = 30.0` - controls how long you are waiting for a response
* `#!python raise_timeout: bool = False` - by default, a timeout request returns `None`, but if you need to raise a `TimeoutException` directly, you can specify this option

## Reply-To

Also, if you want to create a permanent request-reply data flow, probably, you should create a permanent queue to consume responses.
Expand Down
4 changes: 2 additions & 2 deletions docs/docs/en/redis/rpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ To implement **Redis** RPC with `RedisBroker` in **FastStream**, follow the step

3. Send RPC messages through `RedisBroker` and await responses on the correct data type.

After your application has started and the subscribers are ready to receive messages, you can publish messages with the `rpc` option enabled. Additionally, you can set an `rpc_timeout` to decide how long the publisher should wait for a response before timing out.
Additionally, you can set a `timeout` to decide how long the publisher should wait for a response before timing out.

```python linenums="1"
```python linenums="1" hl_lines="5 12 19"
{!> docs_src/redis/rpc/app.py [ln:26-49] !}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async def custom_parser(
original_parser: Callable[[Message], Awaitable[KafkaMessage]],
) -> KafkaMessage:
parsed_msg = await original_parser(msg)
parsed_msg.message_id = parsed_msg.headers["custom_message_id"]
parsed_msg.message_id = parsed_msg.headers.get("custom_message_id")
return parsed_msg


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async def custom_parser(
original_parser: Callable[[ConsumerRecord], Awaitable[KafkaMessage]],
) -> KafkaMessage:
parsed_msg = await original_parser(msg)
parsed_msg.message_id = parsed_msg.headers["custom_message_id"]
parsed_msg.message_id = parsed_msg.headers.get("custom_message_id")
return parsed_msg


Expand Down
2 changes: 1 addition & 1 deletion docs/docs_src/getting_started/serialization/parser_nats.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async def custom_parser(
original_parser: Callable[[Msg], Awaitable[NatsMessage]],
) -> NatsMessage:
parsed_msg = await original_parser(msg)
parsed_msg.message_id = parsed_msg.headers["custom_message_id"]
parsed_msg.message_id = parsed_msg.headers.get("custom_message_id")
return parsed_msg


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async def custom_parser(
original_parser: Callable[[IncomingMessage], Awaitable[RabbitMessage]],
) -> RabbitMessage:
parsed_msg = await original_parser(msg)
parsed_msg.message_id = parsed_msg.headers["custom_message_id"]
parsed_msg.message_id = parsed_msg.headers.get("custom_message_id")
return parsed_msg


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ async def custom_parser(
original_parser: Callable[[PubSubMessage], Awaitable[RedisMessage]],
) -> RedisMessage:
parsed_msg = await original_parser(msg)
parsed_msg.message_id = parsed_msg.headers["custom_message_id"]
parsed_msg.message_id = parsed_msg.headers.get("custom_message_id")
return parsed_msg


Expand Down
20 changes: 10 additions & 10 deletions docs/docs_src/redis/rpc/app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from faststream import FastStream, Logger
from faststream.redis import RedisBroker
from faststream.redis import RedisBroker, RedisMessage

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)
Expand Down Expand Up @@ -27,23 +27,23 @@ async def handle_stream(msg: str, logger: Logger):
async def t():
msg = "Hi!"

assert msg == await broker.publish(
response: RedisMessage = await broker.request(
"Hi!",
channel="test-channel",
rpc=True,
rpc_timeout=3.0,
timeout=3.0,
)
assert await response.decode() == msg

assert msg == await broker.publish(
response: RedisMessage = await broker.request(
"Hi!",
list="test-list",
rpc=True,
rpc_timeout=3.0,
timeout=3.0,
)
assert await response.decode() == msg

assert msg == await broker.publish(
response: RedisMessage = await broker.request(
"Hi!",
stream="test-stream",
rpc=True,
rpc_timeout=3.0,
timeout=3.0,
)
assert await response.decode() == msg
3 changes: 2 additions & 1 deletion examples/e05_rpc_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ async def handle(msg, logger: Logger):

@app.after_startup
async def test_publishing():
assert (await broker.publish("ping", "test-queue", rpc=True)) == "pong"
response = await broker.request("ping", "test-queue")
assert await response.decode() == "pong"
2 changes: 1 addition & 1 deletion examples/e10_middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def subscriber_middleware(
msg: RabbitMessage,
) -> Any:
print(f"call handler middleware with body: {msg}")
msg.decoded_body = "fake message"
msg._decoded_body = "fake message"
result = await call_next(msg)
print("handler middleware out")
return result
Expand Down
4 changes: 2 additions & 2 deletions examples/nats/e02_basic_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ async def handler(msg: str, logger: Logger):

@app.after_startup
async def test_send():
response = await broker.publish("Hi!", "subject", rpc=True)
assert response == "Response"
response = await broker.request("Hi!", "subject")
assert await response.decode() == "Response"
18 changes: 9 additions & 9 deletions examples/redis/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,23 @@ async def handle_stream(msg: str, logger: Logger):
async def t():
msg = "Hi!"

assert msg == await broker.publish(
response = await broker.request(
"Hi!",
channel="test-channel",
rpc=True,
rpc_timeout=3.0,
timeout=3.0,
)
assert await response.decode() == msg

assert msg == await broker.publish(
response = await broker.request(
"Hi!",
list="test-list",
rpc=True,
rpc_timeout=3.0,
timeout=3.0,
)
assert await response.decode() == msg

assert msg == await broker.publish(
response = await broker.request(
"Hi!",
stream="test-stream",
rpc=True,
rpc_timeout=3.0,
timeout=3.0,
)
assert await response.decode() == msg
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Simple and fast framework to create message brokers based microservices."""

__version__ = "0.5.18"
__version__ = "0.5.19"

SERVICE_NAME = f"faststream-{__version__}"
1 change: 1 addition & 0 deletions faststream/asgi/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ async def __call__(self, scope: "Scope", receive: "Receive", send: "Send") -> No
async def start_lifespan_context(self) -> AsyncIterator[None]:
async with anyio.create_task_group() as tg, self.lifespan_context():
tg.start_soon(self._startup)

try:
yield
finally:
Expand Down
Loading

0 comments on commit 8ec5a42

Please sign in to comment.