From 2ca085cffc1ea465c93f0a21fa02d58a1c5ddb2b Mon Sep 17 00:00:00 2001
From: yuvipanda
Date: Wed, 26 Dec 2018 22:59:45 -0800
Subject: [PATCH] Add check for readyness of process

---
 simpervisor/process.py                  | 75 ++++++++++++++++++++++++-
 tests/child_scripts/simplehttpserver.py | 23 ++++++++
 tests/test_ready.py                     | 49 ++++++++++++++++
 3 files changed, 146 insertions(+), 1 deletion(-)
 create mode 100644 tests/child_scripts/simplehttpserver.py
 create mode 100644 tests/test_ready.py

diff --git a/simpervisor/process.py b/simpervisor/process.py
index 2b9b105..2dc5cbb 100644
--- a/simpervisor/process.py
+++ b/simpervisor/process.py
@@ -3,6 +3,7 @@
 """
 import signal
 import asyncio
+import time
 import logging
 
 from simpervisor import atexitasync
@@ -15,11 +16,13 @@ class KilledProcessError(Exception):
     pass
 
 class SupervisedProcess:
-    def __init__(self, name, *args, always_restart=False, **kwargs):
+    def __init__(self, name, *args, always_restart=False, ready_func=None, ready_timeout=5, **kwargs):
         self.always_restart = always_restart
         self.name = name
         self._proc_args = args
         self._proc_kwargs = kwargs
+        self.ready_func = ready_func
+        self.ready_timeout = ready_timeout
         self.proc: asyncio.Process = None
 
         # asyncio.Process has no 'poll', so we keep that state internally
@@ -104,6 +107,7 @@ async def _restart_process_if_needed(self):
         exits. If we restart the process, `start()` sets this up again.
         """
         retcode = await self.proc.wait()
+        # FIXME: Do we need to acquire a lock somewhere in this method?
         atexitasync.remove_handler(self._handle_signal)
         self._debug_log(
             'exited', f'{self.name} exited with code {retcode}',
@@ -158,6 +162,75 @@ async def kill(self):
             raise KilledProcessError(f"Process {self.name} has already been explicitly killed")
         return await self._signal_and_wait(signal.SIGKILL)
 
+
+    async def ready(self):
+        """
+        Wait for process to become 'ready'
+
+        Calls `ready_func(self)` repeatedly, backing off exponentially
+        between attempts, until it returns something truthy.
+
+        Returns True once `ready_func` reports readiness, or right away for
+        a started process when no `ready_func` was given. Returns False if
+        `ready_timeout` seconds pass first, or if the process has been
+        killed or was never started.
+        """
+        # FIXME: Should this be internal and part of 'start'?
+        # FIXME: Do we need some locks here?
+        if self.ready_func is None:
+            # No readyness check was configured, so all we can report is
+            # whether there is a process that has not been killed
+            return bool(self.proc) and not self._killed
+
+        # Repeatedly run ready_func with a timeout until it returns true
+        # FIXME, parameterize these numbers
+        # Use a monotonic clock so wall clock adjustments can't stretch or
+        # shrink the timeout
+        start_time = time.monotonic()
+        deadline = start_time + self.ready_timeout
+        wait_time = 0.01
+
+        while True:
+            if time.monotonic() > deadline:
+                # We have exceeded our timeout, so return
+                return False
+
+            # Make sure we haven't been killed yet since the last loop
+            # We explicitly do *not* check if we are running, since we might be
+            # restarting in a loop while the readyness check is happening
+            if self._killed or not self.proc:
+                return False
+
+            # FIXME: What's the timeout for each readyness check handler?
+            # FIXME: We should probably check again if our process is still running
+            # FIXME: Should we be locking something here?
+            try:
+                is_ready = await asyncio.wait_for(self.ready_func(self), 1)
+            except asyncio.TimeoutError:
+                # A single check that hangs just means 'not ready yet', it
+                # must not abort the whole wait with an exception
+                is_ready = False
+            cur_time = time.monotonic() - start_time
+            self._debug_log(
+                'ready-wait',
+                f'Readyness: {is_ready} after {cur_time} seconds, next check in {wait_time}s',
+                {'wait_time': wait_time, 'ready': is_ready, 'elapsed_time': cur_time}
+            )
+            if is_ready:
+                return True
+            await asyncio.sleep(wait_time)
+
+            # FIXME: Be more sophisticated here with backoff & jitter
+            wait_time = 2 * wait_time
+            remaining = deadline - time.monotonic()
+            if wait_time > remaining:
+                # If we wait for wait_time, we'll be over the ready_timeout
+                # So let's clamp wait_time so that wait_time is just enough
+                # to get us to ready_timeout seconds since start_time,
+                # without ever going negative
+                wait_time = max(0, remaining - 0.01)
+
+
     # Pass through methods specific methods from proc
     # We don't pass through everything, just a subset we know is safe
     # and would work.
diff --git a/tests/child_scripts/simplehttpserver.py b/tests/child_scripts/simplehttpserver.py
new file mode 100644
index 0000000..fe44137
--- /dev/null
+++ b/tests/child_scripts/simplehttpserver.py
@@ -0,0 +1,23 @@
+"""
+Simple http server that sleeps for argv[1] seconds, then serves a greeting on $PORT
+"""
+import time
+import os
+import sys
+from aiohttp import web
+
+wait_time = float(sys.argv[1])
+print(f'waiting {wait_time}')
+time.sleep(wait_time)
+
+PORT = int(os.environ['PORT'])
+
+routes = web.RouteTableDef()
+
+@routes.get('/')
+async def hello(request):
+    return web.Response(text="Hello, world")
+
+app = web.Application()
+app.add_routes(routes)
+web.run_app(app, port=PORT)
diff --git a/tests/test_ready.py b/tests/test_ready.py
new file mode 100644
index 0000000..aa6e195
--- /dev/null
+++ b/tests/test_ready.py
@@ -0,0 +1,49 @@
+import sys
+import time
+import pytest
+import os
+from simpervisor import SupervisedProcess
+import aiohttp
+import logging
+
+@pytest.mark.asyncio
+async def test_ready():
+    """
+    Test web app's readyness
+    """
+    httpserver_file = os.path.join(
+        os.path.dirname(os.path.abspath(__file__)),
+        'child_scripts',
+        'simplehttpserver.py'
+    )
+
+    port = '9005'
+    # We tell our server to wait this many seconds before it starts serving
+    ready_time = 3.0
+
+    async def _ready_func(p):
+        url = f'http://localhost:{port}'
+        async with aiohttp.ClientSession() as session:
+            try:
+                async with session.get(url) as resp:
+                    logging.debug(f'Got code {resp.status} back from {url}')
+                    return resp.status == 200
+            except aiohttp.ClientConnectionError:
+                logging.debug(f'Connection to {url} refused')
+                return False
+
+    proc = SupervisedProcess(
+        'socketserver',
+        sys.executable, httpserver_file, str(ready_time),
+        ready_func=_ready_func,
+        env={'PORT': port}
+    )
+
+    try:
+        await proc.start()
+        start_time = time.time()
+        assert (await proc.ready())
+        assert time.time() - start_time > ready_time
+    finally:
+        # Clean up our process after ourselves
+        await proc.kill()