diff --git a/src/DIRAC/AccountingSystem/Client/DataStoreClient.py b/src/DIRAC/AccountingSystem/Client/DataStoreClient.py index 2789330b8e4..d2c1ed1660d 100644 --- a/src/DIRAC/AccountingSystem/Client/DataStoreClient.py +++ b/src/DIRAC/AccountingSystem/Client/DataStoreClient.py @@ -125,7 +125,7 @@ def delayedCommit(self): allowing to send more registers at once (reduces overheads). """ - if not self.__commitTimer.isAlive(): + if not self.__commitTimer.is_alive(): self.__commitTimer = threading.Timer(5, self.commit) self.__commitTimer.start() diff --git a/src/DIRAC/Core/DISET/private/MessageBroker.py b/src/DIRAC/Core/DISET/private/MessageBroker.py index faa1cddfc24..f8152bbda20 100644 --- a/src/DIRAC/Core/DISET/private/MessageBroker.py +++ b/src/DIRAC/Core/DISET/private/MessageBroker.py @@ -124,7 +124,7 @@ def listenToTransport(self, trid, listen=True): # Listen to connections def __startListeningThread(self): - threadDead = self.__listeningForMessages and self.__listenThread is not None and not self.__listenThread.isAlive() + threadDead = self.__listeningForMessages and self.__listenThread is not None and not self.__listenThread.is_alive() if not self.__listeningForMessages or threadDead: self.__listeningForMessages = True self.__listenThread = threading.Thread(target=self.__listenAutoReceiveConnections) diff --git a/src/DIRAC/Core/Utilities/DErrno.py b/src/DIRAC/Core/Utilities/DErrno.py index 8077e0625ab..feb17203353 100644 --- a/src/DIRAC/Core/Utilities/DErrno.py +++ b/src/DIRAC/Core/Utilities/DErrno.py @@ -54,12 +54,12 @@ # 1200: Framework # 1300: Interfaces # 1400: Config -# 1500: WMS / Workflow -# 1600: DMS/StorageManagement +# 1500: WMS + Workflow +# 1600: DMS + StorageManagement # 1700: RMS -# 1800: Accounting -# 1900: TS -# 2000: Resources and RSS +# 1800: Accounting + Monitoring +# 1900: TS + Production +# 2000: Resources + RSS # ## Generic (10XX) # Python related: 0X @@ -116,6 +116,7 @@ EWMSJDL = 1501 EWMSRESC = 1502 EWMSSUBM = 1503 +EWMSNOPILOT = 1550 # ## DMS/StorageManagement (16XX) EFILESIZE = 1601 @@ -190,6 +191,7 @@ 1501: 'EWMSJDL', 1502: 'EWMSRESC', 1503: 'EWMSSUBM', + 1550: 'EWMSNOPILOT', # DMS/StorageManagement 1601: 'EFILESIZE', 1602: 'EGFAL', @@ -262,6 +264,7 @@ EWMSJDL: "Invalid job description", EWMSRESC: "Job to reschedule", EWMSSUBM: "Job submission error", + EWMSNOPILOT: "No pilots found", # DMS/StorageManagement EFILESIZE: "Bad file size", EGFAL: "Error with the gfal call", diff --git a/src/DIRAC/Core/Utilities/MySQL.py b/src/DIRAC/Core/Utilities/MySQL.py index 40fb2a5602b..cd830034e82 100755 --- a/src/DIRAC/Core/Utilities/MySQL.py +++ b/src/DIRAC/Core/Utilities/MySQL.py @@ -333,7 +333,7 @@ def clean(self, now=False): now = time.time() self.__lastClean = now for thid in list(self.__assigned): - if not thid.isAlive(): + if not thid.is_alive(): self.__pop(thid) try: data = self.__assigned[thid] diff --git a/src/DIRAC/FrameworkSystem/private/standardLogging/Logging.py b/src/DIRAC/FrameworkSystem/private/standardLogging/Logging.py index 86d1f0d7323..683927b5154 100644 --- a/src/DIRAC/FrameworkSystem/private/standardLogging/Logging.py +++ b/src/DIRAC/FrameworkSystem/private/standardLogging/Logging.py @@ -12,14 +12,11 @@ from DIRAC import S_ERROR from DIRAC.FrameworkSystem.private.standardLogging.LogLevels import LogLevels -from DIRAC.Core.Utilities.Decorators import deprecated from DIRAC.Core.Utilities.LockRing import LockRing -from DIRAC.Resources.LogBackends.AbstractBackend import AbstractBackend class Logging(object): """ - - Logging is a wrapper of the logger object from the standard "logging" library which integrates some DIRAC concepts. - It aimed at seamlessly replacing the previous gLogger implementation and thus provides the same interface. - Logging is generally used to create log records, that are then sent to pre-determined backends. @@ -453,4 +450,3 @@ def getSubLogger(self, subName, child=True): return childLogging finally: self._lockInit.release() - \ No newline at end of file diff --git a/src/DIRAC/ResourceStatusSystem/Utilities/RSSCache.py b/src/DIRAC/ResourceStatusSystem/Utilities/RSSCache.py index 0bf65d5f237..703474b2fba 100644 --- a/src/DIRAC/ResourceStatusSystem/Utilities/RSSCache.py +++ b/src/DIRAC/ResourceStatusSystem/Utilities/RSSCache.py @@ -59,7 +59,7 @@ def isCacheAlive(self): ''' Returns status of the cache refreshing thread ''' - return S_OK(self.__refreshThread.isAlive()) + return S_OK(self.__refreshThread.is_alive()) def setLifeTime(self, lifeTime): ''' diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py index d3f8f807f77..ba9ba581bef 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py @@ -18,11 +18,15 @@ __RCSID__ = "$Id$" import six +from six.moves.queue import Queue + from DIRAC import S_OK, S_ERROR, gConfig from DIRAC.AccountingSystem.Client.Types.Job import Job from DIRAC.Core.Base.AgentModule import AgentModule +from DIRAC.Core.Utilities import DErrno from DIRAC.Core.Utilities.Time import fromString, toEpoch, dateTime, second from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd +from DIRAC.Core.Utilities.ThreadPool import ThreadPool from DIRAC.ConfigurationSystem.Client.Helpers import cfgPath from DIRAC.ConfigurationSystem.Client.PathFinder import getSystemInstance from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient @@ -34,20 +38,13 @@ class StalledJobAgent(AgentModule): + """ Agent for setting Running jobs Stalled, and Stalled jobs Failed. And a few more. """ -The specific agents must provide the following methods: -- initialize() for initial settings -- beginExecution() -- execute() - the main method called in the agent cycle -- endExecution() -- finalize() - the graceful exit of the method, this one is usually used -for the agent restart -""" def __init__(self, *args, **kwargs): """ c'tor """ - AgentModule.__init__(self, *args, **kwargs) + super(StalledJobAgent, self).__init__(*args, **kwargs) self.jobDB = None self.logDB = None @@ -55,6 +52,7 @@ def __init__(self, *args, **kwargs): self.rescheduledTime = 600 self.submittingTime = 300 self.stalledJobsTolerantSites = [] + self.jobsQueue = Queue() ############################################################################# def initialize(self): @@ -65,15 +63,24 @@ def initialize(self): self.am_setOption('PollingTime', 60 * 60) if not self.am_getOption('Enable', True): self.log.info('Stalled Job Agent running in disabled mode') + + # setting up the threading + maxNumberOfThreads = self.am_getOption('MaxNumberOfThreads', 15) + threadPool = ThreadPool(maxNumberOfThreads, maxNumberOfThreads) + self.log.verbose("Multithreaded with %d threads" % maxNumberOfThreads) + + for _ in range(maxNumberOfThreads): + threadPool.generateJobAndQueueIt(self._execute) + return S_OK() ############################################################################# def execute(self): """ The main agent execution method """ + self.log.debug('Waking up Stalled Job Agent') - self.log.verbose('Waking up Stalled Job Agent') - + # getting parameters wms_instance = getSystemInstance('WorkloadManagement') if not wms_instance: return S_ERROR('Can not get the WorkloadManagement system instance') @@ -88,149 +95,146 @@ def execute(self): self.matchedTime = self.am_getOption('MatchedTime', self.matchedTime) self.rescheduledTime = self.am_getOption('RescheduledTime', self.rescheduledTime) - self.log.verbose('StalledTime = %s cycles' % (stalledTime)) - self.log.verbose('FailedTime = %s cycles' % (failedTime)) + self.log.verbose('', 'StalledTime = %s cycles' % (stalledTime)) + self.log.verbose('', 'FailedTime = %s cycles' % (failedTime)) watchdogCycle = gConfig.getValue(cfgPath(wrapperSection, 'CheckingTime'), 30 * 60) watchdogCycle = max(watchdogCycle, gConfig.getValue(cfgPath(wrapperSection, 'MinCheckingTime'), 20 * 60)) # Add half cycle to avoid race conditions - stalledTime = int(watchdogCycle * (stalledTime + 0.5)) - failedTime = int(watchdogCycle * (failedTime + 0.5)) + self.stalledTime = int(watchdogCycle * (stalledTime + 0.5)) + self.failedTime = int(watchdogCycle * (failedTime + 0.5)) - result = self._markStalledJobs(stalledTime) - if not result['OK']: - self.log.error('Failed to detect stalled jobs', result['Message']) + self.minorStalledStatuses = ( + JobMinorStatus.STALLED_PILOT_NOT_RUNNING, + 'Stalling for more than %d sec' % self.failedTime) - # Note, jobs will be revived automatically during the heartbeat signal phase and - # subsequent status changes will result in jobs not being selected by the - # stalled job agent. + # Now we are getting what's going to be checked + + # 1) Queueing the jobs that might be marked Stalled + # This is the minimum time we wait for declaring a job Stalled, therefore it is safe + checkTime = dateTime() - self.stalledTime * second + checkedStatuses = [JobStatus.RUNNING, JobStatus.COMPLETING] + # Only get jobs whose HeartBeat is older than the stalledTime + result = self.jobDB.selectJobs({'Status': checkedStatuses}, + older=checkTime, timeStamp='HeartBeatTime') + if not result['OK']: + self.log.error("Issue selecting %s jobs" % ' & '.join(checkedStatuses), result['Message']) + if result['Value']: + jobs = sorted(result['Value']) + self.log.info('%s jobs will be checked for being stalled' % ' & '.join(checkedStatuses), + '(n=%d, heartbeat before %s)' % (len(jobs), str(checkTime))) + for job in jobs: + self.jobsQueue.put('%s:_markStalledJobs' % job) - result = self._failStalledJobs(failedTime) + # 2) Queueing the Stalled jobs that might be marked Failed + result = self.jobDB.selectJobs({'Status': JobStatus.STALLED}) if not result['OK']: - self.log.error('Failed to process stalled jobs', result['Message']) + self.log.error("Issue selecting Stalled jobs", result['Message']) + if result['Value']: + jobs = sorted(result['Value']) + self.log.info('Jobs Stalled will be checked for failure', '(n=%d)' % len(jobs)) + for job in jobs: + self.jobsQueue.put('%s:_failStalledJobs' % job) + # 3) Send accounting + for minor in self.minorStalledStatuses: + result = self.jobDB.selectJobs({'Status': JobStatus.FAILED, 'MinorStatus': minor, 'AccountedFlag': 'False'}) + if not result['OK']: + self.log.error("Issue selecting jobs for accounting", result['Message']) + if result['Value']: + jobs = result['Value'] + self.log.info('Stalled jobs will be Accounted', '(n=%d)' % (len(jobs))) + for job in jobs: + self.jobsQueue.put('%s:__sendAccounting' % job) + + # From here on we don't use the threads + + # 4) Fail submitting jobs result = self._failSubmittingJobs() if not result['OK']: self.log.error('Failed to process jobs being submitted', result['Message']) + # 5) Kick stuck jobs result = self._kickStuckJobs() if not result['OK']: self.log.error('Failed to kick stuck jobs', result['Message']) - return S_OK('Stalled Job Agent cycle complete') + return S_OK() - ############################################################################# - def _markStalledJobs(self, stalledTime): - """ Identifies stalled jobs running or completing without update longer than stalledTime. + def _execute(self): """ - stalledCounter = 0 - aliveCounter = 0 - # This is the minimum time we wait for declaring a job Stalled, therefore it is safe - checkTime = dateTime() - stalledTime * second - checkedStatuses = [JobStatus.RUNNING, JobStatus.COMPLETING] - # Only get jobs whose HeartBeat is older than the stalledTime - result = self.jobDB.selectJobs({'Status': checkedStatuses}, - older=checkTime, timeStamp='HeartBeatTime') - if not result['OK']: - return result - if not result['Value']: - return S_OK() - jobs = sorted(result['Value']) - self.log.info('%d %s jobs will be checked for being stalled, heartbeat before %s' % - (len(jobs), ' & '.join(checkedStatuses), str(checkTime))) - - for job in jobs: - delayTime = stalledTime - # Add a tolerance time for some sites if required - site = self.jobDB.getJobAttribute(job, 'site')['Value'] - if site in self.stalledJobsTolerantSites: - delayTime += self.stalledJobsToleranceTime - # Check if the job is really stalled - result = self.__checkJobStalled(job, delayTime) - if result['OK']: - self.log.verbose('Updating status to Stalled for job %s' % (job)) - self.__updateJobStatus(job, JobStatus.STALLED) - stalledCounter += 1 - else: - self.log.verbose(result['Message']) - aliveCounter += 1 - - self.log.info('Total jobs: %d, Stalled jobs: %d, %s jobs: %d' % - (len(jobs), stalledCounter, '+'.join(checkedStatuses), aliveCounter)) - return S_OK() + Doing the actual job. This is run inside the threads + """ + while True: + job_Op = self.jobsQueue.get() + jobID, jobOp = job_Op.split(':') + jobID = int(jobID) + res = getattr(self, '%s' % jobOp)(jobID) + if not res['OK']: + self.log.error("Failure executing %s" % jobOp, + "on %d: %s" % (jobID, res['Message'])) ############################################################################# - def _failStalledJobs(self, failedTime): - """ Changes the Stalled status to Failed for jobs long in the Stalled status + def _markStalledJobs(self, jobID): """ - # Only get jobs that have been Stalled for long enough - result = self.jobDB.selectJobs({'Status': JobStatus.STALLED}) + Identifies if JobID is stalled: + running or completing without update longer than stalledTime. + + Run inside thread. + """ + delayTime = self.stalledTime + # Add a tolerance time for some sites if required + result = self.jobDB.getJobAttribute(jobID, 'site') + if not result['OK']: + return result + site = result['Value'] + if site in self.stalledJobsTolerantSites: + delayTime += self.stalledJobsToleranceTime + # Check if the job is really stalled + result = self.__checkJobStalled(jobID, delayTime) if not result['OK']: return result - jobs = result['Value'] + self.log.verbose('Updating status to Stalled', 'for job %s' % (jobID)) + return self.__updateJobStatus(jobID, JobStatus.STALLED) - failedCounter = 0 - minorStalledStatuses = (JobMinorStatus.STALLED_PILOT_NOT_RUNNING, 'Stalling for more than %d sec' % failedTime) + ############################################################################# + def _failStalledJobs(self, jobID): + """ + Changes the Stalled status to Failed for jobs long in the Stalled status. - if jobs: - self.log.info('%d jobs Stalled will be checked for failure' % len(jobs)) + Run inside thread. + """ - for job in jobs: - setFailed = False - # Check if the job pilot is lost - result = self.__getJobPilotStatus(job) - if not result['OK']: - self.log.error('Failed to get pilot status', result['Message']) - continue - pilotStatus = result['Value'] - if pilotStatus != "Running": - setFailed = minorStalledStatuses[0] - else: - # Verify that there was no sign of life for long enough - result = self.__getLatestUpdateTime(job) - if not result['OK']: - self.log.error('Failed to get job update time', result['Message']) - continue - elapsedTime = toEpoch() - result['Value'] - if elapsedTime > failedTime: - setFailed = minorStalledStatuses[1] - - # Set the jobs Failed, send them a kill signal in case they are not really dead and send accounting info - if setFailed: - self.__sendKillCommand(job) - self.__updateJobStatus(job, JobStatus.FAILED, minorStatus=setFailed) - failedCounter += 1 - result = self.__sendAccounting(job) - if not result['OK']: - self.log.error('Failed to send accounting', result['Message']) - - recoverCounter = 0 - - for minor in minorStalledStatuses: - result = self.jobDB.selectJobs({'Status': JobStatus.FAILED, 'MinorStatus': minor, 'AccountedFlag': 'False'}) + setFailed = False + # Check if the job pilot is lost + result = self.__getJobPilotStatus(jobID) + if not result['OK']: + self.log.error('Failed to get pilot status', + "for job %d: %s" % (jobID, result['Message'])) + return result + pilotStatus = result['Value'] + if pilotStatus != "Running": + setFailed = self.minorStalledStatuses[0] + else: + # Verify that there was no sign of life for long enough + result = self.__getLatestUpdateTime(jobID) if not result['OK']: + self.log.error('Failed to get job update time', + "for job %d: %s" % (jobID, result['Message'])) return result - if result['Value']: - jobs = result['Value'] - self.log.info('%s Stalled jobs will be Accounted' % (len(jobs))) - for job in jobs: - result = self.__sendAccounting(job) - if not result['OK']: - self.log.error('Failed to send accounting', result['Message']) - continue + elapsedTime = toEpoch() - result['Value'] + if elapsedTime > self.failedTime: + setFailed = self.minorStalledStatuses[1] - recoverCounter += 1 - if not result['OK']: - break + # Set the jobs Failed, send them a kill signal in case they are not really dead + # and send accounting info + if setFailed: + self.__sendKillCommand(jobID) # always returns None + return self.__updateJobStatus(jobID, JobStatus.FAILED, minorStatus=setFailed) - if failedCounter: - self.log.info('%d jobs set to Failed' % failedCounter) - if recoverCounter: - self.log.info('%d jobs properly Accounted' % recoverCounter) - return S_OK(failedCounter) + return S_OK() - ############################################################################# def __getJobPilotStatus(self, jobID): """ Get the job pilot status """ @@ -244,12 +248,12 @@ def __getJobPilotStatus(self, jobID): result = PilotManagerClient().getPilotInfo(pilotReference) if not result['OK']: - if "No pilots found" in result['Message']: - self.log.warn(result['Message']) + if DErrno.cmpError(result, DErrno.EWMSNOPILOT): + self.log.warn("No pilot found", "for job %d: %s" % (jobID, result['Message'])) return S_OK('NoPilot') self.log.error('Failed to get pilot information', - 'for job %d: ' % jobID + result['Message']) - return S_ERROR('Failed to get the pilot status') + 'for job %d: %s' % (jobID, result['Message'])) + return result pilotStatus = result['Value'][pilotReference]['Status'] return S_OK(pilotStatus) @@ -264,9 +268,10 @@ def __checkJobStalled(self, job, stalledTime): return result elapsedTime = toEpoch() - result['Value'] - self.log.verbose('(CurrentTime-LastUpdate) = %s secs' % (elapsedTime)) + self.log.debug('(CurrentTime-LastUpdate) = %s secs' % (elapsedTime)) if elapsedTime > stalledTime: - self.log.info('Job %s is identified as stalled with last update > %s secs ago' % (job, elapsedTime)) + self.log.info('Job is identified as stalled', + ": jobID %d with last update > %s secs ago" % (job, elapsedTime)) return S_OK('Stalled') return S_ERROR('Job %s is running and will be ignored' % job) @@ -276,57 +281,68 @@ def __getLatestUpdateTime(self, job): """ Returns the most recent of HeartBeatTime and LastUpdateTime """ result = self.jobDB.getJobAttributes(job, ['HeartBeatTime', 'LastUpdateTime']) - if not result['OK']: - self.log.error('Failed to get job attributes', result['Message']) if not result['OK'] or not result['Value']: - self.log.error('Could not get attributes for job', '%s' % job) + self.log.error('Failed to get job attributes', + 'for job %d: %s' % (job, result['Message'] if 'Message' in result else 'empty')) return S_ERROR('Could not get attributes for job') latestUpdate = 0 if not result['Value']['HeartBeatTime'] or result['Value']['HeartBeatTime'] == 'None': - self.log.verbose('HeartBeatTime is null for job %s' % job) + self.log.verbose('HeartBeatTime is null', 'for job %s' % job) else: latestUpdate = toEpoch(fromString(result['Value']['HeartBeatTime'])) if not result['Value']['LastUpdateTime'] or result['Value']['LastUpdateTime'] == 'None': - self.log.verbose('LastUpdateTime is null for job %s' % job) + self.log.verbose('LastUpdateTime is null', 'for job %s' % job) else: latestUpdate = max(latestUpdate, toEpoch(fromString(result['Value']['LastUpdateTime']))) if not latestUpdate: return S_ERROR('LastUpdate and HeartBeat times are null for job %s' % job) else: - self.log.verbose('Latest update time from epoch for job %s is %s' % (job, latestUpdate)) + self.log.verbose('', 'Latest update time from epoch for job %s is %s' % (job, latestUpdate)) return S_OK(latestUpdate) ############################################################################# def __updateJobStatus(self, job, status, minorStatus=None): - """ This method updates the job status in the JobDB, this should only be - used to fail jobs due to the optimizer chain. + """ This method updates the job status in the JobDB """ - self.log.verbose("self.jobDB.setJobAttribute(%s,'Status','%s',update=True)" % (job, status)) - if self.am_getOption('Enable', True): - result = self.jobDB.setJobAttribute(job, 'Status', status, update=True) - else: - result = S_OK('DisabledMode') + if not self.am_getOption('Enable', True): + return S_OK('Disabled') - if result['OK']: - if minorStatus: - self.log.verbose("self.jobDB.setJobAttribute(%s,'MinorStatus','%s',update=True)" % (job, minorStatus)) - result = self.jobDB.setJobAttribute(job, 'MinorStatus', minorStatus, update=True) + toRet = S_OK() + + self.log.debug("self.jobDB.setJobAttribute(%s,'Status','%s',update=True)" % (job, status)) + result = self.jobDB.setJobAttribute(job, 'Status', status, update=True) + if not result['OK']: + self.log.error("Failed setting Status", + "%s for job %d: %s" % (status, job, result['Message'])) + toRet = result + if minorStatus: + self.log.debug("self.jobDB.setJobAttribute(%s,'MinorStatus','%s',update=True)" % (job, minorStatus)) + result = self.jobDB.setJobAttribute(job, 'MinorStatus', minorStatus, update=True) + if not result['OK']: + self.log.error("Failed setting MinorStatus", + "%s for job %d: %s" % (minorStatus, job, result['Message'])) + toRet = result if not minorStatus: # Retain last minor status for stalled jobs result = self.jobDB.getJobAttributes(job, ['MinorStatus']) if result['OK']: minorStatus = result['Value']['MinorStatus'] + else: + self.log.error("Failed getting MinorStatus", + "for job %d: %s" % (job, result['Message'])) + minorStatus = 'idem' + toRet = result - logStatus = status - result = self.logDB.addLoggingRecord(job, status=logStatus, minorStatus=minorStatus, source='StalledJobAgent') + result = self.logDB.addLoggingRecord(job, status=status, minorStatus=minorStatus, source='StalledJobAgent') if not result['OK']: - self.log.warn(result) + self.log.warn("Failed adding logging record", result['Message']) + toRet = result - return result + return toRet def __getProcessingType(self, jobID): """ Get the Processing Type from the JDL, until it is promoted to a real Attribute @@ -341,7 +357,10 @@ def __getProcessingType(self, jobID): return processingType def __sendAccounting(self, jobID): - """ Send WMS accounting data for the given job + """ + Send WMS accounting data for the given job. + + Run inside thread. """ try: accountingReport = Job() @@ -559,5 +578,3 @@ def __sendKillCommand(self, job): else: self.log.error("Failed to get ownerDN or Group for job:", "%s: %s, %s" % (job, ownerDN.get('Message', ''), ownerGroup.get('Message', ''))) - -# EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF# diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py index b21c8c7d308..249f719b928 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py @@ -33,9 +33,9 @@ def test__failSubmittingJobs(mocker): stalledJobAgent = StalledJobAgent() stalledJobAgent._AgentModule__configDefaults = mockAM + stalledJobAgent.log = gLogger stalledJobAgent.initialize() stalledJobAgent.jobDB.log = gLogger - stalledJobAgent.log = gLogger stalledJobAgent.log.setLevel('DEBUG') result = stalledJobAgent._failSubmittingJobs() @@ -57,9 +57,9 @@ def test__kickStuckJobs(mocker): stalledJobAgent = StalledJobAgent() stalledJobAgent._AgentModule__configDefaults = mockAM + stalledJobAgent.log = gLogger stalledJobAgent.initialize() stalledJobAgent.jobDB.log = gLogger - stalledJobAgent.log = gLogger stalledJobAgent.log.setLevel('DEBUG') result = stalledJobAgent._kickStuckJobs() @@ -81,9 +81,9 @@ def test__failStalledJobs(mocker): stalledJobAgent = StalledJobAgent() stalledJobAgent._AgentModule__configDefaults = mockAM + stalledJobAgent.log = gLogger stalledJobAgent.initialize() stalledJobAgent.jobDB.log = gLogger - stalledJobAgent.log = gLogger stalledJobAgent.log.setLevel('DEBUG') result = stalledJobAgent._failStalledJobs(0) @@ -105,10 +105,11 @@ def test__markStalledJobs(mocker): stalledJobAgent = StalledJobAgent() stalledJobAgent._AgentModule__configDefaults = mockAM + stalledJobAgent.log = gLogger stalledJobAgent.initialize() stalledJobAgent.jobDB.log = gLogger - stalledJobAgent.log = gLogger stalledJobAgent.log.setLevel('DEBUG') + stalledJobAgent.stalledTime = 120 result = stalledJobAgent._markStalledJobs(0) diff --git a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg index 94b67dddbe3..70279363218 100644 --- a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg +++ b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg @@ -142,6 +142,7 @@ Agents StalledTimeHours = 2 FailedTimeHours = 6 PollingTime = 3600 + MaxNumberOfThreads = 15 } ##END ##BEGIN JobCleaningAgent diff --git a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py index d60593769c5..e95e2121919 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py @@ -28,6 +28,7 @@ from DIRAC import S_OK, S_ERROR from DIRAC.Core.Base.DB import DB import DIRAC.Core.Utilities.Time as Time +from DIRAC.Core.Utilities import DErrno from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getCESiteMapping from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getUsernameForDN, getDNForUsername from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus @@ -300,7 +301,7 @@ def getPilotInfo(self, pilotRef=False, parentId=False, conn=False, paramNames=[] msg += " for PilotJobReference(s): %s" % pilotRef if parentId: msg += " with parent id: %s" % parentId - return S_ERROR(msg) + return S_ERROR(DErrno.EWMSNOPILOT, msg) resDict = {} pilotIDs = [] @@ -375,9 +376,10 @@ def storePilotOutput(self, pilotRef, output, error): e_error = result['Value'] req = "INSERT INTO PilotOutput (PilotID,StdOutput,StdError) VALUES (%d,%s,%s)" % (pilotID, e_output, e_error) result = self._update(req) + if not result['OK']: + return result req = "UPDATE PilotAgents SET OutputReady='True' where PilotID=%d" % pilotID - result = self._update(req) - return result + return self._update(req) ########################################################################################## def getPilotOutput(self, pilotRef): @@ -439,8 +441,7 @@ def setJobForPilot(self, jobID, pilotRef, site=None, updateStatus=True): if not result['OK']: return result req = "INSERT INTO JobToPilotMapping (PilotID,JobID,StartTime) VALUES (%d,%d,UTC_TIMESTAMP())" % (pilotID, jobID) - result = self._update(req) - return result + return self._update(req) else: return S_ERROR('PilotJobReference ' + pilotRef + ' not found') @@ -450,8 +451,7 @@ def setCurrentJobID(self, pilotRef, jobID): """ req = "UPDATE PilotAgents SET CurrentJobID=%d WHERE PilotJobReference='%s'" % (jobID, pilotRef) - result = self._update(req) - return result + return self._update(req) ########################################################################################## def getJobsForPilot(self, pilotID): diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py index 808707d17f5..067e0bd02e2 100755 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py @@ -416,15 +416,15 @@ def execute(self): if 'DisableCPUCheck' in self.jobArgs: watchdog.testCPUConsumed = False - if exeThread.isAlive(): + if exeThread.is_alive(): self.log.info('Application thread is started in Job Wrapper') watchdog.run() else: self.log.warn('Application thread stopped very quickly...') - if exeThread.isAlive(): + if exeThread.is_alive(): self.log.warn('Watchdog exited before completion of execution thread') - while exeThread.isAlive(): + while exeThread.is_alive(): time.sleep(5) outputs = None diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py index b6b3b6eba57..635e63aab01 100755 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/Watchdog.py @@ -195,7 +195,7 @@ def execute(self): """ The main agent execution method of the Watchdog. """ - if not self.exeThread.isAlive(): + if not self.exeThread.is_alive(): self.__getUsageSummary() self.log.info('Process to monitor has completed, Watchdog will exit.') return S_OK("Ended")