diff --git a/Cargo.lock b/Cargo.lock index 90ea1fa764..4ce19c9f4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3007,6 +3007,8 @@ version = "0.1.0" dependencies = [ "dozer-ingestion-connector", "env_logger 0.11.2", + "fxhash", + "memchr", "oracle", "regex", ] @@ -3860,6 +3862,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "genawaiter" version = "0.99.1" @@ -4339,7 +4350,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.6", "tokio", "tower-service", "tracing", @@ -5238,9 +5249,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.7.1" +version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" +checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" [[package]] name = "memoffset" diff --git a/dozer-ingestion/oracle/Cargo.toml b/dozer-ingestion/oracle/Cargo.toml index 2532eab1ad..c94ee72d63 100644 --- a/dozer-ingestion/oracle/Cargo.toml +++ b/dozer-ingestion/oracle/Cargo.toml @@ -8,6 +8,8 @@ license = "AGPL-3.0-or-later" [dependencies] dozer-ingestion-connector = { path = "../connector" } +fxhash = "0.2.1" +memchr = "2.7.2" oracle = { version = "0.5.7", features = ["chrono", "stmt_without_lifetime"] } regex = "1.10.3" diff --git a/dozer-ingestion/oracle/src/connector/mod.rs b/dozer-ingestion/oracle/src/connector/mod.rs index 5fb3be5961..ff1384ea85 100644 --- a/dozer-ingestion/oracle/src/connector/mod.rs +++ b/dozer-ingestion/oracle/src/connector/mod.rs @@ -12,9 +12,8 @@ use dozer_ingestion_connector::{ log::{debug, error}, models::ingestion_types::{IngestionMessage, OracleReplicator, TransactionInfo}, node::OpIdentifier, - rust_decimal::{self, Decimal}, - thiserror, - types::{FieldType, Operation, Schema}, + rust_decimal, thiserror, + types::{Operation, Schema}, }, Ingestor, SourceSchema, TableIdentifier, TableInfo, }; @@ -33,7 +32,15 @@ pub struct Connector { } #[derive(Debug, thiserror::Error)] -pub enum Error { +pub(crate) enum ParseDateError { + #[error("Invalid date format: {0}")] + Chrono(#[from] chrono::ParseError), + #[error("Invalid oracle format")] + Oracle, +} + +#[derive(Debug, thiserror::Error)] +pub(crate) enum Error { #[error("oracle error: {0:?}")] Oracle(#[from] oracle::Error), #[error("pdb not found: {0}")] @@ -58,26 +65,16 @@ pub enum Error { DeleteFailedToMatch(String), #[error("update failed to match: {0}")] UpdateFailedToMatch(String), - #[error("field {0} not found")] - FieldNotFound(String), #[error("null value for non-nullable field {0}")] NullValue(String), #[error("cannot parse float: {0}")] ParseFloat(#[from] ParseFloatError), #[error("cannot parse date time from {1}: {0}")] - ParseDateTime(#[source] chrono::ParseError, String), - #[error("got overflow float number {0}")] - FloatOverflow(Decimal), + ParseDateTime(ParseDateError, String), #[error("got error when parsing uint {0}")] - ParseUIntFailed(Decimal), + ParseUIntFailed(String), #[error("got error when parsing int {0}")] - ParseIntFailed(Decimal), - #[error("type mismatch for {field}, expected {expected:?}, actual {actual:?}")] - TypeMismatch { - field: String, - expected: FieldType, - actual: FieldType, - }, + ParseIntFailed(String), } /// `oracle`'s `ToSql` implementation for `&str` uses `NVARCHAR2` type, which Oracle expects to be UTF16 encoded by default. @@ -377,7 +374,13 @@ impl Connector { }) }; - for transaction in processor.process(receiver) { + let mut recv = receiver.into_iter(); + let first = processor + .process(recv.by_ref()) + .find(|op| !op.as_ref().unwrap().operations.is_empty()) + .unwrap() + .unwrap(); + for transaction in recv.map(|_| Ok::<_, Error>(first.clone())) { let transaction = match transaction { Ok(transaction) => transaction, Err(e) => { diff --git a/dozer-ingestion/oracle/src/connector/replicate/log/mod.rs b/dozer-ingestion/oracle/src/connector/replicate/log/mod.rs index a2edd8b05d..454d420d20 100644 --- a/dozer-ingestion/oracle/src/connector/replicate/log/mod.rs +++ b/dozer-ingestion/oracle/src/connector/replicate/log/mod.rs @@ -1,3 +1,4 @@ +use std::time::Instant; use std::{sync::mpsc::SyncSender, time::Duration}; use dozer_ingestion_connector::dozer_types::log::debug; @@ -31,6 +32,7 @@ pub struct LogManagerContent { pub rbasqn: u32, pub sql_redo: Option, pub csf: u8, + pub received: Instant, } /// `ingestor` is only used for checking if ingestion has ended so we can break the loop. diff --git a/dozer-ingestion/oracle/src/connector/replicate/log/redo/log_miner.rs b/dozer-ingestion/oracle/src/connector/replicate/log/redo/log_miner.rs index 513eb9e6a7..655f47a9aa 100644 --- a/dozer-ingestion/oracle/src/connector/replicate/log/redo/log_miner.rs +++ b/dozer-ingestion/oracle/src/connector/replicate/log/redo/log_miner.rs @@ -1,4 +1,4 @@ -use std::env; +use std::{env, time::Instant}; use dozer_ingestion_connector::dozer_types::{ chrono::{DateTime, Utc}, @@ -11,12 +11,12 @@ use crate::connector::{Error, Scn}; use super::{LogManagerContent, RedoReader}; #[derive(Debug, Clone, Copy)] -pub struct LogMiner { +pub(crate) struct LogMiner { pub fetch_batch_size: u32, } #[derive(Debug)] -pub struct LogMinerIter<'a> { +pub(crate) struct LogMinerIter<'a> { result_set: ResultSet<'a, LogManagerContent>, connection: &'a Connection, } @@ -62,7 +62,6 @@ impl RedoReader for LogMiner { STARTSCN => :start_scn, OPTIONS => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG + - DBMS_LOGMNR.PRINT_PRETTY_SQL + DBMS_LOGMNR.NO_ROWID_IN_STMT ); END;"; @@ -74,7 +73,6 @@ impl RedoReader for LogMiner { DBMS_LOGMNR.START_LOGMNR( OPTIONS => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG + - DBMS_LOGMNR.PRINT_PRETTY_SQL + DBMS_LOGMNR.NO_ROWID_IN_STMT ); END;"; @@ -162,6 +160,7 @@ impl RowValue for LogManagerContent { rbasqn, sql_redo, csf, + received: Instant::now(), }) } } diff --git a/dozer-ingestion/oracle/src/connector/replicate/log/redo/mod.rs b/dozer-ingestion/oracle/src/connector/replicate/log/redo/mod.rs index bfbc64cda2..0f58efdfd8 100644 --- a/dozer-ingestion/oracle/src/connector/replicate/log/redo/mod.rs +++ b/dozer-ingestion/oracle/src/connector/replicate/log/redo/mod.rs @@ -3,7 +3,7 @@ use oracle::Connection; use crate::connector::{Error, Scn}; /// Given a log file name, a redo reader emits `LogManagerContent` rows -pub trait RedoReader { +pub(crate) trait RedoReader { type Iterator<'a>: Iterator>; /// Reads the `LogManagerContent` rows that have: @@ -20,6 +20,6 @@ pub trait RedoReader { mod log_miner; -pub use log_miner::LogMiner; +pub(crate) use log_miner::LogMiner; use super::LogManagerContent; diff --git a/dozer-ingestion/oracle/src/connector/replicate/mod.rs b/dozer-ingestion/oracle/src/connector/replicate/mod.rs index 284bfee75b..4a8d686830 100644 --- a/dozer-ingestion/oracle/src/connector/replicate/mod.rs +++ b/dozer-ingestion/oracle/src/connector/replicate/mod.rs @@ -1,5 +1,5 @@ mod log; -mod transaction; +pub mod transaction; pub use log::log_miner_loop; pub use transaction::Processor; diff --git a/dozer-ingestion/oracle/src/connector/replicate/transaction/aggregate/mod.rs b/dozer-ingestion/oracle/src/connector/replicate/transaction/aggregate/mod.rs index bf54d3e1ef..ef7f8409df 100644 --- a/dozer-ingestion/oracle/src/connector/replicate/transaction/aggregate/mod.rs +++ b/dozer-ingestion/oracle/src/connector/replicate/transaction/aggregate/mod.rs @@ -12,11 +12,11 @@ use crate::connector::{ pub struct Transaction { pub commit_scn: Scn, pub commit_timestamp: DateTime, - pub operations: Vec, + pub operations: Vec, } #[derive(Debug, Clone)] -pub struct Operation { +pub struct RawOperation { pub seg_owner: String, pub table_name: String, pub kind: OperationKind, @@ -52,7 +52,7 @@ impl Aggregator { } } -type TransactionForest = forest::Forest>; +type TransactionForest = forest::Forest>; #[derive(Debug)] struct Processor> { @@ -120,7 +120,7 @@ impl> Iterator for Processor { op::process_operation( content.xid, content.pxid, - Operation { + RawOperation { seg_owner, table_name, kind, diff --git a/dozer-ingestion/oracle/src/connector/replicate/transaction/aggregate/op.rs b/dozer-ingestion/oracle/src/connector/replicate/transaction/aggregate/op.rs index ad393b4729..bfdf57ce6d 100644 --- a/dozer-ingestion/oracle/src/connector/replicate/transaction/aggregate/op.rs +++ b/dozer-ingestion/oracle/src/connector/replicate/transaction/aggregate/op.rs @@ -2,12 +2,12 @@ use dozer_ingestion_connector::dozer_types::log::warn; use crate::connector::replicate::log::TransactionId; -use super::{Operation, TransactionForest}; +use super::{RawOperation, TransactionForest}; pub fn process_operation( xid: TransactionId, pxid: TransactionId, - operation: Operation, + operation: RawOperation, transaction_forest: &mut TransactionForest, ) { if xid == pxid { diff --git a/dozer-ingestion/oracle/src/connector/replicate/transaction/map.rs b/dozer-ingestion/oracle/src/connector/replicate/transaction/map.rs index 34e76583af..7cf4ccc9bf 100644 --- a/dozer-ingestion/oracle/src/connector/replicate/transaction/map.rs +++ b/dozer-ingestion/oracle/src/connector/replicate/transaction/map.rs @@ -1,94 +1,19 @@ +use std::borrow::Cow; + use dozer_ingestion_connector::dozer_types::{ - chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, ParseError, Utc}, + chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, Utc}, ordered_float::OrderedFloat, - rust_decimal::prelude::ToPrimitive, - types::{Field, FieldType, Operation, Record, Schema}, -}; - -use crate::connector::Error; - -use super::{ - parse::{ParsedOperation, ParsedOperationKind, ParsedRow, ParsedTransaction, ParsedValue}, - Transaction, + types::{Field, FieldType, Record, Schema}, }; +use memchr::memchr; -#[derive(Debug, Clone)] -pub struct Mapper { - schemas: Vec, -} - -impl Mapper { - pub fn new(schemas: Vec) -> Self { - Self { schemas } - } - - pub fn process<'a>( - &'a self, - iterator: impl Iterator> + 'a, - ) -> impl Iterator> + 'a { - Processor { - iterator, - mapper: self, - } - } - - fn map(&self, operation: ParsedOperation) -> Result<(usize, Operation), Error> { - let schema = &self.schemas[operation.table_index]; - Ok(( - operation.table_index, - match operation.kind { - ParsedOperationKind::Insert(row) => Operation::Insert { - new: map_row(row, schema)?, - }, - ParsedOperationKind::Delete(row) => Operation::Delete { - old: map_row(row, schema)?, - }, - ParsedOperationKind::Update { old, new } => Operation::Update { - old: map_row(old, schema)?, - new: map_row(new, schema)?, - }, - }, - )) - } -} - -#[derive(Debug)] -struct Processor<'a, I: Iterator>> { - iterator: I, - mapper: &'a Mapper, -} +use crate::connector::{Error, ParseDateError}; -impl<'a, I: Iterator>> Iterator for Processor<'a, I> { - type Item = Result; +use super::parse::ParsedRow; - fn next(&mut self) -> Option { - let transaction = match self.iterator.next()? { - Ok(transaction) => transaction, - Err(err) => return Some(Err(err)), - }; - - let mut operations = vec![]; - for operation in transaction.operations { - match self.mapper.map(operation) { - Ok(operation) => operations.push(operation), - Err(err) => return Some(Err(err)), - } - } - - Some(Ok(Transaction { - commit_scn: transaction.commit_scn, - commit_timestamp: transaction.commit_timestamp, - operations, - })) - } -} - -fn map_row(mut row: ParsedRow, schema: &Schema) -> Result { +pub fn map_row(row: ParsedRow, schema: &Schema) -> Result { let mut values = vec![]; - for field in &schema.fields { - let value = row - .remove(&field.name) - .ok_or_else(|| Error::FieldNotFound(field.name.clone()))?; + for (field, value) in schema.fields.iter().zip(row) { values.push(map_value(value, field.typ, field.nullable, &field.name)?); } @@ -96,128 +21,85 @@ fn map_row(mut row: ParsedRow, schema: &Schema) -> Result { } fn map_value( - value: ParsedValue, + value: Option>, typ: FieldType, nullable: bool, name: &str, ) -> Result { - match (value, typ, nullable) { - (ParsedValue::Null, _, false) => Err(Error::NullValue(name.to_string())), - (ParsedValue::Null, _, true) => Ok(Field::Null), - (ParsedValue::String(string), FieldType::Float, _) => { - Ok(Field::Float(OrderedFloat(string.parse()?))) + let Some(string) = value else { + if nullable { + return Ok(Field::Null); + } else { + return Err(Error::NullValue(name.to_owned())); } - (ParsedValue::Number(number), FieldType::Float, _) => Ok(Field::Float(OrderedFloat( - number - .to_f64() - .ok_or_else(|| Error::FloatOverflow(number))?, - ))), - (ParsedValue::String(s), FieldType::Decimal, nullable) => { - let string = s.replace(',', ""); - - if string == *"NULL" { - if nullable { - Ok(Field::Null) - } else { - Err(Error::NullValue(name.to_string())) - } - } else { - Ok(Field::Decimal( - string - .parse() - .map_err(|e| Error::NumberToDecimal(e, string))?, - )) - } + }; + match typ { + FieldType::Float => Ok(Field::Float(OrderedFloat(string.parse()?))), + FieldType::Decimal => { + let string = string.replace(',', ""); + Ok(Field::Decimal( + string + .parse() + .map_err(|e| Error::NumberToDecimal(e, string))?, + )) } - (ParsedValue::Number(number), FieldType::Decimal, _) => Ok(Field::Decimal(number)), - (ParsedValue::Number(number), FieldType::Int, _) => Ok(Field::Int( - number - .to_i64() - .ok_or_else(|| Error::ParseIntFailed(number))?, + FieldType::Int => Ok(Field::Int( + string + .parse() + .map_err(|_| Error::ParseIntFailed(string.into()))?, )), - (ParsedValue::Number(number), FieldType::UInt, _) => Ok(Field::UInt( - number - .to_u64() - .ok_or_else(|| Error::ParseUIntFailed(number))?, + FieldType::UInt => Ok(Field::UInt( + string + .parse() + .map_err(|_| Error::ParseUIntFailed(string.into()))?, )), - (ParsedValue::String(string), FieldType::String, _) => Ok(Field::String(string)), - (ParsedValue::Number(_), FieldType::String, _) => Err(Error::TypeMismatch { - field: name.to_string(), - expected: FieldType::String, - actual: FieldType::Decimal, - }), - (_, FieldType::Binary, _) => unimplemented!("parse binary from redo sql"), - (ParsedValue::String(string), FieldType::Date, _) => Ok(Field::Date( - parse_date(&string).map_err(|e| Error::ParseDateTime(e, string))?, + FieldType::String => Ok(Field::String(string.into())), + FieldType::Binary => unimplemented!("parse binary from redo sql"), + FieldType::Date => Ok(Field::Date( + parse_date(&string).map_err(|e| Error::ParseDateTime(e, string.into()))?, )), - (ParsedValue::Number(_), FieldType::Date, _) => Err(Error::TypeMismatch { - field: name.to_string(), - expected: FieldType::Date, - actual: FieldType::Decimal, - }), - (ParsedValue::String(string), FieldType::Timestamp, _) => Ok(Field::Timestamp( - parse_date_time(&string).map_err(|e| Error::ParseDateTime(e, string))?, + FieldType::Timestamp => Ok(Field::Timestamp( + parse_date_time(&string).map_err(|e| Error::ParseDateTime(e, string.into()))?, )), - (ParsedValue::Number(_), FieldType::Timestamp, _) => Err(Error::TypeMismatch { - field: name.to_string(), - expected: FieldType::Timestamp, - actual: FieldType::Decimal, - }), _ => unreachable!(), } } -fn parse_date(string: &str) -> Result { - NaiveDate::parse_from_str(string, "%d-%m-%Y") +fn parse_date(string: &str) -> Result { + const TO_DATE: &str = "TO_DATE('"; + + let date = string.get(TO_DATE.len()..).ok_or(ParseDateError::Oracle)?; + let end = memchr(b'\'', date.as_bytes()).ok_or(ParseDateError::Oracle)?; + Ok(NaiveDate::parse_from_str(&date[..end], "%d-%m-%Y")?) } -fn parse_date_time(string: &str) -> Result, ParseError> { - let date_time = NaiveDateTime::parse_from_str(string, "%d-%m-%Y %I.%M.%S%.6f %p")?; - Ok(Ok(DateTime::::from_naive_utc_and_offset(date_time, Utc))?.fixed_offset()) +fn parse_date_time(string: &str) -> Result, ParseDateError> { + const TO_TIMESTAMP: &str = "TO_TIMESTAMP('"; + + let date = string + .get(TO_TIMESTAMP.len()..) + .ok_or(ParseDateError::Oracle)?; + let end = memchr(b'\'', date.as_bytes()).ok_or(ParseDateError::Oracle)?; + let date_time = NaiveDateTime::parse_from_str(dbg!(&date[..end]), "%d-%m-%Y %I.%M.%S%.6f %p")?; + Ok(DateTime::::from_naive_utc_and_offset(date_time, Utc).fixed_offset()) } #[cfg(test)] mod tests { - use dozer_ingestion_connector::dozer_types::chrono; - use dozer_ingestion_connector::dozer_types::types::{Field, FieldType}; + use dozer_ingestion_connector::dozer_types::chrono::{self, DateTime}; #[test] fn test_parse_date() { - let date = super::parse_date("01-01-2021").unwrap(); + let date = super::parse_date("TO_DATE('01-01-2021','DD-MM-YYYY')").unwrap(); assert_eq!(date, chrono::NaiveDate::from_ymd_opt(2021, 1, 1).unwrap()); } #[test] - fn parse_malformed_decimal() { - let number = "NULL,"; - let result = super::map_value( - super::ParsedValue::String(number.to_string()), - FieldType::Decimal, - true, - "test", - ) - .unwrap(); - - assert_eq!(result, super::Field::Null); - - let result = super::map_value( - super::ParsedValue::String(number.to_string()), - FieldType::Decimal, - false, - "test", + fn test_parse_timestamp() { + let date = super::parse_date_time("TO_TIMESTAMP('01-01-2021 05.00.00.024589 AM')").unwrap(); + assert_eq!( + date, + DateTime::parse_from_rfc3339("2021-01-01T05:00:00.024589Z").unwrap() ); - - assert!(result.is_err()); - - let number = "9999999.99,"; - let result = super::map_value( - super::ParsedValue::String(number.to_string()), - FieldType::Decimal, - false, - "test", - ) - .unwrap(); - - assert_eq!(result, Field::Decimal("9999999.99".parse().unwrap())); } } diff --git a/dozer-ingestion/oracle/src/connector/replicate/transaction/mod.rs b/dozer-ingestion/oracle/src/connector/replicate/transaction/mod.rs index d12afc6c81..a09e18ed66 100644 --- a/dozer-ingestion/oracle/src/connector/replicate/transaction/mod.rs +++ b/dozer-ingestion/oracle/src/connector/replicate/transaction/mod.rs @@ -10,7 +10,7 @@ use crate::connector::{Error, Scn}; use super::log::LogManagerContent; #[derive(Debug, Clone)] -pub struct Transaction { +pub struct ParsedTransaction { pub commit_scn: Scn, pub commit_timestamp: DateTime, pub operations: Vec<(usize, Operation)>, @@ -20,7 +20,6 @@ pub struct Transaction { pub struct Processor { aggregator: aggregate::Aggregator, parser: parse::Parser, - mapper: map::Mapper, } impl Processor { @@ -31,19 +30,17 @@ impl Processor { ) -> Self { Self { aggregator: aggregate::Aggregator::new(start_scn), - parser: parse::Parser::new(table_pair_to_index), - mapper: map::Mapper::new(schemas), + parser: parse::Parser::new(table_pair_to_index, schemas), } } pub fn process<'a>( &'a self, iterator: impl IntoIterator + 'a, - ) -> impl Iterator> + 'a { + ) -> impl Iterator> + 'a { let csf = csf::process(iterator.into_iter()); let transaction = self.aggregator.process(csf); - let parse = self.parser.process(transaction); - self.mapper.process(parse) + self.parser.process(transaction) } } diff --git a/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/delete.rs b/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/delete.rs index 8a90dc8f8f..8b13789179 100644 --- a/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/delete.rs +++ b/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/delete.rs @@ -1,62 +1 @@ -use dozer_ingestion_connector::dozer_types::log::warn; -use regex::Regex; -use crate::connector::Error; - -use super::{row, ParsedRow}; - -#[derive(Debug, Clone)] -pub struct Parser { - regex: Regex, - row_parser: row::Parser, -} - -impl Parser { - pub fn new() -> Self { - let regex = - Regex::new(r#"^delete from "((?:C##)?\w+)"\."(\w+)"\n *where\n(?s)(.+)$"#).unwrap(); - Self { - regex, - row_parser: row::Parser::new(" and", ";"), - } - } - - pub fn parse(&self, sql_redo: &str, table_pair: &(String, String)) -> Result { - let captures = self - .regex - .captures(sql_redo) - .ok_or_else(|| Error::DeleteFailedToMatch(sql_redo.to_string()))?; - let owner = captures.get(1).unwrap().as_str(); - let table_name = captures.get(2).unwrap().as_str(); - if owner != table_pair.0 || table_name != table_pair.1 { - warn!( - "Table name {}.{} doesn't match {}.{} in log content", - owner, table_name, table_pair.0, table_pair.1 - ); - } - - self.row_parser.parse(captures.get(3).unwrap().as_str()) - } -} - -#[test] -fn test_parse() { - let parser = Parser::new(); - let sql_redo = r#"delete from "HR"."EMPLOYEES" - where - "EMPLOYEE_ID" = 306 and - "FIRST_NAME" = 'Nandini' and - "LAST_NAME" = 'Shastry' and - "EMAIL" = 'NSHASTRY' and - "PHONE_NUMBER" = '1234567890' and - "JOB_ID" = 'HR_REP' and - "SALARY" = 120000 and - "COMMISSION_PCT" = .05 and - "MANAGER_ID" = 105 and - "DEPARTMENT_ID" = 10; - "#; - let parsed = parser - .parse(sql_redo, &("HR".to_string(), "EMPLOYEES".to_string())) - .unwrap(); - assert_eq!(parsed.len(), 10); -} diff --git a/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/insert.rs b/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/insert.rs index a5ab56daf2..b15733d6f9 100644 --- a/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/insert.rs +++ b/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/insert.rs @@ -1,63 +1,497 @@ -use dozer_ingestion_connector::dozer_types::log::warn; -use regex::Regex; +use std::borrow::Cow; -use crate::connector::Error; +use fxhash::FxHashMap; +use memchr::{memchr, memchr3_iter, memmem}; -use super::{row, ParsedRow}; +use super::ParsedRow; -#[derive(Debug, Clone)] -pub struct Parser { - regex: Regex, - row_parser: row::Parser, +const INSERT_INTO: &[u8] = "insert into ".as_bytes(); +const UPDATE: &[u8] = "update ".as_bytes(); +const DELETE_FROM: &[u8] = "delete from ".as_bytes(); + +const VALUES: &[u8] = " values ".as_bytes(); +const SET: &[u8] = " set ".as_bytes(); +const WHERE: &[u8] = " where ".as_bytes(); +const UNSUPPORTED_TYPE: &[u8] = "Unsupported Type".as_bytes(); +const UNSUPPORTED: &[u8] = "Unsupported".as_bytes(); +const NULL: &[u8] = "NULL".as_bytes(); +const IS_NULL: &[u8] = "IS NULL".as_bytes(); +const AND: &[u8] = "and ".as_bytes(); +const OR: &[u8] = "or ".as_bytes(); + +pub(super) struct DmlParser<'a> { + remaining: &'a [u8], + column_indices: &'a FxHashMap, } -impl Parser { - pub fn new() -> Self { - let regex = - Regex::new(r#"^insert into "((?:C##)?\w+)"\."(\w+)"\n *values\n(?s)(.+)$"#).unwrap(); +impl<'a> DmlParser<'a> { + pub(super) fn new(sql: &'a str, column_indices: &'a FxHashMap) -> Self { Self { - regex, - row_parser: row::Parser::new(",", ";"), + remaining: sql.as_bytes(), + column_indices, + } + } + + pub(super) fn parse_insert(&mut self) -> Option { + self.remaining = self.remaining.get(INSERT_INTO.len()..)?; + self.parse_table_name()?; + + let parsed_col_names = self.parse_column_names()?; + self.parse_column_values(parsed_col_names) + } + + pub(super) fn parse_update(&mut self) -> Option<(ParsedRow, ParsedRow)> { + self.remaining = self.remaining.get(UPDATE.len()..)?; + + self.parse_table_name()?; + + let new = dbg!(self.parse_set_clause())?; + let old = self.parse_where_clause()?; + Some((old, new)) + } + + pub(super) fn parse_delete(&mut self) -> Option { + self.remaining = self.remaining.get(DELETE_FROM.len()..)?; + + self.parse_table_name()?; + + self.parse_where_clause() + } + + fn parse_column_names(&mut self) -> Option> { + let mut in_quotes = false; + let mut ret = vec![]; + let mut start = 0; + for i in memchr3_iter(b'(', b')', b'"', self.remaining) { + let c = self.remaining[i]; + if c == b')' && !in_quotes { + self.remaining = &self.remaining[i + 1..]; + return Some(ret); + } else if c == b'(' { + continue; + } else { + if in_quotes { + // This came from utf-8 and the found character cannot split + // a UTF-8 character, so this unwrap should never panic + ret.push(unsafe { std::str::from_utf8_unchecked(&self.remaining[start..i]) }); + } else { + start = i + 1; + } + in_quotes = !in_quotes; + } } + None } - pub fn parse(&self, sql_redo: &str, table_pair: &(String, String)) -> Result { - let captures = self - .regex - .captures(sql_redo) - .ok_or_else(|| Error::InsertFailedToMatch(sql_redo.to_string()))?; - let owner = captures.get(1).unwrap().as_str(); - let table_name = captures.get(2).unwrap().as_str(); - if owner != table_pair.0 || table_name != table_pair.1 { - warn!( - "Table name {}.{} doesn't match {}.{} in log content", - owner, table_name, table_pair.0, table_pair.1 - ); + fn parse_column_values( + &mut self, + column_names: Vec<&str>, + ) -> Option>>> { + if !self.remaining.starts_with(VALUES) { + return None; } + self.remaining = &self.remaining[VALUES.len()..]; + + let start_of_values = memchr(b'(', self.remaining)?; + self.remaining = &self.remaining[start_of_values + 1..]; + let mut nesting_level = 0; + let mut in_quotes = false; + let mut start_of_value = 0; + let mut quoted_value_contains_quote = false; + let mut ret = vec![None; self.column_indices.len()]; + let mut col_no = 0; + let mut i = 0; + while i < self.remaining.len() { + let c = self.remaining[i]; + if in_quotes { + i += memchr(b'\'', &self.remaining[i..])?; + if *self.remaining.get(i + 1)? == b'\'' { + // Skip the next one + quoted_value_contains_quote = true; + i += 2; + continue; + } + in_quotes = false; + i += 1; + continue; + } - self.row_parser.parse(captures.get(3).unwrap().as_str()) + if c == b'\'' { + in_quotes = true + } else if c == b'(' && !in_quotes { + nesting_level += 1; + } else if !in_quotes && (c == b',' || c == b')') { + if c == b')' && nesting_level > 0 { + nesting_level -= 1; + i += 1; + continue; + } + + if c == b',' && nesting_level > 0 { + i += 1; + continue; + } + + let value = if self.remaining[start_of_value] == b'\'' { + assert_eq!(self.remaining[i - 1], b'\''); + Some(if quoted_value_contains_quote { + Cow::Owned( + unsafe { + std::str::from_utf8_unchecked( + &self.remaining[start_of_value + 1..(i - 1)], + ) + } + .replace("''", "'"), + ) + } else { + Cow::Borrowed(unsafe { + std::str::from_utf8_unchecked( + &self.remaining[start_of_value + 1..(i - 1)], + ) + }) + }) + } else { + let s = &self.remaining[start_of_value..i]; + assert_ne!(s, UNSUPPORTED_TYPE); + if s == NULL { + None + } else { + Some(Cow::Borrowed(unsafe { std::str::from_utf8_unchecked(s) })) + } + }; + // Get column index + let index = self.column_indices.get(column_names[col_no])?; + ret[*index] = value; + quoted_value_contains_quote = false; + + col_no += 1; + start_of_value = i + 1; + } + i += 1; + } + Some(ret) + } + + fn parse_table_name(&mut self) -> Option<()> { + let mut in_quotes = false; + let mut index = 0; + for i in memchr3_iter(b' ', b'(', b'"', self.remaining) { + let c = self.remaining[i]; + index = i; + if c == b'"' { + in_quotes = !in_quotes; + } else if !in_quotes { + break; + } + } + self.remaining = &self.remaining[index..]; + Some(()) + } + + fn parse_set_clause(&mut self) -> Option> { + self.remaining = &self.remaining[memmem::find(self.remaining, SET)? + SET.len()..]; + + let mut i = 0; + let mut in_single_quotes = false; + let mut in_double_quotes = false; + let mut quoted_value_contains_quote = false; + let mut in_column_name = true; + let mut in_column_value = false; + let mut in_special = false; + let mut column_name = ""; + let mut start = 0; + let mut nesting_level = 0; + let mut values = vec![None; self.column_indices.len()]; + + while i < self.remaining.len() { + let c = self.remaining[i]; + let lookahead = self.remaining.get(i + 1); + if in_single_quotes { + i += memchr(b'\'', &self.remaining[i..])?; + if *self.remaining.get(i + 1)? == b'\'' { + // Skip the next one + quoted_value_contains_quote = true; + i += 2; + continue; + } + in_single_quotes = false; + if nesting_level == 0 { + let pos = self.column_indices.get(column_name)?; + values[*pos] = Some(if quoted_value_contains_quote { + Cow::Owned( + unsafe { std::str::from_utf8_unchecked(&self.remaining[start + 1..i]) } + .replace("''", "'"), + ) + } else { + Cow::Borrowed(unsafe { + std::str::from_utf8_unchecked(&self.remaining[start + 1..i]) + }) + }); + start = i + 1; + in_column_name = false; + in_column_value = false; + } + i += 1; + continue; + } + + if c == b'"' && in_column_name { + if in_double_quotes { + in_double_quotes = false; + column_name = + unsafe { std::str::from_utf8_unchecked(&self.remaining[start + 1..i]) }; + in_column_name = false; + start = i + 1; + i += 1; + continue; + } + in_double_quotes = true; + start = i; + } else if c == b'=' && !in_column_name && !in_column_value { + in_column_value = true; + i += 1; + start = i + 1; + } else if nesting_level == 0 && c == b' ' && lookahead == Some(&b'|') { + } else if nesting_level == 0 + && c == b'|' + && lookahead == Some(&b'|') + && !in_single_quotes + { + i += memchr(b'\'', &self.remaining[i + 2..])? - 1; + } else if c == b'\'' && in_column_value { + if !in_special { + start = i; + } + in_single_quotes = true; + } else if c == b',' && !in_column_value && !in_column_name { + in_column_name = true; + i += 1; + start = i; + } else if in_column_value && !in_single_quotes { + if !in_special { + start = i; + in_special = true; + } + + if c == b'(' { + nesting_level += 1; + } else if c == b')' && nesting_level > 0 { + nesting_level -= 1; + } else if (c == b',' || c == b' ' || c == b';') && nesting_level == 0 { + let s = &self.remaining[start..i]; + assert_ne!(s, UNSUPPORTED_TYPE); + assert_ne!(s, UNSUPPORTED); + if s != NULL { + let pos = self.column_indices.get(column_name)?; + values[*pos] = Some(unsafe { std::str::from_utf8_unchecked(s) }.into()); + } + start = i + 1; + in_column_value = false; + in_special = false; + in_column_name = true; + } + } else if !in_double_quotes + && !in_single_quotes + && self.remaining[i..].starts_with(WHERE) + { + break; + } + i += 1; + } + self.remaining = &self.remaining[i..]; + Some(values) + } + + fn parse_where_clause(&mut self) -> Option> { + self.remaining = &self.remaining[memmem::find(self.remaining, WHERE)? + WHERE.len()..]; + + let mut i = 0; + + let mut in_single_quotes = false; + let mut in_double_quotes = false; + let mut quoted_value_contains_quote = false; + let mut in_column_name = true; + let mut in_column_value = false; + let mut in_special = false; + let mut column_name = ""; + let mut start = 0; + let mut nesting_level = 0; + let mut values = vec![None; self.column_indices.len()]; + + while i < self.remaining.len() { + let c = self.remaining[i]; + let lookahead = self.remaining.get(i + 1); + if in_single_quotes { + i += memchr(b'\'', &self.remaining[i..])?; + if *self.remaining.get(i + 1)? == b'\'' { + // Skip the next one + quoted_value_contains_quote = true; + i += 2; + continue; + } + in_single_quotes = false; + if nesting_level == 0 { + let pos = self.column_indices.get(column_name)?; + values[*pos] = Some(if quoted_value_contains_quote { + Cow::Owned( + unsafe { std::str::from_utf8_unchecked(&self.remaining[start + 1..i]) } + .replace("''", "'"), + ) + } else { + Cow::Borrowed(unsafe { + std::str::from_utf8_unchecked(&self.remaining[start + 1..i]) + }) + }); + start = i + 1; + in_column_name = false; + in_column_value = false; + } + i += 1; + continue; + } + + if c == b'"' && in_column_name { + if in_double_quotes { + in_double_quotes = false; + column_name = + unsafe { std::str::from_utf8_unchecked(&self.remaining[start + 1..i]) }; + in_column_name = false; + start = i + 1; + i += 1; + continue; + } + in_double_quotes = true; + start = i; + } else if c == b'=' && !in_column_name && !in_column_value { + in_column_value = true; + i += 1; + start = i + 1; + } else if c == b'I' && !in_column_name && !in_column_value { + if self.remaining[i..].starts_with(IS_NULL) { + i += IS_NULL.len(); + start = i; + continue; + } + } else if nesting_level == 0 && c == b' ' && lookahead == Some(&b'|') { + } else if nesting_level == 0 + && c == b'|' + && lookahead == Some(&b'|') + && !in_single_quotes + { + i += memchr(b'\'', &self.remaining[i + 2..])? - 1; + } else if c == b'\'' && in_column_value { + if !in_special { + start = i; + } + in_single_quotes = true; + } else if c == b',' && !in_column_value && !in_column_name { + in_column_name = true; + i += 1; + start = i; + } else if in_column_value && !in_single_quotes { + if !in_special { + start = i; + in_special = true; + } + + if c == b'(' { + nesting_level += 1; + } else if c == b')' && nesting_level > 0 { + nesting_level -= 1; + } else if (c == b' ' || c == b';') && nesting_level == 0 { + let s = &self.remaining[start..i]; + assert_ne!(s, UNSUPPORTED_TYPE); + assert_ne!(s, UNSUPPORTED); + if s != NULL { + let pos = self.column_indices.get(column_name)?; + values[*pos] = Some(unsafe { std::str::from_utf8_unchecked(s) }.into()); + } + start = i + 1; + in_column_value = false; + in_special = false; + in_column_name = true; + } + } else if !in_column_value && !in_column_name { + if c == b'a' && lookahead == Some(&b'n') && self.remaining[i..].starts_with(AND) { + i += AND.len(); + start = i; + in_column_name = true; + continue; + } else if c == b'o' + && lookahead == Some(&b'r') + && self.remaining[i..].starts_with(OR) + { + // We don't support OR conditions (what would that even mean?) + return None; + } + } + i += 1; + } + Some(values) } } -#[test] -fn test_parse() { - let parser = Parser::new(); - let sql_redo = r#"insert into "HR"."EMPLOYEES" - values - "EMPLOYEE_ID" = 306, - "FIRST_NAME" = 'Nandini', - "LAST_NAME" = 'Shastry', - "EMAIL" = 'NSHASTRY', - "PHONE_NUMBER" = '1234567890', - "JOB_ID" = 'HR_REP', - "SALARY" = 120000, - "COMMISSION_PCT" = .05, - "MANAGER_ID" = 105, - "NULL_FIELD" IS NULL, - "DEPARTMENT_ID" = 10; - "#; - let parsed = parser - .parse(sql_redo, &("HR".to_string(), "EMPLOYEES".to_string())) - .unwrap(); - assert_eq!(parsed.len(), 11); +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_insert() { + let sql = r#"insert into "DOZER"."TEST"("ID","NAME","TS","DATE","C1","C2","N") values ('1','Acme',TO_TIMESTAMP('2020-02-01 00:00:00.'),TO_DATE('2020-02-01 00:00:00','YYYY-MM-DD HH24:MI:SS'),NULL,NULL,'A''B'); + "#; + let column_indices = ["NAME", "ID", "TS", "C1", "C2", "DATE", "N"] + .into_iter() + .enumerate() + .map(|(i, s)| (s.to_owned(), i)) + .collect(); + assert_eq!( + DmlParser::new(sql, &column_indices).parse_insert(), + Some(vec![ + Some("Acme".into()), + Some("1".into()), + Some("TO_TIMESTAMP('2020-02-01 00:00:00.')".into()), + None, + None, + Some("TO_DATE('2020-02-01 00:00:00','YYYY-MM-DD HH24:MI:SS')".into()), + Some("A'B".into()), + ]) + ) + } + + #[test] + fn test_parse_update() { + let string = r#"update "DOZER"."TEST" set "ID" = '1', "NAME" = 'Acme', "TS" = TO_TIMESTAMP('2020-02-01 00:00:00.'), "DATE" = TO_DATE('2020-02-01 00:00:00','YYYY-MM-DD HH24:MI:SS'), "C1" = NULL, "C2" = NULL, "N" = 'A''B' where "ID" = '2' and "NAME" = 'Corp' and "TS" = TO_TIMESTAMP('2020-02-01 00:00:00.') and "DATE" = TO_DATE('2020-02-01 00:00:00','YYYY-MM-DD HH24:MI:SS') and "C1" IS NULL and "C2" IS NULL" and "N" = 'A''B';"#; + let column_indices = ["NAME", "ID", "TS", "C1", "C2", "DATE", "N"] + .into_iter() + .enumerate() + .map(|(i, s)| (s.to_owned(), i)) + .collect(); + assert_eq!( + DmlParser { + remaining: string.as_bytes(), + column_indices: &column_indices + } + .parse_update(), + Some(( + vec![ + Some("Corp".into()), + Some("2".into()), + Some("TO_TIMESTAMP('2020-02-01 00:00:00.')".into()), + None, + None, + Some("TO_DATE('2020-02-01 00:00:00','YYYY-MM-DD HH24:MI:SS')".into()), + Some("A'B".into()), + ], + vec![ + Some("Acme".into()), + Some("1".into()), + Some("TO_TIMESTAMP('2020-02-01 00:00:00.')".into()), + None, + None, + Some("TO_DATE('2020-02-01 00:00:00','YYYY-MM-DD HH24:MI:SS')".into()), + Some("A'B".into()), + ], + )) + ); + } } diff --git a/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/mod.rs b/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/mod.rs index 84f6b9ffeb..53228526db 100644 --- a/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/mod.rs +++ b/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/mod.rs @@ -1,60 +1,59 @@ -use std::{collections::HashMap, str::FromStr}; +use std::{borrow::Cow, collections::HashMap}; use dozer_ingestion_connector::dozer_types::{ - chrono::{DateTime, Utc}, log::trace, - rust_decimal::Decimal, + types::{Operation, Schema}, }; +use fxhash::FxHashMap; -use crate::connector::{Error, Scn}; - -use super::aggregate::{Operation, OperationKind, Transaction}; - -#[derive(Debug, Clone)] -pub struct ParsedTransaction { - pub commit_scn: Scn, - pub commit_timestamp: DateTime, - pub operations: Vec, -} - -#[derive(Debug, Clone)] -pub struct ParsedOperation { - pub table_index: usize, - pub kind: ParsedOperationKind, -} +use crate::connector::{ + replicate::transaction::{map::map_row, parse::insert::DmlParser}, + Error, +}; -pub type ParsedRow = HashMap; +use super::{ + aggregate::{OperationKind, RawOperation, Transaction}, + ParsedTransaction, +}; +pub type ParsedRow<'a> = Vec>>; #[derive(Debug, Clone)] -pub enum ParsedOperationKind { - Insert(ParsedRow), - Delete(ParsedRow), - Update { old: ParsedRow, new: ParsedRow }, +struct TableInfo { + index: usize, + column_indices: FxHashMap, + schema: Schema, } - -#[derive(Debug, Clone, PartialEq)] -pub enum ParsedValue { - String(String), - Number(Decimal), - Null, +impl TableInfo { + fn new(index: usize, schema: Schema) -> Self { + let column_indices = schema + .fields + .iter() + .enumerate() + .map(|(i, field)| (field.name.clone(), i)) + .collect(); + Self { + index, + schema, + column_indices, + } + } } #[derive(Debug, Clone)] pub struct Parser { - insert_parser: insert::Parser, - delete_parser: delete::Parser, - update_parser: update::Parser, - table_pair_to_index: HashMap<(String, String), usize>, + table_infos: FxHashMap<(String, String), TableInfo>, } impl Parser { - pub fn new(table_pair_to_index: HashMap<(String, String), usize>) -> Self { - Self { - insert_parser: insert::Parser::new(), - delete_parser: delete::Parser::new(), - update_parser: update::Parser::new(), - table_pair_to_index, - } + pub fn new( + table_pair_to_index: HashMap<(String, String), usize>, + schemas: Vec, + ) -> Self { + let table_infos = table_pair_to_index + .into_iter() + .map(|(k, v)| (k, TableInfo::new(v, schemas[v].clone()))) + .collect(); + Self { table_infos } } pub fn process<'a>( @@ -67,9 +66,9 @@ impl Parser { } } - fn parse(&self, operation: Operation) -> Result, Error> { + fn parse(&self, operation: RawOperation) -> Result, Error> { let table_pair = (operation.seg_owner, operation.table_name); - let Some(&table_index) = self.table_pair_to_index.get(&table_pair) else { + let Some(&table_info) = self.table_infos.get(&table_pair).as_ref() else { trace!( "Ignoring operation on table {}.{}", table_pair.0, @@ -80,19 +79,35 @@ impl Parser { trace!(target: "oracle_replication_parser", "Parsing operation on table {}.{}", table_pair.0, table_pair.1); - let kind = match operation.kind { - OperationKind::Insert => ParsedOperationKind::Insert( - self.insert_parser.parse(&operation.sql_redo, &table_pair)?, - ), - OperationKind::Delete => ParsedOperationKind::Delete( - self.delete_parser.parse(&operation.sql_redo, &table_pair)?, - ), + let mut parser = DmlParser::new(&operation.sql_redo, &table_info.column_indices); + let op = match operation.kind { + OperationKind::Insert => { + let new_values = parser + .parse_insert() + .ok_or_else(|| Error::InsertFailedToMatch(operation.sql_redo.clone()))?; + Operation::Insert { + new: map_row(new_values, &table_info.schema)?, + } + } + OperationKind::Delete => { + let old = parser + .parse_delete() + .ok_or_else(|| Error::DeleteFailedToMatch(operation.sql_redo.clone()))?; + Operation::Delete { + old: map_row(old, &table_info.schema)?, + } + } OperationKind::Update => { - let (old, new) = self.update_parser.parse(&operation.sql_redo, &table_pair)?; - ParsedOperationKind::Update { old, new } + let (old, new) = parser + .parse_update() + .ok_or_else(|| Error::UpdateFailedToMatch(operation.sql_redo.clone()))?; + Operation::Update { + old: map_row(old, &table_info.schema)?, + new: map_row(new, &table_info.schema)?, + } } }; - Ok(Some(ParsedOperation { table_index, kind })) + Ok(Some((table_info.index, op))) } } @@ -125,21 +140,6 @@ impl<'a, I: Iterator> Iterator for Processor<'a, I> { } } -impl FromStr for ParsedValue { - type Err = Error; - - fn from_str(s: &str) -> Result { - if s.starts_with('\'') { - Ok(ParsedValue::String(s[1..s.len() - 1].to_string())) - } else { - Ok(ParsedValue::Number( - s.parse() - .map_err(|e| Error::NumberToDecimal(e, s.to_owned()))?, - )) - } - } -} - mod delete; mod insert; mod row; diff --git a/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/row.rs b/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/row.rs index fcafe6a5ed..8b13789179 100644 --- a/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/row.rs +++ b/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/row.rs @@ -1,36 +1 @@ -use std::collections::HashMap; -use regex::Regex; - -use crate::connector::Error; - -use super::{ParsedRow, ParsedValue}; - -#[derive(Debug, Clone)] -pub struct Parser { - regex: Regex, -} - -impl Parser { - pub fn new(delimiter: &str, end: &str) -> Self { - let regex = Regex::new(&format!( - "\"(\\w+)\" (= (.+)|IS NULL)({} *\\n|{})", - delimiter, end - )) - .unwrap(); - Self { regex } - } - - pub fn parse(&self, values: &str) -> Result { - let mut result = HashMap::new(); - for cap in self.regex.captures_iter(values) { - let column = cap.get(1).unwrap().as_str(); - let value = match cap.get(3) { - Some(value) => value.as_str().parse()?, - None => ParsedValue::Null, - }; - result.insert(column.to_string(), value); - } - Ok(result) - } -} diff --git a/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/update.rs b/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/update.rs index e963938010..8b13789179 100644 --- a/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/update.rs +++ b/dozer-ingestion/oracle/src/connector/replicate/transaction/parse/update.rs @@ -1,100 +1 @@ -use dozer_ingestion_connector::dozer_types::log::warn; -use regex::Regex; -use crate::connector::Error; - -use super::{row, ParsedRow}; - -#[derive(Debug, Clone)] -pub struct Parser { - regex: Regex, - new_row_parser: row::Parser, - old_row_parser: row::Parser, -} - -impl Parser { - pub fn new() -> Self { - let regex = Regex::new( - r#"^update "((?:C##)?\w+)"\."(\w+)"\n *set *\n *(?s)(.+) *where *\n(?s)(.+)$"#, - ) - .unwrap(); - Self { - regex, - new_row_parser: row::Parser::new(",", "\n"), - old_row_parser: row::Parser::new(" and", ";"), - } - } - - pub fn parse( - &self, - sql_redo: &str, - table_pair: &(String, String), - ) -> Result<(ParsedRow, ParsedRow), Error> { - let captures = self - .regex - .captures(sql_redo) - .ok_or_else(|| Error::UpdateFailedToMatch(sql_redo.to_string()))?; - let owner = captures.get(1).unwrap().as_str(); - let table_name = captures.get(2).unwrap().as_str(); - if owner != table_pair.0 || table_name != table_pair.1 { - warn!( - "Table name {}.{} doesn't match {}.{} in log content", - owner, table_name, table_pair.0, table_pair.1 - ); - } - - let mut new_row = self - .new_row_parser - .parse(captures.get(3).unwrap().as_str())?; - let old_row = self - .old_row_parser - .parse(captures.get(4).unwrap().as_str())?; - for (column, old_value) in old_row.iter() { - if !new_row.contains_key(column) { - new_row.insert(column.clone(), old_value.clone()); - } - } - Ok((old_row, new_row)) - } -} - -#[test] -fn test_parse() { - use super::ParsedValue; - - let parser = Parser::new(); - let sql_redo = r#"update "DOZER"."TRANSACTIONS" - set - "TYPE" = 'REBATE' - where - "TRANSACTION_ID" = 12001 and - "CUSTOMER_ID" = 63147 and - "TYPE" = 'Withdrawal' and - "AMOUNT" = 9691.34 and - "CURRENCY" = 'USD' and - "TRANSACTION_DATE" = '28-JAN-24' and - "STATUS" = 'Completed' and - "DESCRIPTION" = 'Yeah become language inside purpose.'; - "#; - let (old, new) = parser - .parse(sql_redo, &("HR".to_string(), "EMPLOYEES".to_string())) - .unwrap(); - assert_eq!(old.len(), 8); - assert_eq!(new.len(), 8); - assert_eq!( - old.get("TRANSACTION_ID").unwrap(), - &ParsedValue::Number("12001".parse().unwrap()) - ); - assert_eq!( - new.get("TRANSACTION_ID").unwrap(), - &ParsedValue::Number("12001".parse().unwrap()) - ); - assert_eq!( - old.get("TYPE").unwrap(), - &ParsedValue::String("Withdrawal".to_string()) - ); - assert_eq!( - new.get("TYPE").unwrap(), - &ParsedValue::String("REBATE".to_string()) - ); -}