Skip to content

Commit

Permalink
CSV-specific settings
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Sep 18, 2024
1 parent 1b9f4ed commit 8144fe2
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 53 deletions.
17 changes: 5 additions & 12 deletions samples/sample_tap_csv/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,13 @@ class CSVStream(FileStream):
"""CSV stream class."""

def read_file(self, path: str) -> t.Iterable[Record]:
    """Yield one record per data row of the CSV file at ``path``.

    The file is opened in text mode through ``self.filesystem`` and parsed
    with ``csv.DictReader``, so every record is a dict keyed by the column
    names from the file's first row. The CSV dialect comes from the tap
    configuration (``delimiter``, ``quotechar``, ``escapechar``,
    ``doublequote`` and ``lineterminator``) instead of hard-coded values.

    Args:
        path: Location of the CSV file, passed as-is to
            ``self.filesystem.open``.

    Yields:
        One dict per data row, mapping column name to the raw string value.
    """
    config = self.config

    with self.filesystem.open(path, mode="r") as file:
        reader = csv.DictReader(
            file,
            delimiter=config["delimiter"],
            quotechar=config["quotechar"],
            # Optional setting: a missing key means "no escape character".
            escapechar=config.get("escapechar"),
            doublequote=config["doublequote"],
            # NOTE: the stdlib csv reader always splits rows on "\r" or "\n"
            # and ignores `lineterminator`; it is forwarded only so the
            # dialect mirrors the configured settings.
            lineterminator=config["lineterminator"],
        )
        yield from reader
34 changes: 34 additions & 0 deletions samples/sample_tap_csv/sample_tap_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import singer_sdk.typing as th
from samples.sample_tap_csv.client import CSVStream
from singer_sdk.contrib.filesystem import FolderTap

Expand All @@ -12,3 +13,36 @@ class SampleTapCSV(FolderTap):
name = "sample-tap-csv"
valid_extensions: tuple[str, ...] = (".csv",)
default_stream_class = CSVStream

# CSV dialect settings accepted by this tap. Each property shares its name
# with the `csv.DictReader` keyword argument that `CSVStream.read_file`
# fills from `self.config`.
config_jsonschema = th.PropertiesList(
    th.Property(
        "delimiter",
        th.StringType,
        default=",",
        description="Field delimiter character.",
    ),
    th.Property(
        "quotechar",
        th.StringType,
        default='"',
        description="Quote character.",
    ),
    # No escape character unless one is configured; `read_file` reads this
    # key with `.get()`, so it may be absent.
    th.Property(
        "escapechar",
        th.StringType,
        default=None,
        description="Escape character.",
    ),
    th.Property(
        "doublequote",
        th.BooleanType,
        default=True,
        description="Whether quotechar inside a field should be doubled.",
    ),
    # NOTE(review): Python's csv reader ignores `lineterminator` when
    # parsing (rows always end at "\r" or "\n"), so this setting may have no
    # effect on reads - confirm whether it is intended for future use.
    th.Property(
        "lineterminator",
        th.StringType,
        default="\r\n",
        description="Line terminator character.",
    ),
).to_dict()
111 changes: 70 additions & 41 deletions singer_sdk/contrib/filesystem/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,48 @@
DEFAULT_MERGE_STREAM_NAME = "files"


class ReadMode(str, enum.Enum):
    """Accepted values for the tap's ``read_mode`` setting.

    The class mixes in ``str``, so every member compares equal to its plain
    string value.
    """

    # Read each file as a separate stream.
    one_stream_per_file = "one_stream_per_file"
    # Merge all files into a single stream.
    merge = "merge"


# Settings shared by every folder-based tap, kept at module level as a plain
# dict (the result of `.to_dict()`) so `FolderTap.append_builtin_config` can
# copy its "properties" entries into a tap's own schema.
BASE_CONFIG_SCHEMA = th.PropertiesList(
    # Only the local filesystem is accepted.
    th.Property(
        "filesystem",
        th.StringType,
        required=True,
        default="local",
        allowed_values=["local"],
        description="The filesystem to use.",
    ),
    th.Property(
        "path",
        th.StringType,
        required=True,
        description="Path to the directory where the files are stored.",
    ),
    # Allowed values are the members of `ReadMode`, which are `str` instances.
    th.Property(
        "read_mode",
        th.StringType,
        required=True,
        description=(
            "Use `one_stream_per_file` to read each file as a separate stream, or "
            "`merge` to merge all files into a single stream."
        ),
        allowed_values=list(ReadMode),
    ),
    # Defaults to DEFAULT_MERGE_STREAM_NAME when not configured.
    th.Property(
        "stream_name",
        th.StringType,
        required=True,
        default=DEFAULT_MERGE_STREAM_NAME,
        description="Name of the stream to use when `read_mode` is `merge`.",
    ),
).to_dict()


def file_path_to_stream_name(file_path: str) -> str:
"""Convert a file path to a stream name.
Expand All @@ -33,13 +75,6 @@ def file_path_to_stream_name(file_path: str) -> str:
return path_obj.with_suffix("").as_posix().replace("/", "__")


class ReadMode(str, enum.Enum):
"""Sync mode for the tap."""

one_stream_per_file = "one_stream_per_file"
merge = "merge"


_T = t.TypeVar("_T", bound=FileStream)


Expand All @@ -50,40 +85,34 @@ class FolderTap(Tap, t.Generic[_T]):

default_stream_class: type[_T]

config_jsonschema = th.PropertiesList(
th.Property(
"filesystem",
th.StringType,
required=True,
default="local",
allowed_values=["local"],
description="The filesystem to use.",
),
th.Property(
"path",
th.StringType,
required=True,
description="Path to the directory where the files are stored.",
),
th.Property(
"read_mode",
th.StringType,
required=True,
description=(
"Use `one_stream_per_file` to read each file as a separate stream, or "
"`merge` to merge all files into a single stream."
),
allowed_values=list(ReadMode),
),
th.Property(
"stream_name",
th.StringType,
required=True,
default=DEFAULT_MERGE_STREAM_NAME,
description="Name of the stream to use when `read_mode` is `merge`.",
),
# TODO(edgarmondragon): Other configuration options.
).to_dict()
config_jsonschema: t.ClassVar[dict] = {"properties": {}}

@classmethod
def append_builtin_config(cls: type[FolderTap], config_jsonschema: dict) -> None:
    """Add the folder tap's base settings to ``config_jsonschema`` in place.

    Every property declared in ``BASE_CONFIG_SCHEMA`` whose name is not
    already present under ``config_jsonschema["properties"]`` is copied over;
    properties the schema already declares are left untouched. The parent
    class's ``append_builtin_config`` is then applied to the same dict.

    Subclasses can override this classmethod to change which built-in
    settings are added.

    Args:
        config_jsonschema: JSON schema dict to extend. It must already
            contain a ``"properties"`` mapping.
    """
    for prop_name, prop_schema in BASE_CONFIG_SCHEMA["properties"].items():
        # setdefault writes only when the key is absent, so a setting the
        # schema already declares wins over the base definition.
        config_jsonschema["properties"].setdefault(prop_name, prop_schema)

    super().append_builtin_config(config_jsonschema)

@functools.cached_property
def read_mode(self) -> ReadMode:
Expand Down

0 comments on commit 8144fe2

Please sign in to comment.