-
Notifications
You must be signed in to change notification settings - Fork 0
/
chong.py
123 lines (96 loc) · 3.32 KB
/
chong.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import sys
from PyQt6.QtWidgets import QApplication, QMainWindow, QTextEdit
from PyQt6.QtCore import Qt, QTimer
import threading
import queue
import time
class WorkerThread(threading.Thread):
    """Background thread that feeds simulated output lines into a queue.

    Args:
        output_queue: Queue that receives one formatted string per message.
        count: Number of messages to produce before the thread ends.
        interval: Seconds to sleep after each message.
    """

    def __init__(self, output_queue, *, count=10, interval=1.0):
        # Daemon thread: an unfinished worker must not keep the interpreter
        # alive after the main thread has exited.
        super().__init__(daemon=True)
        self.output_queue = output_queue
        self.count = count
        self.interval = interval

    def run(self):
        """Put ``count`` messages on the queue, sleeping ``interval`` after each."""
        # Simulate output produced inside the thread.
        for i in range(self.count):
            output = f"Thread {threading.get_ident()}: Message {i}"
            self.output_queue.put(output)
            time.sleep(self.interval)
class MainWindow(QMainWindow):
    """Main window that displays text produced by background worker threads.

    Workers put strings on a shared queue; a timer running in the GUI thread
    drains that queue periodically and appends each string to the text edit.
    """

    # Number of worker threads started with the window.
    WORKER_COUNT = 3
    # How often the queue is drained, in milliseconds.
    POLL_INTERVAL_MS = 1000

    def __init__(self):
        super().__init__()
        self.textEdit = QTextEdit()
        self.setCentralWidget(self.textEdit)

        self.output_queue = queue.Queue()
        self.worker_threads = []
        self.start_threads()

        # Parent the timer to the window so Qt manages its lifetime with it.
        self.timer = QTimer(self)
        self.timer.timeout.connect(self.update_output)
        self.timer.start(self.POLL_INTERVAL_MS)

    def start_threads(self):
        """Create and start the workers, keeping a reference to each one."""
        for _ in range(self.WORKER_COUNT):
            thread = WorkerThread(self.output_queue)
            thread.start()
            self.worker_threads.append(thread)

    def update_output(self):
        """Move every message currently queued into the text edit."""
        while True:
            # get_nowait() never blocks, so the GUI thread cannot stall here
            # even if the queue is emptied between a check and the read.
            try:
                output = self.output_queue.get_nowait()
            except queue.Empty:
                break
            self.textEdit.append(output.strip())
if __name__ == "__main__":
    # Script entry point: build the application, show the window and hand
    # control to the Qt event loop until it finishes.
    # NOTE(review): sys.exit() below ends the process, so any code that
    # follows this guard in the same file is never reached when run directly.
    app = QApplication(sys.argv)
    mainWindow = MainWindow()
    mainWindow.show()
    exit_code = app.exec()
    sys.exit(exit_code)
import sys
from PyQt6.QtCore import Qt, QThread, pyqtSignal, QObject
from PyQt6.QtWidgets import QApplication, QMainWindow, QTextEdit
from io import StringIO
import contextlib
class PrintOutput(QObject):
    """File-like QObject that accumulates written text and announces it.

    It offers ``write`` and ``flush``, so it can be used wherever a text
    stream is expected (for example as the ``file`` argument of ``print``).
    """

    # Emitted on every write() with the complete text accumulated so far,
    # not just the newly written piece.
    outputReceived = pyqtSignal(str)

    def __init__(self):
        super().__init__()
        self.outputBuffer = StringIO()

    def write(self, s):
        """Append *s* to the buffer and emit the whole accumulated text.

        Returns:
            The number of characters written, as text-stream ``write``
            methods conventionally do.
        """
        written = self.outputBuffer.write(s)
        # getvalue() returns the full contents regardless of the stream
        # position, so no seeking is needed before or after reading it.
        self.outputReceived.emit(self.outputBuffer.getvalue())
        return written

    def flush(self):
        """Do nothing; written text is available from the buffer immediately."""
@contextlib.contextmanager
def redirect_stdout(new_target):
    """Temporarily make *new_target* the process-wide ``sys.stdout``.

    Yields *new_target* so it can be bound with ``as``. The previous
    ``sys.stdout`` is put back when the block exits, including when it exits
    through an exception.
    """
    # The standard-library manager performs the same swap-and-restore.
    with contextlib.redirect_stdout(new_target) as active_target:
        yield active_target
class WorkerThread(QThread):
    """QThread that produces simulated output and reports it through a signal."""

    # Emitted once, at the end of run(), with all text the thread produced.
    outputReceived = pyqtSignal(str)

    def run(self):
        """Print ten messages one second apart into a private buffer, then emit it."""
        # sys.stdout is shared by the whole process. Swapping it from several
        # threads that run at the same time mixes their output and can leave
        # sys.stdout pointing at a stale object once they finish out of order.
        # Printing into a sink owned by this thread avoids touching it at all.
        output = PrintOutput()
        # Simulate output produced inside the thread.
        for i in range(10):
            print(f"Thread {self.currentThreadId()}: Message {i}", file=output)
            self.msleep(1000)
        self.outputReceived.emit(output.outputBuffer.getvalue())
class MainWindow(QMainWindow):
    """Main window that shows lifecycle events and output of worker threads.

    Each worker's ``started``, ``finished`` and ``outputReceived`` signals are
    connected so that the corresponding text is appended to the text edit.
    """

    def __init__(self):
        super().__init__()
        self.textEdit = QTextEdit()
        self.setCentralWidget(self.textEdit)
        self.workerThreads = []
        self.startThreads()

    def startThreads(self):
        """Create three workers, connect their signals and start them."""
        for i in range(3):
            thread = WorkerThread()
            # Bind the loop value as a default argument so each lambda keeps
            # its own number instead of the loop variable's final value.
            thread.started.connect(
                lambda index=i: self.appendOutput(f"Thread {index} started")
            )
            thread.finished.connect(
                lambda index=i: self.appendOutput(f"Thread {index} finished")
            )
            thread.outputReceived.connect(self.appendOutput)
            # Keep a reference so the thread object outlives this method.
            self.workerThreads.append(thread)
            thread.start()

    def appendOutput(self, output):
        """Append *output* to the text edit with surrounding whitespace removed."""
        self.textEdit.append(output.strip())

    def closeEvent(self, event):
        """Wait for every worker to finish before letting the window close.

        A QThread object must not be destroyed while its thread is still
        running, so closing blocks until the workers are done.
        """
        for thread in self.workerThreads:
            thread.wait()
        super().closeEvent(event)
if __name__ == "__main__":
    # Script entry point: build the application, show the window and hand
    # control to the Qt event loop; its result becomes the exit status.
    app = QApplication(sys.argv)
    mainWindow = MainWindow()
    mainWindow.show()
    exit_code = app.exec()
    sys.exit(exit_code)