Skip to content

Commit

Permalink
Merge pull request #496 from flavio/mem-reduction
Browse files Browse the repository at this point in the history
fix(perf): reduce memory usage
  • Loading branch information
flavio authored Jul 17, 2023
2 parents 9fe6473 + ef7724b commit dd02489
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 4 deletions.
56 changes: 56 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ edition = "2021"
anyhow = "1.0"
clap = { version = "4.2", features = [ "cargo", "env" ] }
daemonize = "0.5"
humansize = "2.1"
itertools = "0.11.0"
k8s-openapi = { version = "0.18.0", default-features = false, features = ["v1_26"] }
lazy_static = "1.4.0"
num_cpus = "1.16.0"
opentelemetry-otlp = { version = "0.10.0", features = ["metrics", "tonic"] }
opentelemetry = { version = "0.17", default-features = false, features = ["metrics", "trace", "rt-tokio", "serialize"] }
procfs = "0.15"
policy-evaluator = { git = "https://github.com/kubewarden/policy-evaluator", tag = "v0.11.0" }
rayon = "1.6"
serde_json = "1.0"
Expand Down
34 changes: 33 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use policy_evaluator::policy_fetcher::verify::FulcioAndRekorData;
use policy_evaluator::{callback_handler::CallbackHandlerBuilder, kube};
use std::{fs, path::PathBuf, process, sync::RwLock, thread};
use tokio::{runtime::Runtime, sync::mpsc, sync::oneshot};
use tracing::{debug, error, info};
use tracing::{debug, error, info, warn};

mod admission_review;
mod api;
Expand Down Expand Up @@ -335,6 +335,7 @@ fn main() -> Result<()> {
}
}
info!(status = "done", "worker pool bootstrap");
memory_usage(pool_size);

// All is good, we can start listening for incoming requests through the
// web server
Expand Down Expand Up @@ -367,6 +368,37 @@ fn main() -> Result<()> {
Ok(())
}

/// Emits a debug-level "memory usage" event describing the current process.
///
/// The page counts reported by the process' `statm` entry (`size` and
/// `resident`) are multiplied by the page size to obtain byte values. Both
/// values are logged twice: formatted through `humansize` and as raw
/// numbers, together with `pool_size`.
///
/// This is best effort: when the process handle or its memory stats cannot
/// be obtained, a warning is logged and the function returns without
/// emitting the event.
fn memory_usage(pool_size: usize) {
    // Resolve the current process first, then ask for its memory stats;
    // each step has its own warning message.
    let statm_lookup = match procfs::process::Process::myself() {
        Ok(myself) => myself.statm(),
        Err(e) => {
            warn!(error =? e, "cannot access process stats");
            return;
        }
    };
    let statm = match statm_lookup {
        Ok(stats) => stats,
        Err(e) => {
            warn!(error =? e, "cannot access process memory stats");
            return;
        }
    };

    // statm values are expressed in pages, convert them to bytes
    let page_size = procfs::page_size();
    let vm_size = statm.size * page_size;
    let vm_rss = statm.resident * page_size;

    let humanize = humansize::make_format(humansize::DECIMAL);

    debug!(
        VmSize = humanize(vm_size),
        VmSizeBytes = vm_size,
        VmRSS = humanize(vm_rss),
        VmRSSBytes = vm_rss,
        pool_size,
        "memory usage"
    );
}

fn fatal_error(msg: String) {
let trace_system_ready = TRACE_SYSTEM_INITIALIZED.read().unwrap();
if *trace_system_ready {
Expand Down
26 changes: 23 additions & 3 deletions src/worker_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,11 @@ impl WorkerPool {
}
};

let precompiled_policies =
// Use a reference counter to share access to precompiled policies
// between workers. This reduces memory usage
let precompiled_policies: Arc<PrecompiledPolicies> =
match precompile_policies(&engine, &bootstrap_data.fetched_policies) {
Ok(pp) => pp,
Ok(pp) => Arc::new(pp),
Err(e) => {
eprintln!("{e}");
std::process::exit(1);
Expand Down Expand Up @@ -202,10 +204,16 @@ impl WorkerPool {
warn!("policy timeout protection is disabled");
}

// Use a reference counter to share access to policies
// between workers. This reduces memory usage
let policies = Arc::new(bootstrap_data.policies);

for n in 1..=pool_size {
let (tx, rx) = mpsc::channel::<EvalRequest>(32);
worker_tx_chans.push(tx);

// Each worker has its own wasmtime::Engine: sharing the same
// engine across all the workers leads to bad performance
let engine = match wasmtime::Engine::new(&wasmtime_config) {
Ok(e) => e,
Err(e) => {
Expand All @@ -225,16 +233,17 @@ impl WorkerPool {
};
worker_engines.push(engine.clone());

let policies = bootstrap_data.policies.clone();
let modules = precompiled_policies.clone();
let b = barrier.clone();
let canary = boot_canary.clone();
let callback_handler_tx = self.callback_handler_tx.clone();
let always_accept_admission_reviews_on_namespace =
self.always_accept_admission_reviews_on_namespace.clone();
let policies = policies.clone();

let join = thread::spawn(move || -> Result<()> {
info!(spawned = n, total = pool_size, "spawning worker");

let mut worker = match Worker::new(
rx,
&policies,
Expand All @@ -252,6 +261,10 @@ impl WorkerPool {
return Err(anyhow!("Worker {} couldn't start: {}", n, e));
}
};
// Drop the Arc references ASAP, they are no longer needed
// at this point
drop(policies);
drop(modules);
b.wait();

debug!(id = n, "worker loop start");
Expand All @@ -262,6 +275,13 @@ impl WorkerPool {
});
join_handles.push(join);
}

// Release this thread's reference to the precompiled policies: they are
// no longer needed here, and their memory is freed once the last
// reference is gone. Without this explicit cleanup the reference would
// only be dropped right before Policy Server exits, meaning a lot of
// memory would stay allocated for no valid reason during the whole
// execution time
drop(precompiled_policies);
barrier.wait();

if !boot_canary.load(Ordering::SeqCst) {
Expand Down

0 comments on commit dd02489

Please sign in to comment.