Skip to content

Commit

Permalink
buck2_execute: implement OSS upload_blob for local_only cache upl…
Browse files Browse the repository at this point in the history
…oads

Forward-port of patch 4 in <#477>,
providing a clear piece of missing functionality: in the event that stdout
or stderr were more than 50KiB of output when caching `local_only` actions,
then this dead path was taken, and so stdout/stderr would not be uploaded
successfully in the cache.

Co-authored-by: Austin Seipp <[email protected]>
  • Loading branch information
cormacrelf and thoughtpolice committed Sep 6, 2024
1 parent 9069798 commit 0088a2f
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 13 deletions.
15 changes: 12 additions & 3 deletions app/buck2_execute/src/execute/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ use anyhow::Context;
use buck2_common::file_ops::FileDigest;
use buck2_core::execution_types::executor_config::RemoteExecutorUseCase;
use futures::future;
use remote_execution::InlinedBlobWithDigest;
use remote_execution::TDigest;

use crate::digest::CasDigestConversionResultExt;
use crate::digest::CasDigestFromReExt;
use crate::digest::CasDigestToReExt;
use crate::digest_config::DigestConfig;
use crate::re::manager::ManagedRemoteExecutionClient;
use crate::re::streams::RemoteCommandStdStreams;
Expand Down Expand Up @@ -238,12 +240,13 @@ impl CommandStdStreams {
self,
client: &ManagedRemoteExecutionClient,
use_case: RemoteExecutorUseCase,
digest_config: DigestConfig,
) -> anyhow::Result<StdStreamPair<ReStdStream>> {
match self {
Self::Local { stdout, stderr } => {
let (stdout, stderr) = future::try_join(
maybe_upload_to_re(client, use_case, stdout),
maybe_upload_to_re(client, use_case, stderr),
maybe_upload_to_re(client, use_case, stdout, digest_config),
maybe_upload_to_re(client, use_case, stderr, digest_config),
)
.await?;

Expand Down Expand Up @@ -276,11 +279,17 @@ async fn maybe_upload_to_re(
client: &ManagedRemoteExecutionClient,
use_case: RemoteExecutorUseCase,
bytes: Vec<u8>,
digest_config: DigestConfig,
) -> anyhow::Result<ReStdStream> {
const MIN_STREAM_UPLOAD_SIZE: usize = 50 * 1024; // Same as RE
if bytes.len() < MIN_STREAM_UPLOAD_SIZE {
return Ok(ReStdStream::Raw(bytes));
}
let digest = client.upload_blob(bytes, use_case).await?;
let inline_blob = InlinedBlobWithDigest {
digest: FileDigest::from_content(&bytes, digest_config.cas_digest_config()).to_re(),
blob: bytes,
..Default::default()
};
let digest = client.upload_blob(inline_blob, use_case).await?;
Ok(ReStdStream::Digest(digest))
}
8 changes: 5 additions & 3 deletions app/buck2_execute/src/re/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ impl RemoteExecutionClient {

pub async fn upload_blob(
&self,
blob: Vec<u8>,
blob: InlinedBlobWithDigest,
use_case: RemoteExecutorUseCase,
) -> anyhow::Result<TDigest> {
self.data
Expand Down Expand Up @@ -1151,17 +1151,19 @@ impl RemoteExecutionClientImpl {

pub async fn upload_blob(
&self,
blob: Vec<u8>,
blob: InlinedBlobWithDigest,
use_case: RemoteExecutorUseCase,
) -> anyhow::Result<TDigest> {
let digest = blob.digest.clone();
with_error_handler(
"upload_blob",
self.get_session_id(),
self.client()
.upload_blob(blob, use_case.metadata(None))
.await,
)
.await
.await?;
Ok(digest)
}

async fn materialize_files(
Expand Down
2 changes: 1 addition & 1 deletion app/buck2_execute/src/re/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ impl ManagedRemoteExecutionClient {

pub async fn upload_blob(
&self,
blob: Vec<u8>,
blob: InlinedBlobWithDigest,
use_case: RemoteExecutorUseCase,
) -> anyhow::Result<TDigest> {
self.lock()?.get().await?.upload_blob(blob, use_case).await
Expand Down
2 changes: 1 addition & 1 deletion app/buck2_execute_impl/src/executors/caching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ impl CacheUploader {
.report
.std_streams
.clone()
.into_re(&self.re_client, self.re_use_case)
.into_re(&self.re_client, self.re_use_case, digest_config)
.await
.context("Error accessing std_streams")
};
Expand Down
20 changes: 15 additions & 5 deletions remote_execution/oss/re_grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -758,11 +758,21 @@ impl REClient {

pub async fn upload_blob(
&self,
_blob: Vec<u8>,
_metadata: RemoteExecutionMetadata,
) -> anyhow::Result<TDigest> {
// TODO(aloiscochard)
Err(anyhow::anyhow!("Not implemented (RE upload_blob)"))
blob: InlinedBlobWithDigest,
metadata: RemoteExecutionMetadata,
) -> anyhow::Result<()> {
self.upload(
metadata,
UploadRequest {
inlined_blobs_with_digest: Some(vec![blob]),
files_with_digest: None,
directories: None,
upload_only_missing: false,
..Default::default()
},
)
.await?;
Ok(())
}

pub async fn download(
Expand Down

0 comments on commit 0088a2f

Please sign in to comment.