Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RGS v2: NodeAnnouncement Delta Serialization #76

Merged
merged 5 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks
/// That reminder may be either in the form of a channel announcement, or in the form of empty
/// updates in both directions.
pub(crate) const CHANNEL_REMINDER_AGE: Duration = Duration::from_secs(6 * 24 * 60 * 60);

/// Maximum number of default features to calculate for node announcements
pub(crate) const NODE_DEFAULT_FEATURE_COUNT: u8 = 6;

/// The number of successful peer connections to await prior to continuing to gossip storage.
/// The application will still work if the number of specified peers is lower, as long as there is
/// at least one successful peer connection, but it may result in long startup times.
Expand Down Expand Up @@ -299,6 +303,11 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client)
tx.execute("UPDATE config SET db_schema = 13 WHERE id = 1", &[]).await.unwrap();
tx.commit().await.unwrap();
}
if schema >= 1 && schema <= 13 {
let tx = client.transaction().await.unwrap();
tx.execute("UPDATE config SET db_schema = 14 WHERE id = 1", &[]).await.unwrap();
tx.commit().await.unwrap();
}
if schema <= 1 || schema > SCHEMA_VERSION {
panic!("Unknown schema in db: {}, we support up to {}", schema, SCHEMA_VERSION);
}
Expand Down Expand Up @@ -384,7 +393,7 @@ mod tests {
// Set the environment variable, including a repeated comma, leading space, and trailing comma.
std::env::set_var("LN_PEERS", "035e4ff418fc8b5554c5d9eea66396c227bd429a3251c8cbc711002ba215bfc226@170.75.163.209:9735,, 035e4ff418fc8b5554c5d9eea66396c227bd429a3251c8cbc711002ba215bfc227@170.75.163.210:9735,");
let peers = ln_peers();

// Assert output is as expected
assert_eq!(
peers,
Expand Down
143 changes: 122 additions & 21 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::config::SYMLINK_GRANULARITY_INTERVAL;
use crate::lookup::DeltaSet;

use crate::persistence::GossipPersister;
use crate::serialization::UpdateSerialization;
use crate::serialization::{SerializationSet, UpdateSerialization};
use crate::snapshot::Snapshotter;
use crate::types::RGSSLogger;

Expand All @@ -49,7 +49,7 @@ mod tests;
/// sync formats arise in the future.
///
/// The fourth byte is the protocol version in case our format gets updated.
const GOSSIP_PREFIX: [u8; 4] = [76, 68, 75, 1];
const GOSSIP_PREFIX: [u8; 3] = [76, 68, 75];

pub struct RapidSyncProcessor<L: Deref> where L::Target: Logger {
network_graph: Arc<NetworkGraph<L>>,
Expand All @@ -59,7 +59,13 @@ pub struct RapidSyncProcessor<L: Deref> where L::Target: Logger {
pub struct SerializedResponse {
pub data: Vec<u8>,
pub message_count: u32,
pub announcement_count: u32,
pub node_announcement_count: u32,
/// Despite the name, the count of node announcements that have associated updates, be those
/// features, addresses, or both
pub node_update_count: u32,
pub node_feature_update_count: u32,
pub node_address_update_count: u32,
pub channel_announcement_count: u32,
pub update_count: u32,
pub update_count_full: u32,
pub update_count_incremental: u32,
Expand Down Expand Up @@ -171,18 +177,31 @@ fn serialize_empty_blob(current_timestamp: u64) -> Vec<u8> {
blob
}

async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>, last_sync_timestamp: u32, snapshot_reference_timestamp: Option<u64>, logger: L) -> SerializedResponse where L::Target: Logger {
async fn calculate_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>, last_sync_timestamp: u32, snapshot_reference_timestamp: Option<u64>, logger: L) -> SerializationSet where L::Target: Logger {
let client = connect_to_db().await;

network_graph.remove_stale_channels_and_tracking();

let mut output: Vec<u8> = vec![];
let snapshot_interval = config::snapshot_generation_interval();

// set a flag if the chain hash is prepended
// chain hash only necessary if either channel announcements or non-incremental updates are present
// for announcement-free incremental-only updates, chain hash can be skipped

let mut delta_set = DeltaSet::new();
lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await;
log_info!(logger, "announcement channel count: {}", delta_set.len());
lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await;
log_info!(logger, "update-fetched channel count: {}", delta_set.len());
let node_delta_set = lookup::fetch_node_updates(&client, last_sync_timestamp, logger.clone()).await;
log_info!(logger, "update-fetched node count: {}", node_delta_set.len());
lookup::filter_delta_set(&mut delta_set, logger.clone());
log_info!(logger, "update-filtered channel count: {}", delta_set.len());
serialization::serialize_delta_set(delta_set, node_delta_set, last_sync_timestamp)
Comment on lines +189 to +198
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Probably best to return the sets in all of these instead of passing in a &mut, but can be done in a follow-up.

}

fn serialize_delta<L: Deref + Clone>(serialization_details: &SerializationSet, serialization_version: u8, logger: L) -> SerializedResponse where L::Target: Logger {
let mut output: Vec<u8> = vec![];
let snapshot_interval = config::snapshot_generation_interval();

let mut node_id_set: HashSet<NodeId> = HashSet::new();
let mut node_id_indices: HashMap<NodeId, usize> = HashMap::new();
let mut node_ids: Vec<NodeId> = Vec::new();
Expand All @@ -199,21 +218,12 @@ async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>,
node_id_indices[&node_id]
};

let mut delta_set = DeltaSet::new();
lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await;
log_info!(logger, "announcement channel count: {}", delta_set.len());
lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await;
log_info!(logger, "update-fetched channel count: {}", delta_set.len());
lookup::filter_delta_set(&mut delta_set, logger.clone());
log_info!(logger, "update-filtered channel count: {}", delta_set.len());
let serialization_details = serialization::serialize_delta_set(delta_set, last_sync_timestamp);

// process announcements
// write the number of channel announcements to the output
let announcement_count = serialization_details.announcements.len() as u32;
announcement_count.write(&mut output).unwrap();
let mut previous_announcement_scid = 0;
for current_announcement in serialization_details.announcements {
for current_announcement in &serialization_details.announcements {
let id_index_1 = get_node_id_index(current_announcement.node_id_1);
let id_index_2 = get_node_id_index(current_announcement.node_id_2);
let mut stripped_announcement = serialization::serialize_stripped_channel_announcement(&current_announcement, id_index_1, id_index_2, previous_announcement_scid);
Expand All @@ -227,7 +237,7 @@ async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>,
let update_count = serialization_details.updates.len() as u32;
update_count.write(&mut output).unwrap();

let default_update_values = serialization_details.full_update_defaults;
let default_update_values = &serialization_details.full_update_defaults;
if update_count > 0 {
default_update_values.cltv_expiry_delta.write(&mut output).unwrap();
default_update_values.htlc_minimum_msat.write(&mut output).unwrap();
Expand All @@ -238,7 +248,7 @@ async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>,

let mut update_count_full = 0;
let mut update_count_incremental = 0;
for current_update in serialization_details.updates {
for current_update in &serialization_details.updates {
match &current_update {
UpdateSerialization::Full(_) => {
update_count_full += 1;
Expand All @@ -258,6 +268,7 @@ async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>,
let message_count = announcement_count + update_count;

let mut prefixed_output = GOSSIP_PREFIX.to_vec();
prefixed_output.push(serialization_version);

// always write the chain hash
serialization_details.chain_hash.write(&mut prefixed_output).unwrap();
Expand All @@ -267,11 +278,97 @@ async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>,
let serialized_seen_timestamp = latest_seen_timestamp.saturating_sub(overflow_seconds);
serialized_seen_timestamp.write(&mut prefixed_output).unwrap();

if serialization_version >= 2 { // serialize the most common node features
for mutated_node_id in serialization_details.node_mutations.keys() {
// consider mutated nodes outside channel announcements
get_node_id_index(mutated_node_id.clone());
}

let default_feature_count = serialization_details.node_announcement_feature_defaults.len() as u8;
debug_assert!(default_feature_count <= config::NODE_DEFAULT_FEATURE_COUNT, "Default feature count cannot exceed maximum");
default_feature_count.write(&mut prefixed_output).unwrap();

for current_feature in &serialization_details.node_announcement_feature_defaults {
current_feature.write(&mut prefixed_output).unwrap();
}
}

let node_id_count = node_ids.len() as u32;
node_id_count.write(&mut prefixed_output).unwrap();

let mut node_update_count = 0u32;
let mut node_feature_update_count = 0u32;
let mut node_address_update_count = 0u32;

for current_node_id in node_ids {
arik-so marked this conversation as resolved.
Show resolved Hide resolved
current_node_id.write(&mut prefixed_output).unwrap();
let mut current_node_delta_serialization: Vec<u8> = Vec::new();
current_node_id.write(&mut current_node_delta_serialization).unwrap();

if serialization_version >= 2 {
if let Some(node_delta) = serialization_details.node_mutations.get(&current_node_id) {
/*
Bitmap:
7: expect extra data after the pubkey (a u16 for the count, and then that number of bytes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so bit 6 is now unused? Should we clearly define it to mean something? Not sure what it would mean but worth mentioning.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could use it for more feature defaults? Frankly, I think we can leave it unused on the server, and decide its interpretation on the client.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, we definitely don't have to interpret it here before merging this PR, I'm just suggesting we think on it.

5-3: index of new features among default (1-6). If index is 7 (all 3 bits are set, it's
outside the present default range). 0 means no feature changes.
2: addresses have changed

1: used for all keys
0: used for odd keys
*/

if node_delta.has_address_set_changed {
node_address_update_count += 1;

let address_set = &node_delta.latest_details_after_seen.as_ref().unwrap().addresses;
let mut address_serialization = Vec::new();

// we don't know a priori how many are <= 255 bytes
let mut total_address_count = 0u8;

for address in address_set.iter() {
if total_address_count == u8::MAX {
// don't serialize more than 255 addresses
break;
}
if let Ok(serialized_length) = u8::try_from(address.serialized_length()) {
total_address_count += 1;
serialized_length.write(&mut address_serialization).unwrap();
address.write(&mut address_serialization).unwrap();
};
}

if total_address_count > 0 {
// signal the presence of node addresses
current_node_delta_serialization[0] |= 1 << 2;
// serialize the actual addresses and count
total_address_count.write(&mut current_node_delta_serialization).unwrap();
current_node_delta_serialization.append(&mut address_serialization);
}
}

if node_delta.has_feature_set_changed {
node_feature_update_count += 1;

let latest_features = &node_delta.latest_details_after_seen.as_ref().unwrap().features;

// are these features among the most common ones?
if let Some(index) = serialization_details.node_announcement_feature_defaults.iter().position(|f| f == latest_features) {
// this feature set is among the 6 defaults
arik-so marked this conversation as resolved.
Show resolved Hide resolved
current_node_delta_serialization[0] |= ((index + 1) as u8) << 3;
} else {
current_node_delta_serialization[0] |= 0b_0011_1000; // 7 << 3
latest_features.write(&mut current_node_delta_serialization).unwrap();
}
}

if node_delta.has_address_set_changed || node_delta.has_feature_set_changed {
node_update_count += 1;
}
}
}

prefixed_output.append(&mut current_node_delta_serialization);
}

prefixed_output.append(&mut output);
Expand All @@ -282,7 +379,11 @@ async fn serialize_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>,
SerializedResponse {
data: prefixed_output,
message_count,
announcement_count,
node_announcement_count: node_id_count,
node_update_count,
node_feature_update_count,
node_address_update_count,
channel_announcement_count: announcement_count,
update_count,
update_count_full,
update_count_incremental,
Expand Down
Loading