-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
205 lines (177 loc) · 6.26 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
"""
Author: Priyanka Shukla
This script extracts the songs a user played in the last 24 hours
and loads the data into a database
"""
from datetime import datetime
import datetime
import base64
import urllib
import psycopg2
import sqlalchemy
import pandas as pd
import requests as re
from selenium import webdriver
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from os import environ as env
# Runtime configuration, read from environment variables.
# env.get returns None for any variable that is not set.
# Connection URL handed to sqlalchemy.create_engine for the load step
DATABASE_ENGINE = env.get("DATABASE_ENGINE")
# Endpoint the authorization code is POSTed to in exchange for an access token
TOKEN_URL = env.get("TOKEN_URL")
# Application credentials: the id is sent in the authorize request and,
# joined with the secret, forms the HTTP Basic header of the token request
CLIENT_ID = env.get("CLIENT_ID")
CLIENT_SECRET = env.get("CLIENT_SECRET")
# Passed as the first argument to selenium's webdriver.Chrome
CHROME_DRIVER_PATH = env.get("CHROME_DRIVER_PATH")
# pylint: disable=W0621,W0703
def check_if_valid_data(data_frame: pd.DataFrame) -> bool:
    """
    Function to check if the downloaded song data is fit to be loaded
    param: DataFrame with one row per played song; it must have a
           "played_at" column, which is treated as the primary key
    returns: False when the DataFrame is empty, True when every check passes
    raises: Exception when "played_at" contains duplicates or when any
            cell of the DataFrame is null
    """
    # Nothing was downloaded: not an error, but there is nothing to load
    if data_frame.empty:
        print("No songs downloaded. Finishing execution")
        return False

    # Primary key check: "played_at" must identify each row uniquely
    if not data_frame["played_at"].is_unique:
        raise Exception("Primary key check failed. Terminating program")

    # Check if null values exist anywhere in the frame
    if data_frame.isnull().values.any():
        raise Exception("Null values found. Terminating program")

    # NOTE: a check that every "ts" value falls on yesterday's date is not
    # implemented; only the three checks above are enforced.
    return True
def _extract_auth_code(redirect_url):
    """
    Pull the "code" query parameter out of the OAuth redirect URL
    param: URL the browser ended up on after the user authorized the app
    returns: the authorization code, or None when the URL carries no code
    """
    from urllib.parse import parse_qs, urlparse

    # Parsing the query string (rather than slicing at fixed offsets) keeps
    # the code intact whether or not the URL has a trailing fragment or
    # additional query parameters.
    codes = parse_qs(urlparse(redirect_url).query).get("code")
    return codes[0].strip() if codes else None


