Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update predict result model due to mindsdb changes #16

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,27 @@ mindsdb:
```
dbt init <project_name>
```
- To create an integration, add a dbt model with the "database" materialization:
```
{{
config(
materialized='database',
engine='trino',
parameters={
"user": env_var('TRINO_USER'),
"auth": "basic",
"http_scheme": "https",
"port": 443,
"password": env_var('TRINO_PASSWORD'),
"host": "trino.company.com",
"catalog": "hive",
"schema": "photorep_schema",
"with": "with (transactional = true)"
}
)
}}
```

- To create predictor add dbt model with "predictor" materialization:
Name of the model is used as name of predictor.
Parameters:
Expand All @@ -66,6 +87,13 @@ Parameters:
select * from stores
```

- Other parameters for a time-series predictor:
- order_by - column that the time series will be ordered by.
- group_by - column(s) used to group the rows that make a partition
- window - the number [int] of rows to "look back" into when making a prediction
- horizon - the number of future predictions to make; the default value is 1


- To apply predictor add dbt model with "table" materialization.
It creates or replaces table in selected integration with results of predictor.
Name of the model is used as name of the table to store prediction results.
Expand All @@ -76,10 +104,12 @@ Parameters:
- integration - name of used integration to get data from and save result to.
It has to be created in mindsdb beforehand
```
{{ config(materialized='table', predictor_name='TEST_PREDICTOR_NAME', integration='int1') }}
select a, bc from ddd where name > latest
{{ config(materialized='table', integration='int1') }}
select a, bc from ddd JOIN TEST_PREDICTOR_NAME where name > latest
```

Note: "predictor_name" has been removed from the model configuration. Instead, the JOIN with the predictor needs to be written explicitly in the model's SQL.

## Testing

- Install dev requirements
Expand Down
3 changes: 2 additions & 1 deletion dbt/adapters/mindsdb/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ def open(cls, connection):
port=credentials.port,
username=credentials.username,
password=credentials.password,
database=credentials.database
database=credentials.database,
buffered=True
)
connection.state = 'open'
connection.handle = handle
Expand Down
20 changes: 20 additions & 0 deletions dbt/adapters/mindsdb/relation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from dataclasses import dataclass, field

from dbt.adapters.base.relation import BaseRelation, Policy
from dbt.contracts.relation import ComponentName


@dataclass
class MindsdbQuotePolicy(Policy):
    """Quoting policy for MindsDB relations.

    Every component flag defaults to ``False``, i.e. neither the database,
    the schema nor the identifier part of a relation is quoted by default.
    """

    # One flag per relation component; all off by default.
    database: bool = False
    schema: bool = False
    identifier: bool = False


@dataclass(frozen=True, eq=False, repr=False)
class MindsdbRelation(BaseRelation):
    """Relation type for the MindsDB adapter.

    Uses :class:`MindsdbQuotePolicy` as its quote policy and compares
    relation components case-insensitively.
    """

    # A class-level ``MindsdbQuotePolicy()`` instance is an unhashable default
    # (non-frozen dataclass with eq=True), which ``dataclasses`` rejects on
    # Python 3.11+ ("mutable default ... use default_factory"). A factory
    # also gives every relation its own policy object.
    quote_policy: MindsdbQuotePolicy = field(
        default_factory=lambda: MindsdbQuotePolicy()
    )

    # Overridden as Mindsdb converts relation identifiers to lowercase
    def _is_exactish_match(self, field: ComponentName, value: str) -> bool:
        """Return True when the lowered ``field`` part of this relation's
        path equals ``value`` lowered."""
        return self.path.get_lowered_part(field) == value.lower()
28 changes: 19 additions & 9 deletions dbt/include/mindsdb/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@
{% endmacro %}


{% macro apply_predictor_wrap(sql, predictor_name, destination_table) -%}
{% macro apply_predictor_wrap(sql, destination_table) -%}

create or replace table {{ destination_table }}
select * from (
{{ sql }}
)
join {{ predictor_name }}

{% endmacro %}


Expand All @@ -39,15 +37,27 @@

{% endmacro %}

{% macro create_predictor_wrap(sql, predictor, integration, predict, predict_alias, using) -%}
{% macro create_predictor_wrap(sql, predictor, integration, predict, predict_alias, using, order_by, group_by, window, horizon) -%}

CREATE PREDICTOR {{ predictor }}
FROM {{ integration }} (
{{ sql }}
) PREDICT {{ predict }} {% if predict_alias is not none %} as {{predict_alias}} {% endif %}
{% if using is not none %}
USING
{{using}}
{% endif %}
{%- if using is not none %}
USING
{{using}}
{%- endif %}
{%- if order_by %}
ORDER BY {{order_by}}
{%- endif %}
{%- if group_by %}
GROUP BY {{group_by}}
{%- endif %}
{%- if window %}
WINDOW {{window}}
{%- endif %}
{%- if horizon %}
HORIZON {{horizon}}
{%- endif %}

{% endmacro %}
{% endmacro %}
45 changes: 45 additions & 0 deletions dbt/include/mindsdb/macros/materialization/database.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{% materialization database, adapter='mindsdb' %}
  {#-
    Creates a MindsDB database (integration) named after the model alias,
    optionally prefixed with "<prefix>_".

    Config:
      engine     - MindsDB engine name (required).
      prefix     - optional prefix joined to the model alias with "_".
      parameters - optional connection parameters, rendered as-is after PARAMETERS=.
  -#}
  {%- set database = model['alias'] -%}
  {%- set engine = config.get('engine') -%}
  {%- set prefix = config.get('prefix') -%}
  {%- set parameters = config.get('parameters') -%}

  {#- Fail early instead of emitting ENGINE='None' (same style as the table materialization's integration check). -#}
  {% if engine is none %}
    {{ exceptions.raise_compiler_error('Engine is not set') }}
  {% endif %}

  {%- set connector = database if prefix is none else prefix ~ "_" ~ database -%}

  {#- build model -#}

  {#- WA for https://github.com/mindsdb/mindsdb/issues/4152:
      drop the database only when SHOW DATABASES actually lists it. -#}
  {%- call statement('tables', fetch_result = True) -%}
    SHOW DATABASES
  {%- endcall -%}
  {%- set existing_databases = load_result('tables')['data'] -%}

  {% for item in existing_databases %}
    {% if item[0] == connector %}
      {%- call statement('main') -%}
        DROP DATABASE IF EXISTS {{ connector }}
      {%- endcall -%}
    {% endif %}
  {% endfor %}
  {#- end WA -#}

  {#- PARAMETERS is only emitted when configured, so a model without parameters does not render PARAMETERS=None. -#}
  {%- call statement('main') -%}
    CREATE DATABASE {{ connector }} WITH ENGINE='{{engine}}'
    {%- if parameters is not none -%}
    ,
    PARAMETERS={{parameters}}
    {%- endif %}
  {%- endcall -%}

  {{ log("Create mindsdb database(integration) \"" ~ connector ~ "\" with engine \"" ~ engine ~ "\"", True) }}

  {#- Return the relations created in this materialization -#}
  {{ return({'relations': []}) }}
{%- endmaterialization -%}

7 changes: 6 additions & 1 deletion dbt/include/mindsdb/macros/materialization/predictor.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
{%- set integration = config.get('integration') -%}
{%- set predict = config.get('predict') -%}
{%- set predict_alias = config.get('predict_alias') -%}
{%- set order_by = config.get('order_by', none) -%}
{%- set group_by = config.get('group_by', none) -%}
{%- set window = config.get('window', none) -%}
{%- set horizon = config.get('horizon', none) -%}
{%- set using = config.get('using') -%}

{% if integration is none %}
Expand Down Expand Up @@ -46,7 +50,8 @@
integration,
predict,
predict_alias,
using_str )}}
using_str,
order_by, group_by, window, horizon )}}
{%- endcall -%}


Expand Down
8 changes: 4 additions & 4 deletions dbt/include/mindsdb/macros/materialization/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

{%- set identifier = model['alias'] -%}
{%- set integration = config.get('integration') -%}
{%- set predictor_name = config.get('predictor_name') -%}

{% if integration is none %}
{{ exceptions.raise_compiler_error('Integration is not set') }}
Expand All @@ -14,17 +13,18 @@

-- path
{% for item in identifier.split('.') -%}
{{ target_relation_list.append('`{}`'.format(item)) }}
{{ target_relation_list.append('`{}`'.format(item)) }}
{%- endfor %}

-- final
{% set target_relation = target_relation_list | join('.') %}

-- ... setup database ...
-- ... run pre-hooks...

-- build model
{% call statement('main') %}
{{ apply_predictor_wrap(sql, predictor_name, target_relation) }}
{{ apply_predictor_wrap(sql, target_relation) }}
{% endcall %}

-- ... run post-hooks ...
Expand All @@ -34,4 +34,4 @@

{{ return({'relations': []}) }}

{%- endmaterialization -%}
{%- endmaterialization -%}
12 changes: 12 additions & 0 deletions dbt/include/mindsdb/macros/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{% macro mindsdb__create_schema(relation) -%}
{#- Runs a placeholder `SELECT 1` under the statement name 'create_schema'; `relation` is not used.
    NOTE(review): presumably schemas are not created through dbt for MindsDB, so this is a deliberate no-op - confirm. -#}
{%- call statement('create_schema') -%}
SELECT 1
{% endcall %}
{% endmacro %}


{% macro mindsdb__drop_schema(relation) -%}
{#- Issues `drop database if exists` for `relation` with its identifier part removed,
    under the statement name 'drop_schema'. -#}
{%- call statement('drop_schema') -%}
drop database if exists {{ relation.without_identifier() }}
{% endcall %}
{% endmacro %}
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
packages=find_namespace_packages(include=['dbt', 'dbt.*']),
include_package_data=True,
install_requires=[
"dbt-core==1.0.1",
"dbt-core>=1.0.1",
"mysql-connector-python~=8.0.22",
]
)
52 changes: 48 additions & 4 deletions tests/unit/test_mindsdb_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,16 +173,15 @@ def test_create_predictor(self):
def test_prediction(self):

model = '''
{{ config(materialized='table', predictor_name='TEST_PREDICTOR_NAME', integration='int1') }}
select a, bc from ddd where name > latest
{{ config(materialized='table', integration='int1') }}
select a, bc from ddd JOIN TEST_PREDICTOR_NAME where name > latest
'''

expected = '''
create or replace table `int1`.`schem`.`predict`
select * from (
select a, bc from ddd where name > latest
select a, bc from ddd JOIN TEST_PREDICTOR_NAME where name > latest
)
join TEST_PREDICTOR_NAME
'''

expected = self.sql_line_format(expected)
Expand All @@ -191,3 +190,48 @@ def test_prediction(self):
queries = self.get_dbt_queries()

assert expected in queries



def test_create_database(self):
# Checks the queries produced for a model that uses the "database"
# materialization with engine 'trino' and a dict of connection parameters.
model = '''
{{
config(
materialized='database',
engine='trino',
parameters={
"user": "user",
"auth": "basic",
"http_scheme": "https",
"port": 443,
"password": "password",
"host": "localhost",
"catalog": "catalog",
"schema": "schema",
"with": "with (transactional = true)"
}
)
}}
'''

# The materialization lists databases first and only drops the target
# when it is already listed.
expected1 = 'SHOW DATABASES'
expected2 = 'DROP DATABASE IF EXISTS new_database'

# The parameters dict is expected to be rendered with single quotes.
expected3 = '''
CREATE DATABASE new_database WITH ENGINE='trino',
PARAMETERS={'user': 'user', 'auth': 'basic', 'http_scheme': 'https', 'port': 443, 'password': 'password', 'host': 'localhost', 'catalog': 'catalog', 'schema': 'schema', 'with': 'with (transactional = true)'}
'''

expected3 = self.sql_line_format(expected3)

# The model name 'new_database' is the database name used in the expected queries.
self.add_model('new_database', model)
queries = self.get_dbt_queries()

# queries exist
# NOTE(review): the DROP is asserted absent - presumably the SHOW DATABASES
# result seen by the test does not list 'new_database'; confirm against the fixture.
assert expected1 in queries
assert expected2 not in queries
assert expected3 in queries

# right queries order
assert queries.index(expected1) < queries.index(expected3)