Skip to content

Commit

Permalink
Error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
ameba23 committed Oct 30, 2024
1 parent 0221607 commit 08dafa8
Showing 1 changed file with 39 additions and 13 deletions.
52 changes: 39 additions & 13 deletions crates/protocol/src/execute_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,31 +104,57 @@ where
let artifact_tx = artifact_tx.clone();
let destination = destination.clone();
tokio::spawn(async move {
let (message, artifact) =
session_arc.make_message(&mut OsRng, &destination).unwrap();
tx.send(ProtocolMessage::new(&my_id, &destination, message)).unwrap();
artifact_tx.send(artifact).await.unwrap();
let result = match session_arc.make_message(&mut OsRng, &destination) {
Ok((message, artifact)) => {
match tx.send(ProtocolMessage::new(&my_id, &destination, message)) {
Ok(_) => Ok(artifact),
Err(err) => {
let err: GenericProtocolError<Res> = err.into();
Err(err)
},
}
},
Err(err) => Err(err.into()),
};
if artifact_tx.send(result).await.is_err() {
tracing::error!("Protocol finished before outgoing message artifact sent");
}
});
}

for _ in 0..destinations.len() {
if let Some(artifact) = artifact_rx.recv().await {
accum.add_artifact(artifact)?;
if let Some(result) = artifact_rx.recv().await {
accum.add_artifact(result?)?;
}
}
}

for preprocessed in cached_messages {
// TODO (#641): this may happen in a spawned task.
let processed = session_arc.process_message(&mut OsRng, preprocessed)?;
// Process cached messages
if !cached_messages.is_empty() {
let cached_messages_len = cached_messages.len();
let (process_tx, mut process_rx) = mpsc::channel(cached_messages_len);
for preprocessed in cached_messages {
let session_arc = session_arc.clone();
let process_tx = process_tx.clone();
tokio::spawn(async move {
let result = session_arc.process_message(&mut OsRng, preprocessed);
if process_tx.send(result).await.is_err() {
tracing::error!(
"Protocol finished before cached message processing result sent"
);
}
});
}

// This will happen in a host task.
accum.add_processed_message(processed)??;
for _ in 0..cached_messages_len {
if let Some(result) = process_rx.recv().await {
accum.add_processed_message(result?)??;
}
}
}

// Channel for receiving results of processing messages
// Receive and process incoming messages
let (process_tx, mut process_rx) = mpsc::channel(1024);

while !session_arc.can_finalize(&accum)? {
tokio::select! {
// Incoming message from remote peer
Expand Down

0 comments on commit 08dafa8

Please sign in to comment.