Skip to content

Commit

Permalink
Drop half-life-based bucket decay in update_history_buckets
Browse files Browse the repository at this point in the history
Because we decay the bucket information in the background, there's
not much reason to try to decay them immediately prior to updating,
and in removing that we can also clean up a good bit of dead code,
which we do here.
  • Loading branch information
TheBlueMatt committed Dec 13, 2023
1 parent 21facd0 commit 18b4231
Showing 1 changed file with 38 additions and 98 deletions.
136 changes: 38 additions & 98 deletions lightning/src/routing/scoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ struct ChannelLiquidity {
min_liquidity_offset_history: HistoricalBucketRangeTracker,
max_liquidity_offset_history: HistoricalBucketRangeTracker,

/// Time when the liquidity bounds were last modified as an offset since the unix epoch.
/// Time when either liquidity bound was last modified as an offset since the unix epoch.
last_updated: Duration,

/// Time when the historical liquidity bounds were last modified as an offset against the unix
Expand All @@ -805,7 +805,6 @@ struct DirectedChannelLiquidity<L: Deref<Target = u64>, BRT: Deref<Target = Hist
capacity_msat: u64,
last_updated: T,
offset_history_last_updated: T,
decay_params: ProbabilisticScoringDecayParameters,
}

impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ProbabilisticScorer<G, L> where L::Target: Logger {
Expand Down Expand Up @@ -837,7 +836,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ProbabilisticScorer<G, L> whe
let log_direction = |source, target| {
if let Some((directed_info, _)) = chan_debug.as_directed_to(target) {
let amt = directed_info.effective_capacity().as_msat();
let dir_liq = liq.as_directed(source, target, amt, self.decay_params);
let dir_liq = liq.as_directed(source, target, amt);

let min_buckets = &dir_liq.liquidity_history.min_liquidity_offset_history.buckets;
let max_buckets = &dir_liq.liquidity_history.max_liquidity_offset_history.buckets;
Expand Down Expand Up @@ -889,7 +888,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ProbabilisticScorer<G, L> whe
if let Some(liq) = self.channel_liquidities.get(&scid) {
if let Some((directed_info, source)) = chan.as_directed_to(target) {
let amt = directed_info.effective_capacity().as_msat();
let dir_liq = liq.as_directed(source, target, amt, self.decay_params);
let dir_liq = liq.as_directed(source, target, amt);
return Some((dir_liq.min_liquidity_msat(), dir_liq.max_liquidity_msat()));
}
}
Expand Down Expand Up @@ -931,7 +930,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ProbabilisticScorer<G, L> whe
if let Some(liq) = self.channel_liquidities.get(&scid) {
if let Some((directed_info, source)) = chan.as_directed_to(target) {
let amt = directed_info.effective_capacity().as_msat();
let dir_liq = liq.as_directed(source, target, amt, self.decay_params);
let dir_liq = liq.as_directed(source, target, amt);

let min_buckets = dir_liq.liquidity_history.min_liquidity_offset_history.buckets;
let mut max_buckets = dir_liq.liquidity_history.max_liquidity_offset_history.buckets;
Expand Down Expand Up @@ -962,7 +961,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ProbabilisticScorer<G, L> whe
if let Some(liq) = self.channel_liquidities.get(&scid) {
if let Some((directed_info, source)) = chan.as_directed_to(target) {
let capacity_msat = directed_info.effective_capacity().as_msat();
let dir_liq = liq.as_directed(source, target, capacity_msat, self.decay_params);
let dir_liq = liq.as_directed(source, target, capacity_msat);

return dir_liq.liquidity_history.calculate_success_probability_times_billion(
&params, amount_msat, capacity_msat
Expand All @@ -989,7 +988,7 @@ impl ChannelLiquidity {
/// Returns a view of the channel liquidity directed from `source` to `target` assuming
/// `capacity_msat`.
fn as_directed(
&self, source: &NodeId, target: &NodeId, capacity_msat: u64, decay_params: ProbabilisticScoringDecayParameters
&self, source: &NodeId, target: &NodeId, capacity_msat: u64,
) -> DirectedChannelLiquidity<&u64, &HistoricalBucketRangeTracker, &Duration> {
let (min_liquidity_offset_msat, max_liquidity_offset_msat, min_liquidity_offset_history, max_liquidity_offset_history) =
if source < target {
Expand All @@ -1010,14 +1009,13 @@ impl ChannelLiquidity {
capacity_msat,
last_updated: &self.last_updated,
offset_history_last_updated: &self.offset_history_last_updated,
decay_params: decay_params,
}
}

/// Returns a mutable view of the channel liquidity directed from `source` to `target` assuming
/// `capacity_msat`.
fn as_directed_mut(
&mut self, source: &NodeId, target: &NodeId, capacity_msat: u64, decay_params: ProbabilisticScoringDecayParameters
&mut self, source: &NodeId, target: &NodeId, capacity_msat: u64,
) -> DirectedChannelLiquidity<&mut u64, &mut HistoricalBucketRangeTracker, &mut Duration> {
let (min_liquidity_offset_msat, max_liquidity_offset_msat, min_liquidity_offset_history, max_liquidity_offset_history) =
if source < target {
Expand All @@ -1038,7 +1036,6 @@ impl ChannelLiquidity {
capacity_msat,
last_updated: &mut self.last_updated,
offset_history_last_updated: &mut self.offset_history_last_updated,
decay_params: decay_params,
}
}

Expand Down Expand Up @@ -1289,14 +1286,6 @@ DirectedChannelLiquidity<L, BRT, T> {
/// state"), we allow the caller to set an offset applied to our liquidity bounds which
/// represents the amount of the successful payment we just made.
fn update_history_buckets(&mut self, bucket_offset_msat: u64, duration_since_epoch: Duration) {
let half_lives =
duration_since_epoch.checked_sub(*self.offset_history_last_updated)
.unwrap_or(Duration::ZERO).as_secs()
.checked_div(self.decay_params.historical_no_updates_half_life.as_secs())
.map(|v| v.try_into().unwrap_or(u32::max_value())).unwrap_or(u32::max_value());
self.liquidity_history.min_liquidity_offset_history.time_decay_data(half_lives);
self.liquidity_history.max_liquidity_offset_history.time_decay_data(half_lives);

self.liquidity_history.min_liquidity_offset_history.track_datapoint(
*self.min_liquidity_offset_msat + bucket_offset_msat, self.capacity_msat
);
Expand Down Expand Up @@ -1369,7 +1358,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ScoreLookUp for Probabilistic
self.channel_liquidities
.get(&scid)
.unwrap_or(&ChannelLiquidity::new(Duration::ZERO))
.as_directed(&source, &target, capacity_msat, self.decay_params)
.as_directed(&source, &target, capacity_msat)
.penalty_msat(amount_msat, score_params)
.saturating_add(anti_probing_penalty_msat)
.saturating_add(base_penalty_msat)
Expand Down Expand Up @@ -1399,14 +1388,14 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ScoreUpdate for Probabilistic
self.channel_liquidities
.entry(hop.short_channel_id)
.or_insert_with(|| ChannelLiquidity::new(duration_since_epoch))
.as_directed_mut(source, &target, capacity_msat, self.decay_params)
.as_directed_mut(source, &target, capacity_msat)
.failed_at_channel(amount_msat, duration_since_epoch,
format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger);
} else {
self.channel_liquidities
.entry(hop.short_channel_id)
.or_insert_with(|| ChannelLiquidity::new(duration_since_epoch))
.as_directed_mut(source, &target, capacity_msat, self.decay_params)
.as_directed_mut(source, &target, capacity_msat)
.failed_downstream(amount_msat, duration_since_epoch,
format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger);
}
Expand Down Expand Up @@ -1435,7 +1424,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ScoreUpdate for Probabilistic
self.channel_liquidities
.entry(hop.short_channel_id)
.or_insert_with(|| ChannelLiquidity::new(duration_since_epoch))
.as_directed_mut(source, &target, capacity_msat, self.decay_params)
.as_directed_mut(source, &target, capacity_msat)
.successful(amount_msat, duration_since_epoch,
format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger);
} else {
Expand Down Expand Up @@ -1959,14 +1948,6 @@ mod bucketed_history {
self.buckets[bucket] = self.buckets[bucket].saturating_add(BUCKET_FIXED_POINT_ONE);
}
}
/// Decay all buckets by the given number of half-lives. Used to more aggressively remove old
/// datapoints as we receive newer information.
#[inline]
pub(super) fn time_decay_data(&mut self, half_lives: u32) {
for e in self.buckets.iter_mut() {
*e = e.checked_shr(half_lives).unwrap_or(0);
}
}
}

impl_writeable_tlv_based!(HistoricalBucketRangeTracker, { (0, buckets, required) });
Expand Down Expand Up @@ -2359,52 +2340,52 @@ mod tests {
// Update minimum liquidity.

let liquidity = scorer.channel_liquidities.get(&42).unwrap()
.as_directed(&source, &target, 1_000, decay_params);
.as_directed(&source, &target, 1_000);
assert_eq!(liquidity.min_liquidity_msat(), 100);
assert_eq!(liquidity.max_liquidity_msat(), 300);

let liquidity = scorer.channel_liquidities.get(&42).unwrap()
.as_directed(&target, &source, 1_000, decay_params);
.as_directed(&target, &source, 1_000);
assert_eq!(liquidity.min_liquidity_msat(), 700);
assert_eq!(liquidity.max_liquidity_msat(), 900);

scorer.channel_liquidities.get_mut(&42).unwrap()
.as_directed_mut(&source, &target, 1_000, decay_params)
.as_directed_mut(&source, &target, 1_000)
.set_min_liquidity_msat(200, Duration::ZERO);

let liquidity = scorer.channel_liquidities.get(&42).unwrap()
.as_directed(&source, &target, 1_000, decay_params);
.as_directed(&source, &target, 1_000);
assert_eq!(liquidity.min_liquidity_msat(), 200);
assert_eq!(liquidity.max_liquidity_msat(), 300);

let liquidity = scorer.channel_liquidities.get(&42).unwrap()
.as_directed(&target, &source, 1_000, decay_params);
.as_directed(&target, &source, 1_000);
assert_eq!(liquidity.min_liquidity_msat(), 700);
assert_eq!(liquidity.max_liquidity_msat(), 800);

// Update maximum liquidity.

let liquidity = scorer.channel_liquidities.get(&43).unwrap()
.as_directed(&target, &recipient, 1_000, decay_params);
.as_directed(&target, &recipient, 1_000);
assert_eq!(liquidity.min_liquidity_msat(), 700);
assert_eq!(liquidity.max_liquidity_msat(), 900);

let liquidity = scorer.channel_liquidities.get(&43).unwrap()
.as_directed(&recipient, &target, 1_000, decay_params);
.as_directed(&recipient, &target, 1_000);
assert_eq!(liquidity.min_liquidity_msat(), 100);
assert_eq!(liquidity.max_liquidity_msat(), 300);

scorer.channel_liquidities.get_mut(&43).unwrap()
.as_directed_mut(&target, &recipient, 1_000, decay_params)
.as_directed_mut(&target, &recipient, 1_000)
.set_max_liquidity_msat(200, Duration::ZERO);

let liquidity = scorer.channel_liquidities.get(&43).unwrap()
.as_directed(&target, &recipient, 1_000, decay_params);
.as_directed(&target, &recipient, 1_000);
assert_eq!(liquidity.min_liquidity_msat(), 0);
assert_eq!(liquidity.max_liquidity_msat(), 200);

let liquidity = scorer.channel_liquidities.get(&43).unwrap()
.as_directed(&recipient, &target, 1_000, decay_params);
.as_directed(&recipient, &target, 1_000);
assert_eq!(liquidity.min_liquidity_msat(), 800);
assert_eq!(liquidity.max_liquidity_msat(), 1000);
}
Expand All @@ -2430,42 +2411,42 @@ mod tests {

// Check initial bounds.
let liquidity = scorer.channel_liquidities.get(&42).unwrap()
.as_directed(&source, &target, 1_000, decay_params);
.as_directed(&source, &target, 1_000);
assert_eq!(liquidity.min_liquidity_msat(), 400);
assert_eq!(liquidity.max_liquidity_msat(), 800);

let liquidity = scorer.channel_liquidities.get(&42).unwrap()
.as_directed(&target, &source, 1_000, decay_params);
.as_directed(&target, &source, 1_000);
assert_eq!(liquidity.min_liquidity_msat(), 200);
assert_eq!(liquidity.max_liquidity_msat(), 600);

// Reset from source to target.
scorer.channel_liquidities.get_mut(&42).unwrap()
.as_directed_mut(&source, &target, 1_000, decay_params)
.as_directed_mut(&source, &target, 1_000)
.set_min_liquidity_msat(900, Duration::ZERO);

let liquidity = scorer.channel_liquidities.get(&42).unwrap()
.as_directed(&source, &target, 1_000, decay_params);
.as_directed(&source, &target, 1_000);
assert_eq!(liquidity.min_liquidity_msat(), 900);
assert_eq!(liquidity.max_liquidity_msat(), 1_000);

let liquidity = scorer.channel_liquidities.get(&42).unwrap()
.as_directed(&target, &source, 1_000, decay_params);
.as_directed(&target, &source, 1_000);
assert_eq!(liquidity.min_liquidity_msat(), 0);
assert_eq!(liquidity.max_liquidity_msat(), 100);

// Reset from target to source.
scorer.channel_liquidities.get_mut(&42).unwrap()
.as_directed_mut(&target, &source, 1_000, decay_params)
.as_directed_mut(&target, &source, 1_000)
.set_min_liquidity_msat(400, Duration::ZERO);

let liquidity = scorer.channel_liquidities.get(&42).unwrap()
.as_directed(&source, &target, 1_000, decay_params);
.as_directed(&source, &target, 1_000);
assert_eq!(liquidity.min_liquidity_msat(), 0);
assert_eq!(liquidity.max_liquidity_msat(), 600);

let liquidity = scorer.channel_liquidities.get(&42).unwrap()
.as_directed(&target, &source, 1_000, decay_params);
.as_directed(&target, &source, 1_000);
assert_eq!(liquidity.min_liquidity_msat(), 400);
assert_eq!(liquidity.max_liquidity_msat(), 1_000);
}
Expand All @@ -2491,42 +2472,42 @@ mod tests {

// Check initial bounds.
let liquidity = scorer.channel_liquidities.get(&42).unwrap()
.as_directed(&source, &target, 1_000, decay_params);
.as_directed(&source, &target, 1_000);
assert_eq!(liquidity.min_liquidity_msat(), 400);
assert_eq!(liquidity.max_liquidity_msat(), 800);

let liquidity = scorer.channel_liquidities.get(&42).unwrap()
.as_directed(&target, &source, 1_000, decay_params);
.as_directed(&target, &source, 1_000);
assert_eq!(liquidity.min_liquidity_msat(), 200);
assert_eq!(liquidity.max_liquidity_msat(), 600);

// Reset from source to target.
scorer.channel_liquidities.get_mut(&42).unwrap()
.as_directed_mut(&source, &target, 1_000, decay_params)
.as_directed_mut(&source, &target, 1_000)
.set_max_liquidity_msat(300, Duration::ZERO);

let liquidity = scorer.channel_liquidities.get(&42).unwrap()
.as_directed(&source, &target, 1_000, decay_params);
.as_directed(&source, &target, 1_000);
assert_eq!(liquidity.min_liquidity_msat(), 0);
assert_eq!(liquidity.max_liquidity_msat(), 300);

let liquidity = scorer.channel_liquidities.get(&42).unwrap()
.as_directed(&target, &source, 1_000, decay_params);
.as_directed(&target, &source, 1_000);
assert_eq!(liquidity.min_liquidity_msat(), 700);
assert_eq!(liquidity.max_liquidity_msat(), 1_000);

// Reset from target to source.
scorer.channel_liquidities.get_mut(&42).unwrap()
.as_directed_mut(&target, &source, 1_000, decay_params)
.as_directed_mut(&target, &source, 1_000)
.set_max_liquidity_msat(600, Duration::ZERO);

let liquidity = scorer.channel_liquidities.get(&42).unwrap()
.as_directed(&source, &target, 1_000, decay_params);
.as_directed(&source, &target, 1_000);
assert_eq!(liquidity.min_liquidity_msat(), 400);
assert_eq!(liquidity.max_liquidity_msat(), 1_000);

let liquidity = scorer.channel_liquidities.get(&42).unwrap()
.as_directed(&target, &source, 1_000, decay_params);
.as_directed(&target, &source, 1_000);
assert_eq!(liquidity.min_liquidity_msat(), 0);
assert_eq!(liquidity.max_liquidity_msat(), 600);
}
Expand Down Expand Up @@ -2971,47 +2952,6 @@ mod tests {
assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), u64::max_value());
}

#[test]
fn decays_liquidity_bounds_without_shift_overflow() {
let logger = TestLogger::new();
let network_graph = network_graph(&logger);
let params = ProbabilisticScoringFeeParameters {
liquidity_penalty_multiplier_msat: 1_000,
..ProbabilisticScoringFeeParameters::zero_penalty()
};
let decay_params = ProbabilisticScoringDecayParameters {
liquidity_offset_half_life: Duration::from_secs(10),
..ProbabilisticScoringDecayParameters::default()
};
let mut scorer = ProbabilisticScorer::new(decay_params, &network_graph, &logger);
let source = source_node_id();
let usage = ChannelUsage {
amount_msat: 256,
inflight_htlc_msat: 0,
effective_capacity: EffectiveCapacity::Total { capacity_msat: 1_024, htlc_maximum_msat: 1_000 },
};
let channel = network_graph.read_only().channel(42).unwrap().to_owned();
let (info, _) = channel.as_directed_from(&source).unwrap();
let candidate = CandidateRouteHop::PublicHop {
info,
short_channel_id: 42,
};
assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), 125);

scorer.payment_path_failed(&payment_path_for_amount(512), 42, Duration::ZERO);
assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), 281);

// An unchecked right shift 64 bits or more in DirectedChannelLiquidity::decayed_offset_msat
// would cause an overflow.
SinceEpoch::advance(Duration::from_secs(10 * 64));
scorer.time_passed(Duration::from_secs(10 * 64));
assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), 125);

SinceEpoch::advance(Duration::from_secs(10));
scorer.time_passed(Duration::from_secs(10 * 65));
assert_eq!(scorer.channel_penalty_msat(&candidate, usage, &params), 125);
}

#[test]
fn restricts_liquidity_bounds_after_decay() {
let logger = TestLogger::new();
Expand Down Expand Up @@ -3660,7 +3600,7 @@ mod tests {
scorer.payment_path_failed(&path, 43, Duration::ZERO);

let liquidity = scorer.channel_liquidities.get(&42).unwrap()
.as_directed(&source, &target, 1_000, decay_params);
.as_directed(&source, &target, 1_000);
assert_eq!(liquidity.min_liquidity_msat(), 256);
assert_eq!(liquidity.max_liquidity_msat(), 768);
}
Expand Down

0 comments on commit 18b4231

Please sign in to comment.