Skip to content

Commit

Permalink
fix: swap with write buffer when no packets left to be read
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh committed Oct 13, 2024
1 parent 9bbad51 commit 52c8da8
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 40 deletions.
70 changes: 31 additions & 39 deletions storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ pub struct Storage {
current_write_file: BytesMut,
/// current_read_file
current_read_file: BytesMut,
/// Marked true only if current_read_file buffer is loaded from file
is_read_from_file: bool,
/// disk persistence
persistence: Option<Persistence>,
}
Expand All @@ -43,7 +41,6 @@ impl Storage {
max_file_size,
current_write_file: BytesMut::with_capacity(max_file_size * 2),
current_read_file: BytesMut::with_capacity(max_file_size * 2),
is_read_from_file: false,
persistence: None,
}
}
Expand Down Expand Up @@ -104,6 +101,13 @@ impl Storage {

/// Force flush the contents of write buffer onto disk
pub fn flush(&mut self) -> Result<Option<u64>, Error> {
let read_buffer_requires_flushing =
!self.persistence.as_ref().is_some_and(|p| p.current_read_file_id.is_some())
&& !self.current_read_file.is_empty();
if self.current_write_file.is_empty() && !read_buffer_requires_flushing {
return Err(Error::NoWrites);
}

let Some(persistence) = &mut self.persistence else {
// TODO(RT): Make sure that disk files starts with id 1 to represent in memory file
// with id 0
Expand All @@ -115,7 +119,7 @@ impl Storage {
return Ok(Some(0));
};

if !self.is_read_from_file && !self.current_read_file.is_empty() {
if read_buffer_requires_flushing {
let NextFile { mut file, deleted } = persistence.open_next_write_file()?;
if let Some(id) = deleted {
error!("Deleted file while flushing to disk: {id}");
Expand All @@ -124,9 +128,6 @@ impl Storage {
file.write(&mut self.current_read_file)?;
}

if self.current_write_file.is_empty() {
return Err(Error::NoWrites);
}
let NextFile { mut file, deleted } = persistence.open_next_write_file()?;
info!("Flushing data to disk for stoarge: {}; path = {:?}", self.name, file.path());
file.write(&mut self.current_write_file)?;
Expand All @@ -149,43 +150,34 @@ impl Storage {
return Ok(());
}

let Some(persistence) = &mut self.persistence else {
mem::swap(&mut self.current_read_file, &mut self.current_write_file);
// If read buffer is 0 after swapping, all the data is caught up
if self.current_read_file.is_empty() {
return Err(Error::Done);
if let Some(persistence) = &mut self.persistence {
// Remove read file on completion in destructive-read mode
if let Some(id) =
persistence.current_read_file_id.take_if(|_| !persistence.non_destructive_read)
{
let deleted_file = persistence.remove(id)?;
debug!("Completed reading a persistence file, deleting it; storage = {}, path = {deleted_file:?}", self.name);
}

return Ok(());
};

// Remove read file on completion in destructive-read mode
let read_is_destructive = !persistence.non_destructive_read;
let read_file_id = persistence.current_read_file_id.take();
if let Some(id) = read_is_destructive.then_some(read_file_id).flatten() {
let deleted_file = persistence.remove(id)?;
debug!("Completed reading a persistence file, deleting it; storage = {}, path = {deleted_file:?}", self.name);
}

// Swap read buffer with write buffer to read data in inmemory write
// buffer when all the backlog disk files are done
if persistence.backlog_files.is_empty() {
self.is_read_from_file = false;
mem::swap(&mut self.current_read_file, &mut self.current_write_file);
// If read buffer is 0 after swapping, all the data is caught up
if self.current_read_file.is_empty() {
return Err(Error::Done);
match persistence.load_next_read_file(&mut self.current_read_file) {
Err(Error::Done) => {}
Err(e) => {
error!("Couldn't read persisted file: {e}");
return Err(e);
}
_ => return Ok(()),
}
};

return Ok(());
}

if let Err(e) = persistence.load_next_read_file(&mut self.current_read_file) {
self.current_read_file.clear();
persistence.current_read_file_id.take();
return Err(e);
// Swap read buffer with write buffer to read data from inmemory write
// buffer when all the backlog disk files are done.
mem::swap(&mut self.current_read_file, &mut self.current_write_file);
// Write buffer is emptied to ensure fresh start.
self.current_write_file.clear();
// If read buffer is 0 after swapping, all the data is caught up
if self.current_read_file.is_empty() {
return Err(Error::Done);
}
self.is_read_from_file = true;

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ impl StorageHandler {

return Some((stream.to_owned(), publish));
}
// Reset read_stream if it is empty
// All packets read from storage
Err(storage::Error::Done) => {
if self.read_stream.take_if(|s| s == stream).is_some() {
debug!("Done reading from: {}", stream.topic);
Expand Down

0 comments on commit 52c8da8

Please sign in to comment.