This repository has been archived by the owner on Nov 18, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabase.py
137 lines (110 loc) · 4.31 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import ast
import sqlite3
import uuid

import utils
# Open the SQLite connection described by the "database" section of the config.
db_config = utils.getFromConfig("database")
db_name = db_config.get('db_name', 'database')
hostname = db_config.get('hostname', 'localhost')
port = db_config.get('port', 27017)
# Without the "remote" flag a local "<db_name>.db" file is opened; with it, a
# read-write URI is built from hostname, port and db_name.
# NOTE(review): sqlite3 has no network transport, so the "remote" URI looks like
# it resolves to a local path "<hostname>:<port>/<db_name>" - confirm intent.
if not db_config.get('remote'):
    conn = sqlite3.connect(f"{db_name}.db", check_same_thread=False)
else:
    conn = sqlite3.connect(f"file:{hostname}:{port}/{db_name}?mode=rw", uri=True, check_same_thread=False)
# One module-wide cursor, used by the functions in this module.
cursor = conn.cursor()
# Create database table
# Schema of the single `urls` table: the shortcode is the primary key, the URL
# is mandatory, and `metadata` is an optional text column (the metadata
# functions in this module store a dict there as its str() form).
# The statement is kept in a module-level name because purgeAllData() runs it
# again after dropping the table.
database_base = '''
CREATE TABLE IF NOT EXISTS urls (
shortcode TEXT PRIMARY KEY,
url TEXT NOT NULL,
metadata TEXT
)
'''
# IF NOT EXISTS makes this a no-op when the table is already present.
cursor.execute(database_base)
conn.commit()
######################
# Database functions #
######################
def shortcode_exists(shortcode: str) -> bool:
    """Return True when the `urls` table already has a row for `shortcode`."""
    cursor.execute('SELECT 1 FROM urls WHERE shortcode = ?', (shortcode,))
    found = cursor.fetchone()
    return found is not None
def insert_url(url: str, shortcode: str = None):
    """Store `url` under a shortcode, commit, and return the shortcode used.

    Args:
        url: Target URL to store.
        shortcode: Key to store the URL under. When omitted (None), a new one
            is obtained from generate_unique_shortcode().

    Returns:
        The shortcode the row was inserted with, so callers that let this
        function choose the code can find out which one was picked.

    Raises:
        sqlite3.IntegrityError: If `shortcode` is already present (it is the
            table's primary key).
    """
    if shortcode is None:
        shortcode = generate_unique_shortcode()
    cursor.execute('INSERT INTO urls (shortcode, url) VALUES (?, ?)', (shortcode, url))
    conn.commit()
    return shortcode
# Generate a unique shortcode using hex representation of a UUID
def generate_unique_shortcode() -> str:
    """Return a random hex shortcode that is not yet present in the database.

    The length comes from the "short_url_length" config value. Candidates are
    the leading characters of a random UUID's 32-character hex form, so the
    result can never be longer than 32 characters.

    Returns:
        A shortcode for which shortcode_exists() was False at the time of the
        check.
    """
    # The configured length is read once per call instead of on every retry.
    length = utils.getFromConfig("short_url_length")
    while True:
        shortcode = uuid.uuid4().hex[:length]
        if not shortcode_exists(shortcode):
            return shortcode
# Get URL from shortcode
def get_url(shortcode) -> str:
    """Look up the URL stored for `shortcode`.

    Returns the URL, or None when no row matches (despite the `str`
    annotation).
    """
    cursor.execute('SELECT url FROM urls WHERE shortcode = ?', (shortcode,))
    row = cursor.fetchone()
    if not row:
        return None
    return row[0]
# Get all URLs in a list of URLShortcode objects
def get_all_urls() -> list[utils.URLShortcode]:
    """Return every row of the `urls` table wrapped in utils.URLShortcode.

    Rows are read as (shortcode, url, metadata) and passed to the constructor
    in the order (url, shortcode, metadata).
    """
    cursor.execute('SELECT * FROM urls')
    entries = []
    for shortcode, url, metadata in cursor.fetchall():
        entries.append(utils.URLShortcode(url, shortcode, metadata))
    return entries
def update_url(shortcode, new_url) -> None:
    """Point `shortcode` at `new_url` and commit the change."""
    statement = 'UPDATE urls SET url = ? WHERE shortcode = ?'
    cursor.execute(statement, (new_url, shortcode))
    conn.commit()
def delete_url(shortcode) -> None:
    """Remove the row stored for `shortcode` and commit the change."""
    statement = 'DELETE FROM urls WHERE shortcode = ?'
    cursor.execute(statement, (shortcode,))
    conn.commit()
