Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] Refactor python async trampoline handling #3547

Merged
merged 3 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,22 @@ pythonVersion = "3.10"
typeCheckingMode = "strict"

[tool.isort]
profile = "black"
atomic = true
lines_after_imports = 2
lines_between_types = 1
multi_line_output = 3 # corresponds to -m flag
include_trailing_comma = true # corresponds to -tc flag
line_length = 88
known_third_party = ["cognite","pytest"]
py_version=310

[tool.ruff]
# Keep in sync with .pre-commit-config.yaml
line-length = 120
ignore = []
target-version = "py310"
select = ["E", "W", "F", "I", "T", "RUF", "TID", "UP"]
exclude = ["tests", "build", "temp", "src/fable_library", "src/fable_library_rust", "src/fable_library_php"]
include =["*.py"]

[tool.ruff.pydocstyle]
convention = "google"

[tool.ruff.isort]
lines-after-imports = 2


[build-system]
requires = ["poetry-core>=1.0.0"]
Expand Down
115 changes: 77 additions & 38 deletions src/fable-library-py/fable_library/async_.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
import asyncio
from __future__ import annotations

import asyncio
from asyncio import Future, ensure_future
from collections.abc import Awaitable, Callable, Iterable
from concurrent.futures import ThreadPoolExecutor
from threading import Timer
from typing import (
Any,
Awaitable,
Callable,
Iterable,
List,
Literal,
Optional,
TypeVar,
Union,
)

from .async_builder import (
Continuations,
Async,
CancellationToken,
Continuations,
IAsyncContext,
OperationCanceledError,
Trampoline,
Expand All @@ -30,8 +26,10 @@
)

# F# generated code (from Choice.fs)
from .choice import Choice_makeChoice1Of2 # type: ignore
from .choice import Choice_makeChoice2Of2 # type: ignore
from .choice import (
Choice_makeChoice1Of2, # type: ignore
Choice_makeChoice2Of2, # type: ignore
)
from .task import TaskCompletionSource


Expand All @@ -47,6 +45,7 @@ def cont(ctx: IAsyncContext[Any]):

default_cancellation_token = CancellationToken()


# see AsyncBuilder.Delay
def delay(generator: Callable[[], Async[_T]]):
def cont(ctx: IAsyncContext[_T]):
Expand All @@ -55,7 +54,7 @@ def cont(ctx: IAsyncContext[_T]):
return protected_cont(cont)


def create_cancellation_token(arg: Union[int, bool, None] = None) -> CancellationToken:
def create_cancellation_token(arg: int | bool | None = None) -> CancellationToken:
cancelled = arg if isinstance(arg, bool) else False
token = CancellationToken(cancelled)
if isinstance(arg, int):
Expand All @@ -81,7 +80,6 @@ def is_cancellation_requested(token: CancellationToken) -> bool:
def sleep(millisecondsDueTime: int) -> Async[None]:
def cont(ctx: IAsyncContext[None]):
def cancel():
timer.cancel()
ctx.on_cancel(OperationCanceledError())

token_id = ctx.cancel_token.add_listener(cancel)
Expand All @@ -90,32 +88,31 @@ def timeout():
ctx.cancel_token.remove_listener(token_id)
ctx.on_success(None)

timer = Timer(millisecondsDueTime / 1000.0, timeout)
timer.start()
due_time = millisecondsDueTime / 1000.0
ctx.trampoline.run_later(timeout, due_time)

return protected_cont(cont)


def ignore(computation: Async[Any]) -> Async[None]:
def binder(_: Optional[Any] = None) -> Async[None]:
def binder(_: Any | None = None) -> Async[None]:
return protected_return(None)

return protected_bind(computation, binder)


def parallel(computations: Iterable[Async[_T]]) -> Async[List[_T]]:
def delayed() -> Async[List[_T]]:
def parallel(computations: Iterable[Async[_T]]) -> Async[list[_T]]:
def delayed() -> Async[list[_T]]:
tasks: Iterable[Future[_T]] = map(start_as_task, computations) # type: ignore
all: Future[List[_T]] = asyncio.gather(*tasks)

all: Future[list[_T]] = asyncio.gather(*tasks)
return await_task(all)

return delay(delayed)


def sequential(computations: Iterable[Async[_T]]) -> Async[List[Optional[_T]]]:
def delayed() -> Async[List[Optional[_T]]]:
results: List[_T] = []
def sequential(computations: Iterable[Async[_T]]) -> Async[list[_T | None]]:
def delayed() -> Async[list[_T | None]]:
results: list[_T] = []

