instagram_scraper.py
import argparse
import csv
import os
import re
from typing import List
import requests
from requests_html import HTMLSession

# Source: http://blog.jstassen.com/2016/03/code-regex-for-instagram-username-and-hashtags/
# Raw strings avoid invalid-escape-sequence warnings for the `\.` tokens.
REGEXES = {
    'hashtag': re.compile(r'(?:#)([A-Za-z0-9_](?:(?:[A-Za-z0-9_]|(?:\.(?!\.))){0,28}(?:[A-Za-z0-9_]))?)'),
    'username': re.compile(r'(?:@)([A-Za-z0-9_](?:(?:[A-Za-z0-9_]|(?:\.(?!\.))){0,28}(?:[A-Za-z0-9_]))?)'),
}
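
# A quick illustration (comments only, nothing runs at import) of what the
# patterns extract from a caption:
#   REGEXES['hashtag'].findall('Sunset at the #beach')       -> ['beach']
#   REGEXES['username'].findall('shot by @some.user today')  -> ['some.user']
# Both patterns allow letters, digits, underscores, and non-consecutive dots,
# capped at 30 characters by the {0,28} middle section plus the two end chars.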

def scrape_instagram_tag(tag: str, total_count: int = 50, existing: set = None):
    """
    Scrape and yield recently tagged Instagram photos.
    """
    if existing is None:
        existing = set()
    url = f'https://www.instagram.com/explore/tags/{tag}'
    session = HTMLSession()
    req = session.get(url)
    imgs = set(existing)
    count = 0
    page = 0
    while count < total_count:
        # Re-render the page, scrolling one step further each pass so that
        # more images are loaded into the DOM.
        req.html.render(scrolldown=page)
        images = req.html.xpath('//img[@alt]')
        page += 1
        for image in images:
            if count >= total_count:
                break
            try:
                img_url, caption = image.attrs['src'], image.attrs['alt']
            except KeyError:
                # Skip images missing a src or alt attribute.
                continue
            if img_url in imgs:
                continue
            imgs.add(img_url)
            hashtags = set(REGEXES['hashtag'].findall(caption))
            mentions = set(REGEXES['username'].findall(caption))
            count += 1
            yield img_url, caption, hashtags, mentions
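
# A minimal usage sketch (assumes network access and that requests_html can
# fetch its bundled Chromium on first render; Instagram's markup changes often,
# so the xpath above may need updating):
#
#   for url, caption, hashtags, mentions in scrape_instagram_tag('sunset', 5):
#       print(url, hashtags, mentions)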

def scrape_instagram(tags: List[str], total_count: int = 50, existing: set = None):
    """
    :param tags:
        List of tags to scrape.
    :param total_count:
        Total number of images to scrape per tag.
    :param existing:
        Set of image URLs to skip.
    """
    if existing is None:
        existing = set()
    for tag in tags:
        yield from scrape_instagram_tag(tag, total_count, existing)
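
# Note: scrape_instagram_tag copies `existing` internally, so duplicates are
# filtered within a single tag and against the initial set, but URLs seen
# under one tag are not excluded when scraping the next.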

def main(tags, total_count, should_continue):
    def _single_tag_processing(tag, total_count, existing_links, start):
        os.makedirs(f'data/{tag}', exist_ok=True)
        # Append when resuming a previous run, otherwise start a fresh file.
        mode = 'a' if existing_links else 'w'
        with open(f'data/{tag}/data.csv', mode, newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile, delimiter=',')
            scraped = scrape_instagram_tag(tag, total_count, existing_links)
            for count, (url, caption, hashtags, mentions) in enumerate(scraped, start):
                try:
                    req = requests.get(url)
                    with open(f'data/{tag}/{count}.jpg', 'wb') as img:
                        img.write(req.content)
                except (requests.RequestException, OSError):
                    print(f'An error occurred while downloading {url}')
                else:
                    # Columns: local filename, source URL, caption, hashtags, mentions.
                    writer.writerow([
                        f'{count}.jpg',
                        url,
                        caption.replace('\n', '\\n'),
                        ', '.join(hashtags),
                        ', '.join(mentions)
                    ])
                    print(f'[{tag}] downloaded {url} as {count}.jpg in data/{tag}')

    for tag in tags:
        existing_links = set()
        start = 0
        if os.path.exists(f'data/{tag}/data.csv') and should_continue:
            # Resume: remember already-downloaded URLs and continue numbering
            # from the last recorded row.
            with open(f'data/{tag}/data.csv', newline='', encoding='utf-8') as csvfile:
                reader = csv.reader(csvfile)
                for i, row in enumerate(reader):
                    existing_links.add(row[1])
                    start = i + 1
        _single_tag_processing(tag, total_count, existing_links, start)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--tags', '-t', nargs='+',
                        help='Tags to scrape images from.')
    parser.add_argument('--count', '-c', type=int, default=50,
                        help='Total number of images to scrape for each given tag.')
    parser.add_argument('--continue', '-C',
                        default=False, action='store_true', dest='cont',
                        help='Read the existing data file, skip URLs already '
                             'scraped, and append new rows instead of rewriting.')
    args = parser.parse_args()
    assert args.tags, 'Enter tags to scrape! Use the --tags option, see help.'
    assert args.count, 'Enter the total number of images to scrape using the --count option, see help.'
    main(args.tags, args.count, args.cont)
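
# Example invocations (tag names are placeholders):
#   python instagram_scraper.py --tags sunset beach --count 25
#   python instagram_scraper.py --tags sunset --count 25 --continue
# With --continue, URLs already recorded in data/<tag>/data.csv are skipped
# and new rows are appended rather than overwriting the file.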