diff --git a/faust/__init__.py b/faust/__init__.py index d5febf348..938e50aac 100644 --- a/faust/__init__.py +++ b/faust/__init__.py @@ -24,7 +24,7 @@ from typing import Any, Mapping, NamedTuple, Optional, Sequence, Tuple -__version__ = '1.13.0' +__version__ = '1.13.1' __author__ = 'Robinhood Markets, Inc.' __contact__ = 'contact@fauststream.com' __homepage__ = 'http://faust.readthedocs.io/' diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index 87d9204b3..1a27299c3 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -78,6 +78,7 @@ from mode.utils.text import pluralize from mode.utils.times import Seconds from faust.exceptions import ProducerSendError +from intervaltree import IntervalTree from faust.types import AppT, ConsumerMessage, Message, RecordMetadata, TP from faust.types.core import HeadersArg from faust.types.transports import ( @@ -430,7 +431,7 @@ def __init__(self, self.commit_livelock_soft_timeout = ( commit_livelock_soft_timeout or self.app.conf.broker_commit_livelock_soft_timeout) - self._gap = defaultdict(list) + self._gap = defaultdict(IntervalTree) self._acked = defaultdict(list) self._acked_index = defaultdict(set) self._read_offset = defaultdict(lambda: None) @@ -975,13 +976,22 @@ def _new_offset(self, tp: TP) -> Optional[int]: # the return value will be: 36 if acked: max_offset = max(acked) - gap_for_tp = self._gap[tp] + gap_for_tp: IntervalTree = self._gap[tp] if gap_for_tp: - gap_index = next((i for i, x in enumerate(gap_for_tp) - if x > max_offset), len(gap_for_tp)) - gaps = gap_for_tp[:gap_index] - acked.extend(gaps) - gap_for_tp[:gap_index] = [] + # find all the ranges up to the max of acked, add them in to acked, + # and chop them off the gap. + candidates = gap_for_tp.overlap(0, max_offset) + # note: merge_overlaps will sort the intervaltree and will ensure that + # the intervals left over don't overlap each other. So can sort by their + # start without worrying about ends overlapping. + sorted_candidates = sorted(candidates, key=lambda x: x.begin) + if sorted_candidates: + stuff_to_add = [] + for entry in sorted_candidates: + stuff_to_add.extend(range(entry.begin, entry.end)) + new_max_offset = max(stuff_to_add[-1], max_offset + 1) + acked.extend(stuff_to_add) + gap_for_tp.chop(0, new_max_offset) acked.sort() # Note: acked is always kept sorted. # find first list of consecutive numbers @@ -997,12 +1007,18 @@ async def on_task_error(self, exc: BaseException) -> None: """Call when processing a message failed.""" await self.commit() - def _add_gap(self, tp: TP, offset_from: int, offset_to: int) -> None: + async def _add_gap(self, tp: TP, offset_from: int, offset_to: int) -> None: committed = self._committed_offset[tp] gap_for_tp = self._gap[tp] - for offset in range(offset_from, offset_to): - if committed is None or offset > committed: - gap_for_tp.append(offset) + if committed is not None: + offset_from = max(offset_from, committed + 1) + # intervaltree intervals exclude the end + if offset_from <= offset_to: + gap_for_tp.addi(offset_from, offset_to + 1) + # sleep 0 to allow other coroutines to get some loop time + # for example, to answer health checks while building the gap + await asyncio.sleep(0) + gap_for_tp.merge_overlaps() async def _drain_messages( self, fetcher: ServiceT) -> None: # pragma: no cover @@ -1050,7 +1066,7 @@ async def _drain_messages( if gap > 1 and r_offset: acks_enabled = acks_enabled_for(message.topic) if acks_enabled: - self._add_gap(tp, r_offset + 1, offset) + await self._add_gap(tp, r_offset + 1, offset) if commit_every is not None: if self._n_acked >= commit_every: self._n_acked = 0 diff --git a/requirements/dist.txt b/requirements/dist.txt index f41b125b9..e37f8b7db 100644 --- a/requirements/dist.txt +++ b/requirements/dist.txt @@ -12,3 +12,4 @@ tox>=2.3.1 twine vulture wheel>=0.29.0 +intervaltree \ No newline at end of file