From 3f5c377aca4f230b5257dc397b68ab910c963730 Mon Sep 17 00:00:00 2001 From: Vamshi Maskuri <117595548+varshith257@users.noreply.github.com> Date: Sat, 19 Oct 2024 08:37:31 +0530 Subject: [PATCH 1/4] fix: rename 'table' parameter to 'table_name' and switch to REGCLASS type --- CONTRIBUTING.md | 2 +- README.md | 2 +- docs/api/search.md | 6 ++---- extension/sql/vectorize--0.18.2--0.18.3.sql | 18 ++++++++++++++++++ extension/src/api.rs | 8 ++++---- 5 files changed, 26 insertions(+), 10 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 34a02bb..d3b81c9 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -197,7 +197,7 @@ SELECT * FROM products limit 2; ```sql SELECT vectorize.table( job_name => 'product_search_hf', -"table" => 'products', +"table_name" => 'products', primary_key => 'product_id', columns => ARRAY['product_name', 'description'], transformer => 'sentence-transformers/multi-qa-MiniLM-L6-dot-v1' diff --git a/README.md b/README.md index de6f9e8..fbfda19 100644 --- a/README.md +++ b/README.md @@ -131,7 +131,7 @@ Create a job to vectorize the products table. We'll specify the tables primary k ```sql SELECT vectorize.table( job_name => 'product_search_hf', - "table" => 'products', + "table_name" => 'products', primary_key => 'product_id', columns => ARRAY['product_name', 'description'], transformer => 'sentence-transformers/all-MiniLM-L6-v2', diff --git a/docs/api/search.md b/docs/api/search.md index 60c73be..bc9b042 100644 --- a/docs/api/search.md +++ b/docs/api/search.md @@ -8,11 +8,10 @@ Initialize a table for vector search. Generates embeddings and index. Creates tr ```sql vectorize."table"( - "table" TEXT, + "table_name" REGCLASS, "columns" TEXT[], "job_name" TEXT, "primary_key" TEXT, - "schema" TEXT DEFAULT 'public', "update_col" TEXT DEFAULT 'last_updated_at', "transformer" TEXT DEFAULT 'sentence-transformers/all-MiniLM-L6-v2', "index_dist_type" vectorize.IndexDist DEFAULT 'pgv_hnsw_cosine', @@ -23,12 +22,11 @@ vectorize."table"( | Parameter | Type | Description | | :--- | :---- | :--- | -| table | text | The name of the table to be initialized. | +| table_name | regclass | The name of the table to be initialized. Automatically includes schema information. | | columns | text | The name of the columns that contains the content that is used for context for RAG. Multiple columns are concatenated. | | job_name | text | A unique name for the project. | | primary_key | text | The name of the column that contains the unique record id. | | args | json | Additional arguments for the transformer. Defaults to '{}'. | -| schema | text | The name of the schema where the table is located. Defaults to 'public'. | | update_col | text | Column specifying the last time the record was updated. Required for cron-like schedule. Defaults to `last_updated_at` | | transformer | text | The name of the transformer to use for the embeddings. Defaults to 'text-embedding-ada-002'. | | index_dist_type | IndexDist | The name of index type to build. Defaults to 'pgv_hnsw_cosine'. | diff --git a/extension/sql/vectorize--0.18.2--0.18.3.sql b/extension/sql/vectorize--0.18.2--0.18.3.sql index e69de29..ce65bb3 100644 --- a/extension/sql/vectorize--0.18.2--0.18.3.sql +++ b/extension/sql/vectorize--0.18.2--0.18.3.sql @@ -0,0 +1,18 @@ +DROP function vectorize."table"; + +-- vectorize::api::table +CREATE FUNCTION vectorize."table"( + "table_name" REGCLASS, /* PgOid*/ + "columns" TEXT[], /* alloc::vec::Vec */ + "job_name" TEXT, /* alloc::string::String */ + "primary_key" TEXT, /* alloc::string::String */ + "args" json DEFAULT '{}', /* pgrx::datum::json::Json */ + "update_col" TEXT DEFAULT 'last_updated_at', /* alloc::string::String */ + "transformer" vectorize.Transformer DEFAULT 'openai', /* vectorize::types::Transformer */ + "search_alg" vectorize.SimilarityAlg DEFAULT 'pgv_cosine_similarity', /* vectorize::types::SimilarityAlg */ + "table_method" vectorize.TableMethod DEFAULT 'append', /* vectorize::types::TableMethod */ + "schedule" TEXT DEFAULT '* * * * *' /* alloc::string::String */ +) RETURNS TEXT /* core::result::Result */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'table_wrapper'; diff --git a/extension/src/api.rs b/extension/src/api.rs index 700cc6b..f4cb425 100644 --- a/extension/src/api.rs +++ b/extension/src/api.rs @@ -7,17 +7,17 @@ use crate::transformers::transform; use crate::types; use anyhow::Result; +use pgrx::pg_sys::PgOid; use pgrx::prelude::*; use vectorize_core::types::Model; #[allow(clippy::too_many_arguments)] #[pg_extern] fn table( - table: &str, + table_name: PgOid, columns: Vec, job_name: &str, primary_key: &str, - schema: default!(&str, "'public'"), update_col: default!(String, "'last_updated_at'"), index_dist_type: default!(types::IndexDist, "'pgv_hnsw_cosine'"), transformer: default!(&str, "'sentence-transformers/all-MiniLM-L6-v2'"), @@ -28,10 +28,10 @@ fn table( schedule: default!(&str, "'* * * * *'"), ) -> Result { let model = Model::new(transformer)?; + let table_name_str = table_name.to_regclass()?.to_string(); init_table( job_name, - schema, - table, + &table_name_str, columns, primary_key, Some(update_col), From 09d16ef99cf3100035312862129765d67fbbcda8 Mon Sep 17 00:00:00 2001 From: Vamshi Maskuri <117595548+varshith257@users.noreply.github.com> Date: Sat, 19 Oct 2024 09:48:46 +0530 Subject: [PATCH 2/4] fix clippy issues --- extension/src/api.rs | 4 ++-- extension/src/init.rs | 5 +---- extension/src/search.rs | 9 +++++---- extension/src/util.rs | 9 +++++++++ 4 files changed, 17 insertions(+), 10 deletions(-) diff --git a/extension/src/api.rs b/extension/src/api.rs index f4cb425..7a35754 100644 --- a/extension/src/api.rs +++ b/extension/src/api.rs @@ -5,9 +5,9 @@ use crate::search::{self, init_table}; use crate::transformers::generic::env_interpolate_string; use crate::transformers::transform; use crate::types; +use crate::util::pg_oid_to_table_name; use anyhow::Result; -use pgrx::pg_sys::PgOid; use pgrx::prelude::*; use vectorize_core::types::Model; @@ -28,7 +28,7 @@ fn table( schedule: default!(&str, "'* * * * *'"), ) -> Result { let model = Model::new(transformer)?; - let table_name_str = table_name.to_regclass()?.to_string(); + let table_name_str = pg_oid_to_table_name(table_name); init_table( job_name, &table_name_str, diff --git a/extension/src/init.rs b/extension/src/init.rs index ac92a50..78a8ee2 100644 --- a/extension/src/init.rs +++ b/extension/src/init.rs @@ -239,7 +239,7 @@ fn append_embedding_column(job_name: &str, schema: &str, table: &str, col_type: ) } -pub fn get_column_datatype(schema: &str, table: &str, column: &str) -> Result { +pub fn get_column_datatype(table: &str, column: &str) -> Result { Spi::get_one_with_args( " SELECT data_type @@ -250,7 +250,6 @@ pub fn get_column_datatype(schema: &str, table: &str, column: &str) -> Result Result Result, primary_key: &str, update_col: Option, @@ -28,6 +27,8 @@ pub fn init_table( // cron-like for a cron based update model, or 'realtime' for a trigger-based schedule: &str, ) -> Result { + let table_name_str = pg_oid_to_table_name(table_name); + // validate table method // realtime is only compatible with the join method if schedule == "realtime" && table_method != TableMethod::join { @@ -35,7 +36,7 @@ pub fn init_table( } // get prim key type - let pkey_type = init::get_column_datatype(schema, table, primary_key)?; + let pkey_type = init::get_column_datatype(&table_name_str, primary_key)?; init::init_pgmq()?; let guc_configs = get_guc_configs(&transformer.source); diff --git a/extension/src/util.rs b/extension/src/util.rs index 2b5f3c2..49610b1 100644 --- a/extension/src/util.rs +++ b/extension/src/util.rs @@ -1,9 +1,11 @@ use anyhow::Result; +use pgrx::pg_sys::{regclassout, Oid}; use pgrx::spi::SpiTupleTable; use pgrx::*; use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; use sqlx::{Pool, Postgres}; use std::env; +use std::ffi::CStr; use url::{ParseError, Url}; use crate::guc; @@ -204,6 +206,13 @@ pub fn get_pg_options(cfg: Config) -> Result { } } +pub fn pg_oid_to_table_name(oid: PgOid) -> String { + unsafe { + let regclass_cstring = regclassout(oid.value() as Oid); + CStr::from_ptr(regclass_cstring).to_string_lossy().into_owned() + } +} + pub async fn ready(conn: &Pool) -> bool { sqlx::query_scalar( "SELECT EXISTS ( From b86f1785b6337490984e634effde3605c8cb114d Mon Sep 17 00:00:00 2001 From: Vamshi Maskuri <117595548+varshith257@users.noreply.github.com> Date: Sat, 19 Oct 2024 10:21:30 +0530 Subject: [PATCH 3/4] fmt --- extension/src/util.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/extension/src/util.rs b/extension/src/util.rs index 49610b1..b030991 100644 --- a/extension/src/util.rs +++ b/extension/src/util.rs @@ -209,7 +209,9 @@ pub fn get_pg_options(cfg: Config) -> Result { pub fn pg_oid_to_table_name(oid: PgOid) -> String { unsafe { let regclass_cstring = regclassout(oid.value() as Oid); - CStr::from_ptr(regclass_cstring).to_string_lossy().into_owned() + CStr::from_ptr(regclass_cstring) + .to_string_lossy() + .into_owned() } } From 86455d0a87d038e44674317f586f2c08f70f477d Mon Sep 17 00:00:00 2001 From: Vamshi Maskuri <117595548+varshith257@users.noreply.github.com> Date: Sat, 19 Oct 2024 11:12:18 +0530 Subject: [PATCH 4/4] refactor --- core/Cargo.toml | 1 + core/src/types.rs | 4 ++-- extension/src/api.rs | 6 ++---- extension/src/init.rs | 4 ++-- extension/src/search.rs | 9 ++++----- extension/src/util.rs | 15 ++++++++------- 6 files changed, 19 insertions(+), 20 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index a941f7a..347e7da 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -21,6 +21,7 @@ lazy_static = "1.4.0" log = "0.4.21" ollama-rs = "=0.2.1" pgmq = "0.29" +pgrx = "=0.12.5" regex = "1.9.2" reqwest = {version = "0.11.18", features = ["json"] } serde = { version = "1.0.173", features = ["derive"] } diff --git a/core/src/types.rs b/core/src/types.rs index 3828a1a..8b3f86f 100644 --- a/core/src/types.rs +++ b/core/src/types.rs @@ -1,5 +1,6 @@ use chrono::serde::ts_seconds_option::deserialize as from_tsopt; +use pgrx::pg_sys::Oid; use serde::{Deserialize, Serialize}; use sqlx::types::chrono::Utc; use sqlx::FromRow; @@ -103,8 +104,7 @@ pub enum TableMethod { #[derive(Clone, Debug, Default, Serialize, Deserialize, FromRow)] pub struct JobParams { - pub schema: String, - pub table: String, + pub table: PgOid, pub columns: Vec, pub update_time_col: Option, pub table_method: TableMethod, diff --git a/extension/src/api.rs b/extension/src/api.rs index 7a35754..2dbd246 100644 --- a/extension/src/api.rs +++ b/extension/src/api.rs @@ -5,7 +5,6 @@ use crate::search::{self, init_table}; use crate::transformers::generic::env_interpolate_string; use crate::transformers::transform; use crate::types; -use crate::util::pg_oid_to_table_name; use anyhow::Result; use pgrx::prelude::*; @@ -28,10 +27,10 @@ fn table( schedule: default!(&str, "'* * * * *'"), ) -> Result { let model = Model::new(transformer)?; - let table_name_str = pg_oid_to_table_name(table_name); + init_table( job_name, - &table_name_str, + table_name, columns, primary_key, Some(update_col), @@ -107,7 +106,6 @@ fn init_rag( let transformer_model = Model::new(transformer)?; init_table( agent_name, - schema, table_name, columns, unique_record_id, diff --git a/extension/src/init.rs b/extension/src/init.rs index 78a8ee2..3ef91e7 100644 --- a/extension/src/init.rs +++ b/extension/src/init.rs @@ -256,14 +256,14 @@ pub fn get_column_datatype(table: &str, column: &str) -> Result { ) .map_err(|_| { anyhow!( - "One of schema:`{}`, table:`{}`, column:`{}` does not exist.", + "One of table:`{}`, column:`{}` does not exist.", table, column ) })? .ok_or_else(|| { anyhow!( - "An unknown error occurred while fetching the data type for column `{}` in `{}.{}`.", + "An unknown error occurred while fetching the data type for column `{}` in `{}`.", table, column ) diff --git a/extension/src/search.rs b/extension/src/search.rs index 9dba1d2..6cf7a55 100644 --- a/extension/src/search.rs +++ b/extension/src/search.rs @@ -36,7 +36,7 @@ pub fn init_table( } // get prim key type - let pkey_type = init::get_column_datatype(&table_name_str, primary_key)?; + let pkey_type = init::get_column_datatype(table_name, primary_key)?; init::init_pgmq()?; let guc_configs = get_guc_configs(&transformer.source); @@ -102,8 +102,7 @@ pub fn init_table( }; let valid_params = types::JobParams { - schema: schema.to_string(), - table: table.to_string(), + table: table_name_str.clone(), columns: columns.clone(), update_time_col: update_col, table_method: table_method.clone(), @@ -168,8 +167,8 @@ pub fn init_table( // setup triggers // create the trigger if not exists let trigger_handler = create_trigger_handler(job_name, &columns, primary_key); - let insert_trigger = create_event_trigger(job_name, schema, table, "INSERT"); - let update_trigger = create_event_trigger(job_name, schema, table, "UPDATE"); + let insert_trigger = create_event_trigger(job_name, table_name_str.clone(), "INSERT"); + let update_trigger = create_event_trigger(job_name, table_name_str.clone(), "UPDATE"); let _: Result<_, spi::Error> = Spi::connect(|mut c| { let _r = c.update(&trigger_handler, None, None)?; let _r = c.update(&insert_trigger, None, None)?; diff --git a/extension/src/util.rs b/extension/src/util.rs index b030991..25e8f16 100644 --- a/extension/src/util.rs +++ b/extension/src/util.rs @@ -5,7 +5,6 @@ use pgrx::*; use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; use sqlx::{Pool, Postgres}; use std::env; -use std::ffi::CStr; use url::{ParseError, Url}; use crate::guc; @@ -207,12 +206,14 @@ pub fn get_pg_options(cfg: Config) -> Result { } pub fn pg_oid_to_table_name(oid: PgOid) -> String { - unsafe { - let regclass_cstring = regclassout(oid.value() as Oid); - CStr::from_ptr(regclass_cstring) - .to_string_lossy() - .into_owned() - } + let query = "SELECT relname FROM pg_class WHERE oid = $1"; + let table_name: String = Spi::get_one_with_args( + query, + vec![(PgBuiltInOids::REGCLASSOID.oid(), oid.into_datum())] + ) + .expect("Failed to fetch table name from oid") + .unwrap_or_else(|| panic!("Table name not found for oid: {}", oid.value())); + table_name } pub async fn ready(conn: &Pool) -> bool {