Commit

update/delete rows use dtable-db api
AlexCXC committed Mar 1, 2023
1 parent 173a3f6 commit a38891e
Showing 2 changed files with 42 additions and 71 deletions.
99 changes: 37 additions & 62 deletions dtable_events/common_dataset/common_dataset_sync_utils.py
@@ -241,7 +241,7 @@ def generate_synced_columns(src_columns, dst_columns=None):
     return to_be_updated_columns, to_be_appended_columns, None
 
 
-def generate_synced_rows(converted_rows, src_columns, synced_columns, dst_rows=None, to_archive=False):
+def generate_synced_rows(converted_rows, src_columns, synced_columns, dst_rows=None, to_dtable_db=False):
     """
     generate synced rows divided into `rows to be updated`, `rows to be appended` and `rows to be deleted`
     return: to_be_updated_rows, to_be_appended_rows, to_be_deleted_row_ids
@@ -261,7 +261,7 @@ def generate_synced_rows(converted_rows, src_columns, synced_columns, dst_rows=N
             to_be_deleted_row_ids.append(row_id)
             continue
 
-        update_row = generate_single_row(converted_row, src_columns, synced_columns_dict, dst_row=row, to_archive=to_archive)
+        update_row = generate_single_row(converted_row, src_columns, synced_columns_dict, dst_row=row, to_dtable_db=to_dtable_db)
         if update_row:
             update_row['_id'] = row_id
             to_be_updated_rows.append(update_row)
@@ -271,7 +271,7 @@ def generate_synced_rows(converted_rows, src_columns, synced_columns, dst_rows=N
         row_id = converted_row.get('_id')
         if transfered_row_ids.get(row_id):
             continue
-        append_row = generate_single_row(converted_row, src_columns, synced_columns_dict, dst_row=None, to_archive=to_archive)
+        append_row = generate_single_row(converted_row, src_columns, synced_columns_dict, dst_row=None, to_dtable_db=to_dtable_db)
         if append_row:
             append_row['_id'] = row_id
             to_be_appended_rows.append(append_row)
@@ -516,7 +516,7 @@ def is_equal(v1, v2, column_type):
         return False
 
 
-def generate_single_row(converted_row, src_columns, transfered_columns_dict, dst_row=None, to_archive=False):
+def generate_single_row(converted_row, src_columns, transfered_columns_dict, dst_row=None, to_dtable_db=False):
     """
     generate new single row according to src column type
     :param converted_row: {'_id': '', 'column_key_1': '', 'col_key_2'; ''} from dtable-db
@@ -540,19 +540,19 @@ def generate_single_row(converted_row, src_columns, transfered_columns_dict, dst
         if not transfered_column:
             continue
 
-        if to_archive and col['key'] in ['_creator', '_ctime', '_last_modifier', '_mtime']:
+        if to_dtable_db and col['key'] in ['_creator', '_ctime', '_last_modifier', '_mtime']:
             continue
 
         if op_type == 'update':
             converted_cell_value = get_converted_cell_value(converted_cell_value, transfered_column, col)
             if not is_equal(dst_row.get(col_key), converted_cell_value, transfered_column['type']):
-                if not to_archive:
+                if not to_dtable_db:
                     dataset_row[col_key] = converted_cell_value
                 else:
                     dataset_row[col['name']] = validate_table_db_cell_value(transfered_column, converted_cell_value)
         else:
             converted_cell_value = get_converted_cell_value(converted_cell_value, transfered_column, col)
-            if not to_archive:
+            if not to_dtable_db:
                 dataset_row[col_key] = converted_cell_value
             else:
                 dataset_row[col['name']] = validate_table_db_cell_value(transfered_column, converted_cell_value)
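
Note on the renamed flag: to_dtable_db (formerly to_archive) controls how generate_single_row keys each synced cell. A minimal sketch of the two output shapes, using an invented column (key '0000', name 'Title') purely for illustration:

    col = {'key': '0000', 'name': 'Title', 'type': 'text'}   # invented example column
    value = 'hello'

    # to_dtable_db=False: the row is built for dtable-server, keyed by column key
    server_row = {col['key']: value}    # {'0000': 'hello'}

    # to_dtable_db=True: the row is built for dtable-db, keyed by column name and
    # passed through validate_table_db_cell_value; system columns ('_creator',
    # '_ctime', '_last_modifier', '_mtime') are skipped entirely
    db_row = {col['name']: value}       # {'Title': 'hello'}
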
@@ -642,59 +642,34 @@ def append_dst_rows(dst_dtable_uuid, dst_table_name, to_be_appended_rows, dst_dt
                 }
 
 
-def update_dst_rows(dst_dtable_uuid, dst_table_name, to_be_updated_rows, dst_dtable_db_api, dst_dtable_server_api, to_archive):
-    if to_archive:
-        step = 10000
-        for i in range(0, len(to_be_updated_rows), step):
-            updates = []
-            for row in to_be_updated_rows[i: i+step]:
-                row_id = row.pop('_id', None)
-                updates.append({
-                    'row_id': row_id,
-                    'row': row
-                })
-            try:
-                dst_dtable_db_api.batch_update_rows(dst_table_name, updates)
-            except Exception as e:
-                logger.error('sync dataset update rows dst dtable: %s dst table: %s error: %s', dst_dtable_uuid, dst_table_name, e)
-                return {
-                    'dst_table_id': None,
-                    'error_msg': 'update rows error',
-                    'task_status_code': 500
-                }
-    else:
-        step = INSERT_UPDATE_ROWS_LIMIT
-        for i in range(0, len(to_be_updated_rows), step):
-            updates = [{
-                'row_id': row['_id'],
-                'row': row
-            } for row in to_be_updated_rows[i: i+step]]
-            try:
-                dst_dtable_server_api.batch_update_rows(dst_table_name, updates, need_convert_back=False)
-            except Exception as e:
-                logger.error('sync dataset update rows dst dtable: %s dst table: %s error: %s', dst_dtable_uuid, dst_table_name, e)
-                return {
-                    'dst_table_id': None,
-                    'error_msg': 'update rows error',
-                    'task_status_code': 500
-                }
+def update_dst_rows(dst_dtable_uuid, dst_table_name, to_be_updated_rows, dst_dtable_db_api):
+    step = 10000
+    for i in range(0, len(to_be_updated_rows), step):
+        updates = []
+        for row in to_be_updated_rows[i: i+step]:
+            row_id = row.pop('_id', None)
+            updates.append({
+                'row_id': row_id,
+                'row': row
+            })
+        try:
+            dst_dtable_db_api.batch_update_rows(dst_table_name, updates)
+        except Exception as e:
+            logger.error('sync dataset update rows dst dtable: %s dst table: %s error: %s', dst_dtable_uuid, dst_table_name, e)
+            return {
+                'dst_table_id': None,
+                'error_msg': 'update rows error',
+                'task_status_code': 500
+            }
 
 
-def delete_dst_rows(dst_dtable_uuid, dst_table_name, to_be_deleted_row_ids, dst_dtable_db_api, dst_dtable_server_api, to_archive):
-    if to_archive:
-        step = 10000
-        for i in range(0, len(to_be_deleted_row_ids), step):
-            try:
-                dst_dtable_db_api.batch_delete_rows(dst_table_name, to_be_deleted_row_ids[i: i+step])
-            except Exception as e:
-                logger.error('sync dataset delete rows dst dtable: %s dst table: %s error: %s', dst_dtable_uuid, dst_table_name, e)
-    else:
-        step = DELETE_ROWS_LIMIT
-        for i in range(0, len(to_be_deleted_row_ids), step):
-            try:
-                dst_dtable_server_api.batch_delete_rows(dst_table_name, to_be_deleted_row_ids[i: i+step])
-            except Exception as e:
-                logger.error('sync dataset delete rows dst dtable: %s dst table: %s error: %s', dst_dtable_uuid, dst_table_name, e)
+def delete_dst_rows(dst_dtable_uuid, dst_table_name, to_be_deleted_row_ids, dst_dtable_db_api):
+    step = 10000
+    for i in range(0, len(to_be_deleted_row_ids), step):
+        try:
+            dst_dtable_db_api.batch_delete_rows(dst_table_name, to_be_deleted_row_ids[i: i+step])
+        except Exception as e:
+            logger.error('sync dataset delete rows dst dtable: %s dst table: %s error: %s', dst_dtable_uuid, dst_table_name, e)
 
 
 def import_sync_CDS(context):
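
Note: with this change both helpers talk only to the dtable-db API, in batches of 10,000 rows per request. A small sketch of the update payload update_dst_rows builds, using an invented row id and cell value:

    to_be_updated_rows = [{'_id': 'RoW1aBcD', 'Title': 'hello'}]   # invented sample data

    updates = []
    for row in to_be_updated_rows:
        row_id = row.pop('_id', None)
        updates.append({'row_id': row_id, 'row': row})
    # updates == [{'row_id': 'RoW1aBcD', 'row': {'Title': 'hello'}}]
    # sent with dst_dtable_db_api.batch_update_rows(dst_table_name, updates)

    # deletions send a plain list of row ids instead:
    # dst_dtable_db_api.batch_delete_rows(dst_table_name, ['RoW1aBcD'])
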
@@ -832,7 +807,7 @@ def import_sync_CDS(context):
 
     # delete dst to-be-deleted-rows
     logger.debug('will delete %s rows', len(to_be_deleted_rows_id_set))
-    delete_dst_rows(dst_dtable_uuid, dst_table_name, list(to_be_deleted_rows_id_set), dst_dtable_db_api, dst_dtable_server_api, to_archive)
+    delete_dst_rows(dst_dtable_uuid, dst_table_name, list(to_be_deleted_rows_id_set), dst_dtable_db_api)
 
     query_columns = ', '.join(['_id'] + ["`%s`" % col['name'] for col in final_columns])
 
@@ -843,7 +818,7 @@ def import_sync_CDS(context):
         logger.debug('to_be_updated_rows_id_list i: %s step: %s', i, step)
         ## fetch src to-be-updated-rows
         rows_id_str = ', '.join(["'%s'" % row_id for row_id in to_be_updated_rows_id_list[i: i+step]])
-        sql = f"SELECT {query_columns} FROM `{src_table_name}` WHERE _id IN ({rows_id_str}) LIMIT {step}"
+        sql = f"SELECT {query_columns} FROM `{src_table_name}` WHERE `_id` IN ({rows_id_str}) LIMIT {step}"
         try:
             src_rows = src_dtable_db_api.query(sql, convert=False, server_only=server_only)
         except Exception as e:
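
For reference, with invented table, column, and row-id values the rendered statement looks roughly like:

    SELECT _id, `Title`, `Status` FROM `Table1` WHERE `_id` IN ('RoW1aBcD', 'RoW2eFgH') LIMIT 10000

the only change here being that _id in the WHERE clause is now backtick-quoted like the other identifiers.
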
@@ -867,9 +842,9 @@ def import_sync_CDS(context):
             }
 
         ## update
-        to_be_updated_rows, _, _ = generate_synced_rows(src_rows, src_columns, final_columns, dst_rows=dst_rows, to_archive=to_archive)
+        to_be_updated_rows, _, _ = generate_synced_rows(src_rows, src_columns, final_columns, dst_rows=dst_rows, to_dtable_db=True)
         logger.debug('step src update-rows: %s to-be-updated-rows: %s', len(to_be_updated_rows_id_list[i: i+step]), len(to_be_updated_rows))
-        error_resp = update_dst_rows(dst_dtable_uuid, dst_table_name, to_be_updated_rows, dst_dtable_db_api, dst_dtable_server_api, to_archive)
+        error_resp = update_dst_rows(dst_dtable_uuid, dst_table_name, to_be_updated_rows, dst_dtable_db_api)
         if error_resp:
             return error_resp
 
@@ -902,7 +877,7 @@ def import_sync_CDS(context):
                 'task_status_code': 500
             }
         src_rows = sorted(src_rows, key=lambda row: step_row_sort_dict[row['_id']])
-        _, to_be_appended_rows, _ = generate_synced_rows(src_rows, src_columns, final_columns, [], to_archive=to_archive)
+        _, to_be_appended_rows, _ = generate_synced_rows(src_rows, src_columns, final_columns, [], to_dtable_db=to_archive)
         error_resp = append_dst_rows(dst_dtable_uuid, dst_table_name, to_be_appended_rows, dst_dtable_db_api, dst_dtable_server_api, to_archive=to_archive)
         if error_resp:
             return error_resp

14 changes: 5 additions & 9 deletions dtable_events/common_dataset/common_dataset_syncer.py
@@ -80,7 +80,7 @@ def gen_src_dst_assets(dst_dtable_uuid, src_dtable_uuid, src_table_id, src_view_
     if not src_table:
         set_common_dataset_invalid(dataset_id, db_session)
         set_common_dataset_sync_invalid(dataset_sync_id, db_session)
-        logging.error('Source table not found.')
+        logging.warning('Source table not found.')
         return None
     for view in src_table.get('views', []):
         if view['_id'] == src_view_id:
@@ -89,12 +89,11 @@
     if not src_view:
         set_common_dataset_invalid(dataset_id, db_session)
         set_common_dataset_sync_invalid(dataset_sync_id, db_session)
-        logging.error('Source view not found.')
+        logging.warning('Source view not found.')
         return None
 
-    src_columns = [col for col in src_table.get('columns', []) if col not in src_view.get('hidden_columns', [])]
+    src_columns = [col for col in src_table.get('columns', []) if col['key'] not in src_view.get('hidden_columns', [])]
 
-    src_enable_archive = (src_dtable_metadata.get('settings') or {}).get('enable_archive', False)
     src_version = src_dtable_metadata.get('version')
 
     dst_table = None
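
Note on the src_columns fix: a view's hidden_columns appears to hold column keys (strings), so the old membership test compared whole column dicts against keys and never excluded anything. A minimal sketch with invented columns:

    columns = [{'key': '0000', 'name': 'Title'}, {'key': '1111', 'name': 'Notes'}]
    hidden_columns = ['1111']

    old_filter = [col for col in columns if col not in hidden_columns]          # keeps both columns
    new_filter = [col for col in columns if col['key'] not in hidden_columns]   # drops the hidden 'Notes' column
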
@@ -104,17 +103,14 @@ def gen_src_dst_assets(dst_dtable_uuid, src_dtable_uuid, src_table_id, src_view_
             dst_table = table
             break
     if not dst_table:
-        set_common_dataset_invalid(dataset_id, db_session)
-        set_common_dataset_sync_invalid(dataset_sync_id, db_session)
-        logging.error('Destination table not found.')
+        logging.warning('Destination table not found.')
         return None
 
     return {
         'src_table_name': src_table['name'],
         'src_view_name': src_view['name'],
         'src_view_type': src_view.get('type', 'table'),
         'src_columns': src_columns,
-        'src_enable_archive': src_enable_archive,
         'src_version': src_version,
         'dst_table_name': dst_table['name'] if dst_table else None,
         'dst_columns': dst_table['columns'] if dst_table else None
@@ -161,7 +157,7 @@ def update_sync_time_and_version(db_session, update_map):
 
 
 def check_common_dataset(db_session):
-    dataset_sync_list = list_pending_common_dataset_syncs(db_session)
+    dataset_sync_list = list(list_pending_common_dataset_syncs(db_session))
     sync_count = 0
     dataset_update_map = {}
     for dataset_sync in dataset_sync_list:
