Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 41 additions & 9 deletions lightning-liquidity/src/lsps4/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

//! Contains the main LSPS4 server-side object, [`LSPS4ServiceHandler`].

use crate::events::{LiquidityEvent, EventQueue};
use crate::events::{EventQueue, LiquidityEvent};
use crate::lsps0::ser::{
LSPSMessage, LSPSProtocolMessageHandler, LSPSRequestId, LSPSResponseError,
JSONRPC_INTERNAL_ERROR_ERROR_CODE, JSONRPC_INTERNAL_ERROR_ERROR_MESSAGE,
Expand Down Expand Up @@ -101,6 +101,9 @@ where
scid_store: ScidStore<L, K>,
htlc_store: HTLCStore<L, K>,
connected_peers: RwLock<HashSet<PublicKey>>,
/// Peers for which an OpenChannel event has been emitted but channel_ready has not
/// yet fired. Used to prevent duplicate OpenChannel emissions from the timer.
pending_channel_opens: RwLock<HashSet<PublicKey>>,
config: LSPS4ServiceConfig,
}

Expand Down Expand Up @@ -128,6 +131,7 @@ where
config,
logger,
connected_peers: RwLock::new(HashSet::new()),
pending_channel_opens: RwLock::new(HashSet::new()),
})
}

Expand Down Expand Up @@ -183,7 +187,7 @@ where
// peer_connected() will find this HTLC in the store and process it.
// The process_pending_htlcs timer also serves as a fallback.
let mut event_queue_notifier = self.pending_events.notifier();
event_queue_notifier.enqueue(crate::events::LiquidityEvent::LSPS4Service(LSPS4ServiceEvent::SendWebhook {
event_queue_notifier.enqueue(LiquidityEvent::LSPS4Service(LSPS4ServiceEvent::SendWebhook {
counterparty_node_id: counterparty_node_id.clone(),
payment_hash,
}));
Expand Down Expand Up @@ -275,6 +279,8 @@ where
pub fn channel_ready(
&self, counterparty_node_id: &PublicKey,
) -> Result<(), APIError> {
self.pending_channel_opens.write().unwrap().remove(counterparty_node_id);

let is_connected = self.is_peer_connected(counterparty_node_id);

log_info!(
Expand Down Expand Up @@ -497,6 +503,29 @@ where
htlc.expected_outbound_amount_msat()
);
}

// Re-send webhook so the peer wakes up and reconnects. Use the first
// HTLC's payment_hash — the peer will reconnect and peer_connected /
// process_pending_htlcs will forward all queued HTLCs. If the peer
// disconnects again before channels become usable, this fires again,
// creating a retry loop bounded by the HTLC expiry threshold (45s).
let payment_hash = pending_htlcs[0].payment_hash();
log_info!(
self.logger,
"[LSPS4] peer_disconnected: re-sending webhook for {} orphaned HTLCs, peer: {}, payment_hash: {}",
pending_htlcs.len(),
counterparty_node_id,
payment_hash
);
let mut event_queue_notifier = self.pending_events.notifier();
event_queue_notifier.enqueue(
LiquidityEvent::LSPS4Service(
LSPS4ServiceEvent::SendWebhook {
counterparty_node_id: counterparty_node_id.clone(),
payment_hash,
},
),
);
}
}

Expand Down Expand Up @@ -524,14 +553,16 @@ where
node_id
);
let actions = self.calculate_htlc_actions_for_peer(node_id, htlcs);
if actions.new_channel_needed_msat.is_some() {
// A channel open is already in flight from htlc_intercepted or
// peer_connected. Skip — channel_ready will handle forwarding
// once the new channel is established.
if actions.new_channel_needed_msat.is_some()
&& self.pending_channel_opens.read().unwrap().contains(&node_id)
{
// A channel open was already emitted (by htlc_intercepted or
// peer_connected) and channel_ready hasn't fired yet. Skip to
// avoid opening a duplicate channel.
log_info!(
self.logger,
"[LSPS4] process_pending_htlcs: peer {} needs a new channel, \
skipping (channel open already in flight)",
"[LSPS4] process_pending_htlcs: peer {} needs a new channel \
but one is already being opened, skipping",
node_id
);
continue;
Expand Down Expand Up @@ -827,8 +858,9 @@ where
channel_size_msat
);

self.pending_channel_opens.write().unwrap().insert(their_node_id);
let mut event_queue_notifier = self.pending_events.notifier();
event_queue_notifier.enqueue(crate::events::LiquidityEvent::LSPS4Service(LSPS4ServiceEvent::OpenChannel {
event_queue_notifier.enqueue(LiquidityEvent::LSPS4Service(LSPS4ServiceEvent::OpenChannel {
their_network_key: their_node_id,
amt_to_forward_msat: channel_size_msat,
channel_count: actions.channel_count,
Expand Down
Loading