Skip to content

Commit

Permalink
support custom weight
Browse files Browse the repository at this point in the history
  • Loading branch information
sreenathkrishnan committed Sep 14, 2023
1 parent 2c04984 commit 662e923
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 43 deletions.
123 changes: 80 additions & 43 deletions martian-pipestance/src/critical_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,65 @@

use crate::common::NodeType;
use crate::final_state::{ArgumentMode, FinalState};
use crate::perf::Perf;
use crate::perf::{Perf, PerfElement};
use crate::PipestanceFile;
use anyhow::Result;
use ordered_float::OrderedFloat;
use petgraph::algo::toposort;
use petgraph::graph::{DiGraph, NodeIndex};
use serde_json::Value;
use std::collections::{BTreeMap, VecDeque};
use std::fmt;
use std::path::Path;

pub enum StageWeight {
/// The wall time of the stage ignoring any queueing time. This will be
/// the sum of wall times of split + slowest chunk + join
NoQueueWallTime,
/// Provide a custom function to compute the weight from the stage perf
CustomWeight(Box<dyn Fn(&PerfElement) -> f64>),
}

impl StageWeight {
fn weight(&self, perf_element: &PerfElement) -> f64 {
match self {
StageWeight::NoQueueWallTime => perf_element.no_queue_wall_time_seconds(),
StageWeight::CustomWeight(f) => f(perf_element),
}
}
}

pub struct CriticalPathNode {
pub struct CriticalPathStage {
pub id: String,
pub name: String,
pub no_queue_wall_time_seconds: f64,
pub weight: f64,
}

impl fmt::Debug for CriticalPathNode {
impl fmt::Debug for CriticalPathStage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{} [{}s]", self.name, self.no_queue_wall_time_seconds)
write!(f, "{} [{}s]", self.name, self.weight)
}
}

#[derive(Debug)]
pub struct CriticalPath {
pub total_time_seconds: f64,
pub path: Vec<CriticalPathNode>,
pub total_weight: f64,
pub path: Vec<CriticalPathStage>,
}

