forked from robinhilliard/pipes
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy paththreads.py
88 lines (70 loc) · 2.66 KB
/
threads.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from typing import (
Callable,
List,
Optional,
TypeVar,
Union,
)
from typing_extensions import Concatenate, ParamSpec, TypeAlias
from pipe_operator.python_flow.base import Pipe, PipeStart
# Type of the value flowing through the pipe; it is also the first argument
# handed to the threaded function `f` (see `Concatenate[TInput, FuncParams]`).
TInput = TypeVar("TInput")
# NOTE(review): not referenced by the definitions visible in this file section.
TOutput = TypeVar("TOutput")
# Value type carried by a `PipeStart`; used so `ThreadWait.__rrshift__` is typed
# as returning the same `PipeStart[TValue]` it receives.
TValue = TypeVar("TValue")
# Captures the extra positional/keyword parameters of `f` (everything after
# its first argument) so that `*args` / `**kwargs` are type-checked against `f`.
FuncParams = ParamSpec("FuncParams")
# A thread identifier may be either a string or an integer.
ThreadId: TypeAlias = Union[str, int]
class ThreadPipe(Pipe[TInput, FuncParams, TInput]):
    """
    Pipe-able element that hands the given function off to a separate thread.

    Similar to `Tap`: it exists for its side-effect only, so the value flowing
    through the pipe is passed along unchanged. Handy for async/parallel work,
    and can be paired with `ThreadWait` to wait for specific/all threads to finish.

    Args:
        thread_id (ThreadId): A unique identifier (within this pipe) for the thread,
            given as a `str` or an `int`. Useful for `ThreadWait`.
        f (Callable[Concatenate[TInput, FuncParams], object]): The function that will be called in the thread.
        args (FuncParams.args): All args (except the first) that will be passed to the function `f`.
        kwargs (FuncParams.kwargs): All kwargs that will be passed to the function `f`.

    Examples:
        >>> import time
        >>> (
        ...     PipeStart(3)
        ...     >> ThreadPipe("t1", lambda _: time.sleep(0.1))
        ...     >> ThreadWait(["t1"])
        ...     >> PipeEnd()
        ... )
        3
    """

    # Extend the parent's slots with the one attribute this subclass adds.
    __slots__ = (*Pipe.__slots__, "thread_id")

    def __init__(
        self,
        thread_id: ThreadId,
        f: Callable[Concatenate[TInput, FuncParams], object],
        *args: FuncParams.args,
        **kwargs: FuncParams.kwargs,
    ) -> None:
        self.thread_id = thread_id
        # Force the `_thread` flag on, overriding any caller-supplied value.
        # NOTE(review): presumably the base `Pipe` uses this flag to run `f`
        # in a thread -- confirm against `Pipe.__init__`.
        threaded_kwargs = {**kwargs, "_thread": True}
        super().__init__(f, *args, **threaded_kwargs)  # type: ignore

    def __rrshift__(self, other: "PipeStart") -> PipeStart[TInput]:
        # Present for the type checker only; never actually invoked.
        result = other.__rshift__(self)
        return result
class ThreadWait:
    """
    Pipe-able element used to wait for thread(s) (from `ThreadPipe`) to finish.

    Args:
        thread_ids (Optional[List[ThreadId]]): A list of thread identifiers to wait for,
            using the same ids (each a `str` or an `int`) that were given to `ThreadPipe`.
            If not provided, all threads will be waited for.

    Examples:
        >>> import time
        >>> (
        ...     PipeStart(3)
        ...     >> ThreadPipe("t1", lambda _: time.sleep(0.1))
        ...     >> ThreadWait(["t1"])
        ...     >> PipeEnd()
        ... )
        3
    """

    __slots__ = ("thread_ids",)

    def __init__(self, thread_ids: Optional[List[ThreadId]] = None) -> None:
        # Typed with `ThreadId` (not just `str`) so that integer ids accepted by
        # `ThreadPipe` can also be waited on. This class only stores the ids;
        # it performs no waiting itself.
        self.thread_ids = thread_ids

    def __rrshift__(self, other: PipeStart[TValue]) -> PipeStart[TValue]:
        # Never called, but needed for typechecking
        return other