def purgeAllData() -> None:
    """Delete every stored URL by dropping and recreating the `urls` table.

    The table is recreated from the module-level `database_base` statement, so
    the schema after a purge matches the one created at import time.
    """
    # IF EXISTS keeps the purge usable when the table is already gone, e.g.
    # after an earlier purge failed between the drop and the re-create.
    cursor.execute('DROP TABLE IF EXISTS urls')
    cursor.execute(database_base)
    conn.commit()
# Append metadata
def appendMetadata(shortcode: str, key: str, value: str) -> None:
    """Set `key` to `value` in the metadata dict stored for `shortcode`.

    The metadata column holds the str() form of a dict. A NULL column or the
    text 'None' is treated as an empty dict. An existing key is overwritten.

    Args:
        shortcode: Row whose metadata is modified. Unknown shortcodes are
            ignored.
        key: Metadata key to set.
        value: Value to store. It must be a Python literal (str, number, bool,
            None, or containers of those) so it can be parsed back later.
    """
    cursor.execute('SELECT metadata FROM urls WHERE shortcode = ?', (shortcode,))
    row = cursor.fetchone()
    if row is None:
        # No such shortcode: there is no row to attach metadata to.
        return
    stored = row[0]
    if stored is None or stored == 'None':
        metadata = {}
    else:
        # literal_eval parses the stored dict text without executing code,
        # unlike eval(), which would run whatever the column contains.
        metadata = ast.literal_eval(stored)
    metadata[key] = value
    cursor.execute('UPDATE urls SET metadata = ? WHERE shortcode = ?', (str(metadata), shortcode))
    conn.commit()
# Get metadata
def getMetadata(shortcode: str) -> dict:
    """Return the metadata dict stored for `shortcode`.

    Returns an empty dict when the shortcode is unknown, when the metadata
    column is NULL, or when it holds the text 'None'.

    Raises:
        ValueError, SyntaxError: If the stored text is not a Python literal.
    """
    cursor.execute('SELECT metadata FROM urls WHERE shortcode = ?', (shortcode,))
    row = cursor.fetchone()
    if row is None or row[0] is None or row[0] == 'None':
        return {}
    # literal_eval parses the stored dict text without executing code,
    # unlike eval(), which would run whatever the column contains.
    return ast.literal_eval(row[0])
# Remove metadata
def removeMetadata(shortcode: str, key: str) -> None:
    """Delete `key` from the metadata dict stored for `shortcode`.

    Does nothing when the shortcode is unknown, when it has no metadata (NULL
    or the text 'None'), or when `key` is not present.
    """
    cursor.execute('SELECT metadata FROM urls WHERE shortcode = ?', (shortcode,))
    row = cursor.fetchone()
    if row is None or row[0] is None or row[0] == 'None':
        return
    # literal_eval parses the stored dict text without executing code,
    # unlike eval(), which would run whatever the column contains.
    metadata = ast.literal_eval(row[0])
    if key not in metadata:
        # Nothing to remove, so skip the write.
        return
    del metadata[key]
    cursor.execute('UPDATE urls SET metadata = ? WHERE shortcode = ?', (str(metadata), shortcode))
    conn.commit()
# Update metadata
def updateMetadata(shortcode: str, key: str, value: str) -> None:
    """Replace the value of an existing `key` in the metadata of `shortcode`.

    Unlike appendMetadata(), this never adds a new key. It does nothing when
    the shortcode is unknown, when it has no metadata (NULL or the text
    'None'), or when `key` is not already present.

    Args:
        shortcode: Row whose metadata is modified.
        key: Existing metadata key to overwrite.
        value: New value. It must be a Python literal so it can be parsed back
            later.
    """
    cursor.execute('SELECT metadata FROM urls WHERE shortcode = ?', (shortcode,))
    row = cursor.fetchone()
    if row is None or row[0] is None or row[0] == 'None':
        return
    # literal_eval parses the stored dict text without executing code,
    # unlike eval(), which would run whatever the column contains.
    metadata = ast.literal_eval(row[0])
    if key not in metadata:
        # Only existing keys are updated, so skip the write.
        return
    metadata[key] = value
    cursor.execute('UPDATE urls SET metadata = ? WHERE shortcode = ?', (str(metadata), shortcode))
    conn.commit()
def hideUrl(shortcode: str) -> None:
    """Flag `shortcode` as hidden by storing `hidden: True` in its metadata."""
    appendMetadata(shortcode, key='hidden', value=True)
# Test the database functions
# Manual smoke test: wipes the database, inserts 100 sample URLs with the
# shortcodes "0".."99", then starts the application's main().
if __name__ == '__main__':
    input('This will purge all data in the database.\nPress Enter to continue...')
    purgeAllData()
    for i in range(100):
        print(i)
        # The shortcode must be text; a set such as {i} cannot be bound as an
        # SQL parameter by sqlite3.
        insert_url(f'https://example.com/{i}', str(i))
    from main import main
    main()