diff --git a/dtable_events/common_dataset/common_dataset_sync_utils.py b/dtable_events/common_dataset/common_dataset_sync_utils.py index 405c5434..8c804b41 100644 --- a/dtable_events/common_dataset/common_dataset_sync_utils.py +++ b/dtable_events/common_dataset/common_dataset_sync_utils.py @@ -1,14 +1,17 @@ # -*- coding: utf-8 -*- import logging -import os import re -import sys from copy import deepcopy -import requests from dateutil import parser -from dtable_events.utils.constants import ColumnTypes + +from dtable_events.utils.sql_generator import BaseSQLGenerator +from dtable_events.app.config import INNER_DTABLE_DB_URL +from dtable_events.common_dataset.dtable_db_cell_validators import validate_table_db_cell_value from dtable_events.utils import get_inner_dtable_server_url +from dtable_events.utils.constants import ColumnTypes +from dtable_events.utils.dtable_server_api import DTableServerAPI +from dtable_events.utils.dtable_db_api import DTableDBAPI logger = logging.getLogger(__name__) @@ -16,6 +19,8 @@ SRC_ROWS_LIMIT = 50000 +INSERT_UPDATE_ROWS_LIMIT = 1000 +DELETE_ROWS_LIMIT = 10000 DATA_NEED_KEY_VALUES = { @@ -233,14 +238,13 @@ def generate_synced_columns(src_columns, dst_columns=None): return to_be_updated_columns, to_be_appended_columns, None -def generate_synced_rows(converted_rows, src_rows, src_columns, synced_columns, dst_rows=None): +def generate_synced_rows(converted_rows, src_columns, synced_columns, dst_rows=None, to_archive=False): """ generate synced rows divided into `rows to be updated`, `rows to be appended` and `rows to be deleted` return: to_be_updated_rows, to_be_appended_rows, to_be_deleted_row_ids """ converted_rows_dict = {row.get('_id'): row for row in converted_rows} - src_rows_dict = {row.get('_id'): row for row in src_rows} synced_columns_dict = {col.get('key'): col for col in synced_columns} to_be_updated_rows, to_be_appended_rows, transfered_row_ids = [], [], {} @@ -249,13 +253,12 @@ def generate_synced_rows(converted_rows, src_rows, src_columns, synced_columns, to_be_deleted_row_ids = [] for row in dst_rows: row_id = row.get('_id') - src_row = src_rows_dict.get(row_id) converted_row = converted_rows_dict.get(row_id) - if not converted_row or not src_row: + if not converted_row: to_be_deleted_row_ids.append(row_id) continue - update_row = generate_single_row(converted_row, src_row, src_columns, synced_columns_dict, dst_row=row) + update_row = generate_single_row(converted_row, src_columns, synced_columns_dict, dst_row=row, to_archive=to_archive) if update_row: update_row['_id'] = row_id to_be_updated_rows.append(update_row) @@ -263,10 +266,9 @@ def generate_synced_rows(converted_rows, src_rows, src_columns, synced_columns, for converted_row in converted_rows: row_id = converted_row.get('_id') - src_row = src_rows_dict.get(row_id) - if not src_row or transfered_row_ids.get(row_id): + if transfered_row_ids.get(row_id): continue - append_row = generate_single_row(converted_row, src_row, src_columns, synced_columns_dict, dst_row=None) + append_row = generate_single_row(converted_row, src_columns, synced_columns_dict, dst_row=None, to_archive=to_archive) if append_row: append_row['_id'] = row_id to_be_appended_rows.append(append_row) @@ -339,8 +341,7 @@ def get_link_formula_converted_cell_value(transfered_column, converted_cell_valu return value.strftime('%Y-%m-%d') -def get_converted_cell_value(converted_cell_value, src_row, transfered_column, col): - col_key = col.get('key') +def get_converted_cell_value(converted_cell_value, transfered_column, col): 
col_type = col.get('type') if col_type in [ ColumnTypes.TEXT, @@ -362,7 +363,7 @@ def get_converted_cell_value(converted_cell_value, src_row, transfered_column, c ColumnTypes.URL, ColumnTypes.GEOLOCATION ]: - return deepcopy(src_row.get(col_key)) + return deepcopy(converted_cell_value) elif col_type == ColumnTypes.SINGLE_SELECT: if not isinstance(converted_cell_value, str): @@ -445,7 +446,7 @@ def get_converted_cell_value(converted_cell_value, src_row, transfered_column, c elif result_type == 'string': if converted_cell_value: return str(converted_cell_value) - return src_row.get(col_key) + return deepcopy(converted_cell_value) def is_equal(v1, v2, column_type): @@ -512,19 +513,22 @@ def is_equal(v1, v2, column_type): return False -def generate_single_row(converted_row, src_row, src_columns, transfered_columns_dict, dst_row=None): +def generate_single_row(converted_row, src_columns, transfered_columns_dict, dst_row=None, to_archive=False): """ generate new single row according to src column type - :param converted_row: {'_id': '', 'column_name_1': '', 'col_name_2'; ''} from dtable-db + :param converted_row: {'_id': '', 'column_key_1': '', 'col_key_2'; ''} from dtable-db :param src_columns: [{'key': 'column_key_1', 'name': 'column_name_1'}] :param transfered_columns_dict: {'col_key_1': {'key': 'column_key_1', 'name': 'column_name_1'}} :param dst_row: {'_id': '', 'column_key_1': '', 'col_key_2': ''} + :param to_archive: is row for dtable-db + + :return: dataset_row => if to_archive is True {col_name1: value1,...} else {col_key1: value1,...} """ dataset_row = {} op_type = 'update' if not dst_row: op_type = 'append' - dst_row = deepcopy(dst_row) if dst_row else {'_id': src_row.get('_id')} + dst_row = deepcopy(dst_row) if dst_row else {'_id': converted_row.get('_id')} for col in src_columns: col_key = col.get('key') @@ -533,298 +537,376 @@ def generate_single_row(converted_row, src_row, src_columns, transfered_columns_ if not transfered_column: continue + if to_archive and col['key'] in ['_creator', '_ctime', '_last_modifier', '_mtime']: + continue + if op_type == 'update': - converted_cell_value = get_converted_cell_value(converted_cell_value, src_row, transfered_column, col) + converted_cell_value = get_converted_cell_value(converted_cell_value, transfered_column, col) if not is_equal(dst_row.get(col_key), converted_cell_value, transfered_column['type']): - dataset_row[col_key] = converted_cell_value + if not to_archive: + dataset_row[col_key] = converted_cell_value + else: + dataset_row[col['name']] = validate_table_db_cell_value(transfered_column, converted_cell_value) else: - dataset_row[col_key] = get_converted_cell_value(converted_cell_value, src_row, transfered_column, col) + converted_cell_value = get_converted_cell_value(converted_cell_value, transfered_column, col) + if not to_archive: + dataset_row[col_key] = converted_cell_value + else: + dataset_row[col['name']] = validate_table_db_cell_value(transfered_column, converted_cell_value) return dataset_row -def import_or_sync(import_sync_context): - """ - import or sync common dataset - return: { - dst_table_id: destination table id, - error_msg: error msg, - task_status_code: return frontend status code, 40x 50x... 
- } - """ - # extract necessary assets - dst_dtable_uuid = import_sync_context.get('dst_dtable_uuid') - src_dtable_uuid = import_sync_context.get('src_dtable_uuid') - - src_rows = import_sync_context.get('src_rows') - src_columns = import_sync_context.get('src_columns') - src_column_keys_set = {col['key'] for col in src_columns} - src_table_name = import_sync_context.get('src_table_name') - src_view_name = import_sync_context.get('src_view_name') - src_headers = import_sync_context.get('src_headers') - - dst_table_id = import_sync_context.get('dst_table_id') - dst_table_name = import_sync_context.get('dst_table_name') - dst_headers = import_sync_context.get('dst_headers') - dst_columns = import_sync_context.get('dst_columns') - dst_rows = import_sync_context.get('dst_rows') - - lang = import_sync_context.get('lang', 'en') - - # generate cols and rows - ## Old generate-cols is from src_columns from dtable json, but some (link-)formula columns have wrong array_type - ## For example, a LOOKUP(GEOLOCATION) link-formula column, whose array_type in dtable json is `string`, but should being `GEOLOCATION` - ## So, generate columns from the columns(archive columns) returned by SQL query instead of from the columns in dtable json, and remove old code - ## New code and more details is in the following code - - ## generate rows - ### get src view-rows - result_rows = [] - start, limit = 0, 10000 - to_be_updated_columns, to_be_appended_columns = [], [] - - while True: - url = dtable_server_url.rstrip('/') + '/api/v1/internal/dtables/' + str(src_dtable_uuid) + '/view-rows/?from=dtable_events' - if (start + limit) > SRC_ROWS_LIMIT: - limit = SRC_ROWS_LIMIT - start - query_params = { - 'table_name': src_table_name, - 'view_name': src_view_name, - 'use_dtable_db': True, - 'start': start, - 'limit': limit - } +def create_dst_table_or_update_columns(dst_dtable_uuid, dst_table_id, dst_table_name, to_be_appended_columns, to_be_updated_columns, dst_dtable_server_api, lang): + if not dst_table_id: ## create table + columns = [{ + 'column_key': col.get('key'), + 'column_name': col.get('name'), + 'column_type': col.get('type'), + 'column_data': col.get('data') + } for col in to_be_appended_columns] if to_be_appended_columns else [] try: - resp = requests.get(url, headers=src_headers, params=query_params, timeout=180) - if resp.status_code == 400: - try: - res_json = resp.json() - except: - return { - 'dst_table_id': None, - 'error_msg': 'fetch src view rows error', - 'task_status_code': 500 - } - else: - return { - 'dst_table_id': None, - 'error_msg': 'fetch src view rows error', - 'error_type': res_json.get('error_type'), - 'task_status_code': 400 - } - res_json = resp.json() - archive_rows = res_json.get('rows', []) - archive_metadata = res_json.get('metadata') + resp_json = dst_dtable_server_api.add_table(dst_table_name, lang, columns=columns) + dst_table_id = resp_json.get('_id') except Exception as e: - logger.error('request src_dtable: %s params: %s view-rows error: %s', src_dtable_uuid, query_params, e) - return { + logger.error(e) # TODO: table exists shoud return 400 + return None, { 'dst_table_id': None, - 'error_msg': 'fetch view rows error', + 'error_msg': 'create table error', 'task_status_code': 500 } - if start == 0: - ## generate columns from the columns(archive_metadata) returned from SQL query - sync_columns = [col for col in archive_metadata if col['key'] in src_column_keys_set] - to_be_updated_columns, to_be_appended_columns, error = generate_synced_columns(sync_columns, 
dst_columns=dst_columns) - if error: - return { - 'dst_table_id': None, - 'error_type': 'generate_synced_columns_error', - 'error_msg': str(error), # generally, this error is caused by client - 'task_status_code': 400 - } - result_rows.extend(archive_rows) - if not archive_rows or len(archive_rows) < limit or (start + limit) >= SRC_ROWS_LIMIT: - break - start += limit - - final_columns = (to_be_updated_columns or []) + (to_be_appended_columns or []) - - to_be_updated_rows, to_be_appended_rows, to_be_deleted_row_ids = generate_synced_rows(result_rows, src_rows, src_columns, final_columns, dst_rows=dst_rows) - - # sync table - ## maybe create table - if not dst_table_id: - url = dtable_server_url.strip('/') + '/api/v1/dtables/%s/tables/?from=dtable_events' % (str(dst_dtable_uuid),) - data = { - 'table_name': dst_table_name, - 'lang': lang, - 'columns': [{ + else: ## append/update columns + ### batch append columns + if to_be_appended_columns: + columns = [{ 'column_key': col.get('key'), 'column_name': col.get('name'), 'column_type': col.get('type'), 'column_data': col.get('data') - } for col in to_be_appended_columns] if to_be_appended_columns else [] - } - try: - resp = requests.post(url, headers=dst_headers, json=data, timeout=180) - if resp.status_code != 200: - logger.error('create new table error status code: %s, resp text: %s', resp.status_code, resp.text) - error_msg = 'create table error' - status_code = 500 - try: - resp_json = resp.json() - if resp_json.get('error_message'): - error_msg = resp_json['error_message'] - status_code = resp.status_code - except: - pass - return { - 'dst_table_id': None, - 'error_msg': error_msg, - 'task_status_code': status_code - } - dst_table_id = resp.json().get('_id') - except Exception as e: - logger.error(e) - return { - 'dst_table_id': None, - 'error_msg': 'create table error', - 'task_status_code': 500 - } - ## or maybe append/update columns - else: - ### batch append columns - if to_be_appended_columns: - url = dtable_server_url.strip('/') + '/api/v1/dtables/' + str(dst_dtable_uuid) + '/batch-append-columns/?from=dtable_events' - data = { - 'table_id': dst_table_id, - 'columns': [{ - 'column_key': col.get('key'), - 'column_name': col.get('name'), - 'column_type': col.get('type'), - 'column_data': col.get('data') - } for col in to_be_appended_columns] - } + } for col in to_be_appended_columns] try: - resp = requests.post(url, headers=dst_headers, json=data, timeout=180) - if resp.status_code != 200: - logger.error('batch append columns to dst dtable: %s, table: %s error status code: %s text: %s', dst_dtable_uuid, dst_table_id, resp.status_code, resp.text) - return { - 'dst_table_id': None, - 'error_msg': 'append columns error', - 'task_status_code': 500 - } + dst_dtable_server_api.batch_append_columns_by_table_id(dst_table_id, columns) except Exception as e: logger.error('batch append columns to dst dtable: %s, table: %s error: %s', dst_dtable_uuid, dst_table_id, e) - return { + return None, { 'dst_table_id': None, 'error_msg': 'append columns error', 'task_status_code': 500 } ### batch update columns if to_be_updated_columns: - url = dtable_server_url.strip('/') + '/api/v1/dtables/' + str(dst_dtable_uuid) + '/batch-update-columns/?from=dtable_events' - data = { - 'table_id': dst_table_id, - 'columns': [{ - 'key': col.get('key'), - 'type': col.get('type'), - 'data': col.get('data') - } for col in to_be_updated_columns] - } + columns = [{ + 'key': col.get('key'), + 'type': col.get('type'), + 'data': col.get('data') + } for col in 
to_be_updated_columns] try: - resp = requests.put(url, headers=dst_headers, json=data, timeout=180) - if resp.status_code != 200: - logger.error('batch update columns to dst dtable: %s, table: %s error status code: %s text: %s', dst_dtable_uuid, dst_table_id, resp.status_code, resp.text) - return { - 'dst_table_id': None, - 'error_msg': 'update columns error', - 'task_status_code': 500 - } + dst_dtable_server_api.batch_update_columns_by_table_id(dst_table_id, columns) except Exception as e: logger.error('batch update columns to dst dtable: %s, table: %s error: %s', dst_dtable_uuid, dst_table_id, e) - return { + return None, { 'dst_table_id': None, 'error_msg': 'update columns error', 'task_status_code': 500 } + return dst_table_id, None + + +def append_dst_rows(dst_dtable_uuid, dst_table_name, to_be_appended_rows, dst_dtable_db_api, dst_dtable_server_api, to_archive): + if to_archive: + step = 10000 + for i in range(0, len(to_be_appended_rows), step): + try: + dst_dtable_db_api.insert_rows(dst_table_name, to_be_appended_rows[i: i+step]) + except Exception as e: + logger.error('sync dataset append rows dst dtable: %s dst table: %s error: %s', dst_dtable_uuid, dst_table_name, e) + return { + 'dst_table_id': None, + 'error_msg': 'append rows error', + 'task_status_code': 500 + } + else: + step = INSERT_UPDATE_ROWS_LIMIT + for i in range(0, len(to_be_appended_rows), step): + try: + dst_dtable_server_api.batch_append_rows(dst_table_name, to_be_appended_rows[i: i+step], need_convert_back=False) + except Exception as e: + logger.error('sync dataset append rows dst dtable: %s dst table: %s error: %s', dst_dtable_uuid, dst_table_name, e) + return { + 'dst_table_id': None, + 'error_msg': 'append rows error', + 'task_status_code': 500 + } + - ## update delete append rows step by step - step = 1000 - ### update rows - url = dtable_server_url.strip('/') + '/api/v1/dtables/%s/batch-update-rows/?from=dtable_events' % (str(dst_dtable_uuid),) - for i in range(0, len(to_be_updated_rows), step): - updates = [] - for row in to_be_updated_rows[i: i+step]: - updates.append({ +def update_dst_rows(dst_dtable_uuid, dst_table_name, to_be_updated_rows, dst_dtable_db_api, dst_dtable_server_api, to_archive): + if to_archive: + step = 10000 + for i in range(0, len(to_be_updated_rows), step): + updates = [] + for row in to_be_updated_rows[i: i+step]: + row_id = row.pop('_id', None) + updates.append({ + 'row_id': row_id, + 'row': row + }) + try: + dst_dtable_db_api.batch_update_rows(dst_table_name, updates) + except Exception as e: + logger.error('sync dataset update rows dst dtable: %s dst table: %s error: %s', dst_dtable_uuid, dst_table_name, e) + return { + 'dst_table_id': None, + 'error_msg': 'update rows error', + 'task_status_code': 500 + } + else: + step = INSERT_UPDATE_ROWS_LIMIT + for i in range(0, len(to_be_updated_rows), step): + updates = [{ 'row_id': row['_id'], 'row': row - }) - data = { - 'table_name': dst_table_name, - 'updates': updates, - 'need_convert_back': False - } - try: - resp = requests.put(url, headers=dst_headers, json=data, timeout=180) - if resp.status_code != 200: - logger.error('sync dataset update rows dst dtable: %s dst table: %s error status code: %s content: %s', dst_dtable_uuid, dst_table_name, resp.status_code, resp.text) + } for row in to_be_updated_rows[i: i+step]] + try: + dst_dtable_server_api.batch_update_rows(dst_table_name, updates, need_convert_back=False) + except Exception as e: + logger.error('sync dataset update rows dst dtable: %s dst table: %s error: %s', 
dst_dtable_uuid, dst_table_name, e) return { 'dst_table_id': None, 'error_msg': 'update rows error', 'task_status_code': 500 } + + +def delete_dst_rows(dst_dtable_uuid, dst_table_name, to_be_deleted_row_ids, dst_dtable_db_api, dst_dtable_server_api, to_archive): + if to_archive: + step = 10000 + for i in range(0, len(to_be_deleted_row_ids), step): + try: + dst_dtable_db_api.batch_delete_rows(dst_table_name, to_be_deleted_row_ids[i: i+step]) + except Exception as e: + logger.error('sync dataset delete rows dst dtable: %s dst table: %s error: %s', dst_dtable_uuid, dst_table_name, e) + else: + step = DELETE_ROWS_LIMIT + for i in range(0, len(to_be_deleted_row_ids), step): + try: + dst_dtable_server_api.batch_delete_rows(dst_table_name, to_be_deleted_row_ids[i: i+step]) + except Exception as e: + logger.error('sync dataset delete rows dst dtable: %s dst table: %s error: %s', dst_dtable_uuid, dst_table_name, e) + + +def import_sync_CDS(context): + """ + fetch src/dst row ids and work out which rows need to be appended, updated or deleted + """ + src_dtable_uuid = context.get('src_dtable_uuid') + dst_dtable_uuid = context.get('dst_dtable_uuid') + + src_table_name = context.get('src_table_name') + src_view_name = context.get('src_view_name') + src_view_type = context.get('src_view_type', 'table') + src_columns = context.get('src_columns') + src_enable_archive = context.get('src_enable_archive', False) + + dst_table_id = context.get('dst_table_id') + dst_table_name = context.get('dst_table_name') + dst_columns = context.get('dst_columns') + + operator = context.get('operator') + lang = context.get('lang', 'en') + + to_archive = context.get('to_archive', False) + + src_dtable_server_api = DTableServerAPI(operator, src_dtable_uuid, dtable_server_url) + src_dtable_db_api = DTableDBAPI(operator, src_dtable_uuid, INNER_DTABLE_DB_URL) + dst_dtable_server_api = DTableServerAPI(operator, dst_dtable_uuid, dtable_server_url) + dst_dtable_db_api = DTableDBAPI(operator, dst_dtable_uuid, INNER_DTABLE_DB_URL) + + server_only = not (to_archive and src_enable_archive and src_view_type == 'archive') + is_sync = bool(dst_table_id) + logger.debug('to_archive: %s src_enable_archive: %s src_view_type: %s', to_archive, src_enable_archive, src_view_type) + + # create dst table or update dst table columns + # fetch all src view rows id, S + # fetch all dst table rows id, D + # to-be-appended-rows-id = S - D + # to-be-updated-rows-id = S & D + # to-be-deleted-rows-id = D - S + # delete dst to-be-deleted-rows, step by step + # fetch src to-be-updated-rows and dst to-be-updated-rows, update to dst table, step by step + # fetch src to-be-appended-rows, append to dst table, step by step + + # create dst table or update dst table columns + # use src_columns from context temporarily
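The comment block above summarizes the new `import_sync_CDS` flow: reconcile row IDs as set operations, then write in fixed-size batches. A minimal sketch of that idea, assuming hypothetical `delete_batch`/`update_batch`/`append_batch` callables in place of the dtable-db and dtable-server batch calls used by this module:

```python
# Illustrative sketch of the ID-reconciliation outlined above.
# `delete_batch`, `update_batch` and `append_batch` are hypothetical
# stand-ins for the dtable-db / dtable-server batch calls.
from typing import Callable, Iterable, List, Set


def chunked(items: List[str], size: int) -> Iterable[List[str]]:
    """Yield fixed-size slices, mirroring the step-by-step loops."""
    for i in range(0, len(items), size):
        yield items[i:i + size]


def reconcile_row_ids(src_ids: List[str], dst_ids: Set[str],
                      delete_batch: Callable[[List[str]], None],
                      update_batch: Callable[[List[str]], None],
                      append_batch: Callable[[List[str]], None],
                      step: int = 10000) -> None:
    src_set = set(src_ids)
    to_append = [r for r in src_ids if r not in dst_ids]  # S - D, keeps src view order
    to_update = [r for r in src_ids if r in dst_ids]      # S & D
    to_delete = list(dst_ids - src_set)                   # D - S
    for batch in chunked(to_delete, step):
        delete_batch(batch)
    for batch in chunked(to_update, step):
        update_batch(batch)
    for batch in chunked(to_append, step):
        append_batch(batch)
```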
+ to_be_updated_columns, to_be_appended_columns, error = generate_synced_columns(src_columns, dst_columns=dst_columns) + if error: + return { + 'dst_table_id': None, + 'error_type': 'generate_synced_columns_error', + 'error_msg': str(error), # generally, this error is caused by client + 'task_status_code': 400 + } + final_columns = (to_be_updated_columns or []) + (to_be_appended_columns or []) + ### create or update dst columns + dst_table_id, error_resp = create_dst_table_or_update_columns(dst_dtable_uuid, dst_table_id, dst_table_name, to_be_appended_columns, to_be_updated_columns, dst_dtable_server_api, lang) + if error_resp: + return error_resp + + # fetch all src view rows id + src_rows_id_set = set() + src_rows_id_list = list() + src_metadata = src_dtable_server_api.get_metadata() + src_table = [table for table in src_metadata['tables'] if table['name'] == src_table_name][0] + src_view = [view for view in src_table['views'] if view['name'] == src_view_name][0] + filter_conditions = { + 'filters': src_view.get('filters', []), + 'filter_conjunction': src_view.get('filter_conjunction', 'And'), + 'sorts': src_view.get('sorts', []) + } + logger.debug('filter_conditions: %s', filter_conditions) + try: + sql_generator = BaseSQLGenerator(src_table_name, src_table['columns'], filter_conditions=filter_conditions) + filter_clause = sql_generator._filter2sql() + sort_clause = sql_generator._sort2sql() + logger.debug('filter_clause: %s, sort_clause: %s', filter_clause, sort_clause) + except Exception as e: + logger.error('generate src view sql error: %s', e) + return { + 'dst_table_id': None, + 'error_msg': 'generate src view sql error: %s' % e, + 'task_status_code': 500 + } + sql_template = f"SELECT `_id` FROM `{src_table_name}` {filter_clause} {sort_clause}" + start, step = 0, 10000 + while True: + if server_only and (start + step) > SRC_ROWS_LIMIT: + step = SRC_ROWS_LIMIT - start + sql = sql_template + (" LIMIT {offset}, {limit}".format(offset=start, limit=step)) + logger.debug('fetch src rows-id sql: %s', sql) + try: + rows = src_dtable_db_api.query(sql, convert=False, server_only=server_only) + except Exception as e: + logger.error('fetch src rows id error: %s', e) + return { + 'dst_table_id': None, + 'error_msg': 'fetch src rows id error: %s' % e, + 'task_status_code': 500 + } + for row in rows: + if row['_id'] in src_rows_id_set: + continue + src_rows_id_list.append(row['_id']) + src_rows_id_set.add(row['_id']) + ## judge whether break + if len(rows) < step or (server_only and (start + step) >= SRC_ROWS_LIMIT): + break + start += step + + # fetch all dst table rows id + dst_rows_id_set = set() + start, step = 0, 10000 + while is_sync and True: + sql = f"SELECT `_id` FROM `{dst_table_name}` LIMIT {start}, {step}" + logger.debug('fetch dst rows-id sql: %s', sql) + try: + rows = dst_dtable_db_api.query(sql, convert=False, server_only=(not to_archive)) except Exception as e: - logger.error('sync dataset update rows dst dtable: %s dst table: %s error: %s', dst_dtable_uuid, dst_table_name, e) + logger.error('fetch dst rows id error: %s', e) return { 'dst_table_id': None, - 'error_msg': 'update rows error', + 'error_msg': 'fetch dst rows id error: %s' % e, + 'task_status_code': 500 + } + dst_rows_id_set |= {row['_id'] for row in rows} + if len(rows) < step: + break + start += step + + # calc to-be-appended-rows-id, to-be-updated-rows-id, to-be-deleted-rows-id + to_be_appended_rows_id_set = src_rows_id_set - dst_rows_id_set + to_be_updated_rows_id_set = src_rows_id_set & dst_rows_id_set + 
to_be_deleted_rows_id_set = dst_rows_id_set - src_rows_id_set + logger.debug('to_be_appended_rows_id_set: %s, to_be_updated_rows_id_set: %s, to_be_deleted_rows_id_set: %s', len(to_be_appended_rows_id_set), len(to_be_updated_rows_id_set), len(to_be_deleted_rows_id_set)) + + # delete dst to-be-deleted-rows + logger.debug('will delete %s rows', len(to_be_deleted_rows_id_set)) + delete_dst_rows(dst_dtable_uuid, dst_table_name, list(to_be_deleted_rows_id_set), dst_dtable_db_api, dst_dtable_server_api, to_archive) + + query_columns = ', '.join(['_id'] + ["`%s`" % col['name'] for col in final_columns]) + + # fetch src to-be-updated-rows and dst to-be-updated-rows, update to dst table, step by step + to_be_updated_rows_id_list = list(to_be_updated_rows_id_set) + step = 10000 + for i in range(0, len(to_be_updated_rows_id_list), step): + logger.debug('to_be_updated_rows_id_list i: %s step: %s', i, step) + ## fetch src to-be-updated-rows + rows_id_str = ', '.join(["'%s'" % row_id for row_id in to_be_updated_rows_id_list[i: i+step]]) + sql = f"SELECT {query_columns} FROM `{src_table_name}` WHERE _id IN ({rows_id_str}) LIMIT {step}" + try: + src_rows = src_dtable_db_api.query(sql, convert=False, server_only=server_only) + except Exception as e: + logger.error('fetch src to-be-updated-rows error: %s', e) + return { + 'dst_table_id': None, + 'error_msg': 'fetch src to-be-updated-rows error: %s' % e, 'task_status_code': 500 } - ### delete rows - url = dtable_server_url.strip('/') + '/api/v1/dtables/%s/batch-delete-rows/?from=dtable_events' % (str(dst_dtable_uuid),) - for i in range(0, len(to_be_deleted_row_ids), step): - data = { - 'table_name': dst_table_name, - 'row_ids': to_be_deleted_row_ids[i: i+step] - } + ## fetch dst to-be-updated-rows + sql = f"SELECT {query_columns} FROM `{dst_table_name}` WHERE _id IN ({rows_id_str}) LIMIT {step}" try: - resp = requests.delete(url, headers=dst_headers, json=data, timeout=180) - if resp.status_code != 200: - logger.error('sync dataset delete rows dst dtable: %s dst table: %s error status code: %s, content: %s', dst_dtable_uuid, dst_table_name, resp.status_code, resp.text) - return { - 'dst_table_id': None, - 'error_msg': 'delete rows error', - 'task_status_code': 500 - } + dst_rows = dst_dtable_db_api.query(sql, convert=False, server_only=(not to_archive)) except Exception as e: - logger.error('sync dataset delete rows dst dtable: %s dst table: %s error: %s', dst_dtable_uuid, dst_table_name, e) + logger.error('fetch dst to-be-updated-rows error: %s', e) return { 'dst_table_id': None, - 'error_msg': 'delete rows error', + 'error_msg': 'fetch dst to-be-updated-rows error: %s' % e, 'task_status_code': 500 } - ### append rows - url = dtable_server_url.strip('/') + '/api/v1/dtables/%s/batch-append-rows/?from=dtable_events' % (str(dst_dtable_uuid),) - for i in range(0, len(to_be_appended_rows), step): - data = { - 'table_name': dst_table_name, - 'rows': to_be_appended_rows[i: i+step], - 'need_convert_back': False - } + ## update + to_be_updated_rows, _, _ = generate_synced_rows(src_rows, src_columns, final_columns, dst_rows=dst_rows, to_archive=to_archive) + logger.debug('step src update-rows: %s to-be-updated-rows: %s', len(to_be_updated_rows_id_list[i: i+step]), len(to_be_updated_rows)) + error_resp = update_dst_rows(dst_dtable_uuid, dst_table_name, to_be_updated_rows, dst_dtable_db_api, dst_dtable_server_api, to_archive) + if error_resp: + return error_resp + + # fetch src to-be-appended-rows, append to dst table, step by step + ## this list is to record the order of
src rows + to_be_appended_rows_id_list = [row_id for row_id in src_rows_id_list if row_id in to_be_appended_rows_id_set] + + step = 10000 + for i in range(0, len(to_be_appended_rows_id_list), step): + logger.debug('to_be_appended_rows_id_list i: %s, step: %s', i, step) + step_to_be_appended_rows_id_list = [] + step_row_sort_dict = {} + for j in range(step): + if i + j >= len(to_be_appended_rows_id_list): + break + step_to_be_appended_rows_id_list.append(to_be_appended_rows_id_list[i+j]) + step_row_sort_dict[to_be_appended_rows_id_list[i+j]] = j + rows_id_str = ', '.join(["'%s'" % row_id for row_id in step_to_be_appended_rows_id_list]) + if filter_clause: + sql = f"SELECT {query_columns} FROM `{src_table_name}` WHERE (({filter_clause[len('WHERE'):]}) AND `_id` IN ({rows_id_str})) LIMIT {step}" + else: + sql = f"SELECT {query_columns} FROM `{src_table_name}` WHERE `_id` IN ({rows_id_str}) LIMIT {step}" try: - resp = requests.post(url, headers=dst_headers, json=data, timeout=180) - if resp.status_code != 200: - logger.error('sync dataset append rows dst dtable: %s dst table: %s error status code: %s', dst_dtable_uuid, dst_table_name, resp.status_code) - return { - 'dst_table_id': None, - 'error_msg': 'append rows error', - 'task_status_code': 500 - } + src_rows = src_dtable_db_api.query(sql, convert=False, server_only=server_only) except Exception as e: - logger.error('sync dataset append rows dst dtable: %s dst table: %s error: %s', dst_dtable_uuid, dst_table_name, e) + logger.error('fetch to-be-appended-rows error: %s', e) return { 'dst_table_id': None, - 'error_msg': 'append rows error', + 'error_msg': 'fetch to-be-appended-rows error: %s' % e, 'task_status_code': 500 } + src_rows = sorted(src_rows, key=lambda row: step_row_sort_dict[row['_id']]) + _, to_be_appended_rows, _ = generate_synced_rows(src_rows, src_columns, final_columns, [], to_archive=to_archive) + error_resp = append_dst_rows(dst_dtable_uuid, dst_table_name, to_be_appended_rows, dst_dtable_db_api, dst_dtable_server_api, to_archive=to_archive) + if error_resp: + return error_resp return { 'dst_table_id': dst_table_id, - 'error_msg': None, + 'error_msg': '', 'task_status_code': 200 } @@ -845,4 +927,3 @@ def set_common_dataset_sync_invalid(dataset_sync_id, db_session): db_session.commit() except Exception as e: logger.error('set state of common dataset sync: %s error: %s', dataset_sync_id, e) - diff --git a/dtable_events/common_dataset/common_dataset_syncer.py b/dtable_events/common_dataset/common_dataset_syncer.py index 87ac46a9..5c02901d 100644 --- a/dtable_events/common_dataset/common_dataset_syncer.py +++ b/dtable_events/common_dataset/common_dataset_syncer.py @@ -1,18 +1,13 @@ import logging -import time from datetime import datetime, timedelta from threading import Thread -import jwt -import requests from apscheduler.schedulers.blocking import BlockingScheduler from dtable_events import init_db_session_class -from dtable_events.app.config import DTABLE_PRIVATE_KEY -from dtable_events.common_dataset.common_dataset_sync_utils import import_or_sync, set_common_dataset_invalid, set_common_dataset_sync_invalid +from dtable_events.common_dataset.common_dataset_sync_utils import import_sync_CDS, set_common_dataset_invalid, set_common_dataset_sync_invalid from dtable_events.utils import get_opt_from_conf_or_env, parse_bool, uuid_str_to_36_chars, get_inner_dtable_server_url - -logger = logging.getLogger(__name__) +from dtable_events.utils.dtable_server_api import DTableServerAPI class CommonDatasetSyncer(object): @@ -42,110 +37,62 
@@ def is_enabled(self): return self._enabled -def get_dtable_server_header(dtable_uuid): - try: - access_token = jwt.encode({ - 'dtable_uuid': dtable_uuid, - 'username': 'dtable-events', - 'permission': 'rw', - 'exp': int(time.time()) + 60 - }, - DTABLE_PRIVATE_KEY, - algorithm='HS256' - ) - except Exception as e: - logger.error(e) - return - return {'Authorization': 'Token ' + access_token} - - def gen_src_dst_assets(dst_dtable_uuid, src_dtable_uuid, src_table_id, src_view_id, dst_table_id, dataset_sync_id, dataset_id, db_session): """ return assets -> dict """ - dst_headers = get_dtable_server_header(dst_dtable_uuid) - src_headers = get_dtable_server_header(src_dtable_uuid) - - # request src_dtable dtable_server_url = get_inner_dtable_server_url() - url = dtable_server_url.strip('/') + '/dtables/' + src_dtable_uuid + '?from=dtable_events' - + src_dtable_server_api = DTableServerAPI('dtable-events', src_dtable_uuid, dtable_server_url) + dst_dtable_server_api = DTableServerAPI('dtable-events', dst_dtable_uuid, dtable_server_url) try: - resp = requests.get(url, headers=src_headers) - src_dtable_json = resp.json() + src_dtable_metadata = src_dtable_server_api.get_metadata() + dst_dtable_metadata = dst_dtable_server_api.get_metadata() except Exception as e: - logger.error('request src dtable: %s error: %s', src_dtable_uuid, e) - return + logging.error('request src dst dtable: %s, %s metadata error: %s', src_dtable_uuid, dst_dtable_uuid, e) + return None - # check src_table src_view - src_table = None - for table in src_dtable_json.get('tables', []): - if table.get('_id') == src_table_id: + src_table, src_view = None, None + for table in src_dtable_metadata.get('tables', []): + if table['_id'] == src_table_id: src_table = table break if not src_table: set_common_dataset_invalid(dataset_id, db_session) set_common_dataset_sync_invalid(dataset_sync_id, db_session) - logging.error('Source table not found.') - return - - src_view = None - if src_view_id: - for view in src_table.get('views', []): - if view.get('_id') == src_view_id: - src_view = view - break - if not src_view: - set_common_dataset_invalid(dataset_id, db_session) - set_common_dataset_sync_invalid(dataset_sync_id, db_session) - logging.error('Source view not found.') - return - else: - views = src_table.get('views', []) - if not views or not isinstance(views, list): - set_common_dataset_invalid(dataset_id, db_session) - set_common_dataset_sync_invalid(dataset_sync_id, db_session) - logging.error('No views found.') - return - src_view = views[0] + logging.warning('Source table not found.') + return None + for view in src_table.get('views', []): + if view['_id'] == src_view_id: + src_view = view + break + if not src_view: + set_common_dataset_invalid(dataset_id, db_session) + set_common_dataset_sync_invalid(dataset_sync_id, db_session) + logging.warning('Source view not found.') + return None - # get src columns - src_view_hidden_columns = src_view.get('hidden_columns', []) - if not src_view_hidden_columns: - src_columns = src_table.get('columns', []) - else: - src_columns = [col for col in src_table.get('columns', []) if col.get('key') not in src_view_hidden_columns] + src_columns = [col for col in src_table.get('columns', []) if col['key'] not in src_view.get('hidden_columns', [])] - # request dst_dtable - url = dtable_server_url.strip('/') + '/dtables/' + dst_dtable_uuid + '?from=dtable_events' - try: - resp = requests.get(url, headers=dst_headers) - dst_dtable_json = resp.json() - except Exception as e: - logging.error('request dst 
dtable: %s error: %s', dst_dtable_uuid, e) - return + src_version = src_dtable_metadata.get('version') - # check dst_table dst_table = None - for table in dst_dtable_json.get('tables', []): - if table.get('_id') == dst_table_id: - dst_table = table - break - if not dst_table: - set_common_dataset_sync_invalid(dataset_sync_id, db_session) - logging.warning('Destination table: %s not found.' % dst_table_id) - return + if dst_table_id: + for table in dst_dtable_metadata.get('tables', []): + if table['_id'] == dst_table_id: + dst_table = table + break + if not dst_table: + set_common_dataset_sync_invalid(dataset_sync_id, db_session) + logging.warning('Destination table not found.') + return None return { - 'dst_headers': dst_headers, - 'src_headers': src_headers, - 'src_table': src_table, - 'src_view': src_view, + 'src_table_name': src_table['name'], + 'src_view_name': src_view['name'], 'src_columns': src_columns, - 'dst_columns': dst_table.get('columns'), - 'dst_rows': dst_table.get('rows'), - 'dst_table_name': dst_table.get('name'), - 'src_version': src_dtable_json.get('version') + 'src_version': src_version, + 'dst_table_name': dst_table['name'] if dst_table else None, + 'dst_columns': dst_table['columns'] if dst_table else None } @@ -189,7 +136,7 @@ def update_sync_time_and_version(db_session, update_map): def check_common_dataset(db_session): - dataset_sync_list = list_pending_common_dataset_syncs(db_session) + dataset_sync_list = list(list_pending_common_dataset_syncs(db_session)) sync_count = 0 dataset_update_map = {} for dataset_sync in dataset_sync_list: @@ -207,53 +154,42 @@ def check_common_dataset(db_session): if not assets: continue - dst_headers = assets.get('dst_headers') - src_table = assets.get('src_table') - src_view = assets.get('src_view') - src_columns = assets.get('src_columns') - src_headers = assets.get('src_headers') - dst_columns = assets.get('dst_columns') - dst_rows = assets.get('dst_rows') - dst_table_name = assets.get('dst_table_name') - dtable_src_version = assets.get('src_version') - - if dtable_src_version == last_src_version: + if assets.get('src_version') == last_src_version: continue try: - result = import_or_sync({ - 'dst_dtable_uuid': dst_dtable_uuid, + result = import_sync_CDS({ 'src_dtable_uuid': src_dtable_uuid, - 'src_rows': src_table.get('rows', []), - 'src_columns': src_columns, - 'src_table_name': src_table.get('name'), - 'src_view_name': src_view.get('name'), - 'src_headers': src_headers, + 'dst_dtable_uuid': dst_dtable_uuid, + 'src_table_name': assets.get('src_table_name'), + 'src_view_name': assets.get('src_view_name'), + 'src_columns': assets.get('src_columns'), + 'src_version': assets.get('src_version'), 'dst_table_id': dst_table_id, - 'dst_table_name': dst_table_name, - 'dst_headers': dst_headers, - 'dst_columns': dst_columns, - 'dst_rows': dst_rows, - 'lang': 'en' # TODO: lang + 'dst_table_name': assets.get('dst_table_name'), + 'dst_columns': assets.get('dst_columns'), + 'operator': 'dtable-events', + 'lang': 'en', # TODO: lang + 'dataset_id': dataset_id }) except Exception as e: - logger.error('sync common dataset error: %s', e) + logging.error('sync common dataset error: %s', e) continue else: if result.get('error_msg'): - logger.error(result['error_msg']) + logging.error(result['error_msg']) if result.get('error_type') == 'generate_synced_columns_error': set_common_dataset_sync_invalid(dataset_sync_id, db_session) continue - dataset_update_map[dataset_sync_id] = dtable_src_version + dataset_update_map[dataset_sync_id] = 
assets.get('src_version') sync_count += 1 if sync_count == 1000: try: update_sync_time_and_version(db_session, dataset_update_map) except Exception as e: - logger.error(f'update sync time and src_version failed, error: {e}') + logging.error(f'update sync time and src_version failed, error: {e}') dataset_update_map = {} sync_count = 0 @@ -261,7 +197,7 @@ def check_common_dataset(db_session): try: update_sync_time_and_version(db_session, dataset_update_map) except Exception as e: - logger.error(f'update sync time and src_version failed, error: {e}') + logging.error(f'update sync time and src_version failed, error: {e}') class CommonDatasetSyncerTimer(Thread): @@ -279,7 +215,7 @@ def timed_job(): try: check_common_dataset(db_session) except Exception as e: - logger.exception('check periodcal common dataset syncs error: %s', e) + logging.exception('check periodcal common dataset syncs error: %s', e) finally: db_session.close() diff --git a/dtable_events/common_dataset/dtable_db_cell_validators.py b/dtable_events/common_dataset/dtable_db_cell_validators.py new file mode 100644 index 00000000..76dd1293 --- /dev/null +++ b/dtable_events/common_dataset/dtable_db_cell_validators.py @@ -0,0 +1,261 @@ +import logging + +from dateutil import parser + +from dtable_events.utils.constants import ColumnTypes + +logger = logging.getLogger(__name__) + + +class BaseValidator: + + def __init__(self, column): + self.column = column + + +class TextValidator(BaseValidator): + + def validate(self, value): + if not isinstance(value, str): + return None + return value + + +class NumberValidator(BaseValidator): + def validate(self, value): + if value is None: + return None + try: + return float(value) + except: + return None + + +class LongTextValidator(BaseValidator): + + def validate(self, value): + if value is None: + return None + if isinstance(value, dict): + return value.get('text') + elif isinstance(value, str): + return value + return None + + +class ImageValidator(BaseValidator): + + def validate(self, value): + if value is None: + return None + if isinstance(value, list): + if value and not isinstance(value[0], str): + return None + return [v for v in value if isinstance(v, str)] + return None + + +class DateValidator(BaseValidator): + + def validate(self, value): + if value is None: + return value + if not isinstance(value, str): + return None + try: + parser.parse(value) + except: + return None + return value + + +class CheckboxValidator(BaseValidator): + + def validate(self, value): + if not isinstance(value, bool): + return None + return value + + +class SingleSelectValidator(BaseValidator): + + def validate(self, value): + if not isinstance(value, str): + return None + column_data = self.column.get('data') or {} + options = column_data.get('options') or [] + for option in options: + if option['id'] == value: + return option['name'] + return None + + +class MultipleSelectValidator(BaseValidator): + + def validate(self, value): + if not isinstance(value, list) or (value and not isinstance(value[0], str)): + return None + column_data = self.column.get('data') or {} + options = column_data.get('options') or [] + options_dict = {option['id']: option for option in options} + return [options_dict[v]['name'] for v in value] or None + + +class URLValiadator(TextValidator): + pass + + +class DurationValidator(BaseValidator): + + def validate(self, value): + try: + return int(value) + except: + return None + + +class FileValidator(BaseValidator): + + def validate(self, value): + if not isinstance(value, list) or 
(value and not isinstance(value[0], dict)): + return None + real_values = [] + for v in value: + if not isinstance(v, dict): + continue + # TODO: check url name ... + real_values.append(v) + return real_values + + +class CollaboratorValidator(BaseValidator): + + def validate(self, value): + if not isinstance(value, list) or (value and not isinstance(value[0], str)): + return None + return [v for v in value if isinstance(v, str)] + + +class EmailValidator(TextValidator): + pass + + +class GeolocationValidator(BaseValidator): + + def is_valid_position(self, lng, lat): + return (lng or lng == 0) and (lat or lat == 0) + + def validate(self, value): + if not isinstance(value, dict): + return None + column_data = self.column.get('data') or {} + geo_format = column_data.get('geo_format') + if geo_format == 'lng_lat': + lng, lat = value.get('lng'), value.get('lat') + if not (lng and lat): + return None + if not self.is_valid_position(lng, lat): + return None + return value + elif geo_format == 'country_region': + return value.get('country_region', '') + elif geo_format == 'geolocation': + province, city, district, detail = value.get('province', ''), value.get('city', ''), value.get('district', ''), value.get('detail', '') + return { + 'province': province, + 'city': city, + 'district': district, + 'detail': detail + } + elif geo_format == 'province_city_district': + province, city, district = value.get('province', ''), value.get('city', ''), value.get('district', '') + return { + 'province': province, + 'city': city, + 'district': district + } + elif geo_format == 'province': + province = value.get('province', '') + return { + 'province': province + } + elif geo_format == 'province_city': + province, city = value.get('province', ''), value.get('city', '') + return { + 'province': province, + 'city': city + } + else: + return value + + +class RateValidator(BaseValidator): + + def validate(self, value): + try: + return int(value) + except: + return None + + +class CreatorValidator(TextValidator): + pass + + +class CTimeValidator(DateValidator): + pass + + +class LastModifierValidator(TextValidator): + pass + + +class MTimeModifierValidator(DateValidator): + pass + + +VALIDATORS_MAP = { + ColumnTypes.TEXT: TextValidator, + ColumnTypes.NUMBER: NumberValidator, + ColumnTypes.LONG_TEXT: LongTextValidator, + ColumnTypes.IMAGE: ImageValidator, + ColumnTypes.DATE: DateValidator, + ColumnTypes.CHECKBOX: CheckboxValidator, + ColumnTypes.SINGLE_SELECT: SingleSelectValidator, + ColumnTypes.MULTIPLE_SELECT: MultipleSelectValidator, + ColumnTypes.URL: URLValiadator, + ColumnTypes.FILE: FileValidator, + ColumnTypes.COLLABORATOR: CollaboratorValidator, + ColumnTypes.GEOLOCATION: GeolocationValidator, + ColumnTypes.RATE: RateValidator, + ColumnTypes.DURATION: DurationValidator, + ColumnTypes.EMAIL: EmailValidator, + ColumnTypes.CREATOR: CreatorValidator, + ColumnTypes.CTIME: CTimeValidator, + ColumnTypes.LAST_MODIFIER: LastModifierValidator, + ColumnTypes.MTIME: MTimeModifierValidator } + + +def validate_table_db_cell_value(column, value): + if column['type'] not in VALIDATORS_MAP: + return None + try: + return VALIDATORS_MAP[column['type']](column).validate(value) + except Exception as e: + logger.exception(e) + logger.error('validate column: %s, value: %s error: %s', column, value, e) + return None diff --git
a/dtable_events/dtable_io/import_sync_common_dataset.py b/dtable_events/dtable_io/import_sync_common_dataset.py index 971306a3..c9f0a699 100644 --- a/dtable_events/dtable_io/import_sync_common_dataset.py +++ b/dtable_events/dtable_io/import_sync_common_dataset.py @@ -3,10 +3,11 @@ from datetime import datetime import requests -from dtable_events.common_dataset.common_dataset_sync_utils import import_or_sync +from dtable_events.common_dataset.common_dataset_sync_utils import import_sync_CDS from dtable_events.db import init_db_session_class from dtable_events.dtable_io import dtable_io_logger from dtable_events.utils import uuid_str_to_32_chars, get_inner_dtable_server_url +from dtable_events.utils.dtable_server_api import DTableServerAPI dtable_server_url = get_inner_dtable_server_url() @@ -23,26 +24,30 @@ def sync_common_dataset(context, config): :return api_error or None """ - dst_headers = context['dst_headers'] - src_table = context['src_table'] - src_view = context['src_view'] - src_columns = context['src_columns'] - src_headers = context['src_headers'] + src_dtable_uuid = context.get('src_dtable_uuid') + dst_dtable_uuid = context.get('dst_dtable_uuid') - dst_dtable_uuid = context['dst_dtable_uuid'] - src_dtable_uuid = context['src_dtable_uuid'] - dst_table_id = context['dst_table_id'] + src_table_name = context.get('src_table_name') + src_view_name = context.get('src_view_name') + src_columns = context.get('src_columns') + src_version = context.get('src_version') + + dst_table_id = context.get('dst_table_id') + dst_table_name = context.get('dst_table_name') + dst_columns = context.get('dst_columns') + + operator = context.get('operator') + lang = context.get('lang', 'en') dataset_id = context.get('dataset_id') - src_version = context.get('src_version') # get database version try: db_session = init_db_session_class(config)() except Exception as e: - db_session = None dtable_io_logger.error('create db session failed. ERROR: {}'.format(e)) return + sql = ''' SELECT id FROM dtable_common_dataset_sync WHERE dst_dtable_uuid=:dst_dtable_uuid AND dataset_id=:dataset_id AND dst_table_id=:dst_table_id @@ -57,52 +62,31 @@ def sync_common_dataset(context, config): }) except Exception as e: dtable_io_logger.error('get src version error: %s', e) - return - finally: db_session.close() - - if list(sync_dataset): - return - - # request dst_dtable - url = dtable_server_url.strip('/') + '/dtables/' + str(dst_dtable_uuid) + '?from=dtable_events' - try: - resp = requests.get(url, headers=dst_headers, timeout=180) - dst_dtable_json = resp.json() - except Exception as e: - dtable_io_logger.error('request dst dtable: %s error: %s', dst_dtable_uuid, e) return - # check dst_table - dst_table = None - for table in dst_dtable_json.get('tables', []): - if table.get('_id') == dst_table_id: - dst_table = table - break - if not dst_table: - dtable_io_logger.warning('Destination table: %s not found.' 
% dst_table_id) + sync_dataset = list(sync_dataset) + if sync_dataset: + dtable_io_logger.debug('sync_dataset: %s', sync_dataset[0]) return - dst_columns = dst_table.get('columns') - dst_rows = dst_table.get('rows') try: - result = import_or_sync({ - 'dst_dtable_uuid': dst_dtable_uuid, + result = import_sync_CDS({ 'src_dtable_uuid': src_dtable_uuid, - 'src_rows': src_table.get('rows', []), + 'dst_dtable_uuid': dst_dtable_uuid, + 'src_table_name': src_table_name, + 'src_view_name': src_view_name, 'src_columns': src_columns, - 'src_table_name': src_table.get('name'), - 'src_view_name': src_view.get('name'), - 'src_headers': src_headers, 'dst_table_id': dst_table_id, - 'dst_table_name': dst_table.get('name'), - 'dst_headers': dst_headers, - 'dst_rows': dst_rows, - 'dst_columns': dst_columns + 'dst_table_name': dst_table_name, + 'dst_columns': dst_columns, + 'operator': operator, + 'lang': lang }) except Exception as e: dtable_io_logger.exception(e) dtable_io_logger.error('sync common dataset error: %s', e) + db_session.close() raise Exception(str(e)) else: if result and 'task_status_code' in result and result['task_status_code'] != 200: @@ -112,15 +96,14 @@ def sync_common_dataset(context, config): raise Exception(error_msg) # get base's metadata - src_url = dtable_server_url.rstrip('/') + '/api/v1/dtables/' + str(src_dtable_uuid) + '/metadata/?from=dtable_events' + src_dtable_server_api = DTableServerAPI(operator, src_dtable_uuid, dtable_server_url) try: - dtable_metadata = requests.get(src_url, headers=src_headers, timeout=180) - src_metadata = dtable_metadata.json() + src_metadata = src_dtable_server_api.get_metadata() except Exception as e: dtable_io_logger.error('get metadata error: %s', e) return None, 'get metadata error: %s' % (e,) - last_src_version = src_metadata.get('metadata', {}).get('version') + last_src_version = src_metadata.get('version') sql = ''' UPDATE dtable_common_dataset_sync SET @@ -146,31 +129,29 @@ def import_common_dataset(context, config): """ import common dataset to destination table """ - dst_headers = context['dst_headers'] - src_table = context['src_table'] - src_columns = context['src_columns'] - src_view = context['src_view'] - src_headers = context['src_headers'] - - dst_dtable_uuid = context['dst_dtable_uuid'] - src_dtable_uuid = context['src_dtable_uuid'] - dst_table_name = context['dst_table_name'] + src_dtable_uuid = context.get('src_dtable_uuid') + dst_dtable_uuid = context.get('dst_dtable_uuid') + + src_table_name = context.get('src_table_name') + src_view_name = context.get('src_view_name') + src_columns = context.get('src_columns') + + dst_table_name = context.get('dst_table_name') + + operator = context.get('operator') lang = context.get('lang', 'en') dataset_id = context.get('dataset_id') - creator = context.get('creator') try: - result = import_or_sync({ - 'dst_dtable_uuid': dst_dtable_uuid, + result = import_sync_CDS({ 'src_dtable_uuid': src_dtable_uuid, - 'src_rows': src_table.get('rows', []), + 'dst_dtable_uuid': dst_dtable_uuid, + 'src_table_name': src_table_name, + 'src_view_name': src_view_name, 'src_columns': src_columns, - 'src_table_name': src_table.get('name'), - 'src_view_name': src_view.get('name'), - 'src_headers': src_headers, 'dst_table_name': dst_table_name, - 'dst_headers': dst_headers, + 'operator': operator, 'lang': lang }) except Exception as e: @@ -192,15 +173,14 @@ def import_common_dataset(context, config): return # get base's metadata - url = dtable_server_url.rstrip('/') + '/api/v1/dtables/' + str(src_dtable_uuid) + 
'/metadata/?from=dtable_events' + src_dtable_server_api = DTableServerAPI(operator, src_dtable_uuid, dtable_server_url) try: - dtable_metadata = requests.get(url, headers=src_headers, timeout=180) - src_metadata = dtable_metadata.json() + src_metadata = src_dtable_server_api.get_metadata() except Exception as e: dtable_io_logger.error('get metadata error: %s', e) return None, 'get metadata error: %s' % (e,) - last_src_version = src_metadata.get('metadata', {}).get('version') + last_src_version = src_metadata.get('version') sql = ''' INSERT INTO dtable_common_dataset_sync (`dst_dtable_uuid`, `dst_table_id`, `created_at`, `creator`, `last_sync_time`, `dataset_id`, `src_version`) @@ -212,7 +192,7 @@ def import_common_dataset(context, config): 'dst_dtable_uuid': uuid_str_to_32_chars(dst_dtable_uuid), 'dst_table_id': dst_table_id, 'created_at': datetime.now(), - 'creator': creator, + 'creator': operator, 'last_sync_time': datetime.now(), 'dataset_id': dataset_id, 'src_version': last_src_version diff --git a/dtable_events/dtable_io/utils.py b/dtable_events/dtable_io/utils.py index 6980407b..4fe2a573 100644 --- a/dtable_events/dtable_io/utils.py +++ b/dtable_events/dtable_io/utils.py @@ -68,9 +68,9 @@ def gen_inner_file_upload_url(token, op, replace=False): return url -def get_dtable_server_token(username, dtable_uuid): +def get_dtable_server_token(username, dtable_uuid, timeout=300): payload = { - 'exp': int(time.time()) + 300, + 'exp': int(time.time()) + timeout, 'dtable_uuid': dtable_uuid, 'username': username, 'permission': 'rw', diff --git a/dtable_events/utils/dtable_db_api.py b/dtable_events/utils/dtable_db_api.py index 37da904c..0faa2fd4 100644 --- a/dtable_events/utils/dtable_db_api.py +++ b/dtable_events/utils/dtable_db_api.py @@ -17,6 +17,9 @@ class RowInsertedError(Exception): class RowUpdatedError(Exception): pass +class RowDeletedError(Exception): + pass + def parse_response(response): if response.status_code >= 400: raise ConnectionError(response.status_code, response.text) @@ -202,11 +205,12 @@ def insert_rows(self, table_name, rows): } resp = requests.post(api_url, json=params, headers=self.headers, timeout=TIMEOUT) if not resp.status_code == 200: - raise RowInsertedError + logger.error('error insert rows resp: %s', resp.text) + raise RowInsertedError return resp.json() def batch_update_rows(self, table_name, rows_data): - url = "%s/api/v1/update-rows/%s" % ( + url = "%s/api/v1/update-rows/%s?from=dtable_events" % ( self.dtable_db_url, self.dtable_uuid ) @@ -220,3 +224,25 @@ def batch_update_rows(self, table_name, rows_data): raise RowUpdatedError return resp.json() + def batch_delete_rows(self, table_name, row_ids): + url = "%s/api/v1/delete-rows/%s?from=dtable_events" % ( + self.dtable_db_url, + self.dtable_uuid + ) + + json_data = { + 'table_name': table_name, + 'row_ids': row_ids + } + resp = requests.delete(url, json=json_data, headers=self.headers, timeout=TIMEOUT) + if not resp.status_code == 200: + raise RowDeletedError + return resp.json() + + def get_metadata(self): + url = '%s/api/v1/metadata/%s?from=dtable_events' % ( + self.dtable_db_url, + self.dtable_uuid + ) + resp = requests.get(url, headers=self.headers, timeout=TIMEOUT) + return parse_response(resp) diff --git a/dtable_events/utils/dtable_server_api.py b/dtable_events/utils/dtable_server_api.py index 79977d6c..56098de7 100644 --- a/dtable_events/utils/dtable_server_api.py +++ b/dtable_events/utils/dtable_server_api.py @@ -72,7 +72,7 @@ def __init__(self, username, dtable_uuid, dtable_server_url, 
server_url=None, re self._init() def _init(self): - dtable_server_access_token = get_dtable_server_token(self.username, self.dtable_uuid) + dtable_server_access_token = get_dtable_server_token(self.username, self.dtable_uuid, timeout=3600) self.headers = {'Authorization': 'Token ' + dtable_server_access_token} def get_metadata(self): @@ -158,13 +158,35 @@ def insert_column(self, table_name, column_name, column_type, column_data=None): data = parse_response(response) return data - def batch_append_rows(self, table_name, rows_data): + def batch_append_columns_by_table_id(self, table_id, columns): + logger.debug('batch append columns by table id table_id: %s columns: %s', table_id, columns) + url = self.dtable_server_url + '/api/v1/dtables/' + self.dtable_uuid + '/batch-append-columns/?from=dtable_events' + json_data = { + 'table_id': table_id, + 'columns': columns + } + response = requests.post(url, json=json_data, headers=self.headers, timeout=self.timeout) + return parse_response(response) + + def batch_update_columns_by_table_id(self, table_id, columns): + logger.debug('batch update columns by table id table_id: %s columns: %s', table_id, columns) + url = self.dtable_server_url + '/api/v1/dtables/' + self.dtable_uuid + '/batch-update-columns/?from=dtable_events' + json_data = { + 'table_id': table_id, + 'columns': columns + } + response = requests.put(url, json=json_data, headers=self.headers) + return parse_response(response) + + def batch_append_rows(self, table_name, rows_data, need_convert_back=None): logger.debug('batch append rows table_name: %s rows_data: %s', table_name, rows_data) url = self.dtable_server_url + '/api/v1/dtables/' + self.dtable_uuid + '/batch-append-rows/?from=dtable_events' json_data = { 'table_name': table_name, 'rows': rows_data, } + if need_convert_back is not None: + json_data['need_convert_back'] = need_convert_back response = requests.post(url, json=json_data, headers=self.headers, timeout=self.timeout) return parse_response(response) @@ -189,13 +211,15 @@ def update_row(self, table_name, row_id, row_data): response = requests.put(url, json=json_data, headers=self.headers, timeout=self.timeout) return parse_response(response) - def batch_update_rows(self, table_name, rows_data): + def batch_update_rows(self, table_name, rows_data, need_convert_back=None): logger.debug('batch update rows table_name: %s rows_data: %s', table_name, rows_data) url = self.dtable_server_url + '/api/v1/dtables/' + self.dtable_uuid + '/batch-update-rows/?from=dtable_events' json_data = { 'table_name': table_name, 'updates': rows_data, } + if need_convert_back is not None: + json_data['need_convert_back'] = need_convert_back response = requests.put(url, json=json_data, headers=self.headers, timeout=self.timeout) return parse_response(response)
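For orientation, a small usage sketch of the batch helpers extended in this diff; the dtable UUID, table name and row values are placeholders, only the constructor and method signatures come from the code above:

```python
# Illustrative only: driving the batch helpers extended in this diff.
# The dtable UUID, table name and row payloads are made-up placeholders.
from dtable_events.app.config import INNER_DTABLE_DB_URL
from dtable_events.utils import get_inner_dtable_server_url
from dtable_events.utils.dtable_db_api import DTableDBAPI
from dtable_events.utils.dtable_server_api import DTableServerAPI

dtable_uuid = 'f9a81b4edbd74bdf8c40d1a0c3ebbedb'  # placeholder
server_api = DTableServerAPI('dtable-events', dtable_uuid, get_inner_dtable_server_url())
db_api = DTableDBAPI('dtable-events', dtable_uuid, INNER_DTABLE_DB_URL)

# dtable-server path: rows keyed by column key; need_convert_back=False keeps the
# payload in raw form, matching how update_dst_rows() sends already-converted rows.
updates = [{'row_id': 'XxYyZz11AaBbCc22', 'row': {'0000': 'new value'}}]
server_api.batch_update_rows('Table1', updates, need_convert_back=False)

# dtable-db (archive) path: delete by row id in one call.
db_api.batch_delete_rows('Table1', ['XxYyZz11AaBbCc22'])
```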
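Similarly, a short sketch of the new dtable_db_cell_validators module in isolation; the column definition and option IDs are invented for the example:

```python
# Illustrative only: the new validate_table_db_cell_value helper mapping a
# single-select option id to its name (option ids/names are made up).
from dtable_events.common_dataset.dtable_db_cell_validators import validate_table_db_cell_value
from dtable_events.utils.constants import ColumnTypes

column = {
    'key': '0000',
    'name': 'Status',
    'type': ColumnTypes.SINGLE_SELECT,
    'data': {'options': [{'id': '100001', 'name': 'Open'}, {'id': '100002', 'name': 'Done'}]},
}

print(validate_table_db_cell_value(column, '100002'))      # -> 'Done'
print(validate_table_db_cell_value(column, 'unknown-id'))  # -> None, invalid values are dropped
```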