From 6441ebc15a40b38251e9c678690597bb11728c86 Mon Sep 17 00:00:00 2001
From: Hugo van der Wijst
Date: Thu, 29 Feb 2024 02:09:47 -0800
Subject: [PATCH 1/2] Only upload large files once.

Currently, upload requests are handled in parallel without knowledge of
other ongoing requests. If multiple actions depend on the same set of
large locally available artifacts, this results in many duplicate
upload requests for the same blobs.

To fix this, we record new large-blob upload requests in a DashMap. If
an upload of the same blob is already in flight, we wait for a
notification that it has finished instead of starting a new upload.
---
 remote_execution/oss/re_grpc/Cargo.toml    |   1 +
 remote_execution/oss/re_grpc/src/client.rs | 142 +++++++++++++++------
 2 files changed, 106 insertions(+), 37 deletions(-)

diff --git a/remote_execution/oss/re_grpc/Cargo.toml b/remote_execution/oss/re_grpc/Cargo.toml
index 1e71767600d3..1f26459ebe75 100644
--- a/remote_execution/oss/re_grpc/Cargo.toml
+++ b/remote_execution/oss/re_grpc/Cargo.toml
@@ -9,6 +9,7 @@ version = "0.1.0"
 [dependencies]
 anyhow = { workspace = true }
 dupe = { workspace = true }
+dashmap = { workspace = true }
 futures = { workspace = true }
 gazebo = { workspace = true }
 http = { workspace = true }
diff --git a/remote_execution/oss/re_grpc/src/client.rs b/remote_execution/oss/re_grpc/src/client.rs
index 3c5b8a3b941c..3064d71c2b2b 100644
--- a/remote_execution/oss/re_grpc/src/client.rs
+++ b/remote_execution/oss/re_grpc/src/client.rs
@@ -478,6 +478,13 @@ impl FindMissingCache {
     }
 }
 
+#[derive(Clone)]
+enum OngoingUploadStatus {
+    Active(tokio::sync::watch::Receiver<Result<(), ()>>),
+    Done,
+    Error,
+}
+
 pub struct REClient {
     runtime_opts: RERuntimeOpts,
     grpc_clients: GRPCClients,
@@ -485,6 +492,7 @@ pub struct REClient {
     instance_name: InstanceName,
     // buck2 calls find_missing for same blobs
     find_missing_cache: Mutex<FindMissingCache>,
+    prev_uploads: dashmap::DashMap<TDigest, OngoingUploadStatus>,
 }
 
 impl Drop for REClient {
@@ -559,6 +567,7 @@ impl REClient {
                 ttl: Duration::from_secs(12 * 60 * 60), // 12 hours TODO: Tune this parameter
                 last_check: Instant::now(),
             }),
+            prev_uploads: dashmap::DashMap::new(),
         }
     }
 
@@ -726,6 +735,7 @@ impl REClient {
             request,
             self.capabilities.max_total_batch_size,
             self.runtime_opts.max_concurrent_uploads_per_action,
+            &self.prev_uploads,
             |re_request| async {
                 let metadata = metadata.clone();
                 let mut cas_client = self.grpc_clients.cas_client.clone();
@@ -1186,6 +1196,7 @@ async fn upload_impl<Byt, Cas>(
     request: UploadRequest,
     max_total_batch_size: usize,
     max_concurrent_uploads: Option<usize>,
+    prev_uploads: &dashmap::DashMap<TDigest, OngoingUploadStatus>,
     cas_f: impl Fn(BatchUpdateBlobsRequest) -> Cas + Sync + Send + Copy,
     bystream_fut: impl Fn(Vec<WriteRequest>) -> Byt + Sync + Send + Copy,
 ) -> anyhow::Result<UploadResponse>
@@ -1246,10 +1257,9 @@ where
 
     // Create futures for any files that need uploading.
     for file in request.files_with_digest.unwrap_or_default() {
-        let hash = file.digest.hash.clone();
-        let size = file.digest.size_in_bytes;
+        let digest = file.digest.clone();
         let name = file.name.clone();
-        if size < max_total_batch_size as i64 {
+        if digest.size_in_bytes < max_total_batch_size as i64 {
             batched_blob_updates.push(BatchUploadRequest::File(file));
             continue;
         }
@@ -1258,45 +1268,96 @@ where
             "{}uploads/{}/blobs/{}/{}",
             instance_name.as_resource_prefix(),
             client_uuid,
-            hash.clone(),
-            size
+            file.digest.hash,
+            file.digest.size_in_bytes
         );
+
+        enum UploadStatus {
+            New(tokio::sync::watch::Sender<Result<(), ()>>),
+            Ongoing(OngoingUploadStatus),
+        }
+
+        let upload_status = match prev_uploads.entry(digest.clone()) {
+            dashmap::mapref::entry::Entry::Occupied(o) => UploadStatus::Ongoing(o.get().clone()),
+            dashmap::mapref::entry::Entry::Vacant(v) => {
+                let (tx, rx) = tokio::sync::watch::channel(Err(()));
+                v.insert(OngoingUploadStatus::Active(rx));
+                UploadStatus::New(tx)
+            }
+        };
         let fut = async move {
-            let mut file = tokio::fs::File::open(&name)
-                .await
-                .with_context(|| format!("Opening `{name}` for reading failed"))?;
-            let mut data = vec![0; max_total_batch_size];
-
-            let mut write_offset = 0;
-            let mut upload_segments = Vec::new();
-            loop {
-                let length = file
-                    .read(&mut data)
-                    .await
-                    .with_context(|| format!("Error reading from {name}"))?;
-                if length == 0 {
-                    break;
+            match upload_status {
+                UploadStatus::Ongoing(OngoingUploadStatus::Active(mut rx)) => {
+                    // Another task was already uploading this artifact; wait for it to complete and report the result.
+                    rx.changed().await?;
+                    rx.borrow_and_update().as_ref().map_err(|_e| {
+                        anyhow::anyhow!("Upload queued for previous action failed.")
+                    })?;
                 }
-                upload_segments.push(WriteRequest {
-                    resource_name: resource_name.to_owned(),
-                    write_offset,
-                    finish_write: false,
-                    data: data[..length].to_owned(),
-                });
-                write_offset += length as i64;
-            }
-            upload_segments
-                .last_mut()
-                .with_context(|| format!("Read no segments from `{name} "))?
-                .finish_write = true;
+                UploadStatus::Ongoing(OngoingUploadStatus::Done) => {
+                    // Another task has already completed the upload of this artifact; no need to do any work.
+                }
+                UploadStatus::Ongoing(OngoingUploadStatus::Error) => {
+                    // Another task tried to perform the transmission, but failed.
+                    anyhow::bail!("Upload queued for previous action failed.")
+                }
+                UploadStatus::New(tx) => {
+                    let mut file = tokio::fs::File::open(&name)
+                        .await
+                        .with_context(|| format!("Opening `{name}` for reading failed"))?;
+                    let mut data = vec![0; max_total_batch_size];
+
+                    let mut write_offset = 0;
+                    let mut upload_segments = Vec::new();
+                    loop {
+                        let length = file
+                            .read(&mut data)
+                            .await
+                            .with_context(|| format!("Error reading from {name}"))?;
+                        if length == 0 {
+                            break;
+                        }
+                        upload_segments.push(WriteRequest {
+                            resource_name: resource_name.to_owned(),
+                            write_offset,
+                            finish_write: false,
+                            data: data[..length].to_owned(),
+                        });
+                        write_offset += length as i64;
+                    }
+                    upload_segments
+                        .last_mut()
+                        .with_context(|| format!("Read no segments from `{name}`"))?
+                        .finish_write = true;
+
+                    let upload_ret = bystream_fut(upload_segments)
+                        .await
+                        .and_then(|resp| {
+                            if resp.committed_size != digest.size_in_bytes {
+                                Err(anyhow::anyhow!(
+                                    "Failed to upload `{name}`: invalid committed_size from WriteResponse"
+                                ))
+                            } else {
+                                Ok(())
+                            }
+                        });
 
-            let resp = bystream_fut(upload_segments).await?;
-            if resp.committed_size != size {
-                return Err(anyhow::anyhow!(
-                    "Failed to upload `{name}`: invalid committed_size from WriteResponse"
-                ));
+                    // Mark the artifact as uploaded and notify other potentially waiting tasks.
+                    if upload_ret.is_ok() {
+                        prev_uploads.alter(&digest, |_, _| OngoingUploadStatus::Done);
+                        let _ = tx.send(upload_ret.as_ref().map_err(|_| ()).cloned());
+                    } else {
+                        prev_uploads.alter(&digest, |_, _| OngoingUploadStatus::Error);
+                        let _ = tx.send(Err(()));
+                    }
+
+                    // Only propagate errors _after_ notifying other waiting tasks that this task is complete.
+                    upload_ret?;
+                }
             }
-            Ok(vec![hash])
+
+            Ok(vec![digest.hash])
         };
         upload_futures.push(Box::pin(fut));
     }
@@ -2071,6 +2132,7 @@ mod tests {
             req,
             10000,
             None,
+            &dashmap::DashMap::new(),
             |req| {
                 let res = res.clone();
                 let digest1 = digest1.clone();
@@ -2154,6 +2216,7 @@ mod tests {
             req,
             10, // kept small to simulate a large file upload
             None,
+            &dashmap::DashMap::new(),
             |req| {
                 let res = res.clone();
                 let digest1 = digest1.clone();
@@ -2228,6 +2291,7 @@ mod tests {
             req,
             10, // kept small to simulate a large inlined upload
             None,
+            &dashmap::DashMap::new(),
             |req| {
                 let res = res.clone();
                 let digest1 = digest1.clone();
@@ -2289,6 +2353,7 @@ mod tests {
             req,
             10,
             None,
+            &dashmap::DashMap::new(),
             |_req| async move {
                 panic!("This should not be called as there are no blobs to upload in batch");
             },
@@ -2350,6 +2415,7 @@ mod tests {
             req,
             3,
             None,
+            &dashmap::DashMap::new(),
             |_req| async move {
                 panic!("Not called");
             },
@@ -2391,6 +2457,7 @@ mod tests {
             req,
             0,
             None,
+            &dashmap::DashMap::new(),
             |_req| async move {
                 panic!("Not called");
             },
@@ -2437,6 +2504,7 @@ mod tests {
             req,
             1,
             None,
+            &dashmap::DashMap::new(),
             |_req| async move {
                 panic!("Not called");
             },

From 58244392e8dd6dd884ab0a3fb62439ecf7374924 Mon Sep 17 00:00:00 2001
From: Hugo van der Wijst
Date: Mon, 26 Aug 2024 11:37:43 -0700
Subject: [PATCH 2/2] Import `dashmap::DashMap`.
---
 remote_execution/oss/re_grpc/src/client.rs | 21 +++++++++++----------
 1 file changed, 11 insertions(+), 10 deletions(-)

diff --git a/remote_execution/oss/re_grpc/src/client.rs b/remote_execution/oss/re_grpc/src/client.rs
index 3064d71c2b2b..0db7eda30741 100644
--- a/remote_execution/oss/re_grpc/src/client.rs
+++ b/remote_execution/oss/re_grpc/src/client.rs
@@ -19,6 +19,7 @@ use std::time::Instant;
 use anyhow::Context;
 use buck2_re_configuration::Buck2OssReConfiguration;
 use buck2_re_configuration::HttpHeader;
+use dashmap::DashMap;
 use dupe::Dupe;
 use futures::future::BoxFuture;
 use futures::future::Future;
@@ -492,7 +493,7 @@ pub struct REClient {
     instance_name: InstanceName,
     // buck2 calls find_missing for same blobs
     find_missing_cache: Mutex<FindMissingCache>,
-    prev_uploads: dashmap::DashMap<TDigest, OngoingUploadStatus>,
+    prev_uploads: DashMap<TDigest, OngoingUploadStatus>,
 }
 
 impl Drop for REClient {
@@ -567,7 +568,7 @@ impl REClient {
                 ttl: Duration::from_secs(12 * 60 * 60), // 12 hours TODO: Tune this parameter
                 last_check: Instant::now(),
             }),
-            prev_uploads: dashmap::DashMap::new(),
+            prev_uploads: DashMap::new(),
         }
     }
 
@@ -1196,7 +1197,7 @@ async fn upload_impl<Byt, Cas>(
     request: UploadRequest,
     max_total_batch_size: usize,
     max_concurrent_uploads: Option<usize>,
-    prev_uploads: &dashmap::DashMap<TDigest, OngoingUploadStatus>,
+    prev_uploads: &DashMap<TDigest, OngoingUploadStatus>,
     cas_f: impl Fn(BatchUpdateBlobsRequest) -> Cas + Sync + Send + Copy,
     bystream_fut: impl Fn(Vec<WriteRequest>) -> Byt + Sync + Send + Copy,
 ) -> anyhow::Result<UploadResponse>
@@ -2132,7 +2133,7 @@ mod tests {
             req,
             10000,
             None,
-            &dashmap::DashMap::new(),
+            &DashMap::new(),
             |req| {
                 let res = res.clone();
                 let digest1 = digest1.clone();
@@ -2216,7 +2217,7 @@ mod tests {
             req,
             10, // kept small to simulate a large file upload
             None,
-            &dashmap::DashMap::new(),
+            &DashMap::new(),
             |req| {
                 let res = res.clone();
                 let digest1 = digest1.clone();
@@ -2291,7 +2292,7 @@ mod tests {
             req,
             10, // kept small to simulate a large inlined upload
             None,
-            &dashmap::DashMap::new(),
+            &DashMap::new(),
             |req| {
                 let res = res.clone();
                 let digest1 = digest1.clone();
@@ -2353,7 +2354,7 @@ mod tests {
             req,
             10,
             None,
-            &dashmap::DashMap::new(),
+            &DashMap::new(),
             |_req| async move {
                 panic!("This should not be called as there are no blobs to upload in batch");
             },
@@ -2415,7 +2416,7 @@ mod tests {
             req,
             3,
             None,
-            &dashmap::DashMap::new(),
+            &DashMap::new(),
             |_req| async move {
                 panic!("Not called");
             },
@@ -2457,7 +2458,7 @@ mod tests {
             req,
             0,
             None,
-            &dashmap::DashMap::new(),
+            &DashMap::new(),
             |_req| async move {
                 panic!("Not called");
            },
@@ -2504,7 +2505,7 @@ mod tests {
             req,
             1,
             None,
-            &dashmap::DashMap::new(),
+            &DashMap::new(),
             |_req| async move {
                 panic!("Not called");
             },
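
For reviewers who want to poke at the deduplication scheme outside of buck2, below
is a minimal, self-contained sketch of the same DashMap-plus-watch-channel pattern
that the first patch introduces. All names here (fake_upload, dedup_upload, string
keys instead of TDigest) are hypothetical stand-ins rather than the buck2 API, and
it assumes anyhow, dashmap, and tokio (with the sync, time, macros, and
rt-multi-thread features) as dependencies.

    use std::sync::Arc;
    use std::time::Duration;

    use dashmap::DashMap;
    use tokio::sync::watch;

    #[derive(Clone)]
    enum OngoingUploadStatus {
        Active(watch::Receiver<Result<(), ()>>),
        Done,
        Error,
    }

    // Stand-in for the real ByteStream Write calls.
    async fn fake_upload(digest: &str) -> anyhow::Result<()> {
        println!("uploading {digest}");
        tokio::time::sleep(Duration::from_millis(50)).await;
        Ok(())
    }

    async fn dedup_upload(
        digest: String,
        prev_uploads: Arc<DashMap<String, OngoingUploadStatus>>,
    ) -> anyhow::Result<()> {
        // Decide atomically (per map shard) whether this task uploads or waits.
        enum Role {
            Uploader(watch::Sender<Result<(), ()>>),
            Waiter(OngoingUploadStatus),
        }
        let role = match prev_uploads.entry(digest.clone()) {
            dashmap::mapref::entry::Entry::Occupied(o) => Role::Waiter(o.get().clone()),
            dashmap::mapref::entry::Entry::Vacant(v) => {
                let (tx, rx) = watch::channel(Err(()));
                v.insert(OngoingUploadStatus::Active(rx));
                Role::Uploader(tx)
            }
        };
        match role {
            Role::Waiter(OngoingUploadStatus::Done) => Ok(()),
            Role::Waiter(OngoingUploadStatus::Error) => {
                anyhow::bail!("upload by another task failed")
            }
            Role::Waiter(OngoingUploadStatus::Active(mut rx)) => {
                // Wait until the uploading task publishes a real result.
                rx.changed().await?;
                rx.borrow_and_update()
                    .as_ref()
                    .map_err(|_| anyhow::anyhow!("upload by another task failed"))?;
                Ok(())
            }
            Role::Uploader(tx) => {
                let ret = fake_upload(&digest).await;
                // Publish the outcome in the map first, then wake any waiters.
                let status = if ret.is_ok() {
                    OngoingUploadStatus::Done
                } else {
                    OngoingUploadStatus::Error
                };
                prev_uploads.alter(&digest, |_, _| status);
                let _ = tx.send(ret.as_ref().map(|_| ()).map_err(|_| ()));
                ret
            }
        }
    }

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let uploads = Arc::new(DashMap::new());
        // Ten tasks race on the same digest; "uploading" prints only once.
        let tasks: Vec<_> = (0..10)
            .map(|_| tokio::spawn(dedup_upload("abc123".to_owned(), uploads.clone())))
            .collect();
        for t in tasks {
            t.await??;
        }
        Ok(())
    }

The choice of a watch channel (rather than, say, tokio::sync::Notify) matters for
correctness: a watch channel retains the last sent value, so a task that starts
waiting after the uploader has already finished still observes the result instead
of blocking forever. Flipping the map entry to Done/Error before sending on the
channel serves the same purpose for tasks that consult the map afterwards.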