diff --git a/.github/codecov.yml b/.github/codecov.yml new file mode 100644 index 0000000..e69de29 diff --git a/.github/workflows/ci-terraglue.yml b/.github/workflows/ci-terraglue.yml index 7ca8117..5fa0c1c 100644 --- a/.github/workflows/ci-terraglue.yml +++ b/.github/workflows/ci-terraglue.yml @@ -4,6 +4,7 @@ on: push: branches: - develop + - feature** pull_request: branches: - develop @@ -65,11 +66,13 @@ jobs: -u root -v ${{ github.workspace }}:/home/glue_user/workspace/terraglue -e DISABLE_SSL=true + -e AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} + -e AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} + -e AWS_REGION=sa-east-1 --rm -p 4040:4040 -p 18080:18080 --name terraglue run: | - export AWS_REGION=$(AWS_REGION) cd terraglue python3 -m pip install --user --upgrade pip -r app/requirements-container.txt python3 -m pytest app/ -vv --color=yes --cov=./ --cov-report=xml diff --git a/README.md b/README.md index 9a008c1..aa4c39a 100644 --- a/README.md +++ b/README.md @@ -218,6 +218,10 @@ Todos são muito bem vindos a contribuírem com evoluções e novas funcionalida - [Eduardo Mendes - Live de Python #168 - Pytest Fixtures](https://www.youtube.com/watch?v=sidi9Z_IkLU&t) - [Databricks - Data + AI Summit 2022 - Learn to Efficiently Test ETL Pipelines](https://www.youtube.com/watch?v=uzVewG8M6r0&t=1127s) - [Real Python - Getting Started with Testing in Python](https://realpython.com/python-testing/) +- [Inspired Python - Five Advanced Pytest Fixture Patterns](https://www.inspiredpython.com/article/five-advanced-pytest-fixture-patterns) +- [getmoto/moto - mock inputs](https://github.com/getmoto/moto/blob/master/tests/test_glue/fixtures/datacatalog.py) +- [Codecov - Do test files belong in code coverage calculations?](https://about.codecov.io/blog/should-i-include-test-files-in-code-coverage-calculations/) +- [Jenkins Issue: Endpoint does not contain a valid host name](https://issues.jenkins.io/browse/JENKINS-63177) **_Outros_** - [Differences between System of Record and Source of Truth](https://www.linkedin.com/pulse/difference-between-system-record-source-truth-santosh-kudva/) diff --git a/app/pytest.ini b/app/pytest.ini index 002519f..1f65732 100644 --- a/app/pytest.ini +++ b/app/pytest.ini @@ -7,6 +7,8 @@ markers = terraform: testes relacionados à declarações do usuário em arquivos Terraform usados para criação de recursos na nuvem job_manager: testes relacionados à classe GlueJobManager responsável por fornecer todos os insumos necessários para a execução de um job Glue na AWS etl_manager: testes relacionados à classe GlueETLManager responsável por fornecer métodos de transformação de dados e operações gerais utilizando o Glue + generate_dynamicframe_dict: testes relacionados ao método generate_dynamicframe_dict da classe GlueETLManager + generate_dataframe_dict: testes relacionados ao método generate_dataframe_dict da classe GlueETLManager date_attributes_extraction: testes relacionados ao método date_attributes_extraction da classe GlueETLManager add_partition: testes relacionados ao método add_partition da classe GlueETLManager repartition_dataframe: testes relacionados ao método repartition_dataframe da classe GlueETLManager diff --git a/app/requirements-container.txt b/app/requirements-container.txt index 85388d3..1ef8166 100644 --- a/app/requirements-container.txt +++ b/app/requirements-container.txt @@ -1,5 +1,9 @@ +setuptools +boto3 flake8 pydocstyle pytest>=7.2.0 pytest-cov +pyparsing Faker +moto diff --git a/app/requirements-dev.txt b/app/requirements-dev.txt index 05c2735..07e71ae 100644 --- a/app/requirements-dev.txt +++ b/app/requirements-dev.txt @@ -1,6 +1,10 @@ pyspark +setuptools +boto3 flake8 pydocstyle pytest>=7.2.0 pytest-cov +pyparsing Faker +moto diff --git a/app/src/main.py b/app/src/main.py index 49b3d7c..398485d 100644 --- a/app/src/main.py +++ b/app/src/main.py @@ -16,9 +16,11 @@ # Bibliotecas utilizadas na construção do módulo from datetime import datetime + from pyspark.sql import DataFrame from pyspark.sql.functions import col, count, avg, sum,\ round, countDistinct, max, expr + from terraglue import GlueETLManager, log_config diff --git a/app/src/terraglue.py b/app/src/terraglue.py index 5412a67..b0136cf 100644 --- a/app/src/terraglue.py +++ b/app/src/terraglue.py @@ -39,9 +39,11 @@ import sys import logging from time import sleep + from pyspark.context import SparkContext from pyspark.sql import DataFrame from pyspark.sql.functions import lit, expr + from awsglue.utils import getResolvedOptions from awsglue.context import GlueContext from awsglue.job import Job @@ -218,14 +220,9 @@ def get_context_and_session(self) -> None: para os elementos SparkContext, GlueContext e SparkSession. """ logger.info("Criando SparkContext, GlueContext e SparkSession") - try: - self.sc = SparkContext.getOrCreate() - self.glueContext = GlueContext(self.sc) - self.spark = self.glueContext.spark_session - except Exception as e: - logger.error("Erro ao criar elementos de contexto e sessão " - f"da aplicação. Exception: {e}") - raise e + self.sc = SparkContext.getOrCreate() + self.glueContext = GlueContext(self.sc) + self.spark = self.glueContext.spark_session def init_job(self) -> Job: """ @@ -250,13 +247,10 @@ def init_job(self) -> Job: self.get_context_and_session() # Inicializando objeto de Job do Glue - try: - job = Job(self.glueContext) - job.init(self.args['JOB_NAME'], self.args) - return job - except Exception as e: - logger.error(f"Erro ao inicializar job do Glue. Exception: {e}") - raise e + job = Job(self.glueContext) + job.init(self.args['JOB_NAME'], self.args) + + return job # Classe para o gerenciamento de transformações Spark em um job @@ -467,16 +461,12 @@ def generate_dynamic_frames_dict(self) -> dict: logger.info("Mapeando DynamicFrames às chaves do dicionário") sleep(0.01) - try: - # Criando dicionário de Dynamic Frames - dynamic_dict = {k: dyf for k, dyf - in zip(self.data_dict.keys(), dynamic_frames)} - logger.info("Dados gerados com sucesso. Total de DynamicFrames: " - f"{len(dynamic_dict.values())}") - except Exception as e: - logger.error("Erro ao mapear DynamicFrames às chaves do " - f"dicionário de dados fornecido. Exception: {e}") - raise e + + # Criando dicionário de Dynamic Frames + dynamic_dict = {k: dyf for k, dyf + in zip(self.data_dict.keys(), dynamic_frames)} + logger.info("Dados gerados com sucesso. Total de DynamicFrames: " + f"{len(dynamic_dict.values())}") # Retornando dicionário de DynamicFrames sleep(0.01) @@ -688,6 +678,7 @@ def date_attributes_extraction(df: DataFrame, """ try: # Criando expressões de conversão com base no tipo do campo + date_col_type = date_col_type.strip().lower() if convert_string_to_date: if date_col_type == "date": conversion_expr = f"to_date({date_col},\ @@ -728,7 +719,7 @@ def date_attributes_extraction(df: DataFrame, return df except Exception as e: - logger.error('Erro ao adicionar coluns em DataFrame com' + logger.error('Erro ao adicionar colunas em DataFrame com' f'novos atributos de data. Exception: {e}') raise e diff --git a/app/tests/configs/inputs.py b/app/tests/configs/inputs.py new file mode 100644 index 0000000..616a32f --- /dev/null +++ b/app/tests/configs/inputs.py @@ -0,0 +1,152 @@ +""" +SCRIPT: configs/inputs.py + +CONTEXTO E OBJETIVO: +-------------------- +Arquivo de configuração de parâmetros e variáveis +utilizadas nos testes. O usuário deve atentar-se a todas +as configurações e declarações de variáveis aqui +realizadas para que os testes unitários possam ser +executados de maneira adequada. +---------------------------------------------------""" + +from pyspark.sql.types import StructType, StructField,\ + IntegerType, StringType, BooleanType, DecimalType,\ + DateType, TimestampType + + +"""--------------------------------------------------- +------ 1. DEFININDO PARÂMETROS DE CONFIGURAÇÃO ------- + 1.1 Argumentos do job e entradas do usuário +---------------------------------------------------""" + +# Argumentos obrigatórios do job a serem validados +JOB_REQUIRED_ARGS = [ + "JOB_NAME", + "OUTPUT_BUCKET", + "OUTPUT_DB", + "OUTPUT_TABLE", + "CONNECTION_TYPE", + "UPDATE_BEHAVIOR", + "DATA_FORMAT", + "COMPRESSION", + "ENABLE_UPDATE_CATALOG" +] + +# Nome da variável terraform onde os demais parâmetros são declarados +TF_VAR_NAME_JOB_ARGS = "glue_job_user_arguments" + +# Lista de argumentos do job definidos em tempo de execução +JOB_RUNTIME_ARGS = ["JOB_NAME", "OUTPUT_BUCKET"] + +# Lista de chaves obrigatórias da variável DATA_DICT em main.py +DATA_DICT_REQUIRED_KEYS = ["database", "table_name", "transformation_ctx"] + + +"""--------------------------------------------------- +------ 2. DEFININDO PARÂMETROS DE CONFIGURAÇÃO ------- + 2.2 Parâmetros para mock de DataFrame Spark +---------------------------------------------------""" + +# Schema para criação de DataFrame fictício Spark +FAKE_DATAFRAME_SCHEMA = StructType([ + StructField("id", StringType()), + StructField("value", IntegerType()), + StructField("decimal", DecimalType()), + StructField("boolean", BooleanType()), + StructField("date", DateType()), + StructField("timestamp", TimestampType()) +]) + + +"""--------------------------------------------------- +------ 2. DEFININDO PARÂMETROS DE CONFIGURAÇÃO ------- + 2.3 Parâmetros para mock no Data Catalog +---------------------------------------------------""" + +# Input para mock de banco de dados +FAKE_CATALOG_DATABASE_INPUT = { + "Name": "db_fake", + "Description": "a fake database", + "LocationUri": "s3://bucket-fake/db_fake", + "Parameters": {}, + "CreateTableDefaultPermissions": [ + { + "Principal": {"DataLakePrincipalIdentifier": "a_fake_owner"}, + "Permissions": ["ALL"], + }, + ], +} + +# Input para mock de tabelas no catálogo +FAKE_CATALOG_TABLE_INPUT = { + "Name": "tbl_fake", + "Description": "Entrada para tabela db_fake.tbl_fake contendo " + "metadados mockados para uso em testes unitários", + "Retention": 0, + "StorageDescriptor": { + "Columns": [ + { + "Name": "fake_col_1", + "Type": "string", + "Comment": "", + "Parameters": {} + }, + { + "Name": "fake_col_2", + "Type": "string", + "Comment": "", + "Parameters": {} + }, + { + "Name": "fake_col_3", + "Type": "string", + "Comment": "", + "Parameters": {} + }, + { + "Name": "fake_col_4", + "Type": "string", + "Comment": "", + "Parameters": {} + }, + { + "Name": "fake_col_5", + "Type": "string", + "Comment": "", + "Parameters": {} + } + ], + "Location": "s3://bucket-fake/db_fake/tbl_fake", + "InputFormat": "org.apache.hadoop.hive.ql.io.parquet" + ".MapredParquetInputFormat", + "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet." + "MapredParquetOutputFormat", + "Compressed": False, + "NumberOfBuckets": 0, + "SerdeInfo": { + "Name": "main-stream", + "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet." + "serde.ParquetHiveSerDe", + }, + "BucketColumns": [], + "SortColumns": [], + "Parameters": {}, + "StoredAsSubDirectories": False + }, + "PartitionKeys": [], + "TableType": "EXTERNAL_TABLE", + "Parameters": { + "EXTERNAL": "TRUE" + } +} + +# Simulação de data_dict com recursos mockados +FAKE_DATA_DICT = { + "fake": { + "database": "db_fake", + "table_name": "tbl_fake", + "transformation_ctx": "dyf_fake", + "create_temp_view": False + } +} diff --git a/app/tests/conftest.py b/app/tests/conftest.py index f2c1552..c8c904d 100644 --- a/app/tests/conftest.py +++ b/app/tests/conftest.py @@ -21,15 +21,22 @@ # Importando módulos para uso import sys import os -from pytest import fixture + +import pytest +from faker import Faker + +from pyspark.sql import SparkSession +from pyspark.sql.types import StructType + from src.main import ARGV_LIST, DATA_DICT, GlueTransformationManager from src.terraglue import GlueJobManager, GlueETLManager -from pyspark.sql import SparkSession -from pyspark.sql.types import StructType, StructField, StringType,\ - IntegerType, DateType, TimestampType -from tests.utils.iac_helper import extract_map_variable_from_ferraform -from tests.utils.spark_helper import generate_fake_spark_dataframe -from faker import Faker + +from tests.helpers.iac_helper import extract_map_variable_from_ferraform +from tests.helpers.spark_helper import generate_fake_spark_dataframe + +from tests.configs.inputs import JOB_REQUIRED_ARGS, TF_VAR_NAME_JOB_ARGS,\ + JOB_RUNTIME_ARGS, DATA_DICT_REQUIRED_KEYS, FAKE_DATAFRAME_SCHEMA + # Instanciando objeto Faker faker = Faker() @@ -43,55 +50,41 @@ # Variável ARGV_LIST definida no código da aplicação -@fixture() +@pytest.fixture() def argv_list() -> list: return ARGV_LIST # Lista drgumentos obrigatórios definidos pelo usuário -@fixture() +@pytest.fixture() def user_required_args() -> list: - return [ - "JOB_NAME", - "OUTPUT_BUCKET", - "OUTPUT_DB", - "OUTPUT_TABLE", - "CONNECTION_TYPE", - "UPDATE_BEHAVIOR", - "DATA_FORMAT", - "COMPRESSION", - "ENABLE_UPDATE_CATALOG" - ] + return JOB_REQUIRED_ARGS # Dicionário de argumentos definidos pelo usuário no Terraform -@fixture() +@pytest.fixture() def iac_job_user_args() -> dict: return extract_map_variable_from_ferraform( - var_name="glue_job_user_arguments" + var_name=TF_VAR_NAME_JOB_ARGS ) # Lista de argumentos do job criados apenas em tempo de execução -@fixture() +@pytest.fixture() def job_runtime_args() -> list: - return ["JOB_NAME", "OUTPUT_BUCKET"] + return JOB_RUNTIME_ARGS # Variável DATA_DICT definida no código da aplicação -@fixture() +@pytest.fixture() def data_dict() -> dict: return DATA_DICT # Lista de chaves obrigatórias da variável DATA_DICT -@fixture() +@pytest.fixture() def required_data_dict_keys() -> list: - return [ - "database", - "table_name", - "transformation_ctx" - ] + return DATA_DICT_REQUIRED_KEYS """--------------------------------------------------- @@ -100,8 +93,8 @@ def required_data_dict_keys() -> list: ---------------------------------------------------""" -# Argumentos do job utilizado para instância de classe para testes -@fixture() +# Argumentos do job utilizados para instância de classe para testes +@pytest.fixture() def job_args_for_testing(iac_job_user_args, job_runtime_args) -> dict: # Simulando argumentos obtidos apenas em tempo de execução iac_job_user_args_complete = dict(iac_job_user_args) @@ -112,9 +105,9 @@ def job_args_for_testing(iac_job_user_args, job_runtime_args) -> dict: # Objeto instanciado da classe GlueJobManager -@fixture() +@pytest.fixture() def job_manager(job_args_for_testing) -> GlueJobManager: - # Adicionando argumentos ao vetor de argumentos + # Adicionando argumentos ao vetor de argumentos do sistema for arg_name, arg_value in job_args_for_testing.items(): sys.argv.append(f"--{arg_name}={arg_value}") @@ -133,18 +126,19 @@ def job_manager(job_args_for_testing) -> GlueJobManager: # Objeto de sessão Spark para uso genérico -@fixture() +@pytest.fixture() def spark(): return SparkSession.builder.getOrCreate() # Objeto instanciado da classe GlueETLManager -@fixture() +@pytest.fixture() def etl_manager(job_args_for_testing): # Adicionando argumentos ao vetor de argumentos for arg_name, arg_value in job_args_for_testing.items(): sys.argv.append(f"--{arg_name}={arg_value}") + # Instanciando objeto etl_manager = GlueETLManager( argv_list=ARGV_LIST, data_dict=DATA_DICT @@ -153,21 +147,26 @@ def etl_manager(job_args_for_testing): return etl_manager +# Resultado da execução do método generate_dynamic_frames_dict +@pytest.fixture() +def dyf_dict(etl_manager): + etl_manager.init_job() + return etl_manager.generate_dynamic_frames_dict() + + +# Resultado da execução do método generate_dynamic_frames_dict +@pytest.fixture() +def df_dict(etl_manager): + etl_manager.init_job() + return etl_manager.generate_dataframes_dict() + + # DataFrame fake para testagem de transformações Spark -@fixture() +@pytest.fixture() def fake_dataframe(spark): - # Definindo schema para DataFrame fictício - schema = StructType([ - StructField("id", StringType()), - StructField("value", IntegerType()), - StructField("date", DateType()), - StructField("timestamp", TimestampType()) - ]) - - # Gerando DataFrame fictício return generate_fake_spark_dataframe( spark=spark, - schema_input=schema + schema_input=FAKE_DATAFRAME_SCHEMA ) @@ -178,7 +177,7 @@ def fake_dataframe(spark): # Objeto instanciado da classe GlueETLManager -@fixture() +@pytest.fixture() def glue_manager(job_args_for_testing): # Adicionando argumentos ao vetor de argumentos for arg_name, arg_value in job_args_for_testing.items(): @@ -193,7 +192,7 @@ def glue_manager(job_args_for_testing): # DataFrame vazio para teste de exceção dos métodos de transformação -@fixture() +@pytest.fixture() def empty_df(spark): # Criando RDD e schema vazios empty_rdd = spark.sparkContext.emptyRDD() @@ -204,7 +203,7 @@ def empty_df(spark): # Amostra de DataFrame df_orders -@fixture() +@pytest.fixture() def df_orders(spark): # Definindo variável para leitura do DataFrame filename = "sample_olist_orders_dataset.csv" @@ -223,13 +222,13 @@ def df_orders(spark): # Resultado do método de transformação df_orders_prep -@fixture() +@pytest.fixture() def df_orders_prep(glue_manager, df_orders): return glue_manager.transform_orders(df_orders) # Amostra de DataFrame df_order_items -@fixture() +@pytest.fixture() def df_order_items(spark): # Definindo variável para leitura do DataFrame filename = "sample_olist_order_items_dataset.csv" @@ -248,13 +247,13 @@ def df_order_items(spark): # Resultado do método de transformação df_order_items_prep -@fixture() +@pytest.fixture() def df_order_items_prep(glue_manager, df_order_items): return glue_manager.transform_order_items(df_order_items) # Amostra de DataFrame df_customers -@fixture() +@pytest.fixture() def df_customers(spark): # Definindo variável para leitura do DataFrame filename = "sample_olist_customers_dataset.csv" @@ -273,13 +272,13 @@ def df_customers(spark): # Resultado do método de transformação df_customers_prep -@fixture() +@pytest.fixture() def df_customers_prep(glue_manager, df_customers): return glue_manager.transform_customers(df_customers) # Amostra de DataFrame df_payments -@fixture() +@pytest.fixture() def df_payments(spark): # Definindo variável para leitura do DataFrame filename = "sample_olist_order_payments_dataset.csv" @@ -298,13 +297,13 @@ def df_payments(spark): # Resultado do método de transformação df_payments_prep -@fixture() +@pytest.fixture() def df_payments_prep(glue_manager, df_payments): return glue_manager.transform_payments(df_payments) # Amostra de DataFrame df_reviews -@fixture() +@pytest.fixture() def df_reviews(spark): # Definindo variável para leitura do DataFrame filename = "sample_olist_order_reviews_dataset.csv" @@ -323,13 +322,13 @@ def df_reviews(spark): # Resultado do método de transformação df_reviews_prep -@fixture() +@pytest.fixture() def df_reviews_prep(glue_manager, df_reviews): return glue_manager.transform_reviews(df_reviews) # Resultado do método de transformação df_sot_prep -@fixture() +@pytest.fixture() def df_sot_prep(glue_manager, df_orders_prep, df_order_items_prep, df_customers_prep, df_payments_prep, df_reviews_prep): diff --git a/app/tests/utils/iac_helper.py b/app/tests/helpers/iac_helper.py similarity index 99% rename from app/tests/utils/iac_helper.py rename to app/tests/helpers/iac_helper.py index f3e585f..fbbad25 100644 --- a/app/tests/utils/iac_helper.py +++ b/app/tests/helpers/iac_helper.py @@ -1,5 +1,5 @@ """ -MÓDULO: terraform_helper.py +MÓDULO: iac_helper.py CONTEXTO: --------- diff --git a/app/tests/utils/spark_helper.py b/app/tests/helpers/spark_helper.py similarity index 98% rename from app/tests/utils/spark_helper.py rename to app/tests/helpers/spark_helper.py index 7f6238e..3b17c14 100644 --- a/app/tests/utils/spark_helper.py +++ b/app/tests/helpers/spark_helper.py @@ -48,11 +48,6 @@ ---------------------------------------------------""" -# Criando e retornando objeto de sessão Spark -def create_spark_session() -> SparkSession: - return SparkSession.builder.getOrCreate() - - # Gerando schema Spark através de dicionário de metadados def generate_schema_from_dict(schema_dict: dict, nullable: bool = True) -> StructType(): diff --git a/app/tests/test_01_user_inputs.py b/app/tests/test_01_user_inputs.py index 5be1642..81dad80 100644 --- a/app/tests/test_01_user_inputs.py +++ b/app/tests/test_01_user_inputs.py @@ -20,7 +20,7 @@ ---------------------------------------------------""" # Importando módulos para uso -from pytest import mark +import pytest """--------------------------------------------------- @@ -29,9 +29,9 @@ ---------------------------------------------------""" -@mark.user_input -@mark.argv_list -def test_variavel_argv_list_definida_como_uma_lista(argv_list): +@pytest.mark.user_input +@pytest.mark.argv_list +def test_variavel_argvlist_definida_como_uma_lista(argv_list): """ G: dado que o usuário iniciou a codificação do seu job Glue W: quando a variável ARGV_LIST for definida/adaptada pelo usuário @@ -40,8 +40,8 @@ def test_variavel_argv_list_definida_como_uma_lista(argv_list): assert type(argv_list) == list -@mark.user_input -@mark.argv_list +@pytest.mark.user_input +@pytest.mark.argv_list def test_variavel_argvlist_possui_argumentos_obrigatorios(argv_list, user_required_args): """ @@ -52,9 +52,9 @@ def test_variavel_argvlist_possui_argumentos_obrigatorios(argv_list, assert all(arg in argv_list for arg in user_required_args) -@mark.user_input -@mark.argv_list -@mark.terraform +@pytest.mark.user_input +@pytest.mark.argv_list +@pytest.mark.terraform def test_variavel_argvlist_possui_argumentos_declarados_no_terraform( argv_list, iac_job_user_args, job_runtime_args ): @@ -86,9 +86,9 @@ def test_variavel_argvlist_possui_argumentos_declarados_no_terraform( assert set(argv_list_custom) == set(tf_args) -@mark.user_input -@mark.data_dict -def test_tipo_primitivo_variavel_datadict(data_dict): +@pytest.mark.user_input +@pytest.mark.data_dict +def test_variavel_datadict_definida_como_um_dicionario(data_dict): """ G: dado que o usuário iniciou a codificação do seu job Glue W: quando a variável DATA_DICT for definida/adaptada pelo usuário @@ -97,8 +97,8 @@ def test_tipo_primitivo_variavel_datadict(data_dict): assert type(data_dict) == dict -@mark.user_input -@mark.data_dict +@pytest.mark.user_input +@pytest.mark.data_dict def test_variavel_datadict_possui_chaves_obrigatorias(data_dict, required_data_dict_keys): """ diff --git a/app/tests/test_02_job_manager.py b/app/tests/test_02_job_manager.py index 15e2156..863d785 100644 --- a/app/tests/test_02_job_manager.py +++ b/app/tests/test_02_job_manager.py @@ -22,10 +22,12 @@ ---------------------------------------------------""" # Importando módulos para uso -from pytest import mark +import pytest + from pyspark.context import SparkContext -from awsglue.context import GlueContext from pyspark.sql import SparkSession + +from awsglue.context import GlueContext from awsglue.job import Job @@ -35,22 +37,37 @@ ---------------------------------------------------""" -@mark.job_manager +@pytest.mark.job_manager def test_captura_de_argumentos_do_job_na_construcao_da_classe( job_manager, job_args_for_testing ): """ - G: dado que o usuário instanciou um objeto da classe Job Manager + G: dado que o usuário instanciou um objeto da classe GlueJobManager W: quando o método construtor da classe for executado T: os argumentos passados pelo usuário no ato de criação de objeto devem estar contidos no dicionário de argumentos gerais do job - obtidos, na classe, a partir do método awsglue.utilsgetResolverOptions """ assert job_args_for_testing.items() <= job_manager.args.items() -@mark.job_manager +@pytest.mark.job_manager +def test_erro_ao_executar_metodo_print_args(job_manager): + """ + G: dado que o usuário inicializou o objeto job_manager com + algum argumento de sistema nulo + W: quando o método print_args() for executado + T: então uma exceção deve ser lançada + """ + # Modificando argumentos do sistema salvos no objeto da classe + job_manager.args = None + + with pytest.raises(Exception): + # Chamando método para print de argumentos no log + job_manager.print_args() + + +@pytest.mark.job_manager def test_obtencao_de_elementos_de_contexto_e_sessao(job_manager): """ G: dado que o usuário deseja obter os elementos de contexto e sessão no Job @@ -58,7 +75,6 @@ def test_obtencao_de_elementos_de_contexto_e_sessao(job_manager): T: então os atributos self.sc, self.glueContext e self.spark devem existir na classe GlueJobManager """ - # Executando método de coleta de elementos de contexto e sessão do job job_manager.get_context_and_session() @@ -66,11 +82,11 @@ def test_obtencao_de_elementos_de_contexto_e_sessao(job_manager): class_attribs = job_manager.__dict__ attribs_names = ["sc", "glueContext", "spark"] - # Validando existência de atributos da classe + # Validando se a lista de atributos existem na classe assert all(a in list(class_attribs.keys()) for a in attribs_names) -@mark.job_manager +@pytest.mark.job_manager def test_tipos_primitivos_de_elementos_de_contexto_e_sessao(job_manager): """ G: dado que o usuário deseja obter os elementos de contexto e sessão no Job @@ -79,7 +95,6 @@ def test_tipos_primitivos_de_elementos_de_contexto_e_sessao(job_manager): representar elementos do tipo SparkContext, GlueContext e SparkSession, respectivamente """ - # Executando método de coleta de elementos de contexto e sessão do job job_manager.get_context_and_session() @@ -92,7 +107,7 @@ def test_tipos_primitivos_de_elementos_de_contexto_e_sessao(job_manager): assert type(class_attribs["spark"]) == SparkSession -@mark.job_manager +@pytest.mark.job_manager def test_metodo_de_inicializacao_do_job_gera_contexto_e_sessao(job_manager): """ G: dado que deseja-se inicializar um job Glue pela classe GlueJobManager @@ -100,7 +115,6 @@ def test_metodo_de_inicializacao_do_job_gera_contexto_e_sessao(job_manager): T: então os elementos de contexto e sessão (Spark e Glue) devem existir e apresentarem os tipos primitivos adequados """ - # Executando método de inicialização do job _ = job_manager.init_job() @@ -117,13 +131,11 @@ def test_metodo_de_inicializacao_do_job_gera_contexto_e_sessao(job_manager): assert type(class_attribs["spark"]) == SparkSession -@mark.job_manager +@pytest.mark.job_manager def test_metodo_de_inicializacao_do_job_retorna_tipo_job(job_manager): """ G: dado que deseja-se inicializar um job Glue pela classe GlueJobManager W: quando o método init_job() for chamado T: então o retorno deve ser um objeto do tipo awsglue.job.Job """ - - # Executando método de inicialização do job assert type(job_manager.init_job()) == Job diff --git a/app/tests/test_03_etl_manager.py b/app/tests/test_03_etl_manager.py index 1451f61..79a7c72 100644 --- a/app/tests/test_03_etl_manager.py +++ b/app/tests/test_03_etl_manager.py @@ -21,10 +21,14 @@ 1.1 Importação das bibliotecas ---------------------------------------------------""" -# Importando módulos para uso -from pytest import mark +import pytest from datetime import datetime +import copy + from pyspark.sql.types import StringType, DateType, TimestampType +from pyspark.sql import DataFrame + +from awsglue.dynamicframe import DynamicFrame """--------------------------------------------------- @@ -33,8 +37,133 @@ ---------------------------------------------------""" -@mark.etl_manager -@mark.date_attributes_extraction +@pytest.mark.etl_manager +@pytest.mark.generate_dynamicframe_dict +def test_metodo_de_geracao_de_dynamicframes_gera_dicionario( + dyf_dict +): + """ + G: dado que o usuário deseja realizar a leitura de dynamicframes + com base na variável DATA_DICT devidamente definida + W: quando o método generate_dynamic_frames_dict() for executado + T: então o objeto resultante deve ser um dicionário + """ + assert type(dyf_dict) == dict + + +@pytest.mark.etl_manager +@pytest.mark.generate_dynamicframe_dict +def test_dict_de_dyfs_possui_qtd_de_elementos_iguais_ao_dict_datadict( + dyf_dict, data_dict +): + """ + G: dado que o usuário deseja realizar a leitura de dynamicframes + com base na variável DATA_DICT devidamente definida + W: quando o método generate_dynamic_frames_dict() for executado + T: então a quantidade de elementos presente no dicionário de + dynamicframes resultante precisa ser igual à quantidade de + elementos do dicionário DATA_DICT + """ + assert len(dyf_dict) == len(data_dict) + + +@pytest.mark.etl_manager +@pytest.mark.generate_dynamicframe_dict +def test_tipo_primitivo_dos_elementos_do_dicionario_de_dynamicframes( + dyf_dict +): + """ + G: dado que o usuário deseja realizar a leitura de dynamicframes + com base na variável DATA_DICT devidamente definida + W: quando o método generate_dynamic_frames_dict() for executado + T: então os elementos que compõem o dicionário resultante precisam + ser do tipo DynamicFrame do Glue + """ + # Extraindo lista de tipos primitivos + dyfs = list(dyf_dict.values()) + assert all(type(dyf) == DynamicFrame for dyf in dyfs) + + +@pytest.mark.etl_manager +@pytest.mark.generate_dynamicframe_dict +def test_erro_ao_gerar_dynamicframes_de_tabelas_inexistentes_no_catalogo( + etl_manager +): + """ + G: dado que o usuário deseja realizar a leitura de dynamicframes + com base na variável DATA_DICT devidamente definida + W: quando o método generate_dynamic_frames_dict() for executado com + alguma inconsistência na definição da variável DATA_DICT que + indique a leitura de tabelas inexistentes no catálogo de dados ou + com alguma restrição de acesso do usuário + T: então uma exceção deve ser lançada + """ + # Copiando objeto + etl_manager_copy = copy.deepcopy(etl_manager) + + # Modificando atributo data_dict + data_dict_copy = etl_manager_copy.data_dict + data_dict_copy[list(data_dict_copy.keys())[0]]["database"] \ + = "a fake database" + + # Assimilando DATA_DICT modificado ao objeto da classe + etl_manager_copy.data_dict = data_dict_copy + etl_manager_copy.init_job() + + # Executando método de geração de DynamicFrames + with pytest.raises(Exception): + _ = etl_manager_copy.generate_dynamic_frames_dict() + + +@pytest.mark.etl_manager +@pytest.mark.generate_dataframe_dict +def test_metodo_de_geracao_de_dataframes_gera_dicionario( + df_dict +): + """ + G: dado que o usuário deseja realizar a leitura de dataframes + com base na variável DATA_DICT devidamente definida + W: quando o método generate_dataframes_dict() for executado + T: então o objeto resultante deve ser um dicionário + """ + assert type(df_dict) == dict + + +@pytest.mark.etl_manager +@pytest.mark.generate_dataframe_dict +def test_dict_de_dfs_possui_qtd_de_elementos_iguais_ao_dict_datadict( + df_dict, data_dict +): + """ + G: dado que o usuário deseja realizar a leitura de dataframes + com base na variável DATA_DICT devidamente definida + W: quando o método generate_dataframes_dict() for executado + T: então a quantidade de elementos presente no dicionário de + dataframes resultante precisa ser igual à quantidade de + elementos do dicionário DATA_DICT + """ + assert len(df_dict) == len(data_dict) + + +@pytest.mark.etl_manager +@pytest.mark.generate_dataframe_dict +def test_tipo_primitivo_dos_elementos_do_dicionario_de_dataframes( + df_dict +): + """ + G: dado que o usuário deseja realizar a leitura de dataframes + com base na variável DATA_DICT devidamente definida + W: quando o método generate_dataframes_dict() for executado + T: então os elementos que compõem o dicionário resultante precisam + ser do tipo DataFrame do Spark + """ + # Extraindo lista de tipos primitivos + dfs = list(df_dict.values()) + assert all(type(df) == DataFrame for df in dfs) + + +@pytest.mark.etl_manager +@pytest.mark.date_attributes_extraction def test_conversao_de_data_no_metodo_de_extracao_de_atributos_de_data( etl_manager, fake_dataframe, date_col="date", date_col_type="date", date_test_col_name="date_test", date_format='%Y-%m-%d' @@ -77,8 +206,8 @@ def test_conversao_de_data_no_metodo_de_extracao_de_atributos_de_data( assert dtype_pos_casting == DateType() -@mark.etl_manager -@mark.date_attributes_extraction +@pytest.mark.etl_manager +@pytest.mark.date_attributes_extraction def test_conversao_de_timestamp_no_metodo_de_extracao_de_atributos_de_data( etl_manager, fake_dataframe, date_col="timestamp", date_col_type="timestamp", date_test_col_name="timestamp_test", @@ -122,8 +251,8 @@ def test_conversao_de_timestamp_no_metodo_de_extracao_de_atributos_de_data( assert dtype_pos_casting == TimestampType() -@mark.etl_manager -@mark.date_attributes_extraction +@pytest.mark.etl_manager +@pytest.mark.date_attributes_extraction def test_adicao_de_novas_colunas_apos_extracao_de_atributos_de_data( etl_manager, fake_dataframe, date_col="date" ): @@ -160,8 +289,8 @@ def test_adicao_de_novas_colunas_apos_extracao_de_atributos_de_data( assert all(c in df_date.schema.fieldNames() for c in date_attribs_names) -@mark.etl_manager -@mark.date_attributes_extraction +@pytest.mark.etl_manager +@pytest.mark.date_attributes_extraction def test_extracao_do_ano_de_atributo_de_data( etl_manager, fake_dataframe, date_col="date" ): @@ -197,8 +326,8 @@ def test_extracao_do_ano_de_atributo_de_data( assert calculated_years[:] == expected_years[:] -@mark.etl_manager -@mark.date_attributes_extraction +@pytest.mark.etl_manager +@pytest.mark.date_attributes_extraction def test_extracao_do_mes_de_atributo_de_data( etl_manager, fake_dataframe, date_col="date" ): @@ -234,8 +363,8 @@ def test_extracao_do_mes_de_atributo_de_data( assert expected_months[:] == calculated_months[:] -@mark.etl_manager -@mark.date_attributes_extraction +@pytest.mark.etl_manager +@pytest.mark.date_attributes_extraction def test_extracao_do_dia_de_atributo_de_data( etl_manager, fake_dataframe, date_col="date" ): @@ -271,8 +400,53 @@ def test_extracao_do_dia_de_atributo_de_data( assert expected_days[:] == calculated_days[:] -@mark.etl_manager -@mark.add_partition +@pytest.mark.etl_manager +@pytest.mark.date_attributes_extraction +def test_erro_ao_fornecer_argumento_date_col_type_incorreto( + etl_manager, fake_dataframe, date_col="date" +): + """ + G: dado que o usuário deseja aplicar o método de extração de + atributos de data em uma operação de transformação de dados + W: quando o método método date_attributes_extraction() da classe + GlueETLManager for executado em uma coluna de data com o + argumento date_col_type não configurado como 'date' ou 'timestamp' + T: então uma exceção deve ser lançada + """ + with pytest.raises(Exception): + _ = etl_manager.date_attributes_extraction( + df=fake_dataframe, + date_col=date_col, + convert_string_to_date=True, + date_col_type="string" + ) + + +@pytest.mark.etl_manager +@pytest.mark.date_attributes_extraction +def test_erro_ao_extrair_atributos_de_data_em_coluna_nao_compativel( + etl_manager, fake_dataframe, date_col="bool" +): + """ + G: dado que o usuário deseja aplicar o método de extração de + atributos de data em uma operação de transformação de dados + W: quando o método método date_attributes_extraction() da classe + GlueETLManager for executado em uma coluna não compatível ou + não transformável em data e com o flag convert_string_to_date + igual a True + T: então uma exceção deve ser lançada + """ + with pytest.raises(Exception): + _ = etl_manager.date_attributes_extraction( + df=fake_dataframe, + date_col=date_col, + convert_string_to_date=True, + date_col_type="date" + ) + + +@pytest.mark.etl_manager +@pytest.mark.add_partition def test_adicao_de_coluna_de_particao_anomesdia_dataframe( etl_manager, fake_dataframe, partition_name="anomesdia", partition_value=int(datetime.now().strftime("%Y%m%d")) @@ -287,7 +461,6 @@ def test_adicao_de_coluna_de_particao_anomesdia_dataframe( de nome "anomesdia" do mesmo tipo primitivo do argumento partition_value """ - # Executando método de adição de partição em DataFrame df_partitioned = etl_manager.add_partition( df=fake_dataframe, @@ -306,8 +479,29 @@ def test_adicao_de_coluna_de_particao_anomesdia_dataframe( assert generated_partition_value == partition_value -@mark.etl_manager -@mark.repartition_dataframe +@pytest.mark.etl_manager +@pytest.mark.add_partition +def test_erro_ao_adicionar_particao_com_nome_de_particao_invalido( + etl_manager, fake_dataframe, partition_name=None, + partition_value="1" +): + """ + G: dado que o usuário deseja adicionar uma coluna de data + para servir de partição de seu DataFrame Spark + W: quando o usuário executar o método add_partition da classe + GlueETLManager com o argumento partition_name igual a None + T: então uma exceção deve ser lançada + """ + with pytest.raises(Exception): + _ = etl_manager.add_partition( + df=fake_dataframe, + partition_name=partition_name, + partition_value=partition_value + ) + + +@pytest.mark.etl_manager +@pytest.mark.repartition_dataframe def test_reparticionamento_de_dataframe_para_menos_particoes( etl_manager, fake_dataframe ): @@ -320,7 +514,6 @@ def test_reparticionamento_de_dataframe_para_menos_particoes( T: então o DataFrame resultante deverá conter o número especificado de partições """ - # Coletando informações sobre partições atuais do DataFrame current_partitions = fake_dataframe.rdd.getNumPartitions() partitions_to_set = current_partitions // 2 @@ -335,8 +528,8 @@ def test_reparticionamento_de_dataframe_para_menos_particoes( assert df_repartitioned.rdd.getNumPartitions() == partitions_to_set -@mark.etl_manager -@mark.repartition_dataframe +@pytest.mark.etl_manager +@pytest.mark.repartition_dataframe def test_reparticionamento_de_dataframe_para_mais_particoes( etl_manager, fake_dataframe ): @@ -349,7 +542,6 @@ def test_reparticionamento_de_dataframe_para_mais_particoes( T: então o DataFrame resultante deverá conter o número especificado de partições """ - # Coletando informações sobre partições atuais do DataFrame current_partitions = fake_dataframe.rdd.getNumPartitions() partitions_to_set = current_partitions * 2 @@ -362,3 +554,60 @@ def test_reparticionamento_de_dataframe_para_mais_particoes( # Validando resultado assert df_repartitioned.rdd.getNumPartitions() == partitions_to_set + + +@pytest.mark.etl_manager +@pytest.mark.repartition_dataframe +def test_reparticionamento_de_dataframe_para_mesmo_numero_de_particoes( + etl_manager, fake_dataframe +): + """ + G: dado que o usuário deseja reparticionar um DataFrame Spark + para otimização do armazenamento físico do mesmo no s3 + W: quando o usuário executar o método repartition_dataframe() + da classe GlueETLManager passando um número IGUAL de + partições físicas do que o número atual do DataFrame + T: então nenhuma operação deve ser realizada e o mesmo DataFrame + passado como input deve ser retornado com o número de + partições intacto + """ + # Coletando informações sobre partições atuais do DataFrame + current_partitions = fake_dataframe.rdd.getNumPartitions() + partitions_to_set = current_partitions + + # Repartitionando DataFrame + df_repartitioned = etl_manager.repartition_dataframe( + df=fake_dataframe, + num_partitions=partitions_to_set + ) + + # Validando resultado + assert df_repartitioned.rdd.getNumPartitions() == partitions_to_set + + +@pytest.mark.etl_manager +@pytest.mark.repartition_dataframe +def test_erro_ao_repartitionar_dataframe_com_numero_invalido_de_particoes( + etl_manager, fake_dataframe, partitions_to_set=-1 +): + """ + G: dado que o usuário deseja reparticionar um DataFrame Spark + para otimização do armazenamento físico do mesmo no s3 + W: quando o usuário executar o método repartition_dataframe() + da classe GlueETLManager passando um número negativo de + partições + T: então nenhuma operação deve ser realizada e o mesmo DataFrame + passado como input deve ser retornado com o número de + partições intacto + """ + # Coletando informações sobre partições atuais do DataFrame + current_partitions = fake_dataframe.rdd.getNumPartitions() + + # Repartitionando DataFrame + df_repartitioned = etl_manager.repartition_dataframe( + df=fake_dataframe, + num_partitions=partitions_to_set + ) + + # Validando resultado + assert df_repartitioned.rdd.getNumPartitions() == current_partitions diff --git a/app/tests/test_04_main.py b/app/tests/test_04_main.py index ad505e0..2ab5865 100644 --- a/app/tests/test_04_main.py +++ b/app/tests/test_04_main.py @@ -23,7 +23,6 @@ # Importando módulos para uso import pytest -from pytest import mark from pyspark.sql.types import StructType, StructField,\ StringType, IntegerType, DateType, TimestampType,\ LongType, DecimalType, DoubleType @@ -35,8 +34,8 @@ ---------------------------------------------------""" -@mark.main -@mark.orders +@pytest.mark.main +@pytest.mark.orders def test_qtd_linhas_resultantes_pos_transformacao_orders( df_orders, df_orders_prep ): @@ -52,8 +51,8 @@ def test_qtd_linhas_resultantes_pos_transformacao_orders( assert df_orders_prep.count() == df_orders.count() -@mark.main -@mark.orders +@pytest.mark.main +@pytest.mark.orders def test_schema_resultante_pos_transformacao_orders( df_orders_prep ): @@ -88,8 +87,8 @@ def test_schema_resultante_pos_transformacao_orders( assert df_orders_prep.schema == expected_schema -@mark.main -@mark.orders +@pytest.mark.main +@pytest.mark.orders def test_erro_criacao_de_dag_transformacao_orders( glue_manager, empty_df ): @@ -108,8 +107,8 @@ def test_erro_criacao_de_dag_transformacao_orders( _ = glue_manager.transform_orders(empty_df) -@mark.main -@mark.order_items +@pytest.mark.main +@pytest.mark.order_items def test_qtd_linhas_resultantes_pos_transformacao_order_items( df_order_items_prep ): @@ -125,8 +124,8 @@ def test_qtd_linhas_resultantes_pos_transformacao_order_items( assert df_order_items_prep.count() == 10 -@mark.main -@mark.order_items +@pytest.mark.main +@pytest.mark.order_items def test_nao_duplicidade_de_order_id_pos_transformacao_order_items( df_order_items_prep ): @@ -146,8 +145,8 @@ def test_nao_duplicidade_de_order_id_pos_transformacao_order_items( assert lines_distinct == lines -@mark.main -@mark.order_items +@pytest.mark.main +@pytest.mark.order_items def test_schema_resultante_pos_transformacao_order_items( df_order_items_prep ): @@ -175,8 +174,8 @@ def test_schema_resultante_pos_transformacao_order_items( assert df_order_items_prep.schema == expected_schema -@mark.main -@mark.order_items +@pytest.mark.main +@pytest.mark.order_items def test_erro_criacao_de_dag_transformacao_order_items( glue_manager, empty_df ): @@ -195,8 +194,8 @@ def test_erro_criacao_de_dag_transformacao_order_items( _ = glue_manager.transform_order_items(empty_df) -@mark.main -@mark.customers +@pytest.mark.main +@pytest.mark.customers def test_qtd_linhas_resultantes_pos_transformacao_customers( df_customers, df_customers_prep ): @@ -212,8 +211,8 @@ def test_qtd_linhas_resultantes_pos_transformacao_customers( assert df_customers_prep.count() == df_customers.count() -@mark.main -@mark.customers +@pytest.mark.main +@pytest.mark.customers def test_nao_duplicidade_de_customer_id_pos_transformacao_customers( df_customers_prep ): @@ -233,8 +232,8 @@ def test_nao_duplicidade_de_customer_id_pos_transformacao_customers( assert lines_distinct == lines -@mark.main -@mark.customers +@pytest.mark.main +@pytest.mark.customers def test_schema_resultante_pos_transformacao_customers( df_customers_prep ): @@ -257,8 +256,8 @@ def test_schema_resultante_pos_transformacao_customers( assert df_customers_prep.schema == expected_schema -@mark.main -@mark.customers +@pytest.mark.main +@pytest.mark.customers def test_erro_criacao_de_dag_transformacao_customers( glue_manager, empty_df ): @@ -277,8 +276,8 @@ def test_erro_criacao_de_dag_transformacao_customers( _ = glue_manager.transform_customers(empty_df) -@mark.main -@mark.payments +@pytest.mark.main +@pytest.mark.payments def test_qtd_linhas_resultantes_pos_transformacao_payments( df_payments_prep ): @@ -294,8 +293,8 @@ def test_qtd_linhas_resultantes_pos_transformacao_payments( assert df_payments_prep.count() == 10 -@mark.main -@mark.payments +@pytest.mark.main +@pytest.mark.payments def test_nao_duplicidade_de_order_id_pos_transformacao_payments( df_payments_prep ): @@ -315,8 +314,8 @@ def test_nao_duplicidade_de_order_id_pos_transformacao_payments( assert lines_distinct == lines -@mark.main -@mark.payments +@pytest.mark.main +@pytest.mark.payments def test_schema_resultante_pos_transformacao_payments( df_payments_prep ): @@ -342,8 +341,8 @@ def test_schema_resultante_pos_transformacao_payments( assert df_payments_prep.schema == expected_schema -@mark.main -@mark.payments +@pytest.mark.main +@pytest.mark.payments def test_erro_criacao_de_dag_transformacao_payments( glue_manager, empty_df ): @@ -362,8 +361,8 @@ def test_erro_criacao_de_dag_transformacao_payments( _ = glue_manager.transform_payments(empty_df) -@mark.main -@mark.reviews +@pytest.mark.main +@pytest.mark.reviews def test_qtd_linhas_resultantes_pos_transformacao_reviews( df_reviews_prep ): @@ -379,8 +378,8 @@ def test_qtd_linhas_resultantes_pos_transformacao_reviews( assert df_reviews_prep.count() == 10 -@mark.main -@mark.reviews +@pytest.mark.main +@pytest.mark.reviews def test_nao_duplicidade_de_order_id_pos_transformacao_reviews( df_reviews_prep ): @@ -400,8 +399,8 @@ def test_nao_duplicidade_de_order_id_pos_transformacao_reviews( assert lines_distinct == lines -@mark.main -@mark.reviews +@pytest.mark.main +@pytest.mark.reviews def test_schema_resultante_pos_transformacao_reviews( df_reviews_prep ): @@ -424,8 +423,8 @@ def test_schema_resultante_pos_transformacao_reviews( assert df_reviews_prep.schema == expected_schema -@mark.main -@mark.reviews +@pytest.mark.main +@pytest.mark.reviews def test_erro_criacao_de_dag_transformacao_reviews( glue_manager, empty_df ): @@ -444,8 +443,8 @@ def test_erro_criacao_de_dag_transformacao_reviews( _ = glue_manager.transform_reviews(empty_df) -@mark.main -@mark.sot +@pytest.mark.main +@pytest.mark.sot def test_qtd_linhas_resultantes_pos_transformacao_sot( df_sot_prep ): @@ -461,8 +460,8 @@ def test_qtd_linhas_resultantes_pos_transformacao_sot( assert df_sot_prep.count() == 10 -@mark.main -@mark.sot +@pytest.mark.main +@pytest.mark.sot def test_nao_duplicidade_de_order_id_pos_transformacao_sot( df_sot_prep ): @@ -482,8 +481,8 @@ def test_nao_duplicidade_de_order_id_pos_transformacao_sot( assert lines_distinct == lines -@mark.main -@mark.sot +@pytest.mark.main +@pytest.mark.sot def test_schema_resultante_pos_transformacao_sot( df_sot_prep ): @@ -536,8 +535,8 @@ def test_schema_resultante_pos_transformacao_sot( assert df_sot_prep.schema == expected_schema -@mark.main -@mark.sot +@pytest.mark.main +@pytest.mark.sot def test_erro_criacao_de_dag_transformacao_sot( glue_manager, empty_df ): @@ -553,4 +552,10 @@ def test_erro_criacao_de_dag_transformacao_sot( # Testando exceção with pytest.raises(Exception): - _ = glue_manager.transform_sot(empty_df) + _ = glue_manager.transform_sot( + df_orders_prep=empty_df, + df_order_items_prep=empty_df, + df_customers_prep=empty_df, + df_payments_prep=empty_df, + df_reviews_prep=empty_df + )