From 7fa1d4a80d45cc74b232c3f6efb2aed8535abb80 Mon Sep 17 00:00:00 2001 From: wraymo Date: Thu, 9 May 2024 16:12:46 -0400 Subject: [PATCH 01/15] print real-time statistics during compression --- .../scripts/native/compress.py | 56 ++--- .../executor/compress/fs_compression_task.py | 206 ++++++++++++------ .../compress/compression_scheduler.py | 33 +-- .../scheduler/scheduler_data.py | 11 +- 4 files changed, 172 insertions(+), 134 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py index 67ab5a732..f32b24092 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py @@ -1,4 +1,5 @@ import argparse +import datetime import logging import pathlib import shutil @@ -46,12 +47,8 @@ def handle_job_update(db, db_cursor, job_id, no_progress_reporting): f"FROM {COMPRESSION_JOBS_TABLE_NAME} WHERE id={job_id}" ) - completion_query = ( - f"SELECT duration, uncompressed_size, compressed_size " - f"FROM {COMPRESSION_JOBS_TABLE_NAME} WHERE id={job_id}" - ) - job_last_uncompressed_size = 0 + while True: db_cursor.execute(polling_query) results = db_cursor.fetchall() @@ -63,37 +60,46 @@ def handle_job_update(db, db_cursor, job_id, no_progress_reporting): job_row = results[0] job_status = job_row["status"] - - if not no_progress_reporting: - job_uncompressed_size = job_row["uncompressed_size"] - job_compressed_size = job_row["compressed_size"] - if job_uncompressed_size > 0: - compression_ratio = float(job_uncompressed_size) / job_compressed_size - if job_last_uncompressed_size < job_uncompressed_size: - logger.info( - f"Compressed {pretty_size(job_uncompressed_size)} into " - f"{pretty_size(job_compressed_size)} ({compression_ratio:.2f})" - ) - job_last_uncompressed_size = job_uncompressed_size + current_time = datetime.datetime.now() if CompressionJobStatus.SUCCEEDED == job_status: # All tasks in the job is done - speed = 0 if not no_progress_reporting: - db_cursor.execute(completion_query) - job_row = db_cursor.fetchone() - if job_row["duration"] and job_row["duration"] > 0: - speed = job_row["uncompressed_size"] / job_row["duration"] + job_uncompressed_size = job_row["uncompressed_size"] + job_compressed_size = job_row["compressed_size"] + compression_speed = ( + job_uncompressed_size / (current_time - job_row["start_time"]).total_seconds() + ) + compression_ratio = float(job_uncompressed_size) / job_compressed_size logger.info( - f"Compression finished. Runtime: {job_row['duration']}s. " - f"Speed: {pretty_size(speed)}/s." + f"Compression finished. " + f"Compressed {pretty_size(job_uncompressed_size)} into " + f"{pretty_size(job_compressed_size)} ({compression_ratio:.2f}). " + f"Speed: {pretty_size(compression_speed)}/s." ) break # Done if CompressionJobStatus.FAILED == job_status: # One or more tasks in the job has failed logger.error(f"Compression failed. 
{job_row['status_msg']}") break # Done - if CompressionJobStatus.RUNNING == job_status or CompressionJobStatus.PENDING == job_status: + + if CompressionJobStatus.RUNNING == job_status: + if not no_progress_reporting: + job_uncompressed_size = job_row["uncompressed_size"] + job_compressed_size = job_row["compressed_size"] + + if job_last_uncompressed_size < job_uncompressed_size: + compression_ratio = float(job_uncompressed_size) / job_compressed_size + compression_speed = (job_uncompressed_size - job_last_uncompressed_size) / ( + current_time - job_row["start_time"] + ).total_seconds() + logger.info( + f"Compressed {pretty_size(job_uncompressed_size)} into " + f"{pretty_size(job_compressed_size)} ({compression_ratio:.2f}). " + f"Speed: {pretty_size(compression_speed)}/s." + ) + job_last_uncompressed_size = job_uncompressed_size + elif CompressionJobStatus.PENDING == job_status: pass # Simply wait another iteration else: error_msg = f"Unhandled CompressionJobStatus: {job_status}" diff --git a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py index cf696b240..a134f76bc 100644 --- a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py +++ b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py @@ -9,21 +9,52 @@ import yaml from celery.app.task import Task from celery.utils.log import get_task_logger -from clp_py_utils.clp_config import Database, StorageEngine +from clp_py_utils.clp_config import ( + COMPRESSION_JOBS_TABLE_NAME, + COMPRESSION_TASKS_TABLE_NAME, + Database, + StorageEngine, +) from clp_py_utils.clp_logging import set_logging_level from clp_py_utils.sql_adapter import SQL_Adapter from job_orchestration.executor.compress.celery import app from job_orchestration.scheduler.constants import CompressionTaskStatus from job_orchestration.scheduler.job_config import ClpIoConfig, PathsToCompress -from job_orchestration.scheduler.scheduler_data import ( - CompressionTaskFailureResult, - CompressionTaskSuccessResult, -) +from job_orchestration.scheduler.scheduler_data import CompressionTaskResult + # Setup logging logger = get_task_logger(__name__) +def update_compression_task_metadata(db_cursor, task_id, kv): + if not len(kv): + logger.error("Must specify at least one field to update") + raise ValueError + + field_set_expressions = [f'{k}="{v}"' for k, v in kv.items()] + query = f""" + UPDATE {COMPRESSION_TASKS_TABLE_NAME} + SET {", ".join(field_set_expressions)} + WHERE id={task_id} + """ + db_cursor.execute(query) + + +def increment_compression_job_metadata(db_cursor, job_id, kv): + if not len(kv): + logger.error("Must specify at least one field to update") + raise ValueError + + field_set_expressions = [f"{k}={k}+{v}" for k, v in kv.items()] + query = f""" + UPDATE {COMPRESSION_JOBS_TABLE_NAME} + SET {", ".join(field_set_expressions)} + WHERE id={job_id} + """ + db_cursor.execute(query) + + def make_clp_command( clp_home: pathlib.Path, archive_output_dir: pathlib.Path, @@ -98,6 +129,8 @@ def run_clp( task_id: int, tag_ids, paths_to_compress: PathsToCompress, + db_conn, + db_cursor, clp_metadata_db_connection_config, ): """ @@ -112,6 +145,8 @@ def run_clp( :param task_id: :param tag_ids: :param paths_to_compress: PathToCompress + :param db_conn: + :param db_cursor: :param clp_metadata_db_connection_config :return: tuple -- (whether compression was successful, output messages) """ @@ -169,35 
+204,18 @@ def run_clp( compression_successful = False proc = subprocess.Popen(compression_cmd, stdout=subprocess.PIPE, stderr=stderr_log_file) - sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_connection_config)) - with closing(sql_adapter.create_connection(True)) as db_conn, closing( - db_conn.cursor(dictionary=True) - ) as db_cursor: - # Compute the total amount of data compressed - last_archive_stats = None - total_uncompressed_size = 0 - total_compressed_size = 0 - while True: - line = proc.stdout.readline() - if not line: - break - stats = json.loads(line.decode("ascii")) - if last_archive_stats is not None and stats["id"] != last_archive_stats["id"]: - # We've started a new archive so add the previous archive's last - # reported size to the total - total_uncompressed_size += last_archive_stats["uncompressed_size"] - total_compressed_size += last_archive_stats["size"] - if tag_ids is not None: - update_tags( - db_conn, - db_cursor, - clp_metadata_db_connection_config["table_prefix"], - last_archive_stats["id"], - tag_ids, - ) - last_archive_stats = stats - if last_archive_stats is not None: - # Add the last archive's last reported size + # Compute the total amount of data compressed + last_archive_stats = None + total_uncompressed_size = 0 + total_compressed_size = 0 + while True: + line = proc.stdout.readline() + if not line: + break + stats = json.loads(line.decode("ascii")) + if last_archive_stats is not None and stats["id"] != last_archive_stats["id"]: + # We've started a new archive so add the previous archive's last + # reported size to the total total_uncompressed_size += last_archive_stats["uncompressed_size"] total_compressed_size += last_archive_stats["size"] if tag_ids is not None: @@ -208,6 +226,19 @@ def run_clp( last_archive_stats["id"], tag_ids, ) + last_archive_stats = stats + if last_archive_stats is not None: + # Add the last archive's last reported size + total_uncompressed_size += last_archive_stats["uncompressed_size"] + total_compressed_size += last_archive_stats["size"] + if tag_ids is not None: + update_tags( + db_conn, + db_cursor, + clp_metadata_db_connection_config["table_prefix"], + last_archive_stats["id"], + tag_ids, + ) # Wait for compression to finish return_code = proc.wait() @@ -256,37 +287,78 @@ def compress( clp_io_config = ClpIoConfig.parse_raw(clp_io_config_json) paths_to_compress = PathsToCompress.parse_raw(paths_to_compress_json) - start_time = datetime.datetime.now() - logger.info(f"[job_id={job_id} task_id={task_id}] COMPRESSION STARTED.") - compression_successful, worker_output = run_clp( - clp_io_config, - pathlib.Path(clp_home_str), - pathlib.Path(data_dir_str), - pathlib.Path(archive_output_dir_str), - pathlib.Path(logs_dir_str), - job_id, - task_id, - tag_ids, - paths_to_compress, - clp_metadata_db_connection_config, - ) - duration = (datetime.datetime.now() - start_time).total_seconds() - logger.info(f"[job_id={job_id} task_id={task_id}] COMPRESSION COMPLETED.") + sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_connection_config)) - if compression_successful: - return CompressionTaskSuccessResult( - task_id=task_id, - status=CompressionTaskStatus.SUCCEEDED, - start_time=start_time, - duration=duration, - total_uncompressed_size=worker_output["total_uncompressed_size"], - total_compressed_size=worker_output["total_compressed_size"], - ).dict() - else: - return CompressionTaskFailureResult( - task_id=task_id, - status=CompressionTaskStatus.FAILED, - start_time=start_time, - duration=duration, - 
error_message=worker_output["error_message"], - ).dict() + with closing(sql_adapter.create_connection(True)) as db_conn, closing( + db_conn.cursor(dictionary=True) + ) as db_cursor: + start_time = datetime.datetime.now() + logger.info(f"[job_id={job_id} task_id={task_id}] COMPRESSION STARTED.") + compression_successful, worker_output = run_clp( + clp_io_config, + pathlib.Path(clp_home_str), + pathlib.Path(data_dir_str), + pathlib.Path(archive_output_dir_str), + pathlib.Path(logs_dir_str), + job_id, + task_id, + tag_ids, + paths_to_compress, + db_conn, + db_cursor, + clp_metadata_db_connection_config, + ) + duration = (datetime.datetime.now() - start_time).total_seconds() + logger.info(f"[job_id={job_id} task_id={task_id}] COMPRESSION COMPLETED.") + + if compression_successful: + uncompressed_size = worker_output["total_uncompressed_size"] + compressed_size = worker_output["total_compressed_size"] + + update_compression_task_metadata( + db_cursor, + task_id, + dict( + start_time=start_time, + status=CompressionTaskStatus.SUCCEEDED, + partition_uncompressed_size=uncompressed_size, + partition_compressed_size=compressed_size, + duration=duration, + ), + ) + db_conn.commit() + + increment_compression_job_metadata( + db_cursor, + job_id, + dict( + num_tasks_completed=1, + total_uncompressed_size=uncompressed_size, + total_compressed_size=compressed_size, + ), + ) + db_conn.commit() + + return CompressionTaskResult( + task_id=task_id, + status=CompressionTaskStatus.SUCCEEDED, + duration=duration, + ).dict() + else: + update_compression_task_metadata( + db_cursor, + task_id, + dict( + start_time=start_time, + status=CompressionTaskStatus.FAILED, + duration=duration, + ), + ) + db_conn.commit() + + return CompressionTaskResult( + task_id=task_id, + status=CompressionTaskStatus.FAILED, + duration=duration, + error_message=worker_output["error_message"], + ).dict() diff --git a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py index 7280e366b..f1121f139 100644 --- a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py @@ -228,9 +228,7 @@ def poll_running_jobs(db_conn, db_cursor): jobs_to_delete = [] for job_id, job in scheduled_jobs.items(): job_success = True - num_tasks_completed = 0 - uncompressed_size = 0 - compressed_size = 0 + duration = 0.0 error_message = "" try: @@ -245,35 +243,12 @@ def poll_running_jobs(db_conn, db_cursor): task_result = CompressionTaskFailureResult.parse_obj(task_result) job_success = False error_message += f"task {task_result.task_id}: {task_result.error_message}\n" - update_compression_task_metadata( - db_cursor, - task_result.task_id, - dict( - start_time=task_result.start_time, - status=task_result.status, - duration=task_result.duration, - ), - ) logger.error( f"Compression task job-{job_id}-task-{task_result.task_id} failed with" f" error: {task_result.error_message}." 
) else: task_result = CompressionTaskSuccessResult.parse_obj(task_result) - num_tasks_completed += 1 - uncompressed_size += task_result.total_uncompressed_size - compressed_size += task_result.total_compressed_size - update_compression_task_metadata( - db_cursor, - task_result.task_id, - dict( - start_time=task_result.start_time, - status=task_result.status, - partition_uncompressed_size=task_result.total_uncompressed_size, - partition_compressed_size=task_result.total_compressed_size, - duration=task_result.duration, - ), - ) logger.info( f"Compression task job-{job_id}-task-{task_result.task_id} completed in" f" {task_result.duration} second(s)." @@ -282,8 +257,6 @@ def poll_running_jobs(db_conn, db_cursor): logger.error(f"Error while getting results for job {job_id}: {e}") job_success = False - db_conn.commit() - if job_success: logger.info(f"Job {job_id} succeeded.") update_compression_job_metadata( @@ -292,9 +265,6 @@ def poll_running_jobs(db_conn, db_cursor): dict( status=CompressionJobStatus.SUCCEEDED, duration=duration, - uncompressed_size=uncompressed_size, - compressed_size=compressed_size, - num_tasks_completed=num_tasks_completed, ), ) else: @@ -305,7 +275,6 @@ def poll_running_jobs(db_conn, db_cursor): dict( status=CompressionJobStatus.FAILED, status_msg=error_message, - num_tasks_completed=num_tasks_completed, ), ) db_conn.commit() diff --git a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py index 0c46cc544..87c1540e7 100644 --- a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py +++ b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py @@ -18,8 +18,8 @@ class CompressionJob(BaseModel): class CompressionTaskResult(BaseModel): task_id: int status: int - start_time: datetime.datetime duration: float + error_message: Optional[str] @validator("status") def valid_status(cls, field): @@ -29,15 +29,6 @@ def valid_status(cls, field): return field -class CompressionTaskFailureResult(CompressionTaskResult): - error_message: str - - -class CompressionTaskSuccessResult(CompressionTaskResult): - total_uncompressed_size: int - total_compressed_size: int - - class InternalJobState(Enum): WAITING_FOR_REDUCER = auto() WAITING_FOR_DISPATCH = auto() From 8c65963815105f77b66fa5cc9c9b70c54c0b8570 Mon Sep 17 00:00:00 2001 From: wraymo Date: Thu, 9 May 2024 16:14:29 -0400 Subject: [PATCH 02/15] remove unused headers --- .../clp_package_utils/scripts/native/compress.py | 2 -- .../job_orchestration/executor/compress/fs_compression_task.py | 1 - 2 files changed, 3 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py index f32b24092..7d83176d2 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py @@ -2,10 +2,8 @@ import datetime import logging import pathlib -import shutil import sys import time -import uuid from contextlib import closing import brotli diff --git a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py index a134f76bc..7684675bc 100644 --- a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py +++ 
b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py @@ -1,6 +1,5 @@ import datetime import json -import logging import os import pathlib import subprocess From 7aabb511ff6dff13ab0bd5c723cad198a3b108f4 Mon Sep 17 00:00:00 2001 From: wraymo Date: Thu, 9 May 2024 16:17:58 -0400 Subject: [PATCH 03/15] fix a lint error --- .../job_orchestration/executor/compress/fs_compression_task.py | 1 - 1 file changed, 1 deletion(-) diff --git a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py index 7684675bc..a605a0ee9 100644 --- a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py +++ b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py @@ -21,7 +21,6 @@ from job_orchestration.scheduler.job_config import ClpIoConfig, PathsToCompress from job_orchestration.scheduler.scheduler_data import CompressionTaskResult - # Setup logging logger = get_task_logger(__name__) From 5a1bc8a7a0b096b644f32bc6d3cc38d27f44cd0c Mon Sep 17 00:00:00 2001 From: wraymo Date: Thu, 9 May 2024 22:40:25 -0400 Subject: [PATCH 04/15] fix a bug --- .../scheduler/compress/compression_scheduler.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py index f1121f139..e66138976 100644 --- a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py @@ -27,8 +27,7 @@ from job_orchestration.scheduler.job_config import ClpIoConfig from job_orchestration.scheduler.scheduler_data import ( CompressionJob, - CompressionTaskFailureResult, - CompressionTaskSuccessResult, + CompressionTaskResult, ) from pydantic import ValidationError @@ -239,8 +238,8 @@ def poll_running_jobs(db_conn, db_cursor): duration = (datetime.datetime.now() - job.start_time).total_seconds() # Check for finished jobs for task_result in returned_results: - if not task_result["status"] == CompressionTaskStatus.SUCCEEDED: - task_result = CompressionTaskFailureResult.parse_obj(task_result) + task_result = CompressionTaskResult.parse_obj(task_result) + if not task_result.status == CompressionTaskStatus.SUCCEEDED: job_success = False error_message += f"task {task_result.task_id}: {task_result.error_message}\n" logger.error( @@ -248,7 +247,6 @@ def poll_running_jobs(db_conn, db_cursor): f" error: {task_result.error_message}." ) else: - task_result = CompressionTaskSuccessResult.parse_obj(task_result) logger.info( f"Compression task job-{job_id}-task-{task_result.task_id} completed in" f" {task_result.duration} second(s)." 
From 3f947957c9753104a5e265181c40eae4a0f17b9b Mon Sep 17 00:00:00 2001 From: wraymo Date: Thu, 9 May 2024 22:52:29 -0400 Subject: [PATCH 05/15] fix a bug --- .../executor/compress/fs_compression_task.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py index a605a0ee9..ca4e1a127 100644 --- a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py +++ b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py @@ -331,8 +331,8 @@ def compress( job_id, dict( num_tasks_completed=1, - total_uncompressed_size=uncompressed_size, - total_compressed_size=compressed_size, + uncompressed_size=uncompressed_size, + compressed_size=compressed_size, ), ) db_conn.commit() From 16562475791b214a376418c1c0cd2042bb8e16a8 Mon Sep 17 00:00:00 2001 From: wraymo Date: Thu, 9 May 2024 23:20:36 -0400 Subject: [PATCH 06/15] fix a bug --- .../clp_package_utils/scripts/native/compress.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py index 7d83176d2..e9a313a34 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py @@ -41,7 +41,7 @@ def handle_job_update(db, db_cursor, job_id, no_progress_reporting): ) else: polling_query = ( - f"SELECT status, status_msg, uncompressed_size, compressed_size " + f"SELECT start_time, status, status_msg, uncompressed_size, compressed_size " f"FROM {COMPRESSION_JOBS_TABLE_NAME} WHERE id={job_id}" ) @@ -88,9 +88,10 @@ def handle_job_update(db, db_cursor, job_id, no_progress_reporting): if job_last_uncompressed_size < job_uncompressed_size: compression_ratio = float(job_uncompressed_size) / job_compressed_size - compression_speed = (job_uncompressed_size - job_last_uncompressed_size) / ( - current_time - job_row["start_time"] - ).total_seconds() + compression_speed = ( + job_uncompressed_size + / (current_time - job_row["start_time"]).total_seconds() + ) logger.info( f"Compressed {pretty_size(job_uncompressed_size)} into " f"{pretty_size(job_compressed_size)} ({compression_ratio:.2f}). 
" From be7e37059867d2a2b09f779182cc1677ce1f4fde Mon Sep 17 00:00:00 2001 From: wraymo Date: Mon, 13 May 2024 16:26:41 -0400 Subject: [PATCH 07/15] update job status at archive level --- .../executor/compress/fs_compression_task.py | 42 +++++++------------ 1 file changed, 15 insertions(+), 27 deletions(-) diff --git a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py index ca4e1a127..d27af4b0a 100644 --- a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py +++ b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py @@ -203,7 +203,7 @@ def run_clp( proc = subprocess.Popen(compression_cmd, stdout=subprocess.PIPE, stderr=stderr_log_file) # Compute the total amount of data compressed - last_archive_stats = None + last_archive_id = "" total_uncompressed_size = 0 total_compressed_size = 0 while True: @@ -211,32 +211,28 @@ def run_clp( if not line: break stats = json.loads(line.decode("ascii")) - if last_archive_stats is not None and stats["id"] != last_archive_stats["id"]: - # We've started a new archive so add the previous archive's last - # reported size to the total - total_uncompressed_size += last_archive_stats["uncompressed_size"] - total_compressed_size += last_archive_stats["size"] + if stats["id"] != last_archive_id: + total_uncompressed_size += stats["uncompressed_size"] + total_compressed_size += stats["size"] if tag_ids is not None: update_tags( db_conn, db_cursor, clp_metadata_db_connection_config["table_prefix"], - last_archive_stats["id"], + stats["id"], tag_ids, ) - last_archive_stats = stats - if last_archive_stats is not None: - # Add the last archive's last reported size - total_uncompressed_size += last_archive_stats["uncompressed_size"] - total_compressed_size += last_archive_stats["size"] - if tag_ids is not None: - update_tags( - db_conn, + increment_compression_job_metadata( db_cursor, - clp_metadata_db_connection_config["table_prefix"], - last_archive_stats["id"], - tag_ids, + job_id, + dict( + uncompressed_size=stats["uncompressed_size"], + compressed_size=stats["size"], + ), ) + db_conn.commit() + + last_archive_id = stats["id"] # Wait for compression to finish return_code = proc.wait() @@ -326,15 +322,7 @@ def compress( ) db_conn.commit() - increment_compression_job_metadata( - db_cursor, - job_id, - dict( - num_tasks_completed=1, - uncompressed_size=uncompressed_size, - compressed_size=compressed_size, - ), - ) + increment_compression_job_metadata(db_cursor, job_id, dict(num_tasks_completed=1)) db_conn.commit() return CompressionTaskResult( From e3dd58f96875f566b2e6b80d40aeb2fe16014919 Mon Sep 17 00:00:00 2001 From: wraymo Date: Mon, 13 May 2024 16:43:57 -0400 Subject: [PATCH 08/15] update partition_uncompressed_size and partition_comppressed_size for failed tasks --- .../executor/compress/fs_compression_task.py | 71 ++++++++----------- 1 file changed, 28 insertions(+), 43 deletions(-) diff --git a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py index d27af4b0a..30f816c7a 100644 --- a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py +++ b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py @@ -250,13 +250,15 @@ def run_clp( # Close stderr log file stderr_log_file.close() 
+ worker_output = { + "total_uncompressed_size": total_uncompressed_size, + "total_compressed_size": total_compressed_size, + } if compression_successful: - return compression_successful, { - "total_uncompressed_size": total_uncompressed_size, - "total_compressed_size": total_compressed_size, - } + return CompressionTaskStatus.SUCCEEDED, worker_output else: - return compression_successful, {"error_message": f"See logs {stderr_log_path}"} + worker_output["error_message"] = f"See logs {stderr_log_path}" + return CompressionTaskStatus.FAILED, worker_output @app.task(bind=True) @@ -288,7 +290,7 @@ def compress( ) as db_cursor: start_time = datetime.datetime.now() logger.info(f"[job_id={job_id} task_id={task_id}] COMPRESSION STARTED.") - compression_successful, worker_output = run_clp( + compression_task_status, worker_output = run_clp( clp_io_config, pathlib.Path(clp_home_str), pathlib.Path(data_dir_str), @@ -305,46 +307,29 @@ def compress( duration = (datetime.datetime.now() - start_time).total_seconds() logger.info(f"[job_id={job_id} task_id={task_id}] COMPRESSION COMPLETED.") - if compression_successful: - uncompressed_size = worker_output["total_uncompressed_size"] - compressed_size = worker_output["total_compressed_size"] + update_compression_task_metadata( + db_cursor, + task_id, + dict( + start_time=start_time, + status=compression_task_status, + partition_uncompressed_size=worker_output["total_uncompressed_size"], + partition_compressed_size=worker_output["total_compressed_size"], + duration=duration, + ), + ) + db_conn.commit() - update_compression_task_metadata( - db_cursor, - task_id, - dict( - start_time=start_time, - status=CompressionTaskStatus.SUCCEEDED, - partition_uncompressed_size=uncompressed_size, - partition_compressed_size=compressed_size, - duration=duration, - ), - ) - db_conn.commit() + compression_task_result = CompressionTaskResult( + task_id=task_id, + status=compression_task_status, + duration=duration, + ) + if CompressionTaskStatus.SUCCEEDED == compression_task_status: increment_compression_job_metadata(db_cursor, job_id, dict(num_tasks_completed=1)) db_conn.commit() - - return CompressionTaskResult( - task_id=task_id, - status=CompressionTaskStatus.SUCCEEDED, - duration=duration, - ).dict() else: - update_compression_task_metadata( - db_cursor, - task_id, - dict( - start_time=start_time, - status=CompressionTaskStatus.FAILED, - duration=duration, - ), - ) - db_conn.commit() + compression_task_result.error_message = worker_output["error_message"] - return CompressionTaskResult( - task_id=task_id, - status=CompressionTaskStatus.FAILED, - duration=duration, - error_message=worker_output["error_message"], - ).dict() + return compression_task_result.dict() From 29f2f38d738a63559090e5a7ecac3905ff14d7dc Mon Sep 17 00:00:00 2001 From: wraymo Date: Tue, 14 May 2024 16:52:09 -0400 Subject: [PATCH 09/15] apply suggestions from code review --- .../scripts/native/compress.py | 51 ++++++++----------- .../compress/compression_scheduler.py | 13 ++--- 2 files changed, 28 insertions(+), 36 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py index e9a313a34..6566baf47 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py @@ -8,6 +8,11 @@ import brotli import msgpack +from clp_package_utils.general import ( + CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, + 
get_clp_home, + validate_and_load_config_file, +) from clp_py_utils.clp_config import COMPRESSION_JOBS_TABLE_NAME from clp_py_utils.pretty_size import pretty_size from clp_py_utils.sql_adapter import SQL_Adapter @@ -17,12 +22,6 @@ ) from job_orchestration.scheduler.job_config import ClpIoConfig, InputConfig, OutputConfig -from clp_package_utils.general import ( - CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, - get_clp_home, - validate_and_load_config_file, -) - # Setup logging # Create logger logger = logging.getLogger(__file__) @@ -34,6 +33,19 @@ logger.addHandler(logging_console_handler) +def print_compression_job_status(job_row, current_time): + job_uncompressed_size = job_row["uncompressed_size"] + job_compressed_size = job_row["compressed_size"] + job_start_time = job_row["start_time"] + compression_ratio = float(job_uncompressed_size) / job_compressed_size + compression_speed = job_uncompressed_size / (current_time - job_start_time).total_seconds() + logger.info( + f"Compressed {pretty_size(job_uncompressed_size)} into " + f"{pretty_size(job_compressed_size)} ({compression_ratio:.2f}). " + f"Speed: {pretty_size(compression_speed)}/s." + ) + + def handle_job_update(db, db_cursor, job_id, no_progress_reporting): if no_progress_reporting: polling_query = ( @@ -63,18 +75,8 @@ def handle_job_update(db, db_cursor, job_id, no_progress_reporting): if CompressionJobStatus.SUCCEEDED == job_status: # All tasks in the job is done if not no_progress_reporting: - job_uncompressed_size = job_row["uncompressed_size"] - job_compressed_size = job_row["compressed_size"] - compression_speed = ( - job_uncompressed_size / (current_time - job_row["start_time"]).total_seconds() - ) - compression_ratio = float(job_uncompressed_size) / job_compressed_size - logger.info( - f"Compression finished. " - f"Compressed {pretty_size(job_uncompressed_size)} into " - f"{pretty_size(job_compressed_size)} ({compression_ratio:.2f}). " - f"Speed: {pretty_size(compression_speed)}/s." - ) + logger.info("Compression finished.") + print_compression_job_status(job_row, current_time) break # Done if CompressionJobStatus.FAILED == job_status: # One or more tasks in the job has failed @@ -84,19 +86,8 @@ def handle_job_update(db, db_cursor, job_id, no_progress_reporting): if CompressionJobStatus.RUNNING == job_status: if not no_progress_reporting: job_uncompressed_size = job_row["uncompressed_size"] - job_compressed_size = job_row["compressed_size"] - if job_last_uncompressed_size < job_uncompressed_size: - compression_ratio = float(job_uncompressed_size) / job_compressed_size - compression_speed = ( - job_uncompressed_size - / (current_time - job_row["start_time"]).total_seconds() - ) - logger.info( - f"Compressed {pretty_size(job_uncompressed_size)} into " - f"{pretty_size(job_compressed_size)} ({compression_ratio:.2f}). " - f"Speed: {pretty_size(compression_speed)}/s." 
- ) + print_compression_job_status(job_row, current_time) job_last_uncompressed_size = job_uncompressed_size elif CompressionJobStatus.PENDING == job_status: pass # Simply wait another iteration diff --git a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py index e66138976..62b7a27fc 100644 --- a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py @@ -239,18 +239,19 @@ def poll_running_jobs(db_conn, db_cursor): # Check for finished jobs for task_result in returned_results: task_result = CompressionTaskResult.parse_obj(task_result) - if not task_result.status == CompressionTaskStatus.SUCCEEDED: + if task_result.status == CompressionTaskStatus.SUCCEEDED: + logger.info( + f"Compression task job-{job_id}-task-{task_result.task_id} completed in" + f" {task_result.duration} second(s)." + ) + else: job_success = False error_message += f"task {task_result.task_id}: {task_result.error_message}\n" logger.error( f"Compression task job-{job_id}-task-{task_result.task_id} failed with" f" error: {task_result.error_message}." ) - else: - logger.info( - f"Compression task job-{job_id}-task-{task_result.task_id} completed in" - f" {task_result.duration} second(s)." - ) + except Exception as e: logger.error(f"Error while getting results for job {job_id}: {e}") job_success = False From 088924e004b00ac2ea26da69475d53005b344354 Mon Sep 17 00:00:00 2001 From: wraymo Date: Tue, 14 May 2024 17:02:53 -0400 Subject: [PATCH 10/15] fix lint error --- .../clp_package_utils/scripts/native/compress.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py index 6566baf47..f037be630 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py @@ -8,11 +8,6 @@ import brotli import msgpack -from clp_package_utils.general import ( - CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, - get_clp_home, - validate_and_load_config_file, -) from clp_py_utils.clp_config import COMPRESSION_JOBS_TABLE_NAME from clp_py_utils.pretty_size import pretty_size from clp_py_utils.sql_adapter import SQL_Adapter @@ -22,6 +17,12 @@ ) from job_orchestration.scheduler.job_config import ClpIoConfig, InputConfig, OutputConfig +from clp_package_utils.general import ( + CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, + get_clp_home, + validate_and_load_config_file, +) + # Setup logging # Create logger logger = logging.getLogger(__file__) From 897db91889ad3c3cbf8d3cf298ebc47d1ea7202e Mon Sep 17 00:00:00 2001 From: wraymo Date: Wed, 15 May 2024 10:49:39 -0400 Subject: [PATCH 11/15] apply suggestions from code review --- .../executor/compress/fs_compression_task.py | 129 ++++++++++-------- 1 file changed, 75 insertions(+), 54 deletions(-) diff --git a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py index 30f816c7a..1f9701e96 100644 --- a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py +++ 
b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py @@ -53,6 +53,25 @@ def increment_compression_job_metadata(db_cursor, job_id, kv): db_cursor.execute(query) +def update_tags(db_cursor, table_prefix, archive_id, tag_ids): + db_cursor.executemany( + f"INSERT INTO {table_prefix}archive_tags (archive_id, tag_id) VALUES (%s, %s)", + [(archive_id, tag_id) for tag_id in tag_ids], + ) + + +def update_job_metadata_and_tags(db_cursor, job_id, table_prefix, tag_ids, archive_stats): + update_tags(db_cursor, table_prefix, archive_stats["id"], tag_ids) + increment_compression_job_metadata( + db_cursor, + job_id, + dict( + uncompressed_size=archive_stats["uncompressed_size"], + compressed_size=archive_stats["size"], + ), + ) + + def make_clp_command( clp_home: pathlib.Path, archive_output_dir: pathlib.Path, @@ -109,14 +128,6 @@ def make_clp_s_command( return compression_cmd -def update_tags(db_conn, db_cursor, table_prefix, archive_id, tag_ids): - db_cursor.executemany( - f"INSERT INTO {table_prefix}archive_tags (archive_id, tag_id) VALUES (%s, %s)", - [(archive_id, tag_id) for tag_id in tag_ids], - ) - db_conn.commit() - - def run_clp( clp_config: ClpIoConfig, clp_home: pathlib.Path, @@ -127,8 +138,7 @@ def run_clp( task_id: int, tag_ids, paths_to_compress: PathsToCompress, - db_conn, - db_cursor, + sql_adapter: SQL_Adapter, clp_metadata_db_connection_config, ): """ @@ -143,8 +153,7 @@ def run_clp( :param task_id: :param tag_ids: :param paths_to_compress: PathToCompress - :param db_conn: - :param db_cursor: + :param sql_adapter: :param clp_metadata_db_connection_config :return: tuple -- (whether compression was successful, output messages) """ @@ -206,34 +215,47 @@ def run_clp( last_archive_id = "" total_uncompressed_size = 0 total_compressed_size = 0 + last_archive_stats = None while True: line = proc.stdout.readline() if not line: break stats = json.loads(line.decode("ascii")) - if stats["id"] != last_archive_id: + if last_archive_stats is not None and stats["id"] != last_archive_stats["id"]: + # We've started a new archive so add the previous archive's last + # reported size to the total total_uncompressed_size += stats["uncompressed_size"] total_compressed_size += stats["size"] - if tag_ids is not None: - update_tags( - db_conn, + with closing(sql_adapter.create_connection(True)) as db_conn, closing( + db_conn.cursor(dictionary=True) + ) as db_cursor: + update_job_metadata_and_tags( db_cursor, + job_id, clp_metadata_db_connection_config["table_prefix"], - stats["id"], tag_ids, + last_archive_stats, ) - increment_compression_job_metadata( + db_conn.commit() + + last_archive_stats = stats + + if last_archive_stats is not None: + # Add the last archive's last reported size + total_uncompressed_size += last_archive_stats["uncompressed_size"] + total_compressed_size += last_archive_stats["size"] + with closing(sql_adapter.create_connection(True)) as db_conn, closing( + db_conn.cursor(dictionary=True) + ) as db_cursor: + update_job_metadata_and_tags( db_cursor, job_id, - dict( - uncompressed_size=stats["uncompressed_size"], - compressed_size=stats["size"], - ), + clp_metadata_db_connection_config["table_prefix"], + tag_ids, + last_archive_stats, ) db_conn.commit() - last_archive_id = stats["id"] - # Wait for compression to finish return_code = proc.wait() if 0 != return_code: @@ -285,28 +307,27 @@ def compress( sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_connection_config)) + start_time = datetime.datetime.now() + logger.info(f"[job_id={job_id} 
task_id={task_id}] COMPRESSION STARTED.") + compression_task_status, worker_output = run_clp( + clp_io_config, + pathlib.Path(clp_home_str), + pathlib.Path(data_dir_str), + pathlib.Path(archive_output_dir_str), + pathlib.Path(logs_dir_str), + job_id, + task_id, + tag_ids, + paths_to_compress, + sql_adapter, + clp_metadata_db_connection_config, + ) + duration = (datetime.datetime.now() - start_time).total_seconds() + logger.info(f"[job_id={job_id} task_id={task_id}] COMPRESSION COMPLETED.") + with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) ) as db_cursor: - start_time = datetime.datetime.now() - logger.info(f"[job_id={job_id} task_id={task_id}] COMPRESSION STARTED.") - compression_task_status, worker_output = run_clp( - clp_io_config, - pathlib.Path(clp_home_str), - pathlib.Path(data_dir_str), - pathlib.Path(archive_output_dir_str), - pathlib.Path(logs_dir_str), - job_id, - task_id, - tag_ids, - paths_to_compress, - db_conn, - db_cursor, - clp_metadata_db_connection_config, - ) - duration = (datetime.datetime.now() - start_time).total_seconds() - logger.info(f"[job_id={job_id} task_id={task_id}] COMPRESSION COMPLETED.") - update_compression_task_metadata( db_cursor, task_id, @@ -320,16 +341,16 @@ def compress( ) db_conn.commit() - compression_task_result = CompressionTaskResult( - task_id=task_id, - status=compression_task_status, - duration=duration, - ) + compression_task_result = CompressionTaskResult( + task_id=task_id, + status=compression_task_status, + duration=duration, + ) - if CompressionTaskStatus.SUCCEEDED == compression_task_status: - increment_compression_job_metadata(db_cursor, job_id, dict(num_tasks_completed=1)) - db_conn.commit() - else: - compression_task_result.error_message = worker_output["error_message"] + if CompressionTaskStatus.SUCCEEDED == compression_task_status: + increment_compression_job_metadata(db_cursor, job_id, dict(num_tasks_completed=1)) + db_conn.commit() + else: + compression_task_result.error_message = worker_output["error_message"] - return compression_task_result.dict() + return compression_task_result.dict() From 31744b208480000185cd37422a7fb81c1be892ca Mon Sep 17 00:00:00 2001 From: wraymo Date: Wed, 15 May 2024 10:53:44 -0400 Subject: [PATCH 12/15] fix some bugs --- .../executor/compress/fs_compression_task.py | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py index 1f9701e96..e00632a7b 100644 --- a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py +++ b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py @@ -153,7 +153,7 @@ def run_clp( :param task_id: :param tag_ids: :param paths_to_compress: PathToCompress - :param sql_adapter: + :param sql_adapter: SQL_Adapter :param clp_metadata_db_connection_config :return: tuple -- (whether compression was successful, output messages) """ @@ -224,8 +224,8 @@ def run_clp( if last_archive_stats is not None and stats["id"] != last_archive_stats["id"]: # We've started a new archive so add the previous archive's last # reported size to the total - total_uncompressed_size += stats["uncompressed_size"] - total_compressed_size += stats["size"] + total_uncompressed_size += last_archive_stats["uncompressed_size"] + total_compressed_size += last_archive_stats["size"] with 
closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) ) as db_cursor: @@ -341,16 +341,16 @@ def compress( ) db_conn.commit() - compression_task_result = CompressionTaskResult( - task_id=task_id, - status=compression_task_status, - duration=duration, - ) + compression_task_result = CompressionTaskResult( + task_id=task_id, + status=compression_task_status, + duration=duration, + ) - if CompressionTaskStatus.SUCCEEDED == compression_task_status: - increment_compression_job_metadata(db_cursor, job_id, dict(num_tasks_completed=1)) - db_conn.commit() - else: - compression_task_result.error_message = worker_output["error_message"] + if CompressionTaskStatus.SUCCEEDED == compression_task_status: + increment_compression_job_metadata(db_cursor, job_id, dict(num_tasks_completed=1)) + db_conn.commit() + else: + compression_task_result.error_message = worker_output["error_message"] - return compression_task_result.dict() + return compression_task_result.dict() From 943960809bfc606a948b06d210bb4cb4b9a36fc7 Mon Sep 17 00:00:00 2001 From: wraymo Date: Wed, 15 May 2024 11:07:18 -0400 Subject: [PATCH 13/15] fix a bug --- .../job_orchestration/executor/compress/fs_compression_task.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py index e00632a7b..6345f3387 100644 --- a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py +++ b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py @@ -61,7 +61,8 @@ def update_tags(db_cursor, table_prefix, archive_id, tag_ids): def update_job_metadata_and_tags(db_cursor, job_id, table_prefix, tag_ids, archive_stats): - update_tags(db_cursor, table_prefix, archive_stats["id"], tag_ids) + if tag_ids is not None: + update_tags(db_cursor, table_prefix, archive_stats["id"], tag_ids) increment_compression_job_metadata( db_cursor, job_id, From f98a2057579c450ef2633e1f1c94f3b1140753dd Mon Sep 17 00:00:00 2001 From: wraymo Date: Wed, 15 May 2024 14:10:17 -0400 Subject: [PATCH 14/15] apply suggestions from code review --- .../scripts/native/compress.py | 2 +- .../executor/compress/fs_compression_task.py | 28 +++++++++---------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py index f037be630..a08602007 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py @@ -42,7 +42,7 @@ def print_compression_job_status(job_row, current_time): compression_speed = job_uncompressed_size / (current_time - job_start_time).total_seconds() logger.info( f"Compressed {pretty_size(job_uncompressed_size)} into " - f"{pretty_size(job_compressed_size)} ({compression_ratio:.2f}). " + f"{pretty_size(job_compressed_size)} ({compression_ratio:.2f}x). " f"Speed: {pretty_size(compression_speed)}/s." 
) diff --git a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py index 6345f3387..6aa0725e9 100644 --- a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py +++ b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py @@ -28,7 +28,7 @@ def update_compression_task_metadata(db_cursor, task_id, kv): if not len(kv): logger.error("Must specify at least one field to update") - raise ValueError + raise ValueError("Must specify at least one field to update") field_set_expressions = [f'{k}="{v}"' for k, v in kv.items()] query = f""" @@ -42,7 +42,7 @@ def update_compression_task_metadata(db_cursor, task_id, kv): def increment_compression_job_metadata(db_cursor, job_id, kv): if not len(kv): logger.error("Must specify at least one field to update") - raise ValueError + raise ValueError("Must specify at least one field to update") field_set_expressions = [f"{k}={k}+{v}" for k, v in kv.items()] query = f""" @@ -213,10 +213,9 @@ def run_clp( proc = subprocess.Popen(compression_cmd, stdout=subprocess.PIPE, stderr=stderr_log_file) # Compute the total amount of data compressed - last_archive_id = "" + last_archive_stats = None total_uncompressed_size = 0 total_compressed_size = 0 - last_archive_stats = None while True: line = proc.stdout.readline() if not line: @@ -340,18 +339,17 @@ def compress( duration=duration, ), ) + if CompressionTaskStatus.SUCCEEDED == compression_task_status: + increment_compression_job_metadata(db_cursor, job_id, dict(num_tasks_completed=1)) db_conn.commit() - compression_task_result = CompressionTaskResult( - task_id=task_id, - status=compression_task_status, - duration=duration, - ) + compression_task_result = CompressionTaskResult( + task_id=task_id, + status=compression_task_status, + duration=duration, + ) - if CompressionTaskStatus.SUCCEEDED == compression_task_status: - increment_compression_job_metadata(db_cursor, job_id, dict(num_tasks_completed=1)) - db_conn.commit() - else: - compression_task_result.error_message = worker_output["error_message"] + if CompressionTaskStatus.FAILED == compression_task_status: + compression_task_result.error_message = worker_output["error_message"] - return compression_task_result.dict() + return compression_task_result.dict() From 1738a86d2503aa38154ec5b3e58fcdcdb9281609 Mon Sep 17 00:00:00 2001 From: wraymo Date: Wed, 15 May 2024 14:41:03 -0400 Subject: [PATCH 15/15] apply suggestions from code review --- .../job_orchestration/executor/compress/fs_compression_task.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py index 6aa0725e9..ce88ad185 100644 --- a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py +++ b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py @@ -27,7 +27,6 @@ def update_compression_task_metadata(db_cursor, task_id, kv): if not len(kv): - logger.error("Must specify at least one field to update") raise ValueError("Must specify at least one field to update") field_set_expressions = [f'{k}="{v}"' for k, v in kv.items()] @@ -41,7 +40,6 @@ def update_compression_task_metadata(db_cursor, task_id, kv): def increment_compression_job_metadata(db_cursor, job_id, kv): if not len(kv): - 
logger.error("Must specify at least one field to update") raise ValueError("Must specify at least one field to update") field_set_expressions = [f"{k}={k}+{v}" for k, v in kv.items()]