Skip to content

Commit

Permalink
fix: Should not panic if source sender errors out because of channel …
Browse files Browse the repository at this point in the history
…error (#1864)

* fix: Should not panic if source sender errors out because of channel error

* fix: Remove `unwrap`s in `ObjectStoreConnector`
  • Loading branch information
chubei authored Aug 17, 2023
1 parent 2dcb6e0 commit dd5c7a5
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 23 deletions.
11 changes: 8 additions & 3 deletions dozer-core/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,14 @@ fn start_source(
Ok(_) => {}
// Channel disconnection means the source listener has quit.
// Maybe it quit gracefully so we don't need to panic.
Err(ExecutionError::CannotSendToChannel) => {}
// Other errors result in panic.
Err(e) => std::panic::panic_any(e),
Err(e) => {
if let ExecutionError::Source(e) = &e {
if let Some(ExecutionError::CannotSendToChannel) = e.downcast_ref() {
return;
}
}
std::panic::panic_any(e);
}
})
.map_err(ExecutionError::CannotSpawnWorkerThread)?;

Expand Down
28 changes: 10 additions & 18 deletions dozer-ingestion/src/connectors/object_store/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,22 +119,14 @@ impl<T: DozerObjectStore> Connector for ObjectStoreConnector<T> {
}
Some(evt) => {
match evt {
IngestionMessageKind::SnapshottingStarted => {
ingestor_clone
.handle_message(IngestionMessage::new_snapshotting_started(
0, seq_no,
))
.map_err(ConnectorError::IngestorError)
.unwrap();
}
IngestionMessageKind::SnapshottingDone => {
ingestor_clone
.handle_message(IngestionMessage::new_snapshotting_done(
0, seq_no,
))
.map_err(ConnectorError::IngestorError)
.unwrap();
}
IngestionMessageKind::SnapshottingStarted => ingestor_clone
.handle_message(IngestionMessage::new_snapshotting_started(
0, seq_no,
))
.map_err(ConnectorError::IngestorError)?,
IngestionMessageKind::SnapshottingDone => ingestor_clone
.handle_message(IngestionMessage::new_snapshotting_done(0, seq_no))
.map_err(ConnectorError::IngestorError)?,
IngestionMessageKind::OperationEvent { table_index, op } => {
ingestor_clone
.handle_message(IngestionMessage::new_op(
Expand All @@ -143,14 +135,14 @@ impl<T: DozerObjectStore> Connector for ObjectStoreConnector<T> {
table_index,
op,
))
.map_err(ConnectorError::IngestorError)
.unwrap();
.map_err(ConnectorError::IngestorError)?
}
}
seq_no += 1;
}
}
}
Ok::<_, ConnectorError>(())
});

// sender sending out message for pipeline
Expand Down
7 changes: 5 additions & 2 deletions dozer-ingestion/src/connectors/object_store/table_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,16 @@ impl<T: Clone + Send + Sync> TableReader<T> {
},
};

sender
if sender
.send(Ok(Some(IngestionMessageKind::OperationEvent {
table_index,
op: evt,
})))
.await
.unwrap();
.is_err()
{
break;
}
}
}

Expand Down

0 comments on commit dd5c7a5

Please sign in to comment.