From 201c3116214220699ed4131755e891e6d18f8b35 Mon Sep 17 00:00:00 2001 From: alexandredevely <12896316+alexandredevely@users.noreply.github.com> Date: Tue, 15 Oct 2024 12:11:09 +0200 Subject: [PATCH] add features_permissions --- oc/od/orchestrator.py | 305 ++++++++++++++++++++++++++++++------------ 1 file changed, 218 insertions(+), 87 deletions(-) diff --git a/oc/od/orchestrator.py b/oc/od/orchestrator.py index 683d4cc..280f62e 100755 --- a/oc/od/orchestrator.py +++ b/oc/od/orchestrator.py @@ -2897,7 +2897,7 @@ def get_executeclasse( self, authinfo:AuthInfo, userinfo:AuthUser, executeclassn return executeclass - def get_resources( self, currentcontainertype:str, executeclass:dict )->dict: + def get_resources_for_container_type( self, currentcontainertype:str, executeclass:dict )->dict: self.logger.debug('') resources = {} # resource is a always a dict currentcontainertype_ressources = oc.od.settings.desktop_pod[currentcontainertype].get('resources') @@ -2908,7 +2908,7 @@ def get_resources( self, currentcontainertype:str, executeclass:dict )->dict: if isinstance( executeclass_ressources, dict ): resources.update(executeclass_ressources) - self.logger.debug(f" get_resources return {resources}") + self.logger.debug(f"get_resources_for_container_type {currentcontainertype} return {resources}") return resources def read_pod_resources( self, pod_name:str)->dict: @@ -2936,6 +2936,7 @@ def read_pod_resources( self, pod_name:str)->dict: return resources + def notify_user( self, myDesktop:ODDesktop, method:str, data:dict )->bool: """notify_user @@ -2970,13 +2971,15 @@ def notify_user( self, myDesktop:ODDesktop, method:str, data:dict )->bool: return bReturn - def addcontainertopod( self, authinfo:AuthInfo, userinfo:AuthUser, currentcontainertype:str, myuuid:str, envlist:list, list_volumeMounts:list, workingdir:str=None, command:str=None ): + def addcontainertopod( self, authinfo:AuthInfo, userinfo:AuthUser, currentcontainertype:str, myuuid:str, envlist:list, list_volumeMounts:list, workingdir:str=None, command:str=None, resources:dict=None ): assert_type( authinfo, AuthInfo) assert_type( userinfo, AuthUser) assert_type( currentcontainertype, str) assert_type( myuuid, str) assert_type( list_volumeMounts, list ) + container_resources = resources or oc.od.settings.desktop_pod[currentcontainertype].get('resources') + self.logger.debug( f"pod container adding {currentcontainertype} to {myuuid}" ) securityContext = self.updateSecurityContextWithUserInfo( currentcontainertype, authinfo, userinfo ) image = self.getimagecontainerfromauthlabels( currentcontainertype, authinfo ) @@ -2986,7 +2989,7 @@ def addcontainertopod( self, authinfo:AuthInfo, userinfo:AuthUser, currentcontai 'image': image, 'env': envlist, 'volumeMounts': list_volumeMounts, - 'resources': oc.od.settings.desktop_pod[currentcontainertype].get('resources') + 'resources': container_resources } if isinstance( workingdir, str): container['workingDir'] = workingdir @@ -3333,7 +3336,8 @@ def createdesktop(self, authinfo:AuthInfo, userinfo:AuthUser, **kwargs)-> ODDesk myuuid=myuuid, envlist=envlist, workingdir=env['HOME'], - list_volumeMounts=list_volumeMounts + list_volumeMounts=list_volumeMounts, + resources=executeclasse.get('resources') ) # by default remove anonymous home directory content at preStop # or if oc.od.settings.desktop['removehomedirectory'] is True @@ -3346,7 +3350,7 @@ def createdesktop(self, authinfo:AuthInfo, userinfo:AuthUser, **kwargs)-> ODDesk self.logger.debug(f"pod container created {currentcontainertype}" ) localaccount_volume_name = self.get_volumes_localaccount_name( authinfo=authinfo, userinfo=userinfo ) - assert isinstance(localaccount_volume_name, str), f"localaccount secret volume is not found, invalid type {type(localaccount_volume_name)} " + assert isinstance(localaccount_volume_name, str), f"localaccount secret volume is not found" containers = { 'printer': { 'list_volumeMounts': [ pod_allvolumeMounts.get('tmp') ] }, # printer uses tmp volume @@ -4148,18 +4152,25 @@ def get_securitycontext(self, authinfo:AuthInfo, userinfo:AuthUser, app:dict ): self.logger.debug( f"securitycontext={securitycontext}") return securitycontext - def get_resources( self, authinfo:AuthInfo, userinfo:AuthUser, app:dict ): + def get_resources( self, authinfo:AuthInfo, userinfo:AuthUser, executeclassname:str )->dict: + """get_resources + + Args: + authinfo (AuthInfo): AuthInfo + userinfo (AuthUser): AuthUser + executeclassname (str): name of the execute class can be None + + Returns: + dict: resources for the pod + """ assert isinstance(authinfo, AuthInfo), f"authinfo has invalid type {type(authinfo)}" assert isinstance(userinfo, AuthUser), f"userinfo has invalid type {type(userinfo)}" - assert isinstance(app, dict), f"desktop has invalid type {type(app)}" - - executeclassname = app.get('executeclassname') - self.logger.debug( f"app name={app.get('name')} has executeclassname={executeclassname}") + # executeclassname can be None if executeclassname is not defined + # then assum this is the default executeclass executeclass = self.orchestrator.get_executeclasse( authinfo, userinfo, executeclassname ) self.logger.debug( f"executeclass={executeclass}") - resources = self.orchestrator.get_resources( self.type, executeclass ) + resources = self.orchestrator.get_resources_for_container_type( self.type, executeclass ) self.logger.debug( f"resources={resources}") - return resources @@ -4390,6 +4401,128 @@ def list( self, authinfo, userinfo, myDesktop, phase_filter=[ 'Running', 'Waitin return result + def create_thread_to_watch_for_pulling_event( self, myDesktop:ODDesktop, pod_name:str, app_container_name:str, app:dict )->threading.Thread: + self.logger.debug( '') + watch_thread=threading.Thread(target=self.thread_to_watch_for_pulling_event, args=[myDesktop, pod_name, app_container_name, app] ) + watch_thread.start() + return watch_thread + + def thread_to_watch_for_pulling_event( self, myDesktop:ODDesktop, pod_name:str, app_container_name:str, app:dict )->None: + self.logger.debug('') + + send_previous_pulling_message = False + field_path = f"spec.ephemeralContainers{{{app_container_name}}}" # 'spec.ephemeralContainers{philip-j--fry-2048-alpine-0ece1}' + + # start + data = { 'message': app.get('name'), + 'name': app.get('name'), + 'icondata': app.get('icondata'), + 'icon': app.get('icon'), + 'image': app.get('id'), + 'launch': app.get('launch') + } + + # field_selector=f'involvedObject.name={pod_name}' + # timeout_seconds=oc.od.settings.desktop['K8S_CREATE_POD_TIMEOUT_SECONDS'], + # send_initial_events=False, + self.logger.debug(f"w.stream kubeapi.list_namespaced_event starting") + # pod_resource_version = int(pod.metadata.resource_version) + # self.logger.debug(f"resource_version = {pod_resource_version}") + field_selector=f'involvedObject.name={pod_name}' + # field_selector='reason=Pulling' + # send_initial_events=False, sendInitialEvents is forbidden for watch unless the WatchList feature gate is enabled + + # watch list_namespaced_event + # ,involvedObject.kind=Pod + w = watch.Watch() + for event in w.stream( self.orchestrator.kubeapi.list_namespaced_event, + namespace=self.orchestrator.namespace, + field_selector=field_selector, + # resource_version_match='NotOlderThan', + # resource_version=pod_resource_version, + # send_initial_events=False, sendInitialEvents is forbidden for watch unless the WatchList feature gate is enabled + timeout_seconds=oc.od.settings.desktop['K8S_CREATE_EPHEMERALCONTAINER_TIMEOUT_SECONDS']): + if not isinstance(event, dict ): + self.logger.debug(f"event not a dict") + continue # safe type test event is a dict + event_object = event.get('object') + if not isinstance(event_object, CoreV1Event ): + self.logger.debug(f"event object is not a CoreV1Event") + continue # safe type test event object is a CoreV1Event + # resourceVersionMatch=NotOlderThan + # revent = self.orchestrator.kubeapi.read_namespaced_event( event_object.metadata.name, self.orchestrator.namespace ) + #if isinstance( event_object.last_timestamp, datetime.datetime ): + # if event_object.last_timestamp < patch_ephemeralcontainer_datetime: + # self.logger.debug(f"{event_object.reason} {event_object.last_timestamp} < {patch_ephemeralcontainer_datetime}") + # continue + + # self.logger.debug(f"event_object.reason = {event_object.reason}") + self.logger.debug(f"{event_object.type} reason={event_object.reason} message={event_object.message}") + # + # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Event.md + # Type of this event (Normal, Warning), new types could be added in the future + # 'Normal': Information only and will not cause any problems + # 'Warning': These events are to warn that something might go wrong + + # if patch_ephemeralcontainer_datetime > event_object. + if not isinstance (event_object.involved_object, V1ObjectReference ): + self.logger.debug(f"event_object.involved_object is not a V1ObjectReference") + continue + + + # filter field_selector=f'involvedObject.field_path={container_name}' is not supported + # https://kubernetes.io/docs/concepts/overview/working-with-objects/field-selectors/#list-of-supported-fields + # use 'spec.ephemeralContainers{philip-j--fry-2048-alpine-0ece1}' + # event_object.involved_object.field_path = "spec.ephemeralContainers{" + container_name + "}" + + + if isinstance( event_object.involved_object.field_path, str ): + if event_object.involved_object.field_path != field_path: + # this event is not for this thread + # self.logger.debug(f"event_object.involved_object.field_path = {event_object.involved_object.field_path} expecting field_path = {field_path}") + # w.stop() + continue + else: + # event_object.involved_object.field_path is not a string + # skip this event + continue + + # self.logger.debug( f"event_object.involved_object.field_path = {event_object.involved_object.field_path}" ) + + if event_object.type == 'Warning': # event Warning + # something might goes wrong + self.logger.error(f"{event_object.type} reason={event_object.reason} message={event_object.message}") + # return message + w.stop() + self.logger.debug("stop watching") + continue + + elif event_object.type == 'Normal': # event Normal + self.logger.debug(f"event_object.reason={event_object.reason}") + if event_object.reason == 'Pulling' : + send_previous_pulling_message = True + data['name'] = event_object.reason + data['message'] = f"{event_object.message}, please wait" + self.orchestrator.notify_user( myDesktop, 'container', data ) + elif event_object.reason == 'Pulled': + if send_previous_pulling_message is True: + data['name'] = event_object.reason + data['message'] = event_object.message + self.orchestrator.notify_user( myDesktop, 'container', data ) + elif event_object.reason == 'Started': + w.stop() + elif event_object.reason == 'Created': + continue # nothing to do + elif event_object.reason == 'Scheduled': + continue # nothing to do + else: + self.logger.debug(f"stop because {event_object.reason}") + w.stop() + + self.logger.debug(f"w.stream kubeapi.list_namespaced_event done") + + + def create(self, myDesktop, app, authinfo, userinfo={}, userargs=None, **kwargs ): self.logger.debug('') assert isinstance(myDesktop, ODDesktop), f"desktop has invalid type {type(myDesktop)}" @@ -4416,7 +4549,6 @@ def create(self, myDesktop, app, authinfo, userinfo={}, userargs=None, **kwargs self.logger.debug( f"reading pod desktop desktop.id={myDesktop.id} myDesktop.container_name={myDesktop.container_name} app_container_name={app_container_name}") - # resources = self.get_resources( authinfo, userinfo, app ) envlist = self.get_env_for_appinstance( myDesktop, app, authinfo, userinfo, userargs, **kwargs ) # add EXECUTION CONTEXT env var inside the container envlist.append( { 'name': 'ABCDESKTOP_EXECUTE_RUNTIME', 'value': self.type} ) @@ -4458,7 +4590,7 @@ def create(self, myDesktop, app, authinfo, userinfo={}, userargs=None, **kwargs Subpath mounts are not allowed for ephemeral containers" securitycontext = self.get_securitycontext( authinfo, userinfo, app ) - field_path = f"spec.ephemeralContainers{{{app_container_name}}}" # 'spec.ephemeralContainers{philip-j--fry-2048-alpine-0ece1}' + # apply network rules # network_config = self.applyappinstancerules_network( authinfo, rules ) @@ -4509,9 +4641,16 @@ def create(self, myDesktop, app, authinfo, userinfo={}, userargs=None, **kwargs raise ValueError( 'Invalid patch_namespaced_pod_ephemeralcontainers') # use same tzinfo as pod - patch_ephemeralcontainer_datetime = datetime.datetime.now(pod.metadata.creation_timestamp.tzinfo).replace( microsecond=0 ) - # watch list_namespaced_event - w = watch.Watch() + # patch_ephemeralcontainer_datetime = datetime.datetime.now(pod.metadata.creation_timestamp.tzinfo).replace( microsecond=0 ) + # default return + appinstancestatus = oc.od.appinstancestatus.ODAppInstanceStatus( id=app_container_name, type=self.type, wm_class=app.get('launch') ) + appinstancestatus.message = "Application" # default message + + # create a thread to watch for pulling event + # self.create_thread_to_watch_for_pulling_event( myDesktop, pod_name, app_container_name, app ) + + + pod_resource_version = pod.metadata.resource_version send_previous_pulling_message = False # start @@ -4525,7 +4664,7 @@ def create(self, myDesktop, app, authinfo, userinfo={}, userargs=None, **kwargs # default return - appinstancestatus = oc.od.appinstancestatus.ODAppInstanceStatus( id=app_container_name, type=self.type ) + appinstancestatus = oc.od.appinstancestatus.ODAppInstanceStatus( id=app_container_name, type=self.type, wm_class=app.get('launch') ) appinstancestatus.message = "Application" # default message # field_selector=f'involvedObject.name={pod_name}' @@ -4534,13 +4673,19 @@ def create(self, myDesktop, app, authinfo, userinfo={}, userargs=None, **kwargs self.logger.debug(f"w.stream kubeapi.list_namespaced_event starting") # pod_resource_version = int(pod.metadata.resource_version) # self.logger.debug(f"resource_version = {pod_resource_version}") - field_selector=f'involvedObject.name={pod_name}' - # field_selector='reason=Pulling' + field_selector=f'involvedObject.name={pod_name},reason=Pulling' + field_path=f"spec.ephemeralContainers{{{app_container_name}}}" # 'spec.ephemeralContainers{philip-j--fry-2048-alpine-0ece1}' + # field_selector='reason=Pulled' # send_initial_events=False, sendInitialEvents is forbidden for watch unless the WatchList feature gate is enabled - self.logger.debug(f"current filter time is {patch_ephemeralcontainer_datetime}") + # self.logger.debug(f"current filter time is {patch_ephemeralcontainer_datetime}") + # watch list_namespaced_event + # ,involvedObject.kind=Pod + w = watch.Watch() for event in w.stream( self.orchestrator.kubeapi.list_namespaced_event, namespace=self.orchestrator.namespace, field_selector=field_selector, + # resource_version_match='NotOlderThan', + # resource_version=pod_resource_version, # send_initial_events=False, sendInitialEvents is forbidden for watch unless the WatchList feature gate is enabled timeout_seconds=oc.od.settings.desktop['K8S_CREATE_EPHEMERALCONTAINER_TIMEOUT_SECONDS']): if not isinstance(event, dict ): @@ -4550,15 +4695,15 @@ def create(self, myDesktop, app, authinfo, userinfo={}, userargs=None, **kwargs if not isinstance(event_object, CoreV1Event ): self.logger.debug(f"event object is not a CoreV1Event") continue # safe type test event object is a CoreV1Event - + # resourceVersionMatch=NotOlderThan # revent = self.orchestrator.kubeapi.read_namespaced_event( event_object.metadata.name, self.orchestrator.namespace ) - if isinstance( event_object.last_timestamp, datetime.datetime ): - if event_object.last_timestamp < patch_ephemeralcontainer_datetime: - self.logger.debug(f"{event_object.reason} {event_object.last_timestamp} < {patch_ephemeralcontainer_datetime}") - continue + #if isinstance( event_object.last_timestamp, datetime.datetime ): + # if event_object.last_timestamp < patch_ephemeralcontainer_datetime: + # self.logger.debug(f"{event_object.reason} {event_object.last_timestamp} < {patch_ephemeralcontainer_datetime}") + # continue - self.logger.debug(f"event_object.reason = {event_object.reason}") - #self.logger.debug(f"{event_object.type} reason={event_object.reason} message={event_object.message}") + # self.logger.debug(f"event_object.reason = {event_object.reason}") + self.logger.debug(f"{event_object.type} reason={event_object.reason} message={event_object.message}") # # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Event.md # Type of this event (Normal, Warning), new types could be added in the future @@ -4574,32 +4719,21 @@ def create(self, myDesktop, app, authinfo, userinfo={}, userargs=None, **kwargs # filter field_selector=f'involvedObject.field_path={container_name}' is not supported # use 'spec.ephemeralContainers{philip-j--fry-2048-alpine-0ece1}' # event_object.involved_object.field_path = "spec.ephemeralContainers{" + container_name + "}" - if event_object.involved_object.field_path is None: - self.logger.debug(f"event_object.involved_object.field_path is None, continue") - continue - self.logger.debug( f"event_object.involved_object.field_path = {event_object.involved_object.field_path}" ) - if isinstance( event_object.involved_object.field_path, str ): - # check if event is a spec.ephemeralContainers - if event_object.involved_object.field_path.startswith('spec.ephemeralContainers'): - if event_object.involved_object.field_path != field_path: - # this event is not for this thread - self.logger.debug(f"event_object.involved_object.field_path = {event_object.involved_object.field_path} expecting field_path = {field_path}") - # w.stop() - continue - else: - # this event is for us - self.logger.debug(f"event_object.involved_object.field_path = {event_object.involved_object.field_path} == {field_path}") - else: - # this is not an ephemeralContainers event + if event_object.involved_object.field_path != field_path: + # this event is not for this thread + # self.logger.debug(f"event_object.involved_object.field_path = {event_object.involved_object.field_path} expecting field_path = {field_path}") + # w.stop() continue else: - # not a string and not None + # event_object.involved_object.field_path is not a string # skip this event continue + # self.logger.debug( f"event_object.involved_object.field_path = {event_object.involved_object.field_path}" ) + if event_object.type == 'Warning': # event Warning # something might goes wrong self.logger.error(f"{event_object.type} reason={event_object.reason} message={event_object.message}") @@ -4610,6 +4744,7 @@ def create(self, myDesktop, app, authinfo, userinfo={}, userargs=None, **kwargs continue elif event_object.type == 'Normal': # event Normal + self.logger.debug(f"event_object.reason={event_object.reason}") if event_object.reason == 'Pulling' : send_previous_pulling_message = True data['name'] = event_object.reason @@ -4623,27 +4758,16 @@ def create(self, myDesktop, app, authinfo, userinfo={}, userargs=None, **kwargs elif event_object.reason == 'Started': w.stop() elif event_object.reason == 'Created': - w.stop() + continue # nothing to do elif event_object.reason == 'Scheduled': continue # nothing to do else: self.logger.debug(f"stop because {event_object.reason}") w.stop() - - self.logger.debug(f"w.stream kubeapi.list_namespaced_event done") - data = { 'message': app.get('name'), - 'name': app.get('name'), - 'icondata': app.get('icondata'), - 'icon': app.get('icon'), - 'image': app.get('id'), - 'launch': app.get('launch') - } - - - """ + pod = self.orchestrator.kubeapi.read_namespaced_pod(namespace=self.orchestrator.namespace,name=pod_name) if isinstance( pod, V1Pod ) and \ isinstance( pod.status, V1PodStatus ) and \ @@ -4655,26 +4779,34 @@ def create(self, myDesktop, app, authinfo, userinfo={}, userargs=None, **kwargs if isinstance( c.state, V1ContainerState ): if isinstance(c.state.terminated, V1ContainerStateTerminated ): appinstancestatus.message = 'Terminated' - data['message'] = 'Application is terminated' + data = { + 'message': 'Application is terminated', + 'name': app.get('name'), + 'icondata': app.get('icondata'), + 'icon': app.get('icon'), + 'image': app.get('id'), + 'launch': app.get('launch') + } + # report error to the user self.orchestrator.notify_user( myDesktop, 'container', data ) elif isinstance(c.state.running, V1ContainerStateRunning ): appinstancestatus.message = 'Running' elif isinstance(c.state.waiting, V1ContainerStateWaiting): appinstancestatus.message = c.state.waiting.reason break - self.logger.debug(f"read_namespaced_pod done") + + self.logger.debug(f"create done {appinstancestatus}") return appinstancestatus + """ self.logger.debug(f"starting read_namespaced_pod") w = watch.Watch() # read_namespaced_pod - - for event in w.stream( self.orchestrator.kubeapi.list_namespaced_pod, - namespace=self.orchestrator.namespace, - field_selector=f"metadata.name={pod_name}" ): - + for event in w.stream( self.orchestrator.kubeapi.list_namespaced_pod, + namespace=self.orchestrator.namespace, + field_selector=f"metadata.name={pod_name}" ): # event must be a dict, else continue - if not isinstance(event,dict): continue + if not isinstance(event,dict): continue self.logger.debug( f"list_namespaced_pod get event type={event.get('type')}") # object={type(event.get('object'))}" ) pod = event.get('object') @@ -4686,37 +4818,36 @@ def create(self, myDesktop, app, authinfo, userinfo={}, userargs=None, **kwargs for c in pod.status.ephemeral_container_statuses: if isinstance( c, V1ContainerStatus ) and c.name == app_container_name: self.logger.debug( f"{app_container_name} is found in ephemeral_container_statuses {c}") - appinstancestatus = oc.od.appinstancestatus.ODAppInstanceStatus( id=c.name, type=self.type ) if isinstance( c.state, V1ContainerState ): - if isinstance(c.state.terminated, V1ContainerStateTerminated ): + if isinstance(c.state.terminated, V1ContainerStateTerminated ): appinstancestatus.message = 'Terminated' w.stop() + break elif isinstance(c.state.running, V1ContainerStateRunning ): appinstancestatus.message = 'Running' w.stop() - + break elif isinstance(c.state.waiting, V1ContainerStateWaiting): self.logger.debug( f"V1ContainerStateWaiting reason={c.state.waiting.reason}" ) - #data = { 'message': app.get('name'), - # 'name': app.get('name'), - # 'icondata': app.get('icondata'), - # 'icon': app.get('icon'), - # 'image': app.get('id'), - # 'launch': app.get('launch') - #} - - #if c.state.waiting.reason == 'PodInitializing': - # data['message'] = f"{c.state.waiting.reason} {app.get('name')}, please wait" - # self.orchestrator.notify_user( myDesktop, 'container', data ) + data = { 'message': app.get('name'), + 'name': app.get('name'), + 'icondata': app.get('icondata'), + 'icon': app.get('icon'), + 'image': app.get('id'), + 'launch': app.get('launch') + } + if c.state.waiting.reason == 'PodInitializing': + data['message'] = f"{c.state.waiting.reason} {app.get('name')}, please wait" + self.orchestrator.notify_user( myDesktop, 'container', data ) if event.get('type') == 'ERROR': self.logger.error( f"{event.get('type')} object={type(event.get('object'))}") appinstancestatus.message = 'ERROR' w.stop() - return appinstancestatus - - + return appinstancestatus + """ + """ # Valid values for event types (new types could be added in future) # EventTypeNormal string = "Normal" // Information only and will not cause any problems @@ -5137,7 +5268,7 @@ def create(self, myDesktop, app, authinfo, userinfo={}, userargs=None, **kwargs nodeSelector = self.get_appnodeSelector( authinfo, userinfo, app) securitycontext = self.get_securitycontext( authinfo, userinfo, app ) workingDir = self.orchestrator.get_user_homedirectory( authinfo, userinfo ) - resources = self.get_resources( authinfo, userinfo, app ) + resources = self.get_resources( authinfo, userinfo, app.get('executeclassname') ) affinity = self.get_affinity( authinfo, userinfo, app, myDesktop ) # init container for the pod apps @@ -5354,7 +5485,7 @@ def create(self, myDesktop, app, authinfo, userinfo={}, userargs=None, **kwargs message=pod.status.phase, webhook = fillednetworkconfig.get('webhook'), type=self.type, - wm_class=app.get('wm_class') + wm_class=app.get('launch') ) return appinstancestatus