Skip to content

Commit

Permalink
br_inep_saeb: expandir cobertura temporal dic
Browse files Browse the repository at this point in the history
  • Loading branch information
aspeddro committed Jul 29, 2024
1 parent 0d6217d commit 0e357ed
Showing 1 changed file with 203 additions and 0 deletions.
203 changes: 203 additions & 0 deletions models/br_inep_saeb/code/extend_dict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
# Script to change the temporal-coverage format of the SAEB dictionary.
# The compact format is expanded so that each row covers a single year.
import basedosdados as bd
import re
import itertools
import pandas as pd
import os

# Local directory where the expanded dictionary CSV will be written.
OUTPUT = os.path.join(os.getcwd(), "output")

os.makedirs(OUTPUT, exist_ok=True)

# Pull the raw dictionary table from the staging dataset in BigQuery.
df = bd.read_sql(
    "select * from `basedosdados-dev.br_inep_saeb_staging.dicionario`",
    billing_project_id="basedosdados-dev",
)

# Drop rows whose coverage is the literal "1" or "D" — these are not
# parseable year ranges (presumably placeholder values; confirm upstream).
df = df.loc[(df["cobertura_temporal"] != "1") & (df["cobertura_temporal"] != "D"),]


def parse_temporal_coverage(temporal_coverage: str) -> list[dict[str, int]]:
    """Parse a compact temporal-coverage string into structured pieces.

    Supported piece formats (comma-separated combinations are allowed):
      - ``(y)``    -> ``{"temporal_unit": y}``
      - ``yyyy``   -> ``{"single_year": yyyy}``
      - ``x(y)``   -> ``{"start_year": x, "temporal_unit": y}``
      - ``x(y)z``  -> ``{"start_year": x, "temporal_unit": y, "end_year": z}``

    Raises:
        Exception: when a piece matches none of the formats above.
    """

    def parse_common(value: str) -> dict[str, int]:
        # Bare temporal unit, e.g. "(2)". Slice out the digits between the
        # parentheses instead of reading a single character, so multi-digit
        # units such as "(10)" parse correctly (value[1] would yield 1).
        if value.startswith("("):
            return dict(temporal_unit=int(value[1:-1]))

        # Single 4-digit year, e.g. "2005".
        if len(value) == 4:
            return dict(single_year=int(value))

        # "x(y)" or "x(y)z": split on the parenthesized temporal unit,
        # keeping the unit itself via the capture group.
        if "(" in value:
            pattern_temporal_unit = r"\((\d+)\)"
            # Split and drop empty strings
            parts: list[str] = [
                i for i in re.split(pattern_temporal_unit, value) if len(i) > 0
            ]

            assert len(parts) <= 3, f"Error: {temporal_coverage=}"

            # "x(y)", e.g. "2005(2)": open-ended range.
            if len(parts) == 2:
                return dict(start_year=int(parts[0]), temporal_unit=int(parts[1]))

            # "x(y)z", e.g. "2013(2)2017": closed range.
            return dict(
                start_year=int(parts[0]),
                temporal_unit=int(parts[1]),
                end_year=int(parts[2]),
            )

        raise Exception(f"Failed to parse {temporal_coverage=}")

    # A comma-separated value is a union of independent coverage pieces.
    if "," in temporal_coverage:
        return [parse_common(i.strip()) for i in temporal_coverage.split(",")]
    else:
        return [parse_common(temporal_coverage)]


# Examples:
# {'start_year': 2013, 'temporal_unit': 2, 'end_year': 2017}
def build_date_range(
    temporal_coverage: dict[str, int], start_year: int, latest_year: int
) -> list[int]:
    """Expand one parsed coverage piece into an explicit list of years.

    Args:
        temporal_coverage: one dict produced by ``parse_temporal_coverage``.
        start_year: fallback first year when the piece carries no start year.
        latest_year: fallback last year for open-ended pieces.

    Returns:
        Every covered year, stepping by the piece's temporal unit.

    Raises:
        ValueError: if the dict matches none of the known shapes (previously
            this fell through and silently returned ``None``).
    """
    keys = temporal_coverage.keys()

    # Closed range "x(y)z": the end year is inclusive, hence the "+ unit" stop.
    if {"start_year", "temporal_unit", "end_year"} <= keys:
        return list(
            range(
                temporal_coverage["start_year"],
                temporal_coverage["end_year"] + temporal_coverage["temporal_unit"],
                temporal_coverage["temporal_unit"],
            )
        )

    # Open-ended range "x(y)": extend up to the table's latest year (inclusive).
    if {"start_year", "temporal_unit"} <= keys:
        return list(
            range(
                temporal_coverage["start_year"],
                latest_year + temporal_coverage["temporal_unit"],
                temporal_coverage["temporal_unit"],
            )
        )

    # Only a unit "(y)": span the table's full registered coverage.
    if "temporal_unit" in keys:
        return list(
            range(
                start_year,
                latest_year + temporal_coverage["temporal_unit"],
                temporal_coverage["temporal_unit"],
            )
        )

    if "single_year" in keys:
        return [temporal_coverage["single_year"]]

    raise ValueError(f"Unknown temporal coverage shape: {temporal_coverage}")


