Skip to content

Commit

Permalink
feat: Added SQLConnector.prepare_primary_key
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Oct 21, 2024
1 parent 5843a0b commit 06f54d0
Showing 1 changed file with 44 additions and 0 deletions.
44 changes: 44 additions & 0 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,50 @@ def prepare_table(
self.to_sql_type(property_def),
)

self.prepare_primary_key(
full_table_name=full_table_name,
primary_keys=primary_keys,
)

def prepare_primary_key(
self,
*,
full_table_name: str | FullyQualifiedName,
primary_keys: t.Sequence[str],
) -> None:
"""Adapt target table primary key to provided schema if possible.
Args:
full_table_name: the target table name.
primary_keys: list of key properties.
"""
_, schema_name, table_name = self.parse_full_table_name(full_table_name)
meta = sa.MetaData(schema=schema_name)
meta.reflect(bind=self._engine, only=[table_name])
table = meta.tables[f"{schema_name}.{table_name}"]
current_pk_cols = [col.name for col in table.primary_key.columns]

# Nothing to do
if current_pk_cols == primary_keys:
return

new_pk = sa.PrimaryKeyConstraint(*primary_keys)

# If table has no primary key, add the provided one
if not current_pk_cols:
with self._connect() as conn, conn.begin():
conn.execute(sa.schema.AddConstraint(new_pk))
return

# Drop the existing primary key
with self._connect() as conn, conn.begin():
conn.execute(sa.schema.DropConstraint(table.primary_key))

# Add the new primary key
if primary_keys:
with self._connect() as conn, conn.begin():
conn.execute(sa.schema.AddConstraint(new_pk))

def prepare_column(
self,
full_table_name: str | FullyQualifiedName,
Expand Down

0 comments on commit 06f54d0

Please sign in to comment.