-
Notifications
You must be signed in to change notification settings - Fork 0
/
simulate_runner.py
69 lines (58 loc) · 2.13 KB
/
simulate_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import argparse
import asyncio
import datetime
import logging
import threading

import pandas as pd

from ai.analyze import analyze_by_llm
from ai.preprocess import preprocess_metric_data
from bot.bot import client, run_bot, send_alert
from monitor.service import cloudrun
# Instantiate CloudRunManager once at import time; the instance itself is
# discarded.
# NOTE(review): presumably this is done for the constructor's side effects
# (one-time initialisation) — confirm against monitor.service.cloudrun.
cloudrun.CloudRunManager()
class RealTimeDataSimulator:
    """Replay a DataFrame in consecutive row chunks, as if it arrived live.

    The simulator keeps a row cursor (``current_index``) that moves forward
    every time a chunk is handed out by :meth:`get_next_chunk`.

    Args:
        df: The complete data set to replay.
        start_index: Row position at which the replay starts. Defaults to
            500, the offset this class has always started from.

    Raises:
        ValueError: If ``start_index`` is negative.
    """

    def __init__(self, df: pd.DataFrame, start_index: int = 500):
        if start_index < 0:
            raise ValueError(
                f"start_index must be non-negative, got {start_index}"
            )
        self.data = df
        self.current_index = start_index

    def get_next_chunk(self, chunk_size: int = 10) -> pd.DataFrame:
        """Return the next ``chunk_size`` rows and advance the cursor.

        The chunk is shorter than ``chunk_size`` when fewer rows remain, and
        empty when the cursor is already at or past the end of the data.

        Args:
            chunk_size: Maximum number of rows to return; must be positive.

        Returns:
            The rows between the current cursor and the new cursor position.

        Raises:
            ValueError: If ``chunk_size`` is not positive. A zero or negative
                size would never advance the cursor, so a caller looping on
                :meth:`is_end` would never terminate.
        """
        if chunk_size < 1:
            raise ValueError(
                f"chunk_size must be a positive integer, got {chunk_size}"
            )
        start_index = self.current_index
        # Clamp to the end of the data so the last chunk may be short.
        end_index = min(start_index + chunk_size, len(self.data))
        # The cursor is a row *position* (it is compared against len(data)),
        # so slice positionally regardless of the DataFrame's index labels.
        chunk = self.data.iloc[start_index:end_index]
        self.current_index = end_index
        return chunk

    def is_end(self) -> bool:
        """Return True once the cursor has reached the end of the data."""
        return self.current_index >= len(self.data)
def _log_alert_failure(future) -> None:
    """Log the exception, if any, raised by a scheduled ``send_alert`` call."""
    if future.cancelled():
        return
    error = future.exception()
    if error is not None:
        logging.getLogger(__name__).error("send_alert failed", exc_info=error)


async def main(data_directory: str) -> None:
    """Replay preprocessed metric data through the analyzer and send alerts.

    The data under ``data_directory`` is preprocessed, the bot is started on
    a background thread, and once the bot client reports ready the data is
    fed to ``analyze_by_llm`` ten rows at a time. Every analysis result is
    printed and scheduled on the bot's event loop via ``send_alert``.

    Args:
        data_directory: Directory of the data to preprocess.

    Raises:
        RuntimeError: If the bot thread exits before the client is ready.
    """
    # NOTE: announcing the start of the simulation in the channel configured
    # by DISCORD_DST_CHANNEL_ID is currently disabled.
    simulate_data: pd.DataFrame = preprocess_metric_data(data_directory)
    data_simulator = RealTimeDataSimulator(simulate_data)

    bot_thread = threading.Thread(target=run_bot)
    bot_thread.start()
    while not client.is_ready():
        # Without this check a bot that fails during start-up would leave
        # this loop polling forever.
        # NOTE(review): assumes run_bot blocks for the lifetime of the bot —
        # confirm against bot.bot.
        if not bot_thread.is_alive():
            raise RuntimeError(
                "Bot thread exited before the client became ready"
            )
        await asyncio.sleep(1)

    while not data_simulator.is_end():
        chunk = data_simulator.get_next_chunk(10)
        result = analyze_by_llm(chunk)
        print(result)
        result["timestamp"] = datetime.datetime.now()
        # NOTE(review): presumably placeholders for fields send_alert
        # expects — confirm against bot.bot.send_alert.
        result["cpu"] = 0
        result["memory"] = 0
        result["instance"] = 0
        # send_alert has to run on the bot's own event loop, which lives on
        # another thread, so hand it over thread-safely.
        pending = asyncio.run_coroutine_threadsafe(
            send_alert(message_dict=result), client.loop
        )
        # Surface failures instead of dropping the future on the floor.
        pending.add_done_callback(_log_alert_failure)
        # NOTE: pacing between chunks (a 6 second pause) is currently disabled.
if __name__ == "__main__":
    # Command-line entry point: read the optional --data directory and run
    # the simulation to completion on a fresh event loop.
    cli = argparse.ArgumentParser(
        description="Run a real-time data simulation and analysis system"
    )
    cli.add_argument(
        "--data",
        default="data/test/",
        required=False,
        help="Directory of the data to preprocess",
    )
    data_directory = cli.parse_args().data
    asyncio.run(main(data_directory))