diff --git a/lightning-liquidity/src/lsps4/service.rs b/lightning-liquidity/src/lsps4/service.rs index 5d9b7ee673e..c33ed110622 100644 --- a/lightning-liquidity/src/lsps4/service.rs +++ b/lightning-liquidity/src/lsps4/service.rs @@ -9,7 +9,7 @@ //! Contains the main LSPS4 server-side object, [`LSPS4ServiceHandler`]. -use crate::events::{LiquidityEvent, EventQueue}; +use crate::events::{EventQueue, LiquidityEvent}; use crate::lsps0::ser::{ LSPSMessage, LSPSProtocolMessageHandler, LSPSRequestId, LSPSResponseError, JSONRPC_INTERNAL_ERROR_ERROR_CODE, JSONRPC_INTERNAL_ERROR_ERROR_MESSAGE, @@ -101,6 +101,9 @@ where scid_store: ScidStore, htlc_store: HTLCStore, connected_peers: RwLock>, + /// Peers for which an OpenChannel event has been emitted but channel_ready has not + /// yet fired. Used to prevent duplicate OpenChannel emissions from the timer. + pending_channel_opens: RwLock>, config: LSPS4ServiceConfig, } @@ -128,6 +131,7 @@ where config, logger, connected_peers: RwLock::new(HashSet::new()), + pending_channel_opens: RwLock::new(HashSet::new()), }) } @@ -183,7 +187,7 @@ where // peer_connected() will find this HTLC in the store and process it. // The process_pending_htlcs timer also serves as a fallback. let mut event_queue_notifier = self.pending_events.notifier(); - event_queue_notifier.enqueue(crate::events::LiquidityEvent::LSPS4Service(LSPS4ServiceEvent::SendWebhook { + event_queue_notifier.enqueue(LiquidityEvent::LSPS4Service(LSPS4ServiceEvent::SendWebhook { counterparty_node_id: counterparty_node_id.clone(), payment_hash, })); @@ -275,6 +279,8 @@ where pub fn channel_ready( &self, counterparty_node_id: &PublicKey, ) -> Result<(), APIError> { + self.pending_channel_opens.write().unwrap().remove(counterparty_node_id); + let is_connected = self.is_peer_connected(counterparty_node_id); log_info!( @@ -497,6 +503,29 @@ where htlc.expected_outbound_amount_msat() ); } + + // Re-send webhook so the peer wakes up and reconnects. Use the first + // HTLC's payment_hash — the peer will reconnect and peer_connected / + // process_pending_htlcs will forward all queued HTLCs. If the peer + // disconnects again before channels become usable, this fires again, + // creating a retry loop bounded by the HTLC expiry threshold (45s). + let payment_hash = pending_htlcs[0].payment_hash(); + log_info!( + self.logger, + "[LSPS4] peer_disconnected: re-sending webhook for {} orphaned HTLCs, peer: {}, payment_hash: {}", + pending_htlcs.len(), + counterparty_node_id, + payment_hash + ); + let mut event_queue_notifier = self.pending_events.notifier(); + event_queue_notifier.enqueue( + LiquidityEvent::LSPS4Service( + LSPS4ServiceEvent::SendWebhook { + counterparty_node_id: counterparty_node_id.clone(), + payment_hash, + }, + ), + ); } } @@ -524,14 +553,16 @@ where node_id ); let actions = self.calculate_htlc_actions_for_peer(node_id, htlcs); - if actions.new_channel_needed_msat.is_some() { - // A channel open is already in flight from htlc_intercepted or - // peer_connected. Skip — channel_ready will handle forwarding - // once the new channel is established. + if actions.new_channel_needed_msat.is_some() + && self.pending_channel_opens.read().unwrap().contains(&node_id) + { + // A channel open was already emitted (by htlc_intercepted or + // peer_connected) and channel_ready hasn't fired yet. Skip to + // avoid opening a duplicate channel. log_info!( self.logger, - "[LSPS4] process_pending_htlcs: peer {} needs a new channel, \ - skipping (channel open already in flight)", + "[LSPS4] process_pending_htlcs: peer {} needs a new channel \ + but one is already being opened, skipping", node_id ); continue; @@ -827,8 +858,9 @@ where channel_size_msat ); + self.pending_channel_opens.write().unwrap().insert(their_node_id); let mut event_queue_notifier = self.pending_events.notifier(); - event_queue_notifier.enqueue(crate::events::LiquidityEvent::LSPS4Service(LSPS4ServiceEvent::OpenChannel { + event_queue_notifier.enqueue(LiquidityEvent::LSPS4Service(LSPS4ServiceEvent::OpenChannel { their_network_key: their_node_id, amt_to_forward_msat: channel_size_msat, channel_count: actions.channel_count,