From db63107f5807066ed9eeb7e4212dd0bfbae5ad8a Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Wed, 2 Aug 2023 14:33:05 -0700 Subject: [PATCH 01/10] Upgrade to 116 release. --- Cargo.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ec3f3a6..99aeed7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,9 +5,9 @@ edition = "2021" [dependencies] bitcoin = "0.29" -lightning = { version = "0.0.116-alpha1" } -lightning-block-sync = { version = "0.0.116-alpha1", features=["rest-client"] } -lightning-net-tokio = { version = "0.0.116-alpha1" } +lightning = { version = "0.0.116" } +lightning-block-sync = { version = "0.0.116", features=["rest-client"] } +lightning-net-tokio = { version = "0.0.116" } tokio = { version = "1.25", features = ["full"] } tokio-postgres = { version="=0.7.5" } futures = "0.3" From 2cf9129a187a66cfed10f9583c14fc8ee7339a18 Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Wed, 2 Aug 2023 15:41:14 -0700 Subject: [PATCH 02/10] Allow custom logger types. --- src/downloader.rs | 26 +++++++++++++------------- src/lib.rs | 29 ++++++++++++++++------------- src/lookup.rs | 5 +++-- src/main.rs | 5 ++++- src/persistence.rs | 11 ++++++----- src/snapshot.rs | 11 ++++++----- src/tracking.rs | 15 +++++++++------ src/types.rs | 17 ++++++----------- src/verifier.rs | 18 +++++++++--------- 9 files changed, 72 insertions(+), 65 deletions(-) diff --git a/src/downloader.rs b/src/downloader.rs index ac11ec7..0c672e3 100644 --- a/src/downloader.rs +++ b/src/downloader.rs @@ -5,10 +5,10 @@ use lightning::events::{MessageSendEvent, MessageSendEventsProvider}; use lightning::ln::features::{InitFeatures, NodeFeatures}; use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler}; use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; +use lightning::util::logger::Logger; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; -use crate::TestLogger; use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager}; use crate::verifier::ChainVerifier; @@ -30,28 +30,28 @@ impl GossipCounter { } } -pub(crate) struct GossipRouter { - native_router: P2PGossipSync>, GossipChainAccess, TestLogger>, +pub(crate) struct GossipRouter { + native_router: P2PGossipSync>>, GossipChainAccess, Arc>, pub(crate) counter: RwLock, sender: mpsc::Sender, - verifier: Arc, - outbound_gossiper: Arc>, GossipChainAccess, TestLogger>>, + verifier: Arc>, + outbound_gossiper: Arc>>, GossipChainAccess, Arc>>, } -impl GossipRouter { - pub(crate) fn new(network_graph: Arc>, sender: mpsc::Sender) -> Self { - let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, TestLogger::new())); +impl GossipRouter { + pub(crate) fn new(network_graph: Arc>>, sender: mpsc::Sender, logger: Arc) -> Self { + let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, logger.clone())); let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper))); Self { - native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), TestLogger::new()), + native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), logger.clone()), outbound_gossiper, counter: RwLock::new(GossipCounter::new()), sender, - verifier, + verifier } } - pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) { + pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) { self.verifier.set_ph(peer_handler); } @@ -83,7 +83,7 @@ impl GossipRouter { } } -impl MessageSendEventsProvider for GossipRouter { +impl MessageSendEventsProvider for GossipRouter { fn get_and_clear_pending_msg_events(&self) -> Vec { let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events(); for ev in gossip_evs { @@ -102,7 +102,7 @@ impl MessageSendEventsProvider for GossipRouter { } } -impl RoutingMessageHandler for GossipRouter { +impl RoutingMessageHandler for GossipRouter { fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result { self.native_router.handle_node_announcement(msg) } diff --git a/src/lib.rs b/src/lib.rs index 37bbe4c..363d4ae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,7 @@ use std::io::BufReader; use std::sync::Arc; use lightning::routing::gossip::{NetworkGraph, NodeId}; +use lightning::util::logger::Logger; use lightning::util::ser::{ReadableArgs, Writeable}; use tokio::sync::mpsc; use crate::lookup::DeltaSet; @@ -22,10 +23,9 @@ use crate::lookup::DeltaSet; use crate::persistence::GossipPersister; use crate::serialization::UpdateSerialization; use crate::snapshot::Snapshotter; -use crate::types::TestLogger; +use crate::types::RGSSLogger; mod downloader; -mod types; mod tracking; mod lookup; mod persistence; @@ -35,14 +35,17 @@ mod config; mod hex_utils; mod verifier; +pub mod types; + /// The purpose of this prefix is to identify the serialization format, should other rapid gossip /// sync formats arise in the future. /// /// The fourth byte is the protocol version in case our format gets updated. const GOSSIP_PREFIX: [u8; 4] = [76, 68, 75, 1]; -pub struct RapidSyncProcessor { - network_graph: Arc>, +pub struct RapidSyncProcessor { + network_graph: Arc>>, + logger: Arc } pub struct SerializedResponse { @@ -54,27 +57,27 @@ pub struct SerializedResponse { pub update_count_incremental: u32, } -impl RapidSyncProcessor { - pub fn new() -> Self { +impl RapidSyncProcessor { + pub fn new(logger: Arc) -> Self { let network = config::network(); - let logger = TestLogger::new(); let network_graph = if let Ok(file) = File::open(&config::network_graph_cache_path()) { println!("Initializing from cached network graph…"); let mut buffered_reader = BufReader::new(file); - let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger); + let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger.clone()); if let Ok(network_graph) = network_graph_result { println!("Initialized from cached network graph!"); network_graph } else { println!("Initialization from cached network graph failed: {}", network_graph_result.err().unwrap()); - NetworkGraph::new(network, logger) + NetworkGraph::new(network, logger.clone()) } } else { - NetworkGraph::new(network, logger) + NetworkGraph::new(network, logger.clone()) }; let arc_network_graph = Arc::new(network_graph); Self { network_graph: arc_network_graph, + logger } } @@ -87,7 +90,7 @@ impl RapidSyncProcessor { println!("Starting gossip download"); tokio::spawn(tracking::download_gossip(persistence_sender, sync_completion_sender, - Arc::clone(&self.network_graph))); + Arc::clone(&self.network_graph), Arc::clone(&self.logger))); println!("Starting gossip db persistence listener"); tokio::spawn(async move { persister.persist_gossip().await; }); } else { @@ -126,7 +129,7 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec { let chain_hash = genesis_block.block_hash(); chain_hash.write(&mut blob).unwrap(); - let blob_timestamp = Snapshotter::round_down_to_nearest_multiple(current_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64) as u32; + let blob_timestamp = Snapshotter::::round_down_to_nearest_multiple(current_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64) as u32; blob_timestamp.write(&mut blob).unwrap(); 0u32.write(&mut blob).unwrap(); // node count @@ -136,7 +139,7 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec { blob } -async fn serialize_delta(network_graph: Arc>, last_sync_timestamp: u32) -> SerializedResponse { +async fn serialize_delta(network_graph: Arc>>, last_sync_timestamp: u32) -> SerializedResponse { let (client, connection) = lookup::connect_to_db().await; network_graph.remove_stale_channels_and_tracking(); diff --git a/src/lookup.rs b/src/lookup.rs index c554f9f..79fb84e 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -11,8 +11,9 @@ use tokio_postgres::{Client, Connection, NoTls, Socket}; use tokio_postgres::tls::NoTlsStream; use futures::StreamExt; +use lightning::util::logger::Logger; -use crate::{config, TestLogger}; +use crate::config; use crate::serialization::MutatedProperties; /// The delta set needs to be a BTreeMap so the keys are sorted. @@ -75,7 +76,7 @@ pub(super) async fn connect_to_db() -> (Client, Connection) /// whether they had been seen before. /// Also include all announcements for which the first update was announced /// after `last_sync_timestamp` -pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc>, client: &Client, last_sync_timestamp: u32) { +pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc>>, client: &Client, last_sync_timestamp: u32) { println!("Obtaining channel ids from network graph"); let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64)); let channel_ids = { diff --git a/src/main.rs b/src/main.rs index e3468be..dae4d3e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,9 @@ +use std::sync::Arc; use rapid_gossip_sync_server::RapidSyncProcessor; +use rapid_gossip_sync_server::types::RGSSLogger; #[tokio::main] async fn main() { - RapidSyncProcessor::new().start_sync().await; + let logger = Arc::new(RGSSLogger::new()); + RapidSyncProcessor::new(logger).start_sync().await; } diff --git a/src/persistence.rs b/src/persistence.rs index ac66733..22abf02 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -3,22 +3,23 @@ use std::io::{BufWriter, Write}; use std::sync::Arc; use std::time::{Duration, Instant}; use lightning::routing::gossip::NetworkGraph; +use lightning::util::logger::Logger; use lightning::util::ser::Writeable; use tokio::sync::mpsc; use tokio_postgres::NoTls; -use crate::{config, TestLogger}; +use crate::config; use crate::types::GossipMessage; const POSTGRES_INSERT_TIMEOUT: Duration = Duration::from_secs(15); -pub(crate) struct GossipPersister { +pub(crate) struct GossipPersister { gossip_persistence_receiver: mpsc::Receiver, - network_graph: Arc>, + network_graph: Arc>>, } -impl GossipPersister { - pub fn new(network_graph: Arc>) -> (Self, mpsc::Sender) { +impl GossipPersister { + pub fn new(network_graph: Arc>>) -> (Self, mpsc::Sender) { let (gossip_persistence_sender, gossip_persistence_receiver) = mpsc::channel::(100); (GossipPersister { diff --git a/src/snapshot.rs b/src/snapshot.rs index ac80079..c81cc35 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -5,16 +5,17 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use lightning::routing::gossip::NetworkGraph; +use lightning::util::logger::Logger; -use crate::{config, TestLogger}; +use crate::config; use crate::config::cache_path; -pub(crate) struct Snapshotter { - network_graph: Arc>, +pub(crate) struct Snapshotter { + network_graph: Arc>>, } -impl Snapshotter { - pub fn new(network_graph: Arc>) -> Self { +impl Snapshotter { + pub fn new(network_graph: Arc>>) -> Self { Self { network_graph } } diff --git a/src/tracking.rs b/src/tracking.rs index 8d2668f..2935eb2 100644 --- a/src/tracking.rs +++ b/src/tracking.rs @@ -12,15 +12,18 @@ use lightning::ln::peer_handler::{ }; use lightning::routing::gossip::NetworkGraph; use lightning::sign::KeysManager; +use lightning::util::logger::Logger; use tokio::sync::mpsc; -use crate::{config, TestLogger}; +use crate::config; use crate::downloader::GossipRouter; use crate::types::{GossipMessage, GossipPeerManager}; -pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender, +pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender, completion_sender: mpsc::Sender<()>, - network_graph: Arc>) { + network_graph: Arc>>, + logger: Arc +) { let mut key = [42; 32]; let mut random_data = [43; 32]; // Get something psuedo-random from std. @@ -33,7 +36,7 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender bool { +async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool { eprintln!("Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string()); let connection = lightning_net_tokio::connect_outbound( Arc::clone(&peer_manager), diff --git a/src/types.rs b/src/types.rs index 77a53c4..b18cbe7 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,5 +1,4 @@ use std::sync::Arc; -use std::ops::Deref; use lightning::sign::KeysManager; use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate}; @@ -9,8 +8,8 @@ use lightning::util::logger::{Logger, Record}; use crate::downloader::GossipRouter; use crate::verifier::ChainVerifier; -pub(crate) type GossipChainAccess = Arc; -pub(crate) type GossipPeerManager = Arc, IgnoringMessageHandler, TestLogger, IgnoringMessageHandler, Arc>>; +pub(crate) type GossipChainAccess = Arc>; +pub(crate) type GossipPeerManager = Arc>, IgnoringMessageHandler, Arc, IgnoringMessageHandler, Arc>>; #[derive(Debug)] pub(crate) enum GossipMessage { @@ -19,19 +18,15 @@ pub(crate) enum GossipMessage { } #[derive(Clone, Copy)] -pub(crate) struct TestLogger {} -impl Deref for TestLogger { - type Target = Self; - fn deref(&self) -> &Self { self } -} +pub struct RGSSLogger {} -impl TestLogger { - pub(crate) fn new() -> TestLogger { +impl RGSSLogger { + pub fn new() -> RGSSLogger { Self {} } } -impl Logger for TestLogger { +impl Logger for RGSSLogger { fn log(&self, record: &Record) { // TODO: allow log level threshold to be set println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args); diff --git a/src/verifier.rs b/src/verifier.rs index 4bda871..a88f2b7 100644 --- a/src/verifier.rs +++ b/src/verifier.rs @@ -7,25 +7,25 @@ use bitcoin::blockdata::block::Block; use bitcoin::hashes::Hash; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoResult, UtxoLookupError}; +use lightning::util::logger::Logger; use lightning_block_sync::{BlockData, BlockSource}; use lightning_block_sync::http::BinaryResponse; use lightning_block_sync::rest::RestClient; use crate::config; -use crate::TestLogger; use crate::types::GossipPeerManager; -pub(crate) struct ChainVerifier { +pub(crate) struct ChainVerifier { rest_client: Arc, - graph: Arc>, - outbound_gossiper: Arc>, Arc, TestLogger>>, - peer_handler: Mutex>, + graph: Arc>>, + outbound_gossiper: Arc>>, Arc, Arc>>, + peer_handler: Mutex>>, } struct RestBinaryResponse(Vec); -impl ChainVerifier { - pub(crate) fn new(graph: Arc>, outbound_gossiper: Arc>, Arc, TestLogger>>) -> Self { +impl ChainVerifier { + pub(crate) fn new(graph: Arc>>, outbound_gossiper: Arc>>, Arc, Arc>>) -> Self { ChainVerifier { rest_client: Arc::new(RestClient::new(config::bitcoin_rest_endpoint()).unwrap()), outbound_gossiper, @@ -33,7 +33,7 @@ impl ChainVerifier { peer_handler: Mutex::new(None), } } - pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager) { + pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager) { *self.peer_handler.lock().unwrap() = Some(peer_handler); } @@ -73,7 +73,7 @@ impl ChainVerifier { } } -impl UtxoLookup for ChainVerifier { +impl UtxoLookup for ChainVerifier { fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult { let res = UtxoFuture::new(); let fut = res.clone(); From cf326fd7a9f42c58a24788943745f066ecb5f641 Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Wed, 2 Aug 2023 16:00:36 -0700 Subject: [PATCH 03/10] Switch Logger from Arc to Deref. --- src/downloader.rs | 15 ++++++++------- src/lib.rs | 17 +++++++++-------- src/lookup.rs | 4 ++-- src/persistence.rs | 9 +++++---- src/snapshot.rs | 9 +++++---- src/tracking.rs | 11 ++++++----- src/types.rs | 2 +- src/verifier.rs | 13 +++++++------ 8 files changed, 43 insertions(+), 37 deletions(-) diff --git a/src/downloader.rs b/src/downloader.rs index 0c672e3..34a5c66 100644 --- a/src/downloader.rs +++ b/src/downloader.rs @@ -1,3 +1,4 @@ +use std::ops::Deref; use std::sync::{Arc, RwLock}; use bitcoin::secp256k1::PublicKey; @@ -30,16 +31,16 @@ impl GossipCounter { } } -pub(crate) struct GossipRouter { - native_router: P2PGossipSync>>, GossipChainAccess, Arc>, +pub(crate) struct GossipRouter where L::Target: Logger { + native_router: P2PGossipSync>, GossipChainAccess, L>, pub(crate) counter: RwLock, sender: mpsc::Sender, verifier: Arc>, - outbound_gossiper: Arc>>, GossipChainAccess, Arc>>, + outbound_gossiper: Arc>, GossipChainAccess, L>>, } -impl GossipRouter { - pub(crate) fn new(network_graph: Arc>>, sender: mpsc::Sender, logger: Arc) -> Self { +impl GossipRouter where L::Target: Logger { + pub(crate) fn new(network_graph: Arc>, sender: mpsc::Sender, logger: L) -> Self { let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, logger.clone())); let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper))); Self { @@ -83,7 +84,7 @@ impl GossipRouter { } } -impl MessageSendEventsProvider for GossipRouter { +impl MessageSendEventsProvider for GossipRouter where L::Target: Logger { fn get_and_clear_pending_msg_events(&self) -> Vec { let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events(); for ev in gossip_evs { @@ -102,7 +103,7 @@ impl MessageSendEventsProvider for GossipRouter { } } -impl RoutingMessageHandler for GossipRouter { +impl RoutingMessageHandler for GossipRouter where L::Target: Logger { fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result { self.native_router.handle_node_announcement(msg) } diff --git a/src/lib.rs b/src/lib.rs index 363d4ae..803c077 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,7 @@ extern crate core; use std::collections::{HashMap, HashSet}; use std::fs::File; use std::io::BufReader; +use std::ops::Deref; use std::sync::Arc; use lightning::routing::gossip::{NetworkGraph, NodeId}; @@ -43,9 +44,9 @@ pub mod types; /// The fourth byte is the protocol version in case our format gets updated. const GOSSIP_PREFIX: [u8; 4] = [76, 68, 75, 1]; -pub struct RapidSyncProcessor { - network_graph: Arc>>, - logger: Arc +pub struct RapidSyncProcessor where L::Target: Logger { + network_graph: Arc>, + logger: L } pub struct SerializedResponse { @@ -57,8 +58,8 @@ pub struct SerializedResponse { pub update_count_incremental: u32, } -impl RapidSyncProcessor { - pub fn new(logger: Arc) -> Self { +impl RapidSyncProcessor where L::Target: Logger { + pub fn new(logger: L) -> Self { let network = config::network(); let network_graph = if let Ok(file) = File::open(&config::network_graph_cache_path()) { println!("Initializing from cached network graph…"); @@ -90,7 +91,7 @@ impl RapidSyncProcessor { println!("Starting gossip download"); tokio::spawn(tracking::download_gossip(persistence_sender, sync_completion_sender, - Arc::clone(&self.network_graph), Arc::clone(&self.logger))); + Arc::clone(&self.network_graph), self.logger.clone())); println!("Starting gossip db persistence listener"); tokio::spawn(async move { persister.persist_gossip().await; }); } else { @@ -129,7 +130,7 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec { let chain_hash = genesis_block.block_hash(); chain_hash.write(&mut blob).unwrap(); - let blob_timestamp = Snapshotter::::round_down_to_nearest_multiple(current_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64) as u32; + let blob_timestamp = Snapshotter::>::round_down_to_nearest_multiple(current_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64) as u32; blob_timestamp.write(&mut blob).unwrap(); 0u32.write(&mut blob).unwrap(); // node count @@ -139,7 +140,7 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec { blob } -async fn serialize_delta(network_graph: Arc>>, last_sync_timestamp: u32) -> SerializedResponse { +async fn serialize_delta(network_graph: Arc>, last_sync_timestamp: u32) -> SerializedResponse where L::Target: Logger { let (client, connection) = lookup::connect_to_db().await; network_graph.remove_stale_channels_and_tracking(); diff --git a/src/lookup.rs b/src/lookup.rs index 79fb84e..9e56199 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -1,6 +1,6 @@ use std::collections::{BTreeMap, HashSet}; use std::io::Cursor; -use std::ops::Add; +use std::ops::{Add, Deref}; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; @@ -76,7 +76,7 @@ pub(super) async fn connect_to_db() -> (Client, Connection) /// whether they had been seen before. /// Also include all announcements for which the first update was announced /// after `last_sync_timestamp` -pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc>>, client: &Client, last_sync_timestamp: u32) { +pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc>, client: &Client, last_sync_timestamp: u32) where L::Target: Logger { println!("Obtaining channel ids from network graph"); let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64)); let channel_ids = { diff --git a/src/persistence.rs b/src/persistence.rs index 22abf02..e2d9eb0 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -1,5 +1,6 @@ use std::fs::OpenOptions; use std::io::{BufWriter, Write}; +use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; use lightning::routing::gossip::NetworkGraph; @@ -13,13 +14,13 @@ use crate::types::GossipMessage; const POSTGRES_INSERT_TIMEOUT: Duration = Duration::from_secs(15); -pub(crate) struct GossipPersister { +pub(crate) struct GossipPersister where L::Target: Logger { gossip_persistence_receiver: mpsc::Receiver, - network_graph: Arc>>, + network_graph: Arc>, } -impl GossipPersister { - pub fn new(network_graph: Arc>>) -> (Self, mpsc::Sender) { +impl GossipPersister where L::Target: Logger { + pub fn new(network_graph: Arc>) -> (Self, mpsc::Sender) { let (gossip_persistence_sender, gossip_persistence_receiver) = mpsc::channel::(100); (GossipPersister { diff --git a/src/snapshot.rs b/src/snapshot.rs index c81cc35..05cc63e 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::fs; +use std::ops::Deref; use std::os::unix::fs::symlink; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -10,12 +11,12 @@ use lightning::util::logger::Logger; use crate::config; use crate::config::cache_path; -pub(crate) struct Snapshotter { - network_graph: Arc>>, +pub(crate) struct Snapshotter where L::Target: Logger { + network_graph: Arc>, } -impl Snapshotter { - pub fn new(network_graph: Arc>>) -> Self { +impl Snapshotter where L::Target: Logger { + pub fn new(network_graph: Arc>) -> Self { Self { network_graph } } diff --git a/src/tracking.rs b/src/tracking.rs index 2935eb2..99166af 100644 --- a/src/tracking.rs +++ b/src/tracking.rs @@ -1,6 +1,7 @@ use std::collections::hash_map::RandomState; use std::hash::{BuildHasher, Hasher}; use std::net::SocketAddr; +use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -19,11 +20,11 @@ use crate::config; use crate::downloader::GossipRouter; use crate::types::{GossipMessage, GossipPeerManager}; -pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender, +pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender, completion_sender: mpsc::Sender<()>, - network_graph: Arc>>, - logger: Arc -) { + network_graph: Arc>, + logger: L +) where L::Target: Logger { let mut key = [42; 32]; let mut random_data = [43; 32]; // Get something psuedo-random from std. @@ -145,7 +146,7 @@ pub(crate) async fn download_gossip(persisten }); } -async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool { +async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool where L::Target: Logger { eprintln!("Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string()); let connection = lightning_net_tokio::connect_outbound( Arc::clone(&peer_manager), diff --git a/src/types.rs b/src/types.rs index b18cbe7..6fafbd3 100644 --- a/src/types.rs +++ b/src/types.rs @@ -9,7 +9,7 @@ use crate::downloader::GossipRouter; use crate::verifier::ChainVerifier; pub(crate) type GossipChainAccess = Arc>; -pub(crate) type GossipPeerManager = Arc>, IgnoringMessageHandler, Arc, IgnoringMessageHandler, Arc>>; +pub(crate) type GossipPeerManager = Arc>, IgnoringMessageHandler, L, IgnoringMessageHandler, Arc>>; #[derive(Debug)] pub(crate) enum GossipMessage { diff --git a/src/verifier.rs b/src/verifier.rs index a88f2b7..872a501 100644 --- a/src/verifier.rs +++ b/src/verifier.rs @@ -1,4 +1,5 @@ use std::convert::TryInto; +use std::ops::Deref; use std::sync::Arc; use std::sync::Mutex; @@ -15,17 +16,17 @@ use lightning_block_sync::rest::RestClient; use crate::config; use crate::types::GossipPeerManager; -pub(crate) struct ChainVerifier { +pub(crate) struct ChainVerifier where L::Target: Logger { rest_client: Arc, - graph: Arc>>, - outbound_gossiper: Arc>>, Arc, Arc>>, + graph: Arc>, + outbound_gossiper: Arc>, Arc, L>>, peer_handler: Mutex>>, } struct RestBinaryResponse(Vec); -impl ChainVerifier { - pub(crate) fn new(graph: Arc>>, outbound_gossiper: Arc>>, Arc, Arc>>) -> Self { +impl ChainVerifier where L::Target: Logger { + pub(crate) fn new(graph: Arc>, outbound_gossiper: Arc>, Arc, L>>) -> Self { ChainVerifier { rest_client: Arc::new(RestClient::new(config::bitcoin_rest_endpoint()).unwrap()), outbound_gossiper, @@ -73,7 +74,7 @@ impl ChainVerifier { } } -impl UtxoLookup for ChainVerifier { +impl UtxoLookup for ChainVerifier where L::Target: Logger { fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult { let res = UtxoFuture::new(); let fut = res.clone(); From 561c7f8f9dfb64d1bf408cacdcaa676360c2d052 Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Wed, 2 Aug 2023 18:11:06 -0700 Subject: [PATCH 04/10] Remove println from lib.rs --- src/lib.rs | 27 ++++++++++++++------------- src/snapshot.rs | 11 ++++++----- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 803c077..73d3550 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,7 @@ use std::fs::File; use std::io::BufReader; use std::ops::Deref; use std::sync::Arc; +use lightning::log_info; use lightning::routing::gossip::{NetworkGraph, NodeId}; use lightning::util::logger::Logger; @@ -62,14 +63,14 @@ impl RapidSyncProcessor where L::Ta pub fn new(logger: L) -> Self { let network = config::network(); let network_graph = if let Ok(file) = File::open(&config::network_graph_cache_path()) { - println!("Initializing from cached network graph…"); + log_info!(logger, "Initializing from cached network graph…"); let mut buffered_reader = BufReader::new(file); let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger.clone()); if let Ok(network_graph) = network_graph_result { - println!("Initialized from cached network graph!"); + log_info!(logger, "Initialized from cached network graph!"); network_graph } else { - println!("Initialization from cached network graph failed: {}", network_graph_result.err().unwrap()); + log_info!(logger, "Initialization from cached network graph failed: {}", network_graph_result.err().unwrap()); NetworkGraph::new(network, logger.clone()) } } else { @@ -89,10 +90,10 @@ impl RapidSyncProcessor where L::Ta if config::DOWNLOAD_NEW_GOSSIP { let (mut persister, persistence_sender) = GossipPersister::new(Arc::clone(&self.network_graph)); - println!("Starting gossip download"); + log_info!(self.logger, "Starting gossip download"); tokio::spawn(tracking::download_gossip(persistence_sender, sync_completion_sender, Arc::clone(&self.network_graph), self.logger.clone())); - println!("Starting gossip db persistence listener"); + log_info!(self.logger, "Starting gossip db persistence listener"); tokio::spawn(async move { persister.persist_gossip().await; }); } else { sync_completion_sender.send(()).await.unwrap(); @@ -102,10 +103,10 @@ impl RapidSyncProcessor where L::Ta if sync_completion.is_none() { panic!("Sync failed!"); } - println!("Initial sync complete!"); + log_info!(self.logger, "Initial sync complete!"); // start the gossip snapshotting service - Snapshotter::new(Arc::clone(&self.network_graph)).snapshot_gossip().await; + Snapshotter::new(Arc::clone(&self.network_graph), self.logger.clone()).snapshot_gossip().await; } } @@ -140,7 +141,7 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec { blob } -async fn serialize_delta(network_graph: Arc>, last_sync_timestamp: u32) -> SerializedResponse where L::Target: Logger { +async fn serialize_delta(network_graph: Arc>, last_sync_timestamp: u32, logger: L) -> SerializedResponse where L::Target: Logger { let (client, connection) = lookup::connect_to_db().await; network_graph.remove_stale_channels_and_tracking(); @@ -175,11 +176,11 @@ async fn serialize_delta(network_graph: Arc>, last_syn let mut delta_set = DeltaSet::new(); lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp).await; - println!("announcement channel count: {}", delta_set.len()); + log_info!(logger, "announcement channel count: {}", delta_set.len()); lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp).await; - println!("update-fetched channel count: {}", delta_set.len()); + log_info!(logger, "update-fetched channel count: {}", delta_set.len()); lookup::filter_delta_set(&mut delta_set); - println!("update-filtered channel count: {}", delta_set.len()); + log_info!(logger, "update-filtered channel count: {}", delta_set.len()); let serialization_details = serialization::serialize_delta_set(delta_set, last_sync_timestamp); // process announcements @@ -250,8 +251,8 @@ async fn serialize_delta(network_graph: Arc>, last_syn prefixed_output.append(&mut output); - println!("duplicated node ids: {}", duplicate_node_ids); - println!("latest seen timestamp: {:?}", serialization_details.latest_seen); + log_info!(logger, "duplicated node ids: {}", duplicate_node_ids); + log_info!(logger, "latest seen timestamp: {:?}", serialization_details.latest_seen); SerializedResponse { data: prefixed_output, diff --git a/src/snapshot.rs b/src/snapshot.rs index 05cc63e..63975be 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -11,13 +11,14 @@ use lightning::util::logger::Logger; use crate::config; use crate::config::cache_path; -pub(crate) struct Snapshotter where L::Target: Logger { +pub(crate) struct Snapshotter where L::Target: Logger { network_graph: Arc>, + logger: L } -impl Snapshotter where L::Target: Logger { - pub fn new(network_graph: Arc>) -> Self { - Self { network_graph } +impl Snapshotter where L::Target: Logger { + pub fn new(network_graph: Arc>, logger: L) -> Self { + Self { network_graph, logger } } pub(crate) async fn snapshot_gossip(&self) { @@ -79,7 +80,7 @@ impl Snapshotter where L::Target: Logger { { println!("Calculating {}-day snapshot", day_range); // calculate the snapshot - let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32).await; + let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, self.logger.clone()).await; // persist the snapshot and update the symlink let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-days__previous-sync:{}.lngossip", reference_timestamp, day_range, current_last_sync_timestamp); From 8ae08218d11f75f1ab1229b2326572883ac0a428 Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Wed, 2 Aug 2023 18:16:24 -0700 Subject: [PATCH 05/10] Remove println from lookup.rs --- src/lib.rs | 8 ++++---- src/lookup.rs | 27 ++++++++++++++------------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 73d3550..41057d3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -141,7 +141,7 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec { blob } -async fn serialize_delta(network_graph: Arc>, last_sync_timestamp: u32, logger: L) -> SerializedResponse where L::Target: Logger { +async fn serialize_delta(network_graph: Arc>, last_sync_timestamp: u32, logger: L) -> SerializedResponse where L::Target: Logger { let (client, connection) = lookup::connect_to_db().await; network_graph.remove_stale_channels_and_tracking(); @@ -175,11 +175,11 @@ async fn serialize_delta(network_graph: Arc>, last_syn }; let mut delta_set = DeltaSet::new(); - lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp).await; + lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp, logger.clone()).await; log_info!(logger, "announcement channel count: {}", delta_set.len()); - lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp).await; + lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await; log_info!(logger, "update-fetched channel count: {}", delta_set.len()); - lookup::filter_delta_set(&mut delta_set); + lookup::filter_delta_set(&mut delta_set, logger.clone()); log_info!(logger, "update-filtered channel count: {}", delta_set.len()); let serialization_details = serialization::serialize_delta_set(delta_set, last_sync_timestamp); diff --git a/src/lookup.rs b/src/lookup.rs index 9e56199..1c6b418 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -11,6 +11,7 @@ use tokio_postgres::{Client, Connection, NoTls, Socket}; use tokio_postgres::tls::NoTlsStream; use futures::StreamExt; +use lightning::log_info; use lightning::util::logger::Logger; use crate::config; @@ -76,12 +77,12 @@ pub(super) async fn connect_to_db() -> (Client, Connection) /// whether they had been seen before. /// Also include all announcements for which the first update was announced /// after `last_sync_timestamp` -pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc>, client: &Client, last_sync_timestamp: u32) where L::Target: Logger { - println!("Obtaining channel ids from network graph"); +pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc>, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger { + log_info!(logger, "Obtaining channel ids from network graph"); let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64)); let channel_ids = { let read_only_graph = network_graph.read_only(); - println!("Retrieved read-only network graph copy"); + log_info!(logger, "Retrieved read-only network graph copy"); let channel_iterator = read_only_graph.channels().unordered_iter(); channel_iterator .filter(|c| c.1.announcement_message.is_some()) @@ -89,7 +90,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS .collect::>() }; - println!("Obtaining corresponding database entries"); + log_info!(logger, "Obtaining corresponding database entries"); // get all the channel announcements that are currently in the network graph let announcement_rows = client.query_raw("SELECT announcement_signed, seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap(); let mut pinned_rows = Box::pin(announcement_rows); @@ -114,7 +115,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS { // THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA - println!("Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync"); + log_info!(logger, "Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync"); // Steps: // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // to find the oldest update in a given direction // — From those updates, select distinct by (scid), ordered by seen DESC (to obtain the newer one per direction) @@ -156,7 +157,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS { // THIS STEP IS USED TO DETERMINE IF A REMINDER UPDATE SHOULD BE SENT - println!("Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago"); + log_info!(logger, "Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago"); // Steps: // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction) @@ -214,7 +215,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaS } } -pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32) { +pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger { let start = Instant::now(); let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64)); @@ -236,7 +237,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli ", [last_sync_timestamp_object]).await.unwrap(); let mut pinned_rows = Box::pin(reference_rows); - println!("Fetched reference rows in {:?}", start.elapsed()); + log_info!(logger, "Fetched reference rows in {:?}", start.elapsed()); let mut last_seen_update_ids: Vec = Vec::new(); let mut non_intermediate_ids: HashSet = HashSet::new(); @@ -264,7 +265,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli reference_row_count += 1; } - println!("Processed {} reference rows (delta size: {}) in {:?}", + log_info!(logger, "Processed {} reference rows (delta size: {}) in {:?}", reference_row_count, delta_set.len(), start.elapsed()); // get all the intermediate channel updates @@ -277,7 +278,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli WHERE seen >= $1 ", [last_sync_timestamp_object]).await.unwrap(); let mut pinned_updates = Box::pin(intermediate_updates); - println!("Fetched intermediate rows in {:?}", start.elapsed()); + log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed()); let mut previous_scid = u64::MAX; let mut previously_seen_directions = (false, false); @@ -352,10 +353,10 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli } } } - println!("Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed()); + log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed()); } -pub(super) fn filter_delta_set(delta_set: &mut DeltaSet) { +pub(super) fn filter_delta_set(delta_set: &mut DeltaSet, logger: L) where L::Target: Logger { let original_length = delta_set.len(); let keys: Vec = delta_set.keys().cloned().collect(); for k in keys { @@ -387,6 +388,6 @@ pub(super) fn filter_delta_set(delta_set: &mut DeltaSet) { let new_length = delta_set.len(); if original_length != new_length { - println!("length modified!"); + log_info!(logger, "length modified!"); } } From 3a7938605a1968e7d0bae41f63af735e304671ae Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Wed, 2 Aug 2023 18:19:13 -0700 Subject: [PATCH 06/10] Remove println from persistence.rs --- src/lib.rs | 2 +- src/persistence.rs | 13 ++++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 41057d3..b6281bc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -88,7 +88,7 @@ impl RapidSyncProcessor where L::Ta let (sync_completion_sender, mut sync_completion_receiver) = mpsc::channel::<()>(1); if config::DOWNLOAD_NEW_GOSSIP { - let (mut persister, persistence_sender) = GossipPersister::new(Arc::clone(&self.network_graph)); + let (mut persister, persistence_sender) = GossipPersister::new(self.network_graph.clone(), self.logger.clone()); log_info!(self.logger, "Starting gossip download"); tokio::spawn(tracking::download_gossip(persistence_sender, sync_completion_sender, diff --git a/src/persistence.rs b/src/persistence.rs index e2d9eb0..f638894 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -3,6 +3,7 @@ use std::io::{BufWriter, Write}; use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; +use lightning::log_info; use lightning::routing::gossip::NetworkGraph; use lightning::util::logger::Logger; use lightning::util::ser::Writeable; @@ -17,15 +18,17 @@ const POSTGRES_INSERT_TIMEOUT: Duration = Duration::from_secs(15); pub(crate) struct GossipPersister where L::Target: Logger { gossip_persistence_receiver: mpsc::Receiver, network_graph: Arc>, + logger: L } impl GossipPersister where L::Target: Logger { - pub fn new(network_graph: Arc>) -> (Self, mpsc::Sender) { + pub fn new(network_graph: Arc>, logger: L) -> (Self, mpsc::Sender) { let (gossip_persistence_sender, gossip_persistence_receiver) = mpsc::channel::(100); (GossipPersister { gossip_persistence_receiver, - network_graph + network_graph, + logger }, gossip_persistence_sender) } @@ -101,7 +104,7 @@ impl GossipPersister where L::Target: Logger { i += 1; // count the persisted gossip messages if latest_persistence_log.elapsed().as_secs() >= 60 { - println!("Persisting gossip message #{}", i); + log_info!(self.logger, "Persisting gossip message #{}", i); latest_persistence_log = Instant::now(); } @@ -179,7 +182,7 @@ impl GossipPersister where L::Target: Logger { } fn persist_network_graph(&self) { - println!("Caching network graph…"); + log_info!(self.logger, "Caching network graph…"); let cache_path = config::network_graph_cache_path(); let file = OpenOptions::new() .create(true) @@ -191,6 +194,6 @@ impl GossipPersister where L::Target: Logger { let mut writer = BufWriter::new(file); self.network_graph.write(&mut writer).unwrap(); writer.flush().unwrap(); - println!("Cached network graph!"); + log_info!(self.logger, "Cached network graph!"); } } From 66cb299a1c28daaf0ea54efbd20e01a4725d8921 Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Wed, 2 Aug 2023 18:21:29 -0700 Subject: [PATCH 07/10] Remove println from snapshot.rs --- src/snapshot.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/snapshot.rs b/src/snapshot.rs index 63975be..96c1e4d 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -4,6 +4,7 @@ use std::ops::Deref; use std::os::unix::fs::symlink; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use lightning::log_info; use lightning::routing::gossip::NetworkGraph; use lightning::util::logger::Logger; @@ -22,7 +23,7 @@ impl Snapshotter where L::Target: Logger { } pub(crate) async fn snapshot_gossip(&self) { - println!("Initiating snapshotting service"); + log_info!(self.logger, "Initiating snapshotting service"); let snapshot_sync_day_factors = [1, 2, 3, 4, 5, 6, 7, 14, 21, u64::MAX]; let round_day_seconds = config::SNAPSHOT_CALCULATION_INTERVAL as u64; @@ -38,7 +39,7 @@ impl Snapshotter where L::Target: Logger { // 1. get the current timestamp let snapshot_generation_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); let reference_timestamp = Self::round_down_to_nearest_multiple(snapshot_generation_timestamp, round_day_seconds); - println!("Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp); + log_info!(self.logger, "Capturing snapshots at {} for: {}", snapshot_generation_timestamp, reference_timestamp); // 2. sleep until the next round 24 hours // 3. refresh all snapshots @@ -78,14 +79,14 @@ impl Snapshotter where L::Target: Logger { for (day_range, current_last_sync_timestamp) in &snapshot_sync_timestamps { let network_graph_clone = self.network_graph.clone(); { - println!("Calculating {}-day snapshot", day_range); + log_info!(self.logger, "Calculating {}-day snapshot", day_range); // calculate the snapshot let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, self.logger.clone()).await; // persist the snapshot and update the symlink let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-days__previous-sync:{}.lngossip", reference_timestamp, day_range, current_last_sync_timestamp); let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename); - println!("Persisting {}-day snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", day_range, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental); + log_info!(self.logger, "Persisting {}-day snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", day_range, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental); fs::write(&snapshot_path, snapshot.data).unwrap(); snapshot_filenames_by_day_range.insert(day_range.clone(), snapshot_filename); } @@ -100,7 +101,7 @@ impl Snapshotter where L::Target: Logger { let dummy_symlink_path = format!("{}/{}.bin", pending_symlink_directory, reference_timestamp); let relative_dummy_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, dummy_filename); - println!("Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path); + log_info!(self.logger, "Symlinking dummy: {} -> {}", dummy_symlink_path, relative_dummy_snapshot_path); symlink(&relative_dummy_snapshot_path, &dummy_symlink_path).unwrap(); } @@ -129,7 +130,7 @@ impl Snapshotter where L::Target: Logger { }; let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp); - println!("Symlinking: {} -> {} ({} -> {}", i, referenced_day_range, symlink_path, relative_snapshot_path); + log_info!(self.logger, "Symlinking: {} -> {} ({} -> {}", i, referenced_day_range, symlink_path, relative_snapshot_path); symlink(&relative_snapshot_path, &symlink_path).unwrap(); } @@ -151,7 +152,7 @@ impl Snapshotter where L::Target: Logger { let remainder = current_time % round_day_seconds; let time_until_next_day = round_day_seconds - remainder; - println!("Sleeping until next snapshot capture: {}s", time_until_next_day); + log_info!(self.logger, "Sleeping until next snapshot capture: {}s", time_until_next_day); // add in an extra five seconds to assure the rounding down works correctly let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_day + 5)); sleep.await; From b6bbdf5500568fb3194d725b9a42379ce64ae37d Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Wed, 2 Aug 2023 18:26:55 -0700 Subject: [PATCH 08/10] Remove println from tracking.rs --- src/tracking.rs | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/src/tracking.rs b/src/tracking.rs index 99166af..c53fc5b 100644 --- a/src/tracking.rs +++ b/src/tracking.rs @@ -11,6 +11,7 @@ use lightning; use lightning::ln::peer_handler::{ ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager, }; +use lightning::{log_error, log_info, log_warn}; use lightning::routing::gossip::NetworkGraph; use lightning::sign::KeysManager; use lightning::util::logger::Logger; @@ -49,7 +50,7 @@ pub(crate) async fn download_gossip(pe message_handler, 0xdeadbeef, &random_data, - logger, + logger.clone(), keys_manager, )); router.set_pm(Arc::clone(&peer_handler)); @@ -63,12 +64,12 @@ pub(crate) async fn download_gossip(pe } }); - println!("Connecting to Lightning peers..."); + log_info!(logger, "Connecting to Lightning peers..."); let peers = config::ln_peers(); let mut connected_peer_count = 0; for current_peer in peers { - let initial_connection_succeeded = connect_peer(current_peer, Arc::clone(&peer_handler)).await; + let initial_connection_succeeded = connect_peer(current_peer, peer_handler.clone(), logger.clone()).await; if initial_connection_succeeded { connected_peer_count += 1; } @@ -78,7 +79,7 @@ pub(crate) async fn download_gossip(pe panic!("Failed to connect to any peer."); } - println!("Connected to {} Lightning peers!", connected_peer_count); + log_info!(logger, "Connected to {} Lightning peers!", connected_peer_count); tokio::spawn(async move { let mut previous_announcement_count = 0u64; @@ -108,7 +109,8 @@ pub(crate) async fn download_gossip(pe // if we either aren't caught up, or just stopped/started being caught up if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) { - println!( + log_info!( + logger, "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n", i, total_message_count, @@ -119,19 +121,19 @@ pub(crate) async fn download_gossip(pe counter.channel_updates_without_htlc_max_msats ); } else { - println!("Monitoring for gossip…") + log_info!(logger, "Monitoring for gossip…") } if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip { - println!("caught up with gossip!"); + log_info!(logger, "caught up with gossip!"); needs_to_notify_persister = true; } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip { - println!("Received new messages since catching up with gossip!"); + log_info!(logger, "Received new messages since catching up with gossip!"); } let continuous_caught_up_duration = latest_new_gossip_time.elapsed(); if continuous_caught_up_duration.as_secs() > 600 { - eprintln!("No new gossip messages in 10 minutes! Something's amiss!"); + log_warn!(logger, "No new gossip messages in 10 minutes! Something's amiss!"); } previous_announcement_count = counter.channel_announcements; @@ -146,19 +148,19 @@ pub(crate) async fn download_gossip(pe }); } -async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool where L::Target: Logger { - eprintln!("Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string()); +async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager, logger: L) -> bool where L::Target: Logger { + log_info!(logger, "Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string()); let connection = lightning_net_tokio::connect_outbound( Arc::clone(&peer_manager), current_peer.0, current_peer.1, ).await; if let Some(disconnection_future) = connection { - eprintln!("Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string()); + log_info!(logger, "Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string()); tokio::spawn(async move { disconnection_future.await; loop { - eprintln!("Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string()); + log_warn!(logger, "Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string()); if let Some(disconnection_future) = lightning_net_tokio::connect_outbound( Arc::clone(&peer_manager), current_peer.0, @@ -171,7 +173,7 @@ async fn connect_peer(current_peer: (P }); true } else { - eprintln!("Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string()); + log_error!(logger, "Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string()); false } } From 3f500ef230699fdf566cb09231dd7c15c4a55c35 Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Wed, 2 Aug 2023 18:34:01 -0700 Subject: [PATCH 09/10] Remove println from verifier.rs --- src/downloader.rs | 2 +- src/verifier.rs | 28 +++++++++++++++++++--------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/src/downloader.rs b/src/downloader.rs index 34a5c66..8d5cdb0 100644 --- a/src/downloader.rs +++ b/src/downloader.rs @@ -42,7 +42,7 @@ pub(crate) struct GossipRouter where L impl GossipRouter where L::Target: Logger { pub(crate) fn new(network_graph: Arc>, sender: mpsc::Sender, logger: L) -> Self { let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, logger.clone())); - let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper))); + let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper), logger.clone())); Self { native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), logger.clone()), outbound_gossiper, diff --git a/src/verifier.rs b/src/verifier.rs index 872a501..6813ff7 100644 --- a/src/verifier.rs +++ b/src/verifier.rs @@ -6,6 +6,7 @@ use std::sync::Mutex; use bitcoin::{BlockHash, TxOut}; use bitcoin::blockdata::block::Block; use bitcoin::hashes::Hash; +use lightning::log_error; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoResult, UtxoLookupError}; use lightning::util::logger::Logger; @@ -21,41 +22,49 @@ pub(crate) struct ChainVerifier where graph: Arc>, outbound_gossiper: Arc>, Arc, L>>, peer_handler: Mutex>>, + logger: L } struct RestBinaryResponse(Vec); impl ChainVerifier where L::Target: Logger { - pub(crate) fn new(graph: Arc>, outbound_gossiper: Arc>, Arc, L>>) -> Self { + pub(crate) fn new(graph: Arc>, outbound_gossiper: Arc>, Arc, L>>, logger: L) -> Self { ChainVerifier { rest_client: Arc::new(RestClient::new(config::bitcoin_rest_endpoint()).unwrap()), outbound_gossiper, graph, peer_handler: Mutex::new(None), + logger } } pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager) { *self.peer_handler.lock().unwrap() = Some(peer_handler); } - async fn retrieve_utxo(client: Arc, short_channel_id: u64) -> Result { + async fn retrieve_utxo(client: Arc, short_channel_id: u64, logger: L) -> Result { let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32; let output_index = (short_channel_id & 0xffff) as u16; - let mut block = Self::retrieve_block(client, block_height).await?; - if transaction_index as usize >= block.txdata.len() { return Err(UtxoLookupError::UnknownTx); } + let mut block = Self::retrieve_block(client, block_height, logger.clone()).await?; + if transaction_index as usize >= block.txdata.len() { + log_error!(logger, "Could't find transaction {} in block {}", transaction_index, block_height); + return Err(UtxoLookupError::UnknownTx); + } let mut transaction = block.txdata.swap_remove(transaction_index as usize); - if output_index as usize >= transaction.output.len() { return Err(UtxoLookupError::UnknownTx); } + if output_index as usize >= transaction.output.len() { + log_error!(logger, "Could't find output {} in transaction {}", output_index, transaction.txid()); + return Err(UtxoLookupError::UnknownTx); + } Ok(transaction.output.swap_remove(output_index as usize)) } - async fn retrieve_block(client: Arc, block_height: u32) -> Result { + async fn retrieve_block(client: Arc, block_height: u32, logger: L) -> Result { let uri = format!("blockhashbyheight/{}.bin", block_height); let block_hash_result = client.request_resource::(&uri).await; let block_hash: Vec = block_hash_result.map_err(|error| { - eprintln!("Could't find block hash at height {}: {}", block_height, error.to_string()); + log_error!(logger, "Could't find block hash at height {}: {}", block_height, error.to_string()); UtxoLookupError::UnknownChain })?.0; let block_hash = BlockHash::from_slice(&block_hash).unwrap(); @@ -67,7 +76,7 @@ impl ChainVerifier where L::Target: }, Ok(_) => unreachable!(), Err(error) => { - eprintln!("Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash); + log_error!(logger, "Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash); Err(UtxoLookupError::UnknownChain) } } @@ -82,8 +91,9 @@ impl UtxoLookup for ChainVerifier w let client_ref = Arc::clone(&self.rest_client); let gossip_ref = Arc::clone(&self.outbound_gossiper); let pm_ref = self.peer_handler.lock().unwrap().clone(); + let logger_ref = self.logger.clone(); tokio::spawn(async move { - let res = Self::retrieve_utxo(client_ref, short_channel_id).await; + let res = Self::retrieve_utxo(client_ref, short_channel_id, logger_ref).await; fut.resolve(&*graph_ref, &*gossip_ref, res); if let Some(pm) = pm_ref { pm.process_events(); } }); From 5eb833bb7ca47f76d8853e44121cfc785fe1a1c3 Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Fri, 4 Aug 2023 14:30:02 -0700 Subject: [PATCH 10/10] Allow log threshold configuration. --- src/config.rs | 13 +++++++++++++ src/types.rs | 6 +++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/config.rs b/src/config.rs index b0e3a7c..804b0f8 100644 --- a/src/config.rs +++ b/src/config.rs @@ -35,6 +35,19 @@ pub(crate) fn network() -> Network { } } +pub(crate) fn log_level() -> lightning::util::logger::Level { + let level = env::var("RAPID_GOSSIP_SYNC_SERVER_LOG_LEVEL").unwrap_or("info".to_string()).to_lowercase(); + match level.as_str() { + "gossip" => lightning::util::logger::Level::Gossip, + "trace" => lightning::util::logger::Level::Trace, + "debug" => lightning::util::logger::Level::Debug, + "info" => lightning::util::logger::Level::Info, + "warn" => lightning::util::logger::Level::Warn, + "error" => lightning::util::logger::Level::Error, + _ => panic!("Invalid log level"), + } +} + pub(crate) fn network_graph_cache_path() -> String { format!("{}/network_graph.bin", cache_path()) } diff --git a/src/types.rs b/src/types.rs index 6fafbd3..0b03081 100644 --- a/src/types.rs +++ b/src/types.rs @@ -4,6 +4,7 @@ use lightning::sign::KeysManager; use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate}; use lightning::ln::peer_handler::{ErroringMessageHandler, IgnoringMessageHandler, PeerManager}; use lightning::util::logger::{Logger, Record}; +use crate::config; use crate::downloader::GossipRouter; use crate::verifier::ChainVerifier; @@ -28,7 +29,10 @@ impl RGSSLogger { impl Logger for RGSSLogger { fn log(&self, record: &Record) { - // TODO: allow log level threshold to be set + let threshold = config::log_level(); + if record.level < threshold { + return; + } println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args); } }