Skip to content

Commit

Permalink
Merge pull request #31 from butaneprotocol/sg/network-timeout
Browse files Browse the repository at this point in the history
Add timeouts to network operations
  • Loading branch information
SupernaviX authored Aug 13, 2024
2 parents c92e698 + f5e0ec3 commit 53d2c9b
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 35 deletions.
11 changes: 6 additions & 5 deletions config.base.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
api_port: 8080 # Port to serve API requests
network_port: 31415 # Port to handle traffic from other nodes
health_port: 18000 # Port to report our health status
heartbeat_ms: 500 # How long should the leader wait before sending a heartbeat
timeout_ms: 2000 # How long should we wait before picking a new leader
api_port: 8080 # Port to serve API requests
network_port: 31415 # Port to handle traffic from other nodes
health_port: 18000 # Port to report our health status
heartbeat_ms: 500 # How long should the leader wait before sending a heartbeat
timeout_ms: 2000 # How long should we wait before picking a new leader
network_timeout_ms: 30000 # How long to allow TCP requests to run before timing out
consensus: true
logs:
json: true
Expand Down
3 changes: 3 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ struct RawOracleConfig {
pub network_port: u16,
pub health_port: u16,
pub api_port: u16,
pub network_timeout_ms: u64,
pub consensus: bool,
pub peers: Vec<RawPeerConfig>,
pub heartbeat_ms: u64,
Expand Down Expand Up @@ -119,6 +120,7 @@ impl TryFrom<RawOracleConfig> for OracleConfig {
id: id.clone(),
private_key,
peers,
timeout: Duration::from_millis(raw.network_timeout_ms),
};

let logs = LogConfig {
Expand Down Expand Up @@ -157,6 +159,7 @@ pub struct NetworkConfig {
pub private_key: SigningKey,
pub port: u16,
pub peers: Vec<Peer>,
pub timeout: Duration,
}

#[derive(Deserialize)]
Expand Down
108 changes: 78 additions & 30 deletions src/network/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tokio::{
select,
sync::{mpsc, watch, Mutex},
task::JoinSet,
time::{sleep, timeout},
time,
};
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use tracing::{debug, error, info, info_span, trace, warn, Instrument, Level};
Expand All @@ -40,9 +40,6 @@ use crate::{
};

use super::{Message as AppMessage, NodeId};
type EncodeSink = AsyncWriter<Compat<OwnedWriteHalf>>;
type DecodeStream = AsyncReader<Compat<OwnedReadHalf>>;

const ORACLE_VERSION: &str = env!("CARGO_PKG_VERSION");

#[derive(Decode, Encode, Clone, Debug)]
Expand Down Expand Up @@ -139,6 +136,51 @@ type OutgoingMessageReceiver = mpsc::Receiver<(Option<NodeId>, AppMessage, OtelC
type IncomingMessageSender = mpsc::Sender<(NodeId, AppMessage, OtelContext)>;
type MessageAndContext = (AppMessage, OtelContext);

fn wrap_stream(stream: TcpStream, timeout: Duration) -> (DecodeStream, EncodeSink) {
let (reader, writer) = stream.into_split();
let stream = DecodeStream::new(reader, timeout);
let sink = EncodeSink::new(writer, timeout);
(stream, sink)
}

#[derive(Debug)]
struct EncodeSink(AsyncWriter<Compat<OwnedWriteHalf>>, Duration);
impl EncodeSink {
fn new(writer: OwnedWriteHalf, timeout: Duration) -> Self {
Self(AsyncWriter::new(writer.compat_write()), timeout)
}
async fn write(&mut self, val: Message) -> Result<()> {
self.write_with_timeout(self.1, val).await
}

async fn write_with_timeout(&mut self, timeout: Duration, val: Message) -> Result<()> {
time::timeout(timeout, self.0.write(val))
.await
.context("write timed out")?
.context("write failed")?;
Ok(())
}
}

#[derive(Debug)]
struct DecodeStream(AsyncReader<Compat<OwnedReadHalf>>, Duration);
impl DecodeStream {
fn new(reader: OwnedReadHalf, timeout: Duration) -> Self {
Self(AsyncReader::new(reader.compat()), timeout)
}
async fn read(&mut self) -> Result<Option<Message>> {
self.read_with_timeout(self.1).await
}

async fn read_with_timeout(&mut self, timeout: Duration) -> Result<Option<Message>> {
let res = time::timeout(timeout, self.0.read())
.await
.context("read timed out")?
.context("read failed")?;
Ok(res)
}
}

#[derive(Clone)]
pub struct Core {
id: NodeId,
Expand All @@ -148,6 +190,7 @@ pub struct Core {
health_sink: Arc<HealthSink>,
outgoing_rx: Arc<Mutex<OutgoingMessageReceiver>>,
incoming_tx: Arc<IncomingMessageSender>,
timeout: Duration,
}

impl Core {
Expand All @@ -171,6 +214,7 @@ impl Core {
health_sink: Arc::new(health_sink),
outgoing_rx: Arc::new(Mutex::new(outgoing_rx)),
incoming_tx: Arc::new(incoming_tx),
timeout: config.timeout,
}
}

Expand Down Expand Up @@ -277,12 +321,22 @@ impl Core {
info!("Listening on: {}", addr);

let listener = TcpListener::bind(addr).await.unwrap();
while let Ok((stream, _)) = listener.accept().await {
let core = self.clone();
let rxs = outgoing_message_rxs.clone();
tokio::spawn(async move {
core.handle_incoming_connection(stream, rxs).await;
});
loop {
match listener.accept().await {
Ok((stream, _)) => {
let core = self.clone();
let rxs = outgoing_message_rxs.clone();
tokio::spawn(async move {
core.handle_incoming_connection(stream, rxs).await;
});
}
Err(error) => {
let span = info_span!("failed_incoming_connection");
span.in_scope(|| {
warn!("Error listening for new connections: {}", error);
});
}
};
}
}

Expand All @@ -301,9 +355,7 @@ impl Core {

let mut them = format!("<unknown> ({})", stream.peer_addr().unwrap());

let (read, write) = stream.into_split();
let mut stream: DecodeStream = DecodeStream::new(read.compat());
let mut sink: EncodeSink = EncodeSink::new(write.compat_write());
let (mut stream, mut sink) = wrap_stream(stream, self.timeout);

let mut peer_id = None;
let (peer, peer_version, secret) = match self
Expand Down Expand Up @@ -450,18 +502,15 @@ impl Core {
Ok(stream) => stream,
Err(error) => {
self.report_unhealthy_connection(&peer.id, &format!("{:#}", error));
sleep(Duration::from_secs(sleep_seconds)).await;
time::sleep(Duration::from_secs(sleep_seconds)).await;
if sleep_seconds < 8 {
sleep_seconds *= 2;
}
continue;
}
};

let (read, write) = stream.into_split();

let mut stream = DecodeStream::new(read.compat());
let mut sink = EncodeSink::new(write.compat_write());
let (mut stream, mut sink) = wrap_stream(stream, self.timeout);

let (peer_version, secret) = match self
.handshake_outgoing(&peer, &mut stream, &mut sink)
Expand All @@ -472,7 +521,7 @@ impl Core {
Err(error) => {
self.report_unhealthy_connection(&peer.id, &format!("{:#}", error));
try_send_disconnect(&them, &mut sink, format!("{:#}", error)).await;
sleep(Duration::from_secs(sleep_seconds)).await;
time::sleep(Duration::from_secs(sleep_seconds)).await;
if sleep_seconds < 8 {
sleep_seconds *= 2;
}
Expand All @@ -498,8 +547,9 @@ impl Core {
async fn open_connection(&self, peer: &Peer) -> Result<TcpStream> {
let them = peer.label.clone();
debug!(them, "Attempting to connect to {}", peer.id);
let stream = TcpStream::connect(&peer.address)
let stream = time::timeout(self.timeout, TcpStream::connect(&peer.address))
.await
.context("timeout opening connection")?
.context("error opening connection")?;

debug!(
Expand Down Expand Up @@ -793,7 +843,7 @@ async fn handle_ping(
};
trace!(them, ping_id, "ping sent");

match timeout(PING_TIMEOUT, pong_source.recv()).await {
match time::timeout(PING_TIMEOUT, pong_source.recv()).await {
Err(timeout) => return format!("could not receive ping response: {}", timeout),
Ok(None) => return "could not receive ping response: stream was closed".into(),
Ok(Some(pong_id)) => {
Expand All @@ -805,20 +855,18 @@ async fn handle_ping(
}
}

sleep(PING_TIMEOUT).await;
time::sleep(PING_TIMEOUT).await;
}
}

#[tracing::instrument]
pub(super) async fn try_send_disconnect(them: &str, sink: &mut EncodeSink, reason: String) {
match timeout(
Duration::from_secs(3),
sink.write(Message::Disconnect(reason)),
)
.await
match sink
.write_with_timeout(Duration::from_secs(3), Message::Disconnect(reason))
.await
.context("could not send disconnect message")
{
Err(timeout) => warn!(them, "could not send disconnect message: {}", timeout),
Ok(Err(send)) => warn!(them, "could not send disconnect message: {}", send),
Ok(Ok(_)) => {}
Ok(()) => {}
Err(error) => warn!(them, "{:#}", error),
}
}

0 comments on commit 53d2c9b

Please sign in to comment.