-
Notifications
You must be signed in to change notification settings - Fork 0
/
twitter_utilities.py
158 lines (142 loc) · 5.44 KB
/
twitter_utilities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import tweepy
import os
import time
from pathlib import Path
import pandas as pd
def authentication(pathToDevKeyAndSecret, pathToTwitterAuthData):
    """Build an authenticated tweepy API client.

    The consumer key and secret are read from the first two lines of
    ``pathToDevKeyAndSecret``. If ``pathToTwitterAuthData`` is an existing,
    non-empty file, the access token and secret are read from its first two
    lines. Otherwise the interactive OAuth flow is run: the authorization
    URL is printed, the verifier is read from stdin, and the obtained tokens
    are written to ``pathToTwitterAuthData`` for later runs.

    Args:
        pathToDevKeyAndSecret: Path of the file holding the consumer key
            (line 1) and consumer secret (line 2).
        pathToTwitterAuthData: Path of the file caching the access token
            (line 1) and access token secret (line 2).

    Returns:
        The ``tweepy.API`` object created from the authenticated handler.

    Raises:
        SystemExit: If the consumer key/secret file cannot be opened.
        tweepy.TweepError: If the request token or the access token cannot
            be obtained during the interactive flow.
    """
    try:
        key_file = open(pathToDevKeyAndSecret, "r")
    except IOError:
        print("file with key and secret of Twitter app not found")
        print("ask to the developer or register your app\n")
        # Raise SystemExit directly instead of calling the site-injected
        # exit() helper, which is meant for the interactive shell.
        raise SystemExit
    with key_file:
        consumer_key = key_file.readline().rstrip('\n')
        consumer_secret = key_file.readline().rstrip('\n')

    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    token_path = Path(pathToTwitterAuthData)
    if not token_path.is_file() or token_path.stat().st_size == 0:
        # No previous authentication data: authenticate via the browser.
        try:
            redirect_url = auth.get_authorization_url()
        except tweepy.TweepError:
            print('Error! Failed to get request token.')
            # Going on would only prompt for a verifier that cannot be
            # exchanged, so propagate the failure here.
            raise
        print("Redirect url:", redirect_url)
        verifier = input('Verifier:')
        try:
            auth.get_access_token(verifier)
        except tweepy.TweepError:
            print('Error! Failed to get access token.')
            # Do not touch the token file when no tokens were obtained.
            raise
        access_token = auth.access_token
        access_token_secret = auth.access_token_secret
        with open(pathToTwitterAuthData, "w") as token_file:
            token_file.write(access_token + "\n" +
                             access_token_secret + "\n")
    else:
        with open(pathToTwitterAuthData, "r") as token_file:
            access_token = token_file.readline().rstrip('\n')
            access_token_secret = token_file.readline().rstrip('\n')

    auth.set_access_token(access_token, access_token_secret)
    api = tweepy.API(auth, wait_on_rate_limit=True,
                     wait_on_rate_limit_notify=True)
    print('Authentication completed with success')
    return api
def limit_handled(cursor, wait_seconds=15 * 60):
    """Yield items from an iterator, pausing when Twitter rate-limits.

    Items are pulled with ``next(cursor)``. On ``tweepy.RateLimitError``, or
    on a ``tweepy.TweepError`` that reports HTTP status 429, the generator
    prints "Sleeping", waits ``wait_seconds`` and then asks the iterator for
    the next item again. Any other ``tweepy.TweepError`` is printed and ends
    the iteration early.

    Args:
        cursor: An iterator supporting ``next()``, e.g. the result of
            ``tweepy.Cursor(...).items()``.
        wait_seconds: Seconds to sleep after a rate-limit error. Defaults
            to 15 minutes.

    Yields:
        Each item produced by ``cursor``.
    """
    while True:
        try:
            item = next(cursor)
        except StopIteration:
            # Normal exhaustion of the underlying iterator.
            return
        except tweepy.RateLimitError:
            print("Sleeping")
            time.sleep(wait_seconds)
            continue
        except tweepy.TweepError as e:
            print(e)
            # Prefer the HTTP status on the attached response when there is
            # one; fall back to the last word of the message, which is what
            # this function has always inspected.
            response = getattr(e, "response", None)
            status_code = getattr(response, "status_code", None)
            if status_code == 429 or str(e).split(" ")[-1] == '429':
                print("Sleeping")
                time.sleep(wait_seconds)
                continue
            return
        # Yield outside the try so exceptions raised by the consumer are
        # never mistaken for errors coming from the cursor.
        yield item
