Skip to content

Commit

Permalink
[BUGFIX] Metric table.column_type should properly evaluate for Postgres (#10793)
Browse files Browse the repository at this point in the history

Co-authored-by: Thu Pham <thu.pham@greatexpectations.io>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Dec 19, 2024
1 parent 17c2383 commit ab0a7f7
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
)

strict_suite.add_expectation(
gxe.ExpectColumnValuesToBeOfType(column="transfer_amount", type_="DOUBLE_PRECISION")
gxe.ExpectColumnValuesToBeOfType(column="transfer_amount", type_="DOUBLE PRECISION")
)

strict_results = batch.validate(strict_suite)
Expand All @@ -85,7 +85,7 @@

relaxed_suite.add_expectation(
gxe.ExpectColumnValuesToBeInTypeList(
column="transfer_amount", type_list=["DOUBLE_PRECISION", "STRING"]
column="transfer_amount", type_list=["DOUBLE PRECISION", "STRING"]
)
)

Expand Down
78 changes: 78 additions & 0 deletions great_expectations/compatibility/postgresql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from __future__ import annotations

from great_expectations.compatibility.not_imported import NotImported

# Placeholder bound to every name below whose import fails.
# NOTE(review): NotImported presumably surfaces this message when the placeholder is
# actually used -- confirm in great_expectations.compatibility.not_imported.
POSTGRESQL_NOT_IMPORTED = NotImported(
    "postgresql connection components are not installed, please 'pip install psycopg2'"
)

# psycopg2 is not referenced anywhere else in this module (hence the F401 suppression);
# if either import in this block fails, the dialect module alias falls back to the
# placeholder.
try:
    import psycopg2  # noqa: F401
    import sqlalchemy.dialects.postgresql as postgresqltypes
except ImportError:
    postgresqltypes = POSTGRESQL_NOT_IMPORTED  # type: ignore[assignment]

# Each dialect type below is imported in its own try block, so a failure for one name
# does not affect the others; on failure the name is bound to the placeholder instead.
# These blocks import only from sqlalchemy and do not depend on the psycopg2 check above.
try:
    from sqlalchemy.dialects.postgresql import TEXT
except (ImportError, AttributeError):
    TEXT = POSTGRESQL_NOT_IMPORTED  # type: ignore[misc, assignment]

try:
    from sqlalchemy.dialects.postgresql import CHAR
except (ImportError, AttributeError):
    CHAR = POSTGRESQL_NOT_IMPORTED  # type: ignore[misc, assignment]

try:
    from sqlalchemy.dialects.postgresql import INTEGER
except (ImportError, AttributeError):
    INTEGER = POSTGRESQL_NOT_IMPORTED  # type: ignore[misc, assignment]

try:
    from sqlalchemy.dialects.postgresql import SMALLINT
except (ImportError, AttributeError):
    SMALLINT = POSTGRESQL_NOT_IMPORTED  # type: ignore[misc, assignment]

try:
    from sqlalchemy.dialects.postgresql import BIGINT
except (ImportError, AttributeError):
    BIGINT = POSTGRESQL_NOT_IMPORTED  # type: ignore[misc, assignment]

try:
    from sqlalchemy.dialects.postgresql import TIMESTAMP
except (ImportError, AttributeError):
    TIMESTAMP = POSTGRESQL_NOT_IMPORTED  # type: ignore[misc, assignment]

try:
    from sqlalchemy.dialects.postgresql import DATE
except (ImportError, AttributeError):
    DATE = POSTGRESQL_NOT_IMPORTED  # type: ignore[misc, assignment]

try:
    from sqlalchemy.dialects.postgresql import DOUBLE_PRECISION
except (ImportError, AttributeError):
    DOUBLE_PRECISION = POSTGRESQL_NOT_IMPORTED  # type: ignore[misc, assignment]

try:
    from sqlalchemy.dialects.postgresql import BOOLEAN
except (ImportError, AttributeError):
    BOOLEAN = POSTGRESQL_NOT_IMPORTED  # type: ignore[misc, assignment]

try:
    from sqlalchemy.dialects.postgresql import NUMERIC
except (ImportError, AttributeError):
    NUMERIC = POSTGRESQL_NOT_IMPORTED  # type: ignore[misc, assignment]


