From 98130cb8135cf049576c48b11b74affe7f6a1d9e Mon Sep 17 00:00:00 2001 From: fstagni Date: Mon, 29 Mar 2021 13:42:47 +0200 Subject: [PATCH 1/8] better logging --- .../private/standardLogging/Logging.py | 4 --- .../Agent/StalledJobAgent.py | 26 +++++++++---------- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/src/DIRAC/FrameworkSystem/private/standardLogging/Logging.py b/src/DIRAC/FrameworkSystem/private/standardLogging/Logging.py index 86d1f0d7323..683927b5154 100644 --- a/src/DIRAC/FrameworkSystem/private/standardLogging/Logging.py +++ b/src/DIRAC/FrameworkSystem/private/standardLogging/Logging.py @@ -12,14 +12,11 @@ from DIRAC import S_ERROR from DIRAC.FrameworkSystem.private.standardLogging.LogLevels import LogLevels -from DIRAC.Core.Utilities.Decorators import deprecated from DIRAC.Core.Utilities.LockRing import LockRing -from DIRAC.Resources.LogBackends.AbstractBackend import AbstractBackend class Logging(object): """ - - Logging is a wrapper of the logger object from the standard "logging" library which integrates some DIRAC concepts. - It aimed at seamlessly replacing the previous gLogger implementation and thus provides the same interface. - Logging is generally used to create log records, that are then sent to pre-determined backends. @@ -453,4 +450,3 @@ def getSubLogger(self, subName, child=True): return childLogging finally: self._lockInit.release() - \ No newline at end of file diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py index d3f8f807f77..3563aa1d91e 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py @@ -88,8 +88,8 @@ def execute(self): self.matchedTime = self.am_getOption('MatchedTime', self.matchedTime) self.rescheduledTime = self.am_getOption('RescheduledTime', self.rescheduledTime) - self.log.verbose('StalledTime = %s cycles' % (stalledTime)) - self.log.verbose('FailedTime = %s cycles' % (failedTime)) + self.log.verbose('', 'StalledTime = %s cycles' % (stalledTime)) + self.log.verbose('', 'FailedTime = %s cycles' % (failedTime)) watchdogCycle = gConfig.getValue(cfgPath(wrapperSection, 'CheckingTime'), 30 * 60) watchdogCycle = max(watchdogCycle, gConfig.getValue(cfgPath(wrapperSection, 'MinCheckingTime'), 20 * 60)) @@ -137,8 +137,8 @@ def _markStalledJobs(self, stalledTime): if not result['Value']: return S_OK() jobs = sorted(result['Value']) - self.log.info('%d %s jobs will be checked for being stalled, heartbeat before %s' % - (len(jobs), ' & '.join(checkedStatuses), str(checkTime))) + self.log.info('%s jobs will be checked for being stalled' % ' & '.join(checkedStatuses), + '(n=%d, heartbeat before %s)' % (len(jobs), str(checkTime))) for job in jobs: delayTime = stalledTime @@ -149,14 +149,14 @@ def _markStalledJobs(self, stalledTime): # Check if the job is really stalled result = self.__checkJobStalled(job, delayTime) if result['OK']: - self.log.verbose('Updating status to Stalled for job %s' % (job)) + self.log.verbose('Updating status to Stalled', 'for job %s' % (job)) self.__updateJobStatus(job, JobStatus.STALLED) stalledCounter += 1 else: self.log.verbose(result['Message']) aliveCounter += 1 - self.log.info('Total jobs: %d, Stalled jobs: %d, %s jobs: %d' % + self.log.info('', 'Total jobs: %d, Stalled jobs: %d, %s jobs: %d' % (len(jobs), stalledCounter, '+'.join(checkedStatuses), aliveCounter)) return S_OK() @@ -174,7 +174,7 @@ def _failStalledJobs(self, failedTime): 
minorStalledStatuses = (JobMinorStatus.STALLED_PILOT_NOT_RUNNING, 'Stalling for more than %d sec' % failedTime) if jobs: - self.log.info('%d jobs Stalled will be checked for failure' % len(jobs)) + self.log.info('Jobs Stalled will be checked for failure', '(n=%d)' % len(jobs)) for job in jobs: setFailed = False @@ -213,7 +213,7 @@ def _failStalledJobs(self, failedTime): return result if result['Value']: jobs = result['Value'] - self.log.info('%s Stalled jobs will be Accounted' % (len(jobs))) + self.log.info('Stalled jobs will be Accounted', '(n=%d)' % (len(jobs))) for job in jobs: result = self.__sendAccounting(job) if not result['OK']: @@ -225,9 +225,9 @@ def _failStalledJobs(self, failedTime): break if failedCounter: - self.log.info('%d jobs set to Failed' % failedCounter) + self.log.info('jobs set to Failed', '(%d)' % failedCounter) if recoverCounter: - self.log.info('%d jobs properly Accounted' % recoverCounter) + self.log.info('jobs properly Accounted', '(%d)' % recoverCounter) return S_OK(failedCounter) ############################################################################# @@ -296,7 +296,7 @@ def __getLatestUpdateTime(self, job): if not latestUpdate: return S_ERROR('LastUpdate and HeartBeat times are null for job %s' % job) else: - self.log.verbose('Latest update time from epoch for job %s is %s' % (job, latestUpdate)) + self.log.verbose('', 'Latest update time from epoch for job %s is %s' % (job, latestUpdate)) return S_OK(latestUpdate) ############################################################################# @@ -304,16 +304,16 @@ def __updateJobStatus(self, job, status, minorStatus=None): """ This method updates the job status in the JobDB, this should only be used to fail jobs due to the optimizer chain. """ - self.log.verbose("self.jobDB.setJobAttribute(%s,'Status','%s',update=True)" % (job, status)) if self.am_getOption('Enable', True): + self.log.debug("self.jobDB.setJobAttribute(%s,'Status','%s',update=True)" % (job, status)) result = self.jobDB.setJobAttribute(job, 'Status', status, update=True) else: result = S_OK('DisabledMode') if result['OK']: if minorStatus: - self.log.verbose("self.jobDB.setJobAttribute(%s,'MinorStatus','%s',update=True)" % (job, minorStatus)) + self.log.debug("self.jobDB.setJobAttribute(%s,'MinorStatus','%s',update=True)" % (job, minorStatus)) result = self.jobDB.setJobAttribute(job, 'MinorStatus', minorStatus, update=True) if not minorStatus: # Retain last minor status for stalled jobs From b9638d8c921520f3fd0c286a4345a7b1b660195a Mon Sep 17 00:00:00 2001 From: fstagni Date: Mon, 29 Mar 2021 17:21:25 +0200 Subject: [PATCH 2/8] Using threads --- .../Agent/StalledJobAgent.py | 261 +++++++++--------- 1 file changed, 135 insertions(+), 126 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py index 3563aa1d91e..09b80576fa9 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py @@ -18,11 +18,14 @@ __RCSID__ = "$Id$" import six +from six.moves.queue import Queue + from DIRAC import S_OK, S_ERROR, gConfig from DIRAC.AccountingSystem.Client.Types.Job import Job from DIRAC.Core.Base.AgentModule import AgentModule from DIRAC.Core.Utilities.Time import fromString, toEpoch, dateTime, second from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd +from DIRAC.Core.Utilities.ThreadPool import ThreadPool from DIRAC.ConfigurationSystem.Client.Helpers import cfgPath from 
DIRAC.ConfigurationSystem.Client.PathFinder import getSystemInstance from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient @@ -55,6 +58,7 @@ def __init__(self, *args, **kwargs): self.rescheduledTime = 600 self.submittingTime = 300 self.stalledJobsTolerantSites = [] + self.jobsQueue = Queue() ############################################################################# def initialize(self): @@ -65,6 +69,15 @@ def initialize(self): self.am_setOption('PollingTime', 60 * 60) if not self.am_getOption('Enable', True): self.log.info('Stalled Job Agent running in disabled mode') + + # setting up the threading + maxNumberOfThreads = self.am_getOption('maxNumberOfThreads', 15) + threadPool = ThreadPool(maxNumberOfThreads, maxNumberOfThreads) + self.log.verbose("Multithreaded with %d threads" % maxNumberOfThreads) + + for i in range(maxNumberOfThreads): + threadPool.generateJobAndQueueIt(self._execute) + return S_OK() ############################################################################# @@ -72,8 +85,9 @@ def execute(self): """ The main agent execution method """ - self.log.verbose('Waking up Stalled Job Agent') + self.log.debug('Waking up Stalled Job Agent') + # getting parameters wms_instance = getSystemInstance('WorkloadManagement') if not wms_instance: return S_ERROR('Can not get the WorkloadManagement system instance') @@ -95,142 +109,130 @@ def execute(self): watchdogCycle = max(watchdogCycle, gConfig.getValue(cfgPath(wrapperSection, 'MinCheckingTime'), 20 * 60)) # Add half cycle to avoid race conditions - stalledTime = int(watchdogCycle * (stalledTime + 0.5)) - failedTime = int(watchdogCycle * (failedTime + 0.5)) + self.stalledTime = int(watchdogCycle * (stalledTime + 0.5)) + self.failedTime = int(watchdogCycle * (failedTime + 0.5)) - result = self._markStalledJobs(stalledTime) - if not result['OK']: - self.log.error('Failed to detect stalled jobs', result['Message']) + self.minorStalledStatuses = ( + JobMinorStatus.STALLED_PILOT_NOT_RUNNING, + 'Stalling for more than %d sec' % self.failedTime) - # Note, jobs will be revived automatically during the heartbeat signal phase and - # subsequent status changes will result in jobs not being selected by the - # stalled job agent. 
+ # Now we are getting what's going to be checked - result = self._failStalledJobs(failedTime) + # 1) For marking the jobs stalled + # This is the minimum time we wait for declaring a job Stalled, therefore it is safe + checkTime = dateTime() - stalledTime * second + checkedStatuses = [JobStatus.RUNNING, JobStatus.COMPLETING] + # Only get jobs whose HeartBeat is older than the stalledTime + result = self.jobDB.selectJobs({'Status': checkedStatuses}, + older=checkTime, timeStamp='HeartBeatTime') + if not result['OK']: + return result + if result['Value']: + jobs = sorted(result['Value']) + self.log.info('%s jobs will be checked for being stalled' % ' & '.join(checkedStatuses), + '(n=%d, heartbeat before %s)' % (len(jobs), str(checkTime))) + for job in jobs: + self.jobsQueue.put('%s:_markStalledJobs' % job) + + # 2) For marking the Stalled jobs to Failed + result = self.jobDB.selectJobs({'Status': JobStatus.STALLED}) if not result['OK']: - self.log.error('Failed to process stalled jobs', result['Message']) + return result + if result['Value']: + jobs = sorted(result['Value']) + self.log.info('Jobs Stalled will be checked for failure', '(n=%d)' % len(jobs)) + for job in jobs: + self.jobsQueue.put('%s:_failStalledJobs' % job) + + # 3) Send accounting + for minor in self.minorStalledStatuses: + result = self.jobDB.selectJobs({'Status': JobStatus.FAILED, 'MinorStatus': minor, 'AccountedFlag': 'False'}) + if not result['OK']: + return result + if result['Value']: + jobs = result['Value'] + self.log.info('Stalled jobs will be Accounted', '(n=%d)' % (len(jobs))) + for job in jobs: + self.jobsQueue.put('%s:__sendAccounting' % job) + # 4) Fail submitting jobs result = self._failSubmittingJobs() if not result['OK']: self.log.error('Failed to process jobs being submitted', result['Message']) + # 5) Kick stuck jobs result = self._kickStuckJobs() if not result['OK']: self.log.error('Failed to kick stuck jobs', result['Message']) return S_OK('Stalled Job Agent cycle complete') - ############################################################################# - def _markStalledJobs(self, stalledTime): - """ Identifies stalled jobs running or completing without update longer than stalledTime. + def _execute(self): """ - stalledCounter = 0 - aliveCounter = 0 - # This is the minimum time we wait for declaring a job Stalled, therefore it is safe - checkTime = dateTime() - stalledTime * second - checkedStatuses = [JobStatus.RUNNING, JobStatus.COMPLETING] - # Only get jobs whose HeartBeat is older than the stalledTime - result = self.jobDB.selectJobs({'Status': checkedStatuses}, - older=checkTime, timeStamp='HeartBeatTime') - if not result['OK']: - return result - if not result['Value']: - return S_OK() - jobs = sorted(result['Value']) - self.log.info('%s jobs will be checked for being stalled' % ' & '.join(checkedStatuses), - '(n=%d, heartbeat before %s)' % (len(jobs), str(checkTime))) - - for job in jobs: - delayTime = stalledTime - # Add a tolerance time for some sites if required - site = self.jobDB.getJobAttribute(job, 'site')['Value'] - if site in self.stalledJobsTolerantSites: - delayTime += self.stalledJobsToleranceTime - # Check if the job is really stalled - result = self.__checkJobStalled(job, delayTime) - if result['OK']: - self.log.verbose('Updating status to Stalled', 'for job %s' % (job)) - self.__updateJobStatus(job, JobStatus.STALLED) - stalledCounter += 1 - else: - self.log.verbose(result['Message']) - aliveCounter += 1 + Doing the actual job. 
This is run inside the threads + """ + while True: + job_Op = self.jobsQueue.get() + jobID, jobOp = job_Op.split(':') + jobID = int(jobID) + res = getattr(self, '%s' % jobOp)(jobID) + if not res['OK']: + self.log.error("Failure executing %s" % jobOp, "on %d" % jobID) - self.log.info('', 'Total jobs: %d, Stalled jobs: %d, %s jobs: %d' % - (len(jobs), stalledCounter, '+'.join(checkedStatuses), aliveCounter)) + ############################################################################# + def _markStalledJobs(self, jobID): + """ + Identifies if JobID is stalled: + running or completing without update longer than stalledTime. + """ + delayTime = self.stalledTime + # Add a tolerance time for some sites if required + site = self.jobDB.getJobAttribute(jobID, 'site')['Value'] + if site in self.stalledJobsTolerantSites: + delayTime += self.stalledJobsToleranceTime + # Check if the job is really stalled + result = self.__checkJobStalled(jobID, delayTime) + if result['OK']: + self.log.verbose('Updating status to Stalled', 'for job %s' % (jobID)) + self.__updateJobStatus(jobID, JobStatus.STALLED) + else: + self.log.error(result['Message']) return S_OK() ############################################################################# - def _failStalledJobs(self, failedTime): + def _failStalledJobs(self, jobID): """ Changes the Stalled status to Failed for jobs long in the Stalled status """ - # Only get jobs that have been Stalled for long enough - result = self.jobDB.selectJobs({'Status': JobStatus.STALLED}) + + setFailed = False + # Check if the job pilot is lost + result = self.__getJobPilotStatus(jobID) if not result['OK']: + self.log.error('Failed to get pilot status', + "for job %d: %s" % (jobID, result['Message'])) return result - jobs = result['Value'] - - failedCounter = 0 - minorStalledStatuses = (JobMinorStatus.STALLED_PILOT_NOT_RUNNING, 'Stalling for more than %d sec' % failedTime) - - if jobs: - self.log.info('Jobs Stalled will be checked for failure', '(n=%d)' % len(jobs)) - - for job in jobs: - setFailed = False - # Check if the job pilot is lost - result = self.__getJobPilotStatus(job) - if not result['OK']: - self.log.error('Failed to get pilot status', result['Message']) - continue - pilotStatus = result['Value'] - if pilotStatus != "Running": - setFailed = minorStalledStatuses[0] - else: - # Verify that there was no sign of life for long enough - result = self.__getLatestUpdateTime(job) - if not result['OK']: - self.log.error('Failed to get job update time', result['Message']) - continue - elapsedTime = toEpoch() - result['Value'] - if elapsedTime > failedTime: - setFailed = minorStalledStatuses[1] - - # Set the jobs Failed, send them a kill signal in case they are not really dead and send accounting info - if setFailed: - self.__sendKillCommand(job) - self.__updateJobStatus(job, JobStatus.FAILED, minorStatus=setFailed) - failedCounter += 1 - result = self.__sendAccounting(job) - if not result['OK']: - self.log.error('Failed to send accounting', result['Message']) - - recoverCounter = 0 - - for minor in minorStalledStatuses: - result = self.jobDB.selectJobs({'Status': JobStatus.FAILED, 'MinorStatus': minor, 'AccountedFlag': 'False'}) + pilotStatus = result['Value'] + if pilotStatus != "Running": + setFailed = self.minorStalledStatuses[0] + else: + # Verify that there was no sign of life for long enough + result = self.__getLatestUpdateTime(jobID) if not result['OK']: + self.log.error('Failed to get job update time', + "for job %d: %s" % (jobID, result['Message'])) return result - if 
result['Value']: - jobs = result['Value'] - self.log.info('Stalled jobs will be Accounted', '(n=%d)' % (len(jobs))) - for job in jobs: - result = self.__sendAccounting(job) - if not result['OK']: - self.log.error('Failed to send accounting', result['Message']) - continue + elapsedTime = toEpoch() - result['Value'] + if elapsedTime > self.failedTime: + setFailed = self.minorStalledStatuses[1] - recoverCounter += 1 - if not result['OK']: - break + # Set the jobs Failed, send them a kill signal in case they are not really dead + # and send accounting info + if setFailed: + self.__sendKillCommand(jobID) + self.__updateJobStatus(jobID, JobStatus.FAILED, minorStatus=setFailed) - if failedCounter: - self.log.info('jobs set to Failed', '(%d)' % failedCounter) - if recoverCounter: - self.log.info('jobs properly Accounted', '(%d)' % recoverCounter) - return S_OK(failedCounter) + return S_OK() - ############################################################################# def __getJobPilotStatus(self, jobID): """ Get the job pilot status """ @@ -264,9 +266,10 @@ def __checkJobStalled(self, job, stalledTime): return result elapsedTime = toEpoch() - result['Value'] - self.log.verbose('(CurrentTime-LastUpdate) = %s secs' % (elapsedTime)) + self.log.debug('(CurrentTime-LastUpdate) = %s secs' % (elapsedTime)) if elapsedTime > stalledTime: - self.log.info('Job %s is identified as stalled with last update > %s secs ago' % (job, elapsedTime)) + self.log.info('Job is identified as stalled', + ": jobID %d with last update > %s secs ago" % (job, elapsedTime)) return S_OK('Stalled') return S_ERROR('Job %s is running and will be ignored' % job) @@ -284,12 +287,12 @@ def __getLatestUpdateTime(self, job): latestUpdate = 0 if not result['Value']['HeartBeatTime'] or result['Value']['HeartBeatTime'] == 'None': - self.log.verbose('HeartBeatTime is null for job %s' % job) + self.log.verbose('HeartBeatTime is null', 'for job %s' % job) else: latestUpdate = toEpoch(fromString(result['Value']['HeartBeatTime'])) if not result['Value']['LastUpdateTime'] or result['Value']['LastUpdateTime'] == 'None': - self.log.verbose('LastUpdateTime is null for job %s' % job) + self.log.verbose('LastUpdateTime is null', 'for job %s' % job) else: latestUpdate = max(latestUpdate, toEpoch(fromString(result['Value']['LastUpdateTime']))) @@ -301,30 +304,36 @@ def __getLatestUpdateTime(self, job): ############################################################################# def __updateJobStatus(self, job, status, minorStatus=None): - """ This method updates the job status in the JobDB, this should only be - used to fail jobs due to the optimizer chain. 
+ """ This method updates the job status in the JobDB """ - if self.am_getOption('Enable', True): - self.log.debug("self.jobDB.setJobAttribute(%s,'Status','%s',update=True)" % (job, status)) - result = self.jobDB.setJobAttribute(job, 'Status', status, update=True) - else: - result = S_OK('DisabledMode') + if not self.am_getOption('Enable', True): + return S_OK('Disabled') - if result['OK']: - if minorStatus: - self.log.debug("self.jobDB.setJobAttribute(%s,'MinorStatus','%s',update=True)" % (job, minorStatus)) - result = self.jobDB.setJobAttribute(job, 'MinorStatus', minorStatus, update=True) + self.log.debug("self.jobDB.setJobAttribute(%s,'Status','%s',update=True)" % (job, status)) + result = self.jobDB.setJobAttribute(job, 'Status', status, update=True) + if not result['OK']: + self.log.error("Failed setting Status", + "%s for job %d: %s" % (status, job, result['Message'])) + if minorStatus: + self.log.debug("self.jobDB.setJobAttribute(%s,'MinorStatus','%s',update=True)" % (job, minorStatus)) + result = self.jobDB.setJobAttribute(job, 'MinorStatus', minorStatus, update=True) + if not result['OK']: + self.log.error("Failed setting MinorStatus", + "%s for job %d: %s" % (minorStatus, job, result['Message'])) if not minorStatus: # Retain last minor status for stalled jobs result = self.jobDB.getJobAttributes(job, ['MinorStatus']) if result['OK']: minorStatus = result['Value']['MinorStatus'] + else: + self.log.error("Failed getting MinorStatus", + "for job %d: %s" % (job, result['Message'])) + minorStatus = 'Unknown' - logStatus = status - result = self.logDB.addLoggingRecord(job, status=logStatus, minorStatus=minorStatus, source='StalledJobAgent') + result = self.logDB.addLoggingRecord(job, status=status, minorStatus=minorStatus, source='StalledJobAgent') if not result['OK']: - self.log.warn(result) + self.log.warn(result['Message']) return result From 4337b3aabb41d363c63de7208d469c8378a8d025 Mon Sep 17 00:00:00 2001 From: fstagni Date: Mon, 29 Mar 2021 17:55:43 +0200 Subject: [PATCH 3/8] fixes for tests --- .../WorkloadManagementSystem/Agent/StalledJobAgent.py | 9 ++++++--- .../Agent/test/Test_Agent_StalledJobAgent.py | 9 +++++---- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py index 09b80576fa9..21cf27a671c 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py @@ -50,7 +50,7 @@ class StalledJobAgent(AgentModule): def __init__(self, *args, **kwargs): """ c'tor """ - AgentModule.__init__(self, *args, **kwargs) + super(StalledJobAgent, self).__init__(*args, **kwargs) self.jobDB = None self.logDB = None @@ -120,7 +120,7 @@ def execute(self): # 1) For marking the jobs stalled # This is the minimum time we wait for declaring a job Stalled, therefore it is safe - checkTime = dateTime() - stalledTime * second + checkTime = dateTime() - self.stalledTime * second checkedStatuses = [JobStatus.RUNNING, JobStatus.COMPLETING] # Only get jobs whose HeartBeat is older than the stalledTime result = self.jobDB.selectJobs({'Status': checkedStatuses}, @@ -187,7 +187,10 @@ def _markStalledJobs(self, jobID): """ delayTime = self.stalledTime # Add a tolerance time for some sites if required - site = self.jobDB.getJobAttribute(jobID, 'site')['Value'] + result = self.jobDB.getJobAttribute(jobID, 'site') + if not result['OK']: + return result + site = result['Value'] if site in 
self.stalledJobsTolerantSites: delayTime += self.stalledJobsToleranceTime # Check if the job is really stalled diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py index b21c8c7d308..249f719b928 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py @@ -33,9 +33,9 @@ def test__failSubmittingJobs(mocker): stalledJobAgent = StalledJobAgent() stalledJobAgent._AgentModule__configDefaults = mockAM + stalledJobAgent.log = gLogger stalledJobAgent.initialize() stalledJobAgent.jobDB.log = gLogger - stalledJobAgent.log = gLogger stalledJobAgent.log.setLevel('DEBUG') result = stalledJobAgent._failSubmittingJobs() @@ -57,9 +57,9 @@ def test__kickStuckJobs(mocker): stalledJobAgent = StalledJobAgent() stalledJobAgent._AgentModule__configDefaults = mockAM + stalledJobAgent.log = gLogger stalledJobAgent.initialize() stalledJobAgent.jobDB.log = gLogger - stalledJobAgent.log = gLogger stalledJobAgent.log.setLevel('DEBUG') result = stalledJobAgent._kickStuckJobs() @@ -81,9 +81,9 @@ def test__failStalledJobs(mocker): stalledJobAgent = StalledJobAgent() stalledJobAgent._AgentModule__configDefaults = mockAM + stalledJobAgent.log = gLogger stalledJobAgent.initialize() stalledJobAgent.jobDB.log = gLogger - stalledJobAgent.log = gLogger stalledJobAgent.log.setLevel('DEBUG') result = stalledJobAgent._failStalledJobs(0) @@ -105,10 +105,11 @@ def test__markStalledJobs(mocker): stalledJobAgent = StalledJobAgent() stalledJobAgent._AgentModule__configDefaults = mockAM + stalledJobAgent.log = gLogger stalledJobAgent.initialize() stalledJobAgent.jobDB.log = gLogger - stalledJobAgent.log = gLogger stalledJobAgent.log.setLevel('DEBUG') + stalledJobAgent.stalledTime = 120 result = stalledJobAgent._markStalledJobs(0) From c53411494b7b9758a350360bcb610ad862f89b2a Mon Sep 17 00:00:00 2001 From: fstagni Date: Mon, 29 Mar 2021 18:14:47 +0200 Subject: [PATCH 4/8] using threading is_alive instead of deprecated isAlive --- src/DIRAC/AccountingSystem/Client/DataStoreClient.py | 2 +- src/DIRAC/Core/DISET/private/MessageBroker.py | 2 +- src/DIRAC/Core/Utilities/MySQL.py | 2 +- src/DIRAC/ResourceStatusSystem/Utilities/RSSCache.py | 2 +- .../WorkloadManagementSystem/Agent/StalledJobAgent.py | 9 +++++---- .../WorkloadManagementSystem/JobWrapper/JobWrapper.py | 6 +++--- .../WorkloadManagementSystem/JobWrapper/Watchdog.py | 2 +- 7 files changed, 13 insertions(+), 12 deletions(-) diff --git a/src/DIRAC/AccountingSystem/Client/DataStoreClient.py b/src/DIRAC/AccountingSystem/Client/DataStoreClient.py index 2789330b8e4..d2c1ed1660d 100644 --- a/src/DIRAC/AccountingSystem/Client/DataStoreClient.py +++ b/src/DIRAC/AccountingSystem/Client/DataStoreClient.py @@ -125,7 +125,7 @@ def delayedCommit(self): allowing to send more registers at once (reduces overheads).
""" - if not self.__commitTimer.isAlive(): + if not self.__commitTimer.is_alive(): self.__commitTimer = threading.Timer(5, self.commit) self.__commitTimer.start() diff --git a/src/DIRAC/Core/DISET/private/MessageBroker.py b/src/DIRAC/Core/DISET/private/MessageBroker.py index faa1cddfc24..f8152bbda20 100644 --- a/src/DIRAC/Core/DISET/private/MessageBroker.py +++ b/src/DIRAC/Core/DISET/private/MessageBroker.py @@ -124,7 +124,7 @@ def listenToTransport(self, trid, listen=True): # Listen to connections def __startListeningThread(self): - threadDead = self.__listeningForMessages and self.__listenThread is not None and not self.__listenThread.isAlive() + threadDead = self.__listeningForMessages and self.__listenThread is not None and not self.__listenThread.is_alive() if not self.__listeningForMessages or threadDead: self.__listeningForMessages = True self.__listenThread = threading.Thread(target=self.__listenAutoReceiveConnections) diff --git a/src/DIRAC/Core/Utilities/MySQL.py b/src/DIRAC/Core/Utilities/MySQL.py index 171f66058db..a5a4f36419c 100755 --- a/src/DIRAC/Core/Utilities/MySQL.py +++ b/src/DIRAC/Core/Utilities/MySQL.py @@ -333,7 +333,7 @@ def clean(self, now=False): now = time.time() self.__lastClean = now for thid in list(self.__assigned): - if not thid.isAlive(): + if not thid.is_alive(): self.__pop(thid) try: data = self.__assigned[thid] diff --git a/src/DIRAC/ResourceStatusSystem/Utilities/RSSCache.py b/src/DIRAC/ResourceStatusSystem/Utilities/RSSCache.py index 0bf65d5f237..703474b2fba 100644 --- a/src/DIRAC/ResourceStatusSystem/Utilities/RSSCache.py +++ b/src/DIRAC/ResourceStatusSystem/Utilities/RSSCache.py @@ -59,7 +59,7 @@ def isCacheAlive(self): ''' Returns status of the cache refreshing thread ''' - return S_OK(self.__refreshThread.isAlive()) + return S_OK(self.__refreshThread.is_alive()) def setLifeTime(self, lifeTime): ''' diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py index 21cf27a671c..0fe2a2ffbb3 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py @@ -75,7 +75,7 @@ def initialize(self): threadPool = ThreadPool(maxNumberOfThreads, maxNumberOfThreads) self.log.verbose("Multithreaded with %d threads" % maxNumberOfThreads) - for i in range(maxNumberOfThreads): + for _ in range(maxNumberOfThreads): threadPool.generateJobAndQueueIt(self._execute) return S_OK() @@ -84,7 +84,6 @@ def initialize(self): def execute(self): """ The main agent execution method """ - self.log.debug('Waking up Stalled Job Agent') # getting parameters @@ -118,7 +117,7 @@ def execute(self): # Now we are getting what's going to be checked - # 1) For marking the jobs stalled + # 1) Queueing the jobs that might be marked Stalled # This is the minimum time we wait for declaring a job Stalled, therefore it is safe checkTime = dateTime() - self.stalledTime * second checkedStatuses = [JobStatus.RUNNING, JobStatus.COMPLETING] @@ -134,7 +133,7 @@ def execute(self): for job in jobs: self.jobsQueue.put('%s:_markStalledJobs' % job) - # 2) For marking the Stalled jobs to Failed + # 2) Queueing the Stalled jobs that might be marked Failed result = self.jobDB.selectJobs({'Status': JobStatus.STALLED}) if not result['OK']: return result @@ -155,6 +154,8 @@ def execute(self): for job in jobs: self.jobsQueue.put('%s:__sendAccounting' % job) + # From here on we don't use the threads + # 4) Fail submitting jobs result = self._failSubmittingJobs() if not 
result['OK']: diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py index 808707d17f5..067e0bd02e2 100755 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py @@ -416,15 +416,15 @@ def execute(self): if 'DisableCPUCheck' in self.jobArgs: watchdog.testCPUConsumed = False - if exeThread.isAlive(): + if exeThread.is_alive(): self.log.info('Application thread is started in Job Wrapper') watchdog.run() else: self.log.warn('Application thread stopped very quickly...') - if exeThread.isAlive(): + if exeThread.is_alive(): self.log.warn('Watchdog exited before completion of execution thread') - while exeThread.isAlive(): + while exeThread.is_alive(): time.sleep(5) outputs = None diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py index b6b3b6eba57..635e63aab01 100755 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py @@ -195,7 +195,7 @@ def execute(self): """ The main agent execution method of the Watchdog. """ - if not self.exeThread.isAlive(): + if not self.exeThread.is_alive(): self.__getUsageSummary() self.log.info('Process to monitor has completed, Watchdog will exit.') return S_OK("Ended") From 07ed8c1e9921857aaacf2e6ffa8264b7ada2656a Mon Sep 17 00:00:00 2001 From: fstagni Date: Fri, 26 Mar 2021 17:56:28 +0100 Subject: [PATCH 5/8] get info from ConfigTemplate.cfg --- src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg | 1 + 1 file changed, 1 insertion(+) diff --git a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg index 94b67dddbe3..db5b9495c1a 100644 --- a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg +++ b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg @@ -142,6 +142,7 @@ Agents StalledTimeHours = 2 FailedTimeHours = 6 PollingTime = 3600 + maxNumberOfThreads = 15 } ##END ##BEGIN JobCleaningAgent From 9a30ec24f4d7a83e6da24ba691aa5f0fa3df0717 Mon Sep 17 00:00:00 2001 From: fstagni Date: Tue, 30 Mar 2021 10:42:06 +0200 Subject: [PATCH 6/8] using DErrno --- src/DIRAC/Core/Utilities/DErrno.py | 13 ++++++++----- .../WorkloadManagementSystem/DB/PilotAgentsDB.py | 14 +++++++------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/src/DIRAC/Core/Utilities/DErrno.py b/src/DIRAC/Core/Utilities/DErrno.py index 8077e0625ab..feb17203353 100644 --- a/src/DIRAC/Core/Utilities/DErrno.py +++ b/src/DIRAC/Core/Utilities/DErrno.py @@ -54,12 +54,12 @@ # 1200: Framework # 1300: Interfaces # 1400: Config -# 1500: WMS / Workflow -# 1600: DMS/StorageManagement +# 1500: WMS + Workflow +# 1600: DMS + StorageManagement # 1700: RMS -# 1800: Accounting -# 1900: TS -# 2000: Resources and RSS +# 1800: Accounting + Monitoring +# 1900: TS + Production +# 2000: Resources + RSS # ## Generic (10XX) # Python related: 0X @@ -116,6 +116,7 @@ EWMSJDL = 1501 EWMSRESC = 1502 EWMSSUBM = 1503 +EWMSNOPILOT = 1550 # ## DMS/StorageManagement (16XX) EFILESIZE = 1601 @@ -190,6 +191,7 @@ 1501: 'EWMSJDL', 1502: 'EWMSRESC', 1503: 'EWMSSUBM', + 1550: 'EWMSNOPILOT', # DMS/StorageManagement 1601: 'EFILESIZE', 1602: 'EGFAL', @@ -262,6 +264,7 @@ EWMSJDL: "Invalid job description", EWMSRESC: "Job to reschedule", EWMSSUBM: "Job submission error", + EWMSNOPILOT: "No pilots found", # DMS/StorageManagement EFILESIZE: "Bad file size", EGFAL: "Error 
with the gfal call", diff --git a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py index d60593769c5..e95e2121919 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py @@ -28,6 +28,7 @@ from DIRAC import S_OK, S_ERROR from DIRAC.Core.Base.DB import DB import DIRAC.Core.Utilities.Time as Time +from DIRAC.Core.Utilities import DErrno from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getCESiteMapping from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getUsernameForDN, getDNForUsername from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus @@ -300,7 +301,7 @@ def getPilotInfo(self, pilotRef=False, parentId=False, conn=False, paramNames=[] msg += " for PilotJobReference(s): %s" % pilotRef if parentId: msg += " with parent id: %s" % parentId - return S_ERROR(msg) + return S_ERROR(DErrno.EWMSNOPILOT, msg) resDict = {} pilotIDs = [] @@ -375,9 +376,10 @@ def storePilotOutput(self, pilotRef, output, error): e_error = result['Value'] req = "INSERT INTO PilotOutput (PilotID,StdOutput,StdError) VALUES (%d,%s,%s)" % (pilotID, e_output, e_error) result = self._update(req) + if not result['OK']: + return result req = "UPDATE PilotAgents SET OutputReady='True' where PilotID=%d" % pilotID - result = self._update(req) - return result + return self._update(req) ########################################################################################## def getPilotOutput(self, pilotRef): @@ -439,8 +441,7 @@ def setJobForPilot(self, jobID, pilotRef, site=None, updateStatus=True): if not result['OK']: return result req = "INSERT INTO JobToPilotMapping (PilotID,JobID,StartTime) VALUES (%d,%d,UTC_TIMESTAMP())" % (pilotID, jobID) - result = self._update(req) - return result + return self._update(req) else: return S_ERROR('PilotJobReference ' + pilotRef + ' not found') @@ -450,8 +451,7 @@ def setCurrentJobID(self, pilotRef, jobID): """ req = "UPDATE PilotAgents SET CurrentJobID=%d WHERE PilotJobReference='%s'" % (jobID, pilotRef) - result = self._update(req) - return result + return self._update(req) ########################################################################################## def getJobsForPilot(self, pilotID): From 4b9e538947178bd9c00172dbb82a67209df6b15c Mon Sep 17 00:00:00 2001 From: fstagni Date: Tue, 30 Mar 2021 10:44:18 +0200 Subject: [PATCH 7/8] better logging and error management --- .../Agent/StalledJobAgent.py | 80 ++++++++++--------- 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py index 0fe2a2ffbb3..5f692863ae5 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py @@ -23,6 +23,7 @@ from DIRAC import S_OK, S_ERROR, gConfig from DIRAC.AccountingSystem.Client.Types.Job import Job from DIRAC.Core.Base.AgentModule import AgentModule +from DIRAC.Core.Utilities import DErrno from DIRAC.Core.Utilities.Time import fromString, toEpoch, dateTime, second from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd from DIRAC.Core.Utilities.ThreadPool import ThreadPool @@ -37,15 +38,8 @@ class StalledJobAgent(AgentModule): + """ Agent for setting Running jobs Stalled, and Stalled jobs Failed. And a few more. 
""" -The specific agents must provide the following methods: -- initialize() for initial settings -- beginExecution() -- execute() - the main method called in the agent cycle -- endExecution() -- finalize() - the graceful exit of the method, this one is usually used -for the agent restart -""" def __init__(self, *args, **kwargs): """ c'tor @@ -125,7 +119,7 @@ def execute(self): result = self.jobDB.selectJobs({'Status': checkedStatuses}, older=checkTime, timeStamp='HeartBeatTime') if not result['OK']: - return result + self.log.error("Issue selecting %s jobs" % ' & '.join(checkedStatuses), result['Message']) if result['Value']: jobs = sorted(result['Value']) self.log.info('%s jobs will be checked for being stalled' % ' & '.join(checkedStatuses), @@ -136,7 +130,7 @@ def execute(self): # 2) Queueing the Stalled jobs that might be marked Failed result = self.jobDB.selectJobs({'Status': JobStatus.STALLED}) if not result['OK']: - return result + self.log.error("Issue selecting Stalled jobs", result['Message']) if result['Value']: jobs = sorted(result['Value']) self.log.info('Jobs Stalled will be checked for failure', '(n=%d)' % len(jobs)) @@ -147,7 +141,7 @@ def execute(self): for minor in self.minorStalledStatuses: result = self.jobDB.selectJobs({'Status': JobStatus.FAILED, 'MinorStatus': minor, 'AccountedFlag': 'False'}) if not result['OK']: - return result + self.log.error("Issue selecting jobs for accounting", result['Message']) if result['Value']: jobs = result['Value'] self.log.info('Stalled jobs will be Accounted', '(n=%d)' % (len(jobs))) @@ -166,7 +160,7 @@ def execute(self): if not result['OK']: self.log.error('Failed to kick stuck jobs', result['Message']) - return S_OK('Stalled Job Agent cycle complete') + return S_OK() def _execute(self): """ @@ -178,13 +172,16 @@ def _execute(self): jobID = int(jobID) res = getattr(self, '%s' % jobOp)(jobID) if not res['OK']: - self.log.error("Failure executing %s" % jobOp, "on %d" % jobID) + self.log.error("Failure executing %s" % jobOp, + "on %d: %s" % (jobID, res['Message'])) ############################################################################# def _markStalledJobs(self, jobID): """ Identifies if JobID is stalled: running or completing without update longer than stalledTime. + + Run inside thread. """ delayTime = self.stalledTime # Add a tolerance time for some sites if required @@ -196,16 +193,17 @@ def _markStalledJobs(self, jobID): delayTime += self.stalledJobsToleranceTime # Check if the job is really stalled result = self.__checkJobStalled(jobID, delayTime) - if result['OK']: - self.log.verbose('Updating status to Stalled', 'for job %s' % (jobID)) - self.__updateJobStatus(jobID, JobStatus.STALLED) - else: - self.log.error(result['Message']) - return S_OK() + if not result['OK']: + return result + self.log.verbose('Updating status to Stalled', 'for job %s' % (jobID)) + return self.__updateJobStatus(jobID, JobStatus.STALLED) ############################################################################# def _failStalledJobs(self, jobID): - """ Changes the Stalled status to Failed for jobs long in the Stalled status + """ + Changes the Stalled status to Failed for jobs long in the Stalled status. + + Run inside thread. 
""" setFailed = False @@ -232,8 +230,8 @@ def _failStalledJobs(self, jobID): # Set the jobs Failed, send them a kill signal in case they are not really dead # and send accounting info if setFailed: - self.__sendKillCommand(jobID) - self.__updateJobStatus(jobID, JobStatus.FAILED, minorStatus=setFailed) + self.__sendKillCommand(jobID) # always returns None + return self.__updateJobStatus(jobID, JobStatus.FAILED, minorStatus=setFailed) return S_OK() @@ -250,12 +248,12 @@ def __getJobPilotStatus(self, jobID): result = PilotManagerClient().getPilotInfo(pilotReference) if not result['OK']: - if "No pilots found" in result['Message']: - self.log.warn(result['Message']) + if DErrno.cmpError(result, DErrno.EWMSNOPILOT): + self.log.warn("No pilot found", "for job %d: %s" % (jobID, result['Message'])) return S_OK('NoPilot') self.log.error('Failed to get pilot information', - 'for job %d: ' % jobID + result['Message']) - return S_ERROR('Failed to get the pilot status') + 'for job %d: %s' % (jobID, result['Message'])) + return result pilotStatus = result['Value'][pilotReference]['Status'] return S_OK(pilotStatus) @@ -283,10 +281,9 @@ def __getLatestUpdateTime(self, job): """ Returns the most recent of HeartBeatTime and LastUpdateTime """ result = self.jobDB.getJobAttributes(job, ['HeartBeatTime', 'LastUpdateTime']) - if not result['OK']: - self.log.error('Failed to get job attributes', result['Message']) if not result['OK'] or not result['Value']: - self.log.error('Could not get attributes for job', '%s' % job) + self.log.error('Failed to get job attributes', + 'for job %d: %s' % (job, result['Message'] if 'Message' in result else 'empty')) return S_ERROR('Could not get attributes for job') latestUpdate = 0 @@ -314,17 +311,21 @@ def __updateJobStatus(self, job, status, minorStatus=None): if not self.am_getOption('Enable', True): return S_OK('Disabled') + toRet = S_OK() + self.log.debug("self.jobDB.setJobAttribute(%s,'Status','%s',update=True)" % (job, status)) result = self.jobDB.setJobAttribute(job, 'Status', status, update=True) if not result['OK']: self.log.error("Failed setting Status", "%s for job %d: %s" % (status, job, result['Message'])) + toRet = result if minorStatus: self.log.debug("self.jobDB.setJobAttribute(%s,'MinorStatus','%s',update=True)" % (job, minorStatus)) result = self.jobDB.setJobAttribute(job, 'MinorStatus', minorStatus, update=True) - if not result['OK']: - self.log.error("Failed setting MinorStatus", - "%s for job %d: %s" % (minorStatus, job, result['Message'])) + if not result['OK']: + self.log.error("Failed setting MinorStatus", + "%s for job %d: %s" % (minorStatus, job, result['Message'])) + toRet = result if not minorStatus: # Retain last minor status for stalled jobs result = self.jobDB.getJobAttributes(job, ['MinorStatus']) @@ -333,13 +334,15 @@ def __updateJobStatus(self, job, status, minorStatus=None): else: self.log.error("Failed getting MinorStatus", "for job %d: %s" % (job, result['Message'])) - minorStatus = 'Unknown' + minorStatus = 'idem' + toRet = result result = self.logDB.addLoggingRecord(job, status=status, minorStatus=minorStatus, source='StalledJobAgent') if not result['OK']: - self.log.warn(result['Message']) + self.log.warn("Failed adding logging record", result['Message']) + toRet = result - return result + return toRet def __getProcessingType(self, jobID): """ Get the Processing Type from the JDL, until it is promoted to a real Attribute @@ -354,7 +357,10 @@ def __getProcessingType(self, jobID): return processingType def __sendAccounting(self, jobID): - 
""" Send WMS accounting data for the given job + """ + Send WMS accounting data for the given job. + + Run inside thread. """ try: accountingReport = Job() @@ -572,5 +578,3 @@ def __sendKillCommand(self, job): else: self.log.error("Failed to get ownerDN or Group for job:", "%s: %s, %s" % (job, ownerDN.get('Message', ''), ownerGroup.get('Message', ''))) - -# EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF# From 25d07abc746d1fe62fd643550b6abee88fbac717 Mon Sep 17 00:00:00 2001 From: fstagni Date: Tue, 30 Mar 2021 14:04:57 +0200 Subject: [PATCH 8/8] fix naming --- src/DIRAC/Core/Utilities/MySQL.py | 2 +- .../Agent/StalledJobAgent.py | 60 +++++++++---------- .../ConfigTemplate.cfg | 2 +- 3 files changed, 32 insertions(+), 32 deletions(-) diff --git a/src/DIRAC/Core/Utilities/MySQL.py b/src/DIRAC/Core/Utilities/MySQL.py index a5a4f36419c..84c2757d45e 100755 --- a/src/DIRAC/Core/Utilities/MySQL.py +++ b/src/DIRAC/Core/Utilities/MySQL.py @@ -333,7 +333,7 @@ def clean(self, now=False): now = time.time() self.__lastClean = now for thid in list(self.__assigned): - if not thid.is_alive(): + if not thid.is_alive(): self.__pop(thid) try: data = self.__assigned[thid] diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py index 5f692863ae5..ba9ba581bef 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py @@ -65,7 +65,7 @@ def initialize(self): self.log.info('Stalled Job Agent running in disabled mode') # setting up the threading - maxNumberOfThreads = self.am_getOption('maxNumberOfThreads', 15) + maxNumberOfThreads = self.am_getOption('MaxNumberOfThreads', 15) threadPool = ThreadPool(maxNumberOfThreads, maxNumberOfThreads) self.log.verbose("Multithreaded with %d threads" % maxNumberOfThreads) @@ -106,8 +106,8 @@ def execute(self): self.failedTime = int(watchdogCycle * (failedTime + 0.5)) self.minorStalledStatuses = ( - JobMinorStatus.STALLED_PILOT_NOT_RUNNING, - 'Stalling for more than %d sec' % self.failedTime) + JobMinorStatus.STALLED_PILOT_NOT_RUNNING, + 'Stalling for more than %d sec' % self.failedTime) # Now we are getting what's going to be checked @@ -117,15 +117,15 @@ def execute(self): checkedStatuses = [JobStatus.RUNNING, JobStatus.COMPLETING] # Only get jobs whose HeartBeat is older than the stalledTime result = self.jobDB.selectJobs({'Status': checkedStatuses}, - older=checkTime, timeStamp='HeartBeatTime') + older=checkTime, timeStamp='HeartBeatTime') if not result['OK']: self.log.error("Issue selecting %s jobs" % ' & '.join(checkedStatuses), result['Message']) if result['Value']: jobs = sorted(result['Value']) self.log.info('%s jobs will be checked for being stalled' % ' & '.join(checkedStatuses), - '(n=%d, heartbeat before %s)' % (len(jobs), str(checkTime))) + '(n=%d, heartbeat before %s)' % (len(jobs), str(checkTime))) for job in jobs: - self.jobsQueue.put('%s:_markStalledJobs' % job) + self.jobsQueue.put('%s:_markStalledJobs' % job) # 2) Queueing the Stalled jobs that might be marked Failed result = self.jobDB.selectJobs({'Status': JobStatus.STALLED}) @@ -135,18 +135,18 @@ def execute(self): jobs = sorted(result['Value']) self.log.info('Jobs Stalled will be checked for failure', '(n=%d)' % len(jobs)) for job in jobs: - self.jobsQueue.put('%s:_failStalledJobs' % job) + self.jobsQueue.put('%s:_failStalledJobs' % job) # 3) Send accounting for minor in self.minorStalledStatuses: result = 
self.jobDB.selectJobs({'Status': JobStatus.FAILED, 'MinorStatus': minor, 'AccountedFlag': 'False'}) if not result['OK']: - self.log.error("Issue selecting jobs for accounting", result['Message']) + self.log.error("Issue selecting jobs for accounting", result['Message']) if result['Value']: - jobs = result['Value'] - self.log.info('Stalled jobs will be Accounted', '(n=%d)' % (len(jobs))) - for job in jobs: - self.jobsQueue.put('%s:__sendAccounting' % job) + jobs = result['Value'] + self.log.info('Stalled jobs will be Accounted', '(n=%d)' % (len(jobs))) + for job in jobs: + self.jobsQueue.put('%s:__sendAccounting' % job) # From here on we don't use the threads @@ -172,8 +172,8 @@ def _execute(self): jobID = int(jobID) res = getattr(self, '%s' % jobOp)(jobID) if not res['OK']: - self.log.error("Failure executing %s" % jobOp, - "on %d: %s" % (jobID, res['Message'])) + self.log.error("Failure executing %s" % jobOp, + "on %d: %s" % (jobID, res['Message'])) ############################################################################# def _markStalledJobs(self, jobID): @@ -211,7 +211,7 @@ def _failStalledJobs(self, jobID): result = self.__getJobPilotStatus(jobID) if not result['OK']: self.log.error('Failed to get pilot status', - "for job %d: %s" % (jobID, result['Message'])) + "for job %d: %s" % (jobID, result['Message'])) return result pilotStatus = result['Value'] if pilotStatus != "Running": @@ -220,12 +220,12 @@ def _failStalledJobs(self, jobID): # Verify that there was no sign of life for long enough result = self.__getLatestUpdateTime(jobID) if not result['OK']: - self.log.error('Failed to get job update time', - "for job %d: %s" % (jobID, result['Message'])) + self.log.error('Failed to get job update time', + "for job %d: %s" % (jobID, result['Message'])) return result elapsedTime = toEpoch() - result['Value'] if elapsedTime > self.failedTime: - setFailed = self.minorStalledStatuses[1] + setFailed = self.minorStalledStatuses[1] # Set the jobs Failed, send them a kill signal in case they are not really dead # and send accounting info @@ -249,10 +249,10 @@ def __getJobPilotStatus(self, jobID): result = PilotManagerClient().getPilotInfo(pilotReference) if not result['OK']: if DErrno.cmpError(result, DErrno.EWMSNOPILOT): - self.log.warn("No pilot found", "for job %d: %s" % (jobID, result['Message'])) + self.log.warn("No pilot found", "for job %d: %s" % (jobID, result['Message'])) return S_OK('NoPilot') self.log.error('Failed to get pilot information', - 'for job %d: %s' % (jobID, result['Message'])) + 'for job %d: %s' % (jobID, result['Message'])) return result pilotStatus = result['Value'][pilotReference]['Status'] @@ -271,7 +271,7 @@ def __checkJobStalled(self, job, stalledTime): self.log.debug('(CurrentTime-LastUpdate) = %s secs' % (elapsedTime)) if elapsedTime > stalledTime: self.log.info('Job is identified as stalled', - ": jobID %d with last update > %s secs ago" % (job, elapsedTime)) + ": jobID %d with last update > %s secs ago" % (job, elapsedTime)) return S_OK('Stalled') return S_ERROR('Job %s is running and will be ignored' % job) @@ -283,7 +283,7 @@ def __getLatestUpdateTime(self, job): result = self.jobDB.getJobAttributes(job, ['HeartBeatTime', 'LastUpdateTime']) if not result['OK'] or not result['Value']: self.log.error('Failed to get job attributes', - 'for job %d: %s' % (job, result['Message'] if 'Message' in result else 'empty')) + 'for job %d: %s' % (job, result['Message'] if 'Message' in result else 'empty')) return S_ERROR('Could not get attributes for job') latestUpdate = 0 
@@ -317,25 +317,25 @@ def __updateJobStatus(self, job, status, minorStatus=None): result = self.jobDB.setJobAttribute(job, 'Status', status, update=True) if not result['OK']: self.log.error("Failed setting Status", - "%s for job %d: %s" % (status, job, result['Message'])) + "%s for job %d: %s" % (status, job, result['Message'])) toRet = result if minorStatus: self.log.debug("self.jobDB.setJobAttribute(%s,'MinorStatus','%s',update=True)" % (job, minorStatus)) result = self.jobDB.setJobAttribute(job, 'MinorStatus', minorStatus, update=True) if not result['OK']: - self.log.error("Failed setting MinorStatus", - "%s for job %d: %s" % (minorStatus, job, result['Message'])) - toRet = result + self.log.error("Failed setting MinorStatus", + "%s for job %d: %s" % (minorStatus, job, result['Message'])) + toRet = result if not minorStatus: # Retain last minor status for stalled jobs result = self.jobDB.getJobAttributes(job, ['MinorStatus']) if result['OK']: minorStatus = result['Value']['MinorStatus'] else: - self.log.error("Failed getting MinorStatus", - "for job %d: %s" % (job, result['Message'])) - minorStatus = 'idem' - toRet = result + self.log.error("Failed getting MinorStatus", + "for job %d: %s" % (job, result['Message'])) + minorStatus = 'idem' + toRet = result result = self.logDB.addLoggingRecord(job, status=status, minorStatus=minorStatus, source='StalledJobAgent') if not result['OK']: diff --git a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg index db5b9495c1a..70279363218 100644 --- a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg +++ b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg @@ -142,7 +142,7 @@ Agents StalledTimeHours = 2 FailedTimeHours = 6 PollingTime = 3600 - maxNumberOfThreads = 15 + MaxNumberOfThreads = 15 } ##END ##BEGIN JobCleaningAgent