Merge branch 'main' into update-br-inep-sinopse-educacao-basica
aspeddro authored Mar 28, 2024
2 parents 42984a1 + 746fb53 commit 675a088
Showing 42 changed files with 2,412 additions and 96 deletions.
20 changes: 10 additions & 10 deletions .github/workflows/scripts/table_approve.py
@@ -111,7 +111,7 @@ def push_table_to_bq(
    Dataset(dataset_id).update(mode="prod")
    delete_storage_path = file_path.replace("./downloaded_data/", "")
    print(
        f"DELETE HEADER FILE FROM basedosdados/staing/{dataset_id}_staging/{table_id}/{delete_storage_path}"
        f"DELETE HEADER FILE FROM basedosdados/staging/{dataset_id}_staging/{table_id}/{delete_storage_path}"
    )
    st = Storage(dataset_id=dataset_id, table_id=table_id)
    st.delete_file(filename=delete_storage_path, mode="staging")
@@ -146,27 +146,27 @@ def save_header_files(dataset_id, table_id):
print("Found blob: ", str(blob.name))
print("Renamed blob: ", blob_path)
break
### save table header in storage

print(f"DOWNLOAD HEADER FILE FROM basedosdados-dev.{dataset_id}_staging.{table_id}")
query = f"""
SELECT * FROM `basedosdados-dev.{dataset_id}_staging.{table_id}` LIMIT 1
"""
df = bd.read_sql(query, billing_project_id="basedosdados", from_file=True)
df = df.drop(columns=partitions)

file_name = blob_path.split("/")[-1]
file_type = file_name.split(".")[-1]

path = Path(blob_path.replace(f"/{file_name}", ""))
path.mkdir(parents=True, exist_ok=True)

### save table header in storage
if file_type == "csv":
print(f"DOWNLOAD HEADER FILE FROM basedosdados-dev.{dataset_id}_staging.{table_id}")
query = f"""
SELECT * FROM `basedosdados-dev.{dataset_id}_staging.{table_id}` LIMIT 1
"""
df = bd.read_sql(query, billing_project_id="basedosdados", from_file=True)
df = df.drop(columns=partitions)

file_path = f"./{path}/table_approve_temp_file_271828.csv"
df.to_csv(file_path, index=False)
elif file_type == "parquet":
file_path = f"./{path}/table_approve_temp_file_271828.parquet"
df.to_parquet(file_path)
blob.download_to_filename(file_path)
print("SAVE HEADER FILE: ", file_path)
return file_path

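For context, the save_header_files hunk above keeps a one-row "header" file for each table: a single row is pulled from the staging table with a LIMIT 1 query, partition columns are dropped, and the result is written next to the renamed blob. Below is a minimal sketch of the CSV branch, with a made-up DataFrame and placeholder paths standing in for what bd.read_sql and the blob path would provide; it is an illustration, not the project's code.

from pathlib import Path

import pandas as pd

# Stand-in for the single row returned by `SELECT * FROM <staging table> LIMIT 1`;
# the column names and values here are placeholders.
df = pd.DataFrame([{"ano": 2024, "sigla_uf": "SP", "valor": 1.0}])
partitions = ["ano", "sigla_uf"]  # partition columns are not kept in the header file
df = df.drop(columns=partitions)

# Placeholder for the directory derived from the blob path.
path = Path("./downloaded_data/example_dataset_staging/example_table")
path.mkdir(parents=True, exist_ok=True)

file_path = f"./{path}/table_approve_temp_file_271828.csv"
df.to_csv(file_path, index=False)
print("SAVE HEADER FILE: ", file_path)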
236 changes: 236 additions & 0 deletions .github/workflows/scripts/table_test.py
@@ -0,0 +1,236 @@
from argparse import ArgumentParser
from time import sleep
import re
from backend import Backend
from utils import expand_alls, get_datasets_tables_from_modified_files


def get_flow_run_state(flow_run_id: str, backend: Backend, auth_token: str):
    query = """
        query ($flow_run_id: uuid!) {
            flow_run_by_pk (id: $flow_run_id) {
                state
            }
        }
    """
    response = backend._execute_query(
        query,
        variables={"flow_run_id": flow_run_id},
        headers={"Authorization": f"Bearer {auth_token}"},
    )
    return response["flow_run_by_pk"]["state"]

def get_flow_status_logs(flow_run_id: str, backend: Backend, auth_token: str):
    query = """query ($flow_run_id: uuid!){
        log(where:{
            flow_run_id:{_eq:$flow_run_id},
            message:{_like:"%Done.%"}}){
            message
        }
    }"""
    response = backend._execute_query(
        query,
        variables={"flow_run_id": flow_run_id},
        headers={"Authorization": f"Bearer {auth_token}"},
    )
    print(response)
    message = response['log']['message']
    result = {}
    result['pass'] = int(re.findall(r"PASS=\d+", message)[0].split('=')[1])
    result['skip'] = int(re.findall(r"SKIP=\d+", message)[0].split('=')[1])
    result['warn'] = int(re.findall(r"WARN=\d+", message)[0].split('=')[1])

    return result


def get_materialization_flow_id(backend: Backend, auth_token: str):
    query = """
        query {
            flow (where: {
                name: {
                    _like: "BD template: Executa DBT model"
                },
                archived: {
                    _eq: false
                },
                project: {
                    name: {_eq: "main"}
                }
            }) {
                id
            }
        }
    """
    response = backend._execute_query(
        query, headers={"Authorization": f"Bearer {auth_token}"}
    )
    return response["flow"][0]["id"]


if __name__ == "__main__":
    # Start argument parser
    arg_parser = ArgumentParser()

    # Add GraphQL URL argument
    arg_parser.add_argument(
        "--graphql-url",
        type=str,
        required=True,
        help="URL of the GraphQL endpoint.",
    )

    # Add list of modified files argument
    arg_parser.add_argument(
        "--modified-files",
        type=str,
        required=True,
        help="List of modified files.",
    )

    # Add Prefect backend URL argument
    arg_parser.add_argument(
        "--prefect-backend-url",
        type=str,
        required=False,
        default="https://prefect.basedosdados.org/api",
        help="Prefect backend URL.",
    )

    # Add Prefect base URL argument
    arg_parser.add_argument(
        "--prefect-base-url",
        type=str,
        required=False,
        default="https://prefect.basedosdados.org",
        help="Prefect base URL.",
    )

    # Add Prefect API token argument
    arg_parser.add_argument(
        "--prefect-backend-token",
        type=str,
        required=True,
        help="Prefect backend token.",
    )

    # Add materialization mode argument
    arg_parser.add_argument(
        "--materialization-mode",
        type=str,
        required=False,
        default="dev",
        help="Materialization mode.",
    )

    # Add materialization label argument
    arg_parser.add_argument(
        "--materialization-label",
        type=str,
        required=False,
        default="basedosdados-dev",
        help="Materialization label.",
    )

    # Add dbt command argument
    arg_parser.add_argument(
        "--dbt-command",
        type=str,
        required=False,
        default="test",
        help="dbt command to run.",
    )

    # Get arguments
    args = arg_parser.parse_args()

    # Get datasets and tables from modified files
    modified_files = args.modified_files.split(",")
    datasets_tables = get_datasets_tables_from_modified_files(
        modified_files, show_details=True
    )
    # Split deleted datasets and tables
    deleted_datasets_tables = []
    existing_datasets_tables = []
    for dataset_id, table_id, exists, alias in datasets_tables:
        if exists:
            existing_datasets_tables.append((dataset_id, table_id, alias))
        else:
            deleted_datasets_tables.append((dataset_id, table_id, alias))
    # Expand `__all__` tables
    backend = Backend(args.graphql_url)
    expanded_existing_datasets_tables = []
    for dataset_id, table_id, alias in existing_datasets_tables:
        expanded_table_ids = expand_alls(dataset_id, table_id, backend)
        for expanded_dataset_id, expanded_table_id in expanded_table_ids:
            expanded_existing_datasets_tables.append(
                (expanded_dataset_id, expanded_table_id, alias)
            )
    existing_datasets_tables = expanded_existing_datasets_tables

    # Launch materialization flows
    backend = Backend(args.prefect_backend_url)
    flow_id = get_materialization_flow_id(backend, args.prefect_backend_token)
    launched_flow_run_ids = []
    for dataset_id, table_id, alias in existing_datasets_tables:
        print(
            f"Launching materialization flow for {dataset_id}.{table_id} (alias={alias})..."
        )
        parameters = {
            "dataset_id": dataset_id,
            "dbt_alias": alias,
            "mode": args.materialization_mode,
            "table_id": table_id,
            "dbt_command": args.dbt_command,
        }

        mutation = """
            mutation ($flow_id: UUID, $parameters: JSON, $label: String!) {
                create_flow_run (input: {
                    flow_id: $flow_id,
                    parameters: $parameters,
                    labels: [$label],
                }) {
                    id
                }
            }
        """
        variables = {
            "flow_id": flow_id,
            "parameters": parameters,
            "label": args.materialization_label,
        }

        response = backend._execute_query(
            mutation,
            variables,
            headers={"Authorization": f"Bearer {args.prefect_backend_token}"},
        )

        flow_run_id = response["create_flow_run"]["id"]
        launched_flow_run_ids.append(flow_run_id)
        flow_run_url = f"{args.prefect_base_url}/flow-run/{flow_run_id}"
        print(f" - Materialization flow run launched: {flow_run_url}")

    # Keep monitoring the launched flow runs until they are finished
    for launched_flow_run_id in launched_flow_run_ids:
        print(f"Monitoring flow run {launched_flow_run_id}...")
        flow_run_state = get_flow_run_state(
            flow_run_id=launched_flow_run_id,
            backend=backend,
            auth_token=args.prefect_backend_token,
        )
        while flow_run_state not in ["Success", "Failed", "Cancelled"]:
            sleep(5)
            flow_run_state = get_flow_run_state(
                flow_run_id=launched_flow_run_id,
                backend=backend,
                auth_token=args.prefect_backend_token,
            )
        if flow_run_state != "Success":
            raise Exception(
                f'Flow run {launched_flow_run_id} finished with state "{flow_run_state}". '
                f"Check the logs at {args.prefect_base_url}/flow-run/{launched_flow_run_id}"
            )
        else:
            print("Congrats! Everything seems fine!")
34 changes: 34 additions & 0 deletions .github/workflows/test_dbt_model.yaml
@@ -0,0 +1,34 @@
---
name: Test DBT model
on:
  pull_request:
    types: [labeled, opened]
    branches: [main]
    paths: [models/**, .github/workflows/test_dbt_model.yaml]
jobs:
  test_dbt_model:
    if: contains(github.event.pull_request.labels.*.name, 'test-dev-model')
    name: Test DBT dev model
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v3
        with:
          ref: ${{ github.head_ref }}
      - name: Get all changed files using a comma separator
        id: changed-files
        uses: tj-actions/changed-files@v35
        with:
          separator: ','
      - name: Set up poetry
        run: pipx install poetry
      - name: Set up python
        uses: actions/setup-python@v4
        with:
          cache: poetry
          python-version: '3.9'
      - name: Install requirements
        run: poetry install --only=dev
      - name: Run script to test DBT model
        run: |-
          poetry run python .github/workflows/scripts/table_test.py --modified-files ${{ steps.changed-files.outputs.all_modified_files }} --graphql-url ${{ secrets.BACKEND_GRAPHQL_URL }} --prefect-backend-token ${{ secrets.PREFECT_BACKEND_TOKEN }}
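The last step hands the comma-separated list from tj-actions/changed-files straight to table_test.py together with the repository secrets. For reference, a hedged sketch of an equivalent local invocation; the model path, URL, and token below are placeholders, not real values:

import subprocess

# Placeholders; in CI these come from the changed-files step and repository secrets.
modified_files = "models/example_dataset/example_table.sql"
graphql_url = "https://example.org/api/v1/graphql"
prefect_backend_token = "<PREFECT_BACKEND_TOKEN>"

subprocess.run(
    [
        "python",
        ".github/workflows/scripts/table_test.py",
        "--modified-files", modified_files,
        "--graphql-url", graphql_url,
        "--prefect-backend-token", prefect_backend_token,
    ],
    check=True,
)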
2 changes: 2 additions & 0 deletions .user.yml
@@ -0,0 +1,2 @@
---
id: cc3f54e0-fd01-4495-bd12-aa41f3b24444
10 changes: 8 additions & 2 deletions dbt_project.yml
@@ -12,8 +12,8 @@ profile: default
vars:
  disable_run_results: false
  disable_tests_results: false
  disable_dbt_artifacts_autoupload: false
  disable_dbt_invocation_autoupload: false
  disable_dbt_artifacts_autoupload: true
  disable_dbt_invocation_autoupload: true
# These configurations specify where dbt should look for different types of files.
# The `source-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
@@ -231,12 +231,18 @@ models:
      +post-hook:
        - REVOKE `roles/bigquery.dataViewer` ON TABLE {{ this }} FROM "specialGroup:allUsers"
        - GRANT `roles/bigquery.dataViewer` ON TABLE {{ this }} TO "group:bd-pro@basedosdados.org"
    br_mme_consumo_energia_eletrica:
      +materialized: table
      +schema: br_mme_consumo_energia_eletrica
    br_mp_pep:
      +materialized: table
      +schema: br_mp_pep
    br_ms_cnes:
      +materialized: table
      +schema: br_ms_cnes
    br_ms_sia:
      +materialized: table
      +schema: br_ms_sia
    br_ms_sim:
      +materialized: table
      +schema: br_ms_sim
@@ -1,12 +1,25 @@
{{ config(alias="orgao_deputado", schema="br_camara_dados_abertos") }}
select distinct
    regexp_extract(uriorgao, r'/orgaos/(\d+)') as id_orgao,
    safe_cast(nomeorgao as string) nome,
    safe_cast(siglaorgao as string) sigla,
    safe_cast(nomedeputado as string) nome_deputado,
    safe_cast(cargo as string) cargo,
    safe_cast(siglauf as string) sigla_uf,
    safe_cast(datainicio as date) data_inicio,
    safe_cast(datafim as date) data_final,
    safe_cast(siglapartido as string) sigla_partido,
from `basedosdados-staging.br_camara_dados_abertos_staging.orgao_deputado` as t
with
    orgao_deputado as (
        select distinct
            regexp_extract(uriorgao, r'/orgaos/(\d+)') as id_orgao,
            safe_cast(nomeorgao as string) nome,
            safe_cast(siglaorgao as string) sigla,
            safe_cast(nomedeputado as string) nome_deputado,
            safe_cast(cargo as string) cargo,
            safe_cast(siglauf as string) sigla_uf,
            safe_cast(datainicio as date) data_inicio,
            safe_cast(datafim as date) data_final,
            safe_cast(siglapartido as string) sigla_partido,
        from `basedosdados-staging.br_camara_dados_abertos_staging.orgao_deputado`
    )
select *
from orgao_deputado
where
    not (
        nome_deputado = 'Hélio Leite'
        and cargo = 'Titular'
        and sigla_uf is null
        and data_inicio = '2022-05-03'
        and data_final = '2023-02-01'
    )
@@ -1,6 +1,6 @@
{{ config(alias="proposicao_autor", schema="br_camara_dados_abertos") }}

select
select distinct
    safe_cast(idproposicao as string) id_proposicao,
    replace(safe_cast(iddeputadoautor as string), ".0", "") id_deputado,
    initcap(safe_cast(tipoautor as string)) tipo_autor,