diff --git a/docs/apache-airflow-providers-ydb/index.rst b/docs/apache-airflow-providers-ydb/index.rst index 30b8e90d9753..edc9955b12aa 100644 --- a/docs/apache-airflow-providers-ydb/index.rst +++ b/docs/apache-airflow-providers-ydb/index.rst @@ -102,7 +102,8 @@ PIP package Version required ======================================= ================== ``apache-airflow`` ``>=2.8.0`` ``apache-airflow-providers-common-sql`` ``>=1.14.1`` -``ydb`` ``>=3.12.1`` +``ydb`` ``>=3.18.8`` +``ydb-dbapi`` ``>=0.1.0`` ======================================= ================== Cross provider package dependencies diff --git a/docs/apache-airflow-providers-ydb/operators/ydb_operator_howto_guide.rst b/docs/apache-airflow-providers-ydb/operators/ydb_operator_howto_guide.rst index 9416894e5332..5b5e9a8a69d4 100644 --- a/docs/apache-airflow-providers-ydb/operators/ydb_operator_howto_guide.rst +++ b/docs/apache-airflow-providers-ydb/operators/ydb_operator_howto_guide.rst @@ -29,7 +29,7 @@ workflow. Airflow is essentially a graph (Directed Acyclic Graph) made up of tas A task defined or implemented by a operator is a unit of work in your data pipeline. The purpose of this guide is to define tasks involving interactions with a YDB database with -the :class:`~airflow.providers.ydb.operators.YDBExecuteQueryOperator` and :class:`~airflow.providers.ydb.operators.YDBScanQueryOperator`. +the :class:`~airflow.providers.ydb.operators.YDBExecuteQueryOperator`. Common database operations with YDBExecuteQueryOperator ------------------------------------------------------- @@ -162,26 +162,6 @@ by creating a sql file. ) -Executing Scan Queries with YDBScanQueryOperator -------------------------------------------------------- - -YDBScanQueryOperator executes YDB Scan Queries, which designed primarily for running analytical ad hoc queries. Parameters of the operators are: - -- ``sql`` - string with query; -- ``conn_id`` - YDB connection id. Default value is ``ydb_default``; -- ``params`` - parameters to be injected into query if it is Jinja template, more details about :doc:`params ` - -Example of using YDBScanQueryOperator: - -.. code-block:: python - - get_birth_date_scan = YDBScanQueryOperator( - task_id="get_birth_date_scan", - sql="sql/birth_date.sql", - params={"begin_date": "2020-01-01", "end_date": "2020-12-31"}, - ) - - The complete YDB Operator DAG ----------------------------- @@ -196,7 +176,7 @@ When we put everything together, our DAG should look like this: Conclusion ---------- -In this how-to guide we explored the Apache Airflow YDBExecuteQueryOperator and YDBScanQueryOperator to connect to YDB database. Let's quickly highlight the key takeaways. +In this how-to guide we explored the Apache Airflow YDBExecuteQueryOperator to connect to YDB database. Let's quickly highlight the key takeaways. It is best practice to create subdirectory called ``sql`` in your ``dags`` directory where you can store your sql files. This will make your code more elegant and more maintainable. And finally, we looked at the templated version of sql script and usage of ``params`` attribute. diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 865ce0931ea3..1c3de04d2c5d 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -1395,7 +1395,8 @@ "deps": [ "apache-airflow-providers-common-sql>=1.14.1", "apache-airflow>=2.8.0", - "ydb>=3.12.1" + "ydb-dbapi>=0.1.0", + "ydb>=3.18.8" ], "devel-deps": [], "plugins": [], diff --git a/providers/src/airflow/providers/ydb/CHANGELOG.rst b/providers/src/airflow/providers/ydb/CHANGELOG.rst index afb2845e1517..39447306680b 100644 --- a/providers/src/airflow/providers/ydb/CHANGELOG.rst +++ b/providers/src/airflow/providers/ydb/CHANGELOG.rst @@ -27,6 +27,10 @@ Changelog --------- +.. note:: + This release removes YDBScanQueryOperator from this provider package. + At this point, YDBExecuteQueryOperator could load unlimited amount of rows, so no specific operator is needed. + 1.4.0 ..... diff --git a/providers/src/airflow/providers/ydb/hooks/_vendor/__init__.py b/providers/src/airflow/providers/ydb/hooks/_vendor/__init__.py deleted file mode 100644 index 13a83393a912..000000000000 --- a/providers/src/airflow/providers/ydb/hooks/_vendor/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/providers/src/airflow/providers/ydb/hooks/_vendor/dbapi/__init__.py b/providers/src/airflow/providers/ydb/hooks/_vendor/dbapi/__init__.py deleted file mode 100644 index f8fffe7c46f5..000000000000 --- a/providers/src/airflow/providers/ydb/hooks/_vendor/dbapi/__init__.py +++ /dev/null @@ -1,43 +0,0 @@ -from .connection import AsyncConnection, Connection, IsolationLevel # noqa: F401 -from .cursor import AsyncCursor, Cursor, YdbQuery # noqa: F401 -from .errors import ( - DatabaseError, - DataError, - Error, - IntegrityError, - InterfaceError, - InternalError, - NotSupportedError, - OperationalError, - ProgrammingError, - Warning, -) - - -class YdbDBApi: - def __init__(self): - self.paramstyle = "pyformat" - self.threadsafety = 0 - self.apilevel = "1.0" - self._init_dbapi_attributes() - - def _init_dbapi_attributes(self): - for name, value in { - "Warning": Warning, - "Error": Error, - "InterfaceError": InterfaceError, - "DatabaseError": DatabaseError, - "DataError": DataError, - "OperationalError": OperationalError, - "IntegrityError": IntegrityError, - "InternalError": InternalError, - "ProgrammingError": ProgrammingError, - "NotSupportedError": NotSupportedError, - }.items(): - setattr(self, name, value) - - def connect(self, *args, **kwargs) -> Connection: - return Connection(*args, **kwargs) - - def async_connect(self, *args, **kwargs) -> AsyncConnection: - return AsyncConnection(*args, **kwargs) diff --git a/providers/src/airflow/providers/ydb/hooks/_vendor/dbapi/connection.py b/providers/src/airflow/providers/ydb/hooks/_vendor/dbapi/connection.py deleted file mode 100644 index fa0941e99a71..000000000000 --- a/providers/src/airflow/providers/ydb/hooks/_vendor/dbapi/connection.py +++ /dev/null @@ -1,194 +0,0 @@ -import collections.abc -import posixpath -from typing import Any, List, NamedTuple, Optional - -import sqlalchemy.util as util -import ydb - -from .cursor import AsyncCursor, Cursor -from .errors import InterfaceError, InternalError, NotSupportedError - - -class IsolationLevel: - SERIALIZABLE = "SERIALIZABLE" - ONLINE_READONLY = "ONLINE READONLY" - ONLINE_READONLY_INCONSISTENT = "ONLINE READONLY INCONSISTENT" - STALE_READONLY = "STALE READONLY" - SNAPSHOT_READONLY = "SNAPSHOT READONLY" - AUTOCOMMIT = "AUTOCOMMIT" - - -class Connection: - _await = staticmethod(util.await_only) - - _is_async = False - _ydb_driver_class = ydb.Driver - _ydb_session_pool_class = ydb.SessionPool - _ydb_table_client_class = ydb.TableClient - _cursor_class = Cursor - - def __init__( - self, - host: str = "", - port: str = "", - database: str = "", - **conn_kwargs: Any, - ): - self.endpoint = f"grpc://{host}:{port}" - self.database = database - self.conn_kwargs = conn_kwargs - self.credentials = self.conn_kwargs.pop("credentials", None) - self.table_path_prefix = self.conn_kwargs.pop("ydb_table_path_prefix", "") - - if "ydb_session_pool" in self.conn_kwargs: # Use session pool managed manually - self._shared_session_pool = True - self.session_pool: ydb.SessionPool = self.conn_kwargs.pop("ydb_session_pool") - self.driver = ( - self.session_pool._driver - if hasattr(self.session_pool, "_driver") - else self.session_pool._pool_impl._driver - ) - self.driver.table_client = self._ydb_table_client_class(self.driver, self._get_table_client_settings()) - else: - self._shared_session_pool = False - self.driver = self._create_driver() - self.session_pool = self._ydb_session_pool_class(self.driver, size=5) - - self.interactive_transaction: bool = False # AUTOCOMMIT - self.tx_mode: ydb.AbstractTransactionModeBuilder = ydb.SerializableReadWrite() - self.tx_context: Optional[ydb.TxContext] = None - self.use_scan_query: bool = False - - def cursor(self): - return self._cursor_class( - self.driver, self.session_pool, self.tx_mode, self.tx_context, self.use_scan_query, self.table_path_prefix - ) - - def describe(self, table_path: str) -> ydb.TableDescription: - abs_table_path = posixpath.join(self.database, self.table_path_prefix, table_path) - cursor = self.cursor() - return cursor.describe_table(abs_table_path) - - def check_exists(self, table_path: str) -> ydb.SchemeEntry: - abs_table_path = posixpath.join(self.database, self.table_path_prefix, table_path) - cursor = self.cursor() - return cursor.check_exists(abs_table_path) - - def get_table_names(self) -> List[str]: - abs_dir_path = posixpath.join(self.database, self.table_path_prefix) - cursor = self.cursor() - return [posixpath.relpath(path, abs_dir_path) for path in cursor.get_table_names(abs_dir_path)] - - def set_isolation_level(self, isolation_level: str): - class IsolationSettings(NamedTuple): - ydb_mode: ydb.AbstractTransactionModeBuilder - interactive: bool - - ydb_isolation_settings_map = { - IsolationLevel.AUTOCOMMIT: IsolationSettings(ydb.SerializableReadWrite(), interactive=False), - IsolationLevel.SERIALIZABLE: IsolationSettings(ydb.SerializableReadWrite(), interactive=True), - IsolationLevel.ONLINE_READONLY: IsolationSettings(ydb.OnlineReadOnly(), interactive=False), - IsolationLevel.ONLINE_READONLY_INCONSISTENT: IsolationSettings( - ydb.OnlineReadOnly().with_allow_inconsistent_reads(), interactive=False - ), - IsolationLevel.STALE_READONLY: IsolationSettings(ydb.StaleReadOnly(), interactive=False), - IsolationLevel.SNAPSHOT_READONLY: IsolationSettings(ydb.SnapshotReadOnly(), interactive=True), - } - ydb_isolation_settings = ydb_isolation_settings_map[isolation_level] - if self.tx_context and self.tx_context.tx_id: - raise InternalError("Failed to set transaction mode: transaction is already began") - self.tx_mode = ydb_isolation_settings.ydb_mode - self.interactive_transaction = ydb_isolation_settings.interactive - - def get_isolation_level(self) -> str: - if self.tx_mode.name == ydb.SerializableReadWrite().name: - if self.interactive_transaction: - return IsolationLevel.SERIALIZABLE - else: - return IsolationLevel.AUTOCOMMIT - elif self.tx_mode.name == ydb.OnlineReadOnly().name: - if self.tx_mode.settings.allow_inconsistent_reads: - return IsolationLevel.ONLINE_READONLY_INCONSISTENT - else: - return IsolationLevel.ONLINE_READONLY - elif self.tx_mode.name == ydb.StaleReadOnly().name: - return IsolationLevel.STALE_READONLY - elif self.tx_mode.name == ydb.SnapshotReadOnly().name: - return IsolationLevel.SNAPSHOT_READONLY - else: - raise NotSupportedError(f"{self.tx_mode.name} is not supported") - - def set_ydb_scan_query(self, value: bool) -> None: - self.use_scan_query = value - - def get_ydb_scan_query(self) -> bool: - return self.use_scan_query - - def begin(self): - self.tx_context = None - if self.interactive_transaction and not self.use_scan_query: - session = self._maybe_await(self.session_pool.acquire) - self.tx_context = session.transaction(self.tx_mode) - self._maybe_await(self.tx_context.begin) - - def commit(self): - if self.tx_context and self.tx_context.tx_id: - self._maybe_await(self.tx_context.commit) - self._maybe_await(self.session_pool.release, self.tx_context.session) - self.tx_context = None - - def rollback(self): - if self.tx_context and self.tx_context.tx_id: - self._maybe_await(self.tx_context.rollback) - self._maybe_await(self.session_pool.release, self.tx_context.session) - self.tx_context = None - - def close(self): - self.rollback() - if not self._shared_session_pool: - self._maybe_await(self.session_pool.stop) - self._stop_driver() - - @classmethod - def _maybe_await(cls, callee: collections.abc.Callable, *args, **kwargs) -> Any: - if cls._is_async: - return cls._await(callee(*args, **kwargs)) - return callee(*args, **kwargs) - - def _get_table_client_settings(self) -> ydb.TableClientSettings: - return ( - ydb.TableClientSettings() - .with_native_date_in_result_sets(True) - .with_native_datetime_in_result_sets(True) - .with_native_timestamp_in_result_sets(True) - .with_native_interval_in_result_sets(True) - .with_native_json_in_result_sets(False) - ) - - def _create_driver(self): - driver_config = ydb.DriverConfig( - endpoint=self.endpoint, - database=self.database, - table_client_settings=self._get_table_client_settings(), - credentials=self.credentials, - ) - driver = self._ydb_driver_class(driver_config) - try: - self._maybe_await(driver.wait, timeout=5, fail_fast=True) - except ydb.Error as e: - raise InterfaceError(e.message, original_error=e) from e - except Exception as e: - self._maybe_await(driver.stop) - raise InterfaceError(f"Failed to connect to YDB, details {driver.discovery_debug_details()}") from e - return driver - - def _stop_driver(self): - self._maybe_await(self.driver.stop) - - -class AsyncConnection(Connection): - _is_async = True - _ydb_driver_class = ydb.aio.Driver - _ydb_session_pool_class = ydb.aio.SessionPool - _ydb_table_client_class = ydb.aio.table.TableClient - _cursor_class = AsyncCursor diff --git a/providers/src/airflow/providers/ydb/hooks/_vendor/dbapi/constants.py b/providers/src/airflow/providers/ydb/hooks/_vendor/dbapi/constants.py deleted file mode 100644 index a27aef1dd583..000000000000 --- a/providers/src/airflow/providers/ydb/hooks/_vendor/dbapi/constants.py +++ /dev/null @@ -1,218 +0,0 @@ -YDB_KEYWORDS = { - "abort", - "action", - "add", - "after", - "all", - "alter", - "analyze", - "and", - "ansi", - "any", - "array", - "as", - "asc", - "assume", - "async", - "attach", - "autoincrement", - "before", - "begin", - "bernoulli", - "between", - "bitcast", - "by", - "cascade", - "case", - "cast", - "changefeed", - "check", - "collate", - "column", - "columns", - "commit", - "compact", - "conditional", - "conflict", - "constraint", - "consumer", - "cover", - "create", - "cross", - "cube", - "current", - "current_date", - "current_time", - "current_timestamp", - "data", - "database", - "decimal", - "declare", - "default", - "deferrable", - "deferred", - "define", - "delete", - "desc", - "detach", - "disable", - "discard", - "distinct", - "do", - "drop", - "each", - "else", - "empty", - "empty_action", - "encrypted", - "end", - "erase", - "error", - "escape", - "evaluate", - "except", - "exclude", - "exclusion", - "exclusive", - "exists", - "explain", - "export", - "external", - "fail", - "family", - "filter", - "flatten", - "following", - "for", - "foreign", - "from", - "full", - "function", - "glob", - "group", - "grouping", - "groups", - "hash", - "having", - "hop", - "if", - "ignore", - "ilike", - "immediate", - "import", - "in", - "index", - "indexed", - "inherits", - "initially", - "inner", - "insert", - "instead", - "intersect", - "into", - "is", - "isnull", - "join", - "json_exists", - "json_query", - "json_value", - "key", - "left", - "like", - "limit", - "local", - "match", - "natural", - "no", - "not", - "notnull", - "null", - "nulls", - "object", - "of", - "offset", - "on", - "only", - "or", - "order", - "others", - "outer", - "over", - "partition", - "passing", - "password", - "plan", - "pragma", - "preceding", - "presort", - "primary", - "process", - "raise", - "range", - "reduce", - "references", - "regexp", - "reindex", - "release", - "rename", - "replace", - "replication", - "reset", - "respect", - "restrict", - "result", - "return", - "returning", - "revert", - "right", - "rlike", - "rollback", - "rollup", - "row", - "rows", - "sample", - "savepoint", - "schema", - "select", - "semi", - "sets", - "source", - "stream", - "subquery", - "symbols", - "sync", - "system", - "table", - "tablesample", - "tablestore", - "temp", - "temporary", - "then", - "ties", - "to", - "topic", - "transaction", - "trigger", - "type", - "unbounded", - "unconditional", - "union", - "unique", - "unknown", - "update", - "upsert", - "use", - "user", - "using", - "vacuum", - "values", - "view", - "virtual", - "when", - "where", - "window", - "with", - "without", - "wrapper", - "xor", -} diff --git a/providers/src/airflow/providers/ydb/hooks/_vendor/dbapi/cursor.py b/providers/src/airflow/providers/ydb/hooks/_vendor/dbapi/cursor.py deleted file mode 100644 index 22dfdaacb2c0..000000000000 --- a/providers/src/airflow/providers/ydb/hooks/_vendor/dbapi/cursor.py +++ /dev/null @@ -1,389 +0,0 @@ -import collections.abc -import dataclasses -import functools -import hashlib -import itertools -import posixpath -from collections.abc import AsyncIterator -from typing import ( - Any, - Dict, - Generator, - List, - Mapping, - Optional, - Sequence, - Union, -) - -import ydb -import ydb.aio -from sqlalchemy import util - -from .errors import ( - DatabaseError, - DataError, - IntegrityError, - InternalError, - NotSupportedError, - OperationalError, - ProgrammingError, -) - - -def get_column_type(type_obj: Any) -> str: - return str(ydb.convert.type_to_native(type_obj)) - - -@dataclasses.dataclass -class YdbQuery: - yql_text: str - parameters_types: Dict[str, Union[ydb.PrimitiveType, ydb.AbstractTypeBuilder]] = dataclasses.field( - default_factory=dict - ) - is_ddl: bool = False - - -def _handle_ydb_errors(func): - @functools.wraps(func) - def wrapper(*args, **kwargs): - try: - return func(*args, **kwargs) - except (ydb.issues.AlreadyExists, ydb.issues.PreconditionFailed) as e: - raise IntegrityError(e.message, original_error=e) from e - except (ydb.issues.Unsupported, ydb.issues.Unimplemented) as e: - raise NotSupportedError(e.message, original_error=e) from e - except (ydb.issues.BadRequest, ydb.issues.SchemeError) as e: - raise ProgrammingError(e.message, original_error=e) from e - except ( - ydb.issues.TruncatedResponseError, - ydb.issues.ConnectionError, - ydb.issues.Aborted, - ydb.issues.Unavailable, - ydb.issues.Overloaded, - ydb.issues.Undetermined, - ydb.issues.Timeout, - ydb.issues.Cancelled, - ydb.issues.SessionBusy, - ydb.issues.SessionExpired, - ydb.issues.SessionPoolEmpty, - ) as e: - raise OperationalError(e.message, original_error=e) from e - except ydb.issues.GenericError as e: - raise DataError(e.message, original_error=e) from e - except ydb.issues.InternalError as e: - raise InternalError(e.message, original_error=e) from e - except ydb.Error as e: - raise DatabaseError(e.message, original_error=e) from e - except Exception as e: - raise DatabaseError("Failed to execute query") from e - - return wrapper - - -class Cursor: - def __init__( - self, - driver: Union[ydb.Driver, ydb.aio.Driver], - session_pool: Union[ydb.SessionPool, ydb.aio.SessionPool], - tx_mode: ydb.AbstractTransactionModeBuilder, - tx_context: Optional[ydb.BaseTxContext] = None, - use_scan_query: bool = False, - table_path_prefix: str = "", - ): - self.driver = driver - self.session_pool = session_pool - self.tx_mode = tx_mode - self.tx_context = tx_context - self.use_scan_query = use_scan_query - self.description = None - self.arraysize = 1 - self.rows = None - self._rows_prefetched = None - self.root_directory = table_path_prefix - - @_handle_ydb_errors - def describe_table(self, abs_table_path: str) -> ydb.TableDescription: - return self._retry_operation_in_pool(self._describe_table, abs_table_path) - - def check_exists(self, abs_table_path: str) -> bool: - try: - self._retry_operation_in_pool(self._describe_path, abs_table_path) - return True - except ydb.SchemeError: - return False - - @_handle_ydb_errors - def get_table_names(self, abs_dir_path: str) -> List[str]: - directory: ydb.Directory = self._retry_operation_in_pool(self._list_directory, abs_dir_path) - result = [] - for child in directory.children: - child_abs_path = posixpath.join(abs_dir_path, child.name) - if child.is_table(): - result.append(child_abs_path) - elif child.is_directory() and not child.name.startswith("."): - result.extend(self.get_table_names(child_abs_path)) - return result - - def execute(self, operation: YdbQuery, parameters: Optional[Mapping[str, Any]] = None): - query = self._get_ydb_query(operation) - - if operation.is_ddl: - chunks = self._execute_ddl(query) - elif self.use_scan_query: - chunks = self._execute_scan_query(query, parameters) - else: - chunks = self._execute_dml(query, parameters) - - rows = self._rows_iterable(chunks) - # Prefetch the description: - try: - first_row = next(rows) - except StopIteration: - pass - else: - rows = itertools.chain((first_row,), rows) - if self.rows is not None: - rows = itertools.chain(self.rows, rows) - - self.rows = rows - - def _get_ydb_query(self, operation: YdbQuery) -> Union[ydb.DataQuery, str]: - pragma = "" - if self.root_directory: - pragma = f'PRAGMA TablePathPrefix = "{self.root_directory}";\n' - - yql_with_pragma = pragma + operation.yql_text - - if operation.is_ddl or not operation.parameters_types: - return yql_with_pragma - - return self._make_data_query(yql_with_pragma, operation.parameters_types) - - def _make_data_query( - self, - yql_text: str, - parameters_types: Dict[str, Union[ydb.PrimitiveType, ydb.AbstractTypeBuilder]], - ) -> ydb.DataQuery: - """ - ydb.DataQuery uses hashed SQL text as cache key, which may cause issues if parameters change type within - the same session, so we include parameter types to the key to prevent false positive cache hit. - """ - - sorted_parameters = sorted(parameters_types.items()) # dict keys are unique, so the sorting is stable - - yql_with_params = yql_text + "".join([k + str(v) for k, v in sorted_parameters]) - name = hashlib.sha256(yql_with_params.encode("utf-8")).hexdigest() - return ydb.DataQuery(yql_text, parameters_types, name=name) - - @_handle_ydb_errors - def _execute_scan_query( - self, query: Union[ydb.DataQuery, str], parameters: Optional[Mapping[str, Any]] = None - ) -> Generator[ydb.convert.ResultSet, None, None]: - prepared_query = query - if isinstance(query, str) and parameters: - prepared_query: ydb.DataQuery = self._retry_operation_in_pool(self._prepare, query) - - if isinstance(query, str): - scan_query = ydb.ScanQuery(query, None) - else: - scan_query = ydb.ScanQuery(prepared_query.yql_text, prepared_query.parameters_types) - - return self._execute_scan_query_in_driver(scan_query, parameters) - - @_handle_ydb_errors - def _execute_dml( - self, query: Union[ydb.DataQuery, str], parameters: Optional[Mapping[str, Any]] = None - ) -> ydb.convert.ResultSets: - prepared_query = query - if isinstance(query, str) and parameters: - if self.tx_context: - prepared_query = self._run_operation_in_session(self._prepare, query) - else: - prepared_query = self._retry_operation_in_pool(self._prepare, query) - - if self.tx_context: - return self._run_operation_in_tx(self._execute_in_tx, prepared_query, parameters) - - return self._retry_operation_in_pool(self._execute_in_session, self.tx_mode, prepared_query, parameters) - - @_handle_ydb_errors - def _execute_ddl(self, query: str) -> ydb.convert.ResultSets: - return self._retry_operation_in_pool(self._execute_scheme, query) - - @staticmethod - def _execute_scheme(session: ydb.Session, query: str) -> ydb.convert.ResultSets: - return session.execute_scheme(query) - - @staticmethod - def _describe_table(session: ydb.Session, abs_table_path: str) -> ydb.TableDescription: - return session.describe_table(abs_table_path) - - @staticmethod - def _describe_path(session: ydb.Session, table_path: str) -> ydb.SchemeEntry: - return session._driver.scheme_client.describe_path(table_path) - - @staticmethod - def _list_directory(session: ydb.Session, abs_dir_path: str) -> ydb.Directory: - return session._driver.scheme_client.list_directory(abs_dir_path) - - @staticmethod - def _prepare(session: ydb.Session, query: str) -> ydb.DataQuery: - return session.prepare(query) - - @staticmethod - def _execute_in_tx( - tx_context: ydb.TxContext, prepared_query: ydb.DataQuery, parameters: Optional[Mapping[str, Any]] - ) -> ydb.convert.ResultSets: - return tx_context.execute(prepared_query, parameters, commit_tx=False) - - @staticmethod - def _execute_in_session( - session: ydb.Session, - tx_mode: ydb.AbstractTransactionModeBuilder, - prepared_query: ydb.DataQuery, - parameters: Optional[Mapping[str, Any]], - ) -> ydb.convert.ResultSets: - return session.transaction(tx_mode).execute(prepared_query, parameters, commit_tx=True) - - def _execute_scan_query_in_driver( - self, - scan_query: ydb.ScanQuery, - parameters: Optional[Mapping[str, Any]], - ) -> Generator[ydb.convert.ResultSet, None, None]: - chunk: ydb.ScanQueryResult - for chunk in self.driver.table_client.scan_query(scan_query, parameters): - yield chunk.result_set - - def _run_operation_in_tx(self, callee: collections.abc.Callable, *args, **kwargs): - return callee(self.tx_context, *args, **kwargs) - - def _run_operation_in_session(self, callee: collections.abc.Callable, *args, **kwargs): - return callee(self.tx_context.session, *args, **kwargs) - - def _retry_operation_in_pool(self, callee: collections.abc.Callable, *args, **kwargs): - return self.session_pool.retry_operation_sync(callee, None, *args, **kwargs) - - def _rows_iterable(self, chunks_iterable: ydb.convert.ResultSets): - try: - for chunk in chunks_iterable: - self.description = [ - ( - col.name, - get_column_type(col.type), - None, - None, - None, - None, - None, - ) - for col in chunk.columns - ] - for row in chunk.rows: - # returns tuple to be compatible with SqlAlchemy and because - # of this PEP to return a sequence: https://www.python.org/dev/peps/pep-0249/#fetchmany - yield row[::] - except ydb.Error as e: - raise DatabaseError(e.message, original_error=e) from e - - def _ensure_prefetched(self): - if self.rows is not None and self._rows_prefetched is None: - self._rows_prefetched = list(self.rows) - self.rows = iter(self._rows_prefetched) - return self._rows_prefetched - - def executemany(self, operation: YdbQuery, seq_of_parameters: Optional[Sequence[Mapping[str, Any]]]): - for parameters in seq_of_parameters: - self.execute(operation, parameters) - - def executescript(self, script): - return self.execute(script) - - def fetchone(self): - return next(self.rows or iter([]), None) - - def fetchmany(self, size=None): - return list(itertools.islice(self.rows, size or self.arraysize)) - - def fetchall(self): - return list(self.rows) - - def nextset(self): - self.fetchall() - - def setinputsizes(self, sizes): - pass - - def setoutputsize(self, column=None): - pass - - def close(self): - self.rows = None - self._rows_prefetched = None - - @property - def rowcount(self): - return len(self._ensure_prefetched()) - - -class AsyncCursor(Cursor): - _await = staticmethod(util.await_only) - - @staticmethod - async def _describe_table(session: ydb.aio.table.Session, abs_table_path: str) -> ydb.TableDescription: - return await session.describe_table(abs_table_path) - - @staticmethod - async def _describe_path(session: ydb.aio.table.Session, abs_table_path: str) -> ydb.SchemeEntry: - return await session._driver.scheme_client.describe_path(abs_table_path) - - @staticmethod - async def _list_directory(session: ydb.aio.table.Session, abs_dir_path: str) -> ydb.Directory: - return await session._driver.scheme_client.list_directory(abs_dir_path) - - @staticmethod - async def _execute_scheme(session: ydb.aio.table.Session, query: str) -> ydb.convert.ResultSets: - return await session.execute_scheme(query) - - @staticmethod - async def _prepare(session: ydb.aio.table.Session, query: str) -> ydb.DataQuery: - return await session.prepare(query) - - @staticmethod - async def _execute_in_tx( - tx_context: ydb.aio.table.TxContext, prepared_query: ydb.DataQuery, parameters: Optional[Mapping[str, Any]] - ) -> ydb.convert.ResultSets: - return await tx_context.execute(prepared_query, parameters, commit_tx=False) - - @staticmethod - async def _execute_in_session( - session: ydb.aio.table.Session, - tx_mode: ydb.AbstractTransactionModeBuilder, - prepared_query: ydb.DataQuery, - parameters: Optional[Mapping[str, Any]], - ) -> ydb.convert.ResultSets: - return await session.transaction(tx_mode).execute(prepared_query, parameters, commit_tx=True) - - def _execute_scan_query_in_driver( - self, - scan_query: ydb.ScanQuery, - parameters: Optional[Mapping[str, Any]], - ) -> Generator[ydb.convert.ResultSet, None, None]: - iterator: AsyncIterator[ydb.ScanQueryResult] = self._await( - self.driver.table_client.scan_query(scan_query, parameters) - ) - while True: - try: - result = self._await(iterator.__anext__()) - yield result.result_set - except StopAsyncIteration: - break - - def _run_operation_in_tx(self, callee: collections.abc.Coroutine, *args, **kwargs): - return self._await(callee(self.tx_context, *args, **kwargs)) - - def _run_operation_in_session(self, callee: collections.abc.Coroutine, *args, **kwargs): - return self._await(callee(self.tx_context.session, *args, **kwargs)) - - def _retry_operation_in_pool(self, callee: collections.abc.Coroutine, *args, **kwargs): - return self._await(self.session_pool.retry_operation(callee, *args, **kwargs)) diff --git a/providers/src/airflow/providers/ydb/hooks/_vendor/dbapi/errors.py b/providers/src/airflow/providers/ydb/hooks/_vendor/dbapi/errors.py deleted file mode 100644 index 79faba801aed..000000000000 --- a/providers/src/airflow/providers/ydb/hooks/_vendor/dbapi/errors.py +++ /dev/null @@ -1,103 +0,0 @@ -from typing import List, Optional - -import ydb -from google.protobuf.message import Message - - -class Warning(Exception): - pass - - -class Error(Exception): - def __init__( - self, - message: str, - original_error: Optional[ydb.Error] = None, - ): - super(Error, self).__init__(message) - - self.original_error = original_error - if original_error: - pretty_issues = _pretty_issues(original_error.issues) - self.issues = original_error.issues - self.message = pretty_issues or message - self.status = original_error.status - - -class InterfaceError(Error): - pass - - -class DatabaseError(Error): - pass - - -class DataError(DatabaseError): - pass - - -class OperationalError(DatabaseError): - pass - - -class IntegrityError(DatabaseError): - pass - - -class InternalError(DatabaseError): - pass - - -class ProgrammingError(DatabaseError): - pass - - -class NotSupportedError(DatabaseError): - pass - - -def _pretty_issues(issues: List[Message]) -> str: - if issues is None: - return None - - children_messages = [_get_messages(issue, root=True) for issue in issues] - - if None in children_messages: - return None - - return "\n" + "\n".join(children_messages) - - -def _get_messages(issue: Message, max_depth: int = 100, indent: int = 2, depth: int = 0, root: bool = False) -> str: - if depth >= max_depth: - return None - - margin_str = " " * depth * indent - pre_message = "" - children = "" - - if issue.issues: - collapsed_messages = [] - while not root and len(issue.issues) == 1: - collapsed_messages.append(issue.message) - issue = issue.issues[0] - - if collapsed_messages: - pre_message = f"{margin_str}{', '.join(collapsed_messages)}\n" - depth += 1 - margin_str = " " * depth * indent - - children_messages = [ - _get_messages(iss, max_depth=max_depth, indent=indent, depth=depth + 1) for iss in issue.issues - ] - - if None in children_messages: - return None - - children = "\n".join(children_messages) - - return ( - f"{pre_message}{margin_str}{issue.message}\n{margin_str}" - f"severity level: {issue.severity}\n{margin_str}" - f"issue code: {issue.issue_code}\n{children}" - ) diff --git a/providers/src/airflow/providers/ydb/hooks/_vendor/readme.md b/providers/src/airflow/providers/ydb/hooks/_vendor/readme.md deleted file mode 100644 index 14a2585b69d5..000000000000 --- a/providers/src/airflow/providers/ydb/hooks/_vendor/readme.md +++ /dev/null @@ -1,3 +0,0 @@ -dbapi is extracted from https://github.com/ydb-platform/ydb-sqlalchemy/releases/tag/0.0.1b22 (Apache License 2.0) to avoid dependency on sqlalchemy package ver > 2. -_vendor could be removed in favor of ydb-sqlalchemy package after switching Airflow core to sqlalchemy > 2 (related issue https://github.com/apache/airflow/issues/28723). -Another option is to wait for separate package for ydb-dbapi: https://github.com/ydb-platform/ydb-sqlalchemy/issues/46 and switch to it afterwards. diff --git a/providers/src/airflow/providers/ydb/hooks/ydb.py b/providers/src/airflow/providers/ydb/hooks/ydb.py index 19740d40e6dd..03c59d72c87b 100644 --- a/providers/src/airflow/providers/ydb/hooks/ydb.py +++ b/providers/src/airflow/providers/ydb/hooks/ydb.py @@ -20,19 +20,19 @@ import ydb from sqlalchemy.engine import URL +from ydb_dbapi import Connection as DbApiConnection from airflow.exceptions import AirflowException from airflow.providers.common.sql.hooks.sql import DbApiHook -from airflow.providers.ydb.hooks._vendor.dbapi.connection import Connection as DbApiConnection -from airflow.providers.ydb.hooks._vendor.dbapi.cursor import YdbQuery from airflow.providers.ydb.utils.credentials import get_credentials_from_connection from airflow.providers.ydb.utils.defaults import CONN_NAME_ATTR, CONN_TYPE, DEFAULT_CONN_NAME DEFAULT_YDB_GRPCS_PORT: int = 2135 if TYPE_CHECKING: + from ydb_dbapi import Cursor as DbApiCursor + from airflow.models.connection import Connection - from airflow.providers.ydb.hooks._vendor.dbapi.cursor import Cursor as DbApiCursor class YDBCursor: @@ -46,8 +46,10 @@ def execute(self, sql: str, parameters: Mapping[str, Any] | None = None): if parameters is not None: raise AirflowException("parameters is not supported yet") - q = YdbQuery(yql_text=sql, is_ddl=self.is_ddl) - return self.delegatee.execute(q, parameters) + if self.is_ddl: + return self.delegatee.execute_scheme(sql, parameters) + + return self.delegatee.execute(sql, parameters) def executemany(self, sql: str, seq_of_parameters: Sequence[Mapping[str, Any]]): for parameters in seq_of_parameters: @@ -95,11 +97,12 @@ def description(self): class YDBConnection: """YDB connection wrapper.""" - def __init__(self, ydb_session_pool: Any, is_ddl: bool, use_scan_query: bool): + def __init__(self, database: str, ydb_session_pool: Any, is_ddl: bool): self.is_ddl = is_ddl - self.use_scan_query = use_scan_query - self.delegatee: DbApiConnection = DbApiConnection(ydb_session_pool=ydb_session_pool) - self.delegatee.set_ydb_scan_query(use_scan_query) + self.delegatee: DbApiConnection = DbApiConnection( + database=database, + ydb_session_pool=ydb_session_pool, + ) def cursor(self) -> YDBCursor: return YDBCursor(self.delegatee.cursor(), is_ddl=self.is_ddl) @@ -123,7 +126,7 @@ def close(self) -> None: self.delegatee.close() def bulk_upsert(self, table_name: str, rows: Sequence, column_types: ydb.BulkUpsertColumns): - self.delegatee.driver.table_client.bulk_upsert(table_name, rows=rows, column_types=column_types) + self.delegatee.bulk_upsert(table_name, rows=rows, column_types=column_types) class YDBHook(DbApiHook): @@ -136,12 +139,11 @@ class YDBHook(DbApiHook): supports_autocommit: bool = True supports_executemany: bool = True - def __init__(self, *args, is_ddl: bool = False, use_scan_query: bool = False, **kwargs) -> None: + def __init__(self, *args, is_ddl: bool = False, **kwargs) -> None: super().__init__(*args, **kwargs) self.is_ddl = is_ddl - self.use_scan_query = use_scan_query - conn: Connection = self.get_connection(self.get_conn_id()) + conn: Connection = self.connection host: str | None = conn.host if not host: raise ValueError("YDB host must be specified") @@ -161,13 +163,13 @@ def __init__(self, *args, is_ddl: bool = False, use_scan_query: bool = False, ** driver_config = ydb.DriverConfig( endpoint=endpoint, database=database, - table_client_settings=YDBHook._get_table_client_settings(), + query_client_settings=YDBHook._get_query_client_settings(), credentials=credentials, ) driver = ydb.Driver(driver_config) # wait until driver become initialized driver.wait(fail_fast=True, timeout=10) - self.ydb_session_pool = ydb.SessionPool(driver, size=5) + self.ydb_session_pool = ydb.QuerySessionPool(driver, size=5) @classmethod def get_connection_form_widgets(cls) -> dict[str, Any]: @@ -237,7 +239,7 @@ def sqlalchemy_url(self) -> URL: def get_conn(self) -> YDBConnection: """Establish a connection to a YDB database.""" - return YDBConnection(self.ydb_session_pool, is_ddl=self.is_ddl, use_scan_query=self.use_scan_query) + return YDBConnection(self.database, self.ydb_session_pool, is_ddl=self.is_ddl) @staticmethod def _serialize_cell(cell: object, conn: YDBConnection | None = None) -> Any: @@ -251,12 +253,12 @@ def bulk_upsert(self, table_name: str, rows: Sequence, column_types: ydb.BulkUps https://ydb.tech/docs/en/recipes/ydb-sdk/bulk-upsert """ - self.get_conn().bulk_upsert(f"{self.database}/{table_name}", rows, column_types) + self.get_conn().bulk_upsert(table_name, rows, column_types) @staticmethod - def _get_table_client_settings() -> ydb.TableClientSettings: + def _get_query_client_settings() -> ydb.QueryClientSettings: return ( - ydb.TableClientSettings() + ydb.QueryClientSettings() .with_native_date_in_result_sets(True) .with_native_datetime_in_result_sets(True) .with_native_timestamp_in_result_sets(True) diff --git a/providers/src/airflow/providers/ydb/operators/ydb.py b/providers/src/airflow/providers/ydb/operators/ydb.py index 787abf8cbd82..1867f227b4eb 100644 --- a/providers/src/airflow/providers/ydb/operators/ydb.py +++ b/providers/src/airflow/providers/ydb/operators/ydb.py @@ -52,33 +52,3 @@ def __init__( kwargs["hook_params"] = {"is_ddl": is_ddl, **hook_params} super().__init__(conn_id=ydb_conn_id, sql=sql, parameters=parameters, **kwargs) - - -class YDBScanQueryOperator(SQLExecuteQueryOperator): - """ - Executes scan query in a specific YDB database. - - :param sql: the SQL code to be executed as a single string, or - a list of str (sql statements), or a reference to a template file. - Template references are recognized by str ending in '.sql' - :param ydb_conn_id: The :ref:`ydb conn id ` - reference to a specific YDB cluster and database. - :param parameters: (optional) the parameters to render the SQL query with. - """ - - ui_color = "#ededed" - - def __init__( - self, - sql: str | list[str], - ydb_conn_id: str = "ydb_default", - parameters: Mapping | Iterable | None = None, - **kwargs, - ) -> None: - if parameters is not None: - raise AirflowException("parameters are not supported yet") - - hook_params = kwargs.pop("hook_params", {}) - kwargs["hook_params"] = {"use_scan_query": True, **hook_params} - - super().__init__(conn_id=ydb_conn_id, sql=sql, parameters=parameters, **kwargs) diff --git a/providers/src/airflow/providers/ydb/provider.yaml b/providers/src/airflow/providers/ydb/provider.yaml index 18f7140e1aa1..f70a70b2834f 100644 --- a/providers/src/airflow/providers/ydb/provider.yaml +++ b/providers/src/airflow/providers/ydb/provider.yaml @@ -34,7 +34,8 @@ versions: dependencies: - apache-airflow>=2.8.0 - apache-airflow-providers-common-sql>=1.14.1 - - ydb>=3.12.1 + - ydb>=3.18.8 + - ydb-dbapi>=0.1.0 integrations: - integration-name: YDB @@ -53,10 +54,6 @@ hooks: - integration-name: YDB python-modules: - airflow.providers.ydb.hooks.ydb - - airflow.providers.ydb.hooks._vendor.dbapi.connection - - airflow.providers.ydb.hooks._vendor.dbapi.constants - - airflow.providers.ydb.hooks._vendor.dbapi.cursor - - airflow.providers.ydb.hooks._vendor.dbapi.errors connection-types: - hook-class-name: airflow.providers.ydb.hooks.ydb.YDBHook diff --git a/providers/tests/system/ydb/example_ydb.py b/providers/tests/system/ydb/example_ydb.py index 2f3f88cabe64..cd2d4835d49a 100644 --- a/providers/tests/system/ydb/example_ydb.py +++ b/providers/tests/system/ydb/example_ydb.py @@ -24,7 +24,7 @@ from airflow import DAG from airflow.decorators import task from airflow.providers.ydb.hooks.ydb import YDBHook -from airflow.providers.ydb.operators.ydb import YDBExecuteQueryOperator, YDBScanQueryOperator +from airflow.providers.ydb.operators.ydb import YDBExecuteQueryOperator # [START ydb_operator_howto_guide] @@ -101,21 +101,12 @@ def populate_pet_table_via_bulk_upsert(): ) # [END ydb_operator_howto_guide_get_birth_date] - # [START ydb_operator_howto_guide_get_birth_date_scan] - get_birth_date_scan = YDBScanQueryOperator( - task_id="get_birth_date_scan", - sql="SELECT * FROM pet WHERE birth_date BETWEEN '{{params.begin_date}}' AND '{{params.end_date}}'", - params={"begin_date": "2020-01-01", "end_date": "2020-12-31"}, - ) - # [END ydb_operator_howto_guide_get_birth_date_scan] - ( create_pet_table >> populate_pet_table >> populate_pet_table_via_bulk_upsert() >> get_all_pets >> get_birth_date - >> get_birth_date_scan ) # [END ydb_operator_howto_guide] diff --git a/providers/tests/ydb/hooks/test_ydb.py b/providers/tests/ydb/hooks/test_ydb.py index f644fc88cd57..8ef5f9e998c3 100644 --- a/providers/tests/ydb/hooks/test_ydb.py +++ b/providers/tests/ydb/hooks/test_ydb.py @@ -27,14 +27,9 @@ def wait(*args, **kwargs): pass -class FakeSessionPoolImpl: - def __init__(self, driver): - self._driver = driver - - class FakeSessionPool: def __init__(self, driver): - self._pool_impl = FakeSessionPoolImpl(driver) + self._driver = driver class FakeYDBCursor: @@ -63,10 +58,8 @@ def rowcount(self): @patch("airflow.hooks.base.BaseHook.get_connection") @patch("ydb.Driver") -@patch("ydb.SessionPool") -@patch( - "airflow.providers.ydb.hooks._vendor.dbapi.connection.Connection._cursor_class", new_callable=PropertyMock -) +@patch("ydb.QuerySessionPool") +@patch("ydb_dbapi.Connection._cursor_cls", new_callable=PropertyMock) def test_execute(cursor_class, mock_session_pool, mock_driver, mock_get_connection): mock_get_connection.return_value = Connection( conn_type="ydb", diff --git a/providers/tests/ydb/operators/test_ydb.py b/providers/tests/ydb/operators/test_ydb.py index d9e9340b591c..1392253f743d 100644 --- a/providers/tests/ydb/operators/test_ydb.py +++ b/providers/tests/ydb/operators/test_ydb.py @@ -46,27 +46,24 @@ def test_sql_templating(create_task_instance_of_operator): class FakeDriver: + def __init__(self, *args): + self.table_client = FakeTableClient() + def wait(*args, **kwargs): pass -class FakeSessionPoolImpl: - def __init__(self, driver): - self._driver = driver - - class FakeTableClient: def __init__(self, *args): self.bulk_upsert_args = [] def bulk_upsert(self, table_path, rows, column_types, settings=None): - assert settings is None self.bulk_upsert_args.append((table_path, rows, column_types)) class FakeSessionPool: def __init__(self, driver): - self._pool_impl = FakeSessionPoolImpl(driver) + self._driver = driver class FakeYDBCursor: @@ -105,30 +102,21 @@ def setup_method(self): @patch("airflow.hooks.base.BaseHook.get_connection") @patch("ydb.Driver") - @patch("ydb.SessionPool") - @patch( - "airflow.providers.ydb.hooks._vendor.dbapi.connection.Connection._ydb_table_client_class", - new_callable=PropertyMock, - ) - @patch( - "airflow.providers.ydb.hooks._vendor.dbapi.connection.Connection._cursor_class", - new_callable=PropertyMock, - ) - def test_execute_query( - self, cursor_class, table_client_class, mock_session_pool, mock_driver, mock_get_connection - ): + @patch("ydb.QuerySessionPool") + @patch("ydb_dbapi.Connection._cursor_cls", new_callable=PropertyMock) + def test_execute_query(self, cursor_class, mock_session_pool, mock_driver, mock_get_connection): mock_get_connection.return_value = Connection( conn_type="ydb", host="localhost", extra={"database": "/my_db"} ) cursor_class.return_value = FakeYDBCursor - table_client_class.return_value = FakeTableClient driver = FakeDriver() mock_driver.return_value = driver session_pool = FakeSessionPool(driver) mock_session_pool.return_value = session_pool + context = {"ti": MagicMock()} operator = YDBExecuteQueryOperator( task_id="simple_sql", sql="select 987", is_ddl=False, handler=fetch_one_handler @@ -157,7 +145,7 @@ def test_execute_query( {"a": 888, "b": "world"}, ] hook.bulk_upsert("my_table", rows=rows, column_types=column_types) - assert len(session_pool._pool_impl._driver.table_client.bulk_upsert_args) == 1 - arg0 = session_pool._pool_impl._driver.table_client.bulk_upsert_args[0] + assert len(session_pool._driver.table_client.bulk_upsert_args) == 1 + arg0 = session_pool._driver.table_client.bulk_upsert_args[0] assert arg0[0] == "/my_db/my_table" assert len(arg0[1]) == 2