Skip to content

Commit

Permalink
feat(target): SQL target developers can now more easily override the …
Browse files Browse the repository at this point in the history
…mapping from JSON schema to SQL column type
  • Loading branch information
edgarrmondragon committed Oct 25, 2024
1 parent fb9ac30 commit 90e9b3d
Show file tree
Hide file tree
Showing 5 changed files with 333 additions and 6 deletions.
189 changes: 186 additions & 3 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
else:
from warnings import deprecated

if sys.version_info < (3, 10):
from typing_extensions import TypeAlias
else:
from typing import TypeAlias # noqa: ICN003

if t.TYPE_CHECKING:
from sqlalchemy.engine import Engine
from sqlalchemy.engine.reflection import Inspector
Expand Down Expand Up @@ -192,6 +197,175 @@ def boolean_to_jsonschema(self, column_type: sa.types.Boolean) -> dict: # noqa:
return th.BooleanType.type_dict # type: ignore[no-any-return]


JSONtoSQLHandler: TypeAlias = t.Union[
t.Type[sa.types.TypeEngine],
t.Callable[[dict], sa.types.TypeEngine],
]


class JSONSchemaToSQL:
"""A configurable mapper for converting JSON Schema types to SQLAlchemy types."""

def __init__(self) -> None:
"""Initialize the mapper with default type mappings."""
# Default type mappings
self._type_mapping: dict[str, JSONtoSQLHandler] = {
"string": self._handle_string_type,
"integer": sa.types.INTEGER,
"number": sa.types.DECIMAL,
"boolean": sa.types.BOOLEAN,
"object": sa.types.VARCHAR,
"array": sa.types.VARCHAR,
}

# Format handlers for string types
self._format_handlers: dict[str, JSONtoSQLHandler] = {
# Default date-like formats
"date-time": sa.types.DATETIME,
"time": sa.types.TIME,
"date": sa.types.DATE,
# Common string formats with sensible defaults
"uuid": sa.types.UUID,
"email": lambda _: sa.types.VARCHAR(254), # RFC 5321
"uri": lambda _: sa.types.VARCHAR(2083), # Common browser limit
"hostname": lambda _: sa.types.VARCHAR(253), # RFC 1035
"ipv4": lambda _: sa.types.VARCHAR(15),
"ipv6": lambda _: sa.types.VARCHAR(45),
}

def _invoke_handler( # noqa: PLR6301
self,
handler: JSONtoSQLHandler,
schema: dict,
) -> sa.types.TypeEngine:
"""Invoke a handler, handling both type classes and callables.
Args:
handler: The handler to invoke.
schema: The schema to pass to callable handlers.
Returns:
The resulting SQLAlchemy type.
"""
if isinstance(handler, type):
return handler() # type: ignore[no-any-return]
return handler(schema)

def register_type_handler(self, json_type: str, handler: JSONtoSQLHandler) -> None:
"""Register a custom type handler.
Args:
json_type: The JSON Schema type to handle.
handler: Either a SQLAlchemy type class or a callable that takes a schema
dict and returns a SQLAlchemy type instance.
"""
self._type_mapping[json_type] = handler

def register_format_handler(
self,
format_name: str,
handler: JSONtoSQLHandler,
) -> None:
"""Register a custom format handler.
Args:
format_name: The format string (e.g., "date-time", "email", "custom-format").
handler: Either a SQLAlchemy type class or a callable that takes a schema
dict and returns a SQLAlchemy type instance.
""" # noqa: E501
self._format_handlers[format_name] = handler

def _get_type_from_schema(self, schema: dict) -> sa.types.TypeEngine | None:
"""Try to get a SQL type from a single schema object.
Args:
schema: The JSON Schema object.
Returns:
SQL type if one can be determined, None otherwise.
"""
# Check if this is a string with format first
if schema.get("type") == "string" and "format" in schema:
format_type = self._handle_format(schema)
if format_type is not None:
return format_type

# Then check regular types
if "type" in schema:
schema_type = schema["type"]
if isinstance(schema_type, (list, tuple)):
# For type arrays, try each type
for t in schema_type:
if handler := self._type_mapping.get(t):
return self._invoke_handler(handler, schema)
elif schema_type in self._type_mapping:
handler = self._type_mapping[schema_type]
return self._invoke_handler(handler, schema)

return None

def _handle_format(self, schema: dict) -> sa.types.TypeEngine | None:
"""Handle format-specific type conversion.
Args:
schema: The JSON Schema object.
Returns:
The format-specific SQL type if applicable, None otherwise.
"""
if "format" not in schema:
return None

format_string: str = schema["format"]

if handler := self._format_handlers.get(format_string):
return self._invoke_handler(handler, schema)

return None

def _handle_string_type(self, schema: dict) -> sa.types.TypeEngine:
"""Handle string type conversion with special cases for formats.
Args:
schema: The JSON Schema object.
Returns:
Appropriate SQLAlchemy type.
"""
# Check for format-specific handling first
if format_type := self._handle_format(schema):
return format_type

# Default string handling
max_length: int | None = schema.get("maxLength")
return sa.types.VARCHAR(max_length)

