From 3d6815119ca1ec989f704f626530f938c857a8e5 Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Fri, 9 Dec 2022 17:12:57 +0000 Subject: [PATCH] Upgrade uasyncio release used in tests --- libs/micropython/uasyncio/core.py | 61 +++++++----- libs/micropython/uasyncio/event.py | 20 ++-- libs/micropython/uasyncio/funcs.py | 129 ++++++++++++++++++-------- libs/micropython/uasyncio/lock.py | 6 +- libs/micropython/uasyncio/manifest.py | 20 ++-- libs/micropython/uasyncio/stream.py | 58 ++++++++---- libs/micropython/uasyncio/task.py | 55 +++++------ 7 files changed, 222 insertions(+), 127 deletions(-) diff --git a/libs/micropython/uasyncio/core.py b/libs/micropython/uasyncio/core.py index d74763f..10a3108 100644 --- a/libs/micropython/uasyncio/core.py +++ b/libs/micropython/uasyncio/core.py @@ -41,7 +41,7 @@ def __iter__(self): def __next__(self): if self.state is not None: - _task_queue.push_sorted(cur_task, self.state) + _task_queue.push(cur_task, self.state) self.state = None return None else: @@ -115,11 +115,11 @@ def wait_io_event(self, dt): # print('poll', s, sm, ev) if ev & ~select.POLLOUT and sm[0] is not None: # POLLIN or error - _task_queue.push_head(sm[0]) + _task_queue.push(sm[0]) sm[0] = None if ev & ~select.POLLIN and sm[1] is not None: # POLLOUT or error - _task_queue.push_head(sm[1]) + _task_queue.push(sm[1]) sm[1] = None if sm[0] is None and sm[1] is None: self._dequeue(s) @@ -142,7 +142,7 @@ def create_task(coro): if not hasattr(coro, "send"): raise TypeError("coroutine expected") t = Task(coro, globals()) - _task_queue.push_head(t) + _task_queue.push(t) return t @@ -167,7 +167,7 @@ def run_until_complete(main_task=None): _io_queue.wait_io_event(dt) # Get next task to run and continue it - t = _task_queue.pop_head() + t = _task_queue.pop() cur_task = t try: # Continue running the coroutine, it's responsible for rescheduling itself @@ -175,6 +175,10 @@ def run_until_complete(main_task=None): if not exc: t.coro.send(None) else: + # If the task is finished and on the run queue and gets here, then it + # had an exception and was not await'ed on. Throwing into it now will + # raise StopIteration and the code below will catch this and run the + # call_exception_handler function. t.data = None t.coro.throw(exc) except excs_all as er: @@ -185,22 +189,37 @@ def run_until_complete(main_task=None): if isinstance(er, StopIteration): return er.value raise er - # Schedule any other tasks waiting on the completion of this task - waiting = False - if hasattr(t, "waiting"): - while t.waiting.peek(): - _task_queue.push_head(t.waiting.pop_head()) + if t.state: + # Task was running but is now finished. + waiting = False + if t.state is True: + # "None" indicates that the task is complete and not await'ed on (yet). + t.state = None + elif callable(t.state): + # The task has a callback registered to be called on completion. + t.state(t, er) + t.state = False waiting = True - t.waiting = None # Free waiting queue head - if not waiting and not isinstance(er, excs_stop): - # An exception ended this detached task, so queue it for later - # execution to handle the uncaught exception if no other task retrieves - # the exception in the meantime (this is handled by Task.throw). - _task_queue.push_head(t) - # Indicate task is done by setting coro to the task object itself - t.coro = t - # Save return value of coro to pass up to caller - t.data = er + else: + # Schedule any other tasks waiting on the completion of this task. + while t.state.peek(): + _task_queue.push(t.state.pop()) + waiting = True + # "False" indicates that the task is complete and has been await'ed on. + t.state = False + if not waiting and not isinstance(er, excs_stop): + # An exception ended this detached task, so queue it for later + # execution to handle the uncaught exception if no other task retrieves + # the exception in the meantime (this is handled by Task.throw). + _task_queue.push(t) + # Save return value of coro to pass up to caller. + t.data = er + elif t.state is None: + # Task is already finished and nothing await'ed on the task, + # so call the exception handler. + _exc_context["exception"] = exc + _exc_context["future"] = t + Loop.call_exception_handler(_exc_context) # Create a new task from a coroutine and run it until it finishes @@ -237,7 +256,7 @@ def run_until_complete(aw): def stop(): global _stop_task if _stop_task is not None: - _task_queue.push_head(_stop_task) + _task_queue.push(_stop_task) # If stop() is called again, do nothing _stop_task = None diff --git a/libs/micropython/uasyncio/event.py b/libs/micropython/uasyncio/event.py index c28ad1f..3b5e79d 100644 --- a/libs/micropython/uasyncio/event.py +++ b/libs/micropython/uasyncio/event.py @@ -17,7 +17,7 @@ def set(self): # Note: This must not be called from anything except the thread running # the asyncio loop (i.e. neither hard or soft IRQ, or a different thread). while self.waiting.peek(): - core._task_queue.push_head(self.waiting.pop_head()) + core._task_queue.push(self.waiting.pop()) self.state = True def clear(self): @@ -26,7 +26,7 @@ def clear(self): async def wait(self): if not self.state: # Event not set, put the calling task on the event's waiting queue - self.waiting.push_head(core.cur_task) + self.waiting.push(core.cur_task) # Set calling task's data to the event's queue so it can be removed if needed core.cur_task.data = self.waiting yield @@ -36,27 +36,29 @@ async def wait(self): # MicroPython-extension: This can be set from outside the asyncio event loop, # such as other threads, IRQs or scheduler context. Implementation is a stream # that asyncio will poll until a flag is set. -# Note: Unlike Event, this is self-clearing. +# Note: Unlike Event, this is self-clearing after a wait(). try: import uio class ThreadSafeFlag(uio.IOBase): def __init__(self): - self._flag = 0 + self.state = 0 def ioctl(self, req, flags): if req == 3: # MP_STREAM_POLL - return self._flag * flags + return self.state * flags return None def set(self): - self._flag = 1 + self.state = 1 + + def clear(self): + self.state = 0 async def wait(self): - if not self._flag: + if not self.state: yield core._io_queue.queue_read(self) - self._flag = 0 - + self.state = 0 except ImportError: pass diff --git a/libs/micropython/uasyncio/funcs.py b/libs/micropython/uasyncio/funcs.py index 93f4fd2..96883e4 100644 --- a/libs/micropython/uasyncio/funcs.py +++ b/libs/micropython/uasyncio/funcs.py @@ -1,49 +1,51 @@ # MicroPython uasyncio module -# MIT license; Copyright (c) 2019-2020 Damien P. George +# MIT license; Copyright (c) 2019-2022 Damien P. George from . import core +def _run(waiter, aw): + try: + result = await aw + status = True + except BaseException as er: + result = None + status = er + if waiter.data is None: + # The waiter is still waiting, cancel it. + if waiter.cancel(): + # Waiter was cancelled by us, change its CancelledError to an instance of + # CancelledError that contains the status and result of waiting on aw. + # If the wait_for task subsequently gets cancelled externally then this + # instance will be reset to a CancelledError instance without arguments. + waiter.data = core.CancelledError(status, result) + + async def wait_for(aw, timeout, sleep=core.sleep): aw = core._promote_to_task(aw) if timeout is None: return await aw - def runner(waiter, aw): - nonlocal status, result - try: - result = await aw - s = True - except BaseException as er: - s = er - if status is None: - # The waiter is still waiting, set status for it and cancel it. - status = s - waiter.cancel() - # Run aw in a separate runner task that manages its exceptions. - status = None - result = None - runner_task = core.create_task(runner(core.cur_task, aw)) + runner_task = core.create_task(_run(core.cur_task, aw)) try: # Wait for the timeout to elapse. await sleep(timeout) except core.CancelledError as er: - if status is True: - # aw completed successfully and cancelled the sleep, so return aw's result. - return result - elif status is None: + status = er.value + if status is None: # This wait_for was cancelled externally, so cancel aw and re-raise. - status = True runner_task.cancel() raise er + elif status is True: + # aw completed successfully and cancelled the sleep, so return aw's result. + return er.args[1] else: # aw raised an exception, propagate it out to the caller. raise status # The sleep finished before aw, so cancel aw and raise TimeoutError. - status = True runner_task.cancel() await runner_task raise core.TimeoutError @@ -53,22 +55,75 @@ def wait_for_ms(aw, timeout): return wait_for(aw, timeout, core.sleep_ms) +class _Remove: + @staticmethod + def remove(t): + pass + + async def gather(*aws, return_exceptions=False): + if not aws: + return [] + + def done(t, er): + # Sub-task "t" has finished, with exception "er". + nonlocal state + if gather_task.data is not _Remove: + # The main gather task has already been scheduled, so do nothing. + # This happens if another sub-task already raised an exception and + # woke the main gather task (via this done function), or if the main + # gather task was cancelled externally. + return + elif not return_exceptions and not isinstance(er, StopIteration): + # A sub-task raised an exception, indicate that to the gather task. + state = er + else: + state -= 1 + if state: + # Still some sub-tasks running. + return + # Gather waiting is done, schedule the main gather task. + core._task_queue.push(gather_task) + ts = [core._promote_to_task(aw) for aw in aws] for i in range(len(ts)): - try: - # TODO handle cancel of gather itself - # if ts[i].coro: - # iter(ts[i]).waiting.push_head(cur_task) - # try: - # yield - # except CancelledError as er: - # # cancel all waiting tasks - # raise er - ts[i] = await ts[i] - except Exception as er: - if return_exceptions: - ts[i] = er - else: - raise er + if ts[i].state is not True: + # Task is not running, gather not currently supported for this case. + raise RuntimeError("can't gather") + # Register the callback to call when the task is done. + ts[i].state = done + + # Set the state for execution of the gather. + gather_task = core.cur_task + state = len(ts) + cancel_all = False + + # Wait for the a sub-task to need attention. + gather_task.data = _Remove + try: + yield + except core.CancelledError as er: + cancel_all = True + state = er + + # Clean up tasks. + for i in range(len(ts)): + if ts[i].state is done: + # Sub-task is still running, deregister the callback and cancel if needed. + ts[i].state = True + if cancel_all: + ts[i].cancel() + elif isinstance(ts[i].data, StopIteration): + # Sub-task ran to completion, get its return value. + ts[i] = ts[i].data.value + else: + # Sub-task had an exception with return_exceptions==True, so get its exception. + ts[i] = ts[i].data + + # Either this gather was cancelled, or one of the sub-tasks raised an exception with + # return_exceptions==False, so reraise the exception here. + if state is not 0: + raise state + + # Return the list of return values of each sub-task. return ts diff --git a/libs/micropython/uasyncio/lock.py b/libs/micropython/uasyncio/lock.py index bddca29..f50213d 100644 --- a/libs/micropython/uasyncio/lock.py +++ b/libs/micropython/uasyncio/lock.py @@ -22,8 +22,8 @@ def release(self): raise RuntimeError("Lock not acquired") if self.waiting.peek(): # Task(s) waiting on lock, schedule next Task - self.state = self.waiting.pop_head() - core._task_queue.push_head(self.state) + self.state = self.waiting.pop() + core._task_queue.push(self.state) else: # No Task waiting so unlock self.state = 0 @@ -31,7 +31,7 @@ def release(self): async def acquire(self): if self.state != 0: # Lock unavailable, put the calling Task on the waiting queue - self.waiting.push_head(core.cur_task) + self.waiting.push(core.cur_task) # Set calling task's data to the lock's queue so it can be removed if needed core.cur_task.data = self.waiting try: diff --git a/libs/micropython/uasyncio/manifest.py b/libs/micropython/uasyncio/manifest.py index f5fa27b..d425a46 100644 --- a/libs/micropython/uasyncio/manifest.py +++ b/libs/micropython/uasyncio/manifest.py @@ -1,13 +1,15 @@ -# This list of frozen files doesn't include task.py because that's provided by the C module. -freeze( - "..", +# This list of package files doesn't include task.py because that's provided +# by the C module. +package( + "uasyncio", ( - "uasyncio/__init__.py", - "uasyncio/core.py", - "uasyncio/event.py", - "uasyncio/funcs.py", - "uasyncio/lock.py", - "uasyncio/stream.py", + "__init__.py", + "core.py", + "event.py", + "funcs.py", + "lock.py", + "stream.py", ), + base_path="..", opt=3, ) diff --git a/libs/micropython/uasyncio/stream.py b/libs/micropython/uasyncio/stream.py index 395ff1f..785e435 100644 --- a/libs/micropython/uasyncio/stream.py +++ b/libs/micropython/uasyncio/stream.py @@ -26,9 +26,21 @@ async def wait_closed(self): # TODO yield? self.s.close() - async def read(self, n): + async def read(self, n=-1): + r = b"" + while True: + yield core._io_queue.queue_read(self.s) + r2 = self.s.read(n) + if r2 is not None: + if n >= 0: + return r2 + if not len(r2): + return r + r += r2 + + async def readinto(self, buf): yield core._io_queue.queue_read(self.s) - return self.s.read(n) + return self.s.readinto(buf) async def readexactly(self, n): r = b"" @@ -52,9 +64,19 @@ async def readline(self): return l def write(self, buf): + if not self.out_buf: + # Try to write immediately to the underlying stream. + ret = self.s.write(buf) + if ret == len(buf): + return + if ret is not None: + buf = buf[ret:] self.out_buf += buf async def drain(self): + if not self.out_buf: + # Drain must always yield, so a tight loop of write+drain can't block the scheduler. + return await core.sleep_ms(0) mv = memoryview(self.out_buf) off = 0 while off < len(mv): @@ -75,8 +97,8 @@ async def open_connection(host, port): from uerrno import EINPROGRESS import usocket as socket - ai = socket.getaddrinfo(host, port)[0] # TODO this is blocking! - s = socket.socket() + ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0] # TODO this is blocking! + s = socket.socket(ai[0], ai[1], ai[2]) s.setblocking(False) ss = Stream(s) try: @@ -103,16 +125,7 @@ def close(self): async def wait_closed(self): await self.task - async def _serve(self, cb, host, port, backlog): - import usocket as socket - - ai = socket.getaddrinfo(host, port)[0] # TODO this is blocking! - s = socket.socket() - s.setblocking(False) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - s.bind(ai[-1]) - s.listen(backlog) - self.task = core.cur_task + async def _serve(self, s, cb): # Accept incoming connections while True: try: @@ -134,9 +147,20 @@ async def _serve(self, cb, host, port, backlog): # Helper function to start a TCP stream server, running as a new task # TODO could use an accept-callback on socket read activity instead of creating a task async def start_server(cb, host, port, backlog=5): - s = Server() - core.create_task(s._serve(cb, host, port, backlog)) - return s + import usocket as socket + + # Create and bind server socket. + host = socket.getaddrinfo(host, port)[0] # TODO this is blocking! + s = socket.socket() + s.setblocking(False) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind(host[-1]) + s.listen(backlog) + + # Create and return server object and task. + srv = Server() + srv.task = core.create_task(srv._serve(s, cb)) + return srv ################################################################################ diff --git a/libs/micropython/uasyncio/task.py b/libs/micropython/uasyncio/task.py index 68ddf49..4ead2a1 100644 --- a/libs/micropython/uasyncio/task.py +++ b/libs/micropython/uasyncio/task.py @@ -99,19 +99,18 @@ def __init__(self): def peek(self): return self.heap - def push_sorted(self, v, key): + def push(self, v, key=None): + assert v.ph_child is None + assert v.ph_next is None v.data = None - v.ph_key = key - v.ph_child = None - v.ph_next = None + v.ph_key = key if key is not None else core.ticks() self.heap = ph_meld(v, self.heap) - def push_head(self, v): - self.push_sorted(v, core.ticks()) - - def pop_head(self): + def pop(self): v = self.heap - self.heap = ph_pairing(self.heap.ph_child) + assert v.ph_next is None + self.heap = ph_pairing(v.ph_child) + v.ph_child = None return v def remove(self, v): @@ -123,6 +122,7 @@ class Task: def __init__(self, coro, globals=None): self.coro = coro # Coroutine of this Task self.data = None # General data for queue it is waiting on + self.state = True # None, False, True, a callable, or a TaskQueue instance self.ph_key = 0 # Pairing heap self.ph_child = None # Paring heap self.ph_child_last = None # Paring heap @@ -130,30 +130,33 @@ def __init__(self, coro, globals=None): self.ph_rightmost_parent = None # Paring heap def __iter__(self): - if self.coro is self: - # Signal that the completed-task has been await'ed on. - self.waiting = None - elif not hasattr(self, "waiting"): - # Lazily allocated head of linked list of Tasks waiting on completion of this task. - self.waiting = TaskQueue() + if not self.state: + # Task finished, signal that is has been await'ed on. + self.state = False + elif self.state is True: + # Allocated head of linked list of Tasks waiting on completion of this task. + self.state = TaskQueue() + elif type(self.state) is not TaskQueue: + # Task has state used for another purpose, so can't also wait on it. + raise RuntimeError("can't wait") return self def __next__(self): - if self.coro is self: + if not self.state: # Task finished, raise return value to caller so it can continue. raise self.data else: # Put calling task on waiting queue. - self.waiting.push_head(core.cur_task) + self.state.push(core.cur_task) # Set calling task's data to this task that it waits on, to double-link it. core.cur_task.data = self def done(self): - return self.coro is self + return not self.state def cancel(self): # Check if task is already finished. - if self.coro is self: + if not self.state: return False # Can't cancel self (not supported yet). if self is core.cur_task: @@ -165,20 +168,10 @@ def cancel(self): if hasattr(self.data, "remove"): # Not on the main running queue, remove the task from the queue it's on. self.data.remove(self) - core._task_queue.push_head(self) + core._task_queue.push(self) elif core.ticks_diff(self.ph_key, core.ticks()) > 0: # On the main running queue but scheduled in the future, so bring it forward to now. core._task_queue.remove(self) - core._task_queue.push_head(self) + core._task_queue.push(self) self.data = core.CancelledError return True - - def throw(self, value): - # This task raised an exception which was uncaught; handle that now. - # Set the data because it was cleared by the main scheduling loop. - self.data = value - if not hasattr(self, "waiting"): - # Nothing await'ed on the task so call the exception handler. - core._exc_context["exception"] = value - core._exc_context["future"] = self - core.Loop.call_exception_handler(core._exc_context)