Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update parse.py #1

Merged
merged 3 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 50 additions & 23 deletions cpm/parse.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,83 @@
from typing import TextIO, Union
from cpm.exceptions import *
from cpm.models import DSM


# NOTE(review): this span is a rendered diff of parse_csv. The removed (filepath-based) and
# added (file / file-like based) lines are interleaved and indentation has been lost.
def parse_csv(filepath: str, delimiter: str = 'auto', encoding: str = 'utf-8', instigator: str = 'column'):
def parse_csv(file: Union[str, TextIO], delimiter: str = 'auto', encoding: str = 'utf-8', instigator: str = 'column'):
"""
Parse CSV to DSM
:param filepath: Targeted CSV file
:param file: Targeted CSV file or file-like object
:param delimiter: CSV delimiter. Defaults to auto-detection.
:param encoding: text-encoding. Defaults to utf-8
:param instigator: Determines directionality of DSM. Defaults to columns instigating rows.
:return: DSM
"""


# Read the whole input as text; it is only used below for delimiter auto-detection
content = _read_file(file, encoding)

if delimiter == 'auto':
with open(filepath, 'r', encoding=encoding) as file:
delimiter = detect_delimiter(file.read())
delimiter = detect_delimiter(content)

# Collect the first cell of every line as a column name; num_cols counts the lines seen
num_cols = 0
column_names = []
with open(filepath, 'r') as file:
for line in file:
column_names.append(line.split(delimiter)[0])
num_cols += 1
lines = _get_file_lines(file, encoding)
for line in lines:
column_names.append(line.split(delimiter)[0])
num_cols += 1

# Drop the first collected name: it is the first cell of the first (header) line
column_names.pop(0)

# Build the matrix: the first line and the first cell of each line are skipped;
# empty cells and cells that float() rejects are stored as None, all others as float
data = []

with open(filepath, 'r') as file:
for i, line in enumerate(file):
if i == 0:
for i, line in enumerate(lines):
if i == 0:
continue
data.append([])
for j, col in enumerate(line.split(delimiter)):
if j == 0:
continue
data.append([])
for j, col in enumerate(line.split(delimiter)):
if j == 0:
continue
if col == "":
if col == "":
data[i-1].append(None)
else:
try:
data[i-1].append(float(col))
except ValueError:
data[i-1].append(None)
else:
try:
data[i-1].append(float(col))
except ValueError:
data[i - 1].append(None)

# Wrap the parsed values and the collected names in the DSM model
dsm = DSM(matrix=data, columns=column_names, instigator=instigator)

return dsm


def _read_file(file, encoding):
if isinstance(file, str):
with open(file, 'r', encoding=encoding) as f:
return f.read()
elif hasattr(file, 'read'):
position = file.tell()
content = file.read()
file.seek(position)
return content
else:
raise ValueError("Invalid file input. Must be a filepath or a file-like object.")


def _get_file_lines(file, encoding):
if isinstance(file, str):
with open(file, 'r', encoding=encoding) as f:
return f.readlines()
elif hasattr(file, 'read'):
position = file.tell()
file.seek(0)
lines = file.readlines()
file.seek(position)
return lines
else:
raise ValueError("Invalid file input. Must be a filepath or a file-like object.")


def detect_delimiter(text, look_ahead=1000):
"""
Attempts to determine CSV delmiter based on a certain amount of sample characters
Expand Down Expand Up @@ -85,4 +113,3 @@ def detect_delimiter(text, look_ahead=1000):
raise AutoDelimiterError('None of the default delimiters matched the file. Is the file empty?')

return best_delimiter

9 changes: 8 additions & 1 deletion tests/test_parser.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import pytest
from cpm.parse import parse_csv


Expand Down Expand Up @@ -58,3 +57,11 @@ def test_parse_dsm_network_instigator_row():
assert len(a_neighbours) == 1
assert a_neighbours[0] == 3


def test_parse_file_object():
    """parse_csv accepts an already-open file handle, not only a path string."""
    csv_path = './tests/test-assets/dsm-network-test.csv'
    with open(csv_path) as handle:
        dsm = parse_csv(handle)

    # Every expected column label must be present in the parsed DSM
    expected_columns = ('A', 'B', 'C', 'D')
    assert all(name in dsm.columns for name in expected_columns)
Loading