This repository has been archived by the owner on Mar 10, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
360 lines (332 loc) · 13.6 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
import datetime
import logging
import os
import time
import traceback
from urllib.parse import quote
import pandas as pd
import requests
from sqlalchemy.schema import DropTable
from sqlalchemy import inspect
from sqlalchemy.exc import NoSuchTableError
from tenacity import retry, wait_exponential, stop_after_attempt
from zoomus import ZoomClient
import config
from mailer import Mailer
from timer import elapsed
# Shared tenacity settings for the API loaders: give up after the third
# attempt, sleeping an exponentially growing delay (multiplier 2, clamped to
# the 4-10 second range) between attempts.
RETRY_PARAMS = dict(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=2, min=4, max=10),
)
class Connector:
    """
    Data connector for extracting Zoom data from API, transforming into
    dataframes, and loading into database.
    """

    # How many times a single API request is re-sent after Zoom replies with
    # a rate-limit (code 429) payload before the load is aborted.
    RATE_LIMIT_RETRIES = 5

    def __init__(self):
        self.client = ZoomClient(config.ZOOM_KEY, config.ZOOM_SECRET)
        self.sql = config.db_connection()
        self.inspector = inspect(self.sql.engine)

    def _request_json(self, api_method, wait_seconds=10, **params):
        """
        Call a Zoom API client method and return its decoded JSON body.

        A body with ``"code": 429`` is a rate-limit reply, not a data page.
        Handing it to the caller would make it look like an empty/last page,
        so the same request is re-sent after sleeping ``wait_seconds``.

        :param api_method: bound zoomus method, e.g. ``self.client.user.list``
        :param wait_seconds: seconds to sleep after each rate-limited reply
        :param params: keyword arguments forwarded to ``api_method``
        :return: the decoded JSON response
        :raises RuntimeError: if all 1 + RATE_LIMIT_RETRIES attempts were
            rate limited
        """
        attempts = self.RATE_LIMIT_RETRIES + 1
        for attempt in range(1, attempts + 1):
            response = api_method(**params).json()
            if response.get("code") != 429:
                return response
            if attempt < attempts:
                logging.info(f"Rate limit reached; waiting {wait_seconds} seconds...")
                time.sleep(wait_seconds)
        raise RuntimeError(
            f"Zoom API rate limit still reached after {attempts} attempts"
        )

    def drop_table(self, table_name):
        """
        Drop ``table_name`` so the next insert recreates it with a fresh schema.

        A table that does not exist yet is not an error; the drop is skipped.
        """
        try:
            table = self.sql.table(table_name)
        except NoSuchTableError:
            logging.debug(
                f"No such table {table_name} exists to drop. This command ignored."
            )
            return
        # Engine.execute() is deprecated in SQLAlchemy 1.4 and removed in 2.0;
        # begin() runs the DDL in a transaction that commits on exit.
        with self.sql.engine.begin() as connection:
            connection.execute(DropTable(table))

    @elapsed
    @retry(**RETRY_PARAMS)
    def load_users(self):
        """
        Reload Zoom_Users from the Zoom user list (used to query meetings).

        The table is dropped first so its schema is reset, then every page of
        users (300 per page) is inserted, restricted to config.USER_COLUMNS.
        """
        table_name = "Zoom_Users"
        params = {"page_size": 300, "page_number": 1}
        total_users = 0
        # The first page also reports how many pages exist, so it is reused
        # instead of being requested a second time inside the loop.
        response = self._request_json(self.client.user.list, **params)
        if not response:
            logging.warning(f"Empty user list response; {table_name} not reloaded")
            return
        page_count = response["page_count"]
        self.drop_table(table_name)
        while True:
            results = response.get("users")
            if results:
                total_users += len(results)
                users = pd.DataFrame(results)
                users = users.reindex(columns=config.USER_COLUMNS)
                self.sql.insert_into(table_name, users)
                logging.debug(f"Inserted {len(users)} records into {table_name}")
            if params["page_number"] >= page_count:
                break
            params["page_number"] += 1
            response = self._request_json(self.client.user.list, **params)
        logging.info(f"Inserted {total_users} users into {table_name}")

    def _get_meeting_uuids(self):
        """
        Return the uuids of meetings in Zoom_Meetings that have no
        participant rows in Zoom_Participants yet (all meetings when the
        participants table does not exist).
        """
        if self.inspector.has_table(
            table_name="Zoom_Participants", schema=self.sql.schema
        ):
            # NOTE(review): the schema "custom" is hard-coded here while the
            # has_table check uses self.sql.schema; confirm they always match.
            meetings = self.sql.query(
                """SELECT DISTINCT zm.uuid
                FROM custom.Zoom_Meetings zm
                LEFT JOIN custom.Zoom_Participants zp
                ON zm.uuid = zp.meeting_uuid
                WHERE zp.meeting_uuid IS NULL"""
            )
        else:
            meetings = pd.read_sql_table(
                table_name="Zoom_Meetings", con=self.sql.engine, schema=self.sql.schema
            )
        meeting_ids = meetings["uuid"].values.flatten().tolist()
        return meeting_ids

    @elapsed
    @retry(**RETRY_PARAMS)
    def load_participants(self):
        """
        Load past-meeting participants for every meeting returned by
        _get_meeting_uuids, tagging each row with its ``meeting_uuid``.
        """
        table_name = "Zoom_Participants"
        uuids = self._get_meeting_uuids()
        total_participants = 0
        for uuid in uuids:
            params = {"meeting_id": uuid, "page_size": 300, "type": "past"}
            while True:
                response = self._request_json(
                    self.client.metric.list_participants, **params
                )
                results = response.get("participants")
                if results:
                    results = [dict(item, meeting_uuid=uuid) for item in results]
                    total_participants += len(results)
                    participants = pd.DataFrame(results)
                    self.sql.insert_into(table_name, participants)
                    logging.debug(
                        f"Inserted {len(participants)} participants for meeting {uuid} into {table_name}"
                    )
                page_token = response.get("next_page_token")
                if not page_token:
                    break
                params["next_page_token"] = page_token
        logging.info(
            f"Inserted {total_participants} participants for {len(uuids)} meetings into {table_name}"
        )

    @elapsed
    @retry(**RETRY_PARAMS)
    def load_groups(self):
        """Reload Zoom_Groups with the account's Zoom permission groups."""
        table_name = "Zoom_Groups"
        response = self._request_json(self.client.group.list)
        if response:
            self.drop_table(table_name)
        results = response.get("groups")
        if results:
            groups = pd.DataFrame(results)
            self.sql.insert_into(table_name, groups)
            logging.info(f"Inserted {len(groups)} records into {table_name}")

    def _get_group_ids(self, groupname=None):
        """
        Return the ids stored in Zoom_Groups, optionally only those whose
        ``name`` equals ``groupname``.
        """
        groups = pd.read_sql_table(
            table_name="Zoom_Groups", con=self.sql.engine, schema=self.sql.schema
        )
        if groupname:
            groups = groups[groups["name"] == groupname]
        return groups["id"].tolist()

    @elapsed
    @retry(**RETRY_PARAMS)
    def load_group_members(self):
        """
        Reload Zoom_GroupMembers with the members of every group in
        Zoom_Groups, adding a ``groupId`` column to each row.
        """
        table_name = "Zoom_GroupMembers"
        self.drop_table(table_name)
        total_group_members = 0
        for group_id in self._get_group_ids():
            params = {"groupid": group_id, "page_size": 300, "page_number": 1}
            # The first page also reports the page count; reuse it.
            response = self._request_json(self.client.group.list_members, **params)
            page_count = response["page_count"]
            while True:
                results = response.get("members")
                if results:
                    total_group_members += len(results)
                    members = pd.DataFrame(results)
                    members["groupId"] = group_id
                    self.sql.insert_into(table_name, members)
                    logging.debug(f"Inserted {len(members)} records into {table_name}")
                if params["page_number"] >= page_count:
                    break
                # The request parameter is "page_number"; advancing any other
                # key would re-fetch page 1 on every iteration.
                params["page_number"] += 1
                response = self._request_json(
                    self.client.group.list_members, **params
                )
        logging.info(f"Inserted {total_group_members} group members into {table_name}")

    def _get_students(self):
        """Return the rows of vw_Zoom_NewStudentAccounts (students needing accounts)."""
        return pd.read_sql_table(
            "vw_Zoom_NewStudentAccounts", con=self.sql.engine, schema=self.sql.schema
        )

    @elapsed
    def create_student_accounts(self):
        """
        Create Zoom accounts (type BASIC_USER) for the students returned by
        _get_students and add them all to the "Students" group.

        HTTP errors are logged per request and do not stop the run.
        """
        students = self._get_students()
        if students.empty:
            # Nothing to create; also avoids an add_members call with no members.
            logging.info("No new student accounts to create.")
            return
        BASIC_USER = 1
        students["type"] = BASIC_USER
        students = students.to_dict(orient="records")
        emails = [{"email": student["email"]} for student in students]
        student_group_id = self._get_group_ids("Students")[0]
        # Create accounts
        for student in students:
            try:
                r = self.client.user.create(action="create", user_info=student)
                r.raise_for_status()
                logging.info(f"Created account for {student['email']}")
            except requests.exceptions.HTTPError as e:
                logging.error(e)
                logging.error(r.text)
        # Add to Students group
        try:
            r = self.client.group.add_members(groupid=student_group_id, members=emails)
            r.raise_for_status()
            logging.info(f"Added {len(emails)} new users to Students group.")
        except requests.exceptions.HTTPError as e:
            logging.error(e)
            logging.error(r.text)

    def _fetch_meetings(self, run_date):
        """Return every past meeting Zoom reports for ``run_date``, across all pages."""
        meetings = []
        params = {
            "page_size": 300,
            "type": "past",
            "from": run_date,
            "to": run_date,
        }
        while True:
            response = self._request_json(
                self.client.metric.list_meetings, wait_seconds=60, **params
            )
            results = response.get("meetings")
            if results:
                meetings.extend(results)
            page_token = response.get("next_page_token")
            if not page_token:
                return meetings
            params["next_page_token"] = page_token

    @elapsed
    @retry(**RETRY_PARAMS)
    def load_meetings(self):
        """
        Load one day of past Zoom meetings into Zoom_Meetings.

        Starts at get_last_meeting_date(): the day after the latest stored
        meeting, or August 1 of the current school year when none exist.
        Days before today that have no meetings are stepped over, and the
        first day that has meetings is inserted.

        Stepping over empty days matters: such a day adds no rows, so without
        it every later run would re-query the same empty day. Nothing is
        recorded for skipped days, so a trailing empty day is retried on the
        next run.
        """
        table_name = "Zoom_Meetings"
        today = datetime.datetime.today().date()
        run_date = self.get_last_meeting_date()
        while run_date < today:
            meetings = self._fetch_meetings(run_date)
            if meetings:
                df = pd.DataFrame(meetings)
                self.sql.insert_into(table_name, df)
                logging.info(
                    f"Inserted {len(df)} meeting records into {table_name} for {run_date.strftime('%Y-%m-%d')}"
                )
                return
            logging.info(f"No meetings found for {run_date.strftime('%Y-%m-%d')}")
            run_date += datetime.timedelta(days=1)

    def get_school_start_date(self):
        """
        Return August 1 of the current school year: this calendar year from
        July onward, otherwise the previous year.
        """
        today = datetime.datetime.today()
        if today.month > 6:
            year = today.year
        else:
            year = today.year - 1
        return datetime.date(year, 8, 1)

    def get_last_meeting_date(self):
        """
        Return the next date to load meetings for.

        This is the day after the latest ``start_time`` in Zoom_Meetings, or
        get_school_start_date() when the table is missing, empty, or holds no
        start times.
        """
        date = self.get_school_start_date()
        if self.inspector.has_table(table_name="Zoom_Meetings", schema=self.sql.schema):
            # Only start_time is needed; avoid pulling the whole table.
            start_times = pd.read_sql_table(
                table_name="Zoom_Meetings",
                con=self.sql.engine,
                schema=self.sql.schema,
                columns=["start_time"],
            )["start_time"]
            previous_date = start_times.max()
            # max() is NaN (which is truthy) for an empty or all-null column.
            if pd.notna(previous_date) and previous_date:
                # NOTE(review): assumes start_time is stored as text in this
                # format, so the string max() is also the latest timestamp.
                previous_date = datetime.datetime.strptime(
                    previous_date, "%Y-%m-%dT%H:%M:%S%z"
                )
                date = previous_date.date() + datetime.timedelta(days=1)
        return date

    def get_meeting_settings(self):
        """
        Load authentication-related settings for every meeting returned by
        _get_meeting_ids into Zoom_Meeting_Settings, one row per meeting.
        """
        table_name = "Zoom_Meeting_Settings"
        meeting_ids = self._get_meeting_ids()
        total_settings = 0
        for meeting_id in meeting_ids:
            params = {"id": meeting_id, "page_size": 300, "type": "past"}
            results = []
            while True:
                response = self._request_json(self.client.meeting.get, **params)
                # Zoom Error 3001: Meeting does not exist. A row with empty
                # settings is still stored, so _get_meeting_ids (an anti-join
                # on meeting_id) will not return this meeting again.
                if response.get("code") == 3001:
                    logging.debug(response.get("message"))
                settings = response.get("settings", {})
                results.append(self._format_settings_data(settings, meeting_id))
                page_token = response.get("next_page_token")
                if not page_token:
                    break
                params["next_page_token"] = page_token
            if results:
                total_settings += len(results)
                settings = pd.DataFrame(results)
                self.sql.insert_into(table_name, settings)
                logging.debug(
                    f"Inserted {len(settings)} settings for meeting {meeting_id} into {table_name}"
                )
        logging.info(
            f"Inserted {total_settings} settings for {len(meeting_ids)} meetings into {table_name}"
        )

    def _get_meeting_ids(self):
        """
        Return the ids of meetings in Zoom_Meetings that have no row in
        Zoom_Meeting_Settings yet (all meetings when that table does not exist).
        """
        # Same inspector check as _get_meeting_uuids; Engine.has_table is
        # deprecated in SQLAlchemy 1.4 and removed in 2.0.
        if self.inspector.has_table(
            table_name="Zoom_Meeting_Settings", schema=self.sql.schema
        ):
            meetings = self.sql.query(
                """SELECT DISTINCT zm.id
                FROM custom.Zoom_Meetings zm
                LEFT JOIN custom.Zoom_Meeting_Settings zms
                ON zm.id = zms.meeting_id
                WHERE zms.meeting_id IS NULL"""
            )
        else:
            meetings = pd.read_sql_table(
                table_name="Zoom_Meetings", con=self.sql.engine, schema=self.sql.schema
            )
        meeting_ids = meetings["id"].values.flatten().tolist()
        return meeting_ids

    def _format_settings_data(self, item, meeting_id):
        """
        Build one Zoom_Meeting_Settings row from a meeting's ``settings`` dict.

        Keeps only authentication-related fields; missing keys become None.
        """
        return {
            "meeting_id": meeting_id,
            "enforce_login": item.get("enforce_login"),
            "enforce_login_domains": item.get("enforce_login_domains"),
            "waiting_room": item.get("waiting_room"),
            "meeting_authentication": item.get("meeting_authentication"),
            "authentication_domains": item.get("authentication_domains"),
            "authentication_name": item.get("authentication_name"),
        }
def main():
    """
    Configure logging, then run each group of loaders whose config flag
    (USERS, ACCOUNTS, MEETINGS) is truthy, in that order.
    """
    config.set_logging()
    connector = Connector()
    # Each flag is read just before its stage runs, matching sequential checks.
    stages = (
        ("USERS", (connector.load_users, connector.load_groups, connector.load_group_members)),
        ("ACCOUNTS", (connector.create_student_accounts,)),
        ("MEETINGS", (connector.load_meetings, connector.load_participants, connector.get_meeting_settings)),
    )
    for flag_name, loaders in stages:
        if not getattr(config, flag_name):
            continue
        for loader in loaders:
            loader()
if __name__ == "__main__":
    # Stays None when main() succeeds; otherwise holds the formatted traceback.
    error_message = None
    try:
        main()
    except Exception as e:
        logging.exception(e)
        error_message = traceback.format_exc()
    # Optionally send a run notification, including any failure traceback.
    if config.ENABLE_MAILER:
        Mailer("Zoom Connector").notify(error_message=error_message)