Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.5 tests, incremental merge, temp table fix #86

Merged
merged 22 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions dbt/adapters/exasol/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from dbt.adapters.sql import SQLAdapter
from dbt.exceptions import CompilationError
from dbt.utils import filter_null_values
from dbt.adapters.base.meta import available
from dbt.adapters.base.impl import ConstraintSupport
from dbt.contracts.graph.nodes import ConstraintType

Expand Down Expand Up @@ -99,6 +100,40 @@ def valid_incremental_strategies(self):
"""The set of standard builtin strategies which this adapter supports out-of-the-box.
Not used to validate custom strategies defined by end users.
"""
return ["append", "delete+insert"]
return ["append", "merge", "delete+insert"]

@staticmethod
def is_valid_identifier(identifier) -> bool:
    """Return True when ``identifier`` follows the unquoted naming rules.

    The rules checked here are:

    * the first character must be alphabetic;
    * every following character must be alphanumeric or one of
      ``#``, ``$`` or ``_``.

    Args:
        identifier: The candidate identifier (column or relation name).

    Returns:
        True if the identifier satisfies both rules, otherwise False.
        An empty or missing identifier is never valid.
    """
    # Guard first: indexing an empty string (or None) below would raise
    # instead of reporting the identifier as invalid.
    if not identifier:
        return False
    # The first character should be alphabetic.
    if not identifier[0].isalpha():
        return False
    # The rest is either alphanumeric or one of the literals '#', '$', '_'.
    return all(ch.isalnum() or ch in ('#', '$', '_') for ch in identifier[1:])


@available
def should_identifier_be_quoted(self,
                                identifier,
                                models_column_dict=None) -> bool:
    """Decide whether ``identifier`` has to be quoted.

    Args:
        identifier: The column name to inspect.
        models_column_dict: Optional mapping of column name to that column's
            config mapping; the ``quote`` entry of the matching column is
            consulted. The column may be listed under its bare name or
            under its quoted name.

    Returns:
        True when the identifier fails ``is_valid_identifier`` or when the
        matching column config has a truthy ``quote`` entry; False
        otherwise. The result is always a real ``bool``.
    """
    # Check if the naming is valid; invalid names can only be used quoted.
    if not self.is_valid_identifier(identifier):
        return True
    if not models_column_dict:
        return False

    # Check if the column is set to be quoted in the model config, looking
    # it up by bare name first and by quoted name second.
    if identifier in models_column_dict:
        column_config = models_column_dict[identifier]
    else:
        quoted_identifier = self.quote(identifier)
        if quoted_identifier not in models_column_dict:
            return False
        column_config = models_column_dict[quoted_identifier]

    # Coerce to bool so a ``quote`` entry of None does not leak out of a
    # function that is declared to return bool.
    return bool(column_config.get('quote', False))

@available
def check_and_quote_identifier(self, identifier, models_column_dict=None) -> str:
    """Return ``identifier`` quoted if it needs quoting, unchanged otherwise.

    Args:
        identifier: The column name to render.
        models_column_dict: Optional column config mapping forwarded to
            ``should_identifier_be_quoted``.

    Returns:
        ``self.quote(identifier)`` when ``should_identifier_be_quoted``
        is truthy for the identifier, else the identifier as given.
    """
    needs_quoting = self.should_identifier_be_quoted(identifier, models_column_dict)
    return self.quote(identifier) if needs_quoting else identifier
11 changes: 5 additions & 6 deletions dbt/include/exasol/macros/adapters.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

