diff --git a/dtable_events/app/app.py b/dtable_events/app/app.py
index b77d61e5..0a26e291 100644
--- a/dtable_events/app/app.py
+++ b/dtable_events/app/app.py
@@ -8,13 +8,14 @@
 from dtable_events.tasks.dtable_updates_sender import DTableUpdatesSender
 from dtable_events.tasks.dtable_real_time_rows_counter import DTableRealTimeRowsCounter
 from dtable_events.tasks.ldap_syncer import LDAPSyncer
+from dtable_events.tasks.big_data_storage_stats_worker import BigDataStorageStatsWorker
+from dtable_events.tasks.dtable_asset_stats_worker import DTableAssetStatsWorker
 from dtable_events.notification_rules.handler import NotificationRuleHandler
 from dtable_events.notification_rules.dtable_notification_rules_scanner import DTableNofiticationRulesScanner
 from dtable_events.automations.handler import AutomationRuleHandler
 from dtable_events.automations.dtable_automation_rules_scanner import DTableAutomationRulesScanner
 from dtable_events.webhook.webhook import Webhooker
 from dtable_events.common_dataset.common_dataset_syncer import CommonDatasetSyncer
-from dtable_events.tasks.big_data_storage_stats_worker import BigDataStorageStatsWorker
 from dtable_events.data_sync.data_syncer import DataSyncer
 from dtable_events.workflow.workflow_actions import WorkflowActionsHandler
 from dtable_events.workflow.workflow_schedules_scanner import WorkflowSchedulesScanner
@@ -37,6 +38,8 @@ def __init__(self, config, task_mode):
         self._dtable_real_time_rows_counter = DTableRealTimeRowsCounter(config)
         self._workflow_actions_handler = WorkflowActionsHandler(config)
         self._webhooker = Webhooker(config)
+        # seaf events
+        self._dtable_asset_stat_worker = DTableAssetStatsWorker(config)
         # cron jobs
         self._instant_notices_sender = InstantNoticeSender(config)
         self._email_notices_sender = EmailNoticesSender(config)
@@ -63,6 +66,8 @@ def serve_forever(self):
         self._dtable_real_time_rows_counter.start() # default True
         self._workflow_actions_handler.start() # always True
         self._webhooker.start() # always True
+        # seaf events
+        self._dtable_asset_stat_worker.start() # always True
         # cron jobs
         self._instant_notices_sender.start() # default True
         self._email_notices_sender.start() # default True
diff --git a/dtable_events/dtable_io/__init__.py b/dtable_events/dtable_io/__init__.py
index e940198f..c1ba525b 100644
--- a/dtable_events/dtable_io/__init__.py
+++ b/dtable_events/dtable_io/__init__.py
@@ -16,6 +16,8 @@
 from selenium.webdriver.support.ui import WebDriverWait
 from datetime import datetime
 
+from seaserv import seafile_api
+
 from dtable_events.app.config import DTABLE_WEB_SERVICE_URL, SESSION_COOKIE_NAME, INNER_DTABLE_DB_URL
 from dtable_events.dtable_io.big_data import import_excel_to_db, update_excel_to_db, export_big_data_to_excel
 from dtable_events.dtable_io.utils import setup_logger, \
@@ -34,7 +36,7 @@
 from dtable_events.dtable_io.task_manager import task_manager
 from dtable_events.statistics.db import save_email_sending_records, batch_save_email_sending_records
 from dtable_events.data_sync.data_sync_utils import run_sync_emails
-from dtable_events.utils import get_inner_dtable_server_url
+from dtable_events.utils import get_inner_dtable_server_url, uuid_str_to_32_chars, uuid_str_to_36_chars
 from dtable_events.utils.dtable_server_api import DTableServerAPI
 
 dtable_io_logger = setup_logger('dtable_events_io.log')
@@ -1087,3 +1089,35 @@ def convert_big_data_view_to_execl(dtable_uuid, table_id, view_id, username, nam
     dtable_io_logger.info('export big data table_id: %s, view_id: %s success!', table_id, view_id)
 
 
+def calc_dtable_asset_stats(repo_id_dtable_uuids_dict, config):
+    from dtable_events.tasks.dtable_asset_stats_worker import update_dtable_asset_sizes
+
+    try:
+        db_session = init_db_session_class(config)()
+    except Exception as e:
+        dtable_io_logger.error('create db session failed. ERROR: {}'.format(e))
+        raise Exception('create db session failed. ERROR: {}'.format(e))
+    dtable_uuid_sizes = []
+    for repo_id, dtable_uuids in repo_id_dtable_uuids_dict.items():
+        try:
+            repo = seafile_api.get_repo(repo_id)
+            if not repo:
+                continue
+            for dtable_uuid in dtable_uuids:
+                asset_path = f'/asset/{uuid_str_to_36_chars(dtable_uuid)}'
+                asset_dir_id = seafile_api.get_dir_id_by_path(repo_id, asset_path)
+                if not asset_dir_id:
+                    dtable_uuid_sizes.append([uuid_str_to_32_chars(dtable_uuid), 0])
+                    continue
+                size = seafile_api.get_file_count_info_by_path(repo_id, asset_path).size
+                dtable_uuid_sizes.append([uuid_str_to_32_chars(dtable_uuid), size])
+        except Exception as e:
+            dtable_io_logger.exception(e)
+            dtable_io_logger.error('repo_id: %s dtable_uuids: %s stats asset error: %s', repo_id, dtable_uuids, e)
+    try:
+        update_dtable_asset_sizes(dtable_uuid_sizes, db_session)
+    except Exception as e:
+        dtable_io_logger.exception(e)
+        dtable_io_logger.error('update dtable asset sizes error: %s', e)
+    finally:
+        db_session.close()
diff --git a/dtable_events/dtable_io/request_handler.py b/dtable_events/dtable_io/request_handler.py
index 9bbe4722..610e7216 100644
--- a/dtable_events/dtable_io/request_handler.py
+++ b/dtable_events/dtable_io/request_handler.py
@@ -1155,3 +1155,25 @@ def query_plugin_email_send_status():
     resp = dict(is_finished=is_finished)
     return make_response((resp, 200))
+
+
+@app.route('/add-calc-dtable-asset-stats', methods=['POST'])
+def add_calc_dtable_asset_stats():
+    is_valid, error = check_auth_token(request)
+    if not is_valid:
+        return make_response((error, 403))
+    try:
+        data = json.loads(request.data)
+    except Exception as e:
+        logger.warning('add_calc_dtable_asset_stats parse json body error: %s', e)
+        return make_response(('request invalid, error: %s' % e, 400))
+    repo_id_dtable_uuids_dict = data.get('repo_id_dtable_uuids_dict')
+    if not repo_id_dtable_uuids_dict or not isinstance(repo_id_dtable_uuids_dict, dict):
+        return make_response(('repo_id_dtable_uuids_dict invalid', 400))
+    try:
+        task_id = task_manager.add_calc_dtable_asset_task(repo_id_dtable_uuids_dict)
+    except Exception as e:
+        logger.error('add_calc_dtable_asset_stats error: %s', e)
+        return make_response((e, 500))
+
+    return make_response(({'task_id': task_id}, 200))
diff --git a/dtable_events/dtable_io/task_manager.py b/dtable_events/dtable_io/task_manager.py
index 7711bd7c..f6fe4e01 100644
--- a/dtable_events/dtable_io/task_manager.py
+++ b/dtable_events/dtable_io/task_manager.py
@@ -310,6 +310,15 @@ def add_app_users_sync_task(self, dtable_uuid, app_name, app_id, table_name, tab
 
         return task_id
 
+    def add_calc_dtable_asset_task(self, repo_id_dtable_uuids_dict):
+        from dtable_events.dtable_io import calc_dtable_asset_stats
+        task_id = str(uuid.uuid4())
+        task = (calc_dtable_asset_stats, (repo_id_dtable_uuids_dict, self.config))
+        self.tasks_queue.put(task_id)
+        self.tasks_map[task_id] = task
+
+        return task_id
+
     def threads_is_alive(self):
         info = {}
         for t in self.threads:
diff --git a/dtable_events/tasks/dtable_asset_stats_worker.py b/dtable_events/tasks/dtable_asset_stats_worker.py
new file mode 100644
index 00000000..d9f34df1
--- /dev/null
+++ b/dtable_events/tasks/dtable_asset_stats_worker.py
@@ -0,0 +1,129 @@
+# -*- coding: utf-8 -*-
+import logging
+import time
+import json
+import stat
+import uuid
+from datetime import datetime
+from threading import Thread, Event
+
+from seaserv import seafile_api
+
+from dtable_events.db import init_db_session_class
+from dtable_events.utils import uuid_str_to_36_chars, uuid_str_to_32_chars
+
+logger = logging.getLogger(__name__)
+
+
+def is_valid_uuid(test_str):
+    try:
+        uuid.UUID(test_str)
+        return True
+    except:
+        return False
+
+
+def update_dtable_asset_sizes(dtable_uuid_sizes, db_session):
+    """
+    :param dtable_uuid_sizes: a list of [dtable_uuid, size]
+    """
+    step = 1000
+    updated_at = datetime.utcnow()
+    for i in range(0, len(dtable_uuid_sizes), step):
+        updates = ', '.join(["('%s', %s, '%s')" % (
+            uuid_str_to_32_chars(dtable_uuid_size[0]), dtable_uuid_size[1], updated_at
+        ) for dtable_uuid_size in dtable_uuid_sizes[i: i+step]])
+        sql = '''
+        INSERT INTO dtable_asset_stats(dtable_uuid, size, updated_at) VALUES %s
+        ON DUPLICATE KEY UPDATE size=VALUES(size), updated_at=VALUES(updated_at)
+        ''' % updates
+        db_session.execute(sql)
+        db_session.commit()
+
+
+class DTableAssetStatsWorker(Thread):
+    def __init__(self, config):
+        Thread.__init__(self)
+        self._finished = Event()
+        self._db_session_class = init_db_session_class(config)
+        self.interval = 5 * 60  # listen to seafile event for some time and then calc dtable asset storage
+        self.last_stats_time = time.time()
+        self.flag_worker_running = False
+
+    def run(self):
+        logger.info('Starting handle dtable asset stats...')
+        repo_id_ctime_dict = {}
+        while not self._finished.is_set():
+            if not self.flag_worker_running and repo_id_ctime_dict and time.time() - self.last_stats_time > self.interval:
+                Thread(target=self.stats_dtable_asset_storage, args=(repo_id_ctime_dict,), daemon=True).start()
+                self.last_stats_time = time.time()
+                repo_id_ctime_dict = {}
+                self.flag_worker_running = True
+            msg = seafile_api.pop_event('seaf_server.event')
+            if not msg:
+                time.sleep(0.5)
+                continue
+            logger.debug('msg: %s', msg)
+            if not isinstance(msg, dict):
+                continue
+            content = msg.get('content')
+            if not isinstance(content, str) or '\t' not in content:
+                continue
+            if not content.startswith('repo-update'):
+                continue
+            ctime = msg.get('ctime')
+            if not isinstance(ctime, int) or ctime < time.time() - 30 * 60:  # ignore messages half hour ago
+                continue
+            repo_id = content.split()[1]
+            if not is_valid_uuid(repo_id):
+                continue
+            if repo_id in repo_id_ctime_dict:
+                continue
+            repo_id_ctime_dict[repo_id] = ctime
+
+    def _stats_dtable_asset_storage(self, repo_id_ctime_dict):
+        logger.info('Starting stats repo dtable asset storage...')
+        dtable_uuid_sizes = []
+        for repo_id, ctime in repo_id_ctime_dict.items():
+            logger.debug('start stats repo: %s ctime: %s', repo_id, ctime)
+            try:
+                repo = seafile_api.get_repo(repo_id)
+                if not repo:
+                    continue
+                asset_dirent = seafile_api.get_dirent_by_path(repo_id, '/asset')
+                if not asset_dirent or asset_dirent.mtime < ctime:
+                    continue
+                dirents = seafile_api.list_dir_by_path(repo_id, '/asset', offset=-1, limit=-1)
+                for dirent in dirents:
+                    if not stat.S_ISDIR(dirent.mode):
+                        continue
+                    if not is_valid_uuid(dirent.obj_name):
+                        continue
+                    logger.debug('start stats repo: %s dirent: %s', repo_id, dirent.obj_name)
+                    if dirent.mtime >= ctime:
+                        dtable_uuid = dirent.obj_name
+                        size = seafile_api.get_file_count_info_by_path(repo_id, f'/asset/{dtable_uuid}').size
+                        logger.debug('start stats repo: %s dirent: %s size: %s', repo_id, dirent.obj_name, size)
+                        dtable_uuid_sizes.append([uuid_str_to_32_chars(dtable_uuid), size])
+            except Exception as e:
+                logger.exception(e)
+                logger.error('stats repo: %s error: %s', repo_id, e)
+        if not dtable_uuid_sizes:
+            return
+        logger.debug('totally need to update dtable: %s', len(dtable_uuid_sizes))
+        db_session = self._db_session_class()
+        try:
+            update_dtable_asset_sizes(dtable_uuid_sizes, db_session)
+        except Exception as e:
+            logger.exception(e)
+            logger.error('update dtable asset sizes error: %s', e)
+        finally:
+            db_session.close()
+
+    def stats_dtable_asset_storage(self, repo_id_ctime_dict):
+        try:
+            self._stats_dtable_asset_storage(repo_id_ctime_dict)
+        except Exception as e:
+            logger.exception(e)
+        finally:
+            self.flag_worker_running = False
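
A minimal sketch of how a caller might exercise the new /add-calc-dtable-asset-stats endpoint, assuming the dtable-io HTTP service is reachable at a local address and accepts a token-style Authorization header; the host, port, header format, and example IDs are illustrative assumptions and not part of this diff. Only the route, the repo_id_dtable_uuids_dict payload key, and the {'task_id': ...} response shape come from the changes above.

# Hypothetical client call; address and credential are placeholders, the
# server-side check_auth_token decides what it actually accepts.
import requests

DTABLE_IO_URL = 'http://127.0.0.1:6000'                 # assumed dtable-io address
AUTH_HEADER = {'Authorization': 'Token <secret>'}        # placeholder credential

payload = {
    # map each library (repo_id) to the dtable uuids whose asset sizes should be recalculated
    'repo_id_dtable_uuids_dict': {
        '<repo-id>': ['<dtable-uuid-1>', '<dtable-uuid-2>'],
    }
}

resp = requests.post(f'{DTABLE_IO_URL}/add-calc-dtable-asset-stats',
                     json=payload, headers=AUTH_HEADER)
resp.raise_for_status()
print(resp.json()['task_id'])  # id of the queued calc_dtable_asset_stats task

The queued task runs calc_dtable_asset_stats, which sums each dtable's /asset/<uuid> directory via seafile_api and then upserts one row per dtable into dtable_asset_stats through update_dtable_asset_sizes.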