Commit

Added hashing function and deprecated PGCRYPTO. (#9)
analyzer1 authored Sep 23, 2024
1 parent 9a39972 commit 75b9039
Showing 9 changed files with 346 additions and 653 deletions.
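
In short, hash-key generation moves off the pgcrypto extension and onto a new auto_dw.hash(text) function built into the extension itself with the sha2 and hex crates. A minimal sketch of the intended equivalence, assuming pgcrypto is still installed so the two expressions can be compared side by side (the input string is purely illustrative):

    -- old expression, via pgcrypto
    SELECT ENCODE(PUBLIC.DIGEST('customer_id,42', 'sha256'), 'hex');
    -- new expression, via the extension's own function
    SELECT auto_dw.hash('customer_id,42');
    -- both should return the same lowercase 64-character hex digest
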
2 changes: 2 additions & 0 deletions extension/Cargo.toml
@@ -26,6 +26,8 @@ uuid = { version = "1.1", features = ["v4", "v5", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
anyhow = "1.0"
regex = "1.7"
+ sha2 = "0.10"
+ hex = "0.4"

[dev-dependencies]
pgrx-tests = "=0.11.4"
1 change: 0 additions & 1 deletion extension/pg_auto_dw.control
@@ -4,4 +4,3 @@ module_pathname = '$libdir/pg_auto_dw'
relocatable = false
superuser = true
schema = 'auto_dw'
- requires = 'pgcrypto'
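
Dropping the requires line means installing the extension no longer presumes pgcrypto is available in the cluster. A hypothetical install now reduces to a single statement (previously pgcrypto had to exist first, or CREATE EXTENSION ... CASCADE had to be used):

    CREATE EXTENSION pg_auto_dw;  -- no prior CREATE EXTENSION pgcrypto; needed
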
9 changes: 4 additions & 5 deletions extension/src/controller/bgw_transformer_client.rs
@@ -6,8 +6,7 @@ use std::collections::HashMap;
use tokio::runtime::Runtime;
use serde::Deserialize;

- use crate::queries;
- use crate::model::source_objects;
+ use crate::model::*;
// use crate::utility::ollama_client;
use crate::utility::openai_client;
use crate::utility::guc;
@@ -75,7 +74,7 @@ pub extern "C" fn background_worker_transformer_client(_arg: pg_sys::Datum) {
while retries < MAX_TRANSFORMER_RETRIES {
runtime.block_on(async {
// Get Generation
- generation_json_bk_identification = match openai_client::send_request(table_details_json_str.as_str(), openai_client::PromptTemplate::BKIdentification, &0, &hints).await {
+ generation_json_bk_identification = match openai_client::send_request(table_details_json_str.as_str(), prompt_template::PromptTemplate::BKIdentification, &0, &hints).await {
Ok(response_json) => {

// TODO: Add a function to enable logging.
@@ -123,7 +122,7 @@ pub extern "C" fn background_worker_transformer_client(_arg: pg_sys::Datum) {
while retries < MAX_TRANSFORMER_RETRIES {
runtime.block_on(async {
// Get Generation
- generation_json_bk_name = match openai_client::send_request(table_details_json_str.as_str(), openai_client::PromptTemplate::BKName, &0, &hints).await {
+ generation_json_bk_name = match openai_client::send_request(table_details_json_str.as_str(), prompt_template::PromptTemplate::BKName, &0, &hints).await {
Ok(response_json) => {

// let response_json_pretty = serde_json::to_string_pretty(&response_json)
@@ -174,7 +173,7 @@ pub extern "C" fn background_worker_transformer_client(_arg: pg_sys::Datum) {
generation_json_descriptor_sensitive =
match openai_client::send_request(
table_details_json_str.as_str(),
- openai_client::PromptTemplate::DescriptorSensitive,
+ prompt_template::PromptTemplate::DescriptorSensitive,
column,
&hints).await {
Ok(response_json) => {
25 changes: 11 additions & 14 deletions extension/src/controller/dv_loader.rs
@@ -118,14 +118,14 @@ fn dv_data_loader_hub_dml (dv_schema: &DVSchema) -> String {
FROM {}.hub_{}
)
SELECT
- ENCODE(PUBLIC.DIGEST(ARRAY_TO_STRING(ARRAY[-1], ',')::TEXT, 'sha256'), 'hex') AS hub_{}_hk,
+ auto_dw.hash(ARRAY_TO_STRING(ARRAY[-1], ',')::TEXT) AS hub_{}_hk,
'0001-01-01'::TIMESTAMP WITHOUT TIME ZONE AS load_ts,
'SYSTEM'::TEXT AS record_source
{}
FROM initialized WHERE NOT initialized.is_initialized
UNION
SELECT
- ENCODE(PUBLIC.DIGEST(ARRAY_TO_STRING(ARRAY[-2], ',')::TEXT, 'sha256'), 'hex') AS hub_{}_hk,
+ auto_dw.hash(ARRAY_TO_STRING(ARRAY[-2], ',')::TEXT) AS hub_{}_hk,
'0001-01-01'::TIMESTAMP WITHOUT TIME ZONE AS load_ts,
'SYSTEM'::TEXT AS record_source
{}
@@ -168,10 +168,9 @@ fn dv_data_loader_hub_dml (dv_schema: &DVSchema) -> String {
WITH
stg_data AS (
SELECT
- ENCODE(
- public.DIGEST(
- ARRAY_TO_STRING(
- ARRAY[{}], ','), 'sha256'), 'hex') AS hub_{}_hk,
+ auto_dw.hash(
+ ARRAY_TO_STRING(ARRAY[{}], ',')
+ ) AS hub_{}_hk,
(CURRENT_TIMESTAMP AT TIME ZONE 'UTC')::TIMESTAMP(6) AS load_ts,
'{}' AS record_source{}
FROM {}.{} AS stg
@@ -330,14 +329,12 @@ fn dv_data_loader_sat_dml (dv_schema: &DVSchema) -> String {
WITH stg AS (
SELECT
*,
- ENCODE(
- {source_schema_name}.DIGEST(
- ARRAY_TO_STRING(
- ARRAY[{hub_bk_parts_sql_stg_array}], ','), 'sha256'), 'hex') AS hub_{business_key_name}_hk,
- ENCODE(
- {source_schema_name}.DIGEST(
- ARRAY_TO_STRING(
- ARRAY[{sat_source_sql_array}], ','), 'sha256'), 'hex') AS sat_{key}_hd
+ auto_dw.hash(
+ ARRAY_TO_STRING(ARRAY[{hub_bk_parts_sql_stg_array}], ',')
+ ) AS hub_{business_key_name}_hk,
+ auto_dw.hash(
+ ARRAY_TO_STRING(ARRAY[{sat_source_sql_array}], ',')
+ ) AS sat_{key}_hd
FROM {source_schema_name}.{source_table_name} AS stg
),
new_stg_data AS (
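
Across these loader templates the pattern is uniform: the relevant columns are joined with ARRAY_TO_STRING and the result is passed to auto_dw.hash to form the hub hash keys (_hk) and satellite hashes (_hd). With hypothetical staging columns customer_id and store_id, the generated hub DML reduces to something like:

    -- illustrative only; schema, table, and column names are placeholders
    SELECT auto_dw.hash(ARRAY_TO_STRING(ARRAY[customer_id::TEXT, store_id::TEXT], ',')) AS hub_customer_hk
    FROM staging.customer AS stg;
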
11 changes: 11 additions & 0 deletions extension/src/lib.rs
@@ -5,6 +5,9 @@ mod utility; // Initialization, Configuration Management, and External Servic
pub use pgrx::prelude::*;
use uuid::Uuid;

+ use sha2::{Sha256, Digest};
+ use hex;

pgrx::pg_module_magic!();

use model::queries;
@@ -134,6 +137,14 @@ fn source_column() -> Result<
.map(TableIterator::new)
}

+ #[pg_extern]
+ fn hash(inputs: &str) -> String {
+ let mut hasher = Sha256::new();
+ hasher.update(inputs.as_bytes());
+ let digest = hasher.finalize();
+ hex::encode(digest)
+ }

#[cfg(any(test, feature = "pg_test"))]
#[pg_schema]
mod tests {
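
The #[pg_extern] function above is what the loader SQL reaches as auto_dw.hash, since the control file pins the extension to the auto_dw schema. A quick sanity check from psql, assuming the extension is installed:

    SELECT auto_dw.hash('');
    -- SHA-256 of the empty string, a well-known constant:
    -- e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
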
3 changes: 2 additions & 1 deletion extension/src/model/mod.rs
@@ -1,3 +1,4 @@
pub mod source_objects;
pub mod dv_schema;
- pub mod queries;
+ pub mod queries;
+ pub mod prompt_template;
