Skip to content

Commit

Permalink
adding intervaltree to manage gaps in topics to prevent OOM
Browse files Browse the repository at this point in the history
  • Loading branch information
vmirz committed Nov 22, 2023
1 parent f238c77 commit f46f0a0
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 13 deletions.
2 changes: 1 addition & 1 deletion faust/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from typing import Any, Mapping, NamedTuple, Optional, Sequence, Tuple

__version__ = '1.13.0'
__version__ = '1.13.1'
__author__ = 'Robinhood Markets, Inc.'
__contact__ = 'contact@fauststream.com'
__homepage__ = 'http://faust.readthedocs.io/'
Expand Down
40 changes: 28 additions & 12 deletions faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
from mode.utils.text import pluralize
from mode.utils.times import Seconds
from faust.exceptions import ProducerSendError
from intervaltree import IntervalTree
from faust.types import AppT, ConsumerMessage, Message, RecordMetadata, TP
from faust.types.core import HeadersArg
from faust.types.transports import (
Expand Down Expand Up @@ -430,7 +431,7 @@ def __init__(self,
self.commit_livelock_soft_timeout = (
commit_livelock_soft_timeout or
self.app.conf.broker_commit_livelock_soft_timeout)
self._gap = defaultdict(list)
self._gap = defaultdict(IntervalTree)
self._acked = defaultdict(list)
self._acked_index = defaultdict(set)
self._read_offset = defaultdict(lambda: None)
Expand Down Expand Up @@ -975,13 +976,22 @@ def _new_offset(self, tp: TP) -> Optional[int]:
# the return value will be: 36
if acked:
max_offset = max(acked)
gap_for_tp = self._gap[tp]
gap_for_tp: IntervalTree = self._gap[tp]
if gap_for_tp:
gap_index = next((i for i, x in enumerate(gap_for_tp)
if x > max_offset), len(gap_for_tp))
gaps = gap_for_tp[:gap_index]
acked.extend(gaps)
gap_for_tp[:gap_index] = []
# find all the ranges up to the max of acked, add them in to acked,
# and chop them off the gap.
candidates = gap_for_tp.overlap(0, max_offset)
# note: merge_overlaps will sort the intervaltree and will ensure that
# the intervals left over don't overlap each other. So can sort by their
# start without worrying about ends overlapping.
sorted_candidates = sorted(candidates, key=lambda x: x.begin)
if sorted_candidates:
stuff_to_add = []
for entry in sorted_candidates:
stuff_to_add.extend(range(entry.begin, entry.end))
new_max_offset = max(stuff_to_add[-1], max_offset + 1)
acked.extend(stuff_to_add)
gap_for_tp.chop(0, new_max_offset)
acked.sort()
# Note: acked is always kept sorted.
# find first list of consecutive numbers
Expand All @@ -997,12 +1007,18 @@ async def on_task_error(self, exc: BaseException) -> None:
"""Call when processing a message failed."""
await self.commit()

def _add_gap(self, tp: TP, offset_from: int, offset_to: int) -> None:
async def _add_gap(self, tp: TP, offset_from: int, offset_to: int) -> None:
committed = self._committed_offset[tp]
gap_for_tp = self._gap[tp]
for offset in range(offset_from, offset_to):
if committed is None or offset > committed:
gap_for_tp.append(offset)
if committed is not None:
offset_from = max(offset_from, committed + 1)
# intervaltree intervals exclude the end
if offset_from <= offset_to:
gap_for_tp.addi(offset_from, offset_to + 1)
# sleep 0 to allow other coroutines to get some loop time
# for example, to answer health checks while building the gap
await asyncio.sleep(0)
gap_for_tp.merge_overlaps()

async def _drain_messages(
self, fetcher: ServiceT) -> None: # pragma: no cover
Expand Down Expand Up @@ -1050,7 +1066,7 @@ async def _drain_messages(
if gap > 1 and r_offset:
acks_enabled = acks_enabled_for(message.topic)
if acks_enabled:
self._add_gap(tp, r_offset + 1, offset)
await self._add_gap(tp, r_offset + 1, offset)
if commit_every is not None:
if self._n_acked >= commit_every:
self._n_acked = 0
Expand Down
1 change: 1 addition & 0 deletions requirements/dist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ tox>=2.3.1
twine
vulture
wheel>=0.29.0
intervaltree

0 comments on commit f46f0a0

Please sign in to comment.