Skip to content

Commit

Permalink
fix(targets): Handle missing record properties in SQL sinks
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Jul 20, 2023
1 parent 384cff7 commit db5cb7e
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 7 deletions.
20 changes: 13 additions & 7 deletions singer_sdk/sinks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,15 +322,21 @@ def bulk_insert_records(
if isinstance(insert_sql, str):
insert_sql = sqlalchemy.text(insert_sql)

conformed_records = (
[self.conform_record(record) for record in records]
if isinstance(records, list)
else (self.conform_record(record) for record in records)
)
conformed_records = [self.conform_record(record) for record in records]
property_names = list(self.conform_schema(schema)["properties"].keys())

# Create new record dicts with missing properties filled in with None
new_records = [
{name: record.get(name, None) for name in property_names}
for record in conformed_records
]

self.logger.info("Inserting with SQL: %s", insert_sql)

with self.connector._connect() as conn, conn.begin():
conn.execute(insert_sql, conformed_records)
return len(conformed_records) if isinstance(conformed_records, list) else None
result = conn.execute(insert_sql, new_records)

return result.rowcount

def merge_upsert_from_table(
self,
Expand Down
29 changes: 29 additions & 0 deletions tests/samples/test_target_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,35 @@ def test_sqlite_column_no_morph(sqlite_sample_target: SQLTarget):
target_sync_test(sqlite_sample_target, input=StringIO(tap_output_b), finalize=True)


def test_record_with_missing_properties(
sqlite_sample_target: SQLTarget,
):
"""Test handling of records with missing properties."""
tap_output = "\n".join(
json.dumps(msg)
for msg in [
{
"type": "SCHEMA",
"stream": "test_stream",
"schema": {
"type": "object",
"properties": {
"id": {"type": "integer"},
"name": {"type": "string"},
},
},
"key_properties": ["id"],
},
{
"type": "RECORD",
"stream": "test_stream",
"record": {"id": 1},
},
]
)
target_sync_test(sqlite_sample_target, input=StringIO(tap_output), finalize=True)


@pytest.mark.parametrize(
"stream_name,schema,key_properties,expected_dml",
[
Expand Down

0 comments on commit db5cb7e

Please sign in to comment.