impl CriticalPath {
pub fn compute(final_state: &FinalState, perf: &Perf) -> Self {
CriticalPathBuilder::new(final_state, perf).critical_path()
pub fn from_pipestance_folder(
folder: impl AsRef<Path>,
weight_function: StageWeight,
) -> Result<Self> {
Ok(Self::compute(
&FinalState::from_pipestance_folder(folder.as_ref())?,
&Perf::from_pipestance_folder(folder.as_ref())?,
weight_function,
))
}
pub fn compute(final_state: &FinalState, perf: &Perf, weight_function: StageWeight) -> Self {
CriticalPathBuilder::new().build(final_state, perf, weight_function)
}
}

Expand Down Expand Up @@ -77,7 +107,20 @@ impl CriticalPathBuilder {
const START_NODE: &str = "__START__";
const END_NODE: &str = "__END__";

fn new(final_state: &FinalState, perf: &Perf) -> Self {
fn new() -> Self {
Self::default()
}

fn build(
mut self,
final_state: &FinalState,
perf: &Perf,
weight_function: StageWeight,
) -> CriticalPath {
for perf_element in perf.0.iter().filter(|p| p.ty == NodeType::Stage) {
self.add_node(&perf_element.fqname, weight_function.weight(perf_element));
}

let return_path_map: BTreeMap<_, _> = final_state
.completed_stages()
.flat_map(|f| {
Expand All @@ -92,17 +135,8 @@ impl CriticalPathBuilder {
})
.collect();

let mut builder = Self::default();

for perf_element in perf.0.iter().filter(|p| p.ty == NodeType::Stage) {
builder.add_node(
&perf_element.fqname,
perf_element.no_queue_wall_time_seconds(),
);
}

for stage_state in final_state.completed_stages() {
assert!(builder.stage_id_map.contains_key(&stage_state.fqname));
assert!(self.stage_id_map.contains_key(&stage_state.fqname));

for binding_info in stage_state.argument_bindings() {
match binding_info.mode {
Expand All @@ -112,18 +146,17 @@ impl CriticalPathBuilder {
.into_iter()
.filter_map(|path| return_path_map.get(&path))
{
builder.add_edge(parent, &stage_state.fqname);
self.add_edge(parent, &stage_state.fqname);
}
}
}
ArgumentMode::Reference => {
builder.add_edge(binding_info.node.as_ref().unwrap(), &stage_state.fqname);
self.add_edge(binding_info.node.as_ref().unwrap(), &stage_state.fqname);
}
}
}
}

builder
self._critical_path()
}

fn add_node(&mut self, stage_id: &str, weight: f64) {
Expand Down Expand Up @@ -153,13 +186,10 @@ impl CriticalPathBuilder {
}
}

fn critical_path(mut self) -> CriticalPath {
fn _critical_path(mut self) -> CriticalPath {
self.add_start_and_end_node();

// Create a directed graph
let graph = self.graph;

// Find topological order
let topological_order: Vec<NodeIndex> = toposort(&graph, None).unwrap();

#[derive(Default, Debug, PartialEq, Eq, PartialOrd, Ord)]
Expand Down Expand Up @@ -188,25 +218,23 @@ impl CriticalPathBuilder {
max_weights.insert(*node, path_edge);
}

println!("{:#?}", max_weights);

// Trace back the path
let mut path = Vec::new();
let mut current_node = topological_order[0];
while let Some(child) = max_weights[&current_node].child {
current_node = child;
let stage_id = &graph[current_node].stage_id;
if stage_id != CriticalPathBuilder::END_NODE {
path.push(CriticalPathNode {
path.push(CriticalPathStage {
id: stage_id.clone(),
name: stage_id.split('.').last().unwrap().to_string(),
no_queue_wall_time_seconds: graph[current_node].weight.0,
weight: graph[current_node].weight.0,
});
}
}

CriticalPath {
total_time_seconds: max_weights[&topological_order[0]].weight.0,
total_weight: max_weights[&topological_order[0]].weight.0,
path,
}
}
Expand All @@ -219,22 +247,17 @@ mod tests {
use crate::PipestanceFile;
use anyhow::Result;

fn path_builder() -> Result<CriticalPathBuilder> {
#[test]
fn test_critical_path() -> Result<()> {
let final_state = FinalState::from_string(read_zst("test_data/_finalstate.zst")?)?;
let perf = Perf::from_string(read_zst("test_data/_perf.zst")?)?;
let builder = CriticalPathBuilder::new(&final_state, &perf);
Ok(builder)
}

#[test]
fn test_critical_path() -> Result<()> {
let builder = path_builder()?;
// println!("{:#?}", builder.stage_info);
let critical_path =
CriticalPath::compute(&final_state, &perf, StageWeight::NoQueueWallTime);

let critical_path = builder.critical_path();
println!("{:#?}", critical_path);

Check warning on line 258 in martian-pipestance/src/critical_path.rs

View workflow job for this annotation

GitHub Actions / clippy_check

variables can be used directly in the `format!` string

warning: variables can be used directly in the `format!` string --> martian-pipestance/src/critical_path.rs:258:9 | 258 | println!("{:#?}", critical_path); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#uninlined_format_args = note: requested on the command line with `-W clippy::uninlined-format-args` help: change this to | 258 - println!("{:#?}", critical_path); 258 + println!("{critical_path:#?}"); |

let max_weight = critical_path.total_time_seconds;
let max_weight = critical_path.total_weight;
let stages: Vec<_> = critical_path.path.into_iter().map(|p| p.name).collect();
assert!((max_weight - 3263.950373765).abs() / max_weight <= 1e-6);
assert_eq!(
Expand Down Expand Up @@ -265,4 +288,18 @@ mod tests {
);
Ok(())
}

#[test]
fn test_critical_path_custom() -> Result<()> {
let final_state = FinalState::from_string(read_zst("test_data/_finalstate.zst")?)?;
let perf = Perf::from_string(read_zst("test_data/_perf.zst")?)?;

let critical_path = CriticalPath::compute(
&final_state,
&perf,
StageWeight::CustomWeight(Box::new(|p| p.observed_wall_time())),
);
println!("{:#?}", critical_path);
Ok(())
}
}
9 changes: 9 additions & 0 deletions martian-pipestance/src/perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ impl PerfElement {
.reduce(f64::max)
.unwrap_or(0.0)
}

/// Actual wall time of the slowest fork
pub fn observed_wall_time(&self) -> f64 {
self.forks
.iter()
.map(|f| f.fork_stats.walltime)
.reduce(f64::max)
.unwrap_or(0.0)
}
}

/// Core type of a `_perf` file. Contains all information about a stage execution.
Expand Down

0 comments on commit 662e923

Please sign in to comment.