
feat: add bigframes.ml.compose.SQLScalarColumnTransformer to create custom SQL-based transformations #955

Merged
merged 52 commits into googleapis:main
Sep 25, 2024

Conversation

ferenc-hechler
Contributor

@ferenc-hechler ferenc-hechler commented Sep 4, 2024

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

  • Make sure to open an issue as a bug/issue before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
  • Ensure the tests and linter pass
  • Code coverage does not decrease (if any source code was changed)
  • Appropriate docs were updated (if necessary)

Fixes #954 🦕

This PR adds the possibility to create custom transformers to be used in ColumnTransformer.
The basic idea is to implement the _compile_to_sql() and _parse_from_sql() methods, which are called from ColumnTransformer.
Each CustomTransformer registers itself in a registry. When transformers are created from reading in a model, this registry is queried in addition to _BQML_TRANSFROM_TYPE_MAPPING, which contains the ML.* transformers.

A very simple example is the Identity-Transformer. It does not change the value:

  column AS ident_column

The implementation looks like this:

import bigframes.pandas as bpd
from bigframes.ml.compose import CustomTransformer
from typing import List, Optional, Union, Dict
import re

IDENT_BQSQL_RX = re.compile("^(?P<colname>[a-z][a-z0-9_]*)$", flags=re.IGNORECASE)

class IdentityTransformer(CustomTransformer):
    _CTID = "IDENT"

    def custom_compile_to_sql(self, X: bpd.DataFrame, column: str) -> str:
        return f"{column}"

    @classmethod
    def custom_parse_from_sql(cls, config: Optional[Union[Dict, List]], sql: str) -> tuple[CustomTransformer, str]:
        col_label = IDENT_BQSQL_RX.match(sql).group("colname")
        return cls(), col_label

CustomTransformer.register(IdentityTransformer)

The class CustomTransformer is new and was added in this PR to bigframes.ml.compose, where ColumnTransformer is also implemented.
It provides base functionality, such as iterating over the column list, which can be reused by all inherited classes.
It also maintains the registry of custom transformers.
The class variable _CTID identifies the transformer and has to be unique across all registered custom transformers.
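
To make the mechanics more concrete, here is a minimal, hypothetical sketch of what such a base class could look like; the registry as a plain dict keyed by _CTID and the output alias prefix derived from _CTID are assumptions for this sketch, not necessarily the exact implementation in this PR:

from typing import Dict, List, Optional, Type, Union

import bigframes.pandas as bpd

class CustomTransformer:
    """Illustrative sketch of the custom-transformer base class."""

    _CTID: Optional[str] = None  # unique id per subclass, e.g. "IDENT" or "LEN"
    _REGISTRY: Dict[str, Type["CustomTransformer"]] = {}

    @classmethod
    def register(cls, transformer_cls: Type["CustomTransformer"]) -> None:
        # Remember the subclass under its unique _CTID so it can be looked up
        # again when the TRANSFORM SQL of a model is parsed.
        assert transformer_cls._CTID is not None
        cls._REGISTRY[transformer_cls._CTID] = transformer_cls

    def _compile_to_sql(self, X: bpd.DataFrame, columns: List[str]) -> List[str]:
        # Iterate over the column list and delegate the per-column SQL to the
        # subclass; the alias prefix used here is an assumption for this sketch.
        return [
            f"{self.custom_compile_to_sql(X, col)} AS {self._CTID.lower()}_{col}"
            for col in columns
        ]

    def custom_compile_to_sql(self, X: bpd.DataFrame, column: str) -> str:
        raise NotImplementedError

    @classmethod
    def custom_parse_from_sql(
        cls, config: Optional[Union[Dict, List]], sql: str
    ) -> "tuple[CustomTransformer, str]":
        raise NotImplementedError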

Another example replaces a string column with its length and uses a configurable default value for NULL values.
The SQL implementing this transformation looks like this:

  CASE WHEN {column} IS NULL THEN {default_value} ELSE LENGTH({column}) END AS len_{column}

Here is the corresponding code:

LEN_BQSQL_RX = re.compile("^CASE WHEN (?P<colname>[a-z][a-z0-9_]*) IS NULL THEN (?P<defaultvalue>[-]?[0-9]+) ELSE LENGTH[(](?P=colname)[)] END$", flags=re.IGNORECASE)

class LengthTransformer(CustomTransformer):
    _CTID = "LEN"
    _DEFAULT_VALUE_DEFAULT = -1
    
    def __init__(self, default_value: Optional[int] = None):
        self.default_value = default_value

    def custom_compile_to_sql(self, X: bpd.DataFrame, column: str) -> str:
        default_value = (
            self.default_value
            if self.default_value is not None
            else LengthTransformer._DEFAULT_VALUE_DEFAULT
        )
        return f"CASE WHEN {column} IS NULL THEN {default_value} ELSE LENGTH({column}) END"
    
    @classmethod
    def custom_parse_from_sql(cls, config: Optional[Union[Dict, List]], sql: str) -> tuple[CustomTransformer, str]:
        m = LEN_BQSQL_RX.match(sql)
        col_label = m.group("colname")
        default_value = int(m.group("defaultvalue"))
        return cls(default_value), col_label

CustomTransformer.register(LengthTransformer)

The custom_compile_to_sql() implementation is straightforward.
In custom_parse_from_sql() the configured parameters are reverse-engineered from the SQL code using a regular expression.

But this might not always be possible or useful, so we added a feature to persist config parameters along with the SQL.
This is done by using a comment section in the SQL.

  CASE WHEN {column} IS NULL THEN {default_value} ELSE LENGTH({column}) END /*LEN([99])*/ AS len_{column}

So, the comment /*<custom-transformer-id>(<json-args>)*/ is stored in the SQL text of the model and can be used to retrieve this information when parsing the SQL.
Okay, this is a hack, and it will not work when the model strips away the comments.
But at the moment it works, and if this changes in the future, another solution can be found to store this information in the model.
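
As a rough, hypothetical sketch of how such a comment could be written and read back, the serialization helpers might look like this (the helper names are made up; the /*<id>(<json-args>)*/ format follows the example above):

import json
import re
from typing import Dict, List, Optional, Tuple, Union

# Matches a trailing comment such as /*LEN([99])*/ or /*IDENT()*/.
PERSISTENT_CONFIG_RX = re.compile(r"/\*(?P<ctid>[A-Za-z0-9_.]+)\((?P<config>.*?)\)\*/")

def embed_config(sql: str, ctid: str, config: Optional[Union[Dict, List]]) -> str:
    """Append the transformer id and its config as a JSON payload in a SQL comment."""
    payload = json.dumps(config) if config is not None else ""
    return f"{sql} /*{ctid}({payload})*/"

def extract_config(sql: str) -> Tuple[Optional[str], Optional[Union[Dict, List]], str]:
    """Return (ctid, config, sql_without_comment) for one transform expression."""
    m = PERSISTENT_CONFIG_RX.search(sql)
    if m is None:
        return None, None, sql
    config = json.loads(m.group("config")) if m.group("config") else None
    return m.group("ctid"), config, PERSISTENT_CONFIG_RX.sub("", sql).strip()

# Round trip:
# embed_config("CASE WHEN address IS NULL THEN 99 ELSE LENGTH(address) END", "LEN", [99])
#   -> "CASE WHEN address IS NULL THEN 99 ELSE LENGTH(address) END /*LEN([99])*/"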

The CustomTransformer class implements the serialization of the config, so only the method get_persistent_config() has to be implemented returning a dict or list:

    def get_persistent_config(self, column: str) -> Optional[Union[Dict, List]]:
        return [self.default_value]

When parsing the SQL, this persistent_config is retrieved from the SQL and given as additional input to the parse method:

    @classmethod
    def custom_parse_from_sql(cls, persistent_config: Optional[Union[Dict, List]], sql: str) -> tuple[CustomTransformer, str]:
        col_label = LEN2_BQSQL_RX.match(sql).group("colname")
        default_value = persistent_config[0]   # get default value from persistent_config instead of parsing SQL
        return cls(default_value), col_label

Custom transformers can be used in the same way as the standard transformers, like LabelEncoder.
This piece of code creates a ColumnTransformer, fits it, and stores the transform-only model.

df = bpd.read_gbq(SELECT_TABLE)
col_trans = ColumnTransformer([
    ("identity", IdentityTransformer(), ['address', 'pack', 'store_location']),
    ("length", Length1Transformer(-2), ['store_location']),
    ("length", Length2Transformer(99), ['address']),
    ("labelenc", LabelEncoder(), ['county', 'category_name']),
])
col_trans = col_trans.fit(df)
col_trans.to_gbq(TRANSFORM_MODEL, replace=True)

IdentityTransformer, Length1Transformer and Length2Transformer are derived from CustomTransformer, while LabelEncoder comes from bigframes.ml.preprocessing.

Loading and applying the model is done with the following code:

df = bpd.read_gbq(SELECT_TABLE)
transform_model = bpd.get_global_session().read_gbq_model(TRANSFORM_MODEL)
df_trans = transform_model.transform(df)

transform_model is the ColumnTransformer which we persisted above.

Here is the complete source code:
custom_transformers.zip

@tswast
Collaborator

tswast commented Sep 4, 2024

Thanks so much for the Pull Request and the detailed discussion @ferenc-hechler !

So, the comment /*<custom-transformer-id>(<json-args>)*/ is stored in the SQL text of the model and can be used to retrieve this information when parsing the SQL.
Okay, this is a hack, and it will not work when the model strips away the comments.

This is a fascinating approach, and not one that I've considered. I had considered embedding parameters in SQL, but not how one could recover them as you do here. I'll check in with some backend BQML folks to see if this comment embedding is something we could rely on.

Collaborator

@tswast tswast left a comment


Thanks so much for your work here and the code examples in the tests. I definitely want to provide something like this, so this is a great starting point. I'll do some design work this week to come up with some proposals for how we might hook into our compilation steps to be able to do something like this in a less SQL-forward way.

Collaborator

@tswast tswast left a comment


Thanks for your patience as I try to understand this change and your requirements.

Some questions for you:

  1. How important is it for the custom transformation to be deployed as part of the Pipeline to BQML? Another way to phrase this: does all preprocessing need to happen in the TRANSFORM clause?
  2. If (1) is important, then how important is it that one can recover the original Pipeline after training? That is to say, if it were possible to use the trained model for predictions without being able to inspect exactly what model / transformations were applied from Python, would that be acceptable?

Overall, I'm warming up to the idea of a SQL-based extension mechanism / helpers, but I am concerned about

  1. Offering such a bigframes.ml specific way to hook into how BigQuery DataFrames compiles its SQL.
  2. Trying to recover the original pipeline via special comments.

Long-term, I'd like for our extension mechanism to be provided by the existing scalar BigQuery DataFrames APIs, such as Series.case_when.

I envision something like the following:

class OneHotEncodingPlusOutliers(bigframes.ml.ScalarTransformerMixin):
    def __init__(self, column_name, num_values):
        self.column_name = column_name
        self.num_values = num_values

    def fit(self, X):
        # Find the most frequent values in column_name.
        values = X[[self.column_name]].groupby(self.column_name, as_index=False).size()
        self.top_values_ = (
            values.sort_values("size", ascending=False)
            .head(self.num_values)[self.column_name]
            .to_list()
        )

    def transform(self, X):
        # Note: This can only do scalar operations.
        original = X[self.column_name]
        X = X.drop(columns=[self.column_name])
        X = X.assign(**{
            f"{self.column_name}_{value}": original.eq(value)
            for value in self.top_values_
        })
        X = X.assign(**{f"{self.column_name}_others": ~original.isin(self.top_values_)})
        return X

This will take some time, though. Probably longer than the timelines we'd like. Compilation is changing, so extracting the SQL for these scalar transformations is a bit blocked by #827

Some alternatives to consider:

(a). Make bigframes better able to utilize TRANSFORM_ONLY models that have already been deployed to BQML "models" by making them compatible with read_gbq_model(). This would require relaxing some of the validations, as you've done in this PR. We're tracking this request internally in issue 365996054.

A downside of this approach is that the BQML TRANSFORM clause cannot contain a call to ML.PREDICT / ML.TRANSFORM. To allow these TRANSFORM_ONLY models in a Pipeline would require changes to our compilation and possibly our execution. All transforms that happen before (and including) one of these opaque read_gbq_model() TRANSFORM_ONLY models cannot be deployed with the final BQML model for the Pipeline. Instead, we'd have to embed these transformations in the SQL table expression representing the training data and/or execute several queries.

(b). If (a) is acceptable, then I would be open to some helpers in bigframes to make deploying custom TRANSFORM_ONLY models based on SQL easier.

(c). If (a/b) is not acceptable, perhaps there is some intermediary step we can take towards the long-term vision that would unblock you?

@ferenc-hechler
Contributor Author

ferenc-hechler commented Sep 12, 2024

We discussed your questions internally and came to the following conclusions.

  1. How important is it for the custom transformation to be deployed as part of the Pipeline to BQML?
    Another way to phrase this:
    does all preprocessing need to happen in the TRANSFORM clause?

If all preprocessing could happen in one TRANSFORM clause, this would be the ideal case (top scenario).

(Image: diagram of the three bundling scenarios discussed here — all preprocessing in one TRANSFORM clause, only the last transformer bundled with the estimator, or no transformer bundled with the estimator.)

But we already recognized that our existing Transformer-Framework has no limitation on transforming
a column multiple times. So we would most likely end up with multiple input transformers, and only the
last one can be integrated into the bundle (middle scenario).

To answer your question: since we have to apply other transformations anyway, we do not need to bundle
the last transformation with the estimator (scenario 3). So we are fine if this is not supported.

[...] how important is it that one can recover the original Pipeline after training?
That is to say, if it were possible to use the trained model for predictions without being able
to inspect exactly what model / transformations were applied from Python,
would that be acceptable?

That one was harder to answer. Let's formulate it this way:
It would not be a show-stopper for us, but we are losing transparency.
If a model has bad prediction performance in production, the reason for this has to be analyzed.
And for the data scientists, the fitted parameters are a valuable hint
to find out what might be different between training and production data.

Some kind of data shift/skew can be detected based on the transformer model parameters.

A question:
Where do the ML.xxx transformers store their fitted parameters?
Could we use the same place for storing our custom transformer parameters?

Long term [...] I envision something like the following:
class OneHotEncodingPlusOutliers( [...]

This is a good example of what is not possible using plain ColumnTransformer transformations.
If this is possible (and performant), we do not need to talk any more about generating SQL :-)

But, as we focus on a Transformer-Framework which creates the transformations based on a given configuration,
we can manage such transformations outside of the ColumnTransformer fit() method.
The elementary column transformers would then just be EqualityTransformers for the same column with different output names.

col_trans = ColumnTransformer([
    ("book_cat", EqualityTransformer("book"), ['category']),
    ("record_cat", EqualityTransformer("record"), ['category']),
    ("magazine_cat", EqualityTransformer("magazine"), ['category']),
    ...
])

And in our Transformer-Framework this would be a config like:

preprocessing:
  stages:
    - stage_name: "CategoricalEncoder"
      stage_params:
        cardinalityThreshold: 10
        newCategory: "other"
        frequencyThreshold: .01

This means the transformer framework creates one-hot-encoding columns if the cardinality is below 10;
otherwise another encoder (target mean) is used, which would then look like this:

col_trans = ColumnTransformer([
    ("tme", TargetMeanEncoder(), ['category']),
    ...
])

And the parameters for the TargetMeanEncoder would then be derived in the ColumnTransformer.fit() call.

{"book": 33.27,"record": 18.45,"magazin": 4.61,...}

Finally, the SQL generated would be a CASE WHEN expression.
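
Purely as an illustration of that last step, a hypothetical TargetMeanEncoder following the CustomTransformer pattern from the PR description above could emit such a CASE WHEN from its fitted mapping; the class and its target_means_ attribute are made up for this sketch:

class TargetMeanEncoder(CustomTransformer):
    _CTID = "TME"

    def __init__(self, default_value: float = 0.0):
        self.default_value = default_value
        # Filled during fitting, e.g. {"book": 33.27, "record": 18.45, "magazine": 4.61}.
        self.target_means_: dict = {}

    def custom_compile_to_sql(self, X: bpd.DataFrame, column: str) -> str:
        # Turn the fitted category -> mean mapping into a CASE WHEN expression.
        branches = " ".join(
            f"WHEN {column} = '{category}' THEN {mean}"
            for category, mean in self.target_means_.items()
        )
        return f"CASE {branches} ELSE {self.default_value} END"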

So we are able to split the preprocessing and are not limited to single-column transformations.
But the result will always be a list of ColumnTransformers, which are then applied sequentially.
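
For example, applying such a list of fitted ColumnTransformers one after the other could be as simple as this (hypothetical) helper:

def apply_sequentially(column_transformers, df):
    # Each ColumnTransformer consumes the output of the previous one.
    for col_trans in column_transformers:
        df = col_trans.transform(df)
    return df

# df_trans = apply_sequentially([col_trans_1, col_trans_2], df)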

(a). Make bigframes better able to utilize TRANSFORM_ONLY models
that have already been deployed to BQML [...]

Does this mean that unknown transformations are not dropped but passed through as black-box transformers?
I think this would be good, independently of our needs.

A downside of this approach is that the BQML TRANSFORM clause
cannot contain a call to ML.PREDICT / ML.TRANSFORM. [...]

As already mentioned above, this is fine for us.

(b). If (a) is acceptable, then I would be open to some helpers in bigframes
to make deploying custom TRANSFORM_ONLY models based on SQL easier.

If we also get the possibility to store our fitted parameters somewhere,
this is a good alternative to extending ColumnTransformer.

(c). If (a/b) is not acceptable, perhaps there is some intermediary step
we can take towards the long-term vision that would unblock you?

It feels like reading a choose-your-own-adventure book: after each chapter you have to decide
which chapter to continue with, until you find the treasure... :-)

@ferenc-hechler
Contributor Author

ferenc-hechler commented Sep 17, 2024

Just tested loading a ColumnTransformer containing CustomTransformers, adding it to a pipeline, and then training a linear regression model including the transformation:

col_trans = bpd.get_global_session().read_gbq_model(transformer_name)
model = LinearRegression(fit_intercept=False)
pipeline = Pipeline([
  ('preproc', col_trans),
  ('linreg', model)
])
pipeline.fit(X_train, y_train)
pipeline.to_gbq(full_model_name, replace=True)

That worked without any problems.
Here is the final SQL generated from the pipeline.fit() step:

CREATE OR REPLACE MODEL `de0360-pkce-spoke2-al-dev0`.`_4159a931697fdd40d66d89a5a31999be23c1295f`.`88a63fa5508c41688499916e032e68d9`
TRANSFORM(
  address /*CT.IDENT()*/ AS ident_address,
  CASE WHEN address IS NULL THEN 99 ELSE LENGTH(address) END /*CT.LEN2([99])*/ AS len2_address,
  store_location /*CT.IDENT()*/ AS ident_store_location,
  ML.STANDARD_SCALER(sale_dollars) OVER() AS standard_scaled_sale_dollars,
  ML.STANDARD_SCALER(volume_sold_gallons) OVER() AS standard_scaled_volume_sold_gallons,
  CASE WHEN store_location IS NULL THEN -2 ELSE LENGTH(store_location) END /*CT.LEN1()*/ AS len1_store_location,
  ML.LABEL_ENCODER(county, 1000000, 0) OVER() AS labelencoded_county,
  ML.LABEL_ENCODER(category_name, 1000000, 0) OVER() AS labelencoded_category_name,
  pack /*CT.IDENT()*/ AS ident_pack,
  sale_dollars)
OPTIONS(
  model_type="LINEAR_REG",
  data_split_method="NO_SPLIT",
  optimize_strategy="auto_strategy",
  fit_intercept=False,
  l2_reg=0.0,
  max_iterations=20,
  learn_rate_strategy="line_search",
  min_rel_progress=0.01,
  calculate_p_values=False,
  enable_global_explain=False,
  INPUT_LABEL_COLS=["sale_dollars"])
AS SELECT
  t0.`col_9` AS `address`,
  t0.`col_10` AS `pack`,
  t0.`col_11` AS `store_location`,
  t0.`col_12` AS `county`,
  t0.`col_13` AS `category_name`,
  t0.`col_14` AS `volume_sold_gallons`,
  t0.`col_16` AS `sale_dollars`
FROM (
  SELECT
    t1.`col_9`,
    t1.`col_10`,
    t1.`col_11`,
    t1.`col_12`,
    t1.`col_13`,
    t1.`col_14`,
    t1.`col_16`
  FROM `de0360-pkce-spoke2-al-dev0`._4159a931697fdd40d66d89a5a31999be23c1295f.bqdf20240917_sessionc82c96_bd13a2e0379c47b796f26ff3b554ed74 AS t1
) AS t0

Loading the stored model also works fine:

predict_pipeline = bpd.get_global_session().read_gbq_model(full_model_name)
print(predict_pipeline)
Pipeline(steps=[('transform',
                 ColumnTransformer(transformers=[('identity_transformer',
                                                  IdentityTransformer(),
                                                  'pack'),
                                                 ('identity_transformer',
                                                  IdentityTransformer(),
                                                  'address'),
                                                 ('identity_transformer',
                                                  IdentityTransformer(),
                                                  'store_location'),
                                                 ('standard_scaler',
                                                  StandardScaler(),
                                                  'volume_sold_gallons'),
                                                 ('standard_scaler',
                                                  StandardScaler(),
                                                  'sale_dollars'),
                                                 ('length1_transformer',
                                                  Length1Transformer(default_value=-2),
                                                  'store_location'),
                                                 ('length2_transformer',
                                                  Length2Transformer(default_value=99),
                                                  'address'),
                                                 ('label_encoder',
                                                  LabelEncoder(max_categories=1000001,
                                                               min_frequency=0),
                                                  'category_name'),
                                                 ('label_encoder',
                                                  LabelEncoder(max_categories=1000001,
                                                               min_frequency=0),
                                                  'county')])),
                ('estimator',
                 LinearRegression(fit_intercept=False, l1_reg=0.0,
                                  learning_rate_strategy='LINE_SEARCH',
                                  ls_init_learning_rate=0.1,
                                  optimize_strategy='BATCH_GRADIENT_DESCENT'))])

@tswast
Collaborator

tswast commented Sep 17, 2024

That worked without any problems.

Excellent!

[...] how important is it that one can recover the original Pipeline after training?
That is to say, if it were possible to use the trained model for predictions without being able
to inspect exactly what model / transformations were applied from Python,
would that be acceptable?

That one was harder to answer. Let's formulate it this way:
It would not be a show-stopper for us, but we are losing transparency.

This feature (using SQL comments to embed learned parameters) remains my main concern / hesitation with merging this PR. It's very cool, but I worry about committing bigframes to supporting this feature long-term.

I propose we offer a subset of what you've written here with the following:

my_transformer = SQLScalarColumnTransformer(
    "CASE WHEN {0} IS NULL THEN -1 ELSE LENGTH({0}) END",
    columns=["colname"],
    dtype="int64",  # Omit this? Not needed with current bigframes.ml expressions, but required in bigframes.pandas.*.
)

With such a feature, you won't get the fully nested set of transformers when restoring a Pipeline, but perhaps you could still embed such comments in the SQL with the library of transformers you provide to your downstream users for internal transparency?
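
Purely illustrative, based on the constructor proposed above (the interface that eventually gets merged may differ), such a transformer could sit next to the standard ones in a ColumnTransformer:

from bigframes.ml.compose import ColumnTransformer, SQLScalarColumnTransformer
from bigframes.ml.preprocessing import LabelEncoder

# Hypothetical usage; argument names follow the proposal above, not necessarily the final API.
len_address = SQLScalarColumnTransformer(
    "CASE WHEN {0} IS NULL THEN -1 ELSE LENGTH({0}) END",
    columns=["address"],
)

col_trans = ColumnTransformer([
    ("len_address", len_address, ["address"]),
    ("labelenc", LabelEncoder(), ["county", "category_name"]),
])
# df is a bigframes DataFrame, e.g. loaded with bpd.read_gbq(...).
col_trans = col_trans.fit(df)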

Collaborator

@tswast tswast left a comment


Love it! One suggestion and then let's make sure the e2e tests pass before merging.

Collaborator

@tswast tswast left a comment


Thanks so much for your contribution and patience!

Collaborator

@tswast tswast left a comment


A couple of small fixes needed for the test suite.

@tswast
Collaborator

tswast commented Sep 25, 2024

The notebook failure (FAILED notebooks/remote_functions/remote_function.ipynb - ResourceExhausted: 429 Too many operations are currently being executed, try again later.) is not caused by this PR and can be ignored.

@tswast tswast enabled auto-merge (squash) September 25, 2024 15:29
@tswast tswast merged commit 1930b4e into googleapis:main Sep 25, 2024
22 of 23 checks passed
@tswast
Collaborator

tswast commented Sep 25, 2024

This will be included in the 1.20.0 release. Keep an eye on #1017

Thanks again for your help @ferenc-hechler !

@ferenc-hechler
Contributor Author

@tswast thanks for all your support and reviews 🥇
It really helps us to have this solution implemented!
