Resolver os problemas propostos pelo desafio utilizando uma arquitetura simples, resiliente e escalável.
Por existir a possibilidade de uma relação 1:n entre a entidade usuário
e a entidade endereço
, decidi modelar os dados em duas tabelas, uma para usuários e outra para endereços. Ao alterar a modelagem dos dados, as tabelas ficam na Terceira Forma Normal, melhorando a estrutura do banco de dados.
-
Docker: Conteinerização
-
Airflow: Orquestração
-
Postgres: Banco de Dados
-
PgAdmin: Interface gráfica/administrador do banco de dados
-
Metabase: Dashboard
Todos os serviços utilizados no desafio estão dockerizados, garantindo reprodutibilidade e escalabilidade. O ambiente é simples e está dividido em dois elementos principais: o banco de dados e o orquestrador. A divisão dos ambientes foi feita através dos arquivos docker-compose.yaml
, o primeiro arquivo, referente ao ambiente do Airflow, centraliza todos os serviços necessários para a sua execução (banco de dados, variáveis de ambiente, requirements, webserver e etc) e o segundo, referente ao ambiente do Postgres, centraliza os serviços necessários para a sua execução além de criar as conexões necessárias entre o banco, o PgAdmin e o Metabase.
Uma observação importante é que precisa haver uma conexão ativa entre os dois docker composes, para que o Airflow consiga se comunicar com o banco Postgres e vice-versa. Para isso, foi criado um arquivo .env
contendo as variáveis de ambiente necessárias para o Airflow se comunicar com o Postgres, além da criação da conexão entre o contêiner do banco e o contêiner do worker do Airflow (que é responsável por executar as DAGs).
Exemplo do arquivo .env
:
AIRFLOW_UID=501
PG_HOST=pgdatabase
PG_USER=root
PG_PASSWORD=root
PG_PORT=5432
PG_DATABASE=capim
Exemplo da conexão no arquivo postgres/docker-compose.yaml
:
networks:
airflow:
external:
name: airflow_default
O airflow executa a DAG com o nome de CompleteIngestionDag
, que está no arquivo ./airflow/data_ingestion_local.py
. Ela possui 4 tasks que fazem o processo de extração -> validação -> criação das tabelas -> carregamento dos dados no banco. Os dados brutos são extraídos da URL e salvos em um arquivo data.json
, em seguida passam pela validação feita através do pydantic
, que verifica o tipo dos dados, o schema e outras condições específicas (como o formato dos dados da coluna cep
, por exemplo). Caso passem na validação, as tabelas são criadas -- caso não existam -- e os dados são carregados.
Visando resolver o problema do desafio, uma lógica foi criada para impossibilitar o carregamento de dados duplicados no banco. O fluxo confere a existência da chave única na tabela de usuários e a existência do conjunto cep ~ logradouro ~ número
na tabela de endereços, para todas as observações, e só insere os novos dados se eles forem únicos e se não existirem previamente no banco. Esse processo garante a integridade e a atomicidade dos dados.
- Clone o repositório:
git clone https://github.com/arthurfg/desafio-data-engineer.git
- Navege até o repo:
cd </path/to/repo>
- Crie um arquivo
.env
com essas variáveis:
AIRFLOW_UID=<seu UID>
PG_HOST=pgdatabase
PG_USER=root
PG_PASSWORD=root
PG_PORT=5432
PG_DATABASE=capim
Caso use linux/mac, insira o ID que resulta desse comando na variável de ambiente AIRFLOW_UID
:
echo "$(id -u)"
Caso use outro sistema operacional use AIRFLOW_UID=50000
.
- Navege até o diretório
./airflow
:
cd airflow
- Inicialize o airflow:
docker compose up airflow-init
- Rode o docker compose:
docker compose up
- Navege até o diretório
./postgres
:
cd postgres
- Rode o docker compose:
docker compose up
Após todos os contêiners estarem ativos e rodando, vá até o http://localhost:8080/
, digite o usuário airflow
e a senha airflow
e rode a DAG CompleteIngestionDag
.