Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
sreenathkrishnan committed Sep 14, 2023
1 parent dce4a33 commit 8972515
Showing 1 changed file with 36 additions and 24 deletions.
60 changes: 36 additions & 24 deletions martian-pipestance/src/critical_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,25 @@ impl CriticalPath {
}
}

#[derive(Default)]
struct GraphNode {
weight: OrderedFloat<f64>,
stage_id: String,
}

impl GraphNode {
fn new(weight: impl Into<OrderedFloat<f64>>, stage_id: impl ToString) -> Self {
GraphNode {
weight: weight.into(),
stage_id: stage_id.to_string(),
}
}
}

#[derive(Default)]
struct CriticalPathBuilder {
// stage_info: BTreeMap<StageId, StageInfo>,
graph: DiGraph<OrderedFloat<f64>, ()>,
graph: DiGraph<GraphNode, ()>,
stage_id_map: BTreeMap<String, NodeIndex>,
}

Expand Down Expand Up @@ -88,18 +103,16 @@ impl CriticalPathBuilder {

let mut builder = Self::default();

let stage_weights: BTreeMap<_, _> = perf
.0
.iter()
.filter_map(|p| {
(p.ty == NodeType::Stage).then(|| (&p.fqname, p.no_queue_wall_time_seconds()))
})
.collect();

for (&stage_id, &weight) in &stage_weights {
for perf_element in perf.0.iter().filter(|p| p.ty == NodeType::Stage) {
let stage_id = &perf_element.fqname;
assert!(!builder.stage_id_map.contains_key(stage_id));
let node_idx = builder.graph.add_node(weight.into());
builder.stage_id_map.insert(stage_id.to_string(), node_idx);
builder.stage_id_map.insert(
stage_id.to_string(),
builder.graph.add_node(GraphNode::new(
perf_element.no_queue_wall_time_seconds(),
stage_id,
)),
);
}

for stage_state in final_state.completed_stages() {
Expand Down Expand Up @@ -140,11 +153,13 @@ impl CriticalPathBuilder {

self.stage_id_map.insert(
Self::START_NODE.to_string(),
self.graph.add_node(0.0.into()),
self.graph.add_node(GraphNode::new(0.0, Self::START_NODE)),
);

self.stage_id_map
.insert(Self::END_NODE.to_string(), self.graph.add_node(0.0.into()));
self.stage_id_map.insert(
Self::END_NODE.to_string(),
self.graph.add_node(GraphNode::new(0.0, Self::END_NODE)),
);

for node in all_nodes {
self.graph.update_edge(
Expand All @@ -166,9 +181,6 @@ impl CriticalPathBuilder {
// Create a directed graph
let graph = self.graph;

let stage_id_of_node: BTreeMap<_, _> =
self.stage_id_map.iter().map(|(k, v)| (v, k)).collect();

// Find topological order
let topological_order: Vec<NodeIndex> = toposort(&graph, None).unwrap();

Expand All @@ -182,7 +194,7 @@ impl CriticalPathBuilder {
let mut max_weights: BTreeMap<NodeIndex, MaxWeight> = BTreeMap::new();

for node in topological_order.iter().rev() {
let node_weight = graph[*node];
let node_weight = graph[*node].weight;
let path_edge = graph
.neighbors_directed(*node, petgraph::Direction::Outgoing)
.map(|child| MaxWeight {
Expand All @@ -205,12 +217,12 @@ impl CriticalPathBuilder {
let mut current_node = topological_order[0];
while let Some(child) = max_weights[&current_node].child {
current_node = child;
let node_id = stage_id_of_node[&current_node];
if node_id != CriticalPathBuilder::END_NODE {
let stage_id = &graph[current_node].stage_id;
if stage_id != CriticalPathBuilder::END_NODE {
path.push(CriticalPathNode {
id: node_id.clone(),
name: node_id.split('.').last().unwrap().to_string(),
no_queue_wall_time_seconds: graph[current_node].0,
id: stage_id.clone(),
name: stage_id.split('.').last().unwrap().to_string(),
no_queue_wall_time_seconds: graph[current_node].weight.0,
});
}
}
Expand Down

0 comments on commit 8972515

Please sign in to comment.