Skip to content

Commit

Permalink
Fix tail calls for latests Expression release
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrattli committed Nov 28, 2020
1 parent 32d0449 commit 468134d
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 58 deletions.
43 changes: 31 additions & 12 deletions aioreactive/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,16 @@ def from_iterable(cls, iter: Iterable[TSource]) -> "AsyncRx[TSource]":

@classmethod
def from_async_iterable(cls, iter: AsyncIterable[TSource]) -> "AsyncObservable[TSource]":
"""Convert an async iterable to an async observable stream.
Example:
>>> xs = AsyncRx.from_async_iterable(async_iterable)
Returns:
The source stream whose elements are pulled from the given
(async) iterable sequence.
"""

return AsyncRx(from_async_iterable(iter))

@classmethod
Expand Down Expand Up @@ -169,18 +179,16 @@ def concat(self, other: AsyncObservable[TSource]) -> AsyncObservable[TSource]:
return concat_seq([self, other])

def debounce(self, seconds: float) -> "AsyncRx[TSource]":
"""Debounce observable source.
Ignores values from a source stream which are followed by
another value before seconds has elapsed.
"""Debounce observable stream.
Example:
partial = debounce(5) # 5 seconds
Ignores values from an observable sequence which are followed by
another value before the given timeout.
Keyword arguments:
seconds -- Duration of the throttle period for each value
Args:
seconds (float): Number of seconds to debounce.
Returns partially applied function that takes a source sequence.
Returns:
The debounced stream.
"""

from .timeshift import debounce
Expand Down Expand Up @@ -322,7 +330,8 @@ def take(self, count: int) -> AsyncObservable[TSource]:
count Number of elements to take.
Returns:
Stream[TSource, TSource]: [description]
An observable sequence that contains the specified number of
elements from the start of the input sequence.
"""
from .filtering import take

