Add dtable asset stats #452

Open · wants to merge 5 commits into master
7 changes: 6 additions & 1 deletion dtable_events/app/app.py
@@ -8,13 +8,14 @@
from dtable_events.tasks.dtable_updates_sender import DTableUpdatesSender
from dtable_events.tasks.dtable_real_time_rows_counter import DTableRealTimeRowsCounter
from dtable_events.tasks.ldap_syncer import LDAPSyncer
from dtable_events.tasks.big_data_storage_stats_worker import BigDataStorageStatsWorker
from dtable_events.tasks.dtable_asset_stats_worker import DTableAssetStatsWorker
from dtable_events.notification_rules.handler import NotificationRuleHandler
from dtable_events.notification_rules.dtable_notification_rules_scanner import DTableNofiticationRulesScanner
from dtable_events.automations.handler import AutomationRuleHandler
from dtable_events.automations.dtable_automation_rules_scanner import DTableAutomationRulesScanner
from dtable_events.webhook.webhook import Webhooker
from dtable_events.common_dataset.common_dataset_syncer import CommonDatasetSyncer
from dtable_events.tasks.big_data_storage_stats_worker import BigDataStorageStatsWorker
from dtable_events.data_sync.data_syncer import DataSyncer
from dtable_events.workflow.workflow_actions import WorkflowActionsHandler
from dtable_events.workflow.workflow_schedules_scanner import WorkflowSchedulesScanner
@@ -37,6 +38,8 @@ def __init__(self, config, task_mode):
        self._dtable_real_time_rows_counter = DTableRealTimeRowsCounter(config)
        self._workflow_actions_handler = WorkflowActionsHandler(config)
        self._webhooker = Webhooker(config)
        # seaf events
        self._dtable_asset_stat_worker = DTableAssetStatsWorker(config)
        # cron jobs
        self._instant_notices_sender = InstantNoticeSender(config)
        self._email_notices_sender = EmailNoticesSender(config)
@@ -63,6 +66,8 @@ def serve_forever(self):
        self._dtable_real_time_rows_counter.start() # default True
        self._workflow_actions_handler.start() # always True
        self._webhooker.start() # always True
        # seaf events
        self._dtable_asset_stat_worker.start() # always True
        # cron jobs
        self._instant_notices_sender.start() # default True
        self._email_notices_sender.start() # default True
36 changes: 35 additions & 1 deletion dtable_events/dtable_io/__init__.py
@@ -16,6 +16,8 @@
from selenium.webdriver.support.ui import WebDriverWait
from datetime import datetime

from seaserv import seafile_api

from dtable_events.app.config import DTABLE_WEB_SERVICE_URL, SESSION_COOKIE_NAME, INNER_DTABLE_DB_URL
from dtable_events.dtable_io.big_data import import_excel_to_db, update_excel_to_db, export_big_data_to_excel
from dtable_events.dtable_io.utils import setup_logger, \
@@ -34,7 +36,7 @@
from dtable_events.dtable_io.task_manager import task_manager
from dtable_events.statistics.db import save_email_sending_records, batch_save_email_sending_records
from dtable_events.data_sync.data_sync_utils import run_sync_emails
from dtable_events.utils import get_inner_dtable_server_url
from dtable_events.utils import get_inner_dtable_server_url, uuid_str_to_32_chars, uuid_str_to_36_chars
from dtable_events.utils.dtable_server_api import DTableServerAPI

dtable_io_logger = setup_logger('dtable_events_io.log')
@@ -1087,3 +1089,35 @@ def convert_big_data_view_to_execl(dtable_uuid, table_id, view_id, username, nam
dtable_io_logger.info('export big data table_id: %s, view_id: %s success!', table_id, view_id)


def calc_dtable_asset_stats(repo_id_dtable_uuids_dict, config):
    from dtable_events.tasks.dtable_asset_stats_worker import update_dtable_asset_sizes

    try:
        db_session = init_db_session_class(config)()
    except Exception as e:
        dtable_io_logger.error('create db session failed. ERROR: {}'.format(e))
        raise Exception('create db session failed. ERROR: {}'.format(e))
    dtable_uuid_sizes = []
    for repo_id, dtable_uuids in repo_id_dtable_uuids_dict.items():
        try:
            repo = seafile_api.get_repo(repo_id)
            if not repo:
                continue
            for dtable_uuid in dtable_uuids:
                asset_path = f'/asset/{uuid_str_to_36_chars(dtable_uuid)}'
                asset_dir_id = seafile_api.get_dir_id_by_path(repo_id, asset_path)
                if not asset_dir_id:
                    dtable_uuid_sizes.append([uuid_str_to_32_chars(dtable_uuid), 0])
                    continue
                size = seafile_api.get_file_count_info_by_path(repo_id, asset_path).size
                dtable_uuid_sizes.append([uuid_str_to_32_chars(dtable_uuid), size])
        except Exception as e:
            dtable_io_logger.exception(e)
            dtable_io_logger.error('repo_id: %s dtable_uuids: %s stats asset error: %s', repo_id, dtable_uuids, e)
    try:
        update_dtable_asset_sizes(dtable_uuid_sizes, db_session)
    except Exception as e:
        dtable_io_logger.exception(e)
        dtable_io_logger.error('update dtable asset sizes error: %s', e)
    finally:
        db_session.close()
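
For reference, a minimal sketch of invoking this task directly; the repo id and dtable uuid below are made-up placeholders, and config is assumed to be an already-parsed dtable-events config object:

# hypothetical input: one storage repo mapped to the dtable uuids it holds (placeholder ids)
repo_id_dtable_uuids_dict = {
    '11111111-2222-3333-4444-555555555555': ['aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee'],
}
calc_dtable_asset_stats(repo_id_dtable_uuids_dict, config)
# each dtable ends up as a [dtable_uuid, size] pair upserted into dtable_asset_stats
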
22 changes: 22 additions & 0 deletions dtable_events/dtable_io/request_handler.py
@@ -1155,3 +1155,25 @@ def query_plugin_email_send_status():

    resp = dict(is_finished=is_finished)
    return make_response((resp, 200))


@app.route('/add-calc-dtable-asset-stats', methods=['POST'])
def add_calc_dtable_asset_stats():
    is_valid, error = check_auth_token(request)
    if not is_valid:
        return make_response((error, 403))
    try:
        data = json.loads(request.data)
    except Exception as e:
        logger.warning('add_calc_dtable_asset_stats parse json body error: %s', e)
        return make_response(('request invalid, error: %s' % e, 400))
    repo_id_dtable_uuids_dict = data.get('repo_id_dtable_uuids_dict')
    if not repo_id_dtable_uuids_dict or not isinstance(repo_id_dtable_uuids_dict, dict):
        return make_response(('repo_id_dtable_uuids_dict invalid', 400))
    try:
        task_id = task_manager.add_calc_dtable_asset_task(repo_id_dtable_uuids_dict)
    except Exception as e:
        logger.error('add_calc_dtable_asset_stats error: %s', e)
        return make_response((str(e), 500))

    return make_response(({'task_id': task_id}, 200))
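
A hedged usage sketch for the new endpoint; the dtable-io address and the exact Authorization header format expected by check_auth_token are assumptions of this example, not something the diff defines:

import requests

# assumed local dtable-io address and token format; placeholders throughout
resp = requests.post(
    'http://127.0.0.1:6000/add-calc-dtable-asset-stats',
    headers={'Authorization': 'Token <signed-token>'},
    json={'repo_id_dtable_uuids_dict': {'<repo_id>': ['<dtable_uuid>']}},
)
print(resp.json())  # e.g. {'task_id': '...'}
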
9 changes: 9 additions & 0 deletions dtable_events/dtable_io/task_manager.py
Expand Up @@ -310,6 +310,15 @@ def add_app_users_sync_task(self, dtable_uuid, app_name, app_id, table_name, tab

        return task_id

    def add_calc_dtable_asset_task(self, repo_id_dtable_uuids_dict):
        from dtable_events.dtable_io import calc_dtable_asset_stats
        task_id = str(uuid.uuid4())
        task = (calc_dtable_asset_stats, (repo_id_dtable_uuids_dict, self.config))
        self.tasks_queue.put(task_id)
        self.tasks_map[task_id] = task

        return task_id

    def threads_is_alive(self):
        info = {}
        for t in self.threads:
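
For context, a rough illustration of the (func, args) convention used by add_calc_dtable_asset_task above; this is not the actual task_manager worker loop, only an assumed sketch of how a queued task id is turned back into a call:

# illustrative only: how a worker thread could consume the queue and map populated above
task_id = task_manager.tasks_queue.get()
func, args = task_manager.tasks_map[task_id]
func(*args)  # here: calc_dtable_asset_stats(repo_id_dtable_uuids_dict, config)
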
129 changes: 129 additions & 0 deletions dtable_events/tasks/dtable_asset_stats_worker.py
@@ -0,0 +1,129 @@
# -*- coding: utf-8 -*-
import logging
import time
import json
import stat
import uuid
from datetime import datetime
from threading import Thread, Event

from seaserv import seafile_api

from dtable_events.db import init_db_session_class
from dtable_events.utils import uuid_str_to_36_chars, uuid_str_to_32_chars

logger = logging.getLogger(__name__)


def is_valid_uuid(test_str):
    try:
        uuid.UUID(test_str)
        return True
    except Exception:
        return False


def update_dtable_asset_sizes(dtable_uuid_sizes, db_session):
    """
    :param dtable_uuid_sizes: a list of [dtable_uuid, size]
    """
    step = 1000
    updated_at = datetime.utcnow()
    for i in range(0, len(dtable_uuid_sizes), step):
        updates = ', '.join(["('%s', %s, '%s')" % (
            uuid_str_to_32_chars(dtable_uuid_size[0]), dtable_uuid_size[1], updated_at
        ) for dtable_uuid_size in dtable_uuid_sizes[i: i+step]])
        sql = '''
        INSERT INTO dtable_asset_stats(dtable_uuid, size, updated_at) VALUES %s
        ON DUPLICATE KEY UPDATE size=VALUES(size), updated_at=VALUES(updated_at)
        ''' % updates
        db_session.execute(sql)
        db_session.commit()


class DTableAssetStatsWorker(Thread):
    def __init__(self, config):
        Thread.__init__(self)
        self._finished = Event()
        self._db_session_class = init_db_session_class(config)
        self.interval = 5 * 60  # listen to seafile events for a while, then recalculate dtable asset storage
        self.last_stats_time = time.time()
        self.flag_worker_running = False

    def run(self):
        logger.info('Starting handle dtable asset stats...')
        repo_id_ctime_dict = {}
        while not self._finished.is_set():
            if not self.flag_worker_running and repo_id_ctime_dict and time.time() - self.last_stats_time > self.interval:
                Thread(target=self.stats_dtable_asset_storage, args=(repo_id_ctime_dict,), daemon=True).start()
                self.last_stats_time = time.time()
                repo_id_ctime_dict = {}
                self.flag_worker_running = True
            msg = seafile_api.pop_event('seaf_server.event')
            if not msg:
                time.sleep(0.5)
                continue
            logger.debug('msg: %s', msg)
            if not isinstance(msg, dict):
                continue
            content = msg.get('content')
            if not isinstance(content, str) or '\t' not in content:
                continue
            if not content.startswith('repo-update'):
                continue
            ctime = msg.get('ctime')
            if not isinstance(ctime, int) or ctime < time.time() - 30 * 60:  # ignore messages older than half an hour
                continue
            repo_id = content.split()[1]
            if not is_valid_uuid(repo_id):
                continue
            if repo_id in repo_id_ctime_dict:
                continue
            repo_id_ctime_dict[repo_id] = ctime

    def _stats_dtable_asset_storage(self, repo_id_ctime_dict):
        logger.info('Starting stats repo dtable asset storage...')
        dtable_uuid_sizes = []
        for repo_id, ctime in repo_id_ctime_dict.items():
            logger.debug('start stats repo: %s ctime: %s', repo_id, ctime)
            try:
                repo = seafile_api.get_repo(repo_id)
                if not repo:
                    continue
                asset_dirent = seafile_api.get_dirent_by_path(repo_id, '/asset')
                if not asset_dirent or asset_dirent.mtime < ctime:
                    continue
                dirents = seafile_api.list_dir_by_path(repo_id, '/asset', offset=-1, limit=-1)
                for dirent in dirents:
                    if not stat.S_ISDIR(dirent.mode):
                        continue
                    if not is_valid_uuid(dirent.obj_name):
                        continue
                    logger.debug('start stats repo: %s dirent: %s', repo_id, dirent.obj_name)
                    if dirent.mtime >= ctime:
                        dtable_uuid = dirent.obj_name
                        size = seafile_api.get_file_count_info_by_path(repo_id, f'/asset/{dtable_uuid}').size
                        logger.debug('start stats repo: %s dirent: %s size: %s', repo_id, dirent.obj_name, size)
                        dtable_uuid_sizes.append([uuid_str_to_32_chars(dtable_uuid), size])
            except Exception as e:
                logger.exception(e)
                logger.error('stats repo: %s error: %s', repo_id, e)
        if not dtable_uuid_sizes:
            return
        logger.debug('totally need to update dtable: %s', len(dtable_uuid_sizes))
        db_session = self._db_session_class()
        try:
            update_dtable_asset_sizes(dtable_uuid_sizes, db_session)
        except Exception as e:
            logger.exception(e)
            logger.error('update dtable asset sizes error: %s', e)
        finally:
            db_session.close()

    def stats_dtable_asset_storage(self, repo_id_ctime_dict):
        try:
            self._stats_dtable_asset_storage(repo_id_ctime_dict)
        except Exception as e:
            logger.exception(e)
        finally:
            self.flag_worker_running = False
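
For orientation, a hedged sketch of the repo-update message shape that the run() loop above filters on; real messages come from seaf-server's event channel and the values here are placeholders:

# illustrative message only; actual events are produced by seaf-server
msg = {
    'content': 'repo-update\t11111111-2222-3333-4444-555555555555\t<commit_id>',
    'ctime': 1700000000,  # unix timestamp of the event
}
repo_id = msg['content'].split()[1]  # second tab-separated field is the repo id
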