diff --git a/index.html b/index.html
index e27dc81..8c962c0 100644
--- a/index.html
+++ b/index.html
@@ -3,14 +3,14 @@ (head/title changes for the "Lakehouse Engine Documentation" page; stripped HTML markup omitted)
+lakehouse_engine.dq_processors.custom_expectations.expect_column_pair_a_to_be_not_equal_to_b

Expectation to check if column 'a' is not equal to column 'b'.
  1"""Expectation to check if column 'a' is not equal to column 'b'."""
+  2
+  3from typing import Any, Dict, Optional
+  4
+  5from great_expectations.core import ExpectationConfiguration
+  6from great_expectations.execution_engine import ExecutionEngine, SparkDFExecutionEngine
+  7from great_expectations.expectations.expectation import ColumnPairMapExpectation
+  8from great_expectations.expectations.metrics.map_metric_provider import (
+  9    ColumnPairMapMetricProvider,
+ 10    column_pair_condition_partial,
+ 11)
+ 12
+ 13from lakehouse_engine.utils.expectations_utils import validate_result
+ 14
+ 15
+ 16class ColumnPairCustom(ColumnPairMapMetricProvider):
+ 17    """Asserts that column 'A' is not equal to column 'B'.
+ 18
+ 19    Additionally, it also compares Null values.
+ 20    """
+ 21
+ 22    condition_metric_name = "column_pair_values.a_not_equal_to_b"
+ 23    condition_domain_keys = (
+ 24        "batch_id",
+ 25        "table",
+ 26        "column_A",
+ 27        "column_B",
+ 28        "ignore_row_if",
+ 29    )
+ 30    condition_value_keys = ()
+ 31
+ 32    @column_pair_condition_partial(engine=SparkDFExecutionEngine)
+ 33    def _spark(
+ 34        self: ColumnPairMapMetricProvider,
+ 35        column_A: Any,
+ 36        column_B: Any,
+ 37        **kwargs: dict,
+ 38    ) -> Any:
+ 39        """Implementation of the expectation's logic.
+ 40
+ 41        Args:
+ 42            column_A: Value of the row of column_A.
+ 43            column_B: Value of the row of column_B.
+ 44            kwargs: dict with additional parameters.
+ 45
+ 46        Returns:
+ 47            If the condition is met.
+ 48        """
+ 49        return ((column_A.isNotNull()) | (column_B.isNotNull())) & (
+ 50            column_A != column_B
+ 51        )  # noqa: E501
+ 52
+ 53
+ 54class ExpectColumnPairAToBeNotEqualToB(ColumnPairMapExpectation):
+ 55    """Expect values in column A to be not equal to column B.
+ 56
+ 57    Args:
+ 58        column_A: The first column name.
+ 59        column_B: The second column name.
+ 60
+ 61    Keyword Args:
+ 62        - allow_cross_type_comparisons: If True, allow
+ 63            comparisons between types (e.g. integer and string).
+ 64            Otherwise, attempting such comparisons will raise an exception.
+ 65        - ignore_row_if: "both_values_are_missing",
+ 66            "either_value_is_missing", "neither" (default).
+ 67        - result_format: Which output mode to use:
+ 68            `BOOLEAN_ONLY`, `BASIC` (default), `COMPLETE`, or `SUMMARY`.
+ 69        - include_config: If True (default), then include the expectation config
+ 70            as part of the result object.
+ 71        - catch_exceptions: If True, then catch exceptions and
+ 72            include them as part of the result object. Default: False.
+ 73        - meta: A JSON-serializable dictionary (nesting allowed)
+ 74            that will be included in the output without modification.
+ 75
+ 76    Returns:
+ 77        An ExpectationSuiteValidationResult.
+ 78    """
+ 79
+ 80    examples = [
+ 81        {
+ 82            "dataset_name": "Test Dataset",
+ 83            "data": [
+ 84                {
+ 85                    "data": {
+ 86                        "a": ["IE4019", "IM6092", "IE1405"],
+ 87                        "b": ["IE4019", "IM6092", "IE1405"],
+ 88                        "c": ["IE1404", "IN6192", "842075"],
+ 89                    },
+ 90                    "schemas": {
+ 91                        "spark": {
+ 92                            "a": "StringType",
+ 93                            "b": "StringType",
+ 94                            "c": "StringType",
+ 95                        }
+ 96                    },
+ 97                }
+ 98            ],
+ 99            "tests": [
+100                {
+101                    "title": "negative_test",
+102                    "exact_match_out": False,
+103                    "include_in_gallery": True,
+104                    "in": {
+105                        "column_A": "a",
+106                        "column_B": "b",
+107                        "result_format": {
+108                            "result_format": "COMPLETE",
+109                            "unexpected_index_column_names": ["b"],
+110                        },
+111                    },
+112                    "out": {
+113                        "success": False,
+114                        "unexpected_index_list": [
+115                            {"b": "IE4019", "a": "IE4019"},
+116                            {"b": "IM6092", "a": "IM6092"},
+117                            {"b": "IE1405", "a": "IE1405"},
+118                        ],
+119                    },
+120                },
+121                {
+122                    "title": "positive_test",
+123                    "exact_match_out": False,
+124                    "include_in_gallery": True,
+125                    "in": {
+126                        "column_A": "a",
+127                        "column_B": "c",
+128                        "result_format": {
+129                            "result_format": "COMPLETE",
+130                            "unexpected_index_column_names": ["a"],
+131                        },
+132                    },
+133                    "out": {
+134                        "success": True,
+135                        "unexpected_index_list": [],
+136                    },
+137                },
+138            ],
+139        },
+140    ]
+141
+142    map_metric = "column_pair_values.a_not_equal_to_b"
+143    success_keys = (
+144        "column_A",
+145        "column_B",
+146        "ignore_row_if",
+147        "mostly",
+148    )
+149    default_kwarg_values = {
+150        "mostly": 1.0,
+151        "ignore_row_if": "neither",
+152        "result_format": "BASIC",
+153        "include_config": True,
+154        "catch_exceptions": False,
+155    }
+156
+157    def _validate(
+158        self,
+159        configuration: ExpectationConfiguration,
+160        metrics: Dict,
+161        runtime_configuration: Optional[dict] = None,
+162        execution_engine: Optional[ExecutionEngine] = None,
+163    ) -> Any:
+164        """Custom implementation of the GE _validate method.
+165
+166        This method is used on the tests to validate both the result
+167        of the tests themselves and if the unexpected index list
+168        is correctly generated.
+169        The GE test logic does not do this validation, and thus
+170        we need to make it manually.
+171
+172        Args:
+173            configuration: Configuration used in the test.
+174            metrics: Test result metrics.
+175            runtime_configuration: Configuration used when running the expectation.
+176            execution_engine: Execution Engine where the expectation was run.
+177
+178        Returns:
+179            Dictionary with the result of the validation.
+180        """
+181        return validate_result(
+182            self,
+183            configuration,
+184            metrics,
+185            runtime_configuration,
+186            execution_engine,
+187            ColumnPairMapExpectation,
+188        )
+189
+190
+191"""Mandatory block of code. If it is removed the expectation will not be available."""
+192if __name__ == "__main__":
+193    # test the custom expectation with the function `print_diagnostic_checklist()`
+194    ExpectColumnPairAToBeNotEqualToB().print_diagnostic_checklist()
+
+class ColumnPairCustom(great_expectations.expectations.metrics.map_metric_provider.column_pair_map_metric_provider.ColumnPairMapMetricProvider):
+ +
17class ColumnPairCustom(ColumnPairMapMetricProvider):
+18    """Asserts that column 'A' is not equal to column 'B'.
+19
+19    Additionally, it also compares Null values.
+21    """
+22
+23    condition_metric_name = "column_pair_values.a_not_equal_to_b"
+24    condition_domain_keys = (
+25        "batch_id",
+26        "table",
+27        "column_A",
+28        "column_B",
+29        "ignore_row_if",
+30    )
+31    condition_value_keys = ()
+32
+33    @column_pair_condition_partial(engine=SparkDFExecutionEngine)
+34    def _spark(
+35        self: ColumnPairMapMetricProvider,
+36        column_A: Any,
+37        column_B: Any,
+38        **kwargs: dict,
+39    ) -> Any:
+40        """Implementation of the expectation's logic.
+41
+42        Args:
+43            column_A: Value of the row of column_A.
+44            column_B: Value of the row of column_B.
+45            kwargs: dict with additional parameters.
+46
+47        Returns:
+48            If the condition is met.
+49        """
+50        return ((column_A.isNotNull()) | (column_B.isNotNull())) & (
+51            column_A != column_B
+52        )  # noqa: E501
+
Asserts that column 'A' is not equal to column 'B'.

Additionally, it also compares Null values.
+
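For clarity, the sketch below reproduces the condition computed by the _spark method above with plain PySpark column expressions. It is only an illustrative approximation; the locally created Spark session and the sample values are assumptions, not part of the module.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([("x", "x"), ("x", "y"), (None, "y")], ["a", "b"])

# Same boolean expression as the metric: at least one side is not NULL and the values differ.
condition = (F.col("a").isNotNull() | F.col("b").isNotNull()) & (F.col("a") != F.col("b"))
df.withColumn("a_not_equal_to_b", condition).show()
# Rows where this expression does not evaluate to True are counted as unexpected by the metric.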
condition_metric_name = 'column_pair_values.a_not_equal_to_b'

condition_domain_keys = ('batch_id', 'table', 'column_A', 'column_B', 'ignore_row_if')

condition_value_keys = ()
Inherited Members
    great_expectations.expectations.metrics.map_metric_provider.column_pair_map_metric_provider.ColumnPairMapMetricProvider
        function_domain_keys
        function_value_keys
    great_expectations.expectations.metrics.map_metric_provider.map_metric_provider.MapMetricProvider
        filter_column_isnull
        is_sqlalchemy_metric_selectable
    great_expectations.expectations.metrics.metric_provider.MetricProvider
        domain_keys
        value_keys
        default_kwarg_values
        get_evaluation_dependencies
+class ExpectColumnPairAToBeNotEqualToB(great_expectations.expectations.expectation.ColumnPairMapExpectation):
+ +
 55class ExpectColumnPairAToBeNotEqualToB(ColumnPairMapExpectation):
+ 56    """Expect values in column A to be not equal to column B.
+ 57
+ 58    Args:
+ 59        column_A: The first column name.
+ 60        column_B: The second column name.
+ 61
+ 62    Keyword Args:
+ 63        - allow_cross_type_comparisons: If True, allow
+ 64            comparisons between types (e.g. integer and string).
+ 65            Otherwise, attempting such comparisons will raise an exception.
+ 66        - ignore_row_if: "both_values_are_missing",
+ 67            "either_value_is_missing", "neither" (default).
+ 68        - result_format: Which output mode to use:
+ 69            `BOOLEAN_ONLY`, `BASIC` (default), `COMPLETE`, or `SUMMARY`.
+ 70        - include_config: If True (default), then include the expectation config
+ 71            as part of the result object.
+ 72        - catch_exceptions: If True, then catch exceptions and
+ 73            include them as part of the result object. Default: False.
+ 74        - meta: A JSON-serializable dictionary (nesting allowed)
+ 75            that will be included in the output without modification.
+ 76
+ 77    Returns:
+ 78        An ExpectationSuiteValidationResult.
+ 79    """
+ 80
+ 81    examples = [
+ 82        {
+ 83            "dataset_name": "Test Dataset",
+ 84            "data": [
+ 85                {
+ 86                    "data": {
+ 87                        "a": ["IE4019", "IM6092", "IE1405"],
+ 88                        "b": ["IE4019", "IM6092", "IE1405"],
+ 89                        "c": ["IE1404", "IN6192", "842075"],
+ 90                    },
+ 91                    "schemas": {
+ 92                        "spark": {
+ 93                            "a": "StringType",
+ 94                            "b": "StringType",
+ 95                            "c": "StringType",
+ 96                        }
+ 97                    },
+ 98                }
+ 99            ],
+100            "tests": [
+101                {
+102                    "title": "negative_test",
+103                    "exact_match_out": False,
+104                    "include_in_gallery": True,
+105                    "in": {
+106                        "column_A": "a",
+107                        "column_B": "b",
+108                        "result_format": {
+109                            "result_format": "COMPLETE",
+110                            "unexpected_index_column_names": ["b"],
+111                        },
+112                    },
+113                    "out": {
+114                        "success": False,
+115                        "unexpected_index_list": [
+116                            {"b": "IE4019", "a": "IE4019"},
+117                            {"b": "IM6092", "a": "IM6092"},
+118                            {"b": "IE1405", "a": "IE1405"},
+119                        ],
+120                    },
+121                },
+122                {
+123                    "title": "positive_test",
+124                    "exact_match_out": False,
+125                    "include_in_gallery": True,
+126                    "in": {
+127                        "column_A": "a",
+128                        "column_B": "c",
+129                        "result_format": {
+130                            "result_format": "COMPLETE",
+131                            "unexpected_index_column_names": ["a"],
+132                        },
+133                    },
+134                    "out": {
+135                        "success": True,
+136                        "unexpected_index_list": [],
+137                    },
+138                },
+139            ],
+140        },
+141    ]
+142
+143    map_metric = "column_pair_values.a_not_equal_to_b"
+144    success_keys = (
+145        "column_A",
+146        "column_B",
+147        "ignore_row_if",
+148        "mostly",
+149    )
+150    default_kwarg_values = {
+151        "mostly": 1.0,
+152        "ignore_row_if": "neither",
+153        "result_format": "BASIC",
+154        "include_config": True,
+155        "catch_exceptions": False,
+156    }
+157
+158    def _validate(
+159        self,
+160        configuration: ExpectationConfiguration,
+161        metrics: Dict,
+162        runtime_configuration: Optional[dict] = None,
+163        execution_engine: Optional[ExecutionEngine] = None,
+164    ) -> Any:
+165        """Custom implementation of the GE _validate method.
+166
+167        This method is used on the tests to validate both the result
+168        of the tests themselves and if the unexpected index list
+169        is correctly generated.
+170        The GE test logic does not do this validation, and thus
+171        we need to do it manually.
+172
+173        Args:
+174            configuration: Configuration used in the test.
+175            metrics: Test result metrics.
+176            runtime_configuration: Configuration used when running the expectation.
+177            execution_engine: Execution Engine where the expectation was run.
+178
+179        Returns:
+180            Dictionary with the result of the validation.
+181        """
+182        return validate_result(
+183            self,
+184            configuration,
+185            metrics,
+186            runtime_configuration,
+187            execution_engine,
+188            ColumnPairMapExpectation,
+189        )
+
Expect values in column A to be not equal to column B.

Arguments:
  • column_A: The first column name.
  • column_B: The second column name.

Keyword Args:
  • allow_cross_type_comparisons: If True, allow comparisons between types (e.g. integer and string). Otherwise, attempting such comparisons will raise an exception.
  • ignore_row_if: "both_values_are_missing", "either_value_is_missing", "neither" (default).
  • result_format: Which output mode to use: BOOLEAN_ONLY, BASIC (default), COMPLETE, or SUMMARY.
  • include_config: If True (default), then include the expectation config as part of the result object.
  • catch_exceptions: If True, then catch exceptions and include them as part of the result object. Default: False.
  • meta: A JSON-serializable dictionary (nesting allowed) that will be included in the output without modification.

Returns:
  An ExpectationSuiteValidationResult.
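As a hedged illustration of how such a custom expectation is typically exercised: importing its module registers it with Great Expectations, after which it can be invoked on a validator by its snake_case name. The snippet below is a sketch only; the Spark session and data are assumptions, and validator construction (which depends on the Great Expectations setup in use) is left as a comment.

from pyspark.sql import SparkSession

# Importing the module registers the custom expectation with Great Expectations.
from lakehouse_engine.dq_processors.custom_expectations import (
    expect_column_pair_a_to_be_not_equal_to_b,  # noqa: F401
)

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([("IE4019", "IE1404"), ("IM6092", "IN6192")], ["a", "c"])

# With a Great Expectations validator wrapping this dataframe (construction omitted),
# the expectation is exposed under its expectation_type name, e.g.:
#   validator.expect_column_pair_a_to_be_not_equal_to_b(
#       column_A="a", column_B="c", result_format="COMPLETE"
#   )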
+
+ examples = + + [{'dataset_name': 'Test Dataset', 'data': [{'data': {'a': ['IE4019', 'IM6092', 'IE1405'], 'b': ['IE4019', 'IM6092', 'IE1405'], 'c': ['IE1404', 'IN6192', '842075']}, 'schemas': {'spark': {'a': 'StringType', 'b': 'StringType', 'c': 'StringType'}}}], 'tests': [{'title': 'negative_test', 'exact_match_out': False, 'include_in_gallery': True, 'in': {'column_A': 'a', 'column_B': 'b', 'result_format': {'result_format': 'COMPLETE', 'unexpected_index_column_names': ['b']}}, 'out': {'success': False, 'unexpected_index_list': [{'b': 'IE4019', 'a': 'IE4019'}, {'b': 'IM6092', 'a': 'IM6092'}, {'b': 'IE1405', 'a': 'IE1405'}]}}, {'title': 'positive_test', 'exact_match_out': False, 'include_in_gallery': True, 'in': {'column_A': 'a', 'column_B': 'c', 'result_format': {'result_format': 'COMPLETE', 'unexpected_index_column_names': ['a']}}, 'out': {'success': True, 'unexpected_index_list': []}}]}] + + +
+ + + + +
+
+
+ map_metric = +'column_pair_values.a_not_equal_to_b' + + +
+ + + + +
+
+
+ success_keys = +('column_A', 'column_B', 'ignore_row_if', 'mostly') + + +
+ + + + +
+
+
+ default_kwarg_values = + + {'include_config': True, 'catch_exceptions': False, 'result_format': 'BASIC', 'row_condition': None, 'condition_parser': None, 'mostly': 1.0, 'ignore_row_if': 'neither'} + + +
+ + + + +
+
+
+ expectation_type: ClassVar[str] = +'expect_column_pair_a_to_be_not_equal_to_b' + + +
+ + + + +
+
+
Inherited Members
    great_expectations.expectations.expectation.Expectation
        Expectation
        version
        runtime_keys
        get_allowed_config_keys
        metrics_validate
        get_domain_kwargs
        get_success_kwargs
        get_runtime_kwargs
        get_result_format
        validate
        configuration
        run_diagnostics
        print_diagnostic_checklist
        is_expectation_self_initializing
        is_expectation_auto_initializing
    great_expectations.expectations.expectation.ColumnPairMapExpectation
        domain_keys
        domain_type
        is_abstract
        validate_configuration
        get_validation_dependencies
    great_expectations.expectations.expectation.BatchExpectation
        metric_dependencies
        args_keys
        validate_metric_value_between_configuration
\ No newline at end of file
diff --git a/lakehouse_engine/dq_processors/custom_expectations/expect_column_pair_a_to_be_smaller_or_equal_than_b.html b/lakehouse_engine/dq_processors/custom_expectations/expect_column_pair_a_to_be_smaller_or_equal_than_b.html
index 935e640..97553bc 100644
--- a/lakehouse_engine/dq_processors/custom_expectations/expect_column_pair_a_to_be_smaller_or_equal_than_b.html
+++ b/lakehouse_engine/dq_processors/custom_expectations/expect_column_pair_a_to_be_smaller_or_equal_than_b.html
@@ -3,14 +3,14 @@ (head/title changes; stripped HTML markup omitted)
+lakehouse_engine.dq_processors.custom_expectations.expect_column_pair_date_a_to_be_greater_than_or_equal_to_date_b

Expectation to check if date column 'a' is greater or equal to date column 'b'.
  1"""Expectation to check if date column 'a' is greater or equal to date column 'b'."""
+  2
+  3import datetime
+  4from typing import Any, Dict, Optional
+  5
+  6from great_expectations.core import ExpectationConfiguration
+  7from great_expectations.execution_engine import ExecutionEngine, SparkDFExecutionEngine
+  8from great_expectations.expectations.expectation import ColumnPairMapExpectation
+  9from great_expectations.expectations.metrics.map_metric_provider import (
+ 10    ColumnPairMapMetricProvider,
+ 11    column_pair_condition_partial,
+ 12)
+ 13
+ 14from lakehouse_engine.utils.expectations_utils import validate_result
+ 15
+ 16
+ 17# This class defines a Metric to support your Expectation
+ 18class ColumnPairDateAToBeGreaterOrEqualToDateB(ColumnPairMapMetricProvider):
+ 19    """Asserts that date column 'A' is greater or equal to date column 'B'."""
+ 20
+ 21    # This is the id string that will be used to refer your metric.
+ 22    condition_metric_name = "column_pair_values.date_a_greater_or_equal_to_date_b"
+ 23    condition_domain_keys = (
+ 24        "batch_id",
+ 25        "table",
+ 26        "column_A",
+ 27        "column_B",
+ 28        "ignore_row_if",
+ 29    )
+ 30
+ 31    @column_pair_condition_partial(engine=SparkDFExecutionEngine)
+ 32    def _spark(
+ 33        self: ColumnPairMapMetricProvider,
+ 34        column_A: Any,
+ 35        column_B: Any,
+ 36        **kwargs: dict,
+ 37    ) -> Any:
+ 38        """Implementation of the expectation's logic.
+ 39
+ 40        Args:
+ 41            column_A: Value of the row of column_A.
+ 42            column_B: Value of the row of column_B.
+ 43            kwargs: dict with additional parameters.
+ 44
+ 45        Returns:
+ 46            Boolean on the basis of condition.
+ 47        """
+ 48        return (
+ 49            (column_A.isNotNull()) & (column_B.isNotNull()) & (column_A >= column_B)
+ 50        )  # type: ignore
+ 51
+ 52
+ 53class ExpectColumnPairDateAToBeGreaterThanOrEqualToDateB(ColumnPairMapExpectation):
+ 54    """Expect values in date column A to be greater than or equal to date column B.
+ 55
+ 56    Args:
+ 57        column_A: The first date column name.
+ 58        column_B: The second date column name.
+ 59
+ 60    Keyword Args:
+ 61        - ignore_row_if: "both_values_are_missing",
+ 62            "either_value_is_missing", "neither" (default).
+ 63        - result_format: Which output mode to use:
+ 64            `BOOLEAN_ONLY`, `BASIC` (default), `COMPLETE`, or `SUMMARY`.
+ 65        - include_config: If True (default), then include the
+ 66            expectation config as part of the result object.
+ 67        - catch_exceptions: If True, then catch exceptions and
+ 68            include them as part of the result object. Default: False.
+ 69        - meta: A JSON-serializable dictionary (nesting allowed)
+ 70            that will be included in the output without modification.
+ 71
+ 72    Returns:
+ 73        An ExpectationSuiteValidationResult.
+ 74    """
+ 75
+ 76    examples = [
+ 77        {
+ 78            "dataset_name": "Test Dataset",
+ 79            "data": [
+ 80                {
+ 81                    "data": {
+ 82                        "a": [
+ 83                            "2029-01-12",
+ 84                            "2024-11-21",
+ 85                            "2022-01-01",
+ 86                        ],
+ 87                        "b": [
+ 88                            "2019-02-11",
+ 89                            "2014-12-22",
+ 90                            "2012-09-09",
+ 91                        ],
+ 92                        "c": [
+ 93                            "2010-02-11",
+ 94                            "2015-12-22",
+ 95                            "2022-09-09",
+ 96                        ],
+ 97                    },
+ 98                    "schemas": {
+ 99                        "spark": {
+100                            "a": "DateType",
+101                            "b": "DateType",
+102                            "c": "DateType",
+103                        }
+104                    },
+105                }
+106            ],
+107            "tests": [
+108                {
+109                    "title": "positive_test",
+110                    "exact_match_out": False,
+111                    "include_in_gallery": True,
+112                    "in": {
+113                        "column_A": "a",
+114                        "column_B": "b",
+115                        "result_format": {
+116                            "result_format": "COMPLETE",
+117                            "unexpected_index_column_names": ["a", "b"],
+118                        },
+119                    },
+120                    "out": {"success": True, "unexpected_index_list": []},
+121                },
+122                {
+123                    "title": "negative_test",
+124                    "exact_match_out": False,
+125                    "include_in_gallery": True,
+126                    "in": {
+127                        "column_A": "b",
+128                        "column_B": "c",
+129                        "result_format": {
+130                            "result_format": "COMPLETE",
+131                            "unexpected_index_column_names": ["a"],
+132                        },
+133                    },
+134                    "out": {
+135                        "success": False,
+136                        "unexpected_index_list": [
+137                            {
+138                                "a": datetime.date(2024, 11, 21),
+139                                "b": datetime.date(2014, 12, 22),
+140                                "c": datetime.date(2015, 12, 22),
+141                            },
+142                            {
+143                                "a": datetime.date(2022, 1, 1),
+144                                "b": datetime.date(2012, 9, 9),
+145                                "c": datetime.date(2022, 9, 9),
+146                            },
+147                        ],
+148                    },
+149                },
+150            ],
+151        }
+152    ]
+153
+154    map_metric = "column_pair_values.date_a_greater_or_equal_to_date_b"
+155    success_keys = (
+156        "column_A",
+157        "column_B",
+158        "ignore_row_if",
+159        "mostly",
+160    )
+161    default_kwarg_values = {
+162        "mostly": 1.0,
+163        "ignore_row_if": "neither",
+164        "result_format": "BASIC",
+165        "include_config": True,
+166        "catch_exceptions": True,
+167    }
+168
+169    def _validate(
+170        self,
+171        configuration: ExpectationConfiguration,
+172        metrics: Dict,
+173        runtime_configuration: Optional[dict] = None,
+174        execution_engine: Optional[ExecutionEngine] = None,
+175    ) -> Any:
+176        """Custom implementation of the GE _validate method.
+177
+178        This method is used on the tests to validate both the result
+179        of the tests themselves and if the unexpected index list
+180        is correctly generated.
+181        The GE test logic does not do this validation, and thus
+182        we need to do it manually.
+183
+184        Args:
+185            configuration: Configuration used in the test.
+186            metrics: Test result metrics.
+187            runtime_configuration: Configuration used when running the expectation.
+188            execution_engine: Execution Engine where the expectation was run.
+189
+190        Returns:
+191            Dictionary with the result of the validation.
+192        """
+193        return validate_result(
+194            self,
+195            configuration,
+196            metrics,
+197            runtime_configuration,
+198            execution_engine,
+199            ColumnPairMapExpectation,
+200        )
+201
+202
+203"""Mandatory block of code. If it is removed the expectation will not be available."""
+204if __name__ == "__main__":
+205    # test the custom expectation with the function `print_diagnostic_checklist()`
+206    ExpectColumnPairDateAToBeGreaterThanOrEqualToDateB().print_diagnostic_checklist()
+
+ + +
+
+ +
+class ColumnPairDateAToBeGreaterOrEqualToDateB(great_expectations.expectations.metrics.map_metric_provider.column_pair_map_metric_provider.ColumnPairMapMetricProvider):
+ +
19class ColumnPairDateAToBeGreaterOrEqualToDateB(ColumnPairMapMetricProvider):
+20    """Asserts that date column 'A' is greater or equal to date column 'B'."""
+21
+22    # This is the id string that will be used to refer your metric.
+23    condition_metric_name = "column_pair_values.date_a_greater_or_equal_to_date_b"
+24    condition_domain_keys = (
+25        "batch_id",
+26        "table",
+27        "column_A",
+28        "column_B",
+29        "ignore_row_if",
+30    )
+31
+32    @column_pair_condition_partial(engine=SparkDFExecutionEngine)
+33    def _spark(
+34        self: ColumnPairMapMetricProvider,
+35        column_A: Any,
+36        column_B: Any,
+37        **kwargs: dict,
+38    ) -> Any:
+39        """Implementation of the expectation's logic.
+40
+41        Args:
+42            column_A: Value of the row of column_A.
+43            column_B: Value of the row of column_B.
+44            kwargs: dict with additional parameters.
+45
+46        Returns:
+47            Boolean on the basis of condition.
+48        """
+49        return (
+50            (column_A.isNotNull()) & (column_B.isNotNull()) & (column_A >= column_B)
+51        )  # type: ignore
+
Asserts that date column 'A' is greater or equal to date column 'B'.
+
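The sketch below mirrors the condition returned by the _spark method above with plain PySpark expressions, reusing the sample dates from this module's examples. It is illustrative only and assumes a locally created Spark session.

import datetime

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame(
    [
        (datetime.date(2029, 1, 12), datetime.date(2019, 2, 11)),
        (datetime.date(2014, 12, 22), datetime.date(2015, 12, 22)),
    ],
    ["a", "b"],
)

# Both dates must be present and date 'a' must not be before date 'b'.
condition = F.col("a").isNotNull() & F.col("b").isNotNull() & (F.col("a") >= F.col("b"))
df.withColumn("a_greater_or_equal_to_b", condition).show()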
condition_metric_name = 'column_pair_values.date_a_greater_or_equal_to_date_b'

condition_domain_keys = ('batch_id', 'table', 'column_A', 'column_B', 'ignore_row_if')
Inherited Members
    great_expectations.expectations.metrics.map_metric_provider.column_pair_map_metric_provider.ColumnPairMapMetricProvider
        function_domain_keys
        condition_value_keys
        function_value_keys
    great_expectations.expectations.metrics.map_metric_provider.map_metric_provider.MapMetricProvider
        filter_column_isnull
        is_sqlalchemy_metric_selectable
    great_expectations.expectations.metrics.metric_provider.MetricProvider
        domain_keys
        value_keys
        default_kwarg_values
        get_evaluation_dependencies
+class ExpectColumnPairDateAToBeGreaterThanOrEqualToDateB(great_expectations.expectations.expectation.ColumnPairMapExpectation):
+ +
 54class ExpectColumnPairDateAToBeGreaterThanOrEqualToDateB(ColumnPairMapExpectation):
+ 55    """Expect values in date column A to be greater than or equal to date column B.
+ 56
+ 57    Args:
+ 58        column_A: The first date column name.
+ 59        column_B: The second date column name.
+ 60
+ 61    Keyword Args:
+ 62        - ignore_row_if: "both_values_are_missing",
+ 63            "either_value_is_missing", "neither" (default).
+ 64        - result_format: Which output mode to use:
+ 65            `BOOLEAN_ONLY`, `BASIC` (default), `COMPLETE`, or `SUMMARY`.
+ 66        - include_config: If True (default), then include the
+ 67            expectation config as part of the result object.
+ 68        - catch_exceptions: If True, then catch exceptions and
+ 69            include them as part of the result object. Default: False.
+ 70        - meta: A JSON-serializable dictionary (nesting allowed)
+ 71            that will be included in the output without modification.
+ 72
+ 73    Returns:
+ 74        An ExpectationSuiteValidationResult.
+ 75    """
+ 76
+ 77    examples = [
+ 78        {
+ 79            "dataset_name": "Test Dataset",
+ 80            "data": [
+ 81                {
+ 82                    "data": {
+ 83                        "a": [
+ 84                            "2029-01-12",
+ 85                            "2024-11-21",
+ 86                            "2022-01-01",
+ 87                        ],
+ 88                        "b": [
+ 89                            "2019-02-11",
+ 90                            "2014-12-22",
+ 91                            "2012-09-09",
+ 92                        ],
+ 93                        "c": [
+ 94                            "2010-02-11",
+ 95                            "2015-12-22",
+ 96                            "2022-09-09",
+ 97                        ],
+ 98                    },
+ 99                    "schemas": {
+100                        "spark": {
+101                            "a": "DateType",
+102                            "b": "DateType",
+103                            "c": "DateType",
+104                        }
+105                    },
+106                }
+107            ],
+108            "tests": [
+109                {
+110                    "title": "positive_test",
+111                    "exact_match_out": False,
+112                    "include_in_gallery": True,
+113                    "in": {
+114                        "column_A": "a",
+115                        "column_B": "b",
+116                        "result_format": {
+117                            "result_format": "COMPLETE",
+118                            "unexpected_index_column_names": ["a", "b"],
+119                        },
+120                    },
+121                    "out": {"success": True, "unexpected_index_list": []},
+122                },
+123                {
+124                    "title": "negative_test",
+125                    "exact_match_out": False,
+126                    "include_in_gallery": True,
+127                    "in": {
+128                        "column_A": "b",
+129                        "column_B": "c",
+130                        "result_format": {
+131                            "result_format": "COMPLETE",
+132                            "unexpected_index_column_names": ["a"],
+133                        },
+134                    },
+135                    "out": {
+136                        "success": False,
+137                        "unexpected_index_list": [
+138                            {
+139                                "a": datetime.date(2024, 11, 21),
+140                                "b": datetime.date(2014, 12, 22),
+141                                "c": datetime.date(2015, 12, 22),
+142                            },
+143                            {
+144                                "a": datetime.date(2022, 1, 1),
+145                                "b": datetime.date(2012, 9, 9),
+146                                "c": datetime.date(2022, 9, 9),
+147                            },
+148                        ],
+149                    },
+150                },
+151            ],
+152        }
+153    ]
+154
+155    map_metric = "column_pair_values.date_a_greater_or_equal_to_date_b"
+156    success_keys = (
+157        "column_A",
+158        "column_B",
+159        "ignore_row_if",
+160        "mostly",
+161    )
+162    default_kwarg_values = {
+163        "mostly": 1.0,
+164        "ignore_row_if": "neither",
+165        "result_format": "BASIC",
+166        "include_config": True,
+167        "catch_exceptions": True,
+168    }
+169
+170    def _validate(
+171        self,
+172        configuration: ExpectationConfiguration,
+173        metrics: Dict,
+174        runtime_configuration: Optional[dict] = None,
+175        execution_engine: Optional[ExecutionEngine] = None,
+176    ) -> Any:
+177        """Custom implementation of the GE _validate method.
+178
+179        This method is used on the tests to validate both the result
+180        of the tests themselves and if the unexpected index list
+181        is correctly generated.
+182        The GE test logic does not do this validation, and thus
+183        we need to do it manually.
+184
+185        Args:
+186            configuration: Configuration used in the test.
+187            metrics: Test result metrics.
+188            runtime_configuration: Configuration used when running the expectation.
+189            execution_engine: Execution Engine where the expectation was run.
+190
+191        Returns:
+192            Dictionary with the result of the validation.
+193        """
+194        return validate_result(
+195            self,
+196            configuration,
+197            metrics,
+198            runtime_configuration,
+199            execution_engine,
+200            ColumnPairMapExpectation,
+201        )
+
Expect values in date column A to be greater than or equal to date column B.

Arguments:
  • column_A: The first date column name.
  • column_B: The second date column name.

Keyword Args:
  • ignore_row_if: "both_values_are_missing", "either_value_is_missing", "neither" (default).
  • result_format: Which output mode to use: BOOLEAN_ONLY, BASIC (default), COMPLETE, or SUMMARY.
  • include_config: If True (default), then include the expectation config as part of the result object.
  • catch_exceptions: If True, then catch exceptions and include them as part of the result object. Default: False.
  • meta: A JSON-serializable dictionary (nesting allowed) that will be included in the output without modification.

Returns:
  An ExpectationSuiteValidationResult.
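A hedged usage sketch, analogous to the previous expectation: importing the module registers it with Great Expectations, and a validator then exposes it by its snake_case name. The session, data and validator names are assumptions for illustration only.

import datetime

from pyspark.sql import SparkSession

# Importing the module registers the expectation with Great Expectations.
from lakehouse_engine.dq_processors.custom_expectations import (
    expect_column_pair_date_a_to_be_greater_than_or_equal_to_date_b,  # noqa: F401
)

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame(
    [(datetime.date(2029, 1, 12), datetime.date(2019, 2, 11))], ["a", "b"]
)

# On a validator built over this dataframe (construction omitted), the call would be:
#   validator.expect_column_pair_date_a_to_be_greater_than_or_equal_to_date_b(
#       column_A="a", column_B="b", ignore_row_if="neither", mostly=1.0
#   )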
+
+ examples = + + [{'dataset_name': 'Test Dataset', 'data': [{'data': {'a': ['2029-01-12', '2024-11-21', '2022-01-01'], 'b': ['2019-02-11', '2014-12-22', '2012-09-09'], 'c': ['2010-02-11', '2015-12-22', '2022-09-09']}, 'schemas': {'spark': {'a': 'DateType', 'b': 'DateType', 'c': 'DateType'}}}], 'tests': [{'title': 'positive_test', 'exact_match_out': False, 'include_in_gallery': True, 'in': {'column_A': 'a', 'column_B': 'b', 'result_format': {'result_format': 'COMPLETE', 'unexpected_index_column_names': ['a', 'b']}}, 'out': {'success': True, 'unexpected_index_list': []}}, {'title': 'negative_test', 'exact_match_out': False, 'include_in_gallery': True, 'in': {'column_A': 'b', 'column_B': 'c', 'result_format': {'result_format': 'COMPLETE', 'unexpected_index_column_names': ['a']}}, 'out': {'success': False, 'unexpected_index_list': [{'a': datetime.date(2024, 11, 21), 'b': datetime.date(2014, 12, 22), 'c': datetime.date(2015, 12, 22)}, {'a': datetime.date(2022, 1, 1), 'b': datetime.date(2012, 9, 9), 'c': datetime.date(2022, 9, 9)}]}}]}] + + +
+ + + + +
+
+
+ map_metric = +'column_pair_values.date_a_greater_or_equal_to_date_b' + + +
+ + + + +
+
+
+ success_keys = +('column_A', 'column_B', 'ignore_row_if', 'mostly') + + +
+ + + + +
+
+
+ default_kwarg_values = + + {'include_config': True, 'catch_exceptions': True, 'result_format': 'BASIC', 'row_condition': None, 'condition_parser': None, 'mostly': 1.0, 'ignore_row_if': 'neither'} + + +
+ + + + +
+
+
+ expectation_type: ClassVar[str] = +'expect_column_pair_date_a_to_be_greater_than_or_equal_to_date_b' + + +
+ + + + +
+
+
Inherited Members
    great_expectations.expectations.expectation.Expectation
        Expectation
        version
        runtime_keys
        get_allowed_config_keys
        metrics_validate
        get_domain_kwargs
        get_success_kwargs
        get_runtime_kwargs
        get_result_format
        validate
        configuration
        run_diagnostics
        print_diagnostic_checklist
        is_expectation_self_initializing
        is_expectation_auto_initializing
    great_expectations.expectations.expectation.ColumnPairMapExpectation
        domain_keys
        domain_type
        is_abstract
        validate_configuration
        get_validation_dependencies
    great_expectations.expectations.expectation.BatchExpectation
        metric_dependencies
        args_keys
        validate_metric_value_between_configuration
\ No newline at end of file
diff --git a/lakehouse_engine/dq_processors/custom_expectations/expect_column_values_to_be_date_not_older_than.html b/lakehouse_engine/dq_processors/custom_expectations/expect_column_values_to_be_date_not_older_than.html
index b06dcf9..a905c49 100644
--- a/lakehouse_engine/dq_processors/custom_expectations/expect_column_values_to_be_date_not_older_than.html
+++ b/lakehouse_engine/dq_processors/custom_expectations/expect_column_values_to_be_date_not_older_than.html
@@ -3,14 +3,14 @@ (head/title changes; stripped HTML markup omitted)
+lakehouse_engine.io.writers.rest_api_writer

Module to define behaviour to write to REST APIs.
  1"""Module to define behaviour to write to REST APIs."""
+  2
+  3import json
+  4from typing import Any, Callable, OrderedDict
+  5
+  6from pyspark.sql import DataFrame, Row
+  7
+  8from lakehouse_engine.core.definitions import OutputSpec
+  9from lakehouse_engine.io.writer import Writer
+ 10from lakehouse_engine.utils.logging_handler import LoggingHandler
+ 11from lakehouse_engine.utils.rest_api import (
+ 12    RESTApiException,
+ 13    RestMethods,
+ 14    RestStatusCodes,
+ 15    execute_api_request,
+ 16)
+ 17
+ 18
+ 19class RestApiWriter(Writer):
+ 20    """Class to write data to a REST API."""
+ 21
+ 22    _logger = LoggingHandler(__name__).get_logger()
+ 23
+ 24    def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict):
+ 25        """Construct RestApiWriter instances.
+ 26
+ 27        Args:
+ 28            output_spec: output specification.
+ 29            df: dataframe to be written.
+ 30            data: list of all dfs generated on previous steps before writer.
+ 31        """
+ 32        super().__init__(output_spec, df, data)
+ 33
+ 34    def write(self) -> None:
+ 35        """Write data to REST API."""
+ 36        if not self._df.isStreaming:
+ 37            self._write_to_rest_api_in_batch_mode(self._df, self._output_spec)
+ 38        else:
+ 39            self._write_to_rest_api_in_streaming_mode(
+ 40                self._df, self._output_spec, self._data
+ 41            )
+ 42
+ 43    @staticmethod
+ 44    def _get_func_to_send_payload_to_rest_api(output_spec: OutputSpec) -> Callable:
+ 45        """Define and return a function to send the payload to the REST api.
+ 46
+ 47        Args:
+ 48            output_spec: Output Specification containing configurations to
+ 49                communicate with the REST api. Within the output_spec, the user
+ 50                can specify several options:
+ 51                    - rest_api_header: http headers.
+ 52                    - rest_api_basic_auth: basic http authentication details
+ 53                        (e.g., {"username": "x", "password": "y"}).
+ 54                    - rest_api_url: url of the api.
+ 55                    - rest_api_method: REST method (e.g., POST or PUT).
+ 56                    - rest_api_sleep_seconds: sleep seconds to avoid throttling.
+ 57                    - rest_api_is_file_payload: if the payload to be sent to the
+ 58                        api is in the format of a file using multipart encoding
+ 59                        upload. if this is true, then the payload will always be
+ 60                        sent using the "files" parameter in Python's requests
+ 61                        library.
+ 62                    - rest_api_file_payload_name: when rest_api_is_file_payload
+ 63                        is true, this option can be used to define the file
+ 64                        identifier in Python's requests library.
+ 65                    - rest_api_extra_json_payload: when rest_api_is_file_payload is False,
+ 66                        can be used to provide additional JSON variables to add to
+ 67                        the original payload. This is useful to complement
+ 68                        the original payload with some extra input to better
+ 69                        configure the final payload to send to the REST api. An
+ 70                        example can be to add a constant configuration value to
+ 71                        add to the payload data.
+ 72
+ 73        Returns:
+ 74            Function to be called inside Spark dataframe.foreach.
+ 75        """
+ 76        headers = output_spec.options.get("rest_api_header", None)
+ 77        basic_auth_dict = output_spec.options.get("rest_api_basic_auth", None)
+ 78        url = output_spec.options["rest_api_url"]
+ 79        method = output_spec.options.get("rest_api_method", RestMethods.POST.value)
+ 80        sleep_seconds = output_spec.options.get("rest_api_sleep_seconds", 0)
+ 81        is_file_payload = output_spec.options.get("rest_api_is_file_payload", False)
+ 82        file_payload_name = output_spec.options.get(
+ 83            "rest_api_file_payload_name", "file"
+ 84        )
+ 85        extra_json_payload = output_spec.options.get(
+ 86            "rest_api_extra_json_payload", None
+ 87        )
+ 88        success_status_codes = output_spec.options.get(
+ 89            "rest_api_success_status_codes", RestStatusCodes.OK_STATUS_CODES.value
+ 90        )
+ 91
+ 92        def send_payload_to_rest_api(row: Row) -> Any:
+ 93            """Send payload to the REST API.
+ 94
+ 95            The payload needs to be prepared as a JSON string column in a dataframe.
+ 96            E.g., {"a": "a value", "b": "b value"}.
+ 97
+ 98            Args:
+ 99                row: a row in a dataframe.
+100            """
+101            if "payload" not in row:
+102                raise ValueError("Input DataFrame must contain 'payload' column.")
+103
+104            str_payload = row.payload
+105
+106            payload = None
+107            if not is_file_payload:
+108                payload = json.loads(str_payload)
+109            else:
+110                payload = {file_payload_name: str_payload}
+111
+112            if extra_json_payload:
+113                payload.update(extra_json_payload)
+114
+115            RestApiWriter._logger.debug(f"Original payload: {str_payload}")
+116            RestApiWriter._logger.debug(f"Final payload: {payload}")
+117
+118            response = execute_api_request(
+119                method=method,
+120                url=url,
+121                headers=headers,
+122                basic_auth_dict=basic_auth_dict,
+123                json=payload if not is_file_payload else None,
+124                files=payload if is_file_payload else None,
+125                sleep_seconds=sleep_seconds,
+126            )
+127
+128            RestApiWriter._logger.debug(
+129                f"Response: {response.status_code} - {response.text}"
+130            )
+131
+132            if response.status_code not in success_status_codes:
+133                raise RESTApiException(
+134                    f"API response status code {response.status_code} is not in"
+135                    f" {success_status_codes}. Got {response.text}"
+136                )
+137
+138        return send_payload_to_rest_api
+139
+140    @staticmethod
+141    def _write_to_rest_api_in_batch_mode(
+142        df: DataFrame, output_spec: OutputSpec
+143    ) -> None:
+144        """Write to REST API in Spark batch mode.
+145
+146        This function uses the dataframe.foreach function to generate a payload
+147        for each row of the dataframe and send it to the REST API endpoint.
+148
+149        Warning! Make sure your execution environment supports RDD api operations,
+150        as there are environments where RDD operations may not be supported. Since
+151        df.foreach() is a shorthand for df.rdd.foreach(), this can cause issues
+152        in such environments.
+153
+154        Args:
+155            df: dataframe to write.
+156            output_spec: output specification.
+157        """
+158        df.foreach(RestApiWriter._get_func_to_send_payload_to_rest_api(output_spec))
+159
+160    @staticmethod
+161    def _write_to_rest_api_in_streaming_mode(
+162        df: DataFrame, output_spec: OutputSpec, data: OrderedDict
+163    ) -> None:
+164        """Write to REST API in streaming mode.
+165
+166        Args:
+167            df: dataframe to write.
+168            output_spec: output specification.
+169            data: list of all dfs generated on previous steps before writer.
+170        """
+171        df_writer = df.writeStream.trigger(**Writer.get_streaming_trigger(output_spec))
+172
+173        stream_df = (
+174            df_writer.options(**output_spec.options if output_spec.options else {})
+175            .foreachBatch(
+176                RestApiWriter._write_transformed_micro_batch(output_spec, data)
+177            )
+178            .start()
+179        )
+180
+181        if output_spec.streaming_await_termination:
+182            stream_df.awaitTermination(output_spec.streaming_await_termination_timeout)
+183
+184    @staticmethod
+185    def _write_transformed_micro_batch(  # type: ignore
+186        output_spec: OutputSpec, data: OrderedDict
+187    ) -> Callable:
+188        """Define how to write a streaming micro batch after transforming it.
+189
+190        Args:
+191            output_spec: output specification.
+192            data: list of all dfs generated on previous steps before writer.
+193
+194        Returns:
+195            A function to be executed in the foreachBatch spark write method.
+196        """
+197
+198        def inner(batch_df: DataFrame, batch_id: int) -> None:
+199            transformed_df = Writer.get_transformed_micro_batch(
+200                output_spec, batch_df, batch_id, data
+201            )
+202
+203            if output_spec.streaming_micro_batch_dq_processors:
+204                transformed_df = Writer.run_micro_batch_dq_process(
+205                    transformed_df, output_spec.streaming_micro_batch_dq_processors
+206                )
+207
+208            RestApiWriter._write_to_rest_api_in_batch_mode(transformed_df, output_spec)
+209
+210        return inner
+
+ + +
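To make the contract of this writer more concrete, here is an illustrative sketch of the inputs it works with: a dataframe containing a single JSON string column named "payload" (one request body per row) and an options mapping built from the keys read in _get_func_to_send_payload_to_rest_api. The endpoint URL and header values are placeholders, and constructing the actual OutputSpec is engine-specific and omitted here.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
source = spark.createDataFrame([("a value", "b value")], ["a", "b"])

# Each row handed to the writer must expose its request body as a JSON string
# in a column called "payload".
payload_df = source.select(F.to_json(F.struct("a", "b")).alias("payload"))

# Placeholder options using the option names documented above; in practice these
# would be set on the OutputSpec passed to RestApiWriter.
rest_api_options = {
    "rest_api_url": "https://example.com/api/v1/items",
    "rest_api_method": "POST",
    "rest_api_header": {"Content-Type": "application/json"},
    "rest_api_sleep_seconds": 0,
}

payload_df.show(truncate=False)  # e.g. {"a":"a value","b":"b value"}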
+
+ +
+class RestApiWriter(lakehouse_engine.io.writer.Writer):
+ +
 20class RestApiWriter(Writer):
+ 21    """Class to write data to a REST API."""
+ 22
+ 23    _logger = LoggingHandler(__name__).get_logger()
+ 24
+ 25    def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict):
+ 26        """Construct RestApiWriter instances.
+ 27
+ 28        Args:
+ 29            output_spec: output specification.
+ 30            df: dataframe to be written.
+ 31            data: list of all dfs generated on previous steps before writer.
+ 32        """
+ 33        super().__init__(output_spec, df, data)
+ 34
+ 35    def write(self) -> None:
+ 36        """Write data to REST API."""
+ 37        if not self._df.isStreaming:
+ 38            self._write_to_rest_api_in_batch_mode(self._df, self._output_spec)
+ 39        else:
+ 40            self._write_to_rest_api_in_streaming_mode(
+ 41                self._df, self._output_spec, self._data
+ 42            )
+ 43
+ 44    @staticmethod
+ 45    def _get_func_to_send_payload_to_rest_api(output_spec: OutputSpec) -> Callable:
+ 46        """Define and return a function to send the payload to the REST api.
+ 47
+ 48        Args:
+ 49            output_spec: Output Specification containing configurations to
+ 50                communicate with the REST api. Within the output_spec, the user
+ 51                can specify several options:
+ 52                    - rest_api_header: http headers.
+ 53                    - rest_api_basic_auth: basic http authentication details
+ 54                        (e.g., {"username": "x", "password": "y"}).
+ 55                    - rest_api_url: url of the api.
+ 56                    - rest_api_method: REST method (e.g., POST or PUT).
+ 57                    - rest_api_sleep_seconds: sleep seconds to avoid throttling.
+ 58                    - rest_api_is_file_payload: if the payload to be sent to the
+ 59                        api is in the format of a file using multipart encoding
+ 60                        upload. if this is true, then the payload will always be
+ 61                        sent using the "files" parameter in Python's requests
+ 62                        library.
+ 63                    - rest_api_file_payload_name: when rest_api_is_file_payload
+ 64                        is true, this option can be used to define the file
+ 65                        identifier in Python's requests library.
+ 66                    - rest_api_extra_json_payload: when rest_api_is_file_payload is False,
+ 67                        can be used to provide additional JSON variables to add to
+ 68                        the original payload. This is useful to complement
+ 69                        the original payload with some extra input to better
+ 70                        configure the final payload to send to the REST api. An
+ 71                        example can be to add a constant configuration value to
+ 72                        add to the payload data.
+ 73
+ 74        Returns:
+ 75            Function to be called inside Spark dataframe.foreach.
+ 76        """
+ 77        headers = output_spec.options.get("rest_api_header", None)
+ 78        basic_auth_dict = output_spec.options.get("rest_api_basic_auth", None)
+ 79        url = output_spec.options["rest_api_url"]
+ 80        method = output_spec.options.get("rest_api_method", RestMethods.POST.value)
+ 81        sleep_seconds = output_spec.options.get("rest_api_sleep_seconds", 0)
+ 82        is_file_payload = output_spec.options.get("rest_api_is_file_payload", False)
+ 83        file_payload_name = output_spec.options.get(
+ 84            "rest_api_file_payload_name", "file"
+ 85        )
+ 86        extra_json_payload = output_spec.options.get(
+ 87            "rest_api_extra_json_payload", None
+ 88        )
+ 89        success_status_codes = output_spec.options.get(
+ 90            "rest_api_success_status_codes", RestStatusCodes.OK_STATUS_CODES.value
+ 91        )
+ 92
+ 93        def send_payload_to_rest_api(row: Row) -> Any:
+ 94            """Send payload to the REST API.
+ 95
+ 96            The payload needs to be prepared as a JSON string column in a dataframe.
+ 97            E.g., {"a": "a value", "b": "b value"}.
+ 98
+ 99            Args:
+100                row: a row in a dataframe.
+101            """
+102            if "payload" not in row:
+103                raise ValueError("Input DataFrame must contain 'payload' column.")
+104
+105            str_payload = row.payload
+106
+107            payload = None
+108            if not is_file_payload:
+109                payload = json.loads(str_payload)
+110            else:
+111                payload = {file_payload_name: str_payload}
+112
+113            if extra_json_payload:
+114                payload.update(extra_json_payload)
+115
+116            RestApiWriter._logger.debug(f"Original payload: {str_payload}")
+117            RestApiWriter._logger.debug(f"Final payload: {payload}")
+118
+119            response = execute_api_request(
+120                method=method,
+121                url=url,
+122                headers=headers,
+123                basic_auth_dict=basic_auth_dict,
+124                json=payload if not is_file_payload else None,
+125                files=payload if is_file_payload else None,
+126                sleep_seconds=sleep_seconds,
+127            )
+128
+129            RestApiWriter._logger.debug(
+130                f"Response: {response.status_code} - {response.text}"
+131            )
+132
+133            if response.status_code not in success_status_codes:
+134                raise RESTApiException(
+135                    f"API response status code {response.status_code} is not in"
+136                    f" {success_status_codes}. Got {response.text}"
+137                )
+138
+139        return send_payload_to_rest_api
+140
+141    @staticmethod
+142    def _write_to_rest_api_in_batch_mode(
+143        df: DataFrame, output_spec: OutputSpec
+144    ) -> None:
+145        """Write to REST API in Spark batch mode.
+146
+147        This function uses the dataframe.foreach function to generate a payload
+148        for each row of the dataframe and send it to the REST API endpoint.
+149
+150        Warning! Make sure your execution environment supports RDD api operations,
+151        as there are environments where RDD operations may not be supported. Since
+152        df.foreach() is a shorthand for df.rdd.foreach(), this can cause issues
+153        in such environments.
+154
+155        Args:
+156            df: dataframe to write.
+157            output_spec: output specification.
+158        """
+159        df.foreach(RestApiWriter._get_func_to_send_payload_to_rest_api(output_spec))
+160
+161    @staticmethod
+162    def _write_to_rest_api_in_streaming_mode(
+163        df: DataFrame, output_spec: OutputSpec, data: OrderedDict
+164    ) -> None:
+165        """Write to REST API in streaming mode.
+166
+167        Args:
+168            df: dataframe to write.
+169            output_spec: output specification.
+170            data: list of all dfs generated on previous steps before writer.
+171        """
+172        df_writer = df.writeStream.trigger(**Writer.get_streaming_trigger(output_spec))
+173
+174        stream_df = (
+175            df_writer.options(**output_spec.options if output_spec.options else {})
+176            .foreachBatch(
+177                RestApiWriter._write_transformed_micro_batch(output_spec, data)
+178            )
+179            .start()
+180        )
+181
+182        if output_spec.streaming_await_termination:
+183            stream_df.awaitTermination(output_spec.streaming_await_termination_timeout)
+184
+185    @staticmethod
+186    def _write_transformed_micro_batch(  # type: ignore
+187        output_spec: OutputSpec, data: OrderedDict
+188    ) -> Callable:
+189        """Define how to write a streaming micro batch after transforming it.
+190
+191        Args:
+192            output_spec: output specification.
+193            data: list of all dfs generated on previous steps before writer.
+194
+195        Returns:
+196            A function to be executed in the foreachBatch spark write method.
+197        """
+198
+199        def inner(batch_df: DataFrame, batch_id: int) -> None:
+200            transformed_df = Writer.get_transformed_micro_batch(
+201                output_spec, batch_df, batch_id, data
+202            )
+203
+204            if output_spec.streaming_micro_batch_dq_processors:
+205                transformed_df = Writer.run_micro_batch_dq_process(
+206                    transformed_df, output_spec.streaming_micro_batch_dq_processors
+207                )
+208
+209            RestApiWriter._write_to_rest_api_in_batch_mode(transformed_df, output_spec)
+210
+211        return inner
+
+ + +

Class to write data to a REST API.

+
+ + +
+ +
+ + RestApiWriter( output_spec: lakehouse_engine.core.definitions.OutputSpec, df: pyspark.sql.dataframe.DataFrame, data: OrderedDict) + + + +
+ +
25    def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict):
+26        """Construct RestApiWriter instances.
+27
+28        Args:
+29            output_spec: output specification.
+30            df: dataframe to be written.
+31            data: list of all dfs generated on previous steps before writer.
+32        """
+33        super().__init__(output_spec, df, data)
+
+ + +

Construct RestApiWriter instances.

+ +
Arguments:
+ +
    +
  • output_spec: output specification.
  • +
  • df: dataframe to be written.
  • +
  • data: list of all dfs generated on previous steps before writer.
  • +
+
+ + +
+
+ +
+ + def + write(self) -> None: + + + +
+ +
35    def write(self) -> None:
+36        """Write data to REST API."""
+37        if not self._df.isStreaming:
+38            self._write_to_rest_api_in_batch_mode(self._df, self._output_spec)
+39        else:
+40            self._write_to_rest_api_in_streaming_mode(
+41                self._df, self._output_spec, self._data
+42            )
+
+ + +

Write data to REST API.
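Since the writer expects each row to carry a JSON string in a column named "payload", a minimal sketch of preparing that column with PySpark could look like the snippet below (the source columns "a" and "b" are hypothetical, mirroring the docstring example).

from pyspark.sql import DataFrame
from pyspark.sql.functions import struct, to_json

def add_payload_column(df: DataFrame) -> DataFrame:
    # Serialize the columns of interest into a JSON string column named "payload",
    # which is the column the REST API writer looks up in every row.
    return df.withColumn("payload", to_json(struct("a", "b")))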

+
+ + +
+ +
+
+ + \ No newline at end of file diff --git a/lakehouse_engine/io/writers/table_writer.html b/lakehouse_engine/io/writers/table_writer.html index fd5c251..bfdd432 100644 --- a/lakehouse_engine/io/writers/table_writer.html +++ b/lakehouse_engine/io/writers/table_writer.html @@ -3,14 +3,14 @@ - + lakehouse_engine.io.writers.table_writer - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + + + + + + + + + +
+
+ +

+lakehouse_engine.utils.acon_utils

+ +

Module to perform validations and resolve the acon.

+
+ + + + + +
 1"""Module to perform validations and resolve the acon."""
+ 2
+ 3from lakehouse_engine.core.definitions import DQType, InputFormat, OutputFormat
+ 4from lakehouse_engine.io.exceptions import WrongIOFormatException
+ 5from lakehouse_engine.utils.dq_utils import PrismaUtils
+ 6from lakehouse_engine.utils.logging_handler import LoggingHandler
+ 7
+ 8_LOGGER = LoggingHandler(__name__).get_logger()
+ 9
+10
+11def validate_and_resolve_acon(acon: dict, execution_point: str = "") -> dict:
+12    """Function to validate and resolve the acon.
+13
+14    Args:
+15        acon: Acon to be validated and resolved.
+16        execution_point: Execution point to resolve the dq functions.
+17
+18    Returns:
+19        Acon after validation and resolution.
+20    """
+21    # Performing validations
+22    validate_readers(acon)
+23    validate_writers(acon)
+24
+25    # Resolving the acon
+26    if execution_point:
+27        acon = resolve_dq_functions(acon, execution_point)
+28
+29    _LOGGER.info(f"Read Algorithm Configuration: {str(acon)}")
+30
+31    return acon
+32
+33
+34def validate_readers(acon: dict) -> None:
+35    """Function to validate the readers in the acon.
+36
+37    Args:
+38        acon: Acon to be validated.
+39
+40    Raises:
+41        RuntimeError: If the input format is not supported.
+42    """
+43    if "input_specs" in acon.keys() or "input_spec" in acon.keys():
+44        for spec in acon.get("input_specs", []) or [acon.get("input_spec", {})]:
+45            if (
+46                not InputFormat.exists(spec.get("data_format"))
+47                and "db_table" not in spec.keys()
+48            ):
+49                raise WrongIOFormatException(
+50                    f"Input format not supported: {spec.get('data_format')}"
+51                )
+52
+53
+54def validate_writers(acon: dict) -> None:
+55    """Function to validate the writers in the acon.
+56
+57    Args:
+58        acon: Acon to be validated.
+59
+60    Raises:
+61        RuntimeError: If the output format is not supported.
+62    """
+63    if "output_specs" in acon.keys() or "output_spec" in acon.keys():
+64        for spec in acon.get("output_specs", []) or [acon.get("output_spec", {})]:
+65            if not OutputFormat.exists(spec.get("data_format")):
+66                raise WrongIOFormatException(
+67                    f"Output format not supported: {spec.get('data_format')}"
+68                )
+69
+70
+71def resolve_dq_functions(acon: dict, execution_point: str) -> dict:
+72    """Function to resolve the dq functions in the acon.
+73
+74    Args:
+75        acon: Acon to resolve the dq functions.
+76        execution_point: Execution point of the dq_functions.
+77
+78    Returns:
+79        Acon after resolving the dq functions.
+80    """
+81    if acon.get("dq_spec"):
+82        if acon.get("dq_spec").get("dq_type") == DQType.PRISMA.value:
+83            acon["dq_spec"] = PrismaUtils.build_prisma_dq_spec(
+84                spec=acon.get("dq_spec"), execution_point=execution_point
+85            )
+86    elif acon.get("dq_specs"):
+87        resolved_dq_specs = []
+88        for spec in acon.get("dq_specs", []):
+89            if spec.get("dq_type") == DQType.PRISMA.value:
+90                resolved_dq_specs.append(
+91                    PrismaUtils.build_prisma_dq_spec(
+92                        spec=spec, execution_point=execution_point
+93                    )
+94                )
+95            else:
+96                resolved_dq_specs.append(spec)
+97        acon["dq_specs"] = resolved_dq_specs
+98    return acon
+
+ + +
+
+ +
+ + def + validate_and_resolve_acon(acon: dict, execution_point: str = '') -> dict: + + + +
+ +
12def validate_and_resolve_acon(acon: dict, execution_point: str = "") -> dict:
+13    """Function to validate and resolve the acon.
+14
+15    Args:
+16        acon: Acon to be validated and resolved.
+17        execution_point: Execution point to resolve the dq functions.
+18
+19    Returns:
+20        Acon after validation and resolution.
+21    """
+22    # Performing validations
+23    validate_readers(acon)
+24    validate_writers(acon)
+25
+26    # Resolving the acon
+27    if execution_point:
+28        acon = resolve_dq_functions(acon, execution_point)
+29
+30    _LOGGER.info(f"Read Algorithm Configuration: {str(acon)}")
+31
+32    return acon
+
+ + +

Function to validate and resolve the acon.

+ +
Arguments:
+ +
    +
  • acon: Acon to be validated and resolved.
  • +
  • execution_point: Execution point to resolve the dq functions.
  • +
+ +
Returns:
+ +
+

Acon after validation and resolution.
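A minimal sketch of calling this function is shown below; the spec ids, locations and format values are placeholders, not real resources.

from lakehouse_engine.utils.acon_utils import validate_and_resolve_acon

acon = {
    "input_specs": [
        {
            "spec_id": "sales_bronze",
            "read_type": "batch",
            "data_format": "delta",
            "location": "s3://my_data_product_bucket/bronze/dummy_sales",
        }
    ],
    "output_specs": [
        {
            "spec_id": "sales_silver",
            "input_id": "sales_bronze",
            "data_format": "delta",
            "location": "s3://my_data_product_bucket/silver/dummy_sales",
        }
    ],
}

# Validates the input/output formats; because an execution_point is given,
# any PRISMA dq_specs (none in this sketch) would also be resolved.
resolved_acon = validate_and_resolve_acon(acon, execution_point="at_rest")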

+
+
+ + +
+
+ +
+ + def + validate_readers(acon: dict) -> None: + + + +
+ +
35def validate_readers(acon: dict) -> None:
+36    """Function to validate the readers in the acon.
+37
+38    Args:
+39        acon: Acon to be validated.
+40
+41    Raises:
+42        RuntimeError: If the input format is not supported.
+43    """
+44    if "input_specs" in acon.keys() or "input_spec" in acon.keys():
+45        for spec in acon.get("input_specs", []) or [acon.get("input_spec", {})]:
+46            if (
+47                not InputFormat.exists(spec.get("data_format"))
+48                and "db_table" not in spec.keys()
+49            ):
+50                raise WrongIOFormatException(
+51                    f"Input format not supported: {spec.get('data_format')}"
+52                )
+
+ + +

Function to validate the readers in the acon.

+ +
Arguments:
+ +
    +
  • acon: Acon to be validated.
  • +
+ +
Raises:
+ +
    +
  • RuntimeError: If the input format is not supported.
  • +
+
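For illustration, an input spec with a deliberately invalid data format (and no db_table) triggers the exception raised by this validation:

from lakehouse_engine.io.exceptions import WrongIOFormatException
from lakehouse_engine.utils.acon_utils import validate_readers

acon = {"input_specs": [{"spec_id": "bad_input", "data_format": "not_a_real_format"}]}

try:
    validate_readers(acon)
except WrongIOFormatException as e:
    print(e)  # Input format not supported: not_a_real_format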
+ + +
+
+ +
+ + def + validate_writers(acon: dict) -> None: + + + +
+ +
55def validate_writers(acon: dict) -> None:
+56    """Function to validate the writers in the acon.
+57
+58    Args:
+59        acon: Acon to be validated.
+60
+61    Raises:
+62        RuntimeError: If the output format is not supported.
+63    """
+64    if "output_specs" in acon.keys() or "output_spec" in acon.keys():
+65        for spec in acon.get("output_specs", []) or [acon.get("output_spec", {})]:
+66            if not OutputFormat.exists(spec.get("data_format")):
+67                raise WrongIOFormatException(
+68                    f"Output format not supported: {spec.get('data_format')}"
+69                )
+
+ + +

Function to validate the writers in the acon.

+ +
Arguments:
+ +
    +
  • acon: Acon to be validated.
  • +
+ +
Raises:
+ +
    +
  • RuntimeError: If the output format is not supported.
  • +
+
+ + +
+
+ +
+ + def + resolve_dq_functions(acon: dict, execution_point: str) -> dict: + + + +
+ +
72def resolve_dq_functions(acon: dict, execution_point: str) -> dict:
+73    """Function to resolve the dq functions in the acon.
+74
+75    Args:
+76        acon: Acon to resolve the dq functions.
+77        execution_point: Execution point of the dq_functions.
+78
+79    Returns:
+80        Acon after resolving the dq functions.
+81    """
+82    if acon.get("dq_spec"):
+83        if acon.get("dq_spec").get("dq_type") == DQType.PRISMA.value:
+84            acon["dq_spec"] = PrismaUtils.build_prisma_dq_spec(
+85                spec=acon.get("dq_spec"), execution_point=execution_point
+86            )
+87    elif acon.get("dq_specs"):
+88        resolved_dq_specs = []
+89        for spec in acon.get("dq_specs", []):
+90            if spec.get("dq_type") == DQType.PRISMA.value:
+91                resolved_dq_specs.append(
+92                    PrismaUtils.build_prisma_dq_spec(
+93                        spec=spec, execution_point=execution_point
+94                    )
+95                )
+96            else:
+97                resolved_dq_specs.append(spec)
+98        acon["dq_specs"] = resolved_dq_specs
+99    return acon
+
+ + +

Function to resolve the dq functions in the acon.

+ +
Arguments:
+ +
    +
  • acon: Acon to resolve the dq functions.
  • +
  • execution_point: Execution point of the dq_functions.
  • +
+ +
Returns:
+ +
+

Acon after resolving the dq functions.
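To illustrate the dispatch, the hypothetical acon below mixes a PRISMA dq spec with a regular one; only the former is rebuilt via PrismaUtils.build_prisma_dq_spec. The dq_type values, table names and keys are assumptions for the sketch, and a configured Spark session and engine environment are assumed.

from lakehouse_engine.utils.acon_utils import resolve_dq_functions

acon = {
    "dq_specs": [
        {
            "spec_id": "sales_prisma_dq",
            "input_id": "sales_silver",
            "dq_type": "prisma",
            "dq_db_table": "my_db.dq_rules",
            "dq_table_table_filter": "dummy_sales",
            "data_product_name": "my_data_product",
            "unexpected_rows_pk": ["sales_order_header"],
        },
        {
            "spec_id": "sales_basic_dq",
            "input_id": "sales_silver",
            "dq_type": "validator",
            "dq_functions": [
                {"function": "expect_column_to_exist", "args": {"column": "item"}}
            ],
        },
    ]
}

# The PRISMA spec gets its dq_functions, result_sink_location, etc. filled in;
# the "validator" spec is appended to the result untouched.
resolved = resolve_dq_functions(acon, execution_point="at_rest")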

+
+
+ + +
+
+ + \ No newline at end of file diff --git a/lakehouse_engine/utils/configs.html b/lakehouse_engine/utils/configs.html index 5cd23ab..0f035e9 100644 --- a/lakehouse_engine/utils/configs.html +++ b/lakehouse_engine/utils/configs.html @@ -3,14 +3,14 @@ - + lakehouse_engine.utils.configs - + - + - + + + + + + + + + +
+
+ +

+lakehouse_engine.utils.dq_utils

+ +

Module containing utils for DQ processing.

+
+ + + + + +
  1"""Module containing utils for DQ processing."""
+  2
+  3from collections import Counter
+  4from json import loads
+  5
+  6from pyspark.sql.functions import col, from_json, schema_of_json, struct
+  7
+  8from lakehouse_engine.core.definitions import DQTableBaseParameters
+  9from lakehouse_engine.core.exec_env import ExecEnv
+ 10from lakehouse_engine.dq_processors.exceptions import DQSpecMalformedException
+ 11from lakehouse_engine.utils.logging_handler import LoggingHandler
+ 12
+ 13_LOGGER = LoggingHandler(__name__).get_logger()
+ 14
+ 15
+ 16class DQUtils:
+ 17    """Utils related to the data quality process."""
+ 18
+ 19    @staticmethod
+ 20    def import_dq_rules_from_table(
+ 21        spec: dict,
+ 22        execution_point: str,
+ 23        base_expectation_arguments: list,
+ 24        extra_meta_arguments: list,
+ 25    ) -> dict:
+ 26        """Import dq rules from a table.
+ 27
+ 28        Args:
+ 29            spec: data quality specification.
+ 30            execution_point: if the execution is in_motion or at_rest.
+ 31            base_expectation_arguments: base arguments for dq functions.
+ 32            extra_meta_arguments: extra meta arguments for dq functions.
+ 33
+ 34        Returns:
+ 35            The dictionary containing the dq spec with dq functions defined.
+ 36        """
+ 37        dq_db_table = spec["dq_db_table"]
+ 38        dq_functions = []
+ 39
+ 40        if spec.get("dq_table_table_filter"):
+ 41            dq_table_table_filter = spec["dq_table_table_filter"]
+ 42        else:
+ 43            raise DQSpecMalformedException(
+ 44                "When importing rules from a table "
+ 45                "dq_table_table_filter must be defined."
+ 46            )
+ 47
+ 48        extra_filters_query = (
+ 49            f""" and {spec["dq_table_extra_filters"]}"""
+ 50            if spec.get("dq_table_extra_filters")
+ 51            else ""
+ 52        )
+ 53
+ 54        fields = base_expectation_arguments + extra_meta_arguments
+ 55
+ 56        dq_functions_query = f"""
+ 57            SELECT {", ".join(fields)}
+ 58            FROM {dq_db_table}
+ 59            WHERE
+ 60            execution_point='{execution_point}' and table = '{dq_table_table_filter}'
+ 61            {extra_filters_query}"""  # nosec: B608
+ 62
+ 63        raw_dq_functions = ExecEnv.SESSION.sql(dq_functions_query)
+ 64
+ 65        arguments = raw_dq_functions.select("arguments").collect()
+ 66        parsed_arguments = [loads(argument.arguments) for argument in arguments]
+ 67        combined_dict: dict = {}
+ 68
+ 69        for argument in parsed_arguments:
+ 70            combined_dict = {**combined_dict, **argument}
+ 71
+ 72        dq_function_arguments_schema = schema_of_json(str(combined_dict))
+ 73
+ 74        processed_dq_functions = (
+ 75            raw_dq_functions.withColumn(
+ 76                "json_data", from_json(col("arguments"), dq_function_arguments_schema)
+ 77            )
+ 78            .withColumn(
+ 79                "parsed_arguments",
+ 80                struct(
+ 81                    col("json_data.*"),
+ 82                    struct(extra_meta_arguments).alias("meta"),
+ 83                ),
+ 84            )
+ 85            .drop(col("json_data"))
+ 86        )
+ 87
+ 88        unique_dq_functions = processed_dq_functions.drop_duplicates(
+ 89            ["dq_tech_function", "arguments"]
+ 90        )
+ 91
+ 92        duplicated_rows = processed_dq_functions.subtract(unique_dq_functions)
+ 93
+ 94        if duplicated_rows.count() > 0:
+ 95            _LOGGER.warning("Found Duplicate Rows:")
+ 96            duplicated_rows.show(truncate=False)
+ 97
+ 98        processed_dq_functions_list = unique_dq_functions.collect()
+ 99        for processed_dq_function in processed_dq_functions_list:
+100            dq_functions.append(
+101                {
+102                    "function": f"{processed_dq_function.dq_tech_function}",
+103                    "args": {
+104                        k: v
+105                        for k, v in processed_dq_function.parsed_arguments.asDict(
+106                            recursive=True
+107                        ).items()
+108                        if v is not None
+109                    },
+110                }
+111            )
+112
+113        spec["dq_functions"] = dq_functions
+114
+115        return spec
+116
+117    @staticmethod
+118    def validate_dq_functions(
+119        spec: dict, execution_point: str = "", extra_meta_arguments: list = None
+120    ) -> None:
+121        """Function to validate the dq functions defined in the dq_spec.
+122
+123        This function validates that the defined dq_functions contain all
+124        the fields defined in the extra_meta_arguments parameter.
+125
+126        Args:
+127            spec: data quality specification.
+128            execution_point: if the execution is in_motion or at_rest.
+129            extra_meta_arguments: extra meta arguments for dq functions.
+130
+131        Raises:
+132            DQSpecMalformedException: If the dq spec is malformed.
+133        """
+134        dq_functions = spec["dq_functions"]
+135        if not extra_meta_arguments:
+136            _LOGGER.info(
+137                "No extra meta parameters defined. "
+138                "Skipping validation of imported dq rule."
+139            )
+140            return
+141
+142        for dq_function in dq_functions:
+143            if not dq_function.get("args").get("meta", None):
+144                raise DQSpecMalformedException(
+145                    "The dq function must have a meta field containing all "
+146                    f"the fields defined: {extra_meta_arguments}."
+147                )
+148            else:
+149
+150                meta = dq_function["args"]["meta"]
+151                if Counter(meta.keys()) != Counter(extra_meta_arguments):
+152                    raise DQSpecMalformedException(
+153                        "The dq function meta field must contain all the "
+154                        f"fields defined: {extra_meta_arguments}.\n"
+155                        f"Found fields: {list(meta.keys())}"
+156                    )
+157                if execution_point and meta["execution_point"] != execution_point:
+158                    raise DQSpecMalformedException(
+159                        "The dq function execution point must be the same as "
+160                        "the execution point of the dq spec."
+161                    )
+162
+163
+164class PrismaUtils:
+165    """Prisma related utils."""
+166
+167    @staticmethod
+168    def build_prisma_dq_spec(spec: dict, execution_point: str) -> dict:
+169        """Fetch dq functions from given table.
+170
+171        Args:
+172            spec: data quality specification.
+173            execution_point: if the execution is in_motion or at_rest.
+174
+175        Returns:
+176            The dictionary containing the dq spec with dq functions defined.
+177        """
+178        if spec.get("dq_db_table"):
+179            spec = DQUtils.import_dq_rules_from_table(
+180                spec,
+181                execution_point,
+182                DQTableBaseParameters.PRISMA_BASE_PARAMETERS.value,
+183                ExecEnv.ENGINE_CONFIG.dq_functions_column_list,
+184            )
+185        elif spec.get("dq_functions"):
+186            DQUtils.validate_dq_functions(
+187                spec,
+188                execution_point,
+189                ExecEnv.ENGINE_CONFIG.dq_functions_column_list,
+190            )
+191        else:
+192            raise DQSpecMalformedException(
+193                "When using PRISMA either dq_db_table or "
+194                "dq_functions needs to be defined."
+195            )
+196
+197        spec["critical_functions"] = []
+198        spec["execution_point"] = execution_point
+199        spec["result_sink_db_table"] = None
+200        spec["result_sink_explode"] = True
+201        spec["fail_on_error"] = spec.get("fail_on_error", False)
+202        spec["max_percentage_failure"] = spec.get("max_percentage_failure", 1)
+203
+204        if not spec.get("result_sink_extra_columns", None):
+205            spec["result_sink_extra_columns"] = [
+206                "validation_results.expectation_config.meta",
+207            ]
+208        else:
+209            spec["result_sink_extra_columns"] = [
+210                "validation_results.expectation_config.meta",
+211            ] + spec["result_sink_extra_columns"]
+212        if not spec.get("data_product_name", None):
+213            raise DQSpecMalformedException(
+214                "When using PRISMA DQ data_product_name must be defined."
+215            )
+216        spec["result_sink_location"] = (
+217            f"{ExecEnv.ENGINE_CONFIG.dq_bucket}"
+218            f"/{spec['data_product_name']}/result_sink/"
+219        )
+220        if not spec.get("tbl_to_derive_pk", None) and not spec.get(
+221            "unexpected_rows_pk", None
+222        ):
+223            raise DQSpecMalformedException(
+224                "When using PRISMA DQ either "
+225                "tbl_to_derive_pk or unexpected_rows_pk need to be defined."
+226            )
+227        return spec
+
+ + +
+
+ +
+ + class + DQUtils: + + + +
+ +
 17class DQUtils:
+ 18    """Utils related to the data quality process."""
+ 19
+ 20    @staticmethod
+ 21    def import_dq_rules_from_table(
+ 22        spec: dict,
+ 23        execution_point: str,
+ 24        base_expectation_arguments: list,
+ 25        extra_meta_arguments: list,
+ 26    ) -> dict:
+ 27        """Import dq rules from a table.
+ 28
+ 29        Args:
+ 30            spec: data quality specification.
+ 31            execution_point: if the execution is in_motion or at_rest.
+ 32            base_expectation_arguments: base arguments for dq functions.
+ 33            extra_meta_arguments: extra meta arguments for dq functions.
+ 34
+ 35        Returns:
+ 36            The dictionary containing the dq spec with dq functions defined.
+ 37        """
+ 38        dq_db_table = spec["dq_db_table"]
+ 39        dq_functions = []
+ 40
+ 41        if spec.get("dq_table_table_filter"):
+ 42            dq_table_table_filter = spec["dq_table_table_filter"]
+ 43        else:
+ 44            raise DQSpecMalformedException(
+ 45                "When importing rules from a table "
+ 46                "dq_table_table_filter must be defined."
+ 47            )
+ 48
+ 49        extra_filters_query = (
+ 50            f""" and {spec["dq_table_extra_filters"]}"""
+ 51            if spec.get("dq_table_extra_filters")
+ 52            else ""
+ 53        )
+ 54
+ 55        fields = base_expectation_arguments + extra_meta_arguments
+ 56
+ 57        dq_functions_query = f"""
+ 58            SELECT {", ".join(fields)}
+ 59            FROM {dq_db_table}
+ 60            WHERE
+ 61            execution_point='{execution_point}' and table = '{dq_table_table_filter}'
+ 62            {extra_filters_query}"""  # nosec: B608
+ 63
+ 64        raw_dq_functions = ExecEnv.SESSION.sql(dq_functions_query)
+ 65
+ 66        arguments = raw_dq_functions.select("arguments").collect()
+ 67        parsed_arguments = [loads(argument.arguments) for argument in arguments]
+ 68        combined_dict: dict = {}
+ 69
+ 70        for argument in parsed_arguments:
+ 71            combined_dict = {**combined_dict, **argument}
+ 72
+ 73        dq_function_arguments_schema = schema_of_json(str(combined_dict))
+ 74
+ 75        processed_dq_functions = (
+ 76            raw_dq_functions.withColumn(
+ 77                "json_data", from_json(col("arguments"), dq_function_arguments_schema)
+ 78            )
+ 79            .withColumn(
+ 80                "parsed_arguments",
+ 81                struct(
+ 82                    col("json_data.*"),
+ 83                    struct(extra_meta_arguments).alias("meta"),
+ 84                ),
+ 85            )
+ 86            .drop(col("json_data"))
+ 87        )
+ 88
+ 89        unique_dq_functions = processed_dq_functions.drop_duplicates(
+ 90            ["dq_tech_function", "arguments"]
+ 91        )
+ 92
+ 93        duplicated_rows = processed_dq_functions.subtract(unique_dq_functions)
+ 94
+ 95        if duplicated_rows.count() > 0:
+ 96            _LOGGER.warning("Found Duplicate Rows:")
+ 97            duplicated_rows.show(truncate=False)
+ 98
+ 99        processed_dq_functions_list = unique_dq_functions.collect()
+100        for processed_dq_function in processed_dq_functions_list:
+101            dq_functions.append(
+102                {
+103                    "function": f"{processed_dq_function.dq_tech_function}",
+104                    "args": {
+105                        k: v
+106                        for k, v in processed_dq_function.parsed_arguments.asDict(
+107                            recursive=True
+108                        ).items()
+109                        if v is not None
+110                    },
+111                }
+112            )
+113
+114        spec["dq_functions"] = dq_functions
+115
+116        return spec
+117
+118    @staticmethod
+119    def validate_dq_functions(
+120        spec: dict, execution_point: str = "", extra_meta_arguments: list = None
+121    ) -> None:
+122        """Function to validate the dq functions defined in the dq_spec.
+123
+124        This function validates that the defined dq_functions contain all
+125        the fields defined in the extra_meta_arguments parameter.
+126
+127        Args:
+128            spec: data quality specification.
+129            execution_point: if the execution is in_motion or at_rest.
+130            extra_meta_arguments: extra meta arguments for dq functions.
+131
+132        Raises:
+133            DQSpecMalformedException: If the dq spec is malformed.
+134        """
+135        dq_functions = spec["dq_functions"]
+136        if not extra_meta_arguments:
+137            _LOGGER.info(
+138                "No extra meta parameters defined. "
+139                "Skipping validation of imported dq rule."
+140            )
+141            return
+142
+143        for dq_function in dq_functions:
+144            if not dq_function.get("args").get("meta", None):
+145                raise DQSpecMalformedException(
+146                    "The dq function must have a meta field containing all "
+147                    f"the fields defined: {extra_meta_arguments}."
+148                )
+149            else:
+150
+151                meta = dq_function["args"]["meta"]
+152                if Counter(meta.keys()) != Counter(extra_meta_arguments):
+153                    raise DQSpecMalformedException(
+154                        "The dq function meta field must contain all the "
+155                        f"fields defined: {extra_meta_arguments}.\n"
+156                        f"Found fields: {list(meta.keys())}"
+157                    )
+158                if execution_point and meta["execution_point"] != execution_point:
+159                    raise DQSpecMalformedException(
+160                        "The dq function execution point must be the same as "
+161                        "the execution point of the dq spec."
+162                    )
+
+ + +

Utils related to the data quality process.

+
+ + +
+ +
+
@staticmethod
+ + def + import_dq_rules_from_table( spec: dict, execution_point: str, base_expectation_arguments: list, extra_meta_arguments: list) -> dict: + + + +
+ +
 20    @staticmethod
+ 21    def import_dq_rules_from_table(
+ 22        spec: dict,
+ 23        execution_point: str,
+ 24        base_expectation_arguments: list,
+ 25        extra_meta_arguments: list,
+ 26    ) -> dict:
+ 27        """Import dq rules from a table.
+ 28
+ 29        Args:
+ 30            spec: data quality specification.
+ 31            execution_point: if the execution is in_motion or at_rest.
+ 32            base_expectation_arguments: base arguments for dq functions.
+ 33            extra_meta_arguments: extra meta arguments for dq functions.
+ 34
+ 35        Returns:
+ 36            The dictionary containing the dq spec with dq functions defined.
+ 37        """
+ 38        dq_db_table = spec["dq_db_table"]
+ 39        dq_functions = []
+ 40
+ 41        if spec.get("dq_table_table_filter"):
+ 42            dq_table_table_filter = spec["dq_table_table_filter"]
+ 43        else:
+ 44            raise DQSpecMalformedException(
+ 45                "When importing rules from a table "
+ 46                "dq_table_table_filter must be defined."
+ 47            )
+ 48
+ 49        extra_filters_query = (
+ 50            f""" and {spec["dq_table_extra_filters"]}"""
+ 51            if spec.get("dq_table_extra_filters")
+ 52            else ""
+ 53        )
+ 54
+ 55        fields = base_expectation_arguments + extra_meta_arguments
+ 56
+ 57        dq_functions_query = f"""
+ 58            SELECT {", ".join(fields)}
+ 59            FROM {dq_db_table}
+ 60            WHERE
+ 61            execution_point='{execution_point}' and table = '{dq_table_table_filter}'
+ 62            {extra_filters_query}"""  # nosec: B608
+ 63
+ 64        raw_dq_functions = ExecEnv.SESSION.sql(dq_functions_query)
+ 65
+ 66        arguments = raw_dq_functions.select("arguments").collect()
+ 67        parsed_arguments = [loads(argument.arguments) for argument in arguments]
+ 68        combined_dict: dict = {}
+ 69
+ 70        for argument in parsed_arguments:
+ 71            combined_dict = {**combined_dict, **argument}
+ 72
+ 73        dq_function_arguments_schema = schema_of_json(str(combined_dict))
+ 74
+ 75        processed_dq_functions = (
+ 76            raw_dq_functions.withColumn(
+ 77                "json_data", from_json(col("arguments"), dq_function_arguments_schema)
+ 78            )
+ 79            .withColumn(
+ 80                "parsed_arguments",
+ 81                struct(
+ 82                    col("json_data.*"),
+ 83                    struct(extra_meta_arguments).alias("meta"),
+ 84                ),
+ 85            )
+ 86            .drop(col("json_data"))
+ 87        )
+ 88
+ 89        unique_dq_functions = processed_dq_functions.drop_duplicates(
+ 90            ["dq_tech_function", "arguments"]
+ 91        )
+ 92
+ 93        duplicated_rows = processed_dq_functions.subtract(unique_dq_functions)
+ 94
+ 95        if duplicated_rows.count() > 0:
+ 96            _LOGGER.warning("Found Duplicate Rows:")
+ 97            duplicated_rows.show(truncate=False)
+ 98
+ 99        processed_dq_functions_list = unique_dq_functions.collect()
+100        for processed_dq_function in processed_dq_functions_list:
+101            dq_functions.append(
+102                {
+103                    "function": f"{processed_dq_function.dq_tech_function}",
+104                    "args": {
+105                        k: v
+106                        for k, v in processed_dq_function.parsed_arguments.asDict(
+107                            recursive=True
+108                        ).items()
+109                        if v is not None
+110                    },
+111                }
+112            )
+113
+114        spec["dq_functions"] = dq_functions
+115
+116        return spec
+
+ + +

Import dq rules from a table.

+ +
Arguments:
+ +
    +
  • spec: data quality specification.
  • +
  • execution_point: if the execution is in_motion or at_rest.
  • +
  • base_expectation_arguments: base arguments for dq functions.
  • +
  • extra_meta_arguments: extra meta arguments for dq functions.
  • +
+ +
Returns:
+ +
+

The dictionary containing the dq spec with dq functions defined.
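A hypothetical spec for this method is sketched below. The table names, filters and column lists are placeholders (in the engine, the base and meta argument lists come from DQTableBaseParameters.PRISMA_BASE_PARAMETERS and the engine configuration), and a running Spark session in ExecEnv.SESSION is assumed.

from lakehouse_engine.utils.dq_utils import DQUtils

spec = {
    "dq_db_table": "my_db.dq_rules",          # table holding the DQ rule definitions
    "dq_table_table_filter": "dummy_sales",   # mandatory: restricts rules to one target table
    "dq_table_extra_filters": "dq_tech_function != 'expect_table_row_count_to_be_between'",
}

spec_with_functions = DQUtils.import_dq_rules_from_table(
    spec,
    execution_point="at_rest",
    base_expectation_arguments=["dq_tech_function", "execution_point", "arguments"],
    extra_meta_arguments=["dq_rule_id", "execution_point"],
)

# spec_with_functions["dq_functions"] now holds one entry per unique rule, e.g.:
# {"function": "expect_column_values_to_not_be_null",
#  "args": {"column": "item", "meta": {"dq_rule_id": ..., "execution_point": "at_rest"}}}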

+
+
+ + +
+
+ +
+
@staticmethod
+ + def + validate_dq_functions( spec: dict, execution_point: str = '', extra_meta_arguments: list = None) -> None: + + + +
+ +
118    @staticmethod
+119    def validate_dq_functions(
+120        spec: dict, execution_point: str = "", extra_meta_arguments: list = None
+121    ) -> None:
+122        """Function to validate the dq functions defined in the dq_spec.
+123
+124        This function validates that the defined dq_functions contain all
+125        the fields defined in the extra_meta_arguments parameter.
+126
+127        Args:
+128            spec: data quality specification.
+129            execution_point: if the execution is in_motion or at_rest.
+130            extra_meta_arguments: extra meta arguments for dq functions.
+131
+132        Raises:
+133            DQSpecMalformedException: If the dq spec is malformed.
+134        """
+135        dq_functions = spec["dq_functions"]
+136        if not extra_meta_arguments:
+137            _LOGGER.info(
+138                "No extra meta parameters defined. "
+139                "Skipping validation of imported dq rule."
+140            )
+141            return
+142
+143        for dq_function in dq_functions:
+144            if not dq_function.get("args").get("meta", None):
+145                raise DQSpecMalformedException(
+146                    "The dq function must have a meta field containing all "
+147                    f"the fields defined: {extra_meta_arguments}."
+148                )
+149            else:
+150
+151                meta = dq_function["args"]["meta"]
+152                if Counter(meta.keys()) != Counter(extra_meta_arguments):
+153                    raise DQSpecMalformedException(
+154                        "The dq function meta field must contain all the "
+155                        f"fields defined: {extra_meta_arguments}.\n"
+156                        f"Found fields: {list(meta.keys())}"
+157                    )
+158                if execution_point and meta["execution_point"] != execution_point:
+159                    raise DQSpecMalformedException(
+160                        "The dq function execution point must be the same as "
+161                        "the execution point of the dq spec."
+162                    )
+
+ + +

Function to validate the dq functions defined in the dq_spec.

+ +

This function validates that the defined dq_functions contain all the fields defined in the extra_meta_arguments parameter.

+ +
Arguments:
+ +
    +
  • spec: data quality specification.
  • +
  • execution_point: if the execution is in_motion or at_rest.
  • +
  • extra_meta_arguments: extra meta arguments for dq functions.
  • +
+ +
Raises:
+ +
    +
  • DQSpecMalformedException: If the dq spec is malformed.
  • +
+
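As a sketch of a spec that passes this validation (the meta field names are hypothetical), each function's args must carry a meta dict whose keys exactly match extra_meta_arguments and whose execution_point matches the one being validated:

from lakehouse_engine.utils.dq_utils import DQUtils

spec = {
    "dq_functions": [
        {
            "function": "expect_column_values_to_not_be_null",
            "args": {
                "column": "sales_order_header",
                "meta": {"dq_rule_id": "rule_1", "execution_point": "at_rest"},
            },
        }
    ]
}

# Raises DQSpecMalformedException if meta is missing, has a different key set,
# or declares a different execution_point.
DQUtils.validate_dq_functions(
    spec,
    execution_point="at_rest",
    extra_meta_arguments=["dq_rule_id", "execution_point"],
)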
+ + +
+
+
+ +
+ + class + PrismaUtils: + + + +
+ +
165class PrismaUtils:
+166    """Prisma related utils."""
+167
+168    @staticmethod
+169    def build_prisma_dq_spec(spec: dict, execution_point: str) -> dict:
+170        """Fetch dq functions from given table.
+171
+172        Args:
+173            spec: data quality specification.
+174            execution_point: if the execution is in_motion or at_rest.
+175
+176        Returns:
+177            The dictionary containing the dq spec with dq functions defined.
+178        """
+179        if spec.get("dq_db_table"):
+180            spec = DQUtils.import_dq_rules_from_table(
+181                spec,
+182                execution_point,
+183                DQTableBaseParameters.PRISMA_BASE_PARAMETERS.value,
+184                ExecEnv.ENGINE_CONFIG.dq_functions_column_list,
+185            )
+186        elif spec.get("dq_functions"):
+187            DQUtils.validate_dq_functions(
+188                spec,
+189                execution_point,
+190                ExecEnv.ENGINE_CONFIG.dq_functions_column_list,
+191            )
+192        else:
+193            raise DQSpecMalformedException(
+194                "When using PRISMA either dq_db_table or "
+195                "dq_functions needs to be defined."
+196            )
+197
+198        spec["critical_functions"] = []
+199        spec["execution_point"] = execution_point
+200        spec["result_sink_db_table"] = None
+201        spec["result_sink_explode"] = True
+202        spec["fail_on_error"] = spec.get("fail_on_error", False)
+203        spec["max_percentage_failure"] = spec.get("max_percentage_failure", 1)
+204
+205        if not spec.get("result_sink_extra_columns", None):
+206            spec["result_sink_extra_columns"] = [
+207                "validation_results.expectation_config.meta",
+208            ]
+209        else:
+210            spec["result_sink_extra_columns"] = [
+211                "validation_results.expectation_config.meta",
+212            ] + spec["result_sink_extra_columns"]
+213        if not spec.get("data_product_name", None):
+214            raise DQSpecMalformedException(
+215                "When using PRISMA DQ data_product_name must be defined."
+216            )
+217        spec["result_sink_location"] = (
+218            f"{ExecEnv.ENGINE_CONFIG.dq_bucket}"
+219            f"/{spec['data_product_name']}/result_sink/"
+220        )
+221        if not spec.get("tbl_to_derive_pk", None) and not spec.get(
+222            "unexpected_rows_pk", None
+223        ):
+224            raise DQSpecMalformedException(
+225                "When using PRISMA DQ either "
+226                "tbl_to_derive_pk or unexpected_rows_pk need to be defined."
+227            )
+228        return spec
+
+ + +

Prisma related utils.

+
+ + +
+ +
+
@staticmethod
+ + def + build_prisma_dq_spec(spec: dict, execution_point: str) -> dict: + + + +
+ +
168    @staticmethod
+169    def build_prisma_dq_spec(spec: dict, execution_point: str) -> dict:
+170        """Fetch dq functions from given table.
+171
+172        Args:
+173            spec: data quality specification.
+174            execution_point: if the execution is in_motion or at_rest.
+175
+176        Returns:
+177            The dictionary containing the dq spec with dq functions defined.
+178        """
+179        if spec.get("dq_db_table"):
+180            spec = DQUtils.import_dq_rules_from_table(
+181                spec,
+182                execution_point,
+183                DQTableBaseParameters.PRISMA_BASE_PARAMETERS.value,
+184                ExecEnv.ENGINE_CONFIG.dq_functions_column_list,
+185            )
+186        elif spec.get("dq_functions"):
+187            DQUtils.validate_dq_functions(
+188                spec,
+189                execution_point,
+190                ExecEnv.ENGINE_CONFIG.dq_functions_column_list,
+191            )
+192        else:
+193            raise DQSpecMalformedException(
+194                "When using PRISMA either dq_db_table or "
+195                "dq_functions needs to be defined."
+196            )
+197
+198        spec["critical_functions"] = []
+199        spec["execution_point"] = execution_point
+200        spec["result_sink_db_table"] = None
+201        spec["result_sink_explode"] = True
+202        spec["fail_on_error"] = spec.get("fail_on_error", False)
+203        spec["max_percentage_failure"] = spec.get("max_percentage_failure", 1)
+204
+205        if not spec.get("result_sink_extra_columns", None):
+206            spec["result_sink_extra_columns"] = [
+207                "validation_results.expectation_config.meta",
+208            ]
+209        else:
+210            spec["result_sink_extra_columns"] = [
+211                "validation_results.expectation_config.meta",
+212            ] + spec["result_sink_extra_columns"]
+213        if not spec.get("data_product_name", None):
+214            raise DQSpecMalformedException(
+215                "When using PRISMA DQ data_product_name must be defined."
+216            )
+217        spec["result_sink_location"] = (
+218            f"{ExecEnv.ENGINE_CONFIG.dq_bucket}"
+219            f"/{spec['data_product_name']}/result_sink/"
+220        )
+221        if not spec.get("tbl_to_derive_pk", None) and not spec.get(
+222            "unexpected_rows_pk", None
+223        ):
+224            raise DQSpecMalformedException(
+225                "When using PRISMA DQ either "
+226                "tbl_to_derive_pk or unexpected_rows_pk need to be defined."
+227            )
+228        return spec
+
+ + +

Fetch dq functions from given table.

+ +
Arguments:
+ +
    +
  • spec: data quality specification.
  • +
  • execution_point: if the execution is in_motion or at_rest.
  • +
+ +
Returns:
+ +
+

The dictionary containing the dq spec with dq functions defined.
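For orientation, a minimal PRISMA spec being completed by this method might look as follows (names are placeholders and a configured engine environment is assumed); note the defaults it fills in, such as fail_on_error=False, result_sink_explode=True and the derived result_sink_location.

from lakehouse_engine.utils.dq_utils import PrismaUtils

spec = {
    "dq_db_table": "my_db.dq_rules",
    "dq_table_table_filter": "dummy_sales",
    "data_product_name": "my_data_product",        # mandatory for PRISMA
    "unexpected_rows_pk": ["sales_order_header"],  # or tbl_to_derive_pk
}

built = PrismaUtils.build_prisma_dq_spec(spec, execution_point="at_rest")

# Among others, the returned spec now contains:
#   built["fail_on_error"] is False (unless set explicitly)
#   built["result_sink_explode"] is True
#   built["result_sink_location"] == "<dq_bucket>/my_data_product/result_sink/"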

+
+
+ + +
+
+
+ + \ No newline at end of file diff --git a/lakehouse_engine/utils/engine_usage_stats.html b/lakehouse_engine/utils/engine_usage_stats.html index 840eb9f..bbac17b 100644 --- a/lakehouse_engine/utils/engine_usage_stats.html +++ b/lakehouse_engine/utils/engine_usage_stats.html @@ -3,14 +3,14 @@ - + lakehouse_engine.utils.engine_usage_stats - + - + - + - + - + - + - + - + - + - + + + + + + + + + +
+
+ +

+lakehouse_engine.utils.rest_api

+ +

Module to handle REST API operations.

+
+ + + + + +
  1"""Module to handle REST API operations."""
+  2
+  3import time
+  4from enum import Enum
+  5
+  6import requests
+  7from requests.adapters import HTTPAdapter
+  8from urllib3.util.retry import Retry
+  9
+ 10from lakehouse_engine.utils.logging_handler import LoggingHandler
+ 11
+ 12LOG = LoggingHandler(__name__).get_logger()
+ 13DEFAULT_CONTENT_TYPE = "application/json"
+ 14
+ 15
+ 16class RestMethods(Enum):
+ 17    """Methods for REST API calls."""
+ 18
+ 19    POST = "POST"
+ 20    PUT = "PUT"
+ 21    ALLOWED_METHODS = ["POST", "PUT"]
+ 22
+ 23
+ 24class RestStatusCodes(Enum):
+ 25    """REST Status Code."""
+ 26
+ 27    RETRY_STATUS_CODES = [429, 500, 502, 503, 504]
+ 28    OK_STATUS_CODES = [200]
+ 29
+ 30
+ 31class RESTApiException(requests.RequestException):
+ 32    """Class representing any possible REST API Exception."""
+ 33
+ 34    def __init__(self, message: str) -> None:
+ 35        """Construct RESTApiException instances.
+ 36
+ 37        Args:
+ 38            message: message to display on exception event.
+ 39        """
+ 40        super().__init__(message)
+ 41
+ 42
+ 43def get_basic_auth(username: str, password: str) -> requests.auth.HTTPBasicAuth:
+ 44    """Get the basic authentication object to authenticate REST requests.
+ 45
+ 46    Args:
+ 47        username: username.
+ 48        password: password.
+ 49
+ 50    Returns:
+ 51        requests.auth.HTTPBasicAuth: the HTTPBasicAuth object.
+ 52    """
+ 53    return requests.auth.HTTPBasicAuth(username, password)
+ 54
+ 55
+ 56def get_configured_session(
+ 57    sleep_seconds: float = 0.2,
+ 58    total_retries: int = 5,
+ 59    backoff_factor: int = 2,
+ 60    retry_status_codes: list = None,
+ 61    allowed_methods: list = None,
+ 62    protocol: str = "https://",
+ 63) -> requests.Session:
+ 64    """Get a configured requests Session with exponential backoff.
+ 65
+ 66    Args:
+ 67        sleep_seconds: seconds to sleep before each request to avoid rate limits.
+ 68        total_retries: number of times to retry.
+ 69        backoff_factor: factor for the exponential backoff.
+ 70        retry_status_codes: list of status code that triggers a retry.
+ 71        allowed_methods: http methods that are allowed for retry.
+ 72        protocol: http:// or https://.
+ 73
+ 74    Returns:
+ 75        requests.Session: the configured session.
+ 76    """
+ 77    retry_status_codes = (
+ 78        retry_status_codes
+ 79        if retry_status_codes
+ 80        else RestStatusCodes.RETRY_STATUS_CODES.value
+ 81    )
+ 82    allowed_methods = (
+ 83        allowed_methods if allowed_methods else RestMethods.ALLOWED_METHODS.value
+ 84    )
+ 85    time.sleep(sleep_seconds)
+ 86    session = requests.Session()
+ 87    retries = Retry(
+ 88        total=total_retries,
+ 89        backoff_factor=backoff_factor,
+ 90        status_forcelist=retry_status_codes,
+ 91        allowed_methods=allowed_methods,
+ 92    )
+ 93    session.mount(protocol, HTTPAdapter(max_retries=retries))
+ 94    return session
+ 95
+ 96
+ 97def execute_api_request(
+ 98    method: str,
+ 99    url: str,
+100    headers: dict = None,
+101    basic_auth_dict: dict = None,
+102    json: dict = None,
+103    files: dict = None,
+104    sleep_seconds: float = 0.2,
+105) -> requests.Response:
+106    """Execute a REST API request.
+107
+108    Args:
+109        method: REST method (e.g., POST or PUT).
+110        url: url of the api.
+111        headers: request headers.
+112        basic_auth_dict: basic http authentication details
+113            (e.g., {"username": "x", "password": "y"}).
+114        json: json payload to send in the request.
+115        files: files payload to send in the request.
+116        sleep_seconds: for how many seconds to sleep to avoid error 429.
+117
+118    Returns:
+119        response from the HTTP request.
+120    """
+121    basic_auth: requests.auth.HTTPBasicAuth = None
+122    if basic_auth_dict:
+123        basic_auth = get_basic_auth(
+124            basic_auth_dict["username"], basic_auth_dict["password"]
+125        )
+126
+127    return get_configured_session(sleep_seconds=sleep_seconds).request(
+128        method=method,
+129        url=url,
+130        headers=headers,
+131        auth=basic_auth,
+132        json=json,
+133        files=files,
+134    )
+
+ + +
+
+
+ LOG = +<Logger lakehouse_engine.utils.rest_api (DEBUG)> + + +
+ + + + +
+
+
+ DEFAULT_CONTENT_TYPE = +'application/json' + + +
+ + + + +
+
+ +
+ + class + RestMethods(enum.Enum): + + + +
+ +
17class RestMethods(Enum):
+18    """Methods for REST API calls."""
+19
+20    POST = "POST"
+21    PUT = "PUT"
+22    ALLOWED_METHODS = ["POST", "PUT"]
+
+ + +

Methods for REST API calls.

+
+ + +
+
+ POST = +<RestMethods.POST: 'POST'> + + +
+ + + + +
+
+
+ PUT = +<RestMethods.PUT: 'PUT'> + + +
+ + + + +
+
+
+ ALLOWED_METHODS = +<RestMethods.ALLOWED_METHODS: ['POST', 'PUT']> + + +
+ + + + +
+
+
Inherited Members
+
+
enum.Enum
+
name
+
value
+ +
+
+
+
+
+ +
+ + class + RestStatusCodes(enum.Enum): + + + +
+ +
25class RestStatusCodes(Enum):
+26    """REST Status Code."""
+27
+28    RETRY_STATUS_CODES = [429, 500, 502, 503, 504]
+29    OK_STATUS_CODES = [200]
+
+ + +

REST Status Code.

+
+ + +
+
+ RETRY_STATUS_CODES = +<RestStatusCodes.RETRY_STATUS_CODES: [429, 500, 502, 503, 504]> + + +
+ + + + +
+
+
+ OK_STATUS_CODES = +<RestStatusCodes.OK_STATUS_CODES: [200]> + + +
+ + + + +
+
+
Inherited Members
+
+
enum.Enum
+
name
+
value
+ +
+
+
+
+
+ +
+ + class + RESTApiException(requests.exceptions.RequestException): + + + +
+ +
32class RESTApiException(requests.RequestException):
+33    """Class representing any possible REST API Exception."""
+34
+35    def __init__(self, message: str) -> None:
+36        """Construct RESTApiException instances.
+37
+38        Args:
+39            message: message to display on exception event.
+40        """
+41        super().__init__(message)
+
+ + +

Class representing any possible REST API Exception.

+
+ + +
+ +
+ + RESTApiException(message: str) + + + +
+ +
35    def __init__(self, message: str) -> None:
+36        """Construct RESTApiException instances.
+37
+38        Args:
+39            message: message to display on exception event.
+40        """
+41        super().__init__(message)
+
+ + +

Construct RESTApiException instances.

+ +
Arguments:
+ +
    +
  • message: message to display on exception event.
  • +
+
+ + +
+
+
Inherited Members
+
+
requests.exceptions.RequestException
+
response
+
request
+ +
+
builtins.OSError
+
errno
+
strerror
+
filename
+
filename2
+
characters_written
+ +
+
builtins.BaseException
+
with_traceback
+
args
+ +
+
+
+
+
+ +
+ + def + get_basic_auth(username: str, password: str) -> requests.auth.HTTPBasicAuth: + + + +
+ +
44def get_basic_auth(username: str, password: str) -> requests.auth.HTTPBasicAuth:
+45    """Get the basic authentication object to authenticate REST requests.
+46
+47    Args:
+48        username: username.
+49        password: password.
+50
+51    Returns:
+52        requests.auth.HTTPBasicAuth: the HTTPBasicAuth object.
+53    """
+54    return requests.auth.HTTPBasicAuth(username, password)
+
+ + +

Get the basic authentication object to authenticate REST requests.

+ +
Arguments:
+ +
    +
  • username: username.
  • +
  • password: password.
  • +
+ +
Returns:
+ +
+

requests.auth.HTTPBasicAuth: the HTTPBasicAuth object.
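A trivial usage sketch (URL and credentials are placeholders):

import requests

from lakehouse_engine.utils.rest_api import get_basic_auth

auth = get_basic_auth("my_user", "my_password")
response = requests.post("https://foo.bar.com/endpoint", auth=auth, json={"x": "y"})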

+
+
+ + +
+
+ +
+ + def + get_configured_session( sleep_seconds: float = 0.2, total_retries: int = 5, backoff_factor: int = 2, retry_status_codes: list = None, allowed_methods: list = None, protocol: str = 'https://') -> requests.sessions.Session: + + + +
+ +
57def get_configured_session(
+58    sleep_seconds: float = 0.2,
+59    total_retries: int = 5,
+60    backoff_factor: int = 2,
+61    retry_status_codes: list = None,
+62    allowed_methods: list = None,
+63    protocol: str = "https://",
+64) -> requests.Session:
+65    """Get a configured requests Session with exponential backoff.
+66
+67    Args:
+68        sleep_seconds: seconds to sleep before each request to avoid rate limits.
+69        total_retries: number of times to retry.
+70        backoff_factor: factor for the exponential backoff.
+71        retry_status_codes: list of status code that triggers a retry.
+72        allowed_methods: http methods that are allowed for retry.
+73        protocol: http:// or https://.
+74
+75    Returns:
+76        requests.Session: the configured session.
+77    """
+78    retry_status_codes = (
+79        retry_status_codes
+80        if retry_status_codes
+81        else RestStatusCodes.RETRY_STATUS_CODES.value
+82    )
+83    allowed_methods = (
+84        allowed_methods if allowed_methods else RestMethods.ALLOWED_METHODS.value
+85    )
+86    time.sleep(sleep_seconds)
+87    session = requests.Session()
+88    retries = Retry(
+89        total=total_retries,
+90        backoff_factor=backoff_factor,
+91        status_forcelist=retry_status_codes,
+92        allowed_methods=allowed_methods,
+93    )
+94    session.mount(protocol, HTTPAdapter(max_retries=retries))
+95    return session
+
+ + +

Get a configured requests Session with exponential backoff.

+ +
Arguments:
+ +
    +
  • sleep_seconds: seconds to sleep before each request to avoid rate limits.
  • +
  • total_retries: number of times to retry.
  • +
  • backoff_factor: factor for the exponential backoff.
  • +
  • retry_status_codes: list of status code that triggers a retry.
  • +
  • allowed_methods: http methods that are allowed for retry.
  • +
  • protocol: http:// or https://.
  • +
+ +

Returns:

    requests.Session: the configured session.
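A small usage sketch (the URL is a placeholder); the returned session retries transient failures with exponential backoff:

from lakehouse_engine.utils.rest_api import get_configured_session

session = get_configured_session(sleep_seconds=0.5, total_retries=3, backoff_factor=2)

# Retries are only attempted for the configured status codes (429, 500, 502, 503, 504
# by default) and HTTP methods (POST/PUT by default), with exponentially growing delays.
response = session.request("POST", "https://foo.bar.com/endpoint", json={"x": "y"})
print(response.status_code)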

+
+ + +
+
+ +
+ + def + execute_api_request( method: str, url: str, headers: dict = None, basic_auth_dict: dict = None, json: dict = None, files: dict = None, sleep_seconds: float = 0.2) -> requests.models.Response: + + + +
+ +
 98def execute_api_request(
+ 99    method: str,
+100    url: str,
+101    headers: dict = None,
+102    basic_auth_dict: dict = None,
+103    json: dict = None,
+104    files: dict = None,
+105    sleep_seconds: float = 0.2,
+106) -> requests.Response:
+107    """Execute a REST API request.
+108
+109    Args:
+110        method: REST method (e.g., POST or PUT).
+111        url: url of the api.
+112        headers: request headers.
+113        basic_auth_dict: basic http authentication details
+114            (e.g., {"username": "x", "password": "y"}).
+115        json: json payload to send in the request.
+116        files: files payload to send in the request.
+117        sleep_seconds: for how many seconds to sleep to avoid error 429.
+118
+119    Returns:
+120        response from the HTTP request.
+121    """
+122    basic_auth: requests.auth.HTTPBasicAuth = None
+123    if basic_auth_dict:
+124        basic_auth = get_basic_auth(
+125            basic_auth_dict["username"], basic_auth_dict["password"]
+126        )
+127
+128    return get_configured_session(sleep_seconds=sleep_seconds).request(
+129        method=method,
+130        url=url,
+131        headers=headers,
+132        auth=basic_auth,
+133        json=json,
+134        files=files,
+135    )
+
+ + +

Execute a REST API request.

+ +
Arguments:
+ +
    +
  • method: REST method (e.g., POST or PUT).
  • +
  • url: url of the api.
  • +
  • headers: request headers.
  • +
  • basic_auth_dict: basic http authentication details (e.g., {"username": "x", "password": "y"}).
  • +
  • json: json payload to send in the request.
  • +
  • files: files payload to send in the request.
  • +
  • sleep_seconds: for how many seconds to sleep to avoid error 429.
  • +
+ +
Returns:
+ +
+

response from the HTTP request.
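A minimal, hypothetical call (URL and credentials are placeholders):

from lakehouse_engine.utils.rest_api import (
    RESTApiException,
    RestStatusCodes,
    execute_api_request,
)

response = execute_api_request(
    method="POST",
    url="https://foo.bar.com/endpoint",
    headers={"Content-Type": "application/json"},
    basic_auth_dict={"username": "...", "password": "..."},
    json={"a": "a value", "b": "b value"},
    sleep_seconds=0.2,
)

if response.status_code not in RestStatusCodes.OK_STATUS_CODES.value:
    raise RESTApiException(f"Unexpected status: {response.status_code} - {response.text}")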

+
+
+ + +
+
+ + \ No newline at end of file diff --git a/lakehouse_engine/utils/schema_utils.html b/lakehouse_engine/utils/schema_utils.html index 77656be..9620054 100644 --- a/lakehouse_engine/utils/schema_utils.html +++ b/lakehouse_engine/utils/schema_utils.html @@ -3,14 +3,14 @@ - + lakehouse_engine.utils.schema_utils - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + + + + + + + + + +
+
+ +

Write to REST API

+ +

The REST API writer sends data from Spark to a REST API within the data pipeline context. It uses the Python requests library to execute the REST calls.

+ +

Several aspects of the writer can be configured, such as whether the payload is sent via JSON body or via file, and which additional JSON body parameters should be added to the payload generated via Spark.

+ +

In the current implementation of the writer, each row generates a request to the API, so it is important that you prepare your dataframe accordingly (check the example below).

+ +

Silver Dummy Sales Write to REST API Example

+ +

In this template we will cover the Dummy Sales write to a REST API. An ACON is used to read from bronze, apply silver transformations to prepare the REST API payload, and write to the API through the following steps:

+ +
  1. Definition of how to read data (input data location, read type and data format);
  2. Transformation of the data so that we form a payload column for each row.
     Important Note: In the current implementation of the writer, each row will generate a request to the API, so create_payload is a lakehouse engine custom transformer function that creates a JSON string with the payload to be sent to the API. The column name should be exactly "payload", so that the lakehouse engine further processes that column accordingly, in order to correctly write the data to the REST API;
  3. Definition of how to write to a REST API (url, authentication, payload format configuration, ...).

For this, the ACON specs are:

+ +
  • input_specs (MANDATORY): specify how to read data;
  • transform_specs (MANDATORY): specify how to transform data to prepare the payload;
  • output_specs (MANDATORY): specify how to write data to the target.
+ +
+
from lakehouse_engine.engine import load_data
+from pyspark.sql import DataFrame
+from pyspark.sql.functions import lit
+
+def create_payload(df: DataFrame) -> DataFrame:
+    payload_df = df.withColumn(
+        "payload",
+        lit('{"just a dummy key": "just a dummy value"}')
+    )
+
+    return payload_df
+
+acon = {
+    "input_specs": [
+        {
+            "spec_id": "dummy_sales_bronze",
+            "read_type": "streaming",
+            "data_format": "delta",
+            "location": "s3://my_data_product_bucket/bronze/dummy_sales",
+        }
+    ],
+    "transform_specs": [
+        {
+            "spec_id": "dummy_sales_transform",
+            "input_id": "dummy_sales_bronze",
+            "transformers": [
+                {
+                    "function": "custom_transformation",
+                    "args": {
+                        "custom_transformer": create_payload,
+                    },
+                }
+            ],
+        },
+    ],
+    "output_specs": [
+        { 
+            "spec_id": "data_to_send_to_api",
+            "input_id": "dummy_sales_transform",
+            "data_format": "rest_api",
+            "options": {
+                "rest_api_url": "https://foo.bar.com",
+                "rest_api_method": "post",
+                "rest_api_basic_auth": {
+                    "username": "...",
+                    "password": "...",
+                },
+                "rest_api_is_file_payload": False, # True if payload is to be sent via JSON file instead of JSON body (application/json)
+                "rest_api_file_payload_name": "custom_file", # this is the name of the file to be sent in cases where the payload uses file uploads rather than JSON body.
+                "rest_api_extra_json_payload": {"x": "y"}
+            }
+        }
+    ],
+}
+
+load_data(acon=acon)
+
+
+
+ +
+
+ + \ No newline at end of file diff --git a/lakehouse_engine_usage/data_quality.html b/lakehouse_engine_usage/data_quality.html index a92e0af..40004ae 100644 --- a/lakehouse_engine_usage/data_quality.html +++ b/lakehouse_engine_usage/data_quality.html @@ -3,14 +3,14 @@ - + lakehouse_engine_usage.data_quality - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - +