Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Db update auto rules limit #566

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dtable_events/app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from dtable_events.notification_rules.dtable_notification_rules_scanner import DTableNofiticationRulesScanner
from dtable_events.automations.handler import AutomationRuleHandler
from dtable_events.automations.dtable_automation_rules_scanner import DTableAutomationRulesScanner
from dtable_events.automations.auto_rules_stats_updater import auto_rules_stats_updater
from dtable_events.webhook.webhook import Webhooker
from dtable_events.common_dataset.common_dataset_syncer import CommonDatasetSyncer
from dtable_events.tasks.big_data_storage_stats_worker import BigDataStorageStatsWorker
Expand Down Expand Up @@ -56,6 +57,8 @@ def __init__(self, config, task_mode):
self._license_expiring_notices_sender = LicenseExpiringNoticesSender()
# convert pdf manager
conver_page_to_pdf_manager.init(config)
# automation rules statistics updater
auto_rules_stats_updater.init(config)

def serve_forever(self):

Expand Down Expand Up @@ -87,3 +90,5 @@ def serve_forever(self):
self._license_expiring_notices_sender.start() # always True
# convert pdf manager
conver_page_to_pdf_manager.start() # always True
# automation rules statistics updater
auto_rules_stats_updater.start()
5 changes: 5 additions & 0 deletions dtable_events/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
sys.path.insert(0, dtable_web_dir)

try:
# settings in dtable-web
import seahub.settings as seahub_settings
DTABLE_WEB_SERVICE_URL = getattr(seahub_settings, 'DTABLE_WEB_SERVICE_URL', 'http://127.0.0.1')
DTABLE_PRIVATE_KEY = getattr(seahub_settings, 'DTABLE_PRIVATE_KEY', '')
Expand All @@ -45,6 +46,10 @@
TRASH_CLEAN_AFTER_DAYS = getattr(seahub_settings, 'TRASH_CLEAN_AFTER_DAYS', 30)
LICENSE_PATH = getattr(seahub_settings, 'LICENSE_PATH', '/shared/seatable-license.txt')
IS_PRO_VERSION = getattr(seahub_settings, 'IS_PRO_VERSION', False)

