Merge pull request #293 from 4dn-dcic/smaht-ingestion-20231102
SMaHT ingestion related work.
dmichaels-harvard authored Nov 30, 2023
2 parents 1de66d4 + 35296b7 commit 9474417
Showing 19 changed files with 806 additions and 126 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.rst
@@ -6,8 +6,13 @@ dcicutils
Change Log
----------

8.4.0
=====
* More work related to SMaHT ingestion (bundle/sheet_utils, data_readers, etc).


8.3.0
=========
=====

* Updates for RAS to Redis API

154 changes: 123 additions & 31 deletions dcicutils/bundle_utils.py

Large diffs are not rendered by default.

147 changes: 147 additions & 0 deletions dcicutils/data_readers.py
@@ -0,0 +1,147 @@
import abc
import csv
import openpyxl
from typing import Any, Generator, Iterator, List, Optional, Type, Tuple, Union
from dcicutils.misc_utils import right_trim


class RowReader(abc.ABC):

def __init__(self):
self.header = None
self.location = 0
self._warning_empty_headers = False
self._warning_extra_values = [] # Line numbers.
self.open()

def __iter__(self) -> Iterator:
for row in self.rows:
self.location += 1
if self.is_comment_row(row):
continue
if self.is_terminating_row(row):
break
if len(self.header) < len(row): # Row values beyond what there are headers for are ignored.
self._warning_extra_values.append(self.location)
yield {column: self.cell_value(value) for column, value in zip(self.header, row)}

def _define_header(self, header: List[Optional[Any]]) -> None:
self.header = []
for index, column in enumerate(header or []):
if not (column := str(column).strip() if column is not None else ""):
self._warning_empty_headers = True
break # Empty header column signals end of header.
self.header.append(column)

def rows(self) -> Generator[Union[List[Optional[Any]], Tuple[Optional[Any], ...]], None, None]:
yield

def is_comment_row(self, row: Union[List[Optional[Any]], Tuple[Optional[Any]]]) -> bool:
return False

def is_terminating_row(self, row: Union[List[Optional[Any]], Tuple[Optional[Any]]]) -> bool:
return False

def cell_value(self, value: Optional[Any]) -> Optional[Any]:
return str(value).strip() if value is not None else ""

def open(self) -> None:
pass

@property
def issues(self) -> Optional[List[str]]:
issues = []
if self._warning_empty_headers:
issues.append("Empty header column encountered; ignoring it and all subsequent columns.")
if self._warning_extra_values:
issues.extend([f"Extra column values on row [{row_number}]" for row_number in self._warning_extra_values])
return issues if issues else None


class ListReader(RowReader):

def __init__(self, rows: List[List[Optional[Any]]]) -> None:
self._rows = rows
super().__init__()

@property
def rows(self) -> Generator[List[Optional[Any]], None, None]:
for row in self._rows[1:]:
yield row

def open(self) -> None:
if not self.header:
self._define_header(self._rows[0] if self._rows else [])
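
As an illustrative usage sketch (not part of the file), a ListReader turns a header row plus data rows into dicts; the input rows here are made-up values:

    # The first row is the header; each subsequent row is yielded as a dict
    # keyed by the header columns, with cell values stringified and stripped.
    reader = ListReader([["name", "age"], ["alice", "37"], ["bob", ""]])
    for row in reader:
        print(row)            # {'name': 'alice', 'age': '37'}, then {'name': 'bob', 'age': ''}
    print(reader.issues)      # None here; set when empty headers or extra row values are seen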


class CsvReader(RowReader):

def __init__(self, file: str) -> None:
self._file = file
self._file_handle = None
self._rows = None
super().__init__()

@property
def rows(self) -> Generator[List[Optional[Any]], None, None]:
for row in self._rows:
yield right_trim(row)

def open(self) -> None:
if self._file_handle is None:
self._file_handle = open(self._file)
self._rows = csv.reader(self._file_handle, delimiter="\t" if self._file.endswith(".tsv") else ",")
self._define_header(right_trim(next(self._rows, [])))

def __del__(self) -> None:
if (file_handle := self._file_handle) is not None:
self._file_handle = None
file_handle.close()
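
A minimal usage sketch, assuming a tab-separated file at the hypothetical path metadata.tsv:

    reader = CsvReader("metadata.tsv")   # delimiter inferred from the .tsv suffix, otherwise comma
    rows = list(reader)                  # each row is a dict keyed by the header columns
    if reader.issues:
        print(reader.issues)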


class ExcelSheetReader(RowReader):

def __init__(self, sheet_name: str, workbook: openpyxl.workbook.workbook.Workbook) -> None:
self.sheet_name = sheet_name or "Sheet1"
self._workbook = workbook
self._rows = None
super().__init__()

@property
def rows(self) -> Generator[Tuple[Optional[Any], ...], None, None]:
for row in self._rows(min_row=2, values_only=True):
yield right_trim(row)

def is_terminating_row(self, row: Tuple[Optional[Any]]) -> bool:
return all(cell is None for cell in row) # Empty row signals end of data.

def open(self) -> None:
if not self._rows:
self._rows = self._workbook[self.sheet_name].iter_rows
self._define_header(right_trim(next(self._rows(min_row=1, max_row=1, values_only=True), [])))


class Excel:

def __init__(self, file: str, reader_class: Optional[Type] = None) -> None:
self._file = file
self._workbook = None
self.sheet_names = None
if isinstance(reader_class, Type) and issubclass(reader_class, ExcelSheetReader):
self._reader_class = reader_class
else:
self._reader_class = ExcelSheetReader
self.open()

def sheet_reader(self, sheet_name: str) -> ExcelSheetReader:
return self._reader_class(sheet_name=sheet_name, workbook=self._workbook)

def open(self) -> None:
if self._workbook is None:
self._workbook = openpyxl.load_workbook(self._file, data_only=True)
self.sheet_names = self._workbook.sheetnames or []

def __del__(self) -> None:
if (workbook := self._workbook) is not None:
self._workbook = None
workbook.close()
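
A hedged end-to-end sketch of the Excel wrapper, assuming a workbook at the hypothetical path submission.xlsx and a hypothetical process() handler; iteration over a sheet stops at the first fully empty row, which ExcelSheetReader treats as terminating:

    excel = Excel("submission.xlsx")
    for sheet_name in excel.sheet_names:
        reader = excel.sheet_reader(sheet_name)   # ExcelSheetReader (or an injected reader_class)
        for row in reader:                        # dicts keyed by the sheet's header row
            process(row)
        if reader.issues:
            print(f"{sheet_name}: {reader.issues}")
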
125 changes: 124 additions & 1 deletion dcicutils/misc_utils.py
Expand Up @@ -23,7 +23,7 @@
from collections import defaultdict
from datetime import datetime as datetime_type
from dateutil.parser import parse as dateutil_parse
from typing import Optional
from typing import Any, Callable, List, Optional, Tuple, Union


# Is this the right place for this? I feel like this should be done in an application, not a library.
@@ -978,6 +978,43 @@ def str_to_bool(x: Optional[str]) -> Optional[bool]:
raise ValueError(f"An argument to str_to_bool must be a string or None: {x!r}")


def to_integer(value: str, fallback: Optional[Any] = None) -> Optional[Any]:
try:
return int(value)
except Exception:
return fallback


def to_float(value: str, fallback: Optional[Any] = None) -> Optional[Any]:
try:
return float(value)
except Exception:
return fallback


def to_boolean(value: str, fallback: Optional[Any]) -> Optional[Any]:
if isinstance(value, str) and (value := value.strip().lower()):
if (lower_value := value.lower()) in ["true", "t"]:
return True
elif lower_value in ["false", "f"]:
return False
return fallback
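
Illustrative behavior of the three coercion helpers (a sketch; note that to_boolean takes an explicit fallback argument):

    to_integer("17")            # 17
    to_integer("17.5", None)    # None (fallback), since int("17.5") raises
    to_float("17.5")            # 17.5
    to_boolean(" True ", None)  # True
    to_boolean("no", False)     # False (the fallback; only true/t/false/f are recognized)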


def to_enum(value: str, enumerators: List[str]) -> Optional[str]:
matches = []
if isinstance(value, str) and (value := value.strip()) and isinstance(enumerators, List):
enum_specifiers = {str(enum).lower(): enum for enum in enumerators}
if (enum_value := enum_specifiers.get(lower_value := value.lower())) is not None:
return enum_value
for enum_canonical, _ in enum_specifiers.items():
if enum_canonical.startswith(lower_value):
matches.append(enum_canonical)
if len(matches) == 1:
return enum_specifiers[matches[0]]
return enum_specifiers[matches[0]] if len(matches) == 1 else value
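
A sketch of how to_enum resolves values: an exact case-insensitive match first, then a unique prefix match; otherwise the original value is returned:

    to_enum("wgs", ["WGS", "WES", "RNA-seq"])   # "WGS" (case-insensitive exact match)
    to_enum("wg", ["WGS", "WES"])               # "WGS" (unique prefix match)
    to_enum("w", ["WGS", "WES"])                # "w" (ambiguous prefix; value returned unchanged)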


@contextlib.contextmanager
def override_environ(**overrides):
"""
@@ -1107,6 +1144,18 @@ def remove_suffix(suffix: str, text: str, required: bool = False):
return text[:len(text)-len(suffix)]


def remove_empty_properties(data: Optional[Union[list, dict]]) -> None:
if isinstance(data, dict):
for key in list(data.keys()):
if (value := data[key]) in [None, "", {}, []]:
del data[key]
else:
remove_empty_properties(value)
elif isinstance(data, list):
for item in data:
remove_empty_properties(item)
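
An illustrative sketch of the in-place pruning (the function returns None and mutates its argument):

    data = {"a": "", "b": None, "c": {"d": [], "e": 1}}
    remove_empty_properties(data)
    # data is now {"c": {"e": 1}}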


class ObsoleteError(Exception):
pass

@@ -1320,6 +1369,18 @@ def json_file_contents(filename):
return json.load(fp)


def load_json_if(s: str, is_array: bool = False, is_object: bool = False,
fallback: Optional[Any] = None) -> Optional[Any]:
if (isinstance(s, str) and
((is_object and s.startswith("{") and s.endswith("}")) or
(is_array and s.startswith("[") and s.endswith("]")))):
try:
return json.loads(s)
except Exception:
pass
return fallback
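
A sketch of the guarded JSON parse; strings that do not look like the requested container type, or that fail to parse, yield the fallback:

    load_json_if('{"a": 1}', is_object=True)                # {"a": 1}
    load_json_if("[1, 2]", is_object=True)                  # None (arrays not enabled here)
    load_json_if('{"a": 1,}', is_object=True, fallback={})  # {} (looks like JSON but fails to parse)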


def camel_case_to_snake_case(s, separator='_'):
"""
Converts CamelCase to snake_case.
@@ -1404,6 +1465,42 @@ def string_list(s):
return [p for p in [part.strip() for part in s.split(",")] if p]


def split_string(value: str, delimiter: str, escape: Optional[str] = None) -> List[str]:
"""
Splits the given string into an array of string based on the given delimiter, and an optional escape character.
"""
if not isinstance(value, str) or not (value := value.strip()):
return []
if not isinstance(escape, str) or not escape:
return [item.strip() for item in value.split(delimiter)]
result = []
item = r""
escaped = False
for c in value:
if c == delimiter and not escaped:
result.append(item.strip())
item = r""
elif c == escape and not escaped:
escaped = True
else:
item += c
escaped = False
result.append(item.strip())
return [item for item in result if item]
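
An illustrative sketch of split_string, with and without an escape character (a single backslash here); in the escaped form, empty items are dropped:

    split_string("a, b, c", ",")                     # ["a", "b", "c"]
    split_string(r"alpha,beta\,gamma", ",", "\\")    # ["alpha", "beta,gamma"] (escaped delimiter kept)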


def right_trim(list_or_tuple: Union[List[Any], Tuple[Any]],
remove: Optional[Callable] = None) -> Union[List[Any], Tuple[Any]]:
"""
Removes trailing None (or empty string) values from the given tuple or list and returns the result;
does NOT change the given value.
"""
i = len(list_or_tuple) - 1
while i >= 0 and ((remove and remove(list_or_tuple[i])) or (not remove and list_or_tuple[i] in (None, ""))):
i -= 1
return list_or_tuple[:i + 1]
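
A sketch of right_trim with the default rule (trailing None or "") and with a custom remove predicate; slicing preserves the input type:

    right_trim(["a", "b", None, "", None])             # ["a", "b"]
    right_trim((1, 2, None))                           # (1, 2)
    right_trim([1, 2, 0, 0], remove=lambda v: v == 0)  # [1, 2]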


def is_c4_arn(arn: str) -> bool:
"""
Returns True iff the given (presumed) AWS ARN string value looks like it
@@ -2050,6 +2147,32 @@ def merge_key_value_dict_lists(x, y):
return [key_value_dict(k, v) for k, v in merged.items()]


def merge_objects(target: Union[dict, List[Any]], source: Union[dict, List[Any]], full: bool = False) -> dict:
"""
Merges the given source dictionary or list into the target dictionary or list.
This MAY well change the given target (dictionary or list) IN PLACE.
If the full argument is True then any target lists longer than the
source will be filled out with the last element(s) of the source.
"""
if target is None:
return source
if isinstance(target, dict) and isinstance(source, dict) and source:
for key, value in source.items():
target[key] = merge_objects(target[key], value, full) if key in target else value
elif isinstance(target, list) and isinstance(source, list) and source:
for i in range(max(len(source), len(target))):
if i < len(target):
if i < len(source):
target[i] = merge_objects(target[i], source[i], full)
elif full:
target[i] = merge_objects(target[i], source[len(source) - 1], full)
else:
target.append(source[i])
elif source:
target = source
return target
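
An illustrative sketch of merge_objects; the target is (potentially) modified in place and also returned:

    target = {"a": 1, "b": {"c": 2}}
    merge_objects(target, {"b": {"d": 3}, "e": 4})
    # target is now {"a": 1, "b": {"c": 2, "d": 3}, "e": 4}

    # With full=True, target list elements beyond the length of the source
    # are merged against the source's last element.
    merge_objects([{"x": 1}, {"x": 2}], [{"y": 0}], full=True)
    # [{"x": 1, "y": 0}, {"x": 2, "y": 0}]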


# Stealing topological sort classes below from python's graphlib module introduced
# in v3.9 with minor refactoring.
# Source: https://github.com/python/cpython/blob/3.11/Lib/graphlib.py
2 changes: 1 addition & 1 deletion dcicutils/qa_utils.py
@@ -3540,7 +3540,7 @@ def recurse(json1, json2, path=""):
if not result:
# out(f"Recursive failure at {path!r} in list comparison")
pass
elif type(json1) == type(json2):
elif type(json1) is type(json2):
result = json1 == json2
if not result:
out(f"Failed at {path!r} due to value mismatch: {json.dumps(json1)} != {json.dumps(json2)}")
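
For context, a small sketch of why the comparison changed: for exact type checks, identity via "is" is the idiomatic form and avoids the pycodestyle/flake8 E721 warning that "==" triggers; both expressions evaluate the same here.

    type(1.0) == type(2.0)   # True, but flagged by E721
    type(1.0) is type(2.0)   # True, preferred for exact-type comparison
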
2 changes: 1 addition & 1 deletion dcicutils/s3_utils.py
@@ -318,7 +318,7 @@ def get_key(self, keyfile_name: str = 'access_key_admin'
SSECustomerKey=os.environ['S3_ENCRYPT_KEY'],
SSECustomerAlgorithm='AES256')
body_data: Union[bytes, str] = response['Body'].read()
auth_text: str = body_data.decode() if type(body_data) == bytes else body_data
auth_text: str = body_data.decode() if type(body_data) is bytes else body_data
try:
return json.loads(auth_text)
except (ValueError, TypeError):
(Diffs for the remaining changed files are not shown.)
