-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpredict_helper.py
189 lines (168 loc) · 8.59 KB
/
predict_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
from nlp_to_phenome import StrokeSettings, Concept2Mapping, escape_lable_to_filename
from LabelModel import LabelModel, CustomisedRecoginiser
from annotation_docs import PhenotypeAnn
from learners import PhenomeLearners
import utils
import logging
from os.path import join
from ann_converter import AnnConverter
from os import listdir
from os.path import isfile, exists
import sys
def predict(settings):
ann_dir = settings['test_ann_dir']
test_text_dir = settings['test_fulltext_dir']
_concept_mapping = settings['concept_mapping_file']
_learning_model_dir = settings['learning_model_dir']
_labels = utils.read_text_file(settings['entity_types_file'])
ignore_mappings = utils.load_json_data(settings['ignore_mapping_file'])
_cm_obj = Concept2Mapping(_concept_mapping)
doc2predicted = {}
no_models_labels = []
for phenotype in _labels:
logging.info('working on [%s]' % phenotype)
_learning_model_file = _learning_model_dir + '/%s.lm' % phenotype
if not exists(_learning_model_file):
# if previous learnt model not exists, skip
no_models_labels.append(phenotype)
continue
_ml_model_file_ptn = _learning_model_dir + '/' + phenotype + '_%s_DT.model'
lm = LabelModel.deserialise(_learning_model_file)
# pass the concept2mapping object to the label model instance
lm.concept_mapping = _cm_obj
lm.max_dimensions = 30
data = lm.load_data_for_predict(
ann_dir=ann_dir,
ignore_mappings=ignore_mappings, ignore_context=True,
separate_by_label=True,
full_text_dir=test_text_dir)
for lbl in data['lbl2data']:
X = data['lbl2data'][lbl]['X']
logging.debug(X)
doc_anns = data['lbl2data'][lbl]['doc_anns']
label_model_predict(lm, _ml_model_file_ptn, data['lbl2data'], doc2predicted)
return doc2predicted, no_models_labels
def label_model_predict(lm, model_file_pattern, lbl2data, doc2predicted,
mention_pattern=None, mention_prediction_param=None):
for lbl in lbl2data:
mp_predicted = None
if mention_pattern is not None:
mp_predicted = mention_pattern.predict(lbl2data[lbl]['doc_anns'], cr=mention_prediction_param)
X = lbl2data[lbl]['X']
doc_anns = lbl2data[lbl]['doc_anns']
if lbl in lm.rare_labels:
logging.info('%s to be predicted using %s' % (lbl, lm.rare_labels[lbl]))
PhenomeLearners.predict_use_simple_stats_in_action(lm.rare_labels[lbl],
item_size=len(X),
doc2predicted=doc2predicted,
doc_anns=doc_anns,
mp_predicted=mp_predicted, keep_predict_prob=True)
else:
if len(X) > 0:
logging.debug('%s, dimensions %s' % (lbl, len(X[0])))
PhenomeLearners.predict_use_model_in_action(X, model_file=model_file_pattern % escape_lable_to_filename(lbl),
pca_model_file=None,
doc2predicted=doc2predicted,
doc_anns=doc_anns,
mp_predicted=mp_predicted, keep_predict_prob=True)
def hybrid_prediciton(settings):
d2p, labels2work = predict(settings)
ann_dir = settings['test_ann_dir']
test_text_dir = settings['test_fulltext_dir']
_concept_mapping = settings['concept_mapping_file']
_learning_model_dir = settings['learning_model_dir']
_labels = utils.read_text_file(settings['entity_types_file'])
ignore_mappings = utils.load_json_data(settings['ignore_mapping_file'])
_cm_obj = Concept2Mapping(_concept_mapping)
file_keys = [f[:f.rfind('.')].replace('se_ann_', '') for f in listdir(ann_dir) if isfile(join(ann_dir, f))]
logging.info('labels to use direct nlp prediction: [%s]' % labels2work)
# convert SemEHRAnn to PhenotypeAnn
doc2predicted = {}
for d in d2p:
for t in d2p[d]:
ann = t['ann']
prob = t['prob']
if hasattr(ann, 'cui'):
lbl = _cm_obj.concept2label[ann.cui][0]
pheAnn = PhenotypeAnn(ann.str, ann.start, ann.end, ann.negation, ann.temporality, ann.experiencer,
'StudyName', lbl)
put_ann_label(lbl, pheAnn, doc2predicted, d, prob=prob)
else:
put_ann_label(ann.minor_type, ann, doc2predicted, d, prob=prob)
for fk in file_keys:
cr = CustomisedRecoginiser(join(ann_dir, '%s.json' % fk), _concept_mapping)
d = fk
for ann in cr.annotations:
if ann.cui in _cm_obj.concept2label:
lbl = _cm_obj.concept2label[ann.cui][0]
if lbl in labels2work:
pheAnn = PhenotypeAnn(ann.str, ann.start, ann.end, ann.negation, ann.temporality, ann.experiencer,
'StudyName', lbl)
put_ann_label(lbl, pheAnn, doc2predicted, d)
for ann in cr.phenotypes:
if ann.minor_type in labels2work:
put_ann_label(ann.minor_type, ann, doc2predicted, d)
return doc2predicted
def direct_nlp_prediction(settings):
ann_dir = settings['test_ann_dir']
test_text_dir = settings['test_fulltext_dir']
_concept_mapping = settings['concept_mapping_file']
_learning_model_dir = settings['learning_model_dir']
_labels = utils.read_text_file(settings['entity_types_file'])
ignore_mappings = utils.load_json_data(settings['ignore_mapping_file'])
_cm_obj = Concept2Mapping(_concept_mapping)
file_keys = [f[:f.rfind('.')].replace('se_ann_', '') for f in listdir(ann_dir) if isfile(join(ann_dir, f))]
doc2predicted = {}
for fk in file_keys:
cr = CustomisedRecoginiser(join(ann_dir, '%s.json' % fk), _concept_mapping)
d = fk
for ann in cr.annotations:
if ann.cui in _cm_obj.concept2label:
lbl = _cm_obj.concept2label[ann.cui][0]
pheAnn = PhenotypeAnn(ann.str, ann.start, ann.end, ann.negation, ann.temporality, ann.experiencer,
'StudyName', lbl)
if ann.negation != 'Affirmed' or len(ann.ruled_by) > 0:
continue
put_ann_label(lbl, pheAnn, doc2predicted, d)
for ann in cr.phenotypes:
put_ann_label(ann.minor_type, ann, doc2predicted, d)
return doc2predicted
def put_ann_label(lbl, pheAnn, doc2predicted, d, prob=None):
labeled_ann = {'label': lbl,
'ann': pheAnn}
if prob is not None:
labeled_ann['prob'] = prob
if d not in doc2predicted:
doc2predicted[d] = [labeled_ann]
else:
doc2predicted[d].append(labeled_ann)
def output_eHOST_format(doc2precited, output_folder):
for d in doc2precited:
xml = AnnConverter.to_eHOST(d, doc2precited[d])
utils.save_string(str(xml), join(output_folder, '%s.txt.knowtator.xml' % d))
def predict_to_eHOST_results(predict_setting):
ss = StrokeSettings(predict_setting)
if 'predict_mode' in ss.settings and ss.settings['predict_mode'] == 'direct_nlp':
logging.info('predicting with direct nlp...')
predicted_results = direct_nlp_prediction(ss.settings)
elif 'predict_mode' in ss.settings and ss.settings['predict_mode'] == 'hybrid':
logging.info('predicting with hybrid...')
predicted_results = hybrid_prediciton(ss.settings)
else:
logging.info('predicting...')
predicted_results, _ = predict(ss.settings)
output_eHOST_format(predicted_results, ss.settings['output_folder'])
logging.info('results saved to %s' % ss.settings['output_folder'])
if 'output_file' in ss.settings:
d2ann = {}
for d in predicted_results:
d2ann[d] = [{'label': t['label'], 'ann': t['ann'].to_dict(), 'prob': t['prob'] if 'prob' in t else ''}
for t in predicted_results[d]]
utils.save_json_array(d2ann, ss.settings['output_file'])
if __name__ == "__main__":
logging.basicConfig(level='DEBUG', format='[%(filename)s:%(lineno)d] %(name)s %(asctime)s %(message)s')
# predict_to_eHOST_results('./settings/prediction_task_direct.json')
if len(sys.argv) != 2:
print('the syntax is [python prediction_helper.py PROCESS_SETTINGS_FILE_PATH]')
else:
predict_to_eHOST_results(sys.argv[1])