Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Athena table definitions to move them into the dbt DAG #111

Merged
5 changes: 5 additions & 0 deletions .github/actions/setup_dbt/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ inputs:
role-to-assume:
description: AWS IAM role to assume when running dbt operations.
required: true
role-duration-seconds:
description: Expiration time for AWS OIDC token. Default is one hour.
required: false
default: 3600
jeancochrane marked this conversation as resolved.
Show resolved Hide resolved
runs:
using: composite
steps:
Expand All @@ -18,6 +22,7 @@ runs:
with:
role-to-assume: ${{ inputs.role-to-assume }}
aws-region: us-east-1
role-duration-seconds: ${{ inputs.role-duration-seconds }}

- name: Configure dbt environment
uses: ./.github/actions/configure_dbt_environment
1 change: 1 addition & 0 deletions .github/workflows/build_and_test_dbt.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ jobs:
uses: ./.github/actions/setup_dbt
with:
role-to-assume: ${{ secrets.AWS_IAM_ROLE_TO_ASSUME_ARN }}
role-duration-seconds: 14400 # Worst-case time for full build
jeancochrane marked this conversation as resolved.
Show resolved Hide resolved

- name: Restore dbt state cache
id: cache
Expand Down
33 changes: 18 additions & 15 deletions aws-athena/ctas/location-access.sql
Original file line number Diff line number Diff line change
@@ -1,27 +1,28 @@
CREATE TABLE IF NOT EXISTS location.access
WITH (
FORMAT = 'Parquet',
WRITE_COMPRESSION = 'SNAPPY',
EXTERNAL_LOCATION = 's3://ccao-athena-ctas-us-east-1/location/access',
dfsnow marked this conversation as resolved.
Show resolved Hide resolved
PARTITIONED_BY = ARRAY['year'],
BUCKETED_BY = ARRAY['pin10'],
BUCKET_COUNT = 1
) AS (
{{
config(
materialized='table',
partitioned_by=['year'],
bucketed_by=['pin10'],
bucket_count=1
)
}}

WITH access AS (
jeancochrane marked this conversation as resolved.
Show resolved Hide resolved
WITH distinct_pins AS (
SELECT DISTINCT
x_3435,
y_3435
FROM spatial.parcel
FROM {{ source('spatial', 'parcel') }}
),

distinct_years AS (
SELECT DISTINCT year
FROM spatial.parcel
FROM {{ source('spatial', 'parcel') }}
),

distinct_years_rhs AS (
SELECT DISTINCT year
FROM spatial.walkability
FROM {{ source('spatial', 'walkability') }}
),

walkability AS (
Expand All @@ -43,12 +44,12 @@ WITH (
SELECT
dy.year AS pin_year,
MAX(df.year) AS fill_year
FROM spatial.walkability AS df
FROM {{ source('spatial', 'walkability') }} AS df
CROSS JOIN distinct_years AS dy
WHERE dy.year >= df.year
GROUP BY dy.year
) AS fill_years
LEFT JOIN spatial.walkability AS fill_data
LEFT JOIN {{ source('spatial', 'walkability') }} AS fill_data
ON fill_years.fill_year = fill_data.year
) AS cprod
ON ST_WITHIN(
Expand All @@ -65,10 +66,12 @@ WITH (
walk.access_cmap_walk_total_score,
walk.access_cmap_walk_data_year,
pcl.year
FROM spatial.parcel AS pcl
FROM {{ source('spatial', 'parcel') }} AS pcl
LEFT JOIN walkability AS walk
ON pcl.x_3435 = walk.x_3435
AND pcl.y_3435 = walk.y_3435
AND pcl.year = walk.pin_year
WHERE pcl.year >= (SELECT MIN(year) FROM distinct_years_rhs)
)

SELECT * FROM access
29 changes: 16 additions & 13 deletions aws-athena/ctas/location-census.sql
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
CREATE TABLE IF NOT EXISTS location.census
WITH (
FORMAT = 'Parquet',
WRITE_COMPRESSION = 'SNAPPY',
EXTERNAL_LOCATION = 's3://ccao-athena-ctas-us-east-1/location/census',
PARTITIONED_BY = ARRAY['year'],
BUCKETED_BY = ARRAY['pin10'],
BUCKET_COUNT = 1
) AS (
{{
config(
materialized='table',
partitioned_by=['year'],
bucketed_by=['pin10'],
bucket_count=1
)
}}

WITH census AS (
WITH distinct_pins AS (
SELECT DISTINCT
x_3435,
y_3435
FROM spatial.parcel
FROM {{ source('spatial', 'parcel') }}
),

distinct_years_rhs AS (
SELECT DISTINCT year
FROM spatial.census
FROM {{ source('spatial', 'census') }}
),

distinct_joined AS (
Expand Down Expand Up @@ -64,7 +65,7 @@ WITH (
END) AS census_zcta_geoid,
cen.year
FROM distinct_pins AS dp
LEFT JOIN spatial.census AS cen
LEFT JOIN {{ source('spatial', 'census') }} AS cen
ON ST_WITHIN(
ST_POINT(dp.x_3435, dp.y_3435),
ST_GEOMFROMBINARY(cen.geometry_3435)
Expand All @@ -89,10 +90,12 @@ WITH (
dj.census_zcta_geoid,
dj.year AS census_data_year,
pcl.year
FROM spatial.parcel AS pcl
FROM {{ source('spatial', 'parcel') }} AS pcl
LEFT JOIN distinct_joined AS dj
ON pcl.year = dj.year
AND pcl.x_3435 = dj.x_3435
AND pcl.y_3435 = dj.y_3435
WHERE pcl.year >= (SELECT MIN(year) FROM distinct_years_rhs)
)

SELECT * FROM census
55 changes: 29 additions & 26 deletions aws-athena/ctas/location-census_acs5.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,43 @@
-- current PINs. For example, 2020 ACS data is not yet released, but we do have
-- current (2021) geographies and PINs. We need to join 2019 geographies to
-- 2021 PINs to facilitate joining 2019 ACS5 data
CREATE TABLE IF NOT EXISTS location.census_acs5
WITH (
FORMAT = 'Parquet',
WRITE_COMPRESSION = 'SNAPPY',
EXTERNAL_LOCATION = 's3://ccao-athena-ctas-us-east-1/location/census_acs5',
PARTITIONED_BY = ARRAY['year'],
BUCKETED_BY = ARRAY['pin10'],
BUCKET_COUNT = 1
) AS (
{{
config(
materialized='table',
partitioned_by=['year'],
bucketed_by=['pin10'],
bucket_count=1
)
}}

WITH census_acs5 AS (
WITH distinct_pins AS (
SELECT DISTINCT
x_3435,
y_3435
FROM spatial.parcel
FROM {{ source('spatial', 'parcel') }}
),

distinct_years AS (
SELECT DISTINCT year
FROM spatial.parcel
FROM {{ source('spatial', 'parcel') }}
),

distinct_years_rhs AS (
SELECT DISTINCT year
FROM spatial.census
FROM {{ source('spatial', 'census') }}
),

acs5_max_year AS (
SELECT MAX(year) AS max_year
FROM census.acs5
FROM {{ source('census', 'acs5') }}
),

acs5_year_fill AS (
SELECT
dy.year AS pin_year,
MAX(df.year) AS fill_year
FROM census.acs5 AS df
FROM {{ source('census', 'acs5') }} AS df
CROSS JOIN distinct_years AS dy
WHERE dy.year >= df.year
GROUP BY dy.year
Expand Down Expand Up @@ -81,7 +82,7 @@ WITH (
FROM distinct_pins AS dp
LEFT JOIN (
SELECT *
FROM spatial.census
FROM {{ source('spatial', 'census') }}
WHERE year <= (SELECT max_year FROM acs5_max_year)
) AS cen
ON ST_WITHIN(
Expand All @@ -93,19 +94,19 @@ WITH (

SELECT
pcl.pin10,
ayf.census_acs5_congressional_district_geoid,
ayf.census_acs5_county_subdivision_geoid,
ayf.census_acs5_place_geoid,
ayf.census_acs5_puma_geoid,
ayf.census_acs5_school_district_elementary_geoid,
ayf.census_acs5_school_district_secondary_geoid,
ayf.census_acs5_school_district_unified_geoid,
ayf.census_acs5_state_representative_geoid,
ayf.census_acs5_state_senate_geoid,
ayf.census_acs5_tract_geoid,
dj.census_acs5_congressional_district_geoid,
dj.census_acs5_county_subdivision_geoid,
dj.census_acs5_place_geoid,
dj.census_acs5_puma_geoid,
dj.census_acs5_school_district_elementary_geoid,
dj.census_acs5_school_district_secondary_geoid,
dj.census_acs5_school_district_unified_geoid,
dj.census_acs5_state_representative_geoid,
dj.census_acs5_state_senate_geoid,
dj.census_acs5_tract_geoid,
jeancochrane marked this conversation as resolved.
Show resolved Hide resolved
dj.year AS census_acs5_data_year,
pcl.year
FROM spatial.parcel AS pcl
FROM {{ source('spatial', 'parcel') }} AS pcl
LEFT JOIN acs5_year_fill AS ayf
ON pcl.year = ayf.pin_year
LEFT JOIN distinct_joined AS dj
Expand All @@ -114,3 +115,5 @@ WITH (
AND pcl.y_3435 = dj.y_3435
WHERE pcl.year >= (SELECT MIN(year) FROM distinct_years_rhs)
)

SELECT * FROM census_acs5
46 changes: 25 additions & 21 deletions aws-athena/ctas/location-chicago.sql
Original file line number Diff line number Diff line change
@@ -1,30 +1,31 @@
CREATE TABLE IF NOT EXISTS location.chicago
WITH (
FORMAT = 'Parquet',
WRITE_COMPRESSION = 'SNAPPY',
EXTERNAL_LOCATION = 's3://ccao-athena-ctas-us-east-1/location/chicago',
PARTITIONED_BY = ARRAY['year'],
BUCKETED_BY = ARRAY['pin10'],
BUCKET_COUNT = 1
) AS (
{{
config(
materialized='table',
partitioned_by=['year'],
bucketed_by=['pin10'],
bucket_count=1
)
}}

WITH chicago AS (
WITH distinct_pins AS (
SELECT DISTINCT
x_3435,
y_3435
FROM spatial.parcel
FROM {{ source('spatial', 'parcel') }}
),

distinct_years AS (
SELECT DISTINCT year
FROM spatial.parcel
FROM {{ source('spatial', 'parcel') }}
),

distinct_years_rhs AS (
SELECT DISTINCT year FROM spatial.police_district
SELECT DISTINCT year FROM {{ source('spatial', 'police_district') }}
UNION ALL
SELECT DISTINCT year FROM spatial.community_area
SELECT DISTINCT year FROM {{ source('spatial', 'community_area') }}
UNION ALL
SELECT DISTINCT year FROM spatial.industrial_corridor
SELECT DISTINCT year FROM {{ source('spatial', 'industrial_corridor') }}
),

police_district AS (
Expand All @@ -44,12 +45,12 @@ WITH (
SELECT
dy.year AS pin_year,
MAX(df.year) AS fill_year
FROM spatial.police_district AS df
FROM {{ source('spatial', 'police_district') }} AS df
CROSS JOIN distinct_years AS dy
WHERE dy.year >= df.year
GROUP BY dy.year
) AS fill_years
LEFT JOIN spatial.police_district AS fill_data
LEFT JOIN {{ source('spatial', 'police_district') }} AS fill_data
ON fill_years.fill_year = fill_data.year
) AS cprod
ON ST_WITHIN(
Expand Down Expand Up @@ -77,12 +78,12 @@ WITH (
SELECT
dy.year AS pin_year,
MAX(df.year) AS fill_year
FROM spatial.community_area AS df
FROM {{ source('spatial', 'community_area') }} AS df
CROSS JOIN distinct_years AS dy
WHERE dy.year >= df.year
GROUP BY dy.year
) AS fill_years
LEFT JOIN spatial.community_area AS fill_data
LEFT JOIN {{ source('spatial', 'community_area') }} AS fill_data
ON fill_years.fill_year = fill_data.year
) AS cprod
ON ST_WITHIN(
Expand Down Expand Up @@ -110,12 +111,13 @@ WITH (
SELECT
dy.year AS pin_year,
MAX(df.year) AS fill_year
FROM spatial.industrial_corridor AS df
FROM {{ source('spatial', 'industrial_corridor') }} AS df
CROSS JOIN distinct_years AS dy
WHERE dy.year >= df.year
GROUP BY dy.year
) AS fill_years
LEFT JOIN spatial.industrial_corridor AS fill_data
LEFT JOIN
{{ source('spatial', 'industrial_corridor') }} AS fill_data
ON fill_years.fill_year = fill_data.year
) AS cprod
ON ST_WITHIN(
Expand All @@ -136,7 +138,7 @@ WITH (
pd.chicago_police_district_num,
pd.chicago_police_district_data_year,
pcl.year
FROM spatial.parcel AS pcl
FROM {{ source('spatial', 'parcel') }} AS pcl
LEFT JOIN police_district AS pd
ON pcl.x_3435 = pd.x_3435
AND pcl.y_3435 = pd.y_3435
Expand All @@ -151,3 +153,5 @@ WITH (
AND pcl.year = ic.pin_year
WHERE pcl.year >= (SELECT MIN(year) FROM distinct_years_rhs)
)

SELECT * FROM chicago;
Loading