Skip to content

Commit

Permalink
fix: Fixes null state attempting to be written (#233)
Browse files Browse the repository at this point in the history
Having a replication key column that contains null values was causing
null state to be written under some circumstances. This PR sorts null
values as the "lowest" result for replication key purposes, preventing
nulls from taking precedence over real replication values.

Closes #203
  • Loading branch information
sebastianswms authored Sep 12, 2023
1 parent 65a2147 commit bcc0db1
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 1 deletion.
6 changes: 5 additions & 1 deletion tap_postgres/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from singer_sdk import SQLConnector, SQLStream
from singer_sdk import typing as th
from singer_sdk.helpers._typing import TypeConformanceLevel
from sqlalchemy import nullsfirst
from sqlalchemy.engine import Engine
from sqlalchemy.engine.reflection import Inspector

Expand Down Expand Up @@ -225,7 +226,10 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
query = table.select()
if self.replication_key:
replication_key_col = table.columns[self.replication_key]
query = query.order_by(replication_key_col)

# Nulls first because the default is to have nulls as the "highest" value
# which incorrectly causes the tap to attempt to store null state.
query = query.order_by(nullsfirst(replication_key_col))

start_val = self.get_starting_replication_key_value(context)
if start_val:
Expand Down
116 changes: 116 additions & 0 deletions tests/test_replication_key.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
"""Tests standard tap features using the built-in SDK tests library."""
import copy
import json

import pendulum
import sqlalchemy
from singer_sdk.testing.runners import TapTestRunner
from singer_sdk.testing.templates import TapTestTemplate
from sqlalchemy import Column, MetaData, String, Table
from sqlalchemy.dialects.postgresql import TIMESTAMP

from tap_postgres.tap import TapPostgres

Expand Down Expand Up @@ -42,6 +47,117 @@ def replication_key_test(tap, table_name):
tap.sync_all()


def test_null_replication_key_with_start_date():
"""Null replication keys cause weird behavior. Check for appropriate handling.
If a start date is provided, only non-null records with an replication key value
greater than the start date should be synced.
"""
table_name = "test_null_replication_key_with_start_date"
engine = sqlalchemy.create_engine(SAMPLE_CONFIG["sqlalchemy_url"])

metadata_obj = MetaData()
table = Table(
table_name,
metadata_obj,
Column("data", String()),
Column("updated_at", TIMESTAMP),
)
with engine.connect() as conn:
if table.exists(conn):
table.drop(conn)
metadata_obj.create_all(conn)
insert = table.insert().values(
data="Alpha", updated_at=pendulum.datetime(2022, 10, 20).to_iso8601_string()
)
conn.execute(insert)
insert = table.insert().values(
data="Bravo", updated_at=pendulum.datetime(2022, 11, 20).to_iso8601_string()
)
conn.execute(insert)
insert = table.insert().values(data="Zulu", updated_at=None)
conn.execute(insert)
tap = TapPostgres(config=SAMPLE_CONFIG)
tap_catalog = json.loads(tap.catalog_json_text)
altered_table_name = f"public-{table_name}"
for stream in tap_catalog["streams"]:
if stream.get("stream") and altered_table_name not in stream["stream"]:
for metadata in stream["metadata"]:
metadata["metadata"]["selected"] = False
else:
stream["replication_key"] = "updated_at"
for metadata in stream["metadata"]:
metadata["metadata"]["selected"] = True
if metadata["breadcrumb"] == []:
metadata["metadata"]["replication-method"] = "INCREMENTAL"
metadata["metadata"]["replication-key"] = "updated_at"

test_runner = TapTestRunner(
tap_class=TapPostgres,
config=SAMPLE_CONFIG,
catalog=tap_catalog,
)
test_runner.sync_all()
assert len(test_runner.records[altered_table_name]) == 1 # Only record Bravo.


def test_null_replication_key_without_start_date():
"""Null replication keys cause weird behavior. Check for appropriate handling.
If a start date is not provided, sync all records, including those with a null value
for their replication key.
"""
table_name = "test_null_replication_key_without_start_date"

modified_config = copy.deepcopy(SAMPLE_CONFIG)
modified_config["start_date"] = None
engine = sqlalchemy.create_engine(modified_config["sqlalchemy_url"])

metadata_obj = MetaData()
table = Table(
table_name,
metadata_obj,
Column("data", String()),
Column("updated_at", TIMESTAMP),
)
with engine.connect() as conn:
if table.exists(conn):
table.drop(conn)
metadata_obj.create_all(conn)
insert = table.insert().values(
data="Alpha", updated_at=pendulum.datetime(2022, 10, 20).to_iso8601_string()
)
conn.execute(insert)
insert = table.insert().values(
data="Bravo", updated_at=pendulum.datetime(2022, 11, 20).to_iso8601_string()
)
conn.execute(insert)
insert = table.insert().values(data="Zulu", updated_at=None)
conn.execute(insert)
tap = TapPostgres(config=modified_config)
tap_catalog = json.loads(tap.catalog_json_text)
altered_table_name = f"public-{table_name}"
for stream in tap_catalog["streams"]:
if stream.get("stream") and altered_table_name not in stream["stream"]:
for metadata in stream["metadata"]:
metadata["metadata"]["selected"] = False
else:
stream["replication_key"] = "updated_at"
for metadata in stream["metadata"]:
metadata["metadata"]["selected"] = True
if metadata["breadcrumb"] == []:
metadata["metadata"]["replication-method"] = "INCREMENTAL"
metadata["metadata"]["replication-key"] = "updated_at"

test_runner = TapTestRunner(
tap_class=TapPostgres,
config=modified_config,
catalog=tap_catalog,
)
test_runner.sync_all()
assert len(test_runner.records[altered_table_name]) == 3 # All three records.


class TapTestReplicationKey(TapTestTemplate):
name = "replication_key"
table_name = TABLE_NAME
Expand Down

0 comments on commit bcc0db1

Please sign in to comment.