From 468134dc68cbe309498e2571bfd14b6016fccdc1 Mon Sep 17 00:00:00 2001 From: Dag Brattli Date: Sat, 28 Nov 2020 09:02:44 +0100 Subject: [PATCH] Fix tail calls for latests Expression release --- aioreactive/__init__.py | 43 +++++++++++++++++++++++++++----------- aioreactive/combine.py | 18 ++++++++++++---- aioreactive/create.py | 11 +++------- aioreactive/filtering.py | 18 +++------------- aioreactive/observables.py | 4 +--- aioreactive/timeshift.py | 29 ++++++++++++------------- 6 files changed, 65 insertions(+), 58 deletions(-) diff --git a/aioreactive/__init__.py b/aioreactive/__init__.py index 45feb1f..f604c4c 100644 --- a/aioreactive/__init__.py +++ b/aioreactive/__init__.py @@ -115,6 +115,16 @@ def from_iterable(cls, iter: Iterable[TSource]) -> "AsyncRx[TSource]": @classmethod def from_async_iterable(cls, iter: AsyncIterable[TSource]) -> "AsyncObservable[TSource]": + """Convert an async iterable to an async observable stream. + + Example: + >>> xs = AsyncRx.from_async_iterable(async_iterable) + + Returns: + The source stream whose elements are pulled from the given + (async) iterable sequence. + """ + return AsyncRx(from_async_iterable(iter)) @classmethod @@ -169,18 +179,16 @@ def concat(self, other: AsyncObservable[TSource]) -> AsyncObservable[TSource]: return concat_seq([self, other]) def debounce(self, seconds: float) -> "AsyncRx[TSource]": - """Debounce observable source. - - Ignores values from a source stream which are followed by - another value before seconds has elapsed. + """Debounce observable stream. - Example: - partial = debounce(5) # 5 seconds + Ignores values from an observable sequence which are followed by + another value before the given timeout. - Keyword arguments: - seconds -- Duration of the throttle period for each value + Args: + seconds (float): Number of seconds to debounce. - Returns partially applied function that takes a source sequence. + Returns: + The debounced stream. """ from .timeshift import debounce @@ -322,7 +330,8 @@ def take(self, count: int) -> AsyncObservable[TSource]: count Number of elements to take. Returns: - Stream[TSource, TSource]: [description] + An observable sequence that contains the specified number of + elements from the start of the input sequence. """ from .filtering import take @@ -432,7 +441,7 @@ def debounce(seconds: float) -> Stream[TSource, TSource]: value before seconds has elapsed. Example: - >>> partial = debounce(5) # 5 seconds + >>> ys = pipe(xs, debounce(5)) # 5 seconds Args: seconds: Duration of the throttle period for each value @@ -618,6 +627,15 @@ def flat_map_latest_async(mapper: Callable[[TSource], Awaitable[AsyncObservable[ def from_async_iterable(iter: AsyncIterable[TSource]) -> "AsyncObservable[TSource]": + """Convert an async iterable to an async observable stream. + + Example: + >>> xs = rx.from_async_iterable(async_iterable) + + Returns: + The source stream whose elements are pulled from the given + (async) iterable sequence. + """ from .create import of_async_iterable return AsyncRx(of_async_iterable(iter)) @@ -783,7 +801,8 @@ def take(count: int) -> Stream[TSource, TSource]: count Number of elements to take. Returns: - Stream[TSource, TSource]: [description] + An observable sequence that contains the specified number of + elements from the start of the input sequence. """ from .filtering import take diff --git a/aioreactive/combine.py b/aioreactive/combine.py index a18291b..de6eda9 100644 --- a/aioreactive/combine.py +++ b/aioreactive/combine.py @@ -1,10 +1,20 @@ import dataclasses import logging from dataclasses import dataclass -from typing import Any, Callable, Generic, Iterable, Tuple, TypeVar, cast +from typing import Any, Callable, Generic, Iterable, NoReturn, Tuple, TypeVar, cast from expression.collections import FrozenList, Map, frozenlist, map -from expression.core import MailboxProcessor, Nothing, Option, Result, Some, TailCall, match, pipe, tailrec_async +from expression.core import ( + MailboxProcessor, + Nothing, + Option, + TailCallResult, + Some, + TailCall, + match, + pipe, + tailrec_async, +) from expression.system import AsyncDisposable from .create import of_seq @@ -171,7 +181,7 @@ async def worker(inbox: MailboxProcessor[Msg]) -> None: @tailrec_async async def message_loop( source_value: Option[TSource], other_value: Option[TOther] - ) -> Result[AsyncObservable[TSource], Exception]: + ) -> TailCallResult[NoReturn]: cn = await inbox.receive() async def get_value(n: Notification[TSource]) -> Option[TSource]: @@ -246,7 +256,7 @@ async def subscribe_async(aobv: AsyncObserver[Tuple[TSource, TOther]]) -> AsyncD async def worker(inbox: MailboxProcessor[Msg]) -> None: @tailrec_async - async def message_loop(latest: Option[TOther]) -> Result[TSource, Exception]: + async def message_loop(latest: Option[TOther]) -> TailCallResult[NoReturn]: cn = await inbox.receive() async def get_value(n: Notification[TSource]) -> Option[TSource]: diff --git a/aioreactive/create.py b/aioreactive/create.py index 4de6dc3..f7cc63c 100644 --- a/aioreactive/create.py +++ b/aioreactive/create.py @@ -3,7 +3,7 @@ from asyncio import Future from typing import AsyncIterable, Awaitable, Callable, Iterable, Optional, Tuple, TypeVar -from expression.core import Ok, Result, aiotools, tailrec_async +from expression.core import TailCallResult, aiotools, tailrec_async from expression.core.fn import TailCall from expression.system import AsyncDisposable, CancellationToken, CancellationTokenSource @@ -70,11 +70,6 @@ async def worker(obv: AsyncObserver[TSource], _: CancellationToken) -> None: def of_async_iterable(iterable: AsyncIterable[TSource]) -> AsyncObservable[TSource]: - """Convert an async iterable to a source stream. - 2 - xs = from_async_iterable(async_iterable) - Returns the source stream whose elements are pulled from the - given (async) iterable sequence.""" - async def subscribe_async(observer: AsyncObserver[TSource]) -> AsyncDisposable: task: Optional[Future[TSource]] = None @@ -192,13 +187,13 @@ async def subscribe_async(aobv: AsyncObserver[int]) -> AsyncDisposable: cancel, token = canceller() @tailrec_async - async def handler(seconds: float, next: int) -> Result[None, Exception]: + async def handler(seconds: float, next: int) -> TailCallResult[None]: await asyncio.sleep(seconds) await aobv.asend(next) if not period: await aobv.aclose() - return Ok(None) + return None return TailCall(period, next + 1) diff --git a/aioreactive/filtering.py b/aioreactive/filtering.py index d506e20..a758225 100644 --- a/aioreactive/filtering.py +++ b/aioreactive/filtering.py @@ -1,10 +1,10 @@ -from typing import Any, Awaitable, Callable, List, Optional, Tuple, TypeVar, overload +from typing import Any, Awaitable, Callable, List, NoReturn, Optional, Tuple, TypeVar, overload from expression.collections import seq from expression.core import ( MailboxProcessor, Option, - Result, + TailCallResult, TailCall, aiotools, compose, @@ -122,7 +122,7 @@ async def subscribe_async(aobv: AsyncObserver[TSource]) -> AsyncDisposable: async def worker(inbox: MailboxProcessor[Notification[TSource]]) -> None: @tailrec_async - async def message_loop(latest: Notification[TSource]) -> Result[Notification[TSource], Exception]: + async def message_loop(latest: Notification[TSource]) -> TailCallResult[NoReturn]: n = await inbox.receive() async def get_latest() -> Notification[TSource]: @@ -236,18 +236,6 @@ async def asend(value: TSource) -> None: def take(count: int) -> Stream[TSource, TSource]: - """Take the first elements from the stream. - - Returns a specified number of contiguous elements from the start of - an observable sequence. - - Args: - count Number of elements to take. - - Returns: - Stream[TSource, TSource]: [description] - """ - if count < 0: raise ValueError("Count cannot be negative.") diff --git a/aioreactive/observables.py b/aioreactive/observables.py index 22c6f28..5ae4293 100644 --- a/aioreactive/observables.py +++ b/aioreactive/observables.py @@ -49,6 +49,4 @@ def __aiter__(self) -> AsyncIterator[TSource]: An async iterator. """ - obv: AsyncIteratorObserver[TSource] = AsyncIteratorObserver(self) - - return obv + return AsyncIteratorObserver(self) diff --git a/aioreactive/timeshift.py b/aioreactive/timeshift.py index 60c96be..1df3521 100644 --- a/aioreactive/timeshift.py +++ b/aioreactive/timeshift.py @@ -1,10 +1,19 @@ import asyncio import logging from datetime import datetime, timedelta -from typing import Iterable, Tuple, TypeVar +from typing import Iterable, NoReturn, Tuple, TypeVar from expression.collections import seq -from expression.core import MailboxProcessor, Result, TailCall, aiotools, match, pipe, tailrec_async, snd +from expression.core import ( + MailboxProcessor, + TailCall, + aiotools, + match, + pipe, + tailrec_async, + snd, + TailCallResult, +) from expression.system import CancellationTokenSource from .combine import with_latest_from @@ -38,7 +47,7 @@ def _delay(source: AsyncObservable[TSource]) -> AsyncObservable[TSource]: async def subscribe_async(aobv: AsyncObserver[TSource]) -> AsyncDisposable: async def worker(inbox: MailboxProcessor[Tuple[Notification[TSource], datetime]]) -> None: @tailrec_async - async def loop() -> Result[None, Exception]: + async def loop() -> TailCallResult[NoReturn]: ns, due_time = await inbox.receive() diff = due_time - datetime.utcnow() @@ -84,18 +93,6 @@ async def cancel() -> None: def debounce(seconds: float) -> Stream[TSource, TSource]: - """Debounce observable stream. - - Ignores values from an observable sequence which are followed by - another value before the given timeout. - - Args: - seconds (float): Number of seconds to debounce. - - Returns: - The debounced stream. - """ - def _debounce(source: AsyncObservable[TSource]) -> AsyncObservable[TSource]: async def subscribe_async(aobv: AsyncObserver[TSource]) -> AsyncDisposable: safe_obv, auto_detach = auto_detach_observer(aobv) @@ -103,7 +100,7 @@ async def subscribe_async(aobv: AsyncObserver[TSource]) -> AsyncDisposable: async def worker(inbox: MailboxProcessor[Tuple[Notification[TSource], int]]) -> None: @tailrec_async - async def message_loop(current_index: int) -> Result[TSource, Exception]: + async def message_loop(current_index: int) -> TailCallResult[NoReturn]: n, index = await inbox.receive() with match(n) as m: