-
Notifications
You must be signed in to change notification settings - Fork 0
/
clusterer.py
90 lines (70 loc) · 2.81 KB
/
clusterer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import re
from copy import deepcopy
from parser.doctree_parser import Parser
from settings.language import ClusterKeywords
class Flow:
    """A single named flow: an ordered list of tokens collected between a
    cluster start marker and its matching end marker."""

    # Declared for readers/type-checkers; real values are set in __init__
    # (tokens) and by the scanner when a cluster header is seen (name).
    tokens: list
    name: str

    def __init__(self) -> None:
        # Fresh list per instance so flows never share token storage.
        self.tokens = []
        # Fix: default the name so the attribute always exists. Previously
        # `name` was only an annotation, and reading it before a cluster
        # header assigned it raised AttributeError.
        self.name = ""
class Flows:
    """Mutable scan state used while walking a token stream for one kind of
    cluster: the start/end indexes seen so far and the currently open Flow."""

    def __init__(self) -> None:
        # Fix: these were class-level attributes. `processed_indexes = {}`
        # as a class attribute is a single dict shared by EVERY Flows
        # instance, so the inner-cluster scan and the main-cluster scan in
        # extract_flows would stomp on each other's indexes. Per-instance
        # state removes the sharing.
        self.processed_indexes = {}
        # The Flow currently being collected, or None when no cluster is open.
        self.latest_flow = None
class Cluster:
_inner_flows: list[Flow] = []
_main_flows: list[Flow] = []
def __init__(self) -> None:
pass
def _find_inner_diagrams(self, token: str, index: int, flows: Flows):
if ClusterKeywords.CLUSTER in token:
flow = deepcopy(Flow())
flow.name = (
re.search(rf"(?<={ClusterKeywords.CLUSTER})(.*?)$", token)[0]
.lstrip()
.rstrip()
)
flows.latest_flow = deepcopy(flow)
flows.processed_indexes.update({"start_index": index})
elif ClusterKeywords.END_CLUSTER in token:
self._inner_flows.append(deepcopy(flows.latest_flow))
flows.processed_indexes.update({"end_index": index})
flows.latest_flow = None
return True
if flows.latest_flow is not None:
flows.latest_flow.tokens.append(token)
# TODO refactor repeated functionality
def _find_main_flows(self, token: str, index: int, flows: Flows):
if ClusterKeywords.MAIN_CLUSTER in token:
flow = deepcopy(Flow())
flow.name = (
re.search(rf"(?<={ClusterKeywords.MAIN_CLUSTER})(.*?)$", token)[0]
.lstrip()
.rstrip()
)
flows.latest_flow = deepcopy(flow)
flows.processed_indexes.update({"start_index": index})
elif ClusterKeywords.END_MAIN_CLUSTER in token:
self._main_flows.append(deepcopy(flows.latest_flow))
flows.processed_indexes.update({"end_index": index})
flows.latest_flow = None
return True
if flows.latest_flow is not None:
flows.latest_flow.tokens.append(token)
def extract_flows(self, file_name_list: list[str]):
for file in file_name_list:
parser = Parser()
parsed = parser.parse(file)
flows = Flows()
for index, token_raw in enumerate(parsed):
indexes_processed = self._find_inner_diagrams(
str(token_raw), index, flows
)
if indexes_processed:
start_index = flows.processed_indexes["start_index"]
end_index = flows.processed_indexes["end_index"] + 1
del parsed[start_index:end_index]
main_flows = Flows()
for index, token_raw in enumerate(parsed):
indexes_processed = self._find_main_flows(
str(token_raw), index, main_flows
)