Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dados] br_rf_arrecadacao #786

Merged
merged 13 commits into from
Nov 7, 2024
24 changes: 24 additions & 0 deletions models/br_rf_arrecadacao/br_rf_arrecadacao__cnae.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{{ config(alias="cnae", schema="br_rf_arrecadacao") }}
-- Typed projection of the `cnae` staging table: every column is safe_cast to
-- its final type and keeps its staging name.
select
    -- period and section identifiers
    safe_cast(ano as int64) as ano,
    safe_cast(mes as int64) as mes,
    safe_cast(secao_sigla as string) as secao_sigla,
    safe_cast(secao_nome as string) as secao_nome,
    -- all remaining columns are cast to float64
    safe_cast(imposto_importacao as float64) as imposto_importacao,
    safe_cast(imposto_exportacao as float64) as imposto_exportacao,
    safe_cast(ipi as float64) as ipi,
    safe_cast(irpf as float64) as irpf,
    safe_cast(irpj as float64) as irpj,
    safe_cast(irrf as float64) as irrf,
    safe_cast(iof as float64) as iof,
    safe_cast(itr as float64) as itr,
    safe_cast(cofins as float64) as cofins,
    safe_cast(pis_pasep as float64) as pis_pasep,
    safe_cast(csll as float64) as csll,
    safe_cast(cide_combustiveis as float64) as cide_combustiveis,
    safe_cast(contribuicao_previdenciaria as float64) as contribuicao_previdenciaria,
    safe_cast(cpsss as float64) as cpsss,
    safe_cast(pagamento_unificado as float64) as pagamento_unificado,
    safe_cast(outras_receitas_rfb as float64) as outras_receitas_rfb,
    safe_cast(demais_receitas as float64) as demais_receitas,
from `basedosdados-staging.br_rf_arrecadacao_staging.cnae` as t
13 changes: 13 additions & 0 deletions models/br_rf_arrecadacao/br_rf_arrecadacao__ir_ipi.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{{ config(alias="ir_ipi", schema="br_rf_arrecadacao") }}
-- Typed projection of the `ir_ipi` staging table: every column is safe_cast to
-- its final type and keeps its staging name.
select
    -- period and classification columns
    safe_cast(ano as int64) as ano,
    safe_cast(mes as int64) as mes,
    safe_cast(tributo as string) as tributo,
    safe_cast(decendio as string) as decendio,
    -- all remaining columns are cast to float64
    safe_cast(arrecadacao_bruta as float64) as arrecadacao_bruta,
    safe_cast(retificacao as float64) as retificacao,
    safe_cast(compensacao as float64) as compensacao,
    safe_cast(restituicao as float64) as restituicao,
    safe_cast(outros as float64) as outros,
    safe_cast(arrecadacao_liquida as float64) as arrecadacao_liquida,
from `basedosdados-staging.br_rf_arrecadacao_staging.ir_ipi` as t
9 changes: 9 additions & 0 deletions models/br_rf_arrecadacao/br_rf_arrecadacao__itr.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{{ config(alias="itr", schema="br_rf_arrecadacao") }}
-- Typed projection of the `itr` staging table: every column is safe_cast to
-- its final type and keeps its staging name.
select
    -- period
    safe_cast(ano as int64) as ano,
    safe_cast(mes as int64) as mes,
    -- location columns, kept as strings
    safe_cast(nome_uf as string) as nome_uf,
    safe_cast(regiao_politica as string) as regiao_politica,
    safe_cast(cidade_uf as string) as cidade_uf,
    -- collected amount
    safe_cast(valor_arrecadado as float64) as valor_arrecadado,
from `basedosdados-staging.br_rf_arrecadacao_staging.itr` as t
35 changes: 35 additions & 0 deletions models/br_rf_arrecadacao/br_rf_arrecadacao__natureza_juridica.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{{ config(alias="natureza_juridica", schema="br_rf_arrecadacao") }}
-- Typed projection of the `natureza_juridica` staging table. The staging code
-- is matched against the first three characters of the directory id so that
-- the output carries the full directory id as `natureza_juridica_codigo`.
with
    -- Maps the first three characters of each directory id to the full id.
    referencia_codigo as (
        select
            id_natureza_juridica,
            -- SUBSTR positions are 1-based; start at 1 explicitly.
            substr(cast(id_natureza_juridica as string), 1, 3) as inicio_codigo
        -- The project id contains a dash, so the path must stay inside
        -- backticks; unquoted it gets split into `basedosdados - staging`.
        from `basedosdados-staging.br_bd_diretorios_brasil.natureza_juridica`
    )
