From 7de892c714dc74524c84d88fef6cc999ca90d979 Mon Sep 17 00:00:00 2001
From: harshtech123
Date: Tue, 22 Oct 2024 16:45:04 +0530
Subject: [PATCH] added functionality to api.rs

need review for this functionality
---
 extension/src/api.rs | 79 ++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 79 insertions(+)

diff --git a/extension/src/api.rs b/extension/src/api.rs
index 700cc6b..eb41aa0 100644
--- a/extension/src/api.rs
+++ b/extension/src/api.rs
@@ -10,6 +10,77 @@ use anyhow::Result;
 use pgrx::prelude::*;
 use vectorize_core::types::Model;
 
+fn chunk_text(text: &str, max_chunk_size: usize) -> Vec<String> {
+    let mut chunks = Vec::new();
+    let mut start = 0;
+
+    // Loop through the text and create chunks
+    while start < text.len() {
+        let end = (start + max_chunk_size).min(text.len());
+        let chunk = text[start..end].to_string();
+        chunks.push(chunk);
+        start = end;
+    }
+
+    chunks
+}
+
+#[pg_extern]
+fn chunk_table(
+    input_table: &str,
+    column_name: &str,
+    max_chunk_size: default!(i32, 1000),
+    output_table: default!(&str, "'chunked_data'"),
+) -> Result<String> {
+    let max_chunk_size = max_chunk_size as usize;
+
+    // Retrieve rows from the input table, ensuring column existence
+    let query = format!("SELECT id, {} FROM {}", column_name, input_table);
+
+    // Reverting back to use get_two
+    let (id_opt, text_opt): (Option<i32>, Option<String>) = Spi::get_two(&query)?;
+    let rows = vec![(id_opt, text_opt)]; // Wrap in a vector if needed
+
+
+    // Prepare to hold chunked rows
+    let mut chunked_rows: Vec<(i32, i32, String)> = Vec::new(); // (original_id, chunk_index, chunk)
+
+    // Chunk the data and keep track of the original id and chunk index
+    for (id_opt, text_opt) in rows {
+        // Only process rows where both id and text exist
+        if let (Some(id), Some(text)) = (id_opt, text_opt.map(|s| s.to_string())) {
+            let chunks = chunk_text(&text, max_chunk_size);
+            for (index, chunk) in chunks.iter().enumerate() {
+                chunked_rows.push((id, index as i32, chunk.clone())); // Add chunk index
+            }
+        }
+
+    }
+
+    // Create output table with an additional column for chunk index
+    let create_table_query = format!(
+        "CREATE TABLE IF NOT EXISTS {} (id SERIAL PRIMARY KEY, original_id INT, chunk_index INT, chunk TEXT)",
+        output_table
+    );
+    Spi::run(&create_table_query)
+        .map_err(|e| anyhow::anyhow!("Failed to create table {}: {}", output_table, e))?;
+
+    // Insert chunked rows into output table
+    for (original_id, chunk_index, chunk) in chunked_rows {
+        let insert_query = format!(
+            "INSERT INTO {} (original_id, chunk_index, chunk) VALUES ($1, $2, $3)",
+            output_table
+        );
+        Spi::run_with_args(&insert_query, Some(vec![
+            (pgrx::PgOid::Custom(pgrx::pg_sys::INT4OID), original_id.into_datum()), // OID for integer
+            (pgrx::PgOid::Custom(pgrx::pg_sys::INT4OID), chunk_index.into_datum()), // OID for integer
+            (pgrx::PgOid::Custom(pgrx::pg_sys::TEXTOID), chunk.into_datum()), // OID for text
+        ]))?;
+    }
+
+    Ok(format!("Chunked data inserted into table: {}", output_table))
+}
+
 #[allow(clippy::too_many_arguments)]
 #[pg_extern]
 fn table(
@@ -26,7 +97,15 @@ fn table(
     table_method: default!(types::TableMethod, "'join'"),
     // cron-like for a cron based update model, or 'realtime' for a trigger-based
     schedule: default!(&str, "'* * * * *'"),
+    chunk_input: default!(bool, false), // New parameter to enable chunking
+    max_chunk_size: default!(i32, 1000), // New parameter for chunk size
 ) -> Result<String> {
+    if chunk_input {
+        // Call chunk_table if chunking is enabled
+        chunk_table(table, &columns[0], max_chunk_size, "'chunked_data'")?;
+    }
+
+    // Proceed with the original table initialization logic
     let model = Model::new(transformer)?;
     init_table(
         job_name,
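
---

Review notes on the patch above:

1. chunk_text slices by byte offset, so text[start..end] will panic whenever a
   chunk boundary lands inside a multi-byte UTF-8 character. A minimal
   boundary-safe sketch, assuming the same greedy fixed-size strategy and a
   max_chunk_size of at least 4 bytes (the widest UTF-8 character), so every
   chunk contains at least one full character:

       fn chunk_text(text: &str, max_chunk_size: usize) -> Vec<String> {
           let mut chunks = Vec::new();
           let mut start = 0;
           while start < text.len() {
               // Candidate end by byte count, capped at the end of the string
               let mut end = (start + max_chunk_size).min(text.len());
               // Walk back until the index sits on a UTF-8 character boundary;
               // with max_chunk_size >= 4 this never walks back to `start`
               while !text.is_char_boundary(end) {
                   end -= 1;
               }
               chunks.push(text[start..end].to_string());
               start = end;
           }
           chunks
       }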
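2. Spi::get_two only fetches the first row of the result set, so only one row
   of the input table is ever chunked; wrapping the pair in vec![...] does not
   change that. A sketch that iterates the whole result set through SpiClient,
   assuming pgrx 0.11-style SPI APIs (select's signature and the tuple-indexing
   details may differ in other pgrx versions):

       let rows: Vec<(i32, String)> = Spi::connect(|client| {
           let mut out = Vec::new();
           // Walk every tuple returned by the query, not just the first
           let tup_table = client.select(&query, None, None)?;
           for row in tup_table {
               // Columns are 1-indexed: 1 = id, 2 = the chosen text column
               let id: Option<i32> = row[1].value()?;
               let text: Option<String> = row[2].value()?;
               if let (Some(id), Some(text)) = (id, text) {
                   out.push((id, text));
               }
           }
           Ok::<_, pgrx::spi::Error>(out)
       })?;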
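3. Inside fn table, the direct Rust call passes "'chunked_data'" with the SQL
   quotes embedded, so the generated DDL becomes
   CREATE TABLE IF NOT EXISTS 'chunked_data' (...), which Postgres rejects.
   The quoted form belongs only in the default!() macro; the call itself
   presumably wants plain "chunked_data". Note also that input_table,
   column_name, and output_table are interpolated into SQL unescaped, so the
   function is open to injection unless the identifiers are validated or
   quoted (e.g. via quote_ident).

Expected SQL-level usage, assuming the extension installs its functions into
the vectorize schema and given a hypothetical documents table:

    CREATE TABLE documents (id SERIAL PRIMARY KEY, content TEXT);
    INSERT INTO documents (content) VALUES (repeat('lorem ipsum ', 500));

    -- split documents.content into chunks of at most 1000 bytes
    SELECT vectorize.chunk_table(
        input_table => 'documents',
        column_name => 'content',
        max_chunk_size => 1000
    );

    SELECT original_id, chunk_index, length(chunk)
    FROM chunked_data
    ORDER BY original_id, chunk_index;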