Skip to content
This repository has been archived by the owner on Jan 2, 2024. It is now read-only.

Commit

Permalink
feat: add append_query_builder attribute to SQLDataNode and open appe…
Browse files Browse the repository at this point in the history
…nd() method
  • Loading branch information
trgiangdo committed Nov 10, 2023
1 parent 0ea22da commit e142305
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ def _check_callable(self, data_node_config_id: str, data_node_config: DataNodeCo
],
DataNodeConfig._STORAGE_TYPE_VALUE_SQL: [
DataNodeConfig._REQUIRED_WRITE_QUERY_BUILDER_SQL_PROPERTY,
DataNodeConfig._OPTIONAL_APPEND_QUERY_BUILDER_SQL_PROPERTY,
],
}

Expand Down
5 changes: 5 additions & 0 deletions src/taipy/core/config/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@
"type": "string",
"taipy_function": true
},
"append_query_builder": {
"description": "storage_type: sql specific. A callable function that takes in the data as an input parameter and returns a list of SQL queries to be executed when the append data node method is called.",
"type": "string",
"taipy_function": true
},
"collection_name ": {
"description": "storage_type: mongo_collection specific.",
"type": "string"
Expand Down
9 changes: 8 additions & 1 deletion src/taipy/core/config/data_node_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class DataNodeConfig(Section):
# SQL
_REQUIRED_READ_QUERY_SQL_PROPERTY = "read_query"
_REQUIRED_WRITE_QUERY_BUILDER_SQL_PROPERTY = "write_query_builder"
_OPTIONAL_APPEND_QUERY_BUILDER_SQL_PROPERTY = "append_query_builder"
# MONGO
_REQUIRED_DB_NAME_MONGO_PROPERTY = "db_name"
_REQUIRED_COLLECTION_NAME_MONGO_PROPERTY = "collection_name"
Expand Down Expand Up @@ -207,6 +208,7 @@ class DataNodeConfig(Section):
_OPTIONAL_HOST_SQL_PROPERTY: "localhost",
_OPTIONAL_PORT_SQL_PROPERTY: 1433,
_OPTIONAL_DRIVER_SQL_PROPERTY: "",
_OPTIONAL_APPEND_QUERY_BUILDER_SQL_PROPERTY: None,
_OPTIONAL_FOLDER_PATH_SQLITE_PROPERTY: None,
_OPTIONAL_FILE_EXTENSION_SQLITE_PROPERTY: ".db",
_OPTIONAL_DB_EXTRA_ARGS_SQL_PROPERTY: None,
Expand Down Expand Up @@ -867,6 +869,7 @@ def _configure_sql(
db_engine: str,
read_query: str,
write_query_builder: Callable,
append_query_builder: Optional[Callable] = None,
db_username: Optional[str] = None,
db_password: Optional[str] = None,
db_host: Optional[str] = None,
Expand All @@ -889,7 +892,9 @@ def _configure_sql(
or *"postgresql"*.
read_query (str): The SQL query string used to read the data from the database.
write_query_builder (Callable): A callback function that takes the data as an input parameter
and returns a list of SQL queries.
and returns a list of SQL queries to be executed when writing data to the data node.
append_query_builder (Optional[Callable]): A callback function that takes the data as an input parameter
and returns a list of SQL queries to be executed when appending data to the data node.
db_username (Optional[str]): The database username. Required by the *"mssql"*, *"mysql"*, and
*"postgresql"* engines.
db_password (Optional[str]): The database password. Required by the *"mssql"*, *"mysql"*, and
Expand Down Expand Up @@ -927,6 +932,8 @@ def _configure_sql(
}
)

if append_query_builder is not None:
properties[cls._OPTIONAL_APPEND_QUERY_BUILDER_SQL_PROPERTY] = append_query_builder
if db_username is not None:
properties[cls._OPTIONAL_DB_USERNAME_SQL_PROPERTY] = db_username
if db_password is not None:
Expand Down
16 changes: 16 additions & 0 deletions src/taipy/core/data/_abstract_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,22 @@ def _get_read_query(self, operators: Optional[Union[List, Tuple]] = None, join_o
def _get_base_read_query(self) -> str:
raise NotImplementedError

def _append(self, data) -> None:
engine = self._get_engine()
with engine.connect() as connection:
with connection.begin() as transaction:
try:
self._do_append(data, engine, connection)
except Exception as e:
transaction.rollback()
raise e
else:
transaction.commit()

@abstractmethod
def _do_append(self, data, engine, connection) -> None:
raise NotImplementedError

def _write(self, data) -> None:
"""Check data against a collection of types to handle insertion on the database."""
engine = self._get_engine()
Expand Down
25 changes: 21 additions & 4 deletions src/taipy/core/data/_data_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class _DataNodeConverter(_AbstractConverter):
_EXPOSED_TYPE_KEY = "exposed_type"
__WRITE_QUERY_BUILDER_NAME_KEY = "write_query_builder_name"
__WRITE_QUERY_BUILDER_MODULE_KEY = "write_query_builder_module"
__APPEND_QUERY_BUILDER_NAME_KEY = "append_query_builder_name"
__APPEND_QUERY_BUILDER_MODULE_KEY = "append_query_builder_module"
# TODO: This limits the valid string to only the ones provided by the Converter.
# While in practice, each data nodes might have different exposed type possibilities.
# The previous implementation used tabular datanode but it's no longer suitable so
Expand Down Expand Up @@ -71,11 +73,16 @@ def __serialize_json_dn_properties(cls, datanode_properties: dict):

@classmethod
def __serialize_sql_dn_properties(cls, datanode_properties: dict) -> dict:
query_builder = datanode_properties.get(SQLDataNode._WRITE_QUERY_BUILDER_KEY)
datanode_properties[cls.__WRITE_QUERY_BUILDER_NAME_KEY] = query_builder.__name__ if query_builder else None
datanode_properties[cls.__WRITE_QUERY_BUILDER_MODULE_KEY] = query_builder.__module__ if query_builder else None
write_qb = datanode_properties.get(SQLDataNode._WRITE_QUERY_BUILDER_KEY)
datanode_properties[cls.__WRITE_QUERY_BUILDER_NAME_KEY] = write_qb.__name__ if write_qb else None
datanode_properties[cls.__WRITE_QUERY_BUILDER_MODULE_KEY] = write_qb.__module__ if write_qb else None
datanode_properties.pop(SQLDataNode._WRITE_QUERY_BUILDER_KEY, None)

append_qb = datanode_properties.get(SQLDataNode._APPEND_QUERY_BUILDER_KEY)
datanode_properties[cls.__APPEND_QUERY_BUILDER_NAME_KEY] = append_qb.__name__ if append_qb else None
datanode_properties[cls.__APPEND_QUERY_BUILDER_MODULE_KEY] = append_qb.__module__ if append_qb else None
datanode_properties.pop(SQLDataNode._APPEND_QUERY_BUILDER_KEY, None)

return datanode_properties

@classmethod
Expand Down Expand Up @@ -209,7 +216,6 @@ def __deserialize_json_dn_properties(cls, datanode_model_properties: dict) -> di

@classmethod
def __deserialize_sql_dn_model_properties(cls, datanode_model_properties: dict) -> dict:

if datanode_model_properties[cls.__WRITE_QUERY_BUILDER_MODULE_KEY]:
datanode_model_properties[SQLDataNode._WRITE_QUERY_BUILDER_KEY] = _load_fct(
datanode_model_properties[cls.__WRITE_QUERY_BUILDER_MODULE_KEY],
Expand All @@ -221,6 +227,17 @@ def __deserialize_sql_dn_model_properties(cls, datanode_model_properties: dict)
del datanode_model_properties[cls.__WRITE_QUERY_BUILDER_NAME_KEY]
del datanode_model_properties[cls.__WRITE_QUERY_BUILDER_MODULE_KEY]

if datanode_model_properties[cls.__APPEND_QUERY_BUILDER_MODULE_KEY]:
datanode_model_properties[SQLDataNode._APPEND_QUERY_BUILDER_KEY] = _load_fct(
datanode_model_properties[cls.__APPEND_QUERY_BUILDER_MODULE_KEY],
datanode_model_properties[cls.__APPEND_QUERY_BUILDER_NAME_KEY],
)
else:
datanode_model_properties[SQLDataNode._APPEND_QUERY_BUILDER_KEY] = None

del datanode_model_properties[cls.__APPEND_QUERY_BUILDER_NAME_KEY]
del datanode_model_properties[cls.__APPEND_QUERY_BUILDER_MODULE_KEY]

return datanode_model_properties

@classmethod
Expand Down
13 changes: 12 additions & 1 deletion src/taipy/core/data/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ class SQLDataNode(_AbstractSQLDataNode):
_"postgresql"_.
- _"read_query"_ `(str)`: The SQL query string used to read the data from the database.
- _"write_query_builder"_ `(Callable)`: A callback function that takes the data as an input parameter and
returns a list of SQL queries.
returns a list of SQL queries to be executed when writing data to the data node.
- _"append_query_builder"_ `(Callable)`: A callback function that takes the data as an input parameter and
returns a list of SQL queries to be executed when appending data to the data node.
- _"db_username"_ `(str)`: The database username.
- _"db_password"_ `(str)`: The database password.
- _"db_host"_ `(str)`: The database host. The default value is _"localhost"_.
Expand All @@ -72,6 +74,7 @@ class SQLDataNode(_AbstractSQLDataNode):
__STORAGE_TYPE = "sql"
__READ_QUERY_KEY = "read_query"
_WRITE_QUERY_BUILDER_KEY = "write_query_builder"
_APPEND_QUERY_BUILDER_KEY = "append_query_builder"

def __init__(
self,
Expand Down Expand Up @@ -116,6 +119,7 @@ def __init__(
{
self.__READ_QUERY_KEY,
self._WRITE_QUERY_BUILDER_KEY,
self._APPEND_QUERY_BUILDER_KEY,
}
)

Expand All @@ -126,8 +130,15 @@ def storage_type(cls) -> str:
def _get_base_read_query(self) -> str:
return self.properties.get(self.__READ_QUERY_KEY)

def _do_append(self, data, engine, connection) -> None:
queries = self.properties.get(self._APPEND_QUERY_BUILDER_KEY)(data)
self.__execute_queries(queries, connection)

def _do_write(self, data, engine, connection) -> None:
queries = self.properties.get(self._WRITE_QUERY_BUILDER_KEY)(data)
self.__execute_queries(queries, connection)

def __execute_queries(self, queries: List[str], connection) -> None:
if not isinstance(queries, list):
queries = [queries]
for query in queries:
Expand Down
13 changes: 2 additions & 11 deletions src/taipy/core/data/sql_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,8 @@ def storage_type(cls) -> str:
def _get_base_read_query(self) -> str:
return f"SELECT * FROM {self.properties[self.__TABLE_KEY]}"

def _append(self, data) -> None:
engine = self._get_engine()
with engine.connect() as connection:
with connection.begin() as transaction:
try:
self.__insert_data(data, engine, connection)
except Exception as e:
transaction.rollback()
raise e
else:
transaction.commit()
def _do_append(self, data, engine, connection) -> None:
self.__insert_data(data, engine, connection)

def _do_write(self, data, engine, connection) -> None:
self.__insert_data(data, engine, connection, delete_table=True)
Expand Down
12 changes: 9 additions & 3 deletions tests/core/config/checkers/test_data_node_config_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,16 +440,22 @@ def test_check_callable_properties(self, caplog):
"db_engine": "foo",
"read_query": "foo",
"write_query_builder": 1,
"append_query_builder": 2,
}
with pytest.raises(SystemExit):
Config._collector = IssueCollector()
Config.check()
assert len(Config._collector.errors) == 1
expected_error_message = (
assert len(Config._collector.errors) == 2
expected_error_message_1 = (
"`write_query_builder` of DataNodeConfig `new` must be populated with a Callable function."
" Current value of property `write_query_builder` is 1."
)
assert expected_error_message in caplog.text
assert expected_error_message_1 in caplog.text
expected_error_message_2 = (
"`append_query_builder` of DataNodeConfig `new` must be populated with a Callable function."
" Current value of property `append_query_builder` is 2."
)
assert expected_error_message_2 in caplog.text

config._sections[DataNodeConfig.name]["new"].storage_type = "generic"
config._sections[DataNodeConfig.name]["new"].properties = {"write_fct": 12}
Expand Down
5 changes: 3 additions & 2 deletions tests/core/config/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,15 @@ def test_configure_sql_table_data_node(self):
assert len(Config.data_nodes) == 2

def test_configure_sql_data_node(self):
a, b, c, d, e, f, g, h, i, j, extra_args, exposed_type, scope, vp, k = (
a, b, c, d, e, f, g, h, i, j, k, extra_args, exposed_type, scope, vp, k = (
"foo",
"user",
"pwd",
"db",
"engine",
"read_query",
"write_query_builder",
"append_query_builder",
"port",
"host",
"driver",
Expand All @@ -84,7 +85,7 @@ def test_configure_sql_data_node(self):
timedelta(1),
"qux",
)
Config.configure_sql_data_node(a, b, c, d, e, f, g, h, i, j, extra_args, exposed_type, scope, vp, property=k)
Config.configure_sql_data_node(a, b, c, d, e, f, g, h, i, j, k, extra_args, exposed_type, scope, vp, property=k)
assert len(Config.data_nodes) == 2

def test_configure_mongo_data_node(self):
Expand Down
4 changes: 4 additions & 0 deletions tests/core/config/test_configure_default_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ def query_builder():
db_engine="mssql",
read_query="SELECT * FROM default_table",
write_query_builder=query_builder,
append_query_builder=query_builder,
db_port=1010,
db_host="default_host",
db_driver="default server",
Expand All @@ -449,6 +450,7 @@ def query_builder():
assert dn1.db_engine == "mssql"
assert dn1.read_query == "SELECT * FROM default_table"
assert dn1.write_query_builder == query_builder
assert dn1.append_query_builder == query_builder
assert dn1.db_port == 1010
assert dn1.db_host == "default_host"
assert dn1.db_driver == "default server"
Expand All @@ -468,6 +470,7 @@ def query_builder():
assert dn2.db_engine == "mssql"
assert dn2.read_query == "SELECT * FROM table_2"
assert dn2.write_query_builder == query_builder
assert dn2.append_query_builder == query_builder
assert dn2.db_port == 2020
assert dn2.db_host == "host_2"
assert dn2.db_driver == "default server"
Expand Down Expand Up @@ -495,6 +498,7 @@ def query_builder():
assert dn3.db_engine == "postgresql"
assert dn3.read_query == "SELECT * FROM table_3"
assert dn3.write_query_builder == query_builder
assert dn3.append_query_builder == query_builder
assert dn3.db_port == 1010
assert dn3.db_host == "default_host"
assert dn3.db_driver == "default server"
Expand Down
53 changes: 53 additions & 0 deletions tests/core/data/test_sql_data_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ def my_write_query_builder_with_modin(data: modin_pd.DataFrame):
return ["DELETE FROM example", ("INSERT INTO example VALUES (:foo, :bar)", insert_data)]


def my_append_query_builder_with_pandas(data: pd.DataFrame):
insert_data = data.to_dict("records")
return [("INSERT INTO example VALUES (:foo, :bar)", insert_data)]


def my_append_query_builder_with_modin(data: modin_pd.DataFrame):
insert_data = data.to_dict("records")
return [("INSERT INTO example VALUES (:foo, :bar)", insert_data)]


def single_write_query_builder(data):
return "DELETE FROM example"

Expand Down Expand Up @@ -306,6 +316,49 @@ def test_sqlite_read_file_with_different_extension(self, tmp_sqlite_path, reques
data = dn.read()
assert data.equals(pd.DataFrame([{"foo": 1, "bar": 2}, {"foo": 3, "bar": 4}]))

def test_sqlite_append_pandas(self, tmp_sqlite_sqlite3_file_path):
folder_path, db_name, file_extension = tmp_sqlite_sqlite3_file_path
properties = {
"db_engine": "sqlite",
"read_query": "SELECT * FROM example",
"write_query_builder": my_write_query_builder_with_pandas,
"append_query_builder": my_append_query_builder_with_pandas,
"db_name": db_name,
"sqlite_folder_path": folder_path,
"sqlite_file_extension": file_extension,
}

dn = SQLDataNode("sqlite_dn", Scope.SCENARIO, properties=properties)
original_data = pd.DataFrame([{"foo": 1, "bar": 2}, {"foo": 3, "bar": 4}])
data = dn.read()
assert_frame_equal(data, original_data)

append_data_1 = pd.DataFrame([{"foo": 5, "bar": 6}, {"foo": 7, "bar": 8}])
dn.append(append_data_1)
assert_frame_equal(dn.read(), pd.concat([original_data, append_data_1]).reset_index(drop=True))

def test_sqlite_append_modin(self, tmp_sqlite_sqlite3_file_path):
folder_path, db_name, file_extension = tmp_sqlite_sqlite3_file_path
properties = {
"db_engine": "sqlite",
"read_query": "SELECT * FROM example",
"write_query_builder": my_write_query_builder_with_pandas,
"append_query_builder": my_append_query_builder_with_pandas,
"db_name": db_name,
"sqlite_folder_path": folder_path,
"sqlite_file_extension": file_extension,
"exposed_type": "modin",
}

dn = SQLDataNode("sqlite_dn", Scope.SCENARIO, properties=properties)
original_data = modin_pd.DataFrame([{"foo": 1, "bar": 2}, {"foo": 3, "bar": 4}])
data = dn.read()
df_equals(data, original_data)

append_data_1 = modin_pd.DataFrame([{"foo": 5, "bar": 6}, {"foo": 7, "bar": 8}])
dn.append(append_data_1)
df_equals(dn.read(), modin_pd.concat([original_data, append_data_1]).reset_index(drop=True))

def test_filter_pandas_exposed_type(self, tmp_sqlite_sqlite3_file_path):
folder_path, db_name, file_extension = tmp_sqlite_sqlite3_file_path
properties = {
Expand Down
20 changes: 20 additions & 0 deletions tests/core/data/test_sql_table_data_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,26 @@ def test_sqlite_append_pandas(self, tmp_sqlite_sqlite3_file_path):
dn.append(append_data_1)
assert_frame_equal(dn.read(), pd.concat([original_data, append_data_1]).reset_index(drop=True))

def test_sqlite_append_modin(self, tmp_sqlite_sqlite3_file_path):
folder_path, db_name, file_extension = tmp_sqlite_sqlite3_file_path
properties = {
"db_engine": "sqlite",
"table_name": "example",
"db_name": db_name,
"sqlite_folder_path": folder_path,
"sqlite_file_extension": file_extension,
"exposed_type": "modin",
}

dn = SQLTableDataNode("sqlite_dn", Scope.SCENARIO, properties=properties)
original_data = modin_pd.DataFrame([{"foo": 1, "bar": 2}, {"foo": 3, "bar": 4}])
data = dn.read()
df_equals(data, original_data)

append_data_1 = modin_pd.DataFrame([{"foo": 5, "bar": 6}, {"foo": 7, "bar": 8}])
dn.append(append_data_1)
df_equals(dn.read(), modin_pd.concat([original_data, append_data_1]).reset_index(drop=True))

def test_filter_pandas_exposed_type(self, tmp_sqlite_sqlite3_file_path):
folder_path, db_name, file_extension = tmp_sqlite_sqlite3_file_path
properties = {
Expand Down

0 comments on commit e142305

Please sign in to comment.