diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index e4fd3475024..cfa91b34e43 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -244,16 +244,14 @@ struct LatestMonitorState { /// which we haven't yet completed. We're allowed to reload with those as well, at least until /// they're completed. persisted_monitor_id: u64, - /// The latest serialized `ChannelMonitor` that we told LDK we persisted. - persisted_monitor: Vec, - /// A set of (monitor id, serialized `ChannelMonitor`)s which we're currently "persisting", + /// The latest `ChannelMonitor` that we told LDK we persisted, if any. + persisted_monitor: Option>, + /// A set of (monitor id, `ChannelMonitor`)s which we're currently "persisting", /// from LDK's perspective. - pending_monitors: Vec<(u64, Vec)>, + pending_monitors: Vec<(u64, channelmonitor::ChannelMonitor)>, } struct TestChainMonitor { - pub logger: Arc, - pub keys: Arc, pub persister: Arc, pub chain_monitor: Arc< chainmonitor::ChainMonitor< @@ -277,15 +275,13 @@ impl TestChainMonitor { chain_monitor: Arc::new(chainmonitor::ChainMonitor::new( None, broadcaster, - logger.clone(), + logger, feeest, Arc::clone(&persister), Arc::clone(&keys), keys.get_peer_storage_key(), false, )), - logger, - keys, persister, latest_monitors: Mutex::new(new_hash_map()), } @@ -295,20 +291,19 @@ impl chain::Watch for TestChainMonitor { fn watch_channel( &self, channel_id: ChannelId, monitor: channelmonitor::ChannelMonitor, ) -> Result { - let mut ser = VecWriter(Vec::new()); - monitor.write(&mut ser).unwrap(); let monitor_id = monitor.get_latest_update_id(); let res = self.chain_monitor.watch_channel(channel_id, monitor); + let mon = self.persister.take_latest_monitor(&channel_id); let state = match res { Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState { persisted_monitor_id: monitor_id, - persisted_monitor: ser.0, + persisted_monitor: Some(mon), pending_monitors: Vec::new(), }, Ok(chain::ChannelMonitorUpdateStatus::InProgress) => LatestMonitorState { persisted_monitor_id: monitor_id, - persisted_monitor: Vec::new(), - pending_monitors: vec![(monitor_id, ser.0)], + persisted_monitor: None, + pending_monitors: vec![(monitor_id, mon)], }, Ok(chain::ChannelMonitorUpdateStatus::UnrecoverableError) => panic!(), Err(()) => panic!(), @@ -324,37 +319,15 @@ impl chain::Watch for TestChainMonitor { ) -> chain::ChannelMonitorUpdateStatus { let mut map_lock = self.latest_monitors.lock().unwrap(); let map_entry = map_lock.get_mut(&channel_id).expect("Didn't have monitor on update call"); - let latest_monitor_data = map_entry - .pending_monitors - .last() - .as_ref() - .map(|(_, data)| data) - .unwrap_or(&map_entry.persisted_monitor); - let deserialized_monitor = - <(BlockHash, channelmonitor::ChannelMonitor)>::read( - &mut &latest_monitor_data[..], - (&*self.keys, &*self.keys), - ) - .unwrap() - .1; - deserialized_monitor - .update_monitor( - update, - &&TestBroadcaster { txn_broadcasted: RefCell::new(Vec::new()) }, - &&FuzzEstimator { ret_val: atomic::AtomicU32::new(253) }, - &self.logger, - ) - .unwrap(); - let mut ser = VecWriter(Vec::new()); - deserialized_monitor.write(&mut ser).unwrap(); let res = self.chain_monitor.update_channel(channel_id, update); + let mon = self.persister.take_latest_monitor(&channel_id); match res { chain::ChannelMonitorUpdateStatus::Completed => { map_entry.persisted_monitor_id = update.update_id; - map_entry.persisted_monitor = ser.0; + map_entry.persisted_monitor = Some(mon); }, chain::ChannelMonitorUpdateStatus::InProgress => { - map_entry.pending_monitors.push((update.update_id, ser.0)); + map_entry.pending_monitors.push((update.update_id, mon)); }, chain::ChannelMonitorUpdateStatus::UnrecoverableError => panic!(), } @@ -864,9 +837,8 @@ fn assert_action_timeout_awaiting_response(action: &msgs::ErrorAction) { #[inline] pub fn do_test( - data: &[u8], underlying_out: Out, anchors: bool, + data: &[u8], out: Out, anchors: bool, ) { - let out = SearchingOutput::new(underlying_out); let broadcast_a = Arc::new(TestBroadcaster { txn_broadcasted: RefCell::new(Vec::new()) }); let broadcast_b = Arc::new(TestBroadcaster { txn_broadcasted: RefCell::new(Vec::new()) }); let broadcast_c = Arc::new(TestBroadcaster { txn_broadcasted: RefCell::new(Vec::new()) }); @@ -915,9 +887,7 @@ pub fn do_test( $broadcaster.clone(), logger.clone(), $fee_estimator.clone(), - Arc::new(TestPersister { - update_ret: Mutex::new(mon_style[$node_id as usize].borrow().clone()), - }), + Arc::new(TestPersister::new(mon_style[$node_id as usize].borrow().clone())), Arc::clone(&keys_manager), )); @@ -967,9 +937,7 @@ pub fn do_test( broadcaster.clone(), logger.clone(), Arc::clone(fee_estimator), - Arc::new(TestPersister { - update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed), - }), + Arc::new(TestPersister::new(ChannelMonitorUpdateStatus::Completed)), Arc::clone(keys), )); @@ -984,30 +952,35 @@ pub fn do_test( let mut monitors = new_hash_map(); let mut old_monitors = old_monitors.latest_monitors.lock().unwrap(); for (channel_id, mut prev_state) in old_monitors.drain() { - let (mon_id, serialized_mon) = if use_old_mons % 3 == 0 { + let old_mon = + prev_state.persisted_monitor.map(|m| (prev_state.persisted_monitor_id, m)); + let (mon_id, mon) = if use_old_mons % 3 == 0 { // Reload with the oldest `ChannelMonitor` (the one that we already told // `ChannelManager` we finished persisting). - (prev_state.persisted_monitor_id, prev_state.persisted_monitor) + old_mon.expect("no persisted monitor to reload") } else if use_old_mons % 3 == 1 { // Reload with the second-oldest `ChannelMonitor` - let old_mon = (prev_state.persisted_monitor_id, prev_state.persisted_monitor); - prev_state.pending_monitors.drain(..).next().unwrap_or(old_mon) + prev_state.pending_monitors.drain(..).next().or(old_mon) + .expect("no monitor to reload") } else { // Reload with the newest `ChannelMonitor` - let old_mon = (prev_state.persisted_monitor_id, prev_state.persisted_monitor); - prev_state.pending_monitors.pop().unwrap_or(old_mon) + prev_state.pending_monitors.pop().or(old_mon) + .expect("no monitor to reload") }; // Use a different value of `use_old_mons` if we have another monitor (only for node B) // by shifting `use_old_mons` one in base-3. use_old_mons /= 3; - let mon = <(BlockHash, ChannelMonitor)>::read( - &mut &serialized_mon[..], + // Serialize and deserialize the monitor to verify round-trip correctness. + let mut ser = VecWriter(Vec::new()); + mon.write(&mut ser).unwrap(); + let (_, deserialized_mon) = <(BlockHash, ChannelMonitor)>::read( + &mut &ser.0[..], (&**keys, &**keys), ) .expect("Failed to read monitor"); - monitors.insert(channel_id, mon.1); + monitors.insert(channel_id, deserialized_mon); // Update the latest `ChannelMonitor` state to match what we just told LDK. - prev_state.persisted_monitor = serialized_mon; + prev_state.persisted_monitor = Some(mon); prev_state.persisted_monitor_id = mon_id; // Wipe any `ChannelMonitor`s which we never told LDK we finished persisting, // considering them discarded. LDK should replay these for us as they're stored in @@ -1054,7 +1027,7 @@ pub fn do_test( $monitor.chain_monitor.channel_monitor_updated(*channel_id, id).unwrap(); if id >= state.persisted_monitor_id { state.persisted_monitor_id = id; - state.persisted_monitor = data; + state.persisted_monitor = Some(data); } } } @@ -1793,11 +1766,7 @@ pub fn do_test( // Can be generated as a result of calling `timer_tick_occurred` enough // times while peers are disconnected }, - _ => if out.may_fail.load(atomic::Ordering::Acquire) { - return; - } else { - panic!("Unhandled message event {:?}", event) - }, + _ => panic!("Unhandled message event {:?}", event), } if $limit_events != ProcessMessages::AllMessages { break; @@ -1837,13 +1806,7 @@ pub fn do_test( MessageSendEvent::HandleError { ref action, .. } => { assert_action_timeout_awaiting_response(action); }, - _ => { - if out.may_fail.load(atomic::Ordering::Acquire) { - return; - } else { - panic!("Unhandled message event") - } - }, + _ => panic!("Unhandled message event"), } } push_excess_b_events!( @@ -1865,13 +1828,7 @@ pub fn do_test( MessageSendEvent::HandleError { ref action, .. } => { assert_action_timeout_awaiting_response(action); }, - _ => { - if out.may_fail.load(atomic::Ordering::Acquire) { - return; - } else { - panic!("Unhandled message event") - } - }, + _ => panic!("Unhandled message event"), } } push_excess_b_events!( @@ -1980,13 +1937,7 @@ pub fn do_test( }, events::Event::SpliceFailed { .. } => {}, - _ => { - if out.may_fail.load(atomic::Ordering::Acquire) { - return; - } else { - panic!("Unhandled event") - } - }, + _ => panic!("Unhandled event"), } } while nodes[$node].needs_pending_htlc_processing() { @@ -2004,10 +1955,11 @@ pub fn do_test( let complete_first = |v: &mut Vec<_>| if !v.is_empty() { Some(v.remove(0)) } else { None }; let complete_second = |v: &mut Vec<_>| if v.len() > 1 { Some(v.remove(1)) } else { None }; + type PendingMonitors = Vec<(u64, channelmonitor::ChannelMonitor)>; let complete_monitor_update = |monitor: &Arc, chan_funding, - compl_selector: &dyn Fn(&mut Vec<(u64, Vec)>) -> Option<(u64, Vec)>| { + compl_selector: &dyn Fn(&mut PendingMonitors) -> Option<(u64, channelmonitor::ChannelMonitor)>| { if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) { assert!( state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0), @@ -2017,7 +1969,7 @@ pub fn do_test( monitor.chain_monitor.channel_monitor_updated(*chan_funding, id).unwrap(); if id > state.persisted_monitor_id { state.persisted_monitor_id = id; - state.persisted_monitor = data; + state.persisted_monitor = Some(data); } } } @@ -2033,7 +1985,7 @@ pub fn do_test( monitor.chain_monitor.channel_monitor_updated(*chan_id, id).unwrap(); if id > state.persisted_monitor_id { state.persisted_monitor_id = id; - state.persisted_monitor = data; + state.persisted_monitor = Some(data); } } } @@ -2809,27 +2761,6 @@ pub fn do_test( } } -/// We actually have different behavior based on if a certain log string has been seen, so we have -/// to do a bit more tracking. -#[derive(Clone)] -struct SearchingOutput { - output: O, - may_fail: Arc, -} -impl Output for SearchingOutput { - fn locked_write(&self, data: &[u8]) { - // We hit a design limitation of LN state machine (see CONCURRENT_INBOUND_HTLC_FEE_BUFFER) - if std::str::from_utf8(data).unwrap().contains("Outbound update_fee HTLC buffer overflow - counterparty should force-close this channel") { - self.may_fail.store(true, atomic::Ordering::Release); - } - self.output.locked_write(data) - } -} -impl SearchingOutput { - pub fn new(output: O) -> Self { - Self { output, may_fail: Arc::new(atomic::AtomicBool::new(false)) } - } -} pub fn chanmon_consistency_test(data: &[u8], out: Out) { do_test(data, out.clone(), false); diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index 3b7c99ea0b6..cfd1e213d65 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -598,7 +598,7 @@ pub fn do_test(mut data: &[u8], logger: &Arc broadcast.clone(), Arc::clone(&logger), fee_est.clone(), - Arc::new(TestPersister { update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed) }), + Arc::new(TestPersister::new(ChannelMonitorUpdateStatus::Completed)), Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), false, diff --git a/fuzz/src/utils/test_logger.rs b/fuzz/src/utils/test_logger.rs index 193ccc06a54..e629a7f486b 100644 --- a/fuzz/src/utils/test_logger.rs +++ b/fuzz/src/utils/test_logger.rs @@ -8,6 +8,7 @@ // licenses. use lightning::util::logger::{Logger, Record}; +use std::any::TypeId; use std::io::Write; use std::sync::{Arc, Mutex}; @@ -66,6 +67,9 @@ impl<'a, Out: Output> Write for LockedWriteAdapter<'a, Out> { impl Logger for TestLogger { fn log(&self, record: Record) { + if TypeId::of::() == TypeId::of::() { + return; + } writeln!(LockedWriteAdapter(&self.out), "{:<6} {}", self.id, record).unwrap(); } } diff --git a/fuzz/src/utils/test_persister.rs b/fuzz/src/utils/test_persister.rs index 5838a961c0f..513569f0508 100644 --- a/fuzz/src/utils/test_persister.rs +++ b/fuzz/src/utils/test_persister.rs @@ -1,25 +1,45 @@ use lightning::chain; use lightning::chain::{chainmonitor, channelmonitor}; +use lightning::ln::types::ChannelId; use lightning::util::persist::MonitorName; use lightning::util::test_channel_signer::TestChannelSigner; +use std::collections::HashMap; use std::sync::Mutex; pub struct TestPersister { pub update_ret: Mutex, + latest_monitors: Mutex>>, +} +impl TestPersister { + pub fn new(update_ret: chain::ChannelMonitorUpdateStatus) -> Self { + Self { + update_ret: Mutex::new(update_ret), + latest_monitors: Mutex::new(HashMap::new()), + } + } + + pub fn take_latest_monitor( + &self, channel_id: &ChannelId, + ) -> channelmonitor::ChannelMonitor { + self.latest_monitors.lock().unwrap().remove(channel_id) + .expect("Persister should have monitor for channel") + } } impl chainmonitor::Persist for TestPersister { fn persist_new_channel( &self, _monitor_name: MonitorName, - _data: &channelmonitor::ChannelMonitor, + data: &channelmonitor::ChannelMonitor, ) -> chain::ChannelMonitorUpdateStatus { + self.latest_monitors.lock().unwrap().insert(data.channel_id(), data.clone()); self.update_ret.lock().unwrap().clone() } fn update_persisted_channel( &self, _monitor_name: MonitorName, _update: Option<&channelmonitor::ChannelMonitorUpdate>, - _data: &channelmonitor::ChannelMonitor, + data: &channelmonitor::ChannelMonitor, ) -> chain::ChannelMonitorUpdateStatus { + self.latest_monitors.lock().unwrap().insert(data.channel_id(), data.clone()); self.update_ret.lock().unwrap().clone() }