Merge pull request #51 from arik-so/2023/08/pre_test_refactors
Allow arbitrary logger types.
TheBlueMatt authored Aug 13, 2023
2 parents 5e6a00a + 5eb833b commit 55e1b5d
Showing 11 changed files with 181 additions and 132 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
@@ -5,9 +5,9 @@ edition = "2021"

[dependencies]
bitcoin = "0.29"
-lightning = { version = "0.0.116-alpha1" }
-lightning-block-sync = { version = "0.0.116-alpha1", features=["rest-client"] }
-lightning-net-tokio = { version = "0.0.116-alpha1" }
+lightning = { version = "0.0.116" }
+lightning-block-sync = { version = "0.0.116", features=["rest-client"] }
+lightning-net-tokio = { version = "0.0.116" }
tokio = { version = "1.25", features = ["full"] }
tokio-postgres = { version="=0.7.5" }
futures = "0.3"
13 changes: 13 additions & 0 deletions src/config.rs
@@ -35,6 +35,19 @@ pub(crate) fn network() -> Network {
}
}

+pub(crate) fn log_level() -> lightning::util::logger::Level {
+let level = env::var("RAPID_GOSSIP_SYNC_SERVER_LOG_LEVEL").unwrap_or("info".to_string()).to_lowercase();
+match level.as_str() {
+"gossip" => lightning::util::logger::Level::Gossip,
+"trace" => lightning::util::logger::Level::Trace,
+"debug" => lightning::util::logger::Level::Debug,
+"info" => lightning::util::logger::Level::Info,
+"warn" => lightning::util::logger::Level::Warn,
+"error" => lightning::util::logger::Level::Error,
+_ => panic!("Invalid log level"),
+}
+}

pub(crate) fn network_graph_cache_path() -> String {
format!("{}/network_graph.bin", cache_path())
}
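The new log_level() function reads the RAPID_GOSSIP_SYNC_SERVER_LOG_LEVEL environment variable, defaults to "info", and panics on unrecognized values. A minimal sketch of how a Logger implementation might consume the configured threshold (an illustration of the idea, not the crate's actual logger type):

use lightning::util::logger::{Level, Logger, Record};

// Hypothetical logger that drops records quieter than the configured level.
struct ThresholdLogger {
	threshold: Level,
}

impl Logger for ThresholdLogger {
	fn log(&self, record: &Record) {
		// Level implements PartialOrd, so records below the threshold can be skipped.
		if record.level < self.threshold {
			return;
		}
		println!("{:?} [{}:{}] {}", record.level, record.module_path, record.line, record.args);
	}
}

A ThresholdLogger { threshold: config::log_level() } could then be handed to the processor introduced below.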
29 changes: 15 additions & 14 deletions src/downloader.rs
@@ -1,14 +1,15 @@
+use std::ops::Deref;
use std::sync::{Arc, RwLock};

use bitcoin::secp256k1::PublicKey;
use lightning::events::{MessageSendEvent, MessageSendEventsProvider};
use lightning::ln::features::{InitFeatures, NodeFeatures};
use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
+use lightning::util::logger::Logger;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;

-use crate::TestLogger;
use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager};
use crate::verifier::ChainVerifier;

@@ -30,28 +31,28 @@ impl GossipCounter {
}
}

-pub(crate) struct GossipRouter {
-native_router: P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>,
+pub(crate) struct GossipRouter<L: Deref + Clone + Send + Sync + 'static> where L::Target: Logger {
+native_router: P2PGossipSync<Arc<NetworkGraph<L>>, GossipChainAccess<L>, L>,
pub(crate) counter: RwLock<GossipCounter>,
sender: mpsc::Sender<GossipMessage>,
-verifier: Arc<ChainVerifier>,
-outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>>,
+verifier: Arc<ChainVerifier<L>>,
+outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, GossipChainAccess<L>, L>>,
}

-impl GossipRouter {
-pub(crate) fn new(network_graph: Arc<NetworkGraph<TestLogger>>, sender: mpsc::Sender<GossipMessage>) -> Self {
-let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, TestLogger::new()));
-let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper)));
+impl<L: Deref + Clone + Send + Sync> GossipRouter<L> where L::Target: Logger {
+pub(crate) fn new(network_graph: Arc<NetworkGraph<L>>, sender: mpsc::Sender<GossipMessage>, logger: L) -> Self {
+let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, logger.clone()));
+let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper), logger.clone()));
Self {
-native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), TestLogger::new()),
+native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), logger.clone()),
outbound_gossiper,
counter: RwLock::new(GossipCounter::new()),
sender,
-verifier,
+verifier
}
}

-pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) {
+pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager<L>) {
self.verifier.set_ph(peer_handler);
}

@@ -83,7 +84,7 @@ impl GossipRouter {
}
}

-impl MessageSendEventsProvider for GossipRouter {
+impl<L: Deref + Clone + Send + Sync> MessageSendEventsProvider for GossipRouter<L> where L::Target: Logger {
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events();
for ev in gossip_evs {
@@ -102,7 +103,7 @@ impl MessageSendEventsProvider for GossipRouter {
}
}

-impl RoutingMessageHandler for GossipRouter {
+impl<L: Deref + Clone + Send + Sync> RoutingMessageHandler for GossipRouter<L> where L::Target: Logger {
fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
self.native_router.handle_node_announcement(msg)
}
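The recurring bound in this file, L: Deref + Clone + Send + Sync with L::Target: Logger, is LDK's usual way of accepting an Arc<MyLogger>, a &'static MyLogger, or any other smart pointer to a logger; the Clone bound keeps handing out copies cheap, since cloning an Arc only bumps a reference count. A self-contained sketch of the pattern (StdOutLogger and announce are illustrative, not part of this crate):

use std::ops::Deref;
use std::sync::Arc;

use lightning::log_info;
use lightning::util::logger::{Logger, Record};

struct StdOutLogger;

impl Logger for StdOutLogger {
	fn log(&self, record: &Record) {
		println!("{}", record.args);
	}
}

// Generic over anything that dereferences to a Logger, mirroring GossipRouter's bound.
fn announce<L: Deref + Clone>(logger: L) where L::Target: Logger {
	log_info!(logger, "gossip router ready");
}

fn main() {
	// Arc<StdOutLogger> satisfies L: Deref<Target = StdOutLogger> + Clone.
	announce(Arc::new(StdOutLogger));
}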
63 changes: 34 additions & 29 deletions src/lib.rs
@@ -12,20 +12,22 @@ extern crate core;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::BufReader;
+use std::ops::Deref;
use std::sync::Arc;
+use lightning::log_info;

use lightning::routing::gossip::{NetworkGraph, NodeId};
+use lightning::util::logger::Logger;
use lightning::util::ser::{ReadableArgs, Writeable};
use tokio::sync::mpsc;
use crate::lookup::DeltaSet;

use crate::persistence::GossipPersister;
use crate::serialization::UpdateSerialization;
use crate::snapshot::Snapshotter;
-use crate::types::TestLogger;
+use crate::types::RGSSLogger;

mod downloader;
-mod types;
mod tracking;
mod lookup;
mod persistence;
@@ -35,14 +37,17 @@ mod config;
mod hex_utils;
mod verifier;

+pub mod types;

/// The purpose of this prefix is to identify the serialization format, should other rapid gossip
/// sync formats arise in the future.
///
/// The fourth byte is the protocol version in case our format gets updated.
const GOSSIP_PREFIX: [u8; 4] = [76, 68, 75, 1];

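As an aside, the first three prefix bytes are the ASCII encoding of "LDK", with the documented version byte last:

// 76, 68, 75 spell "LDK"; the fourth byte is the protocol version.
assert_eq!(&GOSSIP_PREFIX[..3], b"LDK");
assert_eq!(GOSSIP_PREFIX[3], 1);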
-pub struct RapidSyncProcessor {
-network_graph: Arc<NetworkGraph<TestLogger>>,
+pub struct RapidSyncProcessor<L: Deref> where L::Target: Logger {
+network_graph: Arc<NetworkGraph<L>>,
+logger: L
}

pub struct SerializedResponse {
@@ -54,27 +59,27 @@ pub struct SerializedResponse {
pub update_count_incremental: u32,
}

-impl RapidSyncProcessor {
-pub fn new() -> Self {
+impl<L: Deref + Clone + Send + Sync + 'static> RapidSyncProcessor<L> where L::Target: Logger {
+pub fn new(logger: L) -> Self {
let network = config::network();
-let logger = TestLogger::new();
let network_graph = if let Ok(file) = File::open(&config::network_graph_cache_path()) {
-println!("Initializing from cached network graph…");
+log_info!(logger, "Initializing from cached network graph…");
let mut buffered_reader = BufReader::new(file);
-let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger);
+let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger.clone());
if let Ok(network_graph) = network_graph_result {
-println!("Initialized from cached network graph!");
+log_info!(logger, "Initialized from cached network graph!");
network_graph
} else {
-println!("Initialization from cached network graph failed: {}", network_graph_result.err().unwrap());
-NetworkGraph::new(network, logger)
+log_info!(logger, "Initialization from cached network graph failed: {}", network_graph_result.err().unwrap());
+NetworkGraph::new(network, logger.clone())
}
} else {
-NetworkGraph::new(network, logger)
+NetworkGraph::new(network, logger.clone())
};
let arc_network_graph = Arc::new(network_graph);
Self {
network_graph: arc_network_graph,
+logger
}
}

@@ -83,12 +88,12 @@ impl RapidSyncProcessor {
let (sync_completion_sender, mut sync_completion_receiver) = mpsc::channel::<()>(1);

if config::DOWNLOAD_NEW_GOSSIP {
-let (mut persister, persistence_sender) = GossipPersister::new(Arc::clone(&self.network_graph));
+let (mut persister, persistence_sender) = GossipPersister::new(self.network_graph.clone(), self.logger.clone());

-println!("Starting gossip download");
+log_info!(self.logger, "Starting gossip download");
tokio::spawn(tracking::download_gossip(persistence_sender, sync_completion_sender,
-Arc::clone(&self.network_graph)));
-println!("Starting gossip db persistence listener");
+Arc::clone(&self.network_graph), self.logger.clone()));
+log_info!(self.logger, "Starting gossip db persistence listener");
tokio::spawn(async move { persister.persist_gossip().await; });
} else {
sync_completion_sender.send(()).await.unwrap();
@@ -98,10 +103,10 @@
if sync_completion.is_none() {
panic!("Sync failed!");
}
-println!("Initial sync complete!");
+log_info!(self.logger, "Initial sync complete!");

// start the gossip snapshotting service
-Snapshotter::new(Arc::clone(&self.network_graph)).snapshot_gossip().await;
+Snapshotter::new(Arc::clone(&self.network_graph), self.logger.clone()).snapshot_gossip().await;
}
}

@@ -126,7 +131,7 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec<u8> {
let chain_hash = genesis_block.block_hash();
chain_hash.write(&mut blob).unwrap();

-let blob_timestamp = Snapshotter::round_down_to_nearest_multiple(current_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64) as u32;
+let blob_timestamp = Snapshotter::<Arc<RGSSLogger>>::round_down_to_nearest_multiple(current_timestamp, config::SNAPSHOT_CALCULATION_INTERVAL as u64) as u32;
blob_timestamp.write(&mut blob).unwrap();

0u32.write(&mut blob).unwrap(); // node count
@@ -136,7 +141,7 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec<u8> {
blob
}

-async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync_timestamp: u32) -> SerializedResponse {
+async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>, last_sync_timestamp: u32, logger: L) -> SerializedResponse where L::Target: Logger {
let (client, connection) = lookup::connect_to_db().await;

network_graph.remove_stale_channels_and_tracking();
@@ -170,12 +175,12 @@ async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync
};

let mut delta_set = DeltaSet::new();
-lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp).await;
-println!("announcement channel count: {}", delta_set.len());
-lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp).await;
-println!("update-fetched channel count: {}", delta_set.len());
-lookup::filter_delta_set(&mut delta_set);
-println!("update-filtered channel count: {}", delta_set.len());
+lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp, logger.clone()).await;
+log_info!(logger, "announcement channel count: {}", delta_set.len());
+lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await;
+log_info!(logger, "update-fetched channel count: {}", delta_set.len());
+lookup::filter_delta_set(&mut delta_set, logger.clone());
+log_info!(logger, "update-filtered channel count: {}", delta_set.len());
let serialization_details = serialization::serialize_delta_set(delta_set, last_sync_timestamp);

// process announcements
@@ -246,8 +251,8 @@ async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync

prefixed_output.append(&mut output);

-println!("duplicated node ids: {}", duplicate_node_ids);
-println!("latest seen timestamp: {:?}", serialization_details.latest_seen);
+log_info!(logger, "duplicated node ids: {}", duplicate_node_ids);
+log_info!(logger, "latest seen timestamp: {:?}", serialization_details.latest_seen);

SerializedResponse {
data: prefixed_output,
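One wrinkle from the new generics shows up in serialize_empty_blob above: round_down_to_nearest_multiple never mentions L, so the compiler cannot infer it, and the call site pins an arbitrary concrete type with a turbofish, Snapshotter::<Arc<RGSSLogger>>, even though the choice has no effect on the arithmetic. The helper is presumably plain integer rounding, along these lines (a sketch under that assumption, not the repository's code):

fn round_down_to_nearest_multiple(number: u64, multiple: u64) -> u64 {
	// Integer division truncates, so multiplying back rounds down.
	(number / multiple) * multiple
}

fn main() {
	// With a hypothetical 180-second snapshot interval:
	assert_eq!(round_down_to_nearest_multiple(1_000, 180), 900);
}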
32 changes: 17 additions & 15 deletions src/lookup.rs
@@ -1,6 +1,6 @@
use std::collections::{BTreeMap, HashSet};
use std::io::Cursor;
-use std::ops::Add;
+use std::ops::{Add, Deref};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

@@ -11,8 +11,10 @@ use tokio_postgres::{Client, Connection, NoTls, Socket};
use tokio_postgres::tls::NoTlsStream;

use futures::StreamExt;
+use lightning::log_info;
+use lightning::util::logger::Logger;

-use crate::{config, TestLogger};
+use crate::config;
use crate::serialization::MutatedProperties;

/// The delta set needs to be a BTreeMap so the keys are sorted.
@@ -75,20 +77,20 @@ pub(super) async fn connect_to_db() -> (Client, Connection<Socket, NoTlsStream>)
/// whether they had been seen before.
/// Also include all announcements for which the first update was announced
/// after `last_sync_timestamp`
-pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<TestLogger>>, client: &Client, last_sync_timestamp: u32) {
-println!("Obtaining channel ids from network graph");
+pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
+log_info!(logger, "Obtaining channel ids from network graph");
let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));
let channel_ids = {
let read_only_graph = network_graph.read_only();
-println!("Retrieved read-only network graph copy");
+log_info!(logger, "Retrieved read-only network graph copy");
let channel_iterator = read_only_graph.channels().unordered_iter();
channel_iterator
.filter(|c| c.1.announcement_message.is_some())
.map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64)
.collect::<Vec<_>>()
};

-println!("Obtaining corresponding database entries");
+log_info!(logger, "Obtaining corresponding database entries");
// get all the channel announcements that are currently in the network graph
let announcement_rows = client.query_raw("SELECT announcement_signed, seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap();
let mut pinned_rows = Box::pin(announcement_rows);
@@ -113,7 +115,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ
{
// THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA

-println!("Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync");
+log_info!(logger, "Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync");
// Steps:
// — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // to find the oldest update in a given direction
// — From those updates, select distinct by (scid), ordered by seen DESC (to obtain the newer one per direction)
@@ -155,7 +157,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ
{
// THIS STEP IS USED TO DETERMINE IF A REMINDER UPDATE SHOULD BE SENT

-println!("Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
+log_info!(logger, "Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
// Steps:
// — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
// — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
@@ -213,7 +215,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ
}
}

-pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32) {
+pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
let start = Instant::now();
let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));

Expand All @@ -235,7 +237,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli
", [last_sync_timestamp_object]).await.unwrap();
let mut pinned_rows = Box::pin(reference_rows);

-println!("Fetched reference rows in {:?}", start.elapsed());
+log_info!(logger, "Fetched reference rows in {:?}", start.elapsed());

let mut last_seen_update_ids: Vec<i32> = Vec::new();
let mut non_intermediate_ids: HashSet<i32> = HashSet::new();
@@ -263,7 +265,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli
reference_row_count += 1;
}

-println!("Processed {} reference rows (delta size: {}) in {:?}",
+log_info!(logger, "Processed {} reference rows (delta size: {}) in {:?}",
reference_row_count, delta_set.len(), start.elapsed());

// get all the intermediate channel updates
Expand All @@ -276,7 +278,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli
WHERE seen >= $1
", [last_sync_timestamp_object]).await.unwrap();
let mut pinned_updates = Box::pin(intermediate_updates);
-println!("Fetched intermediate rows in {:?}", start.elapsed());
+log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed());

let mut previous_scid = u64::MAX;
let mut previously_seen_directions = (false, false);
@@ -351,10 +353,10 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli
}
}
}
-println!("Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
+log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
}

-pub(super) fn filter_delta_set(delta_set: &mut DeltaSet) {
+pub(super) fn filter_delta_set<L: Deref>(delta_set: &mut DeltaSet, logger: L) where L::Target: Logger {
let original_length = delta_set.len();
let keys: Vec<u64> = delta_set.keys().cloned().collect();
for k in keys {
@@ -386,6 +388,6 @@ pub(super) fn filter_delta_set(delta_set: &mut DeltaSet) {

let new_length = delta_set.len();
if original_length != new_length {
-println!("length modified!");
+log_info!(logger, "length modified!");
}
}
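As the comment near the top of the file notes, DeltaSet is a BTreeMap precisely so its keys (short channel ids) iterate in sorted order, which lets later serialization steps walk channels by ascending scid. A quick illustration of the property being relied on:

use std::collections::BTreeMap;

fn main() {
	let mut delta_set: BTreeMap<u64, &str> = BTreeMap::new();
	delta_set.insert(700_000, "newer channel");
	delta_set.insert(500_000, "older channel");

	// Iteration is ordered by key regardless of insertion order.
	let scids: Vec<u64> = delta_set.keys().copied().collect();
	assert_eq!(scids, vec![500_000, 700_000]);
}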
5 changes: 4 additions & 1 deletion src/main.rs
@@ -1,6 +1,9 @@
+use std::sync::Arc;
use rapid_gossip_sync_server::RapidSyncProcessor;
+use rapid_gossip_sync_server::types::RGSSLogger;

#[tokio::main]
async fn main() {
-RapidSyncProcessor::new().start_sync().await;
+let logger = Arc::new(RGSSLogger::new());
+RapidSyncProcessor::new(logger).start_sync().await;
}
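This is the payoff of the change: main.rs now chooses the logger, so a downstream consumer can substitute any Logger implementation for RGSSLogger. A hypothetical variant (StderrLogger is illustrative, not shipped with the crate):

use std::sync::Arc;

use lightning::util::logger::{Logger, Record};
use rapid_gossip_sync_server::RapidSyncProcessor;

struct StderrLogger;

impl Logger for StderrLogger {
	fn log(&self, record: &Record) {
		eprintln!("{:?}: {}", record.level, record.args);
	}
}

#[tokio::main]
async fn main() {
	// Any Arc-wrapped Logger satisfies the processor's generic bounds.
	RapidSyncProcessor::new(Arc::new(StderrLogger)).start_sync().await;
}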
