From d13fff370f965370db4c07a5600e6513c175d4b1 Mon Sep 17 00:00:00 2001 From: Jeff Schoner Date: Sun, 6 Aug 2023 12:42:30 -0700 Subject: [PATCH 01/12] Correct uspert -> upsert --- spec/unit/lib/temporal/workflow/state_manager_spec.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spec/unit/lib/temporal/workflow/state_manager_spec.rb b/spec/unit/lib/temporal/workflow/state_manager_spec.rb index ce193bb1..44b14479 100644 --- a/spec/unit/lib/temporal/workflow/state_manager_spec.rb +++ b/spec/unit/lib/temporal/workflow/state_manager_spec.rb @@ -62,7 +62,7 @@ class MyWorkflow < Temporal::Workflow; end let(:upsert_search_attribute_event_1) do Fabricate(:api_upsert_search_attributes_event, search_attributes: upserted_attributes_1) end - let(:usperted_attributes_2) do + let(:upserted_attributes_2) do { 'CustomAttribute3' => 'bar', 'CustomAttribute4' => 10 @@ -71,7 +71,7 @@ class MyWorkflow < Temporal::Workflow; end let(:upsert_search_attribute_event_2) do Fabricate(:api_upsert_search_attributes_event, event_id: 4, - search_attributes: usperted_attributes_2) + search_attributes: upserted_attributes_2) end let(:upsert_empty_search_attributes_event) do Fabricate(:api_upsert_search_attributes_event, search_attributes: {}) @@ -138,7 +138,7 @@ class MyWorkflow < Temporal::Workflow; end window_2 = Temporal::Workflow::History::Window.new window_2.add(Temporal::Workflow::History::Event.new(upsert_search_attribute_event_2)) - command_2 = Temporal::Workflow::Command::UpsertSearchAttributes.new(search_attributes: usperted_attributes_2) + command_2 = Temporal::Workflow::Command::UpsertSearchAttributes.new(search_attributes: upserted_attributes_2) state_manager.schedule(command_2) state_manager.apply(window_2) From 5ef544af53b4183c3b31eadf9e114c849a6f9963 Mon Sep 17 00:00:00 2001 From: Jeff Schoner Date: Fri, 4 Aug 2023 17:10:08 -0700 Subject: [PATCH 02/12] Signals first ordering w/ config, flags --- CHANGELOG.md | 4 ++ lib/temporal/configuration.rb | 10 ++- lib/temporal/connection/grpc.rb | 8 ++- lib/temporal/errors.rb | 3 + lib/temporal/version.rb | 2 +- lib/temporal/workflow/executor.rb | 7 ++- lib/temporal/workflow/history/window.rb | 13 ++-- lib/temporal/workflow/sdk_flags.rb | 10 +++ lib/temporal/workflow/state_manager.rb | 63 ++++++++++++++++--- lib/temporal/workflow/task_processor.rb | 11 ++-- spec/unit/lib/temporal/grpc_spec.rb | 5 +- .../lib/temporal/workflow/executor_spec.rb | 5 +- .../temporal/workflow/state_manager_spec.rb | 8 +-- .../temporal/workflow/task_processor_spec.rb | 16 ++++- 14 files changed, 134 insertions(+), 31 deletions(-) create mode 100644 lib/temporal/workflow/sdk_flags.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index e604aca7..b4f755bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,3 +2,7 @@ ## 0.0.1 - First release + +## 0.1.0 +- Introduces signal first ordering. See comments in configuration.rb about how to safely roll this out if you +use signals in any of your workflows. diff --git a/lib/temporal/configuration.rb b/lib/temporal/configuration.rb index df26315b..9f791747 100644 --- a/lib/temporal/configuration.rb +++ b/lib/temporal/configuration.rb @@ -17,7 +17,7 @@ class Configuration attr_reader :timeouts, :error_handlers attr_accessor :connection_type, :converter, :use_error_serialization_v2, :host, :port, :credentials, :identity, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes, :header_propagators, - :payload_codec + :payload_codec, :legacy_signals # See https://docs.temporal.io/blog/activity-timeouts/ for general docs. # We want an infinite execution timeout for cron schedules and other perpetual workflows. @@ -82,6 +82,14 @@ def initialize @identity = nil @search_attributes = {} @header_propagators = [] + + # Signals previously were incorrectly replayed in order within a workflow task, rather than + # at the beginning. Correcting this changes the determinism of any workflow with signals. This + # flag exists to preserve this legacy behavior while rolling out the new order. When adopting + # the first version of the library with this mode, set this to true, fully deploy your worker, + # then do a second deployment with this flag returned to the default value of false. New use + # cases should simply leave this as the default. + @legacy_signals = false end def on_error(&block) diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index 092faf96..28ae379f 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -221,7 +221,7 @@ def respond_query_task_completed(namespace:, task_token:, query_result:) client.respond_query_task_completed(request) end - def respond_workflow_task_completed(namespace:, task_token:, commands:, binary_checksum:, query_results: {}) + def respond_workflow_task_completed(namespace:, task_token:, commands:, binary_checksum:, new_sdk_flags:, query_results: {}) request = Temporalio::Api::WorkflowService::V1::RespondWorkflowTaskCompletedRequest.new( namespace: namespace, identity: identity, @@ -231,6 +231,12 @@ def respond_workflow_task_completed(namespace:, task_token:, commands:, binary_c binary_checksum: binary_checksum ) + if new_sdk_flags.any? + request.sdk_metadata = Temporalio::Api::Sdk::V1::WorkflowTaskCompletedMetadata.new( + lang_used_flags: new_sdk_flags.to_a + ) + end + client.respond_workflow_task_completed(request) end diff --git a/lib/temporal/errors.rb b/lib/temporal/errors.rb index 7aa11405..72a5d9db 100644 --- a/lib/temporal/errors.rb +++ b/lib/temporal/errors.rb @@ -9,6 +9,9 @@ class InternalError < Error; end # a non-deterministic workflow implementation or the gem's bug class NonDeterministicWorkflowError < InternalError; end + # Indicates a workflow task was encountered that used an unknown SDK flag + class UnknownSDKFlagError < InternalError; end + # Superclass for misconfiguration/misuse on the client (user) side class ClientError < Error; end diff --git a/lib/temporal/version.rb b/lib/temporal/version.rb index dde4f73c..87781382 100644 --- a/lib/temporal/version.rb +++ b/lib/temporal/version.rb @@ -1,3 +1,3 @@ module Temporal - VERSION = '0.0.3'.freeze + VERSION = '0.1.0'.freeze end diff --git a/lib/temporal/workflow/executor.rb b/lib/temporal/workflow/executor.rb index b11aa328..4ebfc3e5 100644 --- a/lib/temporal/workflow/executor.rb +++ b/lib/temporal/workflow/executor.rb @@ -11,16 +11,19 @@ module Temporal class Workflow class Executor + RunResult = Struct.new(:commands, :new_sdk_flags, keyword_init: true) + # @param workflow_class [Class] # @param history [Workflow::History] # @param task_metadata [Metadata::WorkflowTask] # @param config [Configuration] # @param track_stack_trace [Boolean] + # @return [RunResult] def initialize(workflow_class, history, task_metadata, config, track_stack_trace, middleware_chain) @workflow_class = workflow_class @dispatcher = Dispatcher.new @query_registry = QueryRegistry.new - @state_manager = StateManager.new(dispatcher) + @state_manager = StateManager.new(dispatcher, config) @history = history @task_metadata = task_metadata @config = config @@ -39,7 +42,7 @@ def run state_manager.apply(window) end - return state_manager.commands + RunResult.new(commands: state_manager.commands, new_sdk_flags: state_manager.new_sdk_flags) end # Process queries using the pre-registered query handlers diff --git a/lib/temporal/workflow/history/window.rb b/lib/temporal/workflow/history/window.rb index 944c8d25..d4c5d178 100644 --- a/lib/temporal/workflow/history/window.rb +++ b/lib/temporal/workflow/history/window.rb @@ -1,15 +1,17 @@ +require 'temporal/workflow/sdk_flags' + module Temporal class Workflow class History class Window - attr_reader :local_time, :last_event_id, :events, :markers + attr_reader :local_time, :last_event_id, :events, :sdk_flags def initialize @local_time = nil @last_event_id = nil @events = [] - @markers = [] @replay = false + @sdk_flags = Set.new end def replay? @@ -18,8 +20,6 @@ def replay? def add(event) case event.type - when 'MARKER_RECORDED' - markers << event when 'WORKFLOW_TASK_STARTED' @last_event_id = event.id + 1 # one for completed @local_time = event.timestamp @@ -28,6 +28,11 @@ def add(event) @local_time = nil when 'WORKFLOW_TASK_COMPLETED' @replay = true + used_flags = Set.new(event.attributes&.sdk_metadata&.lang_used_flags) + unknown_flags = used_flags.difference(SDKFlags::ALL) + raise Temporal::UnknownSDKFlagError, "Unknown SDK flags: #{unknown_flags.join(',')}" if unknown_flags.any? + + used_flags.each { |flag| sdk_flags.add(flag) } when 'WORKFLOW_TASK_SCHEDULED' # no-op else diff --git a/lib/temporal/workflow/sdk_flags.rb b/lib/temporal/workflow/sdk_flags.rb new file mode 100644 index 00000000..49aa792c --- /dev/null +++ b/lib/temporal/workflow/sdk_flags.rb @@ -0,0 +1,10 @@ +module Temporal + class Workflow + module SDKFlags + HANDLE_SIGNALS_FIRST = 1 + + # Make sure to include all known flags here + ALL = Set.new([HANDLE_SIGNALS_FIRST]) + end + end +end diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index 6fbf8983..ae2d9354 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -5,6 +5,7 @@ require 'temporal/workflow/history/event_target' require 'temporal/concerns/payloads' require 'temporal/workflow/errors' +require 'temporal/workflow/sdk_flags' require 'temporal/workflow/signal' module Temporal @@ -18,9 +19,9 @@ class StateManager class UnsupportedEvent < Temporal::InternalError; end class UnsupportedMarkerType < Temporal::InternalError; end - attr_reader :commands, :local_time, :search_attributes + attr_reader :commands, :local_time, :search_attributes, :new_sdk_flags - def initialize(dispatcher) + def initialize(dispatcher, config) @dispatcher = dispatcher @commands = [] @marker_ids = Set.new @@ -31,6 +32,13 @@ def initialize(dispatcher) @local_time = nil @replay = false @search_attributes = {} + @config = config + + # Current flags in use, built up from workflow task completed history entries + @sdk_flags = Set.new + + # New flags used when not replaying + @new_sdk_flags = Set.new end def replay? @@ -83,20 +91,61 @@ def apply(history_window) @replay = history_window.replay? @local_time = history_window.local_time @last_event_id = history_window.last_event_id + history_window.sdk_flags.each { |flag| sdk_flags.add(flag) } - # handle markers first since their data is needed for processing events - history_window.markers.each do |event| + order_events(history_window.events).each do |event| apply_event(event) end + end - history_window.events.each do |event| - apply_event(event) + def self.event_order(event, signals_first) + if event.type == 'MARKER_RECORDED' + # markers always come first + 0 + elsif event.type == 'WORKFLOW_EXECUTION_STARTED' + # This always comes next if present + 1 + elsif signals_first && signal_event?(event) + # signals come next if we are in signals first mode + 2 + else + # then everything else + 3 end end + def self.signal_event?(event) + event.type == 'WORKFLOW_EXECUTION_SIGNALED' + end + private - attr_reader :dispatcher, :command_tracker, :marker_ids, :side_effects, :releases + attr_reader :dispatcher, :command_tracker, :marker_ids, :side_effects, :releases, :sdk_flags + + def order_events(raw_events) + signals_first = + # If signals were handled first when this task or a previous one in this run were first + # played, we must continue to do so in order to ensure determinism. The configuration + # value can be ignored. + sdk_flags.include?(SDKFlags::HANDLE_SIGNALS_FIRST) || + # If this is being played for the first time, use the configuration flag to choose + (!replay? && !@config.legacy_signals) + + # Only add the flag when it's used and not already present + if !replay? && + signals_first && + raw_events.any? { |event| StateManager.signal_event?(event) } && + !sdk_flags.include?(SDKFlags::HANDLE_SIGNALS_FIRST) && + !new_sdk_flags.include?(SDKFlags::HANDLE_SIGNALS_FIRST) + new_sdk_flags << SDKFlags::HANDLE_SIGNALS_FIRST + sdk_flags << SDKFlags::HANDLE_SIGNALS_FIRST + end + + # sort_by is not stable, so include index for sort then remove + raw_events.sort_by.with_index do |event, index| + [StateManager.event_order(event, signals_first), index] + end + end def next_event_id @last_event_id += 1 diff --git a/lib/temporal/workflow/task_processor.rb b/lib/temporal/workflow/task_processor.rb index dbd14d05..cd0cb8bd 100644 --- a/lib/temporal/workflow/task_processor.rb +++ b/lib/temporal/workflow/task_processor.rb @@ -56,7 +56,7 @@ def process # TODO: For sticky workflows we need to cache the Executor instance executor = Workflow::Executor.new(workflow_class, history, metadata, config, track_stack_trace, workflow_middleware_chain) - commands = middleware_chain.invoke(metadata) do + run_result = middleware_chain.invoke(metadata) do executor.run end @@ -65,7 +65,7 @@ def process if legacy_query_task? complete_query(query_results[LEGACY_QUERY_KEY]) else - complete_task(commands, query_results) + complete_task(run_result, query_results) end rescue StandardError => error Temporal::ErrorHandler.handle(error, config, metadata: metadata) @@ -125,15 +125,16 @@ def parse_queries end end - def complete_task(commands, query_results) + def complete_task(run_result, query_results) Temporal.logger.info("Workflow task completed", metadata.to_h) connection.respond_workflow_task_completed( namespace: namespace, task_token: task_token, - commands: commands, + commands: run_result.commands, binary_checksum: binary_checksum, - query_results: query_results + query_results: query_results, + new_sdk_flags: run_result.new_sdk_flags ) end diff --git a/spec/unit/lib/temporal/grpc_spec.rb b/spec/unit/lib/temporal/grpc_spec.rb index ce11e251..9f16ebde 100644 --- a/spec/unit/lib/temporal/grpc_spec.rb +++ b/spec/unit/lib/temporal/grpc_spec.rb @@ -588,7 +588,8 @@ class TestDeserializer task_token: task_token, commands: [], query_results: query_results, - binary_checksum: binary_checksum + binary_checksum: binary_checksum, + new_sdk_flags: [1] ) expect(grpc_stub).to have_received(:respond_workflow_task_completed) do |request| @@ -612,6 +613,8 @@ class TestDeserializer Temporalio::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_FAILED) ) expect(request.query_results['2'].error_message).to eq('Test query failure') + + expect(request.sdk_metadata.lang_used_flags).to eq([1]) end end end diff --git a/spec/unit/lib/temporal/workflow/executor_spec.rb b/spec/unit/lib/temporal/workflow/executor_spec.rb index 47f10b86..1bc03457 100644 --- a/spec/unit/lib/temporal/workflow/executor_spec.rb +++ b/spec/unit/lib/temporal/workflow/executor_spec.rb @@ -46,9 +46,10 @@ def execute it 'returns a complete workflow decision' do decisions = subject.run - expect(decisions.length).to eq(1) + expect(decisions.commands.length).to eq(1) + expect(decisions.new_sdk_flags.length).to eq(0) - decision_id, decision = decisions.first + decision_id, decision = decisions.commands.first expect(decision_id).to eq(history.events.length + 1) expect(decision).to be_an_instance_of(Temporal::Workflow::Command::CompleteWorkflow) expect(decision.result).to eq('test') diff --git a/spec/unit/lib/temporal/workflow/state_manager_spec.rb b/spec/unit/lib/temporal/workflow/state_manager_spec.rb index 44b14479..c11f77f1 100644 --- a/spec/unit/lib/temporal/workflow/state_manager_spec.rb +++ b/spec/unit/lib/temporal/workflow/state_manager_spec.rb @@ -24,7 +24,7 @@ class MyWorkflow < Temporal::Workflow; end ), ].each do |terminal_command| it "fails to validate if #{terminal_command.class} is not the last command scheduled" do - state_manager = described_class.new(Temporal::Workflow::Dispatcher.new) + state_manager = described_class.new(Temporal::Workflow::Dispatcher.new, Temporal::Configuration.new) next_command = Temporal::Workflow::Command::RecordMarker.new( name: Temporal::Workflow::StateManager::RELEASE_MARKER, @@ -78,7 +78,7 @@ class MyWorkflow < Temporal::Workflow; end end it 'initial merges with upserted' do - state_manager = described_class.new(Temporal::Workflow::Dispatcher.new) + state_manager = described_class.new(Temporal::Workflow::Dispatcher.new, Temporal::Configuration.new) window = Temporal::Workflow::History::Window.new window.add(Temporal::Workflow::History::Event.new(start_workflow_execution_event)) @@ -106,7 +106,7 @@ class MyWorkflow < Temporal::Workflow; end end it 'initial and upsert treated as empty hash' do - state_manager = described_class.new(Temporal::Workflow::Dispatcher.new) + state_manager = described_class.new(Temporal::Workflow::Dispatcher.new, Temporal::Configuration.new) window = Temporal::Workflow::History::Window.new window.add(Temporal::Workflow::History::Event.new(start_workflow_execution_event_no_search_attributes)) @@ -123,7 +123,7 @@ class MyWorkflow < Temporal::Workflow; end it 'multiple upserts merge' do - state_manager = described_class.new(Temporal::Workflow::Dispatcher.new) + state_manager = described_class.new(Temporal::Workflow::Dispatcher.new, Temporal::Configuration.new) window_1 = Temporal::Workflow::History::Window.new window_1.add(Temporal::Workflow::History::Event.new(workflow_task_started_event)) diff --git a/spec/unit/lib/temporal/workflow/task_processor_spec.rb b/spec/unit/lib/temporal/workflow/task_processor_spec.rb index e20c1721..75aad3f3 100644 --- a/spec/unit/lib/temporal/workflow/task_processor_spec.rb +++ b/spec/unit/lib/temporal/workflow/task_processor_spec.rb @@ -79,11 +79,13 @@ let(:workflow_class) { double('Temporal::Workflow', execute_in_context: nil) } let(:executor) { double('Temporal::Workflow::Executor') } let(:commands) { double('commands') } + let(:new_sdk_flags) { double('new_sdk_flags') } + let(:run_result) { Temporal::Workflow::Executor::RunResult.new(commands: commands, new_sdk_flags: new_sdk_flags) } before do allow(lookup).to receive(:find).with(workflow_name).and_return(workflow_class) allow(Temporal::Workflow::Executor).to receive(:new).and_return(executor) - allow(executor).to receive(:run) { workflow_class.execute_in_context(context, input); commands } + allow(executor).to receive(:run) { workflow_class.execute_in_context(context, input) }.and_return(run_result) allow(executor).to receive(:process_queries) end @@ -130,7 +132,8 @@ task_token: task.task_token, commands: commands, binary_checksum: binary_checksum, - query_results: { query_id => query_result } + query_results: { query_id => query_result }, + new_sdk_flags: new_sdk_flags ) end end @@ -167,7 +170,14 @@ expect(connection).to_not have_received(:respond_query_task_completed) expect(connection) .to have_received(:respond_workflow_task_completed) - .with(namespace: namespace, task_token: task.task_token, commands: commands, query_results: nil, binary_checksum: binary_checksum) + .with( + namespace: namespace, + task_token: task.task_token, + commands: commands, + query_results: nil, + binary_checksum: binary_checksum, + new_sdk_flags: new_sdk_flags + ) end it 'ignores connection exception' do From f9bc27240060bfabf98ecc9769e541da963e366e Mon Sep 17 00:00:00 2001 From: Jeff Schoner Date: Sun, 6 Aug 2023 13:56:57 -0700 Subject: [PATCH 03/12] Tests and fabricators --- .../grpc/history_event_fabricator.rb | 26 ++ .../lib/temporal/workflow/executor_spec.rb | 82 ++++-- .../lib/temporal/workflow/history_spec.rb | 7 - .../temporal/workflow/state_manager_spec.rb | 243 ++++++++++++++++++ 4 files changed, 325 insertions(+), 33 deletions(-) delete mode 100644 spec/unit/lib/temporal/workflow/history_spec.rb diff --git a/spec/fabricators/grpc/history_event_fabricator.rb b/spec/fabricators/grpc/history_event_fabricator.rb index 6f043b4d..e4e67579 100644 --- a/spec/fabricators/grpc/history_event_fabricator.rb +++ b/spec/fabricators/grpc/history_event_fabricator.rb @@ -73,6 +73,7 @@ class TestSerializer end Fabricator(:api_workflow_task_completed_event, from: :api_history_event) do + transient :sdk_flags event_type { Temporalio::Api::Enums::V1::EventType::EVENT_TYPE_WORKFLOW_TASK_COMPLETED } workflow_task_completed_event_attributes do |attrs| Temporalio::Api::History::V1::WorkflowTaskCompletedEventAttributes.new( @@ -80,6 +81,9 @@ class TestSerializer started_event_id: attrs[:event_id] - 1, identity: 'test-worker@test-host', binary_checksum: 'v1.0.0', + sdk_metadata: Temporalio::Api::Sdk::V1::WorkflowTaskCompletedMetadata.new( + lang_used_flags: attrs[:sdk_flags] || [] + ) ) end end @@ -199,3 +203,25 @@ class TestSerializer ) end end + +Fabricator(:api_marker_recorded_event, from: :api_history_event) do + event_type { Temporalio::Api::Enums::V1::EventType::EVENT_TYPE_MARKER_RECORDED } + marker_recorded_event_attributes do |attrs| + Temporalio::Api::History::V1::MarkerRecordedEventAttributes.new( + workflow_task_completed_event_id: attrs[:event_id] - 1, + marker_name: 'SIDE_EFFECT', + details: to_payload_map({}) + ) + end +end + +Fabricator(:api_workflow_execution_signaled_event, from: :api_history_event) do + event_type { Temporalio::Api::Enums::V1::EventType::EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED } + workflow_execution_signaled_event_attributes do + Temporalio::Api::History::V1::WorkflowExecutionSignaledEventAttributes.new( + signal_name: 'a_signal', + input: nil, + identity: 'test-worker@test-host' + ) + end +end diff --git a/spec/unit/lib/temporal/workflow/executor_spec.rb b/spec/unit/lib/temporal/workflow/executor_spec.rb index 1bc03457..51d039ad 100644 --- a/spec/unit/lib/temporal/workflow/executor_spec.rb +++ b/spec/unit/lib/temporal/workflow/executor_spec.rb @@ -11,11 +11,11 @@ let(:workflow_started_event) { Fabricate(:api_workflow_execution_started_event, event_id: 1) } let(:history) do Temporal::Workflow::History.new([ - workflow_started_event, - Fabricate(:api_workflow_task_scheduled_event, event_id: 2), - Fabricate(:api_workflow_task_started_event, event_id: 3), - Fabricate(:api_workflow_task_completed_event, event_id: 4) - ]) + workflow_started_event, + Fabricate(:api_workflow_task_scheduled_event, event_id: 2), + Fabricate(:api_workflow_task_started_event, event_id: 3), + Fabricate(:api_workflow_task_completed_event, event_id: 4) + ]) end let(:workflow) { TestWorkflow } let(:workflow_metadata) { Fabricate(:workflow_metadata) } @@ -37,17 +37,17 @@ def execute expect(workflow) .to have_received(:execute_in_context) - .with( - an_instance_of(Temporal::Workflow::Context), - nil - ) + .with( + an_instance_of(Temporal::Workflow::Context), + nil + ) end it 'returns a complete workflow decision' do decisions = subject.run expect(decisions.commands.length).to eq(1) - expect(decisions.new_sdk_flags.length).to eq(0) + expect(decisions.new_sdk_flags).to be_empty decision_id, decision = decisions.commands.first expect(decision_id).to eq(history.events.length + 1) @@ -55,15 +55,45 @@ def execute expect(decision.result).to eq('test') end + context 'history with signal' do + let(:history) do + Temporal::Workflow::History.new([ + workflow_started_event, + Fabricate(:api_workflow_execution_signaled_event, event_id: 2), + Fabricate(:api_workflow_task_scheduled_event, event_id: 3), + Fabricate(:api_workflow_task_started_event, event_id: 4) + ]) + end + + context 'signals first config enabled' do + it 'set signals first sdk flag' do + decisions = subject.run + + expect(decisions.commands.length).to eq(1) + expect(decisions.new_sdk_flags).to eq(Set.new([Temporal::Workflow::SDKFlags::HANDLE_SIGNALS_FIRST])) + end + end + + context 'signals first config disabled' do + let(:config) { Temporal::Configuration.new.tap { |c| c.legacy_signals = true } } + it 'no sdk flag' do + decisions = subject.run + + expect(decisions.commands.length).to eq(1) + expect(decisions.new_sdk_flags).to be_empty + end + end + end + it 'generates workflow metadata' do allow(Temporal::Metadata::Workflow).to receive(:new) payload = Temporalio::Api::Common::V1::Payload.new( metadata: { 'encoding' => 'json/plain' }, data: '"bar"'.b ) - header = + header = Google::Protobuf::Map.new(:string, :message, Temporalio::Api::Common::V1::Payload, { 'Foo' => payload }) - workflow_started_event.workflow_execution_started_event_attributes.header = + workflow_started_event.workflow_execution_started_event_attributes.header = Fabricate(:api_header, fields: header) subject.run @@ -71,19 +101,19 @@ def execute event_attributes = workflow_started_event.workflow_execution_started_event_attributes expect(Temporal::Metadata::Workflow) .to have_received(:new) - .with( - namespace: workflow_metadata.namespace, - id: workflow_metadata.workflow_id, - name: event_attributes.workflow_type.name, - run_id: event_attributes.original_execution_run_id, - parent_id: nil, - parent_run_id: nil, - attempt: event_attributes.attempt, - task_queue: event_attributes.task_queue.name, - headers: {'Foo' => 'bar'}, - run_started_at: workflow_started_event.event_time.to_time, - memo: {}, - ) + .with( + namespace: workflow_metadata.namespace, + id: workflow_metadata.workflow_id, + name: event_attributes.workflow_type.name, + run_id: event_attributes.original_execution_run_id, + parent_id: nil, + parent_run_id: nil, + attempt: event_attributes.attempt, + task_queue: event_attributes.task_queue.name, + headers: { 'Foo' => 'bar' }, + run_started_at: workflow_started_event.event_time.to_time, + memo: {} + ) end end @@ -95,7 +125,7 @@ def execute { '1' => Temporal::Workflow::TaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'success')), '2' => Temporal::Workflow::TaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'failure')), - '3' => Temporal::Workflow::TaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'unknown')), + '3' => Temporal::Workflow::TaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'unknown')) } end diff --git a/spec/unit/lib/temporal/workflow/history_spec.rb b/spec/unit/lib/temporal/workflow/history_spec.rb deleted file mode 100644 index 8a058fc1..00000000 --- a/spec/unit/lib/temporal/workflow/history_spec.rb +++ /dev/null @@ -1,7 +0,0 @@ -require 'temporal/workflow/history' - -describe Temporal::Workflow::History do - describe '#next_window' do - - end -end diff --git a/spec/unit/lib/temporal/workflow/state_manager_spec.rb b/spec/unit/lib/temporal/workflow/state_manager_spec.rb index c11f77f1..a6e512b4 100644 --- a/spec/unit/lib/temporal/workflow/state_manager_spec.rb +++ b/spec/unit/lib/temporal/workflow/state_manager_spec.rb @@ -2,6 +2,7 @@ require 'temporal/workflow/dispatcher' require 'temporal/workflow/history/event' require 'temporal/workflow/history/window' +require 'temporal/workflow/signal' require 'temporal/workflow/state_manager' require 'temporal/errors' @@ -39,6 +40,248 @@ class MyWorkflow < Temporal::Workflow; end end end + describe '#apply' do + let(:dispatcher) { Temporal::Workflow::Dispatcher.new } + let(:state_manager) do + Temporal::Workflow::StateManager.new(dispatcher, config) + end + let(:config) { Temporal::Configuration.new } + + context 'workflow execution started' do + let(:history) do + Temporal::Workflow::History.new([Fabricate(:api_workflow_execution_started_event, event_id: 1)]) + end + + it 'dispatcher invoked for start' do + expect(dispatcher).to receive(:dispatch).with( + Temporal::Workflow::History::EventTarget.workflow, 'started', instance_of(Array) + ).once + state_manager.apply(history.next_window) + end + end + + context 'workflow execution started with signal' do + let(:signal_entry) { Fabricate(:api_workflow_execution_signaled_event, event_id: 2) } + let(:history) do + Temporal::Workflow::History.new( + [ + Fabricate(:api_workflow_execution_started_event, event_id: 1), + signal_entry + ] + ) + end + + it 'dispatcher invoked for start' do + # While markers do come before the workflow execution started event, signals do not + expect(dispatcher).to receive(:dispatch).with( + Temporal::Workflow::History::EventTarget.workflow, 'started', instance_of(Array) + ).once.ordered + expect(dispatcher).to receive(:dispatch).with( + Temporal::Workflow::Signal.new(signal_entry.workflow_execution_signaled_event_attributes.signal_name), + 'signaled', + [ + signal_entry.workflow_execution_signaled_event_attributes.signal_name, + signal_entry.workflow_execution_signaled_event_attributes.input + ] + ).once.ordered + + state_manager.apply(history.next_window) + end + end + + context 'with a marker' do + let(:activity_entry) { Fabricate(:api_activity_task_scheduled_event, event_id: 5) } + let(:marker_entry) { Fabricate(:api_marker_recorded_event, event_id: 8) } + let(:history) do + Temporal::Workflow::History.new( + [ + Fabricate(:api_workflow_execution_started_event, event_id: 1), + Fabricate(:api_workflow_task_scheduled_event, event_id: 2), + Fabricate(:api_workflow_task_started_event, event_id: 3), + Fabricate(:api_workflow_task_completed_event, event_id: 4), + activity_entry, + Fabricate(:api_activity_task_started_event, event_id: 6), + Fabricate(:api_activity_task_completed_event, event_id: 7), + marker_entry, + Fabricate(:api_workflow_task_scheduled_event, event_id: 9), + Fabricate(:api_workflow_task_started_event, event_id: 10), + Fabricate(:api_workflow_task_completed_event, event_id: 11) + ] + ) + end + + it 'marker handled first' do + activity_target = nil + dispatcher.register_handler(Temporal::Workflow::History::EventTarget.workflow, 'started') do + activity_target, = state_manager.schedule( + Temporal::Workflow::Command::ScheduleActivity.new( + activity_id: activity_entry.event_id, + activity_type: activity_entry.activity_task_scheduled_event_attributes.activity_type, + input: nil, + task_queue: activity_entry.activity_task_scheduled_event_attributes.task_queue, + retry_policy: nil, + timeouts: nil, + headers: nil + ) + ) + end + + # First task: starts workflow execution, schedules an activity + state_manager.apply(history.next_window) + + expect(activity_target).not_to be_nil + + activity_completed = false + dispatcher.register_handler(activity_target, 'completed') do + activity_completed = true + state_manager.schedule( + Temporal::Workflow::Command::RecordMarker.new( + name: marker_entry.marker_recorded_event_attributes.marker_name, + details: to_payload_map({}) + ) + ) + + # Activity completed event comes before marker recorded event in history, but + # when activity completion is handled, the marker has already been handled. + expect(state_manager.send(:marker_ids).count).to eq(1) + end + + # Second task: Handles activity completion, records marker + state_manager.apply(history.next_window) + + expect(activity_completed).to eq(true) + end + end + + def test_order(signal_first) + activity_target = nil + signaled = false + + dispatcher.register_handler(Temporal::Workflow::History::EventTarget.workflow, 'started') do + activity_target, = state_manager.schedule( + Temporal::Workflow::Command::ScheduleActivity.new( + activity_id: activity_entry.event_id, + activity_type: activity_entry.activity_task_scheduled_event_attributes.activity_type, + input: nil, + task_queue: activity_entry.activity_task_scheduled_event_attributes.task_queue, + retry_policy: nil, + timeouts: nil, + headers: nil + ) + ) + end + + dispatcher.register_handler( + Temporal::Workflow::Signal.new( + signal_entry.workflow_execution_signaled_event_attributes.signal_name + ), + 'signaled' + ) do + signaled = true + end + + # First task: starts workflow execution, schedules an activity + state_manager.apply(history.next_window) + + expect(activity_target).not_to be_nil + expect(signaled).to eq(false) + + activity_completed = false + dispatcher.register_handler(activity_target, 'completed') do + activity_completed = true + + expect(signaled).to eq(signal_first) + end + + # Second task: Handles activity completion, signal + state_manager.apply(history.next_window) + + expect(activity_completed).to eq(true) + expect(signaled).to eq(true) + end + + context 'replaying with a signal' do + let(:activity_entry) { Fabricate(:api_activity_task_scheduled_event, event_id: 5) } + let(:signal_entry) { Fabricate(:api_workflow_execution_signaled_event, event_id: 8) } + let(:signal_handling_task) { Fabricate(:api_workflow_task_completed_event, event_id: 11) } + let(:history) do + Temporal::Workflow::History.new( + [ + Fabricate(:api_workflow_execution_started_event, event_id: 1), + Fabricate(:api_workflow_task_scheduled_event, event_id: 2), + Fabricate(:api_workflow_task_started_event, event_id: 3), + Fabricate(:api_workflow_task_completed_event, event_id: 4), + activity_entry, + Fabricate(:api_activity_task_started_event, event_id: 6), + Fabricate(:api_activity_task_completed_event, event_id: 7), + signal_entry, + Fabricate(:api_workflow_task_scheduled_event, event_id: 9), + Fabricate(:api_workflow_task_started_event, event_id: 10), + signal_handling_task + ] + ) + end + + context 'no SDK flag' do + it 'signal inline' do + test_order(false) + end + end + + context 'with SDK flag' do + let(:signal_handling_task) do + Fabricate( + :api_workflow_task_completed_event, + event_id: 11, + sdk_flags: [Temporal::Workflow::SDKFlags::HANDLE_SIGNALS_FIRST] + ) + end + it 'signal first' do + test_order(true) + end + end + end + + context 'not replaying with a signal' do + let(:activity_entry) { Fabricate(:api_activity_task_scheduled_event, event_id: 5) } + let(:signal_entry) { Fabricate(:api_workflow_execution_signaled_event, event_id: 8) } + let(:history) do + Temporal::Workflow::History.new( + [ + Fabricate(:api_workflow_execution_started_event, event_id: 1), + Fabricate(:api_workflow_task_scheduled_event, event_id: 2), + Fabricate(:api_workflow_task_started_event, event_id: 3), + Fabricate(:api_workflow_task_completed_event, event_id: 4), + activity_entry, + Fabricate(:api_activity_task_started_event, event_id: 6), + Fabricate(:api_activity_task_completed_event, event_id: 7), + signal_entry, + Fabricate(:api_workflow_task_scheduled_event, event_id: 9) + ] + ) + end + + context 'signals first config disabled' do + let(:config) { Temporal::Configuration.new.tap { |c| c.legacy_signals = true } } + it 'signal inline' do + test_order(false) + + expect(state_manager.new_sdk_flags).to be_empty + end + end + + context 'signals first with default config' do + let(:config) { Temporal::Configuration.new } + + it 'signal first' do + test_order(true) + + expect(state_manager.new_sdk_flags).to eq(Set.new([Temporal::Workflow::SDKFlags::HANDLE_SIGNALS_FIRST])) + end + end + end + end + describe '#search_attributes' do let(:initial_search_attributes) do { From 38eb4ce8ba89c0ff90e7dd7ebbc5628e66b0fc8a Mon Sep 17 00:00:00 2001 From: Jeff Schoner Date: Sun, 6 Aug 2023 14:20:35 -0700 Subject: [PATCH 04/12] rubyfmt substantially modified files --- lib/temporal/connection/grpc.rb | 90 +++++++++---------- lib/temporal/workflow/executor.rb | 7 +- lib/temporal/workflow/state_manager.rb | 50 +++++------ lib/temporal/workflow/task_processor.rb | 46 +++++----- .../grpc/history_event_fabricator.rb | 6 +- .../temporal/workflow/state_manager_spec.rb | 20 ++--- .../temporal/workflow/task_processor_spec.rb | 14 ++- 7 files changed, 115 insertions(+), 118 deletions(-) diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index 28ae379f..4716536c 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -21,7 +21,7 @@ class GRPC HISTORY_EVENT_FILTER = { all: Temporalio::Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, - close: Temporalio::Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT, + close: Temporalio::Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT }.freeze QUERY_REJECT_CONDITION = { @@ -37,7 +37,7 @@ class GRPC double: Temporalio::Api::Enums::V1::IndexedValueType::INDEXED_VALUE_TYPE_DOUBLE, bool: Temporalio::Api::Enums::V1::IndexedValueType::INDEXED_VALUE_TYPE_BOOL, datetime: Temporalio::Api::Enums::V1::IndexedValueType::INDEXED_VALUE_TYPE_DATETIME, - keyword_list: Temporalio::Api::Enums::V1::IndexedValueType::INDEXED_VALUE_TYPE_KEYWORD_LIST, + keyword_list: Temporalio::Api::Enums::V1::IndexedValueType::INDEXED_VALUE_TYPE_KEYWORD_LIST }.freeze INDEXED_VALUE_TYPE_TO_SYMBOL = SYMBOL_TO_INDEXED_VALUE_TYPE.map do |symbol, int_value| @@ -46,7 +46,7 @@ class GRPC SYMBOL_TO_RESET_REAPPLY_TYPE = { signal: Temporalio::Api::Enums::V1::ResetReapplyType::RESET_REAPPLY_TYPE_SIGNAL, - none: Temporalio::Api::Enums::V1::ResetReapplyType::RESET_REAPPLY_TYPE_NONE, + none: Temporalio::Api::Enums::V1::ResetReapplyType::RESET_REAPPLY_TYPE_NONE } DEFAULT_OPTIONS = { @@ -73,7 +73,7 @@ def register_namespace(name:, description: nil, is_global: false, retention_peri workflow_execution_retention_period: Google::Protobuf::Duration.new( seconds: (retention_period * 24 * 60 * 60).to_i ), - data: data, + data: data ) client.register_namespace(request) rescue ::GRPC::AlreadyExists => e @@ -85,8 +85,9 @@ def describe_namespace(name:) client.describe_namespace(request) end - def list_namespaces(page_size:, next_page_token: "") - request = Temporalio::Api::WorkflowService::V1::ListNamespacesRequest.new(page_size: page_size, next_page_token: next_page_token) + def list_namespaces(page_size:, next_page_token: '') + request = Temporalio::Api::WorkflowService::V1::ListNamespacesRequest.new(page_size: page_size, + next_page_token: next_page_token) client.list_namespaces(request) end @@ -110,10 +111,7 @@ def start_workflow_execution( workflow_id:, workflow_name:, task_queue:, - input: nil, - execution_timeout:, - run_timeout:, - task_timeout:, + execution_timeout:, run_timeout:, task_timeout:, input: nil, workflow_id_reuse_policy: nil, headers: nil, cron_schedule: nil, @@ -145,7 +143,7 @@ def start_workflow_execution( ), search_attributes: Temporalio::Api::Common::V1::SearchAttributes.new( indexed_fields: to_payload_map_without_codec(search_attributes || {}) - ), + ) ) client.start_workflow_execution(request) @@ -169,11 +167,10 @@ def get_workflow_execution_history( if wait_for_new_event if timeout.nil? # This is an internal error. Wrappers should enforce this. - raise "You must specify a timeout when wait_for_new_event = true." + raise 'You must specify a timeout when wait_for_new_event = true.' elsif timeout > SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL - raise ClientError.new( - "You may not specify a timeout of more than #{SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL} seconds, got: #{timeout}." - ) + raise ClientError, + "You may not specify a timeout of more than #{SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL} seconds, got: #{timeout}." end end request = Temporalio::Api::WorkflowService::V1::GetWorkflowExecutionHistoryRequest.new( @@ -202,6 +199,7 @@ def poll_workflow_task_queue(namespace:, task_queue:, binary_checksum:) poll_mutex.synchronize do return unless can_poll? + @poll_request = client.poll_workflow_task_queue(request, return_op: true) end @@ -215,7 +213,7 @@ def respond_query_task_completed(namespace:, task_token:, query_result:) namespace: namespace, completed_type: query_result_proto.result_type, query_result: query_result_proto.answer, - error_message: query_result_proto.error_message, + error_message: query_result_proto.error_message ) client.respond_query_task_completed(request) @@ -228,15 +226,12 @@ def respond_workflow_task_completed(namespace:, task_token:, commands:, binary_c task_token: task_token, commands: Array(commands).map { |(_, command)| Serializer.serialize(command) }, query_results: query_results.transform_values { |value| Serializer.serialize(value) }, - binary_checksum: binary_checksum + binary_checksum: binary_checksum, + sdk_metadata: if new_sdk_flags.any? + Temporalio::Api::Sdk::V1::WorkflowTaskCompletedMetadata.new(lang_used_flags: new_sdk_flags.to_a) + end ) - if new_sdk_flags.any? - request.sdk_metadata = Temporalio::Api::Sdk::V1::WorkflowTaskCompletedMetadata.new( - lang_used_flags: new_sdk_flags.to_a - ) - end - client.respond_workflow_task_completed(request) end @@ -263,6 +258,7 @@ def poll_activity_task_queue(namespace:, task_queue:) poll_mutex.synchronize do return unless can_poll? + @poll_request = client.poll_activity_task_queue(request, return_op: true) end @@ -288,7 +284,7 @@ def respond_activity_task_completed(namespace:, task_token:, result:) namespace: namespace, identity: identity, task_token: task_token, - result: to_result_payloads(result), + result: to_result_payloads(result) ) client.respond_activity_task_completed(request) end @@ -365,27 +361,22 @@ def signal_with_start_workflow_execution( workflow_id:, workflow_name:, task_queue:, - input: nil, - execution_timeout:, - run_timeout:, - task_timeout:, + execution_timeout:, run_timeout:, task_timeout:, signal_name:, signal_input:, input: nil, workflow_id_reuse_policy: nil, headers: nil, cron_schedule: nil, - signal_name:, - signal_input:, memo: nil, search_attributes: nil ) proto_header_fields = if headers.nil? - to_payload_map({}) - elsif headers.class == Hash - to_payload_map(headers) - else - # Preserve backward compatability for headers specified using proto objects - warn '[DEPRECATION] Specify headers using a hash rather than protobuf objects' - headers - end + to_payload_map({}) + elsif headers.instance_of?(Hash) + to_payload_map(headers) + else + # Preserve backward compatability for headers specified using proto objects + warn '[DEPRECATION] Specify headers using a hash rather than protobuf objects' + headers + end request = Temporalio::Api::WorkflowService::V1::SignalWithStartWorkflowExecutionRequest.new( identity: identity, @@ -404,7 +395,7 @@ def signal_with_start_workflow_execution( workflow_task_timeout: task_timeout, request_id: SecureRandom.uuid, header: Temporalio::Api::Common::V1::Header.new( - fields: proto_header_fields, + fields: proto_header_fields ), cron_schedule: cron_schedule, signal_name: signal_name, @@ -414,7 +405,7 @@ def signal_with_start_workflow_execution( ), search_attributes: Temporalio::Api::Common::V1::SearchAttributes.new( indexed_fields: to_payload_map_without_codec(search_attributes || {}) - ), + ) ) client.signal_with_start_workflow_execution(request) @@ -425,7 +416,7 @@ def reset_workflow_execution(namespace:, workflow_id:, run_id:, reason:, workflo namespace: namespace, workflow_execution: Temporalio::Api::Common::V1::WorkflowExecution.new( workflow_id: workflow_id, - run_id: run_id, + run_id: run_id ), reason: reason, workflow_task_finish_event_id: workflow_task_event_id, @@ -454,7 +445,7 @@ def terminate_workflow_execution( namespace: namespace, workflow_execution: Temporalio::Api::Common::V1::WorkflowExecution.new( workflow_id: workflow_id, - run_id: run_id, + run_id: run_id ), reason: reason, details: to_details_payloads(details) @@ -518,9 +509,8 @@ def add_custom_search_attributes(attributes, namespace) attributes.each_value do |symbol_type| next if SYMBOL_TO_INDEXED_VALUE_TYPE.include?(symbol_type) - raise Temporal::InvalidSearchAttributeTypeFailure.new( - "Cannot add search attributes (#{attributes}): unknown search attribute type :#{symbol_type}, supported types: #{SYMBOL_TO_INDEXED_VALUE_TYPE.keys}" - ) + raise Temporal::InvalidSearchAttributeTypeFailure, + "Cannot add search attributes (#{attributes}): unknown search attribute type :#{symbol_type}, supported types: #{SYMBOL_TO_INDEXED_VALUE_TYPE.keys}" end request = Temporalio::Api::OperatorService::V1::AddSearchAttributesRequest.new( @@ -530,12 +520,12 @@ def add_custom_search_attributes(attributes, namespace) begin operator_client.add_search_attributes(request) rescue ::GRPC::AlreadyExists => e - raise Temporal::SearchAttributeAlreadyExistsFailure.new(e) + raise Temporal::SearchAttributeAlreadyExistsFailure, e rescue ::GRPC::Internal => e # The internal workflow that adds search attributes can fail for a variety of reasons such # as recreating a removed attribute with a new type. Wrap these all up into a fall through # exception. - raise Temporal::SearchAttributeFailure.new(e) + raise Temporal::SearchAttributeFailure, e end end @@ -555,7 +545,7 @@ def remove_custom_search_attributes(attribute_names, namespace) begin operator_client.remove_search_attributes(request) rescue ::GRPC::NotFound => e - raise Temporal::NotFoundFailure.new(e) + raise Temporal::NotFoundFailure, e end end @@ -637,7 +627,7 @@ def client url, credentials, timeout: CONNECTION_TIMEOUT_SECONDS, - interceptors: [ ClientNameVersionInterceptor.new() ] + interceptors: [ClientNameVersionInterceptor.new] ) end @@ -646,7 +636,7 @@ def operator_client url, credentials, timeout: CONNECTION_TIMEOUT_SECONDS, - interceptors: [ ClientNameVersionInterceptor.new() ] + interceptors: [ClientNameVersionInterceptor.new] ) end diff --git a/lib/temporal/workflow/executor.rb b/lib/temporal/workflow/executor.rb index 4ebfc3e5..77c11d7d 100644 --- a/lib/temporal/workflow/executor.rb +++ b/lib/temporal/workflow/executor.rb @@ -66,13 +66,14 @@ def process_query(query) result = query_registry.handle(query.query_type, query.query_args) QueryResult.answer(result) - rescue StandardError => error - QueryResult.failure(error) + rescue StandardError => e + QueryResult.failure(e) end def execute_workflow(input, workflow_started_event) metadata = Metadata.generate_workflow_metadata(workflow_started_event, task_metadata) - context = Workflow::Context.new(state_manager, dispatcher, workflow_class, metadata, config, query_registry, track_stack_trace) + context = Workflow::Context.new(state_manager, dispatcher, workflow_class, metadata, config, query_registry, + track_stack_trace) Fiber.new do middleware_chain.invoke(metadata) do diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index ae2d9354..392a445a 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -49,9 +49,7 @@ def schedule(command) # Fast-forward event IDs to skip all the markers (version markers can # be removed, so we can't rely on them being scheduled during a replay) command_id = next_event_id - while marker_ids.include?(command_id) do - command_id = next_event_id - end + command_id = next_event_id while marker_ids.include?(command_id) cancelation_id = case command @@ -74,7 +72,7 @@ def schedule(command) validate_append_command(command) commands << [command_id, command] - return [event_target_from(command_id, command), cancelation_id] + [event_target_from(command_id, command), cancelation_id] end def release?(release_name) @@ -153,22 +151,21 @@ def next_event_id def validate_append_command(command) return if commands.last.nil? + _, previous_command = commands.last case previous_command when Command::CompleteWorkflow, Command::FailWorkflow, Command::ContinueAsNew context_string = case previous_command - when Command::CompleteWorkflow - "The workflow completed" - when Command::FailWorkflow - "The workflow failed" - when Command::ContinueAsNew - "The workflow continued as new" - end - raise Temporal::WorkflowAlreadyCompletingError.new( - "You cannot do anything in a Workflow after it completes. #{context_string}, "\ + when Command::CompleteWorkflow + 'The workflow completed' + when Command::FailWorkflow + 'The workflow failed' + when Command::ContinueAsNew + 'The workflow continued as new' + end + raise Temporal::WorkflowAlreadyCompletingError, "You cannot do anything in a Workflow after it completes. #{context_string}, "\ "but then it sent a new command: #{command.class}. This can happen, for example, if you've "\ - "not waited for all of your Activity futures before finishing the Workflow." - ) + 'not waited for all of your Activity futures before finishing the Workflow.' end end @@ -187,7 +184,7 @@ def apply_event(event) History::EventTarget.workflow, 'started', from_payloads(event.attributes.input), - event, + event ) when 'WORKFLOW_EXECUTION_COMPLETED' @@ -227,7 +224,8 @@ def apply_event(event) when 'ACTIVITY_TASK_FAILED' state_machine.fail - dispatch(history_target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure, ActivityException)) + dispatch(history_target, 'failed', + Temporal::Workflow::Errors.generate_error(event.attributes.failure, ActivityException)) when 'ACTIVITY_TASK_TIMED_OUT' state_machine.time_out @@ -244,7 +242,8 @@ def apply_event(event) when 'ACTIVITY_TASK_CANCELED' state_machine.cancel - dispatch(history_target, 'failed', Temporal::ActivityCanceled.new(from_details_payloads(event.attributes.details))) + dispatch(history_target, 'failed', + Temporal::ActivityCanceled.new(from_details_payloads(event.attributes.details))) when 'TIMER_STARTED' state_machine.start @@ -286,7 +285,8 @@ def apply_event(event) when 'WORKFLOW_EXECUTION_SIGNALED' # relies on Signal#== for matching in Dispatcher signal_target = Signal.new(event.attributes.signal_name) - dispatch(signal_target, 'signaled', event.attributes.signal_name, from_signal_payloads(event.attributes.input)) + dispatch(signal_target, 'signaled', event.attributes.signal_name, + from_signal_payloads(event.attributes.input)) when 'WORKFLOW_EXECUTION_TERMINATED' # todo @@ -323,7 +323,8 @@ def apply_event(event) when 'CHILD_WORKFLOW_EXECUTION_TIMED_OUT' state_machine.time_out - dispatch(history_target, 'failed', ChildWorkflowTimeoutError.new('The child workflow timed out before succeeding')) + dispatch(history_target, 'failed', + ChildWorkflowTimeoutError.new('The child workflow timed out before succeeding')) when 'CHILD_WORKFLOW_EXECUTION_TERMINATED' state_machine.terminated @@ -396,16 +397,16 @@ def discard_command(history_target) # Pop the first command from the list, it is expected to match replay_command_id, replay_command = commands.shift - if !replay_command_id + unless replay_command_id raise NonDeterministicWorkflowError, - "A command in the history of previous executions, #{history_target}, was not scheduled upon replay. " + NONDETERMINISM_ERROR_SUGGESTION + "A command in the history of previous executions, #{history_target}, was not scheduled upon replay. " + NONDETERMINISM_ERROR_SUGGESTION end replay_target = event_target_from(replay_command_id, replay_command) if history_target != replay_target raise NonDeterministicWorkflowError, - "Unexpected command. The replaying code is issuing: #{replay_target}, "\ - "but the history of previous executions recorded: #{history_target}. " + NONDETERMINISM_ERROR_SUGGESTION + "Unexpected command. The replaying code is issuing: #{replay_target}, "\ + "but the history of previous executions recorded: #{history_target}. " + NONDETERMINISM_ERROR_SUGGESTION end end @@ -431,7 +432,6 @@ def track_release(release_name) schedule(Command::RecordMarker.new(name: RELEASE_MARKER, details: release_name)) end end - end end end diff --git a/lib/temporal/workflow/task_processor.rb b/lib/temporal/workflow/task_processor.rb index cd0cb8bd..e11a284a 100644 --- a/lib/temporal/workflow/task_processor.rb +++ b/lib/temporal/workflow/task_processor.rb @@ -40,12 +40,11 @@ def initialize(task, namespace, workflow_lookup, middleware_chain, workflow_midd def process start_time = Time.now - Temporal.logger.debug("Processing Workflow task", metadata.to_h) - Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_TASK_QUEUE_TIME, queue_time_ms, workflow: workflow_name, namespace: namespace) + Temporal.logger.debug('Processing Workflow task', metadata.to_h) + Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_TASK_QUEUE_TIME, queue_time_ms, workflow: workflow_name, + namespace: namespace) - if !workflow_class - raise Temporal::WorkflowNotRegistered, 'Workflow is not registered with this worker' - end + raise Temporal::WorkflowNotRegistered, 'Workflow is not registered with this worker' unless workflow_class history = fetch_full_history queries = parse_queries @@ -54,7 +53,8 @@ def process track_stack_trace = queries.values.map(&:query_type).include?(StackTraceTracker::STACK_TRACE_QUERY_NAME) # TODO: For sticky workflows we need to cache the Executor instance - executor = Workflow::Executor.new(workflow_class, history, metadata, config, track_stack_trace, workflow_middleware_chain) + executor = Workflow::Executor.new(workflow_class, history, metadata, config, track_stack_trace, + workflow_middleware_chain) run_result = middleware_chain.invoke(metadata) do executor.run @@ -67,20 +67,21 @@ def process else complete_task(run_result, query_results) end - rescue StandardError => error - Temporal::ErrorHandler.handle(error, config, metadata: metadata) + rescue StandardError => e + Temporal::ErrorHandler.handle(e, config, metadata: metadata) - fail_task(error) + fail_task(e) ensure time_diff_ms = ((Time.now - start_time) * 1000).round - Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_TASK_LATENCY, time_diff_ms, workflow: workflow_name, namespace: namespace) - Temporal.logger.debug("Workflow task processed", metadata.to_h.merge(execution_time: time_diff_ms)) + Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_TASK_LATENCY, time_diff_ms, workflow: workflow_name, + namespace: namespace) + Temporal.logger.debug('Workflow task processed', metadata.to_h.merge(execution_time: time_diff_ms)) end private attr_reader :task, :namespace, :task_token, :workflow_name, :workflow_class, - :middleware_chain, :workflow_middleware_chain, :metadata, :config, :binary_checksum + :middleware_chain, :workflow_middleware_chain, :metadata, :config, :binary_checksum def connection @connection ||= Temporal::Connection.generate(config.for_connection) @@ -95,7 +96,7 @@ def queue_time_ms def fetch_full_history events = task.history.events.to_a next_page_token = task.next_page_token - while !next_page_token.empty? do + until next_page_token.empty? response = connection.get_workflow_execution_history( namespace: namespace, workflow_id: task.workflow_execution.workflow_id, @@ -126,7 +127,7 @@ def parse_queries end def complete_task(run_result, query_results) - Temporal.logger.info("Workflow task completed", metadata.to_h) + Temporal.logger.info('Workflow task completed', metadata.to_h) connection.respond_workflow_task_completed( namespace: namespace, @@ -139,21 +140,22 @@ def complete_task(run_result, query_results) end def complete_query(result) - Temporal.logger.info("Workflow Query task completed", metadata.to_h) + Temporal.logger.info('Workflow Query task completed', metadata.to_h) connection.respond_query_task_completed( namespace: namespace, task_token: task_token, query_result: result ) - rescue StandardError => error - Temporal.logger.error("Unable to complete a query", metadata.to_h.merge(error: error.inspect)) + rescue StandardError => e + Temporal.logger.error('Unable to complete a query', metadata.to_h.merge(error: e.inspect)) - Temporal::ErrorHandler.handle(error, config, metadata: metadata) + Temporal::ErrorHandler.handle(e, config, metadata: metadata) end def fail_task(error) - Temporal.metrics.increment(Temporal::MetricKeys::WORKFLOW_TASK_EXECUTION_FAILED, workflow: workflow_name, namespace: namespace) + Temporal.metrics.increment(Temporal::MetricKeys::WORKFLOW_TASK_EXECUTION_FAILED, workflow: workflow_name, + namespace: namespace) Temporal.logger.error('Workflow task failed', metadata.to_h.merge(error: error.inspect)) Temporal.logger.debug(error.backtrace.join("\n")) @@ -169,10 +171,10 @@ def fail_task(error) exception: error, binary_checksum: binary_checksum ) - rescue StandardError => error - Temporal.logger.error("Unable to fail Workflow task", metadata.to_h.merge(error: error.inspect)) + rescue StandardError => e + Temporal.logger.error('Unable to fail Workflow task', metadata.to_h.merge(error: e.inspect)) - Temporal::ErrorHandler.handle(error, config, metadata: metadata) + Temporal::ErrorHandler.handle(e, config, metadata: metadata) end end end diff --git a/spec/fabricators/grpc/history_event_fabricator.rb b/spec/fabricators/grpc/history_event_fabricator.rb index e4e67579..f2102671 100644 --- a/spec/fabricators/grpc/history_event_fabricator.rb +++ b/spec/fabricators/grpc/history_event_fabricator.rb @@ -52,7 +52,7 @@ class TestSerializer Fabricator(:api_workflow_task_scheduled_event, from: :api_history_event) do event_type { Temporalio::Api::Enums::V1::EventType::EVENT_TYPE_WORKFLOW_TASK_SCHEDULED } - workflow_task_scheduled_event_attributes do |attrs| + workflow_task_scheduled_event_attributes do |_attrs| Temporalio::Api::History::V1::WorkflowTaskScheduledEventAttributes.new( task_queue: Fabricate(:api_task_queue), start_to_close_timeout: 15, @@ -127,7 +127,7 @@ class TestSerializer event_type { Temporalio::Api::Enums::V1::EventType::EVENT_TYPE_ACTIVITY_TASK_FAILED } activity_task_failed_event_attributes do |attrs| Temporalio::Api::History::V1::ActivityTaskFailedEventAttributes.new( - failure: Temporalio::Api::Failure::V1::Failure.new(message: "Activity failed"), + failure: Temporalio::Api::Failure::V1::Failure.new(message: 'Activity failed'), scheduled_event_id: attrs[:event_id] - 2, started_event_id: attrs[:event_id] - 1, identity: 'test-worker@test-host' @@ -152,7 +152,7 @@ class TestSerializer activity_task_cancel_requested_event_attributes do |attrs| Temporalio::Api::History::V1::ActivityTaskCancelRequestedEventAttributes.new( scheduled_event_id: attrs[:event_id] - 1, - workflow_task_completed_event_id: attrs[:event_id] - 2, + workflow_task_completed_event_id: attrs[:event_id] - 2 ) end end diff --git a/spec/unit/lib/temporal/workflow/state_manager_spec.rb b/spec/unit/lib/temporal/workflow/state_manager_spec.rb index a6e512b4..1f3670a9 100644 --- a/spec/unit/lib/temporal/workflow/state_manager_spec.rb +++ b/spec/unit/lib/temporal/workflow/state_manager_spec.rb @@ -7,7 +7,6 @@ require 'temporal/errors' describe Temporal::Workflow::StateManager do - describe '#schedule' do class MyWorkflow < Temporal::Workflow; end @@ -15,21 +14,21 @@ class MyWorkflow < Temporal::Workflow; end [ Temporal::Workflow::Command::ContinueAsNew.new( workflow_type: MyWorkflow, - task_queue: 'dummy', + task_queue: 'dummy' ), Temporal::Workflow::Command::FailWorkflow.new( - exception: StandardError.new('dummy'), + exception: StandardError.new('dummy') ), Temporal::Workflow::Command::CompleteWorkflow.new( - result: 5, - ), + result: 5 + ) ].each do |terminal_command| it "fails to validate if #{terminal_command.class} is not the last command scheduled" do state_manager = described_class.new(Temporal::Workflow::Dispatcher.new, Temporal::Configuration.new) next_command = Temporal::Workflow::Command::RecordMarker.new( name: Temporal::Workflow::StateManager::RELEASE_MARKER, - details: 'dummy', + details: 'dummy' ) state_manager.schedule(terminal_command) @@ -313,8 +312,8 @@ def test_order(signal_first) end let(:upsert_search_attribute_event_2) do Fabricate(:api_upsert_search_attributes_event, - event_id: 4, - search_attributes: upserted_attributes_2) + event_id: 4, + search_attributes: upserted_attributes_2) end let(:upsert_empty_search_attributes_event) do Fabricate(:api_upsert_search_attributes_event, search_attributes: {}) @@ -343,7 +342,7 @@ def test_order(signal_first) { 'CustomAttribute1' => 42, # from initial (not overridden) 'CustomAttribute2' => 8, # only from upsert - 'CustomAttribute3' => 'foo', # overridden by upsert + 'CustomAttribute3' => 'foo' # overridden by upsert } ) end @@ -364,7 +363,6 @@ def test_order(signal_first) expect(state_manager.search_attributes).to eq({}) end - it 'multiple upserts merge' do state_manager = described_class.new(Temporal::Workflow::Dispatcher.new, Temporal::Configuration.new) @@ -389,7 +387,7 @@ def test_order(signal_first) { 'CustomAttribute2' => 8, 'CustomAttribute3' => 'bar', - 'CustomAttribute4' => 10, + 'CustomAttribute4' => 10 } ) end diff --git a/spec/unit/lib/temporal/workflow/task_processor_spec.rb b/spec/unit/lib/temporal/workflow/task_processor_spec.rb index 75aad3f3..bdd80662 100644 --- a/spec/unit/lib/temporal/workflow/task_processor_spec.rb +++ b/spec/unit/lib/temporal/workflow/task_processor_spec.rb @@ -4,13 +4,17 @@ require 'temporal/workflow/task_processor' describe Temporal::Workflow::TaskProcessor do - subject { described_class.new(task, namespace, lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum) } + subject do + described_class.new(task, namespace, lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum) + end let(:namespace) { 'test-namespace' } let(:lookup) { instance_double('Temporal::ExecutableLookup', find: nil) } let(:query) { nil } let(:queries) { nil } - let(:task) { Fabricate(:api_workflow_task, { workflow_type: api_workflow_type, query: query, queries: queries }.compact) } + let(:task) do + Fabricate(:api_workflow_task, { workflow_type: api_workflow_type, query: query, queries: queries }.compact) + end let(:api_workflow_type) { Fabricate(:api_workflow_type, name: workflow_name) } let(:workflow_name) { 'TestWorkflow' } let(:connection) { instance_double('Temporal::Connection::GRPC') } @@ -90,7 +94,7 @@ end context 'when workflow task completes' do - # Note: This is a bit of a pointless test because I short circuit this with stubs. + # NOTE: This is a bit of a pointless test because I short circuit this with stubs. # The code does not drop down into the state machine and so forth. it 'runs the specified task' do subject.process @@ -379,7 +383,9 @@ context 'when a page has no events' do let(:page_two) { 'page-2' } let(:page_three) { 'page-3' } - let(:first_history_response) { Fabricate(:workflow_execution_history, events: [event], _next_page_token: page_two) } + let(:first_history_response) do + Fabricate(:workflow_execution_history, events: [event], _next_page_token: page_two) + end let(:empty_history_response) do Fabricate(:workflow_execution_history, events: [], _next_page_token: page_three) From da1770074ed745af625319ff6b60bd6766953cea Mon Sep 17 00:00:00 2001 From: Jeff Schoner Date: Fri, 4 Aug 2023 10:59:48 -0700 Subject: [PATCH 05/12] Integration test for signal fix --- examples/bin/worker | 1 + examples/spec/integration/signal_spec.rb | 40 ++++++++++++++++++++++++ examples/workflows/signal_workflow.rb | 12 +++++++ 3 files changed, 53 insertions(+) create mode 100644 examples/spec/integration/signal_spec.rb create mode 100644 examples/workflows/signal_workflow.rb diff --git a/examples/bin/worker b/examples/bin/worker index cead588d..16633673 100755 --- a/examples/bin/worker +++ b/examples/bin/worker @@ -64,6 +64,7 @@ worker.register_workflow(SendSignalToExternalWorkflow) worker.register_workflow(SerialHelloWorldWorkflow) worker.register_workflow(SideEffectWorkflow) worker.register_workflow(SignalWithStartWorkflow) +worker.register_workflow(SignalWorkflow) worker.register_workflow(SimpleTimerWorkflow) worker.register_workflow(SlowChildWorkflow) worker.register_workflow(StartChildWorkflowWorkflow) diff --git a/examples/spec/integration/signal_spec.rb b/examples/spec/integration/signal_spec.rb new file mode 100644 index 00000000..4789219a --- /dev/null +++ b/examples/spec/integration/signal_spec.rb @@ -0,0 +1,40 @@ +require 'securerandom' +require 'workflows/signal_workflow' + +describe 'signal' do + it 'all signals process' do + workflow_id = SecureRandom.uuid + expected_score = 7 + run_id = Temporal.start_workflow( + SignalWorkflow, + 1, # seconds + options: { + workflow_id: workflow_id, + signal_name: 'score', + signal_input: expected_score, + timeouts: { execution: 10 } + } + ) + + loop do + value = SecureRandom.rand(10) + + begin + Temporal.signal_workflow(SignalWorkflow, 'score', workflow_id, run_id, value) + rescue StandardError + # Keep going until there's an error such as the workflow finishing + break + end + expected_score += value + sleep 0.01 + end + + result = Temporal.await_workflow_result( + SignalWorkflow, + workflow_id: workflow_id, + run_id: run_id + ) + + expect(result).to eq(expected_score) + end +end diff --git a/examples/workflows/signal_workflow.rb b/examples/workflows/signal_workflow.rb new file mode 100644 index 00000000..d665533d --- /dev/null +++ b/examples/workflows/signal_workflow.rb @@ -0,0 +1,12 @@ +class SignalWorkflow < Temporal::Workflow + def execute(sleep_for) + score = 0 + workflow.on_signal('score') do |signal_value| + score += signal_value + end + + workflow.sleep(sleep_for) + + score + end +end From 12efc83b2dd0d80ac70b5067bae004a853de7828 Mon Sep 17 00:00:00 2001 From: Jeff Schoner Date: Sun, 6 Aug 2023 19:49:18 -0700 Subject: [PATCH 06/12] Check for supported server version for signals first ordering --- lib/temporal/connection/grpc.rb | 4 ++ lib/temporal/errors.rb | 6 +++ lib/temporal/worker.rb | 25 +++++++++ lib/temporal/workflow/poller.rb | 25 +++++---- spec/unit/lib/temporal/worker_spec.rb | 75 +++++++++++++++++++++++++-- 5 files changed, 121 insertions(+), 14 deletions(-) diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index 4716536c..f230a305 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -618,6 +618,10 @@ def cancel_polling_request end end + def get_system_info + client.get_system_info(Temporalio::Api::WorkflowService::V1::GetSystemInfoRequest.new) + end + private attr_reader :url, :identity, :credentials, :options, :poll_mutex, :poll_request diff --git a/lib/temporal/errors.rb b/lib/temporal/errors.rb index 72a5d9db..3fc2cb38 100644 --- a/lib/temporal/errors.rb +++ b/lib/temporal/errors.rb @@ -12,6 +12,12 @@ class NonDeterministicWorkflowError < InternalError; end # Indicates a workflow task was encountered that used an unknown SDK flag class UnknownSDKFlagError < InternalError; end + # Indicates the worker has connected to a Temporal server that does not + # support SDK metadata, and therefore only legacy signal mode can be used. + # Set the .legacy_signals option in your Temporal::Configuration or upgrade + # Temporal server to 1.20 or newer. + class SDKMetadatNotSupportedError < InternalError; end + # Superclass for misconfiguration/misuse on the client (user) side class ClientError < Error; end diff --git a/lib/temporal/worker.rb b/lib/temporal/worker.rb index 2a232458..a94d43f8 100644 --- a/lib/temporal/worker.rb +++ b/lib/temporal/worker.rb @@ -1,3 +1,4 @@ +require 'temporal/errors' require 'temporal/workflow/poller' require 'temporal/activity/poller' require 'temporal/execution_options' @@ -105,6 +106,10 @@ def add_activity_middleware(middleware_class, *args) end def start + if workflows.any? + check_signals_first_support + end + @start_stop_mutex.synchronize do return if shutting_down? # Handle the case where stop method grabbed the mutex first @@ -177,5 +182,25 @@ def trap_signals end end + def check_signals_first_support + if config.legacy_signals + Temporal.logger.debug('Running in legacy signals mode') + return + end + + connection = Temporal::Connection.generate(config.for_connection) + system_info = connection.get_system_info + Temporal.logger.debug("Connected to Temporal server running version #{system_info.server_version}") + + if system_info&.capabilities&.sdk_metadata + Temporal.logger.debug('Running in signals first mode. Server supports SDK metadata.') + return + end + + raise SDKMetadatNotSupportedError, + 'Signals first ordering requires a Temporal server that supports SDK metadata. Set ' \ + 'Temporal::Configuration.legacy_signals to true or upgrade to Temporal server 1.20 ' \ + 'or newer.' + end end end diff --git a/lib/temporal/workflow/poller.rb b/lib/temporal/workflow/poller.rb index 07162ce1..0ec5aaba 100644 --- a/lib/temporal/workflow/poller.rb +++ b/lib/temporal/workflow/poller.rb @@ -41,8 +41,8 @@ def cancel_pending_requests end def wait - if !shutting_down? - raise "Workflow poller waiting for shutdown completion without being in shutting_down state!" + unless shutting_down? + raise 'Workflow poller waiting for shutdown completion without being in shutting_down state!' end thread.join @@ -51,7 +51,8 @@ def wait private - attr_reader :namespace, :task_queue, :connection, :workflow_lookup, :config, :middleware, :workflow_middleware, :options, :thread + attr_reader :namespace, :task_queue, :connection, :workflow_lookup, :config, :middleware, :workflow_middleware, + :options, :thread def connection @connection ||= Temporal::Connection.generate(config.for_connection) @@ -71,8 +72,9 @@ def poll_loop return if shutting_down? time_diff_ms = ((Time.now - last_poll_time) * 1000).round - Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_POLLER_TIME_SINCE_LAST_POLL, time_diff_ms, metrics_tags) - Temporal.logger.debug("Polling workflow task queue", { namespace: namespace, task_queue: task_queue }) + Temporal.metrics.timing(Temporal::MetricKeys::WORKFLOW_POLLER_TIME_SINCE_LAST_POLL, time_diff_ms, + metrics_tags) + Temporal.logger.debug('Polling workflow task queue', { namespace: namespace, task_queue: task_queue }) task = poll_for_task last_poll_time = Time.now @@ -89,13 +91,15 @@ def poll_loop end def poll_for_task - connection.poll_workflow_task_queue(namespace: namespace, task_queue: task_queue, binary_checksum: binary_checksum) + connection.poll_workflow_task_queue(namespace: namespace, task_queue: task_queue, + binary_checksum: binary_checksum) rescue ::GRPC::Cancelled # We're shutting down and we've already reported that in the logs nil - rescue StandardError => error - Temporal.logger.error("Unable to poll Workflow task queue", { namespace: namespace, task_queue: task_queue, error: error.inspect }) - Temporal::ErrorHandler.handle(error, config) + rescue StandardError => e + Temporal.logger.error('Unable to poll Workflow task queue', + { namespace: namespace, task_queue: task_queue, error: e.inspect }) + Temporal::ErrorHandler.handle(e, config) sleep(poll_retry_seconds) @@ -106,7 +110,8 @@ def process(task) middleware_chain = Middleware::Chain.new(middleware) workflow_middleware_chain = Middleware::Chain.new(workflow_middleware) - TaskProcessor.new(task, namespace, workflow_lookup, middleware_chain, workflow_middleware_chain, config, binary_checksum).process + TaskProcessor.new(task, namespace, workflow_lookup, middleware_chain, workflow_middleware_chain, config, + binary_checksum).process end def thread_pool diff --git a/spec/unit/lib/temporal/worker_spec.rb b/spec/unit/lib/temporal/worker_spec.rb index f8a74b21..37c6c3d8 100644 --- a/spec/unit/lib/temporal/worker_spec.rb +++ b/spec/unit/lib/temporal/worker_spec.rb @@ -6,6 +6,19 @@ describe Temporal::Worker do subject { described_class.new(config) } let(:config) { Temporal::Configuration.new } + let(:connection) { instance_double('Temporal::Connection::GRPC') } + let(:sdk_metadata_enabled) { true } + before do + allow(Temporal::Connection).to receive(:generate).and_return(connection) + allow(connection).to receive(:get_system_info).and_return( + Temporalio::Api::WorkflowService::V1::GetSystemInfoResponse.new( + server_version: 'test', + capabilities: Temporalio::Api::WorkflowService::V1::GetSystemInfoResponse::Capabilities.new( + sdk_metadata: sdk_metadata_enabled + ) + ) + ) + end class TestWorkerWorkflow < Temporal::Workflow namespace 'default-namespace' @@ -211,7 +224,11 @@ def start_and_stop(worker) stopped = true } - thread = Thread.new {worker.start} + thread = Thread.new do + Thread.current.abort_on_exception = true + worker.start + end + while !stopped sleep(THREAD_SYNC_DELAY) end @@ -236,6 +253,58 @@ def start_and_stop(worker) let(:activity_poller_1) { instance_double(Temporal::Activity::Poller, start: nil, stop_polling: nil, cancel_pending_requests: nil, wait: nil) } let(:activity_poller_2) { instance_double(Temporal::Activity::Poller, start: nil, stop_polling: nil, cancel_pending_requests: nil, wait: nil) } + context 'no SDK metadata support' do + let(:sdk_metadata_enabled) { false } + it 'fails' do + allow(Temporal::Workflow::Poller) + .to receive(:new) + .with( + 'default-namespace', + 'default-task-queue', + an_instance_of(Temporal::ExecutableLookup), + config, + [], + [], + thread_pool_size: 10, + binary_checksum: nil, + poll_retry_seconds: 0 + ) + .and_return(workflow_poller_1) + subject.register_workflow(TestWorkerWorkflow) + + expect do + subject.start + end.to raise_error(Temporal::SDKMetadatNotSupportedError) + + expect(workflow_poller_1).not_to have_received(:start) + end + + context 'legacy signal mode' do + let(:config) { Temporal::Configuration.new.tap { |c| c.legacy_signals = true } } + it 'ok' do + allow(Temporal::Workflow::Poller) + .to receive(:new) + .with( + 'default-namespace', + 'default-task-queue', + an_instance_of(Temporal::ExecutableLookup), + config, + [], + [], + thread_pool_size: 10, + binary_checksum: nil, + poll_retry_seconds: 0 + ) + .and_return(workflow_poller_1) + subject.register_workflow(TestWorkerWorkflow) + + start_and_stop(subject) + + expect(workflow_poller_1).to have_received(:start) + end + end + end + it 'starts a poller for each namespace/task list combination' do allow(Temporal::Workflow::Poller) .to receive(:new) @@ -341,7 +410,7 @@ def start_and_stop(worker) allow(subject).to receive(:while_stopping_hook) do # This callback is within a mutex, so this new thread shouldn't # do anything until Worker.stop is complete. - Thread.new {subject.start} + Thread.new { subject.start } sleep(THREAD_SYNC_DELAY) # give it a little time to do damage if it's going to end subject.stop @@ -394,7 +463,6 @@ def start_and_stop(worker) .and_return(activity_poller) worker = Temporal::Worker.new(activity_poll_retry_seconds: 10) - worker.register_workflow(TestWorkerWorkflow) worker.register_activity(TestWorkerActivity) start_and_stop(worker) @@ -419,7 +487,6 @@ def start_and_stop(worker) worker = Temporal::Worker.new(workflow_poll_retry_seconds: 10) worker.register_workflow(TestWorkerWorkflow) - worker.register_activity(TestWorkerActivity) start_and_stop(worker) From 4af2243f00c295d9bbea09fabde39f88fa59435c Mon Sep 17 00:00:00 2001 From: Jeff Schoner Date: Tue, 8 Aug 2023 14:58:53 -0700 Subject: [PATCH 07/12] Add safe rollout instructions to change log --- CHANGELOG.md | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b4f755bd..f4247982 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,25 @@ # Changelog -## 0.0.1 -- First release - ## 0.1.0 -- Introduces signal first ordering. See comments in configuration.rb about how to safely roll this out if you -use signals in any of your workflows. + +This introduces signal first ordering. See https://github.com/coinbase/temporal-ruby/issues/258 for +details on why this is necessary for correct handling of signals. + +**IMPORTANT: ** This feature requires Temporal server 1.20.0 or newer. If you are running an older +version of the server, you must either upgrade to at least this version, or you can set the +`.legacy_signals` configuration option to true until you can upgrade. **If you run with default +settings on 1.19 or earlier, your worker will raise an error on start.** + +If you do not have existing workflows with signals running or are standing up a worker service +for the first time, you can ignore all the below instructions. + +If you have any workflows with signals running during a deployment and run more than one worker +process, you must follow these rollout steps to avoid non-determinism errors: +1. Set `.legacy_signals` in `Temporal::Configuration` to true +2. Deploy your worker +3. Remove the `.legacy_signals` setting or set it to `false` +4. Deploy your worker + +These steps ensure any workflow that executes in signals first mode will continue to be executed +in this order on replay. If you don't follow these steps, you may see failed workflow tasks, which +in some cases could result in unrecoverable history corruption. \ No newline at end of file From cd8390542d5e7e8ebace6439abc365ca0f395c40 Mon Sep 17 00:00:00 2001 From: Jeff Schoner Date: Tue, 8 Aug 2023 15:02:02 -0700 Subject: [PATCH 08/12] Clean up styling --- lib/temporal/connection/grpc.rb | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index f230a305..2517a864 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -111,7 +111,10 @@ def start_workflow_execution( workflow_id:, workflow_name:, task_queue:, - execution_timeout:, run_timeout:, task_timeout:, input: nil, + execution_timeout:, + run_timeout:, + task_timeout:, + input: nil, workflow_id_reuse_policy: nil, headers: nil, cron_schedule: nil, From d9b2a51561c9d57b60e1508e0d2d305632dcef1d Mon Sep 17 00:00:00 2001 From: Jeff Schoner Date: Tue, 8 Aug 2023 15:29:53 -0700 Subject: [PATCH 09/12] Factor out flags used function in state manager --- lib/temporal/connection/grpc.rb | 9 ++++--- lib/temporal/workflow/executor.rb | 4 +-- lib/temporal/workflow/state_manager.rb | 27 ++++++++++--------- lib/temporal/workflow/task_processor.rb | 2 +- spec/unit/lib/temporal/grpc_spec.rb | 2 +- .../lib/temporal/workflow/executor_spec.rb | 6 ++--- .../temporal/workflow/state_manager_spec.rb | 4 +-- .../temporal/workflow/task_processor_spec.rb | 10 ++++--- 8 files changed, 36 insertions(+), 28 deletions(-) diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index 2517a864..3c206160 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -222,7 +222,7 @@ def respond_query_task_completed(namespace:, task_token:, query_result:) client.respond_query_task_completed(request) end - def respond_workflow_task_completed(namespace:, task_token:, commands:, binary_checksum:, new_sdk_flags:, query_results: {}) + def respond_workflow_task_completed(namespace:, task_token:, commands:, binary_checksum:, new_sdk_flags_used:, query_results: {}) request = Temporalio::Api::WorkflowService::V1::RespondWorkflowTaskCompletedRequest.new( namespace: namespace, identity: identity, @@ -230,8 +230,11 @@ def respond_workflow_task_completed(namespace:, task_token:, commands:, binary_c commands: Array(commands).map { |(_, command)| Serializer.serialize(command) }, query_results: query_results.transform_values { |value| Serializer.serialize(value) }, binary_checksum: binary_checksum, - sdk_metadata: if new_sdk_flags.any? - Temporalio::Api::Sdk::V1::WorkflowTaskCompletedMetadata.new(lang_used_flags: new_sdk_flags.to_a) + sdk_metadata: if new_sdk_flags_used.any? + Temporalio::Api::Sdk::V1::WorkflowTaskCompletedMetadata.new( + lang_used_flags: new_sdk_flags_used.to_a + ) + # else nil end ) diff --git a/lib/temporal/workflow/executor.rb b/lib/temporal/workflow/executor.rb index 77c11d7d..762ae250 100644 --- a/lib/temporal/workflow/executor.rb +++ b/lib/temporal/workflow/executor.rb @@ -11,7 +11,7 @@ module Temporal class Workflow class Executor - RunResult = Struct.new(:commands, :new_sdk_flags, keyword_init: true) + RunResult = Struct.new(:commands, :new_sdk_flags_used, keyword_init: true) # @param workflow_class [Class] # @param history [Workflow::History] @@ -42,7 +42,7 @@ def run state_manager.apply(window) end - RunResult.new(commands: state_manager.commands, new_sdk_flags: state_manager.new_sdk_flags) + RunResult.new(commands: state_manager.commands, new_sdk_flags_used: state_manager.new_sdk_flags_used) end # Process queries using the pre-registered query handlers diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index 392a445a..a713e1b9 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -19,8 +19,7 @@ class StateManager class UnsupportedEvent < Temporal::InternalError; end class UnsupportedMarkerType < Temporal::InternalError; end - attr_reader :commands, :local_time, :search_attributes, :new_sdk_flags - + attr_reader :commands, :local_time, :search_attributes, :new_sdk_flags_used def initialize(dispatcher, config) @dispatcher = dispatcher @commands = [] @@ -38,7 +37,7 @@ def initialize(dispatcher, config) @sdk_flags = Set.new # New flags used when not replaying - @new_sdk_flags = Set.new + @new_sdk_flags_used = Set.new end def replay? @@ -129,22 +128,26 @@ def order_events(raw_events) # If this is being played for the first time, use the configuration flag to choose (!replay? && !@config.legacy_signals) - # Only add the flag when it's used and not already present - if !replay? && - signals_first && - raw_events.any? { |event| StateManager.signal_event?(event) } && - !sdk_flags.include?(SDKFlags::HANDLE_SIGNALS_FIRST) && - !new_sdk_flags.include?(SDKFlags::HANDLE_SIGNALS_FIRST) - new_sdk_flags << SDKFlags::HANDLE_SIGNALS_FIRST - sdk_flags << SDKFlags::HANDLE_SIGNALS_FIRST + if signals_first && raw_events.any? { |event| StateManager.signal_event?(event) } + report_flag_used(SDKFlags::HANDLE_SIGNALS_FIRST) end - # sort_by is not stable, so include index for sort then remove raw_events.sort_by.with_index do |event, index| + # sort_by is not stable, so include index to preserve order [StateManager.event_order(event, signals_first), index] end end + def report_flag_used(flag) + # Only add the flag if it's not already present and we are not replaying + if !replay? && + !sdk_flags.include?(flag) && + !new_sdk_flags_used.include?(flag) + new_sdk_flags_used << flag + sdk_flags << flag + end + end + def next_event_id @last_event_id += 1 end diff --git a/lib/temporal/workflow/task_processor.rb b/lib/temporal/workflow/task_processor.rb index e11a284a..9b79b454 100644 --- a/lib/temporal/workflow/task_processor.rb +++ b/lib/temporal/workflow/task_processor.rb @@ -135,7 +135,7 @@ def complete_task(run_result, query_results) commands: run_result.commands, binary_checksum: binary_checksum, query_results: query_results, - new_sdk_flags: run_result.new_sdk_flags + new_sdk_flags_used: run_result.new_sdk_flags_used ) end diff --git a/spec/unit/lib/temporal/grpc_spec.rb b/spec/unit/lib/temporal/grpc_spec.rb index 9f16ebde..ee3c1fcb 100644 --- a/spec/unit/lib/temporal/grpc_spec.rb +++ b/spec/unit/lib/temporal/grpc_spec.rb @@ -589,7 +589,7 @@ class TestDeserializer commands: [], query_results: query_results, binary_checksum: binary_checksum, - new_sdk_flags: [1] + new_sdk_flags_used: [1] ) expect(grpc_stub).to have_received(:respond_workflow_task_completed) do |request| diff --git a/spec/unit/lib/temporal/workflow/executor_spec.rb b/spec/unit/lib/temporal/workflow/executor_spec.rb index 51d039ad..059fc4b3 100644 --- a/spec/unit/lib/temporal/workflow/executor_spec.rb +++ b/spec/unit/lib/temporal/workflow/executor_spec.rb @@ -47,7 +47,7 @@ def execute decisions = subject.run expect(decisions.commands.length).to eq(1) - expect(decisions.new_sdk_flags).to be_empty + expect(decisions.new_sdk_flags_used).to be_empty decision_id, decision = decisions.commands.first expect(decision_id).to eq(history.events.length + 1) @@ -70,7 +70,7 @@ def execute decisions = subject.run expect(decisions.commands.length).to eq(1) - expect(decisions.new_sdk_flags).to eq(Set.new([Temporal::Workflow::SDKFlags::HANDLE_SIGNALS_FIRST])) + expect(decisions.new_sdk_flags_used).to eq(Set.new([Temporal::Workflow::SDKFlags::HANDLE_SIGNALS_FIRST])) end end @@ -80,7 +80,7 @@ def execute decisions = subject.run expect(decisions.commands.length).to eq(1) - expect(decisions.new_sdk_flags).to be_empty + expect(decisions.new_sdk_flags_used).to be_empty end end end diff --git a/spec/unit/lib/temporal/workflow/state_manager_spec.rb b/spec/unit/lib/temporal/workflow/state_manager_spec.rb index 1f3670a9..e9d227ea 100644 --- a/spec/unit/lib/temporal/workflow/state_manager_spec.rb +++ b/spec/unit/lib/temporal/workflow/state_manager_spec.rb @@ -265,7 +265,7 @@ def test_order(signal_first) it 'signal inline' do test_order(false) - expect(state_manager.new_sdk_flags).to be_empty + expect(state_manager.new_sdk_flags_used).to be_empty end end @@ -275,7 +275,7 @@ def test_order(signal_first) it 'signal first' do test_order(true) - expect(state_manager.new_sdk_flags).to eq(Set.new([Temporal::Workflow::SDKFlags::HANDLE_SIGNALS_FIRST])) + expect(state_manager.new_sdk_flags_used).to eq(Set.new([Temporal::Workflow::SDKFlags::HANDLE_SIGNALS_FIRST])) end end end diff --git a/spec/unit/lib/temporal/workflow/task_processor_spec.rb b/spec/unit/lib/temporal/workflow/task_processor_spec.rb index bdd80662..33d5506f 100644 --- a/spec/unit/lib/temporal/workflow/task_processor_spec.rb +++ b/spec/unit/lib/temporal/workflow/task_processor_spec.rb @@ -83,8 +83,10 @@ let(:workflow_class) { double('Temporal::Workflow', execute_in_context: nil) } let(:executor) { double('Temporal::Workflow::Executor') } let(:commands) { double('commands') } - let(:new_sdk_flags) { double('new_sdk_flags') } - let(:run_result) { Temporal::Workflow::Executor::RunResult.new(commands: commands, new_sdk_flags: new_sdk_flags) } + let(:new_sdk_flags_used) { double('new_sdk_flags_used') } + let(:run_result) do + Temporal::Workflow::Executor::RunResult.new(commands: commands, new_sdk_flags_used: new_sdk_flags_used) + end before do allow(lookup).to receive(:find).with(workflow_name).and_return(workflow_class) @@ -137,7 +139,7 @@ commands: commands, binary_checksum: binary_checksum, query_results: { query_id => query_result }, - new_sdk_flags: new_sdk_flags + new_sdk_flags_used: new_sdk_flags_used ) end end @@ -180,7 +182,7 @@ commands: commands, query_results: nil, binary_checksum: binary_checksum, - new_sdk_flags: new_sdk_flags + new_sdk_flags_used: new_sdk_flags_used ) end From f174ff1812eb0bb09d684a6632f2b14d43ece2f0 Mon Sep 17 00:00:00 2001 From: Jeff Schoner Date: Tue, 22 Aug 2023 21:50:40 -0700 Subject: [PATCH 10/12] Refactor capabilities for lazy loading --- CHANGELOG.md | 3 +- lib/temporal/capabilities.rb | 30 +++++++++++ lib/temporal/configuration.rb | 21 ++++---- lib/temporal/errors.rb | 9 ++-- lib/temporal/worker.rb | 42 ++++----------- lib/temporal/workflow/state_manager.rb | 30 +++++++---- .../grpc/get_system_info_fabricator.rb | 10 ++++ spec/unit/lib/temporal/worker_spec.rb | 52 ------------------- .../lib/temporal/workflow/executor_spec.rb | 8 +++ .../temporal/workflow/state_manager_spec.rb | 21 ++++++++ 10 files changed, 113 insertions(+), 113 deletions(-) create mode 100644 lib/temporal/capabilities.rb create mode 100644 spec/fabricators/grpc/get_system_info_fabricator.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index f4247982..47d336b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,8 +7,7 @@ details on why this is necessary for correct handling of signals. **IMPORTANT: ** This feature requires Temporal server 1.20.0 or newer. If you are running an older version of the server, you must either upgrade to at least this version, or you can set the -`.legacy_signals` configuration option to true until you can upgrade. **If you run with default -settings on 1.19 or earlier, your worker will raise an error on start.** +`.legacy_signals` configuration option to true until you can upgrade. If you do not have existing workflows with signals running or are standing up a worker service for the first time, you can ignore all the below instructions. diff --git a/lib/temporal/capabilities.rb b/lib/temporal/capabilities.rb new file mode 100644 index 00000000..644aac31 --- /dev/null +++ b/lib/temporal/capabilities.rb @@ -0,0 +1,30 @@ +require 'temporal/errors' + +module Temporal + class Capabilities + def initialize(config) + @config = config + @sdk_metadata = nil + end + + def sdk_metadata + set_capabilities if @sdk_metadata.nil? + + @sdk_metadata + end + + private + + def set_capabilities + connection = Temporal::Connection.generate(@config.for_connection) + system_info = connection.get_system_info + + @sdk_metadata = system_info&.capabilities&.sdk_metadata || false + + Temporal.logger.debug( + "Connected to Temporal server running version #{system_info.server_version}. " \ + "SDK Metadata supported: #{@sdk_metadata}" + ) + end + end +end diff --git a/lib/temporal/configuration.rb b/lib/temporal/configuration.rb index 9f791747..b414d76e 100644 --- a/lib/temporal/configuration.rb +++ b/lib/temporal/configuration.rb @@ -1,3 +1,4 @@ +require 'temporal/capabilities' require 'temporal/logger' require 'temporal/metrics_adapters/null' require 'temporal/middleware/header_propagator_chain' @@ -14,7 +15,7 @@ class Configuration Connection = Struct.new(:type, :host, :port, :credentials, :identity, keyword_init: true) Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, :search_attributes, keyword_init: true) - attr_reader :timeouts, :error_handlers + attr_reader :timeouts, :error_handlers, :capabilities attr_accessor :connection_type, :converter, :use_error_serialization_v2, :host, :port, :credentials, :identity, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes, :header_propagators, :payload_codec, :legacy_signals @@ -57,7 +58,7 @@ class Configuration Temporal::Connection::Converter::Payload::JSON.new ] ).freeze - + # The Payload Codec is an optional step that happens between the wire and the Payload Converter: # Temporal Server <--> Wire <--> Payload Codec <--> Payload Converter <--> User code # which can be useful for transformations such as compression and encryption @@ -82,13 +83,14 @@ def initialize @identity = nil @search_attributes = {} @header_propagators = [] - - # Signals previously were incorrectly replayed in order within a workflow task, rather than - # at the beginning. Correcting this changes the determinism of any workflow with signals. This - # flag exists to preserve this legacy behavior while rolling out the new order. When adopting - # the first version of the library with this mode, set this to true, fully deploy your worker, - # then do a second deployment with this flag returned to the default value of false. New use - # cases should simply leave this as the default. + @capabilities = Capabilities.new(self) + + # Signals previously were incorrectly replayed in order within a workflow task window, rather + # than at the beginning. Correcting this changes the determinism of any workflow with signals. + # This flag exists to force this legacy behavior to gradually roll out the new ordering. + # Because this feature depends on the SDK Metadata capability which only became available + # in Temporal server 1.20, it is ignored when connected to older versions and effectively + # treated as true. @legacy_signals = false end @@ -130,6 +132,7 @@ def default_execution_options def add_header_propagator(propagator_class, *args) raise 'header propagator must implement `def inject!(headers)`' unless propagator_class.method_defined? :inject! + @header_propagators << Middleware::Entry.new(propagator_class, args) end diff --git a/lib/temporal/errors.rb b/lib/temporal/errors.rb index 3fc2cb38..a13ada62 100644 --- a/lib/temporal/errors.rb +++ b/lib/temporal/errors.rb @@ -12,12 +12,6 @@ class NonDeterministicWorkflowError < InternalError; end # Indicates a workflow task was encountered that used an unknown SDK flag class UnknownSDKFlagError < InternalError; end - # Indicates the worker has connected to a Temporal server that does not - # support SDK metadata, and therefore only legacy signal mode can be used. - # Set the .legacy_signals option in your Temporal::Configuration or upgrade - # Temporal server to 1.20 or newer. - class SDKMetadatNotSupportedError < InternalError; end - # Superclass for misconfiguration/misuse on the client (user) side class ClientError < Error; end @@ -58,8 +52,10 @@ class WorkflowCanceled < WorkflowError; end # Errors where the workflow run didn't complete but not an error for the whole workflow. class WorkflowRunError < Error; end + class WorkflowRunContinuedAsNew < WorkflowRunError attr_reader :new_run_id + def initialize(new_run_id:) super @new_run_id = new_run_id @@ -81,6 +77,7 @@ def initialize(message, run_id = nil) @run_id = run_id end end + class NamespaceNotActiveFailure < ApiError; end class ClientVersionNotSupportedFailure < ApiError; end class FeatureVersionNotSupportedFailure < ApiError; end diff --git a/lib/temporal/worker.rb b/lib/temporal/worker.rb index a94d43f8..5d84df6e 100644 --- a/lib/temporal/worker.rb +++ b/lib/temporal/worker.rb @@ -66,9 +66,9 @@ def register_dynamic_workflow(workflow_class, options = {}) @workflows[namespace_and_task_queue].add_dynamic(execution_options.name, workflow_class) rescue Temporal::ExecutableLookup::SecondDynamicExecutableError => e raise Temporal::SecondDynamicWorkflowError, - "Temporal::Worker#register_dynamic_workflow: cannot register #{execution_options.name} "\ - "dynamically; #{e.previous_executable_name} was already registered dynamically for task queue "\ - "'#{execution_options.task_queue}', and there can be only one." + "Temporal::Worker#register_dynamic_workflow: cannot register #{execution_options.name} "\ + "dynamically; #{e.previous_executable_name} was already registered dynamically for task queue "\ + "'#{execution_options.task_queue}', and there can be only one." end end @@ -87,9 +87,9 @@ def register_dynamic_activity(activity_class, options = {}) @activities[namespace_and_task_queue].add_dynamic(execution_options.name, activity_class) rescue Temporal::ExecutableLookup::SecondDynamicExecutableError => e raise Temporal::SecondDynamicActivityError, - "Temporal::Worker#register_dynamic_activity: cannot register #{execution_options.name} "\ - "dynamically; #{e.previous_executable_name} was already registered dynamically for task queue "\ - "'#{execution_options.task_queue}', and there can be only one." + "Temporal::Worker#register_dynamic_activity: cannot register #{execution_options.name} "\ + "dynamically; #{e.previous_executable_name} was already registered dynamically for task queue "\ + "'#{execution_options.task_queue}', and there can be only one." end end @@ -106,10 +106,6 @@ def add_activity_middleware(middleware_class, *args) end def start - if workflows.any? - check_signals_first_support - end - @start_stop_mutex.synchronize do return if shutting_down? # Handle the case where stop method grabbed the mutex first @@ -128,7 +124,7 @@ def start on_started_hook # keep the main thread alive - sleep 1 while !shutting_down? + sleep 1 until shutting_down? end def stop @@ -163,7 +159,8 @@ def while_stopping_hook; end def on_stopped_hook; end def workflow_poller_for(namespace, task_queue, lookup) - Workflow::Poller.new(namespace, task_queue, lookup.freeze, config, workflow_task_middleware, workflow_middleware, workflow_poller_options) + Workflow::Poller.new(namespace, task_queue, lookup.freeze, config, workflow_task_middleware, workflow_middleware, + workflow_poller_options) end def activity_poller_for(namespace, task_queue, lookup) @@ -181,26 +178,5 @@ def trap_signals Signal.trap(signal) { stop } end end - - def check_signals_first_support - if config.legacy_signals - Temporal.logger.debug('Running in legacy signals mode') - return - end - - connection = Temporal::Connection.generate(config.for_connection) - system_info = connection.get_system_info - Temporal.logger.debug("Connected to Temporal server running version #{system_info.server_version}") - - if system_info&.capabilities&.sdk_metadata - Temporal.logger.debug('Running in signals first mode. Server supports SDK metadata.') - return - end - - raise SDKMetadatNotSupportedError, - 'Signals first ordering requires a Temporal server that supports SDK metadata. Set ' \ - 'Temporal::Configuration.legacy_signals to true or upgrade to Temporal server 1.20 ' \ - 'or newer.' - end end end diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index a713e1b9..d5fa3e3e 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -20,6 +20,7 @@ class UnsupportedEvent < Temporal::InternalError; end class UnsupportedMarkerType < Temporal::InternalError; end attr_reader :commands, :local_time, :search_attributes, :new_sdk_flags_used + def initialize(dispatcher, config) @dispatcher = dispatcher @commands = [] @@ -119,18 +120,25 @@ def self.signal_event?(event) attr_reader :dispatcher, :command_tracker, :marker_ids, :side_effects, :releases, :sdk_flags - def order_events(raw_events) - signals_first = + def use_signals_first(raw_events) + raw_events.any? { |event| StateManager.signal_event?(event) } && # If signals were handled first when this task or a previous one in this run were first - # played, we must continue to do so in order to ensure determinism. The configuration - # value can be ignored. - sdk_flags.include?(SDKFlags::HANDLE_SIGNALS_FIRST) || - # If this is being played for the first time, use the configuration flag to choose - (!replay? && !@config.legacy_signals) - - if signals_first && raw_events.any? { |event| StateManager.signal_event?(event) } - report_flag_used(SDKFlags::HANDLE_SIGNALS_FIRST) - end + # played, we must continue to do so in order to ensure determinism regardless of what + # the configuration value is set to. + ( + sdk_flags.include?(SDKFlags::HANDLE_SIGNALS_FIRST) || + # If this is being played for the first time, use the configuration flag to choose + (!replay? && !@config.legacy_signals) + ) && + # In order to preserve determinism, the server must support SDK metadata to order signals + # first + @config.capabilities.sdk_metadata + end + + def order_events(raw_events) + signals_first = use_signals_first(raw_events) + + report_flag_used(SDKFlags::HANDLE_SIGNALS_FIRST) if signals_first raw_events.sort_by.with_index do |event, index| # sort_by is not stable, so include index to preserve order diff --git a/spec/fabricators/grpc/get_system_info_fabricator.rb b/spec/fabricators/grpc/get_system_info_fabricator.rb new file mode 100644 index 00000000..5b35cb33 --- /dev/null +++ b/spec/fabricators/grpc/get_system_info_fabricator.rb @@ -0,0 +1,10 @@ +Fabricator(:api_get_system_info, from: Temporalio::Api::WorkflowService::V1::GetSystemInfoResponse) do + transient :sdk_metadata_capability + + server_version 'test-7.8.9' + capabilities do |attrs| + Temporalio::Api::WorkflowService::V1::GetSystemInfoResponse::Capabilities.new( + sdk_metadata: attrs.fetch(:sdk_metadata, true) + ) + end +end diff --git a/spec/unit/lib/temporal/worker_spec.rb b/spec/unit/lib/temporal/worker_spec.rb index 37c6c3d8..685e07a0 100644 --- a/spec/unit/lib/temporal/worker_spec.rb +++ b/spec/unit/lib/temporal/worker_spec.rb @@ -253,58 +253,6 @@ def start_and_stop(worker) let(:activity_poller_1) { instance_double(Temporal::Activity::Poller, start: nil, stop_polling: nil, cancel_pending_requests: nil, wait: nil) } let(:activity_poller_2) { instance_double(Temporal::Activity::Poller, start: nil, stop_polling: nil, cancel_pending_requests: nil, wait: nil) } - context 'no SDK metadata support' do - let(:sdk_metadata_enabled) { false } - it 'fails' do - allow(Temporal::Workflow::Poller) - .to receive(:new) - .with( - 'default-namespace', - 'default-task-queue', - an_instance_of(Temporal::ExecutableLookup), - config, - [], - [], - thread_pool_size: 10, - binary_checksum: nil, - poll_retry_seconds: 0 - ) - .and_return(workflow_poller_1) - subject.register_workflow(TestWorkerWorkflow) - - expect do - subject.start - end.to raise_error(Temporal::SDKMetadatNotSupportedError) - - expect(workflow_poller_1).not_to have_received(:start) - end - - context 'legacy signal mode' do - let(:config) { Temporal::Configuration.new.tap { |c| c.legacy_signals = true } } - it 'ok' do - allow(Temporal::Workflow::Poller) - .to receive(:new) - .with( - 'default-namespace', - 'default-task-queue', - an_instance_of(Temporal::ExecutableLookup), - config, - [], - [], - thread_pool_size: 10, - binary_checksum: nil, - poll_retry_seconds: 0 - ) - .and_return(workflow_poller_1) - subject.register_workflow(TestWorkerWorkflow) - - start_and_stop(subject) - - expect(workflow_poller_1).to have_received(:start) - end - end - end - it 'starts a poller for each namespace/task list combination' do allow(Temporal::Workflow::Poller) .to receive(:new) diff --git a/spec/unit/lib/temporal/workflow/executor_spec.rb b/spec/unit/lib/temporal/workflow/executor_spec.rb index 059fc4b3..ff15e719 100644 --- a/spec/unit/lib/temporal/workflow/executor_spec.rb +++ b/spec/unit/lib/temporal/workflow/executor_spec.rb @@ -8,6 +8,7 @@ describe Temporal::Workflow::Executor do subject { described_class.new(workflow, history, workflow_metadata, config, false, middleware_chain) } + let(:connection) { instance_double('Temporal::Connection::GRPC') } let(:workflow_started_event) { Fabricate(:api_workflow_execution_started_event, event_id: 1) } let(:history) do Temporal::Workflow::History.new([ @@ -22,6 +23,10 @@ let(:config) { Temporal::Configuration.new } let(:middleware_chain) { Temporal::Middleware::Chain.new } + before do + allow(Temporal::Connection).to receive(:generate).and_return(connection) + end + class TestWorkflow < Temporal::Workflow def execute 'test' @@ -64,9 +69,12 @@ def execute Fabricate(:api_workflow_task_started_event, event_id: 4) ]) end + let(:system_info) { Fabricate(:api_get_system_info) } context 'signals first config enabled' do it 'set signals first sdk flag' do + allow(connection).to receive(:get_system_info).and_return(system_info) + decisions = subject.run expect(decisions.commands.length).to eq(1) diff --git a/spec/unit/lib/temporal/workflow/state_manager_spec.rb b/spec/unit/lib/temporal/workflow/state_manager_spec.rb index e9d227ea..99685cd9 100644 --- a/spec/unit/lib/temporal/workflow/state_manager_spec.rb +++ b/spec/unit/lib/temporal/workflow/state_manager_spec.rb @@ -45,6 +45,12 @@ class MyWorkflow < Temporal::Workflow; end Temporal::Workflow::StateManager.new(dispatcher, config) end let(:config) { Temporal::Configuration.new } + let(:connection) { instance_double('Temporal::Connection::GRPC') } + let(:system_info) { Fabricate(:api_get_system_info) } + + before do + allow(Temporal::Connection).to receive(:generate).and_return(connection) + end context 'workflow execution started' do let(:history) do @@ -71,6 +77,8 @@ class MyWorkflow < Temporal::Workflow; end end it 'dispatcher invoked for start' do + allow(connection).to receive(:get_system_info).and_return(system_info) + # While markers do come before the workflow execution started event, signals do not expect(dispatcher).to receive(:dispatch).with( Temporal::Workflow::History::EventTarget.workflow, 'started', instance_of(Array) @@ -236,8 +244,19 @@ def test_order(signal_first) ) end it 'signal first' do + allow(connection).to receive(:get_system_info).and_return(system_info) + test_order(true) end + + context 'even with legacy config enabled' do + let(:config) { Temporal::Configuration.new.tap { |c| c.legacy_signals = true } } + it 'signal first' do + allow(connection).to receive(:get_system_info).and_return(system_info) + + test_order(true) + end + end end end @@ -273,6 +292,8 @@ def test_order(signal_first) let(:config) { Temporal::Configuration.new } it 'signal first' do + allow(connection).to receive(:get_system_info).and_return(system_info) + test_order(true) expect(state_manager.new_sdk_flags_used).to eq(Set.new([Temporal::Workflow::SDKFlags::HANDLE_SIGNALS_FIRST])) From 1e9dd7372c1af5c831bb65998d59a9b8311fd319 Mon Sep 17 00:00:00 2001 From: Jeff Schoner Date: Wed, 23 Aug 2023 12:40:20 -0700 Subject: [PATCH 11/12] Refactor how HANDLE_SIGNALS_FIRST SDK flag is managed --- lib/temporal/workflow/state_manager.rb | 28 +++++++++++++++----------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index d5fa3e3e..62302ac2 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -121,25 +121,29 @@ def self.signal_event?(event) attr_reader :dispatcher, :command_tracker, :marker_ids, :side_effects, :releases, :sdk_flags def use_signals_first(raw_events) - raw_events.any? { |event| StateManager.signal_event?(event) } && + if sdk_flags.include?(SDKFlags::HANDLE_SIGNALS_FIRST) # If signals were handled first when this task or a previous one in this run were first # played, we must continue to do so in order to ensure determinism regardless of what - # the configuration value is set to. - ( - sdk_flags.include?(SDKFlags::HANDLE_SIGNALS_FIRST) || - # If this is being played for the first time, use the configuration flag to choose - (!replay? && !@config.legacy_signals) - ) && - # In order to preserve determinism, the server must support SDK metadata to order signals - # first - @config.capabilities.sdk_metadata + # the configuration value is set to. Even the capabilities can be ignored because the + # server must have returned SDK metadata for this to be true. + true + elsif raw_events.any? { |event| StateManager.signal_event?(event) } && + # If this is being played for the first time, use the configuration flag to choose + (!replay? && !@config.legacy_signals) && + # In order to preserve determinism, the server must support SDK metadata to order signals + # first. This is checked last because it will result in a Temporal server call the first + # time it's called in the worker process. + @config.capabilities.sdk_metadata + report_flag_used(SDKFlags::HANDLE_SIGNALS_FIRST) + true + else + false + end end def order_events(raw_events) signals_first = use_signals_first(raw_events) - report_flag_used(SDKFlags::HANDLE_SIGNALS_FIRST) if signals_first - raw_events.sort_by.with_index do |event, index| # sort_by is not stable, so include index to preserve order [StateManager.event_order(event, signals_first), index] From 89bcbde6e5716d5a149fc675e223764f10c3491e Mon Sep 17 00:00:00 2001 From: Jeff Schoner Date: Sat, 26 Aug 2023 12:42:46 -0700 Subject: [PATCH 12/12] Require set --- lib/temporal/workflow/history/window.rb | 1 + lib/temporal/workflow/sdk_flags.rb | 2 ++ 2 files changed, 3 insertions(+) diff --git a/lib/temporal/workflow/history/window.rb b/lib/temporal/workflow/history/window.rb index d4c5d178..83112feb 100644 --- a/lib/temporal/workflow/history/window.rb +++ b/lib/temporal/workflow/history/window.rb @@ -1,3 +1,4 @@ +require 'set' require 'temporal/workflow/sdk_flags' module Temporal diff --git a/lib/temporal/workflow/sdk_flags.rb b/lib/temporal/workflow/sdk_flags.rb index 49aa792c..6b24fe05 100644 --- a/lib/temporal/workflow/sdk_flags.rb +++ b/lib/temporal/workflow/sdk_flags.rb @@ -1,3 +1,5 @@ +require 'set' + module Temporal class Workflow module SDKFlags