Skip to content

Commit

Permalink
Add support for configurable job uniqueness.
Browse files Browse the repository at this point in the history
This is needed for proper job array support. See also the related pull requests
in the xdmod, xdmod-supremm, and xdmod-xsede code bases.
  • Loading branch information
jpwhite4 committed May 26, 2023
1 parent 7c81feb commit 84f0178
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
1 change: 1 addition & 0 deletions config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
"hostname_mode": "hostname",
"pcp_log_dir": "/data/pcp-logs/my_cluster_name",
"script_dir": "/data/jobscripts/my_cluster_name",
"job_uniq_mode": "local_job_id",

// fast_index uses an alternative method of indexing job-level pcp archives which can significantly speed
// up the indexarchives.py script. The tradeoff is that the indexed archive end time is not found and the
Expand Down
23 changes: 22 additions & 1 deletion src/supremm/xdmodaccount.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ class XDMoDAcct(Accounting):
def __init__(self, resource_id, config):
super(XDMoDAcct, self).__init__(resource_id, config)

job_uniq_mode = 'local_job_id'
for _, resconfig in config.resourceconfigs():
if resconfig['resource_id'] == resource_id:
if 'job_unique_mode' in resconfig:
job_uniq_mode = resconfig['job_unique_mode']
break

self.dbsettings = config.getsection("datawarehouse")

xdmod_schema_version = self.detectXdmodSchema()
Expand All @@ -25,6 +32,13 @@ def __init__(self, resource_id, config):
jf.`job_id` AS `job_id`,
jf.`resource_id` AS `resource_id`,
COALESCE(jf.`local_job_id_raw`, jf.`local_jobid`) AS `local_job_id`,
"""
if job_uniq_mode == 'local_job_id':
self._query += "COALESCE(jf.`local_job_id_raw`, jf.`local_jobid`) as `job_uniq_id`, "
else:
self._query += "IF(jf.`local_job_array_index` = -1, jf.`local_jobid`, CONCAT(jf.`local_jobid`, '_', jf.`local_job_array_index`)) as job_uniq_id, "

self._query += """
jf.`start_time_ts` AS `start_time`,
jf.`end_time_ts` AS `end_time`,
jf.`submit_time_ts` AS `submit`,
Expand Down Expand Up @@ -71,6 +85,13 @@ def __init__(self, resource_id, config):
jf.`job_id` as `job_id`,
jf.`resource_id` as `resource_id`,
COALESCE(jf.`local_job_id_raw`, jf.`local_jobid`) as `local_job_id`,
"""
if job_uniq_mode == 'local_job_id':
self._query += "COALESCE(jf.`local_job_id_raw`, jf.`local_jobid`) as `job_uniq_id`, "
else:
self._query += "IF(jf.`local_job_array_index` = -1, jf.`local_jobid`, CONCAT(jf.`local_jobid`, '_', jf.`local_job_array_index`)) as job_uniq_id, "

self._query += """
jf.`start_time_ts` as `start_time`,
jf.`end_time_ts` as `end_time`,
jf.`submit_time_ts` as `submit`,
Expand Down Expand Up @@ -280,7 +301,7 @@ def executequery(self, query, data):
jobpk = record['job_id']
del record['job_id']
record['host_list'] = hostlist
job = Job(jobpk, str(record['local_job_id']), record)
job = Job(jobpk, str(record['job_uniq_id']), record)
job.set_nodes(hostlist)
job.set_rawarchives(hostarchives)

Expand Down

0 comments on commit 84f0178

Please sign in to comment.