Skip to content

Commit

Permalink
If sync API is used in a running loop start a new loop in a separate …
Browse files Browse the repository at this point in the history
…thread (#47)
  • Loading branch information
jacobtomlinson authored May 5, 2023
1 parent c00ecc6 commit 7253f4e
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 67 deletions.
169 changes: 103 additions & 66 deletions kr8s/_asyncio.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,107 @@
# SPDX-FileCopyrightText: Copyright (c) 2023, MrNaif2018, Dask Developers, NVIDIA
# SPDX-License-Identifier: MIT License
# SPDX-License-URL: https://github.com/bitcartcc/universalasync/blob/d3979113316431a24f0260804442d29a38e414a2/LICENSE
# Forked from https://github.com/bitcartcc/universalasync/tree/d3979113316431a24f0260804442d29a38e414a2
# SPDX-FileCopyrightText: Copyright (c) 2023, Jupyter Development Team., MrNaif2018, Dask Developers, NVIDIA
# SPDX-License-Identifier: MIT License, BSD 3-Clause License
#
# This file is a fork of universalasync (commit d397911) and jupyter-core (commit 98b9a1a).
# Both projects attempt to solve the same problem: how to run nested asyncio tasks.
# Neither solution quite fit in here, so we forked them and combined them.
#
# universalasync License: https://github.com/bitcartcc/universalasync/blob/d397911/LICENSE
# jupyter-core License: https://github.com/jupyter/jupyter_core/blob/98b9a1a/COPYING.md
#
# This implementation uses the _TaskRunner from jupyter-core, but with a modified version of the wraps
# decorator from universalasync.

import asyncio
import functools
import atexit
import inspect
from typing import Any, AsyncGenerator, Callable, Generator, Tuple

import threading
from typing import (
Any,
AsyncGenerator,
Awaitable,
Callable,
Generator,
Optional,
Tuple,
TypeVar,
)

T = TypeVar("T")


class _TaskRunner:
"""A task runner that runs an asyncio event loop on a background thread."""

def __init__(self):
self.__io_loop: Optional[asyncio.AbstractEventLoop] = None
self.__runner_thread: Optional[threading.Thread] = None
self.__lock = threading.Lock()
atexit.register(self._close)

def _close(self):
if self.__io_loop:
self.__io_loop.stop()

def _runner(self):
loop = self.__io_loop
assert loop is not None # noqa
try:
loop.run_forever()
finally:
loop.close()

def run(self, coro):
"""Synchronously run a coroutine on a background thread."""
with self.__lock:
name = f"{threading.current_thread().name} - runner"
if self.__io_loop is None:
self.__io_loop = asyncio.new_event_loop()
self.__runner_thread = threading.Thread(
target=self._runner, daemon=True, name=name
)
self.__runner_thread.start()
fut = asyncio.run_coroutine_threadsafe(coro, self.__io_loop)
return fut.result(None)


_runner_map = {}


def run_sync(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]:
"""Wraps coroutine in a function that blocks until it has executed.
Parameters
----------
coro : coroutine-function
The coroutine-function to be executed.
Returns
-------
result :
Whatever the coroutine-function returns.
"""

def _get_event_loop() -> asyncio.AbstractEventLoop:
try:
return asyncio.get_event_loop_policy().get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
def wrapped(*args, **kwargs):
name = threading.current_thread().name
inner = coro(*args, **kwargs)
try:
# If a loop is currently running use a task runner to
# run in a new loop in a separate thread.
asyncio.get_running_loop()
if name not in _runner_map:
_runner_map[name] = _TaskRunner()
handler = _runner_map[name].run
except RuntimeError:
# Otherwise just run in a new loop.
handler = asyncio.run

if inspect.isawaitable(inner):
return handler(inner)
if inspect.isasyncgen(inner):
return iter_over_async(inner, lambda inner: handler(inner))

def get_event_loop() -> asyncio.AbstractEventLoop:
"""Useful utility for getting event loop. Acts like get_event_loop(), but also creates new event loop if needed
This will return a working event loop in 100% of cases.
Returns:
asyncio.AbstractEventLoop: event loop
"""
loop = _get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
wrapped.__doc__ = coro.__doc__
return wrapped


def iter_over_async(agen: AsyncGenerator, run_func: Callable) -> Generator:
Expand All @@ -49,45 +121,10 @@ async def get_next() -> Tuple[bool, Any]:
yield obj


def run_sync_ctx(coroutine: Any, loop: asyncio.AbstractEventLoop) -> Any:
if inspect.isawaitable(coroutine):
return asyncio.run(coroutine)

if inspect.isasyncgen(coroutine):
return iter_over_async(coroutine, lambda coro: asyncio.run(coro))


def async_to_sync_wraps(function: Callable) -> Callable:
"""Wrap an async method/property to universal method.
This allows to run wrapped methods in both async and sync contexts transparently without any additional code
When run from another thread, it runs coroutines in new thread's event loop
See :ref:`Example <example>` for full example
Args:
function (Callable): function/property to wrap
Returns:
Callable: modified function
"""

@functools.wraps(function)
def async_to_sync_wrap(*args: Any, **kwargs: Any) -> Any:
loop = get_event_loop()
coroutine = function(*args, **kwargs)

return run_sync_ctx(coroutine, loop)

result = async_to_sync_wrap
return result


def sync(source: object) -> object:
"""Convert all public async methods/properties of an object to universal methods.
See :func:`async_to_sync_wraps` for more info
See :func:`run_sync` for more info
Args:
source (object): object to convert
Expand All @@ -104,12 +141,12 @@ def sync(source: object) -> object:
method
):
function = getattr(source, name)
setattr(source, name, async_to_sync_wraps(function))
setattr(source, name, run_sync(function))

elif name == "__aenter__" and not hasattr(source, "__enter__"):
setattr(source, "__enter__", async_to_sync_wraps(method))
setattr(source, "__enter__", run_sync(method))

elif name == "__aexit__" and not hasattr(source, "__exit__"):
setattr(source, "__exit__", async_to_sync_wraps(method))
setattr(source, "__exit__", run_sync(method))

return source
1 change: 0 additions & 1 deletion kr8s/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def test_version_sync():
assert "major" in version


@pytest.mark.xfail(reason="Cannot run nested event loops", raises=RuntimeError)
async def test_version_sync_in_async():
kubernetes = kr8s.api()
version = kubernetes.version()
Expand Down

0 comments on commit 7253f4e

Please sign in to comment.