Skip to content

Commit

Permalink
Add support for GCS paths. (#223)
Browse files Browse the repository at this point in the history
  • Loading branch information
keyurva authored Oct 7, 2023
1 parent 9341816 commit 85ed6f2
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 12 deletions.
1 change: 1 addition & 0 deletions simple/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
absl-py==1.4.0
certifi==2023.7.22
charset-normalizer==3.2.0
google-cloud-storage==2.11.0
idna==3.4
importlib-metadata==6.8.0
numpy==1.25.2
Expand Down
26 changes: 14 additions & 12 deletions simple/stats/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
sys.path.insert(1, os.path.join(_CODEDIR, "../"))

from util import dc_client as dc
from util.filehandler import create_file_handler


# TODO: Add support for units.
Expand All @@ -35,12 +36,12 @@ def __init__(
entity_type: str,
ignore_columns: list[str] = list(),
) -> None:
self.input_path = input_path
self.output_dir = output_dir
self.observations_file = os.path.join(output_dir,
constants.OBSERVATIONS_FILE_NAME)
self.debug_resolve_file = os.path.join(
output_dir, constants.DEBUG_RESOLVE_FILE_NAME)
self.input_fh = create_file_handler(input_path)
self.output_dir_fh = create_file_handler(output_dir)
self.observations_fh = self.output_dir_fh.make_file(
constants.OBSERVATIONS_FILE_NAME)
self.debug_resolve_fh = self.output_dir_fh.make_file(
constants.DEBUG_RESOLVE_FILE_NAME)
self.entity_type = entity_type
self.ignore_columns = ignore_columns
self.df = pd.DataFrame()
Expand All @@ -56,10 +57,10 @@ def do_import(self) -> None:
self._write_csvs()

def _init(self):
os.makedirs(self.output_dir, exist_ok=True)
self.output_dir_fh.make_dirs()

def _read_csv(self) -> None:
self.df = pd.read_csv(self.input_path, dtype="str")
self.df = pd.read_csv(self.input_fh.read_string_io(), dtype="str")
logging.info("Read %s rows.", self.df.index.size)

def _drop_ignored_columns(self):
Expand Down Expand Up @@ -169,9 +170,10 @@ def _create_debug_resolve_dataframe(

def _write_csvs(self) -> None:
logging.info("Writing %s observations to: %s", self.df.index.size,
self.observations_file)
self.df.to_csv(self.observations_file, index=False)
self.observations_fh)
self.observations_fh.write_string(self.df.to_csv(index=False))
if self.debug_resolve_df is not None:
logging.info("Writing resolutions (for debugging) to: %s",
self.debug_resolve_file)
self.debug_resolve_df.to_csv(self.debug_resolve_file, index=False)
self.debug_resolve_fh)
self.debug_resolve_fh.write_string(
self.debug_resolve_df.to_csv(index=False))
99 changes: 99 additions & 0 deletions simple/util/filehandler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Copyright 2023 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A generic FileHandler abstraction that allows clients to work seamlessly with
local and GCS files and directories.
"""

import os
import io
from google.cloud import storage

_GCS_PATH_PREFIX = "gs://"


class FileHandler:
"""(Abstract) base class that should be extended by concrete implementations."""

def __init__(self, path: str, isdir: bool) -> None:
self.path = path
self.isdir = isdir

def __str__(self) -> str:
return self.path

def read_string(self) -> str:
pass

def read_string_io(self) -> io.StringIO:
return io.StringIO(self.read_string())

def write_string(self, content: str) -> None:
pass

def make_file(self, file_name: str) -> "FileHandler":
pass

def make_dirs(self) -> None:
pass


class LocalFileHandler(FileHandler):

def __init__(self, path: str) -> None:
isdir = os.path.isdir(path)
super().__init__(path, isdir)

def read_string(self) -> str:
with open(self.path, "r") as f:
return f.read()

def write_string(self, content: str) -> None:
with open(self.path, "w") as f:
f.write(content)

def make_file(self, file_name: str) -> FileHandler:
return LocalFileHandler(os.path.join(self.path, file_name))

def make_dirs(self) -> None:
return os.makedirs(self.path, exist_ok=True)


class GcsFileHandler(FileHandler):
gcs_client = storage.Client()
# Using print instead of logging since the class is loaded before logging is initialized.
print("Using GCP Project:", gcs_client.project)

def __init__(self, path: str) -> None:
if not path.startswith(_GCS_PATH_PREFIX):
raise ValueError(f"Expected {_GCS_PATH_PREFIX} prefix, got {path}")
bucket_name, blob_name = path[len(_GCS_PATH_PREFIX):].split('/', 1)
self.bucket = GcsFileHandler.gcs_client.bucket(bucket_name)
self.blob = self.bucket.blob(blob_name)
super().__init__(path, path.endswith("/"))

def read_string(self) -> str:
return self.blob.download_as_string().decode("utf-8")

def write_string(self, content: str) -> None:
self.blob.upload_from_string(content)

def make_file(self, file_name: str) -> FileHandler:
return GcsFileHandler(
f"{self.path}{'' if self.isdir else '/'}{file_name}")


def create_file_handler(path: str) -> FileHandler:
if path.startswith(_GCS_PATH_PREFIX):
return GcsFileHandler(path)
return LocalFileHandler(path)

0 comments on commit 85ed6f2

Please sign in to comment.