Skip to content

Commit

Permalink
Add RabbitMQ adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
albert-dm committed Sep 11, 2024
1 parent 29cfa73 commit 5c3635f
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
DB_URL=postgres://postgres:postgres@localhost:5432/postgres
QUEUE_NAME=pagamento-queue
RABBITMQ_URL=amqp://rabbitmq:rabbitmq@127.0.0.1:5672/%2f
RABBITMQ_ADDR=amqp://rabbitmq:rabbitmq@127.0.0.1:5672/%2f
ENV=dev
2 changes: 1 addition & 1 deletion src/adapters/rabbitmq_pagament_update_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl RabbitMQPagamentoUpdateSubscriber {
self,
) -> Result<()> {
let res: Result<()> = async_global_executor::block_on(async {
let conn = Connection::connect(self.config.rabbitmq_addr.as_str(), ConnectionProperties::default()).await?;
let conn = Connection::connect(self.config.rabbitmq_addr.unwrap().as_str(), ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
let mut consumer = channel
.basic_consume(
Expand Down
8 changes: 6 additions & 2 deletions src/api/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct Config {
pub env: Env,
pub db_url: String,
pub api_key: String,
pub rabbitmq_addr: String,
pub rabbitmq_addr: Option<String>,
pub queue_name: String,
}

Expand All @@ -48,7 +48,11 @@ impl Config {
let db_url = env::var("DB_URL")
.unwrap_or("postgres://postgres:postgres@localhost:5432/postgres".to_string());
let api_key = env::var("API_KEY").unwrap_or("api_key".to_string());
let rabbitmq_addr = env::var("RABBITMQ_ADDR").unwrap_or("amqp://rabbitmq:rabbitmq@localhost:5672/%2f".to_string());
let rabbitmq_addr = match env::var("RABBITMQ_ADDR"){
Ok(addr) => Some(addr),
Err(_) => None
};

let queue_name = env::var("QUEUE_NAME").unwrap_or("queue_name".to_string());

Config {
Expand Down
21 changes: 14 additions & 7 deletions src/api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,20 @@ pub async fn main() -> Rocket<Build> {
let user_group_validator: Arc<dyn UserGroupValidatorAdapter + Sync + Send> =
Arc::new(user_group_validator);

let pagamento_update_subscriber = RabbitMQPagamentoUpdateSubscriber::new(
config.clone(),
produto_gateway.clone(),
pedido_gateway.clone(),
);

pagamento_update_subscriber.subscribe();
match config.rabbitmq_addr {
Some(_) => {
let pagamento_update_subscriber = RabbitMQPagamentoUpdateSubscriber::new(
config.clone(),
produto_gateway.clone(),
pedido_gateway.clone(),
);

pagamento_update_subscriber.subscribe()
}
None => {
println!("Not subscribed to RabbitMQ");
}
}

rocket::build()
.mount("/", routes![redirect_to_docs])
Expand Down
32 changes: 32 additions & 0 deletions src/bin/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,35 @@ async fn main() -> Result<(), rocket::Error> {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::{env, thread};

#[tokio::test]
async fn test_api() {
test_api_works().await;
test_api_breaks().await;
}

async fn test_api_works() {
env::set_var("ENV", "test");
env::remove_var("RABBITMQ_ADDR");

let service = thread::spawn(|| {
let _rocket = main();
});

assert!(service.join().is_ok());
}

async fn test_api_breaks() {
env::set_var("ENV", "prod");

thread::spawn(|| {
let rocket = main();
assert!(rocket.is_err());
});
}
}

0 comments on commit 5c3635f

Please sign in to comment.