Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
varshith257 committed Oct 19, 2024
1 parent b86f178 commit 86455d0
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 20 deletions.
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ lazy_static = "1.4.0"
log = "0.4.21"
ollama-rs = "=0.2.1"
pgmq = "0.29"
pgrx = "=0.12.5"
regex = "1.9.2"
reqwest = {version = "0.11.18", features = ["json"] }
serde = { version = "1.0.173", features = ["derive"] }
Expand Down
4 changes: 2 additions & 2 deletions core/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use chrono::serde::ts_seconds_option::deserialize as from_tsopt;

use pgrx::pg_sys::Oid;
use serde::{Deserialize, Serialize};
use sqlx::types::chrono::Utc;
use sqlx::FromRow;
Expand Down Expand Up @@ -103,8 +104,7 @@ pub enum TableMethod {

#[derive(Clone, Debug, Default, Serialize, Deserialize, FromRow)]
pub struct JobParams {
pub schema: String,
pub table: String,
pub table: PgOid,
pub columns: Vec<String>,
pub update_time_col: Option<String>,
pub table_method: TableMethod,
Expand Down
6 changes: 2 additions & 4 deletions extension/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::search::{self, init_table};
use crate::transformers::generic::env_interpolate_string;
use crate::transformers::transform;
use crate::types;
use crate::util::pg_oid_to_table_name;

use anyhow::Result;
use pgrx::prelude::*;
Expand All @@ -28,10 +27,10 @@ fn table(
schedule: default!(&str, "'* * * * *'"),
) -> Result<String> {
let model = Model::new(transformer)?;
let table_name_str = pg_oid_to_table_name(table_name);

init_table(
job_name,
&table_name_str,
table_name,
columns,
primary_key,
Some(update_col),
Expand Down Expand Up @@ -107,7 +106,6 @@ fn init_rag(
let transformer_model = Model::new(transformer)?;
init_table(
agent_name,
schema,
table_name,
columns,
unique_record_id,
Expand Down
4 changes: 2 additions & 2 deletions extension/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,14 +256,14 @@ pub fn get_column_datatype(table: &str, column: &str) -> Result<String> {
)
.map_err(|_| {
anyhow!(
"One of schema:`{}`, table:`{}`, column:`{}` does not exist.",
"One of table:`{}`, column:`{}` does not exist.",
table,
column
)
})?
.ok_or_else(|| {
anyhow!(
"An unknown error occurred while fetching the data type for column `{}` in `{}.{}`.",
"An unknown error occurred while fetching the data type for column `{}` in `{}`.",
table,
column
)
Expand Down
9 changes: 4 additions & 5 deletions extension/src/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub fn init_table(
}

// get prim key type
let pkey_type = init::get_column_datatype(&table_name_str, primary_key)?;
let pkey_type = init::get_column_datatype(table_name, primary_key)?;
init::init_pgmq()?;

let guc_configs = get_guc_configs(&transformer.source);
Expand Down Expand Up @@ -102,8 +102,7 @@ pub fn init_table(
};

let valid_params = types::JobParams {
schema: schema.to_string(),
table: table.to_string(),
table: table_name_str.clone(),
columns: columns.clone(),
update_time_col: update_col,
table_method: table_method.clone(),
Expand Down Expand Up @@ -168,8 +167,8 @@ pub fn init_table(
// setup triggers
// create the trigger if not exists
let trigger_handler = create_trigger_handler(job_name, &columns, primary_key);
let insert_trigger = create_event_trigger(job_name, schema, table, "INSERT");
let update_trigger = create_event_trigger(job_name, schema, table, "UPDATE");
let insert_trigger = create_event_trigger(job_name, table_name_str.clone(), "INSERT");
let update_trigger = create_event_trigger(job_name, table_name_str.clone(), "UPDATE");
let _: Result<_, spi::Error> = Spi::connect(|mut c| {
let _r = c.update(&trigger_handler, None, None)?;
let _r = c.update(&insert_trigger, None, None)?;
Expand Down
15 changes: 8 additions & 7 deletions extension/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use pgrx::*;
use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
use sqlx::{Pool, Postgres};
use std::env;
use std::ffi::CStr;
use url::{ParseError, Url};

use crate::guc;
Expand Down Expand Up @@ -207,12 +206,14 @@ pub fn get_pg_options(cfg: Config) -> Result<PgConnectOptions> {
}

pub fn pg_oid_to_table_name(oid: PgOid) -> String {
unsafe {
let regclass_cstring = regclassout(oid.value() as Oid);
CStr::from_ptr(regclass_cstring)
.to_string_lossy()
.into_owned()
}
let query = "SELECT relname FROM pg_class WHERE oid = $1";
let table_name: String = Spi::get_one_with_args(
query,
vec![(PgBuiltInOids::REGCLASSOID.oid(), oid.into_datum())]
)
.expect("Failed to fetch table name from oid")
.unwrap_or_else(|| panic!("Table name not found for oid: {}", oid.value()));
table_name
}

pub async fn ready(conn: &Pool<Postgres>) -> bool {
Expand Down

0 comments on commit 86455d0

Please sign in to comment.