refactor import/sync drop dtable-server view-rows #429

Open · wants to merge 7 commits into base: master
581 changes: 331 additions & 250 deletions dtable_events/common_dataset/common_dataset_sync_utils.py

Large diffs are not rendered by default.
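Because this diff is collapsed, the renamed entry point import_sync_CDS (previously import_or_sync) is only visible from its call site in common_dataset_syncer.py below. A sketch of the call contract as the caller sees it: the context keys come from the visible call, while the body and any further return keys are assumptions, not the actual implementation.

def import_sync_CDS(context):
    """Sketch inferred from the caller below, not the real function body.

    Expected context keys (as passed by check_common_dataset):
        src_dtable_uuid, dst_dtable_uuid,
        src_table_name, src_view_name, src_columns, src_version,
        dst_table_id, dst_table_name, dst_columns,
        operator, lang, dataset_id

    Returns a dict; the caller only inspects result.get('error_msg') and
    result.get('error_type') (e.g. 'generate_synced_columns_error').
    """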

178 changes: 57 additions & 121 deletions dtable_events/common_dataset/common_dataset_syncer.py
@@ -1,18 +1,13 @@
 import logging
-import time
 from datetime import datetime, timedelta
 from threading import Thread
 
-import jwt
-import requests
 from apscheduler.schedulers.blocking import BlockingScheduler
 
 from dtable_events import init_db_session_class
-from dtable_events.app.config import DTABLE_PRIVATE_KEY
-from dtable_events.common_dataset.common_dataset_sync_utils import import_or_sync, set_common_dataset_invalid, set_common_dataset_sync_invalid
+from dtable_events.common_dataset.common_dataset_sync_utils import import_sync_CDS, set_common_dataset_invalid, set_common_dataset_sync_invalid
 from dtable_events.utils import get_opt_from_conf_or_env, parse_bool, uuid_str_to_36_chars, get_inner_dtable_server_url
-
-logger = logging.getLogger(__name__)
+from dtable_events.utils.dtable_server_api import DTableServerAPI
 
 class CommonDatasetSyncer(object):
 
@@ -42,110 +37,62 @@ def is_enabled(self):
         return self._enabled
 
 
-def get_dtable_server_header(dtable_uuid):
-    try:
-        access_token = jwt.encode({
-            'dtable_uuid': dtable_uuid,
-            'username': 'dtable-events',
-            'permission': 'rw',
-            'exp': int(time.time()) + 60
-        },
-            DTABLE_PRIVATE_KEY,
-            algorithm='HS256'
-        )
-    except Exception as e:
-        logger.error(e)
-        return
-    return {'Authorization': 'Token ' + access_token}
-
-
 def gen_src_dst_assets(dst_dtable_uuid, src_dtable_uuid, src_table_id, src_view_id, dst_table_id, dataset_sync_id, dataset_id, db_session):
     """
     return assets -> dict
     """
-    dst_headers = get_dtable_server_header(dst_dtable_uuid)
-    src_headers = get_dtable_server_header(src_dtable_uuid)
-
-    # request src_dtable
     dtable_server_url = get_inner_dtable_server_url()
-    url = dtable_server_url.strip('/') + '/dtables/' + src_dtable_uuid + '?from=dtable_events'
 
+    src_dtable_server_api = DTableServerAPI('dtable-events', src_dtable_uuid, dtable_server_url)
+    dst_dtable_server_api = DTableServerAPI('dtable-events', dst_dtable_uuid, dtable_server_url)
     try:
-        resp = requests.get(url, headers=src_headers)
-        src_dtable_json = resp.json()
+        src_dtable_metadata = src_dtable_server_api.get_metadata()
+        dst_dtable_metadata = dst_dtable_server_api.get_metadata()
     except Exception as e:
-        logger.error('request src dtable: %s error: %s', src_dtable_uuid, e)
-        return
+        logging.error('request src dst dtable: %s, %s metadata error: %s', src_dtable_uuid, dst_dtable_uuid, e)
+        return None
 
     # check src_table src_view
-    src_table = None
-    for table in src_dtable_json.get('tables', []):
-        if table.get('_id') == src_table_id:
+    src_table, src_view = None, None
+    for table in src_dtable_metadata.get('tables', []):
+        if table['_id'] == src_table_id:
             src_table = table
             break
     if not src_table:
         set_common_dataset_invalid(dataset_id, db_session)
         set_common_dataset_sync_invalid(dataset_sync_id, db_session)
-        logging.error('Source table not found.')
-        return
-
-    src_view = None
-    if src_view_id:
-        for view in src_table.get('views', []):
-            if view.get('_id') == src_view_id:
-                src_view = view
-                break
-        if not src_view:
-            set_common_dataset_invalid(dataset_id, db_session)
-            set_common_dataset_sync_invalid(dataset_sync_id, db_session)
-            logging.error('Source view not found.')
-            return
-    else:
-        views = src_table.get('views', [])
-        if not views or not isinstance(views, list):
-            set_common_dataset_invalid(dataset_id, db_session)
-            set_common_dataset_sync_invalid(dataset_sync_id, db_session)
-            logging.error('No views found.')
-            return
-        src_view = views[0]
+        logging.warning('Source table not found.')
+        return None
+    for view in src_table.get('views', []):
+        if view['_id'] == src_view_id:
+            src_view = view
+            break
+    if not src_view:
+        set_common_dataset_invalid(dataset_id, db_session)
+        set_common_dataset_sync_invalid(dataset_sync_id, db_session)
+        logging.warning('Source view not found.')
+        return None
 
     # get src columns
-    src_view_hidden_columns = src_view.get('hidden_columns', [])
-    if not src_view_hidden_columns:
-        src_columns = src_table.get('columns', [])
-    else:
-        src_columns = [col for col in src_table.get('columns', []) if col.get('key') not in src_view_hidden_columns]
+    src_columns = [col for col in src_table.get('columns', []) if col['key'] not in src_view.get('hidden_columns', [])]
 
-    # request dst_dtable
-    url = dtable_server_url.strip('/') + '/dtables/' + dst_dtable_uuid + '?from=dtable_events'
-    try:
-        resp = requests.get(url, headers=dst_headers)
-        dst_dtable_json = resp.json()
-    except Exception as e:
-        logging.error('request dst dtable: %s error: %s', dst_dtable_uuid, e)
-        return
+    src_version = src_dtable_metadata.get('version')
 
     # check dst_table
     dst_table = None
-    for table in dst_dtable_json.get('tables', []):
-        if table.get('_id') == dst_table_id:
-            dst_table = table
-            break
-    if not dst_table:
-        set_common_dataset_sync_invalid(dataset_sync_id, db_session)
-        logging.warning('Destination table: %s not found.' % dst_table_id)
-        return
+    if dst_table_id:
+        for table in dst_dtable_metadata.get('tables', []):
+            if table['_id'] == dst_table_id:
+                dst_table = table
+                break
+        if not dst_table:
+            set_common_dataset_sync_invalid(dataset_sync_id, db_session)
+            logging.warning('Destination table not found.')
+            return None
 
     return {
-        'dst_headers': dst_headers,
-        'src_headers': src_headers,
-        'src_table': src_table,
-        'src_view': src_view,
+        'src_table_name': src_table['name'],
+        'src_view_name': src_view['name'],
         'src_columns': src_columns,
-        'dst_columns': dst_table.get('columns'),
-        'dst_rows': dst_table.get('rows'),
-        'dst_table_name': dst_table.get('name'),
-        'src_version': src_dtable_json.get('version')
+        'src_version': src_version,
+        'dst_table_name': dst_table['name'] if dst_table else None,
+        'dst_columns': dst_table['columns'] if dst_table else None
     }
 
 
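The substance of this hunk: gen_src_dst_assets no longer signs a JWT by hand and fetches whole dtables (rows included) over raw requests; it asks DTableServerAPI for metadata only, which is what drops the dtable-server view-rows traffic named in the PR title. A minimal sketch of the new pattern, assuming get_metadata() returns the metadata dict the code above reads 'tables' and 'version' from:

# The wrapper replaces the removed jwt.encode(...) + requests.get(...) pair,
# so auth and transport live in one place.
dtable_server_url = get_inner_dtable_server_url()
api = DTableServerAPI('dtable-events', src_dtable_uuid, dtable_server_url)
metadata = api.get_metadata()          # dict with 'tables' and 'version'
tables = metadata.get('tables', [])    # each table carries views/columns, not rows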
@@ -189,7 +136,7 @@ def update_sync_time_and_version(db_session, update_map):
 
 
 def check_common_dataset(db_session):
-    dataset_sync_list = list_pending_common_dataset_syncs(db_session)
+    dataset_sync_list = list(list_pending_common_dataset_syncs(db_session))
     sync_count = 0
     dataset_update_map = {}
    for dataset_sync in dataset_sync_list:
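The only change in this hunk wraps the pending-sync query in list(). The likely intent, assuming list_pending_common_dataset_syncs returns a lazy SQLAlchemy result, is to materialize all rows before the loop body issues further statements (import_sync_CDS, update_sync_time_and_version) on the same db_session. A minimal illustration of the idea, with a hypothetical query name:

result = db_session.execute(pending_syncs_query)   # lazy cursor tied to the session
dataset_sync_list = list(result)                   # drain it before reusing the session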
@@ -207,61 +154,50 @@
             if not assets:
                 continue
 
-            dst_headers = assets.get('dst_headers')
-            src_table = assets.get('src_table')
-            src_view = assets.get('src_view')
-            src_columns = assets.get('src_columns')
-            src_headers = assets.get('src_headers')
-            dst_columns = assets.get('dst_columns')
-            dst_rows = assets.get('dst_rows')
-            dst_table_name = assets.get('dst_table_name')
-            dtable_src_version = assets.get('src_version')
-
-            if dtable_src_version == last_src_version:
+            if assets.get('src_version') == last_src_version:
                 continue
 
             try:
-                result = import_or_sync({
-                    'dst_dtable_uuid': dst_dtable_uuid,
+                result = import_sync_CDS({
                     'src_dtable_uuid': src_dtable_uuid,
-                    'src_rows': src_table.get('rows', []),
-                    'src_columns': src_columns,
-                    'src_table_name': src_table.get('name'),
-                    'src_view_name': src_view.get('name'),
-                    'src_headers': src_headers,
+                    'dst_dtable_uuid': dst_dtable_uuid,
+                    'src_table_name': assets.get('src_table_name'),
+                    'src_view_name': assets.get('src_view_name'),
+                    'src_columns': assets.get('src_columns'),
+                    'src_version': assets.get('src_version'),
                     'dst_table_id': dst_table_id,
-                    'dst_table_name': dst_table_name,
-                    'dst_headers': dst_headers,
-                    'dst_columns': dst_columns,
-                    'dst_rows': dst_rows,
-                    'lang': 'en' # TODO: lang
+                    'dst_table_name': assets.get('dst_table_name'),
+                    'dst_columns': assets.get('dst_columns'),
+                    'operator': 'dtable-events',
+                    'lang': 'en', # TODO: lang
+                    'dataset_id': dataset_id
                 })
             except Exception as e:
-                logger.error('sync common dataset error: %s', e)
+                logging.error('sync common dataset error: %s', e)
                 continue
             else:
                 if result.get('error_msg'):
-                    logger.error(result['error_msg'])
+                    logging.error(result['error_msg'])
                     if result.get('error_type') == 'generate_synced_columns_error':
                         set_common_dataset_sync_invalid(dataset_sync_id, db_session)
                     continue
 
-            dataset_update_map[dataset_sync_id] = dtable_src_version
+            dataset_update_map[dataset_sync_id] = assets.get('src_version')
             sync_count += 1
 
             if sync_count == 1000:
                 try:
                     update_sync_time_and_version(db_session, dataset_update_map)
                 except Exception as e:
-                    logger.error(f'update sync time and src_version failed, error: {e}')
+                    logging.error(f'update sync time and src_version failed, error: {e}')
                 dataset_update_map = {}
                 sync_count = 0
 
     if dataset_update_map:
         try:
             update_sync_time_and_version(db_session, dataset_update_map)
         except Exception as e:
-            logger.error(f'update sync time and src_version failed, error: {e}')
+            logging.error(f'update sync time and src_version failed, error: {e}')
 
 
 class CommonDatasetSyncerTimer(Thread):
@@ -279,7 +215,7 @@ def timed_job():
             try:
                 check_common_dataset(db_session)
             except Exception as e:
-                logger.exception('check periodcal common dataset syncs error: %s', e)
+                logging.exception('check periodcal common dataset syncs error: %s', e)
             finally:
                 db_session.close()
 