Skip to content

Commit

Permalink
feat(rust): add ebpf to ockam_api&ockam_command
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjoDeundiak committed Sep 5, 2024
1 parent f701c73 commit 8916925
Show file tree
Hide file tree
Showing 18 changed files with 107 additions and 32 deletions.
1 change: 1 addition & 0 deletions implementations/rust/ockam/ockam_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ std = [
storage = ["ockam/storage"]
aws-lc = ["ockam_vault/aws-lc", "ockam_transport_tcp/aws-lc"]
rust-crypto = ["ockam_vault/rust-crypto", "ockam_transport_tcp/ring"]
ebpf = ["ockam_transport_tcp/ebpf"]

[dependencies]
base64-url = "3.0.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ impl KafkaInletController {
None,
false,
false,
false,
)
.await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl KafkaOutletController {
Some(kafka_outlet_address(broker_id)),
false,
OutletAccessControl::WithPolicyExpression(self.policy_expression.clone()),
false,
)
.await
.map(|info| info.to)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub struct CreateInlet {
/// Disable fallback to TCP.
/// TCP won't be used to transfer data between the Inlet and the Outlet.
#[n(12)] pub disable_tcp_fallback: bool,
#[n(13)] pub ebpf: bool
}

impl CreateInlet {
Expand All @@ -71,6 +72,7 @@ impl CreateInlet {
wait_connection: bool,
enable_udp_puncture: bool,
disable_tcp_fallback: bool,
ebpf: bool,
) -> Self {
Self {
listen_addr: listen,
Expand All @@ -85,6 +87,7 @@ impl CreateInlet {
secure_channel_identifier: None,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
}
}

Expand All @@ -99,6 +102,7 @@ impl CreateInlet {
wait_connection: bool,
enable_udp_puncture: bool,
disable_tcp_fallback: bool,
ebpf: bool,
) -> Self {
Self {
listen_addr: listen,
Expand All @@ -113,6 +117,7 @@ impl CreateInlet {
secure_channel_identifier: None,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
}
}

Expand Down Expand Up @@ -175,6 +180,7 @@ pub struct CreateOutlet {
/// If not set, the policy set for the [TCP outlet resource type](ockam_abac::ResourceType::TcpOutlet)
/// will be used.
#[n(5)] pub policy_expression: Option<PolicyExpression>,
#[n(6)] pub ebpf: bool
}

impl CreateOutlet {
Expand All @@ -183,13 +189,15 @@ impl CreateOutlet {
tls: bool,
worker_addr: Option<Address>,
reachable_from_default_secure_channel: bool,
ebpf: bool,
) -> Self {
Self {
hostname_port,
tls,
worker_addr,
reachable_from_default_secure_channel,
policy_expression: None,
ebpf,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub struct DefaultAddress;

impl DefaultAddress {
pub const OUTLET_SERVICE: &'static str = "outlet";
pub const RAW_OUTLET_SERVICE: &'static str = "outlet_raw";
pub const RELAY_SERVICE: &'static str = "forwarding_service";
pub const STATIC_RELAY_SERVICE: &'static str = "static_forwarding_service";
pub const UPPERCASE_SERVICE: &'static str = "uppercase";
Expand All @@ -29,7 +30,8 @@ impl DefaultAddress {
}

pub fn is_valid(name: &str) -> bool {
matches!(name, |Self::OUTLET_SERVICE| Self::RELAY_SERVICE
matches!(name, |Self::OUTLET_SERVICE| Self::RAW_OUTLET_SERVICE
| Self::RELAY_SERVICE
| Self::STATIC_RELAY_SERVICE
| Self::UPPERCASE_SERVICE
| Self::ECHO_SERVICE
Expand All @@ -48,6 +50,7 @@ impl DefaultAddress {
pub fn iter() -> impl Iterator<Item = &'static str> {
[
Self::OUTLET_SERVICE,
Self::RAW_OUTLET_SERVICE,
Self::RELAY_SERVICE,
Self::STATIC_RELAY_SERVICE,
Self::UPPERCASE_SERVICE,
Expand Down Expand Up @@ -76,6 +79,7 @@ mod test {
fn test_default_address_is_valid() {
assert!(!DefaultAddress::is_valid("foo"));
assert!(DefaultAddress::is_valid(DefaultAddress::OUTLET_SERVICE));
assert!(DefaultAddress::is_valid(DefaultAddress::RAW_OUTLET_SERVICE));
assert!(DefaultAddress::is_valid(DefaultAddress::RELAY_SERVICE));
assert!(DefaultAddress::is_valid(
DefaultAddress::STATIC_RELAY_SERVICE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ impl InMemoryNode {
None,
false,
false,
false,
)
.await?;

Expand Down Expand Up @@ -319,6 +320,7 @@ impl InMemoryNode {
Some(KAFKA_OUTLET_BOOTSTRAP_ADDRESS.into()),
false,
OutletAccessControl::WithPolicyExpression(outlet_policy_expression),
false,
)
.await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ impl Inlets for BackgroundNodeClient {
secure_channel_identifier: &Option<Identifier>,
enable_udp_puncture: bool,
disable_tcp_fallback: bool,
ebpf: bool,
) -> miette::Result<Reply<InletStatus>> {
let request = {
let via_project = outlet_addr.matches(0, &[ProjectProto::CODE.into()]);
Expand All @@ -40,6 +41,7 @@ impl Inlets for BackgroundNodeClient {
wait_connection,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
)
} else {
CreateInlet::to_node(
Expand All @@ -52,6 +54,7 @@ impl Inlets for BackgroundNodeClient {
wait_connection,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
)
};
if let Some(e) = policy_expression.as_ref() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ impl InMemoryNode {
secure_channel_identifier: Option<Identifier>,
enable_udp_puncture: bool,
disable_tcp_fallback: bool,
ebpf: bool,
) -> Result<InletStatus> {
self.node_manager
.create_inlet(
Expand All @@ -44,6 +45,7 @@ impl InMemoryNode {
secure_channel_identifier,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
)
.await
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub trait Inlets {
secure_channel_identifier: &Option<Identifier>,
enable_udp_puncture: bool,
disable_tcp_fallback: bool,
ebpf: bool,
) -> miette::Result<Reply<InletStatus>>;

async fn show_inlet(&self, ctx: &Context, alias: &str) -> miette::Result<Reply<InletStatus>>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ impl NodeManager {
enable_udp_puncture: bool,
// TODO: Introduce mode enum
disable_tcp_fallback: bool,
ebpf: bool,
) -> Result<InletStatus> {
info!("Handling request to create inlet portal");
debug! {
Expand Down Expand Up @@ -124,6 +125,7 @@ impl NodeManager {
additional_secure_channel: None,
udp_puncture: None,
additional_route: None,
ebpf,
};

let replacer = Arc::new(Mutex::new(replacer));
Expand Down Expand Up @@ -182,7 +184,10 @@ impl NodeManager {

let tcp_inlet_status = InletStatus::new(
listen_addr.to_string(),
outcome.clone().map(|s| s.worker.address().to_string()),
outcome.clone().map(|s| match s.worker {
Some(address) => address.address().to_string(),
None => "<>".to_string(),
}),
&alias,
None,
outcome.clone().map(|s| s.route.to_string()),
Expand Down Expand Up @@ -231,9 +236,14 @@ impl NodeManager {
drop(session);
if let Some(outcome) = outcome {
if let ReplacerOutputKind::Inlet(status) = outcome {
let address = match &status.worker {
Some(address) => address.address().to_string(),
None => "<>".to_string(),
};

Some(InletStatus::new(
inlet_info.bind_addr.to_string(),
status.worker.address().to_string(),
address,
alias,
None,
status.route.to_string(),
Expand Down Expand Up @@ -270,15 +280,22 @@ impl NodeManager {

let status = if let Some(outcome) = outcome {
match &outcome {
ReplacerOutputKind::Inlet(status) => InletStatus::new(
&info.bind_addr,
status.worker.address().to_string(),
alias,
None,
status.route.to_string(),
connection_status,
info.outlet_addr.to_string(),
),
ReplacerOutputKind::Inlet(status) => {
let address = match &status.worker {
Some(address) => address.address().to_string(),
None => "<>".to_string(),
};

InletStatus::new(
&info.bind_addr,
address,
alias,
None,
status.route.to_string(),
connection_status,
info.outlet_addr.to_string(),
)
}
_ => {
panic!("Unexpected outcome: {:?}", outcome)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ impl NodeManagerWorker {
secure_channel_identifier,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
} = create_inlet;
match self
.node_manager
Expand All @@ -47,6 +48,7 @@ impl NodeManagerWorker {
secure_channel_identifier,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
)
.await
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub(super) struct InletSessionReplacer {
pub(super) additional_secure_channel: Option<SecureChannel>,
pub(super) udp_puncture: Option<UdpPuncture>,
pub(super) additional_route: Option<Route>,
pub(super) ebpf: bool,
}

impl InletSessionReplacer {
Expand Down Expand Up @@ -148,16 +149,27 @@ impl InletSessionReplacer {
Some(inlet) => {
inlet.unpause(normalized_stripped_route.clone())?;

inlet.processor_address().clone()
inlet.processor_address().cloned()
}
None => {
let options = self.inlet_options(node_manager).await?;
let inlet = node_manager
.tcp_transport
.create_inlet(self.listen_addr.clone(), normalized_route.clone(), options)
.await?;
let inlet = if self.ebpf {
node_manager
.tcp_transport
.create_raw_inlet(
self.listen_addr.clone(),
normalized_route.clone(),
options,
)
.await?
} else {
node_manager
.tcp_transport
.create_inlet(self.listen_addr.clone(), normalized_route.clone(), options)
.await?
};

let inlet_address = inlet.processor_address().clone();
let inlet_address = inlet.processor_address().cloned();

let inlet = Arc::new(inlet);
self.inlet = Some(inlet);
Expand All @@ -183,18 +195,15 @@ impl InletSessionReplacer {
}
}

async fn close_inlet(&mut self, node_manager: &NodeManager) {
async fn close_inlet(&mut self) {
if let Some(inlet) = self.inlet.take() {
// The previous inlet worker needs to be stopped:
let result = node_manager
.tcp_transport
.stop_inlet(inlet.processor_address().clone())
.await;
let result = inlet.stop(&self.context).await;

if let Err(err) = result {
error!(
?err,
"Failed to remove inlet with address {}",
"Failed to remove inlet with address {:?}",
inlet.processor_address()
);
}
Expand Down Expand Up @@ -257,7 +266,7 @@ impl SessionReplacer for InletSessionReplacer {
return;
};

self.close_inlet(&node_manager).await;
self.close_inlet().await;
self.close_connection(&node_manager).await;
}
}
Expand Down
Loading

0 comments on commit 8916925

Please sign in to comment.