-
Notifications
You must be signed in to change notification settings - Fork 3
/
manuscript.py
350 lines (286 loc) · 15.5 KB
/
manuscript.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
from typing import List, Tuple, Dict
from lxml import etree as et
from pandas import DataFrame
import os
import sys
import shutil # For recursively removing directories.
from copy import deepcopy
from collections import OrderedDict
import utils
import entry
def ignore_data_path(filepath: str) -> str:
    """Remove the manuscript data path portion from a filepath.

    The path is first made absolute; the prefix it shares with
    `utils.manuscript_data_path` is then stripped from that absolute form.
    Returns whatever follows the shared prefix.
    """
    # Partition the absolute path rather than the raw argument: the shared
    # prefix is computed from the absolute form, so a relative or
    # un-normalised `filepath` would never contain it and the result would
    # silently be an empty string.
    absolute = os.path.abspath(filepath)
    shared_prefix = os.path.commonpath([absolute, utils.manuscript_data_path])
    return absolute.partition(shared_prefix)[2]
def extract_folio(filepath: str) -> str:
    """Pull the folio label out of the name of a folio XML file.

    The base name is split on underscores; the second piece, minus its
    leading character, is returned.
    E.g. .../tl_p162v_preTEI.xml -> 162v
    """
    name = os.path.basename(filepath)
    pieces = name.split("_")
    page_token = pieces[1]  # e.g. "p162v"
    return page_token[1:]
def filename_from_folio(folio: str, version: str, extension: str = 'xml') -> str:
    """Build the file name for one folio of a given manuscript version.

    The folio is left-padded with zeros to four characters.
    E.g. 162v, tl, xml -> tl_p162v_preTEI.xml
         1r, tl, xml   -> tl_p001r_preTEI.xml
    """
    padded_folio = folio.zfill(4)
    return '{}_p{}_preTEI.{}'.format(version, padded_folio, extension)
def clean_folio(folio: str) -> str:
    """Strip every leading zero from a folio label, e.g. 001r -> 1r."""
    padding_char = "0"
    return folio.lstrip(padding_char)
def clean_id(identity: str) -> str:
    """Reduce an ID to its bare form, e.g. p001r_1 -> 1r1.

    Leading "p" characters are stripped first, then leading zeros, and
    finally every underscore is removed.
    """
    without_prefix = identity.lstrip("p")
    without_padding = without_prefix.lstrip("0")
    return without_padding.replace("_", "")
def display_id(identity: str) -> str:
    """Format a bare ID for display, e.g. 1r1 -> p001r_1.

    Intended as the inverse of clean_id(): everything except the final
    character is zero-padded to four characters and prefixed with "p", then
    joined to the final character with an underscore. Only the last
    character is treated as the part after the underscore.
    """
    head, tail = identity[:-1], identity[-1]
    return f"p{head.zfill(4)}_{tail}"
def separate_by_id(filepath: str) -> Dict[str, et.Element]:
    """Parse an XML file and group its top-level divs by their "id" attribute.

    Returns an ordered dictionary mapping each ID string to an <entry>
    element holding every div with that ID, in the order the divs were found.
    Divs without an ID are lumped together under the empty-string key.
    """
    shown_path = ignore_data_path(filepath)
    print(f"Separating divs in file: {shown_path}...")

    grouped = OrderedDict()
    tree = et.parse(filepath)
    # findall("div") is not recursive, which is okay since there should be
    # no nested divs.
    for div in tree.findall("div"):
        div_id = div.get("id") or ""
        if div_id not in grouped:
            # First div with this ID: start a new <entry></entry> wrapper.
            grouped[div_id] = et.Element("entry")
        grouped[div_id].append(div)

    plural = '' if len(grouped) == 1 else 's'
    print(f"Found {len(grouped)} div{plural} in file {shown_path} with ID{plural}: {', '.join(grouped.keys())}.")
    return grouped
