diff --git a/dbt/adapters/databricks/__version__.py b/dbt/adapters/databricks/__version__.py index 8fe27739..4d389309 100644 --- a/dbt/adapters/databricks/__version__.py +++ b/dbt/adapters/databricks/__version__.py @@ -1 +1 @@ -version: str = "1.8.6" +version: str = "1.8.6a1" diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py index 46cb11dd..8614b53a 100644 --- a/dbt/adapters/databricks/impl.py +++ b/dbt/adapters/databricks/impl.py @@ -45,6 +45,7 @@ from dbt.adapters.databricks.connections import USE_LONG_SESSIONS from dbt.adapters.databricks.python_submissions import ( DbtDatabricksAllPurposeClusterPythonJobHelper, + SessionHelper ) from dbt.adapters.databricks.python_submissions import ( DbtDatabricksJobClusterPythonJobHelper, @@ -611,6 +612,7 @@ def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]: return { "job_cluster": DbtDatabricksJobClusterPythonJobHelper, "all_purpose_cluster": DbtDatabricksAllPurposeClusterPythonJobHelper, + "session": SessionHelper, } @available diff --git a/dbt/adapters/databricks/python_submissions.py b/dbt/adapters/databricks/python_submissions.py index b725191e..aa2c98c4 100644 --- a/dbt/adapters/databricks/python_submissions.py +++ b/dbt/adapters/databricks/python_submissions.py @@ -600,3 +600,16 @@ def check_credentials(self) -> None: "Databricks `http_path` or `cluster_id` of an all-purpose cluster is required " "for the `all_purpose_cluster` submission method." ) + + +class SessionHelper(PythonJobHelper): + def __init__(self, parsed_model: Dict, credentials: DatabricksCredentials) -> None: + pass + + def submit(self, compiled_code: str) -> Any: + try: + from pyspark.sql import SparkSession + spark = SparkSession.getActiveSession() + exec(compiled_code, {"spark": spark}) + except Exception as e: + raise DbtRuntimeError(f"Python model failed with traceback as:\n{e}") \ No newline at end of file