From 657fa64a7e124c2a354ded03ab6737a40c569103 Mon Sep 17 00:00:00 2001 From: AlexCXC <1223408988@qq.com> Date: Sat, 2 Dec 2023 18:33:16 +0800 Subject: [PATCH 1/5] stats user/org trigger-auto-rule --- dtable_events/app/app.py | 3 + dtable_events/app/config.py | 8 + dtable_events/automations/actions.py | 24 ++- dtable_events/automations/auto_rules_utils.py | 76 ++++++-- .../dtable_automation_rules_scanner.py | 32 +++- .../dtable_automation_rules_stats_updater.py | 170 ++++++++++++++++++ dtable_events/utils/dtable_web_api.py | 10 ++ 7 files changed, 298 insertions(+), 25 deletions(-) create mode 100644 dtable_events/automations/dtable_automation_rules_stats_updater.py diff --git a/dtable_events/app/app.py b/dtable_events/app/app.py index 35a9209f..4cbb9e7a 100644 --- a/dtable_events/app/app.py +++ b/dtable_events/app/app.py @@ -14,6 +14,7 @@ from dtable_events.notification_rules.dtable_notification_rules_scanner import DTableNofiticationRulesScanner from dtable_events.automations.handler import AutomationRuleHandler from dtable_events.automations.dtable_automation_rules_scanner import DTableAutomationRulesScanner +from dtable_events.automations.dtable_automation_rules_stats_updater import DTableAutomationRulesStatsUpdater from dtable_events.webhook.webhook import Webhooker from dtable_events.common_dataset.common_dataset_syncer import CommonDatasetSyncer from dtable_events.tasks.big_data_storage_stats_worker import BigDataStorageStatsWorker @@ -47,6 +48,7 @@ def __init__(self, config, task_mode): self._dtable_updates_sender = DTableUpdatesSender(config) self._dtable_notification_rules_scanner = DTableNofiticationRulesScanner(config) self._dtable_automation_rules_scanner = DTableAutomationRulesScanner(config) + self._dtable_automation_rules_stats_updater = DTableAutomationRulesStatsUpdater(config) self._ldap_syncer = LDAPSyncer(config) self._common_dataset_syncer = CommonDatasetSyncer(config) self._big_data_storage_stats_worker = BigDataStorageStatsWorker(config) @@ 
-78,6 +80,7 @@ def serve_forever(self): self._dtable_updates_sender.start() # default True self._dtable_notification_rules_scanner.start() # default True self._dtable_automation_rules_scanner.start() # default True + self._dtable_automation_rules_stats_updater.start() # always True self._ldap_syncer.start() # default False self._common_dataset_syncer.start() # default True self._big_data_storage_stats_worker.start() # always True diff --git a/dtable_events/app/config.py b/dtable_events/app/config.py index 4a21ce93..deff0405 100644 --- a/dtable_events/app/config.py +++ b/dtable_events/app/config.py @@ -19,6 +19,7 @@ sys.path.insert(0, dtable_web_dir) try: + # settings in dtable-web import seahub.settings as seahub_settings DTABLE_WEB_SERVICE_URL = getattr(seahub_settings, 'DTABLE_WEB_SERVICE_URL', 'http://127.0.0.1') DTABLE_PRIVATE_KEY = getattr(seahub_settings, 'DTABLE_PRIVATE_KEY', '') @@ -45,6 +46,13 @@ TRASH_CLEAN_AFTER_DAYS = getattr(seahub_settings, 'TRASH_CLEAN_AFTER_DAYS', 30) LICENSE_PATH = getattr(seahub_settings, 'LICENSE_PATH', '/shared/seatable-license.txt') IS_PRO_VERSION = getattr(seahub_settings, 'IS_PRO_VERSION', False) + + # env + ENV_SEAFILE_CENTRAL_CONF_DIR = os.environ['SEAFILE_CENTRAL_CONF_DIR'] + if ENV_SEAFILE_CENTRAL_CONF_DIR: + ENV_CCNET_CONF_PATH = os.path.join(ENV_SEAFILE_CENTRAL_CONF_DIR, 'ccnet.conf') + elif 'CCNET_CONF_DIR' in os.environ: + ENV_CCNET_CONF_PATH = os.path.join(os.environ['CCNET_CONF_DIR'], 'ccnet.conf') except Exception as e: logger.critical("Can not import dtable_web settings: %s." 
% e) raise RuntimeError("Can not import dtable_web settings: %s" % e) diff --git a/dtable_events/automations/actions.py b/dtable_events/automations/actions.py index ff6d6bdb..1733f7eb 100644 --- a/dtable_events/automations/actions.py +++ b/dtable_events/automations/actions.py @@ -2950,7 +2950,7 @@ def __init__(self, data, db_session, raw_trigger, raw_actions, options, metadata self.metadata_cache_manager = metadata_cache_manager - self.cache_key = 'AUTOMATION_RULE:%s' % self.rule_id + self.cache_key = 'AUTOMATION_RULE:%s' % uuid_str_to_36_chars(self.dtable_uuid) self.task_run_success = True self.done_actions = False @@ -2958,7 +2958,7 @@ def __init__(self, data, db_session, raw_trigger, raw_actions, options, metadata self.current_valid = True - self.per_minute_trigger_limit = per_minute_trigger_limit or 50 + self.per_minute_trigger_limit = per_minute_trigger_limit or 1000 self.warnings = [] @@ -3441,7 +3441,6 @@ def update_last_trigger_time(self): trigger_count=trigger_count+1, update_at=:trigger_time ''' - set_statistic_sql_org = ''' INSERT INTO org_auto_rules_statistics (org_id, trigger_date, trigger_count, update_at) VALUES (:org_id, :trigger_date, 1, :trigger_time) @@ -3449,6 +3448,20 @@ def update_last_trigger_time(self): trigger_count=trigger_count+1, update_at=:trigger_time ''' + set_statistic_sql_user_per_month = ''' + INSERT INTO user_auto_rules_statistics_per_month(username, trigger_count, month, updated_at) VALUES + (:username, 1, :month, :trigger_time) + ON DUPLICATE KEY UPDATE + trigger_count=trigger_count+1, + updated_at=:trigger_time + ''' + set_statistic_sql_org_per_month = ''' + INSERT INTO org_auto_rules_statistics_per_month(org_id, trigger_count, month, updated_at) VALUES + (:org_id, 1, :month, :trigger_time) + ON DUPLICATE KEY UPDATE + trigger_count=trigger_count+1, + updated_at=:trigger_time + ''' set_last_trigger_time_sql = ''' UPDATE dtable_automation_rules SET last_trigger_time=:trigger_time, trigger_count=:trigger_count WHERE id=:rule_id; 
''' @@ -3457,8 +3470,10 @@ def update_last_trigger_time(self): if self.org_id: if self.org_id == -1: sqls.append(set_statistic_sql_user) + sqls.append(set_statistic_sql_user_per_month) else: sqls.append(set_statistic_sql_org) + sqls.append(set_statistic_sql_org_per_month) cur_date = datetime.now().date() cur_year, cur_month = cur_date.year, cur_date.month @@ -3470,7 +3485,8 @@ def update_last_trigger_time(self): 'trigger_date': trigger_date, 'trigger_count': self.trigger_count + 1, 'username': self.creator, - 'org_id': self.org_id + 'org_id': self.org_id, + 'month': str(date.today())[:7] }) self.db_session.commit() except Exception as e: diff --git a/dtable_events/automations/auto_rules_utils.py b/dtable_events/automations/auto_rules_utils.py index a71acf19..38047b72 100644 --- a/dtable_events/automations/auto_rules_utils.py +++ b/dtable_events/automations/auto_rules_utils.py @@ -1,16 +1,50 @@ import logging +from datetime import date from dtable_events import init_db_session_class from dtable_events.app.metadata_cache_managers import RuleIntentMetadataCacheManger, RuleIntervalMetadataCacheManager from dtable_events.automations.actions import AutomationRule +from dtable_events.utils import uuid_str_to_32_chars logger = logging.getLogger(__name__) +def can_trgger_by_dtable(dtable_uuid, db_session): + sql = "SELECT w.owner, w.org_id FROM workspaces w JOIN dtables d ON w.id=d.workspace_id WHERE d.uuid=:dtable_uuid" + try: + workspace = db_session.execute(sql, {'dtable_uuid': uuid_str_to_32_chars(dtable_uuid)}).fetchone() + except Exception as e: + logger.error('check dtable: %s workspace error: %s', dtable_uuid, e) + return True + if not workspace: + logger.error('dtable: %s workspace not found', dtable_uuid) + return True + month = str(date.today())[:7] + if workspace.org_id == -1: + if '@seafile_group' in workspace.owner: # groups not belong to orgs can always trigger auto rules + return True + sql = "SELECT is_exceed FROM user_auto_rules_statistics_per_month WHERE 
username=:username AND month=:month" + try: + user_per_month = db_session.execute(sql, {'username': workspace.owner, 'month': month}).fetchone() + except Exception as e: + logger.error('check user: %s auto rule per month error: %s', workspace.owner, e) + return True + if not user_per_month: + return True + return not user_per_month.is_exceed + else: + sql = "SELECT is_exceed FROM org_auto_rules_statistics_per_month WHERE org_id=:org_id AND month=:month" + try: + org_per_month = db_session.execute(sql, {'org_id': workspace.org_id, 'month': month}).fetchone() + except Exception as e: + logger.error('check org: %s auto rule per month error: %s', workspace.org_id, e) + return True + if not org_per_month: + return True + return not org_per_month.is_exceed + + def scan_triggered_automation_rules(event_data, db_session, per_minute_trigger_limit): - # if event_data.get('op_user') == 'Automation Rule': - # # For preventing loop do automation actions, foribidden triggering actions!!! - # return dtable_uuid = event_data.get('dtable_uuid') automation_rule_id = event_data.get('automation_rule_id') sql = """ @@ -19,27 +53,31 @@ def scan_triggered_automation_rules(event_data, db_session, per_minute_trigger_l """ try: - rules = db_session.execute(sql, {'dtable_uuid': dtable_uuid, 'rule_id': automation_rule_id}).fetchall() + rule = db_session.execute(sql, {'dtable_uuid': dtable_uuid, 'rule_id': automation_rule_id}).fetchone() except Exception as e: logger.error('checkout auto rules error: %s', e) return + if not rule: + return + + if not can_trgger_by_dtable(dtable_uuid, db_session): + return rule_intent_metadata_cache_manager = RuleIntentMetadataCacheManger() - for rule_id, run_condition, trigger, actions, last_trigger_time, dtable_uuid, trigger_count, org_id, creator in rules: - options = { - 'rule_id': rule_id, - 'run_condition': run_condition, - 'dtable_uuid': dtable_uuid, - 'trigger_count': trigger_count, - 'org_id': org_id, - 'creator': creator, - 'last_trigger_time': 
last_trigger_time, - } - try: - auto_rule = AutomationRule(event_data, db_session, trigger, actions, options, rule_intent_metadata_cache_manager, per_minute_trigger_limit=per_minute_trigger_limit) - auto_rule.do_actions() - except Exception as e: - logger.error('auto rule: %s do actions error: %s', rule_id, e) + options = { + 'rule_id': rule.id, + 'run_condition': rule.run_condition, + 'dtable_uuid': dtable_uuid, + 'trigger_count': rule.trigger_count, + 'org_id': rule.org_id, + 'creator': rule.creator, + 'last_trigger_time': rule.last_trigger_time, + } + try: + auto_rule = AutomationRule(event_data, db_session, rule.trigger, rule.actions, options, rule_intent_metadata_cache_manager, per_minute_trigger_limit=per_minute_trigger_limit) + auto_rule.do_actions() + except Exception as e: + logger.error('auto rule: %s do actions error: %s', rule.id, e) def run_regular_execution_rule(rule, db_session, metadata_cache_manager): diff --git a/dtable_events/automations/dtable_automation_rules_scanner.py b/dtable_events/automations/dtable_automation_rules_scanner.py index 55f75b98..226fcf50 100644 --- a/dtable_events/automations/dtable_automation_rules_scanner.py +++ b/dtable_events/automations/dtable_automation_rules_scanner.py @@ -1,4 +1,5 @@ import logging +from collections import defaultdict from datetime import datetime, timedelta from threading import Thread @@ -63,16 +64,43 @@ def scan_dtable_automation_rules(db_session): per_day_check_time = datetime.utcnow() - timedelta(hours=23) per_week_check_time = datetime.utcnow() - timedelta(days=6) per_month_check_time = datetime.utcnow() - timedelta(days=27) # consider the least month-days 28 in February (the 2nd month) in common years - rules = db_session.execute(sql, { + rules = list(db_session.execute(sql, { 'per_day_check_time': per_day_check_time, 'per_week_check_time': per_week_check_time, 'per_month_check_time': per_month_check_time - }) + })) + + dtable_uuids = list({rule.dtable_uuid for rule in rules}) + step = 1000 + 
usernames = [] + org_ids = [] + owner_dtable_dict = defaultdict(list) # username/org_id: [dtable_uuid...] + exceed_dtable_uuids = set() + month = str(datetime.today())[:7] + for i in range(0, len(rules), step): + sql = "SELECT d.uuid, w.owner, w.org_id FROM dtables d JOIN workspaces w ON d.workspace_id=w.id WHERE d.uuid IN :dtable_uuids" + for dtable in db_session.execute(sql, {'dtable_uuids': dtable_uuids[i: i+step]}): + if dtable.org_id == -1 and '@seafile_group' not in dtable.owner: + owner_dtable_dict[dtable.owner].append(dtable.uuid) + usernames.append(dtable.owner) + elif dtable.org_id != -1: + owner_dtable_dict[dtable.org_id].append(dtable.uuid) + org_ids.append(dtable.org_id) + for i in range(0, len(usernames), step): + sql = "SELECT username FROM user_auto_rules_statistics_per_month WHERE username in :usernames WHERE is_exceed=1 AND month=:month" + for user_per_month in db_session.execute(sql, {'usernames': usernames[i: i+step], 'month': month}): + exceed_dtable_uuids += set(owner_dtable_dict[user_per_month.username]) + for i in range(0, len(org_ids), step): + sql = "SELECT org_id FROM org_auto_rules_statistics_per_month WHERE org_id in :org_ids WHERE is_exceed=1 AND month=:month" + for org_per_month in db_session.execute(sql, {'org_ids': org_ids[i: i+step], 'month': month}): + exceed_dtable_uuids += set(owner_dtable_dict[org_per_month.org_id]) # each base's metadata only requested once and recorded in memory # The reason why it doesn't cache metadata in redis is metadatas in interval rules need to be up-to-date rule_interval_metadata_cache_manager = RuleIntervalMetadataCacheManager() for rule in rules: + if rule.dtable_uuid in exceed_dtable_uuids: + continue try: run_regular_execution_rule(rule, db_session, rule_interval_metadata_cache_manager) except Exception as e: diff --git a/dtable_events/automations/dtable_automation_rules_stats_updater.py b/dtable_events/automations/dtable_automation_rules_stats_updater.py new file mode 100644 index
00000000..50d29749 --- /dev/null +++ b/dtable_events/automations/dtable_automation_rules_stats_updater.py @@ -0,0 +1,170 @@ +import logging +import time +from datetime import date +from threading import Thread + +from apscheduler.schedulers.blocking import BlockingScheduler + +from dtable_events.app.config import ENV_CCNET_CONF_PATH, get_config, DTABLE_WEB_SERVICE_URL +from dtable_events.db import init_db_session_class +from dtable_events.utils.dtable_web_api import DTableWebAPI + +logger = logging.getLogger(__name__) + + +class DTableAutomationRulesStatsUpdater: + + def __init__(self, config): + self.session_class = init_db_session_class(config) + ccnet_config = get_config(ENV_CCNET_CONF_PATH) + if ccnet_config.has_section('Database'): + ccnet_db_name = ccnet_config.get('Database', 'DB', fallback='ccnet') + else: + ccnet_db_name = 'ccnet' + self.ccnet_db_name = ccnet_db_name + + def start(self): + DTableAutomationRulesStatsUpdaterTimer(self.session_class, self.ccnet_db_name).start() + + +class DTableAutomationRulesStatsUpdaterTimer(Thread): + + def __init__(self, session_class, ccnet_db_name): + super(DTableAutomationRulesStatsUpdaterTimer, self).__init__() + self.session_class = session_class + self.ccnet_db_name = ccnet_db_name + self.daemon = True + + def get_roles(self): + dtable_web_api = DTableWebAPI(DTABLE_WEB_SERVICE_URL) + try: + roles = dtable_web_api.internal_roles()['roles'] + except Exception as e: + logger.error('get roles error: %s', e) + return {} + return roles + + def update_users_stats(self, roles, db_session): + month = str(date.today())[:7] + limit, offset = 1000, 0 + username_trigger_count = {} + exceed_usernames = [] + usernames = [] + db_set_usernames_set = set() + while True: + sql = "SELECT username, trigger_count FROM user_auto_rules_statistics_per_month WHERE month=:month LIMIT :offset, :limit" + users_stats = list(db_session.execute(sql, {'month': month, 'limit': limit, 'offset': offset})) + if len(users_stats) < limit: + break + for 
user_stats in users_stats: + username_trigger_count[user_stats.username] = user_stats.trigger_count + usernames.append(user_stats.username) + offset += limit + logger.debug('query out %s users', len(usernames)) + # query db user_quota + step = 1000 + for i in range(0, len(usernames), step): + sql = "SELECT username, auto_rules_limit_per_month FROM user_quota WHERE username in :usernames" + for user_quota in db_session.execute(sql, {'usernames': usernames[i: i+step]}): + logger.debug('username: %s auto_rules_limit_per_month: %s trigger_count: %s', + user_quota.username, user_quota.auto_rules_limit_per_month, username_trigger_count[user_quota.username]) + if user_quota.auto_rules_limit_per_month is None: + continue + elif user_quota.auto_rules_limit_per_month < 0: # need to query role + db_set_usernames_set.add(user_quota.username) + continue + elif user_quota.auto_rules_limit_per_month == 0: + continue + db_set_usernames_set.add(user_quota.username) + if user_quota.auto_rules_limit_per_month <= username_trigger_count[user_quota.username]: + exceed_usernames.append(user_quota.username) + # query db users role + usernames = [username for username in usernames if username not in db_set_usernames_set] + logger.debug('totally %s users need to check roles', len(usernames)) + if usernames: + for i in range(0, len(usernames), step): + sql = "SELECT email, role FROM %s.UserRole WHERE email in :usernames" % self.ccnet_db_name + for role in db_session.execute(sql, {'usernames': usernames[i: i+step]}): + role_trigger_limit = roles.get(role.role, {}).get('automation_rules_limit_per_month', -1) + logger.debug('check role user: %s role: %s role_trigger_limit: %s trigger_count: %s', role.email, role.role, role_trigger_limit, username_trigger_count[role.email]) + if role_trigger_limit < 0: + continue + if role_trigger_limit <= username_trigger_count[role.email]: + exceed_usernames.append(role.email) + # update exceed + for i in range(0, len(exceed_usernames), step): + sql = "UPDATE 
user_auto_rules_statistics_per_month SET is_exceed=1 WHERE month=:month AND username IN :usernames" + db_session.execute(sql, {'month': month, 'usernames': exceed_usernames[i: i+step]}) + db_session.commit() + + def update_orgs_stats(self, roles, db_session): + month = str(date.today())[:7] + limit, offset = 1000, 0 + org_id_trigger_count = {} + exceed_org_ids = [] + org_ids = [] + db_set_org_ids_set = set() + while True: + sql = "SELECT org_id, trigger_count FROM org_auto_rules_statistics_per_month WHERE month=:month LIMIT :offset, :limit" + orgs_stats = list(db_session.execute(sql, {'month': month, 'limit': limit, 'offset': offset})) + for org_stats in orgs_stats: + org_id_trigger_count[org_stats.org_id] = org_stats.trigger_count + org_ids.append(org_stats.org_id) + if len(orgs_stats) < limit: + break + offset += limit + logger.debug('query out %s orgs', len(org_ids)) + # query db user_quota + step = 1000 + for i in range(0, len(org_ids), step): + sql = "SELECT org_id, auto_rules_limit_per_month FROM organizations_org_quota WHERE org_id in :org_ids" + for org_quota in db_session.execute(sql, {'org_ids': org_ids[i: i+step]}): + logger.debug('org: %s auto_rules_limit_per_month: %s trigger_count: %s', + org_quota.org_id, org_quota.auto_rules_limit_per_month, org_id_trigger_count[org_quota.org_id]) + if org_quota.auto_rules_limit_per_month is None: + continue + elif org_quota.auto_rules_limit_per_month < 0: + db_set_org_ids_set.add(org_quota.org_id) + continue + elif org_quota.auto_rules_limit_per_month == 0: # need to check role + continue + db_set_org_ids_set.add(org_quota.org_id) + if org_quota.auto_rules_limit_per_month <= org_id_trigger_count[org_quota.org_id]: + exceed_org_ids.append(org_quota.org_id) + # query db orgs role + org_ids = [org_id for org_id in org_ids if org_id not in db_set_org_ids_set] + logger.debug('totally %s orgs need to check roles', len(org_ids)) + if org_ids: + for i in range(0, len(org_ids), step): + sql = "SELECT org_id, role FROM 
organizations_orgsettings WHERE org_id in :org_ids" + for role in db_session.execute(sql, {'org_ids': org_ids[i: i+step]}): + role_trigger_limit = roles.get(role.role, {}).get('automation_rules_limit_per_month', -1) + logger.debug('check role org: %s role: %s role_trigger_limit: %s trigger_count: %s', role.org_id, role.role, role_trigger_limit, org_id_trigger_count[role.org_id]) + if role_trigger_limit < 0: + continue + if role_trigger_limit <= org_id_trigger_count[role.org_id]: + exceed_org_ids.append(role.org_id) + # update exceed + for i in range(0, len(exceed_org_ids), step): + sql = "UPDATE org_auto_rules_statistics_per_month SET is_exceed=1 WHERE month=:month AND org_id IN :org_ids" + db_session.execute(sql, {'month': month, 'org_ids': exceed_org_ids[i: i+step]}) + db_session.commit() + + def update_stats(self): + db_session = self.session_class() + roles = self.get_roles() + logger.debug('roles: %s', roles) + try: + self.update_users_stats(roles, db_session) + self.update_orgs_stats(roles, db_session) + except Exception as e: + logger.exception('update users/orgs auto rule stats error: %s', e) + + def run(self): + sched = BlockingScheduler() + + @sched.scheduled_job('cron', day_of_week='*', hour='*', minute='52') + def update(): + self.update_stats() + + sched.start() diff --git a/dtable_events/utils/dtable_web_api.py b/dtable_events/utils/dtable_web_api.py index e908cf6b..50368040 100644 --- a/dtable_events/utils/dtable_web_api.py +++ b/dtable_events/utils/dtable_web_api.py @@ -130,3 +130,13 @@ def internal_add_notification(self, to_users, msg_type, detail): 'type': msg_type }, headers=headers) return parse_response(resp) + + def internal_roles(self): + logger.debug('internal roles') + url = '%(server_url)s/api/v2.1/internal-roles/?from=dtable_events' % { + 'server_url': self.dtable_web_service_url + } + token = jwt.encode({'is_internal': True}, DTABLE_PRIVATE_KEY, algorithm='HS256') + headers = {'Authorization': 'Token ' + token} + resp = requests.get(url, 
headers=headers) + return parse_response(resp) From 2dccfc1b572387a812260053b9cdfd5eb12056f4 Mon Sep 17 00:00:00 2001 From: AlexHappy <1223408988@qq.com> Date: Sun, 3 Dec 2023 02:36:35 +0800 Subject: [PATCH 2/5] fix update user auto-rules stats sql and opt auto-rules --- dtable_events/automations/actions.py | 6 ++-- dtable_events/automations/auto_rules_utils.py | 32 +++++++++++-------- .../dtable_automation_rules_scanner.py | 3 +- .../dtable_automation_rules_stats_updater.py | 6 ++-- dtable_events/dtable_io/request_handler.py | 5 +-- dtable_events/dtable_io/task_manager.py | 5 +-- 6 files changed, 34 insertions(+), 23 deletions(-) diff --git a/dtable_events/automations/actions.py b/dtable_events/automations/actions.py index 1733f7eb..f28d5b33 100644 --- a/dtable_events/automations/actions.py +++ b/dtable_events/automations/actions.py @@ -2927,6 +2927,7 @@ def __init__(self, data, db_session, raw_trigger, raw_actions, options, metadata self.trigger_count = options.get('trigger_count', None) self.org_id = options.get('org_id', None) self.creator = options.get('creator', None) + self.owner = options.get('owner', None) self.data = data self.db_session = db_session @@ -3469,8 +3470,9 @@ def update_last_trigger_time(self): sqls = [set_last_trigger_time_sql] if self.org_id: if self.org_id == -1: - sqls.append(set_statistic_sql_user) - sqls.append(set_statistic_sql_user_per_month) + if '@seafile_group' not in self.owner: + sqls.append(set_statistic_sql_user) + sqls.append(set_statistic_sql_user_per_month) else: sqls.append(set_statistic_sql_org) sqls.append(set_statistic_sql_org_per_month) diff --git a/dtable_events/automations/auto_rules_utils.py b/dtable_events/automations/auto_rules_utils.py index 38047b72..e64043a3 100644 --- a/dtable_events/automations/auto_rules_utils.py +++ b/dtable_events/automations/auto_rules_utils.py @@ -10,38 +10,41 @@ def can_trgger_by_dtable(dtable_uuid, db_session): + """ + :return: workspace -> obj with `owner` and `org_id` or None, 
can_trigger -> bool + """ sql = "SELECT w.owner, w.org_id FROM workspaces w JOIN dtables d ON w.id=d.workspace_id WHERE d.uuid=:dtable_uuid" try: workspace = db_session.execute(sql, {'dtable_uuid': uuid_str_to_32_chars(dtable_uuid)}).fetchone() except Exception as e: logger.error('check dtable: %s workspace error: %s', dtable_uuid, e) - return True + return None, True if not workspace: logger.error('dtable: %s workspace not found', dtable_uuid) - return True + return None, True month = str(date.today())[:7] if workspace.org_id == -1: if '@seafile_group' in workspace.owner: # groups not belong to orgs can always trigger auto rules - return True + return workspace, True sql = "SELECT is_exceed FROM user_auto_rules_statistics_per_month WHERE username=:username AND month=:month" try: user_per_month = db_session.execute(sql, {'username': workspace.owner, 'month': month}).fetchone() except Exception as e: logger.error('check user: %s auto rule per month error: %s', workspace.owner, e) - return True + return workspace, True if not user_per_month: - return True - return not user_per_month.is_exceed + return workspace, True + return workspace, not user_per_month.is_exceed else: sql = "SELECT is_exceed FROM org_auto_rules_statistics_per_month WHERE org_id=:org_id AND month=:month" try: org_per_month = db_session.execute(sql, {'org_id': workspace.org_id, 'month': month}).fetchone() except Exception as e: logger.error('check org: %s auto rule per month error: %s', workspace.org_id, e) - return True + return workspace, True if not org_per_month: - return True - return not org_per_month.is_exceed + return workspace, True + return workspace, not org_per_month.is_exceed def scan_triggered_automation_rules(event_data, db_session, per_minute_trigger_limit): @@ -60,7 +63,8 @@ def scan_triggered_automation_rules(event_data, db_session, per_minute_trigger_l if not rule: return - if not can_trgger_by_dtable(dtable_uuid, db_session): + workspace, can_trigger =
can_trgger_by_dtable(dtable_uuid, db_session) + if not can_trigger: return rule_intent_metadata_cache_manager = RuleIntentMetadataCacheManger() @@ -69,9 +73,10 @@ def scan_triggered_automation_rules(event_data, db_session, per_minute_trigger_l 'run_condition': rule.run_condition, 'dtable_uuid': dtable_uuid, 'trigger_count': rule.trigger_count, - 'org_id': rule.org_id, + 'org_id': workspace.org_id if workspace else rule.org_id, 'creator': rule.creator, 'last_trigger_time': rule.last_trigger_time, + 'owner': workspace.owner if workspace else None } try: auto_rule = AutomationRule(event_data, db_session, rule.trigger, rule.actions, options, rule_intent_metadata_cache_manager, per_minute_trigger_limit=per_minute_trigger_limit) @@ -90,8 +95,9 @@ def run_regular_execution_rule(rule, db_session, metadata_cache_manager): options['last_trigger_time'] = rule[4] options['dtable_uuid'] = rule[5] options['trigger_count'] = rule[6] - options['org_id'] = rule[7] - options['creator'] = rule[8] + options['creator'] = rule[7] + options['owner'] = rule[8] + options['org_id'] = rule[9] try: auto_rule = AutomationRule(None, db_session, trigger, actions, options, metadata_cache_manager) auto_rule.do_actions() diff --git a/dtable_events/automations/dtable_automation_rules_scanner.py b/dtable_events/automations/dtable_automation_rules_scanner.py index 226fcf50..a22e5de2 100644 --- a/dtable_events/automations/dtable_automation_rules_scanner.py +++ b/dtable_events/automations/dtable_automation_rules_scanner.py @@ -54,8 +54,9 @@ def is_enabled(self): def scan_dtable_automation_rules(db_session): sql = ''' - SELECT `dar`.`id`, `run_condition`, `trigger`, `actions`, `last_trigger_time`, `dtable_uuid`, `trigger_count`, `org_id`, dar.`creator` FROM dtable_automation_rules dar + SELECT `dar`.`id`, `run_condition`, `trigger`, `actions`, `last_trigger_time`, `dtable_uuid`, `trigger_count`, dar.`creator`, w.`owner`, w.`org_id` FROM dtable_automation_rules dar JOIN dtables d ON dar.dtable_uuid=d.uuid 
+ JOIN workspaces w ON d.workspace_id=w.id WHERE ((run_condition='per_day' AND (last_trigger_time<:per_day_check_time OR last_trigger_time IS NULL)) OR (run_condition='per_week' AND (last_trigger_time<:per_week_check_time OR last_trigger_time IS NULL)) OR (run_condition='per_month' AND (last_trigger_time<:per_month_check_time OR last_trigger_time IS NULL))) diff --git a/dtable_events/automations/dtable_automation_rules_stats_updater.py b/dtable_events/automations/dtable_automation_rules_stats_updater.py index 50d29749..9bfeb978 100644 --- a/dtable_events/automations/dtable_automation_rules_stats_updater.py +++ b/dtable_events/automations/dtable_automation_rules_stats_updater.py @@ -54,11 +54,11 @@ def update_users_stats(self, roles, db_session): while True: sql = "SELECT username, trigger_count FROM user_auto_rules_statistics_per_month WHERE month=:month LIMIT :offset, :limit" users_stats = list(db_session.execute(sql, {'month': month, 'limit': limit, 'offset': offset})) - if len(users_stats) < limit: - break for user_stats in users_stats: username_trigger_count[user_stats.username] = user_stats.trigger_count usernames.append(user_stats.username) + if len(users_stats) < limit: + break offset += limit logger.debug('query out %s users', len(usernames)) # query db user_quota @@ -163,7 +163,7 @@ def update_stats(self): def run(self): sched = BlockingScheduler() - @sched.scheduled_job('cron', day_of_week='*', hour='*', minute='52') + @sched.scheduled_job('cron', day_of_week='*', hour='*', minute='*/30') def update(): self.update_stats() diff --git a/dtable_events/dtable_io/request_handler.py b/dtable_events/dtable_io/request_handler.py index 2f333982..645197b5 100644 --- a/dtable_events/dtable_io/request_handler.py +++ b/dtable_events/dtable_io/request_handler.py @@ -653,7 +653,8 @@ def add_run_auto_rule_task(): if not isinstance(data, dict): return make_response(('Bad request', 400)) - username = data.get('username') + creator = data.get('creator') + owner = 
data.get('owner') org_id = data.get('org_id') run_condition = data.get('run_condition') trigger = data.get('trigger') @@ -663,7 +664,7 @@ def add_run_auto_rule_task(): try: task_id = task_manager.add_run_auto_rule_task( - automation_rule_id, username, org_id, dtable_uuid, run_condition, trigger, actions) + automation_rule_id, creator, owner, org_id, dtable_uuid, run_condition, trigger, actions) except Exception as e: logger.error(e) return make_response((e, 500)) diff --git a/dtable_events/dtable_io/task_manager.py b/dtable_events/dtable_io/task_manager.py index 00e2c7a8..e844274e 100644 --- a/dtable_events/dtable_io/task_manager.py +++ b/dtable_events/dtable_io/task_manager.py @@ -158,14 +158,15 @@ def add_append_excel_csv_upload_file_task(self, username, repo_id, file_name, dt self.tasks_map[task_id] = task return task_id - def add_run_auto_rule_task(self, automation_rule_id, username, org_id, dtable_uuid, run_condition, trigger, actions): + def add_run_auto_rule_task(self, automation_rule_id, creator, owner, org_id, dtable_uuid, run_condition, trigger, actions): from dtable_events.automations.auto_rules_utils import run_auto_rule_task task_id = str(uuid.uuid4()) options = { 'run_condition': run_condition, 'dtable_uuid': dtable_uuid, 'org_id': org_id, - 'creator': username, + 'owner': owner, + 'creator': creator, 'rule_id': automation_rule_id } From 8f8e2202bc45f5094d611bb5f9a9d4660d150f34 Mon Sep 17 00:00:00 2001 From: AlexHappy <1223408988@qq.com> Date: Sun, 3 Dec 2023 02:56:03 +0800 Subject: [PATCH 3/5] fix auto-rules scanner query stats --- dtable_events/automations/dtable_automation_rules_scanner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dtable_events/automations/dtable_automation_rules_scanner.py b/dtable_events/automations/dtable_automation_rules_scanner.py index a22e5de2..c48d3c36 100644 --- a/dtable_events/automations/dtable_automation_rules_scanner.py +++ b/dtable_events/automations/dtable_automation_rules_scanner.py @@ 
-88,11 +88,11 @@ def scan_dtable_automation_rules(db_session): owner_dtable_dict[dtable.org_id].append(dtable.uuid) org_ids.append(dtable.org_id) for i in range(0, len(usernames), step): - sql = "SELECT username FROM user_auto_rules_statistics_per_month WHERE username in :usernames WHERE is_exceed=1 AND month=:month" + sql = "SELECT username FROM user_auto_rules_statistics_per_month WHERE username in :usernames AND month=:month AND is_exceed=1" for user_per_month in db_session.execute(sql, {'usernames': usernames[i: i+step], 'month': month}): exceed_dtable_uuids += set(owner_dtable_dict[user_per_month.username]) for i in range(0, len(org_ids), step): - sql = "SELECT org_id FROM org_auto_rules_statistics_per_month WHERE org_id in :org_ids WHERE is_exceed=1 AND month=:month" + sql = "SELECT org_id FROM org_auto_rules_statistics_per_month WHERE org_id in :org_ids AND month=:month AND is_exceed=1" for org_per_month in db_session.execute(sql, {'org_ids': org_ids[i: i+step], 'month': month}): exceed_dtable_uuids += set(owner_dtable_dict[org_per_month.org_id]) From 7d71681754a8aadc50a29d4e37407ae2d66c1f8c Mon Sep 17 00:00:00 2001 From: AlexHappy <1223408988@qq.com> Date: Sun, 3 Dec 2023 03:04:15 +0800 Subject: [PATCH 4/5] fix unittest --- dtable_events/app/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dtable_events/app/config.py b/dtable_events/app/config.py index deff0405..1f56d7e6 100644 --- a/dtable_events/app/config.py +++ b/dtable_events/app/config.py @@ -48,7 +48,7 @@ IS_PRO_VERSION = getattr(seahub_settings, 'IS_PRO_VERSION', False) # env - ENV_SEAFILE_CENTRAL_CONF_DIR = os.environ['SEAFILE_CENTRAL_CONF_DIR'] + ENV_SEAFILE_CENTRAL_CONF_DIR = os.environ.get('SEAFILE_CENTRAL_CONF_DIR') if ENV_SEAFILE_CENTRAL_CONF_DIR: ENV_CCNET_CONF_PATH = os.path.join(ENV_SEAFILE_CENTRAL_CONF_DIR, 'ccnet.conf') elif 'CCNET_CONF_DIR' in os.environ: From 6fab0c8bb116f974980ff37d7a38aea6b8af3aa1 Mon Sep 17 00:00:00 2001 From: AlexCXC <1223408988@qq.com> 
Date: Fri, 23 Feb 2024 15:13:27 +0800 Subject: [PATCH 5/5] update is_exceed in new thread, add has_sent_warning field and send notification when auto rules trigger reach warning --- dtable_events/app/app.py | 8 +- dtable_events/app/config.py | 7 +- dtable_events/automations/actions.py | 22 ++- .../automations/auto_rules_stats_updater.py | 138 ++++++++++++++ dtable_events/automations/auto_rules_utils.py | 4 +- .../dtable_automation_rules_stats_updater.py | 170 ------------------ dtable_events/utils/dtable_web_api.py | 2 +- 7 files changed, 161 insertions(+), 190 deletions(-) create mode 100644 dtable_events/automations/auto_rules_stats_updater.py delete mode 100644 dtable_events/automations/dtable_automation_rules_stats_updater.py diff --git a/dtable_events/app/app.py b/dtable_events/app/app.py index 4cbb9e7a..fe01afa8 100644 --- a/dtable_events/app/app.py +++ b/dtable_events/app/app.py @@ -14,7 +14,7 @@ from dtable_events.notification_rules.dtable_notification_rules_scanner import DTableNofiticationRulesScanner from dtable_events.automations.handler import AutomationRuleHandler from dtable_events.automations.dtable_automation_rules_scanner import DTableAutomationRulesScanner -from dtable_events.automations.dtable_automation_rules_stats_updater import DTableAutomationRulesStatsUpdater +from dtable_events.automations.auto_rules_stats_updater import auto_rules_stats_updater from dtable_events.webhook.webhook import Webhooker from dtable_events.common_dataset.common_dataset_syncer import CommonDatasetSyncer from dtable_events.tasks.big_data_storage_stats_worker import BigDataStorageStatsWorker @@ -48,7 +48,6 @@ def __init__(self, config, task_mode): self._dtable_updates_sender = DTableUpdatesSender(config) self._dtable_notification_rules_scanner = DTableNofiticationRulesScanner(config) self._dtable_automation_rules_scanner = DTableAutomationRulesScanner(config) - self._dtable_automation_rules_stats_updater = DTableAutomationRulesStatsUpdater(config) self._ldap_syncer = 
LDAPSyncer(config) self._common_dataset_syncer = CommonDatasetSyncer(config) self._big_data_storage_stats_worker = BigDataStorageStatsWorker(config) @@ -58,6 +57,8 @@ def __init__(self, config, task_mode): self._license_expiring_notices_sender = LicenseExpiringNoticesSender() # convert pdf manager conver_page_to_pdf_manager.init(config) + # automation rules statistics updater + auto_rules_stats_updater.init(config) def serve_forever(self): @@ -80,7 +81,6 @@ def serve_forever(self): self._dtable_updates_sender.start() # default True self._dtable_notification_rules_scanner.start() # default True self._dtable_automation_rules_scanner.start() # default True - self._dtable_automation_rules_stats_updater.start() # always True self._ldap_syncer.start() # default False self._common_dataset_syncer.start() # default True self._big_data_storage_stats_worker.start() # always True @@ -90,3 +90,5 @@ def serve_forever(self): self._license_expiring_notices_sender.start() # always True # convert pdf manager conver_page_to_pdf_manager.start() # always True + # automation rules statistics updater + auto_rules_stats_updater.start() diff --git a/dtable_events/app/config.py b/dtable_events/app/config.py index 1f56d7e6..2a33f0ee 100644 --- a/dtable_events/app/config.py +++ b/dtable_events/app/config.py @@ -48,11 +48,8 @@ IS_PRO_VERSION = getattr(seahub_settings, 'IS_PRO_VERSION', False) # env - ENV_SEAFILE_CENTRAL_CONF_DIR = os.environ.get('SEAFILE_CENTRAL_CONF_DIR') - if ENV_SEAFILE_CENTRAL_CONF_DIR: - ENV_CCNET_CONF_PATH = os.path.join(ENV_SEAFILE_CENTRAL_CONF_DIR, 'ccnet.conf') - elif 'CCNET_CONF_DIR' in os.environ: - ENV_CCNET_CONF_PATH = os.path.join(os.environ['CCNET_CONF_DIR'], 'ccnet.conf') + ENV_SEAFILE_CENTRAL_CONF_DIR = os.environ.get('SEAFILE_CENTRAL_CONF_DIR', '') + ENV_CCNET_CONF_PATH = os.path.join(ENV_SEAFILE_CENTRAL_CONF_DIR, 'ccnet.conf') except Exception as e: logger.critical("Can not import dtable_web settings: %s." 
% e) raise RuntimeError("Can not import dtable_web settings: %s" % e) diff --git a/dtable_events/automations/actions.py b/dtable_events/automations/actions.py index f28d5b33..d4f6813f 100644 --- a/dtable_events/automations/actions.py +++ b/dtable_events/automations/actions.py @@ -15,6 +15,7 @@ from seaserv import seafile_api from dtable_events.automations.models import get_third_party_account +from dtable_events.automations.auto_rules_stats_updater import auto_rules_stats_updater from dtable_events.app.metadata_cache_managers import BaseMetadataCacheManager from dtable_events.app.event_redis import redis_cache from dtable_events.app.config import DTABLE_WEB_SERVICE_URL, DTABLE_PRIVATE_KEY, \ @@ -3480,20 +3481,23 @@ def update_last_trigger_time(self): cur_date = datetime.now().date() cur_year, cur_month = cur_date.year, cur_date.month trigger_date = date(year=cur_year, month=cur_month, day=1) + sql_data = { + 'rule_id': self.rule_id, + 'trigger_time': datetime.utcnow(), + 'trigger_date': trigger_date, + 'trigger_count': self.trigger_count + 1, + 'username': self.creator, + 'org_id': self.org_id, + 'month': str(date.today())[:7] + } for sql in sqls: - self.db_session.execute(sql, { - 'rule_id': self.rule_id, - 'trigger_time': datetime.utcnow(), - 'trigger_date': trigger_date, - 'trigger_count': self.trigger_count + 1, - 'username': self.creator, - 'org_id': self.org_id, - 'month': str(date.today())[:7] - }) + self.db_session.execute(sql, sql_data) self.db_session.commit() except Exception as e: logger.exception('set rule: %s error: %s', self.rule_id, e) + auto_rules_stats_updater.add_info(sql_data) + if self.run_condition == PER_UPDATE and self.per_minute_trigger_limit > 0: trigger_times = redis_cache.get(self.cache_key) if not trigger_times: diff --git a/dtable_events/automations/auto_rules_stats_updater.py b/dtable_events/automations/auto_rules_stats_updater.py new file mode 100644 index 00000000..6e4442d1 --- /dev/null +++ 
b/dtable_events/automations/auto_rules_stats_updater.py @@ -0,0 +1,138 @@ +import logging +from datetime import date +from queue import Queue +from threading import Thread + +from seaserv import ccnet_api + +from dtable_events.app.config import ENV_CCNET_CONF_PATH, get_config, DTABLE_WEB_SERVICE_URL +from dtable_events.db import init_db_session_class +from dtable_events.utils.dtable_web_api import DTableWebAPI + +logger = logging.getLogger(__name__) + + +class AutoRulesStatsUpdater: + + def init(self, config): + self.config = config + self.queue = Queue() + self.dtable_web_api = DTableWebAPI(DTABLE_WEB_SERVICE_URL) + self.roles = None + self.db_session_class = init_db_session_class(config) + + ccnet_config = get_config(ENV_CCNET_CONF_PATH) + if ccnet_config.has_section('Database'): + ccnet_db_name = ccnet_config.get('Database', 'DB', fallback='ccnet') + else: + ccnet_db_name = 'ccnet' + self.ccnet_db_name = ccnet_db_name + + def get_roles(self): + if self.roles: + return self.roles + self.roles = self.dtable_web_api.internal_roles() + return self.roles + + def get_user_quota(self, db_session, username): + sql = "SELECT username, auto_rules_limit_per_month FROM user_quota WHERE username=:username" + row = db_session.execute(sql, {'username': username}).fetchone() + if row and row.auto_rules_limit_per_month and row.auto_rules_limit_per_month != 0: + return row.auto_rules_limit_per_month + user = ccnet_api.get_emailuser(username) + user_role = user.role + return self.roles.get(user_role, {}).get('automation_rules_limit_per_month', -1) + + def get_org_quota(self, db_session, org_id): + sql = "SELECT org_id, auto_rules_limit_per_month FROM organizations_org_quota WHERE org_id=:org_id" + row = db_session.execute(sql, {'org_id': org_id}).fetchone() + if row and row.auto_rules_limit_per_month and row.auto_rules_limit_per_month != 0: + return row.auto_rules_limit_per_month + sql = "SELECT role FROM organizations_orgsettings WHERE org_id=:org_id" + row = 
db_session.execute(sql, {'org_id': org_id}).fetchone() + if not row: + org_role = 'org_default' # check from dtable-web/seahub/role_permissions/settings DEFAULT_ENABLED_ROLE_PERMISSIONS[ORG_DEFAULT] + else: + org_role = row.role + return self.get_roles().get(org_role, {}).get('automation_rules_limit_per_month', -1) + + def get_user_usage(self, db_session, username): + """ + :return: trigger_count -> int, has_sent_warning -> bool + """ + sql = "SELECT trigger_count, has_sent_warning FROM user_auto_rules_statistics_per_month WHERE username=:username AND month=:month" + row = db_session.execute(sql, {'username': username, 'month': str(date.today())[:7]}).fetchone() + if not row: + return 0, False + return row.trigger_count, row.has_sent_warning + + def get_org_usage(self, db_session, org_id): + """ + :return: trigger_count -> int, has_sent_warning -> bool + """ + sql = "SELECT trigger_count, has_sent_warning FROM org_auto_rules_statistics_per_month WHERE org_id=:org_id AND month=:month" + row = db_session.execute(sql, {'org_id': org_id, 'month': str(date.today())[:7]}).fetchone() + if not row: + return 0, False + return row.trigger_count, row.has_sent_warning + + def update_user(self, db_session, username): + limit = self.get_user_quota(db_session, username) + if limit < 0: + return + usage, has_sent_warning = self.get_user_usage(db_session, username) + if usage >= limit: + sql = "UPDATE user_auto_rules_statistics_per_month SET is_exceed=1 WHERE username=:username" + db_session.execute(sql, {'username': username}) + db_session.commit() + return + if not has_sent_warning and usage >= limit * 0.9: + self.dtable_web_api.internal_add_notification([username], 'autorule_trigger_reach_warning', {'limit': limit, 'usage': usage}) + sql = "UPDATE user_auto_rules_statistics_per_month SET has_sent_warning=1 WHERE username=:username" + db_session.execute(sql, {'username': username}) + db_session.commit() + + def update_org(self, db_session, org_id): + limit = 
self.get_org_quota(db_session, org_id) + if limit < 0: + return + usage, has_sent_warning = self.get_org_usage(db_session, org_id) + if usage >= limit: + sql = "UPDATE org_auto_rules_statistics_per_month SET is_exceed=1 WHERE org_id=:org_id" + db_session.execute(sql, {'org_id': org_id}) + db_session.commit() + return + if not has_sent_warning and usage >= limit * 0.9: + admins = [] + sql = "SELECT email FROM %s.OrgUser WHERE org_id=:org_id AND is_staff=1" % self.ccnet_db_name + for row in db_session.execute(sql, {'org_id': org_id}): + admins.append(row.email) + self.dtable_web_api.internal_add_notification(admins, 'autorule_trigger_reach_warning', {'limit': limit, 'usage': usage}) + sql = "UPDATE org_auto_rules_statistics_per_month SET has_sent_warning=1 WHERE org_id=:org_id" + db_session.execute(sql, {'org_id': org_id}) + db_session.commit() + + def update_stats(self): + while True: + auto_rule_info = self.queue.get() + username = auto_rule_info.get('username') + org_id = auto_rule_info.get('org_id') + db_session = self.db_session_class() + try: + if org_id == -1 and username: + self.update_user(db_session, username) + elif org_id != -1: + self.update_org(db_session, org_id) + except Exception as e: + logger.exception('update stats info: %s error: %s', auto_rule_info, e) + finally: + db_session.close() + + def start(self): + Thread(target=self.update_stats, daemon=True, name='Thread-auto-rules-stats-updater').start() + + def add_info(self, info): + self.queue.put(info) + + +auto_rules_stats_updater = AutoRulesStatsUpdater() diff --git a/dtable_events/automations/auto_rules_utils.py b/dtable_events/automations/auto_rules_utils.py index e64043a3..7e754b15 100644 --- a/dtable_events/automations/auto_rules_utils.py +++ b/dtable_events/automations/auto_rules_utils.py @@ -9,7 +9,7 @@ logger = logging.getLogger(__name__) -def can_trgger_by_dtable(dtable_uuid, db_session): +def can_trigger_by_dtable(dtable_uuid, db_session): """ :return: workspace -> obj with `owner` and 
`org_id` or None, can_trigger -> bool """ @@ -63,7 +63,7 @@ def scan_triggered_automation_rules(event_data, db_session, per_minute_trigger_l if not rule: return - workspace, can_trigger = can_trgger_by_dtable(dtable_uuid, db_session) + workspace, can_trigger = can_trigger_by_dtable(dtable_uuid, db_session) if not can_trigger: return diff --git a/dtable_events/automations/dtable_automation_rules_stats_updater.py b/dtable_events/automations/dtable_automation_rules_stats_updater.py deleted file mode 100644 index 9bfeb978..00000000 --- a/dtable_events/automations/dtable_automation_rules_stats_updater.py +++ /dev/null @@ -1,170 +0,0 @@ -import logging -import time -from datetime import date -from threading import Thread - -from apscheduler.schedulers.blocking import BlockingScheduler - -from dtable_events.app.config import ENV_CCNET_CONF_PATH, get_config, DTABLE_WEB_SERVICE_URL -from dtable_events.db import init_db_session_class -from dtable_events.utils.dtable_web_api import DTableWebAPI - -logger = logging.getLogger(__name__) - - -class DTableAutomationRulesStatsUpdater: - - def __init__(self, config): - self.session_class = init_db_session_class(config) - ccnet_config = get_config(ENV_CCNET_CONF_PATH) - if ccnet_config.has_section('Database'): - ccnet_db_name = ccnet_config.get('Database', 'DB', fallback='ccnet') - else: - ccnet_db_name = 'ccnet' - self.ccnet_db_name = ccnet_db_name - - def start(self): - DTableAutomationRulesStatsUpdaterTimer(self.session_class, self.ccnet_db_name).start() - - -class DTableAutomationRulesStatsUpdaterTimer(Thread): - - def __init__(self, session_class, ccnet_db_name): - super(DTableAutomationRulesStatsUpdaterTimer, self).__init__() - self.session_class = session_class - self.ccnet_db_name = ccnet_db_name - self.daemon = True - - def get_roles(self): - dtable_web_api = DTableWebAPI(DTABLE_WEB_SERVICE_URL) - try: - roles = dtable_web_api.internal_roles()['roles'] - except Exception as e: - logger.error('get roles error: %s', e) - 
return {} - return roles - - def update_users_stats(self, roles, db_session): - month = str(date.today())[:7] - limit, offset = 1000, 0 - username_trigger_count = {} - exceed_usernames = [] - usernames = [] - db_set_usernames_set = set() - while True: - sql = "SELECT username, trigger_count FROM user_auto_rules_statistics_per_month WHERE month=:month LIMIT :offset, :limit" - users_stats = list(db_session.execute(sql, {'month': month, 'limit': limit, 'offset': offset})) - for user_stats in users_stats: - username_trigger_count[user_stats.username] = user_stats.trigger_count - usernames.append(user_stats.username) - if len(users_stats) < limit: - break - offset += limit - logger.debug('query out %s users', len(usernames)) - # query db user_quota - step = 1000 - for i in range(0, len(usernames), step): - sql = "SELECT username, auto_rules_limit_per_month FROM user_quota WHERE username in :usernames" - for user_quota in db_session.execute(sql, {'usernames': usernames[i: i+step]}): - logger.debug('username: %s auto_rules_limit_per_month: %s trigger_count: %s', - user_quota.username, user_quota.auto_rules_limit_per_month, username_trigger_count[user_quota.username]) - if user_quota.auto_rules_limit_per_month is None: - continue - elif user_quota.auto_rules_limit_per_month < 0: # need to query role - db_set_usernames_set.add(user_quota.username) - continue - elif user_quota.auto_rules_limit_per_month == 0: - continue - db_set_usernames_set.add(user_quota.username) - if user_quota.auto_rules_limit_per_month <= username_trigger_count[user_quota.username]: - exceed_usernames.append(user_quota.username) - # query db users role - usernames = [username for username in usernames if username not in db_set_usernames_set] - logger.debug('totally %s users need to check roles', len(usernames)) - if usernames: - for i in range(0, len(usernames), step): - sql = "SELECT email, role FROM %s.UserRole WHERE email in :usernames" % self.ccnet_db_name - for role in db_session.execute(sql, 
{'usernames': usernames[i: i+step]}): - role_trigger_limit = roles.get(role.role, {}).get('automation_rules_limit_per_month', -1) - logger.debug('check role user: %s role: %s role_trigger_limit: %s trigger_count: %s', role.email, role.role, role_trigger_limit, username_trigger_count[role.email]) - if role_trigger_limit < 0: - continue - if role_trigger_limit <= username_trigger_count[role.email]: - exceed_usernames.append(role.email) - # update exceed - for i in range(0, len(exceed_usernames), step): - sql = "UPDATE user_auto_rules_statistics_per_month SET is_exceed=1 WHERE month=:month AND username IN :usernames" - db_session.execute(sql, {'month': month, 'usernames': exceed_usernames[i: i+step]}) - db_session.commit() - - def update_orgs_stats(self, roles, db_session): - month = str(date.today())[:7] - limit, offset = 1000, 0 - org_id_trigger_count = {} - exceed_org_ids = [] - org_ids = [] - db_set_org_ids_set = set() - while True: - sql = "SELECT org_id, trigger_count FROM org_auto_rules_statistics_per_month WHERE month=:month LIMIT :offset, :limit" - orgs_stats = list(db_session.execute(sql, {'month': month, 'limit': limit, 'offset': offset})) - for org_stats in orgs_stats: - org_id_trigger_count[org_stats.org_id] = org_stats.trigger_count - org_ids.append(org_stats.org_id) - if len(orgs_stats) < limit: - break - offset += limit - logger.debug('query out %s orgs', len(org_ids)) - # query db user_quota - step = 1000 - for i in range(0, len(org_ids), step): - sql = "SELECT org_id, auto_rules_limit_per_month FROM organizations_org_quota WHERE org_id in :org_ids" - for org_quota in db_session.execute(sql, {'org_ids': org_ids[i: i+step]}): - logger.debug('org: %s auto_rules_limit_per_month: %s trigger_count: %s', - org_quota.org_id, org_quota.auto_rules_limit_per_month, org_id_trigger_count[org_quota.org_id]) - if org_quota.auto_rules_limit_per_month is None: - continue - elif org_quota.auto_rules_limit_per_month < 0: - db_set_org_ids_set.add(org_quota.org_id) - 
continue - elif org_quota.auto_rules_limit_per_month == 0: # need to check role - continue - db_set_org_ids_set.add(org_quota.org_id) - if org_quota.auto_rules_limit_per_month <= org_id_trigger_count[org_quota.org_id]: - exceed_org_ids.append(org_quota.org_id) - # query db orgs role - org_ids = [org_id for org_id in org_ids if org_id not in db_set_org_ids_set] - logger.debug('totally %s orgs need to check roles', len(org_ids)) - if org_ids: - for i in range(0, len(org_ids), step): - sql = "SELECT org_id, role FROM organizations_orgsettings WHERE org_id in :org_ids" - for role in db_session.execute(sql, {'org_ids': org_ids[i: i+step]}): - role_trigger_limit = roles.get(role.role, {}).get('automation_rules_limit_per_month', -1) - logger.debug('check role org: %s role: %s role_trigger_limit: %s trigger_count: %s', role.org_id, role.role, role_trigger_limit, org_id_trigger_count[role.org_id]) - if role_trigger_limit < 0: - continue - if role_trigger_limit <= org_id_trigger_count[role.org_id]: - exceed_org_ids.append(role.org_id) - # update exceed - for i in range(0, len(exceed_org_ids), step): - sql = "UPDATE org_auto_rules_statistics_per_month SET is_exceed=1 WHERE month=:month AND org_id IN :org_ids" - db_session.execute(sql, {'month': month, 'org_ids': exceed_org_ids[i: i+step]}) - db_session.commit() - - def update_stats(self): - db_session = self.session_class() - roles = self.get_roles() - logger.debug('roles: %s', roles) - try: - self.update_users_stats(roles, db_session) - self.update_orgs_stats(roles, db_session) - except Exception as e: - logger.exception('update users/orgs auto rule stats error: %s', e) - - def run(self): - sched = BlockingScheduler() - - @sched.scheduled_job('cron', day_of_week='*', hour='*', minute='*/30') - def update(): - self.update_stats() - - sched.start() diff --git a/dtable_events/utils/dtable_web_api.py b/dtable_events/utils/dtable_web_api.py index 50368040..5dc81abc 100644 --- a/dtable_events/utils/dtable_web_api.py +++ 
b/dtable_events/utils/dtable_web_api.py @@ -139,4 +139,4 @@ def internal_roles(self): token = jwt.encode({'is_internal': True}, DTABLE_PRIVATE_KEY, algorithm='HS256') headers = {'Authorization': 'Token ' + token} resp = requests.get(url, headers=headers) - return parse_response(resp) + return parse_response(resp).get('roles', [])