diff --git a/examples/cohort_banking/examples/cohort_banking.rs b/examples/cohort_banking/examples/cohort_banking.rs index 46081823..c02d6c3f 100644 --- a/examples/cohort_banking/examples/cohort_banking.rs +++ b/examples/cohort_banking/examples/cohort_banking.rs @@ -8,10 +8,7 @@ use cohort::{ replicator::{ core::{Replicator, ReplicatorCandidate}, pg_replicator_installer::PgReplicatorStatemapInstaller, - services::{ - replicator_service::replicator_service, - statemap_installer_service::{installation_service, installer_queue_service}, - }, + services::{replicator_service::replicator_service, statemap_installer_service::installation_service, statemap_queue_service::statemap_queue_service}, utils::installer_utils::get_snapshot_callback, }, snapshot_api::SnapshotApi, @@ -225,7 +222,7 @@ async fn start_replicator( // let future_installer = installer_service(rx_install_req, tx_install_resp, installer); let get_snapshot_fn = get_snapshot_callback(SnapshotApi::query(database.clone())); - let future_installer_queue = installer_queue_service(rx_install_req, rx_installation_feedback_req, tx_installation_req, get_snapshot_fn); + let future_installer_queue = statemap_queue_service(rx_install_req, rx_installation_feedback_req, tx_installation_req, get_snapshot_fn); let future_installation = installation_service(tx_install_resp, Arc::new(installer), rx_installation_req, tx_installation_feedback_req); let h_replicator = tokio::spawn(future_replicator); diff --git a/packages/cohort/src/bin/replicator.rs b/packages/cohort/src/bin/replicator.rs index 7c691f3e..401380c2 100644 --- a/packages/cohort/src/bin/replicator.rs +++ b/packages/cohort/src/bin/replicator.rs @@ -7,10 +7,7 @@ use cohort::{ replicator::{ core::{Replicator, ReplicatorCandidate}, pg_replicator_installer::PgReplicatorStatemapInstaller, - services::{ - replicator_service::replicator_service, - statemap_installer_service::{installation_service, installer_queue_service}, - }, + services::{replicator_service::replicator_service, statemap_installer_service::installation_service, statemap_queue_service::statemap_queue_service}, utils::installer_utils::get_snapshot_callback, }, snapshot_api::SnapshotApi, @@ -75,7 +72,7 @@ async fn main() { let replicator_service = replicator_service(statemap_installer_tx, replicator_rx, replicator); let get_snapshot_fn = get_snapshot_callback(SnapshotApi::query(database.clone())); - let future_installer_queue = installer_queue_service(statemap_installer_rx, rx_installation_feedback_req, tx_installation_req, get_snapshot_fn); + let future_installer_queue = statemap_queue_service(statemap_installer_rx, rx_installation_feedback_req, tx_installation_req, get_snapshot_fn); let future_installation = installation_service( replicator_tx, Arc::new(pg_statemap_installer), diff --git a/packages/cohort/src/replicator/core.rs b/packages/cohort/src/replicator/core.rs index 9ec14e82..cb37496b 100644 --- a/packages/cohort/src/replicator/core.rs +++ b/packages/cohort/src/replicator/core.rs @@ -14,6 +14,27 @@ use super::{ utils::replicator_utils::{get_filtered_batch, get_statemap_from_suffix_items}, }; +#[derive(Debug, Clone, PartialEq)] +pub enum StatemapInstallState { + Awaiting, + Inflight, + Installed, +} +#[derive(Debug, Clone)] +pub struct StatemapInstallerHashmap { + pub statemaps: Vec, + pub version: u64, + pub safepoint: Option, + pub state: StatemapInstallState, +} + +#[derive(Debug)] +pub enum StatemapInstallationStatus { + Success(u64), + GaveUp(u64), + Error(u64, String), +} + #[derive(Debug)] pub enum ReplicatorChannel { InstallationSuccess(Vec), diff --git a/packages/cohort/src/replicator/services/mod.rs b/packages/cohort/src/replicator/services/mod.rs index cc63915a..9dccc34f 100644 --- a/packages/cohort/src/replicator/services/mod.rs +++ b/packages/cohort/src/replicator/services/mod.rs @@ -1,2 +1,3 @@ pub mod replicator_service; pub mod statemap_installer_service; +pub mod statemap_queue_service; diff --git a/packages/cohort/src/replicator/services/statemap_installer_service.rs b/packages/cohort/src/replicator/services/statemap_installer_service.rs index c590a355..007e9ebf 100644 --- a/packages/cohort/src/replicator/services/statemap_installer_service.rs +++ b/packages/cohort/src/replicator/services/statemap_installer_service.rs @@ -1,234 +1,18 @@ // $coverage:ignore-start -use std::{ - sync::Arc, - time::{Duration, Instant}, -}; +use std::{sync::Arc, time::Instant}; -use crate::replicator::{ - core::{ReplicatorChannel, ReplicatorInstallStatus, ReplicatorInstaller, StatemapItem}, - utils::installer_utils::StatemapInstallerQueue, -}; +use crate::replicator::core::{ReplicatorChannel, ReplicatorInstallStatus, ReplicatorInstaller, StatemapInstallationStatus, StatemapItem}; -use futures::Future; -use log::{debug, error, info}; -use rayon::prelude::*; -use time::OffsetDateTime; +use log::{debug, error}; use tokio::sync::{mpsc, Semaphore}; -#[derive(Debug, Clone, PartialEq)] -pub enum StatemapInstallState { - Awaiting, - Inflight, - Installed, -} -#[derive(Debug, Clone)] -pub struct StatemapInstallerHashmap { - pub statemaps: Vec, - pub version: u64, - pub safepoint: Option, - pub state: StatemapInstallState, -} - -#[derive(Debug)] -pub enum StatemapInstallationStatus { - Success(u64), - GaveUp(u64), - Error(u64, String), -} - -pub async fn installer_queue_service( - mut statemaps_rx: mpsc::Receiver>, - mut statemap_installation_rx: mpsc::Receiver, - installation_tx: mpsc::Sender<(u64, Vec)>, - // Get snapshot callback fn - get_snapshot_fn: impl Future>, -) -> Result<(), String> { - info!("Starting Installer Queue Service.... "); - // let last_installed = 0_u64; - // let mut statemap_queue = IndexMap::::default(); - let mut interval = tokio::time::interval(Duration::from_millis(10_000)); - - let mut installation_success_count = 0; - let mut installation_gaveup = 0; - let mut send_for_install_count = 0; - let mut first_install_start: i128 = 0; // - let mut last_install_end: i128 = 0; // = OffsetDateTime::now_utc().unix_timestamp_nanos(); - - let mut statemap_installer_queue = StatemapInstallerQueue::default(); - - //Gets snapshot initial version from db. - statemap_installer_queue.update_snapshot(get_snapshot_fn.await.unwrap_or(0)); - - let mut last_item_send_for_install = 0; - - let mut prev_last_item_send_for_install = 0; - let mut hang_state_try = 5; - loop { - let enable_extra_logging = hang_state_try == 0; - tokio::select! { - statemap_batch_option = statemaps_rx.recv() => { - - if let Some(statemaps) = statemap_batch_option { - - let ver = statemaps.first().unwrap().version; - // let last_version = batch.last().unwrap(); - - // let start_time_insert = Instant::now(); - // Inserts the statemaps to the map - // for (ver, statemap_batch) in batch { - - let safepoint = if let Some(first_statemap) = statemaps.first() { - first_statemap.safepoint - } else { - None - }; - statemap_installer_queue.insert_queue_item(&ver, StatemapInstallerHashmap { statemaps, version: ver, safepoint, state: StatemapInstallState::Awaiting }); - - // } - // let insert_elapsed = start_time_insert.elapsed(); - - // Gets the statemaps to send for installation. - // let start_time_create_install_items = Instant::now(); - let items_to_install: Vec = statemap_installer_queue.get_versions_to_install(); - // let end_time_install_items = start_time_create_install_items.elapsed(); - - - // let start_time_send = Instant::now(); - // let install_len = items_to_install.len(); - - // Sends for installation. - for key in items_to_install { - // Send for installation - // warn!("Sending... {key}"); - if send_for_install_count == 0 { - first_install_start = OffsetDateTime::now_utc().unix_timestamp_nanos(); - } - send_for_install_count += 1; - installation_tx.send((key, statemap_installer_queue.queue.get(&key).unwrap().statemaps.clone())).await.unwrap(); - - last_item_send_for_install = key; - - // Update the status flag - statemap_installer_queue.update_queue_item_state(&key, StatemapInstallState::Inflight); - } - // let end_time_send = start_time_send.elapsed(); - - // error!("Time taken to create versions from={first_version} to={last_version} in {insert_elapsed:?}. Created batch of {install_len} in {end_time_install_items:?} and send with update items to inflight in {end_time_send:?}"); - } - } - Some(install_result) = statemap_installation_rx.recv() => { - match install_result { - StatemapInstallationStatus::Success(key) => { - // let start_time_success = Instant::now(); - - // installed successfully and will remove the item - // statemap_queue.shift_remove(&key); - statemap_installer_queue.update_queue_item_state(&key, StatemapInstallState::Installed); - - // let index = statemap_queue.get_index_of(&key).unwrap(); - - if let Some(last_contiguous_install_item) = statemap_installer_queue.queue.iter().take_while(|item| item.1.state == StatemapInstallState::Installed).last(){ - statemap_installer_queue.update_snapshot(last_contiguous_install_item.1.version) ; - }; - - - installation_success_count += 1; - last_install_end = OffsetDateTime::now_utc().unix_timestamp_nanos(); - // error!("Installed successfully version={key} and total_installs={install_count}"); - // let end_time_success = start_time_success.elapsed(); - // error!("(Statemap successfully installed) for version={key} in {end_time_success:?}"); - }, - StatemapInstallationStatus::GaveUp(_) => { - installation_gaveup += 1; - last_install_end = OffsetDateTime::now_utc().unix_timestamp_nanos(); - }, - StatemapInstallationStatus::Error(ver, error) => { - error!("Failed to install version={ver} due to error={error:?}"); - let items_in_flight: Vec<&StatemapInstallerHashmap> = statemap_installer_queue.queue - .par_values() - // Picking waiting items - .filter_map(|v| { - if v.state == StatemapInstallState::Inflight || v.state == StatemapInstallState::Installed { - Some(v) - } else { - None - } - }).collect(); - panic!("[Panic Panic Panic] panic for ver={ver} with error={error} \n\n\n Items still in statemap \n\n\n {items_in_flight:#?}"); - }, - } - } - _ = interval.tick() => { - - // let installed_keys = statemap_queue.par_values().filter_map(|v| {if v.status == StatemapInstallState::Installed { - // Some(v.version) - // } else { - // None - // }}) - // .collect::>(); - - let duration_sec = Duration::from_nanos((last_install_end - first_install_start) as u64).as_secs_f32(); - let tps = installation_success_count as f32 / duration_sec; - - let awaiting_count = statemap_installer_queue.queue.values().filter(|v| v.state == StatemapInstallState::Awaiting).count(); - if enable_extra_logging { - - let awaiting_install_versions: Vec<(u64, Option)> = statemap_installer_queue.queue.values().filter_map(|v| { if v.state == StatemapInstallState::Awaiting { - Some((v.version, v.safepoint)) - } else { - None - }}).collect(); - - error!("Awaiting (versions, safepoint) tuple to install .... \n {awaiting_install_versions:#?}"); - - } - - if prev_last_item_send_for_install == last_item_send_for_install { - hang_state_try -= 1; - } else { - hang_state_try = 5; - } - - let inflight_count = statemap_installer_queue.queue.values().filter(|v| v.state == StatemapInstallState::Inflight).count(); - error!("Currently Statemap installation tps={tps:.3}"); - error!(" - Statemap Installer Queue Stats: - tps : {tps:.3} - counts : - | success={installation_success_count} - | gaveup={installation_gaveup} - | awaiting_installs={awaiting_count} - | inflight_count={inflight_count} - | installation_gaveup={installation_gaveup} - current snapshot: {} - last vers send to install : {last_item_send_for_install} - \n ", statemap_installer_queue.snapshot_version); - // awaiting_install_versions - // {awaiting_install_versions:?} - - statemap_installer_queue.cleanup_queue(); - - prev_last_item_send_for_install = last_item_send_for_install; - - // installed_keys.iter().for_each(|key| { - // statemap_queue.shift_remove(key); - // }) - } - - } - } -} - pub async fn installation_service( replicator_tx: mpsc::Sender, statemap_installer: Arc, mut installation_rx: mpsc::Receiver<(u64, Vec)>, statemap_installation_tx: mpsc::Sender, -) -> Result<(), String> -// where -// T: ReplicatorInstaller, -{ +) -> Result<(), String> { // TODO: Pass the number of permits over an environment variable? let permit_count = 50; let semaphore = Arc::new(Semaphore::new(permit_count)); @@ -247,12 +31,7 @@ pub async fn installation_service( debug!("[Statemap Installer Service] Received statemap batch ={statemaps:?} and version={ver:?}"); let start_installation_time = Instant::now(); - // TODO: Implement proper logic to update the snapshot - let snapshot_version_to_update = Some(ver); //if ver % 5 == 0 { Some(ver) } else { None }; - - // error!("Going to install statemaps for version={ver} with snapshot={snapshot_version_to_update:?}"); - - match installer.install(statemaps, snapshot_version_to_update).await { + match installer.install(statemaps, Some(ver)).await { Ok(status) => { // let end_installation_time = start_installation_time.elapsed(); // error!("[installation_service] Installed successfully version={ver} in {end_installation_time:?}"); @@ -267,12 +46,11 @@ pub async fn installation_service( } drop(permit); - // return Ok(()); } Err(err) => { error!( - "Installed failed for version={snapshot_version_to_update:?} with time={:?} error={err:?}", + "Installed failed for version={ver:?} with time={:?} error={err:?}", start_installation_time.elapsed() ); replicator_tx_clone @@ -287,8 +65,6 @@ pub async fn installation_service( .await .unwrap(); drop(permit); - - // return Err(err.to_string()); } }; }); diff --git a/packages/cohort/src/replicator/services/statemap_queue_service.rs b/packages/cohort/src/replicator/services/statemap_queue_service.rs new file mode 100644 index 00000000..aaa46fa0 --- /dev/null +++ b/packages/cohort/src/replicator/services/statemap_queue_service.rs @@ -0,0 +1,146 @@ +// $coverage:ignore-start + +use std::time::Duration; + +use futures::Future; +use log::{error, info}; +use rayon::prelude::ParallelIterator; +use time::OffsetDateTime; +use tokio::sync::mpsc; + +use crate::replicator::{ + core::{StatemapInstallState, StatemapInstallationStatus, StatemapInstallerHashmap, StatemapItem}, + utils::installer_utils::StatemapInstallerQueue, +}; + +pub async fn statemap_queue_service( + mut statemaps_rx: mpsc::Receiver>, + mut statemap_installation_rx: mpsc::Receiver, + installation_tx: mpsc::Sender<(u64, Vec)>, + // Get snapshot callback fn + get_snapshot_fn: impl Future>, +) -> Result<(), String> { + info!("Starting Installer Queue Service.... "); + // let last_installed = 0_u64; + // let mut statemap_queue = IndexMap::::default(); + let mut interval = tokio::time::interval(Duration::from_millis(10_000)); + + let mut installation_success_count = 0; + let mut installation_gaveup = 0; + let mut send_for_install_count = 0; + let mut first_install_start: i128 = 0; // + let mut last_install_end: i128 = 0; // = OffsetDateTime::now_utc().unix_timestamp_nanos(); + + let mut statemap_installer_queue = StatemapInstallerQueue::default(); + + //Gets snapshot initial version from db. + statemap_installer_queue.update_snapshot(get_snapshot_fn.await.unwrap_or(0)); + + let mut last_item_send_for_install = 0; + + loop { + tokio::select! { + statemap_batch_option = statemaps_rx.recv() => { + + if let Some(statemaps) = statemap_batch_option { + + let ver = statemaps.first().unwrap().version; + // Inserts the statemaps to the map + + let safepoint = if let Some(first_statemap) = statemaps.first() { + first_statemap.safepoint + } else { + None + }; + statemap_installer_queue.insert_queue_item(&ver, StatemapInstallerHashmap { statemaps, version: ver, safepoint, state: StatemapInstallState::Awaiting }); + + // Gets the statemaps to send for installation. + let items_to_install: Vec = statemap_installer_queue.get_versions_to_install(); + + // Sends for installation. + for key in items_to_install { + // Send for installation + // warn!("Sending... {key}"); + if send_for_install_count == 0 { + first_install_start = OffsetDateTime::now_utc().unix_timestamp_nanos(); + } + send_for_install_count += 1; + installation_tx.send((key, statemap_installer_queue.queue.get(&key).unwrap().statemaps.clone())).await.unwrap(); + + last_item_send_for_install = key; + + // Update the status flag + statemap_installer_queue.update_queue_item_state(&key, StatemapInstallState::Inflight); + } + } + } + Some(install_result) = statemap_installation_rx.recv() => { + match install_result { + StatemapInstallationStatus::Success(key) => { + // let start_time_success = Instant::now(); + + // installed successfully and will remove the item + // statemap_queue.shift_remove(&key); + statemap_installer_queue.update_queue_item_state(&key, StatemapInstallState::Installed); + + // let index = statemap_queue.get_index_of(&key).unwrap(); + + if let Some(last_contiguous_install_item) = statemap_installer_queue.queue.iter().take_while(|item| item.1.state == StatemapInstallState::Installed).last(){ + statemap_installer_queue.update_snapshot(last_contiguous_install_item.1.version) ; + }; + + + installation_success_count += 1; + last_install_end = OffsetDateTime::now_utc().unix_timestamp_nanos(); + // error!("Installed successfully version={key} and total_installs={install_count}"); + // let end_time_success = start_time_success.elapsed(); + // error!("(Statemap successfully installed) for version={key} in {end_time_success:?}"); + }, + StatemapInstallationStatus::GaveUp(_) => { + installation_gaveup += 1; + last_install_end = OffsetDateTime::now_utc().unix_timestamp_nanos(); + }, + StatemapInstallationStatus::Error(ver, error) => { + error!("Failed to install version={ver} due to error={error:?}"); + let items_in_flight: Vec<&StatemapInstallerHashmap> = statemap_installer_queue.queue + .par_values() + // Picking waiting items + .filter_map(|v| { + if v.state == StatemapInstallState::Inflight || v.state == StatemapInstallState::Installed { + Some(v) + } else { + None + } + }).collect(); + panic!("[Panic Panic Panic] panic for ver={ver} with error={error} \n\n\n Items still in statemap \n\n\n {items_in_flight:#?}"); + }, + } + } + _ = interval.tick() => { + + let duration_sec = Duration::from_nanos((last_install_end - first_install_start) as u64).as_secs_f32(); + let tps = installation_success_count as f32 / duration_sec; + + let awaiting_count = statemap_installer_queue.filter_items_by_state(StatemapInstallState::Awaiting).count(); + let inflight_count = statemap_installer_queue.filter_items_by_state(StatemapInstallState::Inflight).count(); + error!("Currently Statemap installation tps={tps:.3}"); + error!(" + Statemap Installer Queue Stats: + tps : {tps:.3} + counts : + | success={installation_success_count} + | gaveup={installation_gaveup} + | awaiting_installs={awaiting_count} + | inflight_count={inflight_count} + | installation_gaveup={installation_gaveup} + current snapshot: {} + last vers send to install : {last_item_send_for_install} + \n ", statemap_installer_queue.snapshot_version); + + statemap_installer_queue.remove_installed(); + } + + } + } +} +// $coverage:ignore-end diff --git a/packages/cohort/src/replicator/utils/installer_utils.rs b/packages/cohort/src/replicator/utils/installer_utils.rs index cd5e390a..7439f5bd 100644 --- a/packages/cohort/src/replicator/utils/installer_utils.rs +++ b/packages/cohort/src/replicator/utils/installer_utils.rs @@ -4,7 +4,7 @@ use indexmap::IndexMap; use crate::{ model::snapshot::Snapshot, - replicator::services::statemap_installer_service::{StatemapInstallState, StatemapInstallerHashmap}, + replicator::core::{StatemapInstallState, StatemapInstallerHashmap}, }; /// Callback fn used in the `installer_queue_service` to retrieve the current snapshot. @@ -34,7 +34,7 @@ impl StatemapInstallerQueue { }; } - pub fn cleanup_queue(&mut self) -> Option { + pub fn remove_installed(&mut self) -> Option { let Some(index) = self.queue.get_index_of(&self.snapshot_version) else { return None;}; let items = self.queue.drain(..index); @@ -42,34 +42,35 @@ impl StatemapInstallerQueue { Some(items.count() as u64) } + /// Filter items in queue based on the `StatemapInstallState` + pub fn filter_items_by_state(&self, state: StatemapInstallState) -> impl Iterator { + self.queue.values().filter(move |&x| x.state == state) + } + pub fn get_versions_to_install(&self) -> Vec { - self.queue - .values() - // Picking waiting items - .filter(|v| v.state == StatemapInstallState::Awaiting) + self + // Get items in awaiting + .filter_items_by_state(StatemapInstallState::Awaiting) + // Get items whose safepoint is below the snapshot. .take_while(|v| { // If no safepoint, this could be a abort item and is safe to install as statemap will be empty. let Some(safepoint) = v.safepoint else { - return true; - }; - - if self.snapshot_version >= safepoint { return true; }; - false + self.snapshot_version >= safepoint }) // filter out the ones that can't be serialized .filter_map(|v| { // If no safepoint, this could be a abort item and is safe to install as statemap will be empty. let Some(safepoint) = v.safepoint else { - return Some(v.version); - }; + return Some(v.version); + }; // If there is no version matching the safepoint, then it is safe to install let Some(safepoint_pointing_item) = self.queue.get(&safepoint) else { - return Some(v.version); - }; + return Some(v.version); + }; if safepoint_pointing_item.state == StatemapInstallState::Installed { return Some(v.version); }; @@ -81,42 +82,3 @@ impl StatemapInstallerQueue { .collect::>() } } - -pub fn get_install_versions_in_queue(queue: &IndexMap, snapshot_version: &u64) -> Vec { - queue - .values() - // Picking waiting items - .filter(|v| v.state == StatemapInstallState::Awaiting) - .take_while(|v| { - // If no safepoint, this could be a abort item and is safe to install as statemap will be empty. - let Some(safepoint) = v.safepoint else { - return true; - }; - - if snapshot_version >= &safepoint { - return true; - }; - - false - }) - // filter out the ones that can't be serialized - .filter_map(|v| { - // If no safepoint, this could be a abort item and is safe to install as statemap will be empty. - let Some(safepoint) = v.safepoint else { - return Some(v.version); - }; - - // If there is no version matching the safepoint, then it is safe to install - let Some(safepoint_pointing_item) = queue.get(&safepoint) else { - return Some(v.version); - }; - if safepoint_pointing_item.state == StatemapInstallState::Installed { - return Some(v.version); - }; - // error!("[items_to_install] Not picking {} as safepoint={safepoint} criteria failed against={:?}", v.version, statemap_queue.get(&safepoint)); - - None - }) - // take the remaining we can install - .collect::>() -}