Expand Down Expand Up @@ -432,7 +441,7 @@ def debounce(seconds: float) -> Stream[TSource, TSource]:
value before seconds has elapsed.
Example:
>>> partial = debounce(5) # 5 seconds
>>> ys = pipe(xs, debounce(5)) # 5 seconds
Args:
seconds: Duration of the throttle period for each value
Expand Down Expand Up @@ -618,6 +627,15 @@ def flat_map_latest_async(mapper: Callable[[TSource], Awaitable[AsyncObservable[


def from_async_iterable(iter: AsyncIterable[TSource]) -> "AsyncObservable[TSource]":
"""Convert an async iterable to an async observable stream.
Example:
>>> xs = rx.from_async_iterable(async_iterable)
Returns:
The source stream whose elements are pulled from the given
(async) iterable sequence.
"""
from .create import of_async_iterable

return AsyncRx(of_async_iterable(iter))
Expand Down Expand Up @@ -783,7 +801,8 @@ def take(count: int) -> Stream[TSource, TSource]:
count Number of elements to take.
Returns:
Stream[TSource, TSource]: [description]
An observable sequence that contains the specified number of
elements from the start of the input sequence.
"""
from .filtering import take

Expand Down
18 changes: 14 additions & 4 deletions aioreactive/combine.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
import dataclasses
import logging
from dataclasses import dataclass
from typing import Any, Callable, Generic, Iterable, Tuple, TypeVar, cast
from typing import Any, Callable, Generic, Iterable, NoReturn, Tuple, TypeVar, cast

from expression.collections import FrozenList, Map, frozenlist, map
from expression.core import MailboxProcessor, Nothing, Option, Result, Some, TailCall, match, pipe, tailrec_async
from expression.core import (
MailboxProcessor,
Nothing,
Option,
TailCallResult,
Some,
TailCall,
match,
pipe,
tailrec_async,
)
from expression.system import AsyncDisposable

from .create import of_seq
Expand Down Expand Up @@ -171,7 +181,7 @@ async def worker(inbox: MailboxProcessor[Msg]) -> None:
@tailrec_async
async def message_loop(
source_value: Option[TSource], other_value: Option[TOther]
) -> Result[AsyncObservable[TSource], Exception]:
) -> TailCallResult[NoReturn]:
cn = await inbox.receive()

async def get_value(n: Notification[TSource]) -> Option[TSource]:
Expand Down Expand Up @@ -246,7 +256,7 @@ async def subscribe_async(aobv: AsyncObserver[Tuple[TSource, TOther]]) -> AsyncD

async def worker(inbox: MailboxProcessor[Msg]) -> None:
@tailrec_async
async def message_loop(latest: Option[TOther]) -> Result[TSource, Exception]:
async def message_loop(latest: Option[TOther]) -> TailCallResult[NoReturn]:
cn = await inbox.receive()

async def get_value(n: Notification[TSource]) -> Option[TSource]:
Expand Down
11 changes: 3 additions & 8 deletions aioreactive/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from asyncio import Future
from typing import AsyncIterable, Awaitable, Callable, Iterable, Optional, Tuple, TypeVar

from expression.core import Ok, Result, aiotools, tailrec_async
from expression.core import TailCallResult, aiotools, tailrec_async
from expression.core.fn import TailCall
from expression.system import AsyncDisposable, CancellationToken, CancellationTokenSource

Expand Down Expand Up @@ -70,11 +70,6 @@ async def worker(obv: AsyncObserver[TSource], _: CancellationToken) -> None:


def of_async_iterable(iterable: AsyncIterable[TSource]) -> AsyncObservable[TSource]:
"""Convert an async iterable to a source stream.
2 - xs = from_async_iterable(async_iterable)
Returns the source stream whose elements are pulled from the
given (async) iterable sequence."""

async def subscribe_async(observer: AsyncObserver[TSource]) -> AsyncDisposable:
task: Optional[Future[TSource]] = None

Expand Down Expand Up @@ -192,13 +187,13 @@ async def subscribe_async(aobv: AsyncObserver[int]) -> AsyncDisposable:
cancel, token = canceller()

@tailrec_async
async def handler(seconds: float, next: int) -> Result[None, Exception]:
async def handler(seconds: float, next: int) -> TailCallResult[None]:
await asyncio.sleep(seconds)
await aobv.asend(next)

if not period:
await aobv.aclose()
return Ok(None)
return None

return TailCall(period, next + 1)

Expand Down
18 changes: 3 additions & 15 deletions aioreactive/filtering.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from typing import Any, Awaitable, Callable, List, Optional, Tuple, TypeVar, overload
from typing import Any, Awaitable, Callable, List, NoReturn, Optional, Tuple, TypeVar, overload

from expression.collections import seq
from expression.core import (
MailboxProcessor,
Option,
Result,
TailCallResult,
TailCall,
aiotools,
compose,
Expand Down Expand Up @@ -122,7 +122,7 @@ async def subscribe_async(aobv: AsyncObserver[TSource]) -> AsyncDisposable:

async def worker(inbox: MailboxProcessor[Notification[TSource]]) -> None:
@tailrec_async
async def message_loop(latest: Notification[TSource]) -> Result[Notification[TSource], Exception]:
async def message_loop(latest: Notification[TSource]) -> TailCallResult[NoReturn]:
n = await inbox.receive()

async def get_latest() -> Notification[TSource]:
Expand Down Expand Up @@ -236,18 +236,6 @@ async def asend(value: TSource) -> None:


def take(count: int) -> Stream[TSource, TSource]:
"""Take the first elements from the stream.
Returns a specified number of contiguous elements from the start of
an observable sequence.
Args:
count Number of elements to take.
Returns:
Stream[TSource, TSource]: [description]
"""

if count < 0:
raise ValueError("Count cannot be negative.")

Expand Down
4 changes: 1 addition & 3 deletions aioreactive/observables.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,4 @@ def __aiter__(self) -> AsyncIterator[TSource]:
An async iterator.
"""

obv: AsyncIteratorObserver[TSource] = AsyncIteratorObserver(self)

return obv
return AsyncIteratorObserver(self)
29 changes: 13 additions & 16 deletions aioreactive/timeshift.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Iterable, Tuple, TypeVar
from typing import Iterable, NoReturn, Tuple, TypeVar

from expression.collections import seq
from expression.core import MailboxProcessor, Result, TailCall, aiotools, match, pipe, tailrec_async, snd
from expression.core import (
MailboxProcessor,
TailCall,
aiotools,
match,
pipe,
tailrec_async,
snd,
TailCallResult,
)
from expression.system import CancellationTokenSource

from .combine import with_latest_from
Expand Down Expand Up @@ -38,7 +47,7 @@ def _delay(source: AsyncObservable[TSource]) -> AsyncObservable[TSource]:
async def subscribe_async(aobv: AsyncObserver[TSource]) -> AsyncDisposable:
async def worker(inbox: MailboxProcessor[Tuple[Notification[TSource], datetime]]) -> None:
@tailrec_async
async def loop() -> Result[None, Exception]:
async def loop() -> TailCallResult[NoReturn]:
ns, due_time = await inbox.receive()

diff = due_time - datetime.utcnow()
Expand Down Expand Up @@ -84,26 +93,14 @@ async def cancel() -> None:


def debounce(seconds: float) -> Stream[TSource, TSource]:
"""Debounce observable stream.
Ignores values from an observable sequence which are followed by
another value before the given timeout.
Args:
seconds (float): Number of seconds to debounce.
Returns:
The debounced stream.
"""

def _debounce(source: AsyncObservable[TSource]) -> AsyncObservable[TSource]:
async def subscribe_async(aobv: AsyncObserver[TSource]) -> AsyncDisposable:
safe_obv, auto_detach = auto_detach_observer(aobv)
infinite: Iterable[int] = seq.infinite()

async def worker(inbox: MailboxProcessor[Tuple[Notification[TSource], int]]) -> None:
@tailrec_async
async def message_loop(current_index: int) -> Result[TSource, Exception]:
async def message_loop(current_index: int) -> TailCallResult[NoReturn]:
n, index = await inbox.receive()

with match(n) as m:
Expand Down

0 comments on commit 468134d

Please sign in to comment.