Skip to content

Commit

Permalink
Disable workflow request/response calls from ingress, and invoke them…
Browse files Browse the repository at this point in the history
… only once from context client (#1603)

* Add liburing-devel to dev deps

* Allow a request/response to a workflow run method only once. This makes sure that invoking a workflow run from a context client can be executed only once.

* Don't serialize InvocationError#description if empty

* Fix error message
  • Loading branch information
slinkydeveloper committed Jun 6, 2024
1 parent d76b781 commit 6a45fed
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 41 deletions.
4 changes: 4 additions & 0 deletions crates/types/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub mod codes {
pub struct InvocationError {
code: InvocationErrorCode,
message: Cow<'static, str>,
#[serde(skip_serializing_if = "Option::is_none")]
description: Option<Cow<'static, str>>,
}

Expand Down Expand Up @@ -214,6 +215,9 @@ pub const ATTACH_NOT_SUPPORTED_INVOCATION_ERROR: InvocationError =
pub const ALREADY_COMPLETED_INVOCATION_ERROR: InvocationError =
InvocationError::new_static(codes::CONFLICT, "promise was already completed");

pub const WORKFLOW_ALREADY_INVOKED_INVOCATION_ERROR: InvocationError =
InvocationError::new_static(codes::CONFLICT, "the workflow method was already invoked");

/// Error parsing/decoding a resource ID.
#[derive(Debug, thiserror::Error, Clone, Eq, PartialEq)]
pub enum IdDecodeError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use restate_storage_api::Result as StorageResult;
use restate_types::errors::{
InvocationError, InvocationErrorCode, ALREADY_COMPLETED_INVOCATION_ERROR,
ATTACH_NOT_SUPPORTED_INVOCATION_ERROR, CANCELED_INVOCATION_ERROR, GONE_INVOCATION_ERROR,
KILLED_INVOCATION_ERROR, NOT_FOUND_INVOCATION_ERROR,
KILLED_INVOCATION_ERROR, NOT_FOUND_INVOCATION_ERROR, WORKFLOW_ALREADY_INVOKED_INVOCATION_ERROR,
};
use restate_types::identifiers::{
EntryIndex, IdempotencyId, InvocationId, JournalEntryId, PartitionKey, ServiceId,
Expand Down Expand Up @@ -328,27 +328,38 @@ where
// If locked, then we check the original invocation
if let VirtualObjectStatus::Locked(original_invocation_id) = service_status {
if let Some(response_sink) = service_invocation.response_sink {
let invocation_status =
state.get_invocation_status(&original_invocation_id).await?;

match invocation_status {
InvocationStatus::Completed(
CompletedInvocation { response_result, .. }) => {
self.send_response_to_sinks(
effects,
iter::once(response_sink),
response_result,
Some(service_invocation.invocation_id),
Some(&service_invocation.invocation_target),
);
}
InvocationStatus::Free => panic!("Unexpected state, the InvocationStatus cannot be Free for invocation {} given it's in locked status", original_invocation_id),
is => effects.append_response_sink(
original_invocation_id,
is,
response_sink
)
}
// --- ATTACH business logic below, this is currently disabled due to the pending discussion about equality check.
// We instead simply fail the invocation with CONFLICT status code
//
// let invocation_status =
// state.get_invocation_status(&original_invocation_id).await?;
//
// match invocation_status {
// InvocationStatus::Completed(
// CompletedInvocation { response_result, .. }) => {
// self.send_response_to_sinks(
// effects,
// iter::once(response_sink),
// response_result,
// Some(service_invocation.invocation_id),
// Some(&service_invocation.invocation_target),
// );
// }
// InvocationStatus::Free => panic!("Unexpected state, the InvocationStatus cannot be Free for invocation {} given it's in locked status", original_invocation_id),
// is => effects.append_response_sink(
// original_invocation_id,
// is,
// response_sink
// )
// }

self.send_response_to_sinks(
effects,
iter::once(response_sink),
ResponseResult::Failure(WORKFLOW_ALREADY_INVOKED_INVOCATION_ERROR),
Some(original_invocation_id),
Some(&service_invocation.invocation_target),
);
}

Self::send_submit_notification_if_needed(
Expand Down
52 changes: 34 additions & 18 deletions crates/worker/src/partition/state_machine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1617,6 +1617,7 @@ mod tests {
use restate_storage_api::invocation_status_table::{CompletedInvocation, StatusTimestamps};
use restate_storage_api::service_status_table::ReadOnlyVirtualObjectStatusTable;
use restate_storage_api::timer_table::{Timer, TimerKey, TimerKeyKind};
use restate_types::errors::WORKFLOW_ALREADY_INVOKED_INVOCATION_ERROR;
use restate_types::invocation::{
AttachInvocationRequest, InvocationQuery, InvocationTarget,
};
Expand Down Expand Up @@ -1684,10 +1685,25 @@ mod tests {
.await;
assert_that!(
actions,
not(contains(pat!(Action::Invoke {
invocation_id: eq(invocation_id),
invoke_input_journal: pat!(InvokeInputJournal::CachedJournal(_, _))
})))
all!(
not(contains(pat!(Action::Invoke {
invocation_id: eq(invocation_id),
invoke_input_journal: pat!(InvokeInputJournal::CachedJournal(_, _))
}))),
// We get back this error due to the fact that we disabled the attach semantics
contains(pat!(Action::IngressResponse(pat!(
IngressResponseEnvelope {
target_node: eq(node_id),
inner: pat!(ingress::InvocationResponse {
request_id: eq(request_id_2),
invocation_id: some(eq(invocation_id)),
response: eq(IngressResponseResult::Failure(
WORKFLOW_ALREADY_INVOKED_INVOCATION_ERROR
))
})
}
))))
)
);

// Send output, then end
Expand Down Expand Up @@ -1727,7 +1743,8 @@ mod tests {
})
}
)))),
contains(pat!(Action::IngressResponse(pat!(
// This is a not() because we currently disabled the attach semantics on request/response
not(contains(pat!(Action::IngressResponse(pat!(
IngressResponseEnvelope {
target_node: eq(node_id),
inner: pat!(ingress::InvocationResponse {
Expand All @@ -1739,7 +1756,7 @@ mod tests {
))
})
}
)))),
))))),
contains(pat!(Action::ScheduleInvocationStatusCleanup {
invocation_id: eq(invocation_id)
}))
Expand All @@ -1760,7 +1777,7 @@ mod tests {
})))
);

