Implement microbatch incremental strategy #825

Merged · 8 commits · Oct 15, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@
- Add `include_full_name_in_path` config boolean for external locations. This writes tables to {location_root}/{catalog}/{schema}/{table} ([823](https://github.com/databricks/dbt-databricks/pull/823))
- Add a new `workflow_job` submission method for python, which creates a long-lived Databricks Workflow instead of a one-time run (thanks @kdazzle!) ([762](https://github.com/databricks/dbt-databricks/pull/762))
- Allow for additional options to be passed to the Databricks Job API when using other python submission methods. For example, enable email_notifications (thanks @kdazzle!) ([762](https://github.com/databricks/dbt-databricks/pull/762))
- Support microbatch incremental strategy using replace_where ([825](https://github.com/databricks/dbt-databricks/pull/825))

### Under the Hood

16 changes: 9 additions & 7 deletions dbt/adapters/databricks/impl.py
@@ -650,7 +650,7 @@ def run_sql_for_tests(
        conn.transaction_open = False

    def valid_incremental_strategies(self) -> List[str]:
-        return ["append", "merge", "insert_overwrite", "replace_where"]
+        return ["append", "merge", "insert_overwrite", "replace_where", "microbatch"]

    @property
    def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]:
@@ -699,12 +699,14 @@ def get_persist_doc_columns(
        # an error when we tried to alter the table.
        for column in existing_columns:
            name = column.column
-            if (
-                name in columns
-                and "description" in columns[name]
-                and columns[name]["description"] != (column.comment or "")
-            ):
-                return_columns[name] = columns[name]
+            if name in columns:
Collaborator Author: Not sure exactly what introduces this uncertainty, but I've experimentally observed that sometimes config_column is a dict and sometimes it's a ColumnInfo, and these types have different access methods for getting the description.

+                config_column = columns[name]
+                if isinstance(config_column, dict):
+                    comment = columns[name].get("description", "")
+                else:
+                    comment = config_column.description
+                if comment != (column.comment or ""):
+                    return_columns[name] = columns[name]

        return return_columns
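To illustrate the dual access pattern described in the comment above, here is a minimal standalone Python sketch; the ColumnInfo dataclass is a stand-in for dbt's contract object and get_configured_comment is a hypothetical helper, not the adapter's actual code:

from dataclasses import dataclass
from typing import Union

@dataclass
class ColumnInfo:
    # stand-in for dbt's ColumnInfo, which exposes description as an attribute
    description: str = ""

def get_configured_comment(config_column: Union[dict, ColumnInfo]) -> str:
    # dicts expose the description by key; ColumnInfo exposes it as an attribute
    if isinstance(config_column, dict):
        return config_column.get("description", "")
    return config_column.description

# Both input shapes yield the same comment:
assert get_configured_comment({"description": "a comment"}) == "a comment"
assert get_configured_comment(ColumnInfo(description="a comment")) == "a comment"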

@@ -170,3 +170,18 @@ select {{source_cols_csv}} from {{ source_relation }}
{%- endfor %})
{%- endif -%}
{% endmacro %}

+{% macro databricks__get_incremental_microbatch_sql(arg_dict) %}
+    {%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%}
+    {%- set event_time = model.config.event_time -%}
+    {%- set start_time = config.get("__dbt_internal_microbatch_event_time_start") -%}
+    {%- set end_time = config.get("__dbt_internal_microbatch_event_time_end") -%}
+    {%- if start_time -%}
+        {%- do incremental_predicates.append("cast(" ~ event_time ~ " as TIMESTAMP) >= '" ~ start_time ~ "'") -%}
+    {%- endif -%}
+    {%- if end_time -%}
+        {%- do incremental_predicates.append("cast(" ~ event_time ~ " as TIMESTAMP) < '" ~ end_time ~ "'") -%}
+    {%- endif -%}
+    {%- do arg_dict.update({'incremental_predicates': incremental_predicates}) -%}
Collaborator Author: Preps for the replace_where strategy by adding

cast(<event_time> as TIMESTAMP) >= <start_time> and cast(<event_time> as TIMESTAMP) < <end_time>

as an incremental predicate. (A sketch of the compiled statement follows the macro.)

+    {{ return(get_replace_where_sql(arg_dict)) }}
+{% endmacro %}
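To make the effect concrete, here is a hypothetical sketch of what one batch's statement could compile to, assuming get_replace_where_sql emits Databricks' INSERT INTO ... REPLACE WHERE form; the table, temp-view, and column names plus the timestamps are illustrative:

-- illustrative compiled output for a single daily batch
insert into analytics.microbatch_model
replace where cast(event_time as TIMESTAMP) >= '2024-10-01 00:00:00'
          and cast(event_time as TIMESTAMP) < '2024-10-02 00:00:00'
select * from microbatch_model__dbt_tmp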
@@ -35,13 +35,13 @@
  Use the 'merge' or 'replace_where' strategy instead
  {%- endset %}

-  {% if raw_strategy not in ['append', 'merge', 'insert_overwrite', 'replace_where'] %}
+  {% if raw_strategy not in ['append', 'merge', 'insert_overwrite', 'replace_where', 'microbatch'] %}
    {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
  {%-else %}
    {% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %}
      {% do exceptions.raise_compiler_error(invalid_delta_only_msg) %}
    {% endif %}
-    {% if raw_strategy == 'replace_where' and file_format not in ['delta'] %}
+    {% if raw_strategy in ('replace_where', 'microbatch') and file_format not in ['delta'] %}
      {% do exceptions.raise_compiler_error(invalid_delta_only_msg) %}
    {% endif %}
  {% endif %}
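For context, a model opting into the new strategy could be configured roughly as below. event_time, batch_size, and begin are dbt's standard microbatch configs; the model name, values, and body are illustrative, and file_format='delta' reflects the validation above:

-- models/microbatch_model.sql (hypothetical)
{{
    config(
        materialized='incremental',
        incremental_strategy='microbatch',
        file_format='delta',
        event_time='event_time',
        batch_size='day',
        begin='2024-01-01',
    )
}}

select id, event_time from {{ ref('input_model') }}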
16 changes: 16 additions & 0 deletions tests/functional/adapter/microbatch/fixtures.py
@@ -0,0 +1,16 @@
+schema = """version: 2
+models:
+  - name: input_model
+
+  - name: microbatch_model
+    config:
+      persist_docs:
+        relation: True
+        columns: True
+    description: This is a microbatch model
+    columns:
Collaborator Author: Added my own schema with column comments to supplement the included tests, since handling comments originally broke my implementation even though the included functional tests passed.

+      - name: id
+        description: "Id of the model"
+      - name: event_time
+        description: "Timestamp of the event"
+"""
16 changes: 16 additions & 0 deletions tests/functional/adapter/microbatch/test_microbatch.py
@@ -0,0 +1,16 @@
+from dbt.tests.adapter.incremental.test_incremental_microbatch import (
+    BaseMicrobatch,
+)
+import pytest
+
+from tests.functional.adapter.microbatch import fixtures
+
+
+class TestDatabricksMicrobatch(BaseMicrobatch):
+    @pytest.fixture(scope="class")
+    def models(self, microbatch_model_sql, input_model_sql):
+        return {
+            "schema.yml": fixtures.schema,
+            "input_model.sql": input_model_sql,
+            "microbatch_model.sql": microbatch_model_sql,
+        }
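Assuming a Databricks test profile is configured as the repo's functional tests require, the new suite could be run on its own with a standard pytest invocation such as:

python -m pytest tests/functional/adapter/microbatch/test_microbatch.py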