# models.py: TensorFlow 1.x model definitions (CNN, DAN, BiRNN, and SAN).
import tensorflow as tf
class CNN:
    """CNN over the post text: per-filter-size convolution with max-over-time
    pooling, an optional compressed image feature concatenated before the
    output layer, and a shared output head. x2 (the target description) is
    accepted but unused by this model."""

    def __init__(self, x1_maxlen, x2_maxlen, y_len, embedding, filter_sizes,
                 num_filters, hidden_size, state_size, x3_size):
        self.input_x1 = tf.placeholder(tf.int32, [None, x1_maxlen], name="post_text")
        self.input_x1_len = tf.placeholder(tf.int32, [None], name="post_text_len")
        self.input_x2 = tf.placeholder(tf.int32, [None, x2_maxlen], name="target_description")
        self.input_x2_len = tf.placeholder(tf.int32, [None], name="target_description_len")
        self.input_x3 = tf.placeholder(tf.float32, [None, x3_size], name="image_feature")
        self.input_y = tf.placeholder(tf.float32, [None, y_len], name="truth_class")
        self.input_z = tf.placeholder(tf.float32, [None, 1], name="truth_mean")
        self.dropout_rate_embedding = tf.placeholder(tf.float32, name="dropout_rate_embedding")
        self.dropout_rate_hidden = tf.placeholder(tf.float32, name="dropout_rate_hidden")
        self.dropout_rate_cell = tf.placeholder(tf.float32, name="dropout_rate_cell")
        self.batch_size = tf.placeholder(tf.int32, name="batch_size")

        # Embedding lookup, initialised from the pre-trained matrix.
        self.W = tf.get_variable(shape=embedding.shape, initializer=tf.constant_initializer(embedding), name="embedding")
        self.embedded_input_x1 = tf.nn.embedding_lookup(self.W, self.input_x1)
        # Note: tf.layers.dropout defaults to training=False (identity) unless a
        # training flag is wired in; with rate=1-r the placeholder is effectively
        # fed a keep probability here.
        self.embedded_input_x1 = tf.layers.dropout(self.embedded_input_x1, rate=1-self.dropout_rate_embedding)
        self.embedded_input_x1_expanded = tf.expand_dims(self.embedded_input_x1, -1)

        # One convolution + max-over-time pooling branch per filter size.
        pooled_outputs1 = []
        for filter_size in filter_sizes:
            with tf.name_scope("1-conv-maxpool-%s" % filter_size):
                filter_shape = [filter_size, embedding.shape[1], 1, num_filters]
                W = tf.Variable(tf.truncated_normal(filter_shape, stddev=0.1), name="filter_weights")
                b = tf.Variable(tf.constant(0.1, shape=[num_filters]), name="filter_biases")
                conv = tf.nn.conv2d(self.embedded_input_x1_expanded, W, strides=[1, 1, 1, 1], padding="VALID", name="conv")
                h = tf.nn.relu(tf.nn.bias_add(conv, b), name="relu")
                pooled = tf.nn.max_pool(h, ksize=[1, x1_maxlen-filter_size+1, 1, 1], strides=[1, 1, 1, 1], padding="VALID", name="pool")
                pooled_outputs1.append(pooled)
        num_features = num_filters*len(filter_sizes)
        self.h_pool1 = tf.concat(pooled_outputs1, 3)
        self.h_pool_flat1 = tf.reshape(self.h_pool1, [-1, num_features])

        # Optionally compress the image feature and concatenate it to the pooled text features.
        if x3_size:
            self.compressed_input_x3 = tf.layers.dense(tf.layers.dense(self.input_x3, 1024, activation=tf.nn.relu), 256, activation=tf.nn.relu)
            self.h_pool_flat1 = tf.concat([self.h_pool_flat1, self.compressed_input_x3], axis=-1)
        if hidden_size:
            self.h_pool_flat1 = tf.layers.dense(self.h_pool_flat1, hidden_size, activation=tf.nn.relu)
        self.h_drop1 = tf.layers.dropout(self.h_pool_flat1, rate=1-self.dropout_rate_hidden)
        self.scores = tf.layers.dense(inputs=self.h_drop1, units=y_len)

        # Output head: regression on the mean judgement (y_len == 1), binary
        # softmax (y_len == 2), or a 4-class distribution whose expectation is
        # used as the score (y_len == 4).
        if y_len == 1:
            self.predictions = tf.nn.sigmoid(self.scores, name="prediction")
            self.loss = tf.losses.mean_squared_error(self.input_z, self.predictions)
            self.mse = tf.losses.mean_squared_error(self.input_z, self.predictions)
            correct_predictions = tf.equal(tf.cast(tf.round(self.predictions), tf.int32), tf.cast(tf.round(self.input_y), tf.int32))
            self.accuracy = tf.reduce_mean(tf.cast(correct_predictions, "float"), name="accuracy")
        elif y_len == 2:
            self.loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=self.scores, labels=self.input_y))
            self.predictions = tf.slice(tf.nn.softmax(self.scores), [0, 0], [-1, 1], name="prediction")
            self.mse = tf.losses.mean_squared_error(self.input_z, self.predictions)
            correct_predictions = tf.equal(tf.argmax(self.scores, 1), tf.argmax(self.input_y, 1))
            self.accuracy = tf.reduce_mean(tf.cast(correct_predictions, "float"), name="accuracy")
        elif y_len == 4:
            self.normalised_scores = tf.nn.softmax(self.scores, name="distribution")
            self.loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=self.input_y, logits=self.scores))
            # Expected value of the class distribution over the grid {0, 1/3, 2/3, 1}.
            self.predictions = tf.matmul(self.normalised_scores, tf.constant([0, 0.3333333333, 0.6666666666, 1.0], shape=[4, 1]), name="prediction")
            self.mse = tf.losses.mean_squared_error(self.input_z, self.predictions)
            # Collapse the four classes into two groups (first two vs. last two) before comparing argmaxes.
            correct_predictions = tf.equal(tf.argmax(tf.matmul(self.normalised_scores, tf.constant([1, 0, 1, 0, 0, 1, 0, 1], shape=[4, 2], dtype=tf.float32)), 1), tf.argmax(tf.matmul(self.input_y, tf.constant([1, 0, 1, 0, 0, 1, 0, 1], shape=[4, 2], dtype=tf.float32)), 1))
            self.accuracy = tf.reduce_mean(tf.cast(correct_predictions, "float"), name="accuracy")
class DAN:
    """Deep averaging network: a length-masked mean of the word embeddings,
    followed by an optional hidden layer and the same output head as CNN."""

    def __init__(self, x1_maxlen, x2_maxlen, y_len, embedding, filter_sizes,
                 num_filters, hidden_size, state_size, x3_size):
        self.input_x1 = tf.placeholder(tf.int32, [None, x1_maxlen], name="post_text")
        self.input_x1_len = tf.placeholder(tf.int32, [None], name="post_text_len")
        self.input_x2 = tf.placeholder(tf.int32, [None, x2_maxlen], name="target_description")
        self.input_x2_len = tf.placeholder(tf.int32, [None], name="target_description_len")
        self.input_x3 = tf.placeholder(tf.float32, [None, x3_size], name="image_feature")
        self.input_y = tf.placeholder(tf.float32, [None, y_len], name="truth_class")
        self.input_z = tf.placeholder(tf.float32, [None, 1], name="truth_mean")
        self.dropout_rate_embedding = tf.placeholder(tf.float32, name="dropout_rate_embedding")
        self.dropout_rate_hidden = tf.placeholder(tf.float32, name="dropout_rate_hidden")
        self.dropout_rate_cell = tf.placeholder(tf.float32, name="dropout_rate_cell")
        self.batch_size = tf.placeholder(tf.int32, name="batch_size")

        self.W = tf.get_variable(shape=embedding.shape, initializer=tf.constant_initializer(embedding), name="embedding")
        self.embedded_input_x1 = tf.nn.embedding_lookup(self.W, self.input_x1)
        self.embedded_input_x1 = tf.layers.dropout(self.embedded_input_x1, rate=1-self.dropout_rate_embedding)
        # self.avg_input_x1 = tf.reduce_mean(self.embedded_input_x1, axis=1)
        # Average over the real tokens only: build a [batch, maxlen, dim] mask
        # from the sequence lengths, zero out the padding positions, and divide
        # the sum by the true length (see _demo_masked_average below).
        mask = tf.cast(tf.contrib.keras.backend.repeat_elements(tf.expand_dims(tf.sequence_mask(self.input_x1_len, x1_maxlen), axis=-1), embedding.shape[1], axis=2), tf.float32)
        masked_embedded_input_x1 = tf.multiply(self.embedded_input_x1, mask)
        self.avg_input_x1 = tf.reduce_sum(masked_embedded_input_x1, axis=1)/tf.reduce_sum(mask, axis=1)
        if hidden_size:
            self.avg_input_x1 = tf.layers.dense(self.avg_input_x1, hidden_size, activation=tf.nn.relu)
        self.h_drop1 = tf.layers.dropout(self.avg_input_x1, rate=1-self.dropout_rate_hidden)
        self.scores = tf.layers.dense(inputs=self.h_drop1, units=y_len)

        # Output head (identical to CNN's).
        if y_len == 1:
            self.predictions = tf.nn.sigmoid(self.scores, name="prediction")
            self.loss = tf.losses.mean_squared_error(self.input_z, self.predictions)
            self.mse = tf.losses.mean_squared_error(self.input_z, self.predictions)
            correct_predictions = tf.equal(tf.cast(tf.round(self.predictions), tf.int32), tf.cast(tf.round(self.input_y), tf.int32))
            self.accuracy = tf.reduce_mean(tf.cast(correct_predictions, "float"), name="accuracy")
        elif y_len == 2:
            self.loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=self.scores, labels=self.input_y))
            self.predictions = tf.slice(tf.nn.softmax(self.scores), [0, 0], [-1, 1], name="prediction")
            self.mse = tf.losses.mean_squared_error(self.input_z, self.predictions)
            correct_predictions = tf.equal(tf.argmax(self.scores, 1), tf.argmax(self.input_y, 1))
            self.accuracy = tf.reduce_mean(tf.cast(correct_predictions, "float"), name="accuracy")
        elif y_len == 4:
            self.normalised_scores = tf.nn.softmax(self.scores, name="distribution")
            self.loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=self.input_y, logits=self.scores))
            self.predictions = tf.matmul(self.normalised_scores, tf.constant([0, 0.3333333333, 0.6666666666, 1.0], shape=[4, 1]), name="prediction")
            self.mse = tf.losses.mean_squared_error(self.input_z, self.predictions)
            correct_predictions = tf.equal(tf.argmax(tf.matmul(self.normalised_scores, tf.constant([1, 0, 1, 0, 0, 1, 0, 1], shape=[4, 2], dtype=tf.float32)), 1), tf.argmax(tf.matmul(self.input_y, tf.constant([1, 0, 1, 0, 0, 1, 0, 1], shape=[4, 2], dtype=tf.float32)), 1))
            self.accuracy = tf.reduce_mean(tf.cast(correct_predictions, "float"), name="accuracy")
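# A minimal sketch, not part of the original file, illustrating the masked
# averaging that DAN performs above: padding positions are zeroed out and the
# sum is divided by the true sequence length rather than by x1_maxlen. All
# shapes and values here are made up for illustration.
def _demo_masked_average():
    import numpy as np
    emb = np.array([[[1.0, 1.0], [3.0, 3.0], [9.0, 9.0]]])  # [batch=1, maxlen=3, dim=2]; third step is padding
    lengths = np.array([2])                                 # only the first two tokens are real
    mask = (np.arange(emb.shape[1]) < lengths[:, None]).astype(np.float32)[:, :, None]  # [1, 3, 1]
    avg = (emb * mask).sum(axis=1) / mask.sum(axis=1)       # -> [[2.0, 2.0]], unaffected by the padding value
    return avg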
def extract_last(output, lengths):
    """Gather each sequence's RNN output at its last valid (non-padding) step."""
    batch_range = tf.range(tf.shape(output)[0])
    batch_idx = tf.stack([batch_range, lengths-1], axis=-1)
    return tf.gather_nd(output, batch_idx)
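# A minimal sketch, not part of the original file, showing what extract_last
# computes: for each sequence in the batch it gathers the output at the last
# valid timestep. Values are made up for illustration.
def _demo_extract_last():
    output = tf.constant([[[1.0], [2.0], [3.0]],
                          [[4.0], [5.0], [6.0]]])  # [batch=2, time=3, dim=1]
    lengths = tf.constant([2, 3])                  # true lengths per sequence
    last = extract_last(output, lengths)           # -> [[2.0], [6.0]]
    with tf.Session() as sess:
        return sess.run(last)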
class BiRNN:
    """Bidirectional GRU over the post text. The classifier reads the forward
    output at each sequence's last valid step concatenated with the backward
    output at step 0; masked mean and max pooling variants are kept below as
    commented-out alternatives."""

    def __init__(self, x1_maxlen, x2_maxlen, y_len, embedding, filter_sizes,
                 num_filters, hidden_size, state_size, x3_size):
        self.input_x1 = tf.placeholder(tf.int32, [None, x1_maxlen], name="post_text")
        self.input_x1_len = tf.placeholder(tf.int32, [None], name="post_text_len")
        self.input_x2 = tf.placeholder(tf.int32, [None, x2_maxlen], name="target_description")
        self.input_x2_len = tf.placeholder(tf.int32, [None], name="target_description_len")
        self.input_x3 = tf.placeholder(tf.float32, [None, x3_size], name="image_feature")
        self.input_y = tf.placeholder(tf.float32, [None, y_len], name="truth_class")
        self.input_z = tf.placeholder(tf.float32, [None, 1], name="truth_mean")
        self.dropout_rate_embedding = tf.placeholder(tf.float32, name="dropout_rate_embedding")
        self.dropout_rate_hidden = tf.placeholder(tf.float32, name="dropout_rate_hidden")
        self.dropout_rate_cell = tf.placeholder(tf.float32, name="dropout_rate_cell")
        self.batch_size = tf.placeholder(tf.int32, [], name="batch_size")

        self.W = tf.get_variable(shape=embedding.shape, initializer=tf.constant_initializer(embedding), name="embedding")
        self.embedded_input_x1 = tf.nn.embedding_lookup(self.W, self.input_x1)
        self.embedded_input_x1 = tf.layers.dropout(self.embedded_input_x1, rate=1-self.dropout_rate_embedding)

        # Forward and backward GRU cells with output dropout.
        cell_fw = tf.contrib.rnn.GRUCell(state_size)
        cell_dropout_fw = tf.contrib.rnn.DropoutWrapper(cell_fw, output_keep_prob=1-self.dropout_rate_cell)
        initial_state_fw = cell_fw.zero_state(self.batch_size, tf.float32)
        cell_bw = tf.contrib.rnn.GRUCell(state_size)
        cell_dropout_bw = tf.contrib.rnn.DropoutWrapper(cell_bw, output_keep_prob=1-self.dropout_rate_cell)
        initial_state_bw = cell_bw.zero_state(self.batch_size, tf.float32)
        outputs, states = tf.nn.bidirectional_dynamic_rnn(cell_fw=cell_dropout_fw, cell_bw=cell_dropout_bw, inputs=self.embedded_input_x1, sequence_length=self.input_x1_len, initial_state_bw=initial_state_bw, initial_state_fw=initial_state_fw)
        bi_outputs = tf.concat(outputs, 2)
        # The mask is only needed by the mean-pooling variant below.
        mask = tf.cast(tf.contrib.keras.backend.repeat_elements(tf.expand_dims(tf.sequence_mask(self.input_x1_len, x1_maxlen), axis=-1), 2*state_size, axis=2), tf.float32)
        # Last-step pooling: forward output at the last valid step, backward output at step 0.
        self.h_drop = tf.layers.dropout(tf.concat([extract_last(outputs[0], self.input_x1_len), outputs[1][:, 0, :]], -1), rate=1-self.dropout_rate_hidden)
        # Masked mean pooling:
        # self.h_drop = tf.layers.dropout(tf.reduce_sum(bi_outputs, axis=1)/tf.reduce_sum(mask, axis=1), rate=1-self.dropout_rate_hidden)
        # Max pooling:
        # self.h_drop = tf.layers.dropout(tf.reduce_max(bi_outputs, axis=1), rate=1-self.dropout_rate_hidden)
        self.scores = tf.layers.dense(inputs=self.h_drop, units=y_len)

        # Output head (identical to CNN's).
        if y_len == 1:
            self.predictions = tf.nn.sigmoid(self.scores, name="prediction")
            self.loss = tf.losses.mean_squared_error(self.input_z, self.predictions)
            self.mse = tf.losses.mean_squared_error(self.input_z, self.predictions)
            correct_predictions = tf.equal(tf.cast(tf.round(self.predictions), tf.int32), tf.cast(tf.round(self.input_y), tf.int32))
            self.accuracy = tf.reduce_mean(tf.cast(correct_predictions, "float"), name="accuracy")
        elif y_len == 2:
            self.loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=self.scores, labels=self.input_y))
            self.predictions = tf.slice(tf.nn.softmax(self.scores), [0, 0], [-1, 1], name="prediction")
            self.mse = tf.losses.mean_squared_error(self.input_z, self.predictions)
            correct_predictions = tf.equal(tf.argmax(self.scores, 1), tf.argmax(self.input_y, 1))
            self.accuracy = tf.reduce_mean(tf.cast(correct_predictions, "float"), name="accuracy")
        elif y_len == 4:
            self.normalised_scores = tf.nn.softmax(self.scores, name="distribution")
            self.loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=self.input_y, logits=self.scores))
            self.predictions = tf.matmul(self.normalised_scores, tf.constant([0, 0.3333333333, 0.6666666666, 1.0], shape=[4, 1]), name="prediction")
            self.mse = tf.losses.mean_squared_error(self.input_z, self.predictions)
            correct_predictions = tf.equal(tf.argmax(tf.matmul(self.normalised_scores, tf.constant([1, 0, 1, 0, 0, 1, 0, 1], shape=[4, 2], dtype=tf.float32)), 1), tf.argmax(tf.matmul(self.input_y, tf.constant([1, 0, 1, 0, 0, 1, 0, 1], shape=[4, 2], dtype=tf.float32)), 1))
            self.accuracy = tf.reduce_mean(tf.cast(correct_predictions, "float"), name="accuracy")
class SAN:
    """Self-attention network: a bidirectional GRU with a (multi-view)
    self-attention layer over its outputs. With view_size > 1, a redundancy
    penalty ||A^T A - I||_F^2 in the style of Lin et al. (2017) is added to
    the loss, weighted by beta."""

    def __init__(self, x1_maxlen, x2_maxlen, y_len, embedding, filter_sizes,
                 num_filters, hidden_size, state_size, x3_size, attention_size,
                 view_size=1, alpha=0, beta=0):
        # A single attention view cannot be redundant, so disable the penalty.
        # (alpha is accepted but currently unused.)
        if view_size == 1:
            beta = 0
        self.input_x1 = tf.placeholder(tf.int32, [None, x1_maxlen], name="post_text")
        self.input_x1_len = tf.placeholder(tf.int32, [None], name="post_text_len")
        self.input_x2 = tf.placeholder(tf.int32, [None, x2_maxlen], name="target_description")
        self.input_x2_len = tf.placeholder(tf.int32, [None], name="target_description_len")
        self.input_x3 = tf.placeholder(tf.float32, [None, x3_size], name="image_feature")
        self.input_y = tf.placeholder(tf.float32, [None, y_len], name="truth_class")
        self.input_z = tf.placeholder(tf.float32, [None, 1], name="truth_mean")
        self.dropout_rate_embedding = tf.placeholder(tf.float32, name="dropout_rate_embedding")
        self.dropout_rate_hidden = tf.placeholder(tf.float32, name="dropout_rate_hidden")
        self.dropout_rate_cell = tf.placeholder(tf.float32, name="dropout_rate_cell")
        self.batch_size = tf.placeholder(tf.int32, [], name="batch_size")

        with tf.variable_scope("embedding"):
            self.W = tf.get_variable(shape=embedding.shape, initializer=tf.constant_initializer(embedding), name="embedding")
            self.embedded_input_x1 = tf.nn.embedding_lookup(self.W, self.input_x1)
            self.embedded_input_x1 = tf.layers.dropout(self.embedded_input_x1, rate=1-self.dropout_rate_embedding)
        with tf.variable_scope("biRNN"):
            cell_fw = tf.contrib.rnn.GRUCell(state_size)
            cell_dropout_fw = tf.contrib.rnn.DropoutWrapper(cell_fw, output_keep_prob=1-self.dropout_rate_cell)
            initial_state_fw = cell_fw.zero_state(self.batch_size, tf.float32)
            cell_bw = tf.contrib.rnn.GRUCell(state_size)
            cell_dropout_bw = tf.contrib.rnn.DropoutWrapper(cell_bw, output_keep_prob=1-self.dropout_rate_cell)
            initial_state_bw = cell_bw.zero_state(self.batch_size, tf.float32)
            outputs, states = tf.nn.bidirectional_dynamic_rnn(cell_fw=cell_dropout_fw, cell_bw=cell_dropout_bw, inputs=self.embedded_input_x1, sequence_length=self.input_x1_len, initial_state_bw=initial_state_bw, initial_state_fw=initial_state_fw)
            bi_outputs = tf.concat(outputs, 2)
        with tf.variable_scope("attention"):
            W_1 = tf.get_variable(shape=[2*state_size, attention_size], initializer=tf.contrib.layers.xavier_initializer(), name="W_1")
            W_2 = tf.get_variable(shape=[attention_size, view_size], initializer=tf.contrib.layers.xavier_initializer(), name="W_2")
            reshaped_bi_outputs = tf.reshape(bi_outputs, shape=[-1, 2*state_size])
            if x3_size:
                # self.compressed_input_x3 = tf.contrib.keras.backend.repeat(tf.layers.dense(tf.layers.dense(self.input_x3, 1024, activation=tf.nn.tanh), attention_size, activation=tf.nn.tanh), x1_maxlen)
                # Project the image feature into the attention space and add it
                # to every timestep before scoring.
                self.compressed_input_x3 = tf.contrib.keras.backend.repeat(tf.layers.dense(self.input_x3, attention_size, activation=tf.nn.tanh), x1_maxlen)
                self.compressed_input_x3 = tf.reshape(self.compressed_input_x3, shape=[-1, attention_size])
                self.attention = tf.nn.softmax(tf.reshape(tf.matmul(tf.nn.tanh(tf.matmul(reshaped_bi_outputs, W_1)+self.compressed_input_x3), W_2), shape=[self.batch_size, x1_maxlen, view_size]), dim=1)
            else:
                self.attention = tf.nn.softmax(tf.reshape(tf.matmul(tf.nn.tanh(tf.matmul(reshaped_bi_outputs, W_1)), W_2), shape=[self.batch_size, x1_maxlen, view_size]), dim=1)
            # Attention-weighted sum of the biRNN outputs: one 2*state_size vector per view, flattened.
            attention_output = tf.reshape(tf.matmul(tf.transpose(bi_outputs, perm=[0, 2, 1]), self.attention), shape=[self.batch_size, view_size*2*state_size])
        with tf.variable_scope("penalty"):
            # Squared Frobenius norm of (A^T A - I), discouraging redundant attention views.
            attention_t = tf.transpose(self.attention, perm=[0, 2, 1])
            attention_t_attention = tf.matmul(attention_t, self.attention)
            identity = tf.reshape(tf.tile(tf.diag(tf.ones([view_size])), [self.batch_size, 1]), shape=[self.batch_size, view_size, view_size])
            self.penalised_term = tf.square(tf.norm(attention_t_attention-identity, ord="euclidean", axis=[1, 2]))
        self.h_drop = tf.layers.dropout(attention_output, rate=1-self.dropout_rate_hidden)
        self.scores = tf.layers.dense(inputs=self.h_drop, units=y_len)

        # Output head (as in CNN, plus the beta-weighted attention penalty).
        if y_len == 1:
            self.predictions = tf.nn.sigmoid(self.scores, name="prediction")
            self.loss = tf.reduce_mean(tf.square(tf.subtract(self.input_z, self.predictions))+beta*self.penalised_term)
            self.mse = tf.losses.mean_squared_error(self.input_z, self.predictions)
            correct_predictions = tf.equal(tf.cast(tf.round(self.predictions), tf.int32), tf.cast(tf.round(self.input_y), tf.int32))
            self.accuracy = tf.reduce_mean(tf.cast(correct_predictions, "float"), name="accuracy")
        elif y_len == 2:
            self.loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=self.scores, labels=self.input_y)+beta*self.penalised_term)
            self.predictions = tf.slice(tf.nn.softmax(self.scores), [0, 0], [-1, 1], name="prediction")
            self.mse = tf.losses.mean_squared_error(self.input_z, self.predictions)
            correct_predictions = tf.equal(tf.argmax(self.scores, 1), tf.argmax(self.input_y, 1))
            self.accuracy = tf.reduce_mean(tf.cast(correct_predictions, "float"), name="accuracy")
        elif y_len == 4:
            self.normalised_scores = tf.nn.softmax(self.scores, name="distribution")
            self.loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=self.input_y, logits=self.scores)+beta*self.penalised_term)
            self.predictions = tf.matmul(self.normalised_scores, tf.constant([0, 0.3333333333, 0.6666666666, 1.0], shape=[4, 1]), name="prediction")
            self.mse = tf.losses.mean_squared_error(self.input_z, self.predictions)
            correct_predictions = tf.equal(tf.argmax(tf.matmul(self.normalised_scores, tf.constant([1, 0, 1, 0, 0, 1, 0, 1], shape=[4, 2], dtype=tf.float32)), 1), tf.argmax(tf.matmul(self.input_y, tf.constant([1, 0, 1, 0, 0, 1, 0, 1], shape=[4, 2], dtype=tf.float32)), 1))
            self.accuracy = tf.reduce_mean(tf.cast(correct_predictions, "float"), name="accuracy")
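# A minimal smoke-test sketch, not part of the original file: it builds the CNN
# graph with a made-up toy embedding matrix and hyperparameters, then runs one
# forward pass on random data. Every shape and value below is an assumption for
# illustration only.
if __name__ == "__main__":
    import numpy as np
    toy_embedding = np.random.uniform(-0.1, 0.1, size=(100, 50)).astype(np.float32)  # assumed vocab 100, dim 50
    model = CNN(x1_maxlen=20, x2_maxlen=30, y_len=2, embedding=toy_embedding,
                filter_sizes=[3, 4, 5], num_filters=8, hidden_size=32,
                state_size=16, x3_size=0)
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        feed = {
            model.input_x1: np.random.randint(0, 100, size=(4, 20)),
            model.input_x1_len: np.random.randint(1, 21, size=(4,)),
            model.input_y: np.eye(2, dtype=np.float32)[np.random.randint(0, 2, size=4)],
            model.input_z: np.random.uniform(size=(4, 1)).astype(np.float32),
            model.dropout_rate_embedding: 1.0,  # rate = 1 - r, so 1.0 means "drop nothing"
            model.dropout_rate_hidden: 1.0,
        }
        print(sess.run([model.loss, model.accuracy], feed_dict=feed))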