Skip to content

Commit

Permalink
Feature/PADW 68 Refactor RTD - BK Status Contributes to DV Table Crea…
Browse files Browse the repository at this point in the history
…tion (#13)

* fix: Correct function name spelling

* refactor: BK DV Table Hold Logic for Downstream Table Creation
** Refactored source_column and insert_into_build_call functions to hold the creation of downstream DV tables if the business key (BK) is not marked as Ready-to-Deploy.
** This ensures that only tables associated with Ready-to-Deploy business keys create downstream DV tables, improving overall data integrity in the deployment process.

* chore: bump version to 0.0.2 in Cargo.toml and Trunk.toml
  • Loading branch information
analyzer1 authored Oct 8, 2024
1 parent 89f9dd8 commit 1eba513
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 106 deletions.
6 changes: 3 additions & 3 deletions extension/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "pg_auto_dw"
version = "0.0.1"
edition = "2021"
name = "pg_auto_dw" # Extension name
version = "0.0.2" # Extension version (SemVer: MAJOR.MINOR.PATCH)
edition = "2021" # Rust 2021 edition

[lib]
crate-type = ["cdylib"]
Expand Down
2 changes: 1 addition & 1 deletion extension/Trunk.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[extension]
name = "pg_auto_dw"
version = "0.0.1"
version = "0.0.2"
repository = "https://github.com/tembo-io/pg_auto_dw"
license = "PostgreSQL"
description = "An auto data warehouse extension for Postgres."
Expand Down
2 changes: 1 addition & 1 deletion extension/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ fn source_column() -> Result<
error!("GUC: Unable to obtain parameter \"pg_auto_dw.accepted_transformer_confidence_level.\"");
});

let query: &str = &queries::source_coumn(&accepted_transformer_confidence_level);
let query: &str = &queries::source_column(&accepted_transformer_confidence_level);

