added feature to scale jobs without admission controller
samuel-esp committed Jul 17, 2024
1 parent f2586bc commit a82b7d2
Showing 3 changed files with 161 additions and 7 deletions.
1 change: 1 addition & 0 deletions chart/templates/rbac.yaml
@@ -53,6 +53,7 @@ rules:
- batch
resources:
- cronjobs
- jobs
verbs:
- get
- watch
19 changes: 13 additions & 6 deletions kube_downscaler/scaler.py
@@ -132,6 +132,8 @@ def pods_force_uptime(api, namespace: str):
return True
return False

def scale_jobs_without_admission_controller(plural, admission_controller):
return plural == "jobs" and admission_controller == ""

def is_stack_deployment(resource: NamespacedAPIObject) -> bool:
if resource.kind == Deployment.kind and resource.version == Deployment.version:
@@ -195,7 +197,7 @@ def ignore_resource(resource: NamespacedAPIObject, now: datetime.datetime) -> bool:
def get_replicas(
resource: NamespacedAPIObject, original_replicas: Optional[int], uptime: str
) -> int:
if resource.kind == "CronJob":
if resource.kind in ["CronJob", "Job"]:
suspended = resource.obj["spec"]["suspend"]
replicas = 0 if suspended else 1
state = "suspended" if suspended else "not suspended"
@@ -397,12 +399,12 @@ def scale_up(
f"Unsuspending {resource.kind} {resource.namespace}/{resource.name} (uptime: {uptime}, downtime: {downtime})"
)
event_message = "Unsuspending DaemonSet"
elif resource.kind == "CronJob":
elif resource.kind in ["CronJob", "Job"]:
resource.obj["spec"]["suspend"] = False
logger.info(
f"Unsuspending {resource.kind} {resource.namespace}/{resource.name} (uptime: {uptime}, downtime: {downtime})"
)
event_message = "Unsuspending CronJob"
event_message = f"Unsuspending {resource.kind}"
elif resource.kind == "PodDisruptionBudget":
if "minAvailable" in resource.obj["spec"]:
resource.obj["spec"]["minAvailable"] = original_replicas
@@ -468,12 +470,12 @@ def scale_down(
f"Suspending {resource.kind} {resource.namespace}/{resource.name} (uptime: {uptime}, downtime: {downtime})"
)
event_message = "Suspending DaemonSet"
elif resource.kind == "CronJob":
elif resource.kind in ["CronJob", "Job"]:
resource.obj["spec"]["suspend"] = True
logger.info(
f"Suspending {resource.kind} {resource.namespace}/{resource.name} (uptime: {uptime}, downtime: {downtime})"
)
event_message = "Suspending CronJob"
event_message = f"Suspending {resource.kind}"
elif resource.kind == "PodDisruptionBudget":
if "minAvailable" in resource.obj["spec"]:
resource.obj["spec"]["minAvailable"] = target_replicas
@@ -836,6 +838,11 @@ def autoscale_resources(
f"{resource.kind} {resource.namespace}/{resource.name} was excluded (name matches exclusion list)"
)
continue
if resource.kind == 'Job' and 'ownerReferences' in resource.metadata:
logger.debug(
f"{resource.kind} {resource.namespace}/{resource.name} was excluded (Job with ownerReferences)"
)
continue
resources_by_namespace[resource.namespace].append(resource)
except requests.HTTPError as e:
if e.response.status_code == 404:
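As background for the exclusion above: Jobs spawned by a CronJob carry an ownerReferences entry in their metadata, so skipping them avoids suspending the same workload twice (the parent CronJob is already scaled). A hypothetical example of such metadata, written as a Python dict in the same style the tests below use:

# Hypothetical metadata of a Job created by a CronJob; the ownerReferences key is
# what makes autoscale_resources() skip it, since the parent CronJob is scaled instead.
cronjob_owned_job_metadata = {
    "name": "nightly-report-28634400",   # illustrative name
    "namespace": "default",
    "ownerReferences": [
        {
            "apiVersion": "batch/v1",
            "kind": "CronJob",
            "name": "nightly-report",
            "controller": True,
        }
    ],
}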
@@ -1165,7 +1172,7 @@ def scale(
for clazz in RESOURCE_CLASSES:
plural = clazz.endpoint
if plural in include_resources:
if plural != "jobs":
if scale_jobs_without_admission_controller(plural, admission_controller) or plural != "jobs":
autoscale_resources(
api,
clazz,
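For context, a minimal sketch (not part of the diff itself) of what the new helper changes in scale(): with "jobs" in the included resources, Jobs are now handled directly by the downscaler whenever no admission controller is configured, and are still skipped in this loop otherwise. The call values below are illustrative; "gatekeeper" stands for any configured admission controller.

def scale_jobs_without_admission_controller(plural, admission_controller):
    # True only for the "jobs" endpoint when no admission controller is configured
    return plural == "jobs" and admission_controller == ""

# Illustrative inputs (hypothetical values):
scale_jobs_without_admission_controller("jobs", "")            # True  -> Jobs scaled by the downscaler itself
scale_jobs_without_admission_controller("jobs", "gatekeeper")  # False -> Jobs left to the admission controller
scale_jobs_without_admission_controller("cronjobs", "")        # False -> other resources pass the gate via plural != "jobs"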
148 changes: 147 additions & 1 deletion tests/test_scaler.py
@@ -893,9 +893,9 @@ def get(url, version, **kwargs):
include_resources=include_resources,
exclude_namespaces=[],
exclude_deployments=[],
admission_controller="",
dry_run=False,
grace_period=300,
admission_controller="",
downtime_replicas=0,
enable_events=True,
matching_labels=frozenset([re.compile("")]),
@@ -993,6 +993,152 @@ def get(url, version, **kwargs):
}
assert json.loads(api.patch.call_args[1]["data"]) == patch_data

def test_scaler_job_suspend_without_admission_controller(monkeypatch):
api = MagicMock()
monkeypatch.setattr(
"kube_downscaler.scaler.helper.get_kube_api", MagicMock(return_value=api)
)
monkeypatch.setattr(
"kube_downscaler.scaler.helper.add_event", MagicMock(return_value=None)
)

def get(url, version, **kwargs):
if url == "pods":
data = {"items": []}
elif url == "jobs":
data = {
"items": [
{
"metadata": {
"name": "job-1",
"namespace": "default",
"creationTimestamp": "2019-03-01T16:38:00Z",
},
"spec": {"suspend": False},
},
]
}
elif url == "namespaces/default":
data = {"metadata": {"annotations": {"downscaler/uptime": "never"}}}
# data = {'metadata': {}}
else:
raise Exception(f"unexpected call: {url}, {version}, {kwargs}")

response = MagicMock()
response.json.return_value = data
return response

api.get = get

include_resources = frozenset(["jobs"])
scale(
namespace=None,
upscale_period="never",
downscale_period="never",
default_uptime="never",
default_downtime="always",
include_resources=include_resources,
exclude_namespaces=[],
exclude_deployments=[],
dry_run=False,
grace_period=300,
admission_controller="",
downtime_replicas=0,
enable_events=True,
matching_labels=frozenset([re.compile("")]),
)

assert api.patch.call_count == 1
assert api.patch.call_args[1]["url"] == "/jobs/job-1"

patch_data = {
"metadata": {
"name": "job-1",
"namespace": "default",
"creationTimestamp": "2019-03-01T16:38:00Z",
"annotations": {ORIGINAL_REPLICAS_ANNOTATION: "1"},
},
"spec": {"suspend": True},
}
assert json.loads(api.patch.call_args[1]["data"]) == patch_data


def test_scaler_job_unsuspend_without_admission_controller(monkeypatch):
api = MagicMock()
monkeypatch.setattr(
"kube_downscaler.scaler.helper.get_kube_api", MagicMock(return_value=api)
)
monkeypatch.setattr(
"kube_downscaler.scaler.helper.add_event", MagicMock(return_value=None)
)

def get(url, version, **kwargs):
if url == "pods":
data = {"items": []}
elif url == "jobs":
data = {
"items": [
{
"metadata": {
"name": "job-1",
"namespace": "default",
"creationTimestamp": "2019-03-01T16:38:00Z",
"annotations": {ORIGINAL_REPLICAS_ANNOTATION: "1"},
},
"spec": {"suspend": True},
},
]
}
elif url == "namespaces/default":
data = {
"metadata": {
"annotations": {
"downscaler/uptime": "always",
"downscaler/downtime": "never",
}
}
}
# data = {'metadata': {}}
else:
raise Exception(f"unexpected call: {url}, {version}, {kwargs}")

response = MagicMock()
response.json.return_value = data
return response

api.get = get

include_resources = frozenset(["jobs"])
scale(
namespace=None,
upscale_period="never",
downscale_period="never",
default_uptime="never",
default_downtime="always",
include_resources=include_resources,
exclude_namespaces=[],
exclude_deployments=[],
matching_labels=frozenset([re.compile("")]),
dry_run=False,
grace_period=300,
admission_controller="",
downtime_replicas=0,
enable_events=True,
)

assert api.patch.call_count == 1
assert api.patch.call_args[1]["url"] == "/jobs/job-1"

patch_data = {
"metadata": {
"name": "job-1",
"namespace": "default",
"creationTimestamp": "2019-03-01T16:38:00Z",
"annotations": {ORIGINAL_REPLICAS_ANNOTATION: None},
},
"spec": {"suspend": False},
}
assert json.loads(api.patch.call_args[1]["data"]) == patch_data

def test_scaler_downscale_period_no_error(monkeypatch, caplog):
api = MagicMock()
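The patch these tests assert — {"spec": {"suspend": true/false}} plus the original-replicas annotation — is the same change one could apply by hand. A minimal sketch, assuming pykube-ng (the client library kube-downscaler builds on) and a hypothetical Job named job-1 in the default namespace:

import pykube

# Connect via the local kubeconfig or the in-cluster service account.
api = pykube.HTTPClient(pykube.KubeConfig.from_env())

# Fetch the Job and flip the suspend flag, mirroring the patch asserted above.
job = pykube.Job.objects(api).filter(namespace="default").get(name="job-1")
job.obj["spec"]["suspend"] = True   # set to False to resume it
job.update()

# kube-downscaler additionally records the original state in the
# ORIGINAL_REPLICAS_ANNOTATION annotation, as the expected patch data shows.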
