From 73f755ec8c69e593a01ecc1f36688bc865423332 Mon Sep 17 00:00:00 2001 From: Hugo van der Wijst Date: Mon, 9 Sep 2024 09:38:37 -0700 Subject: [PATCH] Add support to upload action results to an RE client. --- remote_execution/oss/re_grpc/src/client.rs | 119 ++++++++++++++++++++- 1 file changed, 116 insertions(+), 3 deletions(-) diff --git a/remote_execution/oss/re_grpc/src/client.rs b/remote_execution/oss/re_grpc/src/client.rs index 3c5b8a3b941c..daa58b9001b8 100644 --- a/remote_execution/oss/re_grpc/src/client.rs +++ b/remote_execution/oss/re_grpc/src/client.rs @@ -46,13 +46,18 @@ use re_grpc_proto::build::bazel::remote::execution::v2::Digest; use re_grpc_proto::build::bazel::remote::execution::v2::ExecuteOperationMetadata; use re_grpc_proto::build::bazel::remote::execution::v2::ExecuteRequest as GExecuteRequest; use re_grpc_proto::build::bazel::remote::execution::v2::ExecuteResponse as GExecuteResponse; +use re_grpc_proto::build::bazel::remote::execution::v2::ExecutedActionMetadata; use re_grpc_proto::build::bazel::remote::execution::v2::FindMissingBlobsRequest; use re_grpc_proto::build::bazel::remote::execution::v2::FindMissingBlobsResponse; use re_grpc_proto::build::bazel::remote::execution::v2::GetActionResultRequest; use re_grpc_proto::build::bazel::remote::execution::v2::GetCapabilitiesRequest; +use re_grpc_proto::build::bazel::remote::execution::v2::OutputDirectory; +use re_grpc_proto::build::bazel::remote::execution::v2::OutputFile; +use re_grpc_proto::build::bazel::remote::execution::v2::OutputSymlink; use re_grpc_proto::build::bazel::remote::execution::v2::RequestMetadata; use re_grpc_proto::build::bazel::remote::execution::v2::ResultsCachePolicy; use re_grpc_proto::build::bazel::remote::execution::v2::ToolDetails; +use re_grpc_proto::build::bazel::remote::execution::v2::UpdateActionResultRequest; use re_grpc_proto::google::bytestream::byte_stream_client::ByteStreamClient; use re_grpc_proto::google::bytestream::ReadRequest; use re_grpc_proto::google::bytestream::ReadResponse; @@ -117,6 +122,13 @@ fn check_status(status: Status) -> Result<(), REClientError> { }) } +fn ttimestamp_to(ts: TTimestamp) -> ::prost_types::Timestamp { + ::prost_types::Timestamp { + seconds: ts.seconds, + nanos: ts.nanos, + } +} + fn ttimestamp_from(ts: Option<::prost_types::Timestamp>) -> TTimestamp { match ts { Some(timestamp) => TTimestamp { @@ -589,10 +601,28 @@ impl REClient { pub async fn write_action_result( &self, - _metadata: RemoteExecutionMetadata, - _request: WriteActionResultRequest, + metadata: RemoteExecutionMetadata, + request: WriteActionResultRequest, ) -> anyhow::Result { - Err(anyhow::anyhow!("Not supported")) + let mut client = self.grpc_clients.action_cache_client.clone(); + + let res = client + .update_action_result(with_re_metadata( + UpdateActionResultRequest { + instance_name: self.instance_name.as_str().to_owned(), + action_digest: Some(tdigest_to(request.action_digest)), + action_result: Some(convert_t_action_result2(request.action_result)?), + results_cache_policy: None, + }, + metadata, + self.runtime_opts.use_fbcode_metadata, + )) + .await?; + + Ok(WriteActionResultResponse { + actual_action_result: convert_action_result(res.into_inner())?, + ttl_seconds: 0, + }) } pub async fn execute_with_progress( @@ -1008,6 +1038,89 @@ fn convert_action_result(action_result: ActionResult) -> anyhow::Result anyhow::Result { + let t_execution_metadata = t_action_result.execution_metadata; + let virtual_execution_duration = prost_types::Duration::try_from( + t_execution_metadata + .execution_completed_timestamp + .saturating_duration_since(&t_execution_metadata.execution_start_timestamp), + )?; + let execution_metadata = Some(ExecutedActionMetadata { + worker: t_execution_metadata.worker, + queued_timestamp: Some(ttimestamp_to(t_execution_metadata.queued_timestamp)), + worker_start_timestamp: Some(ttimestamp_to(t_execution_metadata.worker_start_timestamp)), + worker_completed_timestamp: Some(ttimestamp_to( + t_execution_metadata.worker_completed_timestamp, + )), + input_fetch_start_timestamp: Some(ttimestamp_to( + t_execution_metadata.input_fetch_start_timestamp, + )), + input_fetch_completed_timestamp: Some(ttimestamp_to( + t_execution_metadata.input_fetch_completed_timestamp, + )), + execution_start_timestamp: Some(ttimestamp_to( + t_execution_metadata.execution_start_timestamp, + )), + execution_completed_timestamp: Some(ttimestamp_to( + t_execution_metadata.execution_completed_timestamp, + )), + virtual_execution_duration: Some(virtual_execution_duration), + output_upload_start_timestamp: Some(ttimestamp_to( + t_execution_metadata.output_upload_start_timestamp, + )), + output_upload_completed_timestamp: Some(ttimestamp_to( + t_execution_metadata.output_upload_completed_timestamp, + )), + auxiliary_metadata: Vec::new(), + }); + + let output_files = t_action_result + .output_files + .into_map(|output_file| OutputFile { + path: output_file.name, + digest: Some(tdigest_to(output_file.digest.digest)), + is_executable: output_file.executable, + contents: Vec::new(), + node_properties: None, + }); + + let output_symlinks = + t_action_result + .output_symlinks + .into_map(|output_symlink| OutputSymlink { + path: output_symlink.name, + target: output_symlink.target, + node_properties: None, + }); + + let output_directories = t_action_result + .output_directories + .into_map(|output_directory| { + let digest = tdigest_to(output_directory.tree_digest); + OutputDirectory { + path: output_directory.path, + tree_digest: Some(digest.clone()), + is_topologically_sorted: false, + } + }); + + let action_result = ActionResult { + output_files, + output_file_symlinks: Vec::new(), + output_symlinks, + output_directories, + output_directory_symlinks: Vec::new(), + exit_code: t_action_result.exit_code, + stdout_raw: Vec::new(), + stdout_digest: t_action_result.stdout_digest.map(tdigest_to), + stderr_raw: Vec::new(), + stderr_digest: t_action_result.stderr_digest.map(tdigest_to), + execution_metadata, + }; + + Ok(action_result) +} + async fn download_impl( instance_name: &InstanceName, request: DownloadRequest,