added functionality to api.rs
Needs review for this functionality.
harshtech123 committed Oct 22, 2024
1 parent 4857eb6 commit 7de892c
Showing 1 changed file with 79 additions and 0 deletions.
extension/src/api.rs
@@ -10,6 +10,77 @@ use anyhow::Result;
use pgrx::prelude::*;
use vectorize_core::types::Model;

fn chunk_text(text: &str, max_chunk_size: usize) -> Vec<String> {
    let mut chunks = Vec::new();
    let mut start = 0;

    // Walk the text in steps of at most max_chunk_size bytes.
    while start < text.len() {
        let mut end = (start + max_chunk_size).min(text.len());
        // Advance end to the nearest char boundary so slicing never panics
        // on multi-byte UTF-8 (a chunk may exceed max_chunk_size by at most
        // the width of one character).
        while !text.is_char_boundary(end) {
            end += 1;
        }
        chunks.push(text[start..end].to_string());
        start = end;
    }

    chunks
}

#[pg_extern]
fn chunk_table(
    input_table: &str,
    column_name: &str,
    max_chunk_size: default!(i32, 1000),
    output_table: default!(&str, "'chunked_data'"),
) -> Result<String> {
    // Clamp to at least 1 so chunk_text always makes progress.
    let max_chunk_size = max_chunk_size.max(1) as usize;

    // Retrieve rows from the input table. The column and table names are
    // interpolated directly into the SQL, so they must be trusted identifiers.
    let query = format!("SELECT id, {} FROM {}", column_name, input_table);

    // Spi::get_two returns only the first row of the result set, so at most
    // one source row is chunked here; iterating the whole table needs an SPI
    // cursor (see the sketch after the diff).
    let (id_opt, text_opt): (Option<i32>, Option<String>) = Spi::get_two(&query)?;
    let rows = vec![(id_opt, text_opt)];

    // Holds (original_id, chunk_index, chunk) for every generated chunk.
    let mut chunked_rows: Vec<(i32, i32, String)> = Vec::new();

    // Chunk each row's text, tracking the original id and the chunk's position.
    for (id_opt, text_opt) in rows {
        // Skip rows where either the id or the text is NULL.
        if let (Some(id), Some(text)) = (id_opt, text_opt) {
            let chunks = chunk_text(&text, max_chunk_size);
            for (index, chunk) in chunks.iter().enumerate() {
                chunked_rows.push((id, index as i32, chunk.clone()));
            }
        }
    }

    // Create the output table with an additional column for the chunk index.
    let create_table_query = format!(
        "CREATE TABLE IF NOT EXISTS {} (id SERIAL PRIMARY KEY, original_id INT, chunk_index INT, chunk TEXT)",
        output_table
    );
    Spi::run(&create_table_query)
        .map_err(|e| anyhow::anyhow!("Failed to create table {}: {}", output_table, e))?;

    // Insert the chunked rows, binding values as parameters instead of
    // interpolating them into the SQL string. The query text is identical
    // for every row, so it is built once, outside the loop.
    let insert_query = format!(
        "INSERT INTO {} (original_id, chunk_index, chunk) VALUES ($1, $2, $3)",
        output_table
    );
    for (original_id, chunk_index, chunk) in chunked_rows {
        Spi::run_with_args(&insert_query, Some(vec![
            (pgrx::PgOid::Custom(pgrx::pg_sys::INT4OID), original_id.into_datum()),
            (pgrx::PgOid::Custom(pgrx::pg_sys::INT4OID), chunk_index.into_datum()),
            (pgrx::PgOid::Custom(pgrx::pg_sys::TEXTOID), chunk.into_datum()),
        ]))?;
    }

    Ok(format!("Chunked data inserted into table: {}", output_table))
}

#[allow(clippy::too_many_arguments)]
#[pg_extern]
fn table(
@@ -26,7 +97,15 @@ fn table(
    table_method: default!(types::TableMethod, "'join'"),
    // cron-like schedule for cron-based updates, or 'realtime' for
    // trigger-based updates
    schedule: default!(&str, "'* * * * *'"),
    chunk_input: default!(bool, false),  // new: enable chunking of the source column
    max_chunk_size: default!(i32, 1000), // new: maximum chunk size in bytes
) -> Result<String> {
    if chunk_input {
        // Chunk the first requested column into the default output table.
        // Note: the quotes in default!(&str, "'chunked_data'") are SQL-side;
        // a direct Rust call passes the bare table name.
        chunk_table(table, &columns[0], max_chunk_size, "chunked_data")?;
    }

    // Proceed with the original table initialization logic
    let model = Model::new(transformer)?;
    init_table(
        job_name,
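For review purposes, here is one way the new function could be exercised from psql. This is a sketch, not part of the commit: the docs table and the docs_chunked output name are hypothetical, and it assumes the extension is installed so chunk_table is callable.

-- Hypothetical source table with an id column and a text column.
CREATE TABLE docs (id SERIAL PRIMARY KEY, body TEXT);
INSERT INTO docs (body) VALUES (repeat('some long document text ', 200));

-- Split docs.body into pieces of at most 1000 bytes.
SELECT chunk_table('docs', 'body', 1000, 'docs_chunked');

-- Each output row carries the source id and the chunk's position.
SELECT original_id, chunk_index, length(chunk) FROM docs_chunked ORDER BY chunk_index;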

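Since Spi::get_two limits the current implementation to the first row of the input table, below is a minimal sketch of how the retrieval step could iterate every row instead. It assumes a pgrx 0.11-style SPI API (Spi::connect and client.select); method names differ across pgrx versions, and fetch_rows is a hypothetical helper, not part of the commit.

use pgrx::prelude::*;

// Hypothetical helper: run `query` and collect all (id, text) pairs,
// skipping rows where either column is NULL.
fn fetch_rows(query: &str) -> pgrx::spi::Result<Vec<(i32, String)>> {
    Spi::connect(|client| {
        let mut out = Vec::new();
        // No row limit, no bound parameters.
        let tuples = client.select(query, None, None)?;
        for row in tuples {
            // SPI columns are 1-indexed; get() yields Option because the
            // value may be NULL.
            let id: Option<i32> = row.get(1)?;
            let text: Option<String> = row.get(2)?;
            if let (Some(id), Some(text)) = (id, text) {
                out.push((id, text));
            }
        }
        Ok(out)
    })
}

The chunking loop in chunk_table could then consume the returned Vec directly, replacing the single-row vec![(id_opt, text_opt)] workaround.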