feat: Add json_as_object option (#526)
To allow JSON and JSONB to be tapped as plain objects

See [this Slack
thread](https://meltano.slack.com/archives/C069CQNHDNF/p1729784981090099)
for context
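
For illustration, here is a minimal sketch (not from this commit) of enabling the new setting when constructing the tap directly, following the pattern used in the test added below; the connection string is a placeholder:

```python
from tap_postgres.tap import TapPostgres

# Placeholder connection string -- point this at a real database.
config = {
    "sqlalchemy_url": "postgresql://user:password@localhost:5432/mydb",
    "json_as_object": True,  # type json/jsonb columns as {"type": ["object", "null"]}
}

tap = TapPostgres(config=config)
# Discovery output now reports json/jsonb columns as nullable objects instead of
# the permissive string/number/integer/array/object/boolean union.
print(tap.catalog_json_text)
```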
joncass authored Nov 1, 2024
1 parent aaa3b7e commit 6efa7d0
Showing 4 changed files with 92 additions and 2 deletions.
3 changes: 3 additions & 0 deletions README.md
@@ -26,6 +26,7 @@ Built with the [Meltano Singer SDK](https://sdk.meltano.com).
| sqlalchemy_url | False | None | Example postgresql://[username]:[password]@localhost:5432/[db_name] |
| filter_schemas | False | None | If an array of schema names is provided, the tap will only process the specified Postgres schemas and ignore others. If left blank, the tap automatically determines ALL available Postgres schemas. |
| dates_as_string | False | 0 | Defaults to false. If true, date and timestamp fields will be Strings. If you see ValueError: Year is out of range, try setting this to True. |
| json_as_object | False | 0 | Defaults to false. If true, json and jsonb fields will be Objects. |
| ssh_tunnel | False | None | SSH Tunnel Configuration, this is a json object |
| ssh_tunnel.enable | False | 0 | Enable an ssh tunnel (also known as bastion server), see the other ssh_tunnel.* properties for more details |
| ssh_tunnel.host | False | None | Host of the bastion server, this is the host we'll connect to via ssh |
@@ -132,6 +133,8 @@ Create tests within the `tap_postgres/tests` subfolder and
```bash
poetry run pytest
```

NOTE: Running the tests requires a locally running postgres. See tests/settings.py for the expected configuration.
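
If you don't have one available, a throwaway instance via Docker is usually sufficient, e.g. `docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=postgres postgres` — adjust the credentials, port, and database to whatever tests/settings.py expects.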

You can also test the `tap-postgres` CLI interface directly using `poetry run`.

10 changes: 8 additions & 2 deletions tap_postgres/client.py
@@ -36,10 +36,11 @@
class PostgresSQLToJSONSchema(SQLToJSONSchema):
"""Custom SQL to JSON Schema conversion for Postgres."""

def __init__(self, dates_as_string: bool, *args, **kwargs):
def __init__(self, dates_as_string: bool, json_as_object: bool, *args, **kwargs):
"""Initialize the SQL to JSON Schema converter."""
super().__init__(*args, **kwargs)
self.dates_as_string = dates_as_string
self.json_as_object = json_as_object

@SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined]
def array_to_jsonschema(self, column_type: postgresql.ARRAY) -> dict:
@@ -55,6 +56,8 @@ def array_to_jsonschema(self, column_type: postgresql.ARRAY) -> dict:
@SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined]
def json_to_jsonschema(self, column_type: postgresql.JSON) -> dict:
"""Override the default mapping for JSON and JSONB columns."""
if self.json_as_object:
return {"type": ["object", "null"]}
return {"type": ["string", "number", "integer", "array", "object", "boolean"]}

@SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined]
@@ -159,7 +162,10 @@ def __init__(
@functools.cached_property
def sql_to_jsonschema(self):
"""Return a mapping of SQL types to JSON Schema types."""
return PostgresSQLToJSONSchema(dates_as_string=self.config["dates_as_string"])
return PostgresSQLToJSONSchema(
dates_as_string=self.config["dates_as_string"],
json_as_object=self.config["json_as_object"],
)

def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]:
"""Return a list of schema names in DB, or overrides with user-provided values.
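To make the effect of the override concrete, here is a rough sketch of exercising the converter on its own; it assumes the base `SQLToJSONSchema` needs no extra constructor arguments, and relies on `postgresql.JSONB` subclassing `postgresql.JSON` so both dispatch to the same handler:

```python
from sqlalchemy.dialects import postgresql

from tap_postgres.client import PostgresSQLToJSONSchema

converter = PostgresSQLToJSONSchema(dates_as_string=False, json_as_object=True)

# With json_as_object enabled, JSON and JSONB both map to a nullable object.
print(converter.to_jsonschema(postgresql.JSONB()))  # {'type': ['object', 'null']}
print(converter.to_jsonschema(postgresql.JSON()))   # {'type': ['object', 'null']}

# With the default (False), the permissive union type is kept.
default = PostgresSQLToJSONSchema(dates_as_string=False, json_as_object=False)
print(default.to_jsonschema(postgresql.JSON()))
# {'type': ['string', 'number', 'integer', 'array', 'object', 'boolean']}
```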
8 changes: 8 additions & 0 deletions tap_postgres/tap.py
@@ -176,6 +176,14 @@ def __init__(
),
default=False,
),
th.Property(
"json_as_object",
th.BooleanType,
description=(
"Defaults to false, if true, json and jsonb fields will be Objects."
),
default=False,
),
th.Property(
"ssh_tunnel",
th.ObjectType(
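Side note: the `json_as_object` row in the README settings table above mirrors this property's `description` string; in SDK-based taps the table is typically regenerated from these property definitions (for example via `--about --format=markdown`) rather than edited by hand.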
73 changes: 73 additions & 0 deletions tests/test_core.py
@@ -338,6 +338,79 @@ def test_jsonb_array():
assert test_runner.records[altered_table_name][i] == rows[i]


def test_json_as_object():
"""Some use cases require JSON and JSONB columns to be typed as Objects."""
table_name = "test_json_as_object"
engine = sa.create_engine(SAMPLE_CONFIG["sqlalchemy_url"], future=True)

metadata_obj = sa.MetaData()
table = sa.Table(
table_name,
metadata_obj,
sa.Column("column_jsonb", JSONB),
sa.Column("column_json", JSON),
)

rows = [
{"column_jsonb": {"foo": "bar"}, "column_json": {"baz": "foo"}},
{"column_jsonb": 3.14, "column_json": -9.3},
{"column_jsonb": 22, "column_json": 10000000},
{"column_jsonb": {}, "column_json": {}},
{"column_jsonb": ["bar", "foo"], "column_json": ["foo", "baz"]},
{"column_jsonb": True, "column_json": False},
]

with engine.begin() as conn:
table.drop(conn, checkfirst=True)
metadata_obj.create_all(conn)
insert = table.insert().values(rows)
conn.execute(insert)

copied_config = copy.deepcopy(SAMPLE_CONFIG)
# With json_as_object enabled, the same rows should sync, now with object-typed schemas
copied_config["json_as_object"] = True

tap = TapPostgres(config=copied_config)
tap_catalog = json.loads(tap.catalog_json_text)
altered_table_name = f"{DB_SCHEMA_NAME}-{table_name}"

for stream in tap_catalog["streams"]:
if stream.get("stream") and altered_table_name not in stream["stream"]:
for metadata in stream["metadata"]:
metadata["metadata"]["selected"] = False
else:
for metadata in stream["metadata"]:
metadata["metadata"]["selected"] = True
if metadata["breadcrumb"] == []:
metadata["metadata"]["replication-method"] = "FULL_TABLE"

test_runner = PostgresTestRunner(
tap_class=TapPostgres, config=SAMPLE_CONFIG, catalog=tap_catalog
)
test_runner.sync_all()
for schema_message in test_runner.schema_messages:
if (
"stream" in schema_message
and schema_message["stream"] == altered_table_name
):
assert schema_message["schema"]["properties"]["column_jsonb"] == {
"type": [
"object",
"null",
]
}
assert schema_message["schema"]["properties"]["column_json"] == {
"type": [
"object",
"null",
]
}

assert len(rows) == len(test_runner.records[altered_table_name])
for expected_row, actual_row in zip(rows, test_runner.records[altered_table_name]):
assert actual_row == expected_row


def test_numeric_types():
"""Schema was wrong for Decimal objects. Check they are correctly selected."""
table_name = "test_decimal"
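With a local Postgres matching tests/settings.py, the new test can be run on its own via `poetry run pytest tests/test_core.py::test_json_as_object`.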
