-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
executable file
·238 lines (209 loc) · 9.1 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
#############################################################################
# Copyright (C) 2023 CrowdWare
#
# This file is part of Shift.
#
# Shift is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Shift is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Shift. If not, see <http://www.gnu.org/licenses/>.
#
#############################################################################
from datetime import datetime, timedelta
import os
from flask import Flask
from flask import request
from flask import jsonify
from shift_keys import SHIFT_API_KEY
from shift_keys import SHIFT_DB_PWD
from shift_keys import SHIFT_DB_USER
from shift_keys import SHIFT_DATABASE
from shift_keys import SHIFT_SECRET_KEY
from shift_keys import SHIFT_CLIENT_KEY
from mysql.connector import connect
from mysql.connector.errors import IntegrityError
from Crypto.Cipher import AES
import binascii
def dbConnect():
db = connect(unix_socket="/var/run/mysqld/mysqld.sock",
user=SHIFT_DB_USER,
password=SHIFT_DB_PWD,
database=SHIFT_DATABASE)
return db
def isScooping(scoop):
scoop_date = datetime(1970, 1, 1) + timedelta(seconds=scoop)
time_since = datetime.now() - scoop_date
seconds = int(time_since.total_seconds())
if (seconds / 60.0 / 60.0) > 20:
return False
return True
def getScoopTimeHoursAgo(hoursAgo=24):
first_date = datetime(1970, 1, 1)
time_since = datetime.now() - first_date
seconds = int(time_since.total_seconds())
seconds -= hoursAgo * 60 * 60
return seconds
def decryptStringGCM(cipherText: str) -> str:
try:
data = binascii.unhexlify(cipherText)
iv, tag = data[:12], data[-16:]
cipher = AES.new(SHIFT_SECRET_KEY.encode('utf-8'), AES.MODE_GCM, iv)
plaintext = cipher.decrypt_and_verify(data[12:-16], tag)
return plaintext.decode("utf-8")
except ValueError as error:
return ""
def encryptStringGCM(plainText):
try:
iv = os.urandom(12)
secretKey = SHIFT_SECRET_KEY.encode('utf-8')
cipher = AES.new(secretKey, AES.MODE_GCM, iv)
ciphertext, tag = cipher.encrypt_and_digest(plainText.encode('utf-8'))
encryptedData = iv + ciphertext + tag
encryptedHex = binascii.hexlify(encryptedData).decode('utf-8')
return encryptedHex
except ValueError as error:
return ""
app = Flask(__name__)
@app.route('/')
def hello_world():
return 'Hello here is the webservice of Shift!'
@app.route('/register', methods=['POST'])
def register():
content = request.json
key = decryptStringGCM(content['key'])
name = content['name']
uuid = content['uuid']
ruuid = content['ruuid']
country = content['country']
language = content['language']
test = content["test"] # used only for unit testing
if key != SHIFT_API_KEY:
return jsonify(isError=True, message="wrong api key", statusCode=200)
if test != "true":
conn = None
try:
conn = dbConnect()
if uuid != ruuid:
curs = conn.cursor(dictionary=True)
query = 'SELECT COUNT(*) AS count FROM account WHERE uuid = "' + ruuid + '"'
curs.execute(query)
row = curs.fetchone()
count = row['count']
if count != 1:
return jsonify(isError=True, message="The referer id is not correct.", statusCode=200)
curs = conn.cursor()
query = 'INSERT INTO account(name, uuid, ruuid, scooping, country, language) VALUES("' + name + '", "' + uuid + '", "' + ruuid + '", 0, "' + country + '","' + language +'")'
curs.execute(query)
conn.commit()
except IntegrityError as error:
return jsonify(isError=True, message=error.msg, statusCode=200)
finally:
if conn != None:
conn.close()
return jsonify(isError=False, message="Success", statusCode=200)
@app.route('/setscooping', methods=['POST'])
def scooping():
content = request.json
key = decryptStringGCM(content['key'])
uuid = content['uuid']
time = content['time']
test = content["test"] # used only for unit testing
if key != SHIFT_API_KEY:
return jsonify(isError = True, message = "wrong api key: ", statusCode = 200)
first_date = datetime(1970, 1, 1)
time_since = datetime.now() - first_date
seconds = int(time_since.total_seconds())
level_1 = 0
level_2 = 0
level_3 = 0
if test == "true":
level_1 = 9
level_2 = 99
level_3 = 999
else:
conn = None
scoopTime24HoursAgo = getScoopTimeHoursAgo()
try:
conn = dbConnect()
curs = conn.cursor(dictionary=True)
query = 'SELECT COUNT(*) AS count FROM account WHERE uuid = "' + uuid + '"'
curs.execute(query)
row = curs.fetchone()
count = row['count']
if count != 1:
return jsonify(isError=True, message="The user id is not registered.", statusCode=200)
curs = conn.cursor()
query = 'UPDATE account SET scooping = ' + str(seconds) + ' WHERE uuid = "' + uuid + '"'
curs.execute(query)
conn.commit()
curs = conn.cursor(dictionary=True)
query = 'SELECT COUNT(*) AS count FROM account WHERE ruuid = "' + uuid + '" AND scooping > ' + str(scoopTime24HoursAgo)
curs.execute(query)
row = curs.fetchone()
level_1 = row['count']
curs = conn.cursor(dictionary=True)
query = 'SELECT COUNT(*) AS count FROM account WHERE ruuid IN (SELECT uuid FROM account WHERE ruuid = "' + uuid + '") AND scooping > ' + str(scoopTime24HoursAgo)
curs.execute(query)
row = curs.fetchone()
level_2 = row['count']
curs = conn.cursor(dictionary=True)
query = 'SELECT COUNT(*) AS count FROM account WHERE ruuid IN (SELECT uuid FROM account WHERE ruuid IN (SELECT uuid FROM account WHERE ruuid = "' + uuid + '")) AND scooping >' + str(scoopTime24HoursAgo)
curs.execute(query)
row = curs.fetchone()
level_3 = row['count']
except IntegrityError as error:
return jsonify(isError=True, message=error.msg, statusCode=200)
finally:
if conn != None:
conn.close()
return jsonify(isError = False,
message = "Success",
key = encryptStringGCM(SHIFT_CLIENT_KEY + time),
statusCode = 200,
count_1 = level_1,
count_2 = level_2,
count_3 = level_3)
@app.route('/matelist', methods=['POST'])
def friendlist():
content = request.json
key = decryptStringGCM(content['key'])
uuid = content['uuid']
test = content["test"] # used only for unit testing
if key != SHIFT_API_KEY:
return jsonify(isError=True,
message="wrong api key",
statusCode=200)
accounts = []
if test == "true":
not_scooping = int((datetime.now() - timedelta(seconds=72001) - datetime(1970, 1, 1)).total_seconds())
scooping = int((datetime.now() - timedelta(seconds=71999) - datetime(1970, 1, 1)).total_seconds())
accounts.append({'uuid' : '1234567890', 'name' : 'Testuser 1', 'country' : 'England', 'scooping' : isScooping(0), 'friends_count' : 3})
accounts.append({'uuid' : '1234567891', 'name' : 'Testuser 2', 'country' : 'Germany' ,'scooping' : isScooping(not_scooping), 'friends_count' : 4})
accounts.append({'uuid' : '1234567892', 'name' : 'Testuser 3', 'country' : 'Brasil' , 'scooping' : isScooping(scooping), 'friends_count' : 5})
else:
conn = None
try:
conn = dbConnect()
curs = conn.cursor(dictionary=True)
query = 'SELECT uuid, name, country, scooping, (SELECT COUNT(*) FROM account WHERE ruuid = a.uuid) AS friends_count FROM account a WHERE ruuid = "' + uuid + '" and uuid <> "' + uuid + '" ORDER BY friends_count desc, scooping desc'
curs.execute(query)
for row in curs:
accounts.append({'uuid' : row['uuid'], 'name' : row['name'], 'country' : row['country'], 'scooping' : isScooping(row['scooping']), 'friends_count' : row['friends_count']})
except IntegrityError as error:
return jsonify(isError=True, message=error.msg, statusCode=200)
finally:
if conn != None:
conn.close()
return jsonify(isError=False,
message="Success",
statusCode=200, data=accounts)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)