Skip to content

Commit

Permalink
Feature/38 incremental materialization strategies (#62)
Browse files Browse the repository at this point in the history
* going back to incremental default implementation - splitting delete and insert into two statements

* default implementation from core - bug in need_swap

* parallel pytest worker and default python 3.10.9

* removing python versioning because poetry is not able to manage it

* new envlist format
  • Loading branch information
tglunde authored Apr 14, 2023
1 parent 670fa33 commit 34f3d82
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 225 deletions.
26 changes: 5 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ dbt-exasol:
# Known issues
## >=1.3 Python model not yet supported - WIP
- Please follow [this pull request](https://github.com/tglunde/dbt-exasol/pull/59)
## Breaking changes with release 1.2.2
- Timestamp format defaults to YYYY-MM-DDTHH:MI:SS.FF6
Expand All @@ -50,25 +53,6 @@ In order to support packages like dbt-utils and dbt-audit-helper, we needed to c
# Reporting bugs and contributing code
- Please report bugs using the issues
# Release History
## Release 1.2.2
- Added timestamp format parameter in profile.yml parameter file to set Exasol session parameter NLS_TIMESTAMP_FORMAT when opening a connection. Defaults to 'YYYY-MM-DDTHH:MI:SS.FF6'
- Adding row_separator (LF/CRLF) parameter in profile.yml parameter file to be used in seed csv import. Defaults to operating system default (os.linesep in python).
- bugfix #36 regarding column quotes and case sensitivity of column names.
- bugfix #42 regarding datatype change when using snapshot materialization. Added modify column statement in exasol__alter_column_type macro
- bugfix #17 number format datatype
- issue #24 - dbt-core v1.2.0 compatibility finished
# Releases
## Release 1.2.0
- support for invalidate_hard_deletes option in snapshots added by jups23
- added persist_docs support by sti0
- added additional configuration keys that can be included in profiles.yml by johannes-becker-otto
- added cross-database macros introduced in 1.2 by sti0
- added support for connection retries by sti0
- added support for grants by sti0
- added pytest functional adapter tests by tglunde
- tox testing for python 3.7.2 through 3.10 added by tglunde
## Release 1.0.0
- pyexasol HTTP import csv feature implemented. Optimal performance and compatibility with Exasol CSV parsing
[GitHub Releases](https://github.com/tglunde/dbt-exasol/releases)
4 changes: 4 additions & 0 deletions dbt/adapters/exasol/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,10 @@ def execute(self, query, bindings: Optional[Any] = None):
"""executing query"""
if query.startswith("0CSV|"):
self.import_from_file(bindings, query.split("|", 1)[1]) # type: ignore
elif query.__contains__("|SEPARATEMEPLEASE|"):
sqls = query.split("|SEPARATEMEPLEASE|")
for sql in sqls:
self.stmt = self.connection.execute(sql)
else:
self.stmt = self.connection.execute(query)
return self
Expand Down
9 changes: 8 additions & 1 deletion dbt/adapters/exasol/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from dbt.exceptions import raise_compiler_error
from dbt.utils import filter_null_values

from dbt.adapters.exasol import ExasolColumn, ExasolConnectionManager, ExasolRelation
from dbt.adapters.exasol import (ExasolColumn, ExasolConnectionManager,
ExasolRelation)


class ExasolAdapter(SQLAdapter):
Expand Down Expand Up @@ -79,3 +80,9 @@ def quote_seed_column(self, column: str, quote_config: Optional[bool]) -> str:
if quote_columns:
return self.quote(column)
return column

def valid_incremental_strategies(self):
    """Return the built-in incremental strategies this adapter supports.

    These are the standard out-of-the-box strategy names only; custom
    strategies defined by end users are not validated against this list.
    """
    supported = ["append", "delete+insert"]
    return supported
134 changes: 68 additions & 66 deletions dbt/include/exasol/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
@@ -1,91 +1,93 @@
{# Render a DELETE that removes from target_relation the rows whose
   unique_key value(s) also exist in tmp_relation. Renders nothing when
   the model has no unique_key configured. #}
{% macro incremental_delete(target_relation, tmp_relation) -%}
{%- set unique_key = config.get('unique_key') -%}

{% if unique_key %}
{# Composite key (a list/tuple of column names): correlate on every key column. #}
{% if unique_key is sequence and unique_key is not string %}
delete from {{ target_relation }}
where exists (select 1 from {{ tmp_relation }}
where
{% for key in unique_key %}
{{ tmp_relation }}.{{ key }} = {{ target_relation }}.{{ key }}
{{ "and " if not loop.last }}
{% endfor %}
);
{% else %}
{# Single-column key: plain IN-subquery on that column. #}
delete from {{ target_relation }}
where (
{{ unique_key }}) in (
select ({{ unique_key }})
from {{ tmp_relation }}
);
{% endif %}
{%endif%}
{%- endmacro %}

{# Render an INSERT that copies every column of target_relation from
   tmp_relation. The column list is taken from the target so the select
   matches the insert column order.
   NOTE(review): the unique_key and statement_name parameters are accepted
   but never used in this body — presumably kept for call-site
   compatibility; confirm before removing. #}
{% macro incremental_insert(tmp_relation, target_relation, unique_key=none, statement_name="main") %}
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | join(', ', attribute='name') -%}

insert into {{ target_relation }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
{# Source is spelled schema.identifier rather than rendering the relation object directly. #}
from {{ tmp_relation.schema }}.{{ tmp_relation.identifier }}
);
{%- endmacro %}
{% materialization incremental, adapter='exasol' -%}

-- relations
{%- set existing_relation = load_relation(this) -%}

{% materialization incremental, adapter='exasol' -%}
{%- set target_relation = this.incorporate(type='table') -%}
{%- set temp_relation = make_temp_relation(target_relation)-%}
{%- set intermediate_relation = make_intermediate_relation(target_relation)-%}
{%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}

-- configs
{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
{%- set identifier = model['alias'] -%}
{%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, database=database, type='table') -%}
{% set existing_relation = adapter.get_relation(database=database, schema=schema, identifier = identifier) %}
{% set tmp_relation = make_temp_relation(target_relation) %}
{%- set full_refresh_mode = (should_full_refresh() or existing_relation.is_view) -%}
{%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}

-- the temp_ and backup_ relations should not already exist in the database; get_relation
-- will return None in that case. Otherwise, we get a relation that we can drop
-- later, before we try to use this name for the current operation. This has to happen before
-- BEGIN, in a separate transaction
{%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation)-%}
{%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}
-- grab current tables grants config for comparision later on
{% set grant_config = config.get('grants') %}
{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
{{ drop_relation_if_exists(preexisting_backup_relation) }}

-- grab current tables grants config for comparision later on
{%- set grant_config = config.get('grants') -%}

-- setup
{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set to_drop = [] %}

{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view %}
{% do adapter.drop_relation(existing_relation) %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% set build_sql = get_create_table_as_sql(False, target_relation, sql) %}
{% elif full_refresh_mode %}
{% do drop_relation(existing_relation) %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% set build_sql = get_create_table_as_sql(False, intermediate_relation, sql) %}
{% set need_swap = true %}
{% else %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do run_query(incremental_delete(target_relation, tmp_relation)) %}
{% set build_sql = incremental_insert(tmp_relation, target_relation) %}
{% do run_query(get_create_table_as_sql(True, temp_relation, sql)) %}
{% do adapter.expand_target_column_types(
from_relation=temp_relation,
to_relation=target_relation) %}
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
{% set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) %}
{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}

{#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %}
{% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %}

{% endif %}


{%- call statement('main') -%}
{{ build_sql }}
{%- endcall -%}
{% call statement("main") %}
{{ build_sql }}
{% endcall %}

{% if tmp_relation is not none %}
{% do adapter.drop_relation(tmp_relation) %}
{% if need_swap %}
{% do adapter.rename_relation(existing_relation, backup_relation) %}
{% do adapter.rename_relation(intermediate_relation, target_relation) %}
{% do to_drop.append(backup_relation) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode=full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke) %}
{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}

{% if existing_relation is none or existing_relation.is_view or should_full_refresh() %}
{% do create_indexes(target_relation) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}
{% do adapter.commit() %}

{{ run_hooks(post_hooks, inside_transaction=False) }}
{% for rel in to_drop %}
{% do adapter.drop_relation(rel) %}
{% endfor %}

{% do persist_docs(target_relation, model) %}
{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}
{%- endmaterialization %}

{%- endmaterialization %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{# Exasol's 'default' incremental strategy dispatcher: when the model
   configures a unique_key, fall through to the delete+insert strategy;
   otherwise use a plain append. #}
{% macro exasol__get_incremental_default_sql(arg_dict) %}

{% if arg_dict["unique_key"] %}
{% do return(get_incremental_delete_insert_sql(arg_dict)) %}
{% else %}
{% do return(get_incremental_append_sql(arg_dict)) %}
{% endif %}

{% endmacro %}
34 changes: 34 additions & 0 deletions dbt/include/exasol/macros/materializations/merge.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{# Exasol delete+insert "merge": first delete target rows matched by
   unique_key (composite list or single column), then insert all rows from
   source. The two statements are joined by the literal |SEPARATEMEPLEASE|
   marker, which the connection manager's execute() splits on so each
   statement runs separately (Exasol does not execute multi-statement
   batches in one call). #}
{% macro exasol__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}

{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

{% if unique_key %}
{# Composite key: correlated EXISTS over every key column. #}
{% if unique_key is sequence and unique_key is not string %}
delete from {{ target }}
where exists ( select 1 from {{ source }}
where
{% for key in unique_key %}
{{ source }}.{{ key }} = {{ target }}.{{ key }}
{{ "and " if not loop.last }}
{% endfor %}
)
{% else %}
{# Single-column key: IN-subquery. #}
delete from {{ target }}
where (
{{ unique_key }}) in (
select ({{ unique_key }})
from {{ source }}
)

{% endif %}
{% endif %}

|SEPARATEMEPLEASE|

insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source }}
)

{%- endmacro %}
4 changes: 0 additions & 4 deletions dbt/include/exasol/macros/materializations/view.sql

This file was deleted.

Loading

0 comments on commit 34f3d82

Please sign in to comment.