From d123b435e9f8f29d0eb4152e144df0a2fa9900ce Mon Sep 17 00:00:00 2001 From: Oleksandr Deundiak Date: Fri, 27 Sep 2024 14:03:49 +0200 Subject: [PATCH] feat(rust): use more monolith structure for ebpf portals --- Cargo.lock | 1 + .../rust/ockam/ockam_ebpf/ockam_ebpf | Bin 25272 -> 25272 bytes .../rust/ockam/ockam_transport_tcp/Cargo.toml | 5 +- .../src/ebpf_portal/common.rs | 65 ++-- .../src/ebpf_portal/ebpf_support.rs | 13 +- .../src/ebpf_portal/mod.rs | 6 +- .../src/ebpf_portal/outlet_listener_worker.rs | 153 --------- .../src/ebpf_portal/portal_processor.rs | 321 ++++++++++++++---- .../src/ebpf_portal/portal_worker.rs | 197 ++++++++--- .../src/ebpf_portal/portals.rs | 60 ++-- .../src/ebpf_portal/processor.rs | 253 -------------- .../src/ebpf_portal/registry/inlet.rs | 153 ++++++--- .../src/ebpf_portal/registry/outlet.rs | 155 +++++---- .../src/portal/inlet_listener.rs | 2 +- .../ockam_transport_tcp/src/portal/options.rs | 11 +- .../src/transport/portals.rs | 6 +- 16 files changed, 704 insertions(+), 697 deletions(-) delete mode 100644 implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/outlet_listener_worker.rs delete mode 100644 implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/processor.rs diff --git a/Cargo.lock b/Cargo.lock index 74dee070ae4..a52256d3588 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5110,6 +5110,7 @@ dependencies = [ "cfg-if", "cfg_aliases 0.2.1", "env_logger 0.11.5", + "hex", "libc", "log", "minicbor", diff --git a/implementations/rust/ockam/ockam_ebpf/ockam_ebpf b/implementations/rust/ockam/ockam_ebpf/ockam_ebpf index f6d8fae8903bf9a45a140fb0b2194570479c6a7d..d527702666e83fc2db7088634e9e8d7babb0cefc 100644 GIT binary patch delta 318 zcmdmSlyS#V#tmLf3Oo!TP^iiarqfv&7&w3|e^)S z$%P3blX*g17!OYl3~^^Ho;))|U(y>W$;iOQkOOfC13Q#fp8POGoars&>9q&6871j10}p zEliA4%}oppTqj?MQ=i-&r#1Ov+&}6350DUf1O#lD@ delta 322 zcmdmSlyS#V#tmLf3cL&;kj}!uzyYKSRe8XC{}3?65X!PSi)j)oWAx^~?CBhgX`538 z`b8NNCjVvMzj=$A8Z%+-zcgpDGEUt*RsTK*u zHACAu_zjFq3{B0GQ!G=HEe+C4Chw1R5-?9rGO{$WG%>JDwoEoLHJdCL=OmbHY?zpw znqromW^QR|WN7RNcVc2RoNSp?4*(60R6PIy diff --git a/implementations/rust/ockam/ockam_transport_tcp/Cargo.toml b/implementations/rust/ockam/ockam_transport_tcp/Cargo.toml index d2d806c49a2..e5aac710673 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/Cargo.toml +++ b/implementations/rust/ockam/ockam_transport_tcp/Cargo.toml @@ -36,6 +36,7 @@ cfg_aliases = "0.2.1" [dependencies] cfg-if = "1.0.0" +hex = "0.4.3" log = "0.4.21" minicbor = "0.24" ockam_core = { path = "../ockam_core", version = "^0.117.0" } @@ -54,9 +55,11 @@ tokio-rustls = { version = "0.26", default-features = false, features = ["loggin tracing = { version = "0.1", default-features = false } [target.'cfg( target_os = "linux" )'.dependencies] -pnet = { version = "0.35.0", optional = true } aya = { version = "0.12", optional = true } aya-log = { version = "0.2", optional = true } + +[target.'cfg( any(target_os = "linux", target_os = "macos") )'.dependencies] env_logger = { version = "0.11", optional = true } libc = { version = "0.2", optional = true } nix = { version = "0.29", features = ["net"], optional = true } +pnet = { version = "0.35.0", optional = true } diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/common.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/common.rs index 906df07a07c..55ea666ea2d 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/common.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/common.rs @@ -1,22 +1,37 @@ use minicbor::{Decode, Encode}; -use ockam_core::CowBytes; use pnet::packet::tcp::TcpPacket; use pnet::packet::Packet; +use rand::distributions::{Distribution, Standard}; +use rand::Rng; use std::net::Ipv4Addr; +/// Unique random connection identifier +#[derive(Clone, Debug, Eq, PartialEq, Hash, Encode, Decode)] +#[cbor(transparent)] +#[rustfmt::skip] +pub struct ConnectionIdentifier(#[n(0)] String); + +impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> ConnectionIdentifier { + let bytes: [u8; 8] = rng.gen(); + ConnectionIdentifier(hex::encode(bytes)) + } +} + #[allow(missing_docs)] #[derive(Encode, Decode)] #[rustfmt::skip] -pub struct OckamPortalPacket<'a> { - #[n(0)] pub sequence: u32, - #[n(1)] pub acknowledgement: u32, - #[n(2)] pub data_offset: u8, - #[n(3)] pub reserved: u8, - #[n(4)] pub flags: u8, - #[n(5)] pub window: u16, - #[n(6)] pub urgent_ptr: u16, - #[n(7)] pub options: Vec, - #[b(8)] pub payload: CowBytes<'a>, +pub struct OckamPortalPacket { + #[n(0)] pub connection_identifier: ConnectionIdentifier, + #[n(1)] pub sequence: u32, + #[n(2)] pub acknowledgement: u32, + #[n(3)] pub data_offset: u8, + #[n(4)] pub reserved: u8, + #[n(5)] pub flags: u8, + #[n(6)] pub window: u16, + #[n(7)] pub urgent_ptr: u16, + #[n(8)] pub options: Vec, + #[n(9)] pub payload: Vec, } #[allow(missing_docs)] @@ -38,26 +53,14 @@ impl From for pnet::packet::tcp::TcpOption { } } -impl OckamPortalPacket<'_> { - /// Clone data to make an owned version of an instance. - pub fn into_owned(self) -> OckamPortalPacket<'static> { - OckamPortalPacket { - sequence: self.sequence, - acknowledgement: self.acknowledgement, - data_offset: self.data_offset, - reserved: self.reserved, - flags: self.flags, - window: self.window, - urgent_ptr: self.urgent_ptr, - options: self.options, - payload: self.payload.to_owned(), - } - } -} - -impl From for OckamPortalPacket<'_> { - fn from(value: RawSocketMessage) -> Self { +impl OckamPortalPacket { + /// Transform + pub fn from_raw_socket_message( + value: RawSocketMessage, + connection_identifier: ConnectionIdentifier, + ) -> Self { Self { + connection_identifier, sequence: value.sequence, acknowledgement: value.acknowledgement, data_offset: value.data_offset, @@ -66,7 +69,7 @@ impl From for OckamPortalPacket<'_> { window: value.window, urgent_ptr: value.urgent_ptr, options: value.options.into_iter().map(Into::into).collect(), - payload: value.payload.into(), + payload: value.payload, } } } diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/ebpf_support.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/ebpf_support.rs index b2f50228c21..77329b3f26a 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/ebpf_support.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/ebpf_support.rs @@ -10,7 +10,7 @@ use core::fmt::{Debug, Formatter}; use ockam_core::compat::collections::HashMap; use ockam_core::compat::sync::RwLock; use ockam_core::errcode::{Kind, Origin}; -use ockam_core::{Address, Error, Result}; +use ockam_core::{Address, AllowAll, DenyAll, Error, Result}; use ockam_node::compat::asynchronous::Mutex as AsyncMutex; use ockam_node::Context; use pnet::transport::TransportSender; @@ -104,7 +104,8 @@ impl TcpTransportEbpfSupport { *socket_write_handle_lock = Some(socket_write_handle.clone()); - ctx.start_processor(address, processor).await?; + ctx.start_processor_with_access_control(address, processor, DenyAll, AllowAll) + .await?; info!("Started RawSocket"); @@ -281,7 +282,7 @@ impl TcpTransportEbpfSupport { } /// Add inlet port - pub fn add_inlet_port(&self, port: u16) -> Result<()> { + pub fn add_inlet_port(&self, port: Port) -> Result<()> { let mut bpf = self.bpf.lock().unwrap(); bpf.as_mut() @@ -294,7 +295,7 @@ impl TcpTransportEbpfSupport { } /// Remove inlet port - pub fn remove_inlet_port(&self, port: u16) -> Result<()> { + pub fn remove_inlet_port(&self, port: Port) -> Result<()> { let mut bpf = self.bpf.lock().unwrap(); bpf.as_mut().unwrap().inlet_port_map.remove(&port).unwrap(); @@ -303,7 +304,7 @@ impl TcpTransportEbpfSupport { } /// Add outlet port - pub fn add_outlet_port(&self, port: u16) -> Result<()> { + pub fn add_outlet_port(&self, port: Port) -> Result<()> { let mut bpf = self.bpf.lock().unwrap(); bpf.as_mut() @@ -316,7 +317,7 @@ impl TcpTransportEbpfSupport { } /// Remove outlet port - pub fn remove_outlet_port(&self, port: u16) -> Result<()> { + pub fn remove_outlet_port(&self, port: Port) -> Result<()> { let mut bpf = self.bpf.lock().unwrap(); bpf.as_mut().unwrap().outlet_port_map.remove(&port).unwrap(); diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/mod.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/mod.rs index a9beb984d6e..fa3480877a3 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/mod.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/mod.rs @@ -1,17 +1,13 @@ mod common; mod ebpf_support; -mod outlet_listener_worker; mod portal_processor; mod portal_worker; mod portals; -mod processor; mod registry; mod transport; pub use common::*; pub use ebpf_support::*; -pub use outlet_listener_worker::*; -pub(crate) use portal_processor::*; +pub use portal_processor::*; pub use portal_worker::*; -pub use processor::*; pub use registry::*; diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/outlet_listener_worker.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/outlet_listener_worker.rs deleted file mode 100644 index c94cf300200..00000000000 --- a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/outlet_listener_worker.rs +++ /dev/null @@ -1,153 +0,0 @@ -use crate::ebpf_portal::{ - OckamPortalPacket, OutletMappingValue, OutletRegistry, PortalProcessor, PortalWorker, - TcpTransportEbpfSupport, -}; -use crate::portal::addresses::{Addresses, PortalType}; -use crate::TcpOutletOptions; -use ockam_core::{async_trait, Address, AllowAll, Any, DenyAll, Result, Route, Routed, Worker}; -use ockam_node::Context; -use pnet::transport::TransportSender; -use std::net::Ipv4Addr; -use std::sync::{Arc, RwLock}; -use tokio::net::TcpListener; -use tracing::{debug, info, warn}; - -/// Worker listens for new incoming connections. -pub struct OutletListenerWorker { - options: TcpOutletOptions, - - socket_write_handle: Arc>, - outlet_registry: OutletRegistry, - - dst_ip: Ipv4Addr, - dst_port: u16, - - ebpf_support: TcpTransportEbpfSupport, -} - -impl OutletListenerWorker { - /// Constructor. - pub fn new( - options: TcpOutletOptions, - socket_write_handle: Arc>, - outlet_registry: OutletRegistry, - dst_ip: Ipv4Addr, - dst_port: u16, - ebpf_support: TcpTransportEbpfSupport, - ) -> Self { - Self { - options, - socket_write_handle, - outlet_registry, - dst_ip, - dst_port, - ebpf_support, - } - } - - async fn new_outlet_connection( - &self, - ctx: &Context, - src_addr: Address, - msg: OckamPortalPacket<'_>, - return_route: Route, - ) -> Result<()> { - // TODO: Remove connection eventually? - - // debug!("New TCP connection"); - info!("New TCP connection"); - - let addresses = Addresses::generate(PortalType::EbpfOutlet); - - self.options - .setup_flow_control_for_outlet(ctx.flow_controls(), &addresses, &src_addr); - - let (sender, receiver) = tokio::sync::mpsc::channel(128); - - // FIXME: eBPF Address? - let tcp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let assigned_port = tcp_listener.local_addr().unwrap().port(); - - let mapping = OutletMappingValue { - inlet_worker_address: return_route.recipient()?, - _addresses: addresses.clone(), - sender, - assigned_port, - }; - - let processor = PortalProcessor::new_outlet( - receiver, - addresses.clone(), - return_route, - tcp_listener, - assigned_port, - self.ebpf_support.clone(), - ); - let worker = PortalWorker::new( - None, - self.socket_write_handle.clone(), - assigned_port, - self.dst_ip, - self.dst_port, - Some(msg.into_owned()), - ); - - ctx.start_processor_with_access_control( - addresses.receiver_remote, - processor, - DenyAll, - AllowAll, - ) - .await?; - ctx.start_worker_with_access_control( - addresses.sender_remote, - worker, - AllowAll, // FIXME eBPF - DenyAll, - ) - .await?; - - self.outlet_registry.add_mapping(mapping.clone()); - - Ok(()) - } -} - -#[async_trait] -impl Worker for OutletListenerWorker { - type Message = Any; - type Context = Context; - - async fn handle_message( - &mut self, - ctx: &mut Self::Context, - msg: Routed, - ) -> Result<()> { - let return_route = msg.return_route(); - let src_addr = msg.src_addr(); - let inlet_worker_address = return_route.recipient()?; - let payload = msg.into_payload(); - - let msg: OckamPortalPacket = minicbor::decode(&payload)?; - - if msg.flags != 2 { - warn!("Outlet Listener Worker received a non SYN packet"); - return Ok(()); - } - - if self - .outlet_registry - .get_mapping2(&inlet_worker_address) - .is_some() - { - // FIXME: eBPF Should still send it - debug!("Received another SYN for an already created connection"); - return Ok(()); - } - - self.new_outlet_connection(ctx, src_addr, msg, return_route) - .await?; - - Ok(()) - } -} diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portal_processor.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portal_processor.rs index 69ede154893..563516f1c97 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portal_processor.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portal_processor.rs @@ -1,105 +1,282 @@ -use crate::ebpf_portal::{OckamPortalPacket, Port, RawSocketMessage, TcpTransportEbpfSupport}; -use crate::portal::addresses::Addresses; -use ockam_core::{async_trait, route, LocalMessage, Processor, Result, Route}; +use crate::ebpf_portal::{ + Inlet, InletConnection, InletRegistry, OckamPortalPacket, Outlet, OutletConnection, + OutletRegistry, Port, RawSocketMessage, +}; +use log::warn; +use ockam_core::{async_trait, route, LocalMessage, Processor, Result}; use ockam_node::Context; +use pnet::packet::ip::IpNextHeaderProtocol; +use pnet::packet::Packet; +use pnet::transport; +use pnet::transport::{ + tcp_packet_iter, TransportChannelType, TransportProtocol, TransportReceiver, TransportSender, +}; +use rand::random; +use std::net::{IpAddr, Ipv4Addr}; use std::sync::{Arc, RwLock}; -use tokio::net::TcpListener; -use tokio::sync::mpsc::Receiver; use tracing::info; -/// Processor responsible for receiving TCP packets for a certain connection. -// TODO: eBPF Can be a worker instead? -pub(crate) struct PortalProcessor { - receiver: Receiver, - addresses: Addresses, - current_route: Arc>, - assigned_port_state: Option, -} +/// Processor responsible for receiving all data with OCKAM_TCP_PORTAL_PROTOCOL on the machine +/// and redirect it to individual portal workers. +pub struct RawSocketProcessor { + socket_write_handle: Arc>, + // TODO: Remove lock by moving to blocking and returning back from blocking thread + socket_read_handle: Arc>, -struct AssignedPortState { - _tcp_listener: TcpListener, // Just hold it so that port is marked as taken - assigned_port: Port, - ebpf_support: TcpTransportEbpfSupport, + inlet_registry: InletRegistry, + outlet_registry: OutletRegistry, } -impl PortalProcessor { - /// Constructor. - pub fn new_inlet( - receiver: Receiver, - addresses: Addresses, - current_route: Arc>, - ) -> Self { - Self { - receiver, - addresses, - current_route, - assigned_port_state: None, - } - } +impl RawSocketProcessor { + pub(crate) async fn create( + ip_proto: u8, + inlet_registry: InletRegistry, + outlet_registry: OutletRegistry, + ) -> Result { + // FIXME: Use Layer3 + let (socket_write_handle, socket_read_handle) = transport::transport_channel( + 1024 * 1024, + TransportChannelType::Layer4(TransportProtocol::Ipv4(IpNextHeaderProtocol::new( + ip_proto, + ))), + ) + .unwrap(); - /// Constructor. - pub fn new_outlet( - receiver: Receiver, - addresses: Addresses, - current_route: Route, // Immutable - tcp_listener: TcpListener, - assigned_port: Port, - ebpf_support: TcpTransportEbpfSupport, - ) -> Self { - Self { - receiver, - addresses, - current_route: Arc::new(RwLock::new(current_route)), - assigned_port_state: Some(AssignedPortState { - _tcp_listener: tcp_listener, - assigned_port, - ebpf_support, - }), - } + let s = Self { + socket_write_handle: Arc::new(RwLock::new(socket_write_handle)), + socket_read_handle: Arc::new(RwLock::new(socket_read_handle)), + inlet_registry, + outlet_registry, + }; + + Ok(s) } -} -#[async_trait] -impl Processor for PortalProcessor { - type Context = Context; + async fn new_inlet_connection( + inlet: &Inlet, + src_ip: Ipv4Addr, + parsed_packet: &ParsedPacket, + ) -> Result>> { + // TODO: eBPF Remove connection eventually + + let is_paused = inlet.inlet_shared_state.read().unwrap().is_paused; - async fn initialize(&mut self, _context: &mut Self::Context) -> Result<()> { - if let Some(state) = &self.assigned_port_state { - state.ebpf_support.add_outlet_port(state.assigned_port)?; + if is_paused { + // Just drop the stream + return Ok(None); } - Ok(()) + // TODO: Make sure the connection can't be spoofed by someone having access to that Outlet + + let connection = Arc::new(InletConnection { + identifier: None, + connection_identifier: random(), + inlet_ip: parsed_packet.destination_ip, + client_ip: src_ip, + client_port: parsed_packet.source_port, + }); + + inlet.add_connection(connection.clone()); + + Ok(Some(connection)) + } + + /// Write handle to the socket + pub fn socket_write_handle(&self) -> Arc> { + self.socket_write_handle.clone() } - async fn shutdown(&mut self, _context: &mut Self::Context) -> Result<()> { - if let Some(state) = &self.assigned_port_state { - state.ebpf_support.remove_outlet_port(state.assigned_port)?; + async fn handle_inlet( + &self, + ctx: &Context, + inlet: Inlet, + connection: &InletConnection, + message: RawSocketMessage, + ) -> Result<()> { + let packet = OckamPortalPacket::from_raw_socket_message( + message, + connection.connection_identifier.clone(), + ); + + // debug!("Got packet, forwarding to the other side"); + info!("Got packet, forwarding to the other side"); + + let inlet_shared_state = inlet.inlet_shared_state.read().unwrap().clone(); + + if inlet_shared_state.is_paused { + return Ok(()); } + ctx.forward_from_address( + LocalMessage::new() + .with_onward_route(inlet_shared_state.route) + .with_return_route(route![inlet.portal_worker_address]) + .with_payload(minicbor::to_vec(packet)?), + ctx.address(), + ) + .await?; + Ok(()) } - async fn process(&mut self, ctx: &mut Self::Context) -> Result { - let message = match self.receiver.recv().await { - Some(message) => message, - None => return Ok(false), - }; - - let packet = OckamPortalPacket::from(message); + async fn handle_outlet( + &self, + ctx: &Context, + outlet: Outlet, + connection: &OutletConnection, + message: RawSocketMessage, + ) -> Result<()> { + let packet = OckamPortalPacket::from_raw_socket_message( + message, + connection.connection_identifier.clone(), + ); // debug!("Got packet, forwarding to the other side"); info!("Got packet, forwarding to the other side"); - let current_route = self.current_route.read().unwrap().clone(); ctx.forward_from_address( LocalMessage::new() - .with_onward_route(current_route) - .with_return_route(route![self.addresses.sender_remote.clone()]) + .with_onward_route(connection.return_route.clone()) + .with_return_route(route![outlet.portal_worker_address]) .with_payload(minicbor::to_vec(packet)?), - self.addresses.receiver_remote.clone(), + ctx.address(), ) .await?; + Ok(()) + } + + async fn get_new_packet( + socket_read_handle: Arc>, + ) -> Result> { + let parsed_packet = tokio::task::spawn_blocking(move || { + let mut socket_read_handle = socket_read_handle.write().unwrap(); // FIXME + let mut iterator = tcp_packet_iter(&mut socket_read_handle); + // TODO: Should we check the checksum? + let (packet, source_ip) = iterator.next().unwrap(); // FIXME + + let source_ip = match source_ip { + IpAddr::V4(ip) => ip, + IpAddr::V6(_) => return None, + }; + + let destination_ip = Ipv4Addr::LOCALHOST; // FIXME + let source_port = packet.get_source(); + let destination_port = packet.get_destination(); + let flags = packet.get_flags(); + + info!( + "PACKET LEN: {}. Source: {}, Destination: {}", + packet.payload().len(), + source_port, + destination_port, + ); + + let message = RawSocketMessage::from_packet(packet, source_ip); + + let parsed_packet = ParsedPacket { + message, + source_ip, + source_port, + flags, + destination_ip, + destination_port, + }; + + Some(parsed_packet) + }) + .await + .unwrap(); + + Ok(parsed_packet) + } +} + +struct ParsedPacket { + message: RawSocketMessage, + + source_ip: Ipv4Addr, + source_port: Port, + flags: u8, + + destination_ip: Ipv4Addr, + destination_port: Port, +} + +#[async_trait] +impl Processor for RawSocketProcessor { + type Context = Context; + + async fn process(&mut self, ctx: &mut Self::Context) -> Result { + let parsed_packet = Self::get_new_packet(self.socket_read_handle.clone()).await?; + + let parsed_packet = match parsed_packet { + Some(parsed_packet) => parsed_packet, + None => return Ok(false), + }; + + if let Some(inlet) = self + .inlet_registry + .get_inlet(parsed_packet.destination_port) + { + let connection = match inlet + .get_connection_internal(parsed_packet.source_ip, parsed_packet.source_port) + { + Some(connection) => { + // trace!("Existing connection from {}", packet.get_source()); + info!("Existing connection from {}", parsed_packet.source_port); + connection + } + None => { + if parsed_packet.flags != 2 { + warn!( + "Unknown connection packet from {}. Skipping", + parsed_packet.source_port + ); + return Ok(true); + } + + // debug!("New connection from {}", packet.get_source()); + info!("New connection from {}", parsed_packet.source_port); + match Self::new_inlet_connection( + &inlet, + parsed_packet.source_ip, + &parsed_packet, + ) + .await? + { + Some(connection) => connection, + None => return Ok(true), + } + } + }; + + self.handle_inlet(ctx, inlet, &connection, parsed_packet.message) + .await?; + + return Ok(true); + } + + let outlet = match self + .outlet_registry + .get_outlet(parsed_packet.source_ip, parsed_packet.source_port) + { + Some(outlet) => outlet, + None => return Ok(true), + }; + + let connection = match outlet.get_connection_internal(parsed_packet.destination_port) { + Some(connection) => { + // trace!("Existing connection to {}", packet.get_destination()); + info!("Existing connection to {}", parsed_packet.destination_port); + connection + } + None => return Ok(true), + }; + + self.handle_outlet(ctx, outlet, &connection, parsed_packet.message) + .await?; + Ok(true) } } diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portal_worker.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portal_worker.rs index 84b122b6ef5..44375670b17 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portal_worker.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portal_worker.rs @@ -1,46 +1,102 @@ -use crate::ebpf_portal::OckamPortalPacket; +use crate::ebpf_portal::{ + Inlet, InletConnection, OckamPortalPacket, Outlet, OutletConnection, Port, + TcpTransportEbpfSupport, +}; use ockam_core::{async_trait, Any, Result, Route, Routed, Worker}; use ockam_node::Context; use pnet::packet::tcp::MutableTcpPacket; use pnet::transport::TransportSender; use std::net::{IpAddr, Ipv4Addr}; use std::sync::{Arc, RwLock}; -use tracing::info; +use tokio::net::TcpListener; +use tracing::{info, warn}; + +/// PortalWorker mode of operation +pub enum PortalWorkerMode { + /// PortalWorker spawned for an Inlet + Inlet { + /// Inlet info + inlet: Inlet, + }, + /// PortalWorker spawned for an Outlet + Outlet { + /// Outlet info + outlet: Outlet, + }, +} -/// Worker responsible for writing data to the Socket that is received from the other side of the -/// TCP connection. +/// Worker listens for new incoming connections. pub struct PortalWorker { - current_route: Option>>, - // FIXME: eBPF I doubt there should be a mutable usage - socket_write_handle: Arc>, - src_port: u16, - dst_ip: Ipv4Addr, - dst_port: u16, + mode: PortalWorkerMode, - first_message: Option>, + socket_write_handle: Arc>, + ebpf_support: TcpTransportEbpfSupport, } impl PortalWorker { /// Constructor. - pub fn new( - current_route: Option>>, + pub fn new_inlet( socket_write_handle: Arc>, - src_port: u16, - dst_ip: Ipv4Addr, - dst_port: u16, - first_message: Option>, + inlet: Inlet, + ebpf_support: TcpTransportEbpfSupport, ) -> Self { Self { - current_route, + mode: PortalWorkerMode::Inlet { inlet }, socket_write_handle, - src_port, - dst_ip, - dst_port, - first_message, + ebpf_support, } } - async fn handle(&self, msg: OckamPortalPacket<'_>) -> Result<()> { + /// Constructor. + pub fn new_outlet( + socket_write_handle: Arc>, + outlet: Outlet, + ebpf_support: TcpTransportEbpfSupport, + ) -> Self { + Self { + mode: PortalWorkerMode::Outlet { outlet }, + socket_write_handle, + ebpf_support, + } + } + + async fn new_outlet_connection( + &self, + outlet: &Outlet, + identifier: Option, + msg: &OckamPortalPacket, + return_route: Route, + ) -> Result> { + // debug!("New TCP connection"); + info!("New TCP connection"); + + // FIXME: eBPF It should an IP address of the network device that we'll use to send packets, + // However, we don't know it here. + let tcp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let assigned_port = tcp_listener.local_addr().unwrap().port(); + + let connection = Arc::new(OutletConnection { + identifier, + connection_identifier: msg.connection_identifier.clone(), + assigned_port, + _tcp_listener: Arc::new(tcp_listener), + return_route, + }); + + outlet.add_connection(connection.clone()); + + self.ebpf_support.add_outlet_port(assigned_port)?; + + Ok(connection) + } + + async fn handle( + &self, + msg: OckamPortalPacket, + src_port: Port, + dst_ip: Ipv4Addr, + dst_port: Port, + ) -> Result<()> { let buff_len = (msg.data_offset as usize) * 4 + msg.payload.len(); let buff = vec![0u8; buff_len]; @@ -62,28 +118,60 @@ impl PortalWorker { ); packet.set_payload(&msg.payload); - packet.set_source(self.src_port); - packet.set_destination(self.dst_port); + packet.set_source(src_port); + packet.set_destination(dst_port); let check = pnet::packet::tcp::ipv4_checksum( &packet.to_immutable(), // checksum is adjusted inside the eBPF in respect to the correct src IP addr &Ipv4Addr::new(0, 0, 0, 0), - &self.dst_ip, + &dst_ip, ); packet.set_checksum(check); let packet = packet.to_immutable(); + // TODO: We don't pick the source IP here, but it's important that it stays the same, + // Otherwise the receiving TCP connection would be disrupted. self.socket_write_handle .write() .unwrap() - .send_to(packet, IpAddr::V4(self.dst_ip)) + .send_to(packet, IpAddr::V4(dst_ip)) .unwrap(); Ok(()) } + + async fn handle_inlet( + &self, + inlet: &Inlet, + connection: &InletConnection, + msg: OckamPortalPacket, + ) -> Result<()> { + self.handle( + msg, + inlet.port, + connection.client_ip, + connection.client_port, + ) + .await + } + + async fn handle_outlet( + &self, + outlet: &Outlet, + connection: &OutletConnection, + msg: OckamPortalPacket, + ) -> Result<()> { + self.handle( + msg, + connection.assigned_port, + outlet.dst_ip, + outlet.dst_port, + ) + .await + } } #[async_trait] @@ -91,30 +179,53 @@ impl Worker for PortalWorker { type Message = Any; type Context = Context; - async fn initialize(&mut self, _context: &mut Self::Context) -> Result<()> { - if let Some(msg) = self.first_message.take() { - self.handle(msg).await?; - } - - Ok(()) - } - async fn handle_message( &mut self, _ctx: &mut Self::Context, msg: Routed, ) -> Result<()> { - // debug!("Got message, forwarding to the socket"); - info!("Got message, forwarding to the socket"); - - if let Some(current_route) = &self.current_route { - *current_route.write().unwrap() = msg.return_route(); - } - + let return_route = msg.return_route(); let payload = msg.into_payload(); let msg: OckamPortalPacket = minicbor::decode(&payload)?; - self.handle(msg).await + let identifier = None; // FIXME: Should be the Identifier of the other side + + match &self.mode { + PortalWorkerMode::Inlet { inlet } => { + if let Some(connection) = + inlet.get_connection_external(identifier, msg.connection_identifier.clone()) + { + self.handle_inlet(inlet, &connection, msg).await?; + + return Ok(()); + } + + warn!("Portal Worker in Inlet mode received a packet for an unknown connection"); + } + PortalWorkerMode::Outlet { outlet } => { + if let Some(connection) = outlet + .get_connection_external(identifier.clone(), msg.connection_identifier.clone()) + { + self.handle_outlet(outlet, &connection, msg).await?; + + return Ok(()); + } + + if msg.flags == 2 { + let connection = self + .new_outlet_connection(outlet, identifier, &msg, return_route) + .await?; + + self.handle_outlet(outlet, &connection, msg).await?; + + return Ok(()); + } + + warn!("Portal Worker in Outlet mode received a non SYN packet for an unknown connection"); + } + } + + Ok(()) } } diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portals.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portals.rs index b1e46d5e3b3..4ccfff918d2 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portals.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portals.rs @@ -1,8 +1,8 @@ -use crate::ebpf_portal::OutletListenerWorker; +use crate::ebpf_portal::{Port, PortalWorker}; use crate::portal::InletSharedState; use crate::{TcpInlet, TcpInletOptions, TcpOutletOptions, TcpTransport}; use core::fmt::Debug; -use ockam_core::{Address, DenyAll, Result, Route}; +use ockam_core::{Address, AllowAll, DenyAll, Result, Route}; use ockam_node::compat::asynchronous::resolve_peer; use ockam_node::WorkerBuilder; use ockam_transport_core::HostnamePort; @@ -20,6 +20,10 @@ impl TcpTransport { outlet_route: impl Into + Clone + Debug, options: TcpInletOptions, ) -> Result { + let outlet_route = outlet_route.into(); + + let next = outlet_route.next().cloned()?; + // TODO: eBPF Find correlation between bind_addr and iface? let bind_addr = bind_addr.into(); let tcp_listener = TcpListener::bind(bind_addr.clone()).await.unwrap(); // FIXME eBPF @@ -54,14 +58,23 @@ impl TcpTransport { } } - let _write_handle = self.start_raw_socket_processor_if_needed().await?; + let write_handle = self.start_raw_socket_processor_if_needed().await?; let inlet_shared_state = Arc::new(RwLock::new(InletSharedState { - route: outlet_route.into(), + route: outlet_route.clone(), is_paused: false, })); - self.ebpf_support.inlet_registry.create_inlet( + let portal_worker_address = Address::random_tagged("Ebpf.PortalWorker"); // FIXME + + options.setup_flow_control_for_address( + self.ctx().flow_controls(), + portal_worker_address.clone(), + &next, + ); + + let inlet_info = self.ebpf_support.inlet_registry.create_inlet( + portal_worker_address.clone(), options, local_address.port(), tcp_listener, @@ -70,12 +83,20 @@ impl TcpTransport { self.ebpf_support.add_inlet_port(port)?; + let worker = PortalWorker::new_inlet(write_handle, inlet_info, self.ebpf_support.clone()); + WorkerBuilder::new(worker) + .with_address(portal_worker_address) + .with_outgoing_access_control(DenyAll) + .with_incoming_access_control(AllowAll) // FIXME + .start(self.ctx()) + .await?; + Ok(TcpInlet::new_ebpf(local_address, inlet_shared_state)) } /// Stop the Raw Inlet #[instrument(skip(self), fields(port=port))] - pub async fn stop_raw_inlet(&self, port: u16) -> Result<()> { + pub async fn stop_raw_inlet(&self, port: Port) -> Result<()> { self.ebpf_support.inlet_registry.delete_inlet(port); Ok(()) @@ -87,12 +108,12 @@ impl TcpTransport { &self, address: impl Into
+ Clone + Debug, peer: HostnamePort, - options: TcpOutletOptions, + options: TcpOutletOptions, // FIXME ) -> Result<()> { // Resolve peer address as a host name and port tracing::Span::current().record("peer", peer.to_string()); - let address = address.into(); + let portal_worker_address = address.into(); // TODO: eBPF May be good to run resolution every time there is incoming connection, but that // would require also updating the self.ebpf_support.outlet_registry @@ -118,28 +139,27 @@ impl TcpTransport { let access_control = options.incoming_access_control.clone(); - options.setup_flow_control_for_outlet_listener(self.ctx().flow_controls(), &address); + options.setup_flow_control_for_outlet_listener( + self.ctx().flow_controls(), + &portal_worker_address, + ); - let outlet_listener_worker = OutletListenerWorker::new( - options, - write_handle, - self.ebpf_support.outlet_registry.clone(), + let outlet_info = self.ebpf_support.outlet_registry.add_outlet( + portal_worker_address.clone(), dst_ip, dst_port, - self.ebpf_support.clone(), ); - WorkerBuilder::new(outlet_listener_worker) - .with_address(address) + let portal_worker = + PortalWorker::new_outlet(write_handle, outlet_info, self.ebpf_support.clone()); + + WorkerBuilder::new(portal_worker) + .with_address(portal_worker_address) .with_incoming_access_control_arc(access_control) .with_outgoing_access_control(DenyAll) .start(self.ctx()) .await?; - self.ebpf_support - .outlet_registry - .add_outlet(dst_ip, dst_port); - Ok(()) } diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/processor.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/processor.rs deleted file mode 100644 index f94206097d3..00000000000 --- a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/processor.rs +++ /dev/null @@ -1,253 +0,0 @@ -use crate::ebpf_portal::{ - InletMappingValue, InletRegistry, OutletRegistry, PortalProcessor, PortalWorker, - RawSocketMessage, -}; -use crate::portal::addresses::{Addresses, PortalType}; -use crate::portal::InletSharedState; -use crate::TcpInletOptions; -use ockam_core::{async_trait, AllowAll, DenyAll, Processor, Result}; -use ockam_node::Context; -use pnet::packet::ip::IpNextHeaderProtocol; -use pnet::packet::Packet; -use pnet::transport; -use pnet::transport::{ - tcp_packet_iter, TransportChannelType, TransportProtocol, TransportReceiver, TransportSender, -}; -use std::net::{IpAddr, Ipv4Addr}; -use std::sync::{Arc, RwLock}; -use tracing::{info, warn}; - -/// Processor responsible for receiving all data with OCKAM_TCP_PORTAL_PROTOCOL on the machine -/// and redirect it to individual portal workers. -pub struct RawSocketProcessor { - socket_write_handle: Arc>, - socket_read_handle: Arc>, - - inlet_registry: InletRegistry, - outlet_registry: OutletRegistry, -} - -impl RawSocketProcessor { - pub(crate) async fn create( - ip_proto: u8, - inlet_registry: InletRegistry, - outlet_registry: OutletRegistry, - ) -> Result { - let (socket_write_handle, socket_read_handle) = transport::transport_channel( - 1024 * 1024, - TransportChannelType::Layer4(TransportProtocol::Ipv4(IpNextHeaderProtocol::new( - ip_proto, - ))), - ) - .unwrap(); - - let s = Self { - socket_write_handle: Arc::new(RwLock::new(socket_write_handle)), - socket_read_handle: Arc::new(RwLock::new(socket_read_handle)), - inlet_registry, - outlet_registry, - }; - - Ok(s) - } - - async fn new_inlet_connection( - ctx: &Context, - options: TcpInletOptions, - inlet_shared_state: Arc>, - socket_write_handle: Arc>, - registry: &InletRegistry, - src_ip: Ipv4Addr, - parsed_packet: &ParsedPacket, - ) -> Result> { - // TODO: eBPF Remove connection eventually - - let addresses = Addresses::generate(PortalType::EbpfInlet); - - let inlet_shared_state = inlet_shared_state.read().unwrap().clone(); - - if inlet_shared_state.is_paused { - // Just drop the stream - return Ok(None); - } - - options.setup_flow_control( - ctx.flow_controls(), - &addresses, - inlet_shared_state.route.next()?, - ); - - // TODO: Make sure the connection can't be spoofed by someone having access to that Outlet - - let (sender, receiver) = tokio::sync::mpsc::channel(128); - - let mapping = InletMappingValue { - client_ip: src_ip, - client_port: parsed_packet.source, - _addresses: addresses.clone(), - sender, - }; - - let outlet_route = Arc::new(RwLock::new(inlet_shared_state.route)); - let processor = - PortalProcessor::new_inlet(receiver, addresses.clone(), outlet_route.clone()); - let worker = PortalWorker::new( - Some(outlet_route), - socket_write_handle, - parsed_packet.destination, - src_ip, - parsed_packet.source, - None, - ); - - ctx.start_processor_with_access_control( - addresses.receiver_remote, - processor, - DenyAll, - AllowAll, // FIXME eBPF - ) - .await?; - ctx.start_worker_with_access_control( - addresses.sender_remote, - worker, - AllowAll, // FIXME eBPF - DenyAll, - ) - .await?; - - registry.add_mapping(mapping.clone()); - - Ok(Some(mapping)) - } - - /// Write handle to the socket - pub fn socket_write_handle(&self) -> Arc> { - self.socket_write_handle.clone() - } -} - -struct ParsedPacket { - message: RawSocketMessage, - - source_ip: Ipv4Addr, - flags: u8, - source: u16, - destination: u16, -} - -#[async_trait] -impl Processor for RawSocketProcessor { - type Context = Context; - - async fn process(&mut self, ctx: &mut Self::Context) -> Result { - let socket_read_handle = self.socket_read_handle.clone(); - let parsed_packet = tokio::task::spawn_blocking(move || { - let mut socket_read_handle = socket_read_handle.write().unwrap(); // FIXME - let mut iterator = tcp_packet_iter(&mut socket_read_handle); - // TODO: Should we check the checksum? - let (packet, source_ip) = iterator.next().unwrap(); // FIXME - - let source_ip = match source_ip { - IpAddr::V4(ip) => ip, - IpAddr::V6(_) => return None, - }; - - let source = packet.get_source(); - let destination = packet.get_destination(); - let flags = packet.get_flags(); - - info!( - "PACKET LEN: {}. Source: {}, Destination: {}", - packet.payload().len(), - source, - destination, - ); - - let message = RawSocketMessage::from_packet(packet, source_ip); - - let parsed_packet = ParsedPacket { - message, - source_ip, - flags, - source, - destination, - }; - - Some(parsed_packet) - }) - .await - .unwrap(); - - let parsed_packet = match parsed_packet { - Some(parsed_packet) => parsed_packet, - None => return Ok(false), - }; - - if let Some((inlet_shared_state, options)) = self - .inlet_registry - .get_inlets_info(parsed_packet.destination) - { - let mapping = match self - .inlet_registry - .get_mapping(parsed_packet.source_ip, parsed_packet.source) - { - Some(mapping) => { - // trace!("Existing connection from {}", packet.get_source()); - info!("Existing connection from {}", parsed_packet.source); - mapping - } - None => { - if parsed_packet.flags != 2 { - warn!( - "Unknown connection packet from {}. Skipping", - parsed_packet.source - ); - return Ok(true); - } - - // debug!("New connection from {}", packet.get_source()); - info!("New connection from {}", parsed_packet.source); - match Self::new_inlet_connection( - ctx, - options, - inlet_shared_state, - self.socket_write_handle.clone(), - &self.inlet_registry, - parsed_packet.source_ip, - &parsed_packet, - ) - .await? - { - Some(mapping) => mapping, - None => return Ok(true), - } - } - }; - - mapping.sender.send(parsed_packet.message).await.unwrap(); - - return Ok(true); - } - - let _outlet = match self - .outlet_registry - .get_outlet(parsed_packet.source_ip, parsed_packet.source) - { - Some(outlet) => outlet, - None => return Ok(true), - }; - - let mapping = match self.outlet_registry.get_mapping(parsed_packet.destination) { - Some(mapping) => { - // trace!("Existing connection to {}", packet.get_destination()); - info!("Existing connection to {}", parsed_packet.destination); - mapping - } - None => return Ok(true), - }; - - mapping.sender.send(parsed_packet.message).await.unwrap(); - - Ok(true) - } -} diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/inlet.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/inlet.rs index 80a8532d7c5..bdbf2bc98c2 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/inlet.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/inlet.rs @@ -1,74 +1,54 @@ -use crate::ebpf_portal::RawSocketMessage; -use crate::portal::addresses::Addresses; +use crate::ebpf_portal::{ConnectionIdentifier, Port}; use crate::portal::InletSharedState; use crate::TcpInletOptions; +use ockam_core::Address; use std::collections::HashMap; use std::net::Ipv4Addr; use std::sync::{Arc, RwLock}; use tokio::net::TcpListener; -use tokio::sync::mpsc::Sender; /// Inlet registry #[derive(Default, Clone)] pub(crate) struct InletRegistry { - inlets: Arc>>, - mapping: Arc>>, + inlets: Arc>>, } impl InletRegistry { - /// Add new mapping - pub fn add_mapping(&self, mapping: InletMappingValue) { - // FIXME: eBPF duplicates - self.mapping.write().unwrap().push(mapping) - } - - /// Get mapping - pub fn get_mapping(&self, client_ip: Ipv4Addr, client_port: u16) -> Option { - let mapping = self.mapping.read().unwrap(); - - mapping.iter().find_map(|x| { - if x.client_ip == client_ip && x.client_port == client_port { - Some(x.clone()) - } else { - None - } - }) - } - /// Get inlets - pub fn get_inlets_info( - &self, - dst_port: u16, - ) -> Option<(Arc>, TcpInletOptions)> { + pub fn get_inlet(&self, dst_port: Port) -> Option { let inlets = self.inlets.read().unwrap(); - let inlet = inlets.get(&dst_port)?; - - Some((inlet.inlet_shared_state.clone(), inlet.options.clone())) + inlets.get(&dst_port).cloned() } /// Create inlet pub fn create_inlet( &self, + portal_worker_address: Address, options: TcpInletOptions, - port: u16, + port: Port, tcp_listener: TcpListener, inlet_shared_state: Arc>, - ) { + ) -> Inlet { let mut inlets = self.inlets.write().unwrap(); - inlets.insert( + let inlet_info = Inlet { + portal_worker_address, port, - InletInfo { - options, - inlet_shared_state, - tcp_listener, - }, - ); + options, + inlet_shared_state, + _tcp_listener: Arc::new(tcp_listener), + connections1: Default::default(), + connections2: Default::default(), + }; + + inlets.insert(port, inlet_info.clone()); + + inlet_info } /// Delete the inlet - pub fn delete_inlet(&self, port: u16) { + pub fn delete_inlet(&self, port: Port) { let mut inlets = self.inlets.write().unwrap(); inlets.remove(&port); @@ -76,24 +56,97 @@ impl InletRegistry { } /// Inlet info -pub struct InletInfo { +#[derive(Clone)] +pub struct Inlet { + /// PortalWorker Address + pub portal_worker_address: Address, + /// Port + pub port: Port, /// Route to the corresponding Outlet pub inlet_shared_state: Arc>, /// Options pub options: TcpInletOptions, /// Hold to mark the port as taken - pub tcp_listener: TcpListener, + pub _tcp_listener: Arc, + /// Same map with different key + connections1: Arc>>>, + connections2: Arc>>>, +} + +impl Inlet { + /// Add new mapping + pub fn add_connection(&self, connection: Arc) { + self.connections1.write().unwrap().insert( + InletConnectionKey1 { + client_ip: connection.client_ip, + client_port: connection.client_port, + }, + connection.clone(), + ); + self.connections2.write().unwrap().insert( + InletConnectionKey2 { + identifier: connection.identifier.clone(), + connection_identifier: connection.connection_identifier.clone(), + }, + connection, + ); + } + + /// Get mapping + pub fn get_connection_internal( + &self, + client_ip: Ipv4Addr, + client_port: Port, + ) -> Option> { + self.connections1 + .read() + .unwrap() + .get(&InletConnectionKey1 { + client_ip, + client_port, + }) + .cloned() + } + + /// Get mapping + pub(crate) fn get_connection_external( + &self, + identifier: Option, // Identity + connection_identifier: ConnectionIdentifier, + ) -> Option> { + self.connections2 + .read() + .unwrap() + .get(&InletConnectionKey2 { + identifier, + connection_identifier, + }) + .cloned() + } +} + +#[derive(Hash, PartialEq, Eq)] +struct InletConnectionKey1 { + client_ip: Ipv4Addr, + client_port: Port, +} + +#[derive(Hash, PartialEq, Eq)] +struct InletConnectionKey2 { + identifier: Option, + connection_identifier: ConnectionIdentifier, } /// Inlet Mapping -#[derive(Clone)] -pub(crate) struct InletMappingValue { +pub struct InletConnection { + /// Identity Identifier of the other side + pub identifier: Option, + /// Unique connection Identifier + pub connection_identifier: ConnectionIdentifier, + /// We can listen of multiple IPs + pub inlet_ip: Ipv4Addr, /// Client IP pub client_ip: Ipv4Addr, /// Client port - pub client_port: u16, - /// Addresses - pub _addresses: Addresses, - /// Sender to a processor - pub sender: Sender, + pub client_port: Port, } diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/outlet.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/outlet.rs index 2f92daeaea9..e039c136953 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/outlet.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/outlet.rs @@ -1,57 +1,20 @@ -use crate::ebpf_portal::RawSocketMessage; -use crate::portal::addresses::Addresses; -use ockam_core::Address; +use crate::ebpf_portal::{ConnectionIdentifier, Port}; +use ockam_core::{Address, Route}; use std::collections::HashMap; use std::net::Ipv4Addr; use std::sync::{Arc, RwLock}; -use tokio::sync::mpsc::Sender; +use tokio::net::TcpListener; /// Outlet registry #[derive(Default, Clone)] pub struct OutletRegistry { - targets_ports: Arc>>, - mapping: Arc>>, + outlets: Arc>>, } impl OutletRegistry { - /// Add mapping - pub(crate) fn add_mapping(&self, mapping: OutletMappingValue) { - // FIXME: eBPF duplicates - self.mapping.write().unwrap().push(mapping) - } - - /// Get mapping - pub(crate) fn get_mapping(&self, dst_port: u16) -> Option { - let mapping = self.mapping.read().unwrap(); - - mapping.iter().find_map(|x| { - if x.assigned_port == dst_port { - Some(x.clone()) - } else { - None - } - }) - } - - /// Get mapping - pub(crate) fn get_mapping2( - &self, - inlet_worker_address: &Address, - ) -> Option { - let mapping = self.mapping.read().unwrap(); - - mapping.iter().find_map(|x| { - if &x.inlet_worker_address == inlet_worker_address { - Some(x.clone()) - } else { - None - } - }) - } - /// Get outlet - pub fn get_outlet(&self, src_ip: Ipv4Addr, src_port: u16) -> Option { - self.targets_ports + pub fn get_outlet(&self, src_ip: Ipv4Addr, src_port: Port) -> Option { + self.outlets .read() .unwrap() .get(&OutletKey { @@ -62,35 +25,111 @@ impl OutletRegistry { } /// Add outlet - pub fn add_outlet(&self, dst_ip: Ipv4Addr, dst_port: u16) { + pub fn add_outlet( + &self, + portal_worker_address: Address, + dst_ip: Ipv4Addr, + dst_port: Port, + ) -> Outlet { + let outlet_info = Outlet { + dst_ip, + dst_port, + portal_worker_address, + connections1: Default::default(), + connections2: Default::default(), + }; + // TODO: eBPF Duplicates? - self.targets_ports + self.outlets .write() .unwrap() - .insert(OutletKey { dst_ip, dst_port }, OutletInfo {}); + .insert(OutletKey { dst_ip, dst_port }, outlet_info.clone()); + + outlet_info } } #[derive(Hash, PartialEq, Eq)] struct OutletKey { dst_ip: Ipv4Addr, - dst_port: u16, + dst_port: Port, } /// Outlet info #[derive(Clone)] -pub struct OutletInfo {} +pub struct Outlet { + /// Destination IP + pub dst_ip: Ipv4Addr, + /// Destination Port + pub dst_port: Port, + /// PortalWorker Address + pub portal_worker_address: Address, + /// Same map with different key + connections1: Arc>>>, + connections2: Arc>>>, +} + +impl Outlet { + /// Add mapping + pub(crate) fn add_connection(&self, connection: Arc) { + self.connections1 + .write() + .unwrap() + .insert(connection.assigned_port, connection.clone()); + self.connections2.write().unwrap().insert( + OutletConnectionKey { + identifier: connection.identifier.clone(), + connection_identifier: connection.connection_identifier.clone(), + }, + connection, + ); + } + + /// Get Connection + pub(crate) fn get_connection_internal( + &self, + assigned_port: Port, + ) -> Option> { + self.connections1 + .read() + .unwrap() + .get(&assigned_port) + .cloned() + } + + /// Get mapping + pub(crate) fn get_connection_external( + &self, + identifier: Option, // Identity + connection_identifier: ConnectionIdentifier, + ) -> Option> { + self.connections2 + .read() + .unwrap() + .get(&OutletConnectionKey { + identifier, + connection_identifier, + }) + .cloned() + } +} + +#[derive(Hash, PartialEq, Eq)] +struct OutletConnectionKey { + identifier: Option, + connection_identifier: ConnectionIdentifier, +} /// Outlet mapping -#[derive(Clone)] -pub(crate) struct OutletMappingValue { - // TODO: eBPF Add identifier? - /// The other side's Inlet worker address - pub inlet_worker_address: Address, +pub struct OutletConnection { + /// Identity Identifier of the other side + pub identifier: Option, + /// Unique connection Identifier + pub connection_identifier: ConnectionIdentifier, /// Assigned port on our machine for a specific connection - pub assigned_port: u16, - /// Addresses - pub _addresses: Addresses, - /// Sender to the processor - pub sender: Sender, + pub assigned_port: Port, + /// Route to the other side PortalWorker + pub return_route: Route, + /// To hold the port + pub _tcp_listener: Arc, } diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs index b9dd3a4b5e9..d6b22e4af39 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs @@ -23,7 +23,7 @@ use tracing::{debug, error, instrument}; #[derive(Debug, Clone)] pub struct InletSharedState { pub route: Route, - pub is_paused: bool, // FIXME: eBPF Not implemented + pub is_paused: bool, } /// A TCP Portal Inlet listen processor diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/options.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/options.rs index baaca7d8f7e..fd08cc40960 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/options.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/options.rs @@ -81,13 +81,22 @@ impl TcpInletOptions { flow_controls: &FlowControls, addresses: &Addresses, next: &Address, + ) { + self.setup_flow_control_for_address(flow_controls, addresses.sender_remote.clone(), next) + } + + pub(crate) fn setup_flow_control_for_address( + &self, + flow_controls: &FlowControls, + address: Address, + next: &Address, ) { if let Some(flow_control_id) = flow_controls .find_flow_control_with_producer_address(next) .map(|x| x.flow_control_id().clone()) { // Allow a sender with corresponding flow_control_id send messages to this address - flow_controls.add_consumer(addresses.sender_remote.clone(), &flow_control_id); + flow_controls.add_consumer(address, &flow_control_id); } } } diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs b/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs index a8a9b9bfb67..caaef38fd2c 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs @@ -129,8 +129,8 @@ impl TcpTransport { #[derive(Clone, Debug)] pub struct TcpInlet { socket_address: SocketAddr, - state: TcpInletState, inlet_shared_state: Arc>, + state: TcpInletState, } #[derive(Clone, Debug)] @@ -165,8 +165,8 @@ impl TcpInlet { ) -> Self { Self { socket_address, - state: TcpInletState::Regular { processor_address }, inlet_shared_state, + state: TcpInletState::Regular { processor_address }, } } @@ -177,8 +177,8 @@ impl TcpInlet { ) -> Self { Self { socket_address, - state: TcpInletState::Ebpf, inlet_shared_state, + state: TcpInletState::Ebpf, } }