Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 57 additions & 88 deletions lightning-liquidity/src/lsps4/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ where
scid_store: ScidStore<L, K>,
htlc_store: HTLCStore<L, K>,
connected_peers: RwLock<HashSet<PublicKey>>,
pending_channel_opens: RwLock<HashSet<PublicKey>>,
config: LSPS4ServiceConfig,
}

Expand Down Expand Up @@ -128,6 +129,7 @@ where
config,
logger,
connected_peers: RwLock::new(HashSet::new()),
pending_channel_opens: RwLock::new(HashSet::new()),
})
}

Expand Down Expand Up @@ -193,7 +195,6 @@ where
.get_cm()
.list_channels_with_counterparty(&counterparty_node_id);
let any_usable = channels.iter().any(|ch| ch.is_usable);
let has_channels = !channels.is_empty();

log_info!(
self.logger,
Expand All @@ -215,40 +216,23 @@ where
);
}

if has_channels && !any_usable {
// Channels exist but none usable yet (channel_reestablish in progress).
// Defer until process_pending_htlcs picks them up.
log_info!(
self.logger,
"[LSPS4] htlc_intercepted: peer {} has {} channels but none usable, \
deferring HTLC until channel_reestablish completes. payment_hash: {}",
counterparty_node_id,
channels.len(),
payment_hash
);
self.htlc_store.insert(htlc).unwrap();
} else {
// Either channels are usable, or no channels exist (need JIT open).
// calculate_htlc_actions_for_peer handles both: forward through usable
// channels, or emit OpenChannel event when no capacity exists.
let actions = self.calculate_htlc_actions_for_peer(
counterparty_node_id,
vec![htlc.clone()],
);
let actions = self.calculate_htlc_actions_for_peer(
counterparty_node_id,
vec![htlc.clone()],
);

if actions.new_channel_needed_msat.is_some() {
self.htlc_store.insert(htlc).unwrap();
}
if actions.new_channel_needed_msat.is_some() {
self.htlc_store.insert(htlc).unwrap();
}

log_debug!(
self.logger,
"[LSPS4] htlc_intercepted: calculated actions for peer {}: {:?}",
counterparty_node_id,
actions
);
log_debug!(
self.logger,
"[LSPS4] htlc_intercepted: calculated actions for peer {}: {:?}",
counterparty_node_id,
actions
);

self.execute_htlc_actions(actions, counterparty_node_id.clone());
}
self.execute_htlc_actions(actions, counterparty_node_id.clone());
}
} else {
log_error!(
Expand All @@ -275,6 +259,7 @@ where
pub fn channel_ready(
&self, counterparty_node_id: &PublicKey,
) -> Result<(), APIError> {
self.pending_channel_opens.write().unwrap().remove(counterparty_node_id);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this never executes, the node could end up stuck indefinitely and unable to forward HTLCs.

We should probably add a timeout — for example, removing the entry after a minute.

Also, we likely need to listen for a channel_failed (or similar) event. If the channel fails to open, we should remove it from pending_channel_opens as well.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think listening for the failure side should be sufficient — or could this still get stuck somewhere in between?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown

@martinsaposnic martinsaposnic Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens with long-lived nodes that never receive `peer_disconnected` but can still hit a channel failure — could they get stuck?

let is_connected = self.is_peer_connected(counterparty_node_id);

log_info!(
Expand Down Expand Up @@ -385,28 +370,7 @@ where
}
}

if self.has_usable_channel(&counterparty_node_id) || channels.is_empty() {
// Either channels are usable (forward immediately) or no channels exist
// at all (process_htlcs_for_peer will trigger OpenChannel for JIT).
self.process_htlcs_for_peer(counterparty_node_id.clone(), htlcs);
} else {
// Channels exist but none usable yet (reestablish in progress).
// We still call process_htlcs_for_peer because calculate_htlc_actions
// skips non-usable channels. If existing capacity is insufficient, this
// will emit OpenChannel now rather than deferring to process_pending_htlcs
// (which would never open a channel). Forwards through existing channels
// will be empty since none are usable, so no premature forwarding occurs.
// The actual forwards happen later via channel_ready or process_pending_htlcs
// once reestablish completes.
log_info!(
self.logger,
"[LSPS4] peer_connected: {} has {} pending HTLCs, channels not yet usable \
(reestablish in progress) - checking if new channel needed",
counterparty_node_id,
htlcs.len()
);
self.process_htlcs_for_peer(counterparty_node_id.clone(), htlcs);
}
self.process_htlcs_for_peer(counterparty_node_id.clone(), htlcs);

log_info!(
self.logger,
Expand Down Expand Up @@ -478,6 +442,7 @@ where

/// Will update the set of connected peers
pub fn peer_disconnected(&self, counterparty_node_id: &PublicKey) {
self.pending_channel_opens.write().unwrap().remove(counterparty_node_id);
let (was_present, remaining_count) = {
let mut peers = self.connected_peers.write().unwrap();
let was = peers.remove(counterparty_node_id);
Expand Down Expand Up @@ -524,27 +489,14 @@ where
}

if self.has_usable_channel(&node_id) {
// Channel reestablish completed — forward the deferred HTLCs.
// Channel reestablish completed — attempt to forward the deferred HTLCs.
log_info!(
self.logger,
"[LSPS4] process_pending_htlcs: forwarding {} HTLCs for peer {} (channel now usable)",
htlcs.len(),
node_id
);
let actions = self.calculate_htlc_actions_for_peer(node_id, htlcs);
if actions.new_channel_needed_msat.is_some() {
// A channel open is already in flight from htlc_intercepted or
// peer_connected. Skip — channel_ready will handle forwarding
// once the new channel is established.
log_info!(
self.logger,
"[LSPS4] process_pending_htlcs: peer {} needs a new channel, \
skipping (channel open already in flight)",
node_id
);
continue;
}
self.execute_htlc_actions(actions, node_id);
self.process_htlcs_for_peer(node_id, htlcs);
}
}
}
Expand Down Expand Up @@ -633,9 +585,15 @@ where
self.channel_manager.get_cm().list_channels_with_counterparty(&their_node_id);
let channel_count = channels.len();

// Include channels that finished opening (is_channel_ready) in capacity
// decisions, even if still reestablishing (is_usable=false). This prevents
// spurious channel opens during reestablish. Channels still opening
// (is_channel_ready=false) are excluded: they report outbound_capacity_msat
// but reject forwards, consuming the InterceptId and losing the HTLC.
// execute_htlc_actions checks usability before forwarding.
let mut channel_capacity_map: HashMap<ChannelId, u64> = new_hash_map();
for channel in &channels {
if !channel.is_usable {
if !channel.is_channel_ready {
continue;
}
channel_capacity_map.insert(channel.channel_id, channel.outbound_capacity_msat);
Expand Down Expand Up @@ -750,12 +708,14 @@ where
) {
// Execute forwards
for forward_action in actions.forwards {
// Re-check peer liveness right before forwarding to narrow the
// TOCTOU window between the usability check and the actual forward.
if !self.is_peer_connected(&their_node_id) {
// Re-check channel usability right before forwarding to narrow the
// TOCTOU window. Covers both peer disconnect and disconnect+reconnect
// (where the peer is connected but the channel is still reestablishing).
if !self.has_usable_channel(&their_node_id) {
log_info!(
self.logger,
"[LSPS4] execute_htlc_actions: peer {} disconnected before forward, skipping HTLC {:?} (will retry on next timer tick). payment_hash: {}",
"[LSPS4] execute_htlc_actions: no usable channel for peer {}, \
skipping HTLC {:?} (will retry on next timer tick). payment_hash: {}",
their_node_id,
forward_action.htlc.id(),
forward_action.htlc.payment_hash()
Expand Down Expand Up @@ -826,21 +786,30 @@ where
}
}

// Handle new channel opening
// Handle new channel opening — skip if one is already in flight.
if let Some(channel_size_msat) = actions.new_channel_needed_msat {
log_info!(
self.logger,
"Need a new channel with peer {} for {}msat to forward HTLCs",
their_node_id,
channel_size_msat
);

let mut event_queue_notifier = self.pending_events.notifier();
event_queue_notifier.enqueue(crate::events::LiquidityEvent::LSPS4Service(LSPS4ServiceEvent::OpenChannel {
their_network_key: their_node_id,
amt_to_forward_msat: channel_size_msat,
channel_count: actions.channel_count,
}));
if self.pending_channel_opens.read().unwrap().contains(&their_node_id) {
log_info!(
self.logger,
"[LSPS4] execute_htlc_actions: peer {} needs a new channel but one is \
already opening, skipping duplicate OpenChannel",
their_node_id
);
} else {
log_info!(
self.logger,
"Need a new channel with peer {} for {}msat to forward HTLCs",
their_node_id,
channel_size_msat
);
self.pending_channel_opens.write().unwrap().insert(their_node_id);
let mut event_queue_notifier = self.pending_events.notifier();
event_queue_notifier.enqueue(crate::events::LiquidityEvent::LSPS4Service(LSPS4ServiceEvent::OpenChannel {
their_network_key: their_node_id,
amt_to_forward_msat: channel_size_msat,
channel_count: actions.channel_count,
}));
}
}
}

Expand Down
Loading