Skip to content

Commit

Permalink
test: Test with singer-sdk @ main
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Sep 6, 2024
1 parent a30a6b9 commit 1ff5a98
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 140 deletions.
23 changes: 13 additions & 10 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ sqlalchemy = "<3"
sshtunnel = "0.4.0"

[tool.poetry.dependencies.singer-sdk]
version = "~=0.40.0a1"
extras = ["faker"]
git = "https://github.com/meltano/sdk.git"

[tool.poetry.group.dev.dependencies]
faker = ">=18.5.1"
Expand All @@ -56,7 +55,7 @@ types-jsonschema = ">=4.19.0.3"
types-psycopg2 = ">=2.9.21.20240118"

[tool.poetry.dev-dependencies.singer-sdk]
version = "*"
git = "https://github.com/meltano/sdk.git"
extras = ["testing"]

[tool.mypy]
Expand Down
174 changes: 47 additions & 127 deletions tap_postgres/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from __future__ import annotations

import datetime
import functools
import json
import select
import typing as t
Expand All @@ -16,19 +17,59 @@
import psycopg2
import singer_sdk.helpers._typing
import sqlalchemy as sa
import sqlalchemy.types
from psycopg2 import extras
from singer_sdk import SQLConnector, SQLStream
from singer_sdk import typing as th
from singer_sdk.connectors.sql import SQLToJSONSchema
from singer_sdk.helpers._state import increment_state
from singer_sdk.helpers._typing import TypeConformanceLevel
from singer_sdk.streams.core import REPLICATION_INCREMENTAL
from sqlalchemy.dialects import postgresql

if TYPE_CHECKING:
from singer_sdk.helpers.types import Context
from sqlalchemy.dialects import postgresql
from sqlalchemy.engine import Engine
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.types import TypeEngine


class PostgresSQLToJSONSchema(SQLToJSONSchema):
"""Custom SQL to JSON Schema conversion for Postgres."""

def __init__(self, dates_as_string: bool, *args, **kwargs):
"""Initialize the SQL to JSON Schema converter."""
super().__init__(*args, **kwargs)
self.dates_as_string = dates_as_string

@SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined]
def array_to_jsonschema(self, column_type: postgresql.ARRAY) -> dict:
"""Override the default mapping for NUMERIC columns.
For example, a scale of 4 translates to a multipleOf 0.0001.
"""
return {
"type": "array",
"items": self.to_jsonschema(column_type.item_type),
}

@SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined]
def json_to_jsonschema(self, column_type: postgresql.JSON) -> dict:
"""Override the default mapping for JSON and JSONB columns."""
return {"type": ["string", "number", "integer", "array", "object", "boolean"]}

@SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined]
def datetime_to_jsonschema(self, column_type: sqlalchemy.types.DateTime) -> dict:
"""Override the default mapping for DATETIME columns."""
if self.dates_as_string:
return {"type": ["string", "null"]}
return super().datetime_to_jsonschema(column_type)

@SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined]
def date_to_jsonschema(self, column_type: sqlalchemy.types.Date) -> dict:
"""Override the default mapping for DATE columns."""
if self.dates_as_string:
return {"type": ["string", "null"]}
return super().date_to_jsonschema(column_type)


def patched_conform(
Expand Down Expand Up @@ -115,131 +156,10 @@ def __init__(

super().__init__(config=config, sqlalchemy_url=sqlalchemy_url)

# Note super is static, we can get away with this because this is called once
# and is luckily referenced via the instance of the class
def to_jsonschema_type( # type: ignore[override]
self,
sql_type: str | TypeEngine | type[TypeEngine] | postgresql.ARRAY | Any,
) -> dict:
"""Return a JSON Schema representation of the provided type.
Overridden from SQLConnector to correctly handle JSONB and Arrays.
Also Overridden in order to call our instance method `sdk_typing_object()`
instead of the static version
By default will call `typing.to_jsonschema_type()` for strings and SQLAlchemy
types.
Args:
sql_type: The string representation of the SQL type, a SQLAlchemy
TypeEngine class or object, or a custom-specified object.
Raises:
ValueError: If the type received could not be translated to jsonschema.
Returns:
The JSON Schema representation of the provided type.
"""
type_name = None
if isinstance(sql_type, str):
type_name = sql_type
elif isinstance(sql_type, sa.types.TypeEngine):
type_name = type(sql_type).__name__

if (
type_name is not None
and isinstance(sql_type, sa.dialects.postgresql.ARRAY)
and type_name == "ARRAY"
):
array_type = self.sdk_typing_object(sql_type.item_type)
return th.ArrayType(array_type).type_dict
return self.sdk_typing_object(sql_type).type_dict

def sdk_typing_object(
self,
from_type: str | TypeEngine | type[TypeEngine],
) -> (
th.DateTimeType
| th.NumberType
| th.IntegerType
| th.DateType
| th.StringType
| th.BooleanType
| th.CustomType
):
"""Return the JSON Schema dict that describes the sql type.
Args:
from_type: The SQL type as a string or as a TypeEngine. If a TypeEngine is
provided, it may be provided as a class or a specific object instance.
Raises:
ValueError: If the `from_type` value is not of type `str` or `TypeEngine`.
Returns:
A compatible JSON Schema type definition.
"""
# NOTE: This is an ordered mapping, with earlier mappings taking precedence. If
# the SQL-provided type contains the type name on the left, the mapping will
# return the respective singer type.
# NOTE: jsonb and json should theoretically be th.AnyType().type_dict but that
# causes problems down the line with an error like:
# singer_sdk.helpers._typing.EmptySchemaTypeError: Could not detect type from
# empty type_dict. Did you forget to define a property in the stream schema?
sqltype_lookup: dict[
str,
th.DateTimeType
| th.NumberType
| th.IntegerType
| th.DateType
| th.StringType
| th.BooleanType
| th.CustomType,
] = {
"jsonb": th.CustomType(
{"type": ["string", "number", "integer", "array", "object", "boolean"]}
),
"json": th.CustomType(
{"type": ["string", "number", "integer", "array", "object", "boolean"]}
),
"timestamp": th.DateTimeType(),
"datetime": th.DateTimeType(),
"date": th.DateType(),
"int": th.IntegerType(),
"numeric": th.NumberType(),
"decimal": th.NumberType(),
"double": th.NumberType(),
"float": th.NumberType(),
"real": th.NumberType(),
"float4": th.NumberType(),
"string": th.StringType(),
"text": th.StringType(),
"char": th.StringType(),
"bool": th.BooleanType(),
"variant": th.StringType(),
}
if self.config["dates_as_string"] is True:
sqltype_lookup["date"] = th.StringType()
sqltype_lookup["datetime"] = th.StringType()
if isinstance(from_type, str):
type_name = from_type
elif isinstance(from_type, sa.types.TypeEngine):
type_name = type(from_type).__name__
elif isinstance(from_type, type) and issubclass(from_type, sa.types.TypeEngine):
type_name = from_type.__name__
else:
raise ValueError(
"Expected `str` or a SQLAlchemy `TypeEngine` object or type."
)

# Look for the type name within the known SQL type names:
for sqltype, jsonschema_type in sqltype_lookup.items():
if sqltype.lower() in type_name.lower():
return jsonschema_type

return sqltype_lookup["string"] # safe failover to str
@functools.cached_property
def type_mapping(self):
"""Return a mapping of SQL types to JSON Schema types."""
return PostgresSQLToJSONSchema(dates_as_string=self.config["dates_as_string"])

def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]:
"""Return a list of schema names in DB, or overrides with user-provided values.
Expand Down

0 comments on commit 1ff5a98

Please sign in to comment.