Skip to content

Commit

Permalink
Check reminder necessity for node updates.
Browse files Browse the repository at this point in the history
As we had already been doing with channel updates, we now also
consider node announcements' seen timestamps to determinate whether
a given node might necessitate a reminder. We reuse the same
threshold that we've already been using for channels.
  • Loading branch information
arik-so committed Jun 27, 2024
1 parent c70cacc commit 1d57bc5
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 41 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ async fn calculate_delta<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>,
log_info!(logger, "announcement channel count: {}", delta_set.len());
lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await;
log_info!(logger, "update-fetched channel count: {}", delta_set.len());
let node_delta_set = lookup::fetch_node_updates(&client, last_sync_timestamp, logger.clone()).await;
let node_delta_set = lookup::fetch_node_updates(&client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await;
log_info!(logger, "update-fetched node count: {}", node_delta_set.len());
lookup::filter_delta_set(&mut delta_set, logger.clone());
log_info!(logger, "update-filtered channel count: {}", delta_set.len());
Expand Down
122 changes: 82 additions & 40 deletions src/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ pub(super) struct NodeDelta {
/// node listens on changed?
pub(super) has_address_set_changed: bool,

/// Neither features nor addresses have changed within the reminder threshold interval
/// Only applicable if that interval is greater than the seen interval
pub(super) requires_reminder: bool,

/// The most recent node details that the client would have seen already
pub(super) last_details_before_seen: Option<NodeDetails>
}
Expand Down Expand Up @@ -94,6 +98,7 @@ impl Default for NodeDelta {
latest_details_after_seen: None,
has_feature_set_changed: false,
has_address_set_changed: false,
requires_reminder: false,
last_details_before_seen: None,
}
}
Expand All @@ -110,11 +115,29 @@ impl Default for DirectedUpdateDelta {
}
}

fn should_snapshot_include_reminders<L: Deref>(last_sync_timestamp: u32, current_timestamp: u64, logger: L) -> bool where L::Target: Logger {
let current_hour = current_timestamp / 3600;
let current_day = current_timestamp / (24 * 3600);

log_debug!(logger, "Current day index: {}", current_day);
log_debug!(logger, "Current hour: {}", current_hour);

// every 5th day at midnight
let is_reminder_hour = (current_hour % 24) == 0;
let is_reminder_day = (current_day % 5) == 0;

let snapshot_scope = current_timestamp.saturating_sub(last_sync_timestamp as u64);
let is_reminder_scope = snapshot_scope > (50 * 3600);
log_debug!(logger, "Snapshot scope: {}s", snapshot_scope);

(is_reminder_hour && is_reminder_day) || is_reminder_scope
}

