Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add proximity.dist_pin_to_pin to the dbt DAG using intermediate transforms #138

138 changes: 2 additions & 136 deletions aws-athena/ctas/proximity-dist_pin_to_pin.sql
Original file line number Diff line number Diff line change
@@ -1,136 +1,2 @@
-- CTAS to find the 3 nearest neighbor PINs for every PIN for every year.
-- Output goes to a temp table partitioned by year; only PINs that have all
-- three neighbors inside the search buffer are kept.
CREATE TABLE IF NOT EXISTS proximity.dist_pin_to_pin_temp
dfsnow marked this conversation as resolved.
Show resolved Hide resolved
WITH (
    FORMAT = 'Parquet',
    WRITE_COMPRESSION = 'SNAPPY',
    EXTERNAL_LOCATION
    = 's3://ccao-athena-results-us-east-1/dist_pin_to_pin_temp',
    PARTITIONED_BY = ARRAY['year']
) AS (
    -- Candidate neighbors: every parcel in every year, with a point geometry
    -- built from its x_3435/y_3435 coordinates
    WITH pin_locations AS (
        SELECT
            pin10,
            year,
            x_3435,
            y_3435,
            ST_POINT(x_3435, y_3435) AS point
        FROM spatial.parcel
    ),

    most_recent_pins AS (
        -- Parcel centroids may shift very slightly over time in GIS shapefiles.
        -- We want to make sure we only grab the most recent instance of a given
        -- parcel to avoid duplicates caused by these slight shifts.
        SELECT
            x_3435,
            y_3435,
            RANK() OVER (PARTITION BY pin10 ORDER BY year DESC) AS r
        FROM spatial.parcel
    ),

    -- Unique coordinate pairs taken from each PIN's most recent year
    distinct_pins AS (
        SELECT DISTINCT
            x_3435,
            y_3435
        FROM most_recent_pins
        WHERE r = 1
    ),

    -- For each distinct coordinate pair and year, the 4 closest parcels that
    -- fall inside a buffer of radius 1000 around the point
    pin_dists AS (
        SELECT *
        FROM (
            SELECT
                dists.*,
                ROW_NUMBER()
                    OVER (
                        PARTITION BY dists.x_3435, dists.y_3435, dists.year
                        ORDER BY dists.dist
                    )
                    AS row_num
            FROM (
                SELECT
                    dp.x_3435,
                    dp.y_3435,
                    loc.year,
                    loc.pin10,
                    ST_DISTANCE(ST_POINT(dp.x_3435, dp.y_3435), loc.point) AS dist
                FROM distinct_pins AS dp
                INNER JOIN pin_locations AS loc
                    ON ST_CONTAINS(
                        ST_BUFFER(ST_POINT(dp.x_3435, dp.y_3435), 1000), loc.point
                    )
            ) AS dists
        )
        -- 4 rows = 3 neighbors plus rank 1, which is skipped below
        -- (presumably rank 1 is the PIN itself at distance 0 -- confirm)
        WHERE row_num <= 4
    )

    SELECT *
    FROM (
        -- Pivot ranks 2-4 into one row per PIN and year
        SELECT
            pcl.pin10,
            MAX(CASE
                WHEN pd.row_num = 2 THEN pd.pin10
            END) AS nearest_neighbor_1_pin10,
            MAX(CASE
                WHEN pd.row_num = 2 THEN pd.dist
            END) AS nearest_neighbor_1_dist_ft,
            MAX(CASE
                WHEN pd.row_num = 3 THEN pd.pin10
            END) AS nearest_neighbor_2_pin10,
            MAX(CASE
                WHEN pd.row_num = 3 THEN pd.dist
            END) AS nearest_neighbor_2_dist_ft,
            MAX(CASE
                WHEN pd.row_num = 4 THEN pd.pin10
            END) AS nearest_neighbor_3_pin10,
            MAX(CASE
                WHEN pd.row_num = 4 THEN pd.dist
            END) AS nearest_neighbor_3_dist_ft,
            pcl.year
        FROM spatial.parcel AS pcl
        INNER JOIN pin_dists AS pd
            ON pcl.x_3435 = pd.x_3435
            AND pcl.y_3435 = pd.y_3435
            AND pcl.year = pd.year
        GROUP BY pcl.pin10, pcl.year
    )
    -- Drop PINs with fewer than three neighbors inside the buffer
    WHERE nearest_neighbor_1_pin10 IS NOT NULL
        AND nearest_neighbor_2_pin10 IS NOT NULL
        AND nearest_neighbor_3_pin10 IS NOT NULL
);

