Introduce node serialization strategy.
With the addition of reminders, we may encounter scenarios where
either a bit flip suffices, instructing the client to look up its
latest data, or we need to serialize all announcement details
anew because the client may have already purged the old data.

To better distinguish between these scenarios, we introduce a
serialization strategy enum that allows serializing either the full
announcement or just the mutations, or serving solely as a reminder
and serializing nothing at all.
arik-so committed Sep 18, 2024
1 parent f46b5ec commit c9aaf09
Showing 4 changed files with 178 additions and 103 deletions.
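
As context for the diffs below, here is a minimal sketch of the three-way dispatch this commit introduces. The struct and enum mirror the additions to src/serialization.rs further down; apply_strategy is a hypothetical illustration of the intent of each variant, not code from this commit.

pub struct MutatedNodeProperties {
    pub addresses: bool,
    pub features: bool,
}

pub enum NodeSerializationStrategy {
    /// Serialize only the mutated node properties
    Mutated(MutatedNodeProperties),
    /// Serialize the announcement details in full
    Full,
    /// Serialize nothing; the entry's mere presence delays client-side pruning
    Reminder,
}

// Hypothetical helper: how a serializer might act on each variant.
fn apply_strategy(strategy: &NodeSerializationStrategy, bitmap_byte: &mut u8) {
    match strategy {
        // full and mutation serializations write address/feature payloads,
        // setting bits 2 and 3-5 of the bitmap byte as they go
        NodeSerializationStrategy::Full | NodeSerializationStrategy::Mutated(_) => {}
        // a reminder is a pure bit flip: no payload follows at all
        NodeSerializationStrategy::Reminder => *bitmap_byte |= 1 << 6,
    }
}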
92 changes: 52 additions & 40 deletions src/lib.rs
@@ -26,7 +26,7 @@ use crate::config::SYMLINK_GRANULARITY_INTERVAL;
use crate::lookup::DeltaSet;

use crate::persistence::GossipPersister;
use crate::serialization::{SerializationSet, UpdateSerialization};
use crate::serialization::{MutatedNodeProperties, NodeSerializationStrategy, SerializationSet, UpdateSerialization};
use crate::snapshot::Snapshotter;
use crate::types::RGSSLogger;

@@ -191,7 +191,7 @@ async fn calculate_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>,
log_info!(logger, "announcement channel count: {}", delta_set.len());
lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await;
log_info!(logger, "update-fetched channel count: {}", delta_set.len());
let node_delta_set = lookup::fetch_node_updates(network_graph, &client, last_sync_timestamp, logger.clone()).await;
let node_delta_set = lookup::fetch_node_updates(network_graph, &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await;
log_info!(logger, "update-fetched node count: {}", node_delta_set.len());
lookup::filter_delta_set(&mut delta_set, logger.clone());
log_info!(logger, "update-filtered channel count: {}", delta_set.len());
@@ -306,6 +306,9 @@ fn serialize_delta<L: Deref + Clone>(serialization_details: &SerializationSet, s

if serialization_version >= 2 {
if let Some(node_delta) = serialization_details.node_mutations.get(&current_node_id) {
let strategy = node_delta.strategy.as_ref().unwrap();
let mut node_has_update = false;

/*
Bitmap:
7: expect extra data after the pubkey (a u16 for the count, and then that number of bytes)
@@ -317,51 +320,60 @@
0: used for odd keys
*/

if node_delta.has_address_set_changed {
node_address_update_count += 1;

let address_set = &node_delta.latest_details.as_ref().unwrap().addresses;
let mut address_serialization = Vec::new();

// we don't know a priori how many are <= 255 bytes
let mut total_address_count = 0u8;

for address in address_set.iter() {
if total_address_count == u8::MAX {
// don't serialize more than 255 addresses
break;
}
if let Ok(serialized_length) = u8::try_from(address.serialized_length()) {
total_address_count += 1;
serialized_length.write(&mut address_serialization).unwrap();
address.write(&mut address_serialization).unwrap();
};
}

// signal the presence of node addresses
current_node_delta_serialization[0] |= 1 << 2;
// serialize the actual addresses and count
total_address_count.write(&mut current_node_delta_serialization).unwrap();
current_node_delta_serialization.append(&mut address_serialization);
}

if node_delta.has_feature_set_changed {
node_feature_update_count += 1;

let latest_features = &node_delta.latest_details.as_ref().unwrap().features;

// are these features among the most common ones?
if let Some(index) = serialization_details.node_announcement_feature_defaults.iter().position(|f| f == latest_features) {
// this feature set is among the 6 defaults
current_node_delta_serialization[0] |= ((index + 1) as u8) << 3;
} else {
current_node_delta_serialization[0] |= 0b_0011_1000; // 7 << 3
latest_features.write(&mut current_node_delta_serialization).unwrap();
}
}

if node_delta.has_address_set_changed || node_delta.has_feature_set_changed {
node_update_count += 1;
}

match strategy {
NodeSerializationStrategy::Mutated(MutatedNodeProperties { addresses: true, .. }) | NodeSerializationStrategy::Full => {
let address_set = &node_delta.latest_details.as_ref().unwrap().addresses;
let mut address_serialization = Vec::new();

// we don't know a priori how many are <= 255 bytes
let mut total_address_count = 0u8;

for address in address_set.iter() {
if total_address_count == u8::MAX {
// don't serialize more than 255 addresses
break;
}
if let Ok(serialized_length) = u8::try_from(address.serialized_length()) {
total_address_count += 1;
serialized_length.write(&mut address_serialization).unwrap();
address.write(&mut address_serialization).unwrap();
};
}

node_address_update_count += 1;
node_has_update = true;

// signal the presence of node addresses
current_node_delta_serialization[0] |= 1 << 2;
// serialize the actual addresses and count
total_address_count.write(&mut current_node_delta_serialization).unwrap();
current_node_delta_serialization.append(&mut address_serialization);
},
_ => {}
}

match strategy {
NodeSerializationStrategy::Mutated(MutatedNodeProperties { features: true, .. }) | NodeSerializationStrategy::Full => {
let latest_features = &node_delta.latest_details.as_ref().unwrap().features;
node_feature_update_count += 1;
node_has_update = true;

// are these features among the most common ones?
if let Some(index) = serialization_details.node_announcement_feature_defaults.iter().position(|f| f == latest_features) {
// this feature set is among the 6 defaults
current_node_delta_serialization[0] |= ((index + 1) as u8) << 3;
} else {
current_node_delta_serialization[0] |= 0b_0011_1000; // 7 << 3
latest_features.write(&mut current_node_delta_serialization).unwrap();
}
},
_ => {}
}

if node_has_update {
node_update_count += 1;
} else if let NodeSerializationStrategy::Reminder = strategy {
current_node_delta_serialization[0] |= 1 << 6;
}
}
}
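
As a reading aid for the bitmap above, this is how the first byte of a node delta gets composed under the new scheme. Illustration only, not part of the diff; FeatureEncoding and compose_first_byte are hypothetical names.

enum FeatureEncoding {
    Unchanged,      // features are not serialized
    DefaultSet(u8), // index 0..=5 into the six most common feature sets
    Inline,         // a full feature blob follows
}

fn compose_first_byte(has_addresses: bool, features: FeatureEncoding, is_reminder: bool) -> u8 {
    let mut byte = 0u8;
    if has_addresses {
        byte |= 1 << 2; // bit 2: an address count plus the addresses follow
    }
    match features {
        FeatureEncoding::Unchanged => {}
        // bits 3-5 carry index + 1, so values 1 through 6 select a default set
        FeatureEncoding::DefaultSet(index) => byte |= (index + 1) << 3,
        // the value 7 in bits 3-5 signals an inline feature blob instead
        FeatureEncoding::Inline => byte |= 0b_0011_1000,
    }
    if is_reminder {
        // bit 6: pure reminder; in the code above this excludes any payload
        byte |= 1 << 6;
    }
    byte
}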
113 changes: 72 additions & 41 deletions src/lookup.rs
@@ -17,7 +17,7 @@ use lightning::ln::features::NodeFeatures;
use lightning::util::logger::Logger;

use crate::config;
use crate::serialization::MutatedProperties;
use crate::serialization::{MutatedNodeProperties, MutatedProperties, NodeSerializationStrategy};

/// The delta set needs to be a BTreeMap so the keys are sorted.
/// That way, the scids in the response automatically grow monotonically
@@ -58,23 +58,15 @@ pub(super) struct NodeDelta {
/// The most recently received, but new-to-the-client, node details
pub(super) latest_details: Option<NodeDetails>,

/// Between last_details_before_seen and latest_details_after_seen, including any potential
/// intermediate updates that are not kept track of here, has the set of features this node
/// supports changed?
pub(super) has_feature_set_changed: bool,

/// Between last_details_before_seen and latest_details_after_seen, including any potential
/// intermediate updates that are not kept track of here, has the set of socket addresses this
/// node listens on changed?
pub(super) has_address_set_changed: bool,
/// How should this delta be serialized?
pub(super) strategy: Option<NodeSerializationStrategy>,

/// The most recent node details that the client would have seen already
pub(super) last_details_before_seen: Option<NodeDetails>
}

pub(super) struct NodeDetails {
#[allow(unused)]
pub(super) seen: u32,
pub(super) seen: Option<u32>,
pub(super) features: NodeFeatures,
pub(super) addresses: HashSet<SocketAddress>
}
@@ -94,9 +86,8 @@ impl Default for NodeDelta {
fn default() -> Self {
Self {
latest_details: None,
has_feature_set_changed: false,
has_address_set_changed: false,
last_details_before_seen: None,
strategy: None,
}
}
}
@@ -478,7 +469,7 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
}

pub(super) async fn fetch_node_updates<L: Deref>(network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, logger: L) -> NodeDeltaSet where L::Target: Logger {
pub(super) async fn fetch_node_updates<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, snapshot_reference_timestamp: Option<u64>, logger: L) -> NodeDeltaSet where L::Target: Logger {
let start = Instant::now();
let last_sync_timestamp_float = last_sync_timestamp as f64;

@@ -487,7 +478,7 @@ pub(super) async fn fetch_node_updates<L: Deref>(network_graph: Arc<NetworkGraph
read_only_graph.nodes().unordered_iter().flat_map(|(node_id, node_info)| {
let details: NodeDetails = if let Some(details) = node_info.announcement_info.as_ref() {
NodeDetails {
seen: 0,
seen: None,
features: details.features().clone(),
addresses: details.addresses().into_iter().cloned().collect(),
}
@@ -496,8 +487,7 @@ pub(super) async fn fetch_node_updates<L: Deref>(network_graph: Arc<NetworkGraph
};
Some((node_id.clone(), NodeDelta {
latest_details: Some(details),
has_feature_set_changed: false,
has_address_set_changed: false,
strategy: None,
last_details_before_seen: None,
}))
}).collect()
@@ -536,7 +526,7 @@ pub(super) async fn fetch_node_updates<L: Deref>(network_graph: Arc<NetworkGraph
(*current_node_delta).last_details_before_seen.get_or_insert_with(|| {
let address_set: HashSet<SocketAddress> = unsigned_node_announcement.addresses.into_iter().collect();
NodeDetails {
seen,
seen: Some(seen),
features: unsigned_node_announcement.features,
addresses: address_set,
}
@@ -550,10 +540,29 @@ pub(super) async fn fetch_node_updates<L: Deref>(network_graph: Arc<NetworkGraph
log_info!(logger, "Processed {} node announcement reference rows (delta size: {}) in {:?}",
reference_row_count, delta_set.len(), start.elapsed());

let current_timestamp = snapshot_reference_timestamp.unwrap_or(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs());
let reminder_inclusion_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs()).unwrap() as u32;
let reminder_lookup_threshold_timestamp = current_timestamp.checked_sub(config::PRUNE_INTERVAL.as_secs()).unwrap() as u32;

// this is the timestamp we need to fetch all relevant updates
let include_reminders = should_snapshot_include_reminders(last_sync_timestamp, current_timestamp, &logger);
let effective_threshold_timestamp = if include_reminders {
// If we include reminders, the decision logic is as follows:
// If the pre-sync update was more than 6 days ago, serialize in full.
// Otherwise:
// If the last mutation occurred after the last sync, serialize the mutated properties.
// Otherwise:
// If the last mutation occurred more than 6 days ago, serialize as a reminder.
// Otherwise, don't serialize at all.
std::cmp::min(last_sync_timestamp, reminder_lookup_threshold_timestamp) as f64
} else {
last_sync_timestamp as f64
};

// get all the intermediate node updates
// (to calculate the set of mutated fields for snapshotting, where intermediate updates may
// have been omitted)
let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&node_ids, &last_sync_timestamp_float];
let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&node_ids, &effective_threshold_timestamp];
let intermediate_updates = client.query_raw("
SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
FROM node_announcements
@@ -568,6 +577,9 @@ pub(super) async fn fetch_node_updates<L: Deref>(network_graph: Arc<NetworkGraph
let mut previous_node_id: Option<NodeId> = None;

let mut intermediate_update_count = 0;
let mut has_address_set_changed = false;
let mut has_feature_set_changed = false;
let mut latest_mutation_timestamp = None;
while let Some(row_res) = pinned_updates.next().await {
let intermediate_update = row_res.unwrap();
intermediate_update_count += 1;
@@ -578,37 +590,56 @@ pub(super) async fn fetch_node_updates<L: Deref>(network_graph: Arc<NetworkGraph
let unsigned_node_announcement = NodeAnnouncement::read(&mut readable).unwrap().contents;

let node_id = unsigned_node_announcement.node_id;
let is_previously_processed_node_id = Some(node_id) == previous_node_id;

// get this node's address set
let current_node_delta = delta_set.entry(node_id).or_insert(NodeDelta::default());
let address_set: HashSet<SocketAddress> = unsigned_node_announcement.addresses.into_iter().collect();

// determine mutations
if previous_node_id != Some(node_id) {
// we're traversing a new node id, initialize the values
has_address_set_changed = false;
has_feature_set_changed = false;
latest_mutation_timestamp = None;

// this is the highest timestamp value, so set the seen timestamp accordingly
current_node_delta.latest_details.as_mut().map(|mut d| d.seen.replace(current_seen_timestamp));
}

if let Some(last_seen_update) = current_node_delta.last_details_before_seen.as_ref() {
if unsigned_node_announcement.features != last_seen_update.features {
current_node_delta.has_feature_set_changed = true;
}
if address_set != last_seen_update.addresses {
current_node_delta.has_address_set_changed = true;
}
} else if !is_previously_processed_node_id {
if current_node_delta.last_details_before_seen.is_none() {
if !address_set.is_empty() {
current_node_delta.has_address_set_changed = true;
{ // determine the latest mutation timestamp
if address_set != last_seen_update.addresses {
has_address_set_changed = true;
if latest_mutation_timestamp.is_none() {
latest_mutation_timestamp = Some(current_seen_timestamp);
}
}
if unsigned_node_announcement.features != NodeFeatures::empty() {
current_node_delta.has_feature_set_changed = true;
if unsigned_node_announcement.features != last_seen_update.features {
has_feature_set_changed = true;
if latest_mutation_timestamp.is_none() {
latest_mutation_timestamp = Some(current_seen_timestamp);
}
}
}
}

if !is_previously_processed_node_id {
(*current_node_delta).latest_details.get_or_insert(NodeDetails {
seen: current_seen_timestamp,
features: unsigned_node_announcement.features,
addresses: address_set,
});
if current_seen_timestamp >= last_sync_timestamp {
if has_address_set_changed || has_feature_set_changed {
// if the last mutation occurred since the last sync, send the mutation variant
current_node_delta.strategy = Some(NodeSerializationStrategy::Mutated(MutatedNodeProperties {
addresses: has_address_set_changed,
features: has_feature_set_changed,
}));
}
} else if include_reminders && latest_mutation_timestamp.unwrap_or(u32::MAX) <= reminder_inclusion_threshold_timestamp {
// only send a reminder if the latest mutation occurred at least 6 days ago
current_node_delta.strategy = Some(NodeSerializationStrategy::Reminder);
}

// Note that we completely ignore the case when the last mutation occurred less than
// 6 days ago, but prior to the last sync. In that scenario, we send nothing.

} else {
// absent any update that was seen prior to the last sync, send the full version
current_node_delta.strategy = Some(NodeSerializationStrategy::Full);
}

previous_node_id = Some(node_id);
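
The strategy selection above condenses into the following decision tree. This is a sketch with an illustrative name and a flattened signature; the thresholds are unix-second timestamps exactly as in the code above.

fn choose_strategy(
    seen_before_last_sync: bool,            // last_details_before_seen.is_some()
    latest_mutation_timestamp: Option<u32>, // newest detected mutation, if any
    mutated: MutatedNodeProperties,         // address/feature change flags
    last_sync_timestamp: u32,
    reminder_inclusion_threshold: u32,      // now minus CHANNEL_REMINDER_AGE (6 days)
    include_reminders: bool,
) -> Option<NodeSerializationStrategy> {
    if !seen_before_last_sync {
        // nothing the client could have seen before: serialize the node in full
        return Some(NodeSerializationStrategy::Full);
    }
    match latest_mutation_timestamp {
        // mutated since the last sync: send only the mutated properties
        Some(t) if t >= last_sync_timestamp => {
            Some(NodeSerializationStrategy::Mutated(mutated))
        }
        // unchanged for at least six days: a bare reminder bit suffices
        Some(t) if include_reminders && t <= reminder_inclusion_threshold => {
            Some(NodeSerializationStrategy::Reminder)
        }
        // mutated before the last sync but less than six days ago: send nothing
        _ => None,
    }
}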
26 changes: 24 additions & 2 deletions src/serialization.rs
@@ -99,6 +99,22 @@ impl UpdateSerialization {
}
}

pub(super) struct MutatedNodeProperties {
pub(super) addresses: bool,
pub(super) features: bool,
}

pub(super) enum NodeSerializationStrategy {
/// Only serialize the aspects of the node ID that have been mutated. Skip if they haven't been
Mutated(MutatedNodeProperties),
/// Whether or not the addresses or features have been mutated, serialize this node in full. It
/// may have been purged from the client.
Full,
/// This node ID has been seen recently enough to not have been pruned, and this update serves
/// solely the purpose of delaying any pruning, without applying any mutations
Reminder
}

struct FullUpdateValueHistograms {
cltv_expiry_delta: HashMap<u16, usize>,
htlc_minimum_msat: HashMap<u64, usize>,
@@ -222,12 +238,18 @@ pub(super) fn serialize_delta_set(channel_delta_set: DeltaSet, node_delta_set: N

serialization_set.node_mutations = node_delta_set.into_iter().filter(|(_id, delta)| {
// either something changed, or this node is new
delta.has_feature_set_changed || delta.has_address_set_changed || delta.last_details_before_seen.is_none()
delta.strategy.is_some()
}).collect();

let mut node_feature_histogram: HashMap<&NodeFeatures, usize> = Default::default();
for (_id, delta) in serialization_set.node_mutations.iter() {
if delta.has_feature_set_changed || delta.last_details_before_seen.is_none() {
// consider either full or feature-mutating serializations for histogram
let mut should_add_to_histogram = matches!(delta.strategy, Some(NodeSerializationStrategy::Full));
if let Some(NodeSerializationStrategy::Mutated(mutation)) = delta.strategy.as_ref() {
should_add_to_histogram = mutation.features;
}

if should_add_to_histogram {
if let Some(latest_details) = delta.latest_details.as_ref() {
*node_feature_histogram.entry(&latest_details.features).or_insert(0) += 1;
};
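
The histogram filter above reduces to a small predicate: only deltas that will actually carry features feed the default-feature-set histogram. An equivalent restatement with an illustrative name:

fn contributes_to_feature_histogram(strategy: &Option<NodeSerializationStrategy>) -> bool {
    match strategy {
        Some(NodeSerializationStrategy::Full) => true,
        Some(NodeSerializationStrategy::Mutated(mutation)) => mutation.features,
        _ => false,
    }
}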