From 6f86b735876ed0b23dc4133079f3e0d48287f35f Mon Sep 17 00:00:00 2001 From: Vikram Patki <54442035+patkivikram@users.noreply.github.com> Date: Fri, 15 Jan 2021 10:56:56 -0500 Subject: [PATCH] backpressure fix during recovery Issue #74 (#75) --- faust/tables/recovery.py | 2 ++ faust/transport/consumer.py | 2 +- faust/types/app.py | 2 ++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py index c83c75b51..1f545234c 100644 --- a/faust/tables/recovery.py +++ b/faust/tables/recovery.py @@ -530,6 +530,7 @@ async def _restart_recovery(self) -> None: def _set_recovery_started(self) -> None: self.in_recovery = True + self.app.in_recovery = True self._recovery_ended = None self._recovery_started_at = monotonic() self._active_events_received_at.clear() @@ -539,6 +540,7 @@ def _set_recovery_started(self) -> None: def _set_recovery_ended(self) -> None: self.in_recovery = False + self.app.in_recovery = False self._recovery_ended_at = monotonic() self._active_events_received_at.clear() self._standby_events_received_at.clear() diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index 3c8260a30..6812e1df5 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -519,7 +519,7 @@ def _set_active_tps(self, tps: Set[TP]) -> Set[TP]: def on_buffer_full(self, tp: TP) -> None: # do not remove the partition when in recovery - if not self.app.rebalancing: + if not self.app.in_recovery: active_partitions = self._get_active_partitions() active_partitions.discard(tp) self._buffered_partitions.add(tp) diff --git a/faust/types/app.py b/faust/types/app.py index 8c2c0be6d..8b9e687bd 100644 --- a/faust/types/app.py +++ b/faust/types/app.py @@ -179,6 +179,8 @@ class AppT(ServiceT): #: Set to true if the worker is currently rebalancing. rebalancing: bool = False rebalancing_count: int = 0 + #: Set to true when the worker is in recovery + in_recovery: bool = False #: Set to true if the assignment is empty # This flag is set by App._on_partitions_assigned