amazon_scrape.py
from time import sleep
from lxml import html
import requests
import dataset
import csv
from threading import Thread
from queue import Queue
import sys
import random
import logging
logging.basicConfig(
    format='%(asctime)s %(name)s %(levelname)s %(threadName)s %(message)s',
    level=logging.INFO
)
# Number of workers
THREADS = 30
# List of subdomains to use with gmodules.
PROXY_SUB = [
    'hosting', 'ig', '0.blogger', '1.blogger',
    'blogger', '0.open', '1.open', '2.open'
]
# Gmodules proxy URL; the cache time can be set with the *refresh* param.
# Disguises the request as a Google bot by routing it through an actual Google server.
# *** do not tell google that ***
PROXY = 'gmodules.com/gadgets/proxy?refresh=0&' \
    'container=gplus&gadgets=http%3A%2F%2Forkut.com%2Fimg.xml&url='
# Amazon URL
URL = 'http://www.amazon.com/dp/'
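# Illustrative sketch only (an addition, never called by the script): how a worker
# assembles the final request URL from a random gmodules subdomain, the proxy prefix
# and an ASIN. 'B00EXAMPLE' is a made-up placeholder, not a real product.
def example_proxied_url(identifier='B00EXAMPLE'):
    subdomain = 'http://www.{}.'.format(random.choice(PROXY_SUB))
    return subdomain + PROXY + URL + identifier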
# Strip a list of str, encode as UTF-8 and drop empty entries.
def strip(str_list):
    str_list = [x.encode('utf-8') for x in str_list]
    return list(filter(None, map(lambda x: x.strip(), str_list)))
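# Illustrative example of strip() (made-up input, not executed anywhere):
#   strip([' Blue Widget ', '', '\n'])  ->  [b'Blue Widget']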
# Worker
def fetch():
    # One DB connection per thread,
    # because I had too many issues sharing one instance between threads.
    db_thread = dataset.connect('sqlite:///dataset.db')
    while True:
        # q.get() blocks until an identifier is available,
        # so the worker does not busy-wait on an empty queue.
        identifier = q.get()
        # Randomly select a subdomain for gmodules.
        subdomain = 'http://www.{}.'.format(random.choice(PROXY_SUB))
        page = requests.get(subdomain + PROXY + URL + identifier)
        try:
            doc = html.fromstring(page.content)
            # Product title
            name = strip(doc.xpath('//h1[@id="title"]//text()'))
            # List of the product's category tree
            category = strip(
                doc.xpath(
                    '//a[@class="a-link-normal a-color-tertiary"]//text()'
                )
            )
            # List of features of the product
            features = strip(
                doc.xpath(
                    '//div[@id="feature-bullets"]//'
                    'span[@class="a-list-item"]/text()'
                )
            )
            # Description or editorial review of the product
            description = strip(
                doc.xpath('//div[@id="productDescription"]//p/text()')
            )
            # Grab the page title to detect whether
            # Amazon classified this request as a bot.
            page_title = strip(
                doc.xpath('//title/text()')
            )
            # Transform all lists into byte strings.
            name = b' '.join(name) if name else b''
            category = b' > '.join(category) if category else b''
            features = b'. '.join(features) if features else b''
            description = b' '.join(description) if description else b''
            page_title = b''.join(page_title) if page_title else b''
            data = {
                'identifier': identifier,
                'name': name,
                'category': category,
                'features': features,
                'description': description,
                'status': str(page.status_code)
            }
            # Only add to the database if the product has at least one of the
            # attributes category, features, description or name,
            # OR if the request status code is 404
            # and the title is not "Robot Check".
            if any(
                [category, features, description, name]
            ) or (
                data.get('status') == '404' and page_title != b'Robot Check'
            ):
                # Not using a transaction because of
                # issues I ran into using SQLite on disk.
                db_thread['products'].insert(data)
                logging.info('done fetching for {} - {} {}'.format(
                    URL + identifier, data.get('status'), page_title)
                )
            else:
                # Amazon knows you are a bot.
                # logging.info('done fetching for {} - {} {}'.format(
                #     URL + identifier, '302', page_title)
                # )
                # The identifier could be put back into the queue,
                # but the denial rate is higher than the task consumption.
                # q.put(identifier)
                pass
        except Exception as e:
            logging.exception(e)
        finally:
            # Always mark the task as done so q.join() can return,
            # even when parsing or the insert fails.
            q.task_done()
            page.close()
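# Shape of a stored 'products' row, with made-up values for illustration:
#   {'identifier': 'B00EXAMPLE', 'name': b'Blue Widget',
#    'category': b'Home & Kitchen > Widgets', 'features': b'Durable. Blue.',
#    'description': b'A widget that is blue.', 'status': '200'}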
# Queue holding at most THREADS * 2 prefetched identifiers.
q = Queue(THREADS * 2)
# Start workers.
for i in range(THREADS):
    t = Thread(target=fetch)
    t.daemon = True
    t.start()
try:
    db = dataset.connect('sqlite:///dataset.db')
    # Create a dict of ASINs already fetched,
    # so they won't be fetched again.
    # (The 'products' table only exists after the first insert.)
    id_filter = {}
    if 'products' in db.tables:
        fetched = db.query('SELECT identifier FROM products')
        id_filter = {x['identifier']: True for x in fetched}
    # Load the CSV with the ASINs.
    asins = csv.reader(open('asins.csv'), delimiter=',')
    for i in asins:
        # Discard already fetched ASINs.
        if not id_filter.get(i[0], False):
            while True:
                # Add the ASIN to the queue if it is not full.
                # **Queue size: threads x 2**
                if not q.full():
                    logging.info(
                        'enqueue request for {} - line: {} - qsize: {}'.format(
                            i[0],
                            asins.line_num,
                            q.qsize()
                        )
                    )
                    q.put(i[0])
                    break
                # Wait 3 seconds before trying to enqueue again.
                sleep(3)
    q.join()
except KeyboardInterrupt:
    sys.exit(1)
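# Assumed input layout for 'asins.csv' (not shipped with the script): one ASIN per
# row, first column only; extra columns are ignored because only i[0] is read, e.g.
#   B00EXAMPLE1
#   B00EXAMPLE2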