diff --git a/.sqlx/query-2b595edd80e01c38d5792760e3c9ddfa27d5617cec346fcd252e1db7259241ec.json b/.sqlx/query-2b595edd80e01c38d5792760e3c9ddfa27d5617cec346fcd252e1db7259241ec.json new file mode 100644 index 00000000..fb779d6e --- /dev/null +++ b/.sqlx/query-2b595edd80e01c38d5792760e3c9ddfa27d5617cec346fcd252e1db7259241ec.json @@ -0,0 +1,32 @@ +{ + "db_name": "SQLite", + "query": "SELECT id, stream, raw FROM payloads ORDER BY id ASC LIMIT 1", + "describe": { + "columns": [ + { + "name": "id", + "ordinal": 0, + "type_info": "Integer" + }, + { + "name": "stream", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "raw", + "ordinal": 2, + "type_info": "Text" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "2b595edd80e01c38d5792760e3c9ddfa27d5617cec346fcd252e1db7259241ec" +} diff --git a/.sqlx/query-b452087781be1222c5f9a71a5b72b32ae853c59533b8f82903c76f2e8fc377f9.json b/.sqlx/query-b452087781be1222c5f9a71a5b72b32ae853c59533b8f82903c76f2e8fc377f9.json new file mode 100644 index 00000000..772f739b --- /dev/null +++ b/.sqlx/query-b452087781be1222c5f9a71a5b72b32ae853c59533b8f82903c76f2e8fc377f9.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "DELETE FROM payloads WHERE id = ?1;", + "describe": { + "columns": [], + "parameters": { + "Right": 1 + }, + "nullable": [] + }, + "hash": "b452087781be1222c5f9a71a5b72b32ae853c59533b8f82903c76f2e8fc377f9" +} diff --git a/uplink/src/collector/events.rs b/uplink/src/collector/events.rs index e248a4dc..bb986ff8 100644 --- a/uplink/src/collector/events.rs +++ b/uplink/src/collector/events.rs @@ -63,23 +63,22 @@ impl Queue { Ok(()) } - /// Read out one from the payloads table - pub async fn peek(&mut self) -> Result<(StreamName, RawPayload), Error> { - let row = sqlx::query!("SELECT stream, raw FROM payloads ORDER BY id ASC LIMIT 1") + /// Read out one row from the payloads table, without deleting + pub async fn peek(&mut self) -> Result<(i64, StreamName, RawPayload), Error> { + let row = sqlx::query!("SELECT id, stream, raw FROM payloads ORDER BY id ASC LIMIT 1") .fetch_one(&mut self.conn) .await?; + let id = row.id; let stream = row.stream; let raw = row.raw; - Ok((stream, raw)) + Ok((id, stream, raw)) } /// Forget messages acked by the broker - pub async fn pop(&mut self) -> Result<(), Error> { - sqlx::query!("DELETE FROM payloads WHERE id = (SELECT MIN(id) FROM payloads);") - .execute(&mut self.conn) - .await?; + pub async fn pop(&mut self, id: i64) -> Result<(), Error> { + sqlx::query!("DELETE FROM payloads WHERE id = ?1;", id).execute(&mut self.conn).await?; Ok(()) } @@ -125,7 +124,7 @@ async fn push_to_broker_on_ack( ) { 'outer: loop { let mut guard = queue.lock().await; - let (stream, text) = match guard.peek().await { + let (id, stream, text) = match guard.peek().await { Ok(q) => q, Err(Error::Sql(sqlx::Error::RowNotFound)) => { debug!("Looks like event queue is handled for the time being, check again in 5s"); @@ -152,7 +151,7 @@ async fn push_to_broker_on_ack( error!("{e}") } // Pop acknowledged packet - if let Err(e) = queue.lock().await.pop().await { + if let Err(e) = queue.lock().await.pop(id).await { error!("{e}"); } }