def access_token():
    """
    This method is to generate access token
    It opens Chrome on the Spotify authorize page, waits up to 60 seconds
    for the browser to be redirected to the local redirect URI, and
    exchanges the authorization code found in that URL for an access token.
    returns: access token, or None when no code was found in the redirect
             URL or the token endpoint did not return an "access_token"
    raises: whatever WebDriverWait.until raises when the redirect is not
            seen within 60 seconds; the browser is closed in that case too
    """
    # Imported here because the file only has "import urllib", which does
    # not by itself guarantee that the urllib.parse submodule is loaded.
    from urllib.parse import urlencode

    redirect_uri = "http://localhost:8080"
    auth_code = {
        "response_type": "code",
        "client_id": CLIENT_ID,
        "scope": "user-read-recently-played",
        "redirect_uri": redirect_uri,
    }
    err = "\x1b[0;30;41m" + "Error ocured" + "\x1b[0m"

    driver = webdriver.Chrome(CHROME_DRIVER_PATH)
    try:
        driver.get("https://accounts.spotify.com/authorize?" + urlencode(auth_code))
        WebDriverWait(driver, 60).until(EC.url_contains(redirect_uri))
        url_code = str(driver.current_url)
    finally:
        # Always close the browser, including when the wait times out
        driver.quit()
    print("The current url is:" + url_code)

    code = _extract_auth_code(url_code)
    print("code:", code)
    if code is None:
        print(err, "(Make sure you enter right code)")
        return None

    # set header: HTTP Basic auth built from "client_id:client_secret"
    encode_id_secret = f"{CLIENT_ID}:{CLIENT_SECRET}".encode("ascii")
    auth_header = base64.b64encode(encode_id_secret).decode("ascii")
    headers = {
        "Authorization": f"Basic {auth_header}",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    # data
    payload = {
        "code": code,
        "redirect_uri": redirect_uri,
        "grant_type": "authorization_code",
    }
    # Make a request to the /token endpoint to get an access token
    access_token_request = re.post(
        TOKEN_URL, headers=headers, data=payload, timeout=180
    )
    try:
        # .json() raises a ValueError subclass when the body is not JSON;
        # a JSON body without "access_token" raises KeyError.
        return access_token_request.json()["access_token"]
    except (ValueError, KeyError):
        print(err, "(Make sure you enter right code)")
        return None
def _fetch_recently_played(token, after_ms):
    """
    Request the user's recently played tracks from the Spotify Web API
    param: token    - OAuth access token sent as a Bearer credential
    param: after_ms - Unix timestamp in milliseconds; only plays after
                      this instant are requested
    returns: the decoded JSON body of the response
    raises: the HTTP error raised by raise_for_status() on a 4xx/5xx reply
    """
    recently_played_url = "https://api.spotify.com/v1/me/player/recently-played"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    recently_played = re.get(
        recently_played_url,
        headers=headers,
        # 50 is the largest page the endpoint accepts
        params={"after": after_ms, "limit": 50},
        timeout=180,
    )
    print(recently_played)
    # Fail here with the HTTP status instead of a KeyError further down
    recently_played.raise_for_status()
    return recently_played.json()


def _build_song_data_frame(data):
    """
    Flatten the recently-played payload into a DataFrame
    param: decoded JSON body holding the plays under the "items" key
    returns: DataFrame with columns song_name, artist_name, played_at and
             ts (the first 10 characters of played_at)
    """
    items = data.get("items", [])
    song_dict = {
        "song_name": [song["track"]["name"] for song in items],
        "artist_name": [
            song["track"]["album"]["artists"][0]["name"] for song in items
        ],
        "played_at": [song["played_at"] for song in items],
        "ts": [song["played_at"][0:10] for song in items],
    }
    return pd.DataFrame(data=song_dict)


def _load_songs(song_data_frame):
    """
    Create tracks.my_played_tracks if needed and append the songs to it
    param: validated DataFrame produced by _build_song_data_frame
    raises: psycopg2.OperationalError when the database connection fails
    """
    from sqlalchemy.exc import IntegrityError

    create_table_sql = """
    CREATE TABLE IF NOT EXISTS tracks.my_played_tracks(
    song_name VARCHAR(250) NOT NULL,
    artist_name VARCHAR(250) NOT NULL,
    played_at TIMESTAMP PRIMARY KEY NOT NULL
    );
    """
    # NOTE(review): the database credentials are hard-coded in this DSN;
    # they should come from the environment like DATABASE_ENGINE does.
    try:
        conn = psycopg2.connect("dbname=spotify_trends user=postgres password=pri123")
    except psycopg2.OperationalError:
        # connect() raises on failure rather than returning a falsy value
        print("Database connection failed")
        raise

    try:
        db_engine = sqlalchemy.create_engine(DATABASE_ENGINE)
        with conn.cursor() as cursor:
            cursor.execute(create_table_sql)
        conn.commit()
        print("Database opened successfully")

        # The table declared above has no "ts" column, so only the three
        # declared columns are inserted.
        table_columns = ["song_name", "artist_name", "played_at"]
        try:
            song_data_frame[table_columns].to_sql(
                "my_played_tracks",
                con=db_engine,
                index=False,
                schema="tracks",
                if_exists="append",
            )
        except IntegrityError:
            # A played_at value already stored violates the primary key
            print("Data is already present")
        except Exception as err:
            # Stay best-effort, but do not report other failures as duplicates
            print(f"Loading data failed: {err}")
    finally:
        conn.close()
        print("Database closed")


def main():
    """
    Extract the songs played in the last 24 hours, validate them and load
    them into the database
    """
    # The token is a bearer credential, so it is deliberately not printed.
    token = access_token()
    if token is None:
        print("Could not obtain an access token. Finishing execution")
        return

    # Extract: the API expects the lower bound as Unix time in milliseconds
    yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
    yesterday_unix_ts = int(yesterday.timestamp()) * 1000
    data = _fetch_recently_played(token, yesterday_unix_ts)

    song_data_frame = _build_song_data_frame(data)
    print("Spotify data data_frame format:", song_data_frame)

    # Validate: an empty download ends the run without touching the database
    if not check_if_valid_data(song_data_frame):
        return
    print("Data valid, proceed to load stage.")

    # Load
    _load_songs(song_data_frame)


if __name__ == "__main__":
    main()
    # TODO: scheduling of this script is not implemented yet