Skip to content

Commit

Permalink
feat: kafka listener, extended include_router (airtai#1374)
Browse files Browse the repository at this point in the history
* chore: update dependencies

* feat: extended include_router

* chore: update dependencies

* docs: gen API

* chore: bump version
  • Loading branch information
Lancetnik committed Apr 16, 2024
1 parent 96b58f5 commit 463c953
Show file tree
Hide file tree
Showing 20 changed files with 555 additions and 66 deletions.
1 change: 1 addition & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,7 @@ search:
- [PullSub](api/faststream/nats/schemas/PullSub.md)
- js_stream
- [JStream](api/faststream/nats/schemas/js_stream/JStream.md)
- [compile_nats_wildcard](api/faststream/nats/schemas/js_stream/compile_nats_wildcard.md)
- [is_subject_match_wildcard](api/faststream/nats/schemas/js_stream/is_subject_match_wildcard.md)
- pull_sub
- [PullSub](api/faststream/nats/schemas/pull_sub/PullSub.md)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.nats.schemas.js_stream.compile_nats_wildcard
2 changes: 1 addition & 1 deletion docs/docs/en/release.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ Such changes allows you two features previously unavailable
* suppress any exceptions and pas fall-back message body to publishers
* patch any outgoing message headers and other parameters

Without these features we just can't impelement [Observability Middleware](https://github.com/airtai/faststream/issues/916) or any similar tool, so it is the job to be done.
Without these features we just can't implement [Observability Middleware](https://github.com/airtai/faststream/issues/916) or any similar tool, so it is the job to be done.

Now you are free to get access at any message processing stage and we are one step closer to the framework we would like to create!

Expand Down
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Simple and fast framework to create message brokers based microservices."""

__version__ = "0.5.0"
__version__ = "0.5.1"

SERVICE_NAME = f"faststream-{__version__}"

Expand Down
46 changes: 39 additions & 7 deletions faststream/broker/core/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,23 +68,55 @@ def publisher(
self._publishers = {**self._publishers, key: publisher}
return publisher

def include_router(self, router: "ABCBroker[Any]") -> None:
def include_router(
self,
router: "ABCBroker[Any]",
*,
prefix: str = "",
dependencies: Iterable["Depends"] = (),
middlewares: Iterable["BrokerMiddleware[MsgType]"] = (),
include_in_schema: Optional[bool] = None,
) -> None:
"""Includes a router in the current object."""
for h in router._subscribers.values():
h.add_prefix(self.prefix)
h.add_prefix("".join((self.prefix, prefix)))

if (key := hash(h)) not in self._subscribers:
h.include_in_schema = self._solve_include_in_schema(h.include_in_schema)
h._broker_middlewares = (*self._middlewares, *h._broker_middlewares)
h._broker_dependecies = (*self._dependencies, *h._broker_dependecies)
if include_in_schema is None:
h.include_in_schema = self._solve_include_in_schema(
h.include_in_schema
)
else:
h.include_in_schema = include_in_schema

h._broker_middlewares = (
*self._middlewares,
*middlewares,
*h._broker_middlewares,
)
h._broker_dependecies = (
*self._dependencies,
*dependencies,
*h._broker_dependecies,
)
self._subscribers = {**self._subscribers, key: h}

for p in router._publishers.values():
p.add_prefix(self.prefix)

if (key := hash(p)) not in self._publishers:
p.include_in_schema = self._solve_include_in_schema(p.include_in_schema)
p._broker_middlewares = (*self._middlewares, *p._broker_middlewares)
if include_in_schema is None:
p.include_in_schema = self._solve_include_in_schema(
p.include_in_schema
)
else:
p.include_in_schema = include_in_schema

p._broker_middlewares = (
*self._middlewares,
*middlewares,
*p._broker_middlewares,
)
self._publishers = {**self._publishers, key: p}

def include_routers(
Expand Down
131 changes: 131 additions & 0 deletions faststream/kafka/broker/registrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

if TYPE_CHECKING:
from aiokafka import ConsumerRecord
from aiokafka.abc import ConsumerRebalanceListener
from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from fast_depends.dependencies import Depends

Expand Down Expand Up @@ -303,6 +304,38 @@ def subscriber(
Optional[int],
Doc("Number of messages to consume as one batch."),
] = None,
listener: Annotated[
Optional["ConsumerRebalanceListener"],
Doc("""
Optionally include listener
callback, which will be called before and after each rebalance
operation.
As part of group management, the consumer will keep track of
the list of consumers that belong to a particular group and
will trigger a rebalance operation if one of the following
events trigger:
* Number of partitions change for any of the subscribed topics
* Topic is created or deleted
* An existing member of the consumer group dies
* A new member is added to the consumer group
When any of these events are triggered, the provided listener
will be invoked first to indicate that the consumer's
assignment has been revoked, and then again when the new
assignment has been received. Note that this listener will
immediately override any listener set in a previous call
to subscribe. It is guaranteed, however, that the partitions
revoked/assigned
through this interface are from topics subscribed in this call.
"""),
] = None,
pattern: Annotated[
Optional[str],
Doc("""
Pattern to match available topics. You must provide either topics or pattern, but not both.
"""),
] = None,
# broker args
dependencies: Annotated[
Iterable["Depends"],
Expand Down Expand Up @@ -595,6 +628,38 @@ def subscriber(
Optional[int],
Doc("Number of messages to consume as one batch."),
] = None,
listener: Annotated[
Optional["ConsumerRebalanceListener"],
Doc("""
Optionally include listener
callback, which will be called before and after each rebalance
operation.
As part of group management, the consumer will keep track of
the list of consumers that belong to a particular group and
will trigger a rebalance operation if one of the following
events trigger:
* Number of partitions change for any of the subscribed topics
* Topic is created or deleted
* An existing member of the consumer group dies
* A new member is added to the consumer group
When any of these events are triggered, the provided listener
will be invoked first to indicate that the consumer's
assignment has been revoked, and then again when the new
assignment has been received. Note that this listener will
immediately override any listener set in a previous call
to subscribe. It is guaranteed, however, that the partitions
revoked/assigned
through this interface are from topics subscribed in this call.
"""),
] = None,
pattern: Annotated[
Optional[str],
Doc("""
Pattern to match available topics. You must provide either topics or pattern, but not both.
"""),
] = None,
# broker args
dependencies: Annotated[
Iterable["Depends"],
Expand Down Expand Up @@ -887,6 +952,38 @@ def subscriber(
Optional[int],
Doc("Number of messages to consume as one batch."),
] = None,
listener: Annotated[
Optional["ConsumerRebalanceListener"],
Doc("""
Optionally include listener
callback, which will be called before and after each rebalance
operation.
As part of group management, the consumer will keep track of
the list of consumers that belong to a particular group and
will trigger a rebalance operation if one of the following
events trigger:
* Number of partitions change for any of the subscribed topics
* Topic is created or deleted
* An existing member of the consumer group dies
* A new member is added to the consumer group
When any of these events are triggered, the provided listener
will be invoked first to indicate that the consumer's
assignment has been revoked, and then again when the new
assignment has been received. Note that this listener will
immediately override any listener set in a previous call
to subscribe. It is guaranteed, however, that the partitions
revoked/assigned
through this interface are from topics subscribed in this call.
"""),
] = None,
pattern: Annotated[
Optional[str],
Doc("""
Pattern to match available topics. You must provide either topics or pattern, but not both.
"""),
] = None,
# broker args
dependencies: Annotated[
Iterable["Depends"],
Expand Down Expand Up @@ -1182,6 +1279,38 @@ def subscriber(
Optional[int],
Doc("Number of messages to consume as one batch."),
] = None,
listener: Annotated[
Optional["ConsumerRebalanceListener"],
Doc("""
Optionally include listener
callback, which will be called before and after each rebalance
operation.
As part of group management, the consumer will keep track of
the list of consumers that belong to a particular group and
will trigger a rebalance operation if one of the following
events trigger:
* Number of partitions change for any of the subscribed topics
* Topic is created or deleted
* An existing member of the consumer group dies
* A new member is added to the consumer group
When any of these events are triggered, the provided listener
will be invoked first to indicate that the consumer's
assignment has been revoked, and then again when the new
assignment has been received. Note that this listener will
immediately override any listener set in a previous call
to subscribe. It is guaranteed, however, that the partitions
revoked/assigned
through this interface are from topics subscribed in this call.
"""),
] = None,
pattern: Annotated[
Optional[str],
Doc("""
Pattern to match available topics. You must provide either topics or pattern, but not both.
"""),
] = None,
# broker args
dependencies: Annotated[
Iterable["Depends"],
Expand Down Expand Up @@ -1271,6 +1400,8 @@ def subscriber(
batch_timeout_ms=batch_timeout_ms,
max_records=max_records,
group_id=group_id,
listener=listener,
pattern=pattern,
builder=builder,
is_manual=not auto_commit,
# subscriber args
Expand Down
Loading

0 comments on commit 463c953

Please sign in to comment.