Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

formats prefix to not be flowrun every time #400

Merged
merged 8 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion prefect_aws/workers/ecs_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ class ECSJobConfiguration(BaseJobConfiguration):
)
configure_cloudwatch_logs: Optional[bool] = Field(default=None)
cloudwatch_logs_options: Dict[str, str] = Field(default_factory=dict)
cloudwatch_logs_prefix: Optional[str] = Field(default=None)
network_configuration: Dict[str, Any] = Field(default_factory=dict)
stream_output: Optional[bool] = Field(default=None)
task_start_timeout_seconds: int = Field(default=300)
Expand Down Expand Up @@ -507,6 +508,16 @@ class ECSVariables(BaseVariables):
" for available options. "
),
)
cloudwatch_logs_prefix: Optional[str] = Field(
default=None,
description=(
"When `configure_cloudwatch_logs` is enabled, this setting may be used to"
" set a prefix for the log group. If not provided, the default prefix will"
" be `prefect-logs_<work_pool_name>_<deployment_id>`. If"
" `awslogs-stream-prefix` is present in `Cloudwatch logs options` this"
" setting will be ignored."
),
)

network_configuration: Dict[str, Any] = Field(
default_factory=dict,
Expand Down Expand Up @@ -1276,13 +1287,16 @@ def _prepare_task_definition(
container["environment"].remove(item)

if configuration.configure_cloudwatch_logs:
prefix = f"prefect-logs_{self._work_pool_name}_{flow_run.deployment_id}"
container["logConfiguration"] = {
"logDriver": "awslogs",
"options": {
"awslogs-create-group": "true",
"awslogs-group": "prefect",
"awslogs-region": region,
"awslogs-stream-prefix": configuration.name or "prefect",
"awslogs-stream-prefix": (
configuration.cloudwatch_logs_prefix or prefix
),
**configuration.cloudwatch_logs_options,
},
}
Expand Down
3 changes: 2 additions & 1 deletion tests/test_ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1232,8 +1232,8 @@ async def test_cloudwatch_log_options(aws_credentials):
configure_cloudwatch_logs=True,
execution_role_arn="test",
cloudwatch_logs_options={
"awslogs-stream-prefix": "override-prefix",
"max-buffer-size": "2m",
"awslogs-stream-prefix": "override-prefix",
},
)

Expand All @@ -1245,6 +1245,7 @@ async def test_cloudwatch_log_options(aws_credentials):
if container["name"] == "prefect":
# Assert that the 'prefect' container has logging configured with user
# provided options
print(container["logConfiguration"])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
print(container["logConfiguration"])

assert container["logConfiguration"] == {
"logDriver": "awslogs",
"options": {
Expand Down
27 changes: 20 additions & 7 deletions tests/workers/test_ecs_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1324,8 +1324,20 @@ async def write_fake_log(task_arn):


@pytest.mark.usefixtures("ecs_mocks")
@pytest.mark.parametrize(
"cloudwatch_logs_options",
[
{
"awslogs-stream-prefix": "override-prefix",
"max-buffer-size": "2m",
},
{
"max-buffer-size": "2m",
},
],
)
async def test_cloudwatch_log_options(
aws_credentials: AwsCredentials, flow_run: FlowRun
aws_credentials: AwsCredentials, flow_run: FlowRun, cloudwatch_logs_options: dict
):
session = aws_credentials.get_boto3_session()
ecs_client = session.client("ecs")
Expand All @@ -1334,12 +1346,10 @@ async def test_cloudwatch_log_options(
aws_credentials=aws_credentials,
configure_cloudwatch_logs=True,
execution_role_arn="test",
cloudwatch_logs_options={
"awslogs-stream-prefix": "override-prefix",
"max-buffer-size": "2m",
},
cloudwatch_logs_options=cloudwatch_logs_options,
)
async with ECSWorker(work_pool_name="test") as worker:
work_pool_name = "test"
async with ECSWorker(work_pool_name=work_pool_name) as worker:
result = await run_then_stop_task(worker, configuration, flow_run)

assert result.status_code == 0
Expand All @@ -1349,6 +1359,9 @@ async def test_cloudwatch_log_options(
task_definition = describe_task_definition(ecs_client, task)

for container in task_definition["containerDefinitions"]:
prefix = f"prefect-logs_{work_pool_name}_{flow_run.deployment_id}"
if cloudwatch_logs_options.get("awslogs-stream-prefix"):
prefix = cloudwatch_logs_options["awslogs-stream-prefix"]
if container["name"] == ECS_DEFAULT_CONTAINER_NAME:
# Assert that the container has logging configured with user
# provided options
Expand All @@ -1358,7 +1371,7 @@ async def test_cloudwatch_log_options(
"awslogs-create-group": "true",
"awslogs-group": "prefect",
"awslogs-region": "us-east-1",
"awslogs-stream-prefix": "override-prefix",
"awslogs-stream-prefix": prefix,
"max-buffer-size": "2m",
},
}
Expand Down
Loading