Skip to content

Commit

Permalink
backpressure fix during recovery Issue #74 (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
patkivikram authored Jan 15, 2021
1 parent 0a4d059 commit 6f86b73
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 1 deletion.
2 changes: 2 additions & 0 deletions faust/tables/recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ async def _restart_recovery(self) -> None:

def _set_recovery_started(self) -> None:
self.in_recovery = True
self.app.in_recovery = True
self._recovery_ended = None
self._recovery_started_at = monotonic()
self._active_events_received_at.clear()
Expand All @@ -539,6 +540,7 @@ def _set_recovery_started(self) -> None:

def _set_recovery_ended(self) -> None:
self.in_recovery = False
self.app.in_recovery = False
self._recovery_ended_at = monotonic()
self._active_events_received_at.clear()
self._standby_events_received_at.clear()
Expand Down
2 changes: 1 addition & 1 deletion faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ def _set_active_tps(self, tps: Set[TP]) -> Set[TP]:

def on_buffer_full(self, tp: TP) -> None:
# do not remove the partition when in recovery
if not self.app.rebalancing:
if not self.app.in_recovery:
active_partitions = self._get_active_partitions()
active_partitions.discard(tp)
self._buffered_partitions.add(tp)
Expand Down
2 changes: 2 additions & 0 deletions faust/types/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ class AppT(ServiceT):
#: Set to true if the worker is currently rebalancing.
rebalancing: bool = False
rebalancing_count: int = 0
#: Set to true when the worker is in recovery
in_recovery: bool = False

#: Set to true if the assignment is empty
# This flag is set by App._on_partitions_assigned
Expand Down

0 comments on commit 6f86b73

Please sign in to comment.