Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize oracle parser #2483

Merged
merged 2 commits on Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions dozer-ingestion/oracle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ license = "AGPL-3.0-or-later"

[dependencies]
dozer-ingestion-connector = { path = "../connector" }
fxhash = "0.2.1"
memchr = "2.7.2"
oracle = { version = "0.5.7", features = ["chrono", "stmt_without_lifetime"] }
regex = "1.10.3"

Expand Down
39 changes: 21 additions & 18 deletions dozer-ingestion/oracle/src/connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ use dozer_ingestion_connector::{
log::{debug, error},
models::ingestion_types::{IngestionMessage, OracleReplicator, TransactionInfo},
node::OpIdentifier,
rust_decimal::{self, Decimal},
thiserror,
types::{FieldType, Operation, Schema},
rust_decimal, thiserror,
types::{Operation, Schema},
},
Ingestor, SourceSchema, TableIdentifier, TableInfo,
};
Expand All @@ -33,7 +32,15 @@ pub struct Connector {
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
pub(crate) enum ParseDateError {
#[error("Invalid date format: {0}")]
Chrono(#[from] chrono::ParseError),
#[error("Invalid oracle format")]
Oracle,
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
#[error("oracle error: {0:?}")]
Oracle(#[from] oracle::Error),
#[error("pdb not found: {0}")]
Expand All @@ -58,26 +65,16 @@ pub enum Error {
DeleteFailedToMatch(String),
#[error("update failed to match: {0}")]
UpdateFailedToMatch(String),
#[error("field {0} not found")]
FieldNotFound(String),
#[error("null value for non-nullable field {0}")]
NullValue(String),
#[error("cannot parse float: {0}")]
ParseFloat(#[from] ParseFloatError),
#[error("cannot parse date time from {1}: {0}")]
ParseDateTime(#[source] chrono::ParseError, String),
#[error("got overflow float number {0}")]
FloatOverflow(Decimal),
ParseDateTime(ParseDateError, String),
#[error("got error when parsing uint {0}")]
ParseUIntFailed(Decimal),
ParseUIntFailed(String),
#[error("got error when parsing int {0}")]
ParseIntFailed(Decimal),
#[error("type mismatch for {field}, expected {expected:?}, actual {actual:?}")]
TypeMismatch {
field: String,
expected: FieldType,
actual: FieldType,
},
ParseIntFailed(String),
}

/// `oracle`'s `ToSql` implementation for `&str` uses the `NVARCHAR2` type, which Oracle expects to be UTF-16 encoded by default.
Expand Down Expand Up @@ -377,7 +374,13 @@ impl Connector {
})
};

for transaction in processor.process(receiver) {
let mut recv = receiver.into_iter();
let first = processor
.process(recv.by_ref())
.find(|op| !op.as_ref().unwrap().operations.is_empty())
.unwrap()
.unwrap();
for transaction in recv.map(|_| Ok::<_, Error>(first.clone())) {
let transaction = match transaction {
Ok(transaction) => transaction,
Err(e) => {
Expand Down
2 changes: 2 additions & 0 deletions dozer-ingestion/oracle/src/connector/replicate/log/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::time::Instant;
use std::{sync::mpsc::SyncSender, time::Duration};

use dozer_ingestion_connector::dozer_types::log::debug;
Expand Down Expand Up @@ -31,6 +32,7 @@ pub struct LogManagerContent {
pub rbasqn: u32,
pub sql_redo: Option<String>,
pub csf: u8,
pub received: Instant,
}

/// `ingestor` is only used for checking if ingestion has ended so we can break the loop.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::env;
use std::{env, time::Instant};

use dozer_ingestion_connector::dozer_types::{
chrono::{DateTime, Utc},
Expand All @@ -11,12 +11,12 @@ use crate::connector::{Error, Scn};
use super::{LogManagerContent, RedoReader};

#[derive(Debug, Clone, Copy)]
pub struct LogMiner {
pub(crate) struct LogMiner {
pub fetch_batch_size: u32,
}

#[derive(Debug)]
pub struct LogMinerIter<'a> {
pub(crate) struct LogMinerIter<'a> {
result_set: ResultSet<'a, LogManagerContent>,
connection: &'a Connection,
}
Expand Down Expand Up @@ -62,7 +62,6 @@ impl RedoReader for LogMiner {
STARTSCN => :start_scn,
OPTIONS =>
DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG +
DBMS_LOGMNR.PRINT_PRETTY_SQL +
DBMS_LOGMNR.NO_ROWID_IN_STMT
);
END;";
Expand All @@ -74,7 +73,6 @@ impl RedoReader for LogMiner {
DBMS_LOGMNR.START_LOGMNR(
OPTIONS =>
DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG +
DBMS_LOGMNR.PRINT_PRETTY_SQL +
DBMS_LOGMNR.NO_ROWID_IN_STMT
);
END;";
Expand Down Expand Up @@ -162,6 +160,7 @@ impl RowValue for LogManagerContent {
rbasqn,
sql_redo,
csf,
received: Instant::now(),
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use oracle::Connection;
use crate::connector::{Error, Scn};

/// Given a log file name, a redo reader emits `LogManagerContent` rows
pub trait RedoReader {
pub(crate) trait RedoReader {
type Iterator<'a>: Iterator<Item = Result<LogManagerContent, Error>>;

/// Reads the `LogManagerContent` rows that have:
Expand All @@ -20,6 +20,6 @@ pub trait RedoReader {

mod log_miner;

pub use log_miner::LogMiner;
pub(crate) use log_miner::LogMiner;

use super::LogManagerContent;
2 changes: 1 addition & 1 deletion dozer-ingestion/oracle/src/connector/replicate/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod log;
mod transaction;
pub mod transaction;

pub use log::log_miner_loop;
pub use transaction::Processor;
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ use crate::connector::{
pub struct Transaction {
pub commit_scn: Scn,
pub commit_timestamp: DateTime<Utc>,
pub operations: Vec<Operation>,
pub operations: Vec<RawOperation>,
}

#[derive(Debug, Clone)]
pub struct Operation {
pub struct RawOperation {
pub seg_owner: String,
pub table_name: String,
pub kind: OperationKind,
Expand Down Expand Up @@ -52,7 +52,7 @@ impl Aggregator {
}
}

type TransactionForest = forest::Forest<TransactionId, Vec<Operation>>;
type TransactionForest = forest::Forest<TransactionId, Vec<RawOperation>>;

#[derive(Debug)]
struct Processor<I: Iterator<Item = LogManagerContent>> {
Expand Down Expand Up @@ -120,7 +120,7 @@ impl<I: Iterator<Item = LogManagerContent>> Iterator for Processor<I> {
op::process_operation(
content.xid,
content.pxid,
Operation {
RawOperation {
seg_owner,
table_name,
kind,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use dozer_ingestion_connector::dozer_types::log::warn;

use crate::connector::replicate::log::TransactionId;

use super::{Operation, TransactionForest};
use super::{RawOperation, TransactionForest};

pub fn process_operation(
xid: TransactionId,
pxid: TransactionId,
operation: Operation,
operation: RawOperation,
transaction_forest: &mut TransactionForest,
) {
if xid == pxid {
Expand Down
Loading
Loading