Skip to content

Commit

Permalink
feat(monitor): summarize tasks according to actual time
Browse files Browse the repository at this point in the history
Signed-off-by: Rongrong <i@rong.moe>
  • Loading branch information
Rongronggg9 committed May 2, 2024
1 parent 5fafb27 commit 551bff1
Showing 1 changed file with 43 additions and 17 deletions.
60 changes: 43 additions & 17 deletions src/command/monitor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from __future__ import annotations
from typing import Union
from typing import Union, Final
from collections.abc import MutableMapping, Iterable, Mapping

import gc
Expand All @@ -23,6 +23,8 @@

logger = log.getLogger('RSStT.monitor')

TIMEOUT: Final[int] = 10 * 60 # 10 minutes

# This may leak memory, but the entries are so small that leaking thousands of them is still not a big deal!
__user_unsub_all_lock_bucket: dict[int, asyncio.Lock] = defaultdict(asyncio.Lock)
__user_blocked_counter = Counter()
Expand All @@ -41,12 +43,14 @@ def setter(self, value):


class MonitoringStat(AbstractContextManager):
# TODO: rewrite it to use real time?
# TODO: make __monitor directly call this class's method to log and make statistics
class Meta:
counter: MutableMapping[str, int] = Counter()
monitoring_counts = 0
summary_period = 10
last_summary_time: float = env.loop.time()
task_finished: set[object] = set()
task_in_progress: set[object] = set()
task_stuck: set[object] = set()
summary_period: float = TIMEOUT # seconds

not_updated: int = _gen_property('not_updated')
cached: int = _gen_property('cached')
Expand All @@ -73,27 +77,49 @@ def _stat(counter: Mapping) -> str:
)))

def __init__(self):
self.print_summary()
self.counter: MutableMapping[str, int] = Counter()
self._token = object()
self.Meta.task_in_progress.add(self._token)

def __exit__(self, *args):
meta = self.Meta
meta.counter += self.counter
level = logging.DEBUG
if self.timeout or self.cancelled or self.unknown_error or self.timeout_unknown_error:
level = logging.WARNING
msg = f'Finished a monitoring task: {self._stat(self.counter)}'
logger.log(level, msg)
self.Meta.monitoring_counts += 1
if meta.monitoring_counts == meta.summary_period:
try:
meta.counter += self.counter
level = logging.DEBUG
if self.timeout or self.cancelled or self.unknown_error or self.timeout_unknown_error:
level = logging.WARNING
msg = f'Finished a monitoring task: {self._stat(self.counter)}'
logger.log(level, msg)
finally:
meta.task_in_progress.discard(self._token)
meta.task_stuck.discard(self._token)
meta.task_finished.add(self._token)
self.print_summary()
meta.monitoring_counts = 0
meta.counter.clear()
gc.collect()

@classmethod
def print_summary(cls):
meta = cls.Meta
logger.info(f'Summary of the last {meta.monitoring_counts} monitoring tasks: ' + cls._stat(meta.counter))
now = env.loop.time()
time_diff = round(now - meta.last_summary_time)
if time_diff < meta.summary_period:
return
logger.info(
f'{len(meta.task_finished)} monitoring tasks finished in the past {time_diff}s'
+ (f', while {len(meta.task_in_progress)} are still in progress' if meta.task_in_progress else '') +
f'. Subtask summary of finished tasks: {cls._stat(meta.counter)}'
)
if meta.task_stuck:
logger.warning(
f'{len(meta.task_stuck)} monitoring tasks are still in progress after >{time_diff}s, '
'are they stuck?'
)
meta.last_summary_time = now
meta.task_stuck |= meta.task_in_progress
meta.task_in_progress.clear()
meta.task_finished.clear()
meta.counter.clear()
gc.collect()


async def run_monitor_task():
Expand All @@ -104,7 +130,7 @@ async def run_monitor_task():
feeds = await db.Feed.filter(id__in=feed_id_to_monitor)

logger.debug('Started a monitoring task.')
wait_for = 10 * 60
wait_for = TIMEOUT

with MonitoringStat() as stat:
task_feed_map = {
Expand Down

0 comments on commit 551bff1

Please sign in to comment.