Skip to content

Commit

Permalink
remote_execution: implement OSS write_action_result for `local_only…
Browse files Browse the repository at this point in the history
…` cache uploads

Forward port of part of <#477>,
which implements code in the OSS path for putting results of actions marked
`local_only = True` into the RBE `ActionCache`.

For people actually using the "remote execution" part of RBE and not just a
cache, this rather large omission in the system will often be transparently
mitigated because RBE services will not only record `ActionCache` entries
on your behalf, they will also implement "EX-to-AC" forwarding where any
execution calls will immediately get looked up in the `ActionCache` anyway as an
optimization. Which means you'll never see this for things that properly hit CAS
inputs/outputs.

However, a `CommandExecutorConfig` configured with `remote_enabled = False` can
still enable `remote_cache_enabled = allow_cache_uploads = True` which will both
enable the ActionCache/CAS support, and also allow uploads to those interfaces
too.

The most useful example of this type of setup is in a CI system like GitHub
Actions: where you run the build inside a container or image that provides
something quasi-hermetic (e.g. including your toolchains), while the RBE system
is purely a cache storing inputs/outputs/ActionCache entries and never uses
remote execution.

The hope is that this functionality will soon be plug-and-play on GitHub using
an RBE-to-GHA caching proxy, so Buck2 projects will be able to get a quick and
easy cache, but without RE. Therefore this functionality will become highly
useful.

Co-authored-by: Austin Seipp <[email protected]>
  • Loading branch information
cormacrelf and thoughtpolice committed Nov 19, 2024
1 parent dc95235 commit f780555
Showing 1 changed file with 123 additions and 3 deletions.
126 changes: 123 additions & 3 deletions remote_execution/oss/re_grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,17 @@ use re_grpc_proto::build::bazel::remote::execution::v2::Digest;
use re_grpc_proto::build::bazel::remote::execution::v2::ExecuteOperationMetadata;
use re_grpc_proto::build::bazel::remote::execution::v2::ExecuteRequest as GExecuteRequest;
use re_grpc_proto::build::bazel::remote::execution::v2::ExecuteResponse as GExecuteResponse;
use re_grpc_proto::build::bazel::remote::execution::v2::ExecutedActionMetadata;
use re_grpc_proto::build::bazel::remote::execution::v2::FindMissingBlobsRequest;
use re_grpc_proto::build::bazel::remote::execution::v2::FindMissingBlobsResponse;
use re_grpc_proto::build::bazel::remote::execution::v2::GetActionResultRequest;
use re_grpc_proto::build::bazel::remote::execution::v2::GetCapabilitiesRequest;
use re_grpc_proto::build::bazel::remote::execution::v2::OutputDirectory;
use re_grpc_proto::build::bazel::remote::execution::v2::OutputFile;
use re_grpc_proto::build::bazel::remote::execution::v2::RequestMetadata;
use re_grpc_proto::build::bazel::remote::execution::v2::ResultsCachePolicy;
use re_grpc_proto::build::bazel::remote::execution::v2::ToolDetails;
use re_grpc_proto::build::bazel::remote::execution::v2::UpdateActionResultRequest;
use re_grpc_proto::google::bytestream::byte_stream_client::ByteStreamClient;
use re_grpc_proto::google::bytestream::ReadRequest;
use re_grpc_proto::google::bytestream::ReadResponse;
Expand Down Expand Up @@ -118,6 +122,13 @@ fn check_status(status: Status) -> Result<(), REClientError> {
})
}

fn ttimestamp_to(ts: TTimestamp) -> Option<prost_types::Timestamp> {
Some(prost_types::Timestamp {
seconds: ts.seconds,
nanos: ts.nanos,
})
}

