Skip to content

Commit

Permalink
Merge branch 'main' into 1836-bug-_parse_timestamps_in_record-throws-…
Browse files Browse the repository at this point in the history
…exception-for-key-not-present-in-schema
  • Loading branch information
edgarrmondragon authored Jul 31, 2023
2 parents e73adff + 611d50d commit e086fe0
Show file tree
Hide file tree
Showing 32 changed files with 655 additions and 144 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/constraints.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pip==23.2
pip==23.2.1
poetry==1.5.1
pre-commit==3.3.3
nox==2023.4.22
Expand Down
10 changes: 7 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ env:

jobs:
tests:
name: Test on ${{ matrix.python-version }} (${{ matrix.session }}) / ${{ matrix.os }}
name: "Test on ${{ matrix.python-version }} (${{ matrix.session }}) / ${{ matrix.os }} / SQLAlchemy: ${{ matrix.sqlalchemy }}"
runs-on: ${{ matrix.os }}
env:
NOXSESSION: ${{ matrix.session }}
Expand All @@ -47,9 +47,11 @@ jobs:
session: [tests]
os: ["ubuntu-latest", "macos-latest", "windows-latest"]
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
sqlalchemy: ["2.*"]
include:
- { session: doctest, python-version: "3.10", os: "ubuntu-latest" }
- { session: mypy, python-version: "3.8", os: "ubuntu-latest" }
- { session: tests, python-version: "3.11", os: "ubuntu-latest", sqlalchemy: "1.*" }
- { session: doctest, python-version: "3.10", os: "ubuntu-latest", sqlalchemy: "2.*" }
- { session: mypy, python-version: "3.8", os: "ubuntu-latest", sqlalchemy: "2.*" }

steps:
- name: Check out the repository
Expand Down Expand Up @@ -86,6 +88,8 @@ jobs:
nox --version
- name: Run Nox
env:
SQLALCHEMY_VERSION: ${{ matrix.sqlalchemy }}
run: |
nox --python=${{ matrix.python-version }}
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ repos:
- id: check-readthedocs

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.0.278
rev: v0.0.280
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix, --show-fixes]
Expand Down
2 changes: 2 additions & 0 deletions docs/deprecation.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ incompatible way, following their deprecation, as indicated in the
[`RESTStream.get_new_paginator`](singer_sdk.RESTStream.get_new_paginator).

See the [migration guide](./guides/pagination-classes.md) for more information.

- The `singer_sdk.testing.get_standard_tap_tests` and `singer_sdk.testing.get_standard_target_tests` functions will be removed. Replace them with the `singer_sdk.testing.get_tap_test_class` and `singer_sdk.testing.get_target_test_class` functions, respectively, to generate a richer test suite.
11 changes: 8 additions & 3 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ def tests(session: Session) -> None:
session.install(".[s3]")
session.install(*test_dependencies)

sqlalchemy_version = os.environ.get("SQLALCHEMY_VERSION")
if sqlalchemy_version:
# Bypass nox-poetry use of --constraint so we can install a version of
# SQLAlchemy that doesn't match what's in poetry.lock.
session.poetry.session.install( # type: ignore[attr-defined]
f"sqlalchemy=={sqlalchemy_version}",
)

try:
session.run(
"coverage",
Expand All @@ -95,9 +103,6 @@ def tests(session: Session) -> None:
"-v",
"--durations=10",
*session.posargs,
env={
"SQLALCHEMY_WARN_20": "1",
},
)
finally:
if session.interactive:
Expand Down
154 changes: 72 additions & 82 deletions poetry.lock

Large diffs are not rendered by default.

9 changes: 1 addition & 8 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ memoization = ">=0.3.2,<0.5.0"
jsonpath-ng = "^1.5.3"
joblib = "^1.0.1"
inflection = "^0.5.1"
sqlalchemy = "^1.4"
sqlalchemy = ">=1.4,<3.0"
python-dotenv = ">=0.20,<0.22"
typing-extensions = "^4.2.0"
simplejson = "^3.17.6"
Expand Down Expand Up @@ -109,7 +109,6 @@ numpy = [
{ version = ">=1.22", python = ">=3.8" },
]
requests-mock = "^1.10.0"
sqlalchemy2-stubs = {version = "^0.0.2a32", allow-prereleases = true}
types-jsonschema = "^4.17.0.6"
types-python-dateutil = "^2.8.19"
types-pytz = ">=2022.7.1.2,<2024.0.0.0"
Expand All @@ -132,9 +131,6 @@ exclude = ".*simpleeval.*"

