Skip to content

Commit

Permalink
feat: don't use SQL min
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh committed Oct 2, 2024
1 parent 73157e1 commit 2485f83
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 10 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 9 additions & 10 deletions uplink/src/collector/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,23 +63,22 @@ impl Queue {
Ok(())
}

/// Read out one from the payloads table
pub async fn peek(&mut self) -> Result<(StreamName, RawPayload), Error> {
let row = sqlx::query!("SELECT stream, raw FROM payloads ORDER BY id ASC LIMIT 1")
/// Read out one row from the payloads table, without deleting
pub async fn peek(&mut self) -> Result<(i64, StreamName, RawPayload), Error> {
let row = sqlx::query!("SELECT id, stream, raw FROM payloads ORDER BY id ASC LIMIT 1")
.fetch_one(&mut self.conn)
.await?;

let id = row.id;
let stream = row.stream;
let raw = row.raw;

Ok((stream, raw))
Ok((id, stream, raw))
}

/// Forget messages acked by the broker
pub async fn pop(&mut self) -> Result<(), Error> {
sqlx::query!("DELETE FROM payloads WHERE id = (SELECT MIN(id) FROM payloads);")
.execute(&mut self.conn)
.await?;
pub async fn pop(&mut self, id: i64) -> Result<(), Error> {
sqlx::query!("DELETE FROM payloads WHERE id = ?1;", id).execute(&mut self.conn).await?;

Ok(())
}
Expand Down Expand Up @@ -125,7 +124,7 @@ async fn push_to_broker_on_ack(
) {
'outer: loop {
let mut guard = queue.lock().await;
let (stream, text) = match guard.peek().await {
let (id, stream, text) = match guard.peek().await {
Ok(q) => q,
Err(Error::Sql(sqlx::Error::RowNotFound)) => {
debug!("Looks like event queue is handled for the time being, check again in 5s");
Expand All @@ -152,7 +151,7 @@ async fn push_to_broker_on_ack(
error!("{e}")
}
// Pop acknowledged packet
if let Err(e) = queue.lock().await.pop().await {
if let Err(e) = queue.lock().await.pop(id).await {
error!("{e}");
}
}
Expand Down

0 comments on commit 2485f83

Please sign in to comment.