fn ttimestamp_from(ts: Option<::prost_types::Timestamp>) -> TTimestamp {
match ts {
Some(timestamp) => TTimestamp {
Expand Down Expand Up @@ -590,10 +601,37 @@ impl REClient {

pub async fn write_action_result(
&self,
_metadata: RemoteExecutionMetadata,
_request: WriteActionResultRequest,
metadata: RemoteExecutionMetadata,
write_request: WriteActionResultRequest,
) -> anyhow::Result<WriteActionResultResponse> {
Err(anyhow::anyhow!("Not supported"))
let mut client = self.grpc_clients.action_cache_client.clone();
let action_digest = tdigest_to(write_request.action_digest.clone());
let action_result = convert_taction_result_to_rbe(write_request.action_result);
let request = UpdateActionResultRequest {
action_digest: Some(action_digest),
action_result: Some(action_result),
results_cache_policy: None,
instance_name: self.instance_name.as_str().to_owned(),
};

let t: ActionResult = client
.update_action_result(with_re_metadata(
request,
metadata,
self.runtime_opts.use_fbcode_metadata,
))
.await?
.into_inner();

let result = convert_action_result(t)?;
let result = WriteActionResultResponse {
actual_action_result: result,
// NOTE: This is an arbitrary number because RBE does not return information
// on the TTL of the ActionResult.
// Also buck2 does not appear to read this value anywhere.
ttl_seconds: 0,
};
Ok(result)
}

pub async fn execute_with_progress(
Expand Down Expand Up @@ -916,7 +954,89 @@ impl REClient {
}
}

fn convert_taction_result_to_rbe(result: TActionResult2) -> ActionResult {
let TActionResult2 {
output_files,
output_directories,
output_symlinks: _,
exit_code,
stdout_raw,
stdout_digest,
stderr_raw,
stderr_digest,
execution_metadata: metadata,
auxiliary_metadata: _,
_dot_dot_default,
} = result;

let output_files = output_files.into_map(|output_file| {
OutputFile {
digest: Some(tdigest_to(output_file.digest.digest)),
path: output_file.name,
is_executable: output_file.executable,
// Clients SHOULD NOT populate this field when uploading to the cache.
contents: Vec::new(),
node_properties: None,
}
});

let output_directories = output_directories.into_map(|output_directory| {
OutputDirectory {
path: output_directory.path,
tree_digest: Some(tdigest_to(output_directory.tree_digest)),
// TODO(cormacrelf): check whether buck2_execute::directory::directory_to_re_tree
// conforms with the requirements of passing `true` here (see .proto file)
is_topologically_sorted: false,
}
});

let execution_metadata = Some(ExecutedActionMetadata {
worker: metadata.worker,
worker_start_timestamp: ttimestamp_to(metadata.worker_start_timestamp),
worker_completed_timestamp: ttimestamp_to(metadata.worker_completed_timestamp),
input_fetch_start_timestamp: ttimestamp_to(metadata.input_fetch_start_timestamp),
input_fetch_completed_timestamp: ttimestamp_to(metadata.input_fetch_completed_timestamp),
execution_start_timestamp: ttimestamp_to(metadata.execution_start_timestamp),
execution_completed_timestamp: ttimestamp_to(metadata.execution_completed_timestamp),
output_upload_start_timestamp: ttimestamp_to(metadata.output_upload_start_timestamp),
output_upload_completed_timestamp: ttimestamp_to(
metadata.output_upload_completed_timestamp,
),
queued_timestamp: ttimestamp_to(metadata.queued_timestamp),
// TODO(cormacrelf): calculate this in a reasonable way for buck.
// see protobuf docs on virtual_execution_duration.
// May be able to use last_queued_timestamp
virtual_execution_duration: None,
// Ugh, need a routine to convert TAny to prost_type::Any...
auxiliary_metadata: vec![],
});

ActionResult {
exit_code,
execution_metadata,
output_directories,
output_files,
// TODO: support symlinks
output_symlinks: vec![],
output_file_symlinks: vec![],
output_directory_symlinks: vec![],
// If missing, it's because we uploaded it already
// if present, it's inline
stdout_raw: stdout_raw.unwrap_or(Vec::new()),
stdout_digest: stdout_digest.map(tdigest_to),
stderr_raw: stderr_raw.unwrap_or(Vec::new()),
stderr_digest: stderr_digest.map(tdigest_to),
}
}

fn convert_action_result(action_result: ActionResult) -> anyhow::Result<TActionResult2> {
if !action_result.output_symlinks.is_empty()
|| !action_result.output_file_symlinks.is_empty()
|| !action_result.output_directory_symlinks.is_empty()
{
return Err(anyhow::anyhow!("Symlinks are not supported"));
}

let execution_metadata = action_result
.execution_metadata
.with_context(|| "The execution metadata are not defined.")?;
Expand Down

0 comments on commit f780555

Please sign in to comment.