Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): allow relay connection failure without failing relay creation #8504

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::kafka::key_exchange::{KafkaKeyExchangeController, TopicPartition};
use crate::kafka::protocol_aware::KafkaEncryptedContent;
use crate::kafka::{ConsumerPublishing, ConsumerResolution};
use crate::nodes::models::relay::ReturnTiming;
use crate::nodes::NodeManager;
use ockam::identity::{
DecryptionRequest, DecryptionResponse, EncryptionRequest, EncryptionResponse, Identifier,
Expand Down Expand Up @@ -112,6 +113,7 @@ impl KafkaKeyExchangeController for KafkaKeyExchangeControllerImpl {
alias.clone(),
None,
Some(alias),
ReturnTiming::AfterConnection,
)
.await?;

Expand Down
15 changes: 15 additions & 0 deletions implementations/rust/ockam/ockam_api/src/nodes/models/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ use crate::session::replacer::ReplacerOutputKind;
use crate::session::session::Session;
use crate::{route_to_multiaddr, ConnectionStatus};

#[derive(Debug, Clone, Encode, Decode, CborLen)]
#[rustfmt::skip]
pub enum ReturnTiming {
#[n(1)] Immediately,
#[n(2)] AfterConnection,
}

/// Request body when instructing a node to create a relay
#[derive(Debug, Clone, Encode, Decode, CborLen)]
#[rustfmt::skip]
Expand All @@ -29,6 +36,8 @@ pub struct CreateRelay {
#[n(3)] pub(crate) authorized: Option<Identifier>,
/// Relay address.
#[n(4)] pub(crate) relay_address: Option<String>,
/// When to return.
#[n(5)] pub(crate) return_timing: ReturnTiming,
}

impl CreateRelay {
Expand All @@ -37,12 +46,14 @@ impl CreateRelay {
alias: String,
auth: Option<Identifier>,
relay_address: Option<String>,
return_timing: ReturnTiming,
) -> Self {
Self {
address,
alias,
authorized: auth,
relay_address,
return_timing,
}
}

Expand All @@ -61,6 +72,10 @@ impl CreateRelay {
pub fn relay_address(&self) -> Option<&str> {
self.relay_address.as_deref()
}

pub fn return_timing(&self) -> ReturnTiming {
self.return_timing.clone()
}
}

/// Response body when creating a relay
Expand Down
81 changes: 61 additions & 20 deletions implementations/rust/ockam/ockam_api/src/nodes/service/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use ockam_node::compat::asynchronous::Mutex;
use ockam_node::Context;

use crate::nodes::connection::Connection;
use crate::nodes::models::relay::{CreateRelay, RelayInfo};
use crate::nodes::models::relay::{CreateRelay, RelayInfo, ReturnTiming};
use crate::nodes::models::secure_channel::{
CreateSecureChannelRequest, CreateSecureChannelResponse,
};
Expand All @@ -40,10 +40,18 @@ impl NodeManagerWorker {
alias,
authorized,
relay_address,
return_timing,
} = create_relay;
match self
.node_manager
.create_relay(ctx, &address, alias, authorized, relay_address)
.create_relay(
ctx,
&address,
alias,
authorized,
relay_address,
return_timing,
)
.await
{
Ok(body) => Ok(Response::ok().with_headers(req).body(body)),
Expand Down Expand Up @@ -125,6 +133,7 @@ impl NodeManager {
alias: String,
authorized: Option<Identifier>,
relay_address: Option<String>,
return_timing: ReturnTiming,
) -> Result<RelayInfo> {
if self.registry.relays.contains_key(&alias).await {
let message = format!("A relay with the name '{alias}' already exists");
Expand All @@ -147,26 +156,42 @@ impl NodeManager {

let mut session = Session::create(ctx, Arc::new(Mutex::new(replacer)), None).await?;

let remote_relay_info = session
.initial_connect()
.await
.map(|outcome| match outcome {
ReplacerOutputKind::Relay(status) => status,
_ => {
panic!("Unexpected outcome: {:?}", outcome);
let remote_relay_info = match return_timing {
ReturnTiming::Immediately => None,
ReturnTiming::AfterConnection => {
let result = session
.initial_connect()
.await
.map(|outcome| match outcome {
ReplacerOutputKind::Relay(status) => status,
_ => {
panic!("Unexpected outcome: {:?}", outcome);
}
});

match result {
Ok(remote_relay_info) => Some(remote_relay_info),
Err(err) => {
warn!(%err, "Failed to create relay");
None
}
}
})?;
}
};

session.start_monitoring().await?;

debug!(
forwarding_route = %remote_relay_info.forwarding_route(),
remote_address = %remote_relay_info.remote_address(),
"CreateRelay request processed, sending back response"
);

let relay_info = RelayInfo::new(addr.clone(), alias.clone(), session.connection_status())
.with(remote_relay_info);
let relay_info = RelayInfo::new(addr.clone(), alias.clone(), session.connection_status());
let relay_info = if let Some(remote_relay_info) = remote_relay_info {
debug!(
forwarding_route = %remote_relay_info.forwarding_route(),
remote_address = %remote_relay_info.remote_address(),
"CreateRelay request processed, sending back response"
);
relay_info.with(remote_relay_info)
} else {
relay_info
};

let registry_relay_info = RegistryRelayInfo {
destination_address: addr.clone(),
Expand Down Expand Up @@ -236,9 +261,17 @@ impl InMemoryNode {
alias: String,
authorized: Option<Identifier>,
relay_address: Option<String>,
return_timing: ReturnTiming,
) -> Result<RelayInfo> {
self.node_manager
.create_relay(ctx, address, alias, authorized, relay_address)
.create_relay(
ctx,
address,
alias,
authorized,
relay_address,
return_timing,
)
.await
}

Expand Down Expand Up @@ -348,6 +381,7 @@ pub trait Relays {
alias: String,
authorized: Option<Identifier>,
relay_address: Option<String>,
return_timing: ReturnTiming,
) -> miette::Result<RelayInfo>;
}

Expand All @@ -360,8 +394,15 @@ impl Relays for BackgroundNodeClient {
alias: String,
authorized: Option<Identifier>,
relay_address: Option<String>,
return_timing: ReturnTiming,
) -> miette::Result<RelayInfo> {
let body = CreateRelay::new(address.clone(), alias, authorized, relay_address);
let body = CreateRelay::new(
address.clone(),
alias,
authorized,
relay_address,
return_timing,
);
self.ask(ctx, Request::post("/node/relay").body(body)).await
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tracing::{debug, info, trace, warn};

use ockam::Context;
use ockam_api::cli_state::CliState;
use ockam_api::nodes::models::relay::RelayInfo;
use ockam_api::nodes::models::relay::{RelayInfo, ReturnTiming};
use ockam_api::nodes::InMemoryNode;
use ockam_multiaddr::MultiAddr;

Expand Down Expand Up @@ -90,6 +90,7 @@ impl AppState {
relay_alias.clone(),
None,
Some(relay_alias),
ReturnTiming::AfterConnection,
)
.await
.into_diagnostic()?;
Expand Down
93 changes: 66 additions & 27 deletions implementations/rust/ockam/ockam_command/src/relay/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ use ockam::Context;
use ockam_api::address::extract_address_value;
use ockam_api::colors::{color_primary, OckamColor};

use ockam_api::nodes::models::relay::ReturnTiming;
use ockam_api::nodes::service::relay::Relays;
use ockam_api::nodes::BackgroundNodeClient;
use ockam_api::{fmt_log, fmt_ok, CliState};
use ockam_api::{fmt_info, fmt_log, fmt_ok, fmt_warn, CliState, ConnectionStatus};
use ockam_multiaddr::proto::Project;
use ockam_multiaddr::{MultiAddr, Protocol};

Expand Down Expand Up @@ -55,11 +56,14 @@ pub struct CreateCommand {
#[arg(long)]
relay_address: Option<String>,

/// Whether the relay will be used to relay messages at a project.
/// By default, this information will be inferred from the `--at` argument.
/// Deprecated
#[arg(long)]
project_relay: bool,

/// Create the Relay without waiting for the connection to be established
#[arg(long, default_value = "false")]
pub no_connection_wait: bool,

#[command(flatten)]
retry_opts: RetryOpts,
}
Expand All @@ -85,6 +89,12 @@ impl Command for CreateCommand {
opts.terminal.write_line(&fmt_log!("Creating Relay...\n"))?;
let is_finished: Mutex<bool> = Mutex::new(false);

let return_timing = if cmd.no_connection_wait {
ReturnTiming::Immediately
} else {
ReturnTiming::AfterConnection
};

let node = BackgroundNodeClient::create(ctx, &opts.state, &cmd.to).await?;
let get_relay_info = async {
let relay_info = {
Expand All @@ -100,6 +110,7 @@ impl Command for CreateCommand {
alias.clone(),
cmd.authorized,
Some(cmd.relay_address.unwrap_or(alias)),
return_timing.clone(),
)
.await
.map_err(Error::Retry)?
Expand All @@ -122,29 +133,58 @@ impl Command for CreateCommand {

let (relay, _) = try_join!(get_relay_info, progress_output)?;

let invalid_relay_error_msg =
"The Orchestrator returned an invalid relay address. Try creating a new one.";
let remote_address = relay
.remote_address_ma()
.into_diagnostic()?
.ok_or(miette!(invalid_relay_error_msg))?;
let worker_address = relay
.worker_address_ma()
.into_diagnostic()?
.ok_or(miette!(invalid_relay_error_msg))?;

let plain = {
// `remote_address` in the project is relaying to worker at address `worker_address` on that node.
let from = color_primary(format!("{}{}", &at, remote_address));
let to = color_primary(format!("/node/{}{}", &node.node_name(), worker_address));
fmt_ok!("Now relaying messages from {from} → {to}")
};
opts.terminal
.stdout()
.plain(plain)
.machine(remote_address.to_string())
.json(serde_json::to_string(&relay).into_diagnostic()?)
.write_line()?;
match return_timing {
ReturnTiming::Immediately => {
opts.terminal
.stdout()
.plain("Relay will be created automatically as soon as a connection can be established.")
.json(serde_json::to_string(&relay).into_diagnostic()?)
.write_line()?;
}
ReturnTiming::AfterConnection => {
if relay.connection_status() == ConnectionStatus::Up {
let invalid_relay_error_msg =
"The Orchestrator returned an invalid relay address. Try creating a new one.";

let remote_address = relay
.remote_address_ma()
.into_diagnostic()?
.ok_or(miette!(invalid_relay_error_msg))?;
let worker_address = relay
.worker_address_ma()
.into_diagnostic()?
.ok_or(miette!(invalid_relay_error_msg))?;

let plain = {
// `remote_address` in the project is relaying to worker at address `worker_address` on that node.
let from = color_primary(format!("{}{}", &at, remote_address));
let to =
color_primary(format!("/node/{}{}", &node.node_name(), worker_address));
fmt_ok!("Now relaying messages from {from} → {to}")
};

opts.terminal
.stdout()
.plain(plain)
.machine(remote_address.to_string())
.json(serde_json::to_string(&relay).into_diagnostic()?)
.write_line()?;
} else {
let node_name = node.node_name();
let plain = fmt_warn!(
"A Relay was created at Node {} but failed to connect to the Node at {}\n",
color_primary(node_name),
color_primary(&cmd.at),
) + &fmt_info!("It will retry to connect automatically");

opts.terminal
.stdout()
.plain(plain)
.json(serde_json::to_string(&relay).into_diagnostic()?)
.write_line()?;
}
}
}

Ok(())
}
Expand All @@ -168,7 +208,6 @@ impl CreateCommand {
.ok()
.map(|p| p.name().to_string());
let at = Self::parse_arg_at(&opts.state, self.at, default_project_name.as_deref()).await?;
self.project_relay |= at.starts_with(Project::CODE);
self.at = at.to_string();
Ok(self)
}
Expand Down
12 changes: 10 additions & 2 deletions tools/stress-test/src/execution.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{portal_simulator, Relay, State};
use ockam::abac::tokio::task::JoinSet;
use ockam_api::nodes::models::relay::ReturnTiming;
use ockam_core::Address;
use std::cmp;

Expand Down Expand Up @@ -29,8 +30,15 @@ impl State {
let context = self.context.clone();
join_set.spawn(async move {
let id = Self::random_id();
node.create_relay(&context, &project_addr, id.clone(), None, Some(id))
.await
node.create_relay(
&context,
&project_addr,
id.clone(),
None,
Some(id),
ReturnTiming::AfterConnection,
)
.await
});
}

Expand Down
Loading