From 0375b3041a33b0f731c6d88f8f5ab782bdb2690d Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Fri, 18 Jan 2019 17:44:49 +0000 Subject: [PATCH] Switch operation getting to tower (#7108) --- src/rust/engine/Cargo.lock | 4 +- src/rust/engine/process_execution/Cargo.toml | 3 +- .../process_execution/bazel_protos/Cargo.toml | 1 + .../process_execution/bazel_protos/build.rs | 8 +- .../bazel_protos/src/conversions.rs | 44 + .../process_execution/bazel_protos/src/lib.rs | 1 + src/rust/engine/process_execution/src/lib.rs | 1 - .../engine/process_execution/src/remote.rs | 1037 ++++++++--------- src/rust/engine/process_executor/src/main.rs | 15 - src/rust/engine/src/context.rs | 3 - .../testutil/mock/src/execution_server.rs | 13 +- 11 files changed, 568 insertions(+), 562 deletions(-) diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index a368da84555..7799ebbeb4d 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -98,6 +98,7 @@ dependencies = [ "grpcio 0.3.0 (git+https://github.com/pantsbuild/grpc-rs.git?rev=4dfafe9355dc996d7d0702e7386a6fedcd9734c0)", "grpcio-compiler 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "hashing 0.0.1", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "prost 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "prost-derive 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "prost-types 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1280,13 +1281,14 @@ dependencies = [ "fs 0.0.1", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "futures-timer 0.1.1 (git+https://github.com/pantsbuild/futures-timer?rev=0b747e565309a58537807ab43c674d8951f9e5a0)", - "grpcio 0.3.0 (git+https://github.com/pantsbuild/grpc-rs.git?rev=4dfafe9355dc996d7d0702e7386a6fedcd9734c0)", "h2 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", "hashing 0.0.1", "http 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "mock 0.0.1", "parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)", + "prost 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "prost-types 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)", "resettable 0.0.1", "sha2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/src/rust/engine/process_execution/Cargo.toml b/src/rust/engine/process_execution/Cargo.toml index 755836ad809..affa75eeae1 100644 --- a/src/rust/engine/process_execution/Cargo.toml +++ b/src/rust/engine/process_execution/Cargo.toml @@ -15,12 +15,13 @@ fs = { path = "../fs" } futures = "^0.1.16" # TODO: Switch to a release once https://github.com/alexcrichton/futures-timer/pull/11 and https://github.com/alexcrichton/futures-timer/pull/12 merge futures-timer = { git = "https://github.com/pantsbuild/futures-timer", rev = "0b747e565309a58537807ab43c674d8951f9e5a0" } -grpcio = { git = "https://github.com/pantsbuild/grpc-rs.git", rev = "4dfafe9355dc996d7d0702e7386a6fedcd9734c0", default_features = false, features = ["protobuf-codec"] } h2 = "0.1.13" hashing = { path = "../hashing" } http = "0.1" log = "0.4" parking_lot = "0.6" +prost = "0.4" +prost-types = "0.4" protobuf = { version = "2.0.4", features = ["with-bytes"] } resettable = { path = "../resettable" } sha2 = "0.8" diff --git a/src/rust/engine/process_execution/bazel_protos/Cargo.toml b/src/rust/engine/process_execution/bazel_protos/Cargo.toml index 7cd044c7471..6890cef290f 100644 --- a/src/rust/engine/process_execution/bazel_protos/Cargo.toml +++ b/src/rust/engine/process_execution/bazel_protos/Cargo.toml @@ -10,6 +10,7 @@ bytes = "0.4.5" futures = "^0.1.16" grpcio = { git = "https://github.com/pantsbuild/grpc-rs.git", rev = "4dfafe9355dc996d7d0702e7386a6fedcd9734c0", default_features = false, features = ["protobuf-codec"] } hashing = { path = "../../hashing" } +log = "0.4" prost = "0.4" prost-derive = "0.4" prost-types = "0.4" diff --git a/src/rust/engine/process_execution/bazel_protos/build.rs b/src/rust/engine/process_execution/bazel_protos/build.rs index 0c9e136ad9a..23c441961f3 100644 --- a/src/rust/engine/process_execution/bazel_protos/build.rs +++ b/src/rust/engine/process_execution/bazel_protos/build.rs @@ -176,9 +176,11 @@ fn generate_for_tower(thirdpartyprotobuf: &Path, out_dir: PathBuf) { .enable_server(true) .enable_client(true) .build( - &[PathBuf::from( - "build/bazel/remote/execution/v2/remote_execution.proto", - )], + &[ + PathBuf::from("build/bazel/remote/execution/v2/remote_execution.proto"), + PathBuf::from("google/rpc/code.proto"), + PathBuf::from("google/rpc/error_details.proto"), + ], &std::fs::read_dir(&thirdpartyprotobuf) .unwrap() .into_iter() diff --git a/src/rust/engine/process_execution/bazel_protos/src/conversions.rs b/src/rust/engine/process_execution/bazel_protos/src/conversions.rs index 4ed75bc2ef4..e46767fb1e6 100644 --- a/src/rust/engine/process_execution/bazel_protos/src/conversions.rs +++ b/src/rust/engine/process_execution/bazel_protos/src/conversions.rs @@ -1,4 +1,7 @@ +use bytes::BytesMut; use hashing; +use log::error; +use prost::Message; impl<'a> From<&'a hashing::Digest> for crate::remote_execution::Digest { fn from(d: &hashing::Digest) -> Self { @@ -97,6 +100,47 @@ impl From } } +// This should only be used in test contexts. It should be deleted when the mock systems use tower. +impl Into for crate::google::rpc::Status { + fn into(self) -> grpcio::RpcStatus { + let mut buf = BytesMut::with_capacity(self.encoded_len()); + self.encode(&mut buf).unwrap(); + grpcio::RpcStatus { + status: self.code.into(), + details: None, + status_proto_bytes: Some(buf.to_vec()), + } + } +} + +// TODO: Use num_enum or similar here when TryInto is stable. +pub fn code_from_i32(i: i32) -> crate::google::rpc::Code { + use crate::google::rpc::Code::*; + match i { + 0 => Ok, + 1 => Cancelled, + 2 => Unknown, + 3 => InvalidArgument, + 4 => DeadlineExceeded, + 5 => NotFound, + 6 => AlreadyExists, + 7 => PermissionDenied, + 8 => ResourceExhausted, + 9 => FailedPrecondition, + 10 => Aborted, + 11 => OutOfRange, + 12 => Unimplemented, + 13 => Internal, + 14 => Unavailable, + 15 => DataLoss, + 16 => Unauthenticated, + _ => { + error!("Unknown grpc error code: {}, default to Unknown", i); + Unknown + } + } +} + pub fn prost_any_to_gcprio_any(any: prost_types::Any) -> protobuf::well_known_types::Any { let prost_types::Any { type_url, value } = any; let mut dst = protobuf::well_known_types::Any::new(); diff --git a/src/rust/engine/process_execution/bazel_protos/src/lib.rs b/src/rust/engine/process_execution/bazel_protos/src/lib.rs index 44d7ae6fdc6..a5810076164 100644 --- a/src/rust/engine/process_execution/bazel_protos/src/lib.rs +++ b/src/rust/engine/process_execution/bazel_protos/src/lib.rs @@ -16,5 +16,6 @@ mod gen_for_tower; pub use crate::gen_for_tower::*; mod conversions; +pub use crate::conversions::code_from_i32; mod verification; pub use crate::verification::verify_directory_canonical; diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index 8b8a60617f8..e7286c4077b 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -34,7 +34,6 @@ use bazel_protos; use fs; -use grpcio; use hashing; #[cfg(test)] diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index 029153e47ba..930c6adb79a 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; use std::path::PathBuf; -use std::sync::Arc; use std::time::{Duration, Instant}; use bazel_protos; @@ -10,11 +9,11 @@ use digest::{Digest as DigestTrait, FixedOutput}; use fs::{self, File, PathStat, Store}; use futures::{future, Future, Stream}; use futures_timer::Delay; -use grpcio; use hashing::{Digest, Fingerprint}; use log::{debug, trace, warn}; use parking_lot::Mutex; -use protobuf::{self, Message, ProtobufEnum}; +use prost::Message; +use protobuf::{self, Message as GrpcioMessage, ProtobufEnum}; use sha2::Sha256; use time; @@ -37,8 +36,18 @@ const CACHE_KEY_GEN_VERSION_ENV_VAR_NAME: &str = "PANTS_CACHE_KEY_GEN_VERSION"; #[derive(Debug)] enum OperationOrStatus { - Operation(bazel_protos::operations::Operation), - Status(bazel_protos::status::Status), + Operation(bazel_protos::google::longrunning::Operation), + Status(bazel_protos::google::rpc::Status), +} + +type Connection = tower_http::add_origin::AddOrigin< + tower_h2::client::Connection, +>; + +struct Clients { + execution_client: + Mutex>, + operations_client: Mutex>, } #[derive(Clone)] @@ -47,25 +56,7 @@ pub struct CommandRunner { cache_key_gen_version: Option, instance_name: Option, authorization_header: Option, - channel: grpcio::Channel, - env: Arc, - execution_client: futures::future::Shared< - BoxFuture< - Mutex< - bazel_protos::build::bazel::remote::execution::v2::client::Execution< - tower_http::add_origin::AddOrigin< - tower_h2::client::Connection< - tokio::net::tcp::TcpStream, - DefaultExecutor, - tower_grpc::BoxBody, - >, - >, - >, - >, - String, - >, - >, - operations_client: Arc, + clients: futures::future::Shared>, store: Store, futures_timer_thread: resettable::Resettable, } @@ -98,14 +89,16 @@ impl CommandRunner { &self, execute_request: bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest, ) -> impl Future { + let command_runner = self.clone(); self - .execution_client + .clients .clone() .map_err(|err| format!("Error getting execution_client: {}", err)) - .and_then(|execution_client| { - let mut execution_client = execution_client.lock(); - execution_client - .execute(Request::new(execute_request)) + .and_then(move |clients| { + clients + .execution_client + .lock() + .execute(command_runner.make_request(execute_request)) .map_err(towergrpcerror_to_string) .and_then(|response_stream| { response_stream @@ -122,7 +115,7 @@ impl CommandRunner { std::mem::drop(stream); resp.ok_or_else(|| "Didn't get response from remote process execution".to_owned()) }) - .map(|operation| OperationOrStatus::Operation(operation.into())) + .map(OperationOrStatus::Operation) }) }) } @@ -149,7 +142,7 @@ impl super::CommandRunner for CommandRunner { /// TODO: Request jdk_home be created if set. /// fn run(&self, req: ExecuteProcessRequest) -> BoxFuture { - let operations_client = self.operations_client.clone(); + let clients = self.clients.clone(); let store = self.store.clone(); let execute_request_result = @@ -168,7 +161,6 @@ impl super::CommandRunner for CommandRunner { Ok((action, command, execute_request)) => { let command_runner = self.clone(); let command_runner2 = self.clone(); - let command_runner3 = self.clone(); let execute_request2 = execute_request.clone(); let futures_timer_thread = self.futures_timer_thread.clone(); @@ -202,9 +194,9 @@ impl super::CommandRunner for CommandRunner { let execute_request2 = execute_request2.clone(); let store = store.clone(); - let operations_client = operations_client.clone(); + let clients = clients.clone(); let command_runner2 = command_runner2.clone(); - let command_runner3 = command_runner3.clone(); + let command_runner3 = command_runner2.clone(); let futures_timer_thread = futures_timer_thread.clone(); let f = command_runner2.extract_execute_response(operation, &mut history); f.map(future::Loop::Break).or_else(move |value| { @@ -243,9 +235,11 @@ impl super::CommandRunner for CommandRunner { .to_boxed() } ExecutionError::NotFinished(operation_name) => { - let mut operation_request = - bazel_protos::operations::GetOperationRequest::new(); - operation_request.set_name(operation_name.clone()); + let operation_name2 = operation_name.clone(); + let operation_request = + bazel_protos::google::longrunning::GetOperationRequest { + name: operation_name.clone(), + }; let backoff_period = min( CommandRunner::BACKOFF_MAX_WAIT_MILLIS, @@ -274,19 +268,24 @@ impl super::CommandRunner for CommandRunner { ) }) .and_then(move |_| { - future::done( - operations_client - .get_operation_opt(&operation_request, command_runner3.call_option()) - .or_else(move |err| { - rpcerror_recover_cancelled(operation_request.take_name(), err) - }) - .map(OperationOrStatus::Operation) - .map_err(rpcerror_to_string), - ) - .map(move |operation| { - future::Loop::Continue((history, operation, iter_num + 1)) - }) - .to_boxed() + clients + .map_err(|err| format!("{}", err)) + .and_then(move |clients| { + clients + .operations_client + .lock() + .get_operation(command_runner3.make_request(operation_request)) + .map(|r| r.into_inner()) + .or_else(move |err| { + rpcerror_recover_cancelled(operation_name2, err) + }) + .map_err(towergrpcerror_to_string) + }) + .map(OperationOrStatus::Operation) + .map(move |operation| { + future::Loop::Continue((history, operation, iter_num + 1)) + }) + .to_boxed() }) .to_boxed() } @@ -324,31 +323,10 @@ impl CommandRunner { address: &str, cache_key_gen_version: Option, instance_name: Option, - root_ca_certs: Option>, oauth_bearer_token: Option, - thread_count: usize, store: Store, futures_timer_thread: resettable::Resettable, ) -> Result { - let env = Arc::new(grpcio::Environment::new(thread_count)); - let channel = { - let builder = grpcio::ChannelBuilder::new(env.clone()); - if let Some(_root_ca_certs) = root_ca_certs { - panic!("Sorry, we dropped secure grpc support until we can either make openssl link properly, or switch to tower"); - /* - let creds = grpcio::ChannelCredentialsBuilder::new() - .root_cert(root_ca_certs) - .build(); - builder.secure_connect(address, creds) - */ - } else { - builder.connect(address) - } - }; - let operations_client = Arc::new(bazel_protos::operations_grpc::OperationsClient::new( - channel.clone(), - )); - struct Dst(SocketAddr); impl tokio_connect::Connect for Dst { @@ -370,7 +348,7 @@ impl CommandRunner { .map_err(|err| format!("Failed to resolve remote socket address URL: {}", err))? .next() .ok_or_else(|| "Remote server address resolved to no addresses".to_owned())?; - let execution_client = client::Connect::new( + let conn = client::Connect::new( Dst(socket_addr), h2::client::Builder::default(), DefaultExecutor::current(), @@ -378,7 +356,7 @@ impl CommandRunner { .make_service(()) .map_err(|err| format!("Error connecting to remote execution server: {}", err)) .and_then(move |conn| { - let conn = tower_http::add_origin::Builder::new() + tower_http::add_origin::Builder::new() .uri(uri) .build(conn) .map_err(|err| { @@ -386,36 +364,43 @@ impl CommandRunner { "Failed to add origin for remote execution server: {:?}", err ) - })?; - Ok(Mutex::new( - bazel_protos::build::bazel::remote::execution::v2::client::Execution::new(conn), - )) - }) - .to_boxed() - .shared(); + }) + .map(Mutex::new) + }); + let clients = conn + .map(|conn| { + let conn = conn.lock(); + let execution_client = Mutex::new( + bazel_protos::build::bazel::remote::execution::v2::client::Execution::new(conn.clone()), + ); + let operations_client = Mutex::new( + bazel_protos::google::longrunning::client::Operations::new(conn.clone()), + ); + Clients { + execution_client, + operations_client, + } + }) + .to_boxed() + .shared(); Ok(CommandRunner { cache_key_gen_version, instance_name, authorization_header: oauth_bearer_token.map(|t| format!("Bearer {}", t)), - channel, - env, - operations_client, - execution_client, + clients, store, futures_timer_thread, }) } - fn call_option(&self) -> grpcio::CallOption { - let mut call_option = grpcio::CallOption::default(); + fn make_request(&self, message: T) -> Request { + let mut request = Request::new(message); if let Some(ref authorization_header) = self.authorization_header { - let mut builder = grpcio::MetadataBuilder::with_capacity(1); - builder - .add_str("authorization", &authorization_header) - .unwrap(); - call_option = call_option.headers(builder.build()); + request + .metadata_mut() + .insert("authorization", authorization_header.parse().unwrap()); } - call_option + request } fn store_proto_locally( @@ -440,96 +425,116 @@ impl CommandRunner { trace!("Got operation response: {:?}", operation_or_status); let status = match operation_or_status { - OperationOrStatus::Operation(mut operation) => { - if !operation.get_done() { - return future::err(ExecutionError::NotFinished(operation.take_name())).to_boxed(); - } - if operation.has_error() { - return future::err(ExecutionError::Fatal(format_error(&operation.get_error()))) - .to_boxed(); + OperationOrStatus::Operation(operation) => { + if !operation.done { + return future::err(ExecutionError::NotFinished(operation.name)).to_boxed(); } - if !operation.has_response() { + let execute_response = if let Some(result) = operation.result { + match result { + bazel_protos::google::longrunning::operation::Result::Error(ref status) => { + return future::err(ExecutionError::Fatal(format_error(status))).to_boxed(); + } + bazel_protos::google::longrunning::operation::Result::Response(ref any) => try_future!( + bazel_protos::build::bazel::remote::execution::v2::ExecuteResponse::decode( + &any.value + ) + .map_err(|e| ExecutionError::Fatal(format!("Invalid ExecuteResponse: {:?}", e))) + ), + } + } else { return future::err(ExecutionError::Fatal( "Operation finished but no response supplied".to_string(), )) .to_boxed(); - } + }; - let mut execute_response = bazel_protos::remote_execution::ExecuteResponse::new(); - try_future!(execute_response - .merge_from_bytes(operation.get_response().get_value()) - .map_err(|e| ExecutionError::Fatal(format!("Invalid ExecuteResponse: {:?}", e)))); trace!("Got (nested) execute response: {:?}", execute_response); - if execute_response.get_result().has_execution_metadata() { - let metadata = execute_response.get_result().get_execution_metadata(); - let enqueued = timespec_from(metadata.get_queued_timestamp()); - let worker_start = timespec_from(metadata.get_worker_start_timestamp()); - let input_fetch_start = timespec_from(metadata.get_input_fetch_start_timestamp()); - let input_fetch_completed = timespec_from(metadata.get_input_fetch_completed_timestamp()); - let execution_start = timespec_from(metadata.get_execution_start_timestamp()); - let execution_completed = timespec_from(metadata.get_execution_completed_timestamp()); - let output_upload_start = timespec_from(metadata.get_output_upload_start_timestamp()); - let output_upload_completed = - timespec_from(metadata.get_output_upload_completed_timestamp()); - - match (worker_start - enqueued).to_std() { - Ok(duration) => attempts.current_attempt.remote_queue = Some(duration), - Err(err) => warn!("Got negative remote queue time: {}", err), - } - match (input_fetch_completed - input_fetch_start).to_std() { - Ok(duration) => attempts.current_attempt.remote_input_fetch = Some(duration), - Err(err) => warn!("Got negative remote input fetch time: {}", err), - } - match (execution_completed - execution_start).to_std() { - Ok(duration) => attempts.current_attempt.remote_execution = Some(duration), - Err(err) => warn!("Got negative remote execution time: {}", err), - } - match (output_upload_completed - output_upload_start).to_std() { - Ok(duration) => attempts.current_attempt.remote_output_store = Some(duration), - Err(err) => warn!("Got negative remote output store time: {}", err), + if let Some(ref result) = execute_response.result { + if let Some(ref metadata) = result.execution_metadata { + let enqueued = timespec_from(&metadata.queued_timestamp); + let worker_start = timespec_from(&metadata.worker_start_timestamp); + let input_fetch_start = timespec_from(&metadata.input_fetch_start_timestamp); + let input_fetch_completed = timespec_from(&metadata.input_fetch_completed_timestamp); + let execution_start = timespec_from(&metadata.execution_start_timestamp); + let execution_completed = timespec_from(&metadata.execution_completed_timestamp); + let output_upload_start = timespec_from(&metadata.output_upload_start_timestamp); + let output_upload_completed = + timespec_from(&metadata.output_upload_completed_timestamp); + + match (worker_start - enqueued).to_std() { + Ok(duration) => attempts.current_attempt.remote_queue = Some(duration), + Err(err) => warn!("Got negative remote queue time: {}", err), + } + match (input_fetch_completed - input_fetch_start).to_std() { + Ok(duration) => attempts.current_attempt.remote_input_fetch = Some(duration), + Err(err) => warn!("Got negative remote input fetch time: {}", err), + } + match (execution_completed - execution_start).to_std() { + Ok(duration) => attempts.current_attempt.remote_execution = Some(duration), + Err(err) => warn!("Got negative remote execution time: {}", err), + } + match (output_upload_completed - output_upload_start).to_std() { + Ok(duration) => attempts.current_attempt.remote_output_store = Some(duration), + Err(err) => warn!("Got negative remote output store time: {}", err), + } + attempts.current_attempt.was_cache_hit = execute_response.cached_result; } - attempts.current_attempt.was_cache_hit = execute_response.cached_result; } let mut execution_attempts = std::mem::replace(&mut attempts.attempts, vec![]); execution_attempts.push(attempts.current_attempt); - let status = execute_response.take_status(); - if grpcio::RpcStatusCode::from(status.get_code()) == grpcio::RpcStatusCode::Ok { - return self - .extract_stdout(&execute_response) - .join(self.extract_stderr(&execute_response)) - .join(self.extract_output_files(&execute_response)) - .and_then(move |((stdout, stderr), output_directory)| { - Ok(FallibleExecuteProcessResult { - stdout: stdout, - stderr: stderr, - exit_code: execute_response.get_result().get_exit_code(), - output_directory: output_directory, - execution_attempts: execution_attempts, + let maybe_result = execute_response.result; + + let status = execute_response + .status + .unwrap_or_else(|| bazel_protos::google::rpc::Status { + code: bazel_protos::google::rpc::Code::Ok.into(), + message: String::new(), + details: vec![], + }); + if status.code == bazel_protos::google::rpc::Code::Ok.into() { + if let Some(result) = maybe_result { + return self + .extract_stdout(&result) + .join(self.extract_stderr(&result)) + .join(self.extract_output_files(&result)) + .and_then(move |((stdout, stderr), output_directory)| { + Ok(FallibleExecuteProcessResult { + stdout: stdout, + stderr: stderr, + exit_code: result.exit_code, + output_directory: output_directory, + execution_attempts: execution_attempts, + }) }) - }) + .to_boxed(); + } else { + return futures::future::err(ExecutionError::Fatal( + "No result found on ExecuteResponse".to_owned(), + )) .to_boxed(); + } } status } OperationOrStatus::Status(status) => status, }; - match grpcio::RpcStatusCode::from(status.get_code()) { - grpcio::RpcStatusCode::Ok => unreachable!(), - grpcio::RpcStatusCode::FailedPrecondition => { - if status.get_details().len() != 1 { + match bazel_protos::code_from_i32(status.code) { + bazel_protos::google::rpc::Code::Ok => unreachable!(), + bazel_protos::google::rpc::Code::FailedPrecondition => { + if status.details.len() != 1 { return future::err(ExecutionError::Fatal(format!( "Received multiple details in FailedPrecondition ExecuteResponse's status field: {:?}", - status.get_details() + status.details ))) .to_boxed(); } - let details = status.get_details().get(0).unwrap(); + let details = &status.details[0]; let mut precondition_failure = bazel_protos::error_details::PreconditionFailure::new(); - if details.get_type_url() + if details.type_url != format!( "type.googleapis.com/{}", precondition_failure.descriptor().full_name() @@ -538,13 +543,12 @@ impl CommandRunner { return future::err(ExecutionError::Fatal(format!( "Received FailedPrecondition, but didn't know how to resolve it: {},\ protobuf type {}", - status.get_message(), - details.get_type_url() + status.message, details.type_url ))) .to_boxed(); } try_future!(precondition_failure - .merge_from_bytes(details.get_value()) + .merge_from_bytes(&details.value) .map_err(|e| ExecutionError::Fatal(format!( "Error deserializing FailedPrecondition proto: {:?}", e @@ -592,8 +596,7 @@ impl CommandRunner { } code => future::err(ExecutionError::Fatal(format!( "Error from remote execution: {:?}: {:?}", - code, - status.get_message() + code, status.message ))) .to_boxed(), } @@ -602,11 +605,10 @@ impl CommandRunner { fn extract_stdout( &self, - execute_response: &bazel_protos::remote_execution::ExecuteResponse, + result: &bazel_protos::build::bazel::remote::execution::v2::ActionResult, ) -> BoxFuture { - if execute_response.get_result().has_stdout_digest() { - let stdout_digest_result: Result = - execute_response.get_result().get_stdout_digest().into(); + if let Some(ref stdout_digest) = result.stdout_digest { + let stdout_digest_result: Result = stdout_digest.into(); let stdout_digest = try_future!(stdout_digest_result .map_err(|err| ExecutionError::Fatal(format!("Error extracting stdout: {}", err)))); self @@ -628,7 +630,7 @@ impl CommandRunner { }) .to_boxed() } else { - let stdout_raw = Bytes::from(execute_response.get_result().get_stdout_raw()); + let stdout_raw = Bytes::from(result.stdout_raw.as_slice()); let stdout_copy = stdout_raw.clone(); self .store @@ -643,11 +645,10 @@ impl CommandRunner { fn extract_stderr( &self, - execute_response: &bazel_protos::remote_execution::ExecuteResponse, + result: &bazel_protos::build::bazel::remote::execution::v2::ActionResult, ) -> BoxFuture { - if execute_response.get_result().has_stderr_digest() { - let stderr_digest_result: Result = - execute_response.get_result().get_stderr_digest().into(); + if let Some(ref stderr_digest) = result.stderr_digest { + let stderr_digest_result: Result = stderr_digest.into(); let stderr_digest = try_future!(stderr_digest_result .map_err(|err| ExecutionError::Fatal(format!("Error extracting stderr: {}", err)))); self @@ -669,7 +670,7 @@ impl CommandRunner { }) .to_boxed() } else { - let stderr_raw = Bytes::from(execute_response.get_result().get_stderr_raw()); + let stderr_raw = Bytes::from(result.stderr_raw.as_slice()); let stderr_copy = stderr_raw.clone(); self .store @@ -684,21 +685,16 @@ impl CommandRunner { fn extract_output_files( &self, - execute_response: &bazel_protos::remote_execution::ExecuteResponse, + result: &bazel_protos::build::bazel::remote::execution::v2::ActionResult, ) -> BoxFuture { // Get Digests of output Directories. // Then we'll make a Directory for the output files, and merge them. - let mut directory_digests = - Vec::with_capacity(execute_response.get_result().get_output_directories().len() + 1); - // TODO: Maybe take rather than clone - let output_directories = execute_response - .get_result() - .get_output_directories() - .to_owned(); + let output_directories = result.output_directories.clone(); + let mut directory_digests = Vec::with_capacity(output_directories.len() + 1); for dir in output_directories { - let digest_result: Result = dir.get_tree_digest().into(); + let digest_result: Result = (&dir.tree_digest.unwrap()).into(); let mut digest = future::done(digest_result).to_boxed(); - for component in dir.get_path().rsplit('/') { + for component in dir.path.rsplit('/') { let component = component.to_owned(); let store = self.store.clone(); digest = digest @@ -721,19 +717,21 @@ impl CommandRunner { // Make a directory for the files let mut path_map = HashMap::new(); - let path_stats_result: Result, String> = execute_response - .get_result() - .get_output_files() + let output_files = result.output_files.clone(); + let path_stats_result: Result, String> = output_files .into_iter() .map(|output_file| { - let output_file_path_buf = PathBuf::from(output_file.get_path()); - let digest: Result = output_file.get_digest().into(); + let output_file_path_buf = PathBuf::from(output_file.path); + let digest = output_file + .digest + .ok_or_else(|| "No digest on remote execution output file".to_string())?; + let digest: Result = (&digest).into(); path_map.insert(output_file_path_buf.clone(), digest?); Ok(PathStat::file( output_file_path_buf.clone(), File { path: output_file_path_buf, - is_executable: output_file.get_is_executable(), + is_executable: output_file.is_executable, }, )) }) @@ -881,29 +879,32 @@ fn make_execute_request( Ok((action, command, execute_request)) } -fn format_error(error: &bazel_protos::status::Status) -> String { - let error_code_enum = bazel_protos::code::Code::from_i32(error.get_code()); +fn format_error(error: &bazel_protos::google::rpc::Status) -> String { + let error_code_enum = bazel_protos::code::Code::from_i32(error.code); let error_code = match error_code_enum { Some(x) => format!("{:?}", x), - None => format!("{:?}", error.get_code()), + None => format!("{:?}", error.code), }; - format!("{}: {}", error_code, error.get_message()) + format!("{}: {}", error_code, error.message) } /// /// If the given operation represents a cancelled request, recover it into /// ExecutionError::NotFinished. /// -fn rpcerror_recover_cancelled( +fn rpcerror_recover_cancelled( operation_name: String, - err: grpcio::Error, -) -> Result { + err: tower_grpc::Error, +) -> Result> { // If the error represented cancellation, return an Operation for the given Operation name. match &err { - &grpcio::Error::RpcFailure(ref rs) if rs.status == grpcio::RpcStatusCode::Cancelled => { - let mut next_operation = bazel_protos::operations::Operation::new(); - next_operation.set_name(operation_name); - return Ok(next_operation); + &tower_grpc::Error::Grpc(ref status) if status.code() == tower_grpc::Code::Cancelled => { + return Ok(bazel_protos::google::longrunning::Operation { + name: operation_name, + done: false, + metadata: None, + result: None, + }); } _ => {} } @@ -911,17 +912,6 @@ fn rpcerror_recover_cancelled( Err(err) } -fn rpcerror_to_string(error: grpcio::Error) -> String { - match error { - grpcio::Error::RpcFailure(status) => format!( - "{:?}: {:?}", - status.status, - status.details.unwrap_or_else(|| "[no message]".to_string()) - ), - err => format!("{:?}", err), - } -} - fn towergrpcerror_to_string(error: tower_grpc::Error) -> String { match error { tower_grpc::Error::Grpc(status) => { @@ -936,7 +926,7 @@ fn towergrpcerror_to_string(error: tower_grpc::Error) -> } } -fn digest(message: &dyn Message) -> Result { +fn digest(message: &dyn GrpcioMessage) -> Result { let bytes = message.write_to_bytes().map_err(|e| format!("{:?}", e))?; let mut hasher = Sha256::default(); @@ -948,20 +938,25 @@ fn digest(message: &dyn Message) -> Result { )) } -fn timespec_from(timestamp: &protobuf::well_known_types::Timestamp) -> time::Timespec { - time::Timespec::new(timestamp.seconds, timestamp.nanos) +fn timespec_from(timestamp: &Option) -> time::Timespec { + if let Some(timestamp) = timestamp { + time::Timespec::new(timestamp.seconds, timestamp.nanos) + } else { + time::Timespec::new(0, 0) + } } #[cfg(test)] mod tests { use bazel_protos; - use bytes::Bytes; + use bytes::{Bytes, BytesMut}; use fs; use futures::Future; - use grpcio; use hashing::{Digest, Fingerprint}; use mock; - use protobuf::{self, Message, ProtobufEnum}; + use prost::Message; + use prost_types; + use protobuf::{self, ProtobufEnum}; use tempfile::TempDir; use testutil::data::{TestData, TestDirectory}; use testutil::{as_bytes, owned_string_vec}; @@ -1057,10 +1052,7 @@ mod tests { )) .into(), ), - instance_name: String::new(), - execution_policy: None, - results_cache_policy: None, - skip_cache_lookup: false, + ..Default::default() }; assert_eq!( @@ -1149,9 +1141,7 @@ mod tests { .into(), ), instance_name: "dark-tower".to_owned(), - execution_policy: None, - results_cache_policy: None, - skip_cache_lookup: false, + ..Default::default() }; assert_eq!( @@ -1232,10 +1222,7 @@ mod tests { )) .into(), ), - instance_name: String::new(), - execution_policy: None, - results_cache_policy: None, - skip_cache_lookup: false, + ..Default::default() }; assert_eq!( @@ -1292,10 +1279,7 @@ mod tests { )) .into(), ), - instance_name: String::new(), - execution_policy: None, - results_cache_policy: None, - skip_cache_lookup: false, + ..Default::default() }; assert_eq!( @@ -1484,8 +1468,6 @@ mod tests { None, None, None, - None, - 1, store, timer_thread, ) @@ -1656,21 +1638,17 @@ mod tests { vec![ make_incomplete_operation(&op_name), MockOperation::new({ - let mut op = bazel_protos::operations::Operation::new(); - op.set_name(op_name.clone()); - op.set_done(true); - op.set_response({ - let mut response_wrapper = protobuf::well_known_types::Any::new(); - response_wrapper.set_type_url(format!( - "type.googleapis.com/{}", - bazel_protos::remote_execution::ExecuteResponse::new() - .descriptor() - .full_name() - )); - response_wrapper.set_value(vec![0x00, 0x00, 0x00]); - response_wrapper - }); - op + bazel_protos::google::longrunning::Operation { + name: op_name.clone(), + done: true, + result: Some( + bazel_protos::google::longrunning::operation::Result::Response(prost_types::Any { + type_url: "build.bazel.remote.execution.v2.ExecuteResponse".to_string(), + value: vec![0x00, 0x00, 0x00], + }), + ), + ..Default::default() + } }), ], )) @@ -1691,18 +1669,20 @@ mod tests { super::make_execute_request(&execute_request, &None, &None) .unwrap() .2, - vec![MockOperation::new({ - let mut op = bazel_protos::operations::Operation::new(); - op.set_name(op_name.to_string()); - op.set_done(true); - op.set_error({ - let mut error = bazel_protos::status::Status::new(); - error.set_code(bazel_protos::code::Code::INTERNAL.value()); - error.set_message("Something went wrong".to_string()); - error - }); - op - })], + vec![MockOperation::new( + bazel_protos::google::longrunning::Operation { + name: op_name.clone(), + done: true, + result: Some(bazel_protos::google::longrunning::operation::Result::Error( + bazel_protos::google::rpc::Status { + code: bazel_protos::code::Code::INTERNAL.value(), + message: "Something went wrong".to_string(), + details: vec![], + }, + )), + ..Default::default() + }, + )], )) }; @@ -1725,17 +1705,17 @@ mod tests { .2, vec![ make_incomplete_operation(&op_name), - MockOperation::new({ - let mut op = bazel_protos::operations::Operation::new(); - op.set_name(op_name.to_string()); - op.set_done(true); - op.set_error({ - let mut error = bazel_protos::status::Status::new(); - error.set_code(bazel_protos::code::Code::INTERNAL.value()); - error.set_message("Something went wrong".to_string()); - error - }); - op + MockOperation::new(bazel_protos::google::longrunning::Operation { + name: op_name.clone(), + done: true, + result: Some(bazel_protos::google::longrunning::operation::Result::Error( + bazel_protos::google::rpc::Status { + code: bazel_protos::code::Code::INTERNAL.value(), + message: "Something went wrong".to_string(), + details: vec![], + }, + )), + ..Default::default() }), ], )) @@ -1758,12 +1738,14 @@ mod tests { super::make_execute_request(&execute_request, &None, &None) .unwrap() .2, - vec![MockOperation::new({ - let mut op = bazel_protos::operations::Operation::new(); - op.set_name(op_name.to_string()); - op.set_done(true); - op - })], + vec![MockOperation::new( + bazel_protos::google::longrunning::Operation { + name: op_name.clone(), + done: true, + result: None, + ..Default::default() + }, + )], )) }; @@ -1786,11 +1768,11 @@ mod tests { .2, vec![ make_incomplete_operation(&op_name), - MockOperation::new({ - let mut op = bazel_protos::operations::Operation::new(); - op.set_name(op_name.to_string()); - op.set_done(true); - op + MockOperation::new(bazel_protos::google::longrunning::Operation { + name: op_name.clone(), + done: true, + result: None, + ..Default::default() }), ], )) @@ -1865,8 +1847,6 @@ mod tests { None, None, None, - None, - 1, store, timer_thread, ) @@ -1899,17 +1879,9 @@ mod tests { let mock_server = { let op_name = "cat".to_owned(); - let status = grpcio::RpcStatus { - status: grpcio::RpcStatusCode::FailedPrecondition, - details: None, - status_proto_bytes: Some( - make_precondition_failure_status(vec![missing_preconditionfailure_violation( - &roland.digest(), - )]) - .write_to_bytes() - .unwrap(), - ), - }; + let status = make_precondition_failure_status(vec![missing_preconditionfailure_violation( + &roland.digest(), + )]); mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), @@ -1964,8 +1936,6 @@ mod tests { None, None, None, - None, - 1, store, timer_thread, ) @@ -2035,8 +2005,6 @@ mod tests { None, None, None, - None, - 1, store, timer_thread, ) @@ -2050,9 +2018,11 @@ mod tests { #[test] fn format_error_complete() { - let mut error = bazel_protos::status::Status::new(); - error.set_code(bazel_protos::code::Code::CANCELLED.value()); - error.set_message("Oops, oh well!".to_string()); + let error = bazel_protos::google::rpc::Status { + code: bazel_protos::code::Code::CANCELLED.value(), + message: "Oops, oh well!".to_string(), + details: vec![], + }; assert_eq!( super::format_error(&error), "CANCELLED: Oops, oh well!".to_string() @@ -2061,9 +2031,11 @@ mod tests { #[test] fn extract_execute_response_unknown_code() { - let mut error = bazel_protos::status::Status::new(); - error.set_code(555); - error.set_message("Oops, oh well!".to_string()); + let error = bazel_protos::google::rpc::Status { + code: 555, + message: "Oops, oh well!".to_string(), + details: vec![], + }; assert_eq!( super::format_error(&error), "555: Oops, oh well!".to_string() @@ -2080,28 +2052,35 @@ mod tests { execution_attempts: vec![], }; - let mut output_file = bazel_protos::remote_execution::OutputFile::new(); - output_file.set_path("cats/roland".into()); - output_file.set_digest((&TestData::roland().digest()).into()); - output_file.set_is_executable(false); - let mut output_files = protobuf::RepeatedField::new(); - output_files.push(output_file); - - let mut operation = bazel_protos::operations::Operation::new(); - operation.set_name("cat".to_owned()); - operation.set_done(true); - operation.set_response(make_any_proto(&{ - let mut response = bazel_protos::remote_execution::ExecuteResponse::new(); - response.set_result({ - let mut result = bazel_protos::remote_execution::ActionResult::new(); - result.set_exit_code(want_result.exit_code); - result.set_stdout_raw(Bytes::from(want_result.stdout.clone())); - result.set_stderr_raw(Bytes::from(want_result.stderr.clone())); - result.set_output_files(output_files); - result - }); - response - })); + let response = bazel_protos::build::bazel::remote::execution::v2::ExecuteResponse { + result: Some( + bazel_protos::build::bazel::remote::execution::v2::ActionResult { + exit_code: want_result.exit_code, + stdout_raw: want_result.stdout.to_vec(), + stderr_raw: want_result.stderr.to_vec(), + output_files: vec![ + bazel_protos::build::bazel::remote::execution::v2::OutputFile { + path: "cats/roland".to_string(), + digest: Some((&TestData::roland().digest()).into()), + is_executable: false, + }, + ], + ..Default::default() + }, + ), + ..Default::default() + }; + + let operation = bazel_protos::google::longrunning::Operation { + name: "cat".to_owned(), + done: true, + result: Some( + bazel_protos::google::longrunning::operation::Result::Response( + make_any_prost_executeresponse(&response), + ), + ), + ..Default::default() + }; assert_eq!( extract_execute_response(operation) @@ -2114,9 +2093,11 @@ mod tests { #[test] fn extract_execute_response_pending() { let operation_name = "cat".to_owned(); - let mut operation = bazel_protos::operations::Operation::new(); - operation.set_name(operation_name.clone()); - operation.set_done(false); + let operation = bazel_protos::google::longrunning::Operation { + name: operation_name.clone(), + done: false, + ..Default::default() + }; assert_eq!( extract_execute_response(operation), @@ -2151,11 +2132,10 @@ mod tests { fn extract_execute_response_missing_other_things() { let missing = vec![ missing_preconditionfailure_violation(&TestData::roland().digest()), - { - let mut violation = bazel_protos::error_details::PreconditionFailure_Violation::new(); - violation.set_field_type("MISSING".to_owned()); - violation.set_subject("monkeys".to_owned()); - violation + bazel_protos::google::rpc::precondition_failure::Violation { + type_: "MISSING".to_string(), + subject: "monkeys".to_string(), + description: "".to_string(), }, ]; @@ -2172,10 +2152,9 @@ mod tests { #[test] fn extract_execute_response_other_failed_precondition() { - let missing = vec![{ - let mut violation = bazel_protos::error_details::PreconditionFailure_Violation::new(); - violation.set_field_type("OUT_OF_CAPACITY".to_owned()); - violation + let missing = vec![bazel_protos::google::rpc::precondition_failure::Violation { + type_: "OUT_OF_CAPACITY".to_string(), + ..Default::default() }]; let operation = make_precondition_failure_operation(missing) @@ -2206,18 +2185,24 @@ mod tests { #[test] fn extract_execute_response_other_status() { - let mut operation = bazel_protos::operations::Operation::new(); - operation.set_name("cat".to_owned()); - operation.set_done(true); - operation.set_response(make_any_proto(&{ - let mut response = bazel_protos::remote_execution::ExecuteResponse::new(); - response.set_status({ - let mut status = bazel_protos::status::Status::new(); - status.set_code(grpcio::RpcStatusCode::PermissionDenied as i32); - status - }); - response - })); + let operation = bazel_protos::google::longrunning::Operation { + name: "cat".to_owned(), + done: true, + result: Some( + bazel_protos::google::longrunning::operation::Result::Response( + make_any_prost_executeresponse( + &bazel_protos::build::bazel::remote::execution::v2::ExecuteResponse { + status: Some(bazel_protos::google::rpc::Status { + code: bazel_protos::google::rpc::Code::PermissionDenied.into(), + ..Default::default() + }), + ..Default::default() + }, + ), + ), + ), + ..Default::default() + }; match extract_execute_response(operation) { Err(ExecutionError::Fatal(err)) => assert_contains(&err, "PermissionDenied"), @@ -2346,103 +2331,90 @@ mod tests { #[test] fn extract_output_files_from_response_one_file() { - let mut output_file = bazel_protos::remote_execution::OutputFile::new(); - output_file.set_path("roland".into()); - output_file.set_digest((&TestData::roland().digest()).into()); - output_file.set_is_executable(false); - let mut output_files = protobuf::RepeatedField::new(); - output_files.push(output_file); - - let mut execute_response = bazel_protos::remote_execution::ExecuteResponse::new(); - execute_response.set_result({ - let mut result = bazel_protos::remote_execution::ActionResult::new(); - result.set_exit_code(0); - result.set_output_files(output_files); - result - }); - + let result = bazel_protos::build::bazel::remote::execution::v2::ActionResult { + exit_code: 0, + output_files: vec![ + bazel_protos::build::bazel::remote::execution::v2::OutputFile { + path: "roland".to_string(), + digest: Some((&TestData::roland().digest()).into()), + is_executable: false, + }, + ], + ..Default::default() + }; assert_eq!( - extract_output_files_from_response(&execute_response), + extract_output_files_from_response(&result), Ok(TestDirectory::containing_roland().digest()) ) } #[test] fn extract_output_files_from_response_two_files_not_nested() { - let mut output_file_1 = bazel_protos::remote_execution::OutputFile::new(); - output_file_1.set_path("roland".into()); - output_file_1.set_digest((&TestData::roland().digest()).into()); - output_file_1.set_is_executable(false); - - let mut output_file_2 = bazel_protos::remote_execution::OutputFile::new(); - output_file_2.set_path("treats".into()); - output_file_2.set_digest((&TestData::catnip().digest()).into()); - output_file_2.set_is_executable(false); - let mut output_files = protobuf::RepeatedField::new(); - output_files.push(output_file_1); - output_files.push(output_file_2); - - let mut execute_response = bazel_protos::remote_execution::ExecuteResponse::new(); - execute_response.set_result({ - let mut result = bazel_protos::remote_execution::ActionResult::new(); - result.set_exit_code(0); - result.set_output_files(output_files); - result - }); + let output_files = vec![ + bazel_protos::build::bazel::remote::execution::v2::OutputFile { + path: "roland".to_string(), + digest: Some((&TestData::roland().digest()).into()), + is_executable: false, + }, + bazel_protos::build::bazel::remote::execution::v2::OutputFile { + path: "treats".to_string(), + digest: Some((&TestData::catnip().digest()).into()), + is_executable: false, + }, + ]; + + let result = bazel_protos::build::bazel::remote::execution::v2::ActionResult { + output_files, + ..Default::default() + }; assert_eq!( - extract_output_files_from_response(&execute_response), + extract_output_files_from_response(&result), Ok(TestDirectory::containing_roland_and_treats().digest()) ) } #[test] fn extract_output_files_from_response_two_files_nested() { - let mut output_file_1 = bazel_protos::remote_execution::OutputFile::new(); - output_file_1.set_path("cats/roland".into()); - output_file_1.set_digest((&TestData::roland().digest()).into()); - output_file_1.set_is_executable(false); - - let mut output_file_2 = bazel_protos::remote_execution::OutputFile::new(); - output_file_2.set_path("treats".into()); - output_file_2.set_digest((&TestData::catnip().digest()).into()); - output_file_2.set_is_executable(false); - let mut output_files = protobuf::RepeatedField::new(); - output_files.push(output_file_1); - output_files.push(output_file_2); - - let mut execute_response = bazel_protos::remote_execution::ExecuteResponse::new(); - execute_response.set_result({ - let mut result = bazel_protos::remote_execution::ActionResult::new(); - result.set_exit_code(0); - result.set_output_files(output_files); - result - }); + let output_files = vec![ + bazel_protos::build::bazel::remote::execution::v2::OutputFile { + path: "cats/roland".to_string(), + digest: Some((&TestData::roland().digest()).into()), + is_executable: false, + }, + bazel_protos::build::bazel::remote::execution::v2::OutputFile { + path: "treats".to_string(), + digest: Some((&TestData::catnip().digest()).into()), + is_executable: false, + }, + ]; + + let result = bazel_protos::build::bazel::remote::execution::v2::ActionResult { + output_files, + ..Default::default() + }; assert_eq!( - extract_output_files_from_response(&execute_response), + extract_output_files_from_response(&result), Ok(TestDirectory::recursive().digest()) ) } #[test] fn extract_output_files_from_response_just_directory() { - let mut output_directory = bazel_protos::remote_execution::OutputDirectory::new(); - output_directory.set_path("cats".into()); - output_directory.set_tree_digest((&TestDirectory::containing_roland().digest()).into()); - let mut output_directories = protobuf::RepeatedField::new(); - output_directories.push(output_directory); - - let mut execute_response = bazel_protos::remote_execution::ExecuteResponse::new(); - execute_response.set_result({ - let mut result = bazel_protos::remote_execution::ActionResult::new(); - result.set_exit_code(0); - result.set_output_directories(output_directories); - result - }); + let result = bazel_protos::build::bazel::remote::execution::v2::ActionResult { + exit_code: 0, + output_directories: vec![ + bazel_protos::build::bazel::remote::execution::v2::OutputDirectory { + path: "cats".to_owned(), + tree_digest: Some((&TestDirectory::containing_roland().digest()).into()), + }, + ], + ..Default::default() + }; assert_eq!( - extract_output_files_from_response(&execute_response), + extract_output_files_from_response(&result), Ok(TestDirectory::nested().digest()) ) } @@ -2453,40 +2425,29 @@ mod tests { // /pets/cats/roland // /pets/dogs/robin - let mut output_directories = protobuf::RepeatedField::new(); - output_directories.push({ - let mut output_directory = bazel_protos::remote_execution::OutputDirectory::new(); - output_directory.set_path("pets/cats".into()); - output_directory.set_tree_digest((&TestDirectory::containing_roland().digest()).into()); - output_directory - }); - output_directories.push({ - let mut output_directory = bazel_protos::remote_execution::OutputDirectory::new(); - output_directory.set_path("pets/dogs".into()); - output_directory.set_tree_digest((&TestDirectory::containing_robin().digest()).into()); - output_directory - }); - - let mut execute_response = bazel_protos::remote_execution::ExecuteResponse::new(); - execute_response.set_result({ - let mut result = bazel_protos::remote_execution::ActionResult::new(); - result.set_exit_code(0); - result.set_output_directories(output_directories); - result.set_output_files({ - let mut output_files = protobuf::RepeatedField::new(); - output_files.push({ - let mut output_file = bazel_protos::remote_execution::OutputFile::new(); - output_file.set_path("treats".into()); - output_file.set_digest((&TestData::catnip().digest()).into()); - output_file - }); - output_files - }); - result - }); + let result = bazel_protos::build::bazel::remote::execution::v2::ActionResult { + output_files: vec![ + bazel_protos::build::bazel::remote::execution::v2::OutputFile { + path: "treats".to_owned(), + digest: Some((&TestData::catnip().digest()).into()), + is_executable: false, + }, + ], + output_directories: vec![ + bazel_protos::build::bazel::remote::execution::v2::OutputDirectory { + path: "pets/cats".to_owned(), + tree_digest: Some((&TestDirectory::containing_roland().digest()).into()), + }, + bazel_protos::build::bazel::remote::execution::v2::OutputDirectory { + path: "pets/dogs".to_owned(), + tree_digest: Some((&TestDirectory::containing_robin().digest()).into()), + }, + ], + ..Default::default() + }; assert_eq!( - extract_output_files_from_response(&execute_response), + extract_output_files_from_response(&result), Ok(Digest( Fingerprint::from_hex_string( "639b4b84bb58a9353d49df8122e7987baf038efe54ed035e67910846c865b1e2" @@ -2518,16 +2479,19 @@ mod tests { } fn make_incomplete_operation(operation_name: &str) -> MockOperation { - let mut op = bazel_protos::operations::Operation::new(); - op.set_name(operation_name.to_string()); - op.set_done(false); - MockOperation::new(op) + MockOperation::new(bazel_protos::google::longrunning::Operation { + name: operation_name.to_string(), + done: false, + ..Default::default() + }) } fn make_delayed_incomplete_operation(operation_name: &str, delay: Duration) -> MockOperation { - let mut op = bazel_protos::operations::Operation::new(); - op.set_name(operation_name.to_string()); - op.set_done(false); + let op = bazel_protos::google::longrunning::Operation { + name: operation_name.to_string(), + done: false, + ..Default::default() + }; MockOperation { op: Ok(Some(op)), duration: Some(delay), @@ -2540,72 +2504,74 @@ mod tests { stderr: StderrType, exit_code: i32, ) -> MockOperation { - let mut op = bazel_protos::operations::Operation::new(); - op.set_name(operation_name.to_string()); - op.set_done(true); - op.set_response({ - let mut response_proto = bazel_protos::remote_execution::ExecuteResponse::new(); - response_proto.set_result({ - let mut action_result = bazel_protos::remote_execution::ActionResult::new(); - match stdout { - StdoutType::Raw(stdout_raw) => { - action_result.set_stdout_raw(Bytes::from(stdout_raw)); - } - StdoutType::Digest(stdout_digest) => { - action_result.set_stdout_digest((&stdout_digest).into()); - } - } - match stderr { - StderrType::Raw(stderr_raw) => { - action_result.set_stderr_raw(Bytes::from(stderr_raw)); - } - StderrType::Digest(stderr_digest) => { - action_result.set_stderr_digest((&stderr_digest).into()); - } - } - action_result.set_exit_code(exit_code); - action_result - }); + let (stdout_raw, stdout_digest) = match stdout { + StdoutType::Raw(stdout_raw) => (stdout_raw.as_bytes().to_vec(), None), + StdoutType::Digest(stdout_digest) => (vec![], Some((&stdout_digest).into())), + }; - let mut response_wrapper = protobuf::well_known_types::Any::new(); - response_wrapper.set_type_url(format!( - "type.googleapis.com/{}", - response_proto.descriptor().full_name() - )); - let response_proto_bytes = response_proto.write_to_bytes().unwrap(); - response_wrapper.set_value(response_proto_bytes); - response_wrapper - }); + let (stderr_raw, stderr_digest) = match stderr { + StderrType::Raw(stderr_raw) => (stderr_raw.as_bytes().to_vec(), None), + StderrType::Digest(stderr_digest) => (vec![], Some((&stderr_digest).into())), + }; + + let response_proto = bazel_protos::build::bazel::remote::execution::v2::ExecuteResponse { + result: Some( + bazel_protos::build::bazel::remote::execution::v2::ActionResult { + stdout_raw, + stdout_digest, + stderr_raw, + stderr_digest, + exit_code, + ..Default::default() + }, + ), + ..Default::default() + }; + + let op = bazel_protos::google::longrunning::Operation { + name: operation_name.to_string(), + done: true, + result: Some( + bazel_protos::google::longrunning::operation::Result::Response( + make_any_prost_executeresponse(&response_proto), + ), + ), + ..Default::default() + }; MockOperation::new(op) } fn make_precondition_failure_operation( - violations: Vec, + violations: Vec, ) -> MockOperation { - let mut operation = bazel_protos::operations::Operation::new(); - operation.set_name("cat".to_owned()); - operation.set_done(true); - operation.set_response(make_any_proto(&{ - let mut response = bazel_protos::remote_execution::ExecuteResponse::new(); - response.set_status(make_precondition_failure_status(violations)); - response - })); + let response = bazel_protos::build::bazel::remote::execution::v2::ExecuteResponse { + status: Some(make_precondition_failure_status(violations)), + ..Default::default() + }; + let operation = bazel_protos::google::longrunning::Operation { + name: "cat".to_string(), + done: true, + result: Some( + bazel_protos::google::longrunning::operation::Result::Response( + make_any_prost_executeresponse(&response), + ), + ), + ..Default::default() + }; MockOperation::new(operation) } fn make_precondition_failure_status( - violations: Vec, - ) -> bazel_protos::status::Status { - let mut status = bazel_protos::status::Status::new(); - status.set_code(grpcio::RpcStatusCode::FailedPrecondition as i32); - status.mut_details().push(make_any_proto(&{ - let mut precondition_failure = bazel_protos::error_details::PreconditionFailure::new(); - for violation in violations.into_iter() { - precondition_failure.mut_violations().push(violation); - } - precondition_failure - })); - status + violations: Vec, + ) -> bazel_protos::google::rpc::Status { + bazel_protos::google::rpc::Status { + code: bazel_protos::google::rpc::Code::FailedPrecondition.into(), + details: vec![make_any_prost_proto( + "google.rpc.PreconditionFailure", + &bazel_protos::google::rpc::PreconditionFailure { violations }, + )], + ..Default::default() + } } fn run_command_remote( @@ -2642,7 +2608,7 @@ mod tests { ) .expect("Failed to make store"); - CommandRunner::new(&address, None, None, None, None, 1, store, timer_thread) + CommandRunner::new(&address, None, None, None, store, timer_thread) .expect("Failed to make command runner") } @@ -2651,7 +2617,7 @@ mod tests { } fn extract_execute_response( - operation: bazel_protos::operations::Operation, + operation: bazel_protos::google::longrunning::Operation, ) -> Result { let cas = mock::StubCAS::builder() .file(&TestData::roland()) @@ -2669,7 +2635,7 @@ mod tests { } fn extract_output_files_from_response( - execute_response: &bazel_protos::remote_execution::ExecuteResponse, + result: &bazel_protos::build::bazel::remote::execution::v2::ActionResult, ) -> Result { let cas = mock::StubCAS::builder() .file(&TestData::roland()) @@ -2678,29 +2644,36 @@ mod tests { let mut runtime = tokio::runtime::Runtime::new().unwrap(); let command_runner = create_command_runner("127.0.0.1:0".to_owned(), &cas); - let result = runtime.block_on(command_runner.extract_output_files(&execute_response)); + let result = runtime.block_on(command_runner.extract_output_files(result)); runtime.shutdown_now().wait().unwrap(); result } - fn make_any_proto(message: &dyn Message) -> protobuf::well_known_types::Any { - let mut any = protobuf::well_known_types::Any::new(); - any.set_type_url(format!( - "type.googleapis.com/{}", - message.descriptor().full_name() - )); - any.set_value(message.write_to_bytes().expect("Error serializing proto")); - any + fn make_any_prost_executeresponse( + message: &bazel_protos::build::bazel::remote::execution::v2::ExecuteResponse, + ) -> prost_types::Any { + make_any_prost_proto("build.bazel.remote.execution.v2.ExecuteResponse", message) + } + + fn make_any_prost_proto(message_name: &str, message: &M) -> prost_types::Any { + let size = message.encoded_len(); + let mut value = BytesMut::with_capacity(size); + message.encode(&mut value).expect("Error serializing proto"); + prost_types::Any { + type_url: format!("type.googleapis.com/{}", message_name), + value: value.to_vec(), + } } fn missing_preconditionfailure_violation( digest: &Digest, - ) -> bazel_protos::error_details::PreconditionFailure_Violation { + ) -> bazel_protos::google::rpc::precondition_failure::Violation { { - let mut violation = bazel_protos::error_details::PreconditionFailure_Violation::new(); - violation.set_field_type("MISSING".to_owned()); - violation.set_subject(format!("blobs/{}/{}", digest.0, digest.1)); - violation + bazel_protos::google::rpc::precondition_failure::Violation { + type_: "MISSING".to_owned(), + subject: format!("blobs/{}/{}", digest.0, digest.1), + ..Default::default() + } } } diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs index a7f074f5f27..aa424aab3df 100644 --- a/src/rust/engine/process_executor/src/main.rs +++ b/src/rust/engine/process_executor/src/main.rs @@ -92,13 +92,6 @@ fn main() { If unspecified, local execution will be performed.", ), ) - .arg( - Arg::with_name("execution-root-ca-cert-file") - .help("Path to file containing root certificate authority certificates for the execution server. If not set, TLS will not be used when connecting to the execution server.") - .takes_value(true) - .long("execution-root-ca-cert-file") - .required(false) - ) .arg( Arg::with_name("execution-oauth-bearer-token-path") .help("Path to file containing oauth bearer token for communication with the execution server. If not set, no authorization will be provided to remote servers.") @@ -257,12 +250,6 @@ fn main() { let runner: Box = match server_arg { Some(address) => { - let root_ca_certs = if let Some(path) = args.value_of("execution-root-ca-cert-file") { - Some(std::fs::read(path).expect("Error reading root CA certs file")) - } else { - None - }; - let oauth_bearer_token = if let Some(path) = args.value_of("execution-oauth-bearer-token-path") { Some(std::fs::read_to_string(path).expect("Error reading oauth bearer token file")) @@ -275,9 +262,7 @@ fn main() { address, args.value_of("cache-key-gen-version").map(str::to_owned), remote_instance_arg, - root_ca_certs, oauth_bearer_token, - 1, store, timer_thread, ) diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index cdd332d0d68..1fac707784c 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -135,10 +135,7 @@ impl Core { address, remote_execution_process_cache_namespace.clone(), remote_instance_name.clone(), - root_ca_certs.clone(), oauth_bearer_token.clone(), - // Allow for some overhead for bookkeeping threads (if any). - process_execution_parallelism + 2, store.clone(), futures_timer_thread2.clone(), ) diff --git a/src/rust/engine/testutil/mock/src/execution_server.rs b/src/rust/engine/testutil/mock/src/execution_server.rs index 07b49af9e47..1b690212a62 100644 --- a/src/rust/engine/testutil/mock/src/execution_server.rs +++ b/src/rust/engine/testutil/mock/src/execution_server.rs @@ -22,12 +22,13 @@ use protobuf; /// #[derive(Clone, Debug)] pub struct MockOperation { - pub op: Result, grpcio::RpcStatus>, + pub op: + Result, bazel_protos::google::rpc::Status>, pub duration: Option, } impl MockOperation { - pub fn new(op: bazel_protos::operations::Operation) -> MockOperation { + pub fn new(op: bazel_protos::google::longrunning::Operation) -> MockOperation { MockOperation { op: Ok(Some(op)), duration: None, @@ -190,9 +191,9 @@ impl MockResponder { } if let Ok(Some(op)) = op { // Complete the channel with the op. - sink.success(op.clone()); + sink.success(op.clone().into()); } else if let Err(status) = op { - sink.fail(status); + sink.fail(status.into()); } else { // Cancel the request by dropping the sink. drop(sink); @@ -218,13 +219,13 @@ impl MockResponder { if let Ok(Some(op)) = op { ctx.spawn( sink - .send((op.clone(), grpcio::WriteFlags::default())) + .send((op.clone().into(), grpcio::WriteFlags::default())) .map(|mut stream| stream.close()) .map(|_| ()) .map_err(|_| ()), ) } else if let Err(status) = op { - sink.fail(status); + sink.fail(status.into()); } else { // Cancel the request by dropping the sink. drop(sink)