-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspotify_etl.py
117 lines (91 loc) · 3.6 KB
/
spotify_etl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import sqlalchemy
import pandas as pd
from sqlalchemy.orm import sessionmaker
import requests
import json
from datetime import datetime
import datetime
import sqlite3
# Generate your token here: https://developer.spotify.com/console/get-recently-played/
# Note: You need a Spotify account (can be easily created for free)
def check_if_valid_data(df: pd.DataFrame) -> bool:
    """Run sanity checks on the downloaded tracks before they are loaded.

    Args:
        df: Frame holding the downloaded plays. The ``played_at`` column is
            treated as the primary key and the ``timestamp`` column must
            contain ``YYYY-MM-DD`` strings.

    Returns:
        ``False`` when the frame is empty (a message is printed first),
        ``True`` when every check passes.

    Raises:
        Exception: If ``played_at`` contains duplicates, if any cell is
            null, or if any ``timestamp`` is not yesterday's date as seen
            by the local clock.
    """
    # Nothing was downloaded: report it and let the caller decide what to do.
    if df.empty:
        print("No songs downloaded. Finishing execution")
        return False

    # Primary key check: every play must have a distinct played_at value.
    played_at_is_unique = pd.Series(df['played_at']).is_unique
    if not played_at_is_unique:
        raise Exception("Primary Key check is violated")

    # No column is allowed to carry missing values.
    has_nulls = df.isnull().values.any()
    if has_nulls:
        raise Exception("Null values found")

    # Midnight at the start of yesterday (naive, local time); a parsed
    # date-only string compares equal to it only when it is yesterday's date.
    one_day = datetime.timedelta(days=1)
    yesterday_midnight = (datetime.datetime.now() - one_day).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    day_strings = df["timestamp"].tolist()
    if any(
        datetime.datetime.strptime(day, '%Y-%m-%d') != yesterday_midnight
        for day in day_strings
    ):
        raise Exception("At least one of the returned songs does not have a yesterday's timestamp")

    return True
def run_spotify_etl() -> None:
    """Download recently played Spotify tracks and append them to SQLite.

    Extract: requests the "recently played" endpoint for plays after
    "now minus 24 hours". Transform: flattens the response into a frame with
    the columns ``song_name``, ``artist_name``, ``played_at`` and
    ``timestamp`` (the first ten characters of ``played_at``). Load: creates
    the ``my_played_tracks`` table if needed and appends the rows.

    Nothing is loaded when ``check_if_valid_data`` returns ``False``.

    Raises:
        requests.HTTPError: If the API answers with an error status (for
            example when ``TOKEN`` is empty or expired).
        Exception: Propagated from ``check_if_valid_data`` when a
            validation rule is violated.
    """
    DATABASE_LOCATION = "sqlite:///my_played_tracks.sqlite"
    # USER_ID = ''
    TOKEN = ''

    # Extract part of the ETL process
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "Authorization": "Bearer {token}".format(token=TOKEN),
    }

    # Convert time to Unix timestamp in milliseconds
    today = datetime.datetime.now()
    yesterday = today - datetime.timedelta(days=1)
    yesterday_unix_timestamp = int(yesterday.timestamp()) * 1000

    # Download all songs you've listened to "after yesterday", which means in
    # the last 24 hours.
    # NOTE(review): this window is the trailing 24 hours, while
    # check_if_valid_data only accepts rows dated yesterday - confirm which
    # window is intended.
    r = requests.get(
        "https://api.spotify.com/v1/me/player/recently-played",
        params={"after": yesterday_unix_timestamp},
        headers=headers,
        timeout=30,  # never hang forever on a stalled connection
    )
    # Fail loudly on 4xx/5xx instead of hitting KeyError on data["items"].
    r.raise_for_status()
    data = r.json()

    # Extracting only the relevant bits of data from the json object
    items = data["items"]

    # Prepare a dictionary in order to turn it into a pandas dataframe below
    song_dict = {
        "song_name": [song["track"]["name"] for song in items],
        "artist_name": [
            song["track"]["album"]["artists"][0]["name"] for song in items
        ],
        "played_at": [song["played_at"] for song in items],
        # Date part (YYYY-MM-DD) of the played_at string.
        "timestamp": [song["played_at"][0:10] for song in items],
    }
    print(song_dict)

    song_df = pd.DataFrame(
        song_dict,
        columns=["song_name", "artist_name", "played_at", "timestamp"],
    )

    # Validate: stop here when there is nothing valid to load.
    if not check_if_valid_data(song_df):
        return
    print("Data valid, proceed to Load stage")

    # Load
    engine = sqlalchemy.create_engine(DATABASE_LOCATION)
    sql_query = """
    CREATE TABLE IF NOT EXISTS my_played_tracks(
        song_name VARCHAR(200),
        artist_name VARCHAR(200),
        played_at VARCHAR(200),
        timestamp VARCHAR(200),
        CONSTRAINT primary_key_constraint PRIMARY KEY (played_at)
    )
    """

    conn = sqlite3.connect('my_played_tracks.sqlite')
    try:
        cursor = conn.cursor()
        cursor.execute(sql_query)
        # Commit explicitly so the table is in place before the SQLAlchemy
        # engine opens its own connection to the same file.
        conn.commit()
        print("Opened database successfully")

        try:
            song_df.to_sql("my_played_tracks", engine, index=False, if_exists='append')
        except sqlalchemy.exc.IntegrityError:
            # A played_at value collided with the primary key; any other
            # failure is a real error and is allowed to propagate.
            print("Data already exists in the database")
    finally:
        # Release the connection even when the load fails.
        conn.close()
    print("Close database successfully")