def generate_entries(directory) -> List[entry.Entry]:
    """Given the path to a directory of XML files, generate a list of Entry objects.

    Each file is split into its divs, and divs sharing an ID are joined into
    one entry, even across files. Divs without IDs are ignored.
    The folio of each entry is the folio of the file in which its ID is first
    seen. Files are visited in sorted name order so that "first" is
    deterministic.
    The returned list is sorted by each Entry's `identity` attribute.
    """
    print(f"Generating entries from files in folder {directory}...")

    # First, collect the XML etree of each entry, keyed by ID.
    xml_by_id: Dict[str, et.Element] = OrderedDict()
    folio_by_id = {}  # Folio of the file where each ID was first seen.
    for root, dirs, files in os.walk(directory):
        # os.walk yields names in arbitrary, filesystem-dependent order.
        # Sorting makes the traversal reproducible; with the zero-padded
        # names produced by filename_from_folio, name order is folio order.
        dirs.sort()
        for filename in sorted(files):
            folio = extract_folio(filename)
            file_entries: Dict[str, et.Element] = separate_by_id(os.path.join(root, filename))
            # Merge this file's etrees into the overall dict: divs for a
            # known ID are appended to the existing etree; a new non-empty ID
            # starts its own key.
            for identity, xml in file_entries.items():
                if identity in xml_by_id:
                    for div in xml.findall("div"):
                        xml_by_id[identity].append(div)
                elif identity:  # Only keep divs that have an ID.
                    xml_by_id[identity] = xml
                    folio_by_id[identity] = folio

    # With the whole directory grouped by ID, build one Entry per etree.
    generated: List[entry.Entry] = []
    for identity, xml in xml_by_id.items():
        folio = folio_by_id[identity]
        print(f"Generating entry with folio {folio}, ID {identity}...")
        generated.append(entry.Entry(xml, folio=clean_folio(folio), identity=clean_id(identity)))

    print(f"Generated {len(generated)} entr{'y' if len(generated)==1 else 'ies'}.")
    return sorted(generated, key=lambda e: e.identity)
def generate_folios(directory) -> List[entry.Entry]:
    """Load every file under a directory as its own Entry, one per folio.

    The directory is walked recursively. Each file is read with
    entry.Entry.from_file, using the folio taken from the file name, and the
    result is sorted by each Entry's `folio` attribute.
    """
    loaded = []
    for root, _, files in os.walk(directory):
        for filename in files:
            path = os.path.join(root, filename)
            print(f"Generating folio from file {ignore_data_path(path)}...")
            folio_label = clean_folio(extract_folio(filename))
            loaded.append(entry.Entry.from_file(path, folio=folio_label))
    loaded.sort(key=lambda e: e.folio)
    return loaded