select
    safe_cast(t.ano as int64) as ano,
    safe_cast(t.mes as int64) as mes,
    -- full id from the directory; null when the staging code has no match
    safe_cast(
        referencia_codigo.id_natureza_juridica as string
    ) as natureza_juridica_codigo,
    safe_cast(t.natureza_juridica_nome as string) as natureza_juridica_nome,
    -- all remaining columns are cast to float64
    safe_cast(t.imposto_importacao as float64) as imposto_importacao,
    safe_cast(t.imposto_exportacao as float64) as imposto_exportacao,
    safe_cast(t.ipi as float64) as ipi,
    safe_cast(t.irpf as float64) as irpf,
    safe_cast(t.irpj as float64) as irpj,
    safe_cast(t.irrf as float64) as irrf,
    safe_cast(t.iof as float64) as iof,
    safe_cast(t.itr as float64) as itr,
    safe_cast(t.cofins as float64) as cofins,
    safe_cast(t.pis_pasep as float64) as pis_pasep,
    safe_cast(t.csll as float64) as csll,
    safe_cast(t.cide_combustiveis as float64) as cide_combustiveis,
    safe_cast(t.contribuicao_previdenciaria as float64) as contribuicao_previdenciaria,
    safe_cast(t.cpsss as float64) as cpsss,
    safe_cast(t.pagamento_unificado as float64) as pagamento_unificado,
    safe_cast(t.outras_receitas_rfb as float64) as outras_receitas_rfb,
    safe_cast(t.demais_receitas as float64) as demais_receitas,
from `basedosdados-staging.br_rf_arrecadacao_staging.natureza_juridica` as t
left join
    referencia_codigo on t.natureza_juridica_codigo = referencia_codigo.inicio_codigo
49 changes: 49 additions & 0 deletions models/br_rf_arrecadacao/code/clean_cnae.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import os
import numpy as np
import pandas as pd
from clean_functions import *

def rename_columns(df):
    """Return a copy of ``df`` with the raw CNAE headers renamed to snake_case.

    Columns that are not listed in the mapping keep their original name.
    """
    header_pairs = [
        ('Ano', 'ano'),
        ('Mês', 'mes'),
        ('Seção - Sigla', 'secao_sigla'),
        ('Seção - Nome', 'secao_nome'),
        ('II', 'imposto_importacao'),
        ('IE', 'imposto_exportacao'),
        ('IPI', 'ipi'),
        ('IRPF', 'irpf'),
        ('IRPJ', 'irpj'),
        ('IRRF', 'irrf'),
        ('IOF', 'iof'),
        ('ITR', 'itr'),
        ('Cofins', 'cofins'),
        ('Pis/Pasep', 'pis_pasep'),
        ('CSLL', 'csll'),
        ('Cide', 'cide_combustiveis'),
        ('Contribuição Previdenciária', 'contribuicao_previdenciaria'),
        ('CPSSS', 'cpsss'),
        ('Pagamento Unificado', 'pagamento_unificado'),
        ('Outras Receitas Administradas', 'outras_receitas_rfb'),
        ('Receitas Não Administradas', 'demais_receitas'),
    ]
    renamed = df.rename(columns=dict(header_pairs))
    return renamed

def change_types(df):
    """Convert the renamed CNAE columns to numeric types, in place.

    ``ano`` becomes an integer, ``mes`` is converted by ``get_month_number``,
    and every column from the fifth onward is passed through
    ``replace_commas`` and ``remove_dots`` before being cast to float.
    The same (mutated) DataFrame is returned.
    """
    df['ano'] = df['ano'].astype('int')
    df['mes'] = get_month_number(df['mes'])

    # The first four columns are identifiers; the rest are monetary values.
    monetary_columns = df.columns[4:]
    for name in monetary_columns:
        decimal_fixed = df[name].apply(replace_commas)
        separators_fixed = decimal_fixed.apply(remove_dots)
        df[name] = separators_fixed.astype('float')

    return df

if __name__ == '__main__':
    # Load the raw CNAE export, drop empty columns and rows, standardise the
    # column names and types, then hand the result to save_data partitioned
    # by year and month.
    raw = read_data(file_dir='../input/arrecadacao-cnae.csv')
    trimmed = remove_empty_rows(remove_empty_columns(raw))
    cleaned = change_types(rename_columns(trimmed))
    save_data(
        df=cleaned,
        file_dir='../output/br_rf_arrecadacao_cnae',
        partition_cols=['ano', 'mes'],
    )
