Skip to content

Commit

Permalink
feat: Improve workload generator, use history to avoiud duplicates
Browse files Browse the repository at this point in the history
  • Loading branch information
fmarek-kindred committed Jul 26, 2023
1 parent 941d964 commit 90e3356
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 92 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/agent_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ async-channel = { version = "1.8.0" }
async-trait = { workspace = true }
env_logger = { workspace = true }
log = { workspace = true }
rand = { version = "0.8.5" }
rdkafka = { version = "0.29.0", features = ["sasl"] }
rdkafka-sys = { version = "4.3.0" }
serde = { workspace = true }
Expand Down
42 changes: 23 additions & 19 deletions examples/agent_client/examples/agent_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_channel::Receiver;
use examples_support::load_generator::generator::ControlledRateLoadGenerator;
use examples_support::load_generator::models::StopType;
use examples_support::load_generator::models::{Generator, StopType};
use std::num::ParseIntError;
use std::{env, sync::Arc, time::Duration};

Expand Down Expand Up @@ -57,7 +57,7 @@ async fn certify() -> Result<(), String> {

let h_workload_generator = tokio::spawn(async move {
let params = params.clone();
ControlledRateLoadGenerator::generate::<CertificationRequest>(params.stop_type, params.target_rate as f32, &create_new_candidate, tx_generated).await
ControlledRateLoadGenerator::generate(params.stop_type, params.target_rate as f32, RequestGenerator {}, tx_generated).await
});

let all_async_services = tokio::spawn(async move {
Expand Down Expand Up @@ -264,23 +264,6 @@ async fn make_agent(params: LaunchParams) -> impl TalosAgent {
agent
}

fn create_new_candidate() -> CertificationRequest {
let tx_data = CandidateData {
xid: Uuid::new_v4().to_string(),
readset: Vec::new(),
readvers: Vec::new(),
snapshot: 5,
writeset: Vec::from(["3".to_string()]),
statemap: None,
};

CertificationRequest {
message_key: "12345".to_string(),
candidate: tx_data,
timeout: None, // this will use the default global value as defined in AgentConfig
}
}

fn create_stop_controller(params: LaunchParams, queue: Arc<Receiver<CertificationRequest>>) -> JoinHandle<Result<(), String>> {
tokio::spawn(async move {
let mut remaining_checks = params.stop_max_empty_checks;
Expand Down Expand Up @@ -367,3 +350,24 @@ async fn get_params() -> Result<LaunchParams, String> {
})
}
}

struct RequestGenerator {}

impl Generator<CertificationRequest> for RequestGenerator {
fn generate(&mut self) -> CertificationRequest {
let tx_data = CandidateData {
xid: Uuid::new_v4().to_string(),
readset: Vec::new(),
readvers: Vec::new(),
snapshot: 5,
writeset: Vec::from(["3".to_string()]),
statemap: None,
};

CertificationRequest {
message_key: "12345".to_string(),
candidate: tx_data,
timeout: None, // this will use the default global value as defined in AgentConfig
}
}
}
90 changes: 54 additions & 36 deletions examples/cohort_banking_example/examples/cohort_banking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use cohort::{
state::postgres::{data_access::PostgresApi, database::Database},
};

use examples_support::load_generator::{generator::ControlledRateLoadGenerator, models::StopType};
use examples_support::load_generator::{
generator::ControlledRateLoadGenerator,
models::{Generator, StopType},
};
use metrics::model::{MicroMetrics, MinMax};
use rand::Rng;
use rust_decimal::{prelude::FromPrimitive, Decimal};
Expand All @@ -29,6 +32,7 @@ type HeartBeatReceiver = tokio::sync::watch::Receiver<u64>;

#[derive(Clone)]
struct LaunchParams {
accounts: u64,
stop_type: StopType,
target_rate: f32,
threads: u64,
Expand All @@ -49,7 +53,12 @@ async fn main() -> Result<(), String> {
let rx_queue = Arc::new(rx_queue);
let rx_queue_ref = Arc::clone(&rx_queue);

let generator = ControlledRateLoadGenerator::generate(params.stop_type, params.target_rate, &create_transfer_request, Arc::new(tx_queue));
let generator_impl = TransferRequestGenerator {
available_accounts: params.accounts,
generated: Vec::new(),
};

let generator = ControlledRateLoadGenerator::generate(params.stop_type, params.target_rate, generator_impl, Arc::new(tx_queue));

let h_generator = tokio::spawn(generator);
let (tx_metrics, rx_metrics) = async_channel::unbounded::<Stats>();
Expand Down Expand Up @@ -220,40 +229,6 @@ async fn start_replicator(
(h_replicator, h_installer, rx_heartbeat)
}

fn create_transfer_request() -> TransferRequest {
let mut available_accounts = 0_u64;
let args: Vec<String> = env::args().collect();
if args.len() >= 2 {
let mut i = 1;
while i < args.len() {
let param_name = &args[i];
if param_name.eq("--accounts") {
let param_value = &args[i + 1];
available_accounts = param_value.parse().unwrap();
}
i += 2;
}
}

let mut rnd = rand::thread_rng();
let mut to;

let from = rnd.gen_range(1..=available_accounts);
loop {
to = rnd.gen_range(1..=available_accounts);
if to == from {
continue;
}
break;
}

TransferRequest {
from: format!("{:<04}", from),
to: format!("{:<04}", to),
amount: Decimal::from_f32(1.0).unwrap(),
}
}

async fn get_params() -> Result<LaunchParams, String> {
let args: Vec<String> = env::args().collect();
let mut threads: Option<u64> = Some(1);
Expand Down Expand Up @@ -322,6 +297,7 @@ async fn get_params() -> Result<LaunchParams, String> {
Err("Parameter --rate is required".into())
} else {
Ok(LaunchParams {
accounts: accounts.unwrap(),
target_rate: target_rate.unwrap(),
stop_type: stop_type.unwrap(),
threads: threads.unwrap(),
Expand All @@ -331,3 +307,45 @@ async fn get_params() -> Result<LaunchParams, String> {
})
}
}

struct TransferRequestGenerator {
available_accounts: u64,
generated: Vec<(u64, u64)>,
}

impl Generator<TransferRequest> for TransferRequestGenerator {
fn generate(&mut self) -> TransferRequest {
let mut rnd = rand::thread_rng();
let mut to;

let from = rnd.gen_range(1..=self.available_accounts);
loop {
to = rnd.gen_range(1..=self.available_accounts);
if to == from {
continue;
}

let result = self
.generated
.iter()
.find(|(past_from, past_to)| *past_from == from && *past_to == to || *past_from == to && *past_to == from);

if result.is_none() {
if self.generated.len() < 100 {
self.generated.push((from, to));
} else {
self.generated.remove(0);
self.generated.insert(0, (from, to));
}

break;
}
}

TransferRequest {
from: format!("{:<04}", from),
to: format!("{:<04}", to),
amount: Decimal::from_f32(1.0).unwrap(),
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{collections::HashMap, env, sync::Arc, time::Duration};
use async_channel::Receiver;
use cohort_banking::{app::BankingApp, examples_support::queue_processor::QueueProcessor, model::requests::TransferRequest};
use cohort_sdk::model::Config;
use examples_support::load_generator::models::Generator;
use examples_support::load_generator::{generator::ControlledRateLoadGenerator, models::StopType};

use opentelemetry_api::KeyValue;
Expand All @@ -23,6 +24,7 @@ struct LaunchParams {
stop_type: StopType,
target_rate: f32,
threads: u64,
accounts: u64,
}

#[tokio::main]
Expand All @@ -36,7 +38,12 @@ async fn main() -> Result<(), String> {
let rx_queue = Arc::new(rx_queue);
let rx_queue_ref = Arc::clone(&rx_queue);

let generator = ControlledRateLoadGenerator::generate(params.stop_type, params.target_rate, &create_transfer_request, Arc::new(tx_queue));
let generator_impl = TransferRequestGenerator {
available_accounts: params.accounts,
generated: Vec::new(),
};

let generator = ControlledRateLoadGenerator::generate(params.stop_type, params.target_rate, generator_impl, Arc::new(tx_queue));
let h_generator = tokio::spawn(generator);

let config = Config {
Expand Down Expand Up @@ -222,40 +229,6 @@ fn start_queue_monitor(queue: Arc<Receiver<TransferRequest>>) -> JoinHandle<Resu
})
}

fn create_transfer_request() -> TransferRequest {
let mut available_accounts = 0_u64;
let args: Vec<String> = env::args().collect();
if args.len() >= 2 {
let mut i = 1;
while i < args.len() {
let param_name = &args[i];
if param_name.eq("--accounts") {
let param_value = &args[i + 1];
available_accounts = param_value.parse().unwrap();
}
i += 2;
}
}

let mut rnd = rand::thread_rng();
let mut to;

let from = rnd.gen_range(1..=available_accounts);
loop {
to = rnd.gen_range(1..=available_accounts);
if to == from {
continue;
}
break;
}

TransferRequest {
from: format!("{:<04}", from),
to: format!("{:<04}", to),
amount: Decimal::from_f32(1.0).unwrap(),
}
}

async fn get_params() -> Result<LaunchParams, String> {
let args: Vec<String> = env::args().collect();
let mut threads: Option<u64> = Some(1);
Expand Down Expand Up @@ -305,6 +278,7 @@ async fn get_params() -> Result<LaunchParams, String> {
target_rate: target_rate.unwrap(),
stop_type: stop_type.unwrap(),
threads: threads.unwrap(),
accounts: accounts.unwrap(),
})
}
}
Expand Down Expand Up @@ -438,3 +412,45 @@ fn extract_num_value<T: FromStr + Clone>(report: &[&str], value: &str) -> Option
None
}
}

struct TransferRequestGenerator {
available_accounts: u64,
generated: Vec<(u64, u64)>,
}

impl Generator<TransferRequest> for TransferRequestGenerator {
fn generate(&mut self) -> TransferRequest {
let mut rnd = rand::thread_rng();
let mut to;

let from = rnd.gen_range(1..=self.available_accounts);
loop {
to = rnd.gen_range(1..=self.available_accounts);
if to == from {
continue;
}

let result = self
.generated
.iter()
.find(|(past_from, past_to)| *past_from == from && *past_to == to || *past_from == to && *past_to == from);

if result.is_none() {
if self.generated.len() < 100 {
self.generated.push((from, to));
} else {
self.generated.remove(0);
self.generated.insert(0, (from, to));
}

break;
}
}

TransferRequest {
from: format!("{:<04}", from),
to: format!("{:<04}", to),
amount: Decimal::from_f32(1.0).unwrap(),
}
}
}
9 changes: 7 additions & 2 deletions packages/examples_support/src/load_generator/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ use time::OffsetDateTime;

use crate::load_generator::models::{Progress, StopType};

use super::models::Generator;

pub struct ControlledRateLoadGenerator {}

impl ControlledRateLoadGenerator {
pub async fn generate<T>(stop_type: StopType, target_rate: f32, fn_item_factory: &impl Fn() -> T, tx_output: Arc<Sender<T>>) -> Result<(), String> {
pub async fn generate<T, G>(stop_type: StopType, target_rate: f32, mut generator_impl: G, tx_output: Arc<Sender<T>>) -> Result<(), String>
where
G: Generator<T> + Sized + 'static,
{
let started_at = OffsetDateTime::now_utc().unix_timestamp_nanos();
if let StopType::LimitExecutionDuration { run_duration } = stop_type {
let stop_at = started_at + run_duration.as_nanos() as i128;
Expand Down Expand Up @@ -69,7 +74,7 @@ impl ControlledRateLoadGenerator {
}
}

let new_item: T = fn_item_factory();
let new_item: T = generator_impl.generate();
let _ = tx_output.send(new_item).await;
generated_count += 1;

Expand Down
4 changes: 4 additions & 0 deletions packages/examples_support/src/load_generator/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ pub enum StopType {
LimitExecutionDuration { run_duration: Duration },
LimitGeneratedTransactions { count: u64 },
}

pub trait Generator<T> {
fn generate(&mut self) -> T;
}

0 comments on commit 90e3356

Please sign in to comment.