"""Archivematica Selenium.
This module contains the ``ArchivematicaSelenium`` class that provides special
methods for using Selenium to interact with the Archivematica dashboard.
Instances of this class can be used to write acceptance tests. A typical test
would initiate a transfer of a specified data set and then make assertions
about the output from one or more micro-services operating on that data set.
Example usage::
def test_feature(self):
transfer_uuid = start_transfer(
'home/vagrant/archivematica-sampledata/SampleTransfers/BagTransfer',
'My_Transfer')
validation_job = self.parse_job('Validate formats', transfer_uuid)
# Make assertions using the ``validation_job`` dict, e.g.,
assert job.get('job_output') == 'Completed successfully'
"Public" methods:
- login
- start_transfer
- parse_job
- parse_normalization_report
- get_sip_uuid
- get_mets
- upload_policy
- change_normalization_rule_command
- remove_all_transfers
- remove_all_ingests
Tested using Selenium's Chrome and Firefox webdrivers.
Dependencies:
- selenium
- lxml
Test environments where this module has been tested and has worked:
1. Ubuntu 16.04
Firefox 48.0
Selenium 2.53.6
Python 3.5.1
Archivematica dev/issue-10133-ingest-policy-check-good
Storage Service qa/0.x
2. Firefox 47.01 (*note* does not work on v. 48.0)
Mac OS X 10.10.5
Selenium 2.53.6
Python 3.4.2
3. Chrome 52.0.2743.116 (64-bit) -- TODO: has stopped working!
Mac OS X 10.10.5
Selenium 2.53.6
Python 3.4.2
WARNING: this will *not* currently work with a headless PhantomJS() webdriver.
With PhantomJS, it can login, but when it attempts to use the interface for
selecting a transfer folder it times out when waiting for the 'home' folder to
become visible. See ``navigate_to_transfer_directory_and_click``.
"""
import json
import logging
from lxml import etree
import os
import pprint
import requests
import shlex
import shutil
import sys
import subprocess
import time
import uuid
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import (
    TimeoutException, WebDriverException, StaleElementReferenceException,
    NoSuchElementException, MoveTargetOutOfBoundsException)
from selenium.webdriver.common.action_chains import ActionChains
logger = logging.getLogger(__file__)
log_filename, _ = os.path.splitext(os.path.basename(__file__))
log_filename = log_filename + '.log'
log_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), log_filename)
handler = logging.FileHandler(log_path)
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

# Assuming we don't switch JS frameworks :), DOM selectors should be constants.
SELECTOR_INPUT_TRANSFER_NAME = 'input[ng-model="vm.transfer.name"]'
SELECTOR_INPUT_TRANSFER_ACCESSION = 'input[ng-model="vm.transfer.accession"]'
SELECTOR_DIV_TRANSFER_SOURCE_BROWSE = 'div.transfer-tree-container'
SELECTOR_BUTTON_ADD_DIR_TO_TRANSFER = 'button.pull-right[type=submit]'
SELECTOR_BUTTON_BROWSE_TRANSFER_SOURCES = \
    'button[data-target="#transfer_browse_tree"]'
SELECTOR_BUTTON_START_TRANSFER = 'button[ng-click="vm.transfer.start()"]'

DEFAULT_AM_USERNAME = 'test'
DEFAULT_AM_PASSWORD = 'testtest'
DEFAULT_AM_URL = 'http://192.168.168.192/'
DEFAULT_SS_USERNAME = 'test'
DEFAULT_SS_PASSWORD = 'test'
DEFAULT_SS_URL = 'http://192.168.168.192:8000/'

DUMMY_VAL = 'Archivematica Acceptance Test'
METADATA_ATTRS = ('title', 'creator')
JOB_OUTPUTS_COMPLETE = (
    'Failed',
    'Completed successfully',
    'Awaiting decision')
TMP_DIR_NAME = '.amsc-tmp'


def squash(string):
    """Simple function that makes it easy to compare two strings for
    equality even if they have incidental (for our purposes) formatting
    differences.
    """
    return string.strip().lower().replace(' ', '')


class ArchivematicaSeleniumException(Exception):
    pass


def recurse_on_stale(func):
    """Decorator that re-runs a method if it triggers a
    ``StaleElementReferenceException``. This error occurs when AM's JS repaints
    the DOM and we're holding on to now-destroyed elements. Note that there is
    no limit on the number of retries.
    """
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except StaleElementReferenceException:
            return wrapper(*args, **kwargs)
    return wrapper
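

# Sketch (hypothetical, not used elsewhere in this module): a bounded
# alternative to ``recurse_on_stale``. The decorator above retries by
# recursing with no limit, so a call that keeps raising
# ``StaleElementReferenceException`` would eventually hit Python's recursion
# limit. ``retry_on`` and its ``max_tries`` parameter are illustrative names;
# it retries in a loop and re-raises the last exception once the tries are
# exhausted. It could be applied as
# ``@retry_on(StaleElementReferenceException)``.
def retry_on(exc_type, max_tries=5):
    """Return a decorator that re-calls the wrapped function whenever it
    raises ``exc_type``, making at most ``max_tries`` calls in total.
    """
    import functools  # local import keeps this sketch self-contained

    if max_tries < 1:
        raise ValueError('max_tries must be at least 1')

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_tries + 1):
                try:
                    return func(*args, **kwargs)
                except exc_type:
                    # Re-raise once the final allowed call has failed.
                    if attempt == max_tries:
                        raise
        return wrapper
    return decorator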


class ArchivematicaSeleniumError(Exception):
    pass
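

# Sketch (hypothetical, not used elsewhere in this module): several methods of
# the class below (``wait_for_aip_in_archival_storage``, ``initiate_reingest``
# and ``download_aip``) hand-roll the same loop of "try, sleep one second, give
# up after N attempts". A helper such as ``poll_until`` could factor that
# pattern out, e.g. ``poll_until(lambda: requests.get(url).ok)``. The name and
# its ``max_attempts``/``interval`` parameters are illustrative, and it raises
# the built-in ``TimeoutError`` when it gives up.
def poll_until(predicate, max_attempts=10, interval=1.0):
    """Call ``predicate()`` until it returns a truthy value and return that
    value, sleeping ``interval`` seconds between calls. Raise
    ``TimeoutError`` after ``max_attempts`` falsy results.
    """
    import time  # local import keeps this sketch self-contained

    for attempt in range(max_attempts):
        result = predicate()
        if result:
            return result
        # Don't sleep after the final attempt.
        if attempt < max_attempts - 1:
            time.sleep(interval)
    raise TimeoutError(
        'Condition not met after {} attempts'.format(max_attempts))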


class ArchivematicaSelenium:
    """Selenium tests for MediaConch-related functionality in Archivematica.

    TODOs:

    1. Test in multiple different browser and platform combinations.
    2. Run headless.
    3. Fix issues: search for "TODO/WARNING"
    """

    # =========================================================================
    # Config.
    # =========================================================================

    # General timeout for page load and JS changes (in seconds)
    timeout = 5

    def __init__(self,
                 am_username=DEFAULT_AM_USERNAME,
                 am_password=DEFAULT_AM_PASSWORD,
                 am_url=DEFAULT_AM_URL,
                 am_api_key=None,
                 ss_username=DEFAULT_SS_USERNAME,
                 ss_password=DEFAULT_SS_PASSWORD,
                 ss_url=DEFAULT_SS_URL,
                 ss_api_key=None):
        self.am_username = am_username
        self.am_password = am_password
        self.am_url = am_url
        self.am_api_key = am_api_key
        self.ss_username = ss_username
        self.ss_password = ss_password
        self.ss_url = ss_url
        self._ss_api_key = ss_api_key
        self._tmp_path = None
        self.metadata_attrs = METADATA_ATTRS
        self.dummy_val = DUMMY_VAL

    # =========================================================================
    # Test Infrastructure.
    # =========================================================================

    # Set this to 'Firefox' or 'Chrome'. 'PhantomJS' will fail.
    # Note/TODO: Chrome is currently failing on my machine because the
    # transfers are not displaying their jobs/microservices.
    driver_name = 'Firefox'
    # driver_name = 'PhantomJS'
    all_drivers = []

    def get_driver(self):
        if self.driver_name == 'PhantomJS':
            # These capabilities were part of a failed attempt to make the
            # PhantomJS driver work. Copy the shared capabilities dict rather
            # than mutating it.
            cap = webdriver.DesiredCapabilities.PHANTOMJS.copy()
            cap["phantomjs.page.settings.resourceTimeout"] = 20000
            cap["phantomjs.page.settings.userAgent"] = \
                ('Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5)'
                 ' AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116'
                 ' Safari/537.36')
            driver = webdriver.PhantomJS(desired_capabilities=cap)
        else:
            driver = getattr(webdriver, self.driver_name)()
        self.all_drivers.append(driver)
        return driver

    def set_up(self):
        """Start a maximized browser session using the webdriver named by
        ``driver_name``. The Chrome and Firefox webdrivers have worked with:

        - Chrome 52.0.2743.116 (64-bit)
        - Firefox 47.01 (*note* does not work on v. 48.0)
        """
        self.driver = self.get_driver()
        self.driver.maximize_window()

    def tear_down(self):
        # Close all the $%&@#! browser windows!
        for window_handle in self.driver.window_handles:
            self.driver.switch_to.window(window_handle)
            self.driver.close()
        # self.clear_tmp_dir()
        for driver in self.all_drivers:
            try:
                driver.close()
            except Exception:
                # The driver may already have been closed above.
                pass

    # =========================================================================
    # Archivematica-specific Methods
    # =========================================================================

    # Archivematica high-level helpers ("public methods")
    # =========================================================================

    # These methods let you do high-level things in the AM GUI like logging in
    # or starting a transfer with a given name and transfer directory.

    def start_transfer(self, transfer_path, transfer_name, accession_no=None):
        """Start a new transfer with name ``transfer_name``, transferring the
        directory at ``transfer_path``.

        :param str transfer_path: the path to the transfer to be started as it
            appears in the AM file explorer interface; should not start or end
            with a forward slash.
        :param str transfer_name: the name of the transfer; should be a valid
            AM transfer name, i.e., one that AM will not alter. This is because
            the name is used to re-identify the transfer from the DOM data.
            Should match /[a-zA-Z0-9_]+/.
        :param str accession_no: optional accession number to enter for the
            transfer.
        :returns: the UUID of the new transfer.
        """
        self.navigate_to_transfer_tab()
        self.enter_transfer_name(transfer_name)
        if accession_no:
            self.enter_accession_no(accession_no)
        self.add_transfer_directory(transfer_path)
        self.click_start_transfer_button()
        transfer_uuid, transfer_div_elem = self.wait_for_transfer_to_appear(
            transfer_name)
        self.approve_transfer(transfer_div_elem)
        return transfer_uuid

    def login(self):
        """Log in to Archivematica."""
        self.driver.get(self.get_login_url())
        username_input_id = 'id_username'
        password_input_id = 'id_password'
        try:
            element_present = EC.presence_of_element_located(
                (By.ID, username_input_id))
            WebDriverWait(self.driver, self.timeout).until(element_present)
        except TimeoutException:
            print("Loading took too much time!")
        username_elem = self.driver.find_element_by_id(username_input_id)
        username_elem.send_keys(self.am_username)
        password_elem = self.driver.find_element_by_id(password_input_id)
        password_elem.send_keys(self.am_password)
        submit_button_elem = self.driver.find_element_by_tag_name('button')
        submit_button_elem.click()
        # submit_button_elem.send_keys(Keys.RETURN)

    def remove_all_transfers(self):
        """Remove all transfers in the Transfers tab."""
        self.navigate_to_transfer_tab()
        self.wait_for_presence(self.transfer_div_selector, 20)
        while True:
            top_transfer_elem = self.get_top_transfer()
            if not top_transfer_elem:
                break
            self.remove_top_transfer(top_transfer_elem)

    def remove_all_ingests(self):
        """Remove all ingests in the Ingest tab."""
        url = self.get_ingest_url()
        self.driver.get(url)
        if self.driver.current_url != url:
            self.login()
            self.driver.get(url)
        self.wait_for_presence(self.transfer_div_selector, 20)
        while True:
            top_transfer_elem = self.get_top_transfer()
            if not top_transfer_elem:
                break
            self.remove_top_transfer(top_transfer_elem)

    # URL getters
    # =========================================================================

    def get_ss_login_url(self):
        return '{}login/'.format(self.ss_url)

    def get_edit_default_processing_config_url(self):
        return '{}administration/processing/edit/default/'.format(
            self.am_url)

    def get_default_ss_user_edit_url(self):
        return '{}administration/users/1/edit/'.format(self.ss_url)

    def get_ss_users_url(self):
        return '{}administration/users/'.format(self.ss_url)

    def get_transfer_url(self):
        return '{}transfer/'.format(self.am_url)

    def get_storage_setup_url(self):
        return '{}installer/storagesetup/'.format(self.am_url)

    def get_ingest_url(self):
        return '{}ingest/'.format(self.am_url)

    def get_metadata_add_url(self, sip_uuid):
        return '{}ingest/{}/metadata/add/'.format(self.am_url, sip_uuid)

    def get_preservation_planning_url(self):
        return '{}fpr/format/'.format(self.am_url)

    def get_archival_storage_url(self, aip_uuid=None):
        if aip_uuid:
            return '{}archival-storage/{}/'.format(self.am_url, aip_uuid)
        return '{}archival-storage/'.format(self.am_url)

    def get_rules_url(self):
        return '{}fpr/fprule/'.format(self.am_url)

    def get_create_rule_url(self):
        return '{}fpr/fprule/create/'.format(self.am_url)

    def get_normalization_rules_url(self):
        return '{}fpr/fprule/normalization/'.format(self.am_url)

    def get_policies_url(self):
        return '{}administration/policies/'.format(self.am_url)

    def get_validation_commands_url(self):
        return '{}fpr/fpcommand/validation/'.format(self.am_url)

    def get_create_command_url(self):
        return '{}fpr/fpcommand/create/'.format(self.am_url)

    def get_login_url(self):
        return '{}administration/accounts/login/'.format(self.am_url)

    def get_tasks_url(self, job_uuid):
        return '{}tasks/{}/'.format(self.am_url, job_uuid)

    def get_normalization_report_url(self, sip_uuid):
        return '{}ingest/normalization-report/{}/'.format(
            self.am_url, sip_uuid)

    def get_installer_welcome_url(self):
        return '{}installer/welcome/'.format(self.am_url)

    # CSS classes, selectors and other identifiers
    # =========================================================================

    # CSS class of the "Add" links in the AM file explorer.
    add_transfer_folder_class = \
        'backbone-file-explorer-directory_entry_actions'
    # CSS selector for the <div> holding an entire transfer.
    transfer_div_selector = 'div.sip'
    # CSS selector for the <div> holding the gear icon, the report icon, etc.
    transfer_actions_selector = 'div.job-detail-actions'
    # UUID for the "Approve transfer" option
    approve_transfer_uuid = '6953950b-c101-4f4c-a0c3-0cd0684afe5e'

    # Archivematica methods
    # =========================================================================

    def parse_mediaconch_cmd_stdout(self, stdout):
        """Return the JSON parse of the first JSON-parseable line in
        ``stdout``, else ``{}``.
        """
        for line in stdout.splitlines():
            try:
                return json.loads(line)
            except ValueError:
                pass
        return {}

    @recurse_on_stale
    def get_job_output(self, ms_name, transfer_uuid):
        """Get the output (e.g., "Completed successfully" or "Failed") of the
        Job model representing the execution of micro-service ``ms_name`` in
        transfer ``transfer_uuid``.
        """
        ms_name, group_name = self.micro_service2group(ms_name)
        ms_group_elem = self.get_transfer_micro_service_group_elem(
            group_name, transfer_uuid)
        for job_elem in ms_group_elem.find_elements_by_css_selector('div.job'):
            for span_elem in job_elem.find_elements_by_css_selector(
                    'div.job-detail-microservice span'):
                if span_elem.text.strip() == ms_name:
                    return job_elem.find_element_by_css_selector(
                        'div.job-detail-currentstep span').text.strip()
        return None
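
    def get_sip_uuid(self, transfer_name):
        """Return the UUID of the SIP created from the transfer named
        ``transfer_name``. Opens a fresh browser session on the Ingest tab
        (logging in if needed) and waits for the SIP to appear there.
        """
        self.driver.close()
        self.driver = self.get_driver()
        ingest_url = self.get_ingest_url()
        self.driver.get(ingest_url)
        if self.driver.current_url != ingest_url:
            self.login()
            self.driver.get(ingest_url)
        sip_uuid, _ = self.wait_for_transfer_to_appear(transfer_name)
        return sip_uuid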

    def get_mets(self, transfer_name, sip_uuid=None, parse_xml=True):
        """Return the SIP's METS file, parsed with lxml (or as an XML string
        if ``parse_xml`` is false).

        WARNING: this only works if the processingMCP.xml config file is set to
        *not* store the AIP.
        """
        if not sip_uuid:
            sip_uuid = self.get_sip_uuid(transfer_name)
        ingest_url = self.get_ingest_url()
        self.navigate(ingest_url)
        # Wait for the "Store AIP" micro-service.
        self.expose_job('Store AIP (review)', sip_uuid, 'ingest')
        aip_preview_url = '{}ingest/preview/aip/{}'.format(
            self.am_url, sip_uuid)
        self.navigate(aip_preview_url)
        mets_path = 'storeAIP/{}-{}/METS.{}.xml'.format(
            transfer_name, sip_uuid, sip_uuid)
        # Clicking the METS file opens it in a new window; record the existing
        # windows first so that the new one can be detected and switched to.
        original_window_handle = self.driver.current_window_handle
        handles_before = self.driver.window_handles
        self.navigate_to_aip_directory_and_click(mets_path)
        self.wait_for_new_window(handles_before)
        new_window_handle = [h for h in self.driver.window_handles
                             if h not in handles_before][0]
        self.driver.switch_to.window(new_window_handle)
        mets = self.driver.page_source
        self.driver.switch_to.window(original_window_handle)
        if parse_xml:
            return etree.fromstring(mets.encode('utf8'))
        return mets

    def wait_for_new_window(self, handles_before, timeout=10):
        """Wait until the number of open windows differs from the number of
        handles in ``handles_before`` (recorded before the triggering action).
        """
        WebDriverWait(self.driver, timeout).until(
            lambda driver: len(handles_before) != len(driver.window_handles))

    def navigate_to_aip_directory_and_click(self, path):
        """Click on the file at ``path`` in the "Review AIP" interface.

        TODO: non-DRY given
        ``navigate_to_transfer_directory_and_click``--fix if possible.
        """
        try:
            self._navigate_to_aip_directory_and_click(path)
        except (TimeoutException, MoveTargetOutOfBoundsException):
            self.click_aip_directory_tries += 1
            if (self.click_aip_directory_tries >=
                    self.max_click_aip_directory_tries):
                print('Failed to navigate to aip directory'
                      ' {}'.format(path))
                self.click_aip_directory_tries = 0
                raise
            else:
                self.navigate_to_aip_directory_and_click(path)
        else:
            self.click_aip_directory_tries = 0

    def _navigate_to_aip_directory_and_click(self, path):
        self.cwd = [
            'explorer_var_archivematica_sharedDirectory_watchedDirectories']
        path_parts = path.strip('/').split('/')
        if path_parts[-1].startswith('METS.'):
            path_parts[-1] = 'METS__{}'.format(path_parts[-1][5:])
        for i, folder in enumerate(path_parts):
            is_last = i == len(path_parts) - 1
            self.cwd.append(folder)
            folder_id = '_'.join(self.cwd)
            block = WebDriverWait(self.driver, 1)
            block.until(EC.presence_of_element_located(
                (By.ID, 'explorer')))
            if is_last:
                self.click_file_old_browser(folder_id)
                # self.click_file(folder_id)
            else:
                self.click_folder_old_browser(folder_id)
                # self.click_folder(folder_id)

    def expose_job(self, ms_name, transfer_uuid, unit_type='transfer'):
        """Expose (i.e., click MS group and wait for appearance of) the job
        representing the execution of the micro-service named ``ms_name`` on
        the transfer/SIP with UUID ``transfer_uuid``. Return the
        ``(ms_name, group_name)`` pair.
        """
        # Navigate to the Transfers or Ingest tab, depending on ``unit_type``
        # (if we're not there already)
        unit_url = self.get_transfer_url()
        if unit_type != 'transfer':
            unit_url = self.get_ingest_url()
        self.navigate(unit_url)
        ms_name, group_name = self.micro_service2group(ms_name)
        # If not visible, click the micro-service group to expand it.
        self.wait_for_transfer_micro_service_group(group_name, transfer_uuid)
        is_visible = self.get_transfer_micro_service_group_elem(
            group_name, transfer_uuid)\
            .find_element_by_css_selector('div.microservice-group + div')\
            .is_displayed()
        if not is_visible:
            self.get_transfer_micro_service_group_elem(
                group_name, transfer_uuid).click()
        self.wait_for_microservice_visibility(
            ms_name, group_name, transfer_uuid)
        return ms_name, group_name

    def await_job_completion(self, ms_name, transfer_uuid,
                             unit_type='transfer'):
        """Wait for the job representing the execution of micro-service
        ``ms_name`` on the unit with UUID ``transfer_uuid`` to complete.
        """
        ms_name, group_name = self.expose_job(ms_name, transfer_uuid, unit_type)
        job_uuid, job_output = self.get_job_uuid(
            ms_name, group_name, transfer_uuid)
        return job_uuid, job_output

    def await_decision_point(self, ms_name, transfer_uuid,
                             unit_type='transfer'):
        """Wait for the decision point job for micro-service ``ms_name`` to
        appear.
        """
        logger.debug('Await decision point with unit_type {}'.format(unit_type))
        ms_name, group_name = self.expose_job(ms_name, transfer_uuid, unit_type)
        job_uuid, job_output = self.get_job_uuid(
            ms_name, group_name, transfer_uuid,
            job_outputs=('Awaiting decision',))
        return job_uuid, job_output

    @property
    def tmp_path(self):
        if not self._tmp_path:
            here = os.path.dirname(os.path.abspath(__file__))
            self._tmp_path = os.path.join(here, TMP_DIR_NAME)
            if not os.path.isdir(self._tmp_path):
                os.makedirs(self._tmp_path)
        return self._tmp_path

    def clear_tmp_dir(self):
        for thing in os.listdir(self.tmp_path):
            thing_path = os.path.join(self.tmp_path, thing)
            try:
                if os.path.isfile(thing_path):
                    os.unlink(thing_path)
                elif os.path.isdir(thing_path):
                    shutil.rmtree(thing_path)
            except Exception as e:
                print(e)

    def wait_for_aip_in_archival_storage(self, aip_uuid):
        """Wait for the AIP with UUID ``aip_uuid`` to appear in the Archival
        storage tab. Gives up, without raising, after ``max_seconds`` failed
        searches.
        """
        max_seconds = 120
        seconds = 0
        while True:
            self.navigate(self.get_archival_storage_url(), reload=True)
            self.driver.find_element_by_css_selector(
                'input[title="search query"]').send_keys(aip_uuid)
            Select(self.driver.find_element_by_css_selector(
                'select[title="field name"]')).select_by_visible_text(
                    'AIP UUID')
            Select(self.driver.find_element_by_css_selector(
                'select[title="query type"]')).select_by_visible_text(
                    'Phrase')
            self.driver.find_element_by_id('search_submit').click()
            summary_el = self.driver.find_element_by_css_selector(
                'div.search-summary')
            if 'No results, please try another search.' in summary_el.text:
                seconds += 1
                if seconds > max_seconds:
                    break
                time.sleep(1)
            else:
                time.sleep(1)  # Sleep a little longer, for good measure
                break

    def initiate_reingest(self, aip_uuid, reingest_type='metadata-only'):
        """Initiate a re-ingest of type ``reingest_type`` ('metadata-only' or
        'metadata-and-objects') of the AIP with UUID ``aip_uuid``, via the
        Archival storage tab.
        """
        url = self.get_archival_storage_url(aip_uuid=aip_uuid)
        max_attempts = 10
        attempt = 0
        while True:
            if attempt > max_attempts:
                raise ArchivematicaSeleniumError(
                    'Unable to navigate to {}'.format(url))
            r = requests.get(url)
            if r.status_code == requests.codes.ok:
                logger.info('Requests got OK status code {} when requesting'
                            ' {}'.format(r.status_code, url))
                break
            logger.info('Requests got bad status code {} when requesting'
                        ' {}; waiting for 1 second before trying'
                        ' again'.format(r.status_code, url))
            attempt += 1
            time.sleep(1)
        self.navigate(url)
        reingest_tab_selector = 'a[href="#tab-reingest"]'
        self.wait_for_presence(reingest_tab_selector, timeout=10)
        try:
            self.driver.find_element_by_css_selector(
                reingest_tab_selector).click()
        except WebDriverException:
            logger.warning('Unable to find Re-ingest tab using selector'
                           ' {}'.format(reingest_tab_selector))
        time.sleep(20)
        type_selector = {
            'metadata-only': 'input#id_reingest-reingest_type_1',
            'metadata-and-objects': 'input#id_reingest-reingest_type_2'
        }.get(reingest_type)
        if not type_selector:
            raise ArchivematicaSeleniumError(
                'Unable to initiate a reingest of type {} on AIP'
                ' {}'.format(reingest_type, aip_uuid))
        self.driver.find_element_by_css_selector(type_selector).click()
        self.driver.find_element_by_css_selector(
            'button[name=submit-reingest-form]').click()
        alert_text = self.driver.find_element_by_css_selector(
            'div.alert-success').text.strip()
        assert alert_text.startswith(
            'Package {} sent to pipeline'.format(aip_uuid))
        assert alert_text.endswith('for re-ingest')

    def add_dummy_metadata(self, sip_uuid):
        self.navigate(self.get_ingest_url())
        self.driver.find_element_by_id('sip-row-{}'.format(sip_uuid))\
            .find_element_by_css_selector('a.btn_show_metadata').click()
        self.navigate(self.get_metadata_add_url(sip_uuid))
        for attr in self.metadata_attrs:
            self.driver.find_element_by_id('id_{}'.format(attr))\
                .send_keys(self.dummy_val)
        try:
            self.driver.find_element_by_css_selector(
                'input[value=Create]').click()
        except NoSuchElementException:
            # Should be a "Create" button but sometimes during development the
            # metadata already exists so it is a "Save" button.
            self.driver.find_element_by_css_selector(
                'input[value=Save]').click()

    def save_download(self, request, file_path):
        with open(file_path, 'wb') as f:
            for block in request.iter_content(1024):
                f.write(block)

    def download_aip(self, transfer_name, sip_uuid):
        """Use the AM SS to download the completed AIP.

        Calls ``<SS-URL>api/v2/file/<SIP-UUID>/download/`` with the query
        parameters ``username=<SS-USERNAME>`` and ``api_key=<SS-API-KEY>``.
        """
        payload = {'username': self.ss_username, 'api_key': self.ss_api_key}
        logger.debug('payload for downloading aip {}'.format(payload))
        url = '{}api/v2/file/{}/download/'.format(self.ss_url, sip_uuid)
        aip_name = '{}-{}.7z'.format(transfer_name, sip_uuid)
        aip_path = os.path.join(self.tmp_path, aip_name)
        max_attempts = 20
        attempt = 0
        while True:
            r = requests.get(url, params=payload, stream=True)
            if r.ok:
                self.save_download(r, aip_path)
                return aip_path
            elif r.status_code in (404, 500) and attempt < max_attempts:
                logger.warning(
                    'Trying again to download AIP {} via GET request to URL {};'
                    ' SS returned status code {} and message {}'.format(
                        sip_uuid, url, r.status_code, r.text))
                attempt += 1
                time.sleep(1)
            else:
                logger.warning('Unable to download AIP {} via GET request to'
                               ' URL {}; SS returned status code {} and message'
                               ' {}'.format(sip_uuid, url, r.status_code,
                                            r.text))
                raise ArchivematicaSeleniumError(
                    'Unable to download AIP {}'.format(sip_uuid))

    def decompress_aip(self, aip_path):
        """Extract the 7z-compressed AIP at ``aip_path`` into the temporary
        directory and return the path of the extracted directory.
        """
        aip_dir_path, _ = os.path.splitext(aip_path)
        try:
            devnull = getattr(subprocess, 'DEVNULL')
        except AttributeError:
            devnull = open(os.devnull, 'wb')
        # Pass the arguments as a list so that a path containing spaces is
        # not split apart.
        p = subprocess.Popen(
            ['7z', 'x', aip_path, '-aoa'],
            cwd=self.tmp_path,
            stdout=devnull,
            stderr=subprocess.STDOUT)
        p.wait()
        assert p.returncode == 0
        assert os.path.isdir(aip_dir_path), (
            'Failed to create dir {} from compressed AIP at {}'.format(
                aip_dir_path, aip_path))
        return aip_dir_path

    @recurse_on_stale
    def make_choice(self, choice_text, decision_point, uuid_val,
                    unit_type='transfer'):
        """Make the choice matching the text ``choice_text`` at decision point
        (i.e., microservice) job matching ``decision_point``.
        """
        decision_point, group_name = self.expose_job(
            decision_point, uuid_val, unit_type=unit_type)
        ms_group_elem = self.get_transfer_micro_service_group_elem(
            group_name, uuid_val)
        action_div_el = None
        for job_elem in ms_group_elem.find_elements_by_css_selector('div.job'):
            for span_elem in job_elem.find_elements_by_css_selector(
                    'div.job-detail-microservice span'):
                if squash(span_elem.text) == squash(decision_point):
                    action_div_el = job_elem.find_element_by_css_selector(
                        'div.job-detail-actions')
                    break
            if action_div_el:
                break
        if action_div_el:
            try:
                select_el = action_div_el.find_element_by_css_selector('select')
            except NoSuchElementException:
                # The <select> may not have rendered yet: wait briefly, then
                # retry the whole choice.
                time.sleep(0.5)
                return self.make_choice(choice_text, decision_point, uuid_val,
                                        unit_type=unit_type)
            index = None
            for i, option_el in enumerate(
                    select_el.find_elements_by_tag_name('option')):
                if squash(choice_text) in squash(option_el.text):
                    index = i
            if index is not None:
                Select(select_el).select_by_index(index)
            else:
                raise ArchivematicaSeleniumError(
                    'Unable to select choice "{}"'.format(choice_text))
        else:
            raise ArchivematicaSeleniumError(
                'Unable to find decision point {}'.format(decision_point))

    def parse_job(self, ms_name, transfer_uuid, unit_type='transfer'):
        """Parse the job representing the execution of the micro-service named
        ``ms_name`` on the transfer with UUID ``transfer_uuid``. Return a dict
        containing the ``job_output`` (e.g., "Failed") and, under ``tasks``,
        the parsed tasks <table>, in the following format::

            {
                'job_output': '...',
                'tasks': {
                    '<task_uuid>': {
                        'task_uuid': '...',
                        'file_uuid': '...',
                        'file_name': '...',
                        'client': '...',
                        'exit_code': '...',
                        'command': '...',
                        'arguments': [...],
                        'stdout': '...',
                        'stderr': '...'
                    },
                    '<task_uuid>': { ... }
                }
            }
        """
        ms_name, group_name = self.expose_job(ms_name, transfer_uuid, unit_type)
        # If we don't wait for a second here, then sometimes the tasks page
        # returns incorrect data, presumably because the tasks haven't yet
        # been written to disk correctly (?). What happens is that tasks will
        # have an exit code of 'None' in the interface but, when you look at
        # them in the db, they have a sensible exit code.
        # TODO: this doesn't solve the problem. Figure out why these strange
        # exit codes sometimes show up.
        time.sleep(1)
        # Getting the Job UUID also means waiting for the job to terminate.
        job_uuid, job_output = self.get_job_uuid(ms_name, group_name,
                                                 transfer_uuid)
        # Open the tasks in a new browser window with a new
        # Selenium driver; then parse the table there.
        table_dict = {'job_output': job_output, 'tasks': {}}
        tasks_url = self.get_tasks_url(job_uuid)
        table_dict = self.parse_tasks_table(tasks_url, table_dict)
        return table_dict

    def parse_tasks_table(self, tasks_url, table_dict):
        """Parse the (possibly paginated) tasks table at ``tasks_url`` into
        ``table_dict['tasks']`` using separate browser sessions, then restore
        the original driver.
        """
        old_driver = self.driver
        table_dict = self._parse_tasks_table(tasks_url, table_dict)
        self.driver = old_driver
        return table_dict

    def _parse_tasks_table(self, tasks_url, table_dict):
        self.driver = self.get_driver()
        if self.driver.current_url != tasks_url:
            self.login()
            self.driver.get(tasks_url)
        self.wait_for_presence('table')
        # Parse the <table> to a dict and return it.
        table_elem = self.driver.find_element_by_tag_name('table')
        row_dict = {}
        for row_elem in table_elem.find_elements_by_tag_name('tr'):
            row_type = self.get_tasks_row_type(row_elem)
            if row_type == 'header':
                if row_dict:
                    table_dict['tasks'][row_dict['task_uuid']] = row_dict
                row_dict = self.process_task_header_row(row_elem, {})
            elif row_type == 'command':
                row_dict = self.process_task_command_row(row_elem, row_dict)
            elif row_type == 'stdout':
                row_dict = self.process_task_stdout_row(row_elem, row_dict)
            else:
                row_dict = self.process_task_stderr_row(row_elem, row_dict)
        # Store the last task, which is not followed by another header row.
        if row_dict:
            table_dict['tasks'][row_dict['task_uuid']] = row_dict
        next_tasks_url = None
        for link_button in self.driver.find_elements_by_css_selector('a.btn'):
            if link_button.text.strip() == 'Next Page':
                href = link_button.get_attribute('href')
                # Selenium usually reports ``href`` as an absolute URL; only
                # prefix the AM URL if a relative one comes back.
                if href.startswith('http'):
                    next_tasks_url = href
                else:
                    next_tasks_url = '{}{}'.format(
                        self.am_url, href.lstrip('/'))
        self.driver.close()
        if next_tasks_url:
            table_dict = self._parse_tasks_table(next_tasks_url, table_dict)
        return table_dict

    def get_task_by_file_name(self, file_name, tasks):
        try:
            return [t for t in tasks.values()
                    if t['file_name'] == file_name][0]
        except IndexError:
            return None
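
    def process_task_header_row(self, row_elem, row_dict):
        """Parse the text in the first <tr> of a task, the one beginning with
        "File UUID:". Each line is an "Attribute: value" pair, possibly
        wrapped in parentheses.
        """
        cell_text = row_elem.find_element_by_tag_name('td').text
        for line in cell_text.strip().split('\n'):
            line = line.strip()
            if line.startswith('('):
                line = line[1:]
            if line.endswith(')'):
                line = line[:-1]
            # Split on the first colon only, so that values which themselves
            # contain colons (e.g., timestamps) stay intact.
            attr, val = [x.strip() for x in line.split(':', 1)]
            row_dict[attr.lower().replace(' ', '_')] = val
        return row_dict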
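    # Illustration only (not used by the tests): parsing a hypothetical
    # header cell line by line, splitting each line on its first colon so
    # that values containing colons, such as the time below, survive. The
    # cell text is an assumption, not copied from a real Archivematica page.
    #
    #     text = 'File UUID: 1234\nTask UUID: abcd\n(Start time: 12:00:01)'
    #     row = {}
    #     for line in text.strip().split('\n'):
    #         line = line.strip()
    #         if line.startswith('('):
    #             line = line[1:]
    #         if line.endswith(')'):
    #             line = line[:-1]
    #         attr, val = [x.strip() for x in line.split(':', 1)]
    #         row[attr.lower().replace(' ', '_')] = val
    #     # row == {'file_uuid': '1234', 'task_uuid': 'abcd',
    #     #         'start_time': '12:00:01'}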
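
    def process_task_command_row(self, row_elem, row_dict):
        """Parse the text in the second <tr> of a task, the one specifying
        the command and its arguments.
        """
        # Drop the label before the first colon. Any later colons belong to
        # the command line itself.
        cell_text = row_elem.find_element_by_tag_name('td').text.strip()
        command_text = cell_text.split(':', 1)[1]
        command, *arguments = command_text.split()
        arguments = ' '.join(arguments)
        # Arguments are double-quoted and space-separated ("a" "b c"): strip
        # the outer quotes, then split on '" "'.
        if arguments.startswith('"'):
            arguments = arguments[1:]
        if arguments.endswith('"'):
            arguments = arguments[:-1]
        row_dict['command'] = command
        row_dict['arguments'] = arguments.split('" "')
        return row_dict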
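    # Illustration only: the quote handling used for command rows, applied
    # to a hypothetical cell whose text is assumed to be
    # 'Command: clamscan "file-uuid" "/tmp/a b.txt"' (both outer quotes
    # present). Quoted arguments containing spaces are kept whole.
    #
    #     command_text = 'Command: clamscan "file-uuid" "/tmp/a b.txt"'
    #     command, *arguments = command_text.split(':', 1)[1].split()
    #     arguments = ' '.join(arguments)  # '"file-uuid" "/tmp/a b.txt"'
    #     arguments = arguments[1:-1]      # 'file-uuid" "/tmp/a b.txt'
    #     arguments.split('" "')           # ['file-uuid', '/tmp/a b.txt']
    #     # command == 'clamscan'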

    def process_task_stdout_row(self, row_elem, row_dict):
        """Parse out the task's stdout from the <table>."""
        row_dict['stdout'] = \
            row_elem.find_element_by_tag_name('pre').text.strip()
        return row_dict

    def process_task_stderr_row(self, row_elem, row_dict):
        """Parse out the task's stderr from the <table>."""
        row_dict['stderr'] = \
            row_elem.find_element_by_tag_name('pre').text.strip()
        return row_dict

    def get_tasks_row_type(self, row_elem):
        """Infer the type of the row ``row_elem`` in the tasks table.

        Note: tasks are represented as a table where each block of adjacent
        rows represents the outcome of a single task. All tasks appear to
        have "header" and "command" rows, but not all have "stdout" and
        "stderr" rows (the latter use the ``stderror`` CSS class).

        Returns one of ``'header'``, ``'stdout'``, ``'stderr'`` or
        ``'command'``.
        """
        # Header rows are assumed to be the only rows with a non-empty class.
        if (row_elem.get_attribute('class') or '').strip():
            return 'header'
        try:
            row_elem.find_element_by_css_selector('td.stdout')
            return 'stdout'
        except NoSuchElementException:
            pass
        try:
            row_elem.find_element_by_css_selector('td.stderror')
            return 'stderr'
        except NoSuchElementException:
            pass
        # Any other row is taken to be the command row.
        return 'command'

    # This should map all micro-service names (i.e., descriptions) to their
    # groups, so that tests don't need to specify both.
    # TODO: complete the mapping.
    # WARNING: some micro-services map to multiple groups. This currently
    # breaks operations that need to wait for, or act on, one of those
    # micro-services.
    # The following JavaScript, run in the browser console, creates an object
    # mapping all (run) micro-service names to their micro-service group
    # names and logs any name that appears in more than one group.
    """
    var map_ = {};
    $('div.sip').first().find('div.microservicegroup').each(function(){
        var group = $(this).find(
            'span.microservice-group-name').text().replace(
            'Micro-service: ', '');
        var children = $(this).children();
        if (!$(children[1]).is(':visible')) { children[0].click() }
        $(children[1]).find('div.job').each(function(){
            var ms = $(this).find(
                'div.job-detail-microservice span[title]').text();
            if (map_.hasOwnProperty(ms)) {
                console.log(
                    ms + ' is a DUPLICATE!: ' + group + ' and ' + map_[ms]);
            } else {
                map_[ms] = group;
            }
        });
    });
    console.log(JSON.stringify(map_, undefined, 2));
    """
    micro_services2groups = {
        'Add processed structMap to METS.xml document': ('Update METS.xml document',),
        'Approve AIP reingest': ('Reingest AIP',),
        'Approve normalization': ('Normalize',),
        'Approve normalization (review)': ('Normalize',),
        'Approve standard transfer': ('Approve transfer',),
        'Assign checksums and file sizes to metadata': ('Process metadata directory',),
        'Assign checksums and file sizes to objects': ('Assign file UUIDs and checksums',),
        'Assign checksums and file sizes to submissionDocumentation': ('Process submission documentation',),
        'Assign file UUIDs to metadata': ('Process metadata directory',),
        'Assign file UUIDs to objects': ('Assign file UUIDs and checksums',),
        'Assign file UUIDs to submission documentation': ('Process submission documentation',),
        'Attempt restructure for compliance': ('Verify transfer compliance',),
        'Characterize and extract metadata': ('Characterize and extract metadata',),
        'Characterize and extract metadata on metadata files': ('Process metadata directory',),
        'Characterize and extract metadata on submission documentation': ('Process submission documentation',),
        'Check for Access directory': ('Normalize',),
        'Check for Service directory': ('Normalize',),
        'Check for manual normalized files': ('Process manually normalized files',),
        'Check for specialized processing': ('Examine contents',),
        'Check for submission documentation': ('Process submission documentation',),
        'Check if AIP is a file or directory': ('Prepare AIP',),
        'Check if DIP should be generated': ('Prepare AIP',),
        'Check if SIP is from Maildir Transfer': ('Rename SIP directory with SIP UUID',),
        'Check transfer directory for objects': ('Create SIP from Transfer',),
        'Compress AIP': ('Prepare AIP',),
        'Copy submission documentation': ('Prepare AIP',),
        'Copy transfer submission documentation': ('Process submission documentation',),
        'Copy transfers metadata and logs': ('Process metadata directory',),
        'Create AIP Pointer File': ('Prepare AIP',),
        'Create SIP from transfer objects': ('Create SIP from Transfer',),
        'Create SIP(s)': ('Create SIP from Transfer',),
        'Create thumbnails directory': ('Normalize',),
        'Create transfer metadata XML': ('Complete transfer',),
        'Designate to process as a standard transfer': ('Quarantine',),
        'Determine if transfer contains packages': ('Extract packages',),
        'Determine which files to identify': ('Identify file format',),
        'Examine contents?': ('Examine contents',),
        'Find type to process as': ('Quarantine',),
        'Generate METS.xml document': ('Generate METS.xml document', 'Generate AIP METS'),
        'Generate transfer structure report': ('Generate transfer structure report',),
        'Grant normalization options for no pre-existing DIP': ('Normalize',),
        'Identify file format': (
            'Identify file format',
            'Normalize',
            'Process submission documentation'),
        'Identify file format of metadata files': ('Process metadata directory',),
        'Identify manually normalized files': ('Normalize',),
        'Include default SIP processingMCP.xml': ('Include default SIP processingMCP.xml',),
        'Include default Transfer processingMCP.xml': ('Include default Transfer processingMCP.xml',),
        'Load Dublin Core metadata from disk': ('Clean up names',),
        'Load labels from metadata/file_labels.csv': ('Characterize and extract metadata',),
        'Load options to create SIPs': ('Create SIP from Transfer',),
        'Move metadata to objects directory': ('Process metadata directory',),
        'Move submission documentation into objects directory': ('Process submission documentation',),