Skip to content

Commit

Permalink
feat: Serializer pushes latest data first if configured to do so (#365
Browse files Browse the repository at this point in the history
)

* feat: Serializer pushes latest data first if configured

* doc: example config explainig usecase

* feat: allow option per-stream

* style: don't warn, rm unnecessary handling
  • Loading branch information
de-sh authored Oct 14, 2024
1 parent 678a616 commit ae325b5
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 15 deletions.
13 changes: 12 additions & 1 deletion configs/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ default_buf_size = 1024 # 1KB
# Maximum number of data streams that can be accepted by uplink
max_stream_count = 10

# All streams will first push the latest packet before pushing historical data in
# FIFO order, defaults to false. This solves the problem of bad networks leading to
# data being pushed so slow that it is practically impossible to track the device.
default_live_data_first = true

# MQTT client configuration
#
# Required Parameters
Expand Down Expand Up @@ -84,13 +89,19 @@ blacklist = ["cancollector_metrics", "candump_metrics", "pinger"]
# used when there is a network/system failure.
# - priority(optional, u8): Higher prioirity streams get to push their data
# onto the network first.
# - live_data_first(optional, bool): All streams will first push the latest packet
# before pushing historical data in FIFO order, defaults to false. This solves the
# problem of bad networks leading to data being pushed so slow that it is practically
# impossible to track the device.
#
# In the following config for the device_shadow stream we set batch_size to 1 and mark
# it as non-persistent. streams are internally constructed as a map of Name -> Config
# it as non-persistent, also setting up live_data_first to enable quick delivery of stats.
# Streams are internally constructed as a map of Name -> Config
[streams.device_shadow]
topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray"
flush_period = 5
priority = 75
live_data_first = true

# Example using compression
[streams.imu]
Expand Down
55 changes: 45 additions & 10 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{sync::Arc, time::Duration};