class POSTGRESQL_TYPES:
    """Namespace for PostgreSQL dialect types.

    Each attribute re-exports the module-level name with the same spelling, so it
    is either the type imported from ``sqlalchemy.dialects.postgresql`` or the
    ``POSTGRESQL_NOT_IMPORTED`` placeholder when that import failed.
    """

    # The right-hand side of each assignment resolves to the module-level global:
    # the class attribute of that name does not exist yet when it is evaluated.
    TEXT = TEXT
    CHAR = CHAR
    INTEGER = INTEGER
    SMALLINT = SMALLINT
    BIGINT = BIGINT
    TIMESTAMP = TIMESTAMP
    DATE = DATE
    DOUBLE_PRECISION = DOUBLE_PRECISION
    BOOLEAN = BOOLEAN
    NUMERIC = NUMERIC
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,11 @@ def _validate_pandas( # noqa: C901, PLR0912
def _validate_sqlalchemy(self, actual_column_type, expected_types_list, execution_engine):
if expected_types_list is None:
success = True
elif execution_engine.dialect_name in [GXSqlDialect.SNOWFLAKE, GXSqlDialect.DATABRICKS]:
elif execution_engine.dialect_name in [
GXSqlDialect.DATABRICKS,
GXSqlDialect.POSTGRESQL,
GXSqlDialect.SNOWFLAKE,
]:
success = isinstance(actual_column_type, str) and any(
actual_column_type.lower() == expected_type.lower()
for expected_type in expected_types_list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,11 @@ def _validate_sqlalchemy(self, actual_column_type, expected_type, execution_engi

if expected_type is None:
success = True
elif execution_engine.dialect_name in [GXSqlDialect.SNOWFLAKE, GXSqlDialect.DATABRICKS]:
elif execution_engine.dialect_name in [
GXSqlDialect.DATABRICKS,
GXSqlDialect.POSTGRESQL,
GXSqlDialect.SNOWFLAKE,
]:
success = (
isinstance(actual_column_type, str)
and actual_column_type.lower() == expected_type.lower()
Expand Down
13 changes: 10 additions & 3 deletions great_expectations/expectations/metrics/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ def __getitem__(self, key: Any) -> Any:
return item


def get_sqlalchemy_column_metadata( # noqa: C901
def get_sqlalchemy_column_metadata( # noqa: C901, PLR0912
execution_engine: SqlAlchemyExecutionEngine,
table_selectable: sqlalchemy.Select,
schema_name: Optional[str] = None,
Expand Down Expand Up @@ -414,11 +414,18 @@ def get_sqlalchemy_column_metadata( # noqa: C901
)

dialect_name = execution_engine.dialect.name
if dialect_name in [GXSqlDialect.SNOWFLAKE, GXSqlDialect.DATABRICKS]:
if dialect_name in [
GXSqlDialect.DATABRICKS,
GXSqlDialect.POSTGRESQL,
GXSqlDialect.SNOWFLAKE,
]:
# WARNING: Do not alter columns in place, as they are cached on the inspector
columns_copy = [column.copy() for column in columns]
for column in columns_copy:
column["type"] = column["type"].compile(dialect=execution_engine.dialect)
if column.get("type"):
# When using column_reflection_fallback, we might not be able to
# extract the column type, and only have the column name
column["type"] = column["type"].compile(dialect=execution_engine.dialect)
if dialect_name == GXSqlDialect.SNOWFLAKE:
return [
# TODO: SmartColumn should know the dialect and do lookups based on that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import great_expectations.expectations as gxe
from great_expectations.compatibility.databricks import DATABRICKS_TYPES
from great_expectations.compatibility.postgresql import POSTGRESQL_TYPES
from great_expectations.compatibility.snowflake import SNOWFLAKE_TYPES
from great_expectations.compatibility.sqlalchemy import (
sqlalchemy as sa,
Expand All @@ -19,6 +20,7 @@
from tests.integration.test_utils.data_source_config import (
DatabricksDatasourceTestConfig,
PandasDataFrameDatasourceTestConfig,
PostgreSQLDatasourceTestConfig,
SnowflakeDatasourceTestConfig,
)

Expand Down Expand Up @@ -386,6 +388,110 @@ def test_success_complete_snowflake(
assert result_dict["observed_value"] in expectation.type_list


# One parametrized case per PostgreSQL column; each column is named after the type it is
# created with (see column_types below), and the case id matches the column name.
@pytest.mark.parametrize(
    "expectation",
    [
        pytest.param(
            # NOTE(review): both the bare and length-qualified spellings are accepted,
            # presumably because the reported type may be "CHAR(1)" -- confirm.
            gxe.ExpectColumnValuesToBeInTypeList(column="CHAR", type_list=["CHAR", "CHAR(1)"]),
            id="CHAR",
        ),
        pytest.param(
            gxe.ExpectColumnValuesToBeInTypeList(column="TEXT", type_list=["TEXT"]),
            id="TEXT",
        ),
        pytest.param(
            gxe.ExpectColumnValuesToBeInTypeList(column="INTEGER", type_list=["INTEGER"]),
            id="INTEGER",
        ),
        pytest.param(
            gxe.ExpectColumnValuesToBeInTypeList(column="SMALLINT", type_list=["SMALLINT"]),
            id="SMALLINT",
        ),
        pytest.param(
            gxe.ExpectColumnValuesToBeInTypeList(column="BIGINT", type_list=["BIGINT"]),
            id="BIGINT",
        ),
        pytest.param(
            # NOTE(review): the long spelling is presumably what the dialect reports for
            # a plain TIMESTAMP column -- confirm.
            gxe.ExpectColumnValuesToBeInTypeList(
                column="TIMESTAMP", type_list=["TIMESTAMP", "TIMESTAMP WITHOUT TIME ZONE"]
            ),
            id="TIMESTAMP",
        ),
        pytest.param(
            gxe.ExpectColumnValuesToBeInTypeList(column="DATE", type_list=["DATE"]),
            id="DATE",
        ),
        pytest.param(
            # The column is named DOUBLE_PRECISION, but the expected type string uses a
            # space ("DOUBLE PRECISION").
            gxe.ExpectColumnValuesToBeInTypeList(
                column="DOUBLE_PRECISION", type_list=["DOUBLE PRECISION"]
            ),
            id="DOUBLE_PRECISION",
        ),
        pytest.param(
            gxe.ExpectColumnValuesToBeInTypeList(column="BOOLEAN", type_list=["BOOLEAN"]),
            id="BOOLEAN",
        ),
        pytest.param(
            gxe.ExpectColumnValuesToBeInTypeList(column="NUMERIC", type_list=["NUMERIC"]),
            id="NUMERIC",
        ),
    ],
)
@parameterize_batch_for_data_sources(
    data_source_configs=[
        PostgreSQLDatasourceTestConfig(
            # Maps each column name to the dialect type it is declared with.
            column_types={
                "CHAR": POSTGRESQL_TYPES.CHAR,
                "TEXT": POSTGRESQL_TYPES.TEXT,
                "INTEGER": POSTGRESQL_TYPES.INTEGER,
                "SMALLINT": POSTGRESQL_TYPES.SMALLINT,
                "BIGINT": POSTGRESQL_TYPES.BIGINT,
                "TIMESTAMP": POSTGRESQL_TYPES.TIMESTAMP,
                "DATE": POSTGRESQL_TYPES.DATE,
                "DOUBLE_PRECISION": POSTGRESQL_TYPES.DOUBLE_PRECISION,
                "BOOLEAN": POSTGRESQL_TYPES.BOOLEAN,
                "NUMERIC": POSTGRESQL_TYPES.NUMERIC,
            }
        ),
    ],
    # NOTE(review): dtype="object" presumably keeps pandas from coercing the values
    # before they are loaded into the typed columns -- confirm against the fixture.
    data=pd.DataFrame(
        {
            "CHAR": ["a", "b", "c"],
            "TEXT": ["a", "b", "c"],
            "INTEGER": [1, 2, 3],
            "SMALLINT": [1, 2, 3],
            "BIGINT": [1, 2, 3],
            "TIMESTAMP": [
                "2021-01-01 00:00:00",
                "2021-01-02 00:00:00",
                "2021-01-03 00:00:00",
            ],
            "DATE": [
                # Date in isoformat
                "2021-01-01",
                "2021-01-02",
                "2021-01-03",
            ],
            "DOUBLE_PRECISION": [1.0, 2.0, 3.0],
            "BOOLEAN": [False, False, True],
            "NUMERIC": [1, 2, 3],
        },
        dtype="object",
    ),
)
def test_success_complete_postgres(
    batch_for_datasource: Batch, expectation: gxe.ExpectColumnValuesToBeInTypeList
) -> None:
    """Check that each type-list expectation succeeds against the PostgreSQL batch.

    The expectation is validated with the COMPLETE result format; the test then
    asserts that validation succeeded and that the reported ``observed_value`` is
    a string that appears in the expectation's ``type_list``.
    """
    result = batch_for_datasource.validate(expectation, result_format=ResultFormat.COMPLETE)
    result_dict = result.to_json_dict()["result"]

    assert result.success
    # The isinstance asserts also narrow the types for the membership check below.
    assert isinstance(result_dict, dict)
    assert isinstance(result_dict["observed_value"], str)
    assert isinstance(expectation.type_list, list)
    assert result_dict["observed_value"] in expectation.type_list


@pytest.mark.parametrize(
"expectation",
[
Expand Down

0 comments on commit ab0a7f7

Please sign in to comment.