Skip to content

Commit

Permalink
Merge pull request #1259 from poplarShift/parquet-export
Browse files Browse the repository at this point in the history
IO module for parquet
  • Loading branch information
knutfrode authored Mar 20, 2024
2 parents 11da1f3 + 14297c7 commit fd43319
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 1 deletion.
1 change: 0 additions & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,3 @@ dependencies:
- pip
- pip:
- motuclient
- pytest-sphinx
67 changes: 67 additions & 0 deletions opendrift/export/io_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import sys
import os
from datetime import datetime, timedelta
import logging

logging.captureWarnings(True)
logger = logging.getLogger(__name__)
import string
from shutil import move

import numpy as np
import pandas as pd
from opendrift.models.basemodel import Mode

def init(self, filename):
    """Create an empty parquet file carrying the output schema.

    The file gets one zero-row column per field of ``self.history`` (with
    that field's dtype) plus a ``time`` column of dtype ``datetime64[ns]``.

    Args:
        filename: Path of the parquet file to create; it is also stored on
            ``self.outfile``.
    """
    self.outfile = filename

    # Only the column names and dtypes matter here, so every series is empty.
    columns = {}
    for name, (field_dtype, _offset) in self.history.dtype.fields.items():
        columns[name] = pd.Series([], dtype=field_dtype)
    columns["time"] = pd.Series([], dtype="datetime64[ns]")

    schema_frame = pd.DataFrame(columns)
    schema_frame.to_parquet(self.outfile, engine="fastparquet")


def write_buffer(self):
    """Append the buffered, not-yet-exported output steps to the parquet file.

    ``self.history`` is read as a 2-D masked structured array whose second
    axis is the output step (presumably one row per element -- confirm
    against the model). For every field, the unmasked values of the first
    ``steps_output - steps_exported`` columns are flattened into one
    DataFrame column. A matching ``time`` column is built from
    ``start_time + n * time_step_output`` and filtered with the mask of the
    ``ID`` field.

    Side effects:
        * appends the rows to ``self.outfile`` (fastparquet engine),
        * masks all of ``self.history`` so the buffer can be refilled,
        * advances ``self.steps_exported``.
    """
    num_steps_to_export = self.steps_output - self.steps_exported
    window = slice(0, num_steps_to_export)

    # ``compressed()`` returns the unmasked values as a flat ndarray and,
    # unlike slicing ``.mask`` directly, also works when a field carries
    # ``np.ma.nomask`` instead of a full boolean mask.
    data = {
        field: self.history[field][:, window].compressed()
        for field in self.history.dtype.fields
    }

    # Explicit dtype keeps the column identical to the ``datetime64[ns]``
    # schema written by ``init`` even when no value survives the mask
    # (an empty object array would otherwise stay ``object``).
    times = np.asarray(
        [
            self.start_time + n * self.time_step_output
            for n in range(self.steps_exported, self.steps_output)
        ],
        dtype="datetime64[ns]",
    )

    # One copy of the time axis per row, filtered by the ID mask.
    id_mask = np.ma.getmaskarray(self.history["ID"][:, window])
    time_grid = np.broadcast_to(times, (id_mask.shape[0], times.shape[0]))
    data["time"] = time_grid[~id_mask]  # boolean indexing flattens

    df = pd.DataFrame(data)
    df.to_parquet(self.outfile, engine="fastparquet", append=True)

    # Lazy %-style arguments: formatting only happens if INFO is enabled.
    logger.info("Wrote %s steps to file %s", num_steps_to_export, self.outfile)
    self.history.mask = True  # Reset history array, for new data
    self.steps_exported += num_steps_to_export


def close(self):
    """Finish the export; this backend only logs a warning here."""
    message = "`.close` not strictly needed...?"
    logger.warning(message)

def import_file(self, filename, times=None, elements=None, load_history=True):
    """Return ``self`` unchanged instead of re-importing a parquet file.

    Nothing is read from ``filename``; the other arguments are accepted
    but ignored. Only an info message is logged.

    Returns:
        The object the function was called on.
    """
    logger.info("Skipping reimport")
    result = self
    return result

def import_file_xarray(self, filename, times=None, elements=None, load_history=True):
    """Xarray-based import, which this backend does not provide.

    All arguments are ignored.

    Raises:
        NotImplementedError: Always.
    """
    unsupported = NotImplementedError("wontfix")
    raise unsupported
56 changes: 56 additions & 0 deletions tests/models/test_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# This file is part of OpenDrift.
#
# OpenDrift is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 2
#
# OpenDrift is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OpenDrift. If not, see <https://www.gnu.org/licenses/>.
#
# Copyright 2015, Knut-Frode Dagestad, MET Norway

import os
import pytest
from datetime import datetime, timedelta

from opendrift.readers import reader_netCDF_CF_generic
from opendrift.models.oceandrift import OceanDrift
# Probe for the optional fastparquet dependency. Only a failed import is
# treated as "not installed"; a bare ``except`` would also swallow
# KeyboardInterrupt and SystemExit.
try:
    import fastparquet
    has_fastparquet = True
except ImportError:
    has_fastparquet = False

# Marker that skips a test when fastparquet could not be imported.
need_fastparquet = pytest.mark.skipif(
    not has_fastparquet,
    reason='fastparquet must be installed to use fastparquet writer',
)


@need_fastparquet
def test_io_parquet(tmpdir):
    """Run a short OceanDrift simulation that exports through the parquet IO module.

    Ten elements are seeded and run for ten 30-minute steps with an export
    buffer of two steps, so the output file is written in several batches.
    The test passes when the run completes and the output file exists.
    """
    # ``tmpdir + "name"`` appends to the directory's *basename* and yields a
    # sibling path outside the fixture directory; ``join`` creates a child
    # path that pytest cleans up together with the fixture.
    outfile = str(tmpdir.join("test_io_parquet.nc"))

    o = OceanDrift(loglevel=30, iomodule="parquet")
    norkyst = reader_netCDF_CF_generic.Reader(
        o.test_data_folder()
        + "16Nov2015_NorKyst_z_surface/norkyst800_subset_16Nov2015.nc"
    )
    o.add_reader(norkyst)
    o.seed_elements(4.96, 60.1, radius=10, number=10, time=norkyst.start_time)
    o.run(
        steps=10,
        time_step=timedelta(minutes=30),
        time_step_output=timedelta(minutes=30),
        outfile=outfile,
        export_buffer_length=2,
    )

    assert os.path.isfile(outfile)

0 comments on commit fd43319

Please sign in to comment.