recent updates during local code init review
hengrumay committed Jul 22, 2024
1 parent 1b0d16d commit a364abf
Showing 4 changed files with 59 additions and 59 deletions.
Changed file 1 of 4:
@@ -9,18 +9,24 @@ def predict_batch(
Apply the model at the specified URI for batch inference on the table with name input_table_name,
writing results to the table with name output_table_name
"""
{{ if (eq .input_include_models_in_unity_catalog "yes") }}mlflow.set_registry_uri("databricks-uc"){{ end }}
{{ if (eq .input_include_models_in_unity_catalog `yes`) }}
mlflow.set_registry_uri("databricks-uc")
{{ end }}
table = spark_session.table(input_table_name)
{{ if (eq .input_include_feature_store `yes`) }}
{{ if (eq .input_include_models_in_unity_catalog "no") }}
{{ if (eq .input_include_feature_store `yes`) }}
{{ if (eq .input_include_models_in_unity_catalog `no`) }}
from databricks.feature_store import FeatureStoreClient

fs_client = FeatureStoreClient()
prediction_df = fs_client.score_batch(model_uri, table)

prediction_df = fs_client.score_batch(model_uri, table)
{{ else }}
from databricks.feature_engineering import FeatureEngineeringClient

fe_client = FeatureEngineeringClient()

prediction_df = fe_client.score_batch(model_uri = model_uri, df = table)
{{ end }}
{{ end }}
output_df = (
prediction_df.withColumn("prediction", prediction_df["prediction"])
.withColumn("model_version", lit(model_version))
@@ -29,14 +35,17 @@ def predict_batch(
{{ else }}
predict = mlflow.pyfunc.spark_udf(
spark_session, model_uri, result_type="string", env_manager="virtualenv"
)
)

output_df = (
table.withColumn("prediction", predict(struct(*table.columns)))
.withColumn("model_version", lit(model_version))
.withColumn("inference_timestamp", to_timestamp(lit(ts)))
)
{{ end }}
{{ end -}}

output_df.display()

# Model predictions are written to the Delta table named by output_table_name.
# Delta is the default format in Databricks Runtime 8.0 and above.
output_df.write.format("delta").mode("overwrite").saveAsTable(output_table_name)
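
For orientation, a minimal sketch of how this helper might be invoked from a batch-inference notebook. The full signature is collapsed in the diff, so the parameter order is a guess based on the visible body, and every name and value below is an illustrative assumption rather than something from this repo:

from datetime import datetime, timezone

# Illustrative values only.
model_uri = "models:/dev-my_model/1"                      # assumed registry URI
input_table_name = "dev.my_schema.raw_trips"              # assumed input table
output_table_name = "dev.my_schema.trip_predictions"      # assumed output table
model_version = 1
ts = datetime.now(timezone.utc)

# spark is the SparkSession provided by the Databricks notebook runtime.
predict_batch(spark, model_uri, input_table_name, output_table_name, model_version, ts)
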
Changed file 2 of 4:
@@ -68,10 +68,9 @@ notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.ge
%cd $notebook_path
%cd ../features


# COMMAND ----------
# DBTITLE 1,Define input and output variables

# DBTITLE 1,Define input and output variables
input_table_path = dbutils.widgets.get("input_table_path")
output_table_name = dbutils.widgets.get("output_table_name")
input_start_date = dbutils.widgets.get("input_start_date")
@@ -87,19 +86,18 @@ assert output_table_name != "", "output_table_name notebook parameter must be sp
output_database = output_table_name.split(".")[0]

# COMMAND ----------
# DBTITLE 1,Create database.

# DBTITLE 1,Create database.
spark.sql("CREATE DATABASE IF NOT EXISTS " + output_database)

# COMMAND ----------
# DBTITLE 1, Read input data.

# DBTITLE 1, Read input data.
raw_data = spark.read.format("delta").load(input_table_path)


# COMMAND ----------
# DBTITLE 1,Compute features.

# DBTITLE 1,Compute features.
# Compute the features. This is done by dynamically loading the features module.
from importlib import import_module

@@ -114,9 +112,9 @@ features_df = compute_features_fn(
)
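
The comment above notes that the feature-computation function is resolved by dynamically loading the features module; a typical importlib pattern for the collapsed lines, with an illustrative module name, is:

from importlib import import_module

# Illustrative module name; the real one is read from a notebook widget in the
# collapsed part of this hunk.
features_module = "pickup_features"
mod = import_module(features_module)                        # load ../features/<module>.py
compute_features_fn = getattr(mod, "compute_features_fn")   # resolve the entry point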

# COMMAND ----------
# DBTITLE 1, Write computed features.

{{- if (eq .input_include_models_in_unity_catalog "no") }}
# DBTITLE 1, Write computed features.
{{- if (eq .input_include_models_in_unity_catalog `no`) }}
from databricks import feature_store

fs = feature_store.FeatureStoreClient()
@@ -135,10 +133,9 @@ fs.write_table(
name=output_table_name,
df=features_df,
mode="merge",
)

{{else}}
){{ else }}
from databricks.feature_engineering import FeatureEngineeringClient

fe = FeatureEngineeringClient()

# First, create the feature table if it does not already exist.
@@ -155,7 +152,8 @@ fe.write_table(
name=output_table_name,
df=features_df,
mode="merge",
)
{{end -}}
){{ end }}
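
The collapsed portion of this hunk creates the feature table before merging into it. A minimal sketch of that create-if-missing-then-merge pattern with the Feature Engineering client, using assumed key column names, could look like:

from databricks.feature_engineering import FeatureEngineeringClient

fe = FeatureEngineeringClient()

# The key columns below are assumptions; the real ones live in the collapsed lines.
if not spark.catalog.tableExists(output_table_name):
    fe.create_table(
        name=output_table_name,
        primary_keys=["zip", "ts"],   # assumed primary keys
        timestamp_keys=["ts"],        # assumed timestamp key
        df=features_df,
        description="Feature table (illustrative description)",
    )

fe.write_table(
    name=output_table_name,
    df=features_df,
    mode="merge",
)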

# COMMAND ----------

dbutils.notebook.exit(0)
dbutils.notebook.exit(0)
Changed file 3 of 4:
@@ -39,8 +39,8 @@ notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.ge
dbutils.library.restartPython()

# COMMAND ----------
# DBTITLE 1, Notebook arguments

# DBTITLE 1, Notebook arguments
# List of input args needed to run this notebook as a job.
# Provide them via DB widgets or notebook arguments.
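
As a reminder of how these arguments flow in, each widget is declared with a default and then read back, whether the value came from the notebook UI or from job parameters; the name and default below are illustrative:

# Declare a text widget (illustrative name and default value).
dbutils.widgets.text(
    "training_data_path", "/tmp/example-training-data", label="Path to the training data"
)

# Read its current value; a job parameter with the same name overrides the default.
training_data_path = dbutils.widgets.get("training_data_path")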

@@ -77,15 +77,15 @@ dbutils.widgets.text(
{{end -}}

# COMMAND ----------
# DBTITLE 1,Define input and output variables

# DBTITLE 1,Define input and output variables
input_table_path = dbutils.widgets.get("training_data_path")
experiment_name = dbutils.widgets.get("experiment_name")
model_name = dbutils.widgets.get("model_name")

# COMMAND ----------
# DBTITLE 1, Set experiment

# DBTITLE 1, Set experiment
import mlflow

mlflow.set_experiment(experiment_name)
@@ -96,12 +96,13 @@ mlflow.set_registry_uri('databricks-uc')
{{end -}}

# COMMAND ----------
# DBTITLE 1, Load raw data

# DBTITLE 1, Load raw data
training_df = spark.read.format("delta").load(input_table_path)
training_df.display()

# COMMAND ----------

# DBTITLE 1, Helper function
from mlflow.tracking import MlflowClient
import mlflow.pyfunc
@@ -123,8 +124,8 @@ def get_latest_model_version(model_name):
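
The body of this helper is collapsed in the diff; a common implementation on top of the MLflow client API (a sketch, not necessarily the exact code here) is:

from mlflow.tracking import MlflowClient

def get_latest_model_version(model_name):
    """Return the highest version number registered under model_name (sketch)."""
    client = MlflowClient()
    latest_version = 1
    for mv in client.search_model_versions(f"name='{model_name}'"):
        latest_version = max(latest_version, int(mv.version))
    return latest_version
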
# MAGIC Train a LightGBM model on the data, then log and register the model with MLflow.

# COMMAND ----------
# DBTITLE 1, Train model

# DBTITLE 1, Train model
import mlflow
from sklearn.model_selection import train_test_split
import lightgbm as lgb
@@ -152,8 +153,8 @@ num_rounds = 100
model = lgb.train(param, train_lgb_dataset, num_rounds)
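
The train/test split, the LightGBM dataset, and the hyperparameters are collapsed above; a sketch of that setup, with an assumed label column and assumed parameters, might look like:

import lightgbm as lgb
import mlflow.lightgbm
from sklearn.model_selection import train_test_split

# training_df comes from the "Load raw data" cell; keep numeric columns for this sketch.
data = training_df.toPandas().select_dtypes("number")

train, test = train_test_split(data, random_state=123)
X_train = train.drop(["fare_amount"], axis=1)   # assumed label column
y_train = train["fare_amount"]

mlflow.lightgbm.autolog()                        # assumption: autologging is enabled
train_lgb_dataset = lgb.Dataset(X_train, label=y_train.values)

param = {"num_leaves": 32, "objective": "regression", "metric": "rmse"}  # assumed values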

# COMMAND ----------
# DBTITLE 1, Log model and return output.

# DBTITLE 1, Log model and return output.
# Take the first row of the training dataset as the model input example.
input_example = X_train.iloc[[0]]
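
The rest of this cell is collapsed; logging and registering the booster typically follows the pattern below. The artifact path and the final hand-back of the URI are assumptions, not necessarily what this notebook does:

import mlflow.lightgbm

# Log the trained booster with the input example and register it under the
# widget-provided model name (the artifact path is an assumed value).
mlflow.lightgbm.log_model(
    model,
    artifact_path="model",
    input_example=input_example,
    registered_model_name=model_name,
)

model_version = get_latest_model_version(model_name)
model_uri = f"models:/{model_name}/{model_version}"
dbutils.notebook.exit(model_uri)   # assumption: the URI is returned to the caller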

Changed file 4 of 4:
@@ -39,8 +39,8 @@ notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.ge
dbutils.library.restartPython()

# COMMAND ----------
# DBTITLE 1, Notebook arguments

# DBTITLE 1, Notebook arguments
# List of input args needed to run this notebook as a job.
# Provide them via DB widgets or notebook arguments.

@@ -62,19 +62,15 @@ dbutils.widgets.text(
label="MLflow experiment name",
)

{{- if (eq .input_include_models_in_unity_catalog "no") }}
{{ if (eq .input_include_models_in_unity_catalog `no`) }}
# MLflow registered model name to use for the trained model.
dbutils.widgets.text(
"model_name", "dev-{{template `model_name` .}}", label="Model Name"
)

{{else}}
){{ else }}
# Unity Catalog registered model name to use for the trained model.
dbutils.widgets.text(
"model_name", "dev.{{ .input_schema_name }}.{{template `model_name` .}}", label="Full (Three-Level) Model Name"
)

{{end -}}
){{ end }}

# Pickup features table name
dbutils.widgets.text(
@@ -93,29 +89,29 @@ dbutils.widgets.text(
)

# COMMAND ----------
# DBTITLE 1,Define input and output variables

# DBTITLE 1,Define input and output variables
input_table_path = dbutils.widgets.get("training_data_path")
experiment_name = dbutils.widgets.get("experiment_name")
model_name = dbutils.widgets.get("model_name")

# COMMAND ----------
# DBTITLE 1, Set experiment

# DBTITLE 1, Set experiment
import mlflow

mlflow.set_experiment(experiment_name)
{{ if (eq .input_include_models_in_unity_catalog `yes`) }}mlflow.set_registry_uri('databricks-uc'){{ end }}

# COMMAND ----------
# DBTITLE 1, Load raw data

# DBTITLE 1, Load raw data
raw_data = spark.read.format("delta").load(input_table_path)
raw_data.display()

# COMMAND ----------
# DBTITLE 1, Helper functions

# DBTITLE 1, Helper functions
from datetime import timedelta, timezone
import math
import mlflow.pyfunc
@@ -173,15 +169,15 @@ def get_latest_model_version(model_name):


# COMMAND ----------
# DBTITLE 1, Read taxi data for training

# DBTITLE 1, Read taxi data for training
taxi_data = rounded_taxi_data(raw_data)
taxi_data.display()

# COMMAND ----------
# DBTITLE 1, Create FeatureLookups

{{- if (eq .input_include_models_in_unity_catalog "no") }}
# DBTITLE 1, Create FeatureLookups
{{- if (eq .input_include_models_in_unity_catalog `no`) }}
from databricks.feature_store import FeatureLookup
import mlflow

@@ -207,9 +203,7 @@ dropoff_feature_lookups = [
lookup_key=["dropoff_zip"],
timestamp_lookup_key=["rounded_dropoff_datetime"],
),
]

{{else}}
]{{ else }}
from databricks.feature_engineering import FeatureLookup
import mlflow

@@ -235,13 +229,12 @@ dropoff_feature_lookups = [
lookup_key=["dropoff_zip"],
timestamp_lookup_key=["rounded_dropoff_datetime"],
),
]
{{end -}}
]{{ end }}
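
The pickup-side lookups are collapsed in this hunk; for illustration, a lookup in the same style as the visible dropoff ones, with assumed table and feature names, would be:

from databricks.feature_engineering import FeatureLookup

# Table and feature names below are assumptions for the sketch.
pickup_features_table = "dev.my_schema.trip_pickup_features"

pickup_feature_lookups = [
    FeatureLookup(
        table_name=pickup_features_table,
        feature_names=["mean_fare_window_1h_pickup_zip", "count_trips_window_1h_pickup_zip"],
        lookup_key=["pickup_zip"],
        timestamp_lookup_key=["rounded_pickup_datetime"],
    ),
]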

# COMMAND ----------
# DBTITLE 1, Create Training Dataset

{{- if (eq .input_include_models_in_unity_catalog "no") }}
# DBTITLE 1, Create Training Dataset
{{ if (eq .input_include_models_in_unity_catalog `no`) }}
from databricks import feature_store

# End any existing runs (in case this notebook is being run a second time)
@@ -263,8 +256,7 @@ training_set = fs.create_training_set(
label="fare_amount",
exclude_columns=exclude_columns,
)

{{else}}
{{ else }}
from databricks.feature_engineering import FeatureEngineeringClient

# End any existing runs (in case this notebook is being run a second time)
@@ -282,11 +274,12 @@ fe = FeatureEngineeringClient()
# Create the training set that includes the raw input data merged with corresponding features from both feature tables
training_set = fe.create_training_set(
df=taxi_data, # specify the df
feature_lookups=pickup_feature_lookups + dropoff_feature_lookups, # both features need to be available; defined in GenerateAndWriteFeatures &/or feature-engineering-workflow-resource.yml
feature_lookups=pickup_feature_lookups + dropoff_feature_lookups,
# both features need to be available; defined in GenerateAndWriteFeatures &/or feature-engineering-workflow-resource.yml
label="fare_amount",
exclude_columns=exclude_columns,
)
{{end -}}
{{ end }}

# Load the TrainingSet into a dataframe which can be passed into sklearn for training a model
training_df = training_set.load_df()
@@ -302,8 +295,8 @@ training_df.display()
# MAGIC Train a LightGBM model on the data returned by `TrainingSet.load_df`, then log the model with `FeatureStoreClient.log_model`. The model will be packaged with feature metadata.

# COMMAND ----------
# DBTITLE 1, Train model

# DBTITLE 1, Train model
import lightgbm as lgb
from sklearn.model_selection import train_test_split
import mlflow.lightgbm
@@ -332,18 +325,17 @@ num_rounds = 100
model = lgb.train(param, train_lgb_dataset, num_rounds)

# COMMAND ----------
# DBTITLE 1, Log model and return output.

{{- if (eq .input_include_models_in_unity_catalog "no") }}
# DBTITLE 1, Log model and return output.
{{- if (eq .input_include_models_in_unity_catalog `no`) }}
# Log the trained model with MLflow and package it with feature lookup information.
fs.log_model(
model=model, #specify model
artifact_path="model_packaged",
flavor=mlflow.lightgbm,
training_set=training_set,
registered_model_name=model_name,
)
{{else}}
){{ else }}
# Log the trained model with MLflow and package it with feature lookup information.
fe.log_model(
model=model, #specify model
@@ -352,7 +344,7 @@ fe.log_model(
training_set=training_set,
registered_model_name=model_name,
)
{{end -}}
{{ end }}

# The returned model URI is needed by the model deployment notebook.
model_version = get_latest_model_version(model_name)
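
How the URI actually reaches the deployment notebook is collapsed below; one common handoff, assumed here rather than confirmed by the visible diff, is a job task value:

model_uri = f"models:/{model_name}/{model_version}"

# Assumption: expose the URI to downstream job tasks; the real mechanism may differ.
dbutils.jobs.taskValues.set("model_uri", model_uri)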