// Sending a new request will be completed immediately
// Sending a new request will not be completed because we don't support attach semantics
let request_id_3 = IngressRequestId::default();
let actions = state_machine
.apply(Command::Invoke(ServiceInvocation {
Expand All @@ -1779,11 +1796,10 @@ mod tests {
IngressResponseEnvelope {
target_node: eq(node_id),
inner: pat!(ingress::InvocationResponse {
invocation_id: some(eq(invocation_id)),
request_id: eq(request_id_3),
response: eq(IngressResponseResult::Success(
invocation_target.clone(),
response_bytes.clone()
invocation_id: some(eq(invocation_id)),
response: eq(IngressResponseResult::Failure(
WORKFLOW_ALREADY_INVOKED_INVOCATION_ERROR
))
})
}
Expand Down Expand Up @@ -1919,16 +1935,16 @@ mod tests {
})))
);

// Sending a new request will be completed immediately
// Sending another attach will be completed immediately
let actions = state_machine
.apply(Command::Invoke(ServiceInvocation {
invocation_id,
invocation_target: invocation_target.clone(),
response_sink: Some(ServiceInvocationResponseSink::Ingress {
.apply(Command::AttachInvocation(AttachInvocationRequest {
invocation_query: InvocationQuery::Workflow(
invocation_target.as_keyed_service_id().unwrap(),
),
response_sink: ServiceInvocationResponseSink::Ingress {
node_id,
request_id: request_id_3,
}),
..ServiceInvocation::mock()
},
}))
.await;
assert_that!(
Expand Down
2 changes: 1 addition & 1 deletion docs/dev/local-development.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Optionally, you can install [just](https://github.com/casey/just) to make use of
To setup these on Fedora, run:

```
sudo dnf install clang lld lldb libcxx cmake openssl-devel rocksdb-devel protobuf-compiler just
sudo dnf install clang lld lldb libcxx cmake openssl-devel rocksdb-devel protobuf-compiler just liburing-devel
```

On MacOS, you can use [homebrew](https://brew.sh)
Expand Down

0 comments on commit 6a45fed

Please sign in to comment.