-
Notifications
You must be signed in to change notification settings - Fork 0
/
recommender.py
170 lines (158 loc) · 7.07 KB
/
recommender.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
#!/usr/bin/python3
# -*-coding: utf-8 -*-
"""
Make sure to run feature_extractor.py and model_clusters.py
(if result_clusters.pickle is not exists) before proceed
"""
import json
from heapq import heappush, nsmallest
import os
import pickle
import numpy
import psycopg2
from feature_extractor import FeatureExtractorW2V
# Module-level Postgres connection; credentials are hard-coded for a local
# dev database. NOTE(review): consider moving these to config/env vars.
con = psycopg2.connect(
    user="postgres", host="localhost", dbname="olx_data", password="postgres")
con.autocommit = True  # apply every statement immediately (no explicit COMMIT)
cur = con.cursor()  # shared cursor used by the Recommender class below
class Recommender(object):
    """
    Predicts items similar to a given item.

    Loads pre-computed cluster assignments and centroids from
    result_clusters.pickle plus the feature-extraction artefacts, then
    recommends by collecting candidates from the clusters whose centroids
    are closest to an item's feature vector.
    """

    def __init__(self, top_clusters, top_recommendations):
        """
        :param top_clusters: number of closest clusters to draw
            candidates from
        :param top_recommendations: number of recommendations to print
            per item
        """
        base_path = os.path.dirname(os.path.realpath(__file__))
        path_to_result_clusters = os.path.join(
            base_path, "result_clusters.pickle")
        # The pickle file holds three consecutive objects: the merged
        # clusters, a second object this class does not need, and the
        # cluster centroids.
        with open(path_to_result_clusters, "rb") as fin:
            self.merged_clusters = pickle.load(fin)
            _ = pickle.load(fin)  # skip the unused middle object
            self.centroids = pickle.load(fin)
        self.top_clusters = top_clusters
        self.top_recommendations = top_recommendations
        self.feature_extractor = FeatureExtractorW2V()
        with open(self.feature_extractor.path_to_ngrams, "r") as fin:
            self.frequently_n_grams = json.load(fin)
        with open(self.feature_extractor.path_to_word_tfidf, "rb") as fin:
            self.word_tfidf = pickle.load(fin)
        self.nlp_model = self.feature_extractor.load_model_from_pickle()

    @staticmethod
    def predict_cluster(feature, centroids, n_closest):
        """
        Find the n closest clusters given an item feature vector.

        :param feature: input feature vector
        :param centroids: centroids of the clusters model
        :param n_closest: how many closest clusters to return
        :return: list of (distance, cluster_index) tuples, ascending
            by distance, at most n_closest long
        """
        # Hoist the conversion out of the loop; the feature is constant.
        feature_vector = numpy.array(feature)
        distances = [
            (numpy.linalg.norm(centroid - feature_vector), cluster_index)
            for cluster_index, centroid in enumerate(centroids)]
        # nsmallest accepts any iterable -- no need to heappush first.
        return nsmallest(n_closest, distances)

    @staticmethod
    def get_and_print_item_info(item_id, info_type):
        """
        Print title, description and category for an item id.

        :param item_id: input item id
        :param info_type: label describing the item's role (recommended,
            input, recommended base line, etc)
        """
        cur.execute(
            "SELECT listing_title, listing_description, "
            "category_l3_name_en FROM samples_train WHERE item_id=%s;",
            (item_id,))
        res = cur.fetchone()
        if res is None:
            # Unknown item id: report it instead of crashing on unpack.
            print("%s info: %s <item not found>" % (info_type, item_id))
            return
        recommended_title, recommended_description, recommended_category = res
        print(
            "%s info: %s %s %s %s" %
            (info_type, item_id, recommended_title, recommended_description,
             recommended_category))

    def get_recommended_candidates_from_item(self, item_id, dataset_name):
        """
        Recommend items for one new item id.

        :param item_id: input item_id
        :param dataset_name: table name holding the item information;
            interpolated into the SQL, so it must be a trusted
            identifier, never user input
        :return: recommended items list with (distance, item_id)
        """
        # Table names cannot be bound parameters, hence the %% escape for
        # the real item_id placeholder.
        query = "SELECT listing_title, listing_description, listing_price, " \
            "category_sk, category_l1_name_en, category_l2_name_en, " \
            "category_l3_name_en, listing_latitude, listing_longitude " \
            "FROM %s WHERE item_id=%%s;" % dataset_name
        cur.execute(query, (item_id,))
        item_info = cur.fetchone()
        vector_feature = self.feature_extractor.extract_features_per_item_info(
            item_info, self.frequently_n_grams, self.nlp_model, self.word_tfidf)
        # Index 1 of the extractor's result is the feature vector
        # (index 0 presumably an identifier) -- TODO confirm against
        # feature_extractor.extract_features_per_item_info.
        return self.get_recommended_candidates(vector_feature[1])

    def get_recommended_candidates_base_line(self, category):
        """
        Base line recommendation: return n random items from the same
        category as the input category.

        :param category: input category (category_l3_name_en)
        :return: list of (None, item_id) tuples, at most
            self.top_recommendations long; the distance slot is None so
            the shape mirrors get_recommended_candidates
        """
        # Random OFFSET into the category's rows gives a cheap random
        # sample without ORDER BY RANDOM() over the whole table.
        cur.execute(
            "SELECT item_id FROM samples_train WHERE category_l3_name_en=%s"
            " OFFSET RANDOM() * (SELECT COUNT(*) FROM "
            "samples_train WHERE category_l3_name_en=%s) LIMIT %s;",
            (category, category, self.top_recommendations))
        res = cur.fetchmany(self.top_recommendations)
        return [(None, item_id) for (item_id,) in res]

    def get_recommended_candidates(self, vector_feature):
        """
        Recommendation: given a vector feature return items from the top
        n closest clusters as candidates to recommend.

        :param vector_feature: input vector feature
        :return: list of (distance, item_id) candidates from the n
            closest clusters (n is self.top_clusters)
        """
        n_closest_clusters = self.predict_cluster(
            vector_feature, self.centroids, self.top_clusters)
        query_vector = numpy.array(vector_feature)
        candidates_to_recommend = []
        for _distance, cluster_index in n_closest_clusters:
            for candidate in self.merged_clusters[cluster_index]:
                # Bound parameter instead of interpolating the id into
                # the SQL string (the original used %-formatting here,
                # unlike every other query in this module).
                cur.execute(
                    "SELECT vector_feature FROM samples_train "
                    "WHERE item_id=%s;", (candidate,))
                res_candidate = cur.fetchone()
                vector_feature_candidate, = res_candidate
                candidates_to_recommend.append(
                    (numpy.linalg.norm(
                        query_vector -
                        numpy.array(vector_feature_candidate)),
                     candidate))
        return candidates_to_recommend

    def get_recommended_candidates_for_test_data(self):
        """
        Print recommendations and base line recommendations for every
        item in the samples_test table.
        """
        db_query = "SELECT item_id, listing_title, listing_description, " \
            "category_l3_name_en, vector_feature FROM samples_test;"
        cur.execute(db_query)
        res = cur.fetchall()
        for row in res:
            item_id, title, description, category, vector_feature = row
            candidates_to_recommend = self.get_recommended_candidates(
                vector_feature)
            candidates_to_recommend_base_line = \
                self.get_recommended_candidates_base_line(category)
            print("***********************************************************"
                  "***********************************************************")
            print("Item info: %s %s %s %s" %
                  (item_id, title, description, category))
            # Candidates are already (distance, id); print only the top n.
            for _, recomended_id in candidates_to_recommend[
                    :self.top_recommendations]:
                self.get_and_print_item_info(recomended_id, "Recommeded")
            for _, recomended_id_base_line in candidates_to_recommend_base_line:
                self.get_and_print_item_info(
                    recomended_id_base_line, "Recommeded base line")
if __name__ == "__main__":
    # Demo run: use the 5 closest clusters and print 5 recommendations.
    recommender = Recommender(5, 5)
    # Print recommendations (plus the random base line) for every test item.
    recommender.get_recommended_candidates_for_test_data()
    # Also compute candidates for one known train item; result is a list of
    # (distance, item_id) tuples and is currently not printed.
    result = recommender.get_recommended_candidates_from_item(
        1, "samples_train")