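"""Kubernetes controller for DLJob custom resources.

Registers the DLJob CustomResourceDefinition, watches the cluster for DLJob
objects, and hands each newly added object to the matching job class in
``dl_job``.
"""
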
import atexit
import json
import logging
import os
import sys
import time

import yaml
from kubernetes import client, config, watch
from kubernetes.client.rest import ApiException

import dl_job
import settings.settings as settings

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()


class DLOperator:
    def __init__(self):
        # make sure resources are cleaned up when the process exits
        atexit.register(self.clean_up)
        # inside a pod the service environment variables are set, so use the
        # in-cluster config; otherwise fall back to the local kubeconfig
        if 'KUBERNETES_PORT' in os.environ:
            config.load_incluster_config()
        else:
            config.load_kube_config()
        self.configuration = client.Configuration()
        self.configuration.assert_hostname = False
        self.api_client = client.api_client.ApiClient(configuration=self.configuration)
        self.v1_client = client.ApiextensionsV1beta1Api(self.api_client)
        self.crd_client = client.CustomObjectsApi(self.api_client)
        self.jobs = {}
        # launch default clean up process
        self.clean_up()

    def clean_up(self):
        # delete all CRDs in the cluster;
        # all jobs associated with the CRDs will be deleted as well
        logger.info("Deleting existing custom resources...")
        current_crds = ["{}.{}".format(x['spec']['names']['plural'], x['spec']['group'])
                        for x in self.v1_client.list_custom_resource_definition().to_dict()['items']]
        for c in current_crds:
            logger.info(f"Deleting {c}")
            self.v1_client.delete_custom_resource_definition(name=c, body=client.V1DeleteOptions())
        for k, v in self.jobs.items():
            v.clean_up()
        # wait a moment for the resources to be deleted
        time.sleep(2)

    def create_crd(self, crd_path):
        current_crds = [x['spec']['names']['kind'].lower() for x in
                        self.v1_client.list_custom_resource_definition().to_dict()['items']]
        logger.info(f"Creating {crd_path} CRD")
        with open(crd_path) as crd:
            body = yaml.safe_load(crd)
        try:
            self.v1_client.create_custom_resource_definition(body)
        except ApiException as e:
            # creating an already registered CRD is not an error
            if json.loads(e.body)['reason'] == "AlreadyExists":
                logger.info(f"Resource {body['metadata']['name']} already exists")
                return
            raise e

    def create_dljob(self):
        # TODO: Make this work
        logger.info("Creating DLJob custom resource from test.json")
        with open("test.json") as test:
            test0 = json.loads(test.read())
        try:
            self.crd_client.create_cluster_custom_object(settings.DOMAIN, "v1",
                                                         settings.CRD_NAME_PLURAL,
                                                         body=test0)
        except ApiException as e:
            logger.debug(json.loads(e.body))
            sys.exit(1)

    def update_crd(self, obj):
        metadata = obj.get("metadata")
        if not metadata:
            logger.info("No metadata in object, skipping: %s" % json.dumps(obj, indent=1))
            return
        name = metadata.get("name")
        namespace = metadata.get("namespace")
        spec = obj["spec"]
        # Update spec ...
        logger.info("Updating: %s" % name)
        self.crd_client.replace_namespaced_custom_object(settings.DOMAIN, "v1", namespace,
                                                         settings.CRD_NAME_PLURAL, name, obj)

    def watch_crd(self):
        logger.info("Waiting for DLJobs to come up...")
        resource_version = ''
        while True:
            stream = watch.Watch().stream(self.crd_client.list_cluster_custom_object,
                                          settings.DOMAIN, "v1", settings.CRD_NAME_PLURAL,
                                          resource_version=resource_version)
            for event in stream:
                obj = event["object"]
                operation = event['type']
                spec = obj.get("spec")
                if not spec:
                    continue
                metadata = obj.get("metadata")
                name = metadata['name']
                # remember the latest resourceVersion so that a re-established
                # watch resumes where it left off instead of replaying events
                resource_version = metadata.get('resourceVersion', resource_version)
                logger.info("Handling %s on %s" % (operation, name))
                if operation == "ADDED":
                    self.new_job(name=name, kind=obj.get("kind"), spec=spec)

    def new_job(self, name, kind, spec):
        # job does not exist yet, so create a new one
        if name not in self.jobs:
            logger.debug(f"{name} not found in jobs. Creating a new one.")
            job_class = getattr(dl_job, kind)
            job = job_class(name, spec)
            self.jobs[name] = job
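

# Minimal usage sketch, assuming the repository ships a DLJob CRD manifest;
# the manifest path below is hypothetical and only illustrates how the
# operator is meant to be driven: register the CRD, then watch for DLJobs.
if __name__ == "__main__":
    operator = DLOperator()
    operator.create_crd("deployments/dljob-crd.yaml")  # hypothetical path
    operator.watch_crd()  # blocks, creating a dl_job object per ADDED DLJob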