Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use separate Tokio runtime for gossip persistence. #73

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::hex_utils;

use std::convert::TryInto;
use std::env;
use std::io::Cursor;
use std::net::{SocketAddr, ToSocketAddrs};
Expand Down
45 changes: 19 additions & 26 deletions src/persistence.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::fs::OpenOptions;
use std::io::{BufWriter, Write};
use std::ops::Deref;
use std::mem;
use std::sync::Arc;
use std::time::{Duration, Instant};
use lightning::log_info;
use lightning::routing::gossip::NetworkGraph;
use lightning::util::logger::Logger;
use lightning::util::ser::Writeable;
use tokio::runtime::Runtime;
use tokio::sync::{mpsc, Mutex, Semaphore};

use crate::config;
Expand All @@ -19,25 +19,28 @@ const INSERT_PARALELLISM: usize = 16;
pub(crate) struct GossipPersister<L: Deref> where L::Target: Logger {
gossip_persistence_receiver: mpsc::Receiver<GossipMessage>,
network_graph: Arc<NetworkGraph<L>>,
tokio_runtime: Runtime,
logger: L
}

impl<L: Deref> GossipPersister<L> where L::Target: Logger {
pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> (Self, mpsc::Sender<GossipMessage>) {
let (gossip_persistence_sender, gossip_persistence_receiver) =
mpsc::channel::<GossipMessage>(100);
let runtime = Runtime::new().unwrap();
(GossipPersister {
gossip_persistence_receiver,
network_graph,
tokio_runtime: runtime,
logger
}, gossip_persistence_sender)
}

pub(crate) async fn persist_gossip(&mut self) {
let mut client = crate::connect_to_db().await;
{ // initialize the database
// this client instance is only used once
let mut client = crate::connect_to_db().await;

{
// initialize the database
let initialization = client
.execute(config::db_config_table_creation_query(), &[])
.await;
Expand Down Expand Up @@ -118,6 +121,16 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
insert_limiter.acquire().await.unwrap().forget();

let limiter_ref = Arc::clone(&insert_limiter);
let client = {
let mut connections_set = connections_cache.lock().await;
let client = if connections_set.is_empty() {
crate::connect_to_db().await
} else {
connections_set.pop().unwrap()
};
client
};

let connections_cache_ref = Arc::clone(&connections_cache);
match gossip_message {
GossipMessage::ChannelAnnouncement(announcement, seen_override) => {
Expand All @@ -127,17 +140,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
let mut announcement_signed = Vec::new();
announcement.write(&mut announcement_signed).unwrap();

let _task = tokio::spawn(async move {
let client;
{
let mut connections_set = connections_cache_ref.lock().await;
if connections_set.is_empty() {
mem::drop(connections_set);
client = crate::connect_to_db().await;
} else {
client = connections_set.pop().unwrap();
}
}
let _task = self.tokio_runtime.spawn(async move {
if cfg!(test) && seen_override.is_some() {
tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute("INSERT INTO channel_announcements (\
Expand Down Expand Up @@ -219,17 +222,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
// this may not be used outside test cfg
let _seen_timestamp = seen_override.unwrap_or(timestamp as u32) as f64;

let _task = tokio::spawn(async move {
let client;
{
let mut connections_set = connections_cache_ref.lock().await;
if connections_set.is_empty() {
mem::drop(connections_set);
client = crate::connect_to_db().await;
} else {
client = connections_set.pop().unwrap();
}
}
let _task = self.tokio_runtime.spawn(async move {
tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute(insertion_statement, &[
&scid,
Expand Down
56 changes: 56 additions & 0 deletions src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,22 @@ async fn clean_test_db() {
});
}

#[tokio::test]
async fn test_persistence_runtime() {
let _sanitizer = SchemaSanitizer::new();
let logger = Arc::new(TestLogger::new());
let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
let network_graph_arc = Arc::new(network_graph);
let (_persister, _receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());

tokio::task::spawn_blocking(move || {
drop(_persister);
}).await.unwrap();

clean_test_db().await;
}


#[tokio::test]
async fn test_trivial_setup() {
let _sanitizer = SchemaSanitizer::new();
Expand Down Expand Up @@ -240,6 +256,10 @@ async fn test_trivial_setup() {
println!("last update b: {}", last_update_seen_b);
assert_eq!(last_update_seen_a, update_result - CLIENT_BACKDATE_INTERVAL);
assert_eq!(last_update_seen_b, update_result - CLIENT_BACKDATE_INTERVAL);

tokio::task::spawn_blocking(move || {
drop(persister);
}).await.unwrap();
}

/// If a channel has only seen updates in one direction, it should not be announced
Expand Down Expand Up @@ -303,6 +323,10 @@ async fn test_unidirectional_intermediate_update_consideration() {
let client_channel_count = channels.len();
assert_eq!(client_channel_count, 1);

tokio::task::spawn_blocking(move || {
drop(persister);
}).await.unwrap();

clean_test_db().await;
}

Expand Down Expand Up @@ -357,6 +381,10 @@ async fn test_bidirectional_intermediate_update_consideration() {
assert_eq!(serialization.update_count_full, 0);
assert_eq!(serialization.update_count_incremental, 1);

tokio::task::spawn_blocking(move || {
drop(persister);
}).await.unwrap();

clean_test_db().await;
}

Expand Down Expand Up @@ -399,6 +427,10 @@ async fn test_full_snapshot_recency() {

drop(receiver);
persister.persist_gossip().await;

tokio::task::spawn_blocking(move || {
drop(persister);
}).await.unwrap();
}

let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
Expand Down Expand Up @@ -475,6 +507,10 @@ async fn test_full_snapshot_recency_with_wrong_seen_order() {

drop(receiver);
persister.persist_gossip().await;

tokio::task::spawn_blocking(move || {
drop(persister);
}).await.unwrap();
}

let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
Expand Down Expand Up @@ -550,6 +586,10 @@ async fn test_full_snapshot_recency_with_wrong_propagation_order() {

drop(receiver);
persister.persist_gossip().await;

tokio::task::spawn_blocking(move || {
drop(persister);
}).await.unwrap();
}

let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
Expand Down Expand Up @@ -679,6 +719,10 @@ async fn test_full_snapshot_mutiny_scenario() {

drop(receiver);
persister.persist_gossip().await;

tokio::task::spawn_blocking(move || {
drop(persister);
}).await.unwrap();
}

let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
Expand Down Expand Up @@ -788,6 +832,10 @@ async fn test_full_snapshot_interlaced_channel_timestamps() {

drop(receiver);
persister.persist_gossip().await;

tokio::task::spawn_blocking(move || {
drop(persister);
}).await.unwrap();
}

let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
Expand Down Expand Up @@ -859,6 +907,10 @@ async fn test_full_snapshot_persistence() {

drop(receiver);
persister.persist_gossip().await;

tokio::task::spawn_blocking(move || {
drop(persister);
}).await.unwrap();
}

let cache_path = cache_sanitizer.cache_path();
Expand Down Expand Up @@ -900,6 +952,10 @@ async fn test_full_snapshot_persistence() {

drop(receiver);
persister.persist_gossip().await;

tokio::task::spawn_blocking(move || {
drop(persister);
}).await.unwrap();
}

// regenerate snapshots
Expand Down
1 change: 0 additions & 1 deletion src/tracking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::time::{Duration, Instant};

use bitcoin::hashes::hex::ToHex;
use bitcoin::secp256k1::PublicKey;
use lightning;
use lightning::ln::peer_handler::{
ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
};
Expand Down
1 change: 0 additions & 1 deletion src/verifier.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::convert::TryInto;
use std::io::ErrorKind;
use std::ops::Deref;
use std::sync::Arc;
Expand Down
Loading