Merge remote-tracking branch 'release/rel-v7r2' into rel-v7r2
atsareg committed Mar 30, 2021
2 parents e4d3f9c + a753e2f commit aa53b82
Showing 44 changed files with 541 additions and 858 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration.yml
@@ -50,7 +50,7 @@ jobs:
# CLIENT_USE_PYTHON3: No
- TEST_NAME: "Python 3 client"
CLIENT_USE_PYTHON3: Yes
CLIENT_DIRACOSVER: 2.0a8
CLIENT_DIRACOSVER: latest

steps:
- uses: actions/checkout@v2
@@ -1,7 +1,7 @@
Systems / Framework / <INSTANCE> / Databases - Sub-subsection
===============================================================

Databases used by DataManagement System. Note that each database is a separate subsection.
Databases used by Framework System. Note that each database is a separate subsection.

+--------------------------------+----------------------------------------------+----------------------+
| **Name** | **Description** | **Example** |
@@ -75,7 +75,7 @@ in the order of preference of the option resolution:
This is the file in the user's home directory with the *CFG* format

*$DIRACROOT/etc/dirac.cfg*
This is the configuration file in the root directory of the DIRAC installation
This is the configuration file in the root directory of the DIRAC installation ($DIRACROOT is set in python2 installations)

*Configuration Service*
Configuration data available from the global DIRAC Configuration Service
@@ -93,7 +93,8 @@ Show setup command allows administrators to know which components, Services and
'PilotStatusAgent',
'JobCleaningAgent',
'StatesAccountingAgent']},
'Services': {'Accounting': ['ReportGenerator', 'DataStore'],
'Services': {'Accounting': ['ReportGenerator',
'DataStore'],
'Configuration': ['Server'],
'Framework': ['Monitoring',
'BundleDelivery',
65 changes: 57 additions & 8 deletions docs/source/DeveloperGuide/Systems/WorkloadManagement/index.rst
@@ -5,18 +5,68 @@
Workload Management System
===========================

------------
--------
Overview
------------
--------

The system provides high user jobs efficiency, hiding the heterogeneity of the the underlying computing resources.
The system provides high user jobs efficiency, hiding the heterogeneity of the underlying computing resources.

It realizes the task scheduling paradigm with Generic Pilot Jobs (or Agents).
It realizes the task scheduling paradigm with Generic Pilot Jobs.
This task scheduling method solves many problems of using unstable distributed computing resources which are available in computing grids.

------------
Architecture
------------
----------------
DIRAC JobWrapper
----------------

The JobAgent creates a file derived from the JobWrapperTemplate.py file:
using this file as a template, it produces a temporary file which becomes Wrapper_<jobID> somewhere in the workDirectory.
It is this file that is then submitted as the actual "job wrapper script".

Despite its name, the JobWrapper class is not the job wrapper itself, but an object used by the job wrapper
(i.e. by the JobWrapperTemplate's execute() method) to actually do the work.

The only change made to the "template" file is the following substitution::

   wrapperTemplate = wrapperTemplate.replace( "@SITEPYTHON@", str( siteRoot ) )

The resulting file is then submitted in bash using the configured CE (the InProcessCE in the default case).
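
For illustration, a minimal sketch of this generation step could look as follows (the function name, the template path and the ``workDirectory``/``siteRoot`` arguments are placeholders here, not the exact variables used by the JobAgent):

.. code-block:: python

   import os

   def createWrapperFromTemplate(jobID, workDirectory, siteRoot):
       """Produce Wrapper_<jobID> from the JobWrapperTemplate (illustrative sketch)."""
       templatePath = os.path.join(siteRoot, 'DIRAC', 'WorkloadManagementSystem',
                                   'JobWrapper', 'JobWrapperTemplate.py')
       with open(templatePath) as templateFile:
           wrapperTemplate = templateFile.read()

       # The only substitution made in the template
       wrapperTemplate = wrapperTemplate.replace("@SITEPYTHON@", str(siteRoot))

       wrapperPath = os.path.join(workDirectory, 'Wrapper_%s' % jobID)
       with open(wrapperPath, 'w') as wrapperFile:
           wrapperFile.write(wrapperTemplate)
       return wrapperPath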

The sequence executed is the following ("job" here is the JobWrapper object):

.. code-block:: python

   job.initialize( arguments )
   #[…]
   result = job.transferInputSandbox( arguments['Job']['InputSandbox'] )
   #[…]
   result = job.resolveInputData()
   #[…]
   result = job.execute( arguments )
   #[…]
   result = job.processJobOutputs( arguments )
   #[…]
   return job.finalize( arguments )

The watchdog is started in job.execute().
A direct consequence is that the time taken to download the input files is not counted in the WallClock time.

A race condition might happen inside this method.
The problem is that we submit the process in detached mode (or in a thread; it is not clear, as a thread may be used for a process here),
wait 10 seconds and expect it to have started.
If this check fails, the JobWrapperTemplate gives up; however, if the detached process is in fact running, it continues executing as if nothing had happened!
This is where the famous gJobReport.setJobStatus( 'Failed', 'Exception During Execution', sendFlag = False ) call is issued,
which sometimes causes jobs to go to "Failed" and then continue.
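
A minimal, purely illustrative sketch of the shape of this race (this is not the actual JobWrapperTemplate code):

.. code-block:: python

   import threading
   import time

   payloadStarted = threading.Event()

   def runPayload():
       time.sleep(15)          # e.g. a slow startup, longer than the parent is willing to wait
       payloadStarted.set()
       print("payload still running, whatever the parent decided")

   threading.Thread(target=runPayload).start()

   time.sleep(10)              # the parent waits a fixed 10 seconds ...
   if not payloadStarted.is_set():
       # ... concludes that the execution failed and reports the job as Failed,
       # while the payload thread keeps running as if nothing happened.
       print("Failed: Exception During Execution")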

A curious "feature" of this complex cascade is that the JobAgent reports "Job submitted as ..."
(meaning that the job was submitted to the local CE, i.e. the InProcessCE in our case) *after* the "job" has actually been executed!

The JobWrapper can also interpret error codes returned by the application itself.
One example is the DErrno.WMSRESC (1502) error code, which instructs the JobWrapperTemplate to reschedule
the current job.
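
A sketch of how such a code can drive the decision (the DErrno import is real DIRAC; the mapping function itself is illustrative, not the actual JobWrapperTemplate logic):

.. code-block:: python

   from DIRAC.Core.Utilities import DErrno

   def actionForPayloadStatus(payloadStatus):
       """Illustrative mapping from the payload exit status to the wrapper's action."""
       if payloadStatus == DErrno.WMSRESC:  # 1502: the application asks to be rescheduled
           return 'reschedule'
       if payloadStatus != 0:
           return 'fail'
       return 'done'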


-------------------
Server Architecture
-------------------

It is based on a layered architecture, in line with the overall DIRAC architecture:

@@ -25,7 +75,6 @@ It is based on layered architecture and is based on DIRAC architecture:
* JobManagerHandler
* JobMonitoringHandler
* JobPolicy
* JobStateSyncHandler
* JobStateUpdateHandler
* MatcherHandler
* OptimizationMindHandler
48 changes: 0 additions & 48 deletions docs/source/DeveloperGuide/WorkloadManagementSystem/index.rst

This file was deleted.

1 change: 0 additions & 1 deletion docs/source/DeveloperGuide/index.rst
@@ -40,6 +40,5 @@ group.
REST/index
WebAppDIRAC/index
Internals/index
WorkloadManagementSystem/index
Externals/index
TornadoServices/index
2 changes: 1 addition & 1 deletion setup.cfg
@@ -235,7 +235,7 @@ console_scripts =
dirac-prod-start = DIRAC.ProductionSystem.scripts.dirac_prod_start:main [admin]
dirac-prod-stop = DIRAC.ProductionSystem.scripts.dirac_prod_stop:main [admin]
# Resources
dirac-resource-get-parameters = DIRAC.Resources.scripts.dirac_resource_get_parameters:main [admin]
dirac-resource-get-parameters = DIRAC.Resources.scripts.dirac_resource_get_parameters:main [admin, pilot]
dirac-resource-info = DIRAC.Resources.scripts.dirac_resource_info:main [admin]
# RequestManagementSystem
dirac-rms-list-req-cache = DIRAC.RequestManagementSystem.scripts.dirac_rms_list_req_cache:main
@@ -57,7 +57,7 @@ def _plotPacketLossRate(self, reportRequest, plotInfo, filename):

# prepare custom scale (10,20,...,100)
scale_data = dict(zip(range(0, 101), range(100, -1, -1)))
scale_ticks = range(0, 101, 10)
scale_ticks = list(range(0, 101, 10))

metadata = {'title': 'Packet loss rate by %s' % reportRequest['grouping'],
'starttime': reportRequest['startTime'],
13 changes: 7 additions & 6 deletions src/DIRAC/Core/Utilities/Graphs/GraphUtilities.py
@@ -235,10 +235,11 @@ def _set_orderOfMagnitude(self, range):

class PrettyDateFormatter(AutoDateFormatter):
""" This class provides a formatter which conforms to the
desired date formates for the Phedex system.
desired date formats for the Phedex system.
"""

def __init__(self, locator):
"""Format dates according to the Phedex system"""
tz = pytz.timezone('UTC')
AutoDateFormatter.__init__(self, locator, tz=tz)

@@ -289,7 +290,7 @@ def get_locator(self, dmin, dmax):
self._freq = YEARLY
elif numMonths >= numticks:
self._freq = MONTHLY
bymonth = range(1, 13)
bymonth = list(range(1, 13))
if (0 <= numMonths) and (numMonths <= 14):
interval = 1 # show every month
elif (15 <= numMonths) and (numMonths <= 29):
@@ -301,7 +302,7 @@
elif numDays >= numticks:
self._freq = DAILY
bymonth = None
bymonthday = range(1, 32)
bymonthday = list(range(1, 32))
if (0 <= numDays) and (numDays <= 9):
interval = 1 # show every day
elif (10 <= numDays) and (numDays <= 19):
@@ -316,7 +317,7 @@
self._freq = HOURLY
bymonth = None
bymonthday = None
byhour = range(0, 24) # show every hour
byhour = list(range(0, 24)) # show every hour
if (0 <= numHours) and (numHours <= 14):
interval = 1 # show every hour
elif (15 <= numHours) and (numHours <= 30):
@@ -334,7 +335,7 @@
bymonth = None
bymonthday = None
byhour = None
byminute = range(0, 60)
byminute = list(range(0, 60))
if numMinutes > (10.0 * numticks):
interval = 10
# end if
@@ -344,7 +345,7 @@
bymonthday = None
byhour = None
byminute = None
bysecond = range(0, 60)
bysecond = list(range(0, 60))
if numSeconds > (10.0 * numticks):
interval = 10
# end if
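
Many of the range() changes in this commit follow the same Python 3 porting pattern: range() now returns a lazy range object rather than a list, so places where the code (or the APIs it calls) expects an actual list are wrapped in list(). A quick illustration:

    ticks = range(0, 101, 10)               # Python 3: a lazy range object, not a list
    isinstance(ticks, list)                 # False
    scale_ticks = list(range(0, 101, 10))   # materialise it where a real list is required
    isinstance(scale_ticks, list)           # True
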
4 changes: 3 additions & 1 deletion src/DIRAC/Core/scripts/dirac_deploy_scripts.py
@@ -1,6 +1,8 @@
#!/usr/bin/env python
"""
Deploy all scripts and extensions.
Deploy all scripts and extensions for python2 installations.
Useless in python3.
This script is not meant to be called by users (it's automatically called by dirac-install).
Usage:
4 changes: 2 additions & 2 deletions src/DIRAC/DataManagementSystem/Client/FileCatalogClientCLI.py
@@ -703,7 +703,7 @@ def do_ancestor(self, args):
depth = [1]
if len(argss) > 1:
depth = int(argss[1])
depth = range(1, depth + 1)
depth = list(range(1, depth + 1))

try:
result = self.fc.getFileAncestors([lfn], depth)
@@ -761,7 +761,7 @@ def do_descendent(self, args):
depth = [1]
if len(argss) > 1:
depth = int(argss[1])
depth = range(1, depth + 1)
depth = list(range(1, depth + 1))

try:
result = self.fc.getFileDescendents([lfn], depth)
@@ -545,7 +545,7 @@ def _setDirectoryMode(self, path, mode, recursive=False):
"""
return self._setDirectoryParameter(path, 'Mode', mode, recursive=recursive)

def __getLogicalSize(self, lfns, ps_name, connection):
def __getLogicalSize(self, lfns, ps_name, recursiveSum=True, connection=None):
successful = {}
failed = {}
for path in lfns:
Expand All @@ -555,7 +555,7 @@ def __getLogicalSize(self, lfns, ps_name, connection):
continue

dirID = result['Value']
result = self.db.executeStoredProcedureWithCursor(ps_name, (dirID, ))
result = self.db.executeStoredProcedureWithCursor(ps_name, (dirID, recursiveSum))

if not result['OK']:
failed[path] = result['Message']
@@ -575,17 +575,21 @@

return S_OK({'Successful': successful, 'Failed': failed})

def _getDirectoryLogicalSizeFromUsage(self, lfns, connection):
def _getDirectoryLogicalSizeFromUsage(self, lfns, recursiveSum=True, connection=None):
""" Get the total "logical" size of the requested directories
"""
return self.__getLogicalSize(lfns, 'ps_get_dir_logical_size', connection)
return self.__getLogicalSize(lfns, 'ps_get_dir_logical_size', recursiveSum=recursiveSum, connection=connection)

def _getDirectoryLogicalSize(self, lfns, connection):
def _getDirectoryLogicalSize(self, lfns, recursiveSum=True, connection=None):
""" Get the total "logical" size of the requested directories
"""
return self.__getLogicalSize(lfns, 'ps_calculate_dir_logical_size', connection)
return self.__getLogicalSize(
lfns,
'ps_calculate_dir_logical_size',
recursiveSum=recursiveSum,
connection=connection)

def __getPhysicalSize(self, lfns, ps_name, connection):
def __getPhysicalSize(self, lfns, ps_name, recursiveSum=True, connection=None):
""" Get the total size of the requested directories
"""

@@ -601,8 +605,7 @@ def __getPhysicalSize(self, lfns, ps_name, connection):
continue
dirID = result['Value']

result = self.db.executeStoredProcedureWithCursor(ps_name, (dirID, ))

result = self.db.executeStoredProcedureWithCursor(ps_name, (dirID, recursiveSum))
if not result['OK']:
failed[path] = result['Message']
continue
@@ -624,15 +627,15 @@

return S_OK({'Successful': successful, 'Failed': failed})

def _getDirectoryPhysicalSizeFromUsage(self, lfns, connection):
def _getDirectoryPhysicalSizeFromUsage(self, lfns, recursiveSum=True, connection=None):
""" Get the total size of the requested directories
"""
return self.__getPhysicalSize(lfns, 'ps_get_dir_physical_size', connection)
return self.__getPhysicalSize(lfns, 'ps_get_dir_physical_size', recursiveSum=recursiveSum, connection=connection)

def _getDirectoryPhysicalSize(self, lfns, connection):
def _getDirectoryPhysicalSize(self, lfns, recursiveSum=True, connection=None):
""" Get the total size of the requested directories
"""
return self.__getPhysicalSize(lfns, 'ps_calculate_dir_physical_size', connection)
return self.__getPhysicalSize(lfns, 'ps_calculate_dir_physical_size', recursiveSum=recursiveSum, connection=None)

def _changeDirectoryParameter(self, paths,
directoryFunction,
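
For context, a hedged sketch of what the new keyword means for callers of these methods — directoryManager stands in for the directory-tree object, and the interpretation of recursiveSum=False (a sum restricted to the directory itself, without descending into subdirectories) is an assumption based on the parameter name, which is simply passed through to the stored procedure:

    lfns = ['/vo/data/run1']
    # Default behaviour, as before: sum over the whole subtree
    recursiveResult = directoryManager._getDirectoryLogicalSize(lfns, recursiveSum=True)
    # New possibility: the flag is forwarded unchanged to the stored procedure
    flatResult = directoryManager._getDirectoryLogicalSize(lfns, recursiveSum=False)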