Add proximity.dist_pin_to_pin to the dbt DAG using intermediate transforms #138

Merged
3 changes: 3 additions & 0 deletions .sqlfluff
@@ -35,3 +35,6 @@ extended_capitalisation_policy = upper

  [sqlfluff:rules:convention.casting_style]
  preferred_type_casting_style = cast
+
+ [sqlfluff:templater:jinja]
+ load_macros_from_path = dbt/macros
140 changes: 4 additions & 136 deletions aws-athena/ctas/proximity-dist_pin_to_pin.sql
@@ -1,136 +1,4 @@
- -- CTAS to find the 3 nearest neighbor PINs for every PIN for every year
- CREATE TABLE IF NOT EXISTS proximity.dist_pin_to_pin_temp
- WITH (
- FORMAT = 'Parquet',
- WRITE_COMPRESSION = 'SNAPPY',
- EXTERNAL_LOCATION
- = 's3://ccao-athena-results-us-east-1/dist_pin_to_pin_temp',
- PARTITIONED_BY = ARRAY['year']
- ) AS (
- WITH pin_locations AS (
- SELECT
- pin10,
- year,
- x_3435,
- y_3435,
- ST_POINT(x_3435, y_3435) AS point
- FROM spatial.parcel
- ),
-
- most_recent_pins AS (
- -- Parcel centroids may shift very slightly over time in GIS shapefiles.
- -- We want to make sure we only grab the most recent instance of a given
- -- parcel to avoid duplicates caused by these slight shifts.
- SELECT
- x_3435,
- y_3435,
- RANK() OVER (PARTITION BY pin10 ORDER BY year DESC) AS r
- FROM spatial.parcel
- ),
-
- distinct_pins AS (
- SELECT DISTINCT
- x_3435,
- y_3435
- FROM most_recent_pins
- WHERE r = 1
- ),
-
- pin_dists AS (
- SELECT *
- FROM (
- SELECT
- dists.*,
- ROW_NUMBER()
- OVER (
- PARTITION BY dists.x_3435, dists.y_3435, dists.year
- ORDER BY dists.dist
- )
- AS row_num
- FROM (
- SELECT
- dp.x_3435,
- dp.y_3435,
- loc.year,
- loc.pin10,
- ST_DISTANCE(ST_POINT(dp.x_3435, dp.y_3435), loc.point) AS dist
- FROM distinct_pins AS dp
- INNER JOIN pin_locations AS loc
- ON ST_CONTAINS(
- ST_BUFFER(ST_POINT(dp.x_3435, dp.y_3435), 1000), loc.point
- )
- ) AS dists
- )
- WHERE row_num <= 4
- )
-
- SELECT *
- FROM (
- SELECT
- pcl.pin10,
- MAX(CASE
- WHEN pd.row_num = 2 THEN pd.pin10
- END) AS nearest_neighbor_1_pin10,
- MAX(CASE
- WHEN pd.row_num = 2 THEN pd.dist
- END) AS nearest_neighbor_1_dist_ft,
- MAX(CASE
- WHEN pd.row_num = 3 THEN pd.pin10
- END) AS nearest_neighbor_2_pin10,
- MAX(CASE
- WHEN pd.row_num = 3 THEN pd.dist
- END) AS nearest_neighbor_2_dist_ft,
- MAX(CASE
- WHEN pd.row_num = 4 THEN pd.pin10
- END) AS nearest_neighbor_3_pin10,
- MAX(CASE
- WHEN pd.row_num = 4 THEN pd.dist
- END) AS nearest_neighbor_3_dist_ft,
- pcl.year
- FROM spatial.parcel AS pcl
- INNER JOIN pin_dists AS pd
- ON pcl.x_3435 = pd.x_3435
- AND pcl.y_3435 = pd.y_3435
- AND pcl.year = pd.year
- GROUP BY pcl.pin10, pcl.year
- )
- WHERE nearest_neighbor_1_pin10 IS NOT NULL
- AND nearest_neighbor_2_pin10 IS NOT NULL
- AND nearest_neighbor_3_pin10 IS NOT NULL
- )
-
- -- Consolidate unbucketed files into single files and delete temp table
- CREATE TABLE IF NOT EXISTS proximity.dist_pin_to_pin
- WITH (
- format='Parquet',
- write_compression = 'SNAPPY',
- external_location='s3://ccao-athena-ctas-us-east-1/proximity/dist_pin_to_pin',
- partitioned_by = ARRAY['year'],
- bucketed_by = ARRAY['pin10'],
- bucket_count = 1
- ) AS (
- SELECT
- pin10,
- nearest_neighbor_1_pin10,
- nearest_neighbor_1_dist_ft,
- nearest_neighbor_2_pin10,
- nearest_neighbor_2_dist_ft,
- nearest_neighbor_3_pin10,
- nearest_neighbor_3_dist_ft,
- year
- FROM proximity.dist_pin_to_pin_temp
- UNION
- SELECT
- pin10,
- nearest_neighbor_1_pin10,
- nearest_neighbor_1_dist_ft,
- nearest_neighbor_2_pin10,
- nearest_neighbor_2_dist_ft,
- nearest_neighbor_3_pin10,
- nearest_neighbor_3_dist_ft,
- year
- FROM proximity.dist_pin_to_pin_temp2
- );
-
- DROP TABLE IF EXISTS proximity.dist_pin_to_pin_temp
- DROP TABLE IF EXISTS proximity.dist_pin_to_pin_temp2
+ -- View that finds the 3 nearest neighbor PINs for every PIN for every year
+ SELECT * FROM {{ ref('proximity.dist_pin_to_pin_1km') }}
+ UNION
+ SELECT * FROM {{ ref('proximity.dist_pin_to_pin_10km') }}
Member

issue (blocking): If dist_pin_to_pin_10km.sql only contains the values missing from dist_pin_to_pin_1km.sql, then shouldn't this be the union of dist_pin_to_pin_1km.sql and dist_pin_to_pin_10km.sql?

Contributor Author

Wow, excellent catch @dfsnow! Embarrassing mistake on my part 🙈 This is fixed in a0f7423, and I updated the table docs to reflect it 👍🏻
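
For anyone reading this thread later, here is a minimal sketch of why the view needs both tables. It is plain Python over made-up `(pin10, year)` keys rather than the dbt models themselves; the PIN values and variable names are hypothetical, and it assumes the wider search succeeds for every parcel the first pass missed.

```python
# Hypothetical (pin10, year) keys standing in for rows of spatial.parcel.
all_parcels = {
    ("1000000001", "2022"),
    ("1000000002", "2022"),
    ("1000000003", "2022"),
}

# Keys that found three neighbors in the first pass (the 1km table).
found_1km = {("1000000001", "2022"), ("1000000002", "2022")}

# The 10km model only looks at parcels with no row in the 1km table,
# mirroring the LEFT JOIN ... WHERE dist_pin_to_pin_1km.pin10 IS NULL anti-join.
missing_matches = all_parcels - found_1km

# Assumption for this sketch: every retried parcel gets a 10km result.
found_10km = set(missing_matches)

# The two tables never share a key, so neither one alone is complete;
# the view has to be their union.
view = found_1km | found_10km
print(sorted(view))
```

Because the 10km model is built from an anti-join against the 1km table, the two result sets are disjoint by construction, and selecting from only one of them would silently drop parcels.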

36 changes: 36 additions & 0 deletions aws-athena/ctas/proximity-dist_pin_to_pin_10km.sql
@@ -0,0 +1,36 @@
-- CTAS that finds the 3 nearest neighbor PINs for every PIN for every year
-- within a 10km radius, filtered for PINs that do not have three neighbors
-- within a 1km radius
{{
config(
materialized='table',
partitioned_by=['year'],
bucketed_by=['pin10'],
bucket_count=1
)
}}

WITH missing_matches AS ( -- noqa: ST03
SELECT
pcl.pin10,
pcl.year,
pcl.x_3435,
pcl.y_3435,
dist_pin_to_pin_1km.pin10 AS matching_pin10
FROM {{ source('spatial', 'parcel') }} AS pcl
LEFT JOIN {{ ref('proximity.dist_pin_to_pin_1km') }} AS dist_pin_to_pin_1km
ON pcl.pin10 = dist_pin_to_pin_1km.pin10
AND pcl.year = dist_pin_to_pin_1km.year
WHERE dist_pin_to_pin_1km.pin10 IS NULL
)

SELECT *
FROM (
{{
nearest_pin_neighbors(
'missing_matches',
3,
10000
)
}}
)
21 changes: 21 additions & 0 deletions aws-athena/ctas/proximity-dist_pin_to_pin_1km.sql
@@ -0,0 +1,21 @@
-- CTAS that finds the 3 nearest neighbor PINs for every PIN for every year
-- within a 1km radius
{{
config(
materialized='table',
partitioned_by=['year'],
bucketed_by=['pin10'],
bucket_count=1
)
}}

SELECT *
FROM (
{{
nearest_pin_neighbors(
source('spatial', 'parcel'),
3,
1000
)
}}
)
2 changes: 1 addition & 1 deletion aws-athena/views/proximity-vw_pin10_proximity.sql
@@ -150,6 +150,6 @@ LEFT JOIN
  LEFT JOIN {{ ref('proximity.dist_pin_to_water') }} AS dist_pin_to_water
  ON pin.pin10 = dist_pin_to_water.pin10
  AND pin.year = dist_pin_to_water.year
- LEFT JOIN {{ source('proximity', 'dist_pin_to_pin') }} AS dist_pin_to_pin
+ LEFT JOIN {{ ref('proximity.dist_pin_to_pin') }} AS dist_pin_to_pin
  ON pin.pin10 = dist_pin_to_pin.pin10
  AND pin.year = dist_pin_to_pin.year
2 changes: 1 addition & 1 deletion aws-athena/views/proximity-vw_pin10_proximity_fill.sql
@@ -148,7 +148,7 @@ LEFT JOIN
  LEFT JOIN {{ ref('proximity.dist_pin_to_park') }} AS dist_pin_to_park
  ON pin.pin10 = dist_pin_to_park.pin10
  AND cyf.nearest_park_data_year = dist_pin_to_park.year
- LEFT JOIN {{ source('proximity', 'dist_pin_to_pin') }} AS dist_pin_to_pin
+ LEFT JOIN {{ ref('proximity.dist_pin_to_pin') }} AS dist_pin_to_pin
  ON pin.pin10 = dist_pin_to_pin.pin10
  AND cyf.year = dist_pin_to_pin.year -- NOTE, doesn't need to be filled
  LEFT JOIN
95 changes: 95 additions & 0 deletions dbt/macros/nearest_pin_neighbors.sql
@@ -0,0 +1,95 @@
-- Macro that takes a `source_model` containing PIN geometries and joins it
-- against spatial.parcel in order to generate the `num_neighbors` nearest
-- neighbors for each PIN for each year in the data, where a "neighbor" is
-- defined as another PIN that is within `radius_km` of the given PIN.
--
-- The `source_model` must contain four required columns, and is modeled
-- after spatial.parcel, which is the primary source for this macro:
--
-- * pin10
-- * year
-- * x_3435
-- * y_3435
{% macro nearest_pin_neighbors(source_model, num_neighbors, radius_km) %}
Contributor Author

This macro is nearly identical to the pre-existing aws-athena/ctas/proximity-dist_pin_to_pin.sql query, with three main changes:

  1. Parameterization of source_model, num_neighbors, and radius_km
  2. Addition of pin10 to the distinct_pins subquery per our missing data discussion on Teams (diff here)
  3. Reformatting to match dbt styles using sqlfmt

Everything else should be identical!
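
To make the parameterization easier to review, here is a rough plain-Python emulation of what the two `{% for %}` loops in the macro expand to. It is not dbt or Jinja, and the function names `pivot_columns` and `not_null_filter` are hypothetical helpers invented for this sketch.

```python
def pivot_columns(num_neighbors: int) -> list[str]:
    """Emulate the first loop: one pin10 and one dist column per neighbor."""
    columns = []
    for idx in range(1, num_neighbors + 1):
        # Mirrors {{ idx + 1 }} in the macro: neighbor idx reads row_num idx + 1.
        row_num = idx + 1
        columns.append(
            f"max(case when pd.row_num = {row_num} then pd.neighbor_pin10 end) "
            f"as nearest_neighbor_{idx}_pin10"
        )
        columns.append(
            f"max(case when pd.row_num = {row_num} then pd.dist end) "
            f"as nearest_neighbor_{idx}_dist_ft"
        )
    return columns


def not_null_filter(num_neighbors: int) -> str:
    """Emulate the second loop: AND together one IS NOT NULL check per neighbor."""
    checks = [
        f"nearest_neighbor_{idx}_pin10 is not null"
        for idx in range(1, num_neighbors + 1)
    ]
    return " and ".join(checks)


if __name__ == "__main__":
    for column in pivot_columns(3):
        print(column)
    print("where", not_null_filter(3))
```

With `num_neighbors = 3` the highest row number read is 4, which lines up with the `row_num <= 4` filter in `pin_dists`. That filter is a literal in this diff, so a larger `num_neighbors` would presumably need it raised as well.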

with
pin_locations as (
select pin10, year, x_3435, y_3435, st_point(x_3435, y_3435) as point
from {{ source("spatial", "parcel") }}
),

most_recent_pins as (
-- Parcel centroids may shift very slightly over time in GIS shapefiles.
-- We want to make sure we only grab the most recent instance of a given
-- parcel to avoid duplicates caused by these slight shifts.
select
x_3435,
y_3435,
pin10,
rank() over (partition by pin10 order by year desc) as r
from {{ source_model }}
),

distinct_pins as (
select distinct x_3435, y_3435, pin10 from most_recent_pins where r = 1
),

pin_dists as (
select *
from
(
select
dists.*,
row_number() over (
partition by dists.x_3435, dists.y_3435, dists.year
order by dists.dist
) as row_num
from
(
select
dp.pin10,
dp.x_3435,
dp.y_3435,
loc.year,
loc.pin10 as neighbor_pin10,
st_distance(
st_point(dp.x_3435, dp.y_3435), loc.point
) as dist
from distinct_pins as dp
inner join
pin_locations as loc
on st_contains(
st_buffer(
st_point(dp.x_3435, dp.y_3435), {{ radius_km }}
),
loc.point
)
and dp.pin10 != loc.pin10
) as dists
)
where row_num <= 4
)

select *
from
(
select
pcl.pin10,
{% for idx in range(1, num_neighbors + 1) %}
max(
case when pd.row_num = {{ idx + 1 }} then pd.neighbor_pin10 end
) as nearest_neighbor_{{ idx }}_pin10,
max(
case when pd.row_num = {{ idx + 1 }} then pd.dist end
) as nearest_neighbor_{{ idx }}_dist_ft,
{% endfor %}
pcl.year
from {{ source("spatial", "parcel") }} as pcl
inner join pin_dists as pd on pcl.pin10 = pd.pin10 and pcl.year = pd.year
group by pcl.pin10, pcl.year
)
where
{% for idx in range(1, num_neighbors + 1) %}
{% if idx != 1 %} and{% endif %}
nearest_neighbor_{{ idx }}_pin10 is not null
{% endfor %}
{% endmacro %}
21 changes: 21 additions & 0 deletions dbt/models/proximity/docs.md
@@ -0,0 +1,21 @@
{% docs dist_pin_to_pin %}
View that finds the three nearest neighbor PINs for every PIN for every year.
{% enddocs %}

{% docs dist_pin_to_pin_intermediate %}
Intermediate table used to generate `proximity.dist_pin_to_pin`.

The `proximity.dist_pin_to_pin` view is intended to record distances to the
three closest PINs for all PINs in the county for all years in the data.
This type of recursive spatial query is expensive, however, and some PINs are
quite far (>1km) from the nearest three PINs, so we use intermediate tables
to strike a balance between data completeness and computational efficiency.

To compute the full set of distances in `proximity.dist_pin_to_pin`, we first
generate PIN-to-PIN distances using a 1km buffer and store the results in the
`proximity.dist_pin_to_pin_1km` table. Then, we query for PINs that did not have
three matches within 1km and redo the distance query with an expanded 10km buffer,
storing the results in the `proximity.dist_pin_to_pin_10km` table. Finally, the
union of the 1km table and the 10km table is aliased to the
`proximity.dist_pin_to_pin` view for ease of querying.
{% enddocs %}
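
The two-pass strategy described in these docs can be sketched in plain Python as follows. This is a simplified brute-force illustration, not the Athena query: it ignores years and the most-recent-centroid logic, the point IDs and coordinates are made up, and `k_nearest_within` and `two_pass_neighbors` are hypothetical names.

```python
import math


def k_nearest_within(origins, candidates, k, radius):
    """Return {origin_id: [(neighbor_id, dist), ...]} for every origin that has
    at least k other points within radius. Brute force, for illustration only."""
    results = {}
    for origin_id, (ox, oy) in origins.items():
        dists = []
        for cand_id, (cx, cy) in candidates.items():
            if cand_id == origin_id:
                continue  # a point is not its own neighbor
            dist = math.hypot(cx - ox, cy - oy)
            if dist <= radius:
                dists.append((cand_id, dist))
        # Origins with fewer than k neighbors in range are left out entirely.
        if len(dists) >= k:
            results[origin_id] = sorted(dists, key=lambda pair: pair[1])[:k]
    return results


def two_pass_neighbors(points, k, small_radius, large_radius):
    # Pass 1: search every point with the small buffer.
    first = k_nearest_within(points, points, k, small_radius)
    # Pass 2: retry only the points that pass 1 left out, with the large buffer.
    missing = {pid: xy for pid, xy in points.items() if pid not in first}
    second = k_nearest_within(missing, points, k, large_radius)
    # The final result is the union of both passes.
    return {**first, **second}


# Hypothetical coordinates: four clustered points and one isolated point.
points = {
    "a": (0.0, 0.0),
    "b": (100.0, 0.0),
    "c": (0.0, 100.0),
    "d": (100.0, 100.0),
    "e": (5000.0, 5000.0),
}
result = two_pass_neighbors(points, k=3, small_radius=1000, large_radius=10000)
```

Only the isolated point pays for the wide search, which is the balance between completeness and cost that the intermediate tables aim for.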
1 change: 1 addition & 0 deletions dbt/models/proximity/proximity.dist_pin_to_pin.sql
1 change: 1 addition & 0 deletions dbt/models/proximity/proximity.dist_pin_to_pin_10km.sql
1 change: 1 addition & 0 deletions dbt/models/proximity/proximity.dist_pin_to_pin_1km.sql
11 changes: 6 additions & 5 deletions dbt/models/proximity/schema.yml
@@ -1,11 +1,6 @@
  version: 2

-
- sources:
-   - name: proximity
-     tables:
-       - name: dist_pin_to_pin

  models:
    - name: proximity.cnt_pin_num_bus_stop
    - name: proximity.cnt_pin_num_foreclosure
@@ -22,6 +17,12 @@ models:
    - name: proximity.dist_pin_to_metra_route
    - name: proximity.dist_pin_to_metra_stop
    - name: proximity.dist_pin_to_park
+   - name: proximity.dist_pin_to_pin
+     description: '{{ doc("dist_pin_to_pin") }}'
+   - name: proximity.dist_pin_to_pin_1km
+     description: '{{ doc("dist_pin_to_pin_intermediate") }}'
+   - name: proximity.dist_pin_to_pin_10km
+     description: '{{ doc("dist_pin_to_pin_intermediate") }}'
    - name: proximity.dist_pin_to_railroad
    - name: proximity.dist_pin_to_water
    - name: proximity.vw_pin10_proximity