-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add kfp pipeline for running a pytorch job #14
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
description: Kubeflow PyTorchJob launcher | ||
inputs: | ||
- {name: name, type: String, description: 'PyTorchJob name.'} | ||
- {name: namespace, type: String, default: kubeflow, description: 'PyTorchJob namespace (likely your current namespace).'} | ||
- {name: version, type: String, default: v1, description: 'PyTorchJob version.'} | ||
- {name: master_spec, type: JsonObject, default: '{}', description: 'PyTorchJob Master replicaSpecs.'} | ||
- {name: worker_spec, type: JsonObject, default: '{}', description: 'PyTorchJob Worker replicaSpecs.'} | ||
- {name: job_timeout_minutes, type: Integer, default: 1440, description: 'Time in minutes to wait for the job to complete.'} | ||
- {name: delete_after_done, type: Boolean, default: 'True' , description: 'Whether to delete the job after it is finished.'} | ||
- {name: clean_pod_policy, type: String, default: Running, description: 'Defines the policy for cleaning up pods after the PyTorchJob completes.'} | ||
- {name: active_deadline_seconds, type: Integer, optional: true, description: 'Specifies the duration (in seconds) since startTime during which the job can remain active before it is terminated. Must be a positive integer. This setting applies only to pods where restartPolicy is OnFailure or Always.'} | ||
- {name: backoff_limit, type: Integer, optional: true, description: 'Number of retries before marking this job as failed.'} | ||
- {name: ttl_seconds_after_finished, type: Integer, optional: true, description: 'Defines the TTL for cleaning up finished PyTorchJobs.'} | ||
implementation: | ||
container: | ||
image: cascribner/kubeflow-pytorchjob-launcher:v1 | ||
command: [python, /ml/launch_pytorchjob.py] | ||
args: | ||
- --name | ||
- {inputValue: name} | ||
- --namespace | ||
- {inputValue: namespace} | ||
- --version | ||
- {inputValue: version} | ||
- --masterSpec | ||
- {inputValue: master_spec} | ||
- --workerSpec | ||
- {inputValue: worker_spec} | ||
- --jobTimeoutMinutes | ||
- {inputValue: job_timeout_minutes} | ||
- --deleteAfterDone | ||
- {inputValue: delete_after_done} | ||
- --cleanPodPolicy | ||
- {inputValue: clean_pod_policy} | ||
- --activeDeadlineSeconds | ||
- {inputValue: active_deadline_seconds} | ||
- --backoffLimit | ||
- {inputValue: backoff_limit} | ||
- --ttlSecondsAfterFinished | ||
- {inputValue: ttl_seconds_after_finished} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,148 @@ | ||||||||||||||||||||||
from typing import NamedTuple | ||||||||||||||||||||||
import kfp.dsl as dsl | ||||||||||||||||||||||
from kfp import components | ||||||||||||||||||||||
|
||||||||||||||||||||||
@dsl.component(base_image="python:slim") | ||||||||||||||||||||||
def create_worker_spec(worker_num: int = 0) -> NamedTuple( | ||||||||||||||||||||||
"CreatWorkerSpec", [("worker_spec", dict)]): | ||||||||||||||||||||||
""" | ||||||||||||||||||||||
Creates pytorch-job worker spec | ||||||||||||||||||||||
""" | ||||||||||||||||||||||
from collections import namedtuple | ||||||||||||||||||||||
worker = {} | ||||||||||||||||||||||
if worker_num > 0: | ||||||||||||||||||||||
worker = { | ||||||||||||||||||||||
"replicas": worker_num, | ||||||||||||||||||||||
"restartPolicy": "OnFailure", | ||||||||||||||||||||||
"template": { | ||||||||||||||||||||||
"metadata": { | ||||||||||||||||||||||
"annotations": { | ||||||||||||||||||||||
"sidecar.istio.io/inject": "false" | ||||||||||||||||||||||
} | ||||||||||||||||||||||
}, | ||||||||||||||||||||||
"spec": { | ||||||||||||||||||||||
"containers": [ | ||||||||||||||||||||||
{ "command": [ | ||||||||||||||||||||||
'/bin/bash', | ||||||||||||||||||||||
'-c', | ||||||||||||||||||||||
'--' | ||||||||||||||||||||||
], | ||||||||||||||||||||||
"args": [ | ||||||||||||||||||||||
"python3.11 -u run.py" | ||||||||||||||||||||||
], | ||||||||||||||||||||||
"image": "quay.io/michaelclifford/test-train:0.0.11", | ||||||||||||||||||||||
"name": "pytorch", | ||||||||||||||||||||||
"resources": { | ||||||||||||||||||||||
"requests": { | ||||||||||||||||||||||
"memory": "8Gi", | ||||||||||||||||||||||
"cpu": "2000m", | ||||||||||||||||||||||
# Uncomment for GPU | ||||||||||||||||||||||
"nvidia.com/gpu": 1, | ||||||||||||||||||||||
}, | ||||||||||||||||||||||
"limits": { | ||||||||||||||||||||||
"memory": "8Gi", | ||||||||||||||||||||||
"cpu": "2000m", | ||||||||||||||||||||||
# Uncomment for GPU | ||||||||||||||||||||||
"nvidia.com/gpu": 1, | ||||||||||||||||||||||
}, | ||||||||||||||||||||||
}, | ||||||||||||||||||||||
} | ||||||||||||||||||||||
] | ||||||||||||||||||||||
}, | ||||||||||||||||||||||
}, | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
worker_spec_output = namedtuple( | ||||||||||||||||||||||
"MyWorkerOutput", ["worker_spec"] | ||||||||||||||||||||||
) | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do you want to output a named tuple here? This is useful only if you output multiple params. I don't think it's needed here at all. |
||||||||||||||||||||||
return worker_spec_output(worker) | ||||||||||||||||||||||
|
||||||||||||||||||||||
@dsl.pipeline( | ||||||||||||||||||||||
name="launch-kubeflow-pytorchjob", | ||||||||||||||||||||||
description="An example to launch pytorch.", | ||||||||||||||||||||||
) | ||||||||||||||||||||||
def ilab_train( | ||||||||||||||||||||||
namespace: str = "mcliffor", | ||||||||||||||||||||||
worker_replicas: int = 1, | ||||||||||||||||||||||
ttl_seconds_after_finished: int = -1, | ||||||||||||||||||||||
job_timeout_minutes: int = 600, | ||||||||||||||||||||||
delete_after_done: bool = False): | ||||||||||||||||||||||
|
||||||||||||||||||||||
pytorchjob_launcher_op = components.load_component_from_file("component.yaml") | ||||||||||||||||||||||
|
||||||||||||||||||||||
master = { | ||||||||||||||||||||||
"replicas": 1, | ||||||||||||||||||||||
"restartPolicy": "OnFailure", | ||||||||||||||||||||||
"template": { | ||||||||||||||||||||||
"metadata": { | ||||||||||||||||||||||
"annotations": { | ||||||||||||||||||||||
# See https://github.com/kubeflow/website/issues/2011 | ||||||||||||||||||||||
"sidecar.istio.io/inject": "false" | ||||||||||||||||||||||
} | ||||||||||||||||||||||
}, | ||||||||||||||||||||||
"spec": { | ||||||||||||||||||||||
"containers": [ | ||||||||||||||||||||||
{ | ||||||||||||||||||||||
# To override default command | ||||||||||||||||||||||
"command": [ | ||||||||||||||||||||||
'/bin/bash', | ||||||||||||||||||||||
'-c', | ||||||||||||||||||||||
'--' | ||||||||||||||||||||||
], | ||||||||||||||||||||||
"args": [ | ||||||||||||||||||||||
"python3.11 -u run.py" | ||||||||||||||||||||||
], | ||||||||||||||||||||||
# Or, create your own image from | ||||||||||||||||||||||
# https://github.com/kubeflow/pytorch-operator/tree/master/examples/mnist | ||||||||||||||||||||||
"image": "quay.io/michaelclifford/test-train:0.0.11", | ||||||||||||||||||||||
"name": "pytorch", | ||||||||||||||||||||||
"resources": { | ||||||||||||||||||||||
"requests": { | ||||||||||||||||||||||
"memory": "8Gi", | ||||||||||||||||||||||
"cpu": "2000m", | ||||||||||||||||||||||
# Uncomment for GPU | ||||||||||||||||||||||
"nvidia.com/gpu": 1, | ||||||||||||||||||||||
}, | ||||||||||||||||||||||
"limits": { | ||||||||||||||||||||||
"memory": "8Gi", | ||||||||||||||||||||||
"cpu": "2000m", | ||||||||||||||||||||||
# Uncomment for GPU | ||||||||||||||||||||||
"nvidia.com/gpu": 1, | ||||||||||||||||||||||
}, | ||||||||||||||||||||||
Comment on lines
+41
to
+55
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we parametrize this? |
||||||||||||||||||||||
}, | ||||||||||||||||||||||
} | ||||||||||||||||||||||
], | ||||||||||||||||||||||
# If imagePullSecrets required | ||||||||||||||||||||||
# "imagePullSecrets": [ | ||||||||||||||||||||||
# {"name": "image-pull-secret"}, | ||||||||||||||||||||||
# ], | ||||||||||||||||||||||
}, | ||||||||||||||||||||||
}, | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
worker_spec_create = create_worker_spec(worker_num=worker_replicas) | ||||||||||||||||||||||
|
||||||||||||||||||||||
# Launch and monitor the job with the launcher | ||||||||||||||||||||||
pytorchjob_launcher_op( | ||||||||||||||||||||||
name="pytorch-job", | ||||||||||||||||||||||
namespace=namespace, | ||||||||||||||||||||||
master_spec=master, | ||||||||||||||||||||||
worker_spec = worker_spec_create.outputs["worker_spec"], | ||||||||||||||||||||||
ttl_seconds_after_finished=ttl_seconds_after_finished, | ||||||||||||||||||||||
job_timeout_minutes=job_timeout_minutes, | ||||||||||||||||||||||
delete_after_done=delete_after_done, | ||||||||||||||||||||||
active_deadline_seconds=100, | ||||||||||||||||||||||
backoff_limit=1 | ||||||||||||||||||||||
) | ||||||||||||||||||||||
|
||||||||||||||||||||||
|
||||||||||||||||||||||
if __name__ == "__main__": | ||||||||||||||||||||||
import kfp.compiler as compiler | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think you need to nest the import here. This is against https://pylint.readthedocs.io/en/latest/user_guide/messages/convention/import-outside-toplevel.html |
||||||||||||||||||||||
|
||||||||||||||||||||||
pipeline_file = "pipeline.yaml" | ||||||||||||||||||||||
print( | ||||||||||||||||||||||
f"Compiling pipeline as {pipeline_file}" | ||||||||||||||||||||||
) | ||||||||||||||||||||||
compiler.Compiler().compile( | ||||||||||||||||||||||
ilab_train, pipeline_file | ||||||||||||||||||||||
) | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is somewhat weirdly formatted. I think simple:
Suggested change
Would be fully compliant with PEP8. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The whole thing can be rewritten as:
or even better - not a component at all. Afterall, it is a single
if
statement + setting of a single value in a dict. This doesn't have to be a component at all. Remember that each component we create we start a container - this only slows the workflow. Especially in cases where it's a simple data formatting.