Skip to content

Commit

Permalink
hotfix: correct NatsResponse processing in RPC case (airtai#1654)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Aug 8, 2024
1 parent a0879b7 commit f9324a3
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 18 deletions.
6 changes: 3 additions & 3 deletions docs/docs/en/release.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ hide:

Well, seems like it is the biggest patch release ever 😃

#### Detail Reponses
#### Detail Responses

First of all, thanks to all new contributors, who helps us to improve the project! They made a huge impact to this release by adding new Kafka security machanisms and extend Response API - now you can use `broker.Response` to publish detailt information from handler
First of all, thanks to all new contributors, who helps us to improve the project! They made a huge impact to this release by adding new Kafka security mechanisms and extend Response API - now you can use `broker.Response` to publish detail information from handler

```python
@broker.subscriber("in")
Expand All @@ -33,7 +33,7 @@ async def handler(msg):

Also, we added a new huge feature - [**ASGI** support](https://faststream.airt.ai/latest/getting-started/asgi/#other-asgi-compatibility)!

Nope, we are not HTTP-framework now, but it is a little ASGI implemetnation to provide you with an ability to host documentation, use k8s http-probes and serve metrics in the same with you broker runtime without any dependencies.
Nope, we are not HTTP-framework now, but it is a little ASGI implementation to provide you with an ability to host documentation, use k8s http-probes and serve metrics in the same with you broker runtime without any dependencies.

You just need to use **AsgiFastStream** class

Expand Down
12 changes: 1 addition & 11 deletions faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,5 @@
"""Simple and fast framework to create message brokers based microservices."""

__version__ = "0.5.16"
__version__ = "0.5.17"

SERVICE_NAME = f"faststream-{__version__}"

INSTALL_YAML = """
To generate YAML documentation, please install dependencies:\n
pip install PyYAML
"""

INSTALL_WATCHFILES = """
To use restart feature, please install dependencies:\n
pip install watchfiles
"""
2 changes: 1 addition & 1 deletion faststream/cli/docs/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

import typer

from faststream.__about__ import INSTALL_WATCHFILES, INSTALL_YAML
from faststream._compat import json_dumps, model_parse
from faststream.asyncapi.generate import get_app_schema
from faststream.asyncapi.schema import Schema
from faststream.asyncapi.site import serve_app
from faststream.cli.utils.imports import import_from_string
from faststream.exceptions import INSTALL_WATCHFILES, INSTALL_YAML

docs_app = typer.Typer(pretty_exceptions_short=True)

Expand Down
4 changes: 2 additions & 2 deletions faststream/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
from typer.core import TyperOption

from faststream import FastStream
from faststream.__about__ import INSTALL_WATCHFILES, __version__
from faststream.__about__ import __version__
from faststream.cli.docs.app import docs_app
from faststream.cli.utils.imports import import_from_string
from faststream.cli.utils.logs import LogLevels, get_log_level, set_log_level
from faststream.cli.utils.parser import parse_cli_args
from faststream.exceptions import SetupError, ValidationError
from faststream.exceptions import INSTALL_WATCHFILES, SetupError, ValidationError

if TYPE_CHECKING:
from faststream.broker.core.usecase import BrokerUsecase
Expand Down
13 changes: 12 additions & 1 deletion faststream/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,15 @@ def __init__(self, fields: Iterable[str] = ()) -> None:
)


NOT_CONNECTED_YET = "Please, `connect()` the broker first"
NOT_CONNECTED_YET = "Please, `connect()` the broker first."


INSTALL_YAML = """
To generate YAML documentation, please install dependencies:\n
pip install PyYAML
"""

INSTALL_WATCHFILES = """
To use restart feature, please install dependencies:\n
pip install watchfiles
"""
1 change: 1 addition & 0 deletions faststream/nats/publisher/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ async def publish( # type: ignore[override]
rpc: bool = False,
rpc_timeout: Optional[float] = 30.0,
raise_timeout: bool = False,
**kwargs: Any, # suprress stream option
) -> Optional[Any]:
payload, content_type = encode_message(message)

Expand Down
22 changes: 22 additions & 0 deletions tests/brokers/nats/test_publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,25 @@ async def handle_next(msg=Context("message")):
body=b"1",
correlation_id="1",
)

@pytest.mark.asyncio()
async def test_response_for_rpc(
self,
queue: str,
event: asyncio.Event,
):
pub_broker = self.get_broker(apply_types=True)

@pub_broker.subscriber(queue)
async def handle():
return NatsResponse("Hi!", correlation_id="1")

async with self.patch_broker(pub_broker) as br:
await br.start()

response = await asyncio.wait_for(
br.publish("", queue, rpc=True),
timeout=3,
)

assert response == "Hi!", response
22 changes: 22 additions & 0 deletions tests/brokers/rabbit/test_publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,25 @@ async def handle_next(msg=Context("message")):
assert m.mock.call_args.kwargs.get("persist")

mock.assert_called_once_with(body=b"1")

@pytest.mark.asyncio()
async def test_response_for_rpc(
self,
queue: str,
event: asyncio.Event,
):
pub_broker = self.get_broker(apply_types=True)

@pub_broker.subscriber(queue)
async def handle():
return RabbitResponse("Hi!", correlation_id="1")

async with self.patch_broker(pub_broker) as br:
await br.start()

response = await asyncio.wait_for(
br.publish("", queue, rpc=True),
timeout=3,
)

assert response == "Hi!", response
22 changes: 22 additions & 0 deletions tests/brokers/redis/test_publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,25 @@ async def resp(msg=Context("message")):
body=b"1",
correlation_id="1",
)

@pytest.mark.asyncio()
async def test_response_for_rpc(
self,
queue: str,
event: asyncio.Event,
):
pub_broker = self.get_broker(apply_types=True)

@pub_broker.subscriber(queue)
async def handle():
return RedisResponse("Hi!", correlation_id="1")

async with self.patch_broker(pub_broker) as br:
await br.start()

response = await asyncio.wait_for(
br.publish("", queue, rpc=True),
timeout=3,
)

assert response == "Hi!", response

0 comments on commit f9324a3

Please sign in to comment.