Commit 286c597: minor refactoring
beatfactor committed Aug 17, 2024
1 parent c9ba22e
Showing 8 changed files with 60 additions and 110 deletions.
oceanstream/__init__.py (4 changes: 2 additions & 2 deletions)

@@ -1,5 +1,5 @@
-from .core import initialize, compute_sv, combine, convert, process_raw_file
+from .core import initialize, compute_sv, combine, process_raw_file

 __all__ = [
-    "compute_sv", "combine", "convert", "process_raw_file", "initialize"
+    "compute_sv", "combine", "process_raw_file", "initialize"
 ]
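With this change, convert is no longer re-exported at the package root; the name now refers to the new oceanstream.convert module used elsewhere in this commit (see the CLI and setup.cfg hunks below). Downstream code imports the remaining entry points, e.g.:

    from oceanstream import compute_sv, combine, process_raw_file, initialize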
oceanstream/cli/main.py (5 changes: 4 additions & 1 deletion)

@@ -126,9 +126,10 @@ def convert(

     try:
         if filePath.is_file():
-            from oceanstream.process import convert_raw_file
+            from oceanstream.convert import convert_raw_file
             print(f"[blue]Converting raw file {source} to Zarr...[/blue]")
             convert_raw_file(filePath, configData)
+            print("✅ The file has been converted successfully.")
         elif filePath.is_dir():
             from oceanstream.process import convert_raw_files
             convert_raw_files(configData, workers_count=workers_count)

@@ -215,6 +216,7 @@ def compute_sv(
     depth_offset: float = typer.Option(0.0, help="Depth offset for the echogram plot"),
     waveform_mode: str = typer.Option("CW", help="Waveform mode, can be either CW or BB",
                                       show_choices=["CW", "BB"]),
+    encode_mode: str = typer.Option("power", help="Encode mode, can be either power or complex", show_choices=["power", "complex"]),
     config: str = typer.Option(None, help="Path to a configuration file"),
     log_level: str = typer.Option("WARNING", help="Set the logging level",
                                   show_choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"])

@@ -258,6 +260,7 @@ def compute_sv(
             chunks=chunks,
             plot_echogram=plot_echogram,
             waveform_mode=waveform_mode,
+            encode_mode=encode_mode,
             depth_offset=depth_offset)

         status.stop()
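The new encode_mode option follows the same Typer pattern as waveform_mode. A self-contained sketch of that pattern, independent of the oceanstream codebase (file and parameter names illustrative):

    import typer

    def compute_sv(
        waveform_mode: str = typer.Option("CW", help="Waveform mode, can be either CW or BB"),
        encode_mode: str = typer.Option("power", help="Encode mode, can be either power or complex"),
    ):
        # Typer derives the --waveform-mode and --encode-mode flags
        # from the parameter names.
        typer.echo(f"waveform_mode={waveform_mode}, encode_mode={encode_mode}")

    if __name__ == "__main__":
        typer.run(compute_sv)

Run as, e.g., python sketch.py --encode-mode complex.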
oceanstream/core.py (54 changes: 27 additions & 27 deletions)

@@ -5,7 +5,7 @@

 from pathlib import Path
 from oceanstream.settings import load_config
-from oceanstream.process import convert_raw_file, convert_raw_files, compute_single_file
+from oceanstream.process import convert_raw_files, compute_single_file

 DEFAULT_OUTPUT_FOLDER = "output"
 DEFAULT_SONAR_MODEL = "EK60"

@@ -80,32 +80,32 @@ def process_raw_file(source, output=None, sonar_model=None, plot_echogram=False,
         sys.exit(1)


-def convert(source, output=None, base_path=None, workers_count=None, config=None, log_level="WARNING", chunks=None):
-    logging.debug("Starting convert function")
-    settings = {
-        "output_folder": output or DEFAULT_OUTPUT_FOLDER
-    }
-
-    if config is not None:
-        settings.update(config)
-
-    file_path = Path(source)
-    config_data = initialize(settings, file_path, log_level=log_level)
-
-    if chunks:
-        config_data['chunks'] = chunks
-    else:
-        config_data['chunks'] = config_data.get('base_chunk_sizes', None)
-
-    if file_path.is_file():
-        logging.debug(f"Converting raw file: {file_path}")
-        convert_raw_file(file_path, config_data, base_path=base_path)
-        logging.info(f"Converted raw file {source} to Zarr and wrote output to: {config_data['output_folder']}")
-    elif file_path.is_dir():
-        logging.debug(f"Converting raw files in directory: {file_path}")
-        convert_raw_files(config_data, workers_count=workers_count)
-    else:
-        logging.error(f"The provided path '{source}' is not a valid file/folder.")
+# def convert(source, output=None, base_path=None, workers_count=None, config=None, log_level="WARNING", chunks=None):
+#     logging.debug("Starting convert function")
+#     settings = {
+#         "output_folder": output or DEFAULT_OUTPUT_FOLDER
+#     }
+#
+#     if config is not None:
+#         settings.update(config)
+#
+#     file_path = Path(source)
+#     config_data = initialize(settings, file_path, log_level=log_level)
+#
+#     if chunks:
+#         config_data['chunks'] = chunks
+#     else:
+#         config_data['chunks'] = config_data.get('base_chunk_sizes', None)
+#
+#     if file_path.is_file():
+#         logging.debug(f"Converting raw file: {file_path}")
+#         convert_raw_file(file_path, config_data, base_path=base_path)
+#         logging.info(f"Converted raw file {source} to Zarr and wrote output to: {config_data['output_folder']}")
+#     elif file_path.is_dir():
+#         logging.debug(f"Converting raw files in directory: {file_path}")
+#         convert_raw_files(config_data, workers_count=workers_count)
+#     else:
+#         logging.error(f"The provided path '{source}' is not a valid file/folder.")


 def combine(source, output=None, config=None, log_level="WARNING", chunks=None):
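Callers that relied on core.convert for single files can use the converter the CLI now uses. A minimal sketch, assuming the relocated oceanstream.convert.convert_raw_file keeps the (file path, config dict) signature seen in the cli/main.py hunk above; the config shown is a stand-in for what initialize() would normally produce:

    from pathlib import Path
    from oceanstream.convert import convert_raw_file

    config_data = {"output_folder": "output"}  # illustrative minimal config
    convert_raw_file(Path("survey_001.raw"), config_data)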
oceanstream/echodata/sv_computation.py (1 change: 0 additions & 1 deletion)

@@ -104,7 +104,6 @@ def compute_sv(echodata: EchoData, **kwargs) -> xr.Dataset:
             {list(SupportedSonarModelsForSv)}."
         )
     # Compute Sv
-    print(f"Computing Sv for {sonar_model} sonar model...", kwargs)
     Sv = ep.calibrate.compute_Sv(echodata, **kwargs)
     # Check if the computed Sv is empty
     if Sv["Sv"].values.size == 0:
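The deleted print wrote the raw kwargs to stdout on every calibration call. If that visibility is still wanted, a module-level logger is the quieter option; a sketch (not part of this commit) of what could replace the print inside compute_sv:

    import logging

    logger = logging.getLogger(__name__)
    logger.debug("Computing Sv for %s sonar model (kwargs=%s)", sonar_model, kwargs)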
oceanstream/process/__init__.py (3 changes: 1 addition & 2 deletions)

@@ -1,15 +1,14 @@
 from .process import compute_sv, process_file_with_progress, read_file_with_progress
 from .processed_data_io import read_processed, write_processed
 from .combine_zarr import combine_zarr_files, read_zarr_files
-from .file_processor import process_raw_file_with_progress, compute_Sv_to_zarr, convert_raw_file, compute_single_file, \
+from .file_processor import process_raw_file_with_progress, compute_Sv_to_zarr, compute_single_file, \
     compute_and_export_single_file
 from .folder_processor import convert_raw_files, process_zarr_files, export_location_from_zarr_files

 __all__ = [
     "compute_sv",
     "process_file_with_progress",
     "process_raw_file_with_progress",
-    "convert_raw_file",
     "process_zarr_files",
     "export_location_from_zarr_files",
     "compute_Sv_to_zarr",
oceanstream/process/file_processor.py (78 changes: 12 additions & 66 deletions)

@@ -48,12 +48,11 @@ def compute_and_export_single_file(config_data, **kwargs):
     file_path = config_data["raw_path"]
     chunks = kwargs.get("chunks")
     echodata = ep.open_converted(file_path, chunks=chunks)
-    print(f"File {file_path} opened successfully.")

     try:
         file_data = []
         output_path, ds_processed, echogram_files = compute_Sv_to_zarr(echodata, config_data, **kwargs)
-        gps_data = export_location_json(ds_processed)
+        # gps_data = export_location_json(ds_processed)

         output_message = {
             "filename": str(output_path), # Convert PosixPath to string

@@ -76,7 +75,7 @@
         gps_json_file_path = Path(config_data["output_folder"]) / "gps_data.json"

         save_output_data(output_message, json_file_path)
-        append_gps_data(gps_data, gps_json_file_path, str(output_path))
+        # append_gps_data(gps_data, gps_json_file_path, str(output_path))

         echogram_folder = Path(config_data["output_folder"]) / "echograms"
         echogram_folder.mkdir(parents=True, exist_ok=True)

@@ -116,7 +115,13 @@ def compute_Sv_to_zarr(echodata, config_data, base_path=None, chunks=None, plot_
         str: Path to the zarr file.
     """
     waveform_mode = kwargs.get("waveform_mode", "CW")
-    encode_mode = waveform_mode == "CW" and "power" or "complex"
+    if encode_mode := kwargs.pop("encode_mode", None):
+        encode_mode = encode_mode.lower()
+    else:
+        encode_mode = waveform_mode == "CW" and "power" or "complex"
+
+    echodata["Vendor_specific"] = echodata["Vendor_specific"].isel(channel=slice(1))

     Sv = compute_sv(echodata, encode_mode=encode_mode, **kwargs)

     ds_processed = Sv
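Two details in the new encode_mode logic are easy to misread. The walrus operator binds and tests kwargs.pop(...) in one expression; popping (rather than kwargs.get) matters because compute_sv is then called with encode_mode= explicitly, so leaving the key in **kwargs would raise "TypeError: got multiple values for keyword argument 'encode_mode'". The fallback keeps the old and/or idiom, which only works because "power" is truthy; the equivalent conditional expression is clearer:

    # Equivalent to: waveform_mode == "CW" and "power" or "complex"
    encode_mode = "power" if waveform_mode == "CW" else "complex"

The added isel(channel=slice(1)) line keeps only the first channel entry of the Vendor_specific group (slice(1) is slice(None, 1), i.e. element 0 with the dimension retained).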
@@ -217,75 +222,16 @@ async def process_raw_file_with_progress(config_data, plot_echogram, waveform_mo
         logging.exception(f"Error processing file {config_data['raw_path']}: {e}")


-def convert_raw_file(file_path, config_data, base_path=None, progress_counter=None, counter_lock=None):
-    logging.debug("Starting processing of file: %s", file_path)
-
-    file_path_obj = Path(file_path)
-    file_config_data = {**config_data, 'raw_path': file_path_obj}
-
-    if base_path:
-        relative_path = file_path_obj.relative_to(base_path)
-        relative_path = relative_path.parent
-    else:
-        relative_path = None
-
-    echodata, encode_mode = read_file(file_config_data, use_swap=True, skip_integrity_check=True)
-    file_name = file_path_obj.stem + ".zarr"
-
-    if 'cloud_storage' in config_data:
-        if relative_path:
-            file_location = Path(relative_path) / file_name
-        else:
-            file_location = file_name
-        store = _get_chunk_store(config_data['cloud_storage'], file_location)
-        echodata.to_zarr(save_path=store, overwrite=True, parallel=False)
-
-        output_dest = config_data['cloud_storage']['container_name'] + "/" + file_location
-    else:
-        if relative_path:
-            output_path = Path(config_data["output_folder"]) / relative_path
-            output_path.mkdir(parents=True, exist_ok=True)
-        else:
-            output_path = Path(config_data["output_folder"])
-
-        output_dest = output_path / file_name
-        echodata.to_zarr(save_path=output_dest, overwrite=True, parallel=False)
-
-    with counter_lock:
-        progress_counter.value += 1
-
-    logging.debug("Finished processing of file: %s", file_path)
-
-    return output_dest
-
-
 def write_zarr_file(zarr_path, zarr_file_name, ds_processed, config_data=None, output_path=None):
     if 'cloud_storage' in config_data:
-        store = _get_chunk_store(config_data['cloud_storage'], Path(zarr_path) / zarr_file_name)
+        from cloud import get_chunk_store
+
+        store = get_chunk_store(config_data['cloud_storage'], Path(zarr_path) / zarr_file_name)
     else:
         store = os.path.join(output_path, zarr_file_name)

     ds_processed.to_zarr(store, mode='w')


-def _get_chunk_store(storage_config, path):
-    from oceanstream.process.azure.blob_storage import get_azfs
-    azfs = get_azfs(storage_config)
-
-    container_name = storage_config['container_name']
-
-    if not azfs.exists(container_name):
-        try:
-            azfs.mkdir(container_name)
-        except Exception as e:
-            logging.error(f"Error creating container {container_name}: {e}")
-            raise
-
-    if azfs:
-        return azfs.get_mapper(f"{container_name}/{path}")
-
-    raise ValueError(f"Unsupported storage type: {storage_config['storage_type']}")
-
-
 def _get_chunk_sizes(var_dims, chunk_sizes):
     return {dim: chunk_sizes[dim] for dim in var_dims if dim in chunk_sizes}
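The surviving _get_chunk_sizes helper filters a chunk-size mapping down to the dimensions a variable actually has, so unrelated entries are dropped instead of being passed to zarr. A worked example (values illustrative):

    >>> _get_chunk_sizes(("ping_time", "range_sample"), {"ping_time": 1000, "depth": 500})
    {'ping_time': 1000}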
oceanstream/process/folder_processor.py (24 changes: 13 additions & 11 deletions)

@@ -26,7 +26,7 @@
 from .combine_zarr import read_zarr_files
 from .process import compute_sv
 from .processed_data_io import write_processed
-from .file_processor import convert_raw_file, compute_single_file, compute_and_export_single_file, \
+from .file_processor import compute_single_file, compute_and_export_single_file, \
     export_location_from_Sv_dataset

 # logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

@@ -130,15 +130,6 @@ def print_call_stack():
         print(f"File: {frame.filename}, Line: {frame.lineno}, Function: {frame.function}")


-def signal_handler(sig, frame):
-    global pool
-    print('Terminating processes...')
-    if pool:
-        pool.terminate()
-        pool.join()
-    sys.exit(0)
-
-
 def find_raw_files(base_dir):
     raw_files = []
     for root, dirs, files in os.walk(base_dir):

@@ -159,6 +150,8 @@ def update_convert_raw(pgr_queue, pb):
 def convert_raw_files(config_data, workers_count=os.cpu_count()):
     global pool

+    from oceanstream.convert import convert_raw_file
+
     try:
         dir_path = config_data['raw_path']
         log_level = config_data.get('log_level', logging.ERROR)
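Note that the convert_raw_file import now happens inside convert_raw_files rather than at module top. Deferring it means oceanstream.convert loads only when a conversion run actually starts, and it sidesteps a potential import cycle with file_processor, which no longer hosts the converter. The pattern in isolation (a sketch, names as in the hunk above):

    def convert_raw_files(config_data, workers_count=None):
        # Deferred import: resolved on first call, not when this
        # module is imported.
        from oceanstream.convert import convert_raw_file
        ...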
@@ -390,4 +383,13 @@ def from_filename(file_name):
     return None


-signal.signal(signal.SIGINT, signal_handler)
+# def signal_handler(sig, frame):
+#     global pool
+#     print('Terminating processes...')
+#     if pool:
+#         pool.terminate()
+#         pool.join()
+#     sys.exit(0)
+#
+#
+# signal.signal(signal.SIGINT, signal_handler)
setup.cfg (1 change: 1 addition & 0 deletions)

@@ -35,6 +35,7 @@ plot =
     cmocean
 denoise =
 process =
+convert =
 exports =
 complete =
     %(cli)s
