Skip to content

Commit

Permalink
fix naming
Browse files: browse the repository at this point in the history
  • Loading branch information
fstagni committed Mar 30, 2021
1 parent 4b9e538 commit 25d07ab
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/DIRAC/Core/Utilities/MySQL.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ def clean(self, now=False):
now = time.time()
self.__lastClean = now
for thid in list(self.__assigned):
if not thid.is_alive():
if not thid.is_alive():
self.__pop(thid)
try:
data = self.__assigned[thid]
Expand Down
60 changes: 30 additions & 30 deletions src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def initialize(self):
self.log.info('Stalled Job Agent running in disabled mode')

# setting up the threading
maxNumberOfThreads = self.am_getOption('maxNumberOfThreads', 15)
maxNumberOfThreads = self.am_getOption('MaxNumberOfThreads', 15)
threadPool = ThreadPool(maxNumberOfThreads, maxNumberOfThreads)
self.log.verbose("Multithreaded with %d threads" % maxNumberOfThreads)

Expand Down Expand Up @@ -106,8 +106,8 @@ def execute(self):
self.failedTime = int(watchdogCycle * (failedTime + 0.5))

self.minorStalledStatuses = (
JobMinorStatus.STALLED_PILOT_NOT_RUNNING,
'Stalling for more than %d sec' % self.failedTime)
JobMinorStatus.STALLED_PILOT_NOT_RUNNING,
'Stalling for more than %d sec' % self.failedTime)

# Now we are getting what's going to be checked

Expand All @@ -117,15 +117,15 @@ def execute(self):
checkedStatuses = [JobStatus.RUNNING, JobStatus.COMPLETING]
# Only get jobs whose HeartBeat is older than the stalledTime
result = self.jobDB.selectJobs({'Status': checkedStatuses},
older=checkTime, timeStamp='HeartBeatTime')
older=checkTime, timeStamp='HeartBeatTime')
if not result['OK']:
self.log.error("Issue selecting %s jobs" % ' & '.join(checkedStatuses), result['Message'])
if result['Value']:
jobs = sorted(result['Value'])
self.log.info('%s jobs will be checked for being stalled' % ' & '.join(checkedStatuses),
'(n=%d, heartbeat before %s)' % (len(jobs), str(checkTime)))
'(n=%d, heartbeat before %s)' % (len(jobs), str(checkTime)))
for job in jobs:
self.jobsQueue.put('%s:_markStalledJobs' % job)
self.jobsQueue.put('%s:_markStalledJobs' % job)

# 2) Queueing the Stalled jobs that might be marked Failed
result = self.jobDB.selectJobs({'Status': JobStatus.STALLED})
Expand All @@ -135,18 +135,18 @@ def execute(self):
jobs = sorted(result['Value'])
self.log.info('Jobs Stalled will be checked for failure', '(n=%d)' % len(jobs))
for job in jobs:
self.jobsQueue.put('%s:_failStalledJobs' % job)
self.jobsQueue.put('%s:_failStalledJobs' % job)

# 3) Send accounting
for minor in self.minorStalledStatuses:
result = self.jobDB.selectJobs({'Status': JobStatus.FAILED, 'MinorStatus': minor, 'AccountedFlag': 'False'})
if not result['OK']:
self.log.error("Issue selecting jobs for accounting", result['Message'])
self.log.error("Issue selecting jobs for accounting", result['Message'])
if result['Value']:
jobs = result['Value']
self.log.info('Stalled jobs will be Accounted', '(n=%d)' % (len(jobs)))
for job in jobs:
self.jobsQueue.put('%s:__sendAccounting' % job)
jobs = result['Value']
self.log.info('Stalled jobs will be Accounted', '(n=%d)' % (len(jobs)))
for job in jobs:
self.jobsQueue.put('%s:__sendAccounting' % job)

# From here on we don't use the threads

Expand All @@ -172,8 +172,8 @@ def _execute(self):
jobID = int(jobID)
res = getattr(self, '%s' % jobOp)(jobID)
if not res['OK']:
self.log.error("Failure executing %s" % jobOp,
"on %d: %s" % (jobID, res['Message']))
self.log.error("Failure executing %s" % jobOp,
"on %d: %s" % (jobID, res['Message']))

