Skip to content

Commit

Permalink
remote_execution: implement OSS write_action_result for `local_only…
Browse files Browse the repository at this point in the history
…` cache uploads

Forward port of part of <#477>,
which implements code in the OSS path for putting results of actions marked
`local_only = True` into the RBE `ActionCache`.

For people actually using the "remote execution" part of RBE and not just a
cache, this rather large omission in the system will often be transparently
mitigated because RBE services will not only record `ActionCache` entries
on your behalf, they will also implement "EX-to-AC" forwarding where any
execution calls will immediately get looked up in the `ActionCache` anyway as an
optimization. Which means you'll never see this for things that properly hit CAS
inputs/outputs.

However, a `CommandExecutorConfig` configured with `remote_enabled = False` can
still enable `remote_cache_enabled = allow_cache_uploads = True` which will both
enable the ActionCache/CAS support, and also allow uploads to those interfaces
too.

The most useful example of this type of setup is in a CI system like GitHub
Actions: where you run the build inside a container or image that provides
something quasi-hermetic (e.g. including your toolchains), while the RBE system
is purely a cache storing inputs/outputs/ActionCache entries and never uses
remote execution.

The hope is that this functionality will soon be plug-and-play on GitHub using
an RBE-to-GHA caching proxy, so Buck2 projects will be able to get a quick and
easy cache, but without RE. Therefore this functionality will become highly
useful.

Co-authored-by: Austin Seipp <[email protected]>
  • Loading branch information
cormacrelf and thoughtpolice committed Sep 3, 2024
1 parent fa2541e commit 69c97fc
Showing 1 changed file with 152 additions and 3 deletions.
155 changes: 152 additions & 3 deletions remote_execution/oss/re_grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,17 @@ use re_grpc_proto::build::bazel::remote::execution::v2::Digest;
use re_grpc_proto::build::bazel::remote::execution::v2::ExecuteOperationMetadata;
use re_grpc_proto::build::bazel::remote::execution::v2::ExecuteRequest as GExecuteRequest;
use re_grpc_proto::build::bazel::remote::execution::v2::ExecuteResponse as GExecuteResponse;
use re_grpc_proto::build::bazel::remote::execution::v2::ExecutedActionMetadata;
use re_grpc_proto::build::bazel::remote::execution::v2::FindMissingBlobsRequest;
use re_grpc_proto::build::bazel::remote::execution::v2::FindMissingBlobsResponse;
use re_grpc_proto::build::bazel::remote::execution::v2::GetActionResultRequest;
use re_grpc_proto::build::bazel::remote::execution::v2::GetCapabilitiesRequest;
use re_grpc_proto::build::bazel::remote::execution::v2::OutputDirectory;
use re_grpc_proto::build::bazel::remote::execution::v2::OutputFile;
use re_grpc_proto::build::bazel::remote::execution::v2::RequestMetadata;
use re_grpc_proto::build::bazel::remote::execution::v2::ResultsCachePolicy;
use re_grpc_proto::build::bazel::remote::execution::v2::ToolDetails;
use re_grpc_proto::build::bazel::remote::execution::v2::UpdateActionResultRequest;
use re_grpc_proto::google::bytestream::byte_stream_client::ByteStreamClient;
use re_grpc_proto::google::bytestream::ReadRequest;
use re_grpc_proto::google::bytestream::ReadResponse;
Expand Down Expand Up @@ -117,6 +121,13 @@ fn check_status(status: Status) -> Result<(), REClientError> {
})
}

fn ttimestamp_to(ts: TTimestamp) -> Option<prost_types::Timestamp> {
Some(prost_types::Timestamp {
seconds: ts.seconds,
nanos: ts.nanos,
})
}

