diff --git a/aioreactive/create.py b/aioreactive/create.py index 7f6ef60..7d442ec 100644 --- a/aioreactive/create.py +++ b/aioreactive/create.py @@ -1,7 +1,7 @@ import asyncio import logging from asyncio import Future -from typing import Any, AsyncIterable, Awaitable, Callable, Iterable, Optional, Tuple, TypeVar +from typing import Any, AsyncIterable, Awaitable, Callable, Iterable, Optional, Tuple, TypeVar, cast from expression.core import TailCallResult, aiotools, tailrec_async from expression.core.fn import TailCall @@ -66,7 +66,8 @@ async def worker(obv: AsyncObserver[TSource], _: CancellationToken) -> None: finally: await obv.aclose() - return of_async_worker(worker) + ret = of_async_worker(worker) + return cast(AsyncObservable[TSource], ret) # NOTE: pyright issue def of_async_iterable(iterable: AsyncIterable[TSource]) -> AsyncObservable[TSource]: @@ -160,7 +161,8 @@ async def worker(obv: AsyncObserver[TSource], token: CancellationToken) -> None: await obv.aclose() - return of_async_worker(worker) + ret = of_async_worker(worker) + return cast(AsyncObservable[TSource], ret) # NOTE: pyright issue def defer(factory: Callable[[], AsyncObservable[TSource]]) -> AsyncObservable[TSource]: diff --git a/aioreactive/msg.py b/aioreactive/msg.py index a4a3709..708a3aa 100644 --- a/aioreactive/msg.py +++ b/aioreactive/msg.py @@ -2,7 +2,7 @@ """ from abc import ABC from dataclasses import dataclass -from typing import Any, Iterable, NewType, Type, TypeVar, get_origin +from typing import Any, Iterable, NewType, TypeVar, get_origin from expression.core import SupportsMatch from expression.system import AsyncDisposable diff --git a/aioreactive/observers.py b/aioreactive/observers.py index 09dc610..89be5f0 100644 --- a/aioreactive/observers.py +++ b/aioreactive/observers.py @@ -295,7 +295,7 @@ async def aclose(self) -> None: self._is_stopped = True if self._has_value: - self.set_result(cast(TSource, self._last_value)) + self.set_result(cast("TSource", self._last_value)) else: self.cancel() await self._aclose() diff --git a/aioreactive/timeshift.py b/aioreactive/timeshift.py index ff48c0e..aa0f577 100644 --- a/aioreactive/timeshift.py +++ b/aioreactive/timeshift.py @@ -4,7 +4,7 @@ from typing import Iterable, NoReturn, Tuple, TypeVar from expression.collections import seq -from expression.core import MailboxProcessor, TailCall, TailCallResult, aiotools, match, pipe, snd, tailrec_async +from expression.core import MailboxProcessor, TailCall, TailCallResult, aiotools, match, pipe, fst, tailrec_async from expression.system import CancellationTokenSource from .combine import with_latest_from @@ -152,8 +152,12 @@ def _sample(source: AsyncObservable[TSource]) -> AsyncObservable[TSource]: timer = interval(seconds, seconds) if seconds > 0: - return pipe(source, with_latest_from(timer), map(snd)) - else: - return source + return pipe( + source, + with_latest_from(timer), + map(fst), + ) + + return source return _sample diff --git a/aioreactive/transform.py b/aioreactive/transform.py index a75a602..31333b8 100644 --- a/aioreactive/transform.py +++ b/aioreactive/transform.py @@ -69,7 +69,7 @@ def starmap_async(mapper: Callable[[TSource, int], Awaitable[TResult]]) -> Strea ... -def starmap_async(amapper: Callable[..., Awaitable[Any]]) -> Stream[TSource, Any]: +def starmap_async(amapper: Callable[..., Awaitable[TResult]]) -> Stream[Tuple[TSource, ...], TResult]: """Map async spreading arguments to the async mapper. Returns an observable sequence whose elements are the result of @@ -96,11 +96,11 @@ def handler(next: Callable[[TResult], Awaitable[None]], x: TSource) -> Awaitable @overload -def starmap(mapper: Callable[[TSource, int], TResult]) -> Stream[TSource, TResult]: +def starmap(mapper: Callable[[TSource, int], TResult]) -> Stream[Tuple[TSource, int], TResult]: ... -def starmap(mapper: Callable[..., Any]) -> Stream[TSource, Any]: +def starmap(mapper: Callable[..., TResult]) -> Stream[Tuple[TSource, ...], TResult]: """Map and spread the arguments to the mapper. Returns an observable sequence whose elements are the result of @@ -291,8 +291,12 @@ async def message_loop( inner_agent = MailboxProcessor.start(worker) - async def asend(xs: TSource) -> None: - pipe(xs, InnerObservableMsg, inner_agent.post) + async def asend(xs: AsyncObservable[TSource]) -> None: + pipe( + xs, + InnerObservableMsg, + inner_agent.post, + ) async def athrow(error: Exception) -> None: await safe_obv.athrow(error) diff --git a/examples/timeflies/timeflies.py b/examples/timeflies/timeflies.py index 9367dfb..9e92b5c 100644 --- a/examples/timeflies/timeflies.py +++ b/examples/timeflies/timeflies.py @@ -1,5 +1,4 @@ import asyncio -import logging import signal import sys from tkinter import Event, Frame, Label, Misc, Tk