Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Signals first ordering #261

Merged
merged 12 commits into from
Sep 11, 2023
24 changes: 22 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,24 @@
# Changelog

## 0.0.1
- First release
## 0.1.0

This introduces signal first ordering. See https://github.com/coinbase/temporal-ruby/issues/258 for
details on why this is necessary for correct handling of signals.

**IMPORTANT: ** This feature requires Temporal server 1.20.0 or newer. If you are running an older
version of the server, you must either upgrade to at least this version, or you can set the
`.legacy_signals` configuration option to true until you can upgrade.

If you do not have existing workflows with signals running or are standing up a worker service
for the first time, you can ignore all the below instructions.

If you have any workflows with signals running during a deployment and run more than one worker
process, you must follow these rollout steps to avoid non-determinism errors:
1. Set `.legacy_signals` in `Temporal::Configuration` to true
2. Deploy your worker
3. Remove the `.legacy_signals` setting or set it to `false`
4. Deploy your worker

These steps ensure any workflow that executes in signals first mode will continue to be executed
in this order on replay. If you don't follow these steps, you may see failed workflow tasks, which
in some cases could result in unrecoverable history corruption.
1 change: 1 addition & 0 deletions examples/bin/worker
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ worker.register_workflow(SendSignalToExternalWorkflow)
worker.register_workflow(SerialHelloWorldWorkflow)
worker.register_workflow(SideEffectWorkflow)
worker.register_workflow(SignalWithStartWorkflow)
worker.register_workflow(SignalWorkflow)
worker.register_workflow(SimpleTimerWorkflow)
worker.register_workflow(SlowChildWorkflow)
worker.register_workflow(StartChildWorkflowWorkflow)
Expand Down
40 changes: 40 additions & 0 deletions examples/spec/integration/signal_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
require 'securerandom'
require 'workflows/signal_workflow'

describe 'signal' do
it 'all signals process' do
workflow_id = SecureRandom.uuid
expected_score = 7
run_id = Temporal.start_workflow(
SignalWorkflow,
1, # seconds
options: {
workflow_id: workflow_id,
signal_name: 'score',
signal_input: expected_score,
timeouts: { execution: 10 }
}
)

loop do
value = SecureRandom.rand(10)

begin
Temporal.signal_workflow(SignalWorkflow, 'score', workflow_id, run_id, value)
rescue StandardError
# Keep going until there's an error such as the workflow finishing
break
end
expected_score += value
sleep 0.01
end

result = Temporal.await_workflow_result(
SignalWorkflow,
workflow_id: workflow_id,
run_id: run_id
)

expect(result).to eq(expected_score)
end
end
12 changes: 12 additions & 0 deletions examples/workflows/signal_workflow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
class SignalWorkflow < Temporal::Workflow
def execute(sleep_for)
score = 0
workflow.on_signal('score') do |signal_value|
score += signal_value
end

workflow.sleep(sleep_for)

score
end
end
30 changes: 30 additions & 0 deletions lib/temporal/capabilities.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
require 'temporal/errors'

module Temporal
class Capabilities
def initialize(config)
@config = config
@sdk_metadata = nil
end

def sdk_metadata
set_capabilities if @sdk_metadata.nil?

@sdk_metadata
end

private

def set_capabilities
connection = Temporal::Connection.generate(@config.for_connection)
system_info = connection.get_system_info

@sdk_metadata = system_info&.capabilities&.sdk_metadata || false

Temporal.logger.debug(
"Connected to Temporal server running version #{system_info.server_version}. " \
"SDK Metadata supported: #{@sdk_metadata}"
)
end
end
end
17 changes: 14 additions & 3 deletions lib/temporal/configuration.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
require 'temporal/capabilities'
require 'temporal/logger'
require 'temporal/metrics_adapters/null'
require 'temporal/middleware/header_propagator_chain'
Expand All @@ -14,10 +15,10 @@ class Configuration
Connection = Struct.new(:type, :host, :port, :credentials, :identity, keyword_init: true)
Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, :search_attributes, keyword_init: true)

attr_reader :timeouts, :error_handlers
attr_reader :timeouts, :error_handlers, :capabilities
attr_accessor :connection_type, :converter, :use_error_serialization_v2, :host, :port, :credentials, :identity,
:logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes, :header_propagators,
:payload_codec
:payload_codec, :legacy_signals

# See https://docs.temporal.io/blog/activity-timeouts/ for general docs.
# We want an infinite execution timeout for cron schedules and other perpetual workflows.
Expand Down Expand Up @@ -57,7 +58,7 @@ class Configuration
Temporal::Connection::Converter::Payload::JSON.new
]
).freeze

# The Payload Codec is an optional step that happens between the wire and the Payload Converter:
# Temporal Server <--> Wire <--> Payload Codec <--> Payload Converter <--> User code
# which can be useful for transformations such as compression and encryption
Expand All @@ -82,6 +83,15 @@ def initialize
@identity = nil
@search_attributes = {}
@header_propagators = []
@capabilities = Capabilities.new(self)

# Signals previously were incorrectly replayed in order within a workflow task window, rather
# than at the beginning. Correcting this changes the determinism of any workflow with signals.
# This flag exists to force this legacy behavior to gradually roll out the new ordering.
# Because this feature depends on the SDK Metadata capability which only became available
# in Temporal server 1.20, it is ignored when connected to older versions and effectively
# treated as true.
@legacy_signals = false
end

def on_error(&block)
Expand Down Expand Up @@ -122,6 +132,7 @@ def default_execution_options

def add_header_propagator(propagator_class, *args)
raise 'header propagator must implement `def inject!(headers)`' unless propagator_class.method_defined? :inject!

@header_propagators << Middleware::Entry.new(propagator_class, args)
end

Expand Down
Loading