def to_sql_type(self, schema: dict) -> sa.types.TypeEngine:
"""Convert a JSON Schema type definition to a SQLAlchemy type.
Args:
schema: The JSON Schema object.
Returns:
The corresponding SQLAlchemy type.
"""
if sql_type := self._get_type_from_schema(schema):
return sql_type

# Handle anyOf
if "anyOf" in schema:
for subschema in schema["anyOf"]:
# Skip null types in anyOf
if subschema.get("type") == "null":
continue

Check warning on line 360 in singer_sdk/connectors/sql.py

View check run for this annotation

Codecov / codecov/patch

singer_sdk/connectors/sql.py#L360

Added line #L360 was not covered by tests

if sql_type := self._get_type_from_schema(subschema):
return sql_type

# Fallback
return sa.types.VARCHAR()


class SQLConnector: # noqa: PLR0904
"""Base class for SQLAlchemy-based connectors.
Expand Down Expand Up @@ -255,6 +429,16 @@ def sql_to_jsonschema(self) -> SQLToJSONSchema:
"""
return SQLToJSONSchema()

@functools.cached_property
def jsonschema_to_sql(self) -> JSONSchemaToSQL:
"""The JSON-to-SQL type mapper object for this SQL connector.
Override this property to provide a custom mapping for your SQL dialect.
.. versionadded:: 0.42.0
"""
return JSONSchemaToSQL()

