diff --git a/README.md b/README.md index a897fec..7d86090 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,27 @@ mindsdb: ``` dbt init ``` +- create a intergration with "database" materialization: +``` + {{ + config( + materialized='database', + engine='trino', + parameters={ + "user": env_var('TRINO_USER'), + "auth": "basic", + "http_scheme": "https", + "port": 443, + "password": env_var('TRINO_PASSWORD'), + "host": "trino.company.com", + "catalog": "hive", + "schema": "photorep_schema", + "with": "with (transactional = true)" + } + ) + }} +``` + - To create predictor add dbt model with "predictor" materialization: Name of the model is used as name of predictor. Parameters: @@ -66,6 +87,13 @@ Parameters: select * from stores ``` +- Other paramaters for time-series predictor: + - order_by - column that the time series will be order by. + - group_by - rows that make a partition + - window - the number [int] of rows to "look back" into when making a prediction + - horizon - keyword specifies the number of future predictions, default value is 1 + + - To apply predictor add dbt model with "table" materialization. It creates or replaces table in selected integration with results of predictor. Name of the model is used as name of the table to store prediction results. @@ -76,10 +104,12 @@ Parameters: - integration - name of used integration to get data from and save result to. In has to be created in mindsdb beforehand ``` - {{ config(materialized='table', predictor_name='TEST_PREDICTOR_NAME', integration='int1') }} - select a, bc from ddd where name > latest + {{ config(materialized='table', integration='int1') }} + select a, bc from ddd JOIN TEST_PREDICTOR_NAME where name > latest ``` +Notes: "predictor_name" has been removed from model configuration. Instead a JOIN sentence need to be set explicitly + ## Testing - Install dev requirements diff --git a/dbt/adapters/mindsdb/connections.py b/dbt/adapters/mindsdb/connections.py index 9c14fbf..68ca6d5 100644 --- a/dbt/adapters/mindsdb/connections.py +++ b/dbt/adapters/mindsdb/connections.py @@ -56,7 +56,8 @@ def open(cls, connection): port=credentials.port, username=credentials.username, password=credentials.password, - database=credentials.database + database=credentials.database, + buffered=True ) connection.state = 'open' connection.handle = handle diff --git a/dbt/adapters/mindsdb/relation.py b/dbt/adapters/mindsdb/relation.py new file mode 100644 index 0000000..48868f5 --- /dev/null +++ b/dbt/adapters/mindsdb/relation.py @@ -0,0 +1,20 @@ +from dataclasses import dataclass + +from dbt.adapters.base.relation import BaseRelation, Policy +from dbt.contracts.relation import ComponentName + + +@dataclass +class MindsdbQuotePolicy(Policy): + database: bool = False + schema: bool = False + identifier: bool = False + + +@dataclass(frozen=True, eq=False, repr=False) +class MindsdbRelation(BaseRelation): + quote_policy: MindsdbQuotePolicy = MindsdbQuotePolicy() + + # Overridden as Mindsdb converts relation identifiers to lowercase + def _is_exactish_match(self, field: ComponentName, value: str) -> bool: + return self.path.get_lowered_part(field) == value.lower() diff --git a/dbt/include/mindsdb/macros/adapters.sql b/dbt/include/mindsdb/macros/adapters.sql index d0f0f3a..ecdd31d 100644 --- a/dbt/include/mindsdb/macros/adapters.sql +++ b/dbt/include/mindsdb/macros/adapters.sql @@ -22,14 +22,12 @@ {% endmacro %} -{% macro apply_predictor_wrap(sql, predictor_name, destination_table) -%} +{% macro apply_predictor_wrap(sql, destination_table) -%} create or replace table {{ destination_table }} select * from ( {{ sql }} ) - join {{ predictor_name }} - {% endmacro %} @@ -39,15 +37,27 @@ {% endmacro %} -{% macro create_predictor_wrap(sql, predictor, integration, predict, predict_alias, using) -%} +{% macro create_predictor_wrap(sql, predictor, integration, predict, predict_alias, using, order_by, group_by, window, horizon) -%} CREATE PREDICTOR {{ predictor }} FROM {{ integration }} ( {{ sql }} ) PREDICT {{ predict }} {% if predict_alias is not none %} as {{predict_alias}} {% endif %} - {% if using is not none %} - USING - {{using}} - {% endif %} + {%- if using is not none %} + USING + {{using}} + {%- endif %} + {%- if order_by %} + ORDER BY {{order_by}} + {%- endif %} + {%- if group_by %} + GROUP BY {{group_by}} + {%- endif %} + {%- if window %} + WINDOW {{window}} + {%- endif %} + {%- if horizon %} + HORIZON {{horizon}} + {%- endif %} -{% endmacro %} \ No newline at end of file +{% endmacro %} diff --git a/dbt/include/mindsdb/macros/materialization/database.sql b/dbt/include/mindsdb/macros/materialization/database.sql new file mode 100644 index 0000000..5238372 --- /dev/null +++ b/dbt/include/mindsdb/macros/materialization/database.sql @@ -0,0 +1,45 @@ +{% materialization database, adapter='mindsdb' %} + {%- set database = model['alias'] -%} + {%- set engine = config.get('engine') -%} + {%- set prefix = config.get('prefix') -%} + {%- set parameters = config.get('parameters') -%} + + {% if prefix is none %} + {%- set connector = database %} + {% else %} + {%- set connector = prefix ~ "_" ~ database %} + {% endif %} + + + -- build model + + -- WA for https://github.com/mindsdb/mindsdb/issues/4152 + {%- call statement('tables', fetch_result = True) -%} + SHOW DATABASES + {%- endcall -%} + {%- set tables = load_result('tables') -%} + {%- set tables_data = tables['data'] -%} + + {%- set found_table = False -%} + {% for item in tables_data %} + {% if item[0] == connector %} + {%- call statement('main') -%} + DROP DATABASE IF EXISTS {{ connector }} + {%- endcall -%} + {% endif %} + {% endfor %} + + -- end WA + + + {%- call statement('main') -%} + CREATE DATABASE {{ connector }} WITH ENGINE='{{engine}}', + PARAMETERS={{parameters}} + {%- endcall -%} + + {{ log("Create mindsdb database(integration) \"" ~ connector ~ "\" with engine \"" ~ engine ~ "\"", True) }} + + -- Return the relations created in this materialization + {{ return({'relations': []}) }} +{%- endmaterialization -%} + diff --git a/dbt/include/mindsdb/macros/materialization/predictor.sql b/dbt/include/mindsdb/macros/materialization/predictor.sql index 18c7b76..991a452 100644 --- a/dbt/include/mindsdb/macros/materialization/predictor.sql +++ b/dbt/include/mindsdb/macros/materialization/predictor.sql @@ -5,6 +5,10 @@ {%- set integration = config.get('integration') -%} {%- set predict = config.get('predict') -%} {%- set predict_alias = config.get('predict_alias') -%} + {%- set order_by = config.get('order_by', none) -%} + {%- set group_by = config.get('group_by', none) -%} + {%- set window = config.get('window', none) -%} + {%- set horizon = config.get('horizon', none) -%} {%- set using = config.get('using') -%} {% if integration is none %} @@ -46,7 +50,8 @@ integration, predict, predict_alias, - using_str )}} + using_str, + order_by, group_by, window, horizon )}} {%- endcall -%} diff --git a/dbt/include/mindsdb/macros/materialization/table.sql b/dbt/include/mindsdb/macros/materialization/table.sql index 1dc3947..c16bb46 100644 --- a/dbt/include/mindsdb/macros/materialization/table.sql +++ b/dbt/include/mindsdb/macros/materialization/table.sql @@ -3,7 +3,6 @@ {%- set identifier = model['alias'] -%} {%- set integration = config.get('integration') -%} - {%- set predictor_name = config.get('predictor_name') -%} {% if integration is none %} {{ exceptions.raise_compiler_error('Integration is not set') }} @@ -14,9 +13,10 @@ -- path {% for item in identifier.split('.') -%} - {{ target_relation_list.append('`{}`'.format(item)) }} + {{ target_relation_list.append('`{}`'.format(item)) }} {%- endfor %} + -- final {% set target_relation = target_relation_list | join('.') %} -- ... setup database ... @@ -24,7 +24,7 @@ -- build model {% call statement('main') %} - {{ apply_predictor_wrap(sql, predictor_name, target_relation) }} + {{ apply_predictor_wrap(sql, target_relation) }} {% endcall %} -- ... run post-hooks ... @@ -34,4 +34,4 @@ {{ return({'relations': []}) }} -{%- endmaterialization -%} \ No newline at end of file +{%- endmaterialization -%} diff --git a/dbt/include/mindsdb/macros/schema.sql b/dbt/include/mindsdb/macros/schema.sql new file mode 100644 index 0000000..7e0ef01 --- /dev/null +++ b/dbt/include/mindsdb/macros/schema.sql @@ -0,0 +1,12 @@ +{% macro mindsdb__create_schema(relation) -%} + {%- call statement('create_schema') -%} + SELECT 1 + {% endcall %} +{% endmacro %} + + +{% macro mindsdb__drop_schema(relation) -%} + {%- call statement('drop_schema') -%} + drop database if exists {{ relation.without_identifier() }} + {% endcall %} +{% endmacro %} diff --git a/setup.py b/setup.py index 8eb835d..0439529 100644 --- a/setup.py +++ b/setup.py @@ -20,7 +20,7 @@ packages=find_namespace_packages(include=['dbt', 'dbt.*']), include_package_data=True, install_requires=[ - "dbt-core==1.0.1", + "dbt-core>=1.0.1", "mysql-connector-python~=8.0.22", ] ) diff --git a/tests/unit/test_mindsdb_adapter.py b/tests/unit/test_mindsdb_adapter.py index 52728ad..93287de 100644 --- a/tests/unit/test_mindsdb_adapter.py +++ b/tests/unit/test_mindsdb_adapter.py @@ -173,16 +173,15 @@ def test_create_predictor(self): def test_prediction(self): model = ''' - {{ config(materialized='table', predictor_name='TEST_PREDICTOR_NAME', integration='int1') }} - select a, bc from ddd where name > latest + {{ config(materialized='table', integration='int1') }} + select a, bc from ddd JOIN TEST_PREDICTOR_NAME where name > latest ''' expected = ''' create or replace table `int1`.`schem`.`predict` select * from ( - select a, bc from ddd where name > latest + select a, bc from ddd JOIN TEST_PREDICTOR_NAME where name > latest ) - join TEST_PREDICTOR_NAME ''' expected = self.sql_line_format(expected) @@ -191,3 +190,48 @@ def test_prediction(self): queries = self.get_dbt_queries() assert expected in queries + + + + def test_create_database(self): + model = ''' + {{ + config( + materialized='database', + engine='trino', + parameters={ + "user": "user", + "auth": "basic", + "http_scheme": "https", + "port": 443, + "password": "password", + "host": "localhost", + "catalog": "catalog", + "schema": "schema", + "with": "with (transactional = true)" + } + ) + }} + ''' + + expected1 = 'SHOW DATABASES' + expected2 = 'DROP DATABASE IF EXISTS new_database' + + expected3 = ''' + CREATE DATABASE new_database WITH ENGINE='trino', + PARAMETERS={'user': 'user', 'auth': 'basic', 'http_scheme': 'https', 'port': 443, 'password': 'password', 'host': 'localhost', 'catalog': 'catalog', 'schema': 'schema', 'with': 'with (transactional = true)'} + ''' + + expected3 = self.sql_line_format(expected3) + + self.add_model('new_database', model) + queries = self.get_dbt_queries() + + # queries exist + assert expected1 in queries + assert expected2 not in queries + assert expected3 in queries + + # right queries order + assert queries.index(expected1) < queries.index(expected3) +