-- Merge the rows of both temp tables into the final table, written as a
-- single bucket per year partition. The temp tables themselves are removed
-- by the DROP statements that follow.
CREATE TABLE IF NOT EXISTS proximity.dist_pin_to_pin
WITH (
    format = 'Parquet',
    write_compression = 'SNAPPY',
    external_location
    = 's3://ccao-athena-ctas-us-east-1/proximity/dist_pin_to_pin',
    partitioned_by = ARRAY['year'],
    bucketed_by = ARRAY['pin10'],
    bucket_count = 1
) AS (
    -- UNION (not UNION ALL) so identical rows present in both inputs
    -- collapse to one
    WITH all_neighbors AS (
        SELECT
            temp1.pin10,
            temp1.nearest_neighbor_1_pin10,
            temp1.nearest_neighbor_1_dist_ft,
            temp1.nearest_neighbor_2_pin10,
            temp1.nearest_neighbor_2_dist_ft,
            temp1.nearest_neighbor_3_pin10,
            temp1.nearest_neighbor_3_dist_ft,
            temp1.year
        FROM proximity.dist_pin_to_pin_temp AS temp1
        UNION
        SELECT
            temp2.pin10,
            temp2.nearest_neighbor_1_pin10,
            temp2.nearest_neighbor_1_dist_ft,
            temp2.nearest_neighbor_2_pin10,
            temp2.nearest_neighbor_2_dist_ft,
            temp2.nearest_neighbor_3_pin10,
            temp2.nearest_neighbor_3_dist_ft,
            temp2.year
        FROM proximity.dist_pin_to_pin_temp2 AS temp2
    )

    SELECT
        pin10,
        nearest_neighbor_1_pin10,
        nearest_neighbor_1_dist_ft,
        nearest_neighbor_2_pin10,
        nearest_neighbor_2_dist_ft,
        nearest_neighbor_3_pin10,
        nearest_neighbor_3_dist_ft,
        year
    FROM all_neighbors
);

-- Remove both temp tables; each statement is terminated so they can be run
-- as separate statements
DROP TABLE IF EXISTS proximity.dist_pin_to_pin_temp;
DROP TABLE IF EXISTS proximity.dist_pin_to_pin_temp2;
-- View that finds the 3 nearest neighbor PINs for every PIN for every year.
--
-- The 10km table is built only from the PINs that are absent from the 1km
-- table, so both intermediate tables must be combined to cover every PIN.
-- UNION (rather than UNION ALL) collapses any row that appears in both.
SELECT
    pin10,
    nearest_neighbor_1_pin10,
    nearest_neighbor_1_dist_ft,
    nearest_neighbor_2_pin10,
    nearest_neighbor_2_dist_ft,
    nearest_neighbor_3_pin10,
    nearest_neighbor_3_dist_ft,
    year
FROM {{ ref('proximity.dist_pin_to_pin_1km') }}
UNION
SELECT
    pin10,
    nearest_neighbor_1_pin10,
    nearest_neighbor_1_dist_ft,
    nearest_neighbor_2_pin10,
    nearest_neighbor_2_dist_ft,
    nearest_neighbor_3_pin10,
    nearest_neighbor_3_dist_ft,
    year
FROM {{ ref('proximity.dist_pin_to_pin_10km') }}
22 changes: 22 additions & 0 deletions aws-athena/ctas/proximity-dist_pin_to_pin_10km.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- Table of the 3 nearest neighbor PINs per PIN and year, searched with the
-- wider (10km) buffer. Only PINs that lack three neighbors within the 1km
-- buffer are used as input.
{{ config(materialized='table', partitioned_by=['year'], bucketed_by=['pin10'], bucket_count=1) }}

{% set neighbor_count = 3 %}
{% set search_radius = 10000 %}

select *
from (
    {{
        nearest_pin_neighbors(
            ref('proximity.dist_pin_to_pin_1km_missing_matches'),
            neighbor_count,
            search_radius
        )
    }}
)
21 changes: 21 additions & 0 deletions aws-athena/ctas/proximity-dist_pin_to_pin_1km.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Table of the 3 nearest neighbor PINs per PIN and year, searched with the
-- narrow (1km) buffer over every parcel in spatial.parcel
{{ config(materialized='table', partitioned_by=['year'], bucketed_by=['pin10'], bucket_count=1) }}

{% set neighbor_count = 3 %}
{% set search_radius = 1000 %}

select *
from (
    {{
        nearest_pin_neighbors(
            source('spatial', 'parcel'),
            neighbor_count,
            search_radius
        )
    }}
)
14 changes: 14 additions & 0 deletions aws-athena/ctas/proximity-dist_pin_to_pin_1km_missing_matches.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Select all PINs from spatial.parcel that do not have 3 neighbors within 1km
{{ config(materialized='ephemeral') }}

