# Patch notes (lines before the "diff --git" header are commentary, not patch content):
# - Adds a handle_raw_string(schema) method that returns VARCHAR sized by the
#   schema's optional "maxLength" value.
# - _handle_string_type now delegates to handle_raw_string once _handle_format
#   has produced no type, replacing the inline VARCHAR construction.
diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py
index 0cd27364a..a8ab26c25 100644
--- a/singer_sdk/connectors/sql.py
+++ b/singer_sdk/connectors/sql.py
@@ -311,6 +311,18 @@ def handle_multiple_types(self, types: t.Sequence[str]) -> sa.types.TypeEngine:
         """
         return sa.types.VARCHAR()
 
+    def handle_raw_string(self, schema: dict) -> sa.types.TypeEngine:  # noqa: PLR6301
+        """Handle a string type generically.
+
+        Args:
+            schema: The JSON Schema object.
+
+        Returns:
+            Appropriate SQLAlchemy type.
+        """
+        max_length: int | None = schema.get("maxLength")
+        return sa.types.VARCHAR(max_length)
+
     def _get_type_from_schema(self, schema: dict) -> sa.types.TypeEngine | None:
         """Try to get a SQL type from a single schema object.
 
@@ -378,9 +390,7 @@ def _handle_string_type(self, schema: dict) -> sa.types.TypeEngine:
         if format_type := self._handle_format(schema):
             return format_type
 
-        # Default string handling
-        max_length: int | None = schema.get("maxLength")
-        return sa.types.VARCHAR(max_length)
+        return self.handle_raw_string(schema)
 
     def to_sql_type(self, schema: dict) -> sa.types.TypeEngine:
         """Convert a JSON Schema type definition to a SQLAlchemy type.
# Patch notes (lines before the "diff --git" header are commentary, not patch content):
# - Moves test_custom_fallback further down the file; its body is unchanged.
# - Adds test_custom_handle_raw_string: a JSONSchemaToSQL subclass overrides
#   handle_raw_string to return LargeBinary for contentMediaType "image/png",
#   and the test checks that other string schemas still map to VARCHAR.
diff --git a/tests/core/test_connector_sql.py b/tests/core/test_connector_sql.py
index dc987a39b..66637e9da 100644
--- a/tests/core/test_connector_sql.py
+++ b/tests/core/test_connector_sql.py
@@ -597,13 +597,6 @@ def test_anyof_unknown(self, json_schema_to_sql: JSONSchemaToSQL):
         result = json_schema_to_sql.to_sql_type(jsonschema_type)
         assert isinstance(result, sa.types.VARCHAR)
 
-    def test_custom_fallback(self):
-        json_schema_to_sql = JSONSchemaToSQL()
-        json_schema_to_sql.fallback_type = sa.types.CHAR
-        jsonschema_type = {"cannot": "compute"}
-        result = json_schema_to_sql.to_sql_type(jsonschema_type)
-        assert isinstance(result, sa.types.CHAR)
-
     @pytest.mark.parametrize(
         "jsonschema_type,expected_type",
         [
@@ -637,3 +630,39 @@ def test_unknown_format(self, json_schema_to_sql: JSONSchemaToSQL):
         jsonschema_type = {"type": "string", "format": "unknown"}
         result = json_schema_to_sql.to_sql_type(jsonschema_type)
         assert isinstance(result, sa.types.VARCHAR)
+
+    def test_custom_fallback(self):
+        json_schema_to_sql = JSONSchemaToSQL()
+        json_schema_to_sql.fallback_type = sa.types.CHAR
+        jsonschema_type = {"cannot": "compute"}
+        result = json_schema_to_sql.to_sql_type(jsonschema_type)
+        assert isinstance(result, sa.types.CHAR)
+
+    def test_custom_handle_raw_string(self):
+        class CustomJSONSchemaToSQL(JSONSchemaToSQL):
+            def handle_raw_string(self, schema):
+                if schema.get("contentMediaType") == "image/png":
+                    return sa.types.LargeBinary()
+
+                return super().handle_raw_string(schema)
+
+        json_schema_to_sql = CustomJSONSchemaToSQL()
+
+        vanilla = {"type": ["string"]}
+        result = json_schema_to_sql.to_sql_type(vanilla)
+        assert isinstance(result, sa.types.VARCHAR)
+
+        non_image_type = {
+            "type": "string",
+            "contentMediaType": "text/html",
+        }
+        result = json_schema_to_sql.to_sql_type(non_image_type)
+        assert isinstance(result, sa.types.VARCHAR)
+
+        image_type = {
+            "type": "string",
+            "contentEncoding": "base64",
+            "contentMediaType": "image/png",
+        }
+        result = json_schema_to_sql.to_sql_type(image_type)
+        assert isinstance(result, sa.types.LargeBinary)