[tool.pytest.ini_options]
addopts = '-vvv --ignore=singer_sdk/helpers/_simpleeval.py -m "not external"'
filterwarnings = [
"error::sqlalchemy.exc.RemovedIn20Warning",
]
markers = [
"external: Tests relying on external resources",
"windows: Tests that only run on Windows",
Expand Down Expand Up @@ -190,9 +186,6 @@ fail_under = 82
[tool.mypy]
exclude = "tests"
files = "singer_sdk"
plugins = [
"sqlalchemy.ext.mypy.plugin",
]
python_version = "3.8"
warn_unused_configs = true
warn_unused_ignores = true
Expand Down
2 changes: 1 addition & 1 deletion singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def get_sqlalchemy_url(self, config: dict[str, t.Any]) -> str:
@staticmethod
def to_jsonschema_type(
sql_type: (
str
str # noqa: ANN401
| sqlalchemy.types.TypeEngine
| type[sqlalchemy.types.TypeEngine]
| t.Any
Expand Down
7 changes: 6 additions & 1 deletion singer_sdk/helpers/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,9 @@
from importlib import metadata
from typing import final # noqa: ICN003

__all__ = ["metadata", "final"]
if sys.version_info < (3, 9):
import importlib_resources as resources
else:
from importlib import resources

__all__ = ["metadata", "final", "resources"]
18 changes: 9 additions & 9 deletions singer_sdk/helpers/_flattening.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,15 @@ def _flatten_schema( # noqa: C901
else:
items.append((new_key, v))
elif len(v.values()) > 0:
if list(v.values())[0][0]["type"] == "string":
list(v.values())[0][0]["type"] = ["null", "string"]
items.append((new_key, list(v.values())[0][0]))
elif list(v.values())[0][0]["type"] == "array":
list(v.values())[0][0]["type"] = ["null", "array"]
items.append((new_key, list(v.values())[0][0]))
elif list(v.values())[0][0]["type"] == "object":
list(v.values())[0][0]["type"] = ["null", "object"]
items.append((new_key, list(v.values())[0][0]))
if next(iter(v.values()))[0]["type"] == "string":
next(iter(v.values()))[0]["type"] = ["null", "string"]
items.append((new_key, next(iter(v.values()))[0]))
elif next(iter(v.values()))[0]["type"] == "array":
next(iter(v.values()))[0]["type"] = ["null", "array"]
items.append((new_key, next(iter(v.values()))[0]))
elif next(iter(v.values()))[0]["type"] == "object":
next(iter(v.values()))[0]["type"] = ["null", "object"]
items.append((new_key, next(iter(v.values()))[0]))

# Sort and check for duplicates
def _key_func(item):
Expand Down
7 changes: 7 additions & 0 deletions singer_sdk/helpers/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@
description="The default target database schema name to use for all streams.",
),
).to_dict()
ADD_RECORD_METADATA_CONFIG = PropertiesList(
Property(
"add_record_metadata",
BooleanType(),
description="Add metadata to records.",
),
).to_dict()


class DeprecatedEnum(Enum):
Expand Down
13 changes: 13 additions & 0 deletions singer_sdk/plugin_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import os
import sys
import time
import typing as t
from pathlib import Path, PurePath
from types import MappingProxyType
Expand Down Expand Up @@ -155,6 +156,9 @@ def __init__(
metrics._setup_logging(self.config)
self.metrics_logger = metrics.get_metrics_logger()

# Initialization timestamp
self.__initialized_at = int(time.time() * 1000)

def setup_mapper(self) -> None:
"""Initialize the plugin mapper for this tap."""
self._mapper = PluginMapper(
Expand Down Expand Up @@ -185,6 +189,15 @@ def mapper(self, mapper: PluginMapper) -> None:
"""
self._mapper = mapper

@property
def initialized_at(self) -> int:
"""Start time of the plugin.
Returns:
The timestamp captured when the plugin was initialized, as an integer
number of milliseconds since the epoch (``int(time.time() * 1000)``).
"""
return self.__initialized_at

@classproperty
def capabilities(self) -> list[CapabilitiesEnum]:
"""Get capabilities.
Expand Down
12 changes: 8 additions & 4 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
if t.TYPE_CHECKING:
from logging import Logger

from singer_sdk.plugin_base import PluginBase
from singer_sdk.target_base import Target

JSONSchemaValidator = Draft7Validator

Expand All @@ -48,7 +48,7 @@ class Sink(metaclass=abc.ABCMeta):

def __init__(
self,
target: PluginBase,
target: Target,
stream_name: str,
schema: dict,
key_properties: list[str] | None,
Expand All @@ -62,6 +62,7 @@ def __init__(
key_properties: Primary key of the stream to sink.
"""
self.logger = target.logger
self.sync_started_at = target.initialized_at
self._config = dict(target.config)
self._pending_batch: dict | None = None
self.stream_name = stream_name
Expand Down Expand Up @@ -238,7 +239,7 @@ def _add_sdc_metadata_to_record(
Args:
record: Individual record in the stream.
message: TODO
message: The record message.
context: Stream partition or context dictionary.
"""
record["_sdc_extracted_at"] = message.get("time_extracted")
Expand All @@ -252,6 +253,7 @@ def _add_sdc_metadata_to_record(
record["_sdc_deleted_at"] = record.get("_sdc_deleted_at")
record["_sdc_sequence"] = int(round(time.time() * 1000))
record["_sdc_table_version"] = message.get("version")
record["_sdc_sync_started_at"] = self.sync_started_at

def _add_sdc_metadata_to_schema(self) -> None:
"""Add _sdc metadata columns.
Expand All @@ -270,7 +272,7 @@ def _add_sdc_metadata_to_schema(self) -> None:
"type": ["null", "string"],
"format": "date-time",
}
for col in ("_sdc_sequence", "_sdc_table_version"):
for col in ("_sdc_sequence", "_sdc_table_version", "_sdc_sync_started_at"):
properties_dict[col] = {"type": ["null", "integer"]}

def _remove_sdc_metadata_from_schema(self) -> None:
Expand All @@ -287,6 +289,7 @@ def _remove_sdc_metadata_from_schema(self) -> None:
"_sdc_deleted_at",
"_sdc_sequence",
"_sdc_table_version",
"_sdc_sync_started_at",
):
properties_dict.pop(col, None)

Expand All @@ -305,6 +308,7 @@ def _remove_sdc_metadata_from_record(self, record: dict) -> None:
record.pop("_sdc_deleted_at", None)
record.pop("_sdc_sequence", None)
record.pop("_sdc_table_version", None)
record.pop("_sdc_sync_started_at", None)

# Record validation

Expand Down
24 changes: 15 additions & 9 deletions singer_sdk/sinks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
if t.TYPE_CHECKING:
from sqlalchemy.sql import Executable

from singer_sdk.plugin_base import PluginBase
from singer_sdk.target_base import Target


class SQLSink(BatchSink):
Expand All @@ -32,7 +32,7 @@ class SQLSink(BatchSink):

def __init__(
self,
target: PluginBase,
target: Target,
stream_name: str,
schema: dict,
key_properties: list[str] | None,
Expand Down Expand Up @@ -322,15 +322,21 @@ def bulk_insert_records(
if isinstance(insert_sql, str):
insert_sql = sqlalchemy.text(insert_sql)

conformed_records = (
[self.conform_record(record) for record in records]
if isinstance(records, list)
else (self.conform_record(record) for record in records)
)
conformed_records = [self.conform_record(record) for record in records]
property_names = list(self.conform_schema(schema)["properties"].keys())

# Create new record dicts with missing properties filled in with None
new_records = [
{name: record.get(name) for name in property_names}
for record in conformed_records
]

self.logger.info("Inserting with SQL: %s", insert_sql)

with self.connector._connect() as conn, conn.begin():
conn.execute(insert_sql, conformed_records)
return len(conformed_records) if isinstance(conformed_records, list) else None
result = conn.execute(insert_sql, new_records)

return result.rowcount

def merge_upsert_from_table(
self,
Expand Down
9 changes: 6 additions & 3 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,10 @@ def is_timestamp_replication_key(self) -> bool:
type_dict = self.schema.get("properties", {}).get(self.replication_key)
return is_datetime_type(type_dict)

def get_starting_replication_key_value(self, context: dict | None) -> t.Any | None:
def get_starting_replication_key_value(
self,
context: dict | None,
) -> t.Any | None: # noqa: ANN401
"""Get starting replication key.
Will return the value of the stream's replication key when `--state` is passed.
Expand Down Expand Up @@ -385,7 +388,7 @@ def _write_starting_replication_value(self, context: dict | None) -> None:
def get_replication_key_signpost(
self,
context: dict | None, # noqa: ARG002
) -> datetime.datetime | t.Any | None:
) -> datetime.datetime | t.Any | None: # noqa: ANN401
"""Get the replication signpost.
For timestamp-based replication keys, this defaults to `utc_now()`. For
Expand Down Expand Up @@ -1255,7 +1258,7 @@ def get_child_context(self, record: dict, context: dict | None) -> dict | None:
Raises:
NotImplementedError: If the stream has children but this method is not
overriden.
overridden.
"""
if context is None:
for child_stream in self.child_streams:
Expand Down
6 changes: 4 additions & 2 deletions singer_sdk/streams/graphql.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.streams.rest import RESTStream

_TToken = t.TypeVar("_TToken")

class GraphQLStream(RESTStream, metaclass=abc.ABCMeta):

class GraphQLStream(RESTStream, t.Generic[_TToken], metaclass=abc.ABCMeta):
"""Abstract base class for API-type streams.
GraphQL streams inherit from the class `GraphQLStream`, which in turn inherits from
Expand Down Expand Up @@ -43,7 +45,7 @@ def query(self) -> str:
def prepare_request_payload(
self,
context: dict | None,
next_page_token: t.Any | None,
next_page_token: _TToken | None,
) -> dict | None:
"""Prepare the data payload for the GraphQL API request.
Expand Down
Loading

0 comments on commit e086fe0

Please sign in to comment.