use bytes::Bytes;
use flume::{bounded, Receiver, RecvError, Sender, TrySendError};
use log::{debug, error, info, trace};
use log::{debug, error, info, trace, warn};
use lz4_flex::frame::FrameEncoder;
use rumqttc::*;
use thiserror::Error;
Expand Down Expand Up @@ -133,11 +133,17 @@ impl MqttClient for AsyncClient {

pub struct Storage {
inner: storage::Storage,
live_data_first: bool,
latest_data: Option<Publish>,
}

impl Storage {
pub fn new(name: impl Into<String>, max_file_size: usize) -> Self {
Self { inner: storage::Storage::new(name, max_file_size) }
pub fn new(name: impl Into<String>, max_file_size: usize, live_data_first: bool) -> Self {
Self {
inner: storage::Storage::new(name, max_file_size),
live_data_first,
latest_data: None,
}
}

pub fn set_persistence(
Expand All @@ -151,6 +157,13 @@ impl Storage {
// Stores the provided publish packet by serializing it into storage, after setting its pkid to 1.
// If the write buffer is full, it is flushed/written onto disk based on config.
pub fn write(&mut self, mut publish: Publish) -> Result<Option<u64>, storage::Error> {
if self.live_data_first {
let Some(previous) = self.latest_data.replace(publish) else { return Ok(None) };
publish = previous;
} else if self.latest_data.is_some() {
warn!("Latest data should be unoccupied if not using the live data first scheme");
}

publish.pkid = 1;
if let Err(e) = publish.write(self.inner.writer()) {
error!("Failed to fill disk buffer. Error = {e}");
Expand All @@ -170,6 +183,10 @@ impl Storage {
// ## Panic
// When any packet other than a publish is deserialized.
pub fn read(&mut self, max_packet_size: usize) -> Option<Publish> {
if let Some(publish) = self.latest_data.take() {
return Some(publish);
}

// TODO(RT): This can fail when packet sizes > max_payload_size in config are written to disk.
// This leads to force switching to normal mode. Increasing max_payload_size to bypass this
match Packet::read(self.inner.reader(), max_packet_size) {
Expand All @@ -184,6 +201,15 @@ impl Storage {

// Ensures all data is written into persistence, when configured.
pub fn flush(&mut self) -> Result<Option<u64>, storage::Error> {
// Write live cache to disk when flushing
if let Some(mut publish) = self.latest_data.take() {
publish.pkid = 1;
if let Err(e) = publish.write(self.inner.writer()) {
error!("Failed to fill disk buffer. Error = {e}");
return Ok(None);
}
}

self.inner.flush()
}
}
Expand All @@ -202,8 +228,11 @@ impl StorageHandler {
// NOTE: persist action_status if not configured otherwise
streams.insert("action_status".into(), config.action_status.clone());
for (stream_name, stream_config) in streams {
let mut storage =
Storage::new(&stream_config.topic, stream_config.persistence.max_file_size);
let mut storage = Storage::new(
&stream_config.topic,
stream_config.persistence.max_file_size,
stream_config.live_data_first,
);
if stream_config.persistence.max_file_count > 0 {
let mut path = config.persistence_path.clone();
path.push(&stream_name);
Expand All @@ -229,7 +258,13 @@ impl StorageHandler {
match self
.map
.entry(stream.to_owned())
.or_insert_with(|| Storage::new(&stream.topic, self.config.default_buf_size))
.or_insert_with(|| {
Storage::new(
&stream.topic,
self.config.default_buf_size,
self.config.default_live_data_first,
)
})
.write(publish)
{
Ok(Some(deleted)) => {
Expand Down Expand Up @@ -907,7 +942,7 @@ pub mod tests {
fn read_write_storage() {
let config = Arc::new(default_config());

let mut storage = Storage::new("hello/world", 1024);
let mut storage = Storage::new("hello/world", 1024, false);
let mut publish = Publish::new(
"hello/world",
QoS::AtLeastOnce,
Expand Down Expand Up @@ -1024,7 +1059,7 @@ pub mod tests {
.storage_handler
.map
.entry(Arc::new(Default::default()))
.or_insert(Storage::new("hello/world", 1024));
.or_insert(Storage::new("hello/world", 1024, false));

let (stream_name, stream_config) = (
"hello",
Expand Down Expand Up @@ -1082,7 +1117,7 @@ pub mod tests {
topic: "hello/world".to_string(),
..Default::default()
}))
.or_insert(Storage::new("hello/world", 1024));
.or_insert(Storage::new("hello/world", 1024, false));

let (stream_name, stream_config) = (
"hello",
Expand Down Expand Up @@ -1194,7 +1229,7 @@ pub mod tests {
priority: 0,
..Default::default()
}))
.or_insert(Storage::new("topic/default", 1024));
.or_insert(Storage::new("topic/default", 1024, false));
default.write(publish("topic/default".to_string(), 0)).unwrap();
default.write(publish("topic/default".to_string(), 2)).unwrap();

Expand Down
5 changes: 5 additions & 0 deletions uplink/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ pub struct StreamConfig {
pub persistence: Persistence,
#[serde(default)]
pub priority: u8,
#[serde(default)]
pub live_data_first: bool,
}

impl Default for StreamConfig {
Expand All @@ -104,6 +106,7 @@ impl Default for StreamConfig {
compression: Compression::Disabled,
persistence: Persistence::default(),
priority: 0,
live_data_first: false,
}
}
}
Expand Down Expand Up @@ -536,6 +539,8 @@ pub struct Config {
pub logging: Option<LogcatConfig>,
pub precondition_checks: Option<PreconditionCheckerConfig>,
pub bus: Option<BusConfig>,
#[serde(default)]
pub default_live_data_first: bool,
}

impl Config {
Expand Down
60 changes: 56 additions & 4 deletions uplink/tests/serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bytes::Bytes;
use flume::bounded;
use rumqttc::{Publish, QoS, Request};
use tempdir::TempDir;
use tokio::{runtime::Runtime, spawn};
use tokio::{runtime::Runtime, spawn, time::sleep};

use uplink::{
base::{bridge::Payload, serializer::Serializer},
Expand Down Expand Up @@ -83,18 +83,18 @@ async fn preferential_send_on_network() {
};

// write packets for one, two and top onto disk
let mut one = Storage::new("topic/one", 1024 * 1024);
let mut one = Storage::new("topic/one", 1024 * 1024, false);
one.set_persistence(persistence_path(&config.persistence_path, "one"), 1).unwrap();
one.write(publish("topic/one".to_string(), 4)).unwrap();
one.write(publish("topic/one".to_string(), 5)).unwrap();
one.flush().unwrap();

let mut two = Storage::new("topic/two", 1024 * 1024);
let mut two = Storage::new("topic/two", 1024 * 1024, false);
two.set_persistence(persistence_path(&config.persistence_path, "two"), 1).unwrap();
two.write(publish("topic/two".to_string(), 3)).unwrap();
two.flush().unwrap();

let mut top = Storage::new("topic/top", 1024 * 1024);
let mut top = Storage::new("topic/top", 1024 * 1024, false);
top.set_persistence(persistence_path(&config.persistence_path, "top"), 1).unwrap();
top.write(publish("topic/top".to_string(), 1)).unwrap();
top.write(publish("topic/top".to_string(), 2)).unwrap();
Expand Down Expand Up @@ -216,3 +216,55 @@ async fn fifo_data_push() {
assert_eq!(topic, "topic/default");
assert_eq!(payload, "[{\"sequence\":2,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");
}

#[tokio::test]
// Ensures that live data is pushed first if configured to do so
async fn prefer_live_data() {
let mut config = Config::default();
config.default_buf_size = 1024 * 1024;
config.mqtt.max_packet_size = 1024 * 1024;
config.default_live_data_first = true;
let config = Arc::new(config);
let (data_tx, data_rx) = bounded(0);
let (net_tx, req_rx) = bounded(0);
let (metrics_tx, _metrics_rx) = bounded(1);
let client = MockClient { net_tx };
let serializer = Serializer::new(config, data_rx, client, metrics_tx).unwrap();

// start serializer in the background
thread::spawn(|| _ = Runtime::new().unwrap().block_on(serializer.start()));

spawn(async {
let mut default = MockCollector::new(
"default",
StreamConfig { topic: "topic/default".to_owned(), batch_size: 1, ..Default::default() },
data_tx,
);
for i in 0.. {
default.send(i).await.unwrap();
sleep(Duration::from_millis(250)).await;
}
});

sleep(Duration::from_millis(750)).await;
let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/default");
assert_eq!(payload, "[{\"sequence\":0,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");

let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/default");
assert_eq!(payload, "[{\"sequence\":2,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");

let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap()
else {
unreachable!()
};
assert_eq!(topic, "topic/default");
assert_eq!(payload, "[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]");
}

0 comments on commit ae325b5

Please sign in to comment.