info!("Evaluation of TABLE customer");
Spi::connect(|client| {
Expand Down
249 changes: 148 additions & 101 deletions extension/src/model/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,60 +326,79 @@ pub fn insert_into_build_call(
) -> String {
format!(r#"
INSERT INTO auto_dw.build_call (fk_transformer_responses, build_id, build_flag, build_status)
WITH
source_objects_tranformation_cal AS (
SELECT
MAX(pk_transformer_responses)AS max_pk_transformer_response
FROM auto_dw.transformer_responses AS t
GROUP BY fk_source_objects
),
source_object_transformation_latest AS (
SELECT t.* FROM auto_dw.transformer_responses AS t
JOIN source_objects_tranformation_cal AS c ON t.pk_transformer_responses = c.max_pk_transformer_response
),
sour_object_status AS (
SELECT
t.pk_transformer_responses,
s.schema_name::TEXT AS schema,
s.table_name::TEXT AS table,
s.column_name::TEXT AS column,
CASE
WHEN t.confidence_score IS NULL THEN 'Queued for Processing'
WHEN t.confidence_score >= {accepted_transformer_confidence_level} THEN 'Ready to Deploy'
ELSE 'Requires Attention'
END AS status,
CASE
WHEN t.confidence_score IS NOT NULL THEN CONCAT((t.confidence_score * 100)::INT::TEXT, '%')
ELSE '-'
END AS confidence_level,
CASE
WHEN t.confidence_score IS NOT NULL THEN
(
'Status: ' ||
CASE
WHEN t.confidence_score IS NULL THEN 'Queued for Processing'
WHEN t.confidence_score >= {accepted_transformer_confidence_level} THEN 'Ready to Deploy'
ELSE 'Requires Attention'
END || ': ' ||
'Model: ' || model_name ||
' categorized this column as a ' || category ||
' with a confidence of ' || CONCAT((t.confidence_score * 100)::INT::TEXT, '%') || '. ' ||
'Model Reasoning: ' || t.reason
)
ELSE '-'
END AS status_response
FROM auto_dw.source_objects AS s
LEFT JOIN source_object_transformation_latest AS t ON s.pk_source_objects = t.fk_source_objects
WHERE s.current_flag = 'Y' AND s.deleted_flag = 'N'
ORDER BY s.schema_name, s.table_name, s.column_ordinal_position
)
SELECT
pk_transformer_responses AS fk_transformer_responses,
'{build_id}' AS build_id,
'{build_flag}' AS build_flag,
'{build_status}' AS build_status
FROM sour_object_status
WHERE status = '{status}';
WITH
confidence_level AS (SELECT {accepted_transformer_confidence_level} AS value),
source_objects_tranformation_cal AS (
SELECT
MAX(pk_transformer_responses)AS max_pk_transformer_response
FROM auto_dw.transformer_responses AS t
GROUP BY fk_source_objects
),
source_object_transformation_latest AS (
SELECT t.* FROM auto_dw.transformer_responses AS t
JOIN source_objects_tranformation_cal AS c ON t.pk_transformer_responses = c.max_pk_transformer_response
),
source_object_status_prep AS (
SELECT
t.pk_transformer_responses,
s.schema_name,
s.table_name,
s.column_name,
s.column_ordinal_position,
t.confidence_score,
t.reason,
t.category,
t.model_name,
MAX(
CASE
WHEN t.category = 'Business Key Part' AND t.confidence_score < cl.value THEN 1
ELSE 0
END
) OVER (PARTITION BY s.schema_name, s.table_name) AS bk_hold
FROM auto_dw.source_objects AS s
JOIN confidence_level AS cl ON true
LEFT JOIN source_object_transformation_latest AS t ON s.pk_source_objects = t.fk_source_objects
WHERE s.current_flag = 'Y' AND s.deleted_flag = 'N'
),
source_object AS (
SELECT *,
CASE
WHEN confidence_score IS NULL THEN 'Queued for Processing'
WHEN category = 'Business Key Part' AND confidence_score >= cl.value THEN 'Ready to Deploy'
WHEN category <> 'Business Key Part' AND confidence_score >= cl.value AND bk_hold = 0 THEN 'Ready to Deploy'
WHEN category <> 'Business Key Part' AND confidence_score >= cl.value AND bk_hold = 1 THEN 'Ready to Deploy - Awaiting Business Key (BK)'
ELSE 'Requires Attention'
END AS status,
CASE
WHEN confidence_score IS NOT NULL THEN CONCAT((confidence_score * 100)::INT::TEXT, '%')
ELSE '-'
END AS confidence_level,
CASE
WHEN confidence_score IS NOT NULL THEN
(
'Status: ' ||
CASE
WHEN confidence_score IS NULL THEN 'Queued for Processing'
WHEN confidence_score >= cl.value THEN 'Ready to Deploy'
ELSE 'Requires Attention'
END || ': ' ||
'Model: ' || model_name ||
' categorized this column as a ' || category ||
' with a confidence of ' || CONCAT((confidence_score * 100)::INT::TEXT, '%') || '. ' ||
'Model Reasoning: ' || reason
)
ELSE '-'
END AS status_response
FROM source_object_status_prep
JOIN confidence_level AS cl ON true
)
SELECT
pk_transformer_responses AS fk_transformer_responses,
'{build_id}' AS build_id,
'{build_flag}' AS build_flag,
'{build_status}' AS build_status
FROM source_object
WHERE status = '{status}';
"#)
}

Expand Down Expand Up @@ -407,56 +426,84 @@ pub fn build_object_pull(build_id: &str) -> String {
}

#[no_mangle]
pub fn source_coumn(accepted_transformer_confidence_level: &str) -> String {
pub fn source_column(accepted_transformer_confidence_level: &str) -> String {
format!(r#"
WITH
source_objects_tranformation_cal AS (
SELECT
MAX(pk_transformer_responses)AS max_pk_transformer_response
FROM auto_dw.transformer_responses AS t
GROUP BY fk_source_objects
),
source_object_transformation_latest AS (
SELECT t.* FROM auto_dw.transformer_responses AS t
JOIN source_objects_tranformation_cal AS c ON t.pk_transformer_responses = c.max_pk_transformer_response
)
SELECT
s.schema_name::TEXT AS schema,
s.table_name::TEXT AS table,
s.column_name::TEXT AS column,
CASE
WHEN t.confidence_score IS NULL THEN 'Queued for Processing'
WHEN t.confidence_score >= {accepted_transformer_confidence_level} THEN 'Ready to Deploy'
ELSE 'Requires Attention'
END AS status,
CASE
WHEN t.confidence_score IS NOT NULL THEN CONCAT((t.confidence_score * 100)::INT::TEXT, '%')
ELSE '-'
END AS confidence_level,
CASE
WHEN t.confidence_score IS NOT NULL THEN
(
'Status: ' ||
CASE
WHEN t.confidence_score IS NULL THEN 'Queued for Processing'
WHEN t.confidence_score >= {accepted_transformer_confidence_level} THEN 'Ready to Deploy'
ELSE 'Requires Attention'
END || ': ' ||
'Model: ' || model_name ||
' categorized this column as a ' || category ||
' with a confidence of ' || CONCAT((t.confidence_score * 100)::INT::TEXT, '%') || '. ' ||
confidence_level AS (SELECT {accepted_transformer_confidence_level} AS value),
source_objects_tranformation_cal AS (
SELECT
MAX(pk_transformer_responses)AS max_pk_transformer_response
FROM auto_dw.transformer_responses AS t
GROUP BY fk_source_objects
),
source_object_transformation_latest AS (
SELECT t.* FROM auto_dw.transformer_responses AS t
JOIN source_objects_tranformation_cal AS c ON t.pk_transformer_responses = c.max_pk_transformer_response
),
source_object_status_prep AS (
SELECT
t.pk_transformer_responses,
s.schema_name,
s.table_name,
s.column_name,
s.column_ordinal_position,
t.confidence_score,
t.reason,
t.category,
t.model_name,
MAX(
CASE
WHEN t.category = 'Business Key Part' AND t.confidence_score < cl.value THEN 1
ELSE 0
END
) OVER (PARTITION BY s.schema_name, s.table_name) AS bk_hold
FROM auto_dw.source_objects AS s
JOIN confidence_level AS cl ON true
LEFT JOIN source_object_transformation_latest AS t ON s.pk_source_objects = t.fk_source_objects
WHERE s.current_flag = 'Y' AND s.deleted_flag = 'N'
),
source_object AS (
SELECT *,
CASE
WHEN t.business_key_name <> 'NA' THEN 'Further, Business Key Part has been associated with Business Key ' || UPPER(t.business_key_name) || '. '
ELSE ''
END ||
'Model Reasoning: ' || t.reason
)
ELSE '-'
END AS status_response
FROM auto_dw.source_objects AS s
LEFT JOIN source_object_transformation_latest AS t ON s.pk_source_objects = t.fk_source_objects
WHERE s.current_flag = 'Y' AND s.deleted_flag = 'N'
ORDER BY s.schema_name, s.table_name, s.column_ordinal_position;
WHEN confidence_score IS NULL THEN 'Queued for Processing'
WHEN category = 'Business Key Part' AND confidence_score >= cl.value THEN 'Ready to Deploy'
WHEN category <> 'Business Key Part' AND confidence_score >= cl.value AND bk_hold = 0 THEN 'Ready to Deploy'
WHEN category <> 'Business Key Part' AND confidence_score >= cl.value AND bk_hold = 1 THEN 'Ready to Deploy - Awaiting Business Key (BK)'
ELSE 'Requires Attention'
END AS status,
CASE
WHEN confidence_score IS NOT NULL THEN CONCAT((confidence_score * 100)::INT::TEXT, '%')
ELSE '-'
END AS confidence_level,
CASE
WHEN confidence_score IS NOT NULL THEN
(
'Status: ' ||
CASE
WHEN confidence_score IS NULL THEN 'Queued for Processing'
WHEN confidence_score >= cl.value THEN 'Ready to Deploy'
ELSE 'Requires Attention'
END || ': ' ||
'Model: ' || model_name ||
' categorized this column as a ' || category ||
' with a confidence of ' || CONCAT((confidence_score * 100)::INT::TEXT, '%') || '. ' ||
'Model Reasoning: ' || reason
)
ELSE '-'
END AS status_response
FROM source_object_status_prep
JOIN confidence_level AS cl ON true
)
SELECT
schema_name::TEXT AS schema,
table_name::TEXT AS table,
column_name::TEXT AS column,
status,
confidence_level,
status_response
FROM source_object
ORDER BY schema_name, table_name, column_ordinal_position
;
"#)
}

Expand Down

0 comments on commit 1eba513

Please sign in to comment.