def store_users(api, ids, output_file):
    """Look up the author of each tweet and append the unique user ids.

    For every value in ``ids['tweet_id']`` the status is fetched with
    ``api.get_status(..., trim_user=True)`` and its ``user.id`` recorded.
    Each distinct user id is then appended to ``output_file``, one per line,
    in the order in which it was first seen.

    Args:
        api: Object exposing ``get_status(tweet_id, trim_user=True)``.
        ids: Mapping-like object whose ``'tweet_id'`` entry is an iterable
            of tweet ids convertible with ``int()``.
        output_file: Path of the file to append the user ids to.
    """
    print("Searching users...\n")
    # A dict is used as an insertion-ordered set so the output is stable.
    unique_users = {}
    for tweetid in ids['tweet_id']:
        while True:
            try:
                status = api.get_status(int(tweetid), trim_user=True)
                unique_users.setdefault(status.user.id)
            except tweepy.RateLimitError:
                print("sleeping")
                time.sleep(15*60)
                # Retry the same tweet instead of silently dropping it.
                continue
            except tweepy.TweepError as e:
                # Deleted/protected tweets and similar: report and move on.
                print(e)
            break

    # The context manager guarantees the file is flushed and closed.
    with open(output_file, "a") as fout:
        fout.writelines(str(user) + "\n" for user in unique_users)
    print("All users stored\n")
def store_timelines_as_txt_cleaned(api, users_id, fout_path, since_id):
    """Write each user's timeline to a text file, one tweet per line.

    For every user a file named ``fout_path + str(user)`` is (re)created.
    Each line holds the id of the status as returned by the timeline, a tab,
    and the tweet text with newlines and tabs replaced by spaces. When the
    text starts with "RT @" and the status carries a ``retweeted_status``,
    the text of that retweeted status is written instead.

    Args:
        api: tweepy API object providing ``user_timeline``.
        users_id: Iterable of user ids to download.
        fout_path: Prefix prepended to the user id to form the file path.
        since_id: Passed to ``user_timeline`` as ``since_id``.
    """
    for counter, user in enumerate(users_id):
        print(str(counter))
        # The context manager closes the file even if a download fails.
        with open(fout_path + str(user), "w") as fout:
            timeline = tweepy.Cursor(api.user_timeline,
                                     user_id=user, since_id=since_id,
                                     tweet_mode="extended").items()
            for status in limit_handled(timeline):
                # The id written is the one of the timeline entry itself,
                # even when the text below comes from the retweeted status.
                fout.write(str(status.id) + "\t")
                if status.full_text.startswith("RT @"):
                    # Only a missing attribute falls back to the original.
                    status = getattr(status, "retweeted_status", status)
                flattened = (status.full_text
                             .replace("\n", " ")
                             .replace("\t", " "))
                # Two leading spaces keep the existing on-disk line format.
                fout.write("  " + flattened + "\n")
def store_timelines_as_df(api, users_id, fout_path, since_id):
    """Download each user's timeline and save it as a CSV file.

    Users whose file ``fout_path + str(user) + ".csv"`` already exists are
    skipped. For the others, every status of the timeline becomes a row with
    the columns ``user_id``, ``tweet_id`` and ``text``. When the text starts
    with "RT @" and the status carries a ``retweeted_status``, the id and
    text of that retweeted status are stored instead.

    Args:
        api: tweepy API object providing ``user_timeline``.
        users_id: Sized iterable of user ids to download.
        fout_path: Prefix prepended to the user id to form the CSV path.
        since_id: Passed to ``user_timeline`` as ``since_id``.
    """
    print("Collecting users timelines since " + str(since_id) + "\n")
    columns = ['user_id', 'tweet_id', 'text']
    for counter, user in enumerate(users_id, start=1):
        csv_path = fout_path + str(user) + ".csv"
        # Check for the file without opening it, so no handle is leaked.
        if os.path.isfile(csv_path):
            print("already downloaded")
            # TODO: add a check on the size of the user's file
            continue
        print("new user")
        print(str(counter)+"/"+str(len(users_id)))

        # Collect rows in a list and build the frame once: DataFrame.append
        # copied the whole frame per row and no longer exists in pandas 2.x.
        rows = []
        timeline = tweepy.Cursor(api.user_timeline,
                                 user_id=user, since_id=since_id,
                                 tweet_mode="extended").items()
        for status in limit_handled(timeline):
            if status.full_text.startswith("RT @"):
                # Only a missing attribute falls back to the original status.
                status = getattr(status, "retweeted_status", status)
            rows.append({
                'user_id': str(user),
                'tweet_id': str(status.id),
                'text': status.full_text,
            })
        # Passing the columns keeps the header even for an empty timeline.
        df = pd.DataFrame(rows, columns=columns)
        df.to_csv(csv_path, index=False)
    print("Collection completed\n")