diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-traffic.R b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-traffic.R
index bc5cc4fc4..087b0d496 100644
--- a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-traffic.R
+++ b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-traffic.R
@@ -5,7 +5,7 @@ library(sf)
 library(geoarrow)
 
 # Define the S3 bucket and folder path
-AWS_S3_RAW_BUCKET <- "s3://ccao-data-raw-us-east-1"
+AWS_S3_RAW_BUCKET <- Sys.getenv("AWS_S3_RAW_BUCKET")
 AWS_S3_WAREHOUSE_BUCKET <- Sys.getenv("AWS_S3_WAREHOUSE_BUCKET")
 s3_folder <- "spatial/environment/traffic/"
 output_bucket <- file.path(AWS_S3_WAREHOUSE_BUCKET, s3_folder)
@@ -105,101 +105,120 @@ walk(parquet_files, \(file_key) {
       inventory_id = if ("INVENTORY" %in% colnames(.)) INVENTORY else NA
     ) %>%
     mutate(surface_type = road_codes[as.character(surface_type)],
-           speed_limit = as.numeric(speed_limit)) %>%
-    select(-one_of(required_columns)) %>%
+           speed_limit = as.numeric(speed_limit),
+           road_name = str_to_lower(road_name), # Convert to lowercase
+           road_name = gsub("[[:punct:]]", "", road_name)) %>% # Remove punctuation like . / etc.
+    select(-one_of(required_columns)) %>%
     mutate(across(-geometry, ~replace(., . %in% c(0, "0000"), NA))) %>%
     mutate(surface_year = ifelse(surface_year == 9999, NA, surface_year)) %>%
     group_by(road_name, speed_limit, lanes, surface_type, daily_traffic) %>%
     summarize(geometry = st_union(geometry)) %>%
     ungroup()