fn ttimestamp_from(ts: Option<::prost_types::Timestamp>) -> TTimestamp {
match ts {
Some(timestamp) => TTimestamp {
Expand Down Expand Up @@ -589,10 +600,37 @@ impl REClient {

pub async fn write_action_result(
&self,
_metadata: RemoteExecutionMetadata,
_request: WriteActionResultRequest,
metadata: RemoteExecutionMetadata,
write_request: WriteActionResultRequest,
) -> anyhow::Result<WriteActionResultResponse> {
Err(anyhow::anyhow!("Not supported"))
let mut client = self.grpc_clients.action_cache_client.clone();
let action_digest = tdigest_to(write_request.action_digest.clone());
let action_result = convert_taction_result_to_rbe(write_request.action_result)?;
let request = UpdateActionResultRequest {
action_digest: Some(action_digest),
action_result: Some(action_result),
results_cache_policy: None,
instance_name: self.instance_name.as_str().to_owned(),
};

let t: ActionResult = client
.update_action_result(with_re_metadata(
request,
metadata,
self.runtime_opts.use_fbcode_metadata,
))
.await?
.into_inner();

let result = convert_action_result(t)?;
let result = WriteActionResultResponse {
actual_action_result: result,
// NOTE: This is an arbitrary number because RBE does not return information
// on the TTL of the ActionResult.
// Also buck2 does not appear to read this value anywhere.
ttl_seconds: 0,
};
Ok(result)
}

pub async fn execute_with_progress(
Expand Down Expand Up @@ -912,7 +950,118 @@ impl REClient {
}
}

fn convert_execution_action_metadata_to_rbe(
metadata: TExecutedActionMetadata,
) -> anyhow::Result<ExecutedActionMetadata> {
let TExecutedActionMetadata {
worker,
queued_timestamp,
worker_start_timestamp,
worker_completed_timestamp,
input_fetch_start_timestamp,
input_fetch_completed_timestamp,
execution_start_timestamp,
execution_completed_timestamp,
output_upload_start_timestamp,
output_upload_completed_timestamp,
execution_dir: _,
input_analyzing_start_timestamp: _,
input_analyzing_completed_timestamp: _,
execution_attempts: _,
last_queued_timestamp: _,
instruction_counts: _,
auxiliary_metadata: _,
_dot_dot_default,
} = metadata;
Ok(ExecutedActionMetadata {
worker,
worker_start_timestamp: ttimestamp_to(worker_start_timestamp),
worker_completed_timestamp: ttimestamp_to(worker_completed_timestamp),
input_fetch_start_timestamp: ttimestamp_to(input_fetch_start_timestamp),
input_fetch_completed_timestamp: ttimestamp_to(input_fetch_completed_timestamp),
execution_start_timestamp: ttimestamp_to(execution_start_timestamp),
execution_completed_timestamp: ttimestamp_to(execution_completed_timestamp),
output_upload_start_timestamp: ttimestamp_to(output_upload_start_timestamp),
output_upload_completed_timestamp: ttimestamp_to(output_upload_completed_timestamp),
queued_timestamp: ttimestamp_to(queued_timestamp),
// TODO(cormacrelf): calculate this in a reasonable way for buck.
// see protobuf docs on virtual_execution_duration.
// May be able to use last_queued_timestamp
virtual_execution_duration: None,
// Ugh, need a routine to convert TAny to prost_type::Any...
auxiliary_metadata: vec![],
})
}

fn convert_taction_result_to_rbe(result: TActionResult2) -> anyhow::Result<ActionResult> {
let TActionResult2 {
output_files,
output_directories,
output_symlinks,
exit_code,
stdout_raw,
stdout_digest,
stderr_raw,
stderr_digest,
execution_metadata,
auxiliary_metadata: _,
_dot_dot_default,
} = result;

let execution_metadata = convert_execution_action_metadata_to_rbe(execution_metadata)?;
let output_files = output_files.into_try_map(|output_file| {
let TFile {
digest,
name,
executable,
..
} = output_file;
anyhow::Ok(OutputFile {
digest: Some(tdigest_to(digest.digest)),
path: name,
is_executable: executable,
// Clients SHOULD NOT populate this field when uploading to the cache.
contents: Vec::new(),
node_properties: None,
})
})?;
let output_directories = output_directories.into_try_map(|output_directory| {
let tree_digest = tdigest_to(output_directory.tree_digest);
anyhow::Ok(OutputDirectory {
path: output_directory.path,
tree_digest: Some(tree_digest.clone()),
// TODO(cormacrelf): check whether buck2_execute::directory::directory_to_re_tree
// conforms with the requirements of passing `true` here (see .proto file)
is_topologically_sorted: false,
})
})?;

anyhow::Ok(ActionResult {
exit_code,
execution_metadata: Some(execution_metadata),
output_directories,
output_files,
// TODO: support symlinks
output_symlinks: vec![],
output_file_symlinks: vec![],
output_directory_symlinks: vec![],
// If missing, it's because we uploaded it already
// if present, it's inline
stdout_raw: stdout_raw.unwrap_or(Vec::new()),
stdout_digest: stdout_digest.map(tdigest_to),
stderr_raw: stderr_raw.unwrap_or(Vec::new()),
stderr_digest: stderr_digest.map(tdigest_to),
})
}

fn convert_action_result(action_result: ActionResult) -> anyhow::Result<TActionResult2> {
if !action_result.output_symlinks.is_empty()
|| !action_result.output_file_symlinks.is_empty()
|| !action_result.output_directory_symlinks.is_empty()
{
return Err(anyhow::anyhow!("Symlinks are not supported"));
}

let execution_metadata = action_result
.execution_metadata
.with_context(|| "The execution metadata are not defined.")?;
Expand Down

0 comments on commit 69c97fc

Please sign in to comment.