Skip to content

Commit

Permalink
Introduce node serialization strategy.
Browse files Browse the repository at this point in the history
With the addition of reminders, we may encounter scenarios where
either a bit flip may suffice, instructing the client to look up its
latest data, or we may need to serialize all announcement details
a new if the client may have already purged the old data.

To better distinguish between these scenarios, we introduce a
serialization strategy enum that allows serializing either the full
announcement, just the mutations, or serve solely as a reminder and
serialize nothing at all.
  • Loading branch information
arik-so committed Jun 27, 2024
1 parent cb5f8e7 commit 9318017
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 41 deletions.
24 changes: 14 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::config::SYMLINK_GRANULARITY_INTERVAL;
use crate::lookup::DeltaSet;

use crate::persistence::GossipPersister;
use crate::serialization::{SerializationSet, UpdateSerialization};
use crate::serialization::{NodeSerializationStrategy, SerializationSet, UpdateSerialization};
use crate::snapshot::Snapshotter;
use crate::types::RGSSLogger;

Expand Down Expand Up @@ -306,6 +306,9 @@ fn serialize_delta<L: Deref + Clone>(serialization_details: &SerializationSet, s

if serialization_version >= 2 {
if let Some(node_delta) = serialization_details.node_mutations.get(&current_node_id) {
let strategy = node_delta.strategy.as_ref().unwrap();
let mut node_has_update = false;

/*
Bitmap:
7: expect extra data after the pubkey (a u16 for the count, and then that number of bytes)
Expand All @@ -317,10 +320,8 @@ fn serialize_delta<L: Deref + Clone>(serialization_details: &SerializationSet, s
0: used for odd keys
*/

if node_delta.has_address_set_changed {
node_address_update_count += 1;

let address_set = &node_delta.latest_details_after_seen.as_ref().unwrap().addresses;
if node_delta.has_address_set_changed || matches!(strategy, NodeSerializationStrategy::Full) {
let address_set = &node_delta.latest_known_details.as_ref().unwrap().addresses;
let mut address_serialization = Vec::new();

// we don't know a priori how many are <= 255 bytes
Expand All @@ -338,17 +339,20 @@ fn serialize_delta<L: Deref + Clone>(serialization_details: &SerializationSet, s
};
}

node_address_update_count += 1;
node_has_update = true;

// signal the presence of node addresses
current_node_delta_serialization[0] |= 1 << 2;
// serialize the actual addresses and count
total_address_count.write(&mut current_node_delta_serialization).unwrap();
current_node_delta_serialization.append(&mut address_serialization);
}

if node_delta.has_feature_set_changed {
if node_delta.has_feature_set_changed || matches!(strategy, NodeSerializationStrategy::Full) {
let latest_features = &node_delta.latest_known_details.as_ref().unwrap().features;
node_feature_update_count += 1;

let latest_features = &node_delta.latest_details_after_seen.as_ref().unwrap().features;
node_has_update = true;

// are these features among the most common ones?
if let Some(index) = serialization_details.node_announcement_feature_defaults.iter().position(|f| f == latest_features) {
Expand All @@ -360,9 +364,9 @@ fn serialize_delta<L: Deref + Clone>(serialization_details: &SerializationSet, s
}
}

if node_delta.has_address_set_changed || node_delta.has_feature_set_changed {
if node_has_update {
node_update_count += 1;
} else if node_delta.requires_reminder {
} else if node_delta.requires_reminder && matches!(strategy, NodeSerializationStrategy::Reminder) {
current_node_delta_serialization[0] |= 1 << 6;
}
}
Expand Down
58 changes: 35 additions & 23 deletions src/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use lightning::ln::features::NodeFeatures;
use lightning::util::logger::Logger;

use crate::config;
use crate::serialization::MutatedProperties;
use crate::serialization::{MutatedProperties, NodeSerializationStrategy};

/// The delta set needs to be a BTreeMap so the keys are sorted.
/// That way, the scids in the response automatically grow monotonically
Expand Down Expand Up @@ -54,7 +54,7 @@ pub(super) struct ChannelDelta {

pub(super) struct NodeDelta {
/// The most recently received, but new-to-the-client, node details
pub(super) latest_details_after_seen: Option<NodeDetails>,
pub(super) latest_known_details: Option<NodeDetails>,

/// Between last_details_before_seen and latest_details_after_seen, including any potential
/// intermediate updates that are not kept track of here, has the set of features this node
Expand All @@ -71,7 +71,10 @@ pub(super) struct NodeDelta {
pub(super) requires_reminder: bool,

/// The most recent node details that the client would have seen already
pub(super) last_details_before_seen: Option<NodeDetails>
pub(super) last_details_before_seen: Option<NodeDetails>,

/// How should this node be serialized
pub(super) strategy: Option<NodeSerializationStrategy>
}

pub(super) struct NodeDetails {
Expand All @@ -95,11 +98,12 @@ impl Default for ChannelDelta {
impl Default for NodeDelta {
fn default() -> Self {
Self {
latest_details_after_seen: None,
latest_known_details: None,
has_feature_set_changed: false,
has_address_set_changed: false,
requires_reminder: false,
last_details_before_seen: None,
strategy: None
}
}
}
Expand Down Expand Up @@ -532,7 +536,8 @@ pub(super) async fn fetch_node_updates<L: Deref + Clone>(client: &Client, last_s
// have been omitted)

let current_timestamp = snapshot_reference_timestamp.unwrap_or(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs());
let reminder_lookup_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs()).unwrap() as u32;
let reminder_inclusion_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs()).unwrap() as u32;
let reminder_lookup_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs() * 3).unwrap() as u32;

// this is the timestamp we need to fetch all relevant updates
let include_reminders = should_snapshot_include_reminders(last_sync_timestamp, current_timestamp, logger.clone());
Expand Down Expand Up @@ -586,21 +591,10 @@ pub(super) async fn fetch_node_updates<L: Deref + Clone>(client: &Client, last_s
current_node_delta.has_address_set_changed = true;
current_node_delta.requires_reminder = false;
}
} else if !is_previously_processed_node_id {
if current_node_delta.last_details_before_seen.is_none() {
if !address_set.is_empty() {
current_node_delta.has_address_set_changed = true;
current_node_delta.requires_reminder = false;
}
if unsigned_node_announcement.features != NodeFeatures::empty() {
current_node_delta.has_feature_set_changed = true;
current_node_delta.requires_reminder = false;
}
}
}

if !is_previously_processed_node_id {
(*current_node_delta).latest_details_after_seen.get_or_insert(NodeDetails {
(*current_node_delta).latest_known_details.get_or_insert(NodeDetails {
seen: current_seen_timestamp,
features: unsigned_node_announcement.features,
addresses: address_set,
Expand All @@ -610,13 +604,31 @@ pub(super) async fn fetch_node_updates<L: Deref + Clone>(client: &Client, last_s
// This node update occurred prior to the last_sync_timestamp, which means that this is
// purely for the purpose of considering whether a reminder might be necessary. Any
// mutation seen within the previous scope would have marked a reminder as unnecessary
if let Some(latest_update) = current_node_delta.latest_details_after_seen.as_ref() {
if unsigned_node_announcement.features != latest_update.features {
current_node_delta.requires_reminder = false;
}
if address_set != latest_update.addresses {
current_node_delta.requires_reminder = false;

// If the most recent mutation occurred prior to the last_sync_timestamp, there are only
// two considerations:
// If the latest mutation occurred within the last 6 days, ignore
// If the latest mutation occurred more than 6 days ago, send a reminder

// If we are in this current else clause, it already means that no mutation has occurred
// between last_sync_timestamp and now.
if let Some(latest_update) = current_node_delta.latest_known_details.as_ref() {
if current_seen_timestamp > reminder_inclusion_threshold_timestamp {
// if this mutation occurred within the last 6 days, no reminder is necessary
if unsigned_node_announcement.features != latest_update.features {
current_node_delta.requires_reminder = false;
}
if address_set != latest_update.addresses {
current_node_delta.requires_reminder = false;
}
}
} else {
// we're obtaining the latest seen details from before the last snapshot scope
current_node_delta.latest_known_details = Some(NodeDetails {
seen: current_seen_timestamp,
features: unsigned_node_announcement.features,
addresses: address_set,
});
}
}

Expand Down
44 changes: 36 additions & 8 deletions src/serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,17 @@ impl UpdateSerialization {
}
}

pub(super) enum NodeSerializationStrategy {
/// Only serialize the aspects of the node ID that have been mutated. Skip if they haven't been
Mutated,
/// Whether or not the addresses or features have been mutated, serialize this node in full. It
/// may have been purged from the client.
Full,
/// This node ID has been seen recently enough to not have been pruned, and this update serves
/// solely the purpose of delaying any pruning, without applying any mutations
Reminder
}

struct FullUpdateValueHistograms {
cltv_expiry_delta: HashMap<u16, usize>,
htlc_minimum_msat: HashMap<u64, usize>,
Expand Down Expand Up @@ -220,24 +231,41 @@ pub(super) fn serialize_delta_set(channel_delta_set: DeltaSet, node_delta_set: N

serialization_set.full_update_defaults = default_update_values;

serialization_set.node_mutations = node_delta_set.into_iter().filter(|(_id, delta)| {
if delta.latest_details_after_seen.is_none() {
serialization_set.node_mutations = node_delta_set.into_iter().filter_map(|(id, mut delta)| {
if delta.latest_known_details.is_none() {
// this entry is vestigial due to the optimized reminder necessity lookup
return false;
return None;
}

let last_details_before_seen = if let Some(last_details_before_seen) = &delta.last_details_before_seen {
last_details_before_seen
} else {
// this node is new and needs full serialization
delta.strategy = Some(NodeSerializationStrategy::Full);
return Some((id, delta));
};

delta.strategy = Some(NodeSerializationStrategy::Mutated);
if delta.has_feature_set_changed || delta.has_address_set_changed {
return Some((id, delta));
}
if delta.last_details_before_seen.is_none() {
// this node is new and needs including
return true;

if !delta.requires_reminder {
return None;
}

// either something changed, or we're sending a reminder
// consider restricting snapshots that include reminders in the future
delta.has_feature_set_changed || delta.has_address_set_changed || delta.requires_reminder
if last_details_before_seen.seen > non_incremental_previous_update_threshold_timestamp {
delta.strategy = Some(NodeSerializationStrategy::Reminder);
}
Some((id, delta))
}).collect();

let mut node_feature_histogram: HashMap<&NodeFeatures, usize> = Default::default();
for (_id, delta) in serialization_set.node_mutations.iter() {
if delta.has_feature_set_changed || delta.last_details_before_seen.is_none() {
if let Some(latest_details) = delta.latest_details_after_seen.as_ref() {
if let Some(latest_details) = delta.latest_known_details.as_ref() {
*node_feature_histogram.entry(&latest_details.features).or_insert(0) += 1;
};
}
Expand Down

0 comments on commit 9318017

Please sign in to comment.