From 4561d435c7f40308724d17216954e3fba40617ef Mon Sep 17 00:00:00 2001
From: AlexCXC <1223408988@qq.com>
Date: Thu, 16 Mar 2023 18:30:53 +0800
Subject: [PATCH 1/5] add dtable asset stats worker

---
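Note: the worker tails seafile's `seaf_server.event` queue, remembers repos
whose assets changed, and every `interval` seconds flushes per-dtable asset
sizes into `dtable_asset_stats` through a batched MySQL upsert (1000 rows per
statement). Below is a minimal standalone sketch of the statement that
update_dtable_asset_sizes() builds; the helper name build_upsert and the
sample rows are illustrative only, not part of the patch:

    from datetime import datetime

    def build_upsert(rows):
        # rows: list of [dtable_uuid (32-char hex string), size in bytes]
        updated_at = datetime.utcnow()
        values = ', '.join("('%s', %s, '%s')" % (u, s, updated_at) for u, s in rows)
        return (
            "INSERT INTO dtable_asset_stats(dtable_uuid, size, updated_at) VALUES %s "
            "ON DUPLICATE KEY UPDATE size=VALUES(size), updated_at=VALUES(updated_at)"
        ) % values

    print(build_upsert([['0' * 32, 1048576], ['f' * 32, 0]]))

The ON DUPLICATE KEY UPDATE clause presumably relies on a unique key over
dtable_uuid, so re-running a batch refreshes size and updated_at in place
instead of inserting duplicate rows.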
 dtable_events/app/app.py                      |   7 +-
 .../tasks/dtable_asset_stats_worker.py        | 162 ++++++++++++++++++
 2 files changed, 168 insertions(+), 1 deletion(-)
 create mode 100644 dtable_events/tasks/dtable_asset_stats_worker.py

diff --git a/dtable_events/app/app.py b/dtable_events/app/app.py
index b77d61e5..0a26e291 100644
--- a/dtable_events/app/app.py
+++ b/dtable_events/app/app.py
@@ -8,13 +8,14 @@
 from dtable_events.tasks.dtable_updates_sender import DTableUpdatesSender
 from dtable_events.tasks.dtable_real_time_rows_counter import DTableRealTimeRowsCounter
 from dtable_events.tasks.ldap_syncer import LDAPSyncer
+from dtable_events.tasks.big_data_storage_stats_worker import BigDataStorageStatsWorker
+from dtable_events.tasks.dtable_asset_stats_worker import DTableAssetStatsWorker
 from dtable_events.notification_rules.handler import NotificationRuleHandler
 from dtable_events.notification_rules.dtable_notification_rules_scanner import DTableNofiticationRulesScanner
 from dtable_events.automations.handler import AutomationRuleHandler
 from dtable_events.automations.dtable_automation_rules_scanner import DTableAutomationRulesScanner
 from dtable_events.webhook.webhook import Webhooker
 from dtable_events.common_dataset.common_dataset_syncer import CommonDatasetSyncer
-from dtable_events.tasks.big_data_storage_stats_worker import BigDataStorageStatsWorker
 from dtable_events.data_sync.data_syncer import DataSyncer
 from dtable_events.workflow.workflow_actions import WorkflowActionsHandler
 from dtable_events.workflow.workflow_schedules_scanner import WorkflowSchedulesScanner
@@ -37,6 +38,8 @@ def __init__(self, config, task_mode):
         self._dtable_real_time_rows_counter = DTableRealTimeRowsCounter(config)
         self._workflow_actions_handler = WorkflowActionsHandler(config)
         self._webhooker = Webhooker(config)
+        # seaf events
+        self._dtable_asset_stat_worker = DTableAssetStatsWorker(config)
         # cron jobs
         self._instant_notices_sender = InstantNoticeSender(config)
         self._email_notices_sender = EmailNoticesSender(config)
@@ -63,6 +66,8 @@ def serve_forever(self):
         self._dtable_real_time_rows_counter.start()  # default True
         self._workflow_actions_handler.start()  # always True
         self._webhooker.start()  # always True
+        # seaf events
+        self._dtable_asset_stat_worker.start()  # always True
         # cron jobs
         self._instant_notices_sender.start()  # default True
         self._email_notices_sender.start()  # default True

diff --git a/dtable_events/tasks/dtable_asset_stats_worker.py b/dtable_events/tasks/dtable_asset_stats_worker.py
new file mode 100644
index 00000000..7748996e
--- /dev/null
+++ b/dtable_events/tasks/dtable_asset_stats_worker.py
@@ -0,0 +1,162 @@
+# -*- coding: utf-8 -*-
+import logging
+import time
+import json
+import stat
+import uuid
+from datetime import datetime
+from threading import Thread, Event
+
+from seaserv import seafile_api
+
+from dtable_events.app.event_redis import RedisClient
+from dtable_events.db import init_db_session_class
+from dtable_events.utils import uuid_str_to_36_chars, uuid_str_to_32_chars
+
+logger = logging.getLogger(__name__)
+
+
+def is_valid_uuid(test_str):
+    try:
+        uuid.UUID(test_str)
+        return True
+    except:
+        return False
+
+
+def update_dtable_asset_sizes(dtable_uuid_sizes, db_session):
+    """
+    :param dtable_uuid_sizes: a list of [dtable_uuid, size]
+    """
+    step = 1000
+    updated_at = datetime.utcnow()
+    for i in range(0, len(dtable_uuid_sizes), step):
+        updates = ', '.join(["('%s', %s, '%s')" % tuple(dtable_uuid_size + [updated_at]) for dtable_uuid_size in dtable_uuid_sizes[i: i+step]])
+        sql = '''
+        INSERT INTO dtable_asset_stats(dtable_uuid, size, updated_at) VALUES %s
+        ON DUPLICATE KEY UPDATE size=VALUES(size), updated_at=VALUES(updated_at)
+        ''' % updates
+        try:
+            db_session.execute(sql)
+            db_session.commit()
+        except Exception as e:
+            logger.error('update dtable asset sizes error: %s', e)
+
+
+class DTableAssetStatsWorker(Thread):
+    def __init__(self, config):
+        Thread.__init__(self)
+        self._finished = Event()
+        self._db_session_class = init_db_session_class(config)
+        self.interval = 5 * 60  # listen to seafile event for 5 mins and then calc dtable asset storage
+        self.last_stat_time = time.time()
+        self._redis_client = RedisClient(config)
+
+    def run(self):
+        Thread(target=self.listen_seaf_events_and_update, daemon=True).start()
+        Thread(target=self.listen_redis_and_update, daemon=True).start()
+
+    def listen_seaf_events_and_update(self):
+        logger.info('Starting to handle dtable asset stats...')
+        repo_id_ctime_dict = {}
+        while not self._finished.is_set():
+            if repo_id_ctime_dict and time.time() - self.last_stat_time > self.interval:
+                Thread(target=self.stat_dtable_asset_storage, args=(repo_id_ctime_dict,), daemon=True).start()
+                self.last_stat_time = time.time()
+                repo_id_ctime_dict = {}
+            msg = seafile_api.pop_event('seaf_server.event')
+            if not msg:
+                time.sleep(0.5)
+                continue
+            logger.debug('msg: %s', msg)
+            if not isinstance(msg, dict):
+                continue
+            content = msg.get('content')
+            if not isinstance(content, str) or '\t' not in content:
+                continue
+            ctime = msg.get('ctime')
+            if not isinstance(ctime, int) or ctime < time.time() - 30 * 60:  # ignore messages from more than half an hour ago
+                continue
+            repo_id = content.split()[1]
+            if not is_valid_uuid(repo_id):
+                continue
+            if repo_id in repo_id_ctime_dict:
+                continue
+            repo_id_ctime_dict[repo_id] = ctime
+
+    def stat_dtable_asset_storage(self, repo_id_ctime_dict):
+        dtable_uuid_sizes = []
+        for repo_id, ctime in repo_id_ctime_dict.items():
+            logger.debug('start stat repo: %s ctime: %s', repo_id, ctime)
+            asset_dir_id = seafile_api.get_dir_id_by_path(repo_id, '/asset')
+            if not asset_dir_id:
+                continue
+            try:
+                dirents = seafile_api.list_dir_by_path(repo_id, '/asset', offset=-1, limit=-1)
+            except Exception as e:
+                logger.error('repo: %s, get dirents error: %s', repo_id, e)
+                continue
+            for dirent in dirents:
+                if not stat.S_ISDIR(dirent.mode):
+                    continue
+                if not is_valid_uuid(dirent.obj_name):
+                    continue
+                logger.debug('start stat repo: %s dirent: %s', repo_id, dirent.obj_name)
+                if dirent.mtime > ctime - 5:
+                    dtable_uuid = dirent.obj_name
+                    try:
+                        size = seafile_api.get_file_count_info_by_path(repo_id, f'/asset/{dtable_uuid}').size
+                        logger.debug('start stat repo: %s dirent: %s size: %s', repo_id, dirent.obj_name, size)
+                    except Exception as e:
+                        logger.error('get dtable: %s asset error: %s', dtable_uuid, e)
+                        continue
+                    dtable_uuid_sizes.append([uuid_str_to_32_chars(dtable_uuid), size])
+        if not dtable_uuid_sizes:
+            return
+        logger.debug('totally need to update dtable: %s', len(dtable_uuid_sizes))
+        db_session = self._db_session_class()
+        try:
+            update_dtable_asset_sizes(dtable_uuid_sizes)
+        except Exception as e:
+            logger.exception(e)
+            logger.error('update dtable asset sizes error: %s', e)
+        finally:
+            db_session.close()
+
+    def listen_redis_and_update(self):
+        logger.info('Starting to handle dtable asset stats from redis...')
+        subscriber = self._redis_client.get_subscriber('stat-asset')
+        while not self._finished.is_set():
+            try:
+                message = subscriber.get_message()
+                if message is not None:
+                    dtable_uuid_repo_ids = json.loads(message['data'])
+                    session = self._db_session_class()
+                    try:
+                        self.stat_dtable_uuids(dtable_uuid_repo_ids, session)
+                    except Exception as e:
+                        logger.error('Handle dtable asset stats from redis error: %s' % e)
+                    finally:
+                        session.close()
+                else:
+                    time.sleep(0.5)
+            except Exception as e:
+                logger.error('Failed to get message from redis: %s' % e)
+                subscriber = self._redis_client.get_subscriber('stat-asset')
+
+    def stat_dtable_uuids(self, dtable_uuid_repo_ids, db_session):
+        dtable_uuid_sizes = []
+        for dtable_uuid, repo_id in dtable_uuid_repo_ids:
+            try:
+                asset_path = f'/asset/{uuid_str_to_36_chars(dtable_uuid)}'
+                asset_dir_id = seafile_api.get_dir_id_by_path(repo_id, asset_path)
+                if not asset_dir_id:
+                    dtable_uuid_sizes.append([uuid_str_to_32_chars(dtable_uuid), 0])
+                size = seafile_api.get_file_count_info_by_path(repo_id, asset_path).size
+                dtable_uuid_sizes.append([uuid_str_to_32_chars(dtable_uuid), size])
+                logger.debug('redis repo: %s dtable_uuid: %s size: %s', repo_id, dtable_uuid, size)
+            except Exception as e:
+                logger.exception(e)
+                logger.error('check repo: %s dtable: %s asset size error: %s', repo_id, dtable_uuid, e)
+        logger.debug('redis totally need to update dtable: %s', len(dtable_uuid_sizes))
+        update_dtable_asset_sizes(dtable_uuid_sizes, db_session)

From 4ed4946caaf7904f87b9bd8e826c9ec294fa85b7 Mon Sep 17 00:00:00 2001
From: AlexCXC <1223408988@qq.com>
Date: Thu, 16 Mar 2023 18:31:47 +0800
Subject: [PATCH 2/5] opt code

---
 dtable_events/tasks/dtable_asset_stats_worker.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/dtable_events/tasks/dtable_asset_stats_worker.py b/dtable_events/tasks/dtable_asset_stats_worker.py
index 7748996e..d3755176 100644
--- a/dtable_events/tasks/dtable_asset_stats_worker.py
+++ b/dtable_events/tasks/dtable_asset_stats_worker.py
@@ -49,7 +49,7 @@ def __init__(self, config):
         self._finished = Event()
         self._db_session_class = init_db_session_class(config)
         self.interval = 5 * 60  # listen to seafile event for 5 mins and then calc dtable asset storage
-        self.last_stat_time = time.time()
+        self.last_stats_time = time.time()
         self._redis_client = RedisClient(config)
 
     def run(self):
@@ -60,9 +60,9 @@ def listen_seaf_events_and_update(self):
         logger.info('Starting to handle dtable asset stats...')
         repo_id_ctime_dict = {}
         while not self._finished.is_set():
-            if repo_id_ctime_dict and time.time() - self.last_stat_time > self.interval:
-                Thread(target=self.stat_dtable_asset_storage, args=(repo_id_ctime_dict,), daemon=True).start()
-                self.last_stat_time = time.time()
+            if repo_id_ctime_dict and time.time() - self.last_stats_time > self.interval:
+                Thread(target=self.stats_dtable_asset_storage, args=(repo_id_ctime_dict,), daemon=True).start()
+                self.last_stats_time = time.time()
                 repo_id_ctime_dict = {}
             msg = seafile_api.pop_event('seaf_server.event')
             if not msg:
@@ -84,7 +84,7 @@ def listen_seaf_events_and_update(self):
                 continue
             repo_id_ctime_dict[repo_id] = ctime
 
-    def stat_dtable_asset_storage(self, repo_id_ctime_dict):
+    def stats_dtable_asset_storage(self, repo_id_ctime_dict):
         dtable_uuid_sizes = []
         for repo_id, ctime in repo_id_ctime_dict.items():
             logger.debug('start stat repo: %s ctime: %s', repo_id, ctime)
@@ -133,7 +133,7 @@ def listen_redis_and_update(self):
                     dtable_uuid_repo_ids = json.loads(message['data'])
                     session = self._db_session_class()
                     try:
-                        self.stat_dtable_uuids(dtable_uuid_repo_ids, session)
+                        self.stats_dtable_uuids(dtable_uuid_repo_ids, session)
                     except Exception as e:
                         logger.error('Handle dtable asset stats from redis error: %s' % e)
                     finally:
@@ -144,7 +144,7 @@ def listen_redis_and_update(self):
                 logger.error('Failed to get message from redis: %s' % e)
                 subscriber = self._redis_client.get_subscriber('stat-asset')
 
-    def stat_dtable_uuids(self, dtable_uuid_repo_ids, db_session):
+    def stats_dtable_uuids(self, dtable_uuid_repo_ids, db_session):
         dtable_uuid_sizes = []
         for dtable_uuid, repo_id in dtable_uuid_repo_ids:
             try:

From 1cc234bb70a05779ca3c173dcb6606eaa224130e Mon Sep 17 00:00:00 2001
From: AlexCXC <1223408988@qq.com>
Date: Fri, 17 Mar 2023 18:20:16 +0800
Subject: [PATCH 3/5] add calc dtable asset stats task

---
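Note: this patch lets dtable-web (or any internal caller) queue a
recalculation for a batch of bases over HTTP; the new endpoint puts
calc_dtable_asset_stats on the io task queue and returns a task id. A sketch
of a call from an internal client follows; the host, port, and auth-header
scheme are deployment-specific assumptions (check_auth_token decides what is
actually accepted):

    import requests

    payload = {
        'repo_id_dtable_uuids_dict': {
            # repo_id -> uuids of the dtables stored in that library (sample ids)
            '47f36157-5d41-4f16-b996-98761983f041': [
                '69bf1d2ec4c04805a7f1dbab95f0b6a6',
            ],
        },
    }
    resp = requests.post(
        'http://127.0.0.1:6000/add-calc-dtable-asset-stats',  # assumed address
        json=payload,
        headers={'Authorization': 'Token <internal-token>'},  # assumed scheme
    )
    print(resp.json())  # {'task_id': '<uuid4 string>'}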
 dtable_events/dtable_io/__init__.py              | 32 +++++++++++++-
 dtable_events/dtable_io/request_handler.py       | 22 ++++++++++
 dtable_events/dtable_io/task_manager.py          |  9 ++++
 .../tasks/dtable_asset_stats_worker.py           | 43 ++++++++-----------
 4 files changed, 81 insertions(+), 25 deletions(-)

diff --git a/dtable_events/dtable_io/__init__.py b/dtable_events/dtable_io/__init__.py
index e940198f..e9464ca5 100644
--- a/dtable_events/dtable_io/__init__.py
+++ b/dtable_events/dtable_io/__init__.py
@@ -16,6 +16,8 @@
 from selenium.webdriver.support.ui import WebDriverWait
 from datetime import datetime
 
+from seaserv import seafile_api
+
 from dtable_events.app.config import DTABLE_WEB_SERVICE_URL, SESSION_COOKIE_NAME, INNER_DTABLE_DB_URL
 from dtable_events.dtable_io.big_data import import_excel_to_db, update_excel_to_db, export_big_data_to_excel
 from dtable_events.dtable_io.utils import setup_logger, \
@@ -34,7 +36,7 @@
 from dtable_events.dtable_io.task_manager import task_manager
 from dtable_events.statistics.db import save_email_sending_records, batch_save_email_sending_records
 from dtable_events.data_sync.data_sync_utils import run_sync_emails
-from dtable_events.utils import get_inner_dtable_server_url
+from dtable_events.utils import get_inner_dtable_server_url, uuid_str_to_36_chars
 from dtable_events.utils.dtable_server_api import DTableServerAPI
 
 dtable_io_logger = setup_logger('dtable_events_io.log')
@@ -1087,3 +1089,31 @@ def convert_big_data_view_to_execl(dtable_uuid, table_id, view_id, username, name):
 
     dtable_io_logger.info('export big data table_id: %s, view_id: %s success!', table_id, view_id)
 
+
+def calc_dtable_asset_stats(repo_id_dtable_uuids_dict, config):
+    from dtable_events.tasks.dtable_asset_stats_worker import update_dtable_asset_sizes
+
+    try:
+        db_session = init_db_session_class(config)()
+    except Exception as e:
+        db_session = None
+        dtable_io_logger.error('create db session failed. ERROR: {}'.format(e))
+        raise Exception('create db session failed. ERROR: {}'.format(e))
+    dtable_uuid_sizes = []
+    for repo_id, dtable_uuids in repo_id_dtable_uuids_dict.items():
+        try:
+            repo = seafile_api.get_repo(repo_id)
+            if not repo:
+                continue
+            for dtable_uuid in dtable_uuids:
+                asset_path = f'/asset/{uuid_str_to_36_chars(dtable_uuid)}'
+                asset_dir_id = seafile_api.get_dir_id_by_path(repo_id, asset_path)
+                if not asset_dir_id:
+                    dtable_uuid_sizes.append([dtable_uuid, 0])
+                    continue
+                size = seafile_api.get_file_count_info_by_path(repo_id, asset_path).size
+                dtable_uuid_sizes.append([dtable_uuid, size])
+        except Exception as e:
+            dtable_io_logger.exception(e)
+            dtable_io_logger.error('repo_id: %s dtable_uuids: %s stats asset error: %s', repo_id, dtable_uuids, e)
+    update_dtable_asset_sizes(dtable_uuid_sizes, db_session)
+    db_session.close()

diff --git a/dtable_events/dtable_io/request_handler.py b/dtable_events/dtable_io/request_handler.py
index 9bbe4722..610e7216 100644
--- a/dtable_events/dtable_io/request_handler.py
+++ b/dtable_events/dtable_io/request_handler.py
@@ -1155,3 +1155,25 @@ def query_plugin_email_send_status():
     resp = dict(is_finished=is_finished)
 
     return make_response((resp, 200))
+
+
+@app.route('/add-calc-dtable-asset-stats', methods=['POST'])
+def add_calc_dtable_asset_stats():
+    is_valid, error = check_auth_token(request)
+    if not is_valid:
+        return make_response((error, 403))
+    try:
+        data = json.loads(request.data)
+    except Exception as e:
+        logger.warning('add_calc_dtable_asset_stats parse json body error: %s', e)
+        return make_response(('request invalid, error: %s' % e, 400))
+    repo_id_dtable_uuids_dict = data.get('repo_id_dtable_uuids_dict')
+    if not repo_id_dtable_uuids_dict or not isinstance(repo_id_dtable_uuids_dict, dict):
+        return make_response(('repo_id_dtable_uuids_dict invalid', 400))
+    try:
+        task_id = task_manager.add_calc_dtable_asset_task(repo_id_dtable_uuids_dict)
+    except Exception as e:
+        logger.error('add_calc_dtable_asset_stats error: %s', e)
+        return make_response((e, 500))
+
+    return make_response(({'task_id': task_id}, 200))

diff --git a/dtable_events/dtable_io/task_manager.py b/dtable_events/dtable_io/task_manager.py
index 7711bd7c..f6fe4e01 100644
--- a/dtable_events/dtable_io/task_manager.py
+++ b/dtable_events/dtable_io/task_manager.py
@@ -310,6 +310,15 @@ def add_app_users_sync_task(self, dtable_uuid, app_name, app_id, table_name, table_id, username):
 
         return task_id
 
+    def add_calc_dtable_asset_task(self, repo_id_dtable_uuids_dict):
+        from dtable_events.dtable_io import calc_dtable_asset_stats
+        task_id = str(uuid.uuid4())
+        task = (calc_dtable_asset_stats, (repo_id_dtable_uuids_dict, self.config))
+        self.tasks_queue.put(task_id)
+        self.tasks_map[task_id] = task
+
+        return task_id
+
     def threads_is_alive(self):
         info = {}
         for t in self.threads:

diff --git a/dtable_events/tasks/dtable_asset_stats_worker.py b/dtable_events/tasks/dtable_asset_stats_worker.py
index d3755176..d8547b14 100644
--- a/dtable_events/tasks/dtable_asset_stats_worker.py
+++ b/dtable_events/tasks/dtable_asset_stats_worker.py
@@ -53,10 +53,6 @@ def __init__(self, config):
         self._redis_client = RedisClient(config)
 
     def run(self):
-        Thread(target=self.listen_seaf_events_and_update, daemon=True).start()
-        Thread(target=self.listen_redis_and_update, daemon=True).start()
-
-    def listen_seaf_events_and_update(self):
         logger.info('Starting to handle dtable asset stats...')
         repo_id_ctime_dict = {}
         while not self._finished.is_set():
@@ -87,30 +83,29 @@ def listen_seaf_events_and_update(self):
     def stats_dtable_asset_storage(self, repo_id_ctime_dict):
         dtable_uuid_sizes = []
         for repo_id, ctime in repo_id_ctime_dict.items():
-            logger.debug('start stat repo: %s ctime: %s', repo_id, ctime)
-            asset_dir_id = seafile_api.get_dir_id_by_path(repo_id, '/asset')
-            if not asset_dir_id:
-                continue
+            logger.debug('start stats repo: %s ctime: %s', repo_id, ctime)
             try:
-                dirents = seafile_api.list_dir_by_path(repo_id, '/asset', offset=-1, limit=-1)
-            except Exception as e:
-                logger.error('repo: %s, get dirents error: %s', repo_id, e)
-                continue
-            for dirent in dirents:
-                if not stat.S_ISDIR(dirent.mode):
+                repo = seafile_api.get_repo(repo_id)
+                if not repo:
                     continue
-                if not is_valid_uuid(dirent.obj_name):
+                asset_dir_id = seafile_api.get_dir_id_by_path(repo_id, '/asset')
+                if not asset_dir_id:
                     continue
-                logger.debug('start stat repo: %s dirent: %s', repo_id, dirent.obj_name)
-                if dirent.mtime > ctime - 5:
-                    dtable_uuid = dirent.obj_name
-                    try:
-                        size = seafile_api.get_file_count_info_by_path(repo_id, f'/asset/{dtable_uuid}').size
-                        logger.debug('start stat repo: %s dirent: %s size: %s', repo_id, dirent.obj_name, size)
-                    except Exception as e:
-                        logger.error('get dtable: %s asset error: %s', dtable_uuid, e)
+                dirents = seafile_api.list_dir_by_path(repo_id, '/asset', offset=-1, limit=-1)
+                for dirent in dirents:
+                    if not stat.S_ISDIR(dirent.mode):
+                        continue
+                    if not is_valid_uuid(dirent.obj_name):
                         continue
-                    dtable_uuid_sizes.append([uuid_str_to_32_chars(dtable_uuid), size])
+                    logger.debug('start stats repo: %s dirent: %s', repo_id, dirent.obj_name)
+                    if dirent.mtime > ctime - 5:
+                        dtable_uuid = dirent.obj_name
+                        size = seafile_api.get_file_count_info_by_path(repo_id, f'/asset/{dtable_uuid}').size
+                        logger.debug('start stats repo: %s dirent: %s size: %s', repo_id, dirent.obj_name, size)
+                        dtable_uuid_sizes.append([uuid_str_to_32_chars(dtable_uuid), size])
+            except Exception as e:
+                logger.exception(e)
+                logger.error('stats repo: %s error: %s', repo_id, e)
         if not dtable_uuid_sizes:
             return
         logger.debug('totally need to update dtable: %s', len(dtable_uuid_sizes))
From b44ed1138569bd569c063b3df59a89c58bda7dbf Mon Sep 17 00:00:00 2001
From: AlexCXC <1223408988@qq.com>
Date: Mon, 20 Mar 2023 16:50:50 +0800
Subject: [PATCH 4/5] opt stats

---
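Note: the listener now reacts only to repo-update events, and a repo is
scanned only when the /asset dirent itself is at least as new as the recorded
event time. A sketch of the same filtering applied to one popped message; the
sample content string is made up, but its shape (a tab-separated string whose
second field is the repo id) is what the worker parses:

    import time

    msg = {
        'content': 'repo-update\t47f36157-5d41-4f16-b996-98761983f041\t...',
        'ctime': int(time.time()),
    }

    content = msg.get('content')
    interesting = (
        isinstance(content, str) and '\t' in content
        and content.startswith('repo-update')
        and isinstance(msg.get('ctime'), int)
        and msg['ctime'] >= time.time() - 30 * 60  # drop stale events
    )
    repo_id = content.split()[1] if interesting else None
    print(repo_id)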
 dtable_events/dtable_io/__init__.py              | 16 +++---
 .../tasks/dtable_asset_stats_worker.py           | 62 ++++----------
 2 files changed, 23 insertions(+), 55 deletions(-)

diff --git a/dtable_events/dtable_io/__init__.py b/dtable_events/dtable_io/__init__.py
index e9464ca5..c1ba525b 100644
--- a/dtable_events/dtable_io/__init__.py
+++ b/dtable_events/dtable_io/__init__.py
@@ -36,7 +36,7 @@
 from dtable_events.dtable_io.task_manager import task_manager
 from dtable_events.statistics.db import save_email_sending_records, batch_save_email_sending_records
 from dtable_events.data_sync.data_sync_utils import run_sync_emails
-from dtable_events.utils import get_inner_dtable_server_url, uuid_str_to_36_chars
+from dtable_events.utils import get_inner_dtable_server_url, uuid_str_to_32_chars, uuid_str_to_36_chars
 from dtable_events.utils.dtable_server_api import DTableServerAPI
 
 dtable_io_logger = setup_logger('dtable_events_io.log')
@@ -1095,7 +1095,6 @@ def calc_dtable_asset_stats(repo_id_dtable_uuids_dict, config):
     try:
         db_session = init_db_session_class(config)()
     except Exception as e:
-        db_session = None
         dtable_io_logger.error('create db session failed. ERROR: {}'.format(e))
         raise Exception('create db session failed. ERROR: {}'.format(e))
     dtable_uuid_sizes = []
@@ -1108,12 +1107,17 @@ def calc_dtable_asset_stats(repo_id_dtable_uuids_dict, config):
             asset_path = f'/asset/{uuid_str_to_36_chars(dtable_uuid)}'
             asset_dir_id = seafile_api.get_dir_id_by_path(repo_id, asset_path)
             if not asset_dir_id:
-                dtable_uuid_sizes.append([dtable_uuid, 0])
+                dtable_uuid_sizes.append([uuid_str_to_32_chars(dtable_uuid), 0])
                 continue
             size = seafile_api.get_file_count_info_by_path(repo_id, asset_path).size
-            dtable_uuid_sizes.append([dtable_uuid, size])
+            dtable_uuid_sizes.append([uuid_str_to_32_chars(dtable_uuid), size])
         except Exception as e:
             dtable_io_logger.exception(e)
             dtable_io_logger.error('repo_id: %s dtable_uuids: %s stats asset error: %s', repo_id, dtable_uuids, e)
-    update_dtable_asset_sizes(dtable_uuid_sizes, db_session)
-    db_session.close()
+    try:
+        update_dtable_asset_sizes(dtable_uuid_sizes, db_session)
+    except Exception as e:
+        dtable_io_logger.exception(e)
+        dtable_io_logger.error('update dtable asset sizes error: %s', e)
+    finally:
+        db_session.close()

diff --git a/dtable_events/tasks/dtable_asset_stats_worker.py b/dtable_events/tasks/dtable_asset_stats_worker.py
index d8547b14..0d995f9b 100644
--- a/dtable_events/tasks/dtable_asset_stats_worker.py
+++ b/dtable_events/tasks/dtable_asset_stats_worker.py
@@ -31,16 +31,15 @@ def update_dtable_asset_sizes(dtable_uuid_sizes, db_session):
     step = 1000
     updated_at = datetime.utcnow()
     for i in range(0, len(dtable_uuid_sizes), step):
-        updates = ', '.join(["('%s', %s, '%s')" % tuple(dtable_uuid_size + [updated_at]) for dtable_uuid_size in dtable_uuid_sizes[i: i+step]])
+        updates = ', '.join(["('%s', %s, '%s')" % (
+            uuid_str_to_32_chars(dtable_uuid_size[0]), dtable_uuid_size[1], updated_at
+        ) for dtable_uuid_size in dtable_uuid_sizes[i: i+step]])
         sql = '''
         INSERT INTO dtable_asset_stats(dtable_uuid, size, updated_at) VALUES %s
         ON DUPLICATE KEY UPDATE size=VALUES(size), updated_at=VALUES(updated_at)
         ''' % updates
-        try:
-            db_session.execute(sql)
-            db_session.commit()
-        except Exception as e:
-            logger.error('update dtable asset sizes error: %s', e)
+        db_session.execute(sql)
+        db_session.commit()
 
 
 class DTableAssetStatsWorker(Thread):
@@ -48,7 +47,7 @@ def __init__(self, config):
         Thread.__init__(self)
         self._finished = Event()
         self._db_session_class = init_db_session_class(config)
-        self.interval = 5 * 60  # listen to seafile event for 5 mins and then calc dtable asset storage
+        self.interval = 5 * 60  # listen to seafile event for some time and then calc dtable asset storage
         self.last_stats_time = time.time()
         self._redis_client = RedisClient(config)
 
@@ -70,6 +69,8 @@ def run(self):
             content = msg.get('content')
             if not isinstance(content, str) or '\t' not in content:
                 continue
+            if not content.startswith('repo-update'):
+                continue
             ctime = msg.get('ctime')
             if not isinstance(ctime, int) or ctime < time.time() - 30 * 60:  # ignore messages from more than half an hour ago
                 continue
@@ -81,15 +82,16 @@ def run(self):
             repo_id_ctime_dict[repo_id] = ctime
 
     def stats_dtable_asset_storage(self, repo_id_ctime_dict):
+        logger.info('Starting stats repo dtable asset storage...')
         dtable_uuid_sizes = []
         for repo_id, ctime in repo_id_ctime_dict.items():
             logger.debug('start stats repo: %s ctime: %s', repo_id, ctime)
             try:
                 repo = seafile_api.get_repo(repo_id)
                 if not repo:
                     continue
-                asset_dir_id = seafile_api.get_dir_id_by_path(repo_id, '/asset')
-                if not asset_dir_id:
+                asset_dirent = seafile_api.get_dirent_by_path(repo_id, '/asset')
+                if not asset_dirent or asset_dirent.mtime < ctime:
                     continue
                 dirents = seafile_api.list_dir_by_path(repo_id, '/asset', offset=-1, limit=-1)
                 for dirent in dirents:
@@ -98,7 +100,7 @@ def stats_dtable_asset_storage(self, repo_id_ctime_dict):
                     if not is_valid_uuid(dirent.obj_name):
                         continue
                     logger.debug('start stats repo: %s dirent: %s', repo_id, dirent.obj_name)
-                    if dirent.mtime > ctime - 5:
+                    if dirent.mtime >= ctime:
                         dtable_uuid = dirent.obj_name
                         size = seafile_api.get_file_count_info_by_path(repo_id, f'/asset/{dtable_uuid}').size
                         logger.debug('start stats repo: %s dirent: %s size: %s', repo_id, dirent.obj_name, size)
@@ -111,47 +113,9 @@ def stats_dtable_asset_storage(self, repo_id_ctime_dict):
         logger.debug('totally need to update dtable: %s', len(dtable_uuid_sizes))
         db_session = self._db_session_class()
         try:
-            update_dtable_asset_sizes(dtable_uuid_sizes)
+            update_dtable_asset_sizes(dtable_uuid_sizes, db_session)
         except Exception as e:
             logger.exception(e)
             logger.error('update dtable asset sizes error: %s', e)
         finally:
             db_session.close()
-
-    def listen_redis_and_update(self):
-        logger.info('Starting to handle dtable asset stats from redis...')
-        subscriber = self._redis_client.get_subscriber('stat-asset')
-        while not self._finished.is_set():
-            try:
-                message = subscriber.get_message()
-                if message is not None:
-                    dtable_uuid_repo_ids = json.loads(message['data'])
-                    session = self._db_session_class()
-                    try:
-                        self.stats_dtable_uuids(dtable_uuid_repo_ids, session)
-                    except Exception as e:
-                        logger.error('Handle dtable asset stats from redis error: %s' % e)
-                    finally:
-                        session.close()
-                else:
-                    time.sleep(0.5)
-            except Exception as e:
-                logger.error('Failed to get message from redis: %s' % e)
-                subscriber = self._redis_client.get_subscriber('stat-asset')
-
-    def stats_dtable_uuids(self, dtable_uuid_repo_ids, db_session):
-        dtable_uuid_sizes = []
-        for dtable_uuid, repo_id in dtable_uuid_repo_ids:
-            try:
-                asset_path = f'/asset/{uuid_str_to_36_chars(dtable_uuid)}'
-                asset_dir_id = seafile_api.get_dir_id_by_path(repo_id, asset_path)
-                if not asset_dir_id:
-                    dtable_uuid_sizes.append([uuid_str_to_32_chars(dtable_uuid), 0])
-                size = seafile_api.get_file_count_info_by_path(repo_id, asset_path).size
-                dtable_uuid_sizes.append([uuid_str_to_32_chars(dtable_uuid), size])
-                logger.debug('redis repo: %s dtable_uuid: %s size: %s', repo_id, dtable_uuid, size)
-            except Exception as e:
-                logger.exception(e)
-                logger.error('check repo: %s dtable: %s asset size error: %s', repo_id, dtable_uuid, e)
-        logger.debug('redis totally need to update dtable: %s', len(dtable_uuid_sizes))
-        update_dtable_asset_sizes(dtable_uuid_sizes, db_session)
From 6a243c57097fafd6641a2075782b01a205583b72 Mon Sep 17 00:00:00 2001
From: Alex Happy <1223408988@qq.com>
Date: Sun, 9 Apr 2023 23:54:21 +0800
Subject: [PATCH 5/5] opt stats worker

---
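Note: flag_worker_running makes scan rounds non-overlapping; run() skips
kicking off a new round while one is in flight, and the wrapper clears the
flag in a finally block so a crashed round cannot wedge the worker forever.
A condensed sketch of the same guard (the names are illustrative); here the
flag is set before the thread starts, which also closes the narrow window in
which a very fast round could finish before run() marks it as running:

    import threading

    class OneAtATime:
        def __init__(self):
            self.running = False

        def kick(self, work):
            if self.running:  # previous round still in flight: skip this one
                return False
            self.running = True
            threading.Thread(target=self._run, args=(work,), daemon=True).start()
            return True

        def _run(self, work):
            try:
                work()
            finally:
                self.running = False  # re-arm even if work() raised

    guard = OneAtATime()
    guard.kick(lambda: print('scan round'))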
 dtable_events/tasks/dtable_asset_stats_worker.py | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/dtable_events/tasks/dtable_asset_stats_worker.py b/dtable_events/tasks/dtable_asset_stats_worker.py
index 0d995f9b..d9f34df1 100644
--- a/dtable_events/tasks/dtable_asset_stats_worker.py
+++ b/dtable_events/tasks/dtable_asset_stats_worker.py
@@ -9,7 +9,6 @@
 
 from seaserv import seafile_api
 
-from dtable_events.app.event_redis import RedisClient
 from dtable_events.db import init_db_session_class
 from dtable_events.utils import uuid_str_to_36_chars, uuid_str_to_32_chars
 
@@ -49,16 +48,17 @@ def __init__(self, config):
         self._db_session_class = init_db_session_class(config)
         self.interval = 5 * 60  # listen to seafile event for some time and then calc dtable asset storage
         self.last_stats_time = time.time()
-        self._redis_client = RedisClient(config)
+        self.flag_worker_running = False
 
     def run(self):
         logger.info('Starting to handle dtable asset stats...')
         repo_id_ctime_dict = {}
         while not self._finished.is_set():
-            if repo_id_ctime_dict and time.time() - self.last_stats_time > self.interval:
+            if not self.flag_worker_running and repo_id_ctime_dict and time.time() - self.last_stats_time > self.interval:
                 Thread(target=self.stats_dtable_asset_storage, args=(repo_id_ctime_dict,), daemon=True).start()
                 self.last_stats_time = time.time()
                 repo_id_ctime_dict = {}
+                self.flag_worker_running = True
             msg = seafile_api.pop_event('seaf_server.event')
             if not msg:
                 time.sleep(0.5)
@@ -81,7 +81,7 @@ def run(self):
                 continue
             repo_id_ctime_dict[repo_id] = ctime
 
-    def stats_dtable_asset_storage(self, repo_id_ctime_dict):
+    def _stats_dtable_asset_storage(self, repo_id_ctime_dict):
         logger.info('Starting stats repo dtable asset storage...')
         dtable_uuid_sizes = []
         for repo_id, ctime in repo_id_ctime_dict.items():
@@ -119,3 +119,11 @@ def stats_dtable_asset_storage(self, repo_id_ctime_dict):
             logger.error('update dtable asset sizes error: %s', e)
         finally:
             db_session.close()
+
+    def stats_dtable_asset_storage(self, repo_id_ctime_dict):
+        try:
+            self._stats_dtable_asset_storage(repo_id_ctime_dict)
+        except Exception as e:
+            logger.exception(e)
+        finally:
+            self.flag_worker_running = False