diff --git a/martian-pipestance/src/critical_path.rs b/martian-pipestance/src/critical_path.rs index b52b34a6da..179c979702 100644 --- a/martian-pipestance/src/critical_path.rs +++ b/martian-pipestance/src/critical_path.rs @@ -4,35 +4,65 @@ use crate::common::NodeType; use crate::final_state::{ArgumentMode, FinalState}; -use crate::perf::Perf; +use crate::perf::{Perf, PerfElement}; +use crate::PipestanceFile; +use anyhow::Result; use ordered_float::OrderedFloat; use petgraph::algo::toposort; use petgraph::graph::{DiGraph, NodeIndex}; use serde_json::Value; use std::collections::{BTreeMap, VecDeque}; use std::fmt; +use std::path::Path; + +pub enum StageWeight { + /// The wall time of the stage ignoring any queueing time. This will be + /// the sum of wall times of split + slowest chunk + join + NoQueueWallTime, + /// Provide a custom function to compute the weight from the stage perf + CustomWeight(Box f64>), +} + +impl StageWeight { + fn weight(&self, perf_element: &PerfElement) -> f64 { + match self { + StageWeight::NoQueueWallTime => perf_element.no_queue_wall_time_seconds(), + StageWeight::CustomWeight(f) => f(perf_element), + } + } +} -pub struct CriticalPathNode { +pub struct CriticalPathStage { pub id: String, pub name: String, - pub no_queue_wall_time_seconds: f64, + pub weight: f64, } -impl fmt::Debug for CriticalPathNode { +impl fmt::Debug for CriticalPathStage { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{} [{}s]", self.name, self.no_queue_wall_time_seconds) + write!(f, "{} [{}s]", self.name, self.weight) } } #[derive(Debug)] pub struct CriticalPath { - pub total_time_seconds: f64, - pub path: Vec, + pub total_weight: f64, + pub path: Vec, } impl CriticalPath { - pub fn compute(final_state: &FinalState, perf: &Perf) -> Self { - CriticalPathBuilder::new(final_state, perf).critical_path() + pub fn from_pipestance_folder( + folder: impl AsRef, + weight_function: StageWeight, + ) -> Result { + Ok(Self::compute( + &FinalState::from_pipestance_folder(folder.as_ref())?, + &Perf::from_pipestance_folder(folder.as_ref())?, + weight_function, + )) + } + pub fn compute(final_state: &FinalState, perf: &Perf, weight_function: StageWeight) -> Self { + CriticalPathBuilder::new().build(final_state, perf, weight_function) } } @@ -77,7 +107,20 @@ impl CriticalPathBuilder { const START_NODE: &str = "__START__"; const END_NODE: &str = "__END__"; - fn new(final_state: &FinalState, perf: &Perf) -> Self { + fn new() -> Self { + Self::default() + } + + fn build( + mut self, + final_state: &FinalState, + perf: &Perf, + weight_function: StageWeight, + ) -> CriticalPath { + for perf_element in perf.0.iter().filter(|p| p.ty == NodeType::Stage) { + self.add_node(&perf_element.fqname, weight_function.weight(perf_element)); + } + let return_path_map: BTreeMap<_, _> = final_state .completed_stages() .flat_map(|f| { @@ -92,17 +135,8 @@ impl CriticalPathBuilder { }) .collect(); - let mut builder = Self::default(); - - for perf_element in perf.0.iter().filter(|p| p.ty == NodeType::Stage) { - builder.add_node( - &perf_element.fqname, - perf_element.no_queue_wall_time_seconds(), - ); - } - for stage_state in final_state.completed_stages() { - assert!(builder.stage_id_map.contains_key(&stage_state.fqname)); + assert!(self.stage_id_map.contains_key(&stage_state.fqname)); for binding_info in stage_state.argument_bindings() { match binding_info.mode { @@ -112,18 +146,17 @@ impl CriticalPathBuilder { .into_iter() .filter_map(|path| return_path_map.get(&path)) { - builder.add_edge(parent, &stage_state.fqname); + self.add_edge(parent, &stage_state.fqname); } } } ArgumentMode::Reference => { - builder.add_edge(binding_info.node.as_ref().unwrap(), &stage_state.fqname); + self.add_edge(binding_info.node.as_ref().unwrap(), &stage_state.fqname); } } } } - - builder + self._critical_path() } fn add_node(&mut self, stage_id: &str, weight: f64) { @@ -153,13 +186,10 @@ impl CriticalPathBuilder { } } - fn critical_path(mut self) -> CriticalPath { + fn _critical_path(mut self) -> CriticalPath { self.add_start_and_end_node(); - // Create a directed graph let graph = self.graph; - - // Find topological order let topological_order: Vec = toposort(&graph, None).unwrap(); #[derive(Default, Debug, PartialEq, Eq, PartialOrd, Ord)] @@ -188,8 +218,6 @@ impl CriticalPathBuilder { max_weights.insert(*node, path_edge); } - println!("{:#?}", max_weights); - // Trace back the path let mut path = Vec::new(); let mut current_node = topological_order[0]; @@ -197,16 +225,16 @@ impl CriticalPathBuilder { current_node = child; let stage_id = &graph[current_node].stage_id; if stage_id != CriticalPathBuilder::END_NODE { - path.push(CriticalPathNode { + path.push(CriticalPathStage { id: stage_id.clone(), name: stage_id.split('.').last().unwrap().to_string(), - no_queue_wall_time_seconds: graph[current_node].weight.0, + weight: graph[current_node].weight.0, }); } } CriticalPath { - total_time_seconds: max_weights[&topological_order[0]].weight.0, + total_weight: max_weights[&topological_order[0]].weight.0, path, } } @@ -219,22 +247,17 @@ mod tests { use crate::PipestanceFile; use anyhow::Result; - fn path_builder() -> Result { + #[test] + fn test_critical_path() -> Result<()> { let final_state = FinalState::from_string(read_zst("test_data/_finalstate.zst")?)?; let perf = Perf::from_string(read_zst("test_data/_perf.zst")?)?; - let builder = CriticalPathBuilder::new(&final_state, &perf); - Ok(builder) - } - #[test] - fn test_critical_path() -> Result<()> { - let builder = path_builder()?; - // println!("{:#?}", builder.stage_info); + let critical_path = + CriticalPath::compute(&final_state, &perf, StageWeight::NoQueueWallTime); - let critical_path = builder.critical_path(); println!("{:#?}", critical_path); - let max_weight = critical_path.total_time_seconds; + let max_weight = critical_path.total_weight; let stages: Vec<_> = critical_path.path.into_iter().map(|p| p.name).collect(); assert!((max_weight - 3263.950373765).abs() / max_weight <= 1e-6); assert_eq!( @@ -265,4 +288,18 @@ mod tests { ); Ok(()) } + + #[test] + fn test_critical_path_custom() -> Result<()> { + let final_state = FinalState::from_string(read_zst("test_data/_finalstate.zst")?)?; + let perf = Perf::from_string(read_zst("test_data/_perf.zst")?)?; + + let critical_path = CriticalPath::compute( + &final_state, + &perf, + StageWeight::CustomWeight(Box::new(|p| p.observed_wall_time())), + ); + println!("{:#?}", critical_path); + Ok(()) + } } diff --git a/martian-pipestance/src/perf.rs b/martian-pipestance/src/perf.rs index 0128a17e88..00766fd82b 100644 --- a/martian-pipestance/src/perf.rs +++ b/martian-pipestance/src/perf.rs @@ -51,6 +51,15 @@ impl PerfElement { .reduce(f64::max) .unwrap_or(0.0) } + + /// Actual wall time of the slowest fork + pub fn observed_wall_time(&self) -> f64 { + self.forks + .iter() + .map(|f| f.fork_stats.walltime) + .reduce(f64::max) + .unwrap_or(0.0) + } } /// Core type of a `_perf` file. Contains all information about a stage execution.