From a23353c21c30bce6fd9fcf1085b4f92fdbec3106 Mon Sep 17 00:00:00 2001 From: Flavio Castelli Date: Fri, 14 Jul 2023 18:01:56 +0200 Subject: [PATCH 1/2] test: print memory usage right after bootstrap Print the memory usage over the debug channel right after the workers initialization is done Signed-off-by: Flavio Castelli --- Cargo.lock | 56 +++++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 2 ++ src/main.rs | 34 +++++++++++++++++++++++++++++++- 3 files changed, 91 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index e06b73a2..e080630a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1551,6 +1551,16 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cda653ca797810c02f7ca4b804b40b8b95ae046eb989d356bce17919a8c25499" +[[package]] +name = "flate2" +version = "1.0.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1999,6 +2009,15 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" +[[package]] +name = "humansize" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6cb51c9a029ddc91b07a787f1d86b53ccfa49b0e86688c946ebe8d3555685dd7" +dependencies = [ + "libm", +] + [[package]] name = "humantime" version = "2.1.0" @@ -2457,6 +2476,12 @@ version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7012b1bbb0719e1097c47611d3898568c546d597c2e74d66f6087edd5233ff4" +[[package]] +name = "linux-raw-sys" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4" + [[package]] name = "linux-raw-sys" version = "0.3.8" @@ 
-3480,6 +3505,7 @@ dependencies = [ "anyhow", "clap", "daemonize", + "humansize", "itertools 0.11.0", "k8s-openapi", "lazy_static", @@ -3487,6 +3513,7 @@ dependencies = [ "opentelemetry", "opentelemetry-otlp", "policy-evaluator", + "procfs", "rayon", "rstest", "semver", @@ -3577,6 +3604,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "procfs" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "943ca7f9f29bab5844ecd8fdb3992c5969b6622bb9609b9502fef9b4310e3f1f" +dependencies = [ + "bitflags 1.3.2", + "byteorder", + "chrono", + "flate2", + "hex", + "lazy_static", + "rustix 0.36.15", +] + [[package]] name = "prost" version = "0.9.0" @@ -4041,6 +4083,20 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "0.36.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c37f1bd5ef1b5422177b7646cba67430579cfe2ace80f284fee876bca52ad941" +dependencies = [ + "bitflags 1.3.2", + "errno", + "io-lifetimes 1.0.11", + "libc", + "linux-raw-sys 0.1.4", + "windows-sys 0.45.0", +] + [[package]] name = "rustix" version = "0.37.23" diff --git a/Cargo.toml b/Cargo.toml index 5c3182be..ee8ce351 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,12 +14,14 @@ edition = "2021" anyhow = "1.0" clap = { version = "4.2", features = [ "cargo", "env" ] } daemonize = "0.5" +humansize = "2.1" itertools = "0.11.0" k8s-openapi = { version = "0.18.0", default-features = false, features = ["v1_26"] } lazy_static = "1.4.0" num_cpus = "1.16.0" opentelemetry-otlp = { version = "0.10.0", features = ["metrics", "tonic"] } opentelemetry = { version = "0.17", default-features = false, features = ["metrics", "trace", "rt-tokio", "serialize"] } +procfs = "0.15" policy-evaluator = { git = "https://github.com/kubewarden/policy-evaluator", tag = "v0.11.0" } rayon = "1.6" serde_json = "1.0" diff --git a/src/main.rs b/src/main.rs index cccf8ec6..5784151c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,7 @@ use 
policy_evaluator::policy_fetcher::verify::FulcioAndRekorData; use policy_evaluator::{callback_handler::CallbackHandlerBuilder, kube}; use std::{fs, path::PathBuf, process, sync::RwLock, thread}; use tokio::{runtime::Runtime, sync::mpsc, sync::oneshot}; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; mod admission_review; mod api; @@ -335,6 +335,7 @@ fn main() -> Result<()> { } } info!(status = "done", "worker pool bootstrap"); + memory_usage(pool_size); // All is good, we can start listening for incoming requests through the // web server @@ -367,6 +368,37 @@ fn main() -> Result<()> { Ok(()) } +fn memory_usage(pool_size: usize) { + let process = match procfs::process::Process::myself() { + Ok(p) => p, + Err(e) => { + warn!(error =? e, "cannot access process stats"); + return; + } + }; + let mem_stats = match process.statm() { + Ok(s) => s, + Err(e) => { + warn!(error =? e, "cannot access process memory stats"); + return; + } + }; + + let formatter = humansize::make_format(humansize::DECIMAL); + + let vm_size = mem_stats.size * procfs::page_size(); + let vm_rss = mem_stats.resident * procfs::page_size(); + + debug!( + VmSize = formatter(vm_size), + VmSizeBytes = vm_size, + VmRSS = formatter(vm_rss), + VmRSSBytes = vm_rss, + pool_size, + "memory usage" + ); +} + fn fatal_error(msg: String) { let trace_system_ready = TRACE_SYSTEM_INITIALIZED.read().unwrap(); if *trace_system_ready { From ef7724b89ebc77ede34a4c490bec505ae0957202 Mon Sep 17 00:00:00 2001 From: Flavio Castelli Date: Mon, 17 Jul 2023 14:28:28 +0200 Subject: [PATCH 2/2] fix(perf): reduce memory usage Reduce the amount of memory being used by policy-server at runtime. 
This table contains a comparison of the memory usage (MB) before and after this optimization: | Workers (#) | Vanilla (MB) | Optimized (MB) | Improvement (%) | |-------------|--------------|----------------|-----------------| | 1 | 634.356 | 506.484 | -20.16 | | 2 | 790.672 | 598.902 | -24.25 | | 3 | 954.726 | 680.124 | -28.76 | | 4 | 1108.042 | 774.498 | -30.10 | Signed-off-by: Flavio Castelli --- src/worker_pool.rs | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/src/worker_pool.rs b/src/worker_pool.rs index ec80df0c..d1cc8031 100644 --- a/src/worker_pool.rs +++ b/src/worker_pool.rs @@ -163,9 +163,11 @@ impl WorkerPool { } }; - let precompiled_policies = + // Use a reference counter to share access to precompiled policies + // between workers. This reduces memory usage + let precompiled_policies: Arc = match precompile_policies(&engine, &bootstrap_data.fetched_policies) { - Ok(pp) => pp, + Ok(pp) => Arc::new(pp), Err(e) => { eprintln!("{e}"); std::process::exit(1); @@ -202,10 +204,16 @@ impl WorkerPool { warn!("policy timeout protection is disabled"); } + // Use a reference counter to share access to policies + // between workers. 
This reduces memory usage + let policies = Arc::new(bootstrap_data.policies); + for n in 1..=pool_size { let (tx, rx) = mpsc::channel::(32); worker_tx_chans.push(tx); + // Each worker has its own wasmtime::Engine, sharing the + // same engine across all the workers leads to bad performance let engine = match wasmtime::Engine::new(&wasmtime_config) { Ok(e) => e, Err(e) => { @@ -225,16 +233,17 @@ impl WorkerPool { }; worker_engines.push(engine.clone()); - let policies = bootstrap_data.policies.clone(); let modules = precompiled_policies.clone(); let b = barrier.clone(); let canary = boot_canary.clone(); let callback_handler_tx = self.callback_handler_tx.clone(); let always_accept_admission_reviews_on_namespace = self.always_accept_admission_reviews_on_namespace.clone(); + let policies = policies.clone(); let join = thread::spawn(move || -> Result<()> { info!(spawned = n, total = pool_size, "spawning worker"); + let mut worker = match Worker::new( rx, &policies, @@ -252,6 +261,10 @@ impl WorkerPool { return Err(anyhow!("Worker {} couldn't start: {}", n, e)); } }; + // Drop the Arc references ASAP, they are no longer needed + // at this point + drop(policies); + drop(modules); b.wait(); debug!(id = n, "worker loop start"); @@ -262,6 +275,13 @@ impl WorkerPool { }); join_handles.push(join); } + + // Deallocate all the memory used by the precompiled policies since + // they are no longer needed. Without this explicit cleanup + // the reference would be dropped right before Policy Server exits, + // meaning a lot of memory would have been consumed without a valid reason + // during the whole execution time + drop(precompiled_policies); barrier.wait(); if !boot_canary.load(Ordering::SeqCst) {