136 changes: 136 additions & 0 deletions models/br_rf_arrecadacao/code/clean_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import os
import numpy as np
import pandas as pd
from typing import List
from pathlib import Path

# Directory containing this module; read_data and save_data join their
# relative ``file_dir`` arguments onto it.
file_directory = os.path.dirname(__file__)

def read_data(file_dir, separator=';'):
    """Read a delimited text file into a DataFrame.

    Args:
        file_dir: Path of the file, joined onto this module's directory.
        separator: Field delimiter passed to ``pandas.read_csv``.

    Returns:
        The parsed DataFrame.
    """
    csv_path = os.path.join(file_directory, file_dir)
    frame = pd.read_csv(csv_path, sep=separator)
    return frame

def remove_empty_rows(df):
    """Return ``df`` without the rows in which every value is missing."""
    non_empty = df.dropna(axis='index', how='all')
    return non_empty

def remove_empty_columns(df):
    """Return ``df`` without the columns whose label contains 'Unnamed'."""
    unnamed_labels = df.filter(regex='Unnamed').columns.tolist()
    return df.drop(columns=unnamed_labels)

def replace_commas(value):
    """Return ``str(value)`` with its last comma turned into a dot.

    Any earlier commas are removed. A value without commas is returned as its
    plain string form.
    """
    text = str(value)
    before, last_comma, after = text.rpartition(',')
    if not last_comma:
        return text
    # Everything before the last comma loses its commas; the last one becomes
    # the decimal point.
    return before.replace(',', '') + '.' + after

def remove_dots(value):
    """Return ``str(value)`` keeping only its last dot.

    Values with zero or one dot are returned unchanged (as strings).
    """
    text = str(value)
    before, last_dot, after = text.rpartition('.')
    if not last_dot:
        return text
    # Dots ahead of the last one are dropped; the last one is kept as is.
    return before.replace('.', '') + last_dot + after

def get_month_number(month_column):
    """Convert a Series of Portuguese month names into integer month numbers.

    Only the first three characters of each value, lower-cased, are used for
    the lookup. Values that match no abbreviation are left as they are and
    must therefore already be castable to int.
    """
    abbreviations = (
        'jan', 'fev', 'mar', 'abr', 'mai', 'jun',
        'jul', 'ago', 'set', 'out', 'nov', 'dez',
    )
    # Replacement values are strings; the final cast turns them into integers.
    number_by_abbreviation = {
        abbreviation: str(position)
        for position, abbreviation in enumerate(abbreviations, start=1)
    }

    first_three = month_column.str.lower().str[:3]
    as_number_text = first_three.replace(number_by_abbreviation)
    return as_number_text.astype('int')

def to_partitions(
    data: pd.DataFrame,
    partition_columns: List[str],
    savepath: str,
    file_type: str = "csv",
):
    """Save data in a Hive partition layout, given a DataFrame and partition columns.

    One folder ``<col1>=<val1>/<col2>=<val2>/...`` is created under
    ``savepath`` for each distinct combination of partition values, and the
    matching rows (without the partition columns) are written inside it as
    ``data.csv`` or ``data.parquet``.

    CSV output is opened in append mode, so running this twice against the
    same ``savepath`` duplicates the rows. Parquet output is overwritten.

    Args:
        data (pandas.DataFrame): DataFrame to be partitioned.
        partition_columns (list): Columns to be used as partitions.
        savepath (str, pathlib.Path): Folder in which to save the partitions.
        file_type (str): ``"csv"`` (default) or ``"parquet"``.

    Raises:
        TypeError: If ``data`` is not a pandas DataFrame.
        ValueError: If ``file_type`` is neither ``"csv"`` nor ``"parquet"``.

    Example:
        data = {
            "ano": [2020, 2021, 2020, 2021, 2020, 2021, 2021, 2025],
            "mes": [1, 2, 3, 4, 5, 6, 6, 9],
            "sigla_uf": ["SP", "SP", "RJ", "RJ", "PR", "PR", "PR", "PR"],
            "dado": ["a", "b", "c", "d", "e", "f", "g", "h"],
        }
        to_partitions(
            data=pd.DataFrame(data),
            partition_columns=["ano", "mes", "sigla_uf"],
            savepath="partitions/",
        )
    """
    if not isinstance(data, pd.DataFrame):
        raise TypeError("Data need to be a pandas DataFrame")
    if file_type not in ("csv", "parquet"):
        # Previously an unknown type created the folders and wrote nothing.
        raise ValueError(
            f"Unsupported file_type {file_type!r}; expected 'csv' or 'parquet'"
        )

    savepath = Path(savepath)

    # One record per distinct combination of partition values, in order of
    # first appearance.
    unique_combinations = (
        data[partition_columns]
        .drop_duplicates(subset=partition_columns)
        .to_dict(orient="records")
    )

    for filter_combination in unique_combinations:
        partition_dirs = [
            f"{partition}={value}"
            for partition, value in filter_combination.items()
        ]

        # Select rows column by column. Testing every partition cell against
        # the pooled set of values would also match rows whose values are
        # merely swapped between columns (e.g. mes=2, dia=1 for mes=1, dia=2).
        mask = pd.Series(True, index=data.index)
        for column, value in filter_combination.items():
            column_values = data[column]
            if pd.isna(value):
                # NaN never equals itself, so missing keys are matched by isna().
                matches = column_values.isna()
            else:
                matches = column_values == value
            mask &= matches.to_numpy()
        df_filter = data.loc[mask.to_numpy()].drop(columns=partition_columns)

        # create folder tree
        filter_save_path = savepath.joinpath(*partition_dirs)
        filter_save_path.mkdir(parents=True, exist_ok=True)

        if file_type == "csv":
            # Append to the partition file; the header is written only when
            # the file does not exist yet.
            file_filter_save_path = filter_save_path / "data.csv"
            df_filter.to_csv(
                file_filter_save_path,
                sep=",",
                encoding="utf-8",
                na_rep="",
                index=False,
                mode="a",
                header=not file_filter_save_path.exists(),
            )
        else:
            # Parquet files are written whole (any existing file is replaced).
            file_filter_save_path = filter_save_path / "data.parquet"
            df_filter.to_parquet(
                file_filter_save_path, index=False, compression="gzip"
            )