#############################################################################
def _markStalledJobs(self, jobID):
Expand Down Expand Up @@ -211,7 +211,7 @@ def _failStalledJobs(self, jobID):
result = self.__getJobPilotStatus(jobID)
if not result['OK']:
self.log.error('Failed to get pilot status',
"for job %d: %s" % (jobID, result['Message']))
"for job %d: %s" % (jobID, result['Message']))
return result
pilotStatus = result['Value']
if pilotStatus != "Running":
Expand All @@ -220,12 +220,12 @@ def _failStalledJobs(self, jobID):
# Verify that there was no sign of life for long enough
result = self.__getLatestUpdateTime(jobID)
if not result['OK']:
self.log.error('Failed to get job update time',
"for job %d: %s" % (jobID, result['Message']))
self.log.error('Failed to get job update time',
"for job %d: %s" % (jobID, result['Message']))
return result
elapsedTime = toEpoch() - result['Value']
if elapsedTime > self.failedTime:
setFailed = self.minorStalledStatuses[1]
setFailed = self.minorStalledStatuses[1]

# Set the jobs Failed, send them a kill signal in case they are not really dead
# and send accounting info
Expand All @@ -249,10 +249,10 @@ def __getJobPilotStatus(self, jobID):
result = PilotManagerClient().getPilotInfo(pilotReference)
if not result['OK']:
if DErrno.cmpError(result, DErrno.EWMSNOPILOT):
self.log.warn("No pilot found", "for job %d: %s" % (jobID, result['Message']))
self.log.warn("No pilot found", "for job %d: %s" % (jobID, result['Message']))
return S_OK('NoPilot')
self.log.error('Failed to get pilot information',
'for job %d: %s' % (jobID, result['Message']))
'for job %d: %s' % (jobID, result['Message']))
return result
pilotStatus = result['Value'][pilotReference]['Status']

Expand All @@ -271,7 +271,7 @@ def __checkJobStalled(self, job, stalledTime):
self.log.debug('(CurrentTime-LastUpdate) = %s secs' % (elapsedTime))
if elapsedTime > stalledTime:
self.log.info('Job is identified as stalled',
": jobID %d with last update > %s secs ago" % (job, elapsedTime))
": jobID %d with last update > %s secs ago" % (job, elapsedTime))
return S_OK('Stalled')

return S_ERROR('Job %s is running and will be ignored' % job)
Expand All @@ -283,7 +283,7 @@ def __getLatestUpdateTime(self, job):
result = self.jobDB.getJobAttributes(job, ['HeartBeatTime', 'LastUpdateTime'])
if not result['OK'] or not result['Value']:
self.log.error('Failed to get job attributes',
'for job %d: %s' % (job, result['Message'] if 'Message' in result else 'empty'))
'for job %d: %s' % (job, result['Message'] if 'Message' in result else 'empty'))
return S_ERROR('Could not get attributes for job')

latestUpdate = 0
Expand Down Expand Up @@ -317,25 +317,25 @@ def __updateJobStatus(self, job, status, minorStatus=None):
result = self.jobDB.setJobAttribute(job, 'Status', status, update=True)
if not result['OK']:
self.log.error("Failed setting Status",
"%s for job %d: %s" % (status, job, result['Message']))
"%s for job %d: %s" % (status, job, result['Message']))
toRet = result
if minorStatus:
self.log.debug("self.jobDB.setJobAttribute(%s,'MinorStatus','%s',update=True)" % (job, minorStatus))
result = self.jobDB.setJobAttribute(job, 'MinorStatus', minorStatus, update=True)
if not result['OK']:
self.log.error("Failed setting MinorStatus",
"%s for job %d: %s" % (minorStatus, job, result['Message']))
toRet = result
self.log.error("Failed setting MinorStatus",
"%s for job %d: %s" % (minorStatus, job, result['Message']))
toRet = result

if not minorStatus: # Retain last minor status for stalled jobs
result = self.jobDB.getJobAttributes(job, ['MinorStatus'])
if result['OK']:
minorStatus = result['Value']['MinorStatus']
else:
self.log.error("Failed getting MinorStatus",
"for job %d: %s" % (job, result['Message']))
minorStatus = 'idem'
toRet = result
self.log.error("Failed getting MinorStatus",
"for job %d: %s" % (job, result['Message']))
minorStatus = 'idem'
toRet = result

result = self.logDB.addLoggingRecord(job, status=status, minorStatus=minorStatus, source='StalledJobAgent')
if not result['OK']:
Expand Down
2 changes: 1 addition & 1 deletion src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ Agents
StalledTimeHours = 2
FailedTimeHours = 6
PollingTime = 3600
maxNumberOfThreads = 15
MaxNumberOfThreads = 15
}
##END
##BEGIN JobCleaningAgent
Expand Down

0 comments on commit 25d07ab

Please sign in to comment.