Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add task to clean database using cron schedule #670

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dtable_events/app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from dtable_events.webhook.webhook import Webhooker
from dtable_events.common_dataset.common_dataset_syncer import CommonDatasetSyncer
from dtable_events.tasks.big_data_storage_stats_worker import BigDataStorageStatsWorker
from dtable_events.tasks.clean_db_records_worker import CleanDBRecordsWorker
from dtable_events.data_sync.data_syncer import DataSyncer
from dtable_events.workflow.workflow_actions import WorkflowActionsHandler
from dtable_events.workflow.workflow_schedules_scanner import WorkflowSchedulesScanner
Expand Down Expand Up @@ -55,6 +56,7 @@ def __init__(self, config, task_mode):
self._ldap_syncer = LDAPSyncer(config)
self._common_dataset_syncer = CommonDatasetSyncer(self, config)
self._big_data_storage_stats_worker = BigDataStorageStatsWorker(config)
self._clean_db_records_worker = CleanDBRecordsWorker(config)
self._data_syncr = DataSyncer(config)
self._workflow_schedule_scanner = WorkflowSchedulesScanner(config)
self._dtable_asset_trash_cleaner = DTableAssetTrashCleaner(config)
Expand Down Expand Up @@ -87,6 +89,7 @@ def serve_forever(self):
self._ldap_syncer.start() # default False
self._common_dataset_syncer.start() # default True
self._big_data_storage_stats_worker.start() # always True
self._clean_db_records_worker.start()
self._data_syncr.start() # default True
self._workflow_schedule_scanner.start() # default True
self._dtable_asset_trash_cleaner.start() # always True
Expand Down
241 changes: 241 additions & 0 deletions dtable_events/tasks/clean_db_records_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
# -*- coding: utf-8 -*-

import logging
from dataclasses import dataclass
from threading import Thread

from apscheduler.schedulers.blocking import BlockingScheduler
from sqlalchemy import text

from dtable_events.db import init_db_session_class

__all__ = [
'CleanDBRecordsWorker',
]


class CleanDBRecordsWorker(object):
def __init__(self, config):
self._db_session_class = init_db_session_class(config)

try:
self._parse_config(config)
except:
logging.exception('Could not parse config, using default retention config instead')
# Use default configuration
self._retention_config = RetentionConfig()

def _parse_config(self, config):
section_name = 'CLEAN DB'

# Read retention times from config file
dtable_snapshot = config.getint(section_name, 'keep_dtable_snapshot_days', fallback=365)
activities = config.getint(section_name, 'keep_activities_days', fallback=30)
operation_log = config.getint(section_name, 'keep_operation_log_days', fallback=14)
delete_operation_log = config.getint(section_name, 'keep_delete_operation_log_days', fallback=30)
notifications_usernotification = config.getint(section_name, 'keep_notifications_usernotification_days', fallback=30)
dtable_notifications = config.getint(section_name, 'keep_dtable_notifications_days', fallback=30)
session_log = config.getint(section_name, 'keep_session_log_days', fallback=30)
auto_rules_task_log = config.getint(section_name, 'keep_auto_rules_task_log_days', fallback=30)
# Disabled by default
user_activity_statistics = config.getint(section_name, 'keep_user_activity_statistics_days', fallback=0)

self._retention_config = RetentionConfig(
dtable_snapshot=dtable_snapshot,
activities=activities,
operation_log=operation_log,
delete_operation_log=delete_operation_log,
notifications_usernotification=notifications_usernotification,
dtable_notifications=dtable_notifications,
session_log=session_log,
auto_rules_task_log=auto_rules_task_log,
user_activity_statistics=user_activity_statistics,
)

def start(self):
logging.info('Start clean db records worker')
logging.info('Using the following retention config: %s', self._retention_config)

CleanDBRecordsTask(self._db_session_class, self._retention_config).start()


@dataclass
class RetentionConfig:
"""All retention times are in days"""
dtable_snapshot: int = 365
activities: int = 30
operation_log: int = 14
delete_operation_log: int = 30
notifications_usernotification: int = 30
dtable_notifications: int = 30
session_log: int = 30
auto_rules_task_log: int = 30
# Disabled by default
user_activity_statistics: int = 0


class CleanDBRecordsTask(Thread):
def __init__(self, db_session_class, retention_config: RetentionConfig):
super(CleanDBRecordsTask, self).__init__()
self.db_session_class = db_session_class
self.retention_config = retention_config

def run(self):
schedule = BlockingScheduler()

@schedule.scheduled_job('cron', day_of_week='*', hour='0', minute='30', misfire_grace_time=600)
def timed_job():
logging.info('Start cleaning database...')

session = self.db_session_class()

try:
clean_snapshots(session, self.retention_config.dtable_snapshot)
clean_activities(session, self.retention_config.activities)
clean_operation_log(session, self.retention_config.operation_log)
clean_delete_operation_log(session, self.retention_config.delete_operation_log)
clean_notifications(session, self.retention_config.dtable_notifications)
clean_user_notifications(session, self.retention_config.notifications_usernotification)
clean_sessions(session, self.retention_config.session_log)
clean_django_sessions(session)
clean_auto_rules_task_log(session, self.retention_config.auto_rules_task_log)
clean_user_activity_statistics(session, self.retention_config.user_activity_statistics)
except:
logging.exception('Could not clean database')
finally:
session.close()