/*
LIST_RELATIONS_MACRO_NAME = 'list_relations_without_caching'
GET_COLUMNS_IN_RELATION_MACRO_NAME = 'get_columns_in_relation'
Expand Down Expand Up @@ -139,7 +138,7 @@ AS
{% macro exasol__alter_relation_comment(relation, relation_comment) -%}
{# Comments on views are not supported outside DDL, see https://docs.exasol.com/db/latest/sql/comment.htm#UsageNotes #}
{%- if not relation.is_view %}
{%- set comment = relation_comment | replace("'", '"') %}
{%- set comment = relation_comment | replace("'", "''") %}
COMMENT ON {{ relation.type }} {{ relation }} IS '{{ comment }}';
{%- endif %}
{% endmacro %}
Expand All @@ -155,7 +154,7 @@ AS
{% set matched_column = None -%}
{% endif -%}
{% if matched_column -%}
{% set comment = column_dict[matched_column]['description'] | replace("'", '"') -%}
{% set comment = column_dict[matched_column]['description'] | replace("'", "''") -%}
{% else -%}
{% set comment = "" -%}
{% endif -%}
Expand Down Expand Up @@ -187,7 +186,7 @@ AS

{% macro persist_view_relation_docs() %}
{%- if config.persist_relation_docs() %}
COMMENT IS '{{ model.description }}'
COMMENT IS '{{ model.description | replace("'", "''")}}'
{%- endif %}
{% endmacro %}

Expand All @@ -197,8 +196,8 @@ COMMENT IS '{{ model.description }}'
{% endcall %}
{% endmacro %}

{% macro exasol__get_empty_subquery_sql(select_sql, sql_header=None ) %}
{% macro exasol__get_empty_subquery_sql(select_sql, select_sql_header=None ) %}
select * from (
{{ select_sql }}
) dbt_sbq_tmp
{% endmacro %}
{% endmacro %}
5 changes: 3 additions & 2 deletions dbt/include/exasol/macros/catalog.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
OBJECT_NAME as table_name,
ROOT_NAME as table_owner,
OBJECT_TYPE AS table_type,
OBJECT_COMMENT
OBJECT_COMMENT as table_comment
from sys.EXA_USER_OBJECTS
WHERE OBJECT_TYPE IN('TABLE', 'VIEW')

Expand All @@ -34,7 +34,8 @@
)

select tabs.table_owner as [table_owner],
tabs.table_type AS [table_type],
tabs.table_type AS [table_type],
tabs.table_comment as [table_comment],
cols.table_database as [table_database],
cols.table_schema as [table_schema],
cols.table_name as [table_name],
Expand Down
102 changes: 43 additions & 59 deletions dbt/include/exasol/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
@@ -1,82 +1,63 @@
{% materialization incremental, adapter='exasol', supported_languages=['sql'] %}

{% materialization incremental, adapter='exasol' -%}

-- relations
{%- set existing_relation = load_relation(this) -%}

{%- set target_relation = this.incorporate(type='table') -%}
{%- set temp_relation = make_temp_relation(target_relation)-%}
{%- set intermediate_relation = make_intermediate_relation(target_relation)-%}
{%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}

-- configs
{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (should_full_refresh() or existing_relation.is_view) -%}
{%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}

-- the temp_ and backup_ relations should not already exist in the database; get_relation
-- will return None in that case. Otherwise, we get a relation that we can drop
-- later, before we try to use this name for the current operation. This has to happen before
-- BEGIN, in a separate transaction
{%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation)-%}
{%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}
-- grab the current table's grants config for comparison later on
{% set grant_config = config.get('grants') %}
{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
{{ drop_relation_if_exists(preexisting_backup_relation) }}
{% set unique_key = config.get('unique_key') %}
{% set full_refresh_mode = flags.FULL_REFRESH %}
{%- set language = model['language'] -%}
{% set target_relation = this.incorporate(type='table') %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
{% set grant_config = config.get('grants') %}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set to_drop = [] %}

{% if existing_relation is none %}
{% set build_sql = get_create_table_as_sql(False, target_relation, sql) %}
{% elif full_refresh_mode %}
{% set build_sql = get_create_table_as_sql(False, intermediate_relation, sql) %}
{% set need_swap = true %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view or full_refresh_mode %}
{#-- Checking if backup relation exists#}
{% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %}
{% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}
{% do adapter.drop_relation(backup_relation) %}
{% if existing_relation.is_view %}
{% do adapter.drop_relation(existing_relation) %}
{% else %}
{% do adapter.rename_relation(existing_relation, backup_relation) %}
{% endif %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% do to_drop.append(backup_relation) %}
{% else %}
{% do run_query(get_create_table_as_sql(True, temp_relation, sql)) %}
{% do adapter.expand_target_column_types(
from_relation=temp_relation,
{% set tmp_relation = make_temp_relation(target_relation) %}
{% do to_drop.append(tmp_relation) %}
{% call statement("make_tmp_relation") %}
{{create_table_as(True, tmp_relation, sql)}}
{% endcall %}
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
{% set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) %}
{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}

{#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %}
{% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %}
{% set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}

{#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': tmp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %}
{% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %}

{% endif %}

{% call statement("main") %}
{{ build_sql }}
{% endcall %}

{% if need_swap %}
{% do adapter.rename_relation(existing_relation, backup_relation) %}
{% do adapter.rename_relation(intermediate_relation, target_relation) %}
{% do to_drop.append(backup_relation) %}
{% endif %}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}

{% if existing_relation is none or existing_relation.is_view or should_full_refresh() %}
{% do create_indexes(target_relation) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
Expand All @@ -88,6 +69,9 @@

{{ run_hooks(post_hooks, inside_transaction=False) }}

{% set should_revoke = should_revoke(existing_relation.is_table, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}
119 changes: 114 additions & 5 deletions dbt/include/exasol/macros/materializations/merge.sql
Original file line number Diff line number Diff line change
@@ -1,24 +1,34 @@
{% macro exasol__get_delete_insert_merge_sql(target, source, unique_key, dest_columns,incremental_predicates=none) -%}
{% macro exasol__get_delete_insert_merge_sql(target, source, unique_key, dest_columns,incremental_predicates) -%}

{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

{% if unique_key %}
{% if unique_key is sequence and unique_key is not string %}
delete from {{ target }}
delete from {{target }}
where exists ( select 1 from {{ source }}
where
where
{% for key in unique_key %}
{{ source }}.{{ key }} = {{ target }}.{{ key }}
{{ "and " if not loop.last }}
{{ "and " if not loop.last}}
{% endfor %}
)
{% if incremental_predicates %}
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
{% endif %}
);
{% else %}
delete from {{ target }}
where (
{{ unique_key }}) in (
select ({{ unique_key }})
from {{ source }}
)
{%- if incremental_predicates %}
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
{%- endif -%};

{% endif %}
{% endif %}
Expand All @@ -32,3 +42,102 @@
)

{%- endmacro %}


{% macro exasol_check_and_quote_unique_key_for_incremental_merge(unique_key, incremental_predicates=none) %}
    {#-
        Prepare the unique key of an incremental merge.

        Each key is wrapped in double quotes when
        adapter.should_identifier_be_quoted returns true for it (checked against
        model.columns); otherwise it is upper-cased.

        Returns a dict with two entries:
          unique_key_list             - the prepared key names, in order
          unique_key_merge_predicates - a copy of incremental_predicates (if any) followed by
                                        one DBT_INTERNAL_SOURCE.<key> = DBT_INTERNAL_DEST.<key>
                                        comparison per prepared key
    -#}
    {%- set merge_predicates = [] if incremental_predicates is none else [] + incremental_predicates -%}
    {%- set prepared_keys = [] -%}
    {#- A list of keys is de-duplicated with the `unique` filter; anything else is handled as one key. -#}
    {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
        {% set raw_keys = unique_key | unique | list %}
    {% else %}
        {% set raw_keys = [unique_key] %}
    {% endif %}
    {% for raw_key in raw_keys %}
        {% if adapter.should_identifier_be_quoted(raw_key, model.columns) == true %}
            {% do prepared_keys.append('"' ~ raw_key ~ '"') %}
        {% else %}
            {% do prepared_keys.append(raw_key.upper()) %}
        {% endif %}
    {% endfor %}
    {% for prepared_key in prepared_keys %}
        {% set key_match %}
            DBT_INTERNAL_SOURCE.{{ prepared_key }} = DBT_INTERNAL_DEST.{{ prepared_key }}
        {% endset %}
        {% do merge_predicates.append(key_match) %}
    {% endfor %}
    {{ return({'unique_key_list': prepared_keys,
               'unique_key_merge_predicates': merge_predicates}) }}
{% endmacro %}


{% macro exasol__get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) %}
    {#-
        Work out which columns a merge should update and return them as a list,
        each passed through adapter.check_and_quote_identifier.

        merge_update_columns  - explicit list of columns to update (used as given)
        merge_exclude_columns - columns to leave out; compared case-insensitively
        dest_columns          - column objects of the destination relation

        Setting both config values raises a compiler error. With neither set,
        every destination column is returned.
    -#}
    {%- set all_column_names = dest_columns | map(attribute='name') | list -%}

    {%- if merge_update_columns and merge_exclude_columns -%}
        {{ exceptions.raise_compiler_error(
            'Model cannot specify merge_update_columns and merge_exclude_columns. Please update model to use only one config'
        )}}
    {%- elif merge_update_columns -%}
        {%- set selected_columns = merge_update_columns -%}
    {%- elif merge_exclude_columns -%}
        {#- Lower-case the exclusion list once so the membership test ignores case. -#}
        {%- set excluded_lower = merge_exclude_columns | map("lower") | list -%}
        {%- set selected_columns = [] -%}
        {%- for dest_column in dest_columns -%}
            {%- if not (dest_column.column | lower in excluded_lower) -%}
                {%- do selected_columns.append(dest_column.name) -%}
            {%- endif -%}
        {%- endfor -%}
    {%- else -%}
        {%- set selected_columns = all_column_names -%}
    {%- endif -%}

    {%- set quoted_columns = [] -%}
    {%- for column_name in selected_columns -%}
        {%- do quoted_columns.append(adapter.check_and_quote_identifier(column_name, model.columns)) -%}
    {%- endfor -%}
    {{ return(quoted_columns) }}
{% endmacro %}


{% macro exasol__get_incremental_merge_sql(args_dict) %}
    {#-
        Build the SQL for the `merge` incremental strategy.

        args_dict entries read here:
          target_relation        - relation that is merged into
          temp_relation          - relation holding the new rows
          unique_key             - key column or list of key columns (may be empty)
          dest_columns           - column objects of the destination relation
          incremental_predicates - extra predicates added to the ON clause

        With a unique_key a MERGE statement is rendered; without one the new
        rows are simply inserted from the temp relation.
    -#}
    {%- set dest_columns = args_dict["dest_columns"] -%}
    {%- set temp_relation = args_dict["temp_relation"] -%}
    {%- set target_relation = args_dict["target_relation"] -%}
    {%- set unique_key = args_dict["unique_key"] -%}
    {%- set incremental_predicates = args_dict["incremental_predicates"] -%}
    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
    {%- set merge_update_columns = config.get('merge_update_columns') -%}
    {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
    {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}
    {%- if unique_key -%}
        {%- set unique_key_result = exasol_check_and_quote_unique_key_for_incremental_merge(unique_key, incremental_predicates) -%}
        {%- set unique_key_list = unique_key_result['unique_key_list'] -%}
        {%- set unique_key_merge_predicates = unique_key_result['unique_key_merge_predicates'] -%}
        {#- Key columns are matched in the ON clause, so they are left out of the UPDATE. -#}
        {%- set non_key_update_columns = [] -%}
        {%- for col in update_columns if (col.upper() not in unique_key_list and col not in unique_key_list) -%}
            {%- do non_key_update_columns.append(col) -%}
        {%- endfor %}
        merge into {{ target_relation }} DBT_INTERNAL_DEST
        using {{ temp_relation }} DBT_INTERNAL_SOURCE
        on ({{ unique_key_merge_predicates | join(' AND ') }})
        {#- Skip the whole clause when nothing is left to update: an empty `update set` is not valid SQL. #}
        {% if non_key_update_columns %}
        when matched then
        update set
        {% for col in non_key_update_columns -%}
        DBT_INTERNAL_DEST.{{ col }} = DBT_INTERNAL_SOURCE.{{ col }}{% if not loop.last %}, {% endif %}
        {% endfor %}
        {% endif %}
        when not matched then
        insert({{ dest_cols_csv }})
        values(
        {% for col in dest_columns -%}
        DBT_INTERNAL_SOURCE.{{ adapter.check_and_quote_identifier(col.name, model.columns) }}{% if not loop.last %}, {% endif %}
        {% endfor -%}
        )
    {%- else -%}
        insert into {{ target_relation }} ({{ dest_cols_csv }})
        (
        select {{ dest_cols_csv }}
        from {{ temp_relation }}
        )
    {%- endif -%}
{% endmacro %}

Loading