def save_data(df, file_dir, partition_cols):
    """Write ``df`` as partitioned files under a folder relative to this module.

    Args:
        df: DataFrame to save.
        file_dir: Output folder, joined onto this module's directory.
        partition_cols: Columns passed to ``to_partitions`` as partition keys.
    """
    output_root = os.path.join(file_directory, file_dir)
    to_partitions(
        data=df,
        partition_columns=partition_cols,
        savepath=output_root,
    )
37 changes: 37 additions & 0 deletions models/br_rf_arrecadacao/code/clean_ir_ipi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import os
import numpy as np
import pandas as pd
from clean_functions import *

def rename_columns(df):
    """Return a copy of ``df`` with the raw IR/IPI headers renamed to snake_case.

    Columns that are not listed in the mapping keep their original name.
    """
    header_pairs = [
        ('Ano', 'ano'),
        ('Mês', 'mes'),
        ('Tributo', 'tributo'),
        ('Decêndio', 'decendio'),
        ('Arrecadação Bruta', 'arrecadacao_bruta'),
        ('Retificação', 'retificacao'),
        ('Compensação', 'compensacao'),
        ('Restituição', 'restituicao'),
        ('Outros', 'outros'),
        ('Arrecadação Líquida', 'arrecadacao_liquida'),
    ]
    renamed = df.rename(columns=dict(header_pairs))
    return renamed

def change_types(df):
    """Convert the renamed IR/IPI columns to numeric types, in place.

    ``ano`` becomes an integer, ``mes`` is converted by ``get_month_number``,
    and every column from the fifth onward is passed through
    ``replace_commas`` and ``remove_dots`` before being cast to float.
    The same (mutated) DataFrame is returned.
    """
    df['ano'] = df['ano'].astype('int')
    df['mes'] = get_month_number(df['mes'])

    # The first four columns are identifiers; the rest are monetary values.
    monetary_columns = df.columns[4:]
    for name in monetary_columns:
        decimal_fixed = df[name].apply(replace_commas)
        separators_fixed = decimal_fixed.apply(remove_dots)
        df[name] = separators_fixed.astype('float')

    return df

if __name__ == '__main__':
    # Load the raw IR/IPI export, drop empty rows, standardise the column
    # names and types, then hand the result to save_data partitioned by year
    # and month.
    raw = read_data(file_dir='../input/arrecadacao-ir-ipi.csv')
    trimmed = remove_empty_rows(raw)
    cleaned = change_types(rename_columns(trimmed))
    save_data(
        df=cleaned,
        file_dir='../output/br_rf_arrecadacao_ir_ipi',
        partition_cols=['ano', 'mes'],
    )
30 changes: 30 additions & 0 deletions models/br_rf_arrecadacao/code/clean_itr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import os
import numpy as np
import pandas as pd
from clean_functions import *

