diff --git a/.github/workflows/README.md b/.github/workflows/README.md new file mode 100644 index 0000000..04356c8 --- /dev/null +++ b/.github/workflows/README.md @@ -0,0 +1,7 @@ +# CI/CD Workflow Definitions +This directory contains CI/CD workflow definitions using [GitHub Actions](https://docs.github.com/en/actions), +under ``workflows``. These workflows cover testing and deployment of both ML code (for model training, batch inference, etc) and the +Databricks ML asset definitions under ``mlops_stacks_gcp_fs/assets``. + +To set up CI/CD for a new project, +please refer to [ML asset config - set up CI CD](../../mlops_stacks_gcp_fs/assets/README.md#set-up-ci-and-cd). diff --git a/.github/workflows/mlops_stacks_gcp_fs-bundle-cd-prod.yml b/.github/workflows/mlops_stacks_gcp_fs-bundle-cd-prod.yml new file mode 100644 index 0000000..c8c8168 --- /dev/null +++ b/.github/workflows/mlops_stacks_gcp_fs-bundle-cd-prod.yml @@ -0,0 +1,34 @@ +# This GitHub workflow deploys Bundle assets (ML asset config and more) +# defined under mlops_stacks_gcp_fs/assets/* +# and mlops_stacks_gcp_fs/databricks.yml with prod deployment target configs, +# when PRs are merged into the release branch +name: Bundle Deployment for mlops_stacks_gcp_fs Prod + +on: + push: + branches: + - 'release' + workflow_dispatch: + +defaults: + run: + working-directory: ./mlops_stacks_gcp_fs + +env: + DATABRICKS_TOKEN: ${{ secrets.PROD_WORKSPACE_TOKEN }} + +jobs: + prod: + concurrency: mlops_stacks_gcp_fs-prod-bundle-job + runs-on: ubuntu-22.04 + steps: + - uses: actions/checkout@v3 + - uses: databricks/setup-cli@v0.211.0 + - name: Validate Bundle For Prod + id: validate + run: | + databricks bundle validate -t prod + - name: Deploy Bundle to Prod + id: deploy + run: | + databricks bundle deploy -t prod diff --git a/.github/workflows/mlops_stacks_gcp_fs-bundle-cd-staging.yml b/.github/workflows/mlops_stacks_gcp_fs-bundle-cd-staging.yml new file mode 100644 index 0000000..e3cb067 --- /dev/null +++ b/.github/workflows/mlops_stacks_gcp_fs-bundle-cd-staging.yml @@ -0,0 +1,34 @@ +# This GitHub workflow deploys Bundle assets (ML asset config and more) +# defined under mlops_stacks_gcp_fs/assets/* +# and mlops_stacks_gcp_fs/databricks.yml with staging deployment target configs, +# when PRs are merged into the default branch +name: Bundle Deployment for mlops_stacks_gcp_fs Staging + +on: + push: + branches: + - 'main' + workflow_dispatch: + +defaults: + run: + working-directory: ./mlops_stacks_gcp_fs + +env: + DATABRICKS_TOKEN: ${{ secrets.STAGING_WORKSPACE_TOKEN }} + +jobs: + staging: + concurrency: mlops_stacks_gcp_fs-staging-bundle-job + runs-on: ubuntu-22.04 + steps: + - uses: actions/checkout@v3 + - uses: databricks/setup-cli@v0.211.0 + - name: Validate Bundle For Staging + id: validate + run: | + databricks bundle validate -t staging + - name: Deploy Bundle to Staging + id: deploy + run: | + databricks bundle deploy -t staging diff --git a/.github/workflows/mlops_stacks_gcp_fs-bundle-ci.yml b/.github/workflows/mlops_stacks_gcp_fs-bundle-ci.yml new file mode 100644 index 0000000..02ac728 --- /dev/null +++ b/.github/workflows/mlops_stacks_gcp_fs-bundle-ci.yml @@ -0,0 +1,93 @@ +# This GitHub workflow validates Bundle config (ML asset config and more) +# defined under mlops_stacks_gcp_fs/assets/* +# and mlops_stacks_gcp_fs/databricks.yml, when PRs are merged into the main branch +name: Bundle validation for mlops_stacks_gcp_fs + +on: + workflow_dispatch: + pull_request_target: + +defaults: + run: + working-directory: ./mlops_stacks_gcp_fs/ + 
+env: + STAGING_WORKSPACE_TOKEN: ${{ secrets.STAGING_WORKSPACE_TOKEN }} + PROD_WORKSPACE_TOKEN: ${{ secrets.PROD_WORKSPACE_TOKEN }} + +jobs: + staging: + concurrency: mlops_stacks_gcp_fs-staging-bundle-job + runs-on: ubuntu-22.04 + steps: + - uses: actions/checkout@v3 + with: + ref: ${{ github.event.pull_request.head.sha || github.sha }} + - uses: databricks/setup-cli@v0.211.0 + - name: Validate Bundle For Staging + id: validate + env: + DATABRICKS_TOKEN: ${{ env.STAGING_WORKSPACE_TOKEN }} + run: | + databricks bundle validate -t staging > ../validate_output.txt + - name: Create Comment with Bundle Configuration + uses: actions/github-script@v6 + id: comment + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + script: | + const fs = require('fs'); + const fileContents = fs.readFileSync('validate_output.txt', 'utf8'); + const output = `#### Bundle Staging Config Validated 🖌 +
<details><summary>Staging Validation Output</summary> + + \`\`\`\n + ${fileContents} + \`\`\` + + </details>
` + + github.rest.issues.createComment({ + issue_number: context.issue.number, + owner: context.repo.owner, + repo: context.repo.repo, + body: output + }) + + prod: + concurrency: mlops_stacks_gcp_fs-prod-bundle-job + runs-on: ubuntu-22.04 + steps: + - uses: actions/checkout@v3 + with: + ref: ${{ github.event.pull_request.head.sha || github.sha }} + - uses: databricks/setup-cli@v0.211.0 + - name: Validate Bundle For Prod + id: validate + env: + DATABRICKS_TOKEN: ${{ env.PROD_WORKSPACE_TOKEN }} + run: | + databricks bundle validate -t prod > ../validate_output.txt + - name: Create Comment with Bundle Configuration + uses: actions/github-script@v6 + id: comment + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + script: | + const fs = require('fs'); + const fileContents = fs.readFileSync('validate_output.txt', 'utf8'); + const output = `#### Bundle Prod Config Validated 🖌 +
<details><summary>Prod Validation Output</summary> + + \`\`\`\n + ${fileContents} + \`\`\` + + </details>
` + + github.rest.issues.createComment({ + issue_number: context.issue.number, + owner: context.repo.owner, + repo: context.repo.repo, + body: output + }) diff --git a/.github/workflows/mlops_stacks_gcp_fs-lint-cicd-workflow-files.yml b/.github/workflows/mlops_stacks_gcp_fs-lint-cicd-workflow-files.yml new file mode 100644 index 0000000..59a5c5b --- /dev/null +++ b/.github/workflows/mlops_stacks_gcp_fs-lint-cicd-workflow-files.yml @@ -0,0 +1,19 @@ +name: Lint CI/CD workflow files +on: + pull_request: + paths: + - '.github/workflows/**' + workflow_dispatch: + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Download actionlint + id: get_actionlint + run: bash <(curl https://raw.githubusercontent.com/rhysd/actionlint/main/scripts/download-actionlint.bash) + shell: bash + - name: Check workflow files + run: ${{ steps.get_actionlint.outputs.executable }} -color + shell: bash diff --git a/.github/workflows/mlops_stacks_gcp_fs-run-tests-fs.yml b/.github/workflows/mlops_stacks_gcp_fs-run-tests-fs.yml new file mode 100644 index 0000000..7156cfc --- /dev/null +++ b/.github/workflows/mlops_stacks_gcp_fs-run-tests-fs.yml @@ -0,0 +1,59 @@ +name: Feature and Training Integration Tests for mlops_stacks_gcp_fs +on: + workflow_dispatch: + pull_request: + +defaults: + run: + working-directory: ./mlops_stacks_gcp_fs/ + +env: + DATABRICKS_TOKEN: ${{ secrets.STAGING_WORKSPACE_TOKEN }} + +concurrency: mlops_stacks_gcp_fs-feature-training-integration-test-staging + +jobs: + unit_tests: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-python@v2 + with: + python-version: 3.8 + # Feature store tests bring up a local Spark session, so Java is required. + - uses: actions/setup-java@v2 + with: + distribution: 'temurin' + java-version: '11' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -r ../test-requirements.txt + - name: Run tests with pytest + run: | + pytest + + integration_test: + needs: unit_tests + runs-on: ubuntu-22.04 + steps: + - name: Checkout repo + uses: actions/checkout@v3 + - uses: databricks/setup-cli@v0.211.0 + - name: Validate Bundle For Test Deployment Target in Staging Workspace + id: validate + run: | + databricks bundle validate -t test + - name: Deploy Bundle to Test Deployment Target in Staging Workspace + id: deploy + run: | + databricks bundle deploy -t test + - name: Run Feature Engineering Workflow for Test Deployment Target in Staging Workspace + id: feature_engineering + run: | + databricks bundle run write_feature_table_job -t test + - name: Run Training Workflow for Test Deployment Target in Staging Workspace + id: training + run: | + databricks bundle run model_training_job -t test diff --git a/mlops_stacks_gcp_fs/README.md b/mlops_stacks_gcp_fs/README.md new file mode 100644 index 0000000..a7a72e9 --- /dev/null +++ b/mlops_stacks_gcp_fs/README.md @@ -0,0 +1,5 @@ +# mlops_stacks_gcp_fs + +This directory contains python code, notebooks and ML asset configs related to one ML project. + +See the [Project overview](../docs/project-overview.md) for details on code structure of project directory. 
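
The `unit_tests` job in the Feature and Training Integration Tests workflow above installs Java because the feature tests bring up a local Spark session. A minimal sketch of that kind of local-Spark pytest test is shown below, assuming `pyspark`, `pandas`, `pytest`, and a local JVM are available (which is what the Java setup step provides); the fixture and toy assertion are illustrative only and simply mirror the pattern used by the test files under `mlops_stacks_gcp_fs/tests/` later in this diff.

```python
import pandas as pd
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark():
    # Single-core local Spark session; needs a local JVM, hence the setup-java step in CI.
    session = (
        SparkSession.builder.master("local[1]")
        .appName("local-feature-tests")
        .getOrCreate()
    )
    yield session
    session.stop()


def test_input_converts_to_spark(spark):
    # Trivial check that a pandas DataFrame round-trips through the local Spark session.
    pdf = pd.DataFrame({"pickup_zip": [94400], "fare_amount": [12.5]})
    assert spark.createDataFrame(pdf).count() == 1
```
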
\ No newline at end of file diff --git a/mlops_stacks_gcp_fs/__init__.py b/mlops_stacks_gcp_fs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mlops_stacks_gcp_fs/assets/README.md b/mlops_stacks_gcp_fs/assets/README.md index ce30069..7788f48 100644 --- a/mlops_stacks_gcp_fs/assets/README.md +++ b/mlops_stacks_gcp_fs/assets/README.md @@ -134,7 +134,7 @@ new_cluster: &new_cluster spark_version: 13.3.x-cpu-ml-scala2.12 node_type_id: n2-highmem-4 custom_tags: - clusterSource: mlops-stack/0.2 + clusterSource: mlops-stack resources: jobs: @@ -189,7 +189,7 @@ new_cluster: &new_cluster spark_version: 13.3.x-cpu-ml-scala2.12 node_type_id: n2-highmem-4 custom_tags: - clusterSource: mlops-stack/0.2 + clusterSource: mlops-stack resources: jobs: diff --git a/mlops_stacks_gcp_fs/assets/batch-inference-workflow-asset.yml b/mlops_stacks_gcp_fs/assets/batch-inference-workflow-asset.yml new file mode 100644 index 0000000..9fe6e29 --- /dev/null +++ b/mlops_stacks_gcp_fs/assets/batch-inference-workflow-asset.yml @@ -0,0 +1,41 @@ +new_cluster: &new_cluster + new_cluster: + num_workers: 3 + spark_version: 13.3.x-cpu-ml-scala2.12 + node_type_id: n2-highmem-4 + custom_tags: + clusterSource: mlops-stack + +common_permissions: &permissions + permissions: + - level: CAN_VIEW + group_name: users + +resources: + jobs: + batch_inference_job: + name: ${bundle.target}-mlops_stacks_gcp_fs-batch-inference-job + tasks: + - task_key: batch_inference_job + <<: *new_cluster + notebook_task: + notebook_path: ../deployment/batch_inference/notebooks/BatchInference.py + base_parameters: + env: ${bundle.target} + input_table_name: hive_metastore.default.taxi_scoring_sample_feature_store_inference_input + output_table_name: ${bundle.target}_mlops_stacks_gcp_fs_predictions + model_name: ${var.model_name} + # git source information of current ML asset deployment. It will be persisted as part of the workflow run + git_source_info: url:${bundle.git.origin_url}; branch:${bundle.git.branch}; commit:${bundle.git.commit} + + schedule: + quartz_cron_expression: "0 0 11 * * ?" # daily at 11am + timezone_id: UTC + <<: *permissions + # If you want to turn on notifications for this job, please uncomment the below code, + # and provide a list of emails to the on_failure argument. + # + # email_notifications: + # on_failure: + # - first@company.com + # - second@company.com diff --git a/mlops_stacks_gcp_fs/assets/feature-engineering-workflow-asset.yml b/mlops_stacks_gcp_fs/assets/feature-engineering-workflow-asset.yml new file mode 100644 index 0000000..a8f8669 --- /dev/null +++ b/mlops_stacks_gcp_fs/assets/feature-engineering-workflow-asset.yml @@ -0,0 +1,64 @@ +new_cluster: &new_cluster + new_cluster: + num_workers: 3 + spark_version: 13.3.x-cpu-ml-scala2.12 + node_type_id: n2-highmem-4 + custom_tags: + clusterSource: mlops-stack + +common_permissions: &permissions + permissions: + - level: CAN_VIEW + group_name: users + +resources: + jobs: + write_feature_table_job: + name: ${bundle.target}-mlops_stacks_gcp_fs-write-feature-table-job + job_clusters: + - job_cluster_key: write_feature_table_job_cluster + <<: *new_cluster + tasks: + - task_key: PickupFeatures + job_cluster_key: write_feature_table_job_cluster + notebook_task: + notebook_path: ../feature_engineering/notebooks/GenerateAndWriteFeatures.py + base_parameters: + # TODO modify these arguments to reflect your setup. + input_table_path: /databricks-datasets/nyctaxi-with-zipcodes/subsampled + # TODO: Empty start/end dates will process the whole range. 
Update this as needed to process recent data. + input_start_date: "" + input_end_date: "" + timestamp_column: tpep_pickup_datetime + output_table_name: feature_store_taxi_example.${bundle.target}_mlops_stacks_gcp_fs_trip_pickup_features + features_transform_module: pickup_features + primary_keys: zip + # git source information of current ML asset deployment. It will be persisted as part of the workflow run + git_source_info: url:${bundle.git.origin_url}; branch:${bundle.git.branch}; commit:${bundle.git.commit} + - task_key: DropoffFeatures + job_cluster_key: write_feature_table_job_cluster + notebook_task: + notebook_path: ../feature_engineering/notebooks/GenerateAndWriteFeatures.py + base_parameters: + # TODO: modify these arguments to reflect your setup. + input_table_path: /databricks-datasets/nyctaxi-with-zipcodes/subsampled + # TODO: Empty start/end dates will process the whole range. Update this as needed to process recent data. + input_start_date: "" + input_end_date: "" + timestamp_column: tpep_dropoff_datetime + output_table_name: feature_store_taxi_example.${bundle.target}_mlops_stacks_gcp_fs_trip_dropoff_features + features_transform_module: dropoff_features + primary_keys: zip + # git source information of current ML asset deployment. It will be persisted as part of the workflow run + git_source_info: url:${bundle.git.origin_url}; branch:${bundle.git.branch}; commit:${bundle.git.commit} + schedule: + quartz_cron_expression: "0 0 7 * * ?" # daily at 7am + timezone_id: UTC + <<: *permissions + # If you want to turn on notifications for this job, please uncomment the below code, + # and provide a list of emails to the on_failure argument. + # + # email_notifications: + # on_failure: + # - first@company.com + # - second@company.com diff --git a/mlops_stacks_gcp_fs/assets/ml-artifacts-asset.yml b/mlops_stacks_gcp_fs/assets/ml-artifacts-asset.yml new file mode 100644 index 0000000..3b3b305 --- /dev/null +++ b/mlops_stacks_gcp_fs/assets/ml-artifacts-asset.yml @@ -0,0 +1,54 @@ +# Deployment target specific values +targets: + dev: + resources: + models: + model: + description: MLflow registered model for the "mlops_stacks_gcp_fs" ML Project for ${bundle.target} deployment target. + + test: + resources: + models: + model: + description: MLflow registered model for the "mlops_stacks_gcp_fs" ML Project for ${bundle.target} deployment target. + + staging: + resources: + models: + model: + description: MLflow registered model for the "mlops_stacks_gcp_fs" ML Project for ${bundle.target} deployment target. + + prod: + resources: + models: + model: + description: | + MLflow registered model for the "mlops_stacks_gcp_fs" ML Project. See the corresponding [Git repo]($#{var.git_repo_url}) for details on the project. + + Links: + * [Recurring model training job](https://416411475796958.8.gcp.databricks.com#job/${resources.jobs.model_training_job.id}): trains fresh model versions using the latest ML code. + * [Recurring batch inference job](https://416411475796958.8.gcp.databricks.com#job/${resources.jobs.batch_inference_job.id}): applies the latest ${bundle.target} model version for batch inference. 
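
For the `input_start_date` / `input_end_date` parameters in the feature engineering workflow above, empty strings mean the whole input range is processed. A standalone sketch of that filtering pattern follows; the function and argument names here are illustrative, and the project's own version is the `_filter_df_by_ts` helper in the feature modules later in this diff.

```python
import pyspark.sql.functions as F


def filter_by_date_range(df, ts_column, start_date="", end_date=""):
    """Keep rows whose ts_column falls in [start_date, end_date); empty strings mean no bound."""
    if ts_column and start_date:
        df = df.filter(F.col(ts_column) >= start_date)
    if ts_column and end_date:
        df = df.filter(F.col(ts_column) < end_date)
    return df


# With both dates left empty (the defaults in the workflow config above), no filter is applied
# and the whole table is processed:
#   filtered = filter_by_date_range(raw_df, "tpep_pickup_datetime")
```
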
+ +# Allow users to read the experiment and the model +common_permissions: &permissions + permissions: + - level: CAN_READ + group_name: users + + + +# Defines model and experiments +resources: + models: + model: + name: ${var.model_name} + <<: *permissions + depends_on: + - resources.jobs.model_training_job.id + - resources.jobs.batch_inference_job.id + + experiments: + experiment: + name: ${var.experiment_name} + <<: *permissions + description: MLflow Experiment used to track runs for mlops_stacks_gcp_fs project. diff --git a/mlops_stacks_gcp_fs/assets/model-workflow-asset.yml b/mlops_stacks_gcp_fs/assets/model-workflow-asset.yml new file mode 100644 index 0000000..f7fcc37 --- /dev/null +++ b/mlops_stacks_gcp_fs/assets/model-workflow-asset.yml @@ -0,0 +1,99 @@ +new_cluster: &new_cluster + new_cluster: + num_workers: 3 + spark_version: 13.3.x-cpu-ml-scala2.12 + node_type_id: n2-highmem-4 + custom_tags: + clusterSource: mlops-stack + +common_permissions: &permissions + permissions: + - level: CAN_VIEW + group_name: users + +resources: + jobs: + model_training_job: + name: ${bundle.target}-mlops_stacks_gcp_fs-model-training-job + job_clusters: + - job_cluster_key: model_training_job_cluster + <<: *new_cluster + tasks: + - task_key: Train + job_cluster_key: model_training_job_cluster + notebook_task: + notebook_path: ../training/notebooks/TrainWithFeatureStore.py + base_parameters: + env: ${bundle.target} + # TODO: Update training_data_path + training_data_path: /databricks-datasets/nyctaxi-with-zipcodes/subsampled + experiment_name: ${var.experiment_name} + model_name: ${var.model_name} + pickup_features_table: feature_store_taxi_example.${bundle.target}_mlops_stacks_gcp_fs_trip_pickup_features + dropoff_features_table: feature_store_taxi_example.${bundle.target}_mlops_stacks_gcp_fs_trip_dropoff_features + # git source information of current ML asset deployment. It will be persisted as part of the workflow run + git_source_info: url:${bundle.git.origin_url}; branch:${bundle.git.branch}; commit:${bundle.git.commit} + - task_key: ModelValidation + job_cluster_key: model_training_job_cluster + depends_on: + - task_key: Train + notebook_task: + notebook_path: ../validation/notebooks/ModelValidation.py + base_parameters: + experiment_name: ${var.experiment_name} + # The `run_mode` defines whether model validation is enabled or not. + # It can be one of the three values: + # `disabled` : Do not run the model validation notebook. + # `dry_run` : Run the model validation notebook. Ignore failed model validation rules and proceed to move + # model to Production stage. + # `enabled` : Run the model validation notebook. Move model to Production stage only if all model validation + # rules are passing. + # TODO: update run_mode + run_mode: dry_run + # Whether to load the current registered "Production" stage model as baseline. + # Baseline model is a requirement for relative change and absolute change validation thresholds. + # TODO: update enable_baseline_comparison + enable_baseline_comparison: "false" + # Please refer to data parameter in mlflow.evaluate documentation https://mlflow.org/docs/latest/python_api/mlflow.html#mlflow.evaluate + # TODO: update validation_input + validation_input: SELECT * FROM delta.`dbfs:/databricks-datasets/nyctaxi-with-zipcodes/subsampled` + # A string describing the model type. The model type can be either "regressor" and "classifier". 
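
The ModelValidation parameters configured here (`validation_input`, `targets`, `enable_baseline_comparison`, and the `model_type` and threshold settings that follow) are ultimately arguments to `mlflow.evaluate`. A hedged sketch of that call is below; the toy evaluation frame, model URIs, and threshold values are placeholders, not the project's actual configuration.

```python
import mlflow
import pandas as pd
from mlflow.models import MetricThreshold

# Toy two-row frame standing in for spark.sql(validation_input).toPandas();
# the model URIs below are placeholders as well.
eval_df = pd.DataFrame({"trip_distance": [1.2, 3.4], "fare_amount": [7.0, 14.5]})
candidate_model_uri = "models:/dev-mlops_stacks_gcp_fs-model/2"
baseline_model_uri = "models:/dev-mlops_stacks_gcp_fs-model/1"

result = mlflow.evaluate(
    model=candidate_model_uri,
    data=eval_df,
    targets="fare_amount",       # the `targets` parameter above
    model_type="regressor",      # the `model_type` parameter configured below
    validation_thresholds={
        # The kind of dict the validation thresholds loader function is expected to return.
        "root_mean_squared_error": MetricThreshold(threshold=100.0, greater_is_better=False),
    },
    # Only meaningful when enable_baseline_comparison is "true".
    baseline_model=baseline_model_uri,
)
print(result.metrics)
```
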
+ # Please refer to model_type parameter in mlflow.evaluate documentation https://mlflow.org/docs/latest/python_api/mlflow.html#mlflow.evaluate + # TODO: update model_type + model_type: regressor + # The string name of a column from data that contains evaluation labels. + # Please refer to targets parameter in mlflow.evaluate documentation https://mlflow.org/docs/latest/python_api/mlflow.html#mlflow.evaluate + # TODO: targets + targets: fare_amount + # Specifies the name of the function in mlops_stacks_gcp_fs/training_validation_deployment/validation/validation.py that returns custom metrics. + # TODO(optional): custom_metrics_loader_function + custom_metrics_loader_function: custom_metrics + # Specifies the name of the function in mlops_stacks_gcp_fs/training_validation_deployment/validation/validation.py that returns model validation thresholds. + # TODO(optional): validation_thresholds_loader_function + validation_thresholds_loader_function: validation_thresholds + # Specifies the name of the function in mlops_stacks_gcp_fs/training_validation_deployment/validation/validation.py that returns evaluator_config. + # TODO(optional): evaluator_config_loader_function + evaluator_config_loader_function: evaluator_config + # git source information of current ML asset deployment. It will be persisted as part of the workflow run + git_source_info: url:${bundle.git.origin_url}; branch:${bundle.git.branch}; commit:${bundle.git.commit} + - task_key: ModelDeployment + job_cluster_key: model_training_job_cluster + depends_on: + - task_key: ModelValidation + notebook_task: + notebook_path: ../deployment/model_deployment/notebooks/ModelDeployment.py + base_parameters: + env: ${bundle.target} + # git source information of current ML asset deployment. It will be persisted as part of the workflow run + git_source_info: url:${bundle.git.origin_url}; branch:${bundle.git.branch}; commit:${bundle.git.commit} + schedule: + quartz_cron_expression: "0 0 9 * * ?" # daily at 9am + timezone_id: UTC + <<: *permissions + # If you want to turn on notifications for this job, please uncomment the below code, + # and provide a list of emails to the on_failure argument. + # + # email_notifications: + # on_failure: + # - first@company.com + # - second@company.com diff --git a/mlops_stacks_gcp_fs/assets/monitoring-workflow-asset.yml b/mlops_stacks_gcp_fs/assets/monitoring-workflow-asset.yml new file mode 100644 index 0000000..a4d505d --- /dev/null +++ b/mlops_stacks_gcp_fs/assets/monitoring-workflow-asset.yml @@ -0,0 +1 @@ +# TODO: Add data monitoring support for mlops diff --git a/mlops_stacks_gcp_fs/databricks.yml b/mlops_stacks_gcp_fs/databricks.yml new file mode 100644 index 0000000..fee3f77 --- /dev/null +++ b/mlops_stacks_gcp_fs/databricks.yml @@ -0,0 +1,38 @@ +# The name of the bundle. run `databricks bundle schema` to see the full bundle settings schema. +bundle: + name: mlops_stacks_gcp_fs + +variables: + experiment_name: + description: Experiment name for the model training. + default: /Users/${workspace.current_user.userName}/${bundle.target}-mlops_stacks_gcp_fs-experiment + model_name: + description: Model name for the model training. 
+ default: ${bundle.target}-mlops_stacks_gcp_fs-model + +include: + # Assets folder contains ML artifact assets for the ml project that defines model and experiment + # And workflows assets for the ml project including model training -> validation -> deployment, + # feature engineering, batch inference, data monitoring, metric refresh, alerts and triggering retraining + - ./assets/*.yml + +# Deployment Target specific values for workspace +targets: + dev: + default: true + workspace: + # TODO: add dev workspace URL + host: + + staging: + workspace: + host: https://416411475796958.8.gcp.databricks.com + + prod: + workspace: + host: https://416411475796958.8.gcp.databricks.com + + test: + workspace: + host: https://416411475796958.8.gcp.databricks.com + diff --git a/mlops_stacks_gcp_fs/deployment/batch_inference/README.md b/mlops_stacks_gcp_fs/deployment/batch_inference/README.md new file mode 100644 index 0000000..b801e10 --- /dev/null +++ b/mlops_stacks_gcp_fs/deployment/batch_inference/README.md @@ -0,0 +1,42 @@ +# Batch Inference +To set up batch inference job via scheduled Databricks workflow, please refer to [mlops_stacks_gcp_fs/assets/README.md](../../assets/README.md) + +## Prepare the batch inference input table for the example Project +Please run the following code in a notebook to generate the example batch inference input table. + +``` +from pyspark.sql.functions import to_timestamp, lit +from pyspark.sql.types import IntegerType +import math +from datetime import timedelta, timezone + +def rounded_unix_timestamp(dt, num_minutes=15): + """ + Ceilings datetime dt to interval num_minutes, then returns the unix timestamp. + """ + nsecs = dt.minute * 60 + dt.second + dt.microsecond * 1e-6 + delta = math.ceil(nsecs / (60 * num_minutes)) * (60 * num_minutes) - nsecs + return int((dt + timedelta(seconds=delta)).replace(tzinfo=timezone.utc).timestamp()) + + +rounded_unix_timestamp_udf = udf(rounded_unix_timestamp, IntegerType()) + +df = spark.table("delta.`dbfs:/databricks-datasets/nyctaxi-with-zipcodes/subsampled`") +df.withColumn( + "rounded_pickup_datetime", + to_timestamp(rounded_unix_timestamp_udf(df["tpep_pickup_datetime"], lit(15))), +).withColumn( + "rounded_dropoff_datetime", + to_timestamp(rounded_unix_timestamp_udf(df["tpep_dropoff_datetime"], lit(30))), +).drop( + "tpep_pickup_datetime" +).drop( + "tpep_dropoff_datetime" +).drop( + "fare_amount" +).write.mode( + "overwrite" +).saveAsTable( + name="hive_metastore.default.taxi_scoring_sample_feature_store_inference_input" +) +``` diff --git a/mlops_stacks_gcp_fs/deployment/batch_inference/notebooks/BatchInference.py b/mlops_stacks_gcp_fs/deployment/batch_inference/notebooks/BatchInference.py new file mode 100644 index 0000000..73d26bd --- /dev/null +++ b/mlops_stacks_gcp_fs/deployment/batch_inference/notebooks/BatchInference.py @@ -0,0 +1,99 @@ +# Databricks notebook source +################################################################################## +# Batch Inference Notebook +# +# This notebook is an example of applying a model for batch inference against an input delta table, +# It is configured and can be executed as the batch_inference_job in the batch_inference_job workflow defined under +# ``mlops_stacks_gcp_fs/assets/batch-inference-workflow-asset.yml`` +# +# Parameters: +# +# * env (optional) - String name of the current environment (dev, staging, or prod). Defaults to "dev" +# * input_table_name (required) - Delta table name containing your input data. 
+# * output_table_name (required) - Delta table name where the predictions will be written to. +# Note that this will create a new version of the Delta table if +# the table already exists +# * model_name (required) - The name of the model to be used in batch inference. +################################################################################## + + +# List of input args needed to run the notebook as a job. +# Provide them via DB widgets or notebook arguments. +# +# Name of the current environment +dbutils.widgets.dropdown("env", "dev", ["dev", "staging", "prod"], "Environment Name") +# A Hive-registered Delta table containing the input features. +dbutils.widgets.text("input_table_name", "", label="Input Table Name") +# Delta table to store the output predictions. +dbutils.widgets.text("output_table_name", "", label="Output Table Name") +# Batch inference model name +dbutils.widgets.text( + "model_name", "dev-mlops_stacks_gcp_fs-model", label="Model Name" +) + + +# COMMAND ---------- + +import os + +notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()) +%cd $notebook_path + +# COMMAND ---------- + +# MAGIC %pip install -r ../../../requirements.txt + +# COMMAND ---------- + +dbutils.library.restartPython() + +# COMMAND ---------- + +import sys +import os +notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()) +%cd $notebook_path +%cd .. +sys.path.append("../..") + +# COMMAND ---------- + +# DBTITLE 1,Define input and output variables +from utils import get_deployed_model_stage_for_env + +env = dbutils.widgets.get("env") +input_table_name = dbutils.widgets.get("input_table_name") +output_table_name = dbutils.widgets.get("output_table_name") +model_name = dbutils.widgets.get("model_name") +assert input_table_name != "", "input_table_name notebook parameter must be specified" +assert output_table_name != "", "output_table_name notebook parameter must be specified" +assert model_name != "", "model_name notebook parameter must be specified" +stage = get_deployed_model_stage_for_env(env) +model_uri = f"models:/{model_name}/{stage}" + +# COMMAND ---------- + +from mlflow import MlflowClient + +# Get model version from stage +model_version_infos = MlflowClient().search_model_versions("name = '%s'" % model_name) +model_version = max( + int(version.version) + for version in model_version_infos + if version.current_stage == stage +) + +# COMMAND ---------- + +# Get datetime +from datetime import datetime + +ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + +# COMMAND ---------- +# DBTITLE 1,Load model and run inference + +from predict import predict_batch + +predict_batch(spark, model_uri, input_table_name, output_table_name, model_version, ts) +dbutils.notebook.exit(output_table_name) diff --git a/mlops_stacks_gcp_fs/deployment/batch_inference/predict.py b/mlops_stacks_gcp_fs/deployment/batch_inference/predict.py new file mode 100644 index 0000000..d9d2975 --- /dev/null +++ b/mlops_stacks_gcp_fs/deployment/batch_inference/predict.py @@ -0,0 +1,32 @@ +import mlflow +from pyspark.sql.functions import struct, lit, to_timestamp + + +def predict_batch( + spark_session, model_uri, input_table_name, output_table_name, model_version, ts +): + """ + Apply the model at the specified URI for batch inference on the table with name input_table_name, + writing results to the table with name output_table_name + """ + + table = 
spark_session.table(input_table_name) + + from databricks.feature_store import FeatureStoreClient + + fs_client = FeatureStoreClient() + + prediction_df = fs_client.score_batch( + model_uri, + table + ) + output_df = ( + prediction_df.withColumn("prediction", prediction_df["prediction"]) + .withColumn("model_version", lit(model_version)) + .withColumn("inference_timestamp", to_timestamp(lit(ts))) + ) + + output_df.display() + # Model predictions are written to the Delta table provided as input. + # Delta is the default format in Databricks Runtime 8.0 and above. + output_df.write.format("delta").mode("overwrite").saveAsTable(output_table_name) \ No newline at end of file diff --git a/mlops_stacks_gcp_fs/deployment/model_deployment/deploy.py b/mlops_stacks_gcp_fs/deployment/model_deployment/deploy.py new file mode 100644 index 0000000..a5f3616 --- /dev/null +++ b/mlops_stacks_gcp_fs/deployment/model_deployment/deploy.py @@ -0,0 +1,34 @@ +import sys +import pathlib + +sys.path.append(str(pathlib.Path(__file__).parent.parent.parent.resolve())) +from utils import get_deployed_model_stage_for_env +from mlflow.tracking import MlflowClient + + +def deploy(model_uri, env): + """ + Deploys an already-registered model produced by moving it into the appropriate stage for model deployment. + + :param model_uri: URI of the model to deploy. Must be in the format "models://", as described in + https://www.mlflow.org/docs/latest/model-registry.html#fetching-an-mlflow-model-from-the-model-registry + :param env: name of the environment in which we're performing deployment, i.e one of "dev", "staging", "prod". + Defaults to "dev" + :return: + """ + _, model_name, version = model_uri.split("/") + client = MlflowClient() + mv = client.get_model_version(model_name, version) + target_stage = get_deployed_model_stage_for_env(env) + if mv.current_stage != target_stage: + client.transition_model_version_stage( + name=model_name, + version=version, + stage=target_stage, + archive_existing_versions=True, + ) + print(f"Successfully deployed model with URI {model_uri} to {env}") + + +if __name__ == "__main__": + deploy(model_uri=sys.argv[1], env=sys.argv[2]) diff --git a/mlops_stacks_gcp_fs/deployment/model_deployment/notebooks/ModelDeployment.py b/mlops_stacks_gcp_fs/deployment/model_deployment/notebooks/ModelDeployment.py new file mode 100644 index 0000000..cc769ad --- /dev/null +++ b/mlops_stacks_gcp_fs/deployment/model_deployment/notebooks/ModelDeployment.py @@ -0,0 +1,52 @@ +# Databricks notebook source +################################################################################## +# Helper notebook to transition the model stage. This notebook is run +# after the Train.py notebook as part of a multi-task job, in order to transition model +# to target stage after training completes. +# +# Note that we deploy the model to the stage in MLflow Model Registry equivalent to the +# environment in which the multi-task job is executed (e.g deploy the trained model to +# stage=Production if triggered in the prod environment). In a practical setting, we would +# recommend enabling the model validation step between model training and automatically +# registering the model to the Production stage in prod. +# +# This notebook has the following parameters: +# +# * env (required) - String name of the current environment for model deployment, which decides the target stage. +# * model_uri (required) - URI of the model to deploy. 
Must be in the format "models://", as described in +# https://www.mlflow.org/docs/latest/model-registry.html#fetching-an-mlflow-model-from-the-model-registry +# This parameter is read as a task value +# (https://docs.databricks.com/dev-tools/databricks-utils.html#get-command-dbutilsjobstaskvaluesget), +# rather than as a notebook widget. That is, we assume a preceding task (the Train.py +# notebook) has set a task value with key "model_uri". +################################################################################## + +# List of input args needed to run the notebook as a job. +# Provide them via DB widgets or notebook arguments. +# +# Name of the current environment +dbutils.widgets.dropdown("env", "None", ["None", "staging", "prod"], "Environment Name") + +# COMMAND ---------- + +import os +import sys +notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()) +%cd $notebook_path +%cd .. +sys.path.append("../..") + +# COMMAND ---------- + +from deploy import deploy + +model_uri = dbutils.jobs.taskValues.get("Train", "model_uri", debugValue="") +env = dbutils.widgets.get("env") +assert env != "None", "env notebook parameter must be specified" +assert model_uri != "", "model_uri notebook parameter must be specified" +deploy(model_uri, env) + +# COMMAND ---------- +print( + f"Successfully completed model deployment for {model_uri}" +) diff --git a/mlops_stacks_gcp_fs/feature_engineering/README.md b/mlops_stacks_gcp_fs/feature_engineering/README.md new file mode 100644 index 0000000..b44d39c --- /dev/null +++ b/mlops_stacks_gcp_fs/feature_engineering/README.md @@ -0,0 +1,4 @@ +# Feature Engineering +To set up the feature engineering job via scheduled Databricks workflow, please refer to [mlops_stacks_gcp_fs/assets/README.md](../assets/README.md) + +For additional details on using the feature store, please refer to [mlops_stacks_gcp_fs/docs/ml-developer-guide-fs.md](../../docs/ml-developer-guide-fs.md). \ No newline at end of file diff --git a/mlops_stacks_gcp_fs/feature_engineering/__init__.py b/mlops_stacks_gcp_fs/feature_engineering/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mlops_stacks_gcp_fs/feature_engineering/features/__init__.py b/mlops_stacks_gcp_fs/feature_engineering/features/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mlops_stacks_gcp_fs/feature_engineering/features/dropoff_features.py b/mlops_stacks_gcp_fs/feature_engineering/features/dropoff_features.py new file mode 100644 index 0000000..c1d93c6 --- /dev/null +++ b/mlops_stacks_gcp_fs/feature_engineering/features/dropoff_features.py @@ -0,0 +1,64 @@ +""" +This sample module contains features logic that can be used to generate and populate tables in Feature Store. +You should plug in your own features computation logic in the compute_features_fn method below. 
+""" +import pyspark.sql.functions as F +from pyspark.sql.types import IntegerType, StringType, TimestampType +from pytz import timezone + + +@F.udf(returnType=IntegerType()) +def _is_weekend(dt): + tz = "America/New_York" + return int(dt.astimezone(timezone(tz)).weekday() >= 5) # 5 = Saturday, 6 = Sunday + + +@F.udf(returnType=StringType()) +def _partition_id(dt): + # datetime -> "YYYY-MM" + return f"{dt.year:04d}-{dt.month:02d}" + + +def _filter_df_by_ts(df, ts_column, start_date, end_date): + if ts_column and start_date: + df = df.filter(F.col(ts_column) >= start_date) + if ts_column and end_date: + df = df.filter(F.col(ts_column) < end_date) + return df + + +def compute_features_fn(input_df, timestamp_column, start_date, end_date): + """Contains logic to compute features. + + Given an input dataframe and time ranges, this function should compute features, populate an output dataframe and + return it. This method will be called from a Feature Store pipeline job and the output dataframe will be written + to a Feature Store table. You should update this method with your own feature computation logic. + + The timestamp_column, start_date, end_date args are optional but strongly recommended for time-series based + features. + + TODO: Update and adapt the sample code for your use case + + :param input_df: Input dataframe. + :param timestamp_column: Column containing the timestamp. This column is used to limit the range of feature + computation. It is also used as the timestamp key column when populating the feature table, so it needs to be + returned in the output. + :param start_date: Start date of the feature computation interval. + :param end_date: End date of the feature computation interval. + :return: Output dataframe containing computed features given the input arguments. + """ + df = _filter_df_by_ts(input_df, timestamp_column, start_date, end_date) + dropoffzip_features = ( + df.groupBy("dropoff_zip", F.window(timestamp_column, "30 minute")) + .agg(F.count("*").alias("count_trips_window_30m_dropoff_zip")) + .select( + F.col("dropoff_zip").alias("zip"), + F.unix_timestamp(F.col("window.end")) + .alias(timestamp_column) + .cast(TimestampType()), + _partition_id(F.to_timestamp(F.col("window.end"))).alias("yyyy_mm"), + F.col("count_trips_window_30m_dropoff_zip").cast(IntegerType()), + _is_weekend(F.col("window.end")).alias("dropoff_is_weekend"), + ) + ) + return dropoffzip_features diff --git a/mlops_stacks_gcp_fs/feature_engineering/features/pickup_features.py b/mlops_stacks_gcp_fs/feature_engineering/features/pickup_features.py new file mode 100644 index 0000000..895a9b8 --- /dev/null +++ b/mlops_stacks_gcp_fs/feature_engineering/features/pickup_features.py @@ -0,0 +1,63 @@ +""" +This sample module contains features logic that can be used to generate and populate tables in Feature Store. +You should plug in your own features computation logic in the compute_features_fn method below. 
+""" +import pyspark.sql.functions as F +from pyspark.sql.types import FloatType, IntegerType, StringType, TimestampType +from pytz import timezone + + +@F.udf(returnType=StringType()) +def _partition_id(dt): + # datetime -> "YYYY-MM" + return f"{dt.year:04d}-{dt.month:02d}" + + +def _filter_df_by_ts(df, ts_column, start_date, end_date): + if ts_column and start_date: + df = df.filter(F.col(ts_column) >= start_date) + if ts_column and end_date: + df = df.filter(F.col(ts_column) < end_date) + return df + + +def compute_features_fn(input_df, timestamp_column, start_date, end_date): + """Contains logic to compute features. + + Given an input dataframe and time ranges, this function should compute features, populate an output dataframe and + return it. This method will be called from a Feature Store pipeline job and the output dataframe will be written + to a Feature Store table. You should update this method with your own feature computation logic. + + The timestamp_column, start_date, end_date args are optional but strongly recommended for time-series based + features. + + TODO: Update and adapt the sample code for your use case + + :param input_df: Input dataframe. + :param timestamp_column: Column containing a timestamp. This column is used to limit the range of feature + computation. It is also used as the timestamp key column when populating the feature table, so it needs to be + returned in the output. + :param start_date: Start date of the feature computation interval. + :param end_date: End date of the feature computation interval. + :return: Output dataframe containing computed features given the input arguments. + """ + df = _filter_df_by_ts(input_df, timestamp_column, start_date, end_date) + pickupzip_features = ( + df.groupBy( + "pickup_zip", F.window(timestamp_column, "1 hour", "15 minutes") + ) # 1 hour window, sliding every 15 minutes + .agg( + F.mean("fare_amount").alias("mean_fare_window_1h_pickup_zip"), + F.count("*").alias("count_trips_window_1h_pickup_zip"), + ) + .select( + F.col("pickup_zip").alias("zip"), + F.unix_timestamp(F.col("window.end")) + .alias(timestamp_column) + .cast(TimestampType()), + _partition_id(F.to_timestamp(F.col("window.end"))).alias("yyyy_mm"), + F.col("mean_fare_window_1h_pickup_zip").cast(FloatType()), + F.col("count_trips_window_1h_pickup_zip").cast(IntegerType()), + ) + ) + return pickupzip_features diff --git a/mlops_stacks_gcp_fs/feature_engineering/notebooks/GenerateAndWriteFeatures.py b/mlops_stacks_gcp_fs/feature_engineering/notebooks/GenerateAndWriteFeatures.py new file mode 100644 index 0000000..1c6fce7 --- /dev/null +++ b/mlops_stacks_gcp_fs/feature_engineering/notebooks/GenerateAndWriteFeatures.py @@ -0,0 +1,139 @@ +# Databricks notebook source +################################################################################## +# Generate and Write Features Notebook +# +# This notebook can be used to generate and write features to a Databricks Feature Store table. +# It is configured and can be executed as the tasks in the write_feature_table_job workflow defined under +# ``mlops_stacks_gcp_fs/assets/feature-engineering-workflow-asset.yml`` +# +# Parameters: +# +# * input_table_path (required) - Path to input data. +# * output_table_name (required) - Fully qualified schema + Delta table name for the feature table where the features +# * will be written to. Note that this will create the Feature table if it does not +# * exist. +# * primary_keys (required) - A comma separated string of primary key columns of the output feature table. 
+# * +# * timestamp_column (optional) - Timestamp column of the input data. Used to limit processing based on +# * date ranges. This column is used as the timestamp_key column in the feature table. +# * input_start_date (optional) - Used to limit feature computations based on timestamp_column values. +# * input_end_date (optional) - Used to limit feature computations based on timestamp_column values. +# * +# * features_transform_module (required) - Python module containing the feature transform logic. +################################################################################## + + +# List of input args needed to run this notebook as a job. +# Provide them via DB widgets or notebook arguments. +# +# A Hive-registered Delta table containing the input data. +dbutils.widgets.text( + "input_table_path", + "/databricks-datasets/nyctaxi-with-zipcodes/subsampled", + label="Input Table Name", +) +# Input start date. +dbutils.widgets.text("input_start_date", "", label="Input Start Date") +# Input end date. +dbutils.widgets.text("input_end_date", "", label="Input End Date") +# Timestamp column. Will be used to filter input start/end dates. +# This column is also used as a timestamp key of the feature table. +dbutils.widgets.text( + "timestamp_column", "tpep_pickup_datetime", label="Timestamp column" +) + +# Feature table to store the computed features. +dbutils.widgets.text( + "output_table_name", + "feature_store_taxi_example.trip_pickup_features", + label="Output Feature Table Name", +) + +# Feature transform module name. +dbutils.widgets.text( + "features_transform_module", "pickup_features", label="Features transform file." +) +# Primary Keys columns for the feature table; +dbutils.widgets.text( + "primary_keys", + "zip", + label="Primary keys columns for the feature table, comma separated.", +) + +# COMMAND ---------- + +import os +notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()) +%cd $notebook_path +%cd ../features + + +# COMMAND ---------- +# DBTITLE 1,Define input and output variables + +input_table_path = dbutils.widgets.get("input_table_path") +output_table_name = dbutils.widgets.get("output_table_name") +input_start_date = dbutils.widgets.get("input_start_date") +input_end_date = dbutils.widgets.get("input_end_date") +ts_column = dbutils.widgets.get("timestamp_column") +features_module = dbutils.widgets.get("features_transform_module") +pk_columns = dbutils.widgets.get("primary_keys") + +assert input_table_path != "", "input_table_path notebook parameter must be specified" +assert output_table_name != "", "output_table_name notebook parameter must be specified" + +# Extract database name. Needs to be updated for Unity Catalog. +output_database = output_table_name.split(".")[0] + +# COMMAND ---------- +# DBTITLE 1,Create database. + +spark.sql("CREATE DATABASE IF NOT EXISTS " + output_database) + +# COMMAND ---------- +# DBTITLE 1, Read input data. + +raw_data = spark.read.format("delta").load(input_table_path) + + +# COMMAND ---------- +# DBTITLE 1,Compute features. + +# Compute the features. This is done by dynamically loading the features module. +from importlib import import_module + +mod = import_module(features_module) +compute_features_fn = getattr(mod, "compute_features_fn") + +features_df = compute_features_fn( + input_df=raw_data, + timestamp_column=ts_column, + start_date=input_start_date, + end_date=input_end_date, +) + +# COMMAND ---------- +# DBTITLE 1, Write computed features. 
+ +from databricks import feature_store + +fs = feature_store.FeatureStoreClient() + + +# Create the feature table if it does not exist first. +# Note that this is a no-op if a table with the same name and schema already exists. +fs.create_table( + name=output_table_name, + primary_keys=[x.strip() for x in pk_columns.split(",")], + timestamp_keys=[ts_column], + df=features_df, +) + +# Write the computed features dataframe. +fs.write_table( + name=output_table_name, + df=features_df, + mode="merge", +) + +dbutils.notebook.exit(0) diff --git a/mlops_stacks_gcp_fs/monitoring/README.md b/mlops_stacks_gcp_fs/monitoring/README.md new file mode 100644 index 0000000..909eb5e --- /dev/null +++ b/mlops_stacks_gcp_fs/monitoring/README.md @@ -0,0 +1,5 @@ +# Monitoring + +Databricks Data Monitoring is currently in Private Preview. + +Please contact a Databricks representative for more information. diff --git a/mlops_stacks_gcp_fs/pytest.ini b/mlops_stacks_gcp_fs/pytest.ini new file mode 100644 index 0000000..04680e7 --- /dev/null +++ b/mlops_stacks_gcp_fs/pytest.ini @@ -0,0 +1,4 @@ +# Configure pytest to detect local modules in the current directory +# See https://docs.pytest.org/en/7.1.x/reference/reference.html#confval-pythonpath for details +[pytest] +pythonpath = . diff --git a/mlops_stacks_gcp_fs/requirements.txt b/mlops_stacks_gcp_fs/requirements.txt new file mode 100644 index 0000000..286a0f6 --- /dev/null +++ b/mlops_stacks_gcp_fs/requirements.txt @@ -0,0 +1,8 @@ +mlflow==2.7.1 +numpy>=1.23.0 +pandas>=1.4.3 +scikit-learn>=1.1.1 +matplotlib>=3.5.2 +Jinja2==3.0.3 +pyspark~=3.3.0 +pytz~=2022.2.1 diff --git a/mlops_stacks_gcp_fs/tests/__init__.py b/mlops_stacks_gcp_fs/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mlops_stacks_gcp_fs/tests/feature_engineering/__init__.py b/mlops_stacks_gcp_fs/tests/feature_engineering/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mlops_stacks_gcp_fs/tests/feature_engineering/dropoff_features_test.py b/mlops_stacks_gcp_fs/tests/feature_engineering/dropoff_features_test.py new file mode 100644 index 0000000..832748c --- /dev/null +++ b/mlops_stacks_gcp_fs/tests/feature_engineering/dropoff_features_test.py @@ -0,0 +1,44 @@ +import pyspark.sql +import pytest +import pandas as pd +from datetime import datetime +from pyspark.sql import SparkSession + +from mlops_stacks_gcp_fs.feature_engineering.features.dropoff_features import ( + compute_features_fn, +) + + +@pytest.fixture(scope="session") +def spark(request): + """fixture for creating a spark session + Args: + request: pytest.FixtureRequest object + """ + spark = ( + SparkSession.builder.master("local[1]") + .appName("pytest-pyspark-local-testing") + .getOrCreate() + ) + request.addfinalizer(lambda: spark.stop()) + + return spark + + +@pytest.mark.usefixtures("spark") +def test_dropoff_features_fn(spark): + input_df = pd.DataFrame( + { + "tpep_pickup_datetime": [datetime(2022, 1, 10)], + "tpep_dropoff_datetime": [datetime(2022, 1, 10)], + "dropoff_zip": [94400], + "trip_distance": [2], + "fare_amount": [100], + } + ) + spark_df = spark.createDataFrame(input_df) + output_df = compute_features_fn( + spark_df, "tpep_pickup_datetime", datetime(2022, 1, 1), datetime(2022, 1, 15) + ) + assert isinstance(output_df, pyspark.sql.DataFrame) + assert output_df.count() == 1 diff --git a/mlops_stacks_gcp_fs/tests/feature_engineering/pickup_features_test.py b/mlops_stacks_gcp_fs/tests/feature_engineering/pickup_features_test.py new file mode 100644 index 0000000..01e4d16 --- 
/dev/null +++ b/mlops_stacks_gcp_fs/tests/feature_engineering/pickup_features_test.py @@ -0,0 +1,42 @@ +import pyspark.sql +import pytest +import pandas as pd +from datetime import datetime +from pyspark.sql import SparkSession + +from mlops_stacks_gcp_fs.feature_engineering.features.pickup_features import compute_features_fn + + +@pytest.fixture(scope="session") +def spark(request): + """fixture for creating a spark session + Args: + request: pytest.FixtureRequest object + """ + spark = ( + SparkSession.builder.master("local[1]") + .appName("pytest-pyspark-local-testing") + .getOrCreate() + ) + request.addfinalizer(lambda: spark.stop()) + + return spark + + +@pytest.mark.usefixtures("spark") +def test_pickup_features_fn(spark): + input_df = pd.DataFrame( + { + "tpep_pickup_datetime": [datetime(2022, 1, 12)], + "tpep_dropoff_datetime": [datetime(2022, 1, 12)], + "pickup_zip": [94400], + "trip_distance": [2], + "fare_amount": [100], + } + ) + spark_df = spark.createDataFrame(input_df) + output_df = compute_features_fn( + spark_df, "tpep_pickup_datetime", datetime(2022, 1, 1), datetime(2022, 1, 15) + ) + assert isinstance(output_df, pyspark.sql.DataFrame) + assert output_df.count() == 4 # 4 15-min intervals over 1 hr window. diff --git a/mlops_stacks_gcp_fs/tests/training/__init__.py b/mlops_stacks_gcp_fs/tests/training/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mlops_stacks_gcp_fs/tests/training/test_notebooks.py b/mlops_stacks_gcp_fs/tests/training/test_notebooks.py new file mode 100644 index 0000000..7293b1b --- /dev/null +++ b/mlops_stacks_gcp_fs/tests/training/test_notebooks.py @@ -0,0 +1,9 @@ +import pathlib + + +def test_notebook_format(): + # Verify that all Databricks notebooks have the required header + paths = list(pathlib.Path("./notebooks").glob("**/*.py")) + for f in paths: + notebook_str = open(str(f)).read() + assert notebook_str.startswith("# Databricks notebook source") diff --git a/mlops_stacks_gcp_fs/training/__init__.py b/mlops_stacks_gcp_fs/training/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mlops_stacks_gcp_fs/training/data/sample.parquet b/mlops_stacks_gcp_fs/training/data/sample.parquet new file mode 100644 index 0000000..4efd171 Binary files /dev/null and b/mlops_stacks_gcp_fs/training/data/sample.parquet differ diff --git a/mlops_stacks_gcp_fs/training/notebooks/TrainWithFeatureStore.py b/mlops_stacks_gcp_fs/training/notebooks/TrainWithFeatureStore.py new file mode 100644 index 0000000..633143f --- /dev/null +++ b/mlops_stacks_gcp_fs/training/notebooks/TrainWithFeatureStore.py @@ -0,0 +1,280 @@ +# Databricks notebook source +################################################################################## +# Model Training Notebook using Databricks Feature Store +# +# This notebook shows an example of a Model Training pipeline using Databricks Feature Store tables. +# It is configured and can be executed as the "Train" task in the model_training_job workflow defined under +# ``mlops_stacks_gcp_fs/assets/model-workflow-asset.yml`` +# +# Parameters: +# * env (required): - Environment the notebook is run in (staging, or prod). Defaults to "staging". +# * training_data_path (required) - Path to the training data. +# * experiment_name (required) - MLflow experiment name for the training runs. Will be created if it doesn't exist. +# * model_name (required) - MLflow registered model name to use for the trained model. Will be created if it +# * doesn't exist. 
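
One wiring detail worth noting for this notebook: the registered model URI produced by the Train task is handed to the downstream ModelValidation/ModelDeployment tasks via Databricks job task values rather than via widgets. The sketch below uses the task-value calls that appear verbatim later in this diff; it only works when the notebooks run as tasks of the same Databricks job (or interactively with `debugValue` set), since `dbutils`, `model_name`, and `model_version` are provided by that context.

```python
# At the end of the Train task, after fs.log_model registers the model:
model_uri = f"models:/{model_name}/{model_version}"
dbutils.jobs.taskValues.set("model_uri", model_uri)

# In the downstream ModelDeployment task, the value is read back by upstream task name:
model_uri = dbutils.jobs.taskValues.get("Train", "model_uri", debugValue="")
```
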
+################################################################################## + +# COMMAND ---------- + +# MAGIC %load_ext autoreload +# MAGIC %autoreload 2 + +# COMMAND ---------- + +import os +notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()) +%cd $notebook_path + +# COMMAND ---------- + +# MAGIC %pip install -r ../../requirements.txt + +# COMMAND ---------- + +dbutils.library.restartPython() + +# COMMAND ---------- +# DBTITLE 1, Notebook arguments + +# List of input args needed to run this notebook as a job. +# Provide them via DB widgets or notebook arguments. + +# Notebook Environment +dbutils.widgets.dropdown("env", "staging", ["staging", "prod"], "Environment Name") +env = dbutils.widgets.get("env") + +# Path to the Hive-registered Delta table containing the training data. +dbutils.widgets.text( + "training_data_path", + "/databricks-datasets/nyctaxi-with-zipcodes/subsampled", + label="Path to the training data", +) + +# MLflow experiment name. +dbutils.widgets.text( + "experiment_name", + f"/dev-mlops_stacks_gcp_fs-experiment", + label="MLflow experiment name", +) +# MLflow registered model name to use for the trained mode. +dbutils.widgets.text( + "model_name", "dev-mlops_stacks_gcp_fs-model", label="Model Name" +) + +# Pickup features table name +dbutils.widgets.text( + "pickup_features_table", + "feature_store_taxi_example.trip_pickup_features", + label="Pickup Features Table", +) + +# Dropoff features table name +dbutils.widgets.text( + "dropoff_features_table", + "feature_store_taxi_example.trip_dropoff_features", + label="Dropoff Features Table", +) + +# COMMAND ---------- +# DBTITLE 1,Define input and output variables + +input_table_path = dbutils.widgets.get("training_data_path") +experiment_name = dbutils.widgets.get("experiment_name") +model_name = dbutils.widgets.get("model_name") + +# COMMAND ---------- +# DBTITLE 1, Set experiment + +import mlflow + +mlflow.set_experiment(experiment_name) + + +# COMMAND ---------- +# DBTITLE 1, Load raw data + +raw_data = spark.read.format("delta").load(input_table_path) +raw_data.display() + +# COMMAND ---------- +# DBTITLE 1, Helper functions + +from datetime import timedelta, timezone +import math +import mlflow.pyfunc +import pyspark.sql.functions as F +from pyspark.sql.types import IntegerType + + +def rounded_unix_timestamp(dt, num_minutes=15): + """ + Ceilings datetime dt to interval num_minutes, then returns the unix timestamp. + """ + nsecs = dt.minute * 60 + dt.second + dt.microsecond * 1e-6 + delta = math.ceil(nsecs / (60 * num_minutes)) * (60 * num_minutes) - nsecs + return int((dt + timedelta(seconds=delta)).replace(tzinfo=timezone.utc).timestamp()) + + +rounded_unix_timestamp_udf = F.udf(rounded_unix_timestamp, IntegerType()) + + +def rounded_taxi_data(taxi_data_df): + # Round the taxi data timestamp to 15 and 30 minute intervals so we can join with the pickup and dropoff features + # respectively. 
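
To make the rounding concrete, here is a small standalone check of the `rounded_unix_timestamp` helper defined above; the example timestamp is arbitrary, and the helper is repeated so the snippet runs on its own.

```python
import math
from datetime import datetime, timedelta, timezone


def rounded_unix_timestamp(dt, num_minutes=15):
    # Same logic as the helper above: ceiling dt to the next num_minutes boundary.
    nsecs = dt.minute * 60 + dt.second + dt.microsecond * 1e-6
    delta = math.ceil(nsecs / (60 * num_minutes)) * (60 * num_minutes) - nsecs
    return int((dt + timedelta(seconds=delta)).replace(tzinfo=timezone.utc).timestamp())


# A pickup at 10:07:30 rounds up to 10:15:00, so it joins the 15-minute pickup feature
# window ending at 10:15; with num_minutes=30 it would round up to 10:30:00 instead.
ts = rounded_unix_timestamp(datetime(2022, 1, 10, 10, 7, 30))
assert datetime.fromtimestamp(ts, tz=timezone.utc) == datetime(2022, 1, 10, 10, 15, tzinfo=timezone.utc)
```
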
+ taxi_data_df = ( + taxi_data_df.withColumn( + "rounded_pickup_datetime", + F.to_timestamp( + rounded_unix_timestamp_udf( + taxi_data_df["tpep_pickup_datetime"], F.lit(15) + ) + ), + ) + .withColumn( + "rounded_dropoff_datetime", + F.to_timestamp( + rounded_unix_timestamp_udf( + taxi_data_df["tpep_dropoff_datetime"], F.lit(30) + ) + ), + ) + .drop("tpep_pickup_datetime") + .drop("tpep_dropoff_datetime") + ) + taxi_data_df.createOrReplaceTempView("taxi_data") + return taxi_data_df + + +def get_latest_model_version(model_name): + latest_version = 1 + mlflow_client = MlflowClient() + for mv in mlflow_client.search_model_versions(f"name='{model_name}'"): + version_int = int(mv.version) + if version_int > latest_version: + latest_version = version_int + return latest_version + + +# COMMAND ---------- +# DBTITLE 1, Read taxi data for training + +taxi_data = rounded_taxi_data(raw_data) +taxi_data.display() + +# COMMAND ---------- +# DBTITLE 1, Create FeatureLookups + +from databricks.feature_store import FeatureLookup +import mlflow + +pickup_features_table = dbutils.widgets.get("pickup_features_table") +dropoff_features_table = dbutils.widgets.get("dropoff_features_table") + +pickup_feature_lookups = [ + FeatureLookup( + table_name=pickup_features_table, + feature_names=[ + "mean_fare_window_1h_pickup_zip", + "count_trips_window_1h_pickup_zip", + ], + lookup_key=["pickup_zip"], + timestamp_lookup_key=["rounded_pickup_datetime"], + ), +] + +dropoff_feature_lookups = [ + FeatureLookup( + table_name=dropoff_features_table, + feature_names=["count_trips_window_30m_dropoff_zip", "dropoff_is_weekend"], + lookup_key=["dropoff_zip"], + timestamp_lookup_key=["rounded_dropoff_datetime"], + ), +] + +# COMMAND ---------- +# DBTITLE 1, Create Training Dataset + +from databricks import feature_store + +# End any existing runs (in the case this notebook is being run for a second time) +mlflow.end_run() + +# Start an mlflow run, which is needed for the feature store to log the model +mlflow.start_run() + +# Since the rounded timestamp columns would likely cause the model to overfit the data +# unless additional feature engineering was performed, exclude them to avoid training on them. +exclude_columns = ["rounded_pickup_datetime", "rounded_dropoff_datetime"] + +fs = feature_store.FeatureStoreClient() + +# Create the training set that includes the raw input data merged with corresponding features from both feature tables +training_set = fs.create_training_set( + taxi_data, + feature_lookups=pickup_feature_lookups + dropoff_feature_lookups, + label="fare_amount", + exclude_columns=exclude_columns, +) + +# Load the TrainingSet into a dataframe which can be passed into sklearn for training a model +training_df = training_set.load_df() + +# COMMAND ---------- + +# Display the training dataframe, and note that it contains both the raw input data and the features from the Feature Store, like `dropoff_is_weekend` +training_df.display() + +# COMMAND ---------- + +# MAGIC %md +# MAGIC Train a LightGBM model on the data returned by `TrainingSet.to_df`, then log the model with `FeatureStoreClient.log_model`. The model will be packaged with feature metadata. 
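
Because `fs.log_model` packages the feature-lookup metadata with the model, downstream scoring only needs the raw lookup keys and rounded timestamps. A hedged sketch of consuming the packaged model, mirroring `predict.py` elsewhere in this diff (the model version in the URI is a placeholder):

```python
from databricks.feature_store import FeatureStoreClient

fs_client = FeatureStoreClient()

# The scoring input only needs pickup_zip / dropoff_zip and the rounded datetime columns;
# the pickup and dropoff features are joined automatically from the feature tables.
raw_df = spark.table("hive_metastore.default.taxi_scoring_sample_feature_store_inference_input")
predictions = fs_client.score_batch("models:/dev-mlops_stacks_gcp_fs-model/1", raw_df)
predictions.display()
```
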
+ +# COMMAND ---------- +# DBTITLE 1, Train model + +import lightgbm as lgb +from sklearn.model_selection import train_test_split +import mlflow.lightgbm +from mlflow.tracking import MlflowClient + + +features_and_label = training_df.columns + +# Collect data into a Pandas array for training +data = training_df.toPandas()[features_and_label] + +train, test = train_test_split(data, random_state=123) +X_train = train.drop(["fare_amount"], axis=1) +X_test = test.drop(["fare_amount"], axis=1) +y_train = train.fare_amount +y_test = test.fare_amount + +mlflow.lightgbm.autolog() +train_lgb_dataset = lgb.Dataset(X_train, label=y_train.values) +test_lgb_dataset = lgb.Dataset(X_test, label=y_test.values) + +param = {"num_leaves": 32, "objective": "regression", "metric": "rmse"} +num_rounds = 100 + +# Train a lightGBM model +model = lgb.train(param, train_lgb_dataset, num_rounds) + +# COMMAND ---------- +# DBTITLE 1, Log model and return output. + +# Log the trained model with MLflow and package it with feature lookup information. +fs.log_model( + model, + artifact_path="model_packaged", + flavor=mlflow.lightgbm, + training_set=training_set, + registered_model_name=model_name, +) + +# The returned model URI is needed by the model deployment notebook. +model_version = get_latest_model_version(model_name) +model_uri = f"models:/{model_name}/{model_version}" +dbutils.jobs.taskValues.set("model_uri", model_uri) +dbutils.jobs.taskValues.set("model_name", model_name) +dbutils.jobs.taskValues.set("model_version", model_version) +dbutils.notebook.exit(model_uri) diff --git a/mlops_stacks_gcp_fs/training/steps/__init__.py b/mlops_stacks_gcp_fs/training/steps/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mlops_stacks_gcp_fs/training/steps/custom_metrics.py b/mlops_stacks_gcp_fs/training/steps/custom_metrics.py new file mode 100644 index 0000000..23d5eb2 --- /dev/null +++ b/mlops_stacks_gcp_fs/training/steps/custom_metrics.py @@ -0,0 +1,47 @@ +""" +This module defines custom metric functions that are invoked during the 'train' and 'evaluate' +steps to provide model performance insights. Custom metric functions defined in this module are +referenced in the ``metrics`` section of ``recipe.yaml``, for example: + +.. code-block:: yaml + :caption: Example custom metrics definition in ``recipe.yaml`` + + metrics: + custom: + - name: weighted_mean_squared_error + function: weighted_mean_squared_error + greater_is_better: False +""" + +from typing import Dict + +from pandas import DataFrame +from sklearn.metrics import mean_squared_error + + +def weighted_mean_squared_error( + eval_df: DataFrame, + builtin_metrics: Dict[str, int], # pylint: disable=unused-argument +) -> int: + """ + Computes the weighted mean squared error (MSE) metric. + + :param eval_df: A Pandas DataFrame containing the following columns: + + - ``"prediction"``: Predictions produced by submitting input data to the model. + - ``"target"``: Ground truth values corresponding to the input data. + + :param builtin_metrics: A dictionary containing the built-in metrics that are calculated + automatically during model evaluation. The keys are the names of the + metrics and the values are the scalar values of the metrics. For more + information, see + https://mlflow.org/docs/latest/python_api/mlflow.html#mlflow.evaluate. + :return: A single-entry dictionary containing the MSE metric. The key is the metric name and + the value is the scalar metric value. 
Note that custom metric functions can return + dictionaries with multiple metric entries as well. + """ + return mean_squared_error( + eval_df["prediction"], + eval_df["target"], + sample_weight=1 / eval_df["prediction"].values, + ) diff --git a/mlops_stacks_gcp_fs/training/steps/ingest.py b/mlops_stacks_gcp_fs/training/steps/ingest.py new file mode 100644 index 0000000..7dfa89f --- /dev/null +++ b/mlops_stacks_gcp_fs/training/steps/ingest.py @@ -0,0 +1,40 @@ +""" +This module defines the following routines used by the 'ingest' step of the regression recipe: + +- ``load_file_as_dataframe``: Defines customizable logic for parsing dataset formats that are not + natively parsed by MLflow Recipes (i.e. formats other than Parquet, Delta, and Spark SQL). +""" + +import logging + +from pandas import DataFrame + +_logger = logging.getLogger(__name__) + + +def load_file_as_dataframe(file_path: str, file_format: str) -> DataFrame: + """ + Load content from the specified dataset file as a Pandas DataFrame. + + This method is used to load dataset types that are not natively managed by MLflow Recipes + (datasets that are not in Parquet, Delta Table, or Spark SQL Table format). This method is + called once for each file in the dataset, and MLflow Recipes automatically combines the + resulting DataFrames together. + + :param file_path: The path to the dataset file. + :param file_format: The file format string, such as "csv". + :return: A Pandas DataFrame representing the content of the specified file. + """ + + if file_format == "csv": + import pandas + + _logger.warning( + "Loading dataset CSV using `pandas.read_csv()` with default arguments and assumed index" + " column 0 which may not produce the desired schema. If the schema is not correct, you" + " can adjust it by modifying the `load_file_as_dataframe()` function in" + " `steps/ingest.py`" + ) + return pandas.read_csv(file_path, index_col=0) + else: + raise NotImplementedError diff --git a/mlops_stacks_gcp_fs/training/steps/split.py b/mlops_stacks_gcp_fs/training/steps/split.py new file mode 100644 index 0000000..5fa7f92 --- /dev/null +++ b/mlops_stacks_gcp_fs/training/steps/split.py @@ -0,0 +1,37 @@ +""" +This module defines the following routines used by the 'split' step of the regression recipe: + +- ``process_splits``: Defines customizable logic for processing & cleaning the training, validation, + and test datasets produced by the data splitting procedure. +""" + +from pandas import DataFrame + + +def process_splits( + train_df: DataFrame, validation_df: DataFrame, test_df: DataFrame +) -> (DataFrame, DataFrame, DataFrame): + """ + Perform additional processing on the split datasets. + + :param train_df: The training dataset produced by the data splitting procedure. + :param validation_df: The validation dataset produced by the data splitting procedure. + :param test_df: The test dataset produced by the data splitting procedure. + :return: A tuple containing, in order: the processed training dataset, the processed + validation dataset, and the processed test dataset. 
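+
+    The processing applied below drops rows with missing values and keeps only trips with
+    0 < fare_amount < 1000 and 0 < trip_distance < 400.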
+ """ + + def process(df: DataFrame): + # Drop invalid data points + cleaned = df.dropna() + # Filter out invalid fare amounts and trip distance + cleaned = cleaned[ + (cleaned["fare_amount"] > 0) + & (cleaned["trip_distance"] < 400) + & (cleaned["trip_distance"] > 0) + & (cleaned["fare_amount"] < 1000) + ] + + return cleaned + + return process(train_df), process(validation_df), process(test_df) diff --git a/mlops_stacks_gcp_fs/training/steps/train.py b/mlops_stacks_gcp_fs/training/steps/train.py new file mode 100644 index 0000000..c61b2bd --- /dev/null +++ b/mlops_stacks_gcp_fs/training/steps/train.py @@ -0,0 +1,17 @@ +""" +This module defines the following routines used by the 'train' step of the regression recipe: + +- ``estimator_fn``: Defines the customizable estimator type and parameters that are used + during training to produce a model pipeline. +""" + + +def estimator_fn(): + """ + Returns an *unfitted* estimator that defines ``fit()`` and ``predict()`` methods. + The estimator's input and output signatures should be compatible with scikit-learn + estimators. + """ + from sklearn.linear_model import SGDRegressor + + return SGDRegressor(random_state=42) diff --git a/mlops_stacks_gcp_fs/training/steps/transform.py b/mlops_stacks_gcp_fs/training/steps/transform.py new file mode 100644 index 0000000..7851b89 --- /dev/null +++ b/mlops_stacks_gcp_fs/training/steps/transform.py @@ -0,0 +1,62 @@ +""" +This module defines the following routines used by the 'transform' step of the regression recipe: + +- ``transformer_fn``: Defines customizable logic for transforming input data before it is passed + to the estimator during model inference. +""" + +from pandas import DataFrame +from sklearn.compose import ColumnTransformer +from sklearn.pipeline import Pipeline +from sklearn.preprocessing import OneHotEncoder, StandardScaler, FunctionTransformer + + +def calculate_features(df: DataFrame): + """ + Extend the input dataframe with pickup day of week and hour, and trip duration. + Drop the now-unneeded pickup datetime and dropoff datetime columns. + """ + df["pickup_dow"] = df["tpep_pickup_datetime"].dt.dayofweek + df["pickup_hour"] = df["tpep_pickup_datetime"].dt.hour + trip_duration = df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"] + df["trip_duration"] = trip_duration.map(lambda x: x.total_seconds() / 60) + df.drop(columns=["tpep_pickup_datetime", "tpep_dropoff_datetime"], inplace=True) + return df + + +def transformer_fn(): + """ + Returns an *unfitted* transformer that defines ``fit()`` and ``transform()`` methods. + The transformer's input and output signatures should be compatible with scikit-learn + transformers. 
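+
+    The pipeline constructed below derives pickup day-of-week, pickup hour, and trip duration from the raw
+    timestamps, one-hot encodes the hour and day-of-week columns, and standard-scales trip_distance and
+    trip_duration.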
+    """
+    return Pipeline(
+        steps=[
+            (
+                "calculate_time_and_duration_features",
+                FunctionTransformer(calculate_features, feature_names_out="one-to-one"),
+            ),
+            (
+                "encoder",
+                ColumnTransformer(
+                    transformers=[
+                        (
+                            "hour_encoder",
+                            OneHotEncoder(categories="auto", sparse=False),
+                            ["pickup_hour"],
+                        ),
+                        (
+                            "day_encoder",
+                            OneHotEncoder(categories="auto", sparse=False),
+                            ["pickup_dow"],
+                        ),
+                        (
+                            "std_scaler",
+                            StandardScaler(),
+                            ["trip_distance", "trip_duration"],
+                        ),
+                    ]
+                ),
+            ),
+        ]
+    )
diff --git a/mlops_stacks_gcp_fs/utils.py b/mlops_stacks_gcp_fs/utils.py
new file mode 100644
index 0000000..aa30820
--- /dev/null
+++ b/mlops_stacks_gcp_fs/utils.py
@@ -0,0 +1,21 @@
+"""This module contains utils shared between different notebooks"""
+
+def get_deployed_model_stage_for_env(env):
+    """
+    Get the model version stage under which the latest deployed model version can be found
+    for the current environment
+    :param env: Current environment
+    :return: Model version stage
+    """
+    # For a registered model version to be served, it needs to be in either the Staging or Production
+    # model registry stage
+    # (https://docs.databricks.com/applications/machine-learning/manage-model-lifecycle/index.html#transition-a-model-stage).
+    # For models in dev and staging environments, we deploy the model to the "Staging" stage, and in prod we deploy to the
+    # "Production" stage
+    _MODEL_STAGE_FOR_ENV = {
+        "dev": "Staging",
+        "staging": "Staging",
+        "prod": "Production",
+        "test": "Production",
+    }
+    return _MODEL_STAGE_FOR_ENV[env]
diff --git a/mlops_stacks_gcp_fs/validation/README.md b/mlops_stacks_gcp_fs/validation/README.md
new file mode 100644
index 0000000..539107e
--- /dev/null
+++ b/mlops_stacks_gcp_fs/validation/README.md
@@ -0,0 +1,2 @@
+# Model Validation
+To enable model validation as part of a scheduled Databricks workflow, please refer to [mlops_stacks_gcp_fs/assets/README.md](../assets/README.md)
\ No newline at end of file
diff --git a/mlops_stacks_gcp_fs/validation/notebooks/ModelValidation.py b/mlops_stacks_gcp_fs/validation/notebooks/ModelValidation.py
new file mode 100644
index 0000000..5441846
--- /dev/null
+++ b/mlops_stacks_gcp_fs/validation/notebooks/ModelValidation.py
@@ -0,0 +1,283 @@
+# Databricks notebook source
+##################################################################################
+# Model Validation Notebook
+##
+# This notebook uses the MLflow model validation API to run model validation after training and registering a model
+# in the model registry, before deploying it to the "Production" stage.
+#
+# It runs as part of CD and is also invoked by the automated model training -> validation -> deployment workflow defined under ``mlops_stacks_gcp_fs/assets/model-workflow-asset.yml``
+#
+#
+# Parameters:
+#
+# * env - Name of the environment the notebook is run in (staging or prod). Defaults to "prod".
+# * `run_mode` - The `run_mode` defines whether model validation is enabled or not. It can be one of the three values:
+#    * `disabled` : Do not run the model validation notebook.
+#    * `dry_run` : Run the model validation notebook. Ignore failed model validation rules and proceed to move
+#                  the model to the "Production" stage.
+#    * `enabled` : Run the model validation notebook. Move the model to the "Production" stage only if all model validation
+#                  rules pass.
+# * enable_baseline_comparison - Whether to load the current registered "Production" stage model as the baseline.
+#                                A baseline model is required for relative change and absolute change validation thresholds.
+# * validation_input - Validation input. Please refer to data parameter in mlflow.evaluate documentation https://mlflow.org/docs/latest/python_api/mlflow.html#mlflow.evaluate +# * model_type - A string describing the model type. The model type can be either "regressor" and "classifier". +# Please refer to model_type parameter in mlflow.evaluate documentation https://mlflow.org/docs/latest/python_api/mlflow.html#mlflow.evaluate +# * targets - The string name of a column from data that contains evaluation labels. +# Please refer to targets parameter in mlflow.evaluate documentation https://mlflow.org/docs/latest/python_api/mlflow.html#mlflow.evaluate +# * custom_metrics_loader_function - Specifies the name of the function in mlops_stacks_gcp_fs/validation/validation.py that returns custom metrics. +# * validation_thresholds_loader_function - Specifies the name of the function in mlops_stacks_gcp_fs/validation/validation.py that returns model validation thresholds. +# +# For details on mlflow evaluate API, see doc https://mlflow.org/docs/latest/python_api/mlflow.html#mlflow.evaluate +# For details and examples about performing model validation, see the Model Validation documentation https://mlflow.org/docs/latest/models.html#model-validation +# +################################################################################## + +# COMMAND ---------- + +# MAGIC %load_ext autoreload +# MAGIC %autoreload 2 + +# COMMAND ---------- + +import os +notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()) +%cd $notebook_path + +# COMMAND ---------- + +# MAGIC %pip install -r ../../requirements.txt + +# COMMAND ---------- + +dbutils.library.restartPython() + +# COMMAND ---------- + +import os +notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()) +%cd $notebook_path +%cd ../ + +# COMMAND ---------- + +dbutils.widgets.text( + "experiment_name", + "/dev-mlops_stacks_gcp_fs-experiment", + "Experiment Name", +) +dbutils.widgets.dropdown("run_mode", "disabled", ["disabled", "dry_run", "enabled"], "Run Mode") +dbutils.widgets.dropdown("enable_baseline_comparison", "false", ["true", "false"], "Enable Baseline Comparison") +dbutils.widgets.text("validation_input", "SELECT * FROM delta.`dbfs:/databricks-datasets/nyctaxi-with-zipcodes/subsampled`", "Validation Input") + +dbutils.widgets.text("model_type", "regressor", "Model Type") +dbutils.widgets.text("targets", "fare_amount", "Targets") +dbutils.widgets.text("custom_metrics_loader_function", "custom_metrics", "Custom Metrics Loader Function") +dbutils.widgets.text("validation_thresholds_loader_function", "validation_thresholds", "Validation Thresholds Loader Function") +dbutils.widgets.text("evaluator_config_loader_function", "evaluator_config", "Evaluator Config Loader Function") +dbutils.widgets.text("model_name", "dev-mlops_stacks_gcp_fs-model", "Model Name") +dbutils.widgets.text("model_version", "", "Candidate Model Version") + +# COMMAND ---------- + +print( + "Currently model validation is not supported for models registered with feature store. Please refer to " + "issue https://github.com/databricks/mlops-stacks/issues/70 for more details." +) +dbutils.notebook.exit(0) +run_mode = dbutils.widgets.get("run_mode").lower() +assert run_mode == "disabled" or run_mode == "dry_run" or run_mode == "enabled" + +if run_mode == "disabled": + print( + "Model validation is in DISABLED mode. 
Exit model validation without blocking model deployment." + ) + dbutils.notebook.exit(0) +dry_run = run_mode == "dry_run" + +if dry_run: + print( + "Model validation is in DRY_RUN mode. Validation threshold validation failures will not block model deployment." + ) +else: + print( + "Model validation is in ENABLED mode. Validation threshold validation failures will block model deployment." + ) + +# COMMAND ---------- + +import importlib +import mlflow +import os +import tempfile +import traceback + +from mlflow.tracking.client import MlflowClient + +client = MlflowClient() + +# set experiment +experiment_name = dbutils.widgets.get("experiment_name") +mlflow.set_experiment(experiment_name) + +# set model evaluation parameters that can be inferred from the job +model_uri = dbutils.jobs.taskValues.get("Train", "model_uri", debugValue="") +model_name = dbutils.jobs.taskValues.get("Train", "model_name", debugValue="") +model_version = dbutils.jobs.taskValues.get("Train", "model_version", debugValue="") + +if model_uri == "": + model_name = dbutils.widgets.get("model_name") + model_version = dbutils.widgets.get("model_version") + model_uri = "models:/" + model_name + "/" + model_version + +baseline_model_uri = "models:/" + model_name + "/Production" + +evaluators = "default" +assert model_uri != "", "model_uri notebook parameter must be specified" +assert model_name != "", "model_name notebook parameter must be specified" +assert model_version != "", "model_version notebook parameter must be specified" + +# COMMAND ---------- + +# take input +enable_baseline_comparison = dbutils.widgets.get("enable_baseline_comparison") +assert enable_baseline_comparison == "true" or enable_baseline_comparison == "false" +enable_baseline_comparison = enable_baseline_comparison == "true" + +validation_input = dbutils.widgets.get("validation_input") +assert validation_input +data = spark.sql(validation_input) + +model_type = dbutils.widgets.get("model_type") +targets = dbutils.widgets.get("targets") + +assert model_type +assert targets + +custom_metrics_loader_function_name = dbutils.widgets.get("custom_metrics_loader_function") +validation_thresholds_loader_function_name = dbutils.widgets.get("validation_thresholds_loader_function") +evaluator_config_loader_function_name = dbutils.widgets.get("evaluator_config_loader_function") +assert custom_metrics_loader_function_name +assert validation_thresholds_loader_function_name +assert evaluator_config_loader_function_name +custom_metrics_loader_function = getattr( + importlib.import_module("validation"), custom_metrics_loader_function_name +) +validation_thresholds_loader_function = getattr( + importlib.import_module("validation"), validation_thresholds_loader_function_name +) +evaluator_config_loader_function = getattr( + importlib.import_module("validation"), evaluator_config_loader_function_name +) +custom_metrics = custom_metrics_loader_function() +validation_thresholds = validation_thresholds_loader_function() +evaluator_config = evaluator_config_loader_function() + +# COMMAND ---------- + +# helper methods +def get_run_link(run_info): + return "[Run](#mlflow/experiments/{0}/runs/{1})".format( + run_info.experiment_id, run_info.run_id + ) + + +def get_training_run(model_name, model_version): + version = client.get_model_version(model_name, model_version) + return mlflow.get_run(run_id=version.run_id) + + +def generate_run_name(training_run): + return None if not training_run else training_run.info.run_name + "-validation" + + +def 
generate_description(training_run): + return ( + None + if not training_run + else "Model Training Details: {0}\n".format(get_run_link(training_run.info)) + ) + + +def log_to_model_description(run, success): + run_link = get_run_link(run.info) + description = client.get_model_version(model_name, model_version).description + status = "SUCCESS" if success else "FAILURE" + if description != "": + description += "\n\n---\n\n" + description += "Model Validation Status: {0}\nValidation Details: {1}".format( + status, run_link + ) + client.update_model_version( + name=model_name, version=model_version, description=description + ) + +# COMMAND ---------- + +training_run = get_training_run(model_name, model_version) + +# run evaluate +with mlflow.start_run( + run_name=generate_run_name(training_run), + description=generate_description(training_run), +) as run, tempfile.TemporaryDirectory() as tmp_dir: + validation_thresholds_file = os.path.join(tmp_dir, "validation_thresholds.txt") + with open(validation_thresholds_file, "w") as f: + if validation_thresholds: + for metric_name in validation_thresholds: + f.write( + "{0:30} {1}\n".format( + metric_name, str(validation_thresholds[metric_name]) + ) + ) + mlflow.log_artifact(validation_thresholds_file) + + try: + eval_result = mlflow.evaluate( + model=model_uri, + data=data, + targets=targets, + model_type=model_type, + evaluators=evaluators, + validation_thresholds=validation_thresholds, + custom_metrics=custom_metrics, + baseline_model=None + if not enable_baseline_comparison + else baseline_model_uri, + evaluator_config=evaluator_config, + ) + metrics_file = os.path.join(tmp_dir, "metrics.txt") + with open(metrics_file, "w") as f: + f.write( + "{0:30} {1:30} {2}\n".format("metric_name", "candidate", "baseline") + ) + for metric in eval_result.metrics: + candidate_metric_value = str(eval_result.metrics[metric]) + baseline_metric_value = "N/A" + if metric in eval_result.baseline_model_metrics: + mlflow.log_metric( + "baseline_" + metric, eval_result.baseline_model_metrics[metric] + ) + baseline_metric_value = str( + eval_result.baseline_model_metrics[metric] + ) + f.write( + "{0:30} {1:30} {2}\n".format( + metric, candidate_metric_value, baseline_metric_value + ) + ) + mlflow.log_artifact(metrics_file) + log_to_model_description(run, True) + + except Exception as err: + log_to_model_description(run, False) + error_file = os.path.join(tmp_dir, "error.txt") + with open(error_file, "w") as f: + f.write("Validation failed : " + str(err) + "\n") + f.write(traceback.format_exc()) + mlflow.log_artifact(error_file) + if not dry_run: + raise err + else: + print( + "Model validation failed in DRY_RUN. It will not block model deployment." + ) diff --git a/mlops_stacks_gcp_fs/validation/validation.py b/mlops_stacks_gcp_fs/validation/validation.py new file mode 100644 index 0000000..ac4f2ea --- /dev/null +++ b/mlops_stacks_gcp_fs/validation/validation.py @@ -0,0 +1,41 @@ +import numpy as np +from mlflow.models import make_metric, MetricThreshold + +# Custom metrics to be included. Return empty list if custom metrics are not needed. +# Please refer to custom_metrics parameter in mlflow.evaluate documentation https://mlflow.org/docs/latest/python_api/mlflow.html#mlflow.evaluate +# TODO(optional) : custom_metrics +def custom_metrics(): + + # TODO(optional) : define custom metric function to be included in custom_metrics. 
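+    # The ModelValidation notebook loads this function via its `custom_metrics_loader_function` widget and
+    # passes the returned list to `mlflow.evaluate(custom_metrics=...)`, so each entry must be wrapped with
+    # `mlflow.models.make_metric` as done below.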
+    def squared_diff_plus_one(eval_df, _builtin_metrics):
+        """
+        This example custom metric function creates a metric based on the ``prediction`` and
+        ``target`` columns in ``eval_df``.
+        """
+        return np.sum(np.abs(eval_df["prediction"] - eval_df["target"] + 1) ** 2)
+
+    return [make_metric(eval_fn=squared_diff_plus_one, greater_is_better=False)]
+
+
+# Define model validation rules. Return empty dict if validation rules are not needed.
+# Please refer to validation_thresholds parameter in mlflow.evaluate documentation https://mlflow.org/docs/latest/python_api/mlflow.html#mlflow.evaluate
+# TODO(optional) : validation_thresholds
+def validation_thresholds():
+    return {
+        "max_error": MetricThreshold(
+            threshold=500, higher_is_better=False  # max_error should be <= 500
+        ),
+        "mean_squared_error": MetricThreshold(
+            threshold=20,  # mean_squared_error should be <= 20
+            # min_absolute_change=0.01,  # candidate mean_squared_error should be at least 0.01 lower than the baseline model's
+            # min_relative_change=0.01,  # candidate mean_squared_error should be at least 1 percent lower than the baseline model's
+            higher_is_better=False,
+        ),
+    }
+
+
+# Define evaluator config. Return empty dict if no evaluator config is needed.
+# Please refer to evaluator_config parameter in mlflow.evaluate documentation https://mlflow.org/docs/latest/python_api/mlflow.html#mlflow.evaluate
+# TODO(optional) : evaluator_config
+def evaluator_config():
+    return {}
diff --git a/test-requirements.txt b/test-requirements.txt
new file mode 100644
index 0000000..296b56e
--- /dev/null
+++ b/test-requirements.txt
@@ -0,0 +1,2 @@
+pytest>=7.1.2
+pandas==1.5.3
\ No newline at end of file