def _arrow20(_arg: Async[_T]) -> Async[None]:
cmp: Async[_T] = _arg
Expand All @@ -127,7 +124,7 @@ def _arrow19(_arg_1: _T) -> Async[None]:

return singleton.Bind(cmp, _arrow19)

def _arrow21(__unit: Literal[None] = None) -> Async[List[_T]]:
def _arrow21(__unit: Literal[None] = None) -> Async[list[_T]]:
return singleton.Return(results)

return singleton.Combine(
Expand Down Expand Up @@ -189,17 +186,15 @@ def callback(conts: Continuations[_T]) -> None:
continuation = conts

task.add_done_callback(done)
return from_continuations(callback) # type: ignore
return from_continuations(callback)


def start_with_continuations(
computation: Async[_T],
continuation: Optional[Callable[[_T], None]] = None,
exception_continuation: Optional[Callable[[Exception], None]] = None,
cancellation_continuation: Optional[
Callable[[OperationCanceledError], None]
] = None,
cancellation_token: Optional[CancellationToken] = None,
continuation: Callable[[_T], None] | None = None,
exception_continuation: Callable[[Exception], None] | None = None,
cancellation_continuation: Callable[[OperationCanceledError], None] | None = None,
cancellation_token: CancellationToken | None = None,
) -> None:
"""Runs an asynchronous computation.

Expand All @@ -223,7 +218,7 @@ def start_with_continuations(


def start_as_task(
computation: Async[_T], cancellation_token: Optional[CancellationToken] = None
computation: Async[_T], cancellation_token: CancellationToken | None = None
) -> Awaitable[_T]:
"""Executes a computation in the thread pool.

Expand Down Expand Up @@ -251,24 +246,63 @@ def cancel(_: OperationCanceledError) -> None:
return tcs.get_task()


def start_child(computation: Async[_T], ms: int | None = None) -> Async[Async[_T]]:
if ms:
computation_with_timeout = protected_bind(
parallel(computation, throw_after(ms)), lambda xs: protected_return(xs[0])
)
return start_child(computation_with_timeout)

task = start_as_task(computation)

def cont(ctx: IAsyncContext[Async[_T]]) -> None:
def on_success(_: Async[_T]) -> None:
ctx.on_success(await_task(task))

on_error = ctx.on_error
on_cancel = ctx.on_cancel
trampoline = ctx.trampoline
cancel_token = ctx.cancel_token

ctx_ = IAsyncContext.create(
on_success, on_error, on_cancel, trampoline, cancel_token
)
computation(ctx_)

return protected_cont(cont)


def start_immediate(
computation: Async[Any],
cancellation_token: Optional[CancellationToken] = None,
cancellation_token: CancellationToken | None = None,
) -> None:
"""Start computation immediately.

Runs an asynchronous computation, starting immediately on the
current operating system thread
"""
return start_with_continuations(computation, cancellation_token=cancellation_token)
try:
asyncio.get_event_loop()
except RuntimeError:

async def runner() -> None:
return start_with_continuations(
computation, cancellation_token=cancellation_token
)

return asyncio.run(runner())
else:
return start_with_continuations(
computation, cancellation_token=cancellation_token
)


_executor: Optional[ThreadPoolExecutor] = None
_executor: ThreadPoolExecutor | None = None


def start(
computation: Callable[[IAsyncContext[Any]], None],
cancellation_token: Optional[CancellationToken] = None,
cancellation_token: CancellationToken | None = None,
) -> None:
"""Starts the asynchronous computation.

Expand All @@ -290,16 +324,16 @@ def worker() -> None:

def run_synchronously(
computation: Async[_T],
cancellation_token: Optional[CancellationToken] = None,
) -> Optional[_T]:
cancellation_token: CancellationToken | None = None,
) -> _T | None:
"""Run computation synchronously.

Runs an asynchronous computation and awaits its result on the
calling thread. Propagates an exception should the computation yield
one. This call is blocking.
"""

async def runner() -> Optional[_T]:
async def runner() -> _T | None:
return await start_as_task(computation, cancellation_token=cancellation_token)

return asyncio.run(runner())
Expand All @@ -312,11 +346,16 @@ async def runner() -> Optional[_T]:
"cancellation_token",
"catch_async",
"create_cancellation_token",
"delay",
"from_continuations",
"ignore",
"is_cancellation_requested",
"parallel",
"sequential",
"sleep",
"start",
"start_as_task",
"start_child",
"start_immediate",
"start_with_continuations",
]
Loading
Loading