diff --git a/lightning-liquidity/src/lsps4/service.rs b/lightning-liquidity/src/lsps4/service.rs index 52b23f31170..82ca99a024e 100644 --- a/lightning-liquidity/src/lsps4/service.rs +++ b/lightning-liquidity/src/lsps4/service.rs @@ -101,6 +101,7 @@ where scid_store: ScidStore, htlc_store: HTLCStore, connected_peers: RwLock>, + pending_channel_opens: RwLock>, config: LSPS4ServiceConfig, } @@ -128,6 +129,7 @@ where config, logger, connected_peers: RwLock::new(HashSet::new()), + pending_channel_opens: RwLock::new(HashSet::new()), }) } @@ -193,7 +195,6 @@ where .get_cm() .list_channels_with_counterparty(&counterparty_node_id); let any_usable = channels.iter().any(|ch| ch.is_usable); - let has_channels = !channels.is_empty(); log_info!( self.logger, @@ -215,40 +216,23 @@ where ); } - if has_channels && !any_usable { - // Channels exist but none usable yet (channel_reestablish in progress). - // Defer until process_pending_htlcs picks them up. - log_info!( - self.logger, - "[LSPS4] htlc_intercepted: peer {} has {} channels but none usable, \ - deferring HTLC until channel_reestablish completes. payment_hash: {}", - counterparty_node_id, - channels.len(), - payment_hash - ); - self.htlc_store.insert(htlc).unwrap(); - } else { - // Either channels are usable, or no channels exist (need JIT open). - // calculate_htlc_actions_for_peer handles both: forward through usable - // channels, or emit OpenChannel event when no capacity exists. - let actions = self.calculate_htlc_actions_for_peer( - counterparty_node_id, - vec![htlc.clone()], - ); + let actions = self.calculate_htlc_actions_for_peer( + counterparty_node_id, + vec![htlc.clone()], + ); - if actions.new_channel_needed_msat.is_some() { - self.htlc_store.insert(htlc).unwrap(); - } + if actions.new_channel_needed_msat.is_some() { + self.htlc_store.insert(htlc).unwrap(); + } - log_debug!( - self.logger, - "[LSPS4] htlc_intercepted: calculated actions for peer {}: {:?}", - counterparty_node_id, - actions - ); + log_debug!( + self.logger, + "[LSPS4] htlc_intercepted: calculated actions for peer {}: {:?}", + counterparty_node_id, + actions + ); - self.execute_htlc_actions(actions, counterparty_node_id.clone()); - } + self.execute_htlc_actions(actions, counterparty_node_id.clone()); } } else { log_error!( @@ -275,6 +259,7 @@ where pub fn channel_ready( &self, counterparty_node_id: &PublicKey, ) -> Result<(), APIError> { + self.pending_channel_opens.write().unwrap().remove(counterparty_node_id); let is_connected = self.is_peer_connected(counterparty_node_id); log_info!( @@ -385,28 +370,7 @@ where } } - if self.has_usable_channel(&counterparty_node_id) || channels.is_empty() { - // Either channels are usable (forward immediately) or no channels exist - // at all (process_htlcs_for_peer will trigger OpenChannel for JIT). - self.process_htlcs_for_peer(counterparty_node_id.clone(), htlcs); - } else { - // Channels exist but none usable yet (reestablish in progress). - // We still call process_htlcs_for_peer because calculate_htlc_actions - // skips non-usable channels. If existing capacity is insufficient, this - // will emit OpenChannel now rather than deferring to process_pending_htlcs - // (which would never open a channel). Forwards through existing channels - // will be empty since none are usable, so no premature forwarding occurs. - // The actual forwards happen later via channel_ready or process_pending_htlcs - // once reestablish completes. - log_info!( - self.logger, - "[LSPS4] peer_connected: {} has {} pending HTLCs, channels not yet usable \ - (reestablish in progress) - checking if new channel needed", - counterparty_node_id, - htlcs.len() - ); - self.process_htlcs_for_peer(counterparty_node_id.clone(), htlcs); - } + self.process_htlcs_for_peer(counterparty_node_id.clone(), htlcs); log_info!( self.logger, @@ -478,6 +442,7 @@ where /// Will update the set of connected peers pub fn peer_disconnected(&self, counterparty_node_id: &PublicKey) { + self.pending_channel_opens.write().unwrap().remove(counterparty_node_id); let (was_present, remaining_count) = { let mut peers = self.connected_peers.write().unwrap(); let was = peers.remove(counterparty_node_id); @@ -524,27 +489,14 @@ where } if self.has_usable_channel(&node_id) { - // Channel reestablish completed — forward the deferred HTLCs. + // Channel reestablish completed — attempt to forward the deferred HTLCs. log_info!( self.logger, "[LSPS4] process_pending_htlcs: forwarding {} HTLCs for peer {} (channel now usable)", htlcs.len(), node_id ); - let actions = self.calculate_htlc_actions_for_peer(node_id, htlcs); - if actions.new_channel_needed_msat.is_some() { - // A channel open is already in flight from htlc_intercepted or - // peer_connected. Skip — channel_ready will handle forwarding - // once the new channel is established. - log_info!( - self.logger, - "[LSPS4] process_pending_htlcs: peer {} needs a new channel, \ - skipping (channel open already in flight)", - node_id - ); - continue; - } - self.execute_htlc_actions(actions, node_id); + self.process_htlcs_for_peer(node_id, htlcs); } } } @@ -633,9 +585,15 @@ where self.channel_manager.get_cm().list_channels_with_counterparty(&their_node_id); let channel_count = channels.len(); + // Include channels that finished opening (is_channel_ready) in capacity + // decisions, even if still reestablishing (is_usable=false). This prevents + // spurious channel opens during reestablish. Channels still opening + // (is_channel_ready=false) are excluded: they report outbound_capacity_msat + // but reject forwards, consuming the InterceptId and losing the HTLC. + // execute_htlc_actions checks usability before forwarding. let mut channel_capacity_map: HashMap = new_hash_map(); for channel in &channels { - if !channel.is_usable { + if !channel.is_channel_ready { continue; } channel_capacity_map.insert(channel.channel_id, channel.outbound_capacity_msat); @@ -750,12 +708,14 @@ where ) { // Execute forwards for forward_action in actions.forwards { - // Re-check peer liveness right before forwarding to narrow the - // TOCTOU window between the usability check and the actual forward. - if !self.is_peer_connected(&their_node_id) { + // Re-check channel usability right before forwarding to narrow the + // TOCTOU window. Covers both peer disconnect and disconnect+reconnect + // (where the peer is connected but the channel is still reestablishing). + if !self.has_usable_channel(&their_node_id) { log_info!( self.logger, - "[LSPS4] execute_htlc_actions: peer {} disconnected before forward, skipping HTLC {:?} (will retry on next timer tick). payment_hash: {}", + "[LSPS4] execute_htlc_actions: no usable channel for peer {}, \ + skipping HTLC {:?} (will retry on next timer tick). payment_hash: {}", their_node_id, forward_action.htlc.id(), forward_action.htlc.payment_hash() @@ -826,21 +786,30 @@ where } } - // Handle new channel opening + // Handle new channel opening — skip if one is already in flight. if let Some(channel_size_msat) = actions.new_channel_needed_msat { - log_info!( - self.logger, - "Need a new channel with peer {} for {}msat to forward HTLCs", - their_node_id, - channel_size_msat - ); - - let mut event_queue_notifier = self.pending_events.notifier(); - event_queue_notifier.enqueue(crate::events::LiquidityEvent::LSPS4Service(LSPS4ServiceEvent::OpenChannel { - their_network_key: their_node_id, - amt_to_forward_msat: channel_size_msat, - channel_count: actions.channel_count, - })); + if self.pending_channel_opens.read().unwrap().contains(&their_node_id) { + log_info!( + self.logger, + "[LSPS4] execute_htlc_actions: peer {} needs a new channel but one is \ + already opening, skipping duplicate OpenChannel", + their_node_id + ); + } else { + log_info!( + self.logger, + "Need a new channel with peer {} for {}msat to forward HTLCs", + their_node_id, + channel_size_msat + ); + self.pending_channel_opens.write().unwrap().insert(their_node_id); + let mut event_queue_notifier = self.pending_events.notifier(); + event_queue_notifier.enqueue(crate::events::LiquidityEvent::LSPS4Service(LSPS4ServiceEvent::OpenChannel { + their_network_key: their_node_id, + amt_to_forward_msat: channel_size_msat, + channel_count: actions.channel_count, + })); + } } }