-- Anti-join against the 1km table: a parcel row is kept only when no row
-- with the same pin10 and year exists there. Only parcel columns are
-- selected so that `pin10` stays unambiguous for models that read from this
-- one (the joined table's pin10 is always NULL after the filter anyway).
SELECT
    pcl.pin10,
    pcl.year,
    pcl.x_3435,
    pcl.y_3435
FROM {{ source('spatial', 'parcel') }} AS pcl
LEFT JOIN {{ ref('proximity.dist_pin_to_pin_1km') }} AS dist_pin_to_pin_1km
    ON pcl.pin10 = dist_pin_to_pin_1km.pin10
    AND pcl.year = dist_pin_to_pin_1km.year
WHERE dist_pin_to_pin_1km.pin10 IS NULL
2 changes: 1 addition & 1 deletion aws-athena/views/proximity-vw_pin10_proximity.sql
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,6 @@ LEFT JOIN
LEFT JOIN {{ ref('proximity.dist_pin_to_water') }} AS dist_pin_to_water
ON pin.pin10 = dist_pin_to_water.pin10
AND pin.year = dist_pin_to_water.year
LEFT JOIN {{ source('proximity', 'dist_pin_to_pin') }} AS dist_pin_to_pin
LEFT JOIN {{ ref('proximity.dist_pin_to_pin') }} AS dist_pin_to_pin
ON pin.pin10 = dist_pin_to_pin.pin10
AND pin.year = dist_pin_to_pin.year
2 changes: 1 addition & 1 deletion aws-athena/views/proximity-vw_pin10_proximity_fill.sql
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ LEFT JOIN
LEFT JOIN {{ ref('proximity.dist_pin_to_park') }} AS dist_pin_to_park
ON pin.pin10 = dist_pin_to_park.pin10
AND cyf.nearest_park_data_year = dist_pin_to_park.year
LEFT JOIN {{ source('proximity', 'dist_pin_to_pin') }} AS dist_pin_to_pin
LEFT JOIN {{ ref('proximity.dist_pin_to_pin') }} AS dist_pin_to_pin
ON pin.pin10 = dist_pin_to_pin.pin10
AND cyf.year = dist_pin_to_pin.year -- NOTE, doesn't need to be filled
LEFT JOIN
Expand Down
82 changes: 82 additions & 0 deletions dbt/macros/nearest_pin_neighbors.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
{% macro nearest_pin_neighbors(source_model, num_neighbors, radius_km) %}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This macro is nearly identical to the pre-existing aws-athena/ctas/proximity-dist_pin_to_pin.sql query, with three main changes:

  1. Parameterization of source_model, num_neigbors, and radius_km
  2. Addition of pin10 to the distinct_pins subquery per our missing data discussion on Teams (diff here)
  3. Reformatting to match dbt styles using sqlfmt

Everything else should be identical!

{#
    Render a query returning, for each PIN and year, its `num_neighbors`
    nearest neighbor PINs and the distance to each.

    Arguments:
        source_model:  relation providing the PINs to search from; must expose
                       pin10, year, x_3435 and y_3435.
        num_neighbors: integer number of neighbors to return. One pair of
                       nearest_neighbor_<i>_pin10 / nearest_neighbor_<i>_dist_ft
                       columns is emitted per neighbor.
        radius_km:     radius passed to st_buffer() around each PIN; parcels
                       outside the buffer are never considered.
                       NOTE(review): the output columns are suffixed `_dist_ft`
                       and callers pass 1000 / 10000, which suggests the value
                       is in the units of the 3435 coordinates rather than km.
                       Confirm before relying on the parameter name.

    Candidate neighbors always come from spatial.parcel, regardless of
    `source_model`. PINs with fewer than `num_neighbors` neighbors inside the
    buffer are excluded from the output.
#}
with
    pin_locations as (
        select pin10, year, x_3435, y_3435, st_point(x_3435, y_3435) as point
        from {{ source("spatial", "parcel") }}
    ),

    most_recent_pins as (
        -- Parcel centroids may shift very slightly over time in GIS shapefiles.
        -- We want to make sure we only grab the most recent instance of a given
        -- parcel to avoid duplicates caused by these slight shifts.
        select
            x_3435, y_3435, rank() over (partition by pin10 order by year desc) as r
        from {{ source_model }}
    ),

    distinct_pins as (
        select distinct x_3435, y_3435 from most_recent_pins where r = 1
    ),

    pin_dists as (
        select *
        from
            (
                select
                    dists.*,
                    row_number() over (
                        partition by dists.x_3435, dists.y_3435, dists.year
                        order by dists.dist
                    ) as row_num
                from
                    (
                        select
                            dp.x_3435,
                            dp.y_3435,
                            loc.year,
                            loc.pin10,
                            st_distance(
                                st_point(dp.x_3435, dp.y_3435), loc.point
                            ) as dist
                        from distinct_pins as dp
                        inner join
                            pin_locations as loc
                            on st_contains(
                                st_buffer(
                                    st_point(dp.x_3435, dp.y_3435), {{ radius_km }}
                                ),
                                loc.point
                            )
                    ) as dists
            )
        -- Keep one more row than requested: the pivot below starts at
        -- row_num 2 (row 1 is presumably the PIN itself at distance 0), so
        -- the last neighbor lives at row_num = num_neighbors + 1.
        where row_num <= {{ num_neighbors + 1 }}
    )

select *
from
    (
        select
            pcl.pin10,
            {% for idx in range(1, num_neighbors + 1) %}
                max(
                    case when pd.row_num = {{ idx + 1 }} then pd.pin10 end
                ) as nearest_neighbor_{{ idx }}_pin10,
                max(
                    case when pd.row_num = {{ idx + 1 }} then pd.dist end
                ) as nearest_neighbor_{{ idx }}_dist_ft,
            {% endfor %}
            pcl.year
        from {{ source("spatial", "parcel") }} as pcl
        inner join
            pin_dists as pd
            on pcl.x_3435 = pd.x_3435
            and pcl.y_3435 = pd.y_3435
            and pcl.year = pd.year
        group by pcl.pin10, pcl.year
    )
-- Only keep PINs for which every requested neighbor was found
where
    {% for idx in range(1, num_neighbors + 1) %}
        {% if idx != 1 %} and{% endif %}
        nearest_neighbor_{{ idx }}_pin10 is not null
    {% endfor %}
{% endmacro %}
22 changes: 22 additions & 0 deletions dbt/models/proximity/docs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{% docs dist_pin_to_pin %}
View that finds the three nearest neighbor PINs for every PIN for every year.
{% enddocs %}

{% docs dist_pin_to_pin_intermediate %}
Intermediate table used to generate `proximity.dist_pin_to_pin`.

The `proximity.dist_pin_to_pin` view is intended to record distances to the
three closest PINs for all PINs in the county for all years in the data.
This type of recursive spatial query is expensive, however, and some PINs are
quite far (>1km) from the nearest three PINs, so we use intermediate tables
to strike a balance between data completeness and computational efficiency.

To compute the full set of distances in `proximity.dist_pin_to_pin`, we first
generate PIN-to-PIN distances using a 1km buffer and store the results in the
`proximity.dist_pin_to_pin_1km` table. Then, we query for PINs that did not have
three matches within 1km, using the ephemeral model
`proximity.dist_pin_to_pin_1km_missing_matches` to encapsulate the logic. Next,
we redo the query with an expanded 10km buffer, storing the results in the
`proximity.dist_pin_to_pin_10km` table. Finally, the 10km table is aliased to
the `proximity.dist_pin_to_pin` view for ease of querying.
{% enddocs %}
1 change: 1 addition & 0 deletions dbt/models/proximity/proximity.dist_pin_to_pin.sql
1 change: 1 addition & 0 deletions dbt/models/proximity/proximity.dist_pin_to_pin_10km.sql
1 change: 1 addition & 0 deletions dbt/models/proximity/proximity.dist_pin_to_pin_1km.sql
13 changes: 8 additions & 5 deletions dbt/models/proximity/schema.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
version: 2


sources:
- name: proximity
tables:
- name: dist_pin_to_pin

models:
- name: proximity.cnt_pin_num_bus_stop
- name: proximity.cnt_pin_num_foreclosure
Expand All @@ -22,6 +17,14 @@ models:
- name: proximity.dist_pin_to_metra_route
- name: proximity.dist_pin_to_metra_stop
- name: proximity.dist_pin_to_park
- name: proximity.dist_pin_to_pin
description: '{{ doc("dist_pin_to_pin") }}'
- name: proximity.dist_pin_to_pin_1km
description: '{{ doc("dist_pin_to_pin_intermediate") }}'
- name: proximity.dist_pin_to_pin_1km_missing_matches
description: '{{ doc("dist_pin_to_pin_intermediate") }}'
- name: proximity.dist_pin_to_pin_10km
description: '{{ doc("dist_pin_to_pin_intermediate") }}'
- name: proximity.dist_pin_to_railroad
- name: proximity.dist_pin_to_water
- name: proximity.vw_pin10_proximity
Expand Down
Loading