diff --git a/.gitignore b/.gitignore
index 9aad67de9e..42537a12ba 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,4 @@ martian-filetypes/example.bincode
 martian-filetypes/example.json
 martian-filetypes/example_lazy.bincode
 martian-filetypes/example_lazy.json
+.DS_Store
\ No newline at end of file
diff --git a/Cargo.lock b/Cargo.lock
index b60252233c..1ef4018d02 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -356,6 +356,12 @@ version = "0.3.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"
 
+[[package]]
+name = "equivalent"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
+
 [[package]]
 name = "errno"
 version = "0.3.1"
@@ -395,6 +401,12 @@ dependencies = [
  "log",
 ]
 
+[[package]]
+name = "fixedbitset"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
+
 [[package]]
 name = "flate2"
 version = "1.0.25"
@@ -440,6 +452,12 @@ version = "1.8.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
 
+[[package]]
+name = "hashbrown"
+version = "0.14.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
+
 [[package]]
 name = "heck"
 version = "0.4.1"
@@ -461,6 +479,16 @@ version = "0.3.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286"
 
+[[package]]
+name = "indexmap"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d"
+dependencies = [
+ "equivalent",
+ "hashbrown",
+]
+
 [[package]]
 name = "indoc"
 version = "2.0.2"
@@ -665,6 +693,18 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "martian-pipestance"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "ordered-float",
+ "petgraph",
+ "serde",
+ "serde_json",
+ "zstd",
+]
+
 [[package]]
 name = "memchr"
 version = "2.5.0"
@@ -739,6 +779,15 @@ version = "11.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
 
+[[package]]
+name = "ordered-float"
+version = "3.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2a54938017eacd63036332b4ae5c8a49fc8c0c1d6d629893057e4f13609edd06"
+dependencies = [
+ "num-traits",
+]
+
 [[package]]
 name = "output_vt100"
 version = "0.1.3"
@@ -748,6 +797,16 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "petgraph"
+version = "0.6.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9"
+dependencies = [
+ "fixedbitset",
+ "indexmap",
+]
+
 [[package]]
 name = "pkg-config"
 version = "0.3.26"
diff --git a/Cargo.toml b/Cargo.toml
index 2e61a535e6..c776e2dfb4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,4 +5,5 @@ members = [
     "martian-derive",
     "martian-lab",
     "martian-filetypes",
-]
\ No newline at end of file
+    "martian-pipestance",
+]
diff --git a/martian-pipestance/Cargo.toml b/martian-pipestance/Cargo.toml
new file mode 100644
index 0000000000..a32529684e
--- /dev/null
+++ b/martian-pipestance/Cargo.toml
@@ -0,0 +1,19 @@
+[package]
+name = "martian-pipestance"
+version = "0.1.0"
+authors = ["Sreenath Krishnan "]
+edition = "2021"
+include = ["src/**/*"]
+license = "MIT"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+serde = { version = "1.0", features = ['derive'] }
+serde_json = "1.0"
+anyhow = { version = "1", features = ["backtrace"] }
+ordered-float = ">=3.7"
+petgraph = { version = "0.6", default-features = false }
+
+[dev-dependencies]
+zstd = "0.12.4"
diff --git a/martian-pipestance/src/common.rs b/martian-pipestance/src/common.rs
new file mode 100644
index 0000000000..f5beec39c6
--- /dev/null
+++ b/martian-pipestance/src/common.rs
@@ -0,0 +1,19 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+#[serde(rename_all = "snake_case")]
+pub enum NodeType {
+    Pipeline,
+    Stage,
+}
+
+#[cfg(test)]
+pub fn read_zst(fname: &str) -> anyhow::Result<String> {
+    use anyhow::Context;
+    use std::io::Read;
+    let mut file = std::fs::File::open(fname).context(format!("While opening {fname}"))?;
+    let mut decoder = zstd::stream::read::Decoder::new(&mut file)?;
+    let mut buffer = String::new();
+    decoder.read_to_string(&mut buffer)?;
+    Ok(buffer)
+}
diff --git a/martian-pipestance/src/critical_path.rs b/martian-pipestance/src/critical_path.rs
new file mode 100644
index 0000000000..08bc9fc6da
--- /dev/null
+++ b/martian-pipestance/src/critical_path.rs
@@ -0,0 +1,308 @@
+//!
+//! Critical path in the pipestance
+//!
+
+use crate::common::NodeType;
+use crate::final_state::{ArgumentMode, FinalState};
+use crate::perf::{Perf, PerfElement};
+use crate::PipestanceFile;
+use anyhow::Result;
+use ordered_float::OrderedFloat;
+use petgraph::algo::toposort;
+use petgraph::graph::{DiGraph, NodeIndex};
+use serde_json::Value;
+use std::collections::{BTreeMap, VecDeque};
+use std::fmt;
+use std::path::Path;
+
+pub enum StageWeight {
+    /// The wall time of the stage ignoring any queueing time. This will be
+    /// the sum of wall times of split + slowest chunk + join
+    NoQueueWallTime,
+    /// Provide a custom function to compute the weight from the stage perf
+    CustomWeight(Box<dyn Fn(&PerfElement) -> f64>),
+}
+
+impl StageWeight {
+    fn weight(&self, perf_element: &PerfElement) -> f64 {
+        match self {
+            StageWeight::NoQueueWallTime => perf_element.no_queue_wall_time_seconds(),
+            StageWeight::CustomWeight(f) => f(perf_element),
+        }
+    }
+}
+
+pub struct CriticalPathStage {
+    pub id: String,
+    pub name: String,
+    pub weight: f64,
+}
+
+impl fmt::Debug for CriticalPathStage {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{} [{}s]", self.name, self.weight)
+    }
+}
+
+#[derive(Debug)]
+pub struct CriticalPath {
+    pub total_weight: f64,
+    pub path: Vec<CriticalPathStage>,
+}
+
+impl CriticalPath {
+    pub fn from_pipestance_folder(
+        folder: impl AsRef<Path>,
+        weight_function: StageWeight,
+    ) -> Result<Self> {
+        Ok(Self::compute(
+            &FinalState::from_pipestance_folder(folder.as_ref())?,
+            &Perf::from_pipestance_folder(folder.as_ref())?,
+            weight_function,
+        ))
+    }
+    pub fn compute(final_state: &FinalState, perf: &Perf, weight_function: StageWeight) -> Self {
+        CriticalPathBuilder::new().build(final_state, perf, weight_function)
+    }
+}
+
+#[derive(Default)]
+struct GraphNode {
+    weight: OrderedFloat<f64>,
+    stage_id: String,
+}
+
+#[derive(Default)]
+struct CriticalPathBuilder {
+    // stage_info: BTreeMap,
+    graph: DiGraph<GraphNode, ()>,
+    node_id_of_stage_id: BTreeMap<String, NodeIndex>,
+}
+
+fn collect_all_nested_strings(v: &Value) -> Vec<String> {
+    let mut queue = VecDeque::from([v]);
+    let mut nested_strings = vec![];
+    while !queue.is_empty() {
+        match queue.pop_front().unwrap() {
+            Value::Null | Value::Bool(_) | Value::Number(_) => {}
+            Value::String(s) => {
+                nested_strings.push(s.to_string());
+            }
+            Value::Array(arr) => {
+                for a in arr {
+                    queue.push_back(a);
+                }
+            }
+            Value::Object(obj) => {
+                for o in obj.values() {
+                    queue.push_back(o);
+                }
+            }
+        }
+    }
+    nested_strings
+}
+
+impl CriticalPathBuilder {
+    const START_NODE: &str = "__START__";
+    const END_NODE: &str = "__END__";
+
+    fn new() -> Self {
+        Self::default()
+    }
+
+    fn build(
+        mut self,
+        final_state: &FinalState,
+        perf: &Perf,
+        weight_function: StageWeight,
+    ) -> CriticalPath {
+        for perf_element in perf.0.iter().filter(|p| p.ty == NodeType::Stage) {
+            self.add_node(&perf_element.fqname, weight_function.weight(perf_element));
+        }
+
+        let return_path_map: BTreeMap<_, _> = final_state
+            .completed_stages()
+            .flat_map(|f| {
+                f.return_bindings()
+                    .filter_map(|r| r.value.as_ref().map(collect_all_nested_strings))
+                    .flatten()
+                    .filter_map(|ret_val| {
+                        ret_val
+                            .contains(&format!("/{}/fork", f.name))
+                            .then_some((ret_val, f.fqname.clone()))
+                    })
+            })
+            .collect();
+
+        for stage_state in final_state.completed_stages() {
+            assert!(self.node_id_of_stage_id.contains_key(&stage_state.fqname));
+
+            for binding_info in stage_state.argument_bindings() {
+                match binding_info.mode {
+                    ArgumentMode::Empty => {
+                        if let Some(v) = &binding_info.value {
+                            for parent in collect_all_nested_strings(v)
+                                .into_iter()
+                                .filter_map(|path| return_path_map.get(&path))
+                            {
+                                self.add_edge(parent, &stage_state.fqname);
+                            }
+                        }
+                    }
+                    ArgumentMode::Reference => {
+                        self.add_edge(binding_info.node.as_ref().unwrap(), &stage_state.fqname);
+                    }
+                }
+            }
+        }
+        self._critical_path()
+    }
+
+    fn add_node(&mut self, stage_id: &str, weight: f64) {
+        let node = GraphNode {
+            weight: weight.into(),
+            stage_id: stage_id.to_string(),
+        };
+        assert!(!self.node_id_of_stage_id.contains_key(&node.stage_id));
+        self.node_id_of_stage_id
+            .insert(node.stage_id.to_string(), self.graph.add_node(node));
+    }
+
+    fn add_edge(&mut self, from: &str, to: &str) {
+        self.graph.update_edge(
+            self.node_id_of_stage_id[from],
+            self.node_id_of_stage_id[to],
+            (),
+        );
+    }
+
+    fn add_start_and_end_node(&mut self) {
+        let all_nodes: Vec<_> = self.node_id_of_stage_id.keys().cloned().collect();
+
+        self.add_node(Self::START_NODE, 0.0);
+        self.add_node(Self::END_NODE, 0.0);
+
+        for node in all_nodes {
+            self.add_edge(Self::START_NODE, &node);
+            self.add_edge(&node, Self::END_NODE);
+        }
+    }
+
+    fn _critical_path(mut self) -> CriticalPath {
+        self.add_start_and_end_node();
+
+        let graph = self.graph;
+        let topological_order: Vec<NodeIndex> = toposort(&graph, None).unwrap();
+
+        #[derive(Default, Debug, PartialEq, Eq, PartialOrd, Ord)]
+        struct MaxWeight {
+            weight: OrderedFloat<f64>,
+            child: Option<NodeIndex>,
+        }
+
+        // Calculate maximum weighted path
+        let mut max_weights: BTreeMap<NodeIndex, MaxWeight> = BTreeMap::new();
+
+        for node in topological_order.iter().rev() {
+            let node_weight = graph[*node].weight;
+            let path_edge = graph
+                .neighbors_directed(*node, petgraph::Direction::Outgoing)
+                .map(|child| MaxWeight {
+                    weight: max_weights[&child].weight + node_weight,
+                    child: Some(child),
+                })
+                .max()
+                .unwrap_or(MaxWeight {
+                    weight: node_weight,
+                    child: None,
+                });
+
+            max_weights.insert(*node, path_edge);
+        }
+
+        // Trace back the path
+        let mut path = Vec::new();
+        let mut current_node = topological_order[0];
+        while let Some(child) = max_weights[&current_node].child {
+            current_node = child;
+            let stage_id = &graph[current_node].stage_id;
+            if stage_id != CriticalPathBuilder::END_NODE {
+                path.push(CriticalPathStage {
+                    id: stage_id.clone(),
+                    name: stage_id.split('.').last().unwrap().to_string(),
+                    weight: graph[current_node].weight.0,
+                });
+            }
+        }
+
+        CriticalPath {
+            total_weight: max_weights[&topological_order[0]].weight.0,
+            path,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::common::read_zst;
+    use crate::PipestanceFile;
+    use anyhow::Result;
+
+    #[test]
+    fn test_critical_path() -> Result<()> {
+        let final_state = FinalState::from_string(read_zst("test_data/_finalstate.zst")?)?;
+        let perf = Perf::from_string(read_zst("test_data/_perf.zst")?)?;
+
+        let critical_path =
+            CriticalPath::compute(&final_state, &perf, StageWeight::NoQueueWallTime);
+
+        println!("{:#?}", critical_path);
+
+        let max_weight = critical_path.total_weight;
+        let stages: Vec<_> = critical_path.path.into_iter().map(|p| p.name).collect();
+        assert!((max_weight - 3263.950373765).abs() / max_weight <= 1e-6);
+        assert_eq!(
+            stages,
+            [
+                "WRITE_GENE_INDEX",
+                "PARSE_TARGET_FEATURES",
+                "MAKE_SHARD",
+                "MAKE_HD_CORRECTION_MAP",
+                "BARCODE_CORRECTION",
+                "SET_ALIGNER_SUBSAMPLE_RATE",
+                "ALIGN_AND_COUNT",
+                "WRITE_H5_MATRIX",
+                "FILTER_BARCODES",
+                "COLLATE_PROBE_METRICS",
+                "WRITE_MOLECULE_INFO",
+                "DISABLE_SECONDARY_ANALYSIS",
+                "ANALYZER_PREFLIGHT",
+                "PREPROCESS_MATRIX",
+                "RUN_PCA",
+                "RUN_GRAPH_CLUSTERING",
+                "COMBINE_CLUSTERING",
+                "RUN_DIFFERENTIAL_EXPRESSION",
+                "SUMMARIZE_ANALYSIS",
+                "DECONVOLVE_SPOTS",
+                "CLOUPE_PREPROCESS",
+            ]
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn test_critical_path_custom() -> Result<()> {
+        let final_state = FinalState::from_string(read_zst("test_data/_finalstate.zst")?)?;
+        let perf = Perf::from_string(read_zst("test_data/_perf.zst")?)?;
+
+        let critical_path = CriticalPath::compute(
+            &final_state,
+            &perf,
+            StageWeight::CustomWeight(Box::new(|p| p.observed_wall_time())),
+        );
+        println!("{:#?}", critical_path);
+        Ok(())
+    }
+}
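Reviewer note on the algorithm in critical_path.rs: `_critical_path` is a standard longest-path computation on a node-weighted DAG, done by walking nodes in reverse topological order and recording, for each node, the heaviest downstream continuation, then tracing that chain forward. A minimal standalone sketch of the same idea with petgraph follows (the graph and node weights here are invented for illustration; it is not code from this patch):

// Longest node-weighted path in a DAG, in the style of `_critical_path`.
use petgraph::algo::toposort;
use petgraph::graph::{DiGraph, NodeIndex};
use std::collections::BTreeMap;

fn longest_path(graph: &DiGraph<f64, ()>) -> (f64, Vec<NodeIndex>) {
    let order = toposort(graph, None).expect("graph must be acyclic");
    // best[n] = (heaviest total weight of a path starting at n, next hop on that path)
    let mut best: BTreeMap<NodeIndex, (f64, Option<NodeIndex>)> = BTreeMap::new();
    for &n in order.iter().rev() {
        let w = graph[n];
        let entry = graph
            .neighbors(n) // outgoing neighbors; already present in `best`
            .map(|c| (w + best[&c].0, Some(c)))
            .max_by(|a, b| a.0.total_cmp(&b.0))
            .unwrap_or((w, None));
        best.insert(n, entry);
    }
    // The heaviest path overall starts at the node with the largest total.
    let (&start, _) = best
        .iter()
        .max_by(|(_, a), (_, b)| a.0.total_cmp(&b.0))
        .unwrap();
    let (total, _) = best[&start];
    let mut path = vec![start];
    let mut cur = start;
    while let Some(next) = best[&cur].1 {
        path.push(next);
        cur = next;
    }
    (total, path)
}

fn main() {
    let mut g = DiGraph::<f64, ()>::new();
    let a = g.add_node(2.0);
    let b = g.add_node(5.0);
    let c = g.add_node(1.0);
    g.add_edge(a, b, ());
    g.add_edge(b, c, ());
    g.add_edge(a, c, ());
    let (total, path) = longest_path(&g);
    println!("total weight {total}, path {path:?}"); // 8 along a -> b -> c
}

The builder in the patch gets the same effect by wiring synthetic `__START__`/`__END__` nodes to every stage instead of searching for the best source node.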
diff --git a/martian-pipestance/src/final_state.rs b/martian-pipestance/src/final_state.rs
new file mode 100644
index 0000000000..c72516012f
--- /dev/null
+++ b/martian-pipestance/src/final_state.rs
@@ -0,0 +1,192 @@
+use crate::common::NodeType;
+use crate::PipestanceFile;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use std::collections::HashMap;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct FinalState(pub Vec<FinalStateElement>);
+
+impl PipestanceFile for FinalState {
+    fn filename() -> &'static str {
+        "_finalstate"
+    }
+}
+
+impl FinalState {
+    pub fn completed_stages(&self) -> impl Iterator<Item = &FinalStateElement> {
+        self.0
+            .iter()
+            .filter(|s| (s.ty == NodeType::Stage) && (s.state == State::Complete))
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct FinalStateElement {
+    pub name: String,
+    pub fqname: String,
+    pub path: String,
+    pub state: State,
+    pub metadata: Metadata,
+    pub error: Option<NodeErrorInfo>,
+    pub stagecode_cmd: String,
+    pub forks: Vec<Fork>,
+    pub edges: Vec<Edge>,
+    pub stagecode_lang: StagecodeLang,
+    #[serde(rename = "type")]
+    pub ty: NodeType,
+}
+
+impl FinalStateElement {
+    pub fn return_bindings(&self) -> impl Iterator<Item = &BindingInfo> {
+        self.forks
+            .iter()
+            .filter_map(|f| {
+                f.bindings
+                    .as_ref()
+                    .map(|binding| binding.bindings_return.iter().flatten())
+            })
+            .flatten()
+    }
+    pub fn argument_bindings(&self) -> impl Iterator<Item = &BindingInfo> {
+        self.forks
+            .iter()
+            .filter_map(|f| f.bindings.as_ref().map(|binding| binding.argument.iter()))
+            .flatten()
+    }
+}
+
+// Encapsulates information about a node failure.
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct NodeErrorInfo {
+    fqname: String,
+    path: String,
+    summary: Option<String>,
+    log: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Edge {
+    pub from: String,
+    pub to: String,
+}
+
+// Exportable information from a Fork object.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Fork {
+    #[serde(rename = "argPermute")]
+    pub arg_permute: Option<Value>,
+    #[serde(rename = "joinDef")]
+    pub join_def: Option<Value>,
+    pub state: State,
+    pub metadata: Option<Metadata>,
+    pub split_metadata: Option<Metadata>,
+    pub join_metadata: Option<Metadata>,
+    pub bindings: Option<Bindings>,
+    pub chunks: Vec<ForkChunk>,
+    pub index: i64,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct Bindings {
+    pub argument: Vec<BindingInfo>,
+    #[serde(rename = "Return")]
+    pub bindings_return: Option<Vec<BindingInfo>>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct BindingInfo {
+    pub mode: ArgumentMode,
+    pub node: Option<String>,
+    pub matched_fork: Option<Value>,
+    pub value: Option<Value>,
+    pub id: String,
+    #[serde(rename = "type")]
+    pub argument_type: String,
+    pub waiting: bool,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+#[serde(rename_all = "snake_case")]
+pub enum ArgumentMode {
+    #[serde(rename = "")]
+    Empty,
+    Reference,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ForkChunk {
+    pub chunk_def: ChunkDef,
+    pub metadata: Metadata,
+    pub state: State,
+    pub index: i64,
+}
+
+// Defines resources used by a stage.
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct JobResources {
+    #[serde(rename = "__special")]
+    special: Option<String>,
+    #[serde(rename = "__threads")]
+    threads: Option<f64>,
+    #[serde(rename = "__mem_gb")]
+    mem_gb: Option<f64>,
+    #[serde(rename = "__vmem_gb")]
+    vmem_gb: Option<f64>,
+}
+
+// Defines the resources and arguments of a chunk.
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct ChunkDef {
+    /// Declared resources
+    #[serde(flatten)]
+    pub resources: Option<JobResources>,
+    /// Additional arguments
+    #[serde(flatten)]
+    pub args: HashMap<String, Value>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Metadata {
+    pub path: String,
+    pub names: Vec<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+#[serde(rename_all = "snake_case")]
+pub enum State {
+    Complete,
+    Failed,
+    Disabled,
+    Running,
+    Queued,
+    Ready,
+    #[serde(alias = "")]
+    Waiting,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum StagecodeLang {
+    Compiled,
+    #[serde(rename = "")]
+    Empty,
+    Python,
+}
+
+#[cfg(test)]
+mod tests {
+    use super::FinalState;
+    use crate::common::read_zst;
+    use crate::PipestanceFile;
+    use anyhow::Result;
+
+    #[test]
+    fn test_finalstate_deserialize() -> Result<()> {
+        let _finalstate = FinalState::from_string(read_zst("test_data/_finalstate.zst")?)?;
+        Ok(())
+    }
+}
diff --git a/martian-pipestance/src/lib.rs b/martian-pipestance/src/lib.rs
new file mode 100644
index 0000000000..b5dd5c01ae
--- /dev/null
+++ b/martian-pipestance/src/lib.rs
@@ -0,0 +1,26 @@
+use anyhow::{Context, Result};
+use serde::de::DeserializeOwned;
+use std::path::Path;
+
+pub mod common;
+pub mod critical_path;
+pub mod final_state;
+pub mod perf;
+
+/// A file associated with a martian pipestance, such as _perf, _finalstate etc.,
+/// that can be deserialized into a concrete type
+pub trait PipestanceFile: DeserializeOwned {
+    fn filename() -> &'static str;
+    fn from_pipestance_folder(pipestance_folder: impl AsRef<Path>) -> Result<Self> {
+        Self::from_file(pipestance_folder.as_ref().join(Self::filename()))
+    }
+    fn from_file(filename: impl AsRef<Path>) -> Result<Self> {
+        let path = filename.as_ref();
+        Self::from_string(
+            std::fs::read_to_string(path).context(format!("Failed to open {:?}", path))?,
+        )
+    }
+    fn from_string(file_contents: String) -> Result<Self> {
+        Ok(serde_json::from_str(&file_contents)?)
+    }
+}
diff --git a/martian-pipestance/src/perf.rs b/martian-pipestance/src/perf.rs
new file mode 100644
index 0000000000..00766fd82b
--- /dev/null
+++ b/martian-pipestance/src/perf.rs
@@ -0,0 +1,223 @@
+use crate::common::NodeType;
+use crate::PipestanceFile;
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Perf(pub Vec<PerfElement>);
+
+impl PipestanceFile for Perf {
+    fn filename() -> &'static str {
+        "_perf"
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ObservedMemory {
+    pub rss: i64,
+    pub shared: i64,
+    pub vmem: i64,
+    pub text: i64,
+    pub stack: i64,
+    pub proc_count: i64,
+}
+
+/// Performance data for a stage or pipeline execution.
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct PerfElement {
+    /// Name of the execution (stage name or pipeline name)
+    pub name: String,
+    /// Fully qualified name (e.g. ID.INSITU_DECODING_CS.DECODE_POINTS.fork0)
+    pub fqname: String,
+    /// Information about memory usage. Only used in pipelines.
+    pub highmem: Option<ObservedMemory>,
+    /// Information about all forks executed by this node
+    pub forks: Vec<ForkPerfInfo>,
+    /// History of byte allocations and deletions
+    pub bytehist: Option<Vec<NodeByteStamp>>,
+    /// Maximum number of bytes allocated
+    pub maxbytes: i64,
+    /// Type of the node ("stage" or "pipeline")
+    #[serde(rename(deserialize = "type"))]
+    pub ty: NodeType,
+}
+
+impl PerfElement {
+    /// Wall time if there is no queueing time:
+    /// slowest (split + slowest chunk + join) among all forks
+    pub fn no_queue_wall_time_seconds(&self) -> f64 {
+        self.forks
+            .iter()
+            .map(|f| f.no_queue_wall_time_seconds())
+            .reduce(f64::max)
+            .unwrap_or(0.0)
+    }
+
+    /// Actual wall time of the slowest fork
+    pub fn observed_wall_time(&self) -> f64 {
+        self.forks
+            .iter()
+            .map(|f| f.fork_stats.walltime)
+            .reduce(f64::max)
+            .unwrap_or(0.0)
+    }
+}
+
+/// Core type of a `_perf` file. Contains all information about a stage execution.
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct PerfStats {
+    /// The start time of this node, or the earliest start time of any of its
+    /// child nodes. Format is YYYY-mm-ddTHH:MM:SS.nano+-HH:MM, e.g.
+    /// 2022-01-01T01:01:01.123456789-08:00
+    pub start: String,
+    /// The end time of this node, or the latest end time of any of its child
+    /// nodes.
+    pub end: String,
+    /// The total time elapsed during event execution [seconds]
+    pub walltime: f64,
+    /// Number of jobs. For a stage, this is 1.
+    pub num_jobs: i32,
+    /// Number of threads used during execution
+    pub num_threads: i32,
+    /// For split/main/join nodes, this should be the same as walltime. For
+    /// other nodes, it is the sum of the durations for child nodes.
+    pub duration: f64,
+    /// Number of threads * duration / 3600 [hours]
+    pub core_hours: f64,
+    /// Maximum resident set size (portion of memory occupied by process)
+    /// [kilobytes]
+    pub maxrss: i64,
+    /// Maximum amount of virtual memory used [kilobytes]
+    pub maxvmem: i64,
+    /// Number of kilobytes requested [kilobytes]
+    pub maxrss_requested: i64,
+    /// Number of kilobytes of virtual memory requested [kilobytes]
+    pub maxvmem_requested: i64,
+    /// The number of times the filesystem had to perform input.
+    pub in_blocks: i64,
+    /// The number of times the filesystem had to perform output.
+    pub out_blocks: i64,
+    /// The number of times the filesystem had to perform input or output.
+    pub total_blocks: i64,
+    /// The average number of times the filesystem had to perform input per
+    /// second [events / second].
+    pub in_blocks_rate: f64,
+    /// The average number of times the filesystem had to perform output per
+    /// second [events / second].
+    pub out_blocks_rate: f64,
+    /// The average number of times the filesystem had to perform input/output
+    /// per second [events / second]
+    pub total_blocks_rate: f64,
+    /// Total number of bytes read in by the stage [bytes]
+    pub in_bytes: i64,
+    /// Total number of bytes written out by the stage [bytes]
+    pub out_bytes: i64,
+    /// Average number of bytes read [bytes/second]
+    pub in_bytes_rate: f64,
+    /// Average number of bytes written [bytes/second]
+    pub out_bytes_rate: f64,
+    /// Maximum read rate [bytes / second]
+    pub in_bytes_peak: f64,
+    /// Maximum write rate [bytes / second]
+    pub out_bytes_peak: f64,
+    /// The total CPU time used by the user [seconds]
+    pub usertime: f64,
+    /// The total CPU time used by the system [seconds]
+    pub systemtime: f64,
+    /// The number of output files and intermediate files (removed by VDR).
+    pub total_files: u32,
+    /// The number of bytes of disk space used for output files and
+    /// intermediate files (removed by VDR).
+    pub total_bytes: u64,
+    /// The number of output files.
+    pub output_files: u32,
+    /// The number of bytes of disk space used for output files.
+    pub output_bytes: u64,
+    /// The total number of intermediate files that were removed by VDR.
+    pub vdr_files: u32,
+    /// The total number of bytes used by intermediate files that were removed
+    /// by VDR.
+    pub vdr_bytes: u64,
+    /// Standard deviation of the read rate [bytes / second]
+    pub in_bytes_dev: f64,
+    /// Standard deviation of the write rate [bytes / second]
+    pub out_bytes_dev: f64,
+}
+
+/// Structure holding information about data flow throughout the pipeline.
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct NodeByteStamp {
+    /// Timestamp of the measurement
+    pub ts: String,
+    /// Description of the measurement
+    pub desc: String,
+    /// Number of bytes used for the action described in `desc`, e.g. an allocation.
+    pub bytes: i64,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct ForkPerfInfo {
+    /// Statistics for the split portion of this fork
+    pub split_stats: Option<PerfStats>,
+    /// Statistics for the join portion of this fork
+    pub join_stats: Option<PerfStats>,
+    /// Statistics for the whole fork
+    pub fork_stats: PerfStats,
+    /// List of the stages executed in this fork
+    pub stages: Vec<StagePerfInfo>,
+    /// Performance information about each chunk execution
+    pub chunks: Vec<ChunkPerfInfo>,
+    /// Index of the fork
+    pub index: i32,
+}
+
+impl ForkPerfInfo {
+    /// Wall time if there is no queueing time (split + slowest chunk + join)
+    pub fn no_queue_wall_time_seconds(&self) -> f64 {
+        self.split_stats.as_ref().map_or(0.0, |s| s.walltime)
+            + self
+                .chunks
+                .iter()
+                .map(|c| c.chunk_stats.walltime)
+                .reduce(f64::max)
+                .unwrap_or(0.0)
+            + self.join_stats.as_ref().map_or(0.0, |s| s.walltime)
+    }
+
+    pub fn wall_time_seconds(&self) -> f64 {
+        self.fork_stats.walltime
+    }
+}
+
+/// Information about a single chunk (executed during main)
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct ChunkPerfInfo {
+    /// Performance information for the chunk
+    pub chunk_stats: PerfStats,
+    /// Index identifying the chunk. Each chunk gets a unique index.
+    pub index: i32,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct StagePerfInfo {
+    /// Name of the stage.
+    pub name: String,
+    /// Fully qualified name of the stage.
+    pub fqname: String,
+    /// Index of the fork for this stage. Doesn't need to be the same as its
+    /// parent fork index.
+    pub forki: i32,
+}
+
+#[cfg(test)]
+mod tests {
+    use super::Perf;
+    use crate::common::read_zst;
+    use crate::PipestanceFile;
+    use anyhow::Result;
+
+    #[test]
+    fn test_perf_deserialize() -> Result<()> {
+        let _perf = Perf::from_string(read_zst("test_data/_perf.zst")?)?;
+        Ok(())
+    }
+}
diff --git a/martian-pipestance/test_data/_finalstate.zst b/martian-pipestance/test_data/_finalstate.zst
new file mode 100644
index 0000000000..07d97f195f
Binary files /dev/null and b/martian-pipestance/test_data/_finalstate.zst differ
diff --git a/martian-pipestance/test_data/_perf.zst b/martian-pipestance/test_data/_perf.zst
new file mode 100644
index 0000000000..55fa291339
Binary files /dev/null and b/martian-pipestance/test_data/_perf.zst differ
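A rough sketch of how a downstream crate might consume this API (not part of the patch): the pipestance path below is a placeholder, and the caller is assumed to depend on anyhow for error handling.

// Load a pipestance's _finalstate and _perf and report its critical path.
use martian_pipestance::critical_path::{CriticalPath, StageWeight};
use martian_pipestance::final_state::FinalState;
use martian_pipestance::perf::Perf;
use martian_pipestance::PipestanceFile;

fn main() -> anyhow::Result<()> {
    // Parse the pipestance files individually...
    let final_state = FinalState::from_pipestance_folder("path/to/pipestance")?;
    let perf = Perf::from_pipestance_folder("path/to/pipestance")?;

    // ...and compute the critical path using wall time without queueing.
    let path = CriticalPath::compute(&final_state, &perf, StageWeight::NoQueueWallTime);
    println!("{:#?}", path);

    // Or do both in one call, with a custom weight function per stage.
    let by_observed = CriticalPath::from_pipestance_folder(
        "path/to/pipestance",
        StageWeight::CustomWeight(Box::new(|p| p.observed_wall_time())),
    )?;
    println!("total: {} s", by_observed.total_weight);
    Ok(())
}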