diff --git a/dtable_events/common_dataset/common_dataset_sync_utils.py b/dtable_events/common_dataset/common_dataset_sync_utils.py
index 5c568af5..51bd9322 100644
--- a/dtable_events/common_dataset/common_dataset_sync_utils.py
+++ b/dtable_events/common_dataset/common_dataset_sync_utils.py
@@ -241,7 +241,7 @@ def generate_synced_columns(src_columns, dst_columns=None):
     return to_be_updated_columns, to_be_appended_columns, None
 
 
-def generate_synced_rows(converted_rows, src_columns, synced_columns, dst_rows=None, to_archive=False):
+def generate_synced_rows(converted_rows, src_columns, synced_columns, dst_rows=None, to_dtable_db=False):
     """
     generate synced rows divided into `rows to be updated`, `rows to be appended` and `rows to be deleted`
     return: to_be_updated_rows, to_be_appended_rows, to_be_deleted_row_ids
@@ -261,7 +261,7 @@ def generate_synced_rows(converted_rows, src_columns, synced_columns, dst_rows=N
             to_be_deleted_row_ids.append(row_id)
             continue
 
-        update_row = generate_single_row(converted_row, src_columns, synced_columns_dict, dst_row=row, to_archive=to_archive)
+        update_row = generate_single_row(converted_row, src_columns, synced_columns_dict, dst_row=row, to_dtable_db=to_dtable_db)
         if update_row:
             update_row['_id'] = row_id
             to_be_updated_rows.append(update_row)
@@ -271,7 +271,7 @@ def generate_synced_rows(converted_rows, src_columns, synced_columns, dst_rows=N
         row_id = converted_row.get('_id')
         if transfered_row_ids.get(row_id):
             continue
-        append_row = generate_single_row(converted_row, src_columns, synced_columns_dict, dst_row=None, to_archive=to_archive)
+        append_row = generate_single_row(converted_row, src_columns, synced_columns_dict, dst_row=None, to_dtable_db=to_dtable_db)
        if append_row:
             append_row['_id'] = row_id
             to_be_appended_rows.append(append_row)
@@ -516,7 +516,7 @@ def is_equal(v1, v2, column_type):
     return False
 
 
-def generate_single_row(converted_row, src_columns, transfered_columns_dict, dst_row=None, to_archive=False):
+def generate_single_row(converted_row, src_columns, transfered_columns_dict, dst_row=None, to_dtable_db=False):
     """
     generate new single row according to src column type
     :param converted_row: {'_id': '', 'column_key_1': '', 'col_key_2'; ''} from dtable-db
@@ -540,19 +540,19 @@ def generate_single_row(converted_row, src_columns, transfered_columns_dict, dst
         if not transfered_column:
             continue
 
-        if to_archive and col['key'] in ['_creator', '_ctime', '_last_modifier', '_mtime']:
+        if to_dtable_db and col['key'] in ['_creator', '_ctime', '_last_modifier', '_mtime']:
             continue
 
         if op_type == 'update':
             converted_cell_value = get_converted_cell_value(converted_cell_value, transfered_column, col)
             if not is_equal(dst_row.get(col_key), converted_cell_value, transfered_column['type']):
-                if not to_archive:
+                if not to_dtable_db:
                     dataset_row[col_key] = converted_cell_value
                 else:
                     dataset_row[col['name']] = validate_table_db_cell_value(transfered_column, converted_cell_value)
         else:
             converted_cell_value = get_converted_cell_value(converted_cell_value, transfered_column, col)
-            if not to_archive:
+            if not to_dtable_db:
                 dataset_row[col_key] = converted_cell_value
             else:
                 dataset_row[col['name']] = validate_table_db_cell_value(transfered_column, converted_cell_value)
@@ -642,59 +642,34 @@ def append_dst_rows(dst_dtable_uuid, dst_table_name, to_be_appended_rows, dst_dt
     }
 
 
-def update_dst_rows(dst_dtable_uuid, dst_table_name, to_be_updated_rows, dst_dtable_db_api, dst_dtable_server_api, to_archive):
-    if to_archive:
-        step = 10000
-        for i in range(0, len(to_be_updated_rows), step):
-            updates = []
-            for row in to_be_updated_rows[i: i+step]:
-                row_id = row.pop('_id', None)
-                updates.append({
-                    'row_id': row_id,
-                    'row': row
-                })
-            try:
-                dst_dtable_db_api.batch_update_rows(dst_table_name, updates)
-            except Exception as e:
-                logger.error('sync dataset update rows dst dtable: %s dst table: %s error: %s', dst_dtable_uuid, dst_table_name, e)
-                return {
-                    'dst_table_id': None,
-                    'error_msg': 'update rows error',
-                    'task_status_code': 500
-                }
-    else:
-        step = INSERT_UPDATE_ROWS_LIMIT
-        for i in range(0, len(to_be_updated_rows), step):
-            updates = [{
-                'row_id': row['_id'],
+def update_dst_rows(dst_dtable_uuid, dst_table_name, to_be_updated_rows, dst_dtable_db_api):
+    step = 10000
+    for i in range(0, len(to_be_updated_rows), step):
+        updates = []
+        for row in to_be_updated_rows[i: i+step]:
+            row_id = row.pop('_id', None)
+            updates.append({
+                'row_id': row_id,
                 'row': row
-            } for row in to_be_updated_rows[i: i+step]]
-            try:
-                dst_dtable_server_api.batch_update_rows(dst_table_name, updates, need_convert_back=False)
-            except Exception as e:
-                logger.error('sync dataset update rows dst dtable: %s dst table: %s error: %s', dst_dtable_uuid, dst_table_name, e)
-                return {
-                    'dst_table_id': None,
-                    'error_msg': 'update rows error',
-                    'task_status_code': 500
-                }
+            })
+        try:
+            dst_dtable_db_api.batch_update_rows(dst_table_name, updates)
+        except Exception as e:
+            logger.error('sync dataset update rows dst dtable: %s dst table: %s error: %s', dst_dtable_uuid, dst_table_name, e)
+            return {
+                'dst_table_id': None,
+                'error_msg': 'update rows error',
+                'task_status_code': 500
+            }
 
 
-def delete_dst_rows(dst_dtable_uuid, dst_table_name, to_be_deleted_row_ids, dst_dtable_db_api, dst_dtable_server_api, to_archive):
-    if to_archive:
-        step = 10000
-        for i in range(0, len(to_be_deleted_row_ids), step):
-            try:
-                dst_dtable_db_api.batch_delete_rows(dst_table_name, to_be_deleted_row_ids[i: i+step])
-            except Exception as e:
-                logger.error('sync dataset delete rows dst dtable: %s dst table: %s error: %s', dst_dtable_uuid, dst_table_name, e)
-    else:
-        step = DELETE_ROWS_LIMIT
-        for i in range(0, len(to_be_deleted_row_ids), step):
-            try:
-                dst_dtable_server_api.batch_delete_rows(dst_table_name, to_be_deleted_row_ids[i: i+step])
-            except Exception as e:
-                logger.error('sync dataset delete rows dst dtable: %s dst table: %s error: %s', dst_dtable_uuid, dst_table_name, e)
+def delete_dst_rows(dst_dtable_uuid, dst_table_name, to_be_deleted_row_ids, dst_dtable_db_api):
+    step = 10000
+    for i in range(0, len(to_be_deleted_row_ids), step):
+        try:
+            dst_dtable_db_api.batch_delete_rows(dst_table_name, to_be_deleted_row_ids[i: i+step])
+        except Exception as e:
+            logger.error('sync dataset delete rows dst dtable: %s dst table: %s error: %s', dst_dtable_uuid, dst_table_name, e)
 
 
 def import_sync_CDS(context):
@@ -832,7 +807,7 @@
 
     # delete dst to-be-deleted-rows
     logger.debug('will delete %s rows', len(to_be_deleted_rows_id_set))
-    delete_dst_rows(dst_dtable_uuid, dst_table_name, list(to_be_deleted_rows_id_set), dst_dtable_db_api, dst_dtable_server_api, to_archive)
+    delete_dst_rows(dst_dtable_uuid, dst_table_name, list(to_be_deleted_rows_id_set), dst_dtable_db_api)
 
     query_columns = ', '.join(['_id'] + ["`%s`" % col['name'] for col in final_columns])
 
@@ -843,7 +818,7 @@
         logger.debug('to_be_updated_rows_id_list i: %s step: %s', i, step)
         ## fetch src to-be-updated-rows
         rows_id_str = ', '.join(["'%s'" % row_id for row_id in to_be_updated_rows_id_list[i: i+step]])
-        sql = f"SELECT {query_columns} FROM `{src_table_name}` WHERE _id IN ({rows_id_str}) LIMIT {step}"
+        sql = f"SELECT {query_columns} FROM `{src_table_name}` WHERE `_id` IN ({rows_id_str}) LIMIT {step}"
         try:
             src_rows = src_dtable_db_api.query(sql, convert=False, server_only=server_only)
         except Exception as e:
@@ -867,9 +842,9 @@
             }
 
         ## update
-        to_be_updated_rows, _, _ = generate_synced_rows(src_rows, src_columns, final_columns, dst_rows=dst_rows, to_archive=to_archive)
+        to_be_updated_rows, _, _ = generate_synced_rows(src_rows, src_columns, final_columns, dst_rows=dst_rows, to_dtable_db=True)
         logger.debug('step src update-rows: %s to-be-updated-rows: %s', len(to_be_updated_rows_id_list[i: i+step]), len(to_be_updated_rows))
-        error_resp = update_dst_rows(dst_dtable_uuid, dst_table_name, to_be_updated_rows, dst_dtable_db_api, dst_dtable_server_api, to_archive)
+        error_resp = update_dst_rows(dst_dtable_uuid, dst_table_name, to_be_updated_rows, dst_dtable_db_api)
         if error_resp:
             return error_resp
 
@@ -902,7 +877,7 @@
                 'task_status_code': 500
             }
         src_rows = sorted(src_rows, key=lambda row: step_row_sort_dict[row['_id']])
-        _, to_be_appended_rows, _ = generate_synced_rows(src_rows, src_columns, final_columns, [], to_archive=to_archive)
+        _, to_be_appended_rows, _ = generate_synced_rows(src_rows, src_columns, final_columns, [], to_dtable_db=to_archive)
         error_resp = append_dst_rows(dst_dtable_uuid, dst_table_name, to_be_appended_rows, dst_dtable_db_api, dst_dtable_server_api, to_archive=to_archive)
         if error_resp:
             return error_resp
diff --git a/dtable_events/common_dataset/common_dataset_syncer.py b/dtable_events/common_dataset/common_dataset_syncer.py
index d9876a30..e5e5054d 100644
--- a/dtable_events/common_dataset/common_dataset_syncer.py
+++ b/dtable_events/common_dataset/common_dataset_syncer.py
@@ -80,7 +80,7 @@ def gen_src_dst_assets(dst_dtable_uuid, src_dtable_uuid, src_table_id, src_view_
     if not src_table:
         set_common_dataset_invalid(dataset_id, db_session)
         set_common_dataset_sync_invalid(dataset_sync_id, db_session)
-        logging.error('Source table not found.')
+        logging.warning('Source table not found.')
         return None
     for view in src_table.get('views', []):
         if view['_id'] == src_view_id:
@@ -89,12 +89,11 @@
     if not src_view:
         set_common_dataset_invalid(dataset_id, db_session)
         set_common_dataset_sync_invalid(dataset_sync_id, db_session)
-        logging.error('Source view not found.')
+        logging.warning('Source view not found.')
         return None
 
-    src_columns = [col for col in src_table.get('columns', []) if col not in src_view.get('hidden_columns', [])]
+    src_columns = [col for col in src_table.get('columns', []) if col['key'] not in src_view.get('hidden_columns', [])]
 
-    src_enable_archive = (src_dtable_metadata.get('settings') or {}).get('enable_archive', False)
     src_version = src_dtable_metadata.get('version')
 
     dst_table = None
@@ -104,17 +103,14 @@
             dst_table = table
             break
     if not dst_table:
-        set_common_dataset_invalid(dataset_id, db_session)
         set_common_dataset_sync_invalid(dataset_sync_id, db_session)
-        logging.error('Destination table not found.')
+        logging.warning('Destination table not found.')
         return None
 
     return {
         'src_table_name': src_table['name'],
         'src_view_name': src_view['name'],
-        'src_view_type': src_view.get('type', 'table'),
         'src_columns': src_columns,
-        'src_enable_archive': src_enable_archive,
         'src_version': src_version,
         'dst_table_name': dst_table['name'] if dst_table else None,
         'dst_columns': dst_table['columns'] if dst_table else None
@@ -161,7 +157,7 @@ def update_sync_time_and_version(db_session, update_map):
 
 
 def check_common_dataset(db_session):
-    dataset_sync_list = list_pending_common_dataset_syncs(db_session)
+    dataset_sync_list = list(list_pending_common_dataset_syncs(db_session))
     sync_count = 0
     dataset_update_map = {}
     for dataset_sync in dataset_sync_list:
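
Reviewer note (sketch, not part of the patch): after this change, update_dst_rows and delete_dst_rows talk only to dtable-db and chunk their input in steps of 10000; the dtable-server branch (INSERT_UPDATE_ROWS_LIMIT / DELETE_ROWS_LIMIT) is gone. A minimal calling sketch of the new signatures follows, assuming the same client object import_sync_CDS already holds as dst_dtable_db_api; the function name, uuid, table name, and row data below are hypothetical placeholders.

from dtable_events.common_dataset.common_dataset_sync_utils import (
    update_dst_rows, delete_dst_rows,
)

def sync_update_and_delete(dst_dtable_uuid, dst_table_name, dst_dtable_db_api):
    # Rows are keyed by column name (dtable-db convention) and carry '_id',
    # which update_dst_rows pops into the 'row_id' of each batch update.
    updated_rows = [{'_id': 'a1b2c3', 'Name': 'Alice'}]  # hypothetical data
    deleted_row_ids = ['d4e5f6']                         # hypothetical data

    # Returns an error dict ('task_status_code': 500) on failure, else None.
    error_resp = update_dst_rows(dst_dtable_uuid, dst_table_name,
                                 updated_rows, dst_dtable_db_api)
    if error_resp:
        return error_resp

    # Logs failures but has no return value.
    delete_dst_rows(dst_dtable_uuid, dst_table_name,
                    deleted_row_ids, dst_dtable_db_api)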