-
Notifications
You must be signed in to change notification settings - Fork 72
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
clp-package: Add support for printing real-time compression statistics. #388
Conversation
Can you summarize the approach taken? I.e., at a high level, what is the code change. |
Updated |
compression_ratio = float(job_uncompressed_size) / job_compressed_size | ||
compression_speed = ( | ||
job_uncompressed_size | ||
/ (current_time - job_row["start_time"]).total_seconds() | ||
) | ||
logger.info( | ||
f"Compressed {pretty_size(job_uncompressed_size)} into " | ||
f"{pretty_size(job_compressed_size)} ({compression_ratio:.2f}). " | ||
f"Speed: {pretty_size(compression_speed)}/s." | ||
) | ||
job_last_uncompressed_size = job_uncompressed_size |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's deduplicate this with the block on line 68.
if not task_result["status"] == CompressionTaskStatus.SUCCEEDED: | ||
task_result = CompressionTaskFailureResult.parse_obj(task_result) | ||
task_result = CompressionTaskResult.parse_obj(task_result) | ||
if not task_result.status == CompressionTaskStatus.SUCCEEDED: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if not task_result.status == CompressionTaskStatus.SUCCEEDED: | |
if task_result.status != CompressionTaskStatus.SUCCEEDED: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That said, this if-else would be simpler if we swapped the cases.
if not line: | ||
break | ||
stats = json.loads(line.decode("ascii")) | ||
if stats["id"] != last_archive_id: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, since an archive may print its stats multiple times (after every segment is created in clp
's case), then we would still need the previous logic that keeps last_archive_stats
, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The previous logic is to check last_archive_stats['id']
, if it's different from current id, we update everything. That's the same as current logic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not exactly. In the previous logic if last_archive_stats['id']
was different from the current ID, we would add uncompressed_size
and size
from last_archive_stats
, but in the current code, we're adding the values from stats
.
To see why this is a problem, imagine clp
creates two archives with two segments each, meaning it will print the archive stats 4 times, something like this:
- archive-1-seg-1:
uncompressed_size = 10, size = 1
- archive-1-seg-2:
uncompressed_size = 20, size = 2
- archive-2-seg-1:
uncompressed_size = 5, size = 1
- archive-2-seg-2:
uncompressed_size = 10, size 2
In the current code, when we see the printout of (1), we will do total_uncompressed_size += 10, size += 1
. When we see the printout of (3), we will do total_uncompressed_size += 5, size += 1
. This will give us total_uncompressed_size = 15, size = 2
. But it should be total_uncompressed_size = 30, size = 4
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I see your point. uncompressed_size
is cumulative. I thought we could accept the first one, and abandon the rest with the same id.
).dict() | ||
else: | ||
return CompressionTaskFailureResult( | ||
with closing(sql_adapter.create_connection(True)) as db_conn, closing( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still think we should try to keep our MySQL connections short, even if frequent; so I would prefer we open a connection only just before we perform a write (currently, this opens a connection before we start compression, which itself could take a long time depending on the dataset and config).
MySQL's blog says that it's capable of handling a lot of short connections, but its default concurrent connection limit is only 151. We should probably benchmark this for ourselves at some point (a long time ago, @haiqi96 had done some scalability benchmarking that showed MySQL struggled with 20 concurrent connections performing inserts), but for now, I think following their advice is the safer option.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, my original plan was to use short connection. But I notice in compression scheduler, we maintain a long connection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the scheduler's case, it's only maintaining one connection that it's using for polling (among other things), right? In theory we could make it use shorter connections, but there I'm not sure it will make much difference (we should still measure at some point though).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. For the compression task, we read the output from the process and update the database. Do you think we should open a connection each time when we get a new line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we could try opening a connection each time we need to update the archive's stats + tags (which would only be every time we finish an archive) and then once at the end of the task.
components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py
Outdated
Show resolved
Hide resolved
logger.error("Must specify at least one field to update") | ||
raise ValueError |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logger.error("Must specify at least one field to update") | |
raise ValueError | |
raise ValueError("Must specify at least one field to update") |
logger.error("Must specify at least one field to update") | ||
raise ValueError |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logger.error("Must specify at least one field to update") | |
raise ValueError | |
raise ValueError("Must specify at least one field to update") |
References
Description
For large jobs, users often experience extended waiting periods until completion. During that time, they have no idea about the compression status. This PR addresses this issue by printing out real-time statistics (compression ratio and speed) during compression. Specifically, this PR introduces the code changes listed below:
CompressionTaskFailureResult
andCompressionTaskSuccessResult
are removed.compressed_size
,uncompressed_size
andnum_tasks_completed
) of compression jobs are updated at the same time.completion_query
is removed and compression statistics are now printed promptly upon their update.Validation performed