diff --git a/zml2lido/__init__.py b/zml2lido/__init__.py
index c029877..8290399 100644
--- a/zml2lido/__init__.py
+++ b/zml2lido/__init__.py
@@ -74,7 +74,7 @@ def lido():
print(f"JOB: {args.job}")
lt = LidoTool(
- Input=args.input, force=args.force, validation=args.validate, chunks=args.chunks
+ src=args.input, force=args.force, validation=args.validate, chunks=args.chunks
)
lt.execute(args.job)
diff --git a/zml2lido/lidoTool.py b/zml2lido/lidoTool.py
index b690d2f..e4165ae 100644
--- a/zml2lido/lidoTool.py
+++ b/zml2lido/lidoTool.py
@@ -7,7 +7,7 @@
C:/M3/zml2lido
You need to specify three parameters
-j/--job: which flavor (job) of the transformation you want to use
- -i/--input: where the input xml file is
+    -i/--input: where the source xml file is
-o/--output: will be used as output directory; in my case
C:/m3/zml2lido/sdata/{output}
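+
+    Example call (hypothetical paths; assumes the 'lido' console script):
+        lido -j mitLit -i sdata/20230201/group12345.zml.xml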
@@ -56,23 +56,23 @@ class LidoTool:
def __init__(
self,
*,
- Input: str,
+ src: str,
force: bool = False,
validation: bool = False,
chunks: bool = False,
) -> None:
"""
- Input: lido file or first chunk
+ src: lido file or first chunk
force: overwrites files
validation: validate lido files?
- chunks: expect consecutively numbered and zipped lido files as input
+ chunks: expect consecutively numbered and zipped lido files as src
"""
self.validation = validation
self.force = force
self.chunks = chunks
- self.Input = self._sanitize(Input=Input)
+ self.src = self._sanitize(src=src)
self.outdir = self._prepareOutdir()
self._initLog()
@@ -83,20 +83,20 @@ def __init__(
def execute(self, job: str) -> None:
if job == "dd":
# debug. Only lvl1
- lido_fn = self.zml2lido(Input=self.Input)
+ lido_fn = self.zml2lido(src=self.src)
elif job == "ddd":
# debug. Only lvl1
- lido_fn = self.zml2lido(Input=self.Input)
+ lido_fn = self.zml2lido(src=self.src)
self._valsplit(lido_fn)
elif job == "ohneLit":
# use different xslt for lvl1 conversion plus lvl2
- lido_fn = self.zml2lido(Input=self.Input, xslt="ohneLit")
- lvl2_fn = self.to_lvl2(Input=lido_fn)
+ lido_fn = self.zml2lido(src=self.src, xslt="ohneLit")
+ lvl2_fn = self.to_lvl2(src=lido_fn)
self._valsplit(lvl2_fn)
elif job == "mitLit":
# regular xslt, lvl2
- lido_fn = self.zml2lido(Input=self.Input)
- lvl2_fn = self.to_lvl2(Input=lido_fn)
+ lido_fn = self.zml2lido(src=self.src)
+ lvl2_fn = self.to_lvl2(src=lido_fn)
self._valsplit(lvl2_fn)
else:
raise SyntaxError("ERROR: Unknown job name!")
@@ -105,53 +105,53 @@ def lfilter(self, *, split: bool = False, Type: str) -> None:
        if Type not in xsl:
raise TypeError(f"Error: Unknown type '{Type}'")
- new_fn = self.Input.stem + f"-no{Type}.xml"
+ new_fn = self.src.stem + f"-no{Type}.xml"
out_fn = self.outdir / new_fn
- self.saxon(Input=self.Input, xsl=xsl[Type], output=out_fn)
+ self.saxon(src=self.src, xsl=xsl[Type], output=out_fn)
if split:
self.force = True
- self.splitLido(Input=out_fn)
+ self.splitLido(src=out_fn)
- def to_lvl2(self, *, Input: str) -> Path:
+ def to_lvl2(self, *, src: str) -> Path:
if self.chunks:
- for chunkFn in self.loopChunks(Input=Input):
- new_fn = self.to_lvl2Single(Input=chunkFn)
- return self.firstChunkName(Input=new_fn)
+ for chunkFn in self.loopChunks(src=src):
+ new_fn = self.to_lvl2Single(src=chunkFn)
+ return self.firstChunkName(src=new_fn)
else:
- return self.to_lvl2Single(Input=Input)
+ return self.to_lvl2Single(src=src)
- def to_lvl2Single(self, *, Input: str) -> Path:
+ def to_lvl2Single(self, *, src: str | Path) -> Path:
"""
- Using Python rewrite (fix) generic Zetcom xml, mostly working on
- links (urls)
+        Using Python, rewrite (fix) the generic Zetcom XML, mostly working on links (URLs).
"""
- out_fn = self._lvl2_path(Input)
+ out_fn = self._lvl2_path(src)
# print(f"lvl2: {out_fn}")
- # init for each chunk required, although we will
- lc = LinkChecker(Input=Input, chunks=self.chunks) # reads cache
+        if not hasattr(self, "lc"):  # only initialize LinkChecker once
+            self.lc = LinkChecker(src=src, chunks=self.chunks)  # reads cache
if not out_fn.exists() or self.force:
- lc.relWorks_cache_single(fn=Input)
- lc.rmUnpublishedRecords() # remove unpublished records (not on SMB-Digital)
- # lc.rmInternalLinks() # remove resourceSets with internal links
- lc.fixRelatedWorks()
- lc.saveTree(out_fn)
+ # self.lc.relWorks_cache_single(fn=src)
+ self.lc.rmUnpublishedRecords() # remove unpublished records (not on SMB-Digital)
+ self.lc.fixRelatedWorks()
+ self.lc.saveTree(out_fn)
else:
- print(f" rewrite exists already: {out_fn}, no overwrite")
+ print(f" lvl2 already exists: {out_fn}")
return out_fn
- def splitLido(self, *, Input: str) -> str:
+ def splitLido(self, *, src: str | Path) -> str | Path:
# print("SPLITLIDO enter")
if self.chunks:
self.force = True # otherwise subsequent chunks are not written
- for chunkFn in self.loopChunks(Input=Input):
- self.splitLidoSingle(Input=chunkFn)
+ for chunkFn in self.loopChunks(src=src):
+ self.splitLidoSingle(src=chunkFn)
else:
- self.splitLidoSingle(Input=Input)
- return Input # dont act on split files
+ self.splitLidoSingle(src=src)
+ return src # dont act on split files
- def splitLidoSingle(self, *, Input: str) -> None:
+ def splitLidoSingle(self, *, src: str | Path) -> None:
"""
Create individual files per lido record
"""
@@ -161,21 +161,21 @@ def splitLidoSingle(self, *, Input: str) -> None:
if not splitDir.exists() or self.force: # self.force is True was problematic
print("SPLITLIDO making")
os.chdir(self.outdir)
- self.saxon(Input=Input, xsl=xsl["splitLido"], output="o.xml")
+ self.saxon(src=src, xsl=xsl["splitLido"], output="o.xml")
os.chdir(orig)
else:
print(f" SPLIT DIR exists already: {splitDir}")
- def splitSachbegriff(self, *, Input: str) -> Path:
+ def splitSachbegriff(self, *, src: str) -> Path:
print("SPLITSACHBEGRIFF")
if self.chunks:
- for chunkFn in self.loopChunks(Input=Input):
- sachbegriffFn = self.splitSachbegriff(Input=chunkFn)
- return self.firstChunkName(Input=sachbegriffFn)
+ for chunkFn in self.loopChunks(src=src):
+                sachbegriffFn = self.splitSachbegriffSingle(src=chunkFn)
+ return self.firstChunkName(src=sachbegriffFn)
else:
- return self.splitSachbegriffSingle(Input=Input)
+ return self.splitSachbegriffSingle(src=src)
- def splitSachbegriffSingle(self, *, Input: str) -> Path:
+ def splitSachbegriffSingle(self, *, src: str) -> Path:
"""
Writes two files to output dir
ohneSachbegriff.xml is meant for debugging.
@@ -184,13 +184,13 @@ def splitSachbegriffSingle(self, *, Input: str) -> Path:
os.chdir(self.outdir)
out = "mitSachbegriff.xml"
if not Path(out).exists() or self.force is True:
- self.saxon(Input=Input, xsl=xsl["splitSachbegriff"], output=out)
+ self.saxon(src=src, xsl=xsl["splitSachbegriff"], output=out)
else:
print(f"{out} exist already, no overwrite")
os.chdir(orig)
return xslDir / out
- def validate(self, *, path: Optional[str] = None):
+ def validate(self, *, p: str | Path | None = None):
"""
        Optionally specify the path of a file that needs validation. If
        p is None, the file that was specified during __init__ will be validated.
@@ -200,74 +200,74 @@ def validate(self, *, path: Optional[str] = None):
(Not tested recently for chunks...)
"""
- if path is None:
- to_val_fn = self.Input
+ if p is None:
+ to_val_fn = self.src
else:
- to_val_fn = path
+ to_val_fn: Path = Path(p)
print(f"VALIDATING LIDO FILE {to_val_fn}")
if self.chunks:
print(" with chunks")
- for chunkFn in self.loopChunks(Input=to_val_fn):
- self.validateSingle(Input=chunkFn)
+ for chunkFn in self.loopChunks(src=to_val_fn):
+ self.validateSingle(src=chunkFn)
else:
- self.validateSingle(Input=to_val_fn)
+ self.validateSingle(src=to_val_fn)
- def validateSingle(self, *, Input):
+ def validateSingle(self, *, src):
if not hasattr(self, "schema"):
print(f" loading schema {lidoXSD}")
schemaDoc = etree.parse(lidoXSD)
self.schema = etree.XMLSchema(schemaDoc)
- print(f" validating {Input}")
- doc = etree.parse(str(Input))
+ print(f" validating {src}")
+ doc = etree.parse(str(src))
self.schema.assert_(doc) # raises error when not valid
- return Input
+ return src
- def zml2lido(self, *, Input, xslt="zml2lido"):
+ def zml2lido(self, *, src, xslt="zml2lido"):
print(f"ZML2LIDO {xslt}")
if self.chunks:
print(" with chunks")
- for chunkFn in self.loopChunks(Input=self.Input):
- lidoFn = self.zml2lidoSingle(Input=chunkFn, xslt=xslt)
- return self.firstChunkName(Input=lidoFn)
+ for chunkFn in self.loopChunks(src=self.src):
+ lidoFn = self.zml2lidoSingle(src=chunkFn, xslt=xslt)
+ return self.firstChunkName(src=lidoFn)
else:
- return self.zml2lidoSingle(Input=Input, xslt=xslt)
+ return self.zml2lidoSingle(src=src, xslt=xslt)
- def zml2lidoSingle(self, *, Input: str | Path, xslt="zml2lido") -> Path:
+ def zml2lidoSingle(self, *, src: str | Path, xslt="zml2lido") -> Path:
"""
Convert a single file from zml to lido using the specified xslt.
- Input is a full path.
+ src is a full path.
"""
- inputP = Path(Input)
- lidoFn = self.outdir.joinpath(inputP.stem + ".lido.xml")
+ srcP = Path(src)
+ lidoFn = self.outdir.joinpath(srcP.stem + ".lido.xml")
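+        # e.g. 3Wege.zml.xml -> 3Wege.zml.lido.xml (hypothetical name)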
print(f"zml2lidoSingle with {xsl[xslt]}") # with file '{lidoFn}'
if self.force is True or not lidoFn.exists():
- if inputP.suffix == ".zip": # unzipping temp file
- print(" input is zipped")
- parent_dir = inputP.parent
- member = Path(inputP.name).with_suffix(".xml")
+ if srcP.suffix == ".zip": # unzipping temp file
+ print(" src is zipped")
+ parent_dir = srcP.parent
+ member = Path(srcP.name).with_suffix(".xml")
temp_fn = parent_dir / member
- with ZipFile(inputP, "r") as zippy:
+ with ZipFile(srcP, "r") as zippy:
zippy.extract(str(member), path=parent_dir)
- new_input = temp_fn
+ new_src = temp_fn
else:
- new_input = inputP
+ new_src = srcP
- self.saxon(Input=new_input, xsl=xsl[xslt], output=lidoFn)
+ self.saxon(src=new_src, xsl=xsl[xslt], output=lidoFn)
- if inputP.suffix == ".zip":
+ if srcP.suffix == ".zip":
temp_fn.unlink()
else:
- print(f"lidoFn exists {lidoFn}")
+ print(f"exists {lidoFn}")
return lidoFn
#
# more helpers
#
- def loopChunks(self, *, Input: str) -> Iterable[str]:
+ def loopChunks(self, *, src: str | Path) -> Iterable[str | Path]:
"""
        returns a generator with paths for existing files, counting up as long
        as files exist. For this to work, the filename has to include
@@ -275,16 +275,16 @@ def loopChunks(self, *, Input: str) -> Iterable[str]:
        This might belong in chunker.py to be reusable.
"""
- print(f"chunk input: {Input}")
- root, no, tail = self._analyze_chunkFn(Input=Input)
- chunkFn = Input
+ print(f"chunk src: {src}")
+ root, no, tail = self._analyze_chunkFn(src=src)
+ chunkFn = src
while Path(chunkFn).exists():
yield chunkFn
# print(f"{chunkFn} exists")
no += 1
chunkFn = f"{root}-chunk{no}{tail}"
- def firstChunkName(self, *, Input: str | Path):
+ def firstChunkName(self, *, src: str | Path):
"""
returns the chunk with no. 1
@@ -294,30 +294,30 @@ def firstChunkName(self, *, Input: str | Path):
start with chunk1?
List glob root* and take the first item?
"""
- root, no, tail = self._analyze_chunkFn(Input=Input)
- Input = Path(Input)
- parent = Input.parent
+ root, no, tail = self._analyze_chunkFn(src=src)
+ src = Path(src)
+ parent = src.parent
folder = {}
for each in parent.iterdir():
if str(each).startswith(root):
- root, no, tail = self._analyze_chunkFn(Input=each)
+ root, no, tail = self._analyze_chunkFn(src=each)
folder[no] = each
no = min(folder.keys())
firstFn = folder[no]
# print(f"***firstChunkName {firstFn}")
return firstFn
- def saxon(self, *, Input: str, output: str, xsl: str) -> None:
+ def saxon(self, *, src: str | Path, output: str | Path, xsl: str | Path) -> None:
if not Path(saxLib).exists():
raise SyntaxError(f"ERROR: saxLib {saxLib} does not exist!")
- if not Path(Input).exists():
- raise SyntaxError(f"ERROR: input {Input} does not exist!")
+ if not Path(src).exists():
+ raise SyntaxError(f"ERROR: src {src} does not exist!")
if not Path(xsl).exists():
raise SyntaxError(f"ERROR: xsl file does not exist!")
- cmd = f"java -Xmx1450m -jar {saxLib} -s:{Input} -xsl:{xsl} -o:{output}"
+ cmd = f"java -Xmx1450m -jar {saxLib} -s:{src} -xsl:{xsl} -o:{output}"
print(cmd)
subprocess.run(
@@ -328,18 +328,19 @@ def saxon(self, *, Input: str, output: str, xsl: str) -> None:
# private helper
#
- def _analyze_chunkFn(self, *, Input: str):
+ def _analyze_chunkFn(self, *, src: str | Path) -> tuple[str, int, str]:
"""
- Input could be Path or str.
+ src could be Path or str.
This might belong in chunker.py ...
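+
+        Example (hypothetical filename):
+            "group123-chunk2.lido.xml" -> ("group123", 2, ".lido.xml")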
"""
- # print(f"ENTER ANALYZE WITH {Input}")
- partsL = str(Input).split("-chunk")
+ # print(f"ENTER ANALYZE WITH {src}")
+ partsL = str(src).split("-chunk")
root = partsL[0]
m = re.match(r"(\d+)[\.-]", partsL[1])
- no = int(m.group(1))
- tail = str(Input).split("-chunk" + str(no))[1]
+        if m is None:
+            raise ValueError(f"ERROR: Unexpected chunk filename: {src}")
+        no = int(m.group(1))
+        tail = str(src).split("-chunk" + str(no))[1]
# print(f"_ANALYZE '{root}' '{no}' '{tail}'")
return root, no, tail
@@ -366,15 +367,15 @@ def _lvl2_path(self, p: str | Path) -> Path:
def _prepareOutdir(self) -> Path:
# determine outdir (long or short)
sdataP = Path("sdata").resolve() # resolve probably not necessary
- if re.match(r"\d\d\d\d\d\d", self.Input.parent.name):
- outdir = sdataP / self.Input.parents[1].name / self.Input.parent.name
- elif self.Input.parent.name == "sdata":
+ if re.match(r"\d\d\d\d\d\d", self.src.parent.name):
+ outdir = sdataP / self.src.parents[1].name / self.src.parent.name
+ elif self.src.parent.name == "sdata":
raise SyntaxError(
- """ERROR: Don't use an input file inside of sdata.
+ """ERROR: Don't use an src file inside of sdata.
Use a subdirectory instead!"""
)
else:
- outdir = sdataP / self.Input.parent.name
+ outdir = sdataP / self.src.parent.name
if not outdir.exists():
print(f"Making new dir {outdir}")
@@ -382,9 +383,9 @@ def _prepareOutdir(self) -> Path:
print(f" outdir {outdir}")
return outdir
- def _sanitize(self, *, Input: str) -> Path:
+ def _sanitize(self, *, src: str | Path) -> Path:
"""
- Input could be Path or str.
+ src could be Path or str.
        Some checks for convenience; mainly for our users, so they get more intelligible
error messages.
@@ -396,21 +397,23 @@ def _sanitize(self, *, Input: str) -> Path:
raise SyntaxError(f"ERROR: Call me from directory '{script_dir}', please!")
if not Path(saxLib).is_file():
- raise SyntaxError(f"ERROR: Saxon not found, check config file at {conf_fn}")
+        raise SyntaxError(
+            f"ERROR: Saxon not found at '{saxLib}'; check environment variable saxLib"
+        )
- # check Input
- if Input is None:
- raise SyntaxError("ERROR: Input can't be None!")
- Input = Path(Input) # initial input file, e.g. 3Wege.zml.xml
+ # check src
+ if src is None:
+ raise SyntaxError("ERROR: src can't be None!")
+ src = Path(src) # initial src file, e.g. 3Wege.zml.xml
- if Input.is_dir():
- raise SyntaxError("ERROR: Input is directory!")
- elif not Input.exists():
- raise SyntaxError("ERROR: Input does not exist!")
+ if src.is_dir():
+ raise SyntaxError("ERROR: src is directory!")
+ elif not src.exists():
+ raise SyntaxError("ERROR: src does not exist!")
- return Input
+ return src
def _valsplit(self, fn):
if self.validation:
self.validate(path=fn)
- self.splitLido(Input=fn)
+ self.splitLido(src=fn)
diff --git a/zml2lido/linkChecker.py b/zml2lido/linkChecker.py
index dcfbb49..90ebfb9 100644
--- a/zml2lido/linkChecker.py
+++ b/zml2lido/linkChecker.py
@@ -1,8 +1,8 @@
"""
- parse a LIDO file for linkResources and work on linkResources that don't start with http
+    parse a LIDO file for linkResources and work on those that don't start with http
for those guess the URL based on heuristics indicated by the examples path below
- write result to lido file in same dir as input
- input and output are lido
+ write result to lido file in same dir as src
+ src and output are lido
https://recherche.smb.museum/images/5403567_2500x2500.jpg
lidoWrap/lido/administrativeMetadata/resourceWrap/resourceSet/resourceRepresentation/linkResource
@@ -17,7 +17,7 @@
from mpapi.search import Search
from pathlib import Path
import re
-from typing import Optional, Union
+from typing import Any, Optional, Union
NSMAP = {"l": "http://www.lido-schema.org"}
relWorks_maxSize = 20000 # more lasts forever
@@ -25,46 +25,26 @@
class LinkChecker:
- def __init__(self, *, Input: str | Path, chunks: bool = False):
- self.log(f"STATUS: LinkChecker is working on {Input}") # not exactly an error
- self.Input = Path(Input)
+ def __init__(self, *, src: str | Path, chunks: bool = False) -> None:
+ self._log(f"STATUS: LinkChecker is working on {src}") # not exactly an error
+ self.src = Path(src)
# self.chunk = chunk
- self.relWorksFn = self.Input.parent / "relWorks.cache.xml"
- self.tree = etree.parse(str(Input))
+ self.relWorksFn = self.src.parent / "relWorks.cache.xml"
+ self.tree = etree.parse(str(src))
# we used to not prepare the relWorksCache here. Why?
+ self._init_relWorks_cache()
if chunks:
print("prepare relWorks cache (chunks, many)")
- self._relWorks_cache_many(first=Input) # run only once to make cache
-
- def checkRelWorkOnline(self, *, modType: str, modItemId: int):
- """
- Checks if a specific relWork is online. No urlrequest, just examins if
- SMB-Freigabe = Ja.
-
- Expects modItemId as int; but str should work as well.
- """
- r = self.relWorks.xpath(
- f"""/m:application/m:modules/m:module[
- @name = '{modType}']/m:moduleItem[
- @id = {str(modItemId)}]/m:repeatableGroup[
- @name = 'ObjPublicationGrp']/m:repeatableGroupItem[
- m:vocabularyReference[@name='PublicationVoc']/m:vocabularyReferenceItem[@name='Ja']
- and m:vocabularyReference[@name='TypeVoc']/m:vocabularyReferenceItem[@id = 2600647]
- ]"""
- )
- if len(r) > 0:
- return True
- else:
- return False
+ self._relWorks_cache_many(first=src) # run only once to make cache
- def fixRelatedWorks(self):
+ def fixRelatedWorks(self) -> None:
"""
Frank doesn't want dead links in relatedWorks. So we loop thru them, check
if they are SMB-approved (using MpApi) and, if not, we remove them. We're
also include ISILs in the same step.
"""
- self.log(
+ self._log(
"fixRelatedWorks: Removing relatedWorks that are not online and getting ISILs"
)
@@ -77,118 +57,40 @@ def fixRelatedWorks(self):
)
# for //relatedWork in the current LIDO document
- for ID in relatedWorksL:
- # don't log self.log(f"fixRelatedWorks checking {ID.text}")
+ for objectID in relatedWorksL:
+ # don't _log self._log(f"fixRelatedWorks checking {objectID.text}")
# assuming that source always exists
- src = ID.xpath("@l:source", namespaces=NSMAP)[0]
+ src = objectID.xpath("@l:source", namespaces=NSMAP)[0]
if src == "OBJ.ID":
modType = "Object"
elif src == "LIT.ID":
modType = "Literature"
elif src == "ISIL/ID":
raise ValueError(
- "ERROR: @lido:source='ISIL/ID' indicates that an already"
+ "ERROR: @lido:source='ISIL/ID' indicates that an already "
+ "processed LIDO file is being processed again"
)
modType = "Object"
else:
raise ValueError(f"ERROR: Unknown type: {src}")
- if ID.text is not None:
- id_int = int(ID.text)
- # only recursive should get us here
- # except:
- # id_int = int(ID.text.split("/")[-1])
- # print (f"*****{id_str} {modType}")
+ if objectID.text is not None:
+ id_int = int(objectID.text)
if modType == "Literature":
pass
# print("WARN: No check for modType 'Literature'")
else:
# print(f"fixing relatedWork {modType} {id_int}")
- try:
- # is the work already in the cache?
- relWorkN = self.relWorks[(modType, id_int)]
- except: # if not, get record and add it to cache
- print(f" getting item from online RIA {modType} {id_int}")
- # if not, get it now and add to cache
- q = Search(module=modType, limit=-1)
- q.addCriterion(
- operator="equalsField",
- field="__id",
- value=str(id_int),
- )
- q = self._optimize_relWorks_cache(query=q)
- # q.toFile(path="sdata/debug.search.xml")
- relWork = client.search2(query=q)
- if relWork: # realistic that query results are empty?
- # appending them to relWork cache
- self.relWorks += relWork
- # print (" update file cache")
- self.relWorks.toFile(path=self.relWorksFn)
- else:
- # if relWork record is already in cache
- relWork = Module()
- relWork.addItem(itemN=relWorkN, mtype=modType)
-
- if self.checkRelWorkOnline(modType=modType, modItemId=id_int):
- # rewrite ISIL, should look like this:
- # de-MUS-018313/744501
- # self.log(f" looking up ISIL for relWork")
- ID.attrib["{http://www.lido-schema.org}source"] = "ISIL/ID"
- # we're assuming there is always a verwaltendeInstitution, but that is not enforced by RIA!
- try:
- verwInst = relWork.xpath(
- """//m:moduleReference[
- @name='ObjOwnerRef'
- ]/m:moduleReferenceItem/m:formattedValue"""
- )[0]
- except:
- self.log(
- f"WARNING: verwaltendeInstitution empty! {modType} {id_int}"
- )
- else:
- ISIL = self.ISIL_lookup(institution=verwInst.text)
- ID.text = f"{ISIL}/{str(id_int)}"
- print(f" relWork {id_int}: {verwInst.text} -> {ISIL}")
- else:
- self.log(f" removing unpublic relWork")
- relWorkSet = ID.getparent().getparent().getparent()
- relWorkSet.getparent().remove(relWorkSet)
-
- def ISIL_lookup(self, *, institution):
- """
- Load vocmap.xml and lookup ISIL for name of institution.
+ if not self.relWorks.item_exists(mtype=modType, ID=id_int):
+ self._add_to_relWorks_cache(mtype=modType, ID=id_int)
+ # at this point we can rely on item being in relWorks cache
+ self._rewrite_relWork(mtype=modType, objectID=objectID)
- In the beginning, we die when no ISIL found, but later we might carp more gracefully.
+ def linkResource_online_http(self) -> None:
"""
- vm_fn = Path(__file__).parents[1] / "vocmap.xml"
- if not vm_fn.exists():
- raise SyntaxError(f"File not found {vm_fn}")
- vocMap = etree.parse(vm_fn)
- try:
- ISIL = vocMap.xpath(
- f"""/vocmap/voc[
- @name='verwaltendeInstitution'
- ]/concept[
- source = '{institution}'
- ]/target[
- @name = 'ISIL'
- ]"""
- )[0]
- except:
- raise SyntaxError(
- f"vocMap: verwaltendeInstitution '{institution}' not found"
- )
- return ISIL.text
-
- def log(self, msg):
- print(msg)
- logging.info(msg)
-
- def new_check(self):
- """
- For all linkResources, check if url responds ok
+ For all linkResources in self.tree, check if url responds ok using http.
+ Prints the result (which is a bit awkward).
"""
linkResourceL = self.tree.xpath(
"/l:lidoWrap/l:lido/l:administrativeMetadata/l:resourceWrap/l:resourceSet/l:resourceRepresentation/l:linkResource",
@@ -212,10 +114,12 @@ def new_check(self):
else:
print("\tsuccess")
- def relWorks_cache_single(self, *, fn):
+ def relWorks_cache_single(self, *, fn: str | Path) -> None:
"""
        Extracts IDs from one file (fn), queries RIA for those IDs and adds new info to
- self.relWorks
+ self.relWorks.
+
+ This function currently seems to be so slow that it's useless.
"""
fn = Path(fn)
ID_cache = set() # set of relWork ids, no duplicates
@@ -223,14 +127,14 @@ def relWorks_cache_single(self, *, fn):
print(f"growing relWorks with ids from {fn}")
self._grow_relWorks_cache(ID_cache)
- def rmInternalLinks(self):
+ def rmInternalLinks(self) -> None:
"""
- SEEMS TO BE NO LONGER NEEDED!
-
Remove resourceSet whose linkResource point to internal links;
links are internal if they dont begin with "http", e.g.
+
+ Not currently used.
"""
- self.log("resourceSet: Removing sets with remaining internal links")
+ self._log("resourceSet: Removing sets with remaining internal links")
linkResourceL = self.tree.xpath(
"/l:lidoWrap/l:lido/l:administrativeMetadata/l:resourceWrap/l:resourceSet/l:resourceRepresentation/l:linkResource",
namespaces=NSMAP,
@@ -241,13 +145,13 @@ def rmInternalLinks(self):
resourceSet = link.getparent().getparent()
resourceSet.getparent().remove(resourceSet)
- def rmUnpublishedRecords(self):
+ def rmUnpublishedRecords(self) -> None:
"""
Remove lido records which are not published on SMB Digital.
Assumes that only records which have SMBFreigabe=Ja have objectPublishedID
"""
- # self.log(
+ # self._log(
# " LinkChecker: Removing lido records that are not published on recherche.smb"
# )
recordsL = self.tree.xpath(
@@ -255,28 +159,65 @@ def rmUnpublishedRecords(self):
)
for recordN in recordsL:
recID = recordN.xpath("l:lidoRecID", namespaces=NSMAP)[0]
- self.log(f"rm unpublishedRecords: {recID}")
+ self._log(f"rm unpublishedRecords: {recID}")
recordN.getparent().remove(recordN)
- self.log("rmUnpublishedRecords: done!")
+ self._log("rmUnpublishedRecords: done!")
- def saveTree(self, out_fn) -> str:
+    def saveTree(self, out_fn: str | Path) -> str | Path:
"""
During __init__ we loaded a LIDO file, with this function we write it back to the
out file location as set during __init__.
"""
- self.log(f"Writing back to {out_fn}")
+ self._log(f"Writing back to {out_fn}")
self.tree.write(
str(out_fn), pretty_print=True, encoding="UTF-8", xml_declaration=True
)
return out_fn
#
+ # PRIVATE
#
- #
+
+ def _add_to_relWorks_cache(self, *, mtype: str, ID: int) -> None:
+ """
+ Get item from RIA, add to relWorks cache and write cache to disk.
+
+ Caution: Does not include a check if relWork is already in cache.
+ """
+ print(f" getting item from online RIA {modType} {id_int}")
+ # if not, get it now and add to cache
+ q = Search(module=mType, limit=-1)
+ q.addCriterion(
+ operator="equalsField",
+ field="__id",
+ value=str(id_int),
+ )
+ q = self._optimize_relWorks_cache(query=q)
+ # q.toFile(path="sdata/debug.search.xml")
+ relWork = client.search2(query=q)
+ if relWork: # realistic that query results are empty?
+ # appending them to relWork cache
+ self.relWorks += relWork
+ # print (" update file cache")
+ self.relWorks.toFile(path=self.relWorksFn)
+
+ def _del_relWork(self, *, ID) -> None:
+ """
+        Delete a relWork from self.tree.
+        ID is an lxml node (the lido:objectID element).
+        """
+        self._log(f"   removing unpublished relWork {ID.text}")
+ relWorkSet = ID.getparent().getparent().getparent()
+ relWorkSet.getparent().remove(relWorkSet)
def _file_to_ID_cache(self, chunk_fn: Path, ID_cache: set) -> set:
- print(f" data file (may be a chunk) exists {chunk_fn}")
- self._init_relWorks_cache()
+ """
+        Given the path to a LIDO file, scan it for relWorks and add their
+        objIds to ID_cache; return the updated set.
+ """
+ print(f" _file_to_ID_cache exists {chunk_fn}")
chunkET = etree.parse(str(chunk_fn))
relWorksL = chunkET.xpath(
@@ -285,7 +226,7 @@ def _file_to_ID_cache(self, chunk_fn: Path, ID_cache: set) -> set:
namespaces=NSMAP,
)
- print(f" chunk has {len(relWorksL)} relWorks")
+ print(f" _file_to_ID_cache {len(relWorksL)} relWorks")
for ID in relWorksL:
src = ID.xpath("@l:source", namespaces=NSMAP)[0]
@@ -297,12 +238,14 @@ def _file_to_ID_cache(self, chunk_fn: Path, ID_cache: set) -> set:
raise ValueError(f"ERROR: Unknown type: {src}")
# dont write more than a few thousand items in cache
- if len(ID_cache) >= relWorks_maxSize:
- break
+ # if len(ID_cache) >= relWorks_maxSize:
+ # print("break here")
+ # break
if ID.text is not None and mType == "Object":
# only add this to ID_cache if not yet in relWorks cache
if not self.relWorks.item_exists(mtype="Object", ID=int(ID.text)):
ID_cache.add(int(ID.text))
+ print(f" adding {len(ID_cache)} IDs")
return ID_cache
def _grow_relWorks_cache(self, ID_cache: set) -> None:
@@ -310,7 +253,9 @@ def _grow_relWorks_cache(self, ID_cache: set) -> None:
Make one query with all the IDs from ID_cache, execute the query and save the results
to self.relWorks, also write to disk
"""
- print(f" Length of ID_cache: {len(ID_cache)}")
+ print(
+ f" _grow_relWorks_cache: new IDs: {len(ID_cache)} relWorks:{len(self.relWorks)}"
+ )
client = MpApi(baseURL=baseURL, user=user, pw=pw)
if len(ID_cache) > 0:
q = Search(module="Object", limit=-1)
@@ -336,20 +281,24 @@ def _grow_relWorks_cache(self, ID_cache: set) -> None:
self.relWorks = newRelWorksM
else:
# if relWorks exists already, add to it
+ print(" adding")
self.relWorks += newRelWorksM
# save the cache to file after processing every chunk
# no max_size limitation
self.relWorks.toFile(path=self.relWorksFn)
def _init_relWorks_cache(self):
+ """
+        Initializes the self.relWorks cache. If a cache file exists, load it;
+        otherwise initialize an empty self.relWorks.
+ """
if Path(self.relWorksFn).exists():
try:
self.relWorks
except:
# print("Inline cache not loaded yet")
- print(f" About to load existing relWorks cache {self.relWorksFn}")
+ print(f" Loading existing relWorks cache {self.relWorksFn}")
self.relWorks = Module(file=self.relWorksFn)
- return
# if we read relWorks cache from file we dont loop thru data files (chunks)
# looking for all the relWorks to fill the cache as best as we can
# else:
@@ -358,9 +307,39 @@ def _init_relWorks_cache(self):
print(f" No relWorks file to load at {self.relWorksFn}")
self.relWorks = Module()
+ def _log(self, msg):
+ print(msg)
+ logging.info(msg)
+
+ def _lookup_ISIL(self, *, institution):
+ """
+ Load vocmap.xml and lookup ISIL for name of institution.
+
+ In the beginning, we die when no ISIL found, but later we might carp more gracefully.
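+
+        Example result format: "de-MUS-018313" (the ISIL prefix used in
+        _rewrite_relWork's "ISIL/ID" rewrite).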
+ """
+ vm_fn = Path(__file__).parents[1] / "vocmap.xml"
+ if not vm_fn.exists():
+ raise SyntaxError(f"File not found {vm_fn}")
+ vocMap = etree.parse(vm_fn)
+ try:
+ ISIL = vocMap.xpath(
+ f"""/vocmap/voc[
+ @name='verwaltendeInstitution'
+ ]/concept[
+ source = '{institution}'
+ ]/target[
+ @name = 'ISIL'
+ ]"""
+ )[0]
+        except IndexError:
+ raise SyntaxError(
+ f"vocMap: verwaltendeInstitution '{institution}' not found"
+ )
+ return ISIL.text
+
def _nextChunk(self, *, fn: Path):
"""
- Returns the path/name of the next chunk if it exists or errors if the input
+ Returns the path/name of the next chunk if it exists or errors if the src
is not chunkable or the next chunk does not exist.
Expects path/name of lvl 1 lido file that ends in ".lido.xml".
@@ -397,6 +376,27 @@ def _optimize_relWorks_cache(self, *, query):
query.validate(mode="search")
return query
+    def _relWork_online(self, *, modType: str, modItemId: int) -> bool:
+        """
+        Checks if a specific relWork is online. No url request; just examines if
+ SMB-Freigabe = Ja.
+
+ Expects modItemId as int; but str should work as well.
+ """
+ r = self.relWorks.xpath(
+ f"""/m:application/m:modules/m:module[
+ @name = '{modType}']/m:moduleItem[
+ @id = {str(modItemId)}]/m:repeatableGroup[
+ @name = 'ObjPublicationGrp']/m:repeatableGroupItem[
+ m:vocabularyReference[@name='PublicationVoc']/m:vocabularyReferenceItem[@name='Ja']
+ and m:vocabularyReference[@name='TypeVoc']/m:vocabularyReferenceItem[@id = 2600647]
+ ]"""
+ )
+        return len(r) > 0
+
def _relWorks_cache_many(self, *, first):
"""
creates relatedWorksCache from all chunks
@@ -416,6 +416,9 @@ def _relWorks_cache_many(self, *, first):
"""
ID_cache = set() # set of relWork ids, no duplicates
chunk_fn = Path(first)
+ # if the cache is already at max_size, we dont need this step
+ if len(self.relWorks) >= relWorks_maxSize:
+ return None
while chunk_fn.exists():
ID_cache = self._file_to_ID_cache(chunk_fn, ID_cache)
try:
@@ -423,8 +426,37 @@ def _relWorks_cache_many(self, *, first):
except:
# print (" breaking the while")
break # break the while if this is the only data file or the last chunk
+            if len(ID_cache) + len(self.relWorks) >= relWorks_maxSize:
+ break
self._grow_relWorks_cache(ID_cache)
+ def _rewrite_relWork(self, *, mtype: str, objectID: Any) -> None:
+ """
+        If the relWork is unpublished, delete it; otherwise rewrite its objectID using ISIL/ID.
+ """
+        id_int = int(objectID.text)
+
+        if self._relWork_online(modType=mtype, modItemId=id_int):
+ # rewrite ISIL, should look like this:
+ # de-MUS-018313/744501
+ # self._log(f" looking up ISIL for relWork")
+ objectID.attrib["{http://www.lido-schema.org}source"] = "ISIL/ID"
+ # we're assuming there is always a verwaltendeInstitution, but that is not enforced by RIA!
+            # look up verwaltendeInstitution in the relWorks cache, scoped to this item
+            try:
+                verwInst = self.relWorks.xpath(
+                    f"""/m:application/m:modules/m:module[
+                        @name = '{mtype}']/m:moduleItem[
+                        @id = {id_int}]//m:moduleReference[
+                        @name = 'ObjOwnerRef'
+                    ]/m:moduleReferenceItem/m:formattedValue"""
+                )[0]
+            except IndexError:
+ self._log(f"WARNING: verwaltendeInstitution empty! {modType} {id_int}")
+ else:
+ ISIL = self._lookup_ISIL(institution=verwInst.text)
+ objectID.text = f"{ISIL}/{str(id_int)}"
+ print(f" relWork {id_int}: {verwInst.text} -> {ISIL}")
+ else:
+            self._del_relWork(ID=objectID)
+
if __name__ == "__main__":
import argparse
@@ -432,7 +464,7 @@ def _relWorks_cache_many(self, *, first):
parser = argparse.ArgumentParser(description="Simple linkResource checker")
parser.add_argument(
"-i",
- "--input",
+ "--src",
help="point to LIDO file",
required=True,
)
@@ -441,6 +473,6 @@ def _relWorks_cache_many(self, *, first):
args = parser.parse_args()
m = LinkChecker(
- Input=args.input,
+ src=args.src,
)
- m.new_check()
+ m.linkResource_online_http()
diff --git a/zml2lido/qc.py b/zml2lido/qc.py
index 5df6575..99ee000 100644
--- a/zml2lido/qc.py
+++ b/zml2lido/qc.py
@@ -9,15 +9,18 @@
"""
import argparse
-import pathlib from Path
+from pathlib import Path
+
parser = argparse.ArgumentParser(description="Quality control for LIDO files")
- parser.add_argument(
- "-i",
- "--input",
- help="specify an input file",
- required=True,
- )
- args = parser.parse_args()
+parser.add_argument(
+ "-i",
+ "--input",
+ help="specify an input file",
+ required=True,
+)
+args = parser.parse_args()
+
class QualityControl:
def __init__(self, *, input_fn):
+ pass