Skip to content

Commit

Permalink
Fix return type for fluent chaining
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrattli committed Sep 28, 2024
1 parent 91050fe commit be08567
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions aioreactive/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def from_iterable(cls, iter: Iterable[_TOther]) -> AsyncRx[_TOther]:
return AsyncRx(from_iterable(iter))

@staticmethod
def from_async_iterable(iter: AsyncIterable[_TResult]) -> AsyncObservable[_TResult]:
def from_async_iterable(iter: AsyncIterable[_TResult]) -> AsyncRx[_TResult]:
"""Convert an async iterable to an async observable stream.
Example:
Expand All @@ -159,7 +159,7 @@ def single(cls, value: _TSource) -> AsyncRx[_TSource]:
def as_async_observable(self) -> AsyncObservable[_TSource]:
return AsyncAnonymousObservable(self.subscribe_async)

def choose(self, chooser: Callable[[_TSource], Option[_TSource]]) -> AsyncObservable[_TSource]:
def choose(self, chooser: Callable[[_TSource], Option[_TSource]]) -> AsyncRx[_TSource]:
"""Choose.
Applies the given function to each element of the stream and returns
Expand All @@ -175,7 +175,7 @@ def choose(self, chooser: Callable[[_TSource], Option[_TSource]]) -> AsyncObserv
"""
return AsyncRx(pipe(self, choose(chooser)))

def choose_async(self, chooser: Callable[[_TSource], Awaitable[Option[_TSource]]]) -> AsyncObservable[_TSource]:
def choose_async(self, chooser: Callable[[_TSource], Awaitable[Option[_TSource]]]) -> AsyncRx[_TSource]:
"""Choose async.
Applies the given async function to each element of the stream and
Expand Down Expand Up @@ -226,7 +226,7 @@ def delay(self, seconds: float) -> AsyncRx[_TSource]:

return AsyncRx(pipe(self, delay(seconds)))

def distinct_until_changed(self) -> AsyncObservable[_TSource]:
def distinct_until_changed(self) -> AsyncRx[_TSource]:
from .filtering import distinct_until_changed

return AsyncRx(distinct_until_changed(self))
Expand All @@ -250,7 +250,7 @@ def filter(self, predicate: Callable[[_TSource], bool]) -> AsyncRx[_TSource]:

return AsyncRx(pipe(self, _filter(predicate)))

def filteri(self, predicate: Callable[[_TSource, int], bool]) -> AsyncObservable[_TSource]:
def filteri(self, predicate: Callable[[_TSource, int], bool]) -> AsyncRx[_TSource]:
"""Filter with index.
Filters the elements of an observable sequence based on a predicate
Expand Down Expand Up @@ -325,7 +325,7 @@ def reduce_async(
) -> AsyncRx[_TResult]:
return pipe(self, reduce_async(accumulator, initial), AsyncRx[_TResult])

def skip(self, count: int) -> AsyncObservable[_TSource]:
def skip(self, count: int) -> AsyncRx[_TSource]:
"""Skip items from start of the stream.
Bypasses a specified number of elements in an observable sequence
Expand Down Expand Up @@ -388,7 +388,7 @@ def starmap(self: AsyncRx[tuple[Unpack[_V]]], mapper: Callable[[Unpack[_V]], _TR
"""
return AsyncRx(pipe(self, starmap(mapper)))

def take(self, count: int) -> AsyncObservable[_TSource]:
def take(self, count: int) -> AsyncRx[_TSource]:
"""Take the first elements from the stream.
Returns a specified number of contiguous elements from the start of
Expand All @@ -405,7 +405,7 @@ def take(self, count: int) -> AsyncObservable[_TSource]:

return AsyncRx(pipe(self, take(count)))

def take_last(self, count: int) -> AsyncObservable[_TSource]:
def take_last(self, count: int) -> AsyncRx[_TSource]:
"""Take last elements from stream.
Returns a specified number of contiguous elements from the end of an
Expand Down

0 comments on commit be08567

Please sign in to comment.