Skip to content

Commit

Permalink
Refactor Athena table definitions to move them into the dbt DAG (#111)
Browse files Browse the repository at this point in the history
* Refactor Athena table definitions to move them into the dbt DAG

* Fix location.census_acs5 model to pull acs5 attributes from the correct subquery

* Bump expiration time for AWS OIDC token in build_and_test_dbt workflow

* Temporarily select location.tax for debugging in build_and_test_dbt workflow

* Bump error threshold for vw_pin_address_no_extra_whitespace

* Revert "Temporarily select location.tax for debugging in build_and_test_dbt workflow"

This reverts commit 5ea671f.

* Revert dist_pin_to_pin to a source

---------

Co-authored-by: Jean Cochrane <jecochr@ccao-datals.ccao.local>
  • Loading branch information
jeancochrane and Jean Cochrane authored Sep 14, 2023
1 parent 37ef707 commit 8d9b168
Show file tree
Hide file tree
Showing 68 changed files with 700 additions and 592 deletions.
5 changes: 5 additions & 0 deletions .github/actions/setup_dbt/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ inputs:
role-to-assume:
description: AWS IAM role to assume when running dbt operations.
required: true
role-duration-seconds:
description: Expiration time for AWS OIDC token. Default is one hour.
required: false
default: 3600
runs:
using: composite
steps:
Expand All @@ -18,6 +22,7 @@ runs:
with:
role-to-assume: ${{ inputs.role-to-assume }}
aws-region: us-east-1
role-duration-seconds: ${{ inputs.role-duration-seconds }}

- name: Configure dbt environment
uses: ./.github/actions/configure_dbt_environment
1 change: 1 addition & 0 deletions .github/workflows/build_and_test_dbt.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ jobs:
uses: ./.github/actions/setup_dbt
with:
role-to-assume: ${{ secrets.AWS_IAM_ROLE_TO_ASSUME_ARN }}
role-duration-seconds: 14400 # Worst-case time for full build

- name: Restore dbt state cache
id: cache
Expand Down
33 changes: 18 additions & 15 deletions aws-athena/ctas/location-access.sql
Original file line number Diff line number Diff line change
@@ -1,27 +1,28 @@
CREATE TABLE IF NOT EXISTS location.access
WITH (
FORMAT = 'Parquet',
WRITE_COMPRESSION = 'SNAPPY',
EXTERNAL_LOCATION = 's3://ccao-athena-ctas-us-east-1/location/access',
PARTITIONED_BY = ARRAY['year'],
BUCKETED_BY = ARRAY['pin10'],
BUCKET_COUNT = 1
) AS (
{{
config(
materialized='table',
partitioned_by=['year'],
bucketed_by=['pin10'],
bucket_count=1
)
}}

WITH access AS (
WITH distinct_pins AS (
SELECT DISTINCT
x_3435,
y_3435
FROM spatial.parcel
FROM {{ source('spatial', 'parcel') }}
),

distinct_years AS (
SELECT DISTINCT year
FROM spatial.parcel
FROM {{ source('spatial', 'parcel') }}
),

distinct_years_rhs AS (
SELECT DISTINCT year
FROM spatial.walkability
FROM {{ source('spatial', 'walkability') }}
),

walkability AS (
Expand All @@ -43,12 +44,12 @@ WITH (
SELECT
dy.year AS pin_year,
MAX(df.year) AS fill_year
FROM spatial.walkability AS df
FROM {{ source('spatial', 'walkability') }} AS df
CROSS JOIN distinct_years AS dy
WHERE dy.year >= df.year
GROUP BY dy.year
) AS fill_years
LEFT JOIN spatial.walkability AS fill_data
LEFT JOIN {{ source('spatial', 'walkability') }} AS fill_data
ON fill_years.fill_year = fill_data.year
) AS cprod
ON ST_WITHIN(
Expand All @@ -65,10 +66,12 @@ WITH (
walk.access_cmap_walk_total_score,
walk.access_cmap_walk_data_year,
pcl.year
FROM spatial.parcel AS pcl
FROM {{ source('spatial', 'parcel') }} AS pcl
LEFT JOIN walkability AS walk
ON pcl.x_3435 = walk.x_3435
AND pcl.y_3435 = walk.y_3435
AND pcl.year = walk.pin_year
WHERE pcl.year >= (SELECT MIN(year) FROM distinct_years_rhs)
)

SELECT * FROM access
29 changes: 16 additions & 13 deletions aws-athena/ctas/location-census.sql
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
CREATE TABLE IF NOT EXISTS location.census
WITH (
FORMAT = 'Parquet',
WRITE_COMPRESSION = 'SNAPPY',
EXTERNAL_LOCATION = 's3://ccao-athena-ctas-us-east-1/location/census',
PARTITIONED_BY = ARRAY['year'],
BUCKETED_BY = ARRAY['pin10'],
BUCKET_COUNT = 1
) AS (
{{
config(
materialized='table',
partitioned_by=['year'],
bucketed_by=['pin10'],
bucket_count=1
)
}}

WITH census AS (
WITH distinct_pins AS (
SELECT DISTINCT
x_3435,
y_3435
FROM spatial.parcel
FROM {{ source('spatial', 'parcel') }}
),

distinct_years_rhs AS (
SELECT DISTINCT year
FROM spatial.census
FROM {{ source('spatial', 'census') }}
),

distinct_joined AS (
Expand Down Expand Up @@ -64,7 +65,7 @@ WITH (
END) AS census_zcta_geoid,
cen.year
FROM distinct_pins AS dp
LEFT JOIN spatial.census AS cen
LEFT JOIN {{ source('spatial', 'census') }} AS cen
ON ST_WITHIN(
ST_POINT(dp.x_3435, dp.y_3435),
ST_GEOMFROMBINARY(cen.geometry_3435)
Expand All @@ -89,10 +90,12 @@ WITH (
dj.census_zcta_geoid,
dj.year AS census_data_year,
pcl.year
FROM spatial.parcel AS pcl
FROM {{ source('spatial', 'parcel') }} AS pcl
LEFT JOIN distinct_joined AS dj
ON pcl.year = dj.year
AND pcl.x_3435 = dj.x_3435
AND pcl.y_3435 = dj.y_3435
WHERE pcl.year >= (SELECT MIN(year) FROM distinct_years_rhs)
)

SELECT * FROM census
55 changes: 29 additions & 26 deletions aws-athena/ctas/location-census_acs5.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,43 @@
-- current PINs. For example, 2020 ACS data is not yet released, but we do have
-- current (2021) geographies and PINs. We need to join 2019 geographies to
-- 2021 PINs to facilitate joining 2019 ACS5 data
CREATE TABLE IF NOT EXISTS location.census_acs5
WITH (
FORMAT = 'Parquet',
WRITE_COMPRESSION = 'SNAPPY',
EXTERNAL_LOCATION = 's3://ccao-athena-ctas-us-east-1/location/census_acs5',
PARTITIONED_BY = ARRAY['year'],
BUCKETED_BY = ARRAY['pin10'],
BUCKET_COUNT = 1
) AS (
{{
config(
materialized='table',
partitioned_by=['year'],
bucketed_by=['pin10'],
bucket_count=1
)
}}

WITH census_acs5 AS (
WITH distinct_pins AS (
SELECT DISTINCT
x_3435,
y_3435
FROM spatial.parcel
FROM {{ source('spatial', 'parcel') }}
),

distinct_years AS (
SELECT DISTINCT year
FROM spatial.parcel
FROM {{ source('spatial', 'parcel') }}
),

distinct_years_rhs AS (
SELECT DISTINCT year
FROM spatial.census
FROM {{ source('spatial', 'census') }}
),

acs5_max_year AS (
SELECT MAX(year) AS max_year
FROM census.acs5
FROM {{ source('census', 'acs5') }}
),

acs5_year_fill AS (
SELECT
dy.year AS pin_year,
MAX(df.year) AS fill_year
FROM census.acs5 AS df
FROM {{ source('census', 'acs5') }} AS df
CROSS JOIN distinct_years AS dy
WHERE dy.year >= df.year
GROUP BY dy.year
Expand Down Expand Up @@ -81,7 +82,7 @@ WITH (
FROM distinct_pins AS dp
LEFT JOIN (
SELECT *
FROM spatial.census
FROM {{ source('spatial', 'census') }}
WHERE year <= (SELECT max_year FROM acs5_max_year)
) AS cen
ON ST_WITHIN(
Expand All @@ -93,19 +94,19 @@ WITH (

SELECT
pcl.pin10,
ayf.census_acs5_congressional_district_geoid,
ayf.census_acs5_county_subdivision_geoid,
ayf.census_acs5_place_geoid,
ayf.census_acs5_puma_geoid,
ayf.census_acs5_school_district_elementary_geoid,
ayf.census_acs5_school_district_secondary_geoid,
ayf.census_acs5_school_district_unified_geoid,
ayf.census_acs5_state_representative_geoid,
ayf.census_acs5_state_senate_geoid,
ayf.census_acs5_tract_geoid,
dj.census_acs5_congressional_district_geoid,
dj.census_acs5_county_subdivision_geoid,
dj.census_acs5_place_geoid,
dj.census_acs5_puma_geoid,
dj.census_acs5_school_district_elementary_geoid,
dj.census_acs5_school_district_secondary_geoid,
dj.census_acs5_school_district_unified_geoid,
dj.census_acs5_state_representative_geoid,
dj.census_acs5_state_senate_geoid,
dj.census_acs5_tract_geoid,
dj.year AS census_acs5_data_year,
pcl.year
FROM spatial.parcel AS pcl
FROM {{ source('spatial', 'parcel') }} AS pcl
LEFT JOIN acs5_year_fill AS ayf
ON pcl.year = ayf.pin_year
LEFT JOIN distinct_joined AS dj
Expand All @@ -114,3 +115,5 @@ WITH (
AND pcl.y_3435 = dj.y_3435
WHERE pcl.year >= (SELECT MIN(year) FROM distinct_years_rhs)
)

SELECT * FROM census_acs5
46 changes: 25 additions & 21 deletions aws-athena/ctas/location-chicago.sql
Original file line number Diff line number Diff line change
@@ -1,30 +1,31 @@
CREATE TABLE IF NOT EXISTS location.chicago
WITH (
FORMAT = 'Parquet',
WRITE_COMPRESSION = 'SNAPPY',
EXTERNAL_LOCATION = 's3://ccao-athena-ctas-us-east-1/location/chicago',
PARTITIONED_BY = ARRAY['year'],
BUCKETED_BY = ARRAY['pin10'],
BUCKET_COUNT = 1
) AS (
{{
config(
materialized='table',
partitioned_by=['year'],
bucketed_by=['pin10'],
bucket_count=1
)
}}

WITH chicago AS (
WITH distinct_pins AS (
SELECT DISTINCT
x_3435,
y_3435
FROM spatial.parcel
FROM {{ source('spatial', 'parcel') }}
),

distinct_years AS (
SELECT DISTINCT year
FROM spatial.parcel
FROM {{ source('spatial', 'parcel') }}
),

distinct_years_rhs AS (
SELECT DISTINCT year FROM spatial.police_district
SELECT DISTINCT year FROM {{ source('spatial', 'police_district') }}
UNION ALL
SELECT DISTINCT year FROM spatial.community_area
SELECT DISTINCT year FROM {{ source('spatial', 'community_area') }}
UNION ALL
SELECT DISTINCT year FROM spatial.industrial_corridor
SELECT DISTINCT year FROM {{ source('spatial', 'industrial_corridor') }}
),

police_district AS (
Expand All @@ -44,12 +45,12 @@ WITH (
SELECT
dy.year AS pin_year,
MAX(df.year) AS fill_year
FROM spatial.police_district AS df
FROM {{ source('spatial', 'police_district') }} AS df
CROSS JOIN distinct_years AS dy
WHERE dy.year >= df.year
GROUP BY dy.year
) AS fill_years
LEFT JOIN spatial.police_district AS fill_data
LEFT JOIN {{ source('spatial', 'police_district') }} AS fill_data
ON fill_years.fill_year = fill_data.year
) AS cprod
ON ST_WITHIN(
Expand Down Expand Up @@ -77,12 +78,12 @@ WITH (
SELECT
dy.year AS pin_year,
MAX(df.year) AS fill_year
FROM spatial.community_area AS df
FROM {{ source('spatial', 'community_area') }} AS df
CROSS JOIN distinct_years AS dy
WHERE dy.year >= df.year
GROUP BY dy.year
) AS fill_years
LEFT JOIN spatial.community_area AS fill_data
LEFT JOIN {{ source('spatial', 'community_area') }} AS fill_data
ON fill_years.fill_year = fill_data.year
) AS cprod
ON ST_WITHIN(
Expand Down Expand Up @@ -110,12 +111,13 @@ WITH (
SELECT
dy.year AS pin_year,
MAX(df.year) AS fill_year
FROM spatial.industrial_corridor AS df
FROM {{ source('spatial', 'industrial_corridor') }} AS df
CROSS JOIN distinct_years AS dy
WHERE dy.year >= df.year
GROUP BY dy.year
) AS fill_years
LEFT JOIN spatial.industrial_corridor AS fill_data
LEFT JOIN
{{ source('spatial', 'industrial_corridor') }} AS fill_data
ON fill_years.fill_year = fill_data.year
) AS cprod
ON ST_WITHIN(
Expand All @@ -136,7 +138,7 @@ WITH (
pd.chicago_police_district_num,
pd.chicago_police_district_data_year,
pcl.year
FROM spatial.parcel AS pcl
FROM {{ source('spatial', 'parcel') }} AS pcl
LEFT JOIN police_district AS pd
ON pcl.x_3435 = pd.x_3435
AND pcl.y_3435 = pd.y_3435
Expand All @@ -151,3 +153,5 @@ WITH (
AND pcl.year = ic.pin_year
WHERE pcl.year >= (SELECT MIN(year) FROM distinct_years_rhs)
)

SELECT * FROM chicago;
Loading

0 comments on commit 8d9b168

Please sign in to comment.