diff --git a/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/controller.rs b/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/controller.rs index 0641f2560a5..5fcd4c46229 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/controller.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/controller.rs @@ -1,6 +1,7 @@ use crate::kafka::key_exchange::{KafkaKeyExchangeController, TopicPartition}; use crate::kafka::protocol_aware::KafkaEncryptedContent; use crate::kafka::{ConsumerPublishing, ConsumerResolution}; +use crate::nodes::models::relay::ReturnTiming; use crate::nodes::NodeManager; use ockam::identity::{ DecryptionRequest, DecryptionResponse, EncryptionRequest, EncryptionResponse, Identifier, @@ -112,6 +113,7 @@ impl KafkaKeyExchangeController for KafkaKeyExchangeControllerImpl { alias.clone(), None, Some(alias), + ReturnTiming::AfterConnection, ) .await?; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/models/relay.rs b/implementations/rust/ockam/ockam_api/src/nodes/models/relay.rs index 237be505e6d..47b68bf68f8 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/models/relay.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/models/relay.rs @@ -14,6 +14,13 @@ use crate::session::replacer::ReplacerOutputKind; use crate::session::session::Session; use crate::{route_to_multiaddr, ConnectionStatus}; +#[derive(Debug, Clone, Encode, Decode, CborLen)] +#[rustfmt::skip] +pub enum ReturnTiming { + #[n(1)] Immediately, + #[n(2)] AfterConnection, +} + /// Request body when instructing a node to create a relay #[derive(Debug, Clone, Encode, Decode, CborLen)] #[rustfmt::skip] @@ -29,6 +36,8 @@ pub struct CreateRelay { #[n(3)] pub(crate) authorized: Option, /// Relay address. #[n(4)] pub(crate) relay_address: Option, + /// When to return. + #[n(5)] pub(crate) return_timing: ReturnTiming, } impl CreateRelay { @@ -37,12 +46,14 @@ impl CreateRelay { alias: String, auth: Option, relay_address: Option, + return_timing: ReturnTiming, ) -> Self { Self { address, alias, authorized: auth, relay_address, + return_timing, } } @@ -61,6 +72,10 @@ impl CreateRelay { pub fn relay_address(&self) -> Option<&str> { self.relay_address.as_deref() } + + pub fn return_timing(&self) -> ReturnTiming { + self.return_timing.clone() + } } /// Response body when creating a relay diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs index 91d361d6843..e05ff94193f 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs @@ -15,7 +15,7 @@ use ockam_node::compat::asynchronous::Mutex; use ockam_node::Context; use crate::nodes::connection::Connection; -use crate::nodes::models::relay::{CreateRelay, RelayInfo}; +use crate::nodes::models::relay::{CreateRelay, RelayInfo, ReturnTiming}; use crate::nodes::models::secure_channel::{ CreateSecureChannelRequest, CreateSecureChannelResponse, }; @@ -40,10 +40,18 @@ impl NodeManagerWorker { alias, authorized, relay_address, + return_timing, } = create_relay; match self .node_manager - .create_relay(ctx, &address, alias, authorized, relay_address) + .create_relay( + ctx, + &address, + alias, + authorized, + relay_address, + return_timing, + ) .await { Ok(body) => Ok(Response::ok().with_headers(req).body(body)), @@ -125,6 +133,7 @@ impl NodeManager { alias: String, authorized: Option, relay_address: Option, + return_timing: ReturnTiming, ) -> Result { if self.registry.relays.contains_key(&alias).await { let message = format!("A relay with the name '{alias}' already exists"); @@ -147,26 +156,42 @@ impl NodeManager { let mut session = Session::create(ctx, Arc::new(Mutex::new(replacer)), None).await?; - let remote_relay_info = session - .initial_connect() - .await - .map(|outcome| match outcome { - ReplacerOutputKind::Relay(status) => status, - _ => { - panic!("Unexpected outcome: {:?}", outcome); + let remote_relay_info = match return_timing { + ReturnTiming::Immediately => None, + ReturnTiming::AfterConnection => { + let result = session + .initial_connect() + .await + .map(|outcome| match outcome { + ReplacerOutputKind::Relay(status) => status, + _ => { + panic!("Unexpected outcome: {:?}", outcome); + } + }); + + match result { + Ok(remote_relay_info) => Some(remote_relay_info), + Err(err) => { + warn!(%err, "Failed to create relay"); + None + } } - })?; + } + }; session.start_monitoring().await?; - debug!( - forwarding_route = %remote_relay_info.forwarding_route(), - remote_address = %remote_relay_info.remote_address(), - "CreateRelay request processed, sending back response" - ); - - let relay_info = RelayInfo::new(addr.clone(), alias.clone(), session.connection_status()) - .with(remote_relay_info); + let relay_info = RelayInfo::new(addr.clone(), alias.clone(), session.connection_status()); + let relay_info = if let Some(remote_relay_info) = remote_relay_info { + debug!( + forwarding_route = %remote_relay_info.forwarding_route(), + remote_address = %remote_relay_info.remote_address(), + "CreateRelay request processed, sending back response" + ); + relay_info.with(remote_relay_info) + } else { + relay_info + }; let registry_relay_info = RegistryRelayInfo { destination_address: addr.clone(), @@ -236,9 +261,17 @@ impl InMemoryNode { alias: String, authorized: Option, relay_address: Option, + return_timing: ReturnTiming, ) -> Result { self.node_manager - .create_relay(ctx, address, alias, authorized, relay_address) + .create_relay( + ctx, + address, + alias, + authorized, + relay_address, + return_timing, + ) .await } @@ -348,6 +381,7 @@ pub trait Relays { alias: String, authorized: Option, relay_address: Option, + return_timing: ReturnTiming, ) -> miette::Result; } @@ -360,8 +394,15 @@ impl Relays for BackgroundNodeClient { alias: String, authorized: Option, relay_address: Option, + return_timing: ReturnTiming, ) -> miette::Result { - let body = CreateRelay::new(address.clone(), alias, authorized, relay_address); + let body = CreateRelay::new( + address.clone(), + alias, + authorized, + relay_address, + return_timing, + ); self.ask(ctx, Request::post("/node/relay").body(body)).await } } diff --git a/implementations/rust/ockam/ockam_app_lib/src/shared_service/relay/create.rs b/implementations/rust/ockam/ockam_app_lib/src/shared_service/relay/create.rs index 8a0e249ed5c..32ec2121ef0 100644 --- a/implementations/rust/ockam/ockam_app_lib/src/shared_service/relay/create.rs +++ b/implementations/rust/ockam/ockam_app_lib/src/shared_service/relay/create.rs @@ -6,7 +6,7 @@ use tracing::{debug, info, trace, warn}; use ockam::Context; use ockam_api::cli_state::CliState; -use ockam_api::nodes::models::relay::RelayInfo; +use ockam_api::nodes::models::relay::{RelayInfo, ReturnTiming}; use ockam_api::nodes::InMemoryNode; use ockam_multiaddr::MultiAddr; @@ -90,6 +90,7 @@ impl AppState { relay_alias.clone(), None, Some(relay_alias), + ReturnTiming::AfterConnection, ) .await .into_diagnostic()?; diff --git a/implementations/rust/ockam/ockam_command/src/relay/create.rs b/implementations/rust/ockam/ockam_command/src/relay/create.rs index f8b9a99d0dd..f3054642666 100644 --- a/implementations/rust/ockam/ockam_command/src/relay/create.rs +++ b/implementations/rust/ockam/ockam_command/src/relay/create.rs @@ -13,9 +13,10 @@ use ockam::Context; use ockam_api::address::extract_address_value; use ockam_api::colors::{color_primary, OckamColor}; +use ockam_api::nodes::models::relay::ReturnTiming; use ockam_api::nodes::service::relay::Relays; use ockam_api::nodes::BackgroundNodeClient; -use ockam_api::{fmt_log, fmt_ok, CliState}; +use ockam_api::{fmt_info, fmt_log, fmt_ok, fmt_warn, CliState, ConnectionStatus}; use ockam_multiaddr::proto::Project; use ockam_multiaddr::{MultiAddr, Protocol}; @@ -55,11 +56,14 @@ pub struct CreateCommand { #[arg(long)] relay_address: Option, - /// Whether the relay will be used to relay messages at a project. - /// By default, this information will be inferred from the `--at` argument. + /// Deprecated #[arg(long)] project_relay: bool, + /// Create the Relay without waiting for the connection to be established + #[arg(long, default_value = "false")] + pub no_connection_wait: bool, + #[command(flatten)] retry_opts: RetryOpts, } @@ -85,6 +89,12 @@ impl Command for CreateCommand { opts.terminal.write_line(&fmt_log!("Creating Relay...\n"))?; let is_finished: Mutex = Mutex::new(false); + let return_timing = if cmd.no_connection_wait { + ReturnTiming::Immediately + } else { + ReturnTiming::AfterConnection + }; + let node = BackgroundNodeClient::create(ctx, &opts.state, &cmd.to).await?; let get_relay_info = async { let relay_info = { @@ -100,6 +110,7 @@ impl Command for CreateCommand { alias.clone(), cmd.authorized, Some(cmd.relay_address.unwrap_or(alias)), + return_timing.clone(), ) .await .map_err(Error::Retry)? @@ -122,29 +133,58 @@ impl Command for CreateCommand { let (relay, _) = try_join!(get_relay_info, progress_output)?; - let invalid_relay_error_msg = - "The Orchestrator returned an invalid relay address. Try creating a new one."; - let remote_address = relay - .remote_address_ma() - .into_diagnostic()? - .ok_or(miette!(invalid_relay_error_msg))?; - let worker_address = relay - .worker_address_ma() - .into_diagnostic()? - .ok_or(miette!(invalid_relay_error_msg))?; - - let plain = { - // `remote_address` in the project is relaying to worker at address `worker_address` on that node. - let from = color_primary(format!("{}{}", &at, remote_address)); - let to = color_primary(format!("/node/{}{}", &node.node_name(), worker_address)); - fmt_ok!("Now relaying messages from {from} → {to}") - }; - opts.terminal - .stdout() - .plain(plain) - .machine(remote_address.to_string()) - .json(serde_json::to_string(&relay).into_diagnostic()?) - .write_line()?; + match return_timing { + ReturnTiming::Immediately => { + opts.terminal + .stdout() + .plain("Relay will be created automatically as soon as a connection can be established.") + .json(serde_json::to_string(&relay).into_diagnostic()?) + .write_line()?; + } + ReturnTiming::AfterConnection => { + if relay.connection_status() == ConnectionStatus::Up { + let invalid_relay_error_msg = + "The Orchestrator returned an invalid relay address. Try creating a new one."; + + let remote_address = relay + .remote_address_ma() + .into_diagnostic()? + .ok_or(miette!(invalid_relay_error_msg))?; + let worker_address = relay + .worker_address_ma() + .into_diagnostic()? + .ok_or(miette!(invalid_relay_error_msg))?; + + let plain = { + // `remote_address` in the project is relaying to worker at address `worker_address` on that node. + let from = color_primary(format!("{}{}", &at, remote_address)); + let to = + color_primary(format!("/node/{}{}", &node.node_name(), worker_address)); + fmt_ok!("Now relaying messages from {from} → {to}") + }; + + opts.terminal + .stdout() + .plain(plain) + .machine(remote_address.to_string()) + .json(serde_json::to_string(&relay).into_diagnostic()?) + .write_line()?; + } else { + let node_name = node.node_name(); + let plain = fmt_warn!( + "A Relay was created at Node {} but failed to connect to the Node at {}\n", + color_primary(node_name), + color_primary(&cmd.at), + ) + &fmt_info!("It will retry to connect automatically"); + + opts.terminal + .stdout() + .plain(plain) + .json(serde_json::to_string(&relay).into_diagnostic()?) + .write_line()?; + } + } + } Ok(()) } @@ -168,7 +208,6 @@ impl CreateCommand { .ok() .map(|p| p.name().to_string()); let at = Self::parse_arg_at(&opts.state, self.at, default_project_name.as_deref()).await?; - self.project_relay |= at.starts_with(Project::CODE); self.at = at.to_string(); Ok(self) } diff --git a/tools/stress-test/src/execution.rs b/tools/stress-test/src/execution.rs index 4a68d3d9ea6..65f8601863c 100644 --- a/tools/stress-test/src/execution.rs +++ b/tools/stress-test/src/execution.rs @@ -1,5 +1,6 @@ use crate::{portal_simulator, Relay, State}; use ockam::abac::tokio::task::JoinSet; +use ockam_api::nodes::models::relay::ReturnTiming; use ockam_core::Address; use std::cmp; @@ -29,8 +30,15 @@ impl State { let context = self.context.clone(); join_set.spawn(async move { let id = Self::random_id(); - node.create_relay(&context, &project_addr, id.clone(), None, Some(id)) - .await + node.create_relay( + &context, + &project_addr, + id.clone(), + None, + Some(id), + ReturnTiming::AfterConnection, + ) + .await }); }