@contextmanager
def _connect(self) -> t.Iterator[sa.engine.Connection]:
with self._engine.connect().execution_options(stream_results=True) as conn:
Expand Down Expand Up @@ -418,8 +602,7 @@ def to_jsonschema_type(
msg = f"Unexpected type received: '{type(sql_type).__name__}'"
raise ValueError(msg)

@staticmethod
def to_sql_type(jsonschema_type: dict) -> sa.types.TypeEngine:
def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine:

Check warning on line 605 in singer_sdk/connectors/sql.py

View workflow job for this annotation

GitHub Actions / Check API Changes

SQLConnector.to_sql_type(jsonschema_type)

Positional parameter was moved: `` -> ``

Check warning on line 605 in singer_sdk/connectors/sql.py

View workflow job for this annotation

GitHub Actions / Check API Changes

SQLConnector.to_sql_type(self)

Parameter was added as required: `` -> ``
"""Return a JSON Schema representation of the provided type.
By default will call `typing.to_sql_type()`.
Expand All @@ -435,7 +618,7 @@ def to_sql_type(jsonschema_type: dict) -> sa.types.TypeEngine:
Returns:
The SQLAlchemy type representation of the data type.
"""
return th.to_sql_type(jsonschema_type)
return self.jsonschema_to_sql.to_sql_type(jsonschema_type)

@staticmethod
def get_fully_qualified_name(
Expand Down
4 changes: 4 additions & 0 deletions singer_sdk/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1205,6 +1205,10 @@ def _jsonschema_type_check(jsonschema_type: dict, type_check: tuple[str]) -> boo
)


@deprecated(
"Use `JSONSchemaToSQL` instead.",
category=DeprecationWarning,
)
def to_sql_type( # noqa: PLR0911, C901
jsonschema_type: dict,
) -> sa.types.TypeEngine:
Expand Down
142 changes: 140 additions & 2 deletions tests/core/test_connector_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@

from samples.sample_duckdb import DuckDBConnector
from singer_sdk.connectors import SQLConnector
from singer_sdk.connectors.sql import FullyQualifiedName, SQLToJSONSchema
from singer_sdk.connectors.sql import (
FullyQualifiedName,
JSONSchemaToSQL,
SQLToJSONSchema,
)
from singer_sdk.exceptions import ConfigValidationError

if t.TYPE_CHECKING:
Expand Down Expand Up @@ -445,7 +449,7 @@ def test_sql_to_json_schema_map(
assert m.to_jsonschema(sql_type) == expected_jsonschema_type


def test_custom_type():
def test_custom_type_to_jsonschema():
class MyMap(SQLToJSONSchema):
@SQLToJSONSchema.to_jsonschema.register
def custom_number_to_jsonschema(self, column_type: sa.types.NUMERIC) -> dict:
Expand All @@ -470,3 +474,137 @@ def my_type_to_jsonschema(self, column_type) -> dict: # noqa: ARG002
"multipleOf": 0.01,
}
assert m.to_jsonschema(sa.types.BOOLEAN()) == {"type": ["boolean"]}


class TestJSONSchemaToSQL:
@pytest.fixture
def json_schema_to_sql(self) -> JSONSchemaToSQL:
return JSONSchemaToSQL()

def test_register_jsonschema_type_handler(
self,
json_schema_to_sql: JSONSchemaToSQL,
):
json_schema_to_sql.register_type_handler("my-type", sa.types.LargeBinary)
result = json_schema_to_sql.to_sql_type({"type": "my-type"})
assert isinstance(result, sa.types.LargeBinary)

def test_register_jsonschema_format_handler(
self,
json_schema_to_sql: JSONSchemaToSQL,
):
json_schema_to_sql.register_format_handler("my-format", sa.types.LargeBinary)
result = json_schema_to_sql.to_sql_type(
{
"type": "string",
"format": "my-format",
}
)
assert isinstance(result, sa.types.LargeBinary)

def test_string(self, json_schema_to_sql: JSONSchemaToSQL):
jsonschema_type = {"type": ["string", "null"]}
result = json_schema_to_sql.to_sql_type(jsonschema_type)
assert isinstance(result, sa.types.VARCHAR)
assert result.length is None

def test_string_max_length(self, json_schema_to_sql: JSONSchemaToSQL):
jsonschema_type = {"type": ["string", "null"], "maxLength": 10}
result = json_schema_to_sql.to_sql_type(jsonschema_type)
assert isinstance(
json_schema_to_sql.to_sql_type(jsonschema_type),
sa.types.VARCHAR,
)
assert result.length == 10

def test_integer(self, json_schema_to_sql: JSONSchemaToSQL):
jsonschema_type = {"type": ["integer", "null"]}
result = json_schema_to_sql.to_sql_type(jsonschema_type)
assert isinstance(result, sa.types.INTEGER)

def test_number(self, json_schema_to_sql: JSONSchemaToSQL):
jsonschema_type = {"type": ["number", "null"]}
result = json_schema_to_sql.to_sql_type(jsonschema_type)
assert isinstance(result, sa.types.DECIMAL)

def test_boolean(self, json_schema_to_sql: JSONSchemaToSQL):
jsonschema_type = {"type": ["boolean", "null"]}
result = json_schema_to_sql.to_sql_type(jsonschema_type)
assert isinstance(result, sa.types.BOOLEAN)

def test_object(self, json_schema_to_sql: JSONSchemaToSQL):
jsonschema_type = {"type": "object", "properties": {}}
result = json_schema_to_sql.to_sql_type(jsonschema_type)
assert isinstance(result, sa.types.VARCHAR)

def test_array(self, json_schema_to_sql: JSONSchemaToSQL):
jsonschema_type = {"type": "array"}
result = json_schema_to_sql.to_sql_type(jsonschema_type)
assert isinstance(result, sa.types.VARCHAR)

def test_array_items(self, json_schema_to_sql: JSONSchemaToSQL):
jsonschema_type = {"type": "array", "items": {"type": "string"}}
result = json_schema_to_sql.to_sql_type(jsonschema_type)
assert isinstance(result, sa.types.VARCHAR)

def test_date(self, json_schema_to_sql: JSONSchemaToSQL):
jsonschema_type = {"format": "date", "type": ["string", "null"]}
result = json_schema_to_sql.to_sql_type(jsonschema_type)
assert isinstance(result, sa.types.DATE)

def test_time(self, json_schema_to_sql: JSONSchemaToSQL):
jsonschema_type = {"format": "time", "type": ["string", "null"]}
result = json_schema_to_sql.to_sql_type(jsonschema_type)
assert isinstance(result, sa.types.TIME)

def test_uuid(self, json_schema_to_sql: JSONSchemaToSQL):
jsonschema_type = {"format": "uuid", "type": ["string", "null"]}
result = json_schema_to_sql.to_sql_type(jsonschema_type)
assert isinstance(result, sa.types.UUID)

def test_datetime(self, json_schema_to_sql: JSONSchemaToSQL):
jsonschema_type = {"format": "date-time", "type": ["string", "null"]}
result = json_schema_to_sql.to_sql_type(jsonschema_type)
assert isinstance(result, sa.types.DATETIME)

def test_anyof_datetime(self, json_schema_to_sql: JSONSchemaToSQL):
jsonschema_type = {
"anyOf": [
{"type": "string", "format": "date-time"},
{"type": "null"},
],
}
result = json_schema_to_sql.to_sql_type(jsonschema_type)
assert isinstance(result, sa.types.DATETIME)

def test_anyof_integer(self, json_schema_to_sql: JSONSchemaToSQL):
jsonschema_type = {
"anyOf": [
{"type": "integer"},
{"type": "null"},
],
}
result = json_schema_to_sql.to_sql_type(jsonschema_type)
assert isinstance(result, sa.types.INTEGER)

def test_complex(self, json_schema_to_sql: JSONSchemaToSQL):
jsonschema_type = {
"type": [
"array",
"object",
"boolean",
"null",
]
}
result = json_schema_to_sql.to_sql_type(jsonschema_type)
assert isinstance(result, sa.types.VARCHAR)

def test_unknown_type(self, json_schema_to_sql: JSONSchemaToSQL):
jsonschema_type = {"cannot": "compute"}
result = json_schema_to_sql.to_sql_type(jsonschema_type)
assert isinstance(result, sa.types.VARCHAR)

def test_unknown_format(self, json_schema_to_sql: JSONSchemaToSQL):
jsonschema_type = {"type": "string", "format": "unknown"}
result = json_schema_to_sql.to_sql_type(jsonschema_type)
assert isinstance(result, sa.types.VARCHAR)
Loading

0 comments on commit 90e9b3d

Please sign in to comment.