Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] Custom incremental strategy not working with dbt-databricks #9516

Closed
3 tasks done
Mali-DS opened this issue Feb 4, 2024 · 1 comment
Closed
3 tasks done

[Bug] Custom incremental strategy not working with dbt-databricks #9516

Mali-DS opened this issue Feb 4, 2024 · 1 comment
Labels
bug Something isn't working wontfix Not a bug or out of scope for dbt-core

Comments

@Mali-DS
Copy link

Mali-DS commented Feb 4, 2024

Is this your first time submitting a feature request?

  • I have read the expectations for open source contributors
  • I have searched the existing issues, and I could not find an existing issue for this feature
  • I am requesting a straightforward extension of existing dbt functionality, rather than a Big Idea better suited to a discussion

Describe the feature

Hi
I need to have a new merge strategy in databricks:
I've created a macro like as :

{% macro get_incremental_merge_conditional_sql(arg_dict) %}

  {% do return(custom_merge_conditional_sql(arg_dict["target"], arg_dict["source"], arg_dict["unique_key"], arg_dict["dest_columns"], arg_dict["predicates"])) %}

{% endmacro %}

{% macro custom_merge_conditional_sql(target, source, unique_key, dest_columns, predicates) %}
    {% set predicates = [] if predicates is none else [] + predicates %}
    {% set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) %}
    {% set update_columns = config.get('merge_update_columns', default = dest_columns | map(attribute="quoted") | list) %}
    {% set sql_header = config.get('sql_header', none) %}
    {% set date_col = config.get('update_date_column', none) %}
    {% if unique_key %}
        {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
            {% for key in unique_key %}
                {% set this_key_match %}
                    DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }}
                {% endset %}
                {% do predicates.append(this_key_match) %}
            {% endfor %}
        {% else %}
            {% set unique_key_match %}
                DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
            {% endset %}
            {% do predicates.append(unique_key_match) %}
        {% endif %}
    {% else %}
        {% do predicates.append('FALSE') %}
    {% endif %}

    {{ sql_header if sql_header is not none }}

    merge into {{ target }} as DBT_INTERNAL_DEST
        using {{ source }} as DBT_INTERNAL_SOURCE
        on {{ predicates | join(' and ') }}

    {% if unique_key %}
    when matched {{ add_merge_date_condition() }} then update set
        {% for column_name in update_columns -%}
            {{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
            {% if not loop.last %}, {% endif %}
        {% endfor %}
        {% if date_col %} and DBT_INTERNAL_SOURCE.{{date_col}} >= DBT_INTERNAL_DEST.{{date_col}} {% endif %} 
    {% endif %}

    when not matched then insert
        ({{ dest_cols_csv }})
    values
        ({{ dest_cols_csv }})

{% endmacro %}

then Tried to call this new merge strategy in my model like this:

{{config(
 materialized='incremental',
 incremental_strategy='merge_conditional',
 unique_key = ['order_id','status'],
 predicates = ['DBT_INTERNAL_SOURCE.status_datetime_nzt > DBT_INTERNAL_DEST.status_datetime_nzt'],
 update_date_column = ['status_datetime_nzt'],
 tags=["magento"]
 )}}

SELECT
  id as order_id,
  status ,
  status_datetime_nzt,
  CAST('' AS VARCHAR(4)) as member_id,
  customer.segment as customer_segment,
  customer.type as customer_type,
  get(filter(custom_attributes, x -> x.name= 'Document_No'), 0)['value'] as document_no,
  get(filter(custom_attributes, x -> x.name= 'flybuys_card_number'), 0)['value'] as flybuys_card_number,
  get(filter(custom_attributes, x -> x.name= 'Document_Link'), 0)['value'] as document_link,
  get(filter(custom_attributes, x -> x.name= 'Token_id'), 0)['value'] as token_id,
  get(filter(custom_attributes, x -> x.name= 'flybuys_points'), 0)['value'] as flybuys_points,
  get(filter(custom_attributes, x -> x.name= 'flybuys_price'), 0)['value'] as flybuys_price,
  get(filter(custom_attributes, x -> x.name= 'harry_id'), 0)['value'] as harry_id,
  get(filter(custom_attributes, x -> x.name= 'lc_purchaseUUID'), 0)['value'] as lc_purchaseUUID,
  get(filter(custom_attributes, x -> x.name= 'title'), 0)['value'] as title
FROM {{ source('magento', 'derived_magento_orders_events') }} 

and even I defined the new merge stretagy in dbt_project.yml file like this:

models:
 data_platform:   
  derived_lc_resources:
       derived_order_custom_attributes:  
           +schema: derived_lc_resources
           +materialized: incremental
           +incremental_strategy: merge_conditional

The error is here:
Invalid incremental strategy provided: merge_conditional
Expected one of: 'merge', 'replace_where', 'append', 'insert_overwrite'

Looks dbt can not figure out the new incremental strategy?

Please help
Thanks
Mali

Describe alternatives you've considered

No response

Who will this benefit?

No response

Are you interested in contributing this feature?

No response

Anything else?

No response

@Mali-DS Mali-DS added enhancement New feature or request triage labels Feb 4, 2024
@dbeatty10
Copy link
Contributor

Thanks for reaching out @Mali-DS !

I tried the simple custom incremental strategy here, and it worked for dbt-postgres, dbt-redshift, and dbt-snowflake.

It doesn't work on dbt-bigquery or dbt-spark because custom incremental strategies have not yet been implemented for those: #9290.

It also looks like custom incremental strategies are not yet supported in dbt-databricks.

So you'd need to reach out to the maintainers of dbt-databricks about this one since this is not an issue with dbt-core.

@dbeatty10 dbeatty10 closed this as not planned Won't fix, can't repro, duplicate, stale Feb 6, 2024
@dbeatty10 dbeatty10 added wontfix Not a bug or out of scope for dbt-core and removed triage labels Feb 6, 2024
@dbeatty10 dbeatty10 changed the title [Feature] <Custom Incremental Strategy> [Bug] Custom incremental strategy not working with dbt-databricks Feb 6, 2024
@dbeatty10 dbeatty10 added bug Something isn't working and removed enhancement New feature or request labels Feb 6, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working wontfix Not a bug or out of scope for dbt-core
Projects
None yet
Development

No branches or pull requests

2 participants