From 6576f5276e46fc634806ceaa25b71fc7a93a94c3 Mon Sep 17 00:00:00 2001 From: "Edgar R. M" Date: Wed, 26 Jul 2023 14:00:32 -0600 Subject: [PATCH] fix(targets): Handle missing record properties in SQL sinks (#1865) * fix(targets): Handle missing record properties in SQL sinks * Update singer_sdk/sinks/sql.py * Add test to target suite --- singer_sdk/sinks/sql.py | 14 +++++++-- singer_sdk/testing/suites.py | 2 ++ .../record_missing_fields.singer | 4 +++ singer_sdk/testing/target_tests.py | 6 ++++ tests/samples/test_target_sqlite.py | 29 +++++++++++++++++++ 5 files changed, 53 insertions(+), 2 deletions(-) create mode 100644 singer_sdk/testing/target_test_streams/record_missing_fields.singer diff --git a/singer_sdk/sinks/sql.py b/singer_sdk/sinks/sql.py index 02fd307b9..9dac99b1e 100644 --- a/singer_sdk/sinks/sql.py +++ b/singer_sdk/sinks/sql.py @@ -323,10 +323,20 @@ def bulk_insert_records( insert_sql = sqlalchemy.text(insert_sql) conformed_records = [self.conform_record(record) for record in records] + property_names = list(self.conform_schema(schema)["properties"].keys()) + + # Create new record dicts with missing properties filled in with None + new_records = [ + {name: record.get(name) for name in property_names} + for record in conformed_records + ] + self.logger.info("Inserting with SQL: %s", insert_sql) + with self.connector._connect() as conn, conn.begin(): - conn.execute(insert_sql, conformed_records) - return len(conformed_records) if isinstance(conformed_records, list) else None + result = conn.execute(insert_sql, new_records) + + return result.rowcount def merge_upsert_from_table( self, diff --git a/singer_sdk/testing/suites.py b/singer_sdk/testing/suites.py index 0f8a9fabe..d795cf153 100644 --- a/singer_sdk/testing/suites.py +++ b/singer_sdk/testing/suites.py @@ -37,6 +37,7 @@ TargetOptionalAttributes, TargetRecordBeforeSchemaTest, TargetRecordMissingKeyProperty, + TargetRecordMissingOptionalFields, TargetSchemaNoProperties, TargetSchemaUpdates, TargetSpecialCharsInAttributes, @@ -103,6 +104,7 @@ class TestSuite: TargetOptionalAttributes, TargetRecordBeforeSchemaTest, TargetRecordMissingKeyProperty, + TargetRecordMissingOptionalFields, TargetSchemaNoProperties, TargetSchemaUpdates, TargetSpecialCharsInAttributes, diff --git a/singer_sdk/testing/target_test_streams/record_missing_fields.singer b/singer_sdk/testing/target_test_streams/record_missing_fields.singer new file mode 100644 index 000000000..a398f6bd6 --- /dev/null +++ b/singer_sdk/testing/target_test_streams/record_missing_fields.singer @@ -0,0 +1,4 @@ +{"type": "SCHEMA", "stream": "record_missing_fields", "key_properties": ["id"], "schema": {"type": "object", "properties": {"id": {"type": "integer"}, "optional": {"type": "string"}}, "required": ["id"]}} +{"type": "RECORD", "stream": "record_missing_fields", "record": {"id": 1, "optional": "now you see me"}} +{"type": "RECORD", "stream": "record_missing_fields", "record": {"id": 2}} +{"type": "STATE", "value": {}} diff --git a/singer_sdk/testing/target_tests.py b/singer_sdk/testing/target_tests.py index 8412329c5..96e0b0d59 100644 --- a/singer_sdk/testing/target_tests.py +++ b/singer_sdk/testing/target_tests.py @@ -139,3 +139,9 @@ class TargetSpecialCharsInAttributes(TargetFileTestTemplate): """Test Target handles special chars in attributes.""" name = "special_chars_in_attributes" + + +class TargetRecordMissingOptionalFields(TargetFileTestTemplate): + """Test Target handles record missing optional fields.""" + + name = "record_missing_fields" diff --git a/tests/samples/test_target_sqlite.py b/tests/samples/test_target_sqlite.py index a7ca3b3c5..abf1bccaf 100644 --- a/tests/samples/test_target_sqlite.py +++ b/tests/samples/test_target_sqlite.py @@ -396,6 +396,35 @@ def test_sqlite_column_no_morph(sqlite_sample_target: SQLTarget): target_sync_test(sqlite_sample_target, input=StringIO(tap_output_b), finalize=True) +def test_record_with_missing_properties( + sqlite_sample_target: SQLTarget, +): + """Test handling of records with missing properties.""" + tap_output = "\n".join( + json.dumps(msg) + for msg in [ + { + "type": "SCHEMA", + "stream": "test_stream", + "schema": { + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + }, + }, + "key_properties": ["id"], + }, + { + "type": "RECORD", + "stream": "test_stream", + "record": {"id": 1}, + }, + ] + ) + target_sync_test(sqlite_sample_target, input=StringIO(tap_output), finalize=True) + + @pytest.mark.parametrize( "stream_name,schema,key_properties,expected_dml", [