-  # Function to create the intersection matrix and compute average traffic
-  calculate_traffic_averages <- function(shapefile_data) {
-    # Create an intersection matrix for averages
-    intersection_matrix <- st_intersects(shapefile_data)
-
-    # Create intersecting pairs
-    intersecting_pairs <- do.call(rbind, lapply(seq_along(intersection_matrix), function(i) {
-      data.frame(polygon_1 = i, polygon_2 = intersection_matrix[[i]])
-    })) %>%
-      filter(polygon_1 != polygon_2) # Remove self-matches
-
-    # Add polygon ID and relevant columns to shapefile data. This allows us to later merge
-    # data with the intersection pairs above.
-    shapefile_with_ids <- shapefile_data %>%
-      mutate(polygon_id = row_number()) %>%
-      select(polygon_id, road_name, daily_traffic, speed_limit, lanes)
-
-    # Join intersecting pairs with matching street IDs
-    averages <- intersecting_pairs %>%
-      left_join(
-        shapefile_with_ids %>%
-          # Create IDs for the "home" street
-          rename(
-            road_name_1 = road_name,
-            daily_traffic_1 = daily_traffic,
-            speed_limit_1 = speed_limit,
-            lanes_1 = lanes
-          ),
-        by = c("polygon_1" = "polygon_id")
-      ) %>%
-      left_join(
-        shapefile_with_ids %>%
-          # Create IDs for the neighboring streets
-          rename(
-            road_name_2 = road_name,
-            daily_traffic_2 = daily_traffic,
-            speed_limit_2 = speed_limit,
-            lanes_2 = lanes
-          ),
-        by = c("polygon_2" = "polygon_id")
-      ) %>%
-      filter(road_name_1 == road_name_2) %>% # Keep only matching road names
-      group_by(polygon_1) %>%
-      # Create averages
-      summarize(
-        average_daily_traffic = mean(daily_traffic_2, na.rm = TRUE),
-        average_speed_limit = mean(speed_limit_2, na.rm = TRUE),
-        average_lanes = mean(lanes_2, na.rm = TRUE),
-        .groups = 'drop'
-      )
-
-    # Update traffic, speed limit, and lanes with averages
-    shapefile_data <- shapefile_data %>%
-      mutate(polygon_id = row_number()) %>%
-      left_join(averages, by = c("polygon_id" = "polygon_1")) %>%
-      mutate(
-        daily_traffic = if_else(is.na(daily_traffic), average_daily_traffic, daily_traffic),
-        speed_limit = if_else(is.na(speed_limit), average_speed_limit, speed_limit),
-        lanes = if_else(is.na(lanes), average_lanes, lanes)
-      )
-
-    return(shapefile_data)
-  }
-
+  # Fill missing traffic attributes on the road segments.
+  #
+  # Repeatedly replaces NA daily_traffic, speed_limit, and lanes values with
+  # the mean of the intersecting segments that share the same road_name, and
+  # stops once a pass no longer lowers the NA count of any of those columns.
+  #
+  # shapefile_data: spatial data frame (it is passed to st_intersects) with
+  #   road_name, daily_traffic, speed_limit, and lanes columns.
+  # Returns shapefile_data with the fillable NAs replaced; columns unchanged.
+  calculate_traffic_data <- function(shapefile_data) {
+    # Helper: run one fill pass based on intersections
+    calculate_traffic_averages <- function(data) {
+      # Create an intersection matrix
+      intersection_matrix <- st_intersects(data)
+
+      # Create intersecting pairs
+      intersecting_pairs <- do.call(rbind, lapply(seq_along(intersection_matrix), function(i) {
+        data.frame(polygon_1 = i, polygon_2 = intersection_matrix[[i]])
+      })) %>%
+        filter(polygon_1 != polygon_2) # Remove self-matches
+
+      # Add polygon IDs and relevant columns for merging
+      data_with_ids <- data %>%
+        mutate(polygon_id = row_number()) %>%
+        select(polygon_id, road_name, daily_traffic, speed_limit, lanes)
+
+      # Join intersecting pairs with their respective polygon data
+      averages <- intersecting_pairs %>%
+        left_join(
+          data_with_ids %>%
+            rename(
+              road_name_1 = road_name,
+              daily_traffic_1 = daily_traffic,
+              speed_limit_1 = speed_limit,
+              lanes_1 = lanes
+            ),
+          by = c("polygon_1" = "polygon_id")
+        ) %>%
+        left_join(
+          data_with_ids %>%
+            rename(
+              road_name_2 = road_name,
+              daily_traffic_2 = daily_traffic,
+              speed_limit_2 = speed_limit,
+              lanes_2 = lanes
+            ),
+          by = c("polygon_2" = "polygon_id")
+        ) %>%
+        filter(road_name_1 == road_name_2) %>% # Keep only matching road names
+        group_by(polygon_1) %>%
+        # mean() of an all-NA group is NaN, which is.na() still treats as missing
+        summarize(
+          average_daily_traffic = mean(daily_traffic_2, na.rm = TRUE),
+          average_speed_limit = mean(speed_limit_2, na.rm = TRUE),
+          average_lanes = mean(lanes_2, na.rm = TRUE),
+          .groups = 'drop'
+        )
+
+      # Update the original data with averages where needed
+      updated_data <- data %>%
+        mutate(polygon_id = row_number()) %>%
+        left_join(averages, by = c("polygon_id" = "polygon_1")) %>%
+        mutate(
+          daily_traffic = if_else(is.na(daily_traffic), average_daily_traffic, daily_traffic),
+          speed_limit = if_else(is.na(speed_limit), average_speed_limit, speed_limit),
+          lanes = if_else(is.na(lanes), average_lanes, lanes)
+        )
+
+      return(updated_data)
+    }
-  # Loop until no changes are made
-  shapefile_data_final <- shapefile_data
-  calculate_traffic_with_loop <- function(shapefile_data) {
-    # Initialize final shapefile data
+    # Initialize loop
     shapefile_data_final <- shapefile_data
-
     repeat {
-      # Save current values to compare changes
-      previous_traffic <- shapefile_data_final$daily_traffic
-      previous_speed <- shapefile_data_final$speed_limit
-      previous_lanes <- shapefile_data_final$lanes
-      # Recalculate averages and update shapefile data
-      shapefile_data_final <- calculate_traffic_averages(shapefile_data_final)
-
-      # Check if all values remain unchanged
-      if (all(previous_traffic == shapefile_data_final$daily_traffic, na.rm = TRUE) &&
-        all(previous_speed == shapefile_data_final$speed_limit, na.rm = TRUE) &&
-        all(previous_lanes == shapefile_data_final$num_lanes, na.rm = TRUE)) {
-        break # Exit loop if no changes were made
+      # Save current NA counts to compare changes
+      previous_na_traffic <- sum(is.na(shapefile_data_final$daily_traffic))
+      previous_na_speed <- sum(is.na(shapefile_data_final$speed_limit))
+      previous_na_lanes <- sum(is.na(shapefile_data_final$lanes))
+
+      # Recalculate averages and update the data, dropping the helper
+      # columns so the next pass can join fresh averages without name clashes
+      shapefile_data_final <- calculate_traffic_averages(shapefile_data_final) %>%
+        select(
+          -polygon_id, -average_daily_traffic,
+          -average_speed_limit, -average_lanes
+        )
+
+      # Calculate current NA counts after updating
+      current_na_traffic <- sum(is.na(shapefile_data_final$daily_traffic))
+      current_na_speed <- sum(is.na(shapefile_data_final$speed_limit))
+      current_na_lanes <- sum(is.na(shapefile_data_final$lanes))
+
+      # Exit loop if no changes in NA counts are detected
+      if (current_na_traffic >= previous_na_traffic &&
+        current_na_speed >= previous_na_speed &&
+        current_na_lanes >= previous_na_lanes) {
+        cat("No reduction in NA counts detected. Exiting loop.\n")
+        break
       }
     }
+    return(shapefile_data_final)
   }
-  calculate_traffic_with_loop(shapefile_data)
+  # Run the function and keep its result for the upload below
+  shapefile_data_final <- calculate_traffic_data(shapefile_data)
+
   output_path <- file.path(output_bucket, basename(file_key))
-  # geoarrow::write_geoparquet(shapefile_data_final, output_path)
+  geoarrow::write_geoparquet(shapefile_data_final, output_path)
   print(paste(file_key, "cleaned and uploaded."))
 }