Skip to content

Commit

Permalink
Fill time steps for single time-series in DeepAR predict (#152)
Browse files Browse the repository at this point in the history
* Fill missing time steps in DeepAR predict

* Add single series test

* Fix multi series missing time steps and add test

* nit

* Support multiple id_cols and add test case

* Bump version

* docstring

* Address comments and add test
  • Loading branch information
es94129 authored Oct 9, 2024
1 parent f6de201 commit f803112
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 25 deletions.
28 changes: 14 additions & 14 deletions runtime/databricks/automl_runtime/forecast/deepar/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
from gluonts.torch.model.predictor import PyTorchPredictor
from mlflow.utils.environment import _mlflow_conda_env

from databricks.automl_runtime.forecast.model import ForecastModel, mlflow_forecast_log_model
from databricks.automl_runtime import version

from databricks.automl_runtime.forecast.model import ForecastModel, mlflow_forecast_log_model
from databricks.automl_runtime.forecast.deepar.utils import set_index_and_fill_missing_time_steps

DEEPAR_ADDITIONAL_PIP_DEPS = [
f"gluonts[torch]=={gluonts.__version__}",
Expand All @@ -42,26 +42,25 @@ class DeepARModel(ForecastModel):
DeepAR mlflow model wrapper for forecasting.
"""

def __init__(self, model: PyTorchPredictor, horizon: int, num_samples: int,
def __init__(self, model: PyTorchPredictor, horizon: int, frequency: str,
num_samples: int,
target_col: str, time_col: str,
id_cols: Optional[List[str]] = None) -> None:
"""
Initialize the DeepAR mlflow Python model wrapper
:param model: DeepAR model
:param horizon: the number of periods to forecast forward
:param frequency: the frequency of the time series
:param num_samples: the number of samples to draw from the distribution
:param target_col: the target column name
:param time_col: the time column name
:param id_cols: the column names of the identity columns for multi-series time series; None for single series
"""

# TODO: combine id_cols in predict() to ts_id when there are multiple id_cols
if id_cols and len(id_cols) > 1:
raise NotImplementedError("Logging multiple id_cols for DeepAR in AutoML are not supported yet")

super().__init__()
self._model = model
self._horizon = horizon
self._frequency = frequency
self._num_samples = num_samples
self._target_col = target_col
self._time_col = time_col
Expand Down Expand Up @@ -99,7 +98,8 @@ def predict(self,

pred_df = pred_df.rename(columns={'index': self._time_col})
if self._id_cols:
pred_df = pred_df.rename(columns={'item_id': self._id_cols[0]})
id_col_name = '-'.join(self._id_cols)
pred_df = pred_df.rename(columns={'item_id': id_col_name})
else:
pred_df = pred_df.drop(columns='item_id')

Expand All @@ -119,12 +119,12 @@ def predict_samples(self,
if num_samples is None:
num_samples = self._num_samples

model_input = model_input.set_index(self._time_col)
if self._id_cols:
test_ds = PandasDataset.from_long_dataframe(model_input, target=self._target_col,
item_id=self._id_cols[0], unchecked=True)
else:
test_ds = PandasDataset(model_input, target=self._target_col)
model_input_transformed = set_index_and_fill_missing_time_steps(model_input,
self._time_col,
self._frequency,
self._id_cols)

test_ds = PandasDataset(model_input_transformed, target=self._target_col)

forecast_iter = self._model.predict(test_ds, num_samples=num_samples)
forecast_sample_list = list(forecast_iter)
Expand Down
55 changes: 55 additions & 0 deletions runtime/databricks/automl_runtime/forecast/deepar/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#
# Copyright (C) 2024 Databricks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import List, Optional

import pandas as pd


def set_index_and_fill_missing_time_steps(df: pd.DataFrame, time_col: str,
                                          frequency: str,
                                          id_cols: Optional[List[str]] = None):
    """
    Prepare the input dataframe for consumption by the GluonTS library.
    - The time column becomes the index
    - Time steps missing between the overall min and max timestamps are
      inserted (their values are NaN)
    Note that the full range is computed over the whole dataframe, so for
    multi-series input every series is reindexed to the same global range.
    :param df: the input dataframe that contains time_col
    :param time_col: time column name
    :param frequency: the frequency of the time series
    :param id_cols: the column names of the identity columns for multi-series time series; None for single series
    :return: single-series - transformed dataframe;
             multi-series - dictionary of transformed dataframes, each key is the (concatenated) id of the time series
    """
    full_range = pd.date_range(df[time_col].min(), df[time_col].max(),
                               freq=frequency)

    if id_cols is None:
        # Single series: index by time and insert the missing steps.
        return df.set_index(time_col).sort_index().reindex(full_range)

    series_by_id = {}
    for series_key, series_df in df.groupby(id_cols):
        # groupby yields a scalar key for a single id column and a tuple for
        # multiple id columns; normalize both to a dash-joined string id.
        key_parts = series_key if isinstance(series_key, tuple) else (series_key,)
        series_id = "-".join(str(part) for part in key_parts)
        series_by_id[series_id] = (series_df.set_index(time_col)
                                   .sort_index()
                                   .reindex(full_range)
                                   .drop(id_cols, axis=1))
    return series_by_id
2 changes: 1 addition & 1 deletion runtime/databricks/automl_runtime/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@
# limitations under the License.
#

__version__ = "0.2.20.3.dev" # pragma: no cover
__version__ = "0.2.20.3" # pragma: no cover
67 changes: 57 additions & 10 deletions runtime/tests/automl_runtime/forecast/deepar/model_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def test_model_save_and_load_single_series(self):
deepar_model = DeepARModel(
model=self.model,
horizon=self.prediction_length,
frequency="d",
num_samples=1,
target_col=target_col,
time_col=time_col,
Expand Down Expand Up @@ -114,9 +115,9 @@ def test_model_save_and_load_single_series(self):
loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model")
pred_df = loaded_model.predict(sample_input)

assert pred_df.columns.tolist() == [time_col, "yhat"]
assert len(pred_df) == self.prediction_length
assert pred_df[time_col].min() > sample_input[time_col].max()
self.assertEqual(pred_df.columns.tolist(), [time_col, "yhat"])
self.assertEqual(len(pred_df), self.prediction_length)
self.assertGreater(pred_df[time_col].min(), sample_input[time_col].max())

def test_model_save_and_load_multi_series(self):
target_col = "sales"
Expand All @@ -127,25 +128,26 @@ def test_model_save_and_load_multi_series(self):
model=self.model,
horizon=self.prediction_length,
num_samples=1,
frequency="d",
target_col=target_col,
time_col=time_col,
id_cols=[id_col],
)

num_rows = 10
num_rows_per_ts = 10
sample_input_base = pd.concat(
[
pd.to_datetime(
pd.Series(range(num_rows), name=time_col).apply(
pd.Series(range(num_rows_per_ts), name=time_col).apply(
lambda i: f"2020-10-{3 * i + 1}"
)
),
pd.Series(range(num_rows), name=target_col),
pd.Series(range(num_rows_per_ts), name=target_col),
],
axis=1,
)
sample_input = pd.concat([sample_input_base.copy(), sample_input_base.copy()], ignore_index=True)
sample_input[id_col] = [1] * num_rows + [2] * num_rows
sample_input[id_col] = [1] * num_rows_per_ts + [2] * num_rows_per_ts

with mlflow.start_run() as run:
mlflow_deepar_log_model(deepar_model, sample_input)
Expand All @@ -155,13 +157,58 @@ def test_model_save_and_load_multi_series(self):
# check if all additional dependencies are logged
self._check_requirements(run_id)

# load the model and predict
loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model")

pred_df = loaded_model.predict(sample_input)

self.assertEqual(pred_df.columns.tolist(), [time_col, "yhat", id_col])
self.assertEqual(len(pred_df), self.prediction_length * 2)
self.assertGreater(pred_df[time_col].min(), sample_input[time_col].max())

def test_model_save_and_load_multi_series_multi_id_cols(self):
target_col = "sales"
time_col = "date"
id_cols = ["store", "dept"]

deepar_model = DeepARModel(
model=self.model,
horizon=self.prediction_length,
num_samples=1,
frequency="d",
target_col=target_col,
time_col=time_col,
id_cols=id_cols,
)

num_rows_per_ts = 5
sample_input_base = pd.concat(
[
pd.to_datetime(
pd.Series(range(num_rows_per_ts), name=time_col).apply(
lambda i: f"2020-10-{3 * i + 1}"
)
),
pd.Series(range(num_rows_per_ts), name=target_col),
],
axis=1,
)
sample_input = pd.concat([sample_input_base.copy(), sample_input_base.copy(),
sample_input_base.copy(), sample_input_base.copy(),], ignore_index=True)
sample_input[id_cols[0]] = ['A'] * (2 * num_rows_per_ts) + ['B'] * (2 * num_rows_per_ts)
sample_input[id_cols[1]] = (['X'] * num_rows_per_ts + ['Y'] * num_rows_per_ts) * 2

with mlflow.start_run() as run:
mlflow_deepar_log_model(deepar_model, sample_input)
run_id = run.info.run_id

# load the model and predict
loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model")
pred_df = loaded_model.predict(sample_input)

assert pred_df.columns.tolist() == [time_col, "yhat", id_col]
assert len(pred_df) == self.prediction_length * 2
assert pred_df[time_col].min() > sample_input[time_col].max()
self.assertEqual(pred_df.columns.tolist(), [time_col, "yhat", "store-dept"])
self.assertEqual(len(pred_df), self.prediction_length * 4)
self.assertGreater(pred_df[time_col].min(), sample_input[time_col].max())

def _check_requirements(self, run_id: str):
# read requirements.txt from the run
Expand Down
110 changes: 110 additions & 0 deletions runtime/tests/automl_runtime/forecast/deepar/utils_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#
# Copyright (C) 2024 Databricks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unittest

import pandas as pd

from databricks.automl_runtime.forecast.deepar.utils import set_index_and_fill_missing_time_steps


class TestDeepARUtils(unittest.TestCase):
    """Unit tests for set_index_and_fill_missing_time_steps."""

    @staticmethod
    def _daily_frame(num_rows: int, time_col: str, target_col: str) -> pd.DataFrame:
        # Daily timestamps 2020-10-01 .. 2020-10-<num_rows> with target 0..num_rows-1.
        dates = pd.to_datetime(
            pd.Series(range(num_rows), name=time_col).apply(lambda i: f"2020-10-{i + 1}")
        )
        targets = pd.Series(range(num_rows), name=target_col)
        return pd.concat([dates, targets], axis=1)

    @staticmethod
    def _expected_filled(base_df: pd.DataFrame, time_col: str, target_col: str) -> pd.DataFrame:
        # Expected output: rows 4 and 5 imputed with NaN, time column as index.
        expected = base_df.copy()
        expected.loc[[4, 5], target_col] = float('nan')
        return expected.set_index(time_col).rename_axis(None).asfreq("D")

    def test_single_series_filled(self):
        time_col, target_col = "date", "sales"
        base_df = self._daily_frame(10, time_col, target_col)
        dropped_df = base_df.drop([4, 5]).reset_index(drop=True)

        transformed_df = set_index_and_fill_missing_time_steps(dropped_df, time_col, "D")

        expected_df = self._expected_filled(base_df, time_col, target_col)
        pd.testing.assert_frame_equal(transformed_df, expected_df)

    def test_multi_series_filled(self):
        time_col, target_col, id_col = "date", "sales", "store"
        num_rows_per_ts = 10
        base_df = self._daily_frame(num_rows_per_ts, time_col, target_col)
        dropped_base_df = base_df.drop([4, 5]).reset_index(drop=True)
        dropped_df = pd.concat([dropped_base_df.copy(), dropped_base_df.copy()],
                               ignore_index=True)
        dropped_df[id_col] = [1] * (num_rows_per_ts - 2) + [2] * (num_rows_per_ts - 2)

        transformed_df_dict = set_index_and_fill_missing_time_steps(
            dropped_df, time_col, "D", id_cols=[id_col])

        self.assertEqual(transformed_df_dict.keys(), {"1", "2"})
        expected_first_df = self._expected_filled(base_df, time_col, target_col)
        pd.testing.assert_frame_equal(transformed_df_dict["1"], expected_first_df)

    def test_multi_series_multi_id_cols_filled(self):
        time_col, target_col = "date", "sales"
        id_cols = ["store", "dept"]
        num_rows_per_ts = 10
        base_df = self._daily_frame(num_rows_per_ts, time_col, target_col)
        dropped_base_df = base_df.drop([4, 5]).reset_index(drop=True)
        dropped_df = pd.concat([dropped_base_df.copy(), dropped_base_df.copy(),
                                dropped_base_df.copy(), dropped_base_df.copy()],
                               ignore_index=True)
        dropped_df[id_cols[0]] = ([1] * (num_rows_per_ts - 2) + [2] * (num_rows_per_ts - 2)) * 2
        dropped_df[id_cols[1]] = [1] * (2 * (num_rows_per_ts - 2)) + [2] * (2 * (num_rows_per_ts - 2))

        transformed_df_dict = set_index_and_fill_missing_time_steps(
            dropped_df, time_col, "D", id_cols=id_cols)

        # Keys are the dash-joined (store, dept) id pairs.
        self.assertEqual(transformed_df_dict.keys(), {"1-1", "1-2", "2-1", "2-2"})
        expected_first_df = self._expected_filled(base_df, time_col, target_col)
        pd.testing.assert_frame_equal(transformed_df_dict["1-1"], expected_first_df)

0 comments on commit f803112

Please sign in to comment.