"""
This file is part of the computer assignments for the course DD2418 Language engineering at KTH.
Created 2018 by Dmytro Kalpakchi and Johan Boye.
"""
import os
import codecs
import argparse
import time
import string
import numpy as np
from halo import Halo
from sklearn.neighbors import NearestNeighbors


##
## @brief Class for creating word vectors using Random Indexing technique.
## @author Dmytro Kalpakchi <dmytroka@kth.se>
## @date November 2018
##
class RandomIndexing(object):
    ##
    ## @brief Object initializer. Initializes the Random Indexing algorithm
    ##        with the necessary hyperparameters and the text files that
    ##        will serve as corpora for generating word vectors.
    ##
    ## The `self.__vocab` instance variable is initialized as a Python set. If you're unfamiliar with sets, please
    ## follow this link to find out more: https://docs.python.org/3/tutorial/datastructures.html#sets.
    ##
    ## @param self               The RI object itself (omitted in the descriptions of other functions)
    ## @param filenames          The filenames of the text files (7 Harry
    ##                           Potter books) that will serve as corpora
    ##                           for generating word vectors. Stored in the
    ##                           instance variable self.__sources.
    ## @param dimension          The dimension of the word vectors (both
    ##                           context and random). Stored in the
    ##                           instance variable self.__dim.
    ## @param non_zero           The number of non-zero elements in a
    ##                           random word vector. Stored in the
    ##                           instance variable self.__non_zero.
    ## @param non_zero_values    The possible values of the non-zero elements
    ##                           used when initializing a random word vector.
    ##                           Stored in the instance variable self.__non_zero_values.
    ## @param left_window_size   The left window size. Stored in the
    ##                           instance variable self.__lws.
    ## @param right_window_size  The right window size. Stored in the
    ##                           instance variable self.__rws.
    ##
    def __init__(self, filenames, dimension=2000, non_zero=100, non_zero_values=list([-1, 1]), left_window_size=3, right_window_size=3):
        self.__sources = filenames
        self.__vocab = set()
        self.__dim = dimension
        self.__non_zero = non_zero
        # there is a list call in non_zero_values just for Doxygen documentation purposes
        # otherwise, it gets documented as "[-1,"
        self.__non_zero_values = non_zero_values
        self.__lws = left_window_size
        self.__rws = right_window_size
        self.__cv = dict()
        self.__rv = dict()
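    # Usage sketch (hypothetical filenames): the constructor only stores the
    # hyperparameters and corpus paths; no file is read until text_gen() is used.
    #   ri = RandomIndexing(['hp1.txt', 'hp2.txt'], dimension=2000, non_zero=100,
    #                       left_window_size=3, right_window_size=3)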
    # NEW for search engine
    def get_files(self):
        return self.__sources
    ##
    ## @brief A function cleaning the line from punctuation and digits
    ##
    ## The function takes a line from the text file as a string,
    ## removes all the punctuation and digits from it and returns
    ## all words in the cleaned line.
    ##
    ## @param line The line of the text file to be cleaned
    ##
    ## @return A list of words in a cleaned line
    ##
    def clean_line(self, line):
        # YOUR CODE HERE
        cleaned = []
        # split on any whitespace (not only single spaces) so that tabs and
        # multiple spaces do not glue neighbouring words together
        words = line.split()
        for w in words:
            # keep only the alphabetic characters of each token
            clean_w = [c for c in w if c.isalpha()]
            if len(clean_w) > 0:
                cleaned.append(''.join(clean_w))
        return cleaned
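    # Example (hypothetical input): clean_line("He said: 'Run for it!' 3 times.")
    # returns ['He', 'said', 'Run', 'for', 'it', 'times'] -- punctuation and digits
    # are stripped, and tokens that end up empty are dropped.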
    ##
    ## @brief A generator function providing one cleaned line at a time
    ##
    ## This function reads every file from the source files line by
    ## line and returns a special kind of iterator, called a
    ## generator, yielding one cleaned line at a time.
    ##
    ## If you are unfamiliar with Python's generators, please read
    ## more following these links:
    ## - https://docs.python.org/3/howto/functional.html#generators
    ## - https://wiki.python.org/moin/Generators
    ##
    ## @return A generator yielding one cleaned line at a time
    ##
    def text_gen(self):
        for fname in self.__sources:
            with open(fname, encoding='utf8', errors='ignore') as f:
                for line in f:
                    yield self.clean_line(line)
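    # Usage sketch (assuming `ri` is a RandomIndexing instance): the generator is
    # consumed lazily, one cleaned line at a time, so the whole corpus never has
    # to be held in memory:
    #   for tokens in ri.text_gen():
    #       ...  # tokens is a list of cleaned words from one line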
    ##
    ## @brief Build the vocabulary of words from the provided text files.
    ##
    ## Goes through all the cleaned lines and adds each word of the
    ## line to a vocabulary stored in the variable `self.__vocab`. The
    ## words stored in the vocabulary should be unique.
    ##
    ## **Note**: this function is where the first pass through all files is made
    ##           (using the `text_gen` function)
    ##
    def build_vocabulary(self):
        # YOUR CODE HERE
        for line in self.text_gen():
            for word in line:
                self.__vocab.add(word)  # __vocab is a set, so duplicates are ignored
        self.write_vocabulary()
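    # Note: build_vocabulary() also persists the vocabulary to 'vocab.txt' (one
    # word per line) via write_vocabulary(), so later runs can read it back with
    # read_vocabulary() instead of re-scanning the corpus (see train() below).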
    ##
    ## @brief Get the size of the vocabulary
    ##
    ## @return The size of the vocabulary
    ##
    @property
    def vocabulary_size(self):
        return len(self.__vocab)
    ##
    ## @brief Creates word embeddings using Random Indexing.
    ##
    ## The function stores the created word embeddings (or so-called context vectors) in `self.__cv`.
    ## Random vectors used to create word embeddings are stored in `self.__rv`.
    ##
    ## Context vectors are created by looping through each cleaned line and updating the context
    ## vectors following the Random Indexing approach, i.e. using the words in the sliding window.
    ## The size of the sliding window is governed by two instance variables, `self.__lws` (left window size)
    ## and `self.__rws` (right window size).
    ##
    ## For instance, let's consider a sentence:
    ##     I really like programming assignments.
    ## Let's assume that the left part of the sliding window has size 1 (`self.__lws` = 1) and the right
    ## part has size 2 (`self.__rws` = 2). Then, the sliding windows will be constructed as follows:
    ## \verbatim
    ## I really like programming assignments.
    ## ^ r      r
    ## I really like programming assignments.
    ## l ^      r    r
    ## I really like programming assignments.
    ##   l      ^    r           r
    ## I really like programming assignments.
    ##          l    ^           r
    ## I really like programming assignments.
    ##               l           ^
    ## \endverbatim
    ## where "^" denotes the word we're currently at, "l" denotes the words in the left part of the
    ## sliding window and "r" denotes the words in the right part of the sliding window.
    ##
    ## Implementation tips:
    ## - make sure to understand how generators work! Refer to the documentation of the `text_gen` function
    ##   for more description.
    ## - the easiest way is to make `self.__cv` and `self.__rv` dictionaries with keys being words (as strings)
    ##   and values being the context vectors.
    ##
    ## **Note**: this function is where the second pass through all files is made (using the `text_gen` function).
    ##           The first one was done when calling the `build_vocabulary` function. This might not be the most
    ##           time-efficient solution, but it is quite memory-efficient, given that we are using generators,
    ##           which are lazily evaluated, instead of keeping all the cleaned lines in memory as a gigantic list.
    ##
    def create_word_vectors(self):
        # YOUR CODE HERE
        # Create a sparse random vector and a zero context vector for every word
        for w in self.__vocab:
            rv = np.zeros(self.__dim, dtype=int)
            # pick the positions of the non-zero elements ...
            positions = np.random.choice(self.__dim, self.__non_zero, replace=False)
            # ... and fill them with values drawn from self.__non_zero_values
            rv[positions] = np.random.choice(self.__non_zero_values, self.__non_zero)
            self.__rv[w] = rv
            self.__cv[w] = np.zeros(self.__dim, dtype=int)
        # Random Indexing: for every focus word, add the random vectors of the
        # words inside the sliding window to its context vector
        for curr_line in self.text_gen():
            for i in range(len(curr_line)):
                # left part of the window
                for j in range(max(i - self.__lws, 0), i):
                    self.__cv[curr_line[i]] += self.__rv[curr_line[j]]
                # right part of the window
                for j in range(i + 1, min(i + 1 + self.__rws, len(curr_line))):
                    self.__cv[curr_line[i]] += self.__rv[curr_line[j]]
        print('Word vectors created.')
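    # Illustration (hypothetical values): for the cleaned line ['I', 'really', 'like']
    # with self.__lws = self.__rws = 1, the update at the focus word 'really' (i = 1) is
    #   cv['really'] += rv['I'] + rv['like']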
    ##
    ## @brief Function returning k nearest neighbors with distances for each word in `words`
    ##
    ## We suggest using the nearest neighbors implementation from scikit-learn
    ## (https://scikit-learn.org/stable/modules/generated/sklearn.neighbors.NearestNeighbors.html). Check
    ## their documentation carefully regarding the parameters passed to the algorithm.
    ##
    ## To describe how the function operates, imagine you want to find 5 nearest neighbors for the words
    ## "Harry" and "Potter" using cosine distance (which can be computed as 1 - cosine similarity).
    ## For that you would need to call `self.find_nearest(["Harry", "Potter"], k=5, metric='cosine')`.
    ## The output of the function would then be the following list of lists of tuples (LLT)
    ## (all words and distances are just example values):
    ## \verbatim
    ## [[('Harry', 0.0), ('Hagrid', 0.07), ('Snape', 0.08), ('Dumbledore', 0.08), ('Hermione', 0.09)],
    ##  [('Potter', 0.0), ('quickly', 0.21), ('asked', 0.22), ('lied', 0.23), ('okay', 0.24)]]
    ## \endverbatim
    ## The i-th element of the LLT corresponds to the k nearest neighbors of the i-th word in the `words`
    ## list, provided as an argument. Each tuple contains a word and a similarity/distance metric.
    ## The tuples are sorted either by descending similarity or by ascending distance.
    ##
    ## @param words  A list of words, for which the nearest neighbors should be returned
    ## @param k      A number of nearest neighbors to be returned
    ## @param metric A similarity/distance metric to be used (defaults to cosine distance)
    ##
    ## @return A list of lists of tuples in the format specified in the function description
    ##
    def find_nearest(self, words, k=5, metric='cosine'):
        # YOUR CODE HERE
        # Training: fit a nearest-neighbor index on all context vectors
        knn = NearestNeighbors(n_neighbors=k, metric=metric)
        X = np.zeros((len(self.__cv), self.__dim))
        vocab_idx = []
        for w, v in self.__cv.items():
            X[len(vocab_idx)] = v
            vocab_idx.append(w)
        knn.fit(X)
        # Querying: keep only the words that actually have a context vector
        known_words = []
        for w in words:
            if self.get_word_vector(w) is not None:
                known_words.append(w)
            # else:
            #     print(f'{w} not present in vocabulary.')
        if len(known_words) == 0:
            return None
        Q = np.zeros((len(known_words), self.__dim))
        for i, w in enumerate(known_words):
            Q[i] = self.__cv[w]
        dist, indices = knn.kneighbors(Q)
        # Formatting the results as a list of lists of (word, distance) tuples
        results = []
        for i in range(len(known_words)):
            neighbors = []
            for j in range(k):
                neighbors.append((vocab_idx[indices[i][j]], round(dist[i][j], 3)))
            results.append(neighbors)
        return results
    ##
    ## @brief Returns a vector for the word obtained after Random Indexing is finished
    ##
    ## @param word The word as a string
    ##
    ## @return The word vector if the word exists in the vocabulary and None otherwise.
    ##
    def get_word_vector(self, word):
        # YOUR CODE HERE
        return self.__cv[word] if word in self.__cv else None
    ##
    ## @brief Checks if the vocabulary is written as a text file
    ##
    ## @return True if the vocabulary file is written and False otherwise
    ##
    def vocab_exists(self):
        return os.path.exists('vocab.txt')
    ##
    ## @brief Reads a vocabulary from a text file having one word per line.
    ##
    ## @return True if the vocabulary was read from the file and False otherwise
    ##         (note that exception handling in case the reading fails is not implemented)
    ##
    def read_vocabulary(self):
        vocab_exists = self.vocab_exists()
        if vocab_exists:
            with open('vocab.txt') as f:
                for line in f:
                    self.__vocab.add(line.strip())
        self.__i2w = list(self.__vocab)
        return vocab_exists
    ##
    ## @brief Writes a vocabulary as a text file containing one word from the vocabulary per row.
    ##
    def write_vocabulary(self):
        with open('vocab.txt', 'w') as f:
            for w in self.__vocab:
                f.write('{}\n'.format(w))
    ##
    ## @brief Main function call to train word embeddings
    ##
    ## If a vocabulary file exists, it reads the vocabulary from the file (to speed up the program),
    ## otherwise, it builds a vocabulary by reading and cleaning all the Harry Potter books and
    ## storing the unique words.
    ##
    ## After the vocabulary is created/read, the word embeddings are created using Random Indexing.
    ##
    def train(self):
        spinner = Halo(spinner='arrow3')
        if self.vocab_exists():
            spinner.start(text="Reading vocabulary...")
            start = time.time()
            self.read_vocabulary()
            spinner.succeed(text="Read vocabulary in {}s. Size: {} words".format(round(time.time() - start, 2), self.vocabulary_size))
        else:
            spinner.start(text="Building vocabulary...")
            start = time.time()
            self.build_vocabulary()
            spinner.succeed(text="Built vocabulary in {}s. Size: {} words".format(round(time.time() - start, 2), self.vocabulary_size))
        spinner.start(text="Creating vectors using random indexing...")
        start = time.time()
        self.create_word_vectors()
        spinner.succeed("Created random indexing vectors in {}s.".format(round(time.time() - start, 2)))
        spinner.succeed(text="Execution is finished! Please enter words of interest (separated by space):")
    # NEW
    def write_to_file(self):
        try:
            print('Writing word vectors to file...')
            with open('ri.txt', 'w') as f:
                for w, v in self.__cv.items():
                    norm = np.linalg.norm(v)
                    # avoid division by zero; a zero vector simply stays a zero vector
                    norm_v = v / (norm if norm > 0 else 1)
                    f.write(w + ' ' + ' '.join(map(str, norm_v)) + '\n')
            print('Done.')
        except Exception as e:
            print("Error: failed to write the model to file: {}".format(e))
    ##
    ## @brief Trains word embeddings and, if `persist` is True, enters an interactive
    ##        loop where you can enter words and get a list of the k nearest neighbours.
    ##
    def train_and_persist(self, persist=False):
        self.train()
        self.write_to_file()
        if persist:
            print("PRESS q FOR EXIT")
            text = input('> ')
            while text != 'q':
                words = text.split()
                neighbors = self.find_nearest(words)
                if neighbors is None:
                    print("None of the entered words are in the vocabulary.")
                else:
                    for w, n in zip(words, neighbors):
                        print("Neighbors for {}: {}".format(w, n))
                text = input('> ')
    # NEW for search engine
    def load(self, vector_file):
        counter = 0
        with codecs.open(vector_file, 'r', 'utf-8') as f:
            for line in f:
                counter += 1
                word = line[:line.find(' ')]
                vec = np.asarray([float(x) for x in line.split()[1:]])
                self.__cv[word] = vec
                # keep the dimensionality consistent with the loaded vectors,
                # so that find_nearest() can be used after load()
                self.__dim = len(vec)
        print(f'{counter} word vectors loaded.')


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Random Indexing word embeddings')
    parser.add_argument('-fv', '--force-vocabulary', action='store_true', help='regenerate vocabulary')
    parser.add_argument('-c', '--cleaning', action='store_true', default=False,
                        help='only clean example.txt and write the result to the cleaned output file')
    parser.add_argument('-co', '--cleaned_output', default='cleaned_example.txt', help='Output file name for the cleaned text')
    args = parser.parse_args()

    if args.force_vocabulary and os.path.exists('vocab.txt'):
        os.remove('vocab.txt')

    if args.cleaning:
        ri = RandomIndexing(['example.txt'])
        with open(args.cleaned_output, 'w') as f:
            for part in ri.text_gen():
                f.write("{}\n".format(" ".join(part)))
    else:
        # dir_name = "data"
        # filenames = [os.path.join(dir_name, fn) for fn in os.listdir(dir_name)]
        filenames = ['podcast_text.txt']
        ri = RandomIndexing(filenames, dimension=200, non_zero=10)

        # TESTING
        # ri.build_vocabulary()
        # print(f'Vocabulary: {ri.vocabulary_size} unique words.')
        # ri.create_word_vectors()
        # neighbors = ri.find_nearest(['Harry', 'Potter'], k=10)
        # if neighbors is not None:
        #     for word in neighbors:
        #         print(word)

        # train, write the vectors to 'ri.txt' and enter the interactive neighbour lookup
        ri.train_and_persist(persist=True)
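
# Usage sketch for the search-engine path (assumes 'ri.txt' was produced by
# write_to_file() in an earlier run; the query words are only illustrative):
#   ri = RandomIndexing([])
#   ri.load('ri.txt')
#   print(ri.find_nearest(['harry', 'potter'], k=5))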