/// Fetch all the channel announcements that are presently in the network graph, regardless of
/// whether they had been seen before.
/// Also include all announcements for which the first update was announced
/// after `last_sync_timestamp`
pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, snapshot_reference_timestamp: Option<u64>, logger: L) where L::Target: Logger {
pub(super) async fn fetch_channel_announcements<L: Deref + Clone>(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, snapshot_reference_timestamp: Option<u64>, logger: L) where L::Target: Logger {
log_info!(logger, "Obtaining channel ids from network graph");
let channel_ids = {
let read_only_graph = network_graph.read_only();
Expand All @@ -133,23 +156,7 @@ pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaS
let current_timestamp = snapshot_reference_timestamp.unwrap_or(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs());
log_info!(logger, "Current timestamp: {}", current_timestamp);

let include_reminders = {
let current_hour = current_timestamp / 3600;
let current_day = current_timestamp / (24 * 3600);

log_debug!(logger, "Current day index: {}", current_day);
log_debug!(logger, "Current hour: {}", current_hour);

// every 5th day at midnight
let is_reminder_hour = (current_hour % 24) == 0;
let is_reminder_day = (current_day % 5) == 0;

let snapshot_scope = current_timestamp.saturating_sub(last_sync_timestamp as u64);
let is_reminder_scope = snapshot_scope > (50 * 3600);
log_debug!(logger, "Snapshot scope: {}s", snapshot_scope);

(is_reminder_hour && is_reminder_day) || is_reminder_scope
};
let include_reminders = should_snapshot_include_reminders(last_sync_timestamp, current_timestamp, logger.clone());

log_info!(logger, "Obtaining corresponding database entries");
// get all the channel announcements that are currently in the network graph
Expand Down Expand Up @@ -474,7 +481,7 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
}

pub(super) async fn fetch_node_updates<L: Deref>(client: &Client, last_sync_timestamp: u32, logger: L) -> NodeDeltaSet where L::Target: Logger {
pub(super) async fn fetch_node_updates<L: Deref + Clone>(client: &Client, last_sync_timestamp: u32, snapshot_reference_timestamp: Option<u64>, logger: L) -> NodeDeltaSet where L::Target: Logger {
let start = Instant::now();
let last_sync_timestamp_float = last_sync_timestamp as f64;

Expand Down Expand Up @@ -523,12 +530,24 @@ pub(super) async fn fetch_node_updates<L: Deref>(client: &Client, last_sync_time
// get all the intermediate node updates
// (to calculate the set of mutated fields for snapshotting, where intermediate updates may
// have been omitted)

let current_timestamp = snapshot_reference_timestamp.unwrap_or(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs());
let reminder_lookup_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs()).unwrap() as u32;

// this is the timestamp we need to fetch all relevant updates
let include_reminders = should_snapshot_include_reminders(last_sync_timestamp, current_timestamp, logger.clone());
let effective_threshold_timestamp = if include_reminders {
std::cmp::min(last_sync_timestamp, reminder_lookup_threshold_timestamp) as f64
} else {
last_sync_timestamp as f64
};

let intermediate_updates = client.query_raw("
SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
FROM node_announcements
WHERE seen >= TO_TIMESTAMP($1)
ORDER BY public_key ASC, timestamp DESC
", [last_sync_timestamp_float]).await.unwrap();
", [effective_threshold_timestamp]).await.unwrap();
let mut pinned_updates = Box::pin(intermediate_updates);
log_info!(logger, "Fetched intermediate node announcement rows in {:?}", start.elapsed());

Expand All @@ -547,35 +566,58 @@ pub(super) async fn fetch_node_updates<L: Deref>(client: &Client, last_sync_time
let node_id = unsigned_node_announcement.node_id;
let is_previously_processed_node_id = Some(node_id) == previous_node_id;

let current_node_delta = delta_set.entry(node_id).or_insert(NodeDelta {
// start out as true by default, and unset later if necessary
requires_reminder: include_reminders,
..Default::default()
});

// get this node's address set
let current_node_delta = delta_set.entry(node_id).or_insert(NodeDelta::default());
let address_set: HashSet<SocketAddress> = unsigned_node_announcement.addresses.into_iter().collect();

// determine mutations
if let Some(last_seen_update) = current_node_delta.last_details_before_seen.as_ref() {
if unsigned_node_announcement.features != last_seen_update.features {
current_node_delta.has_feature_set_changed = true;
}
if address_set != last_seen_update.addresses {
current_node_delta.has_address_set_changed = true;
}
} else if !is_previously_processed_node_id {
if current_node_delta.last_details_before_seen.is_none() {
if !address_set.is_empty() {
if current_seen_timestamp >= last_sync_timestamp {
if let Some(last_seen_update) = current_node_delta.last_details_before_seen.as_ref() {
if unsigned_node_announcement.features != last_seen_update.features {
current_node_delta.has_feature_set_changed = true;
current_node_delta.requires_reminder = false;
}
if address_set != last_seen_update.addresses {
current_node_delta.has_address_set_changed = true;
current_node_delta.requires_reminder = false;
}
if unsigned_node_announcement.features != NodeFeatures::empty() {
current_node_delta.has_feature_set_changed = true;
} else if !is_previously_processed_node_id {
if current_node_delta.last_details_before_seen.is_none() {
if !address_set.is_empty() {
current_node_delta.has_address_set_changed = true;
current_node_delta.requires_reminder = false;
}
if unsigned_node_announcement.features != NodeFeatures::empty() {
current_node_delta.has_feature_set_changed = true;
current_node_delta.requires_reminder = false;
}
}
}
}

if !is_previously_processed_node_id {
(*current_node_delta).latest_details_after_seen.get_or_insert(NodeDetails {
seen: current_seen_timestamp,
features: unsigned_node_announcement.features,
addresses: address_set,
});
if !is_previously_processed_node_id {
(*current_node_delta).latest_details_after_seen.get_or_insert(NodeDetails {
seen: current_seen_timestamp,
features: unsigned_node_announcement.features,
addresses: address_set,
});
}
} else if include_reminders && current_node_delta.requires_reminder {
// This node update occurred prior to the last_sync_timestamp, which means that this is
// purely for the purpose of considering whether a reminder might be necessary. Any
// mutation seen within the previous scope would have marked a reminder as unnecessary
if let Some(latest_update) = current_node_delta.latest_details_after_seen.as_ref() {
if unsigned_node_announcement.features != latest_update.features {
current_node_delta.requires_reminder = false;
}
if address_set != latest_update.addresses {
current_node_delta.requires_reminder = false;
}
}
}

previous_node_id = Some(node_id);
Expand Down

0 comments on commit 1d57bc5

Please sign in to comment.