From 90e3356630f9bd8fa4651a1aa06413e588313189 Mon Sep 17 00:00:00 2001 From: Marek Fedorovic Date: Wed, 26 Jul 2023 16:53:17 +1000 Subject: [PATCH] feat: Improve workload generator, use history to avoiud duplicates --- Cargo.lock | 1 + examples/agent_client/Cargo.toml | 1 + .../agent_client/examples/agent_client.rs | 42 +++++---- .../examples/cohort_banking.rs | 90 +++++++++++-------- .../examples/cohort_banking_with_sdk.rs | 86 ++++++++++-------- .../src/load_generator/generator.rs | 9 +- .../src/load_generator/models.rs | 4 + 7 files changed, 141 insertions(+), 92 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d4abace9..bad01c35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,7 @@ dependencies = [ "env_logger", "examples_support", "log", + "rand", "rdkafka 0.29.0", "rdkafka-sys", "serde", diff --git a/examples/agent_client/Cargo.toml b/examples/agent_client/Cargo.toml index 07e9c459..4b117068 100644 --- a/examples/agent_client/Cargo.toml +++ b/examples/agent_client/Cargo.toml @@ -12,6 +12,7 @@ async-channel = { version = "1.8.0" } async-trait = { workspace = true } env_logger = { workspace = true } log = { workspace = true } +rand = { version = "0.8.5" } rdkafka = { version = "0.29.0", features = ["sasl"] } rdkafka-sys = { version = "4.3.0" } serde = { workspace = true } diff --git a/examples/agent_client/examples/agent_client.rs b/examples/agent_client/examples/agent_client.rs index 6fefa040..327e0c50 100644 --- a/examples/agent_client/examples/agent_client.rs +++ b/examples/agent_client/examples/agent_client.rs @@ -1,6 +1,6 @@ use async_channel::Receiver; use examples_support::load_generator::generator::ControlledRateLoadGenerator; -use examples_support::load_generator::models::StopType; +use examples_support::load_generator::models::{Generator, StopType}; use std::num::ParseIntError; use std::{env, sync::Arc, time::Duration}; @@ -57,7 +57,7 @@ async fn certify() -> Result<(), String> { let h_workload_generator = tokio::spawn(async move { let params = params.clone(); - ControlledRateLoadGenerator::generate::(params.stop_type, params.target_rate as f32, &create_new_candidate, tx_generated).await + ControlledRateLoadGenerator::generate(params.stop_type, params.target_rate as f32, RequestGenerator {}, tx_generated).await }); let all_async_services = tokio::spawn(async move { @@ -264,23 +264,6 @@ async fn make_agent(params: LaunchParams) -> impl TalosAgent { agent } -fn create_new_candidate() -> CertificationRequest { - let tx_data = CandidateData { - xid: Uuid::new_v4().to_string(), - readset: Vec::new(), - readvers: Vec::new(), - snapshot: 5, - writeset: Vec::from(["3".to_string()]), - statemap: None, - }; - - CertificationRequest { - message_key: "12345".to_string(), - candidate: tx_data, - timeout: None, // this will use the default global value as defined in AgentConfig - } -} - fn create_stop_controller(params: LaunchParams, queue: Arc>) -> JoinHandle> { tokio::spawn(async move { let mut remaining_checks = params.stop_max_empty_checks; @@ -367,3 +350,24 @@ async fn get_params() -> Result { }) } } + +struct RequestGenerator {} + +impl Generator for RequestGenerator { + fn generate(&mut self) -> CertificationRequest { + let tx_data = CandidateData { + xid: Uuid::new_v4().to_string(), + readset: Vec::new(), + readvers: Vec::new(), + snapshot: 5, + writeset: Vec::from(["3".to_string()]), + statemap: None, + }; + + CertificationRequest { + message_key: "12345".to_string(), + candidate: tx_data, + timeout: None, // this will use the default global value as defined in AgentConfig + } + } +} diff --git a/examples/cohort_banking_example/examples/cohort_banking.rs b/examples/cohort_banking_example/examples/cohort_banking.rs index 1fc5a290..e1f506db 100644 --- a/examples/cohort_banking_example/examples/cohort_banking.rs +++ b/examples/cohort_banking_example/examples/cohort_banking.rs @@ -14,7 +14,10 @@ use cohort::{ state::postgres::{data_access::PostgresApi, database::Database}, }; -use examples_support::load_generator::{generator::ControlledRateLoadGenerator, models::StopType}; +use examples_support::load_generator::{ + generator::ControlledRateLoadGenerator, + models::{Generator, StopType}, +}; use metrics::model::{MicroMetrics, MinMax}; use rand::Rng; use rust_decimal::{prelude::FromPrimitive, Decimal}; @@ -29,6 +32,7 @@ type HeartBeatReceiver = tokio::sync::watch::Receiver; #[derive(Clone)] struct LaunchParams { + accounts: u64, stop_type: StopType, target_rate: f32, threads: u64, @@ -49,7 +53,12 @@ async fn main() -> Result<(), String> { let rx_queue = Arc::new(rx_queue); let rx_queue_ref = Arc::clone(&rx_queue); - let generator = ControlledRateLoadGenerator::generate(params.stop_type, params.target_rate, &create_transfer_request, Arc::new(tx_queue)); + let generator_impl = TransferRequestGenerator { + available_accounts: params.accounts, + generated: Vec::new(), + }; + + let generator = ControlledRateLoadGenerator::generate(params.stop_type, params.target_rate, generator_impl, Arc::new(tx_queue)); let h_generator = tokio::spawn(generator); let (tx_metrics, rx_metrics) = async_channel::unbounded::(); @@ -220,40 +229,6 @@ async fn start_replicator( (h_replicator, h_installer, rx_heartbeat) } -fn create_transfer_request() -> TransferRequest { - let mut available_accounts = 0_u64; - let args: Vec = env::args().collect(); - if args.len() >= 2 { - let mut i = 1; - while i < args.len() { - let param_name = &args[i]; - if param_name.eq("--accounts") { - let param_value = &args[i + 1]; - available_accounts = param_value.parse().unwrap(); - } - i += 2; - } - } - - let mut rnd = rand::thread_rng(); - let mut to; - - let from = rnd.gen_range(1..=available_accounts); - loop { - to = rnd.gen_range(1..=available_accounts); - if to == from { - continue; - } - break; - } - - TransferRequest { - from: format!("{:<04}", from), - to: format!("{:<04}", to), - amount: Decimal::from_f32(1.0).unwrap(), - } -} - async fn get_params() -> Result { let args: Vec = env::args().collect(); let mut threads: Option = Some(1); @@ -322,6 +297,7 @@ async fn get_params() -> Result { Err("Parameter --rate is required".into()) } else { Ok(LaunchParams { + accounts: accounts.unwrap(), target_rate: target_rate.unwrap(), stop_type: stop_type.unwrap(), threads: threads.unwrap(), @@ -331,3 +307,45 @@ async fn get_params() -> Result { }) } } + +struct TransferRequestGenerator { + available_accounts: u64, + generated: Vec<(u64, u64)>, +} + +impl Generator for TransferRequestGenerator { + fn generate(&mut self) -> TransferRequest { + let mut rnd = rand::thread_rng(); + let mut to; + + let from = rnd.gen_range(1..=self.available_accounts); + loop { + to = rnd.gen_range(1..=self.available_accounts); + if to == from { + continue; + } + + let result = self + .generated + .iter() + .find(|(past_from, past_to)| *past_from == from && *past_to == to || *past_from == to && *past_to == from); + + if result.is_none() { + if self.generated.len() < 100 { + self.generated.push((from, to)); + } else { + self.generated.remove(0); + self.generated.insert(0, (from, to)); + } + + break; + } + } + + TransferRequest { + from: format!("{:<04}", from), + to: format!("{:<04}", to), + amount: Decimal::from_f32(1.0).unwrap(), + } + } +} diff --git a/examples/cohort_banking_with_sdk/examples/cohort_banking_with_sdk.rs b/examples/cohort_banking_with_sdk/examples/cohort_banking_with_sdk.rs index ecb1acdb..e3fb5aed 100644 --- a/examples/cohort_banking_with_sdk/examples/cohort_banking_with_sdk.rs +++ b/examples/cohort_banking_with_sdk/examples/cohort_banking_with_sdk.rs @@ -4,6 +4,7 @@ use std::{collections::HashMap, env, sync::Arc, time::Duration}; use async_channel::Receiver; use cohort_banking::{app::BankingApp, examples_support::queue_processor::QueueProcessor, model::requests::TransferRequest}; use cohort_sdk::model::Config; +use examples_support::load_generator::models::Generator; use examples_support::load_generator::{generator::ControlledRateLoadGenerator, models::StopType}; use opentelemetry_api::KeyValue; @@ -23,6 +24,7 @@ struct LaunchParams { stop_type: StopType, target_rate: f32, threads: u64, + accounts: u64, } #[tokio::main] @@ -36,7 +38,12 @@ async fn main() -> Result<(), String> { let rx_queue = Arc::new(rx_queue); let rx_queue_ref = Arc::clone(&rx_queue); - let generator = ControlledRateLoadGenerator::generate(params.stop_type, params.target_rate, &create_transfer_request, Arc::new(tx_queue)); + let generator_impl = TransferRequestGenerator { + available_accounts: params.accounts, + generated: Vec::new(), + }; + + let generator = ControlledRateLoadGenerator::generate(params.stop_type, params.target_rate, generator_impl, Arc::new(tx_queue)); let h_generator = tokio::spawn(generator); let config = Config { @@ -222,40 +229,6 @@ fn start_queue_monitor(queue: Arc>) -> JoinHandle TransferRequest { - let mut available_accounts = 0_u64; - let args: Vec = env::args().collect(); - if args.len() >= 2 { - let mut i = 1; - while i < args.len() { - let param_name = &args[i]; - if param_name.eq("--accounts") { - let param_value = &args[i + 1]; - available_accounts = param_value.parse().unwrap(); - } - i += 2; - } - } - - let mut rnd = rand::thread_rng(); - let mut to; - - let from = rnd.gen_range(1..=available_accounts); - loop { - to = rnd.gen_range(1..=available_accounts); - if to == from { - continue; - } - break; - } - - TransferRequest { - from: format!("{:<04}", from), - to: format!("{:<04}", to), - amount: Decimal::from_f32(1.0).unwrap(), - } -} - async fn get_params() -> Result { let args: Vec = env::args().collect(); let mut threads: Option = Some(1); @@ -305,6 +278,7 @@ async fn get_params() -> Result { target_rate: target_rate.unwrap(), stop_type: stop_type.unwrap(), threads: threads.unwrap(), + accounts: accounts.unwrap(), }) } } @@ -438,3 +412,45 @@ fn extract_num_value(report: &[&str], value: &str) -> Option None } } + +struct TransferRequestGenerator { + available_accounts: u64, + generated: Vec<(u64, u64)>, +} + +impl Generator for TransferRequestGenerator { + fn generate(&mut self) -> TransferRequest { + let mut rnd = rand::thread_rng(); + let mut to; + + let from = rnd.gen_range(1..=self.available_accounts); + loop { + to = rnd.gen_range(1..=self.available_accounts); + if to == from { + continue; + } + + let result = self + .generated + .iter() + .find(|(past_from, past_to)| *past_from == from && *past_to == to || *past_from == to && *past_to == from); + + if result.is_none() { + if self.generated.len() < 100 { + self.generated.push((from, to)); + } else { + self.generated.remove(0); + self.generated.insert(0, (from, to)); + } + + break; + } + } + + TransferRequest { + from: format!("{:<04}", from), + to: format!("{:<04}", to), + amount: Decimal::from_f32(1.0).unwrap(), + } + } +} diff --git a/packages/examples_support/src/load_generator/generator.rs b/packages/examples_support/src/load_generator/generator.rs index d4aa516a..059a37bd 100644 --- a/packages/examples_support/src/load_generator/generator.rs +++ b/packages/examples_support/src/load_generator/generator.rs @@ -5,10 +5,15 @@ use time::OffsetDateTime; use crate::load_generator::models::{Progress, StopType}; +use super::models::Generator; + pub struct ControlledRateLoadGenerator {} impl ControlledRateLoadGenerator { - pub async fn generate(stop_type: StopType, target_rate: f32, fn_item_factory: &impl Fn() -> T, tx_output: Arc>) -> Result<(), String> { + pub async fn generate(stop_type: StopType, target_rate: f32, mut generator_impl: G, tx_output: Arc>) -> Result<(), String> + where + G: Generator + Sized + 'static, + { let started_at = OffsetDateTime::now_utc().unix_timestamp_nanos(); if let StopType::LimitExecutionDuration { run_duration } = stop_type { let stop_at = started_at + run_duration.as_nanos() as i128; @@ -69,7 +74,7 @@ impl ControlledRateLoadGenerator { } } - let new_item: T = fn_item_factory(); + let new_item: T = generator_impl.generate(); let _ = tx_output.send(new_item).await; generated_count += 1; diff --git a/packages/examples_support/src/load_generator/models.rs b/packages/examples_support/src/load_generator/models.rs index aa460c03..602fb9f6 100644 --- a/packages/examples_support/src/load_generator/models.rs +++ b/packages/examples_support/src/load_generator/models.rs @@ -26,3 +26,7 @@ pub enum StopType { LimitExecutionDuration { run_duration: Duration }, LimitGeneratedTransactions { count: u64 }, } + +pub trait Generator { + fn generate(&mut self) -> T; +}