diff --git a/template/{{.input_root_dir}}/{{template `project_name_alphanumeric_underscore` .}}/deployment/batch_inference/predict.py.tmpl b/template/{{.input_root_dir}}/{{template `project_name_alphanumeric_underscore` .}}/deployment/batch_inference/predict.py.tmpl
index 743ac907..a127707b 100644
--- a/template/{{.input_root_dir}}/{{template `project_name_alphanumeric_underscore` .}}/deployment/batch_inference/predict.py.tmpl
+++ b/template/{{.input_root_dir}}/{{template `project_name_alphanumeric_underscore` .}}/deployment/batch_inference/predict.py.tmpl
@@ -9,18 +9,24 @@ def predict_batch(
     Apply the model at the specified URI for batch inference on the table with name input_table_name,
     writing results to the table with name output_table_name
     """
-    {{ if (eq .input_include_models_in_unity_catalog "yes") }}mlflow.set_registry_uri("databricks-uc"){{ end }}
+    {{ if (eq .input_include_models_in_unity_catalog `yes`) }}
+    mlflow.set_registry_uri("databricks-uc")
+    {{ end }}
     table = spark_session.table(input_table_name)
-    {{ if (eq .input_include_feature_store `yes`) }}
-    {{ if (eq .input_include_models_in_unity_catalog "no") }}
+    {{ if (eq .input_include_feature_store `yes`) }}
+    {{ if (eq .input_include_models_in_unity_catalog `no`) }}
     from databricks.feature_store import FeatureStoreClient
+
     fs_client = FeatureStoreClient()
-    prediction_df = fs_client.score_batch(model_uri, table)
+
+    prediction_df = fs_client.score_batch(model_uri, table)
     {{ else }}
     from databricks.feature_engineering import FeatureEngineeringClient
+
     fe_client = FeatureEngineeringClient()
+
     prediction_df = fe_client.score_batch(model_uri = model_uri, df = table)
-    {{ end }}
+    {{ end }}
     output_df = (
         prediction_df.withColumn("prediction", prediction_df["prediction"])
         .withColumn("model_version", lit(model_version))
@@ -29,14 +35,17 @@ def predict_batch(
     {{ else }}
     predict = mlflow.pyfunc.spark_udf(
         spark_session, model_uri, result_type="string", env_manager="virtualenv"
-    )
+    )
+
     output_df = (
         table.withColumn("prediction", predict(struct(*table.columns)))
         .withColumn("model_version", lit(model_version))
         .withColumn("inference_timestamp", to_timestamp(lit(ts)))
     )
-    {{ end }}
+    {{ end -}}
+
     output_df.display()
+
     # Model predictions are written to the Delta table provided as input.
     # Delta is the default format in Databricks Runtime 8.0 and above.
     output_df.write.format("delta").mode("overwrite").saveAsTable(output_table_name)
\ No newline at end of file
diff --git a/template/{{.input_root_dir}}/{{template `project_name_alphanumeric_underscore` .}}/feature_engineering/notebooks/GenerateAndWriteFeatures.py.tmpl b/template/{{.input_root_dir}}/{{template `project_name_alphanumeric_underscore` .}}/feature_engineering/notebooks/GenerateAndWriteFeatures.py.tmpl
index f926358d..da3f22d9 100644
--- a/template/{{.input_root_dir}}/{{template `project_name_alphanumeric_underscore` .}}/feature_engineering/notebooks/GenerateAndWriteFeatures.py.tmpl
+++ b/template/{{.input_root_dir}}/{{template `project_name_alphanumeric_underscore` .}}/feature_engineering/notebooks/GenerateAndWriteFeatures.py.tmpl
@@ -68,10 +68,9 @@ notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.ge
 %cd $notebook_path
 %cd ../features
 
-
 # COMMAND ----------
 
-# DBTITLE 1,Define input and output variables
+# DBTITLE 1,Define input and output variables
 input_table_path = dbutils.widgets.get("input_table_path")
 output_table_name = dbutils.widgets.get("output_table_name")
 input_start_date = dbutils.widgets.get("input_start_date")
@@ -87,19 +86,18 @@ assert output_table_name != "", "output_table_name notebook parameter must be sp
 output_database = output_table_name.split(".")[0]
 
 # COMMAND ----------
 
-# DBTITLE 1,Create database.
+# DBTITLE 1,Create database.
 spark.sql("CREATE DATABASE IF NOT EXISTS " + output_database)
 
 # COMMAND ----------
 
-# DBTITLE 1, Read input data.
+# DBTITLE 1, Read input data.
 raw_data = spark.read.format("delta").load(input_table_path)
-
 
 # COMMAND ----------
 
-# DBTITLE 1,Compute features.
+# DBTITLE 1,Compute features.
 # Compute the features. This is done by dynamically loading the features module.
 from importlib import import_module
@@ -114,9 +112,9 @@ features_df = compute_features_fn(
 )
 
 # COMMAND ----------
 
-# DBTITLE 1, Write computed features.
-{{- if (eq .input_include_models_in_unity_catalog "no") }}
+# DBTITLE 1, Write computed features.
+{{- if (eq .input_include_models_in_unity_catalog `no`) }}
 from databricks import feature_store
 
 fs = feature_store.FeatureStoreClient()
@@ -135,10 +133,9 @@ fs.write_table(
     name=output_table_name,
     df=features_df,
     mode="merge",
-)
-
-{{else}}
+){{ else }}
 from databricks.feature_engineering import FeatureEngineeringClient
+
 fe = FeatureEngineeringClient()
 
 # Create the feature table if it does not exist first.
@@ -155,7 +152,8 @@ fe.write_table(
     name=output_table_name,
     df=features_df,
     mode="merge",
-)
-{{end -}}
+){{ end }}
+
+# COMMAND ----------
 
-dbutils.notebook.exit(0)
+dbutils.notebook.exit(0)
\ No newline at end of file
diff --git a/template/{{.input_root_dir}}/{{template `project_name_alphanumeric_underscore` .}}/training/notebooks/Train.py.tmpl b/template/{{.input_root_dir}}/{{template `project_name_alphanumeric_underscore` .}}/training/notebooks/Train.py.tmpl
index b6cd7693..eb909157 100644
--- a/template/{{.input_root_dir}}/{{template `project_name_alphanumeric_underscore` .}}/training/notebooks/Train.py.tmpl
+++ b/template/{{.input_root_dir}}/{{template `project_name_alphanumeric_underscore` .}}/training/notebooks/Train.py.tmpl
@@ -39,8 +39,8 @@ notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.ge
 dbutils.library.restartPython()
 
 # COMMAND ----------
 
-# DBTITLE 1, Notebook arguments
+# DBTITLE 1, Notebook arguments
 
 # List of input args needed to run this notebook as a job.
 # Provide them via DB widgets or notebook arguments.
@@ -77,15 +77,15 @@ dbutils.widgets.text(
 {{end -}}
 
 # COMMAND ----------
 
-# DBTITLE 1,Define input and output variables
+# DBTITLE 1,Define input and output variables
 input_table_path = dbutils.widgets.get("training_data_path")
 experiment_name = dbutils.widgets.get("experiment_name")
 model_name = dbutils.widgets.get("model_name")
 
 # COMMAND ----------
 
-# DBTITLE 1, Set experiment
+# DBTITLE 1, Set experiment
 import mlflow
 
 mlflow.set_experiment(experiment_name)
@@ -96,12 +96,13 @@ mlflow.set_registry_uri('databricks-uc')
 {{end -}}
 
 # COMMAND ----------
 
-# DBTITLE 1, Load raw data
+# DBTITLE 1, Load raw data
 training_df = spark.read.format("delta").load(input_table_path)
 training_df.display()
 
 # COMMAND ----------
+
 # DBTITLE 1, Helper function
 from mlflow.tracking import MlflowClient
 import mlflow.pyfunc
@@ -123,8 +124,8 @@ def get_latest_model_version(model_name):
 # MAGIC Train a LightGBM model on the data, then log and register the model with MLflow.
 
 # COMMAND ----------
 
-# DBTITLE 1, Train model
+# DBTITLE 1, Train model
 import mlflow
 from sklearn.model_selection import train_test_split
 import lightgbm as lgb
@@ -152,8 +153,8 @@ num_rounds = 100
 model = lgb.train(param, train_lgb_dataset, num_rounds)
 
 # COMMAND ----------
 
-# DBTITLE 1, Log model and return output.
+# DBTITLE 1, Log model and return output.
 
 # Take the first row of the training dataset as the model input example.
 input_example = X_train.iloc[[0]]
diff --git a/template/{{.input_root_dir}}/{{template `project_name_alphanumeric_underscore` .}}/training/notebooks/TrainWithFeatureStore.py.tmpl b/template/{{.input_root_dir}}/{{template `project_name_alphanumeric_underscore` .}}/training/notebooks/TrainWithFeatureStore.py.tmpl
index af89a30b..aa2e35f3 100644
--- a/template/{{.input_root_dir}}/{{template `project_name_alphanumeric_underscore` .}}/training/notebooks/TrainWithFeatureStore.py.tmpl
+++ b/template/{{.input_root_dir}}/{{template `project_name_alphanumeric_underscore` .}}/training/notebooks/TrainWithFeatureStore.py.tmpl
@@ -39,8 +39,8 @@ notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.ge
 dbutils.library.restartPython()
 
 # COMMAND ----------
 
-# DBTITLE 1, Notebook arguments
+# DBTITLE 1, Notebook arguments
 
 # List of input args needed to run this notebook as a job.
 # Provide them via DB widgets or notebook arguments.
@@ -62,19 +62,15 @@ dbutils.widgets.text(
     label="MLflow experiment name",
 )
 
-{{- if (eq .input_include_models_in_unity_catalog "no") }}
+{{ if (eq .input_include_models_in_unity_catalog `no`) }}
 # MLflow registered model name to use for the trained mode.
 dbutils.widgets.text(
     "model_name", "dev-{{template `model_name` .}}", label="Model Name"
-)
-
-{{else}}
+){{ else }}
 # Unity Catalog registered model name to use for the trained mode.
 dbutils.widgets.text(
     "model_name", "dev.{{ .input_schema_name }}.{{template `model_name` .}}", label="Full (Three-Level) Model Name"
-)
-
-{{end -}}
+){{ end }}
 
 # Pickup features table name
 dbutils.widgets.text(
@@ -93,29 +89,29 @@ dbutils.widgets.text(
 )
 
 # COMMAND ----------
 
-# DBTITLE 1,Define input and output variables
+# DBTITLE 1,Define input and output variables
 input_table_path = dbutils.widgets.get("training_data_path")
 experiment_name = dbutils.widgets.get("experiment_name")
 model_name = dbutils.widgets.get("model_name")
 
 # COMMAND ----------
 
-# DBTITLE 1, Set experiment
+# DBTITLE 1, Set experiment
 import mlflow
 
 mlflow.set_experiment(experiment_name)
 {{ if (eq .input_include_models_in_unity_catalog `yes`) }}mlflow.set_registry_uri('databricks-uc'){{ end }}
 
 # COMMAND ----------
 
-# DBTITLE 1, Load raw data
+# DBTITLE 1, Load raw data
 raw_data = spark.read.format("delta").load(input_table_path)
 raw_data.display()
 
 # COMMAND ----------
 
-# DBTITLE 1, Helper functions
+# DBTITLE 1, Helper functions
 from datetime import timedelta, timezone
 import math
 import mlflow.pyfunc
@@ -173,15 +169,15 @@ def get_latest_model_version(model_name):
 
 # COMMAND ----------
 
-# DBTITLE 1, Read taxi data for training
+# DBTITLE 1, Read taxi data for training
 taxi_data = rounded_taxi_data(raw_data)
 taxi_data.display()
 
 # COMMAND ----------
 
-# DBTITLE 1, Create FeatureLookups
-{{- if (eq .input_include_models_in_unity_catalog "no") }}
+# DBTITLE 1, Create FeatureLookups
+{{- if (eq .input_include_models_in_unity_catalog `no`) }}
 from databricks.feature_store import FeatureLookup
 import mlflow
@@ -207,9 +203,7 @@ dropoff_feature_lookups = [
         lookup_key=["dropoff_zip"],
         timestamp_lookup_key=["rounded_dropoff_datetime"],
     ),
-]
-
-{{else}}
+]{{ else }}
 
 from databricks.feature_engineering import FeatureLookup
 import mlflow
@@ -235,13 +229,12 @@ dropoff_feature_lookups = [
         lookup_key=["dropoff_zip"],
         timestamp_lookup_key=["rounded_dropoff_datetime"],
     ),
-]
-{{end -}}
+]{{ end }}
 
 # COMMAND ----------
 
-# DBTITLE 1, Create Training Dataset
-{{- if (eq .input_include_models_in_unity_catalog "no") }}
+# DBTITLE 1, Create Training Dataset
+{{ if (eq .input_include_models_in_unity_catalog `no`) }}
 from databricks import feature_store
 
 # End any existing runs (in the case this notebook is being run for a second time)
@@ -263,8 +256,7 @@ training_set = fs.create_training_set(
     label="fare_amount",
     exclude_columns=exclude_columns,
 )
-
-{{else}}
+{{ else }}
 from databricks.feature_engineering import FeatureEngineeringClient
 
 # End any existing runs (in the case this notebook is being run for a second time)
@@ -282,11 +274,12 @@ fe = FeatureEngineeringClient()
 # Create the training set that includes the raw input data merged with corresponding features from both feature tables
 training_set = fe.create_training_set(
     df=taxi_data, # specify the df
-    feature_lookups=pickup_feature_lookups + dropoff_feature_lookups, # both features need to be available; defined in GenerateAndWriteFeatures &/or feature-engineering-workflow-resource.yml
+    feature_lookups=pickup_feature_lookups + dropoff_feature_lookups,
+    # both features need to be available; defined in GenerateAndWriteFeatures &/or feature-engineering-workflow-resource.yml
     label="fare_amount",
     exclude_columns=exclude_columns,
 )
-{{end -}}
+{{ end }}
 
 # Load the TrainingSet into a dataframe which can be passed into sklearn for training a model
 training_df = training_set.load_df()
@@ -302,8 +295,8 @@ training_df.display()
 # MAGIC Train a LightGBM model on the data returned by `TrainingSet.to_df`, then log the model with `FeatureStoreClient.log_model`. The model will be packaged with feature metadata.
 
 # COMMAND ----------
 
-# DBTITLE 1, Train model
+# DBTITLE 1, Train model
 import lightgbm as lgb
 from sklearn.model_selection import train_test_split
 import mlflow.lightgbm
@@ -332,9 +325,9 @@ num_rounds = 100
 model = lgb.train(param, train_lgb_dataset, num_rounds)
 
 # COMMAND ----------
 
-# DBTITLE 1, Log model and return output.
-{{- if (eq .input_include_models_in_unity_catalog "no") }}
+# DBTITLE 1, Log model and return output.
+{{- if (eq .input_include_models_in_unity_catalog `no`) }}
 # Log the trained model with MLflow and package it with feature lookup information.
 fs.log_model(
     model=model, #specify model
@@ -342,8 +335,7 @@ fs.log_model(
     flavor=mlflow.lightgbm,
     training_set=training_set,
     registered_model_name=model_name,
-)
-{{else}}
+){{ else }}
 # Log the trained model with MLflow and package it with feature lookup information.
 fe.log_model(
     model=model, #specify model
@@ -352,7 +344,7 @@ fe.log_model(
     training_set=training_set,
     registered_model_name=model_name,
 )
-{{end -}}
+{{ end }}
 
 # The returned model URI is needed by the model deployment notebook.
 model_version = get_latest_model_version(model_name)
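
Reviewer note (appended for context; not part of the patch): with the template directives resolved for input_include_feature_store=yes and input_include_models_in_unity_catalog=yes, the reshaped predict.py.tmpl body renders to roughly the Python below. This is a minimal sketch for orientation only; the function arguments (Spark session, model URI, table names, model version, timestamp) are supplied by the batch-inference job exactly as in the template.

import mlflow
from databricks.feature_engineering import FeatureEngineeringClient
from pyspark.sql.functions import lit, to_timestamp


def predict_batch(
    spark_session, model_uri, input_table_name, output_table_name, model_version, ts
):
    # Resolve the model URI against the Unity Catalog model registry.
    mlflow.set_registry_uri("databricks-uc")

    table = spark_session.table(input_table_name)

    # The model was logged with feature metadata, so score_batch joins the
    # required feature lookups automatically before scoring.
    fe_client = FeatureEngineeringClient()
    prediction_df = fe_client.score_batch(model_uri=model_uri, df=table)

    output_df = (
        prediction_df.withColumn("prediction", prediction_df["prediction"])
        .withColumn("model_version", lit(model_version))
        .withColumn("inference_timestamp", to_timestamp(lit(ts)))
    )

    # Delta is the default table format in Databricks Runtime 8.0 and above.
    output_df.write.format("delta").mode("overwrite").saveAsTable(output_table_name)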