# env
ENV_SEAFILE_CENTRAL_CONF_DIR = os.environ.get('SEAFILE_CENTRAL_CONF_DIR', '')
ENV_CCNET_CONF_PATH = os.path.join(ENV_SEAFILE_CENTRAL_CONF_DIR, 'ccnet.conf')
except Exception as e:
logger.critical("Can not import dtable_web settings: %s." % e)
raise RuntimeError("Can not import dtable_web settings: %s" % e)
Expand Down
46 changes: 34 additions & 12 deletions dtable_events/automations/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from seaserv import seafile_api
from dtable_events.automations.models import get_third_party_account
from dtable_events.automations.auto_rules_stats_updater import auto_rules_stats_updater
from dtable_events.app.metadata_cache_managers import BaseMetadataCacheManager
from dtable_events.app.event_redis import redis_cache
from dtable_events.app.config import DTABLE_WEB_SERVICE_URL, DTABLE_PRIVATE_KEY, \
Expand Down Expand Up @@ -2927,6 +2928,7 @@ def __init__(self, data, db_session, raw_trigger, raw_actions, options, metadata
self.trigger_count = options.get('trigger_count', None)
self.org_id = options.get('org_id', None)
self.creator = options.get('creator', None)
self.owner = options.get('owner', None)
self.data = data
self.db_session = db_session

Expand All @@ -2950,15 +2952,15 @@ def __init__(self, data, db_session, raw_trigger, raw_actions, options, metadata

self.metadata_cache_manager = metadata_cache_manager

self.cache_key = 'AUTOMATION_RULE:%s' % self.rule_id
self.cache_key = 'AUTOMATION_RULE:%s' % uuid_str_to_36_chars(self.dtable_uuid)
self.task_run_success = True

self.done_actions = False
self.load_trigger_and_actions(raw_trigger, raw_actions)

self.current_valid = True

self.per_minute_trigger_limit = per_minute_trigger_limit or 50
self.per_minute_trigger_limit = per_minute_trigger_limit or 1000

self.warnings = []

Expand Down Expand Up @@ -3441,41 +3443,61 @@ def update_last_trigger_time(self):
trigger_count=trigger_count+1,
update_at=:trigger_time
'''

set_statistic_sql_org = '''
INSERT INTO org_auto_rules_statistics (org_id, trigger_date, trigger_count, update_at) VALUES
(:org_id, :trigger_date, 1, :trigger_time)
ON DUPLICATE KEY UPDATE
trigger_count=trigger_count+1,
update_at=:trigger_time
'''
set_statistic_sql_user_per_month = '''
INSERT INTO user_auto_rules_statistics_per_month(username, trigger_count, month, updated_at) VALUES
(:username, 1, :month, :trigger_time)
ON DUPLICATE KEY UPDATE
trigger_count=trigger_count+1,
updated_at=:trigger_time
'''
set_statistic_sql_org_per_month = '''
INSERT INTO org_auto_rules_statistics_per_month(org_id, trigger_count, month, updated_at) VALUES
(:org_id, 1, :month, :trigger_time)
ON DUPLICATE KEY UPDATE
trigger_count=trigger_count+1,
updated_at=:trigger_time
'''
set_last_trigger_time_sql = '''
UPDATE dtable_automation_rules SET last_trigger_time=:trigger_time, trigger_count=:trigger_count WHERE id=:rule_id;
'''

sqls = [set_last_trigger_time_sql]
if self.org_id:
if self.org_id == -1:
sqls.append(set_statistic_sql_user)
if '@seafile_group' not in self.owner:
sqls.append(set_statistic_sql_user)
sqls.append(set_statistic_sql_user_per_month)
else:
sqls.append(set_statistic_sql_org)
sqls.append(set_statistic_sql_org_per_month)

cur_date = datetime.now().date()
cur_year, cur_month = cur_date.year, cur_date.month
trigger_date = date(year=cur_year, month=cur_month, day=1)
sql_data = {
'rule_id': self.rule_id,
'trigger_time': datetime.utcnow(),
'trigger_date': trigger_date,
'trigger_count': self.trigger_count + 1,
'username': self.creator,
'org_id': self.org_id,
'month': str(date.today())[:7]
}
for sql in sqls:
self.db_session.execute(sql, {
'rule_id': self.rule_id,
'trigger_time': datetime.utcnow(),
'trigger_date': trigger_date,
'trigger_count': self.trigger_count + 1,
'username': self.creator,
'org_id': self.org_id
})
self.db_session.execute(sql, sql_data)
self.db_session.commit()
except Exception as e:
logger.exception('set rule: %s error: %s', self.rule_id, e)

auto_rules_stats_updater.add_info(sql_data)

if self.run_condition == PER_UPDATE and self.per_minute_trigger_limit > 0:
trigger_times = redis_cache.get(self.cache_key)
if not trigger_times:
Expand Down
138 changes: 138 additions & 0 deletions dtable_events/automations/auto_rules_stats_updater.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import logging
from datetime import date
from queue import Queue
from threading import Thread

from seaserv import ccnet_api

from dtable_events.app.config import ENV_CCNET_CONF_PATH, get_config, DTABLE_WEB_SERVICE_URL
from dtable_events.db import init_db_session_class
from dtable_events.utils.dtable_web_api import DTableWebAPI

logger = logging.getLogger(__name__)


class AutoRulesStatsUpdater:
    """Background worker that enforces per-month automation-rule trigger limits.

    Producers (automation rule runs) push info dicts via ``add_info``; a daemon
    thread consumes them, compares the current month's trigger count against the
    user's/org's quota, flags the stats row as exceeded, and sends a warning
    notification when usage reaches 90% of the limit.

    Lifecycle: call ``init(config)`` once at startup, then ``start()``.
    """

    def init(self, config):
        """Late initializer (the singleton is created at import time, so real
        setup happens here once the app config is available)."""
        self.config = config
        self.queue = Queue()
        self.dtable_web_api = DTableWebAPI(DTABLE_WEB_SERVICE_URL)
        self.roles = None  # lazy cache for role -> permission mapping, see get_roles()
        self.db_session_class = init_db_session_class(config)

        # The OrgUser table lives in the ccnet database; resolve its schema
        # name from ccnet.conf (default: 'ccnet').
        ccnet_config = get_config(ENV_CCNET_CONF_PATH)
        if ccnet_config.has_section('Database'):
            self.ccnet_db_name = ccnet_config.get('Database', 'DB', fallback='ccnet')
        else:
            self.ccnet_db_name = 'ccnet'

    def get_roles(self):
        """Return the role -> permissions mapping from dtable-web, fetching it
        lazily on first use and caching it afterwards."""
        if not self.roles:
            self.roles = self.dtable_web_api.internal_roles()
        return self.roles

    def get_user_quota(self, db_session, username):
        """Monthly auto-rules limit for *username*.

        A non-zero per-user quota overrides the role default; a negative value
        means unlimited.
        """
        sql = "SELECT username, auto_rules_limit_per_month FROM user_quota WHERE username=:username"
        row = db_session.execute(sql, {'username': username}).fetchone()
        # 0 / NULL means "not set" -> fall back to the role default.
        if row and row.auto_rules_limit_per_month:
            return row.auto_rules_limit_per_month
        user = ccnet_api.get_emailuser(username)
        # Fix: go through get_roles() so the cache is populated even when a
        # user lookup happens first (self.roles starts as None).
        return self.get_roles().get(user.role, {}).get('automation_rules_limit_per_month', -1)

    def get_org_quota(self, db_session, org_id):
        """Monthly auto-rules limit for *org_id*.

        A non-zero per-org quota overrides the role default; a negative value
        means unlimited.
        """
        sql = "SELECT org_id, auto_rules_limit_per_month FROM organizations_org_quota WHERE org_id=:org_id"
        row = db_session.execute(sql, {'org_id': org_id}).fetchone()
        if row and row.auto_rules_limit_per_month:
            return row.auto_rules_limit_per_month
        sql = "SELECT role FROM organizations_orgsettings WHERE org_id=:org_id"
        row = db_session.execute(sql, {'org_id': org_id}).fetchone()
        if not row:
            org_role = 'org_default'  # check from dtable-web/seahub/role_permissions/settings DEFAULT_ENABLED_ROLE_PERMISSIONS[ORG_DEFAULT]
        else:
            org_role = row.role
        return self.get_roles().get(org_role, {}).get('automation_rules_limit_per_month', -1)

    def get_user_usage(self, db_session, username):
        """Current-month usage for *username*.

        :return: (trigger_count -> int, has_sent_warning -> bool)
        """
        sql = "SELECT trigger_count, has_sent_warning FROM user_auto_rules_statistics_per_month WHERE username=:username AND month=:month"
        row = db_session.execute(sql, {'username': username, 'month': str(date.today())[:7]}).fetchone()
        if not row:
            return 0, False
        return row.trigger_count, row.has_sent_warning

    def get_org_usage(self, db_session, org_id):
        """Current-month usage for *org_id*.

        :return: (trigger_count -> int, has_sent_warning -> bool)
        """
        sql = "SELECT trigger_count, has_sent_warning FROM org_auto_rules_statistics_per_month WHERE org_id=:org_id AND month=:month"
        row = db_session.execute(sql, {'org_id': org_id, 'month': str(date.today())[:7]}).fetchone()
        if not row:
            return 0, False
        return row.trigger_count, row.has_sent_warning

    def update_user(self, db_session, username):
        """Flag the user's current-month row as exceeded, or send a one-time
        warning notification when usage reaches 90% of the limit."""
        limit = self.get_user_quota(db_session, username)
        if limit < 0:  # unlimited
            return
        month = str(date.today())[:7]
        usage, has_sent_warning = self.get_user_usage(db_session, username)
        if usage >= limit:
            # Fix: restrict the update to the current month's row so historical
            # months are not retroactively flagged.
            sql = "UPDATE user_auto_rules_statistics_per_month SET is_exceed=1 WHERE username=:username AND month=:month"
            db_session.execute(sql, {'username': username, 'month': month})
            db_session.commit()
            return
        if not has_sent_warning and usage >= limit * 0.9:
            self.dtable_web_api.internal_add_notification([username], 'autorule_trigger_reach_warning', {'limit': limit, 'usage': usage})
            sql = "UPDATE user_auto_rules_statistics_per_month SET has_sent_warning=1 WHERE username=:username AND month=:month"
            db_session.execute(sql, {'username': username, 'month': month})
            db_session.commit()

    def update_org(self, db_session, org_id):
        """Flag the org's current-month row as exceeded, or send a one-time
        warning notification to all org admins at 90% of the limit."""
        limit = self.get_org_quota(db_session, org_id)
        if limit < 0:  # unlimited
            return
        month = str(date.today())[:7]
        usage, has_sent_warning = self.get_org_usage(db_session, org_id)
        if usage >= limit:
            # Fix: restrict the update to the current month's row.
            sql = "UPDATE org_auto_rules_statistics_per_month SET is_exceed=1 WHERE org_id=:org_id AND month=:month"
            db_session.execute(sql, {'org_id': org_id, 'month': month})
            db_session.commit()
            return
        if not has_sent_warning and usage >= limit * 0.9:
            sql = "SELECT email FROM %s.OrgUser WHERE org_id=:org_id AND is_staff=1" % self.ccnet_db_name
            admins = [row.email for row in db_session.execute(sql, {'org_id': org_id})]
            self.dtable_web_api.internal_add_notification(admins, 'autorule_trigger_reach_warning', {'limit': limit, 'usage': usage})
            sql = "UPDATE org_auto_rules_statistics_per_month SET has_sent_warning=1 WHERE org_id=:org_id AND month=:month"
            db_session.execute(sql, {'org_id': org_id, 'month': month})
            db_session.commit()

    def update_stats(self):
        """Worker loop: consume queued info dicts forever, dispatching each to
        the user or org handler. Errors are logged and never kill the loop."""
        while True:
            auto_rule_info = self.queue.get()
            username = auto_rule_info.get('username')
            org_id = auto_rule_info.get('org_id')
            db_session = self.db_session_class()
            try:
                if org_id == -1 and username:
                    self.update_user(db_session, username)
                elif org_id and org_id != -1:
                    # Fix: skip entries without a valid org_id instead of
                    # querying the org tables with None.
                    self.update_org(db_session, org_id)
            except Exception as e:
                logger.exception('update stats info: %s error: %s', auto_rule_info, e)
            finally:
                db_session.close()

    def start(self):
        """Launch the consumer as a daemon thread."""
        Thread(target=self.update_stats, daemon=True, name='Thread-auto-rules-stats-updater').start()

    def add_info(self, info):
        """Enqueue a stats info dict (expects 'username' and 'org_id' keys)."""
        self.queue.put(info)


auto_rules_stats_updater = AutoRulesStatsUpdater()
Loading
Loading