Skip to content

Commit

Permalink
Use Duration based time info in scoring rather than Time
Browse files Browse the repository at this point in the history
In the coming commits, the `T: Time` bound on `ProbabilisticScorer`
will be removed. In order to enable that, we need to switch over to
using the `ScoreUpdate`-provided current time (as a `Duration`
since the unix epoch), making the `T` bound entirely unused.
  • Loading branch information
TheBlueMatt committed Dec 13, 2023
1 parent d54c930 commit 2288842
Showing 1 changed file with 62 additions and 85 deletions.
147 changes: 62 additions & 85 deletions lightning/src/routing/scoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,8 @@ where L::Target: Logger {
decay_params: ProbabilisticScoringDecayParameters,
network_graph: G,
logger: L,
channel_liquidities: HashMap<u64, ChannelLiquidity<T>>,
channel_liquidities: HashMap<u64, ChannelLiquidity>,
_unused_time: core::marker::PhantomData<T>,
}

/// Parameters for configuring [`ProbabilisticScorer`].
Expand Down Expand Up @@ -797,7 +798,7 @@ impl ProbabilisticScoringDecayParameters {
/// Direction is defined in terms of [`NodeId`] partial ordering, where the source node is the
/// first node in the ordering of the channel's counterparties. Thus, swapping the two liquidity
/// offset fields gives the opposite direction.
struct ChannelLiquidity<T: Time> {
struct ChannelLiquidity {
/// Lower channel liquidity bound in terms of an offset from zero.
min_liquidity_offset_msat: u64,

Expand All @@ -807,23 +808,22 @@ struct ChannelLiquidity<T: Time> {
min_liquidity_offset_history: HistoricalBucketRangeTracker,
max_liquidity_offset_history: HistoricalBucketRangeTracker,

/// Time when the liquidity bounds were last modified.
last_updated: T,
/// Time when the liquidity bounds were last modified as an offset since the unix epoch.
last_updated: Duration,

/// Time when the historical liquidity bounds were last modified.
offset_history_last_updated: T,
/// Time when the historical liquidity bounds were last modified as an offset against the unix
/// epoch.
offset_history_last_updated: Duration,
}

/// A snapshot of [`ChannelLiquidity`] in one direction assuming a certain channel capacity and
/// decayed with a given half life.
struct DirectedChannelLiquidity<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Time, U: Deref<Target = T>> {
/// A snapshot of [`ChannelLiquidity`] in one direction assuming a certain channel capacity.
struct DirectedChannelLiquidity<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Deref<Target = Duration>> {
min_liquidity_offset_msat: L,
max_liquidity_offset_msat: L,
liquidity_history: HistoricalMinMaxBuckets<BRT>,
capacity_msat: u64,
last_updated: U,
offset_history_last_updated: U,
now: T,
last_updated: T,
offset_history_last_updated: T,
decay_params: ProbabilisticScoringDecayParameters,
}

Expand All @@ -836,11 +836,12 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ProbabilisticScorerU
network_graph,
logger,
channel_liquidities: HashMap::new(),
_unused_time: core::marker::PhantomData,
}
}

#[cfg(test)]
fn with_channel(mut self, short_channel_id: u64, liquidity: ChannelLiquidity<T>) -> Self {
fn with_channel(mut self, short_channel_id: u64, liquidity: ChannelLiquidity) -> Self {
assert!(self.channel_liquidities.insert(short_channel_id, liquidity).is_none());
self
}
Expand Down Expand Up @@ -993,24 +994,23 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ProbabilisticScorerU
}
}

impl<T: Time> ChannelLiquidity<T> {
#[inline]
fn new() -> Self {
impl ChannelLiquidity {
fn new(last_updated: Duration) -> Self {
Self {
min_liquidity_offset_msat: 0,
max_liquidity_offset_msat: 0,
min_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
max_liquidity_offset_history: HistoricalBucketRangeTracker::new(),
last_updated: T::now(),
offset_history_last_updated: T::now(),
last_updated,
offset_history_last_updated: last_updated,
}
}

/// Returns a view of the channel liquidity directed from `source` to `target` assuming
/// `capacity_msat`.
fn as_directed(
&self, source: &NodeId, target: &NodeId, capacity_msat: u64, decay_params: ProbabilisticScoringDecayParameters
) -> DirectedChannelLiquidity<&u64, &HistoricalBucketRangeTracker, T, &T> {
) -> DirectedChannelLiquidity<&u64, &HistoricalBucketRangeTracker, &Duration> {
let (min_liquidity_offset_msat, max_liquidity_offset_msat, min_liquidity_offset_history, max_liquidity_offset_history) =
if source < target {
(&self.min_liquidity_offset_msat, &self.max_liquidity_offset_msat,
Expand All @@ -1030,7 +1030,6 @@ impl<T: Time> ChannelLiquidity<T> {
capacity_msat,
last_updated: &self.last_updated,
offset_history_last_updated: &self.offset_history_last_updated,
now: T::now(),
decay_params: decay_params,
}
}
Expand All @@ -1039,7 +1038,7 @@ impl<T: Time> ChannelLiquidity<T> {
/// `capacity_msat`.
fn as_directed_mut(
&mut self, source: &NodeId, target: &NodeId, capacity_msat: u64, decay_params: ProbabilisticScoringDecayParameters
) -> DirectedChannelLiquidity<&mut u64, &mut HistoricalBucketRangeTracker, T, &mut T> {
) -> DirectedChannelLiquidity<&mut u64, &mut HistoricalBucketRangeTracker, &mut Duration> {
let (min_liquidity_offset_msat, max_liquidity_offset_msat, min_liquidity_offset_history, max_liquidity_offset_history) =
if source < target {
(&mut self.min_liquidity_offset_msat, &mut self.max_liquidity_offset_msat,
Expand All @@ -1059,7 +1058,6 @@ impl<T: Time> ChannelLiquidity<T> {
capacity_msat,
last_updated: &mut self.last_updated,
offset_history_last_updated: &mut self.offset_history_last_updated,
now: T::now(),
decay_params: decay_params,
}
}
Expand All @@ -1070,7 +1068,7 @@ impl<T: Time> ChannelLiquidity<T> {
) -> u64 {
let half_life = decay_params.liquidity_offset_half_life.as_secs_f64();
if half_life != 0.0 {
let elapsed_time = T::now().duration_since(self.last_updated).as_secs_f64();
let elapsed_time = duration_since_epoch.saturating_sub(self.last_updated).as_secs_f64();
((offset as f64) * powf64(0.5, elapsed_time / half_life)) as u64
} else {
0
Expand Down Expand Up @@ -1159,7 +1157,8 @@ fn success_probability(
(numerator, denominator)
}

impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Time, U: Deref<Target = T>> DirectedChannelLiquidity< L, BRT, T, U> {
impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>, T: Deref<Target = Duration>>
DirectedChannelLiquidity< L, BRT, T> {
/// Returns a liquidity penalty for routing the given HTLC `amount_msat` through the channel in
/// this direction.
fn penalty_msat(&self, amount_msat: u64, score_params: &ProbabilisticScoringFeeParameters) -> u64 {
Expand Down Expand Up @@ -1267,7 +1266,8 @@ impl<L: Deref<Target = u64>, BRT: Deref<Target = HistoricalBucketRangeTracker>,
}
}

impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTracker>, T: Time, U: DerefMut<Target = T>> DirectedChannelLiquidity<L, BRT, T, U> {
impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTracker>, T: DerefMut<Target = Duration>>
DirectedChannelLiquidity<L, BRT, T> {
/// Adjusts the channel liquidity balance bounds when failing to route `amount_msat`.
fn failed_at_channel<Log: Deref>(
&mut self, amount_msat: u64, duration_since_epoch: Duration, chan_descr: fmt::Arguments, logger: &Log
Expand Down Expand Up @@ -1313,7 +1313,9 @@ impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTrac
/// state"), we allow the caller to set an offset applied to our liquidity bounds which
/// represents the amount of the successful payment we just made.
fn update_history_buckets(&mut self, bucket_offset_msat: u64, duration_since_epoch: Duration) {
let half_lives = self.now.duration_since(*self.offset_history_last_updated).as_secs()
let half_lives =
duration_since_epoch.checked_sub(*self.offset_history_last_updated)
.unwrap_or(Duration::ZERO).as_secs()
.checked_div(self.decay_params.historical_no_updates_half_life.as_secs())
.map(|v| v.try_into().unwrap_or(u32::max_value())).unwrap_or(u32::max_value());
self.liquidity_history.min_liquidity_offset_history.time_decay_data(half_lives);
Expand All @@ -1327,29 +1329,25 @@ impl<L: DerefMut<Target = u64>, BRT: DerefMut<Target = HistoricalBucketRangeTrac
self.liquidity_history.max_liquidity_offset_history.track_datapoint(
max_liquidity_offset_msat.saturating_sub(bucket_offset_msat), self.capacity_msat
);
*self.offset_history_last_updated = self.now;
*self.offset_history_last_updated = duration_since_epoch;
}

/// Adjusts the lower bound of the channel liquidity balance in this direction.
fn set_min_liquidity_msat(&mut self, amount_msat: u64, duration_since_epoch: Duration) {
*self.min_liquidity_offset_msat = amount_msat;
*self.max_liquidity_offset_msat = if amount_msat > self.max_liquidity_msat() {
0
} else {
self.decayed_offset_msat(*self.max_liquidity_offset_msat)
};
*self.last_updated = self.now;
if amount_msat > self.max_liquidity_msat() {
*self.max_liquidity_offset_msat = 0;
}
*self.last_updated = duration_since_epoch;
}

/// Adjusts the upper bound of the channel liquidity balance in this direction.
fn set_max_liquidity_msat(&mut self, amount_msat: u64, duration_since_epoch: Duration) {
*self.max_liquidity_offset_msat = self.capacity_msat.checked_sub(amount_msat).unwrap_or(0);
*self.min_liquidity_offset_msat = if amount_msat < self.min_liquidity_msat() {
0
} else {
self.decayed_offset_msat(*self.min_liquidity_offset_msat)
};
*self.last_updated = self.now;
if amount_msat < *self.min_liquidity_offset_msat {
*self.min_liquidity_offset_msat = 0;
}
*self.last_updated = duration_since_epoch;
}
}

Expand Down Expand Up @@ -1396,7 +1394,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ScoreLookUp for Prob
let capacity_msat = usage.effective_capacity.as_msat();
self.channel_liquidities
.get(&scid)
.unwrap_or(&ChannelLiquidity::new())
.unwrap_or(&ChannelLiquidity::new(Duration::ZERO))
.as_directed(&source, &target, capacity_msat, self.decay_params)
.penalty_msat(amount_msat, score_params)
.saturating_add(anti_probing_penalty_msat)
Expand Down Expand Up @@ -1426,14 +1424,14 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ScoreUpdate for Prob
if at_failed_channel {
self.channel_liquidities
.entry(hop.short_channel_id)
.or_insert_with(ChannelLiquidity::new)
.or_insert_with(|| ChannelLiquidity::new(duration_since_epoch))
.as_directed_mut(source, &target, capacity_msat, self.decay_params)
.failed_at_channel(amount_msat, duration_since_epoch,
format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger);
} else {
self.channel_liquidities
.entry(hop.short_channel_id)
.or_insert_with(ChannelLiquidity::new)
.or_insert_with(|| ChannelLiquidity::new(duration_since_epoch))
.as_directed_mut(source, &target, capacity_msat, self.decay_params)
.failed_downstream(amount_msat, duration_since_epoch,
format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger);
Expand Down Expand Up @@ -1462,7 +1460,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ScoreUpdate for Prob
let capacity_msat = channel.effective_capacity().as_msat();
self.channel_liquidities
.entry(hop.short_channel_id)
.or_insert_with(ChannelLiquidity::new)
.or_insert_with(|| ChannelLiquidity::new(duration_since_epoch))
.as_directed_mut(source, &target, capacity_msat, self.decay_params)
.successful(amount_msat, duration_since_epoch,
format_args!("SCID {}, towards {:?}", hop.short_channel_id, target), &self.logger);
Expand All @@ -1488,10 +1486,10 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ScoreUpdate for Prob
liquidity.decayed_offset(liquidity.min_liquidity_offset_msat, duration_since_epoch, decay_params);
liquidity.max_liquidity_offset_msat =
liquidity.decayed_offset(liquidity.max_liquidity_offset_msat, duration_since_epoch, decay_params);
liquidity.last_updated = T::now();
liquidity.last_updated = duration_since_epoch;

let elapsed_time =
T::now().duration_since(liquidity.offset_history_last_updated);
duration_since_epoch.saturating_sub(liquidity.offset_history_last_updated);
if elapsed_time > decay_params.historical_no_updates_half_life {
let half_life = decay_params.historical_no_updates_half_life.as_secs_f64();
if half_life != 0.0 {
Expand All @@ -1502,7 +1500,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref, T: Time> ScoreUpdate for Prob
for bucket in liquidity.max_liquidity_offset_history.buckets.iter_mut() {
*bucket = ((*bucket as u64) * 1024 / divisor) as u16;
}
liquidity.offset_history_last_updated = T::now();
liquidity.offset_history_last_updated = duration_since_epoch;
}
}
liquidity.min_liquidity_offset_msat != 0 || liquidity.max_liquidity_offset_msat != 0 ||
Expand Down Expand Up @@ -2125,31 +2123,29 @@ ReadableArgs<(ProbabilisticScoringDecayParameters, G, L)> for ProbabilisticScore
network_graph,
logger,
channel_liquidities,
_unused_time: core::marker::PhantomData,
})
}
}

impl<T: Time> Writeable for ChannelLiquidity<T> {
impl Writeable for ChannelLiquidity {
#[inline]
fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
let offset_history_duration_since_epoch =
T::duration_since_epoch() - self.offset_history_last_updated.elapsed();
let duration_since_epoch = T::duration_since_epoch() - self.last_updated.elapsed();
write_tlv_fields!(w, {
(0, self.min_liquidity_offset_msat, required),
// 1 was the min_liquidity_offset_history in octile form
(2, self.max_liquidity_offset_msat, required),
// 3 was the max_liquidity_offset_history in octile form
(4, duration_since_epoch, required),
(4, self.last_updated, required),
(5, Some(self.min_liquidity_offset_history), option),
(7, Some(self.max_liquidity_offset_history), option),
(9, offset_history_duration_since_epoch, required),
(9, self.offset_history_last_updated, required),
});
Ok(())
}
}

impl<T: Time> Readable for ChannelLiquidity<T> {
impl Readable for ChannelLiquidity {
#[inline]
fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> {
let mut min_liquidity_offset_msat = 0;
Expand All @@ -2158,36 +2154,18 @@ impl<T: Time> Readable for ChannelLiquidity<T> {
let mut legacy_max_liq_offset_history: Option<LegacyHistoricalBucketRangeTracker> = None;
let mut min_liquidity_offset_history: Option<HistoricalBucketRangeTracker> = None;
let mut max_liquidity_offset_history: Option<HistoricalBucketRangeTracker> = None;
let mut duration_since_epoch = Duration::from_secs(0);
let mut offset_history_duration_since_epoch = None;
let mut last_updated = Duration::from_secs(0);
let mut offset_history_last_updated = None;
read_tlv_fields!(r, {
(0, min_liquidity_offset_msat, required),
(1, legacy_min_liq_offset_history, option),
(2, max_liquidity_offset_msat, required),
(3, legacy_max_liq_offset_history, option),
(4, duration_since_epoch, required),
(4, last_updated, required),
(5, min_liquidity_offset_history, option),
(7, max_liquidity_offset_history, option),
(9, offset_history_duration_since_epoch, option),
(9, offset_history_last_updated, option),
});
// On rust prior to 1.60 `Instant::duration_since` will panic if time goes backwards.
// We write `last_updated` as wallclock time even though its ultimately an `Instant` (which
// is a time from a monotonic clock usually represented as an offset against boot time).
// Thus, we have to construct an `Instant` by subtracting the difference in wallclock time
// from the one that was written. However, because `Instant` can panic if we construct one
// in the future, we must handle wallclock time jumping backwards, which we do by simply
// using `Instant::now()` in that case.
let wall_clock_now = T::duration_since_epoch();
let now = T::now();
let last_updated = if wall_clock_now > duration_since_epoch {
now - (wall_clock_now - duration_since_epoch)
} else { now };

let offset_history_duration_since_epoch =
offset_history_duration_since_epoch.unwrap_or(duration_since_epoch);
let offset_history_last_updated = if wall_clock_now > offset_history_duration_since_epoch {
now - (wall_clock_now - offset_history_duration_since_epoch)
} else { now };

if min_liquidity_offset_history.is_none() {
if let Some(legacy_buckets) = legacy_min_liq_offset_history {
Expand All @@ -2209,7 +2187,7 @@ impl<T: Time> Readable for ChannelLiquidity<T> {
min_liquidity_offset_history: min_liquidity_offset_history.unwrap(),
max_liquidity_offset_history: max_liquidity_offset_history.unwrap(),
last_updated,
offset_history_last_updated,
offset_history_last_updated: offset_history_last_updated.unwrap_or(last_updated),
})
}
}
Expand All @@ -2219,7 +2197,6 @@ mod tests {
use super::{ChannelLiquidity, HistoricalBucketRangeTracker, ProbabilisticScoringFeeParameters, ProbabilisticScoringDecayParameters, ProbabilisticScorerUsingTime};
use crate::blinded_path::{BlindedHop, BlindedPath};
use crate::util::config::UserConfig;
use crate::util::time::Time;
use crate::util::time::tests::SinceEpoch;

use crate::ln::channelmanager;
Expand Down Expand Up @@ -2384,8 +2361,8 @@ mod tests {
#[test]
fn liquidity_bounds_directed_from_lowest_node_id() {
let logger = TestLogger::new();
let last_updated = SinceEpoch::now();
let offset_history_last_updated = SinceEpoch::now();
let last_updated = Duration::ZERO;
let offset_history_last_updated = Duration::ZERO;
let network_graph = network_graph(&logger);
let decay_params = ProbabilisticScoringDecayParameters::default();
let mut scorer = ProbabilisticScorer::new(decay_params, &network_graph, &logger)
Expand Down Expand Up @@ -2465,8 +2442,8 @@ mod tests {
#[test]
fn resets_liquidity_upper_bound_when_crossed_by_lower_bound() {
let logger = TestLogger::new();
let last_updated = SinceEpoch::now();
let offset_history_last_updated = SinceEpoch::now();
let last_updated = Duration::ZERO;
let offset_history_last_updated = Duration::ZERO;
let network_graph = network_graph(&logger);
let decay_params = ProbabilisticScoringDecayParameters::default();
let mut scorer = ProbabilisticScorer::new(decay_params, &network_graph, &logger)
Expand Down Expand Up @@ -2526,8 +2503,8 @@ mod tests {
#[test]
fn resets_liquidity_lower_bound_when_crossed_by_upper_bound() {
let logger = TestLogger::new();
let last_updated = SinceEpoch::now();
let offset_history_last_updated = SinceEpoch::now();
let last_updated = Duration::ZERO;
let offset_history_last_updated = Duration::ZERO;
let network_graph = network_graph(&logger);
let decay_params = ProbabilisticScoringDecayParameters::default();
let mut scorer = ProbabilisticScorer::new(decay_params, &network_graph, &logger)
Expand Down Expand Up @@ -2639,8 +2616,8 @@ mod tests {
#[test]
fn constant_penalty_outside_liquidity_bounds() {
let logger = TestLogger::new();
let last_updated = SinceEpoch::now();
let offset_history_last_updated = SinceEpoch::now();
let last_updated = Duration::ZERO;
let offset_history_last_updated = Duration::ZERO;
let network_graph = network_graph(&logger);
let params = ProbabilisticScoringFeeParameters {
liquidity_penalty_multiplier_msat: 1_000,
Expand Down

0 comments on commit 2288842

Please sign in to comment.