def rename_columns(df):
    """Return a copy of ``df`` with the raw ITR headers renamed to snake_case.

    Columns that are not listed in the mapping keep their original name.
    """
    header_pairs = [
        ('Ano', 'ano'),
        ('Mês', 'mes'),
        ('Unidade da Federação', 'nome_uf'),
        ('Região Política', 'regiao_politica'),
        ('Cidade e UF', 'cidade_uf'),
        ('Valor', 'valor_arrecadado'),
    ]
    renamed = df.rename(columns=dict(header_pairs))
    return renamed

def change_types(df):
    """Convert the renamed ITR columns to numeric types, in place.

    ``ano`` becomes an integer, ``mes`` is converted by ``get_month_number``,
    and ``valor_arrecadado`` is passed through ``replace_commas`` and
    ``remove_dots`` before being cast to float. The same (mutated) DataFrame
    is returned.
    """
    df['ano'] = df['ano'].astype('int')
    df['mes'] = get_month_number(df['mes'])

    decimal_fixed = df['valor_arrecadado'].apply(replace_commas)
    separators_fixed = decimal_fixed.apply(remove_dots)
    df['valor_arrecadado'] = separators_fixed.astype('float')

    return df

if __name__ == '__main__':
    # Load the raw ITR export, drop empty rows, standardise the column names
    # and types, then hand the result to save_data partitioned by year and
    # month.
    raw = read_data(file_dir='../input/arrecadacao-itr.csv')
    trimmed = remove_empty_rows(raw)
    cleaned = change_types(rename_columns(trimmed))
    save_data(
        df=cleaned,
        file_dir='../output/br_rf_arrecadacao_itr',
        partition_cols=['ano', 'mes'],
    )
50 changes: 50 additions & 0 deletions models/br_rf_arrecadacao/code/clean_natureza_juridica.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import os
import numpy as np
import pandas as pd
from clean_functions import *

def rename_columns(df):
    """Return a copy of ``df`` with the raw legal-nature headers renamed to snake_case.

    Columns that are not listed in the mapping keep their original name.
    """
    header_pairs = [
        ('Ano', 'ano'),
        ('Mês', 'mes'),
        ('Natureza Jurídica - Código', 'natureza_juridica_codigo'),
        ('Natureza Jurídica - Nome', 'natureza_juridica_nome'),
        ('II', 'imposto_importacao'),
        ('IE', 'imposto_exportacao'),
        ('IPI', 'ipi'),
        ('IRPF', 'irpf'),
        ('IRPJ', 'irpj'),
        ('IRRF', 'irrf'),
        ('IOF', 'iof'),
        ('ITR', 'itr'),
        ('Cofins', 'cofins'),
        ('Pis/Pasep', 'pis_pasep'),
        ('CSLL', 'csll'),
        ('Cide', 'cide_combustiveis'),
        ('Contribuição Previdenciária', 'contribuicao_previdenciaria'),
        ('CPSSS', 'cpsss'),
        ('Pagamento Unificado', 'pagamento_unificado'),
        ('Outras Receitas Administradas', 'outras_receitas_rfb'),
        ('Receitas Não Administradas', 'demais_receitas'),
    ]
    renamed = df.rename(columns=dict(header_pairs))
    return renamed

def change_types(df):
    """Convert the renamed legal-nature columns to their final types, in place.

    ``ano`` becomes an integer, ``mes`` is converted by ``get_month_number``,
    ``natureza_juridica_nome`` is title-cased, and every column from the fifth
    onward is passed through ``replace_commas`` and ``remove_dots`` before
    being cast to float. The same (mutated) DataFrame is returned.
    """
    df['ano'] = df['ano'].astype('int')
    df['mes'] = get_month_number(df['mes'])

    title_cased = df['natureza_juridica_nome'].str.title()
    df['natureza_juridica_nome'] = title_cased

    # The first four columns are identifiers; the rest are monetary values.
    monetary_columns = df.columns[4:]
    for name in monetary_columns:
        decimal_fixed = df[name].apply(replace_commas)
        separators_fixed = decimal_fixed.apply(remove_dots)
        df[name] = separators_fixed.astype('float')

    return df

if __name__ == '__main__':
    # Load the raw legal-nature export, drop empty rows and columns,
    # standardise the column names and types, then hand the result to
    # save_data partitioned by year and month.
    raw = read_data(file_dir='../input/arrecadacao-natureza.csv')
    trimmed = remove_empty_columns(remove_empty_rows(raw))
    cleaned = change_types(rename_columns(trimmed))
    save_data(
        df=cleaned,
        file_dir='../output/br_rf_arrecadacao_natureza_juridica',
        partition_cols=['ano', 'mes'],
    )
Loading
Loading