diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c1486c6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,169 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. 
+# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. 
+#.idea/ + +*client-config.json +*config-secret +*env_vars.bash +temp.py +.vscode/ +*.csv +*.pickle +*.txt diff --git a/README.md b/README.md new file mode 100644 index 0000000..b51d275 --- /dev/null +++ b/README.md @@ -0,0 +1,127 @@ +**_THIS IS A MIRRORED VERSION OF AN INTERNAL PRODUCTION REPOSITORY LAST UPDATED 2024-07-30_** + +# db_qa_diff +QA to compare records between database tables + +Expect this to take roughly 10 seconds per 50,000 records +## Installation + +```bash +pip install git+https://github.com/CityOfPhiladelphia/db_qa_diff.git +``` +### Additional Requirements +`pip install` the following: +* python database adapters for the specific databases providers in use such as `psycopg` for PostgreSQL, `cx_oracle` for Oracle, etc. +* `citygeo_secrets` recommended for set-up - see [here](https://github.com/CityOfPhiladelphia/citygeo_secrets) + +## Usage + +```python +import db_qa_diff +import sqlalchemy as sa +import citygeo_secrets as cgs + +# Setup +def create_postgresql_engine(creds: dict, schema_secret_name: str, host_secret_name: str) -> sa.Engine: + '''Compose the URL object, create Postgresql engine, and test connection + - schema_secret_name: Name of secret that contains schema information. Used to make this function dynamic + - host_secret_name: Name of secret that contains host information. 
Used to make this function dynamic''' + + db_creds = creds[host_secret_name] + schema_creds = creds[schema_secret_name] + url_object = sa.URL.create( + drivername='postgresql+psycopg', # must already pip install psycopg + username=schema_creds['login'], + password=schema_creds['password'], + host=db_creds['host'], + port=db_creds['port'], + database=db_creds['database'] + ) + engine = sa.create_engine(url_object) + engine.connect() # Test connection to ensure correct credentials, as SQLAlchemy uses lazy initialization + return engine + + +def create_oracle_engine(creds: dict, schema_secret_name: str, host_secret_name: str) -> sa.Engine: + '''Compose the URL object, create Oracle engine, and test connection + - schema_secret_name: Name of secret that contains schema information. Used to make this function dynamic + - host_secret_name: Name of secret that contains host information. Used to make this function dynamic''' + + db_creds = creds[schema_secret_name] + creds_host = creds[host_secret_name] + url_object = sa.URL.create( + drivername='oracle+cx_oracle', # must already pip install cx_oracle + username=db_creds['login'], + password=db_creds['password'], + host=creds_host['host']['hostName'], + port=creds_host['host']['port'], + database=creds_host['database'] + ) + engine = sa.create_engine(url_object) + engine.connect() + return engine + +cgs.set_config(keeper_dir="~") # Set to directory containing `client_config.json` + +engine_oracle_ais_sources = cgs.connect_with_secrets(create_oracle_engine, + "databridge-oracle/hostname", "GIS_AIS_SOURCES", + host_secret_name="databridge-oracle/hostname", schema_secret_name="GIS_AIS_SOURCES" + ) +engine_postgresql_ais_sources = cgs.connect_with_secrets(create_postgresql_engine, + "databridge-v2/hostname-testing", "databridge-v2/ais_sources", + host_secret_name="databridge-v2/hostname-testing", schema_secret_name="databridge-v2/ais_sources" + ) + +# Actual package use +db_qa_diff.recorddiff( + engine_oracle_ais_sources, 
+ engine_postgresql_ais_sources, + "usps_cityzip", "usps_alias", "usps_zip4s", + ignore_all=['objectid'], + ignore_cols={'usps_cityzip': 'cityname'} + ) + +# Output printed to stdout +``` + +### Output +```bash +# Example output printed to console + +Table: usps_cityzip +1 newly appear in <host> (0.5% of 184 rows) +# python list of dicts + +6 disappear from <host> (3.2% of 189 rows) +# python list of dicts +... + +Lap elapsed time: 5 second(s) +``` + +## Functions +**db_qa_diff.recorddiff**(_engine1_, _engine2_, _*tables_, + _ignore_all_ = [], _ignore_cols_ = {}): + +Compare rows between similarly named tables in two different databases. Does not account for indices, primary keys, or other table artifacts + +Parameters: +* _engine1_: sqlalchemy.Engine + * An SQLAlchemy Engine +* _engine2_: sqlalchemy.Engine + * An SQLAlchemy Engine +* _*tables_: str | tuple[str, str] + * Names of tables to compare. They can take the form of + * `'table1', 'table2', ...` if the tables have the same names, or + * `('engine1_table1', 'engine2_table1'), ('engine1_table2', 'engine2_table2'), ...` if the tables have different names between databases + * Specify a particular schema with the syntax + * `"<schema>.<table>"` +* _ignore_all_: list[str] = [] + * A list of columns to ignore when comparing tables passed with _*tables_. Frequently this will be columns such as "objectid" or a floating-point "geometry" field subject to different rounding thresholds between databases. No errors are raised if a table does not contain a column to be ignored. + * This package currently does not understand geometry fields such as "shape", so these must always be ignored. +* _ignore_cols_: dict[str, str | list[str]] = {} + * A dictionary of `{table_name: ["col1", "col2"], table2: ...}` with a list of columns to exclude from the specified table only. + * If comparing two tables with different names, then `ignore_cols` will look for table name matches to either table. 
+ +## Notes +* Oracle does not support temporary tables per session before Oracle 18c. Please use a different database provider for `engine2`, or submit a pull request with manager's approval. This package will make a temporary table in `engine2` using the table in `engine1`. +* For the documentation on SQLAlchemy engines - see [here](https://docs.sqlalchemy.org/en/20/tutorial/engine.html) diff --git a/db_qa_diff/__init__.py b/db_qa_diff/__init__.py new file mode 100644 index 0000000..a12b3bb --- /dev/null +++ b/db_qa_diff/__init__.py @@ -0,0 +1,176 @@ +import sqlalchemy as sa +from .utils import SimpleTimer +import pprint, re +from dataclasses import dataclass, field +from collections.abc import Sequence + +@dataclass +class _Bucket: + '''A bucket class to keep track of everything involved with comparing two tables + in potentially separate databases''' + t1name: str + t2name: str + metadata1: sa.MetaData + metadata2: sa.MetaData + host1: str + host2: str + engine1: sa.Engine + engine2: sa.Engine + table1: sa.Table + table2: sa.Table + table1_in_engine2: sa.Table = field(init=False) # Added later + drop_cols: list[str] + + +def _create_table1_in_engine2(b: _Bucket) -> sa.Table: + '''Create table1 as a TEMP table in engine2 database''' + l = [] + for col in b.table2.c: + if col.name not in b.drop_cols: + l.append(sa.Column(col.name, col.type)) + table1_in_engine2 = sa.Table(f'{b.table1}_table1_in_engine2', + b.metadata2, *l, prefixes=['TEMPORARY']) + table1_in_engine2.create(bind=b.engine2) + return table1_in_engine2 + + +def _copy_table1_to_engine2(b: _Bucket, conn1: sa.Connection, conn2: sa.Connection): + '''Batch INSERT data from table1 in engine1 to the temp table in engine2''' + n = 15000 + rows_inserted = 0 + print(f'\n{"*" * 80}') + print(f'Transferring {b.t1name} from {b.host1} to TEMP table in {b.host2}') + stmt = sa.select(sa.func.count()).select_from(b.table1) + result = conn1.execute(stmt) + row_count = result.scalar_one_or_none() + + 
conn1.execution_options(yield_per=n) + stmt = sa.select(b.table1) + result = conn1.execute(stmt) + for partition in result.mappings().partitions(): + stmt2 = sa.insert(b.table1_in_engine2) + conn2.execute(stmt2, partition) + rows_inserted = rows_inserted + len(partition) + print(f'... transferred {rows_inserted:,d} of {row_count:,d} - {rows_inserted / row_count:.1%}') + print() + + +def _create_drop_cols(ignore_all: list[str], ignore_cols: dict, entry: str) -> list[str]: + '''Create the columns to ignore from all tables and + the columns to ignore from this specific table''' + drop_cols = [l.lower() for l in ignore_all] + extend_cols = [] + if isinstance(entry, str): + entry = [entry] + for k, v in ignore_cols.items(): + for table in entry: + if k == table: + if isinstance(v, list) or isinstance(v, tuple): + extend_cols.extend(v) + else: + extend_cols.append(v) + drop_cols.extend(extend_cols) + return drop_cols + + +def _compare_tables(b: _Bucket, conn2: sa.Connection): + '''Compare the two tables now in engine2 with SQL EXCEPT''' + table_2_cols = [col for col in b.table2.c if col.name not in b.drop_cols] + + stmt_appear = sa.select( + *table_2_cols).except_(sa.select(b.table1_in_engine2)) + table1_in_engine2_nrows = conn2.execute( + sa.select(sa.func.count()).select_from(b.table1_in_engine2)).scalar_one() + rv_appear = conn2.execute(stmt_appear) + appear = rv_appear.mappings().fetchmany(5) + + stmt_disappear = (sa.select(b.table1_in_engine2).except_( + sa.select(*table_2_cols))) + table2_nrows = conn2.execute( + sa.select(sa.func.count()).select_from(b.table2)).scalar_one() + rv_disappear = conn2.execute(stmt_disappear) + disappear = rv_disappear.mappings().fetchmany(5) + + print(f'{b.t2name}: {max(rv_appear.rowcount, 0):,d} newly appear in {b.host2} ({max(rv_appear.rowcount / max(table2_nrows, 1), 0) :.1%} of {table2_nrows:,d} rows)') + if rv_appear.rowcount > 0: + for row in appear: + pprint.pprint(dict(row), sort_dicts=False) + print() + print(f'{b.t1name}: 
{max(rv_disappear.rowcount, 0):,d} disappear from {b.host1} ({max(rv_disappear.rowcount / max(table1_in_engine2_nrows, 1), 0) :.1%} of {table1_in_engine2_nrows:,d} rows)') + if rv_disappear.rowcount > 0: + for row in disappear: + pprint.pprint(dict(row), sort_dicts=False) + + +def recorddiff(engine1: sa.Engine, engine2: sa.Engine, + *tables: str | tuple[str, str], + ignore_all: list[str] = [], ignore_cols: dict[str, str | list[str]] = {}): + '''Compare rows between similarly named tables in two different databases + + Does not account for indices, primary keys, or other table artifacts + - `engine1`: SQLAlchemy Engine + - `engine2`: SQLAlchemy Engine (must not be Oracle) + - `tables`: Names of tables to compare. They can take the form of + - `'table1', 'table2', ...` if the tables have the same names, or + - `[('engine1_table1', 'engine2_table1'), ('engine1_table2', 'engine2_table2'), ...]` if the tables have different names between databases + - `ignore_all`: List of columns to ignore across all tables in comparison + - `ignore_cols`: Dict of `{table: [list of columns]}` to ignore only in a specific table + + It is advised to put the older database first, i.e. if you are comparing an + older Oracle database and a modern postgresql database, then `engine1` should be + Oracle while `engine2` should be postgresql. This package will make a table in + `engine2` using the table in `engine1`, meaning you may run into name length + overflow errors if `engine2` is an older database.''' + if engine2.name == 'oracle': + raise NotImplementedError("Oracle does not support temporary tables per session before Oracle 18c. Please use a different database provider for engine2, or submit a pull request with manager's approval.") + timer = SimpleTimer() + metadata1 = sa.MetaData() + metadata2 = sa.MetaData() + assert isinstance(ignore_all, list), "ignore_all is not a list!" + assert isinstance(ignore_cols, dict), "ignore_cols is not a dict!" 
+ + for entry in tables: + timer.start_lap() + if isinstance(entry, str): + t1name = entry + t2name = entry + elif isinstance(entry, Sequence): + assert len(entry) == 2 + t1name, t2name = entry[0], entry[1] + else: + raise TypeError(f'Type of {entry} ({type(entry)}) not accepted. Must be str or sequence with length 2.') + + regex_pattern = '^(?:(?P<schema>\w+)\.)?(?P<table>\w+)$' + try: + m1 = re.match(regex_pattern, t1name) + schema_extract1 = m1['schema'] + table_extract1 = m1['table'] + table1 = sa.Table(table_extract1, metadata1, schema=schema_extract1, autoload_with=engine1) + host1 = engine1.url.host + except sa.exc.NoSuchTableError as e: + print(f'\nTable "{t1name}" not found in {engine1.url}\n') + raise e + try: + m2 = re.match(regex_pattern, t2name) + schema_extract2 = m2['schema'] + table_extract2 = m2['table'] + table2 = sa.Table(table_extract2, metadata2, schema=schema_extract2, autoload_with=engine2) + host2 = engine2.url.host + except sa.exc.NoSuchTableError as e: + print(f'\nTable {t2name} not found in {engine2.url}\n') + raise e + + drop_cols = _create_drop_cols(ignore_all, ignore_cols, entry) + + b = _Bucket(t1name, t2name, metadata1, metadata2, host1, host2, engine1, engine2, + table1, table2, drop_cols) + b.table1_in_engine2 = _create_table1_in_engine2(b) + # To see what b looks like now, use pprint.pprint(b) + + with engine1.begin() as conn1, engine2.begin() as conn2: + _copy_table1_to_engine2(b, conn1, conn2) + _compare_tables(b, conn2) + + timer.end_lap() + + timer.end() diff --git a/db_qa_diff/utils.py b/db_qa_diff/utils.py new file mode 100644 index 0000000..c8abc48 --- /dev/null +++ b/db_qa_diff/utils.py @@ -0,0 +1,69 @@ +import sqlalchemy as sa, time + +def print_stmt(stmt: sa.sql, rowcount = None): + '''Print out an SQLAlchemy statement''' + print(stmt, '\n') + if rowcount != None and rowcount >= 0: + print(f'Rows affected: {rowcount:,}\n') + + +def print_petl(nrows: int, table): + '''Print a petl table if row count > 0''' + if nrows != 0: + 
looked_table = table.look() + print(looked_table) + else: + print(f'\t--') + + +class SimpleTimer(): + '''A simple timer, with ability to measure a "lap" + ``` + timer = SimpleTimer() + for x in range(3): + timer.start_lap() + # run some code + timer.end_lap() + timer.end() + ``` + ''' + one_minute = 60 + one_hour = one_minute * 60 + one_day = one_hour * 24 + + def __init__(self): + self._start = time.time() + self._lap_start = None + + def _format_elapsed(self, elapsed): + days = elapsed // self.one_day + remainder = elapsed - (days * self.one_day) + hours = remainder // self.one_hour + remainder = remainder - (hours * self.one_hour) + minutes = remainder // self.one_minute + remainder = remainder - (minutes * self.one_minute) + seconds = remainder % self.one_minute + + if days > 0: + return f'{days:.0f} day(s), {hours:.0f} hour(s), {minutes:.0f} minute(s), and {seconds:.0f} second(s)' + elif hours > 0: + return f'{hours:.0f} hour(s), {minutes:.0f} minute(s), and {seconds:.0f} second(s)' + elif minutes > 0: + return f'{minutes:.0f} minute(s) and {seconds:.0f} second(s)' + else: + return f'{seconds:.0f} second(s)' + + def start_lap(self): + '''Begin recording a new "lap"''' + self._lap_start = time.time() + + def end_lap(self): + '''Calculate elapsed time of most recent lap''' + assert self._lap_start != None, "SimpleTimer lap ended before being started" + elapsed = time.time() - self._lap_start + print(f'Lap elapsed time: {self._format_elapsed(elapsed)}') + + def end(self): + '''Calculate elapsed time of timer''' + elapsed = time.time() - self._start + print(f'Timer elapsed time: {self._format_elapsed(elapsed)}') diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..e326636 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,17 @@ +# Documentation on setuptools & pyproject.toml - https://setuptools.pypa.io/en/latest/userguide/index.html +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" + +[project] +name = 
"db_qa_diff" +version = "2.2.0" +description = "QA to compare records between database tables" +readme = "README.md" +authors = [ + {name = "James Midkiff", email = "james.midkiff@phila.gov"} + ] +requires-python = ">=3.7" +dependencies = [ + "sqlalchemy>=2.0,<3.0", + ]