-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathitem_62.py
131 lines (94 loc) · 2.96 KB
/
item_62.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import asyncio
import tempfile
import time
import threading
class NoNewData(Exception):
pass
async def async_tell(handle):
return handle.tell()
async def async_seek(handle, *args):
return handle.seek(*args)
async def async_read(handle):
return handle.readline()
async def async_readline(handle):
offset = await async_tell(handle)
await async_seek(handle, 0, 2)
length = await async_tell(handle)
if length == offset:
raise NoNewData
await async_seek(handle, 0)
return await async_read(handle)
def readline(handle):
offset = handle.tell()
handle.seek(0, 2)
length = handle.tell()
if length == offset:
raise NoNewData
handle.seek(offset, 0)
return handle.readline()
def tail_file(handle, interval, write_func):
while not handle.closed:
try:
line = readline(handle)
except NoNewData:
time.sleep(interval)
except ValueError as err:
print(err)
else:
write_func(line)
def run_threads(handles, interval, size):
with tempfile.TemporaryFile() as output:
lock = threading.Lock()
def write(data):
with lock:
output.write(data)
threads = []
for handle in handles:
args = (handle, interval, write)
thread = threading.Thread(target=tail_file, args=args)
thread.start()
threads.append(thread)
for handle in handles:
handle.close()
for thread in threads:
thread.join()
print(f"Expected: {size}, got: {output.tell()}")
def get_handles(n_files):
handles = [tempfile.TemporaryFile() for _ in range(n_files)]
size = 0
for handle in handles:
handle.write(b"Hello world\n")
size += handle.tell()
handle.seek(0)
return handles, size
n_files = 2
interval = 0.1
handles, size = get_handles(n_files)
run_threads(handles, interval, size)
async def tail_file_async(output, handle, interval, write_func):
loop = asyncio.get_event_loop()
while not handle.closed:
try:
line = await loop.run_in_executor(None, readline, handle)
except NoNewData:
await asyncio.sleep(interval)
except ValueError:
pass
else:
await write_func(output, line)
async def write_async(output, text):
output.write(text)
async def run_tasks_mixed(handles, interval, size):
with tempfile.TemporaryFile() as output:
tasks = []
for handle in handles:
coro = tail_file_async(output, handle, interval, write_async)
task = asyncio.create_task(coro)
tasks.append(task)
await asyncio.sleep(0.5)
for handle in handles:
handle.close()
await asyncio.gather(*tasks)
print(f"Expected: {size}, got: {output.tell()}")
handles, size = get_handles(n_files)
asyncio.run(run_tasks_mixed(handles, interval, size))