schedule.start()


def clean_snapshots(session, keep_days: int):
if keep_days <= 0:
logging.info('Skipping "dtable_snapshot" since retention time is set to %d', keep_days)
return

logging.info('Cleaning "dtable_snapshot" table (older than %d days)', keep_days)

sql = 'DELETE FROM `dtable_snapshot` WHERE `ctime` < UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL :days DAY))*1000'
result = session.execute(text(sql), {'days': keep_days})
session.commit()

logging.info('Removed %d entries from "dtable_snapshot"', result.rowcount)


def clean_activities(session, keep_days: int):
if keep_days <= 0:
logging.info('Skipping "activities" since retention time is set to %d', keep_days)
return

logging.info('Cleaning "activities" table (older than %d days)', keep_days)

sql = 'DELETE FROM `activities` WHERE `op_time` < DATE_SUB(NOW(), INTERVAL :days DAY)'
result = session.execute(text(sql), {'days': keep_days})
session.commit()

logging.info('Removed %d entries from "activities"', result.rowcount)


def clean_operation_log(session, keep_days: int):
if keep_days <= 0:
logging.info('Skipping "operation_log" since retention time is set to %d', keep_days)
return

logging.info('Cleaning "operation_log" table (older than %d days)', keep_days)

sql = 'DELETE FROM `operation_log` WHERE `op_time` < UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL :days DAY))*1000'
result = session.execute(text(sql), {'days': keep_days})
session.commit()

logging.info('Removed %d entries from "operation_log"', result.rowcount)


def clean_delete_operation_log(session, keep_days: int):
if keep_days <= 0:
logging.info('Skipping "delete_operation_log" since retention time is set to %d', keep_days)
return

logging.info('Cleaning "delete_operation_log" table (older than %d days)', keep_days)

sql = 'DELETE FROM `delete_operation_log` WHERE `op_time` < UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL :days DAY))*1000'
result = session.execute(text(sql), {'days': keep_days})
session.commit()

logging.info('Removed %d entries from "delete_operation_log"', result.rowcount)


def clean_user_notifications(session, keep_days: int):
if keep_days <= 0:
logging.info('Skipping "notifications_usernotification" since retention time is set to %d', keep_days)
return

logging.info('Cleaning "notifications_usernotification" table (older than %d days)', keep_days)

sql = 'DELETE FROM `notifications_usernotification` WHERE `timestamp` < DATE_SUB(NOW(), INTERVAL :days DAY)'
result = session.execute(text(sql), {'days': keep_days})
session.commit()

logging.info('Removed %d entries from "notifications_usernotification"', result.rowcount)


def clean_notifications(session, keep_days: int):
if keep_days <= 0:
logging.info('Skipping "dtable_notifications" since retention time is set to %d', keep_days)
return

logging.info('Cleaning "dtable_notifications" table (older than %d days)', keep_days)

sql = 'DELETE FROM `dtable_notifications` WHERE `created_at` < DATE_SUB(NOW(), INTERVAL :days DAY)'
result = session.execute(text(sql), {'days': keep_days})
session.commit()

logging.info('Removed %d entries from "dtable_notifications"', result.rowcount)


def clean_sessions(session, keep_days: int):
if keep_days <= 0:
logging.info('Skipping "session_log" since retention time is set to %d', keep_days)
return

logging.info('Cleaning "session_log" table (older than %d days)', keep_days)

sql = 'DELETE FROM `session_log` WHERE `op_time` < DATE_SUB(NOW(), INTERVAL :days DAY)'
result = session.execute(text(sql), {'days': keep_days})
session.commit()

logging.info('Removed %d entries from "session_log"', result.rowcount)

def clean_django_sessions(session):
logging.info('Cleaning expired entries from "django_session" table')

sql = 'DELETE FROM `django_session` WHERE `expire_date` < CURRENT_TIMESTAMP()'
result = session.execute(text(sql))
session.commit()

logging.info('Removed %d entries from "django_session"', result.rowcount)

def clean_auto_rules_task_log(session, keep_days: int):
if keep_days <= 0:
logging.info('Skipping "auto_rules_task_log" since retention time is set to %d', keep_days)
return

logging.info('Cleaning "auto_rules_task_log" table (older than %d days)', keep_days)

sql = 'DELETE FROM `auto_rules_task_log` WHERE `trigger_time` < DATE_SUB(NOW(), INTERVAL :days DAY)'
result = session.execute(text(sql), {'days': keep_days})
session.commit()

logging.info('Removed %d entries from "auto_rules_task_log"', result.rowcount)

def clean_user_activity_statistics(session, keep_days: int):
if keep_days <= 0:
logging.info('Skipping "user_activity_statistics" since retention time is set to %d', keep_days)
return

logging.info('Cleaning "user_activity_statistics" table (older than %d days)', keep_days)

sql = 'DELETE FROM `user_activity_statistics` WHERE `timestamp` < DATE_SUB(NOW(), INTERVAL :days DAY)'
result = session.execute(text(sql), {'days': keep_days})
session.commit()

logging.info('Removed %d entries from "user_activity_statistics"', result.rowcount)
Loading