Skip to content

Commit

Permalink
Resolve schema issues in test.
Browse files Browse the repository at this point in the history
  • Loading branch information
arik-so committed Mar 13, 2024
1 parent 81cb3b3 commit 7795d9c
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 5 deletions.
13 changes: 10 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ impl<L: Deref + Clone + Send + Sync + 'static> RapidSyncProcessor<L> where L::Ta
}

pub(crate) async fn connect_to_db() -> Client {
let mut _schema = None;
#[cfg(test)]
{
_schema = Some(tests::db_test_schema());
}
connect_to_db_internal(_schema).await
}

pub(crate) async fn connect_to_db_internal(schema: Option<String>) -> Client {
let connection_config = config::db_connection_config();
let (client, connection) = connection_config.connect(NoTls).await.unwrap();

Expand All @@ -129,9 +138,7 @@ pub(crate) async fn connect_to_db() -> Client {
}
});

#[cfg(test)]
{
let schema_name = tests::db_test_schema();
if let Some(schema_name) = schema {
let schema_creation_command = format!("CREATE SCHEMA IF NOT EXISTS {}", schema_name);
client.execute(&schema_creation_command, &[]).await.unwrap();
client.execute(&format!("SET search_path TO {}", schema_name), &[]).await.unwrap();
Expand Down
12 changes: 10 additions & 2 deletions src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
let mut latest_graph_cache_time = Instant::now();
let insert_limiter = Arc::new(Semaphore::new(INSERT_PARALELLISM));
let connections_cache = Arc::new(Mutex::new(Vec::with_capacity(INSERT_PARALELLISM)));

let mut _db_schema = None;
#[cfg(test)]
{
_db_schema = Some(crate::tests::db_test_schema());
}

#[cfg(test)]
let mut tasks_spawned = Vec::new();
// TODO: it would be nice to have some sort of timeout here so after 10 seconds of
Expand All @@ -123,6 +130,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {

let limiter_ref = Arc::clone(&insert_limiter);
let connections_cache_ref = Arc::clone(&connections_cache);
let current_db_schema = _db_schema.clone();
match gossip_message {
GossipMessage::ChannelAnnouncement(announcement, seen_override) => {
let scid = announcement.contents.short_channel_id as i64;
Expand All @@ -137,7 +145,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
let mut connections_set = connections_cache_ref.lock().await;
if connections_set.is_empty() {
mem::drop(connections_set);
client = crate::connect_to_db().await;
client = crate::connect_to_db_internal(current_db_schema).await;
} else {
client = connections_set.pop().unwrap();
}
Expand Down Expand Up @@ -229,7 +237,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
let mut connections_set = connections_cache_ref.lock().await;
if connections_set.is_empty() {
mem::drop(connections_set);
client = crate::connect_to_db().await;
client = crate::connect_to_db_internal(current_db_schema).await;
} else {
client = connections_set.pop().unwrap();
}
Expand Down

0 comments on commit 7795d9c

Please sign in to comment.