Skip to content

Commit

Permalink
Remove unused operation wrapper (pantsbuild#7194)
Browse files Browse the repository at this point in the history
The tower migration made this obsolete
  • Loading branch information
illicitonion authored Feb 1, 2019
1 parent 884acc6 commit 9400024
Showing 1 changed file with 90 additions and 108 deletions.
198 changes: 90 additions & 108 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@ use tower_util::MakeService;
// CommandRunner.
const CACHE_KEY_GEN_VERSION_ENV_VAR_NAME: &str = "PANTS_CACHE_KEY_GEN_VERSION";

#[derive(Debug)]
enum OperationOrStatus {
Operation(bazel_protos::google::longrunning::Operation),
Status(bazel_protos::google::rpc::Status),
}

type Connection = tower_http::add_origin::AddOrigin<
tower_h2::client::Connection<tokio::net::tcp::TcpStream, DefaultExecutor, tower_grpc::BoxBody>,
>;
Expand Down Expand Up @@ -88,7 +82,7 @@ impl CommandRunner {
fn oneshot_execute(
&self,
execute_request: bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest,
) -> impl Future<Item = OperationOrStatus, Error = String> {
) -> impl Future<Item = bazel_protos::google::longrunning::Operation, Error = String> {
let command_runner = self.clone();
self
.clients
Expand All @@ -115,7 +109,6 @@ impl CommandRunner {
std::mem::drop(stream);
resp.ok_or_else(|| "Didn't get response from remote process execution".to_owned())
})
.map(OperationOrStatus::Operation)
})
})
}
Expand Down Expand Up @@ -281,7 +274,6 @@ impl super::CommandRunner for CommandRunner {
})
.map_err(towergrpcerror_to_string)
})
.map(OperationOrStatus::Operation)
.map(move |operation| {
future::Loop::Continue((history, operation, iter_num + 1))
})
Expand Down Expand Up @@ -419,108 +411,99 @@ impl CommandRunner {

fn extract_execute_response(
&self,
operation_or_status: OperationOrStatus,
operation: bazel_protos::google::longrunning::Operation,
attempts: &mut ExecutionHistory,
) -> BoxFuture<FallibleExecuteProcessResult, ExecutionError> {
trace!("Got operation response: {:?}", operation_or_status);
trace!("Got operation response: {:?}", operation);

let status = match operation_or_status {
OperationOrStatus::Operation(operation) => {
if !operation.done {
return future::err(ExecutionError::NotFinished(operation.name)).to_boxed();
}
let execute_response = if let Some(result) = operation.result {
match result {
bazel_protos::google::longrunning::operation::Result::Error(ref status) => {
return future::err(ExecutionError::Fatal(format_error(status))).to_boxed();
}
bazel_protos::google::longrunning::operation::Result::Response(ref any) => try_future!(
bazel_protos::build::bazel::remote::execution::v2::ExecuteResponse::decode(
&any.value
)
.map_err(|e| ExecutionError::Fatal(format!("Invalid ExecuteResponse: {:?}", e)))
),
}
} else {
return future::err(ExecutionError::Fatal(
"Operation finished but no response supplied".to_string(),
))
.to_boxed();
};

trace!("Got (nested) execute response: {:?}", execute_response);

if let Some(ref result) = execute_response.result {
if let Some(ref metadata) = result.execution_metadata {
let enqueued = timespec_from(&metadata.queued_timestamp);
let worker_start = timespec_from(&metadata.worker_start_timestamp);
let input_fetch_start = timespec_from(&metadata.input_fetch_start_timestamp);
let input_fetch_completed = timespec_from(&metadata.input_fetch_completed_timestamp);
let execution_start = timespec_from(&metadata.execution_start_timestamp);
let execution_completed = timespec_from(&metadata.execution_completed_timestamp);
let output_upload_start = timespec_from(&metadata.output_upload_start_timestamp);
let output_upload_completed =
timespec_from(&metadata.output_upload_completed_timestamp);

match (worker_start - enqueued).to_std() {
Ok(duration) => attempts.current_attempt.remote_queue = Some(duration),
Err(err) => warn!("Got negative remote queue time: {}", err),
}
match (input_fetch_completed - input_fetch_start).to_std() {
Ok(duration) => attempts.current_attempt.remote_input_fetch = Some(duration),
Err(err) => warn!("Got negative remote input fetch time: {}", err),
}
match (execution_completed - execution_start).to_std() {
Ok(duration) => attempts.current_attempt.remote_execution = Some(duration),
Err(err) => warn!("Got negative remote execution time: {}", err),
}
match (output_upload_completed - output_upload_start).to_std() {
Ok(duration) => attempts.current_attempt.remote_output_store = Some(duration),
Err(err) => warn!("Got negative remote output store time: {}", err),
}
attempts.current_attempt.was_cache_hit = execute_response.cached_result;
}
if !operation.done {
return future::err(ExecutionError::NotFinished(operation.name)).to_boxed();
}
let execute_response = if let Some(result) = operation.result {
match result {
bazel_protos::google::longrunning::operation::Result::Error(ref status) => {
return future::err(ExecutionError::Fatal(format_error(status))).to_boxed();
}
bazel_protos::google::longrunning::operation::Result::Response(ref any) => try_future!(
bazel_protos::build::bazel::remote::execution::v2::ExecuteResponse::decode(&any.value)
.map_err(|e| ExecutionError::Fatal(format!("Invalid ExecuteResponse: {:?}", e)))
),
}
} else {
return future::err(ExecutionError::Fatal(
"Operation finished but no response supplied".to_string(),
))
.to_boxed();
};

let mut execution_attempts = std::mem::replace(&mut attempts.attempts, vec![]);
execution_attempts.push(attempts.current_attempt);

let maybe_result = execute_response.result;

let status = execute_response
.status
.unwrap_or_else(|| bazel_protos::google::rpc::Status {
code: bazel_protos::google::rpc::Code::Ok.into(),
message: String::new(),
details: vec![],
});
if status.code == bazel_protos::google::rpc::Code::Ok.into() {
if let Some(result) = maybe_result {
return self
.extract_stdout(&result)
.join(self.extract_stderr(&result))
.join(self.extract_output_files(&result))
.and_then(move |((stdout, stderr), output_directory)| {
Ok(FallibleExecuteProcessResult {
stdout: stdout,
stderr: stderr,
exit_code: result.exit_code,
output_directory: output_directory,
execution_attempts: execution_attempts,
})
})
.to_boxed();
} else {
return futures::future::err(ExecutionError::Fatal(
"No result found on ExecuteResponse".to_owned(),
))
.to_boxed();
}
trace!("Got (nested) execute response: {:?}", execute_response);

if let Some(ref result) = execute_response.result {
if let Some(ref metadata) = result.execution_metadata {
let enqueued = timespec_from(&metadata.queued_timestamp);
let worker_start = timespec_from(&metadata.worker_start_timestamp);
let input_fetch_start = timespec_from(&metadata.input_fetch_start_timestamp);
let input_fetch_completed = timespec_from(&metadata.input_fetch_completed_timestamp);
let execution_start = timespec_from(&metadata.execution_start_timestamp);
let execution_completed = timespec_from(&metadata.execution_completed_timestamp);
let output_upload_start = timespec_from(&metadata.output_upload_start_timestamp);
let output_upload_completed = timespec_from(&metadata.output_upload_completed_timestamp);

match (worker_start - enqueued).to_std() {
Ok(duration) => attempts.current_attempt.remote_queue = Some(duration),
Err(err) => warn!("Got negative remote queue time: {}", err),
}
match (input_fetch_completed - input_fetch_start).to_std() {
Ok(duration) => attempts.current_attempt.remote_input_fetch = Some(duration),
Err(err) => warn!("Got negative remote input fetch time: {}", err),
}
status
match (execution_completed - execution_start).to_std() {
Ok(duration) => attempts.current_attempt.remote_execution = Some(duration),
Err(err) => warn!("Got negative remote execution time: {}", err),
}
match (output_upload_completed - output_upload_start).to_std() {
Ok(duration) => attempts.current_attempt.remote_output_store = Some(duration),
Err(err) => warn!("Got negative remote output store time: {}", err),
}
attempts.current_attempt.was_cache_hit = execute_response.cached_result;
}
OperationOrStatus::Status(status) => status,
};
}

let mut execution_attempts = std::mem::replace(&mut attempts.attempts, vec![]);
execution_attempts.push(attempts.current_attempt);

let maybe_result = execute_response.result;

let status = execute_response
.status
.unwrap_or_else(|| bazel_protos::google::rpc::Status {
code: bazel_protos::google::rpc::Code::Ok.into(),
message: String::new(),
details: vec![],
});
if status.code == bazel_protos::google::rpc::Code::Ok.into() {
if let Some(result) = maybe_result {
return self
.extract_stdout(&result)
.join(self.extract_stderr(&result))
.join(self.extract_output_files(&result))
.and_then(move |((stdout, stderr), output_directory)| {
Ok(FallibleExecuteProcessResult {
stdout: stdout,
stderr: stderr,
exit_code: result.exit_code,
output_directory: output_directory,
execution_attempts: execution_attempts,
})
})
.to_boxed();
} else {
return futures::future::err(ExecutionError::Fatal(
"No result found on ExecuteResponse".to_owned(),
))
.to_boxed();
}
}

match bazel_protos::code_from_i32(status.code) {
bazel_protos::google::rpc::Code::Ok => unreachable!(),
Expand Down Expand Up @@ -2625,10 +2608,9 @@ mod tests {
.build();
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let command_runner = create_command_runner("127.0.0.1:0".to_owned(), &cas);
let result = runtime.block_on(command_runner.extract_execute_response(
super::OperationOrStatus::Operation(operation),
&mut ExecutionHistory::default(),
));
let result = runtime.block_on(
command_runner.extract_execute_response(operation, &mut ExecutionHistory::default()),
);

runtime.shutdown_now().wait().unwrap();
result
Expand Down

0 comments on commit 9400024

Please sign in to comment.