# Group the dictionary rows by table id, fixing a known-wrong id on the way
# ("aluno_ef_2_ano" should be "aluno_ef_2ano"). A dict comprehension replaces
# the original dict([...]) over a list comprehension (flake8-comprehensions C404).
dfs = {
    table_id.replace("aluno_ef_2_ano", "aluno_ef_2ano"): df_by_table
    for (table_id, df_by_table) in df.groupby("id_tabela")
}

# GraphQL backend used to look up each table's registered datetime coverage.
backend = bd.Backend(
    graphql_url="https://staging.backend.basedosdados.org/api/v1/graphql"
)


def transform_df(table_id: str, df: pd.DataFrame) -> pd.DataFrame:
    """Add a ``temporal_coverage_parsed`` column with explicit year lists.

    Looks up the table's registered datetime range in the backend, then uses
    that range to expand each row's compact ``cobertura_temporal`` string.

    Args:
        table_id: GCP table id inside the ``br_inep_saeb`` dataset.
        df: the dictionary rows belonging to that table.

    Returns:
        A copy of ``df`` with the new ``temporal_coverage_parsed`` column.

    Raises:
        Exception: if the table slug cannot be resolved in the backend.
    """
    d = df.copy()
    table_slug = backend._get_table_id_from_name(
        gcp_dataset_id="br_inep_saeb", gcp_table_id=table_id
    )
    if not isinstance(table_slug, str):
        # Fixed typo in the original message ("slug fo").
        raise Exception(f"Not found slug for {table_id=}")

    # Fetch the table's registered start/end years from the GraphQL API.
    response = backend._execute_query(
        query="""
        query($table_id: ID) {
            allTable(id: $table_id) {
                edges {
                    node {
                        name,
                        coverages {
                            edges {
                                node {
                                    datetimeRanges {
                                        edges {
                                            node {
                                                id,
                                                startYear,
                                                endYear
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        """,
        variables={"table_id": table_slug},
    )

    # NOTE(review): assumes the table has at least one coverage with at least
    # one datetime range; an empty list here raises IndexError — confirm that
    # every br_inep_saeb table has its coverage registered.
    payload = backend._simplify_graphql_response(response)["allTable"][0]["coverages"][
        0
    ]["datetimeRanges"][0]

    latest_year = payload["endYear"]
    start_year = payload["startYear"]

    # Expand every compact coverage string into a flat list of years.
    d["temporal_coverage_parsed"] = d["cobertura_temporal"].apply(
        lambda x: list(
            itertools.chain(
                *[  # type: ignore
                    build_date_range(i, start_year=start_year, latest_year=latest_year)
                    for i in parse_temporal_coverage(x)
                ]
            )
        )
    )
    return d


# Expand the coverage strings for every table in the dataset.
new_arch = {
    table_id: transform_df(table_id, df_by_table)
    for (table_id, df_by_table) in dfs.items()
}

new_arch_5ano = new_arch["aluno_ef_5ano"].copy()

# Removed leftover notebook lines from the original script: a self-assignment
# of "temporal_coverage_parsed" (a no-op) and an unused value_counts() over
# that column — which holds unhashable lists and would raise TypeError.

# One row per (entry, year): explode the parsed year lists and reuse the
# original column name for the expanded values.
new_arch_5ano.drop(columns=["cobertura_temporal"]).explode(
    "temporal_coverage_parsed"
).rename(columns={"temporal_coverage_parsed": "cobertura_temporal"}).to_csv(
    os.path.join(OUTPUT, "dicionario.csv"), index=False
)

# Upload the expanded dictionary, replacing any existing table/storage data.
tb = bd.Table(dataset_id="br_inep_saeb", table_id="dicionario")

tb.create(
    os.path.join(OUTPUT, "dicionario.csv"),
    if_table_exists="replace",
    if_storage_data_exists="replace",
)

0 comments on commit 0e357ed

Please sign in to comment.