diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs
index 6c3b914c..55752ebc 100644
--- a/uplink/src/base/serializer/mod.rs
+++ b/uplink/src/base/serializer/mod.rs
@@ -381,23 +381,7 @@ impl Serializer {
             select! {
                 data = self.collector_rx.recv_async() => {
                     let data = data?;
-                    let stream = data.stream_config();
-                    let publish = construct_publish(data, &mut self.stream_metrics)?;
-                    let storage = self.storage_handler.select(&stream);
-                    match write_to_storage(publish, storage) {
-                        Ok(Some(deleted)) => {
-                            debug!("Lost segment = {deleted}");
-                            self.metrics.increment_lost_segments();
-                        }
-                        Ok(_) => {},
-                        Err(e) => {
-                            error!("Storage write error = {e}");
-                            self.metrics.increment_errors();
-                        }
-                    };
-
-                    // Update metrics
-                    self.metrics.add_batch();
+                    store_received_data(data, &mut self.storage_handler, &mut self.stream_metrics, &mut self.metrics)?;
                 }
                 o = &mut publish => match o {
                     Ok(_) => break Ok(Status::EventLoopReady),
@@ -438,56 +422,21 @@ impl Serializer {
         self.metrics.set_mode("catchup");

         let max_packet_size = self.config.mqtt.max_packet_size;
-        let client = self.client.clone();
-
-        let Some((stream, storage)) = self.storage_handler.next(&mut self.metrics) else {
+        let Some((mut last_publish_stream, publish)) =
+            next_publish(&mut self.storage_handler, &mut self.metrics, max_packet_size)?
+        else {
             return Ok(Status::Normal);
         };
-
-        // TODO(RT): This can fail when packet sizes > max_payload_size in config are written to disk.
-        // This leads to force switching to normal mode. Increasing max_payload_size to bypass this
-        let publish = match Packet::read(storage.reader(), max_packet_size) {
-            Ok(Packet::Publish(publish)) => publish,
-            Ok(packet) => unreachable!("Unexpected packet: {:?}", packet),
-            Err(e) => {
-                self.metrics.increment_errors();
-                error!("Failed to read from storage. Forcing into Normal mode. Error = {e}");
-                save_and_prepare_next_metrics(
-                    &mut self.pending_metrics,
-                    &mut self.metrics,
-                    &mut self.stream_metrics,
-                    &self.storage_handler,
-                );
-                return Ok(Status::Normal);
-            }
-        };
-
         let mut last_publish_payload_size = publish.payload.len();
-        let mut last_publish_stream = stream.clone();
-        let send = send_publish(client, publish.topic, publish.payload);
+        let send = send_publish(self.client.clone(), publish.topic, publish.payload);
         tokio::pin!(send);

         let v: Result<Status, Error> = loop {
             select! {
                 data = self.collector_rx.recv_async() => {
                     let data = data?;
-                    let stream = data.stream_config();
-                    let publish = construct_publish(data, &mut self.stream_metrics)?;
-                    let storage = self.storage_handler.select(&stream);
-                    match write_to_storage(publish, storage) {
-                        Ok(Some(deleted)) => {
-                            debug!("Lost segment = {deleted}");
-                            self.metrics.increment_lost_segments();
-                        }
-                        Ok(_) => {},
-                        Err(e) => {
-                            error!("Storage write error = {e}");
-                            self.metrics.increment_errors();
-                        }
-                    };
-
-                    // Update metrics
-                    self.metrics.add_batch();
+                    store_received_data(data, &mut self.storage_handler, &mut self.stream_metrics, &mut self.metrics)?;
                 }
                 o = &mut send => {
                     self.metrics.add_sent_size(last_publish_payload_size);
@@ -495,25 +444,16 @@ impl Serializer {
                     // indefinitely write to disk to not loose data
                     let client = match o {
                         Ok(c) => c,
-                        Err(MqttError::Send(Request::Publish(publish))) => break Ok(Status::EventLoopCrash(publish, last_publish_stream.clone())),
-                        Err(e) => unreachable!("Unexpected error: {e}"),
-                    };
-
-                    let Some((stream, storage)) = self.storage_handler.next(&mut self.metrics) else {
-                        return Ok(Status::Normal);
-                    };
-
-                    let publish = match Packet::read(storage.reader(), max_packet_size) {
-                        Ok(Packet::Publish(publish)) => publish,
-                        Ok(packet) => unreachable!("Unexpected packet: {:?}", packet),
-                        Err(e) => {
-                            error!("Failed to read from storage. Forcing into Normal mode. Error = {e}");
-                            break Ok(Status::Normal)
+                        Err(MqttError::Send(Request::Publish(publish))) => {
+                            break Ok(Status::EventLoopCrash(publish, last_publish_stream.clone()))
                         }
+                        Err(e) => unreachable!("Unexpected error: {e}"),
                     };
-                    self.metrics.add_batch();
-
+                    let Some((stream, publish)) = next_publish(&mut self.storage_handler, &mut self.metrics, max_packet_size)?
+                    else {
+                        return Ok(Status::Normal);
+                    };
                     let payload = publish.payload;
                     last_publish_payload_size = payload.len();
                     last_publish_stream = stream.clone();
@@ -606,6 +546,61 @@ impl Serializer {
     }
 }

+// Selects the right read buffer for storage and serializes received data as a Publish packet into it.
+// Updates metrics regarding the serializer as well.
+fn store_received_data(
+    data: Box<dyn Package>,
+    storage_handler: &mut StorageHandler,
+    stream_metrics: &mut HashMap<String, StreamMetrics>,
+    metrics: &mut Metrics,
+) -> Result<(), Error> {
+    let stream = data.stream_config();
+    let publish = construct_publish(data, stream_metrics)?;
+    let storage = storage_handler.select(&stream);
+    match write_to_storage(publish, storage) {
+        Ok(Some(deleted)) => {
+            debug!("Lost segment = {deleted}");
+            metrics.increment_lost_segments();
+        }
+        Ok(_) => {}
+        Err(e) => {
+            error!("Storage write error = {e}");
+            metrics.increment_errors();
+        }
+    };
+
+    // Update metrics
+    metrics.add_batch();
+
+    Ok(())
+}
+
+// Deserializes a Publish packet from the storage read buffer and updates metrics regarding the serializer.
+fn next_publish(
+    storage_handler: &mut StorageHandler,
+    metrics: &mut Metrics,
+    max_packet_size: usize,
+) -> Result<Option<(Arc<StreamConfig>, Publish)>, Error> {
+    let Some((stream, storage)) = storage_handler.next(metrics) else {
+        return Ok(None);
+    };
+
+    // TODO(RT): This can fail when packet sizes > max_payload_size in config are written to disk.
+    // This leads to force switching to normal mode. Increasing max_payload_size to bypass this
+    let publish = match Packet::read(storage.reader(), max_packet_size) {
+        Ok(Packet::Publish(publish)) => publish,
+        Ok(packet) => unreachable!("Unexpected packet: {:?}", packet),
+        Err(e) => {
+            error!("Failed to read from storage. Forcing into Normal mode. Error = {e}");
+            return Ok(None);
+        }
+    };
+
+    metrics.add_batch();
+
+    Ok(Some((stream.clone(), publish)))
+}
+
 async fn send_publish<C: MqttClient>(
     client: C,
     topic: String,