diff --git a/remote_execution/oss/re_grpc/src/client.rs b/remote_execution/oss/re_grpc/src/client.rs index 3c5b8a3b941c..7f7e168b9bbc 100644 --- a/remote_execution/oss/re_grpc/src/client.rs +++ b/remote_execution/oss/re_grpc/src/client.rs @@ -46,13 +46,17 @@ use re_grpc_proto::build::bazel::remote::execution::v2::Digest; use re_grpc_proto::build::bazel::remote::execution::v2::ExecuteOperationMetadata; use re_grpc_proto::build::bazel::remote::execution::v2::ExecuteRequest as GExecuteRequest; use re_grpc_proto::build::bazel::remote::execution::v2::ExecuteResponse as GExecuteResponse; +use re_grpc_proto::build::bazel::remote::execution::v2::ExecutedActionMetadata; use re_grpc_proto::build::bazel::remote::execution::v2::FindMissingBlobsRequest; use re_grpc_proto::build::bazel::remote::execution::v2::FindMissingBlobsResponse; use re_grpc_proto::build::bazel::remote::execution::v2::GetActionResultRequest; use re_grpc_proto::build::bazel::remote::execution::v2::GetCapabilitiesRequest; +use re_grpc_proto::build::bazel::remote::execution::v2::OutputDirectory; +use re_grpc_proto::build::bazel::remote::execution::v2::OutputFile; use re_grpc_proto::build::bazel::remote::execution::v2::RequestMetadata; use re_grpc_proto::build::bazel::remote::execution::v2::ResultsCachePolicy; use re_grpc_proto::build::bazel::remote::execution::v2::ToolDetails; +use re_grpc_proto::build::bazel::remote::execution::v2::UpdateActionResultRequest; use re_grpc_proto::google::bytestream::byte_stream_client::ByteStreamClient; use re_grpc_proto::google::bytestream::ReadRequest; use re_grpc_proto::google::bytestream::ReadResponse; @@ -117,6 +121,13 @@ fn check_status(status: Status) -> Result<(), REClientError> { }) } +fn ttimestamp_to(ts: TTimestamp) -> Option { + Some(prost_types::Timestamp { + seconds: ts.seconds, + nanos: ts.nanos, + }) +} + fn ttimestamp_from(ts: Option<::prost_types::Timestamp>) -> TTimestamp { match ts { Some(timestamp) => TTimestamp { @@ -589,10 +600,37 @@ impl REClient { pub async fn write_action_result( &self, - _metadata: RemoteExecutionMetadata, - _request: WriteActionResultRequest, + metadata: RemoteExecutionMetadata, + write_request: WriteActionResultRequest, ) -> anyhow::Result { - Err(anyhow::anyhow!("Not supported")) + let mut client = self.grpc_clients.action_cache_client.clone(); + let action_digest = tdigest_to(write_request.action_digest.clone()); + let action_result = convert_taction_result_to_rbe(write_request.action_result); + let request = UpdateActionResultRequest { + action_digest: Some(action_digest), + action_result: Some(action_result), + results_cache_policy: None, + instance_name: self.instance_name.as_str().to_owned(), + }; + + let t: ActionResult = client + .update_action_result(with_re_metadata( + request, + metadata, + self.runtime_opts.use_fbcode_metadata, + )) + .await? + .into_inner(); + + let result = convert_action_result(t)?; + let result = WriteActionResultResponse { + actual_action_result: result, + // NOTE: This is an arbitrary number because RBE does not return information + // on the TTL of the ActionResult. + // Also buck2 does not appear to read this value anywhere. + ttl_seconds: 0, + }; + Ok(result) } pub async fn execute_with_progress( @@ -912,7 +950,89 @@ impl REClient { } } +fn convert_taction_result_to_rbe(result: TActionResult2) -> ActionResult { + let TActionResult2 { + output_files, + output_directories, + output_symlinks: _, + exit_code, + stdout_raw, + stdout_digest, + stderr_raw, + stderr_digest, + execution_metadata: metadata, + auxiliary_metadata: _, + _dot_dot_default, + } = result; + + let output_files = output_files.into_map(|output_file| { + OutputFile { + digest: Some(tdigest_to(output_file.digest.digest)), + path: output_file.name, + is_executable: output_file.executable, + // Clients SHOULD NOT populate this field when uploading to the cache. + contents: Vec::new(), + node_properties: None, + } + }); + + let output_directories = output_directories.into_map(|output_directory| { + OutputDirectory { + path: output_directory.path, + tree_digest: Some(tdigest_to(output_directory.tree_digest)), + // TODO(cormacrelf): check whether buck2_execute::directory::directory_to_re_tree + // conforms with the requirements of passing `true` here (see .proto file) + is_topologically_sorted: false, + } + }); + + let execution_metadata = Some(ExecutedActionMetadata { + worker: metadata.worker, + worker_start_timestamp: ttimestamp_to(metadata.worker_start_timestamp), + worker_completed_timestamp: ttimestamp_to(metadata.worker_completed_timestamp), + input_fetch_start_timestamp: ttimestamp_to(metadata.input_fetch_start_timestamp), + input_fetch_completed_timestamp: ttimestamp_to(metadata.input_fetch_completed_timestamp), + execution_start_timestamp: ttimestamp_to(metadata.execution_start_timestamp), + execution_completed_timestamp: ttimestamp_to(metadata.execution_completed_timestamp), + output_upload_start_timestamp: ttimestamp_to(metadata.output_upload_start_timestamp), + output_upload_completed_timestamp: ttimestamp_to( + metadata.output_upload_completed_timestamp, + ), + queued_timestamp: ttimestamp_to(metadata.queued_timestamp), + // TODO(cormacrelf): calculate this in a reasonable way for buck. + // see protobuf docs on virtual_execution_duration. + // May be able to use last_queued_timestamp + virtual_execution_duration: None, + // Ugh, need a routine to convert TAny to prost_type::Any... + auxiliary_metadata: vec![], + }); + + ActionResult { + exit_code, + execution_metadata, + output_directories, + output_files, + // TODO: support symlinks + output_symlinks: vec![], + output_file_symlinks: vec![], + output_directory_symlinks: vec![], + // If missing, it's because we uploaded it already + // if present, it's inline + stdout_raw: stdout_raw.unwrap_or(Vec::new()), + stdout_digest: stdout_digest.map(tdigest_to), + stderr_raw: stderr_raw.unwrap_or(Vec::new()), + stderr_digest: stderr_digest.map(tdigest_to), + } +} + fn convert_action_result(action_result: ActionResult) -> anyhow::Result { + if !action_result.output_symlinks.is_empty() + || !action_result.output_file_symlinks.is_empty() + || !action_result.output_directory_symlinks.is_empty() + { + return Err(anyhow::anyhow!("Symlinks are not supported")); + } + let execution_metadata = action_result .execution_metadata .with_context(|| "The execution metadata are not defined.")?;