class Manuscript():
    """In-memory collection of manuscript entries and folios, grouped by version.

    Attributes set in __init__:
      entries  -- {version: OrderedDict {cleaned entry ID: entry.Entry}}
      folios   -- {version: OrderedDict {cleaned folio: entry.Entry}}
      versions -- list of version names, in the order they were first added
    """

    def __init__(self, entries=None, folios=None):
        """Build a manuscript from per-version lists of entries and folios.

        `entries` and `folios` are optional dictionaries mapping a version
        name to a list of entry.Entry objects. They are stored internally
        keyed by version and then by cleaned ID / cleaned folio:
            {
                version1: {ID1: entry.Entry, ID2: entry.Entry, ...},
                version2: {ID1: entry.Entry, ID2: entry.Entry, ...},
                ...
            }
        """
        self.entries = {}
        self.folios = {}
        self.versions = []

        # None (rather than a shared {} default) stands for "nothing given".
        for version, list_of_entries in (entries or {}).items():
            self.add_entries(version, list_of_entries)
        for version, list_of_folios in (folios or {}).items():
            self.add_folios(version, list_of_folios)

    # TODO: write a search method

    @staticmethod
    def _version_from_dir(directory):
        """Return the version name of a directory: its final path component."""
        # normpath drops a trailing separator, which would otherwise make
        # basename return an empty string.
        return os.path.basename(os.path.normpath(directory))

    @staticmethod
    def _remove_dir(path):
        """Recursively delete a directory, doing nothing if it is absent."""
        if os.path.isdir(path):
            shutil.rmtree(path)

    @staticmethod
    def _write_text(filepath, content, description, dry_run):
        """Write `content` to `filepath` as UTF-8, or to os.devnull on a dry run."""
        if dry_run:
            filepath = os.devnull
        with open(filepath, 'w', encoding='utf-8') as fp:
            print(f"Writing {description} to {ignore_data_path(filepath)}...")
            fp.write(content)

    def get_entry(self, version, identity):
        """Return the entry with the given ID in a version, or None if absent."""
        version_entries = self.entries.get(version)
        if version_entries is None:
            return None
        return version_entries.get(clean_id(identity))

    def get_folio(self, version, folio):
        """Return the folio with the given label in a version, or None if absent."""
        version_folios = self.folios.get(version)
        if version_folios is None:
            return None
        return version_folios.get(clean_folio(folio))

    def add_entry(self, version, entry):
        """Store one entry under a version, keyed by its cleaned ID."""
        # Note: the `entry` parameter shadows the imported `entry` module
        # inside this method; the name is kept for keyword-argument callers.
        if version not in self.versions:
            self.versions.append(version)
        self.entries.setdefault(version, OrderedDict())[clean_id(entry.identity)] = entry

    def add_entries(self, version, list_of_entries):
        """Store every entry of an iterable under a version."""
        for item in list_of_entries:
            self.add_entry(version, item)

    def add_folio(self, version, folio):
        """Store one folio under a version, keyed by its cleaned folio label."""
        if version not in self.versions:
            self.versions.append(version)
        self.folios.setdefault(version, OrderedDict())[clean_folio(folio.folio)] = folio

    def add_folios(self, version, list_of_folios):
        """Store every folio of an iterable under a version."""
        for item in list_of_folios:
            self.add_folio(version, item)

    def add_dir(self, directory):
        """Add another version of the manuscript from a folder of XML files.

        The folder's name is used as the version name; its files are parsed
        both as entries and as folios.
        """
        version = self._version_from_dir(directory)
        self.add_entries(version, generate_entries(directory))
        self.add_folios(version, generate_folios(directory))

    def add_dirs(self, *directories):
        """Call add_dir on each of the given directories, in order."""
        for directory in directories:
            self.add_dir(directory)

    @classmethod
    def from_dir(cls, directory):
        """Generate a manuscript from a single folder of XML files."""
        # from_dirs is a classmethod taking paths as positional arguments.
        return cls.from_dirs(directory)

    @classmethod
    def from_dirs(cls, *directories):
        """Generate a manuscript from any number of folders of XML files.

        Each folder's name is used as the version name for the entries and
        folios parsed from it.
        """
        versions = [cls._version_from_dir(directory) for directory in directories]
        print(f"Generating Manuscript object for versions {','.join(versions)}...")
        entries = {}
        folios = {}
        for version, directory in zip(versions, directories):
            entries[version] = generate_entries(directory)
            folios[version] = generate_folios(directory)
        return cls(entries, folios)

    def update(self, dry_run=False):
        """Regenerate metadata, ms-txt, entries and allFolios output, in that order."""
        self.update_metadata(dry_run=dry_run)
        self.update_ms_txt(dry_run=dry_run)
        self.update_entries(dry_run=dry_run)
        self.update_all_folios(dry_run=dry_run)

    def update_ms_txt(self, outdir=utils.ms_txt_path, dry_run=False):
        """Update /ms-txt/ with the text of each loaded folio.

        Unless `dry_run` is set, the existing per-version output folders are
        removed first. On a dry run everything is written to os.devnull.
        """
        if not dry_run:
            # NOTE(review): this clears every version in utils.versions, not
            # only the versions loaded here - confirm that is intended.
            for version in utils.versions:
                self._remove_dir(os.path.join(outdir, version))

        for version, folios_dict in self.folios.items():
            for folio_name, folio in folios_dict.items():
                outpath = os.path.join(outdir, version, filename_from_folio(folio_name, version, "txt"))
                if not dry_run:
                    os.makedirs(os.path.dirname(outpath), exist_ok=True)
                self._write_text(outpath, folio.text, f"folio {version}_{folio_name}", dry_run)

    def update_entries(self, outdir=utils.entries_path, dry_run=False):
        """Update /m-k-manuscript-data/entries/ with one txt and one xml file per entry.

        Unless `dry_run` is set, the existing txt and xml folders are removed
        first. On a dry run everything is written to os.devnull.
        """
        txt_dir = os.path.join(outdir, "txt")
        xml_dir = os.path.join(outdir, "xml")
        if not dry_run:
            self._remove_dir(txt_dir)
            self._remove_dir(xml_dir)

        for version, version_entries in self.entries.items():
            txt_path = os.path.join(txt_dir, version)
            xml_path = os.path.join(xml_dir, version)
            if not dry_run:
                os.makedirs(txt_path, exist_ok=True)
                os.makedirs(xml_path, exist_ok=True)

            for item in version_entries.values():
                # File names use the zero-padded display form of the ID.
                stem = f'{version}_{display_id(item.identity)}'
                content_txt = item.text
                content_xml = item.xml_string  # should already have an <entry> root tag
                self._write_text(os.path.join(txt_path, stem + '.txt'), content_txt,
                                 f"entry {item.identity} {version} txt", dry_run)
                self._write_text(os.path.join(xml_path, stem + '.xml'), content_xml,
                                 f"entry {item.identity} {version} xml", dry_run)

    def update_all_folios(self, outdir=utils.all_folios_path, dry_run=False):
        """Update /m-k-manuscript-data/allFolios/ with one combined txt and xml file per version.

        Unless `dry_run` is set, the existing txt and xml folders are removed
        first. On a dry run everything is written to os.devnull.
        """
        txt_dir = os.path.join(outdir, "txt")
        xml_dir = os.path.join(outdir, "xml")
        if not dry_run:
            self._remove_dir(txt_dir)
            self._remove_dir(xml_dir)

        for version in self.versions:
            content_txt = self.generate_all_folios(method="txt", version=version)
            content_xml = self.generate_all_folios(method="xml", version=version)

            txt_path = os.path.join(txt_dir, version)
            xml_path = os.path.join(xml_dir, version)
            if not dry_run:
                os.makedirs(txt_path, exist_ok=True)
                os.makedirs(xml_path, exist_ok=True)

            self._write_text(os.path.join(txt_path, f"all_{version}.txt"), content_txt,
                             f"allFolios {version} txt", dry_run)
            self._write_text(os.path.join(xml_path, f"all_{version}.xml"), content_xml,
                             f"allFolios {version} xml", dry_run)

    def generate_all_folios(self, method="txt", version="tl"):
        """Return the content of every folio of a version, in sequence, as one string.

        `method` may be "txt" or "xml". Folios are ordered by their label
        zero-padded to four characters.
        Raises ValueError (an Exception subclass) for any other method.
        """
        if method not in ("txt", "xml"):
            raise ValueError(f"Invalid method: '{method}'. Methods: txt, xml")

        ordered_folios = sorted(self.folios[version].items(), key=lambda item: item[0].zfill(4))

        if method == "txt":
            parts = []
            for folio_name, folio in ordered_folios:
                print(f"Adding folio {folio_name} to allFolios {version} {method}...")
                parts.append(folio.text + "\n\n")
            return "".join(parts)

        root = et.Element("all")  # Root element wrapping the entire XML.
        for folio_name, folio in ordered_folios:
            print(f"Adding folio {folio_name} to allFolios {version} {method}...")
            # lxml moves elements in place when they are appended elsewhere,
            # so copy each div to avoid losing data in self.folios.
            root.extend([deepcopy(div) for div in folio.xml.findall("div")])
        return entry.to_xml_string(root)

    def update_metadata(self, outdir=utils.metadata_path, outfile="entry_metadata.csv", dry_run=False):
        """Write a CSV metadata file containing information about each entry.

        On a dry run the CSV is written to os.devnull instead.
        """
        df = self.generate_metadata()
        # The per-version columns hold the Entry objects themselves, which
        # are not meaningful in a CSV.
        df.drop(columns=self.versions, inplace=True)

        if dry_run:
            outpath = os.devnull
        else:
            if outdir:
                os.makedirs(outdir, exist_ok=True)
            outpath = os.path.join(outdir, outfile)
        print(f"Writing metadata to {ignore_data_path(outpath)}...")
        df.to_csv(outpath, index=False)

    def generate_metadata(self):
        """Create a pandas DataFrame of per-entry metadata about the manuscript.

        The frame is built from self.entries, so it has one column per
        version and one row per entry ID, plus the derived columns added
        below. Raises Exception if the "tl" version is not loaded.
        """
        print("Generating metadata...")
        if "tl" not in self.versions:
            raise Exception("Metadata not available: TL version not loaded.")

        df = DataFrame(self.entries)
        tl_entries = df['tl']
        df['folio'] = tl_entries.apply(lambda x: x.folio.zfill(4))  # Add back leading zeros.
        df['folio_display'] = tl_entries.apply(lambda x: x.folio)
        df['div_id'] = tl_entries.apply(lambda x: display_id(x.identity))  # Use the standard ID formatting.
        df['categories'] = tl_entries.apply(lambda x: ';'.join(x.categories))

        for version in self.versions:
            df[f'heading_{version}'] = df[version].apply(lambda x: x.title)

        for prop, tag in utils.prop_dict.items():
            for version in self.versions:
                df[f'{tag}_{version}'] = df[version].apply(lambda x: ';'.join(x.properties[prop]))
        return df