Merge pull request #5073 from fstagni/v7r2-fixes35
[v7r2] implementing bulk indexing for jobs parameters
Andrei Tsaregorodtsev authored Apr 14, 2021
2 parents 66539d6 + 1a0b0ee commit ac36c3e
Showing 17 changed files with 248 additions and 189 deletions.
82 changes: 44 additions & 38 deletions src/DIRAC/Core/Utilities/ElasticSearchDB.py
@@ -1,7 +1,6 @@
"""
This class a wrapper around elasticsearch-py. It is used to query
Elasticsearch database.
This class is a wrapper around elasticsearch-py.
It is used to query Elasticsearch instances.
"""

from __future__ import absolute_import
@@ -45,6 +44,39 @@ def wrapper_decorator(self, *args, **kwargs):
return wrapper_decorator


def generateDocs(data, withTimeStamp=True):
""" Generator for fast bulk indexing, yields docs
:param list data: list of dictionaries
:param bool withTimeStamp: add the timestamps to the docs
:return: doc
"""
for doc in data:
if "_type" not in doc:
doc['_type'] = "_doc"
if withTimeStamp:
if 'timestamp' not in doc:
sLog.warn("timestamp is not given")

# if the timestamp is not provided, we use the current utc time.
timestamp = doc.get('timestamp', int(Time.toEpoch()))
try:
if isinstance(timestamp, datetime):
doc['timestamp'] = int(timestamp.strftime('%s')) * 1000
elif isinstance(timestamp, six.string_types):
timeobj = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S.%f')
doc['timestamp'] = int(timeobj.strftime('%s')) * 1000
else: # we assume the timestamp is a unix epoch time (integer).
doc['timestamp'] = timestamp * 1000
except (TypeError, ValueError) as e:
# in case we are not able to convert the timestamp to epoch time....
sLog.error("Wrong timestamp", e)
doc['timestamp'] = int(Time.toEpoch()) * 1000

sLog.debug("yielding %s" % doc)
yield doc

class ElasticSearchDB(object):

"""
@@ -55,7 +87,6 @@ class ElasticSearchDB(object):
:param int timeout: the default time out to Elasticsearch
:param int RESULT_SIZE: The number of data points which will be returned by the query.
"""
__chunk_size = 1000
__url = ""
__timeout = 120
clusterName = ''
@@ -314,7 +345,7 @@ def deleteIndex(self, indexName):
sLog.info("Deleting index", indexName)
try:
retVal = self.client.indices.delete(indexName)
except NotFoundError as e:
except NotFoundError:
sLog.warn("Index does not exist", indexName)
return S_OK("Noting to delete")
except ValueError as e:
@@ -356,13 +387,15 @@ def index(self, indexName, body=None, docID=None):
return S_ERROR(res)

@ifConnected
def bulk_index(self, indexPrefix, data=None, mapping=None, period='day'):
def bulk_index(self, indexPrefix, data=None, mapping=None, period='day', withTimeStamp=True):
"""
:param str indexPrefix: index name.
:param list data: contains a list of dictionaries
:param dict mapping: the mapping used by elasticsearch
:param str period: We can specify which kind of indexes will be created.
Currently only daily and monthly indexes are supported.
:param str period: accepts 'day' or 'month'; determines the kind of indexes that will be created
:param bool withTimeStamp: add a timestamp to the data, if not already present
:returns: S_OK/S_ERROR
"""
sLog.verbose("Bulk indexing", "%d records will be inserted" % len(data))
if mapping is None:
@@ -381,45 +414,18 @@ def bulk_index(self, indexPrefix, data=None, mapping=None, period='day'):
retVal = self.createIndex(indexPrefix, mapping, period)
if not retVal['OK']:
return retVal
docs = []
for row in data:
body = {
'_index': indexName,
'_source': {},
'_type': '_doc'
}
body['_source'] = row

if 'timestamp' not in row:
sLog.warn("timestamp is not given! Note: the actual time is used!")

# if the timestamp is not provided, we use the current utc time.
timestamp = row.get('timestamp', int(Time.toEpoch()))
try:
if isinstance(timestamp, datetime):
body['_source']['timestamp'] = int(timestamp.strftime('%s')) * 1000
elif isinstance(timestamp, six.string_types):
timeobj = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S.%f')
body['_source']['timestamp'] = int(timeobj.strftime('%s')) * 1000
else: # we assume the timestamp is an unix epoch time (integer).
body['_source']['timestamp'] = timestamp * 1000
except (TypeError, ValueError) as e:
# in case we are not able to convert the timestamp to epoch time....
sLog.error("Wrong timestamp", e)
body['_source']['timestamp'] = int(Time.toEpoch()) * 1000
docs += [body]
try:
res = bulk(self.client, docs, chunk_size=self.__chunk_size)
res = bulk(client=self.client, index=indexName, actions=generateDocs(data, withTimeStamp))
except (BulkIndexError, RequestError) as e:
sLog.exception()
return S_ERROR(e)

if res[0] == len(docs):
if res[0] == len(data):
# we have inserted all documents...
return S_OK(len(docs))
return S_OK(len(data))
else:
return S_ERROR(res)
return res

@ifConnected
def getUniqueValue(self, indexName, key, orderBy=False):
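For illustration, a minimal sketch of how the new bulk_index()/generateDocs() path can be used (the connection parameters, index prefix, and record fields below are made up; it assumes a reachable Elasticsearch instance and an initialised DIRAC environment):

from DIRAC.Core.Utilities.ElasticSearchDB import ElasticSearchDB

db = ElasticSearchDB(host='localhost', port=9200)  # hypothetical connection parameters

# Each dictionary becomes one document. 'timestamp' may be a datetime, a
# '%Y-%m-%d %H:%M:%S.%f' string, or an epoch integer; generateDocs() normalises
# all three to epoch milliseconds, and falls back to the current UTC time if missing.
records = [{'JobID': 1, 'Status': 'Done', 'timestamp': 1618394400},
           {'JobID': 2, 'Status': 'Failed'}]

res = db.bulk_index('job_parameters', data=records, period='day')
if res['OK']:
    print('%d documents indexed' % res['Value'])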
4 changes: 2 additions & 2 deletions src/DIRAC/FrameworkSystem/Client/ComponentInstaller.py
@@ -582,14 +582,14 @@ def removeComponentOptionsFromCS(self, system, component, mySetup=None):
cType = installation['Component']['Type']

# Is the component a rename of another module?
if installation['Instance'] == installation['Component']['Module']:
if installation['Instance'] == installation['Component']['DIRACModule']:
isRenamed = False
else:
isRenamed = True

result = self.monitoringClient.getInstallations(
{'UnInstallationTime': None},
{'DIRACSystem': system, 'DIRACModule': installation['Component']['Module']},
{'DIRACSystem': system, 'DIRACModule': installation['Component']['DIRACModule']},
{},
True)
if not result['OK']:
3 changes: 2 additions & 1 deletion src/DIRAC/RequestManagementSystem/Client/ReqClient.py
@@ -385,7 +385,8 @@ def getRequestIDsForJobs(self, jobIDs):
:return: S_ERROR or S_OK( "Successful": { jobID1: reqID1, jobID2: requID2, ... },
"Failed" : { jobIDn: errMsg, jobIDm: errMsg, ...} )
"""
self.log.verbose("getRequestIDsForJobs: attempt to get request(s) for job %s" % jobIDs)
self.log.verbose("getRequestIDsForJobs: attempt to get request(s) for jobs",
"(n=%d)" % len(jobIDs))
res = self._getRPC().getRequestIDsForJobs(jobIDs)
if not res["OK"]:
self.log.error("getRequestIDsForJobs: unable to get request(s) for jobs",
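For reference, a small sketch of how the Successful/Failed structure described in the docstring above is consumed (job IDs are made up):

from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient

res = ReqClient().getRequestIDsForJobs([123, 456])
if res['OK']:
    # jobs that still have a request attached
    for jobID, reqID in res['Value']['Successful'].items():
        print('job %s -> request %s' % (jobID, reqID))
    # jobs for which no request could be retrieved
    for jobID, errMsg in res['Value']['Failed'].items():
        print('job %s: %s' % (jobID, errMsg))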
96 changes: 29 additions & 67 deletions src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py
@@ -28,36 +28,32 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

__RCSID__ = "$Id$"

import time
import os

from DIRAC import S_OK
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.RequestManagementSystem.Client.Operation import Operation
from DIRAC.RequestManagementSystem.Client.File import File
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
from DIRAC.WorkloadManagementSystem.Client.SandboxStoreClient import SandboxStoreClient
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.RequestManagementSystem.Client.Operation import Operation
from DIRAC.RequestManagementSystem.Client.File import File
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient

import DIRAC.Core.Utilities.Time as Time


class JobCleaningAgent(AgentModule):
"""
The specific agents must provide the following methods:
* initialize() for initial settings
* beginExecution()
* execute() - the main method called in the agent cycle
* endExecution()
* finalize() - the graceful exit of the method, this one is usually used for the agent restart
Agent for removing jobs in status "Deleted", and not only those
"""

def __init__(self, *args, **kwargs):
@@ -98,8 +94,6 @@ def initialize(self):
self.log.info("Will exclude the following Production types from cleaning %s" % (
', '.join(self.prodTypes)))
self.maxJobsAtOnce = self.am_getOption('MaxJobsAtOnce', 500)
self.jobByJob = self.am_getOption('JobByJob', False)
self.throttlingPeriod = self.am_getOption('ThrottlingPeriod', 0.)

self.removeStatusDelay['Done'] = self.am_getOption('RemoveStatusDelay/Done', 7)
self.removeStatusDelay['Killed'] = self.am_getOption('RemoveStatusDelay/Killed', 7)
@@ -126,12 +120,11 @@ def _getAllowedJobTypes(self):
self.log.notice("JobTypes to clean %s" % cleanJobTypes)
return S_OK(cleanJobTypes)

#############################################################################
def execute(self):
""" Remove jobs in various status
"""
# Delete jobs in "Deleted" state
result = self.removeJobsByStatus({'Status': 'Deleted'})
result = self.removeJobsByStatus({'Status': JobStatus.DELETED})
if not result['OK']:
return result

@@ -179,16 +172,25 @@ def removeJobsByStatus(self, condDict, delay=False):
if not result['OK']:
return result

jobList = result['Value']
jobList = [int(jID) for jID in result['Value']]
if len(jobList) > self.maxJobsAtOnce:
jobList = jobList[:self.maxJobsAtOnce]
if not jobList:
return S_OK()

self.log.notice("Deleting %s jobs for %s" % (len(jobList), condDict))
self.log.notice("Attempting to delete jobs", "(%d for %s)" % (len(jobList), condDict))

# remove from jobList those that have still Operations to do in RMS
res = ReqClient().getRequestIDsForJobs(jobList)
if not res['OK']:
return res
if res['Value']['Successful']:
self.log.warn("Some jobs won't be removed, as still having Requests to complete",
"(n=%d)" % len(res['Value']['Successful']))
jobList = list(set(jobList).difference(set(res['Value']['Successful'])))
if not jobList:
return S_OK()

count = 0
error_count = 0
result = SandboxStoreClient(useCertificates=True).unassignJobs(jobList)
if not result['OK']:
self.log.error("Cannot unassign jobs to sandboxes", result['Message'])
@@ -203,54 +205,14 @@ def removeJobsByStatus(self, condDict, delay=False):
failedJobs = result['Value']['Failed']
for job in failedJobs:
jobList.pop(jobList.index(job))
if not jobList:
return S_OK()

# TODO: we should not remove a job if it still has requests in the RequestManager.
# But this logic should go in the client or in the service, and right now no service expose jobDB.removeJobFromDB

if self.jobByJob:
for jobID in jobList:
resultJobDB = self.jobDB.removeJobFromDB(jobID)
resultTQ = self.taskQueueDB.deleteJob(jobID)
resultLogDB = self.jobLoggingDB.deleteJob(jobID)
errorFlag = False
if not resultJobDB['OK']:
self.log.warn('Failed to remove job %d from JobDB' % jobID, result['Message'])
errorFlag = True
if not resultTQ['OK']:
self.log.warn('Failed to remove job %d from TaskQueueDB' % jobID, result['Message'])
errorFlag = True
if not resultLogDB['OK']:
self.log.warn('Failed to remove job %d from JobLoggingDB' % jobID, result['Message'])
errorFlag = True
if errorFlag:
error_count += 1
else:
count += 1
if self.throttlingPeriod:
time.sleep(self.throttlingPeriod)
else:
result = self.jobDB.removeJobFromDB(jobList)
if not result['OK']:
self.log.error('Failed to delete %d jobs from JobDB' % len(jobList))
else:
self.log.info('Deleted %d jobs from JobDB' % len(jobList))

for jobID in jobList:
resultTQ = self.taskQueueDB.deleteJob(jobID)
if not resultTQ['OK']:
self.log.warn('Failed to remove job %d from TaskQueueDB' % jobID, resultTQ['Message'])
error_count += 1
else:
count += 1

result = self.jobLoggingDB.deleteJob(jobList)
if not result['OK']:
self.log.error('Failed to delete %d jobs from JobLoggingDB' % len(jobList))
else:
self.log.info('Deleted %d jobs from JobLoggingDB' % len(jobList))
result = JobManagerClient().removeJob(jobList)
if not result['OK']:
self.log.error("Could not remove jobs", result['Message'])
return result

if count > 0 or error_count > 0:
self.log.info('Deleted %d jobs from JobDB, %d errors' % (count, error_count))
return S_OK()

def deleteJobOversizedSandbox(self, jobIDList):
@@ -260,7 +222,7 @@ def deleteJobOversizedSandbox(self, jobIDList):
failed = {}
successful = {}

result = JobMonitoringClient().getJobParameters(jobIDList, 'OutputSandboxLFN')
result = JobMonitoringClient().getJobParameters(jobIDList, ['OutputSandboxLFN'])
if not result['OK']:
return result
osLFNDict = result['Value']
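A short sketch of the updated getJobParameters() call, which now takes the parameter names as a list (the job IDs and returned LFN are illustrative):

from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient

res = JobMonitoringClient().getJobParameters([123, 456], ['OutputSandboxLFN'])
if res['OK']:
    # res['Value'] maps each job ID to its parameter dictionary,
    # e.g. {123: {'OutputSandboxLFN': '/some/sandbox.tar.bz2'}, 456: {}}
    for jobID, params in res['Value'].items():
        lfn = params.get('OutputSandboxLFN')
        if lfn:
            print('job %s oversized sandbox: %s' % (jobID, lfn))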