Skip to content

Commit

Permalink
Use fsspec
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Sep 17, 2024
1 parent 97b27d7 commit 0fd556a
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 10 deletions.
41 changes: 40 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ backoff = { version = ">=2.0.0", python = "<4" }
backports-datetime-fromisoformat = { version = ">=2.0.1", python = "<3.11" }
click = "~=8.0"
fs = ">=2.4.16"
importlib-metadata = {version = "<9.0.0", python = "<3.12"}
importlib-resources = {version = ">=5.12.0,!=6.2.0,!=6.3.0,!=6.3.1", python = "<3.10"}
fsspec = ">=2024.9.0"
importlib-metadata = { version = "<9.0.0", python = "<3.12" }
importlib-resources = { version = ">=5.12.0,!=6.2.0,!=6.3.0,!=6.3.1", python = "<3.10" }
inflection = ">=0.5.1"
joblib = ">=1.3.0"
jsonpath-ng = ">=1.5.3"
Expand Down
25 changes: 18 additions & 7 deletions samples/sample_tap_csv/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import csv
import datetime
import os
import typing as t

import fsspec

from singer_sdk import Stream
from singer_sdk.helpers._util import utc_now # noqa: PLC2701
from singer_sdk.streams.core import REPLICATION_INCREMENTAL

if t.TYPE_CHECKING:
Expand Down Expand Up @@ -47,20 +49,22 @@ def __init__(

self._partitions = partitions or []

self.filesystem: fsspec.AbstractFileSystem = fsspec.filesystem("local")
self._sync_start_time = utc_now()

@property
def partitions(self) -> list[Context]:
return self._partitions

def _read_file(self, path: str) -> t.Iterable[Record]: # noqa: PLR6301
def _read_file(self, path: str) -> t.Iterable[Record]:
# Make these configurable.
delimiter = ","
quotechar = '"'
escapechar = None
doublequote = True
lineterminator = "\r\n"

# TODO: Use filesytem-specific file open method.
with open(path, encoding="utf-8") as file: # noqa: PTH123
with self.filesystem.open(path, mode="r") as file:
reader = csv.DictReader(
file,
delimiter=delimiter,
Expand All @@ -76,16 +80,23 @@ def get_records(
context: Context | None,
) -> t.Iterable[Record | tuple[Record, Context | None]]:
path: str = context[SDC_META_FILEPATH]
mtime = os.path.getmtime(path) # noqa: PTH204

mtime: datetime.datetime | None
try:
mtime: datetime.datetime = self.filesystem.modified(path)
except NotImplementedError:
self.logger.warning("Filesystem does not support modified time")
mtime = None

if (
self.replication_method is REPLICATION_INCREMENTAL
and (previous_bookmark := self.get_starting_timestamp(context))
and _to_datetime(mtime) < previous_bookmark
and mtime is not None
and mtime < previous_bookmark
):
self.logger.info("File has not been modified since last read, skipping")
return

for record in self._read_file(path):
record[SDC_META_MODIFIED_AT] = _to_datetime(mtime)
record[SDC_META_MODIFIED_AT] = mtime or self._sync_start_time
yield record

0 comments on commit 0fd556a

Please sign in to comment.