From b92d1a52d98ce753a8fd44a54fb93caf0514f949 Mon Sep 17 00:00:00 2001 From: Harshit Verma Date: Tue, 13 Aug 2024 08:28:39 +0530 Subject: [PATCH] fix: Updates tests --- Cargo.lock | 1 + lampo-common/Cargo.toml | 1 + lampo-common/src/conf.rs | 13 ++ lampo-common/src/event.rs | 4 +- lampo-common/src/event/liquidity.rs | 56 ++++++ lampo-testing/src/lib.rs | 20 +- lampod/src/actions/handler.rs | 27 ++- lampod/src/lib.rs | 102 +++++----- lampod/src/ln/channel_manager.rs | 6 +- lampod/src/ln/liquidity.rs | 297 ++++++++++++++-------------- lampod/src/ln/peer_manager.rs | 1 - tests/tests/src/lampo_tests.rs | 184 +++++++++++++++-- 12 files changed, 482 insertions(+), 230 deletions(-) create mode 100644 lampo-common/src/event/liquidity.rs diff --git a/Cargo.lock b/Cargo.lock index 9687a290..f26fe1d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -856,6 +856,7 @@ dependencies = [ "lightning-background-processor", "lightning-block-sync", "lightning-invoice", + "lightning-liquidity", "lightning-net-tokio", "lightning-persister", "lightning-rapid-gossip-sync", diff --git a/lampo-common/Cargo.toml b/lampo-common/Cargo.toml index ce45f740..77a7bf72 100644 --- a/lampo-common/Cargo.toml +++ b/lampo-common/Cargo.toml @@ -23,3 +23,4 @@ chrono = { version = "0.4", features = ["std", "clock"], default-features = fals serde_json = "1.0" serde = "1.0" hex = "0.4.3" +lightning-liquidity = "0.1.0-alpha.4" diff --git a/lampo-common/src/conf.rs b/lampo-common/src/conf.rs index d036fbc2..9b3544ce 100644 --- a/lampo-common/src/conf.rs +++ b/lampo-common/src/conf.rs @@ -25,6 +25,8 @@ pub struct LampoConf { pub announce_addr: Option, // Should be something like liquidity=consumer, liquidity=provider or none pub liquidity: Option, + pub lsp_node_id: Option, + pub lsp_socket_addr: Option, } impl Default for LampoConf { @@ -54,6 +56,8 @@ impl Default for LampoConf { alias: None, announce_addr: None, liquidity: None, + lsp_node_id: None, + lsp_socket_addr: None, } } } @@ -123,6 +127,11 @@ impl LampoConf { let input_path = path; let path = Self::normalize_root_dir(&conf.root_path, conf.network); conf.root_path = path.clone(); + // Must be used when we act as a liquidity provider + conf.ldk_conf = UserConfig { + accept_intercept_htlcs: true, + ..Default::default() + }; let lampo_file = format!("{}/lampo.conf", conf.path()); @@ -234,6 +243,8 @@ impl TryFrom for LampoConf { let alias = conf.get_conf("alias").unwrap_or(None); let announce_addr = conf.get_conf("announce-addr").unwrap_or(None); let liquidity = conf.get_conf("liquidity").unwrap_or(None); + let lsp_node_id = conf.get_conf("lsp-node-id").unwrap_or(None); + let lsp_socket_addr = conf.get_conf("lsp-socket-addr").unwrap_or(None); Ok(Self { inner: Some(conf), @@ -252,6 +263,8 @@ impl TryFrom for LampoConf { alias, announce_addr, liquidity, + lsp_node_id, + lsp_socket_addr, }) } } diff --git a/lampo-common/src/event.rs b/lampo-common/src/event.rs index d2c4b30a..f03083ef 100644 --- a/lampo-common/src/event.rs +++ b/lampo-common/src/event.rs @@ -1,4 +1,5 @@ //! Events commands +pub mod liquidity; pub mod ln; pub mod onchain; @@ -7,6 +8,7 @@ use std::sync::{Arc, Mutex}; use crate::chan; use crate::event::ln::LightningEvent; use crate::event::onchain::OnChainEvent; +use liquidity::LiquidityEvent; /// Publishes events to subscribers. #[derive(Clone)] @@ -64,6 +66,6 @@ impl Subscriber { pub enum Event { Lightning(LightningEvent), OnChain(OnChainEvent), - Liquidity(String), + Liquidity(LiquidityEvent), Inventory, } diff --git a/lampo-common/src/event/liquidity.rs b/lampo-common/src/event/liquidity.rs new file mode 100644 index 00000000..994b6347 --- /dev/null +++ b/lampo-common/src/event/liquidity.rs @@ -0,0 +1,56 @@ +use bitcoin::secp256k1::PublicKey; +use lightning::events::HTLCDestination; +use lightning::ln::{channelmanager::InterceptId, ChannelId, PaymentHash}; +use lightning_liquidity::{lsps0::ser::RequestId, lsps2::msgs::OpeningFeeParams}; + +#[derive(Debug, Clone)] +pub enum LiquidityEvent { + OpenParamsReady { + counterparty_node_id: PublicKey, + opening_fee_params_menu: Vec, + }, + InvoiceparamsReady { + counterparty_node_id: PublicKey, + intercept_scid: u64, + cltv_expiry_delta: u32, + }, + BuyRequest { + request_id: RequestId, + counterparty_node_id: PublicKey, + opening_fee_params: OpeningFeeParams, + payment_size_msat: Option, + }, + Geinfo { + request_id: RequestId, + counterparty_node_id: PublicKey, + token: Option, + }, + OpenChannel { + their_network_key: PublicKey, + amt_to_forward_msat: u64, + opening_fee_msat: u64, + user_channel_id: u128, + intercept_scid: u64, + }, + HTLCHandlingFailed { + prev_channel_id: ChannelId, + failed_next_destination: HTLCDestination, + }, + HTLCIntercepted { + intercept_id: InterceptId, + requested_next_hop_scid: u64, + payment_hash: PaymentHash, + inbound_amount_msat: u64, + expected_outbound_amount_msat: u64, + }, + PaymentForwarded { + prev_channel_id: Option, + next_channel_id: Option, + prev_user_channel_id: Option, + next_user_channel_id: Option, + total_fee_earned_msat: Option, + skimmed_fee_msat: Option, + claim_from_onchain_tx: bool, + outbound_amount_forwarded_msat: Option, + }, +} diff --git a/lampo-testing/src/lib.rs b/lampo-testing/src/lib.rs index a29642f8..ab03e393 100644 --- a/lampo-testing/src/lib.rs +++ b/lampo-testing/src/lib.rs @@ -65,12 +65,13 @@ macro_rules! wait { pub struct LampoTesting { inner: Arc, root_path: Arc, - liquidity: Option>, + liquidity: Option>, pub port: u64, pub wallet: Arc, pub mnemonic: String, pub btc: Arc, pub info: response::GetInfo, + pub lampod: Option>, } impl LampoTesting { @@ -147,12 +148,18 @@ impl LampoTesting { root_path: dir.into(), info, liquidity: None, + lampod: None, }) } // Use this for liquidity. // FIXME: Integrate this with new method - pub fn new_liquidity(btc: Arc, liquidity: String) -> error::Result { + pub fn new_liquidity( + btc: Arc, + liquidity: String, + lsp_node_id: Option, + lsp_socket_addr: Option, + ) -> error::Result { let dir = tempfile::tempdir()?; // SAFETY: this should be safe because if the system has no @@ -168,6 +175,8 @@ impl LampoTesting { let core_url = format!("127.0.0.1:{}", btc.port); if liquidity == "consumer" { lampo_conf.configure_as_liquidity_consumer(); + lampo_conf.lsp_node_id = lsp_node_id; + lampo_conf.lsp_socket_addr = lsp_socket_addr; } else { lampo_conf.configure_as_liquidity_provider(); } @@ -231,7 +240,8 @@ impl LampoTesting { btc, root_path: Arc::new(dir), info, - liquidity, + liquidity: Some(liquidity), + lampod: Some(lampod), }) } @@ -267,7 +277,7 @@ impl LampoTesting { self.root_path.clone() } - pub fn liquidity(&self) -> Option> { - self.liquidity.clone() + pub fn liquidity(&self) -> Arc { + self.liquidity.as_ref().unwrap().clone() } } diff --git a/lampod/src/actions/handler.rs b/lampod/src/actions/handler.rs index f1008590..7388398c 100644 --- a/lampod/src/actions/handler.rs +++ b/lampod/src/actions/handler.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use lampo_common::chan; use lampo_common::error; use lampo_common::error::Ok; +use lampo_common::event::liquidity::LiquidityEvent; use lampo_common::event::ln::LightningEvent; use lampo_common::event::{Emitter, Event, Subscriber}; use lampo_common::handler::Handler as EventHandler; @@ -33,9 +34,7 @@ pub struct LampoHandler { wallet_manager: Arc, chain_manager: Arc, external_handlers: RefCell>>, - // Question: How about Option>> - liquidity_manager: Option>, - // Do this inside liquidity + liquidity_manager: Option>, emitter: Emitter, subscriber: Subscriber, } @@ -146,7 +145,7 @@ impl Handler for LampoHandler { } => { let liquidity_manager = self.liquidity_manager.clone(); if let Some(ref liq_manager) = liquidity_manager { - let _ = liquidity_manager.unwrap().borrow().channel_ready(user_channel_id, &channel_id, &counterparty_node_id); + let _ = liquidity_manager.unwrap().channel_ready(user_channel_id, &channel_id, &counterparty_node_id); } log::info!("channel ready with node `{counterparty_node_id}`, and channel type {channel_type}"); self.emit(Event::Lightning(LightningEvent::ChannelReady { @@ -294,21 +293,29 @@ impl Handler for LampoHandler { Ok(()) }, ldk::events::Event::HTLCIntercepted { intercept_id, requested_next_hop_scid, payment_hash, inbound_amount_msat, expected_outbound_amount_msat } => { - // TODO: Emit this event log::info!("Intecepted an HTLC"); let liquidity_manager = self.liquidity_manager.clone(); - if liquidity_manager.is_some() { - liquidity_manager.unwrap().borrow().htlc_intercepted(requested_next_hop_scid, intercept_id, expected_outbound_amount_msat, payment_hash)?; + if let Some(ref liq_manager) = liquidity_manager { + let _ = liquidity_manager.unwrap().htlc_intercepted(requested_next_hop_scid, intercept_id, inbound_amount_msat, payment_hash); } - + let event = LiquidityEvent::HTLCIntercepted { intercept_id, requested_next_hop_scid, payment_hash, inbound_amount_msat, expected_outbound_amount_msat }; + self.emit(Event::Liquidity(event)); + log::info!("Successfully emitted the Intercepted event!"); Ok(()) }, ldk::events::Event::HTLCHandlingFailed { prev_channel_id, failed_next_destination } => { - todo!() + log::info!("HTLC Handling failed"); + let event = LiquidityEvent::HTLCHandlingFailed { prev_channel_id, failed_next_destination }; + self.emit(Event::Liquidity(event)); + Ok(()) } ldk::events::Event::PaymentForwarded { prev_channel_id, next_channel_id, prev_user_channel_id, next_user_channel_id, total_fee_earned_msat, skimmed_fee_msat, claim_from_onchain_tx, outbound_amount_forwarded_msat } => { - todo!() + log::info!("Payment Forwarded"); + let event = LiquidityEvent::PaymentForwarded { prev_channel_id, next_channel_id, prev_user_channel_id, next_user_channel_id, total_fee_earned_msat, skimmed_fee_msat, claim_from_onchain_tx, outbound_amount_forwarded_msat }; + self.emit(Event::Liquidity(event)); + Ok(()) } + _ => Err(error::anyhow!("unexpected ldk event: {:?}", event)), } } diff --git a/lampod/src/lib.rs b/lampod/src/lib.rs index 4d08632b..8b8d965f 100644 --- a/lampod/src/lib.rs +++ b/lampod/src/lib.rs @@ -21,9 +21,12 @@ pub mod persistence; use std::cell::Cell; use std::cell::RefCell; +use std::str::FromStr; use std::sync::Arc; use std::thread::JoinHandle; +use lampo_common::ldk::ln::msgs::SocketAddress; +use lampo_common::secp256k1::PublicKey; use lightning_liquidity::lsps2::client::LSPS2ClientConfig; use lightning_liquidity::lsps2::service::LSPS2ServiceConfig; use lightning_liquidity::LiquidityClientConfig; @@ -74,7 +77,7 @@ pub struct LampoDaemon { persister: Arc, handler: Option>, process: Cell>, - liquidity: Option>, + liquidity: Option>, // FIXME: remove this rt: Runtime, } @@ -113,8 +116,8 @@ impl LampoDaemon { &self.conf } - pub fn liquidity(&self) -> Option> { - self.liquidity.clone() + pub fn liquidity(&self) -> Arc { + self.liquidity.clone().unwrap() } pub fn init_onchaind(&mut self, client: Arc) -> error::Result<()> { @@ -179,14 +182,32 @@ impl LampoDaemon { pub fn init_peer_manager(&mut self) -> error::Result<()> { log::debug!(target: "lampo", "init peer manager ..."); + let mut peer_manager; + let peer_liquidity; + + if let Some(liq) = &self.liquidity { + peer_liquidity = Some(Arc::clone(&self.liquidity.clone().unwrap())); + } else { + peer_liquidity = None; + } + peer_manager = LampoPeerManager::new(&self.conf, self.logger.clone(), peer_liquidity); + peer_manager.init( + self.onchain_manager(), + self.wallet_manager.clone(), + self.channel_manager(), + )?; + + self.peer_manager = Some(Arc::new(peer_manager)); + Ok(()) + } + + pub fn init_liquidity_manager(&mut self) -> error::Result<()> { let liquidity; let liquidity_manager; - // FIXME: check this somewhere else if let Some(liq) = &self.conf.liquidity { - // FIXME: This could probably be made into an enum if liq == "Consumer" { - log::info!("Acting as consumer"); + log::info!("Acting as LSP consumer"); liquidity = LampoLiquidity::new( self.offchain_manager().key_manager(), self.channel_manager().manager(), @@ -198,59 +219,48 @@ impl LampoDaemon { lsps2_client_config: Some(LSPS2ClientConfig {}), }), ); + + liquidity_manager = Some(LampoLiquidityManager::new_lsp_as_client( + Arc::new(liquidity), + self.conf.clone(), + self.channel_manager(), + self.offchain_manager().key_manager(), + PublicKey::from_str(&self.conf.lsp_node_id.clone().unwrap()) + .expect("Wrong node id"), + SocketAddress::from_str(&self.conf.lsp_socket_addr.clone().unwrap()) + .expect("Failed to parse socket address"), + )); } else if liq == "Provider" { - log::info!("Acring as a provider"); - // FIXME: Probably do something about this + log::info!("Acting as a provider"); let promise_secret: [u8; 32] = [0; 32]; liquidity = LampoLiquidity::new( self.offchain_manager().key_manager(), self.channel_manager().manager(), Some(self.onchain_manager()), None, - // Do something here Some(LiquidityServiceConfig { lsps2_service_config: Some(LSPS2ServiceConfig { promise_secret }), advertise_service: true, }), None, ); + liquidity_manager = Some(LampoLiquidityManager::new_lsp( + Arc::new(liquidity), + self.conf.clone(), + self.channel_manager(), + self.offchain_manager().key_manager(), + )); } else { error::bail!("Wrong config provided"); } - - liquidity_manager = Some(LampoLiquidityManager::new_lsp( - Arc::new(liquidity), - self.conf.clone(), - self.channel_manager(), - self.offchain_manager().key_manager(), - )); } else { liquidity_manager = None; } - let mut peer_manager; - let peer_liquidity; - - if let Some(ref liq) = liquidity_manager { - peer_liquidity = Some(Arc::new(liq.clone())); - } else { - peer_liquidity = None; + if let Some(liq) = liquidity_manager { + self.liquidity = Some(Arc::new(liq)); } - peer_manager = LampoPeerManager::new(&self.conf, self.logger.clone(), peer_liquidity); - if let Some(liq_manager) = liquidity_manager { - self.liquidity = Some(RefCell::new(liq_manager)); - } else { - self.liquidity = None; - } - - peer_manager.init( - self.onchain_manager(), - self.wallet_manager.clone(), - self.channel_manager(), - )?; - self.peer_manager = Some(Arc::new(peer_manager)); - log::info!("Exit peer manager"); Ok(()) } @@ -296,11 +306,15 @@ impl LampoDaemon { self.init_onchaind(client.clone())?; self.init_channeld()?; self.init_offchain_manager()?; + self.init_liquidity_manager()?; self.init_peer_manager()?; self.init_inventory_manager()?; self.init_event_handler()?; client.set_handler(self.handler()); self.channel_manager().set_handler(self.handler()); + if let Some(liq) = &self.liquidity { + self.liquidity().set_handler(self.handler()); + } Ok(()) } @@ -329,16 +343,14 @@ impl LampoDaemon { let handler = self.handler(); // Here is the event handled - if let Some(lsp) = self.liquidity() { - log::info!("Listening for liquidity events!"); - let liquidity = self.liquidity().unwrap(); + if let Some(liq) = &self.liquidity { + let res = liq.clone(); + log::info!("Listening for lsp events..."); std::thread::spawn(move || { async_run!(async move { - println!("We are inside thread! async_run"); - liquidity.borrow_mut().listen(); - // loop { - // log::info!("We are inside!"); - // } + loop { + let _ = &res.listen().await; + } }) }); } diff --git a/lampod/src/ln/channel_manager.rs b/lampod/src/ln/channel_manager.rs index 4f47e670..5c3d4234 100644 --- a/lampod/src/ln/channel_manager.rs +++ b/lampod/src/ln/channel_manager.rs @@ -58,7 +58,7 @@ pub type LampoArcChannelManager = ChannelManager< Arc, >; -// Requited inside LampoLiquidity +// Required inside LampoLiquidity pub type LampoChannel = LampoArcChannelManager; @@ -127,6 +127,10 @@ impl LampoChannelManager { self.handler.borrow().clone().unwrap() } + pub fn channeld(&self) -> Arc { + self.channeld.clone().unwrap() + } + pub fn listen(self: Arc) -> JoinHandle<()> { if self.is_restarting().unwrap() { self.resume_channels().unwrap(); diff --git a/lampod/src/ln/liquidity.rs b/lampod/src/ln/liquidity.rs index d9a0dd9c..0e9c4505 100644 --- a/lampod/src/ln/liquidity.rs +++ b/lampod/src/ln/liquidity.rs @@ -1,19 +1,16 @@ -use std::borrow::Borrow; use std::sync::Arc; use std::sync::Mutex; use std::time::Duration; use lampo_common::bitcoin::hashes::sha256; use lampo_common::bitcoin::hashes::Hash; -use lampo_common::chan; use lampo_common::chrono::DateTime; use lampo_common::chrono::Utc; use lampo_common::conf::LampoConf; use lampo_common::error; use lampo_common::error::Ok; -use lampo_common::event::Emitter; +use lampo_common::event::liquidity::LiquidityEvent; use lampo_common::event::Event; -use lampo_common::event::Subscriber; use lampo_common::handler::Handler; use lampo_common::keys::LampoKeysManager; use lampo_common::ldk::invoice::Bolt11Invoice; @@ -30,13 +27,13 @@ use lampo_common::secp256k1::PublicKey; use lampo_common::secp256k1::Secp256k1; use lightning_liquidity::events::Event as liquidity_events; -use lightning_liquidity::lsps0::ser::RequestId; use lightning_liquidity::lsps2::event::LSPS2ClientEvent; use lightning_liquidity::lsps2::event::LSPS2ServiceEvent; use lightning_liquidity::lsps2::msgs::OpeningFeeParams; use lightning_liquidity::lsps2::msgs::RawOpeningFeeParams; use lightning_liquidity::LiquidityManager; +use crate::actions::handler::LampoHandler; use crate::chain::LampoChainManager; use crate::ln::LampoChannel; use crate::ln::LampoChannelManager; @@ -46,29 +43,33 @@ use super::InnerLampoPeerManager; pub type LampoLiquidity = LiquidityManager, Arc, Arc>; -#[derive(Clone, Debug)] -pub struct LiquidityProvider { +#[derive(Clone)] +struct LSPManager { pub addr: SocketAddress, pub node_id: PublicKey, pub token: Option, - pub opening_params: Option>, - pub scid: Option, - pub ctlv_exiry: Option, } -#[derive(Clone)] +impl LSPManager { + pub fn new(addr: SocketAddress, node_id: PublicKey, token: Option) -> Self { + LSPManager { + addr, + node_id, + token, + } + } +} + pub struct LampoLiquidityManager { lampo_liquidity: Arc, lampo_conf: LampoConf, - // FIXME: How about Option>>? - lsp_provider: Arc>>, + // FIXME: Change this name + lsp_manager: Option, channel_manager: Arc, keys_manager: Arc, - emitter: Emitter, - subscriber: Subscriber, + handler: Mutex>>, } -// Maybe implement Emit event for this too? impl LampoLiquidityManager { pub fn new_lsp( liquidity: Arc, @@ -76,54 +77,37 @@ impl LampoLiquidityManager { channel_manager: Arc, keys_manager: Arc, ) -> Self { - let emitter = Emitter::default(); - let subscriber = emitter.subscriber(); Self { lampo_liquidity: liquidity, lampo_conf: conf, - lsp_provider: Arc::new(Mutex::new(None)), + lsp_manager: None, channel_manager, keys_manager, - emitter, - subscriber, + handler: Mutex::new(None), } } - // This should not be initiated when we open our node only when we - // provide some args inside cli or in lampo.conf - - pub fn has_some_provider(&self) -> Option { - self.lsp_provider.lock().unwrap().clone() - } - pub fn configure_as_liquidity_consumer( - &mut self, + pub fn new_lsp_as_client( + liquidity: Arc, + conf: LampoConf, + channel_manager: Arc, + keys_manager: Arc, node_id: PublicKey, - addr: SocketAddress, - token: Option, - ) -> error::Result<()> { - log::info!("Starting lampo-liquidity manager as a consumer!"); - if self.has_some_provider().is_none() { - println!("We are inside hqllo 1"); - let liquidity_provider = LiquidityProvider { - addr, - node_id, - token, - opening_params: None, - scid: None, - ctlv_exiry: None, - }; - - *self.lsp_provider.lock().unwrap() = Some(liquidity_provider); - let res = self.lsp_provider.lock().unwrap().clone(); - log::info!("This is the result liq prov: {:?}", res); + socket_addr: SocketAddress, + ) -> Self { + let lsp = LSPManager::new(socket_addr, node_id, None); + Self { + lampo_liquidity: liquidity, + lampo_conf: conf, + lsp_manager: Some(lsp), + channel_manager, + keys_manager, + handler: Mutex::new(None), } + } - println!( - "This is the object we are having after configure as liquidity consumer: {:?}", - self.lsp_provider.clone() - ); - - Ok(()) + fn has_some_provider(&self) -> Option { + self.lsp_manager.clone() } pub fn liquidity_manager(&self) -> Arc { @@ -166,47 +150,40 @@ impl LampoLiquidityManager { Ok(()) } - pub fn get_events(&self) -> Vec { - self.lampo_liquidity.get_and_clear_pending_events() - } - pub fn set_peer_manager(&self, peer_manager: Arc) { let process_msgs_callback = move || peer_manager.process_events(); self.liquidity_manager() .set_process_msgs_callback(process_msgs_callback); } + pub fn set_handler(&self, handler: Arc) { + *self.handler.lock().unwrap() = Some(handler); + } + // getinfo(server) -> OpeningParamsReady(client) -> BuyRequest(client) -> InvoiceParametersReady(client) -> OpenChannel(server) - pub async fn listen(&mut self) -> error::Result<()> { - log::info!("GOT AN EVENT!!!"); + pub async fn listen(&self) -> error::Result<()> { match self.lampo_liquidity.next_event_async().await { - liquidity_events::LSPS0Client(..) => unimplemented!(), + // Get the opening_fee_params_menu from here and should be inside the event emitted. liquidity_events::LSPS2Client(LSPS2ClientEvent::OpeningParametersReady { counterparty_node_id, opening_fee_params_menu, .. }) => { - log::info!("Received a opening_params ready event!"); - // let res = self.get_lsp_provider().node_id; + log::info!( + "Received a opening_params ready event: {:?}", + opening_fee_params_menu + ); if &self.get_lsp_provider().node_id != &counterparty_node_id { error::bail!("Recieved Unknown OpeningParametersReady event"); } + self.handler() + .emit(Event::Liquidity(LiquidityEvent::OpenParamsReady { + counterparty_node_id, + opening_fee_params_menu, + })); - // TODO: Handle this in a better way as we can get new opening_params from a - // LSP if it fails to responds within a certain time - if self.get_lsp_provider().opening_params.is_some() { - error::bail!("We already have some params inside lsp_provider"); - } + log::info!("Emitted!"); - self.lsp_provider - .lock() - .unwrap() - .clone() - .unwrap() - .opening_params = Some(opening_fee_params_menu); - self.emit(Event::Liquidity( - "Got a openingparamsready event".to_string(), - )); Ok(()) } liquidity_events::LSPS2Client(LSPS2ClientEvent::InvoiceParametersReady { @@ -215,18 +192,16 @@ impl LampoLiquidityManager { cltv_expiry_delta, .. }) => { + log::info!("Received a invoice params ready event"); if counterparty_node_id != self.get_lsp_provider().node_id { error::bail!("Unknown lsp"); } - - // We will take the intercept_scid and cltv_expiry_delta from here and - // generate an invoice from these params - self.get_lsp_provider().ctlv_exiry = Some(cltv_expiry_delta); - self.get_lsp_provider().scid = Some(intercept_scid); - self.emit(Event::Liquidity( - "Got a invoiceparamsReady event".to_string(), - )); - + self.handler() + .emit(Event::Liquidity(LiquidityEvent::InvoiceparamsReady { + counterparty_node_id, + intercept_scid, + cltv_expiry_delta, + })); Ok(()) } liquidity_events::LSPS2Service(LSPS2ServiceEvent::GetInfo { @@ -261,19 +236,27 @@ impl LampoLiquidityManager { service_handler .opening_fee_params_generated( &counterparty_node_id, - request_id, + request_id.clone(), opening_fee_params_menu, ) .map_err(|e| error::anyhow!("Error : {:?}", e))?; + self.handler() + .emit(Event::Liquidity(LiquidityEvent::Geinfo { + request_id, + counterparty_node_id, + token, + })); + Ok(()) } liquidity_events::LSPS2Service(LSPS2ServiceEvent::BuyRequest { request_id, counterparty_node_id, - opening_fee_params: _, - payment_size_msat: _, + opening_fee_params, + payment_size_msat, }) => { + log::info!("Buy Request event received!"); let user_channel_id = 0; let scid = self .channel_manager @@ -289,7 +272,7 @@ impl LampoLiquidityManager { lsps2_service_handler .invoice_parameters_generated( &counterparty_node_id, - request_id, + request_id.clone(), scid, cltv_expiry_delta, client_trusts_lsp, @@ -297,6 +280,14 @@ impl LampoLiquidityManager { ) .map_err(|e| error::anyhow!("Error occured: {:?}", e))?; + self.handler() + .emit(Event::Liquidity(LiquidityEvent::BuyRequest { + request_id, + counterparty_node_id, + opening_fee_params, + payment_size_msat, + })); + Ok(()) } liquidity_events::LSPS2Service(LSPS2ServiceEvent::OpenChannel { @@ -306,6 +297,7 @@ impl LampoLiquidityManager { user_channel_id, intercept_scid, }) => { + log::info!("Open Channel request received"); let channel_size_sats = (amt_to_forward_msat / 1000) * 4; let mut config = self.lampo_conf.ldk_conf; config @@ -314,11 +306,10 @@ impl LampoLiquidityManager { config.channel_config.forwarding_fee_base_msat = 0; config.channel_config.forwarding_fee_proportional_millionths = 0; - // TODO(Harshit): Make a different function to get channeld - self.channel_manager - .channeld - .as_ref() - .unwrap() + // Some error here + let res = self + .channel_manager + .channeld() .create_channel( their_network_key, channel_size_sats, @@ -329,20 +320,28 @@ impl LampoLiquidityManager { ) .map_err(|e| error::anyhow!("Error occured: {:?}", e))?; + self.handler() + .emit(Event::Liquidity(LiquidityEvent::OpenChannel { + their_network_key, + amt_to_forward_msat, + opening_fee_msat, + user_channel_id, + intercept_scid, + })); + Ok(()) } + _ => error::bail!("Wrong event received"), } } - fn client_request_opening_params(&self) -> error::Result { + fn client_request_opening_params(&self) -> error::Result> { let provider = self.has_some_provider().clone(); - println!("This is the object before is_none call : {:?}", provider); if provider.is_none() { error::bail!("LSP provider not configured") } - - let node_id = provider.clone().unwrap().node_id; - let token = provider.unwrap().token; + let node_id = self.lsp_manager.clone().unwrap().node_id; + let token = None; let res = self .lampo_liquidity .lsps2_client_handler() @@ -351,23 +350,30 @@ impl LampoLiquidityManager { log::info!("This is the request_id: {:?}", res); - loop { - let event = self.events().recv_timeout(Duration::from_secs(30)).unwrap(); - if let Event::Liquidity(str) = event { - log::info!("We got an liquidity event!"); - return Ok(res); - } else { - error::bail!("Wrong event received") + let result = loop { + let events = self.handler().events(); + let event = events.recv_timeout(std::time::Duration::from_secs(30))?; + + if let Event::Liquidity(LiquidityEvent::OpenParamsReady { + counterparty_node_id, + opening_fee_params_menu, + }) = event + { + break Some(opening_fee_params_menu); } - } + }; - Ok(res) + Ok(result.unwrap()) } // Select the best fee_param from a list of fee_param given by the lsp provider // and then forward the request to the LSP for invoice generation // This will respond in InvoiceParametersReady event - fn buy_request(&self, best_fee_param: OpeningFeeParams, amount_msat: u64) -> error::Result<()> { + fn buy_request( + &self, + best_fee_param: OpeningFeeParams, + amount_msat: u64, + ) -> error::Result<(u64, u32)> { let node_id = self.get_lsp_provider().node_id; self.lampo_liquidity .lsps2_client_handler() @@ -375,54 +381,53 @@ impl LampoLiquidityManager { .select_opening_params(node_id, Some(amount_msat), best_fee_param) .map_err(|err| error::anyhow!("Error Occured : {:?}", err))?; - let _ = tokio::time::sleep(Duration::from_secs(10)); - Ok(()) + let result = loop { + let events = self.handler().events(); + let event = events.recv_timeout(std::time::Duration::from_secs(30))?; + + if let Event::Liquidity(LiquidityEvent::InvoiceparamsReady { + counterparty_node_id, + intercept_scid, + cltv_expiry_delta, + }) = event + { + break (intercept_scid, cltv_expiry_delta); + } + }; + + Ok(result) } - pub fn create_a_jit_channel( + pub fn create_jit_invoice( &self, amount_msat: u64, description: String, ) -> error::Result { - self.client_request_opening_params()?; - let fee_param = self.get_lsp_provider().opening_params.clone(); - if fee_param.is_none() { - error::bail!("At this point best_fee_param should not be None"); - } - - // TODO: We need to provide a suitable algorithm to get the best_params from all the - // opening params that we get from the peer. For now we are getting the first param - let best_fee_param = &fee_param.unwrap().clone()[0]; + let fee_params = self.client_request_opening_params()?; + let best_fee_param = &fee_params.first().unwrap().clone(); - self.buy_request(best_fee_param.clone(), amount_msat)?; - let invoice = self.generate_invoice_for_jit_channel(amount_msat, description)?; + let result = self.buy_request(best_fee_param.clone(), amount_msat)?; + let invoice = + self.generate_invoice_for_jit_channel(amount_msat, description, result.0, result.1)?; Ok(invoice) } - pub fn get_lsp_provider(&self) -> LiquidityProvider { - let res = self.lsp_provider.lock().unwrap().clone(); - println!("This is the get_lsp_provider: {:?}", res); - self.lsp_provider - .lock() - .unwrap() - .clone() - .unwrap() - .borrow() - .clone() + fn get_lsp_provider(&self) -> LSPManager { + self.lsp_manager.clone().unwrap() } fn generate_invoice_for_jit_channel( &self, amount_msat: u64, description: String, + intercept_scid: u64, + cltv_expiry_delta: u32, ) -> error::Result { - let scid = self.get_lsp_provider().scid.unwrap(); - let cltv = self.get_lsp_provider().ctlv_exiry.unwrap(); let node_id = self.get_lsp_provider().node_id; // TODO: This needs to be configurable - let expiry_seconds = 5; + let expiry_seconds = 300; let min_final_cltv_expiry_delta = MIN_FINAL_CLTV_EXPIRY_DELTA + 2; @@ -433,22 +438,22 @@ impl LampoLiquidityManager { .unwrap() .create_inbound_payment(None, expiry_seconds, Some(min_final_cltv_expiry_delta)); - let paymen_hash = res.unwrap().0; + let payment_hash = res.unwrap().0; let payment_secret = res.unwrap().1; let route_hint = RouteHint(vec![RouteHintHop { src_node_id: node_id, - short_channel_id: scid, + short_channel_id: intercept_scid, fees: RoutingFees { base_msat: 0, proportional_millionths: 0, }, - cltv_expiry_delta: cltv as u16, + cltv_expiry_delta: cltv_expiry_delta as u16, htlc_minimum_msat: None, htlc_maximum_msat: None, }]); - let payment_hash = sha256::Hash::from_slice(&paymen_hash.0)?; + let payment_hash = sha256::Hash::from_slice(&payment_hash.0)?; let currency = self.lampo_conf.network.into(); let mut invoice_builder = InvoiceBuilder::new(currency) @@ -470,16 +475,8 @@ impl LampoLiquidityManager { Ok(invoice) } -} - -impl Handler for LampoLiquidityManager { - fn emit(&self, event: Event) { - log::debug!(target: "emitter", "emit event: {:?}", event); - self.emitter.emit(event) - } - fn events(&self) -> chan::Receiver { - log::debug!(target: "listener", "subscribe for events"); - self.subscriber.subscribe() + fn handler(&self) -> Arc { + self.handler.lock().unwrap().clone().unwrap() } } diff --git a/lampod/src/ln/peer_manager.rs b/lampod/src/ln/peer_manager.rs index aedb7236..9fdb73e2 100644 --- a/lampod/src/ln/peer_manager.rs +++ b/lampod/src/ln/peer_manager.rs @@ -46,7 +46,6 @@ pub type SimpleArcPeerManager = PeerManager< Arc>>, Arc, Arc>>, Arc>, Arc, - // Implement a custom messagehandler for liquidity See https://docs.rs/lightning-liquidity/0.1.0-alpha.4/lightning_liquidity/struct.LiquidityManager.html# LampoCustomMessageHandler, Arc, >; diff --git a/tests/tests/src/lampo_tests.rs b/tests/tests/src/lampo_tests.rs index c89fcba5..fabc7a62 100644 --- a/tests/tests/src/lampo_tests.rs +++ b/tests/tests/src/lampo_tests.rs @@ -10,6 +10,7 @@ use std::sync::Arc; use std::time::Duration; use lampo_common::error; +use lampo_common::event::liquidity::LiquidityEvent; use lampo_common::event::ln::LightningEvent; use lampo_common::event::onchain::OnChainEvent; use lampo_common::event::Event; @@ -519,7 +520,7 @@ pub fn decode_offer() -> error::Result<()> { } #[test] -pub fn act_as_liquidity_server() -> error::Result<()> { +pub fn test_generate_a_jit_invoice() -> error::Result<()> { init(); let btc = async_run!(btc::BtcNode::tmp("regtest"))?; let btc = Arc::new(btc); @@ -527,20 +528,105 @@ pub fn act_as_liquidity_server() -> error::Result<()> { let node1 = Arc::new(LampoTesting::new_liquidity( btc.clone(), "server".to_string(), + None, + None, )?); + + let _info: response::GetInfo = node1.lampod().call("getinfo", json::json!({})).unwrap(); + let node1_id = _info.node_id.clone(); + let socket_addr = format!("127.0.0.1:{}", node1.port.clone()); + let socket_addr = + SocketAddress::from_str(&socket_addr).expect("Failed to parse socket address"); + // This should act as a client let node2 = Arc::new(LampoTesting::new_liquidity( btc.clone(), "consumer".to_string(), + Some(node1.info.node_id.clone()), + Some(socket_addr.to_string()), + )?); + + let response: response::Connect = node2 + .lampod() + .call( + "connect", + request::Connect { + node_id: node1.info.node_id.clone(), + addr: "127.0.0.1".to_owned(), + port: node1.port, + }, + ) + .unwrap(); + + log::info!("Connect successful: {:?}", response); + + let events = node1.lampod().events(); + let _ = node1.fund_wallet(101)?; + wait!(|| { + let Ok(Event::OnChain(OnChainEvent::NewBestBlock((_, height)))) = + events.recv_timeout(Duration::from_millis(100)) + else { + return Err(()); + }; + if height.to_consensus_u32() == 101 { + return Ok(()); + } + Err(()) + }); + + let result = node2 + .liquidity() + .create_jit_invoice(10000, "A new desc".to_string()); + + assert!(result.is_ok()); + log::info!("{}", result.unwrap().to_string()); + + Ok(()) +} + +#[test] +pub fn test_pay_a_jit_invoice() -> error::Result<()> { + init(); + let btc = async_run!(btc::BtcNode::tmp("regtest"))?; + let btc = Arc::new(btc); + // This is acting as a server + let node1 = Arc::new(LampoTesting::new_liquidity( + btc.clone(), + "server".to_string(), + None, + None, )?); let _info: response::GetInfo = node1.lampod().call("getinfo", json::json!({})).unwrap(); - println!("This is the getinfo response: {:?}", _info); let node1_id = _info.node_id.clone(); - let socket_addr = format!("127.0.0.1:{}", node2.port.clone()); + let socket_addr = format!("127.0.0.1:{}", node1.port.clone()); let socket_addr = SocketAddress::from_str(&socket_addr).expect("Failed to parse socket address"); + // This should act as a client + let node2 = Arc::new(LampoTesting::new_liquidity( + btc.clone(), + "consumer".to_string(), + Some(node1.info.node_id.clone()), + Some(socket_addr.to_string()), + )?); + + // Paying the jit invoice + let node3 = Arc::new(LampoTesting::new(btc.clone())?); + + // Connecting server and payee + let response: response::Connect = node1 + .lampod() + .call( + "connect", + request::Connect { + node_id: node3.info.node_id.clone(), + addr: "127.0.0.1".to_owned(), + port: node3.port, + }, + ) + .unwrap(); + let response: response::Connect = node2 .lampod() .call( @@ -557,11 +643,6 @@ pub fn act_as_liquidity_server() -> error::Result<()> { let node_id = PublicKey::from_str(&node1_id).expect("Wrong node id"); - let liquidity = node2.liquidity().unwrap().clone(); - let liquidity_consumer = - liquidity - .borrow_mut() - .configure_as_liquidity_consumer(node_id, socket_addr, None)?; let events = node1.lampod().events(); let _ = node1.fund_wallet(101)?; wait!(|| { @@ -576,23 +657,92 @@ pub fn act_as_liquidity_server() -> error::Result<()> { Err(()) }); - let res = node2.liquidity().unwrap().clone(); - let result = res - .clone() - .borrow_mut() - .create_a_jit_channel(100_000_000, "A new desc".to_string())?; + let _ = node3.fund_wallet(101)?; + let events3 = node3.lampod().events(); + wait!(|| { + let Ok(Event::OnChain(OnChainEvent::NewBestBlock((_, height)))) = + events3.recv_timeout(Duration::from_millis(100)) + else { + return Err(()); + }; + if height.to_consensus_u32() == 202 { + return Ok(()); + } + Err(()) + }); + + // The Provider needs to pay this invoice + let result = node2 + .liquidity() + .create_jit_invoice(10000, "A new desc".to_string())?; - let liquidity_events = res.borrow().events(); + log::info!("This is the invoice: {}", result.clone().to_string()); + // Funding channel + let response: json::Value = node3 + .lampod() + .call( + "fundchannel", + request::OpenChannel { + node_id: node1.info.node_id.clone(), + amount: 100000, + public: true, + port: Some(node1.port.clone()), + addr: Some("127.0.0.1".to_string()), + }, + ) + .unwrap(); + let events3 = node3.lampod().events(); wait!(|| { - while let Ok(event) = liquidity_events.recv_timeout(Duration::from_nanos(100_000)) { - if let Event::Liquidity(liq_event) = event { + while let Ok(event) = events3.recv_timeout(Duration::from_millis(10)) { + node3.fund_wallet(6).unwrap(); + if let Event::Lightning(LightningEvent::ChannelReady { + counterparty_node_id, + .. + }) = event + { + if counterparty_node_id.to_string() == node1.info.node_id { + return Err(()); + } return Ok(()); }; + // check if lampo see the channel + let channels: response::Channels = + node3.lampod().call("channels", json::json!({})).unwrap(); + if channels.channels.is_empty() { + return Err(()); + } + + if !channels.channels.first().unwrap().ready { + return Err(()); + } + + let channels: response::Channels = + node1.lampod().call("channels", json::json!({})).unwrap(); + + if channels.channels.is_empty() { + return Err(()); + } + + if channels.channels.first().unwrap().ready { + return Ok(()); + } } - node2.fund_wallet(6).unwrap(); + node3.fund_wallet(6).unwrap(); Err(()) }); + // This would return a timeout as json_pay will continue to wait till it get an + // PaymentSuccessful event. Out logic is not at fault here it is because node1 gets + // the HTLCIntercepted event but can't process further because of this function blocking + // the thread + // TODO: Find a better way to fix this. + let pay: response::PayResult = node3.lampod().call( + "pay", + request::Pay { + invoice_str: result.to_string(), + amount: result.clone().amount_milli_satoshis(), + }, + )?; Ok(()) }