diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 59918139af5..15baa02ee5e 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -977,7 +977,9 @@ pub fn do_test(data: &[u8], underlying_out: Out, anchors: bool) { lock_fundings!(nodes); let chan_a = nodes[0].list_usable_channels()[0].short_channel_id.unwrap(); + let chan_a_id = nodes[0].list_usable_channels()[0].channel_id; let chan_b = nodes[2].list_usable_channels()[0].short_channel_id.unwrap(); + let chan_b_id = nodes[2].list_usable_channels()[0].channel_id; let mut p_id: u8 = 0; let mut p_idx: u64 = 0; @@ -1038,6 +1040,10 @@ pub fn do_test(data: &[u8], underlying_out: Out, anchors: bool) { if Some(*node_id) == expect_drop_id { panic!("peer_disconnected should drop msgs bound for the disconnected peer"); } *node_id == a_id }, + events::MessageSendEvent::SendStfu { ref node_id, .. } => { + if Some(*node_id) == expect_drop_id { panic!("peer_disconnected should drop msgs bound for the disconnected peer"); } + *node_id == a_id + }, events::MessageSendEvent::SendChannelReady { .. } => continue, events::MessageSendEvent::SendAnnouncementSignatures { .. } => continue, events::MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => { @@ -1100,7 +1106,7 @@ pub fn do_test(data: &[u8], underlying_out: Out, anchors: bool) { for (idx, dest) in nodes.iter().enumerate() { if dest.get_our_node_id() == node_id { for update_add in update_add_htlcs.iter() { - out.locked_write(format!("Delivering update_add_htlc to node {}.\n", idx).as_bytes()); + out.locked_write(format!("Delivering update_add_htlc from node {} to node {}.\n", $node, idx).as_bytes()); if !$corrupt_forward { dest.handle_update_add_htlc(nodes[$node].get_our_node_id(), update_add); } else { @@ -1115,19 +1121,19 @@ pub fn do_test(data: &[u8], underlying_out: Out, anchors: bool) { } } for update_fulfill in update_fulfill_htlcs.iter() { - out.locked_write(format!("Delivering update_fulfill_htlc to node {}.\n", idx).as_bytes()); + out.locked_write(format!("Delivering update_fulfill_htlc from node {} to node {}.\n", $node, idx).as_bytes()); dest.handle_update_fulfill_htlc(nodes[$node].get_our_node_id(), update_fulfill); } for update_fail in update_fail_htlcs.iter() { - out.locked_write(format!("Delivering update_fail_htlc to node {}.\n", idx).as_bytes()); + out.locked_write(format!("Delivering update_fail_htlc from node {} to node {}.\n", $node, idx).as_bytes()); dest.handle_update_fail_htlc(nodes[$node].get_our_node_id(), update_fail); } for update_fail_malformed in update_fail_malformed_htlcs.iter() { - out.locked_write(format!("Delivering update_fail_malformed_htlc to node {}.\n", idx).as_bytes()); + out.locked_write(format!("Delivering update_fail_malformed_htlc from node {} to node {}.\n", $node, idx).as_bytes()); dest.handle_update_fail_malformed_htlc(nodes[$node].get_our_node_id(), update_fail_malformed); } if let Some(msg) = update_fee { - out.locked_write(format!("Delivering update_fee to node {}.\n", idx).as_bytes()); + out.locked_write(format!("Delivering update_fee from node {} to node {}.\n", $node, idx).as_bytes()); dest.handle_update_fee(nodes[$node].get_our_node_id(), &msg); } let processed_change = !update_add_htlcs.is_empty() || !update_fulfill_htlcs.is_empty() || @@ -1144,7 +1150,7 @@ pub fn do_test(data: &[u8], underlying_out: Out, anchors: bool) { } }); break; } - out.locked_write(format!("Delivering commitment_signed to node {}.\n", idx).as_bytes()); + out.locked_write(format!("Delivering 
commitment_signed from node {} to node {}.\n", $node, idx).as_bytes()); dest.handle_commitment_signed(nodes[$node].get_our_node_id(), &commitment_signed); break; } @@ -1153,7 +1159,7 @@ pub fn do_test(data: &[u8], underlying_out: Out, anchors: bool) { events::MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => { for (idx, dest) in nodes.iter().enumerate() { if dest.get_our_node_id() == *node_id { - out.locked_write(format!("Delivering revoke_and_ack to node {}.\n", idx).as_bytes()); + out.locked_write(format!("Delivering revoke_and_ack from node {} to node {}.\n", $node, idx).as_bytes()); dest.handle_revoke_and_ack(nodes[$node].get_our_node_id(), msg); } } @@ -1161,11 +1167,19 @@ pub fn do_test(data: &[u8], underlying_out: Out, anchors: bool) { events::MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => { for (idx, dest) in nodes.iter().enumerate() { if dest.get_our_node_id() == *node_id { - out.locked_write(format!("Delivering channel_reestablish to node {}.\n", idx).as_bytes()); + out.locked_write(format!("Delivering channel_reestablish from node {} to node {}.\n", $node, idx).as_bytes()); dest.handle_channel_reestablish(nodes[$node].get_our_node_id(), msg); } } }, + events::MessageSendEvent::SendStfu { ref node_id, ref msg } => { + for (idx, dest) in nodes.iter().enumerate() { + if dest.get_our_node_id() == *node_id { + out.locked_write(format!("Delivering stfu from node {} to node {}.\n", $node, idx).as_bytes()); + dest.handle_stfu(nodes[$node].get_our_node_id(), msg); + } + } + } events::MessageSendEvent::SendChannelReady { .. } => { // Can be generated as a reestablish response }, @@ -1218,6 +1232,7 @@ pub fn do_test(data: &[u8], underlying_out: Out, anchors: bool) { events::MessageSendEvent::UpdateHTLCs { .. } => {}, events::MessageSendEvent::SendRevokeAndACK { .. } => {}, events::MessageSendEvent::SendChannelReestablish { .. } => {}, + events::MessageSendEvent::SendStfu { .. } => {}, events::MessageSendEvent::SendChannelReady { .. } => {}, events::MessageSendEvent::SendAnnouncementSignatures { .. } => {}, events::MessageSendEvent::SendChannelUpdate { ref msg, .. } => { @@ -1244,6 +1259,7 @@ pub fn do_test(data: &[u8], underlying_out: Out, anchors: bool) { events::MessageSendEvent::UpdateHTLCs { .. } => {}, events::MessageSendEvent::SendRevokeAndACK { .. } => {}, events::MessageSendEvent::SendChannelReestablish { .. } => {}, + events::MessageSendEvent::SendStfu { .. } => {}, events::MessageSendEvent::SendChannelReady { .. } => {}, events::MessageSendEvent::SendAnnouncementSignatures { .. } => {}, events::MessageSendEvent::SendChannelUpdate { ref msg, .. 
} => { @@ -1687,6 +1703,19 @@ pub fn do_test(data: &[u8], underlying_out: Out, anchors: bool) { nodes[2].maybe_update_chan_fees(); }, + 0xa0 => { + nodes[0].maybe_propose_quiescence(&nodes[1].get_our_node_id(), &chan_a_id).unwrap() + }, + 0xa1 => { + nodes[1].maybe_propose_quiescence(&nodes[0].get_our_node_id(), &chan_a_id).unwrap() + }, + 0xa2 => { + nodes[1].maybe_propose_quiescence(&nodes[2].get_our_node_id(), &chan_b_id).unwrap() + }, + 0xa3 => { + nodes[2].maybe_propose_quiescence(&nodes[1].get_our_node_id(), &chan_b_id).unwrap() + }, + 0xf0 => complete_monitor_update(&monitor_a, &chan_1_id, &complete_first), 0xf1 => complete_monitor_update(&monitor_a, &chan_1_id, &complete_second), 0xf2 => complete_monitor_update(&monitor_a, &chan_1_id, &Vec::pop), @@ -1752,34 +1781,49 @@ pub fn do_test(data: &[u8], underlying_out: Out, anchors: bool) { chan_b_disconnected = false; } - for i in 0..std::usize::MAX { - if i == 100 { - panic!("It may take may iterations to settle the state, but it should not take forever"); - } - // Then, make sure any current forwards make their way to their destination - if process_msg_events!(0, false, ProcessMessages::AllMessages) { - continue; - } - if process_msg_events!(1, false, ProcessMessages::AllMessages) { - continue; - } - if process_msg_events!(2, false, ProcessMessages::AllMessages) { - continue; - } - // ...making sure any pending PendingHTLCsForwardable events are handled and - // payments claimed. - if process_events!(0, false) { - continue; - } - if process_events!(1, false) { - continue; - } - if process_events!(2, false) { - continue; - } - break; + macro_rules! process_all_events { + () => { + for i in 0..std::usize::MAX { + if i == 100 { + panic!("It may take may iterations to settle the state, but it should not take forever"); + } + // Then, make sure any current forwards make their way to their destination + if process_msg_events!(0, false, ProcessMessages::AllMessages) { + continue; + } + if process_msg_events!(1, false, ProcessMessages::AllMessages) { + continue; + } + if process_msg_events!(2, false, ProcessMessages::AllMessages) { + continue; + } + // ...making sure any pending PendingHTLCsForwardable events are handled and + // payments claimed. + if process_events!(0, false) { + continue; + } + if process_events!(1, false) { + continue; + } + if process_events!(2, false) { + continue; + } + break; + } + }; } + // At this point, we may be pending quiescence, so we'll process all messages to + // ensure we can complete its handshake. We'll then exit quiescence and process all + // messages again, to resolve any pending HTLCs (only irrevocably committed ones) + // before attempting to send more payments. + process_all_events!(); + nodes[0].exit_quiescence(&nodes[1].get_our_node_id(), &chan_a_id).unwrap(); + nodes[1].exit_quiescence(&nodes[0].get_our_node_id(), &chan_a_id).unwrap(); + nodes[1].exit_quiescence(&nodes[2].get_our_node_id(), &chan_b_id).unwrap(); + nodes[2].exit_quiescence(&nodes[1].get_our_node_id(), &chan_b_id).unwrap(); + process_all_events!(); + // Finally, make sure that at least one end of each channel can make a substantial payment assert!( send_payment(&nodes[0], &nodes[1], chan_a, 10_000_000, &mut p_id, &mut p_idx) diff --git a/lightning-types/src/features.rs b/lightning-types/src/features.rs index 2dd9b14bd35..f0a747b85b0 100644 --- a/lightning-types/src/features.rs +++ b/lightning-types/src/features.rs @@ -72,6 +72,8 @@ //! 
(see the [`Trampoline` feature proposal](https://github.com/lightning/bolts/pull/836) for more information). //! - `DnsResolver` - supports resolving DNS names to TXT DNSSEC proofs for BIP 353 payments //! (see [bLIP 32](https://github.com/lightning/blips/blob/master/blip-0032.md) for more information). +//! - `Quiescence` - protocol to quiesce a channel by indicating that "SomeThing Fundamental is Underway" +//! (see [BOLT-2](https://github.com/lightning/bolts/blob/master/02-peer-protocol.md#channel-quiescence) for more information). //! //! LDK knows about the following features, but does not support them: //! - `AnchorsNonzeroFeeHtlcTx` - the initial version of anchor outputs, which was later found to be @@ -150,7 +152,7 @@ mod sealed { // Byte 3 RouteBlinding | ShutdownAnySegwit | DualFund | Taproot, // Byte 4 - OnionMessages, + Quiescence | OnionMessages, // Byte 5 ChannelType | SCIDPrivacy, // Byte 6 @@ -171,7 +173,7 @@ mod sealed { // Byte 3 RouteBlinding | ShutdownAnySegwit | DualFund | Taproot, // Byte 4 - OnionMessages, + Quiescence | OnionMessages, // Byte 5 ChannelType | SCIDPrivacy, // Byte 6 @@ -534,6 +536,16 @@ mod sealed { supports_taproot, requires_taproot ); + define_feature!( + 35, + Quiescence, + [InitContext, NodeContext], + "Feature flags for `option_quiesce`.", + set_quiescence_optional, + set_quiescence_required, + supports_quiescence, + requires_quiescence + ); define_feature!( 39, OnionMessages, @@ -1175,6 +1187,7 @@ mod tests { init_features.set_channel_type_optional(); init_features.set_scid_privacy_optional(); init_features.set_zero_conf_optional(); + init_features.set_quiescence_optional(); assert!(init_features.initial_routing_sync()); assert!(!init_features.supports_upfront_shutdown_script()); @@ -1195,7 +1208,7 @@ mod tests { assert_eq!(node_features.flags[1], 0b01010001); assert_eq!(node_features.flags[2], 0b10001010); assert_eq!(node_features.flags[3], 0b00001010); - assert_eq!(node_features.flags[4], 0b10000000); + assert_eq!(node_features.flags[4], 0b10001000); assert_eq!(node_features.flags[5], 0b10100000); assert_eq!(node_features.flags[6], 0b00001000); } diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 1d64a396bfa..cbbae104566 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -473,6 +473,10 @@ mod state_flags { pub const LOCAL_SHUTDOWN_SENT: u32 = 1 << 11; pub const SHUTDOWN_COMPLETE: u32 = 1 << 12; pub const WAITING_FOR_BATCH: u32 = 1 << 13; + pub const AWAITING_QUIESCENCE: u32 = 1 << 14; + pub const LOCAL_STFU_SENT: u32 = 1 << 15; + pub const REMOTE_STFU_SENT: u32 = 1 << 16; + pub const CHANNEL_QUIESCENT: u32 = 1 << 17; } define_state_flags!( @@ -531,7 +535,21 @@ define_state_flags!( messages as we'd be unable to determine which HTLCs they included in their `revoke_and_ack` \ implicit ACK, so instead we have to hold them away temporarily to be sent later.", AWAITING_REMOTE_REVOKE, state_flags::AWAITING_REMOTE_REVOKE, - is_awaiting_remote_revoke, set_awaiting_remote_revoke, clear_awaiting_remote_revoke) + is_awaiting_remote_revoke, set_awaiting_remote_revoke, clear_awaiting_remote_revoke), + ("Indicates a local request has been made for the channel to become quiescent. Both nodes \ + must send `stfu` for the channel to become quiescent. 
This flag will be cleared and we \ + will no longer attempt quiescence if either node requests a shutdown.", + AWAITING_QUIESCENCE, state_flags::AWAITING_QUIESCENCE, + is_awaiting_quiescence, set_awaiting_quiescence, clear_awaiting_quiescence), + ("Indicates we have sent a `stfu` message to the counterparty. This message can only be sent \ + if either `AWAITING_QUIESCENCE` or `REMOTE_STFU_SENT` is set. Shutdown requests are \ + rejected if this flag is set.", + LOCAL_STFU_SENT, state_flags::LOCAL_STFU_SENT, + is_local_stfu_sent, set_local_stfu_sent, clear_local_stfu_sent), + ("Indicates we have received a `stfu` message from the counterparty. Shutdown requests are \ + rejected if this flag is set.", + REMOTE_STFU_SENT, state_flags::REMOTE_STFU_SENT, + is_remote_stfu_sent, set_remote_stfu_sent, clear_remote_stfu_sent) ] ); @@ -551,6 +569,12 @@ enum ChannelState { /// Both we and our counterparty consider the funding transaction confirmed and the channel is /// now operational. ChannelReady(ChannelReadyFlags), + /// The channel has become quiescent via exchanging `stfu` messages. Any pending commitment + /// updates prior to entering this state must complete, including pending monitor updates. Any + /// further commitment updates throughout this state will be queued to the holding cell. + /// Shutdown requests will be rejected until we revert back to `ChannelReady`, which can happen + /// with a disconnection, or if a message signaling quiescence termination is sent/received. + ChannelQuiescent, /// We've successfully negotiated a `closing_signed` dance. At this point, the `ChannelManager` /// is about to drop us, but we store this anyway. ShutdownComplete, @@ -599,6 +623,7 @@ impl ChannelState { match state { state_flags::FUNDING_NEGOTIATED => Ok(ChannelState::FundingNegotiated), state_flags::SHUTDOWN_COMPLETE => Ok(ChannelState::ShutdownComplete), + state_flags::CHANNEL_QUIESCENT => Ok(ChannelState::ChannelQuiescent), val => { if val & state_flags::AWAITING_CHANNEL_READY == state_flags::AWAITING_CHANNEL_READY { AwaitingChannelReadyFlags::from_u32(val & !state_flags::AWAITING_CHANNEL_READY) @@ -621,6 +646,7 @@ impl ChannelState { ChannelState::FundingNegotiated => state_flags::FUNDING_NEGOTIATED, ChannelState::AwaitingChannelReady(flags) => state_flags::AWAITING_CHANNEL_READY | flags.0, ChannelState::ChannelReady(flags) => state_flags::CHANNEL_READY | flags.0, + ChannelState::ChannelQuiescent => state_flags::CHANNEL_QUIESCENT, ChannelState::ShutdownComplete => state_flags::SHUTDOWN_COMPLETE, } } @@ -645,8 +671,10 @@ impl ChannelState { match self { ChannelState::ChannelReady(flags) => !flags.is_set(ChannelReadyFlags::AWAITING_REMOTE_REVOKE) && + !flags.is_set(ChannelReadyFlags::LOCAL_STFU_SENT) && !flags.is_set(FundedStateFlags::MONITOR_UPDATE_IN_PROGRESS.into()) && !flags.is_set(FundedStateFlags::PEER_DISCONNECTED.into()), + ChannelState::ChannelQuiescent => false, _ => { debug_assert!(false, "Can only generate new commitment within ChannelReady"); false @@ -662,6 +690,9 @@ impl ChannelState { impl_state_flag!(is_their_channel_ready, set_their_channel_ready, clear_their_channel_ready, AwaitingChannelReady); impl_state_flag!(is_waiting_for_batch, set_waiting_for_batch, clear_waiting_for_batch, AwaitingChannelReady); impl_state_flag!(is_awaiting_remote_revoke, set_awaiting_remote_revoke, clear_awaiting_remote_revoke, ChannelReady); + impl_state_flag!(is_awaiting_quiescence, set_awaiting_quiescence, clear_awaiting_quiescence, ChannelReady); + impl_state_flag!(is_local_stfu_sent, 
set_local_stfu_sent, clear_local_stfu_sent, ChannelReady); + impl_state_flag!(is_remote_stfu_sent, set_remote_stfu_sent, clear_remote_stfu_sent, ChannelReady); } pub const INITIAL_COMMITMENT_NUMBER: u64 = (1 << 48) - 1; @@ -1111,9 +1142,8 @@ pub(crate) const MIN_AFFORDABLE_HTLC_COUNT: usize = 4; /// * `EXPIRE_PREV_CONFIG_TICKS` = convergence_delay / tick_interval pub(crate) const EXPIRE_PREV_CONFIG_TICKS: usize = 5; -/// The number of ticks that may elapse while we're waiting for a response to a -/// [`msgs::RevokeAndACK`] or [`msgs::ChannelReestablish`] message before we attempt to disconnect -/// them. +/// The number of ticks that may elapse while we're waiting for a response before we attempt to +/// disconnect them. /// /// See [`ChannelContext::sent_message_awaiting_response`] for more information. pub(crate) const DISCONNECT_PEER_AWAITING_RESPONSE_TICKS: usize = 2; @@ -1795,16 +1825,14 @@ pub(super) struct ChannelContext where SP::Target: SignerProvider { pub workaround_lnd_bug_4006: Option, /// An option set when we wish to track how many ticks have elapsed while waiting for a response - /// from our counterparty after sending a message. If the peer has yet to respond after reaching - /// `DISCONNECT_PEER_AWAITING_RESPONSE_TICKS`, a reconnection should be attempted to try to - /// unblock the state machine. + /// from our counterparty after entering specific states. If the peer has yet to respond after + /// reaching `DISCONNECT_PEER_AWAITING_RESPONSE_TICKS`, a reconnection should be attempted to + /// try to unblock the state machine. /// - /// This behavior is mostly motivated by a lnd bug in which we don't receive a message we expect - /// to in a timely manner, which may lead to channels becoming unusable and/or force-closed. An - /// example of such can be found at . - /// - /// This is currently only used when waiting for a [`msgs::ChannelReestablish`] or - /// [`msgs::RevokeAndACK`] message from the counterparty. + /// This behavior was initially motivated by a lnd bug in which we don't receive a message we + /// expect to in a timely manner, which may lead to channels becoming unusable and/or + /// force-closed. An example of such can be found at + /// . sent_message_awaiting_response: Option, /// This channel's type, as negotiated during channel open @@ -1850,6 +1878,7 @@ pub(super) struct ChannelContext where SP::Target: SignerProvider { /// If we can't release a [`ChannelMonitorUpdate`] until some external action completes, we /// store it here and only release it to the `ChannelManager` once it asks for it. blocked_monitor_updates: Vec, + // The `next_funding_txid` field allows peers to finalize the signing steps of an interactive // transaction construction, or safely abort that transaction if it was not signed by one of the // peers, who has thus already removed it from its state. @@ -1865,6 +1894,13 @@ pub(super) struct ChannelContext where SP::Target: SignerProvider { // TODO(dual_funding): Persist this when we actually contribute funding inputs. For now we always // send an empty witnesses array in `tx_signatures` as a V2 channel acceptor next_funding_txid: Option, + + /// Staging area for the counterparty's `stfu` if it was received while a monitor update was in + /// progress. + pending_counterparty_stfu: Option, + /// Only set when a counterparty `stfu` has been processed to track which node is allowed to + /// propose "something fundamental" upon becoming quiescent. 
+ quiescence_initiator: Option, } /// A channel struct implementing this trait can receive an initial counterparty commitment @@ -2536,6 +2572,9 @@ impl ChannelContext where SP::Target: SignerProvider { is_manual_broadcast: false, next_funding_txid: None, + + pending_counterparty_stfu: None, + quiescence_initiator: None, }; Ok(channel_context) @@ -2763,6 +2802,9 @@ impl ChannelContext where SP::Target: SignerProvider { local_initiated_shutdown: None, is_manual_broadcast: false, next_funding_txid: None, + + pending_counterparty_stfu: None, + quiescence_initiator: None, }) } @@ -2799,10 +2841,11 @@ impl ChannelContext where SP::Target: SignerProvider { /// Returns true if this channel is fully established and not known to be closing. /// Allowed in any state (including after shutdown) pub fn is_usable(&self) -> bool { - matches!(self.channel_state, ChannelState::ChannelReady(_)) && - !self.channel_state.is_local_shutdown_sent() && - !self.channel_state.is_remote_shutdown_sent() && - !self.monitor_pending_channel_ready + (matches!(self.channel_state, ChannelState::ChannelReady(_)) + || matches!(self.channel_state, ChannelState::ChannelQuiescent)) + && !self.channel_state.is_local_shutdown_sent() + && !self.channel_state.is_remote_shutdown_sent() + && !self.monitor_pending_channel_ready } /// shutdown state returns the state of the channel in its various stages of shutdown @@ -2852,6 +2895,77 @@ impl ChannelContext where SP::Target: SignerProvider { } } + /// Checks whether the local/remote side of a channel has any HTLC additions, HTLC removals, or + /// fee updates that have been sent but not yet irrevocably committed on both commitments. + /// Holding cell updates are not considered because they haven't been sent to the peer yet. + /// + /// This can be used to satisfy quiescence's requirement when sending `stfu`: + /// - MUST NOT send `stfu` if any of the sender's htlc additions, htlc removals + /// or fee updates are pending for either peer. + /// + /// NOTE: There are certain states in which we cannot accurately determine whether a channel + /// update has been irrevocably committed. In such cases, this may return true when the node in + /// question doesn't actually have a pending channel update, but the counterparty does. Within + /// the quiescence protocol, in the worst case, this will just delay sending/processing a `stfu` + /// message until the counterparty doesn't have any pending updates as well. + pub fn has_pending_channel_update(&self, local: bool) -> bool { + if local { + // A fee update from the local node is pending on the remote commitment. + if matches!(self.pending_update_fee.map(|(_, state)| state), Some(FeeUpdateState::Outbound)) { + return true; + } + // An HTLC add from the local node is pending on the remote commitment. + if self.pending_outbound_htlcs.iter() + .any(|htlc| matches!(htlc.state, OutboundHTLCState::LocalAnnounced(_))) + { + return true; + } + // An HTLC removal from the local node is pending on the remote commitment. + if self.pending_inbound_htlcs.iter() + .any(|htlc| matches!(htlc.state, InboundHTLCState::LocalRemoved(_))) + { + return true; + } + // An update from the local node may be pending on the local commitment since it is not + // tracked within our state. We rely on whether it is owed a `commitment_signed` or it + // owes a `revoke_and_ack`, which could also be set if an update from the remote node is + // pending. 
+ self.expecting_peer_commitment_signed || self.monitor_pending_revoke_and_ack + } else { + // A fee update from the remote node is pending on the local commitment. + if !self.is_outbound() && self.pending_update_fee.is_some() { + return true; + } + // An HTLC add from the remote node is pending on the local commitment. + if self.pending_inbound_htlcs.iter() + .any(|htlc| match htlc.state { + InboundHTLCState::RemoteAnnounced(_) + | InboundHTLCState::AwaitingRemoteRevokeToAnnounce(_) + | InboundHTLCState::AwaitingAnnouncedRemoteRevoke(_) => true, + _ => false, + }) + { + return true; + } + // An HTLC removal from the remote node is pending on the local commitment. + if self.pending_outbound_htlcs.iter() + .any(|htlc| match htlc.state { + OutboundHTLCState::RemoteRemoved(_) + | OutboundHTLCState::AwaitingRemoteRevokeToRemove(_) + | OutboundHTLCState::AwaitingRemovedRemoteRevoke(_) => true, + _ => false, + }) + { + return true; + } + // An update from the remote node may be pending on the remote commitment since it is + // not tracked within our state. We rely on whether it is owed a `revoke_and_ack` or it + // owes a `commitment_signed`, which could also be set if an update from the local node + // is pending. + self.monitor_pending_commitment_signed || self.channel_state.is_awaiting_remote_revoke() + } + } + // Public utilities: pub fn channel_id(&self) -> ChannelId { @@ -4746,11 +4860,12 @@ impl FundedChannel where &mut self, htlc_id_arg: u64, payment_preimage_arg: PaymentPreimage, payment_info: Option, logger: &L, ) -> UpdateFulfillFetch where L::Target: Logger { - // Either ChannelReady got set (which means it won't be unset) or there is no way any - // caller thought we could have something claimed (cause we wouldn't have accepted in an - // incoming HTLC anyway). If we got to ShutdownComplete, callers aren't allowed to call us, - // either. - if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) { + // Either ChannelReady/ChannelQuiescent got set or there is no way any caller thought we + // could have something claimed (cause we wouldn't have accepted in an incoming HTLC + // anyway). If we got to ShutdownComplete, callers aren't allowed to call us, either. 
+ if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) + && !matches!(self.context.channel_state, ChannelState::ChannelQuiescent) + { panic!("Was asked to fulfill an HTLC when channel was not in an operational state"); } @@ -4927,7 +5042,9 @@ impl FundedChannel where &mut self, htlc_id_arg: u64, err_contents: E, mut force_holding_cell: bool, logger: &L ) -> Result, ChannelError> where L::Target: Logger { - if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) { + if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) && + !matches!(self.context.channel_state, ChannelState::ChannelQuiescent) + { panic!("Was asked to fail an HTLC when channel was not in an operational state"); } @@ -5107,6 +5224,11 @@ impl FundedChannel where pub fn update_add_htlc( &mut self, msg: &msgs::UpdateAddHTLC, fee_estimator: &LowerBoundedFeeEstimator, ) -> Result<(), ChannelError> where F::Target: FeeEstimator { + if self.context.channel_state.is_remote_stfu_sent() + || matches!(self.context.channel_state, ChannelState::ChannelQuiescent) + { + return Err(ChannelError::Warn("Got add HTLC message while quiescent".to_owned())); + } if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) { return Err(ChannelError::close("Got add HTLC message when channel was not in an operational state".to_owned())); } @@ -5251,6 +5373,11 @@ impl FundedChannel where } pub fn update_fulfill_htlc(&mut self, msg: &msgs::UpdateFulfillHTLC) -> Result<(HTLCSource, u64, Option), ChannelError> { + if self.context.channel_state.is_remote_stfu_sent() + || matches!(self.context.channel_state, ChannelState::ChannelQuiescent) + { + return Err(ChannelError::Warn("Got fulfill HTLC message while quiescent".to_owned())); + } if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) { return Err(ChannelError::close("Got fulfill HTLC message when channel was not in an operational state".to_owned())); } @@ -5262,6 +5389,11 @@ impl FundedChannel where } pub fn update_fail_htlc(&mut self, msg: &msgs::UpdateFailHTLC, fail_reason: HTLCFailReason) -> Result<(), ChannelError> { + if self.context.channel_state.is_remote_stfu_sent() + || matches!(self.context.channel_state, ChannelState::ChannelQuiescent) + { + return Err(ChannelError::Warn("Got fail HTLC message while quiescent".to_owned())); + } if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) { return Err(ChannelError::close("Got fail HTLC message when channel was not in an operational state".to_owned())); } @@ -5274,6 +5406,11 @@ impl FundedChannel where } pub fn update_fail_malformed_htlc(&mut self, msg: &msgs::UpdateFailMalformedHTLC, fail_reason: HTLCFailReason) -> Result<(), ChannelError> { + if self.context.channel_state.is_remote_stfu_sent() + || matches!(self.context.channel_state, ChannelState::ChannelQuiescent) + { + return Err(ChannelError::Warn("Got fail malformed HTLC message while quiescent".to_owned())); + } if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) { return Err(ChannelError::close("Got fail malformed HTLC message when channel was not in an operational state".to_owned())); } @@ -5325,6 +5462,9 @@ impl FundedChannel where pub fn commitment_signed(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result, ChannelError> where L::Target: Logger { + if matches!(self.context.channel_state, ChannelState::ChannelQuiescent) { + return Err(ChannelError::Warn("Got commitment_signed message while quiescent".to_owned())); + } if !matches!(self.context.channel_state, 
ChannelState::ChannelReady(_)) { return Err(ChannelError::close("Got commitment signed message when channel was not in an operational state".to_owned())); } @@ -5574,6 +5714,7 @@ impl FundedChannel where ) -> (Option, Vec<(HTLCSource, PaymentHash)>) where F::Target: FeeEstimator, L::Target: Logger { + assert!(matches!(self.context.channel_state, ChannelState::ChannelReady(_))); assert!(!self.context.channel_state.is_monitor_update_in_progress()); if self.context.holding_cell_htlc_updates.len() != 0 || self.context.holding_cell_update_fee.is_some() { log_trace!(logger, "Freeing holding cell with {} HTLC updates{} in channel {}", self.context.holding_cell_htlc_updates.len(), @@ -5607,7 +5748,13 @@ impl FundedChannel where amount_msat, *payment_hash, cltv_expiry, source.clone(), onion_routing_packet.clone(), false, skimmed_fee_msat, blinding_point, fee_estimator, logger ) { - Ok(_) => update_add_count += 1, + Ok(update_add_msg_opt) => { + debug_assert!( + update_add_msg_opt.is_some(), + "Must generate new update if we're freeing the holding cell" + ); + update_add_count += 1; + }, Err(e) => { match e { ChannelError::Ignore(ref msg) => { @@ -5710,6 +5857,9 @@ impl FundedChannel where ) -> Result<(Vec<(HTLCSource, PaymentHash)>, Option), ChannelError> where F::Target: FeeEstimator, L::Target: Logger, { + if matches!(self.context.channel_state, ChannelState::ChannelQuiescent) { + return Err(ChannelError::Warn("Got revoke_and_ack message while quiescent".to_owned())); + } if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) { return Err(ChannelError::close("Got revoke/ACK message when channel was not in an operational state".to_owned())); } @@ -5775,7 +5925,7 @@ impl FundedChannel where // OK, we step the channel here and *then* if the new generation fails we can fail the // channel based on that, but stepping stuff here should be safe either way. self.context.channel_state.clear_awaiting_remote_revoke(); - self.context.sent_message_awaiting_response = None; + self.mark_response_received(); self.context.counterparty_prev_commitment_point = self.context.counterparty_cur_commitment_point; self.context.counterparty_cur_commitment_point = Some(msg.next_per_commitment_point); self.context.cur_counterparty_commitment_transaction_number -= 1; @@ -5928,14 +6078,8 @@ impl FundedChannel where self.context.monitor_pending_update_adds.append(&mut pending_update_adds); if self.context.channel_state.is_monitor_update_in_progress() { - // We can't actually generate a new commitment transaction (incl by freeing holding - // cells) while we can't update the monitor, so we just return what we have. if require_commitment { self.context.monitor_pending_commitment_signed = true; - // When the monitor updating is restored we'll call - // get_last_commitment_update_for_send(), which does not update state, but we're - // definitely now awaiting a remote revoke before we can step forward any more, so - // set it here. let mut additional_update = self.build_commitment_no_status_check(logger); // build_commitment_no_status_check may bump latest_monitor_id but we want them to be // strictly increasing by one, so decrement it here. 
@@ -5949,7 +6093,7 @@ impl FundedChannel where return_with_htlcs_to_fail!(Vec::new()); } - match self.free_holding_cell_htlcs(fee_estimator, logger) { + match self.maybe_free_holding_cell_htlcs(fee_estimator, logger) { (Some(mut additional_update), htlcs_to_fail) => { // free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be // strictly increasing by one, so decrement it here. @@ -5964,6 +6108,7 @@ impl FundedChannel where }, (None, htlcs_to_fail) => { if require_commitment { + // We can't generate a new commitment transaction yet so we just return what we have. let mut additional_update = self.build_commitment_no_status_check(logger); // build_commitment_no_status_check may bump latest_monitor_id but we want them to be @@ -5971,10 +6116,24 @@ impl FundedChannel where self.context.latest_monitor_update_id = monitor_update.update_id; monitor_update.updates.append(&mut additional_update.updates); - log_debug!(logger, "Received a valid revoke_and_ack for channel {}. Responding with a commitment update with {} HTLCs failed. {} monitor update.", - &self.context.channel_id(), - update_fail_htlcs.len() + update_fail_malformed_htlcs.len(), - release_state_str); + log_debug!(logger, "Received a valid revoke_and_ack for channel {}. {} monitor update.", + &self.context.channel_id(), release_state_str); + if self.context.channel_state.can_generate_new_commitment() { + log_debug!(logger, "Responding with a commitment update with {} HTLCs failed for channel {}", + update_fail_htlcs.len() + update_fail_malformed_htlcs.len(), + &self.context.channel_id); + } else { + debug_assert!(htlcs_to_fail.is_empty()); + let reason = if self.context.channel_state.is_local_stfu_sent() { + "exits quiescence" + } else if self.context.channel_state.is_monitor_update_in_progress() { + "completes pending monitor update" + } else { + "can continue progress" + }; + log_debug!(logger, "Holding back commitment update until channel {} {}", + &self.context.channel_id, reason); + } self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs); return_with_htlcs_to_fail!(htlcs_to_fail); @@ -6111,7 +6270,10 @@ impl FundedChannel where return None; } - if self.context.channel_state.is_awaiting_remote_revoke() || self.context.channel_state.is_monitor_update_in_progress() { + // Some of the checks of `can_generate_new_commitment` have already been done above, but + // it's much more brittle to not use it in favor of checking the remaining flags left, as it + // gives us one less code path to update if the method changes. + if !self.context.channel_state.can_generate_new_commitment() { force_holding_cell = true; } @@ -6141,6 +6303,10 @@ impl FundedChannel where return Err(()) } + // We only clear `peer_disconnected` if we were able to reestablish the channel. We always + // reset our awaiting response in case we failed reestablishment and are disconnecting. + self.context.sent_message_awaiting_response = None; + if self.context.channel_state.is_peer_disconnected() { // While the below code should be idempotent, it's simpler to just return early, as // redundant disconnect events can fire, though they should be rare. @@ -6201,7 +6367,22 @@ impl FundedChannel where } } - self.context.sent_message_awaiting_response = None; + // Reset any quiescence-related state as it is implicitly terminated once disconnected. 
+ match self.context.channel_state { + ChannelState::ChannelReady(_) => { + self.context.channel_state.clear_awaiting_quiescence(); + self.context.channel_state.clear_local_stfu_sent(); + self.context.channel_state.clear_remote_stfu_sent(); + }, + ChannelState::ChannelQuiescent => { + // Entering quiescence implies that shutdown has not been negotiated, and there + // aren't any pending channel updates. + self.context.channel_state = ChannelState::ChannelReady(ChannelReadyFlags::new()); + }, + _ => {}, + } + self.context.pending_counterparty_stfu.take(); + self.context.quiescence_initiator.take(); self.context.channel_state.set_peer_disconnected(); log_trace!(logger, "Peer disconnection resulted in {} remote-announced HTLC drops on channel {}", inbound_drop_count, &self.context.channel_id()); @@ -6318,10 +6499,6 @@ impl FundedChannel where commitment_update = None; } - if commitment_update.is_some() { - self.mark_awaiting_response(); - } - self.context.monitor_pending_revoke_and_ack = false; self.context.monitor_pending_commitment_signed = false; let order = self.context.resend_order.clone(); @@ -6364,6 +6541,11 @@ impl FundedChannel where if self.context.channel_state.is_peer_disconnected() { return Err(ChannelError::close("Peer sent update_fee when we needed a channel_reestablish".to_owned())); } + if self.context.channel_state.is_remote_stfu_sent() + || matches!(self.context.channel_state, ChannelState::ChannelQuiescent) + { + return Err(ChannelError::Warn("Got fee update message while quiescent".to_owned())); + } FundedChannel::::check_remote_fee(&self.context.channel_type, fee_estimator, msg.feerate_per_kw, Some(self.context.feerate_per_kw), logger)?; self.context.pending_update_fee = Some((msg.feerate_per_kw, FeeUpdateState::RemoteAnnounced)); @@ -6675,7 +6857,7 @@ impl FundedChannel where // Go ahead and unmark PeerDisconnected as various calls we may make check for it (and all // remaining cases either succeed or ErrorMessage-fail). self.context.channel_state.clear_peer_disconnected(); - self.context.sent_message_awaiting_response = None; + self.mark_response_received(); let shutdown_msg = self.get_outbound_shutdown(); @@ -6731,9 +6913,6 @@ impl FundedChannel where // AwaitingRemoteRevoke set, which indicates we sent a commitment_signed but haven't gotten // the corresponding revoke_and_ack back yet. let is_awaiting_remote_revoke = self.context.channel_state.is_awaiting_remote_revoke(); - if is_awaiting_remote_revoke && !self.is_awaiting_monitor_update() { - self.mark_awaiting_response(); - } let next_counterparty_commitment_number = INITIAL_COMMITMENT_NUMBER - self.context.cur_counterparty_commitment_transaction_number + if is_awaiting_remote_revoke { 1 } else { 0 }; let channel_ready = if msg.next_local_commitment_number == 1 && INITIAL_COMMITMENT_NUMBER - self.holder_commitment_point.transaction_number() == 1 { @@ -6918,26 +7097,38 @@ impl FundedChannel where Ok((closing_signed, None, None)) } - // Marks a channel as waiting for a response from the counterparty. If it's not received - // [`DISCONNECT_PEER_AWAITING_RESPONSE_TICKS`] after sending our own to them, then we'll attempt - // a reconnection. - fn mark_awaiting_response(&mut self) { - self.context.sent_message_awaiting_response = Some(0); + fn mark_response_received(&mut self) { + self.context.sent_message_awaiting_response = None; } /// Determines whether we should disconnect the counterparty due to not receiving a response /// within our expected timeframe. 
/// - /// This should be called on every [`super::channelmanager::ChannelManager::timer_tick_occurred`]. + /// This should be called for peers with an active socket on every + /// [`super::channelmanager::ChannelManager::timer_tick_occurred`]. pub fn should_disconnect_peer_awaiting_response(&mut self) -> bool { - let ticks_elapsed = if let Some(ticks_elapsed) = self.context.sent_message_awaiting_response.as_mut() { - ticks_elapsed + if let Some(ticks_elapsed) = self.context.sent_message_awaiting_response.as_mut() { + *ticks_elapsed += 1; + *ticks_elapsed >= DISCONNECT_PEER_AWAITING_RESPONSE_TICKS + } else if + // Cleared upon receiving `channel_reestablish`. + self.context.channel_state.is_peer_disconnected() + // Cleared upon receiving `stfu`. + || self.context.channel_state.is_local_stfu_sent() + // Cleared upon receiving a message that triggers the end of quiescence. + || matches!(self.context.channel_state, ChannelState::ChannelQuiescent) + // Cleared upon receiving `revoke_and_ack`. + || self.context.has_pending_channel_update(false) + || self.context.has_pending_channel_update(true) + { + // This is the first tick we've seen after expecting to make forward progress. + self.context.sent_message_awaiting_response = Some(1); + debug_assert!(DISCONNECT_PEER_AWAITING_RESPONSE_TICKS > 1); + false } else { // Don't disconnect when we're not waiting on a response. - return false; - }; - *ticks_elapsed += 1; - *ticks_elapsed >= DISCONNECT_PEER_AWAITING_RESPONSE_TICKS + false + } } pub fn shutdown( @@ -6960,6 +7151,14 @@ impl FundedChannel where } assert!(!matches!(self.context.channel_state, ChannelState::ShutdownComplete)); + // TODO: The spec is pretty vague regarding the handling of shutdown within quiescence. + if self.context.channel_state.is_local_stfu_sent() + || self.context.channel_state.is_remote_stfu_sent() + || matches!(self.context.channel_state, ChannelState::ChannelQuiescent) + { + return Err(ChannelError::Warn("Got shutdown request while quiescent".to_owned())); + } + if !script::is_bolt2_compliant(&msg.scriptpubkey, their_features) { return Err(ChannelError::Warn(format!("Got a nonstandard scriptpubkey ({}) from remote peer", msg.scriptpubkey.to_hex_string()))); } @@ -6996,6 +7195,11 @@ impl FundedChannel where // From here on out, we may not fail! self.context.channel_state.set_remote_shutdown_sent(); + if self.context.channel_state.is_awaiting_quiescence() { + // We haven't been able to send `stfu` yet, and there's no point in attempting + // quiescence anymore since the counterparty wishes to close the channel. + self.context.channel_state.clear_awaiting_quiescence(); + } self.context.update_time_counter += 1; let monitor_update = if update_shutdown_script { @@ -7798,8 +8002,11 @@ impl FundedChannel where return Ok((Some(channel_ready), timed_out_htlcs, announcement_sigs)); } - if matches!(self.context.channel_state, ChannelState::ChannelReady(_)) || - self.context.channel_state.is_our_channel_ready() { + if matches!(self.context.channel_state, ChannelState::ChannelReady(_)) + // Still close the channel if it is reorged out while we're quiescent. 
+ || matches!(self.context.channel_state, ChannelState::ChannelQuiescent) + || self.context.channel_state.is_our_channel_ready() + { let mut funding_tx_confirmations = height as i64 - self.context.funding_tx_confirmation_height as i64 + 1; if self.context.funding_tx_confirmation_height == 0 { // Note that check_get_channel_ready may reset funding_tx_confirmation_height to @@ -8087,7 +8294,6 @@ impl FundedChannel where log_info!(logger, "Sending a data_loss_protect with no previous remote per_commitment_secret for channel {}", &self.context.channel_id()); [0;32] }; - self.mark_awaiting_response(); msgs::ChannelReestablish { channel_id: self.context.channel_id(), // The protocol has two different commitment number concepts - the "commitment @@ -8164,9 +8370,10 @@ impl FundedChannel where ) -> Result, ChannelError> where F::Target: FeeEstimator, L::Target: Logger { - if !matches!(self.context.channel_state, ChannelState::ChannelReady(_)) || - self.context.channel_state.is_local_shutdown_sent() || - self.context.channel_state.is_remote_shutdown_sent() + if (!matches!(self.context.channel_state, ChannelState::ChannelReady(_)) + && !matches!(self.context.channel_state, ChannelState::ChannelQuiescent)) + || self.context.channel_state.is_local_shutdown_sent() + || self.context.channel_state.is_remote_shutdown_sent() { return Err(ChannelError::Ignore("Cannot send HTLC until channel is fully established and we haven't started shutting down".to_owned())); } @@ -8448,6 +8655,12 @@ impl FundedChannel where target_feerate_sats_per_kw: Option, override_shutdown_script: Option) -> Result<(msgs::Shutdown, Option, Vec<(HTLCSource, PaymentHash)>), APIError> { + if self.context.channel_state.is_local_stfu_sent() + || self.context.channel_state.is_remote_stfu_sent() + || matches!(self.context.channel_state, ChannelState::ChannelQuiescent) + { + return Err(APIError::APIMisuseError { err: "Cannot begin shutdown while quiescent".to_owned() }); + } for htlc in self.context.pending_outbound_htlcs.iter() { if let OutboundHTLCState::LocalAnnounced(_) = htlc.state { return Err(APIError::APIMisuseError{err: "Cannot begin shutdown with pending HTLCs. Process pending events first".to_owned()}); @@ -8492,6 +8705,9 @@ impl FundedChannel where // From here on out, we may not fail! 
self.context.target_closing_feerate_sats_per_kw = target_feerate_sats_per_kw; self.context.channel_state.set_local_shutdown_sent(); + if self.context.channel_state.is_awaiting_quiescence() { + self.context.channel_state.clear_awaiting_quiescence(); + } self.context.local_initiated_shutdown = Some(()); self.context.update_time_counter += 1; @@ -8544,6 +8760,170 @@ impl FundedChannel where }) .chain(self.context.pending_outbound_htlcs.iter().map(|htlc| (&htlc.source, &htlc.payment_hash))) } + + pub fn should_advance_quiescence_handshake(&self) -> bool { + matches!(self.context.channel_state, ChannelState::ChannelReady(_)) + && (self.context.channel_state.is_awaiting_quiescence() + || self.context.channel_state.is_local_stfu_sent() + || self.context.channel_state.is_remote_stfu_sent()) + || self.context.pending_counterparty_stfu.is_some() + } + + pub fn get_pending_counterparty_stfu(&mut self) -> Option { + self.context.pending_counterparty_stfu.take() + } + + #[cfg(any(test, fuzzing))] + pub fn propose_quiescence(&mut self, logger: &L) -> Result, ChannelError> + where + L::Target: Logger, + { + log_debug!(logger, "Attempting to initiate quiescence"); + + if !self.context.is_live() { + return Err(ChannelError::Ignore("Channel is not in a live state to propose quiescence".to_owned())); + } + if matches!(self.context.channel_state, ChannelState::ChannelQuiescent) { + return Err(ChannelError::Ignore("Channel is already quiescent".to_owned())); + } + + if self.context.channel_state.is_awaiting_quiescence() + || self.context.channel_state.is_local_stfu_sent() + { + return Ok(None); + } + + self.context.channel_state.set_awaiting_quiescence(); + Ok(Some(self.send_stfu(logger)?)) + } + + pub fn send_stfu(&mut self, logger: &L) -> Result + where + L::Target: Logger, + { + if self.context.channel_state.is_local_stfu_sent() { + return Err(ChannelError::Ignore( + "We already sent `stfu` and we've yet to become quiescent".to_owned()) + ); + } + + // Either of these states imply the channel is live. 
+ if !self.context.channel_state.is_awaiting_quiescence() + && !self.context.channel_state.is_remote_stfu_sent() + { + return Err(ChannelError::Ignore( + "We cannot send `stfu` if we're not attempting to become quiescent".to_owned()) + ); + } + debug_assert!(self.context.is_live()); + + if self.context.has_pending_channel_update(true) { + return Err(ChannelError::Ignore( + "We cannot send `stfu` while state machine is pending".to_owned()) + ); + } + if self.context.channel_state.is_monitor_update_in_progress() { + return Err(ChannelError::Ignore( + "We cannot send `stfu` while monitor update in progress".to_owned()) + ); + } + + let initiator = if self.context.channel_state.is_remote_stfu_sent() { + debug_assert!(!self.context.channel_state.is_awaiting_remote_revoke()); + self.context.channel_state = ChannelState::ChannelQuiescent; + if let Some(quiescence_initiator) = self.context.quiescence_initiator { + log_debug!( + logger, + "Responding to counterparty stfu with our own, channel is now quiescent and we are{} the initiator", + if !quiescence_initiator { " not" } else { "" } + ); + + quiescence_initiator + } else { + debug_assert!(false, "Quiescence initiator must have been determined when we received stfu"); + false + } + } else { + log_debug!(logger, "Sending stfu as quiescence initiator"); + debug_assert!(self.context.channel_state.is_awaiting_quiescence()); + self.context.channel_state.clear_awaiting_quiescence(); + self.context.channel_state.set_local_stfu_sent(); + true + }; + + Ok(msgs::Stfu { channel_id: self.context.channel_id, initiator }) + } + + pub fn stfu( + &mut self, msg: &msgs::Stfu, logger: &L + ) -> Result, ChannelError> where L::Target: Logger { + if matches!(self.context.channel_state, ChannelState::ChannelQuiescent) { + return Err(ChannelError::Warn("Channel is already quiescent".to_owned())); + } + if self.context.channel_state.is_remote_stfu_sent() { + return Err(ChannelError::Warn( + "Peer sent `stfu` when they already sent it and we've yet to become quiescent".to_owned() + )); + } + + if !self.context.is_live() { + return Err(ChannelError::Warn("Peer sent `stfu` when we were not in a live state".to_owned())); + } + + if self.context.has_pending_channel_update(false) { + self.context.pending_counterparty_stfu = Some(msg.clone()); + return Err(ChannelError::Ignore( + "Stashing `stfu` from counterparty until state machine is no longer pending".to_owned() + )); + } + if self.context.channel_state.is_monitor_update_in_progress() { + self.context.pending_counterparty_stfu = Some(msg.clone()); + return Err(ChannelError::Ignore( + "Stashing `stfu` from counterparty until monitor update is complete".to_owned() + )); + } + + self.context.channel_state.set_remote_stfu_sent(); + + if self.context.channel_state.is_awaiting_quiescence() { + self.context.quiescence_initiator = Some(self.context.is_outbound()); + return self.send_stfu(logger).map(|stfu| Some(stfu)); + } + if !self.context.channel_state.is_local_stfu_sent() { + debug_assert!(msg.initiator); + self.context.quiescence_initiator = Some(false); + return self.send_stfu(logger).map(|stfu| Some(stfu)); + }; + + debug_assert!(!self.context.channel_state.is_awaiting_remote_revoke()); + debug_assert!(self.context.channel_state.is_local_stfu_sent()); + self.context.channel_state = ChannelState::ChannelQuiescent; + let quiescence_initiator = !msg.initiator || self.context.is_outbound(); + self.context.quiescence_initiator = Some(quiescence_initiator); + self.mark_response_received(); + + log_debug!( + logger, + "Received 
counterparty stfu, channel is now quiescent and we are{} the initiator", + if !quiescence_initiator { " not" } else { "" } + ); + + Ok(None) + } + + #[cfg(any(test, fuzzing))] + pub fn exit_quiescence(&mut self) -> bool { + if self.context.channel_state == ChannelState::ChannelQuiescent { + self.mark_response_received(); + self.context.channel_state = ChannelState::ChannelReady(ChannelReadyFlags::new()); + self.context.quiescence_initiator.take().expect("Must always be set while quiescent") + } else { + debug_assert!(!self.context.channel_state.is_awaiting_quiescence()); + debug_assert!(!self.context.channel_state.is_local_stfu_sent()); + debug_assert!(!self.context.channel_state.is_remote_stfu_sent()); + false + } + } } /// A not-yet-funded outbound (from holder) channel using V1 channel establishment. @@ -9530,11 +9910,19 @@ impl Writeable for FundedChannel where SP::Target: SignerProvider self.context.channel_id.write(writer)?; { let mut channel_state = self.context.channel_state; - if matches!(channel_state, ChannelState::AwaitingChannelReady(_)|ChannelState::ChannelReady(_)) { - channel_state.set_peer_disconnected(); - } else { - debug_assert!(false, "Pre-funded/shutdown channels should not be written"); + match channel_state { + ChannelState::AwaitingChannelReady(_) => {}, + ChannelState::ChannelReady(_) => { + channel_state.clear_awaiting_quiescence(); + channel_state.clear_local_stfu_sent(); + channel_state.clear_remote_stfu_sent(); + }, + ChannelState::ChannelQuiescent => { + channel_state = ChannelState::ChannelReady(ChannelReadyFlags::new()); + }, + _ => debug_assert!(false, "Pre-funded/shutdown channels should not be written"), } + channel_state.set_peer_disconnected(); channel_state.to_u32().write(writer)?; } self.context.channel_value_satoshis.write(writer)?; @@ -10444,6 +10832,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch blocked_monitor_updates: blocked_monitor_updates.unwrap(), is_manual_broadcast: is_manual_broadcast.unwrap_or(false), + // TODO(dual_funding): Instead of getting this from persisted value, figure it out based on the // funding transaction and other channel state. // @@ -10451,6 +10840,9 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch // during a signing session, but have not received `tx_signatures` we MUST set `next_funding_txid` // to the txid of that interactive transaction, else we MUST NOT set it. 
next_funding_txid: None, + + pending_counterparty_stfu: None, + quiescence_initiator: None, }, interactive_tx_signing_session: None, holder_commitment_point, diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 65c41bc531a..53326f27e89 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -6520,19 +6520,21 @@ where funded_chan.context.maybe_expire_prev_config(); - if funded_chan.should_disconnect_peer_awaiting_response() { - let logger = WithChannelContext::from(&self.logger, &funded_chan.context, None); - log_debug!(logger, "Disconnecting peer {} due to not making any progress on channel {}", - counterparty_node_id, chan_id); - pending_msg_events.push(MessageSendEvent::HandleError { - node_id: counterparty_node_id, - action: msgs::ErrorAction::DisconnectPeerWithWarning { - msg: msgs::WarningMessage { - channel_id: *chan_id, - data: "Disconnecting due to timeout awaiting response".to_owned(), + if peer_state.is_connected { + if funded_chan.should_disconnect_peer_awaiting_response() { + let logger = WithChannelContext::from(&self.logger, &funded_chan.context, None); + log_debug!(logger, "Disconnecting peer {} due to not making any progress on channel {}", + counterparty_node_id, chan_id); + pending_msg_events.push(MessageSendEvent::HandleError { + node_id: counterparty_node_id, + action: msgs::ErrorAction::DisconnectPeerWithWarning { + msg: msgs::WarningMessage { + channel_id: *chan_id, + data: "Disconnecting due to timeout awaiting response".to_owned(), + }, + }, - }, - }); + }); + } } true @@ -9060,6 +9062,58 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ Ok(()) } + fn internal_stfu(&self, counterparty_node_id: &PublicKey, msg: &msgs::Stfu) -> Result<bool, MsgHandleErrInternal> { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex = per_peer_state.get(counterparty_node_id).ok_or_else(|| { + debug_assert!(false); + MsgHandleErrInternal::send_err_msg_no_close( + format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), + msg.channel_id + ) + })?; + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + + if !self.init_features().supports_quiescence() { + return Err(MsgHandleErrInternal::from_chan_no_close( + ChannelError::Warn("Quiescence not supported".to_string()), msg.channel_id + )); + } + + let mut sent_stfu = false; + match peer_state.channel_by_id.entry(msg.channel_id) { + hash_map::Entry::Occupied(mut chan_entry) => { + if let Some(chan) = chan_entry.get_mut().as_funded_mut() { + let logger = WithContext::from( + &self.logger, Some(*counterparty_node_id), Some(msg.channel_id), None + ); + + if let Some(stfu) = try_channel_entry!( + self, peer_state, chan.stfu(&msg, &&logger), chan_entry + ) { + sent_stfu = true; + peer_state.pending_msg_events.push(MessageSendEvent::SendStfu { + node_id: *counterparty_node_id, + msg: stfu, + }); + } + } else { + let msg = "Peer sent `stfu` for an unfunded channel"; + let err = Err(ChannelError::Close( + (msg.into(), ClosureReason::ProcessingError { err: msg.into() }) + )); + return try_channel_entry!(self, peer_state, err, chan_entry); + } + }, + hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close( + format!("Got a message for a channel from the wrong node! 
No such channel for the passed counterparty_node_id {}", counterparty_node_id), + msg.channel_id + )) + } + + Ok(sent_stfu) + } + fn internal_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) @@ -9585,6 +9639,130 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ has_update } + fn maybe_advance_quiescence_handshake(&self) { + let per_peer_state = self.per_peer_state.read().unwrap(); + for (counterparty_node_id, peer_state_mutex) in per_peer_state.iter() { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + let pending_msg_events = &mut peer_state.pending_msg_events; + for (channel_id, chan) in &mut peer_state.channel_by_id { + if let Some(funded_chan) = chan.as_funded_mut() { + if !funded_chan.should_advance_quiescence_handshake() { + continue; + } + + let logger = WithContext::from( + &self.logger, Some(*counterparty_node_id), Some(*channel_id), None + ); + + let result = if let Some(stfu) = funded_chan.get_pending_counterparty_stfu() { + log_debug!(logger, "Re-processing stashed counterparty stfu"); + funded_chan.stfu(&stfu, &&logger) + } else { + funded_chan.send_stfu(&&logger).map(|stfu| Some(stfu)) + }; + + match result { + Ok(None) => {}, + Ok(Some(stfu)) => { + pending_msg_events.push(events::MessageSendEvent::SendStfu { + node_id: chan.context().get_counterparty_node_id(), + msg: stfu, + }); + }, + Err(e) => { + log_debug!(logger, "Could not advance quiescence handshake: {}", e); + } + } + } + } + } + } + + #[cfg(any(test, fuzzing))] + pub fn maybe_propose_quiescence(&self, counterparty_node_id: &PublicKey, channel_id: &ChannelId) -> Result<(), APIError> { + let mut result = Ok(()); + PersistenceNotifierGuard::optionally_notify(self, || { + let mut notify = NotifyOption::SkipPersistNoEvents; + + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state_mutex_opt = per_peer_state.get(counterparty_node_id); + if peer_state_mutex_opt.is_none() { + result = Err(APIError::ChannelUnavailable { + err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) + }); + return notify; + } + + let mut peer_state = peer_state_mutex_opt.unwrap().lock().unwrap(); + if !peer_state.latest_features.supports_quiescence() { + result = Err(APIError::ChannelUnavailable { err: "Peer does not support quiescence".to_owned() }); + return notify; + } + + match peer_state.channel_by_id.entry(channel_id.clone()) { + hash_map::Entry::Occupied(mut chan_entry) => { + if let Some(chan) = chan_entry.get_mut().as_funded_mut() { + let logger = WithContext::from( + &self.logger, Some(*counterparty_node_id), Some(*channel_id), None + ); + + match chan.propose_quiescence(&&logger) { + Ok(None) => {}, + Ok(Some(stfu)) => { + peer_state.pending_msg_events.push(MessageSendEvent::SendStfu { + node_id: *counterparty_node_id, msg: stfu + }); + notify = NotifyOption::SkipPersistHandleEvents; + }, + Err(msg) => log_trace!(logger, "{}", msg), + } + } else { + result = Err(APIError::APIMisuseError { + err: format!("Unfunded channel {} cannot be quiescent", channel_id), + }); + } + }, + hash_map::Entry::Vacant(_) => { + result = Err(APIError::ChannelUnavailable { + err: format!("Channel with id {} not found for the passed counterparty node_id {}", + channel_id, counterparty_node_id), + 
+					});
+				},
+			}
+
+			notify
+		});
+
+		result
+	}
+
+	#[cfg(any(test, fuzzing))]
+	pub fn exit_quiescence(&self, counterparty_node_id: &PublicKey, channel_id: &ChannelId) -> Result<bool, APIError> {
+		let per_peer_state = self.per_peer_state.read().unwrap();
+		let peer_state_mutex = per_peer_state.get(counterparty_node_id)
+			.ok_or_else(|| APIError::ChannelUnavailable {
+				err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id)
+			})?;
+		let mut peer_state = peer_state_mutex.lock().unwrap();
+		let initiator = match peer_state.channel_by_id.entry(*channel_id) {
+			hash_map::Entry::Occupied(mut chan_entry) => {
+				if let Some(chan) = chan_entry.get_mut().as_funded_mut() {
+					chan.exit_quiescence()
+				} else {
+					return Err(APIError::APIMisuseError {
+						err: format!("Unfunded channel {} cannot be quiescent", channel_id),
+					})
+				}
+			},
+			hash_map::Entry::Vacant(_) => return Err(APIError::ChannelUnavailable {
+				err: format!("Channel with id {} not found for the passed counterparty node_id {}",
+					channel_id, counterparty_node_id),
+			}),
+		};
+		Ok(initiator)
+	}
+
 	/// Utility for creating a BOLT11 invoice that can be verified by [`ChannelManager`] without
 	/// storing any additional state. It achieves this by including a [`PaymentSecret`] in the
 	/// invoice which it uses to verify that the invoice has not expired and the payment amount is
@@ -10760,6 +10938,9 @@ where
 			result = NotifyOption::DoPersist;
 		}
 
+		// Quiescence is an in-memory protocol, so we don't have to persist because of it.
+		self.maybe_advance_quiescence_handshake();
+
 		let mut is_any_peer_connected = false;
 		let mut pending_events = Vec::new();
 		let per_peer_state = self.per_peer_state.read().unwrap();
@@ -11370,9 +11551,20 @@ where
 	}
 
 	fn handle_stfu(&self, counterparty_node_id: PublicKey, msg: &msgs::Stfu) {
-		let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close(
-			"Quiescence not supported".to_owned(),
-			msg.channel_id.clone())), counterparty_node_id);
+		let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
+			let res = self.internal_stfu(&counterparty_node_id, msg);
+			let persist = match &res {
+				Err(e) if e.closes_channel() => NotifyOption::DoPersist,
+				Err(_) => NotifyOption::SkipPersistHandleEvents,
+				Ok(sent_stfu) => if *sent_stfu {
+					NotifyOption::SkipPersistHandleEvents
+				} else {
+					NotifyOption::SkipPersistNoEvents
+				},
+			};
+			let _ = handle_error!(self, res, counterparty_node_id);
+			persist
+		});
 	}
 
 	#[cfg(splicing)]
@@ -12353,6 +12545,10 @@ pub fn provided_init_features(config: &UserConfig) -> InitFeatures {
 	}
 	#[cfg(dual_funding)]
 	features.set_dual_fund_optional();
+	// Only signal quiescence support in tests for now, as we don't yet support any
+	// quiescent-dependent protocols (e.g., splicing).
+	#[cfg(any(test, fuzzing))]
+	features.set_quiescence_optional();
 	features
 }
diff --git a/lightning/src/ln/mod.rs b/lightning/src/ln/mod.rs
index 201f2ecfeff..b55ae8ac5a2 100644
--- a/lightning/src/ln/mod.rs
+++ b/lightning/src/ln/mod.rs
@@ -89,6 +89,8 @@ mod monitor_tests;
 #[allow(unused_mut)]
 mod shutdown_tests;
 #[cfg(test)]
+mod quiescence_tests;
+#[cfg(test)]
 #[allow(unused_mut)]
 mod async_signer_tests;
 #[cfg(test)]
diff --git a/lightning/src/ln/msgs.rs b/lightning/src/ln/msgs.rs
index 659ec65f6cf..5137468ff43 100644
--- a/lightning/src/ln/msgs.rs
+++ b/lightning/src/ln/msgs.rs
@@ -454,8 +454,8 @@ pub type SerialId = u64;
 pub struct Stfu {
 	/// The channel ID where quiescence is intended
 	pub channel_id: ChannelId,
-	/// Initiator flag, 1 if initiating, 0 if replying to an stfu.
-	pub initiator: u8,
+	/// Initiator flag, true if initiating, false if replying to an stfu.
+	pub initiator: bool,
 }
 
 /// A `splice_init` message to be sent by or received from the stfu initiator (splice initiator).
@@ -4053,10 +4053,17 @@ mod tests {
 	fn encoding_stfu() {
 		let stfu = msgs::Stfu {
 			channel_id: ChannelId::from_bytes([2; 32]),
-			initiator: 1,
+			initiator: true,
 		};
 		let encoded_value = stfu.encode();
 		assert_eq!(encoded_value.as_hex().to_string(), "020202020202020202020202020202020202020202020202020202020202020201");
+
+		let stfu = msgs::Stfu {
+			channel_id: ChannelId::from_bytes([3; 32]),
+			initiator: false,
+		};
+		let encoded_value = stfu.encode();
+		assert_eq!(encoded_value.as_hex().to_string(), "030303030303030303030303030303030303030303030303030303030303030300");
 	}
 
 	#[test]
diff --git a/lightning/src/ln/quiescence_tests.rs b/lightning/src/ln/quiescence_tests.rs
new file mode 100644
index 00000000000..3fb22bd0591
--- /dev/null
+++ b/lightning/src/ln/quiescence_tests.rs
@@ -0,0 +1,461 @@
+use crate::chain::ChannelMonitorUpdateStatus;
+use crate::events::Event;
+use crate::events::HTLCDestination;
+use crate::events::MessageSendEvent;
+use crate::events::MessageSendEventsProvider;
+use crate::ln::channel::DISCONNECT_PEER_AWAITING_RESPONSE_TICKS;
+use crate::ln::channelmanager::PaymentId;
+use crate::ln::channelmanager::RecipientOnionFields;
+use crate::ln::functional_test_utils::*;
+use crate::ln::msgs;
+use crate::ln::msgs::{ChannelMessageHandler, ErrorAction};
+use crate::util::errors::APIError;
+
+#[test]
+fn test_quiescence_tie() {
+	// Test that both nodes proposing quiescence at the same time results in the channel funder
+	// becoming the quiescence initiator.
+	let chanmon_cfgs = create_chanmon_cfgs(2);
+	let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+	let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+	let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1).2;
+
+	nodes[0].node.maybe_propose_quiescence(&nodes[1].node.get_our_node_id(), &chan_id).unwrap();
+	nodes[1].node.maybe_propose_quiescence(&nodes[0].node.get_our_node_id(), &chan_id).unwrap();
+
+	let stfu_node_0 =
+		get_event_msg!(nodes[0], MessageSendEvent::SendStfu, nodes[1].node.get_our_node_id());
+	nodes[1].node.handle_stfu(nodes[0].node.get_our_node_id(), &stfu_node_0);
+
+	let stfu_node_1 =
+		get_event_msg!(nodes[1], MessageSendEvent::SendStfu, nodes[0].node.get_our_node_id());
+	nodes[0].node.handle_stfu(nodes[1].node.get_our_node_id(), &stfu_node_1);
+
+	assert!(stfu_node_0.initiator && stfu_node_1.initiator);
+
+	assert!(nodes[0].node.exit_quiescence(&nodes[1].node.get_our_node_id(), &chan_id).unwrap());
+	assert!(!nodes[1].node.exit_quiescence(&nodes[0].node.get_our_node_id(), &chan_id).unwrap());
+}
+
+#[test]
+fn test_quiescence_shutdown_ignored() {
+	// Test that a shutdown sent/received during quiescence is ignored.
+	let chanmon_cfgs = create_chanmon_cfgs(2);
+	let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+	let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+	let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1).2;
+
+	nodes[0].node.maybe_propose_quiescence(&nodes[1].node.get_our_node_id(), &chan_id).unwrap();
+	let _ = get_event_msg!(nodes[0], MessageSendEvent::SendStfu, nodes[1].node.get_our_node_id());
+
+	if let Err(e) = nodes[0].node.close_channel(&chan_id, &nodes[1].node.get_our_node_id()) {
+		assert_eq!(
+			e,
+			APIError::APIMisuseError { err: "Cannot begin shutdown while quiescent".to_owned() }
+		);
+	} else {
+		panic!("Expected shutdown to be ignored while quiescent");
+	}
+
+	nodes[1].node.close_channel(&chan_id, &nodes[0].node.get_our_node_id()).unwrap();
+	let shutdown =
+		get_event_msg!(nodes[1], MessageSendEvent::SendShutdown, nodes[0].node.get_our_node_id());
+
+	nodes[0].node.handle_shutdown(nodes[1].node.get_our_node_id(), &shutdown);
+	let msg_events = nodes[0].node.get_and_clear_pending_msg_events();
+	match msg_events[0] {
+		MessageSendEvent::HandleError {
+			action: ErrorAction::SendWarningMessage { ref msg, .. },
+			..
+		} => {
+			assert_eq!(msg.data, "Got shutdown request while quiescent".to_owned());
+		},
+		_ => panic!(),
+	}
+}
+
+#[test]
+fn test_allow_shutdown_while_awaiting_quiescence() {
+	allow_shutdown_while_awaiting_quiescence(false);
+	allow_shutdown_while_awaiting_quiescence(true);
+}
+
+fn allow_shutdown_while_awaiting_quiescence(local_shutdown: bool) {
+	// Test that a shutdown sent/received while we're still awaiting quiescence (stfu has not been
+	// sent yet) is honored and the channel is closed cooperatively.
+	let chanmon_cfgs = create_chanmon_cfgs(2);
+	let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+	let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+	let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1).2;
+
+	let local_node = &nodes[0];
+	let remote_node = &nodes[1];
+	let local_node_id = local_node.node.get_our_node_id();
+	let remote_node_id = remote_node.node.get_our_node_id();
+
+	let payment_amount = 1_000_000;
+	let (route, payment_hash, _, payment_secret) =
+		get_route_and_payment_hash!(local_node, remote_node, payment_amount);
+	let onion = RecipientOnionFields::secret_only(payment_secret);
+	let payment_id = PaymentId(payment_hash.0);
+	local_node.node.send_payment_with_route(route, payment_hash, onion, payment_id).unwrap();
+	check_added_monitors!(local_node, 1);
+
+	// Attempt to send an HTLC, but don't fully commit it yet.
+	let update_add = get_htlc_update_msgs!(local_node, remote_node_id);
+	nodes[1].node.handle_update_add_htlc(local_node_id, &update_add.update_add_htlcs[0]);
+	nodes[1].node.handle_commitment_signed(local_node_id, &update_add.commitment_signed);
+	let (revoke_and_ack, commit_sig) = get_revoke_commit_msgs!(remote_node, local_node_id);
+	nodes[0].node.handle_revoke_and_ack(remote_node_id, &revoke_and_ack);
+	check_added_monitors(local_node, 1);
+
+	// Request the local node to propose quiescence, and immediately try to close the channel. Since
+	// we haven't sent `stfu` yet as the state machine is pending, we should forget about our
+	// quiescence attempt.
+	local_node.node.maybe_propose_quiescence(&remote_node_id, &chan_id).unwrap();
+	assert!(local_node.node.get_and_clear_pending_msg_events().is_empty());
+
+	let (closer_node, closee_node) =
+		if local_shutdown { (local_node, remote_node) } else { (remote_node, local_node) };
+	let closer_node_id = closer_node.node.get_our_node_id();
+	let closee_node_id = closee_node.node.get_our_node_id();
+
+	closer_node.node.close_channel(&chan_id, &closee_node_id).unwrap();
+	check_added_monitors(&nodes[1], 1);
+	let shutdown_initiator =
+		get_event_msg!(closer_node, MessageSendEvent::SendShutdown, closee_node_id);
+	closee_node.node.handle_shutdown(closer_node_id, &shutdown_initiator);
+	let shutdown_responder =
+		get_event_msg!(closee_node, MessageSendEvent::SendShutdown, closer_node_id);
+	closer_node.node.handle_shutdown(closee_node_id, &shutdown_responder);
+
+	// Continue exchanging messages until the HTLC is irrevocably committed and eventually failed
+	// back as we are shutting down.
+	local_node.node.handle_commitment_signed(remote_node_id, &commit_sig);
+	check_added_monitors(local_node, 1);
+
+	let last_revoke_and_ack =
+		get_event_msg!(local_node, MessageSendEvent::SendRevokeAndACK, remote_node_id);
+	remote_node.node.handle_revoke_and_ack(local_node_id, &last_revoke_and_ack);
+	check_added_monitors(remote_node, 1);
+	expect_pending_htlcs_forwardable!(remote_node);
+	expect_htlc_handling_failed_destinations!(
+		remote_node.node.get_and_clear_pending_events(),
+		&[HTLCDestination::FailedPayment { payment_hash }]
+	);
+	check_added_monitors(remote_node, 1);
+
+	let update_fail = get_htlc_update_msgs!(remote_node, local_node_id);
+	local_node.node.handle_update_fail_htlc(remote_node_id, &update_fail.update_fail_htlcs[0]);
+	local_node.node.handle_commitment_signed(remote_node_id, &update_fail.commitment_signed);
+
+	let (revoke_and_ack, commit_sig) = get_revoke_commit_msgs!(local_node, remote_node_id);
+	remote_node.node.handle_revoke_and_ack(local_node_id, &revoke_and_ack);
+	check_added_monitors(remote_node, 1);
+	remote_node.node.handle_commitment_signed(local_node_id, &commit_sig);
+	check_added_monitors(remote_node, 1);
+
+	let last_revoke_and_ack =
+		get_event_msg!(remote_node, MessageSendEvent::SendRevokeAndACK, local_node_id);
+	local_node.node.handle_revoke_and_ack(remote_node_id, &last_revoke_and_ack);
+
+	expect_payment_failed_conditions(
+		local_node,
+		payment_hash,
+		true,
+		PaymentFailedConditions::new(),
+	);
+
+	// Now that the state machine is no longer pending, and `closing_signed` is ready to be sent,
+	// make sure we're still not waiting for the quiescence handshake to complete.
+	local_node.node.exit_quiescence(&remote_node_id, &chan_id).unwrap();
+
+	let _ = get_event_msg!(local_node, MessageSendEvent::SendClosingSigned, remote_node_id);
+	check_added_monitors(local_node, 2); // One for the last revoke_and_ack, another for closing_signed
+}
+
+#[test]
+fn test_quiescence_waits_for_monitor_update_complete() {
+	// Test that quiescence will always wait for any pending monitor updates to complete, even
+	// when there are no updates pending on either side.
+	let chanmon_cfgs = create_chanmon_cfgs(2);
+	let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+	let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+	let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1).2;
+
+	let node_id_0 = nodes[0].node.get_our_node_id();
+	let node_id_1 = nodes[1].node.get_our_node_id();
+
+	let payment_amount = 1_000_000;
+	let (preimage, payment_hash, ..) = route_payment(&nodes[0], &[&nodes[1]], payment_amount);
+	nodes[1].node.claim_funds(preimage);
+	check_added_monitors(&nodes[1], 1);
+
+	let update = get_htlc_update_msgs!(&nodes[1], node_id_0);
+	nodes[0].node.handle_update_fulfill_htlc(node_id_1, &update.update_fulfill_htlcs[0]);
+	nodes[0].node.handle_commitment_signed(node_id_1, &update.commitment_signed);
+	check_added_monitors(&nodes[0], 1);
+
+	// While settling back the payment, propose quiescence from nodes[1]. We won't see its `stfu` go
+	// out yet as the `update_fulfill` is still pending on both sides.
+	nodes[1].node.maybe_propose_quiescence(&node_id_0, &chan_id).unwrap();
+
+	// Drive the state machine forward until nodes[1] is ready to send out `stfu`.
+	let (revoke_and_ack, commit_sig) = get_revoke_commit_msgs!(&nodes[0], node_id_1);
+	nodes[1].node.handle_revoke_and_ack(node_id_0, &revoke_and_ack);
+	check_added_monitors(&nodes[1], 1);
+	nodes[1].node.handle_commitment_signed(node_id_0, &commit_sig);
+	check_added_monitors(&nodes[1], 1);
+
+	macro_rules! find_msg {
+		($events: expr, $msg: ident) => {{
+			$events
+				.iter()
+				.find_map(|event| {
+					if let MessageSendEvent::$msg { ref msg, .. } = event {
+						Some(msg)
+					} else {
+						None
+					}
+				})
+				.unwrap()
+		}};
+	}
+	let msg_events = nodes[1].node.get_and_clear_pending_msg_events();
+	assert_eq!(msg_events.len(), 2);
+	let revoke_and_ack = find_msg!(msg_events, SendRevokeAndACK);
+	let stfu = find_msg!(msg_events, SendStfu);
+
+	// While handling the last `revoke_and_ack`, we'll hold the monitor update.
+	chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
+	nodes[0].node.handle_revoke_and_ack(node_id_1, &revoke_and_ack);
+
+	// Receiving `stfu` while a monitor update is in progress will stash the message to re-process
+	// once the monitor update completes.
+	nodes[0].node.handle_stfu(node_id_1, &stfu);
+	assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
+
+	nodes[0].chain_monitor.complete_sole_pending_chan_update(&chan_id);
+
+	let stfu = get_event_msg!(&nodes[0], MessageSendEvent::SendStfu, node_id_1);
+	nodes[1].node.handle_stfu(node_id_0, &stfu);
+
+	nodes[0].node.exit_quiescence(&node_id_1, &chan_id).unwrap();
+	nodes[1].node.exit_quiescence(&node_id_0, &chan_id).unwrap();
+
+	expect_payment_sent(&nodes[0], preimage, None, true, true);
+	expect_payment_claimed!(&nodes[1], payment_hash, payment_amount);
+}
+
+#[test]
+fn test_quiescence_updates_go_to_holding_cell() {
+	quiescence_updates_go_to_holding_cell(false);
+	quiescence_updates_go_to_holding_cell(true);
+}
+
+fn quiescence_updates_go_to_holding_cell(fail_htlc: bool) {
+	// Test that any updates made to a channel while quiescent go to the holding cell.
+	let chanmon_cfgs = create_chanmon_cfgs(2);
+	let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+	let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+	let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1).2;
+
+	let node_id_0 = nodes[0].node.get_our_node_id();
+	let node_id_1 = nodes[1].node.get_our_node_id();
+
+	// Send enough to be able to pay from both directions.
+	let payment_amount = 1_000_000;
+	send_payment(&nodes[0], &[&nodes[1]], payment_amount * 4);
+
+	// Propose quiescence from nodes[1], and immediately try to send a payment. Since its `stfu` has
+	// already gone out first, the outbound HTLC will go into the holding cell.
+	nodes[1].node.maybe_propose_quiescence(&node_id_0, &chan_id).unwrap();
+	let stfu = get_event_msg!(&nodes[1], MessageSendEvent::SendStfu, node_id_0);
+
+	let (route1, payment_hash1, payment_preimage1, payment_secret1) =
+		get_route_and_payment_hash!(&nodes[1], &nodes[0], payment_amount);
+	let onion1 = RecipientOnionFields::secret_only(payment_secret1);
+	let payment_id1 = PaymentId(payment_hash1.0);
+	nodes[1].node.send_payment_with_route(route1, payment_hash1, onion1, payment_id1).unwrap();
+	check_added_monitors!(&nodes[1], 0);
+	assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
+
+	// Send a payment in the opposite direction. Since nodes[0] hasn't sent its own `stfu` yet, it's
+	// allowed to make updates.
+	let (route2, payment_hash2, payment_preimage2, payment_secret2) =
+		get_route_and_payment_hash!(&nodes[0], &nodes[1], payment_amount);
+	let onion2 = RecipientOnionFields::secret_only(payment_secret2);
+	let payment_id2 = PaymentId(payment_hash2.0);
+	nodes[0].node.send_payment_with_route(route2, payment_hash2, onion2, payment_id2).unwrap();
+	check_added_monitors!(&nodes[0], 1);
+
+	let update_add = get_htlc_update_msgs!(&nodes[0], node_id_1);
+	nodes[1].node.handle_update_add_htlc(node_id_0, &update_add.update_add_htlcs[0]);
+	commitment_signed_dance!(&nodes[1], &nodes[0], update_add.commitment_signed, false);
+	expect_pending_htlcs_forwardable!(&nodes[1]);
+	expect_payment_claimable!(nodes[1], payment_hash2, payment_secret2, payment_amount);
+
+	// Have nodes[1] attempt to fail/claim nodes[0]'s payment. Since nodes[1] already sent out
+	// `stfu`, the `update_fail/fulfill` will go into the holding cell.
+	if fail_htlc {
+		nodes[1].node.fail_htlc_backwards(&payment_hash2);
+		let failed_payment = HTLCDestination::FailedPayment { payment_hash: payment_hash2 };
+		expect_pending_htlcs_forwardable_and_htlc_handling_failed!(&nodes[1], vec![failed_payment]);
+	} else {
+		nodes[1].node.claim_funds(payment_preimage2);
+		check_added_monitors(&nodes[1], 1);
+	}
+	assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
+
+	// Finish the quiescence handshake.
+	nodes[0].node.handle_stfu(node_id_1, &stfu);
+	let stfu = get_event_msg!(&nodes[0], MessageSendEvent::SendStfu, node_id_1);
+	nodes[1].node.handle_stfu(node_id_0, &stfu);
+
+	nodes[0].node.exit_quiescence(&node_id_1, &chan_id).unwrap();
+	nodes[1].node.exit_quiescence(&node_id_0, &chan_id).unwrap();
+
+	// Now that quiescence is over, nodes are allowed to make updates again. nodes[1] will have its
+	// outbound HTLC finally go out, along with the fail/claim of nodes[0]'s payment.
+	let update = get_htlc_update_msgs!(&nodes[1], node_id_0);
+	check_added_monitors(&nodes[1], 1);
+	nodes[0].node.handle_update_add_htlc(node_id_1, &update.update_add_htlcs[0]);
+	if fail_htlc {
+		nodes[0].node.handle_update_fail_htlc(node_id_1, &update.update_fail_htlcs[0]);
+	} else {
+		nodes[0].node.handle_update_fulfill_htlc(node_id_1, &update.update_fulfill_htlcs[0]);
+	}
+	commitment_signed_dance!(&nodes[0], &nodes[1], update.commitment_signed, false);
+
+	if !fail_htlc {
+		expect_payment_claimed!(nodes[1], payment_hash2, payment_amount);
+	}
+
+	let events = nodes[0].node.get_and_clear_pending_events();
+	assert_eq!(events.len(), 3);
+	assert!(events.iter().find(|e| matches!(e, Event::PendingHTLCsForwardable { .. })).is_some());
+	if fail_htlc {
+		assert!(events.iter().find(|e| matches!(e, Event::PaymentFailed { .. })).is_some());
+		assert!(events.iter().find(|e| matches!(e, Event::PaymentPathFailed { .. })).is_some());
+	} else {
+		assert!(events.iter().find(|e| matches!(e, Event::PaymentSent { .. })).is_some());
+		assert!(events.iter().find(|e| matches!(e, Event::PaymentPathSuccessful { .. })).is_some());
+		check_added_monitors(&nodes[0], 1);
+	}
+	nodes[0].node.process_pending_htlc_forwards();
+	expect_payment_claimable!(nodes[0], payment_hash1, payment_secret1, payment_amount);
+
+	if fail_htlc {
+		nodes[0].node.fail_htlc_backwards(&payment_hash1);
+		let failed_payment = HTLCDestination::FailedPayment { payment_hash: payment_hash1 };
+		expect_pending_htlcs_forwardable_and_htlc_handling_failed!(&nodes[0], vec![failed_payment]);
+	} else {
+		nodes[0].node.claim_funds(payment_preimage1);
+	}
+	check_added_monitors(&nodes[0], 1);
+
+	let update = get_htlc_update_msgs!(&nodes[0], node_id_1);
+	if fail_htlc {
+		nodes[1].node.handle_update_fail_htlc(node_id_0, &update.update_fail_htlcs[0]);
+	} else {
+		nodes[1].node.handle_update_fulfill_htlc(node_id_0, &update.update_fulfill_htlcs[0]);
+	}
+	commitment_signed_dance!(&nodes[1], &nodes[0], update.commitment_signed, false);
+
+	if fail_htlc {
+		let conditions = PaymentFailedConditions::new();
+		expect_payment_failed_conditions(&nodes[1], payment_hash1, true, conditions);
+	} else {
+		expect_payment_claimed!(nodes[0], payment_hash1, payment_amount);
+		expect_payment_sent(&nodes[1], payment_preimage1, None, true, true);
+	}
+}
+
+#[test]
+fn test_quiescence_timeout() {
+	// Test that we'll disconnect if we remain quiescent for `DISCONNECT_PEER_AWAITING_RESPONSE_TICKS`.
+	let chanmon_cfgs = create_chanmon_cfgs(2);
+	let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+	let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+	let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1).2;
+
+	let node_id_0 = nodes[0].node.get_our_node_id();
+	let node_id_1 = nodes[1].node.get_our_node_id();
+
+	nodes[0].node.maybe_propose_quiescence(&nodes[1].node.get_our_node_id(), &chan_id).unwrap();
+
+	let stfu_initiator = get_event_msg!(nodes[0], MessageSendEvent::SendStfu, node_id_1);
+	nodes[1].node.handle_stfu(node_id_0, &stfu_initiator);
+
+	let stfu_responder = get_event_msg!(nodes[1], MessageSendEvent::SendStfu, node_id_0);
+	nodes[0].node.handle_stfu(node_id_1, &stfu_responder);
+
+	assert!(stfu_initiator.initiator && !stfu_responder.initiator);
+
+	for _ in 0..DISCONNECT_PEER_AWAITING_RESPONSE_TICKS {
+		nodes[0].node.timer_tick_occurred();
+		nodes[1].node.timer_tick_occurred();
+	}
+
+	let f = |event| {
+		if let MessageSendEvent::HandleError { action, .. } = event {
+			if let msgs::ErrorAction::DisconnectPeerWithWarning { .. } = action {
+				Some(())
+			} else {
+				None
+			}
+		} else {
+			None
+		}
+	};
+	assert!(nodes[0].node.get_and_clear_pending_msg_events().into_iter().find_map(f).is_some());
+	assert!(nodes[1].node.get_and_clear_pending_msg_events().into_iter().find_map(f).is_some());
+}
+
+#[test]
+fn test_quiescence_timeout_while_waiting_for_counterparty_stfu() {
+	// Test that we'll disconnect if the counterparty does not send their stfu within a reasonable
+	// time if we've already sent ours.
+	let chanmon_cfgs = create_chanmon_cfgs(2);
+	let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+	let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+	let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1).2;
+
+	let node_id_0 = nodes[0].node.get_our_node_id();
+
+	nodes[1].node.maybe_propose_quiescence(&node_id_0, &chan_id).unwrap();
+	let _ = get_event_msg!(nodes[1], MessageSendEvent::SendStfu, node_id_0);
+
+	// Route a payment in between to ensure expecting to receive `revoke_and_ack` doesn't override
+	// the expectation of receiving `stfu` as well.
+	let _ = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
+
+	for _ in 0..DISCONNECT_PEER_AWAITING_RESPONSE_TICKS {
+		nodes[0].node.timer_tick_occurred();
+		nodes[1].node.timer_tick_occurred();
+	}
+
+	// nodes[0] hasn't received stfu from nodes[1], so it's not enforcing any timeouts.
+	assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
+
+	// nodes[1] didn't receive nodes[0]'s stfu within the timeout so it'll disconnect.
+	let f = |&ref event| {
+		if let MessageSendEvent::HandleError { action, .. } = event {
+			if let msgs::ErrorAction::DisconnectPeerWithWarning { .. } = action {
+				Some(())
+			} else {
+				None
+			}
+		} else {
+			None
+		}
+	};
+	assert!(nodes[1].node.get_and_clear_pending_msg_events().iter().find_map(f).is_some());
+}
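
A note on the wire format pinned down by the `encoding_stfu` vectors earlier in this patch: the `stfu` body is the 32-byte `channel_id` followed by a single `initiator` byte (0x01 when initiating, 0x00 when replying). The following standalone sketch reproduces that layout outside of LDK's serialization traits; `encode_stfu_body` is a hypothetical helper used only for illustration and is not an API added by this patch.

    // Illustrative only: mirrors the byte layout asserted by the `encoding_stfu` test vectors.
    // `encode_stfu_body` is a hypothetical helper, not part of the patch.
    fn encode_stfu_body(channel_id: [u8; 32], initiator: bool) -> Vec<u8> {
        let mut body = Vec::with_capacity(33);
        body.extend_from_slice(&channel_id);
        body.push(if initiator { 1 } else { 0 });
        body
    }

    fn main() {
        // Matches the first test vector: a channel_id of all-0x02 bytes with initiator = true.
        let body = encode_stfu_body([2; 32], true);
        assert_eq!(body.len(), 33);
        let hex: String = body.iter().map(|b| format!("{:02x}", b)).collect();
        assert_eq!(hex, "020202020202020202020202020202020202020202020202020202020202020201");
    }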