# scicat_archival.py
from typing import Union

import requests
import urllib.parse

from scicat_common import check_request_response


class ArchivalMockException(Exception):
    pass


# simulates dataIngestion for creating the job
def create_job(base_url: str, token: str, retrieval: bool = False, dataset_pid: str = "",
               dataset_files: Union[None, list[str]] = None,
               dataset_list: Union[None, list] = None) -> str:
    # NOTE: if you want to send more than one dataset per job, you must use dataset_list;
    # otherwise use dataset_pid and dataset_files for the general single-dataset case.
    # NOTE: when creating an archival job, the datasetLifecycle is automatically updated by
    # SciCat to mark the dataset as non-archivable!
    if dataset_list is None:
        job_json = {
            "type": "retrieve" if retrieval else "archive",
            "datasetList": [
                {
                    "pid": dataset_pid,
                    "files": dataset_files or []
                }
            ]
        }
    else:
        job_json = {
            "type": "retrieve" if retrieval else "archive",
            "datasetList": dataset_list,
        }
    access_token = {'access_token': token}
    r = requests.post(url=base_url + '/Jobs', params=access_token, json=job_json)
    check_request_response(r, "can't create job")
    return r.json().get('id')
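
# Example (hypothetical PIDs): a multi-dataset job must go through dataset_list, e.g.
#   create_job(base_url, token, dataset_list=[{"pid": "20.500.12345/ds-1", "files": []},
#                                             {"pid": "20.500.12345/ds-2", "files": []}])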


# simulates node-red for forwarding jobs
def forward_job(base_url: str, token: str, job_id: str) -> list[dict]:
    access_token = {'access_token': token}
    r = requests.get(url=base_url + '/Jobs/datasetDetails', params={'jobId': job_id} | access_token)
    check_request_response(r, "can't get job's dataset details")
    dataset_list = r.json()
    if not isinstance(dataset_list, list) or dataset_list == []:
        raise ArchivalMockException("unexpected response or empty list: {}".format(dataset_list))
    r = requests.put(url=base_url + '/Jobs/' + urllib.parse.quote(job_id, safe=''),
                     json={"jobStatusMessage": "jobForwarded"}, params=access_token)
    check_request_response(r, "can't mark job as forwarded")
    return dataset_list
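
# The list returned by /Jobs/datasetDetails is what the handlers below consume; each entry
# is expected to carry at least a 'pid' and a 'datasetlifecycle' dict (see check_dataset).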


def check_dataset(dataset: dict, archivable: bool = False, retrievable: bool = False):
    if not isinstance(dataset, dict):
        raise ArchivalMockException("dataset is invalid: {}".format(dataset))
    dataset_lifecycle = dataset.get('datasetlifecycle')
    dataset_id = dataset.get('pid')
    if dataset_id is None or not isinstance(dataset_lifecycle, dict):
        raise ArchivalMockException("dataset is invalid: {}".format(dataset))
    if not ((archivable == dataset_lifecycle.get('archivable')) or
            (retrievable == dataset_lifecycle.get('retrievable'))):
        raise ArchivalMockException("dataset is incompatible with desired operation: {}".format(dataset))


# simulates AREMA for archival registering
def handle_archive_job(base_url: str, token: str, job_id: str, datasets: list) -> None:
    access_token = {'access_token': token}
    # mark job as being handled
    r = requests.put(url=base_url + '/Jobs/' + urllib.parse.quote(job_id, safe=''), params=access_token,
                     json={"jobStatusMessage": "inProgress"})
    check_request_response(r, "can't mark job as being in progress")
    for dataset in datasets:
        check_dataset(dataset)  # dataset integrity check
        dataset_id = dataset.get('pid')
        # 1 - mark dataset as being archived
        r = requests.put(url=base_url + '/Datasets/' + urllib.parse.quote(dataset_id, safe=''),
                         params=access_token, json={"datasetlifecycle": {"archiveStatusMessage": "started"}})
        check_request_response(r, "can't mark dataset as being archived")
        # 2 - send datablocks
        r = requests.get(url=base_url + '/Datasets/' + urllib.parse.quote(dataset_id, safe=''),
                         params=access_token | {"filter": '{"include": [{"relation": "origdatablocks"}]}'})
        check_request_response(r, "can't get origdatablocks of dataset")
        orig_datablocks = r.json().get('origdatablocks')
        if not isinstance(orig_datablocks, list):
            raise ArchivalMockException("invalid orig datablocks for dataset: {}".format(dataset_id))
        for orig_datablock in orig_datablocks:
            # create a datablock by adapting the orig datablock (it's a mock after all)
            datablock = dict(orig_datablock)
            datablock.pop('createdBy', None)
            datablock.pop('updatedBy', None)
            datablock.pop('createdAt', None)
            datablock.pop('updatedAt', None)
            od_id = datablock.pop('id', None)
            if od_id is None:
                raise ArchivalMockException("OrigDatablock doesn't have an id for dataset {}".format(dataset_id))
            datablock['archiveId'] = '/archive/{}.tar'.format(od_id)
            datablock['packedSize'] = datablock.get('size')
            datablock['chkAlg'] = 'sha1'
            datablock['version'] = '2.0.2'
            # send datablock
            r = requests.post(url=base_url + '/Datablocks', params=access_token, json=datablock)
            check_request_response(r, "can't create datablock for orig datablock {} in dataset {}".format(
                od_id, dataset_id))
        # 3 - mark dataset as archived
        r = requests.put(url=base_url + '/Datasets/' + urllib.parse.quote(dataset_id, safe=''),
                         params=access_token,
                         json={"datasetlifecycle": {"retrievable": True,
                                                    "archiveStatusMessage": "datasetOnArchiveDisk"}})
        check_request_response(r, "can't mark dataset as archived")
    # mark job as successfully finished
    r = requests.put(url=base_url + '/Jobs/' + urllib.parse.quote(job_id, safe=''),
                     json={"jobStatusMessage": "finishedSuccessful"},
                     params=access_token)
    check_request_response(r, "can't mark job as finished")
    return


def handle_retrieve_job(base_url: str, token: str, job_id: str, datasets: list) -> None:
    access_token = {'access_token': token}
    # mark job as being handled
    r = requests.put(url=base_url + '/Jobs/' + urllib.parse.quote(job_id, safe=''), params=access_token,
                     json={"jobStatusMessage": "inProgress"})
    check_request_response(r, "can't mark job as being in progress")
    for dataset in datasets:
        check_dataset(dataset, retrievable=True)  # dataset integrity check
        dataset_id = dataset.get('pid')
        # mark dataset as being retrieved
        r = requests.put(url=base_url + '/Datasets/' + urllib.parse.quote(dataset_id, safe=''),
                         params=access_token, json={"datasetlifecycle": {"retrieveStatusMessage": "started"}})
        check_request_response(r, "can't mark dataset as being retrieved")
        # mark dataset as retrieved
        r = requests.put(url=base_url + '/Datasets/' + urllib.parse.quote(dataset_id, safe=''),
                         params=access_token, json={"datasetlifecycle": {"retrieveStatusMessage": "datasetRetrieved"}})
        check_request_response(r, "can't mark dataset as retrieved")
    # mark job as successfully finished
    r = requests.put(url=base_url + '/Jobs/' + urllib.parse.quote(job_id, safe=''),
                     json={"jobStatusMessage": "finishedSuccessful"},
                     params=access_token)
    check_request_response(r, "can't mark job as finished")
    return
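

# Minimal end-to-end sketch of the mock archival flow, assuming a running SciCat backend.
# The base URL, token, and dataset PID below are placeholders, not real values.
if __name__ == "__main__":
    BASE_URL = "http://localhost:3000/api/v3"  # placeholder SciCat API base URL
    TOKEN = "replace-with-a-valid-access-token"  # placeholder access token
    PID = "20.500.12345/example-dataset"  # placeholder dataset PID

    # ingestion side: create an archive job for one dataset
    archive_job_id = create_job(BASE_URL, TOKEN, dataset_pid=PID)
    # forwarding side (node-red in production): fetch the job's datasets and mark it forwarded
    job_datasets = forward_job(BASE_URL, TOKEN, archive_job_id)
    # archival side (AREMA in production): register datablocks and mark everything archived
    handle_archive_job(BASE_URL, TOKEN, archive_job_id, job_datasets)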