Skip to content

Commit

Permalink
Improve thread pool error handling (#273)
Browse files Browse the repository at this point in the history
* Guard nil context

* Set abort_on_exception for thread pool threads

* Logging and handling of errors in thread top level

* Set abort_on_exception in poller threads

* New thread pool tests for error cases

* Clean up/fix related tests

---------

Co-authored-by: Jeff Schoner <jeffschoner@stripe.com>
  • Loading branch information
jeffschoner and jeffschoner-stripe authored Oct 30, 2023
1 parent 08fe1e9 commit 6a11a81
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 25 deletions.
5 changes: 5 additions & 0 deletions lib/temporal/activity/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ def shutting_down?
end

def poll_loop
# Prevent the poller thread from silently dying
Thread.current.abort_on_exception = true

last_poll_time = Time.now
metrics_tags = { namespace: namespace, task_queue: task_queue }.freeze

Expand Down Expand Up @@ -115,6 +118,7 @@ def poll_retry_seconds
def thread_pool
@thread_pool ||= ThreadPool.new(
options[:thread_pool_size],
@config,
{
pool_name: 'activity_task_poller',
namespace: namespace,
Expand All @@ -126,6 +130,7 @@ def thread_pool
def heartbeat_thread_pool
@heartbeat_thread_pool ||= ScheduledThreadPool.new(
options[:thread_pool_size],
@config,
{
pool_name: 'heartbeat',
namespace: namespace,
Expand Down
2 changes: 1 addition & 1 deletion lib/temporal/activity/task_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def process

respond_failed(error)
ensure
unless context.heartbeat_check_scheduled.nil?
unless context&.heartbeat_check_scheduled.nil?
heartbeat_thread_pool.cancel(context.heartbeat_check_scheduled)
end

Expand Down
16 changes: 14 additions & 2 deletions lib/temporal/scheduled_thread_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ class ScheduledThreadPool

ScheduledItem = Struct.new(:id, :job, :fire_at, :canceled, keyword_init: true)

def initialize(size, metrics_tags)
def initialize(size, config, metrics_tags)
@size = size
@metrics_tags = metrics_tags
@queue = Queue.new
@mutex = Mutex.new
@config = config
@available_threads = size
@occupied_threads = {}
@pool = Array.new(size) do |_i|
Expand Down Expand Up @@ -66,6 +67,8 @@ class CancelError < StandardError; end
EXIT_SYMBOL = :exit

def poll
Thread.current.abort_on_exception = true

loop do
item = @queue.pop
if item == EXIT_SYMBOL
Expand All @@ -90,7 +93,16 @@ def poll
# reliably be stopped once running. It's still in the begin/rescue block
# so that it won't be executed if the thread gets canceled.
if !item.canceled
item.job.call
begin
item.job.call
rescue StandardError => e
Temporal.logger.error('Error reached top of thread pool thread', { error: e.inspect })
Temporal::ErrorHandler.handle(e, @config)
rescue Exception => ex
Temporal.logger.error('Exception reached top of thread pool thread', { error: ex.inspect })
Temporal::ErrorHandler.handle(ex, @config)
raise
end
end
rescue CancelError
end
Expand Down
16 changes: 14 additions & 2 deletions lib/temporal/thread_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ module Temporal
class ThreadPool
attr_reader :size

def initialize(size, metrics_tags)
def initialize(size, config, metrics_tags)
@size = size
@metrics_tags = metrics_tags
@queue = Queue.new
@mutex = Mutex.new
@config = config
@availability = ConditionVariable.new
@available_threads = size
@pool = Array.new(size) do |_i|
Expand Down Expand Up @@ -55,10 +56,21 @@ def shutdown
EXIT_SYMBOL = :exit

def poll
Thread.current.abort_on_exception = true

catch(EXIT_SYMBOL) do
loop do
job = @queue.pop
job.call
begin
job.call
rescue StandardError => e
Temporal.logger.error('Error reached top of thread pool thread', { error: e.inspect })
Temporal::ErrorHandler.handle(e, @config)
rescue Exception => ex
Temporal.logger.error('Exception reached top of thread pool thread', { error: ex.inspect })
Temporal::ErrorHandler.handle(ex, @config)
raise
end
@mutex.synchronize do
@available_threads += 1
@availability.signal
Expand Down
4 changes: 4 additions & 0 deletions lib/temporal/workflow/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ def shutting_down?
end

def poll_loop
# Prevent the poller thread from silently dying
Thread.current.abort_on_exception = true

last_poll_time = Time.now
metrics_tags = { namespace: namespace, task_queue: task_queue }.freeze

Expand Down Expand Up @@ -117,6 +120,7 @@ def process(task)
def thread_pool
@thread_pool ||= ThreadPool.new(
options[:thread_pool_size],
@config,
{
pool_name: 'workflow_task_poller',
namespace: namespace,
Expand Down
29 changes: 15 additions & 14 deletions spec/unit/lib/temporal/activity/context_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,31 @@
require 'temporal/scheduled_thread_pool'

describe Temporal::Activity::Context do
let(:client) { instance_double('Temporal::Client::GRPCClient') }
let(:connection) { instance_double('Temporal::Connection::GRPC') }
let(:metadata_hash) { Fabricate(:activity_metadata).to_h }
let(:metadata) { Temporal::Metadata::Activity.new(**metadata_hash) }
let(:config) { Temporal::Configuration.new }
let(:task_token) { SecureRandom.uuid }
let(:heartbeat_thread_pool) { Temporal::ScheduledThreadPool.new(1, {}) }
let(:heartbeat_thread_pool) { Temporal::ScheduledThreadPool.new(1, config, {}) }
let(:heartbeat_response) { Fabricate(:api_record_activity_heartbeat_response) }

subject { described_class.new(client, metadata, config, heartbeat_thread_pool) }
subject { described_class.new(connection, metadata, config, heartbeat_thread_pool) }

describe '#heartbeat' do
before { allow(client).to receive(:record_activity_task_heartbeat).and_return(heartbeat_response) }
before { allow(connection).to receive(:record_activity_task_heartbeat).and_return(heartbeat_response) }

it 'records heartbeat' do
subject.heartbeat

expect(client)
expect(connection)
.to have_received(:record_activity_task_heartbeat)
.with(namespace: metadata.namespace, task_token: metadata.task_token, details: nil)
end

it 'records heartbeat with details' do
subject.heartbeat(foo: :bar)

expect(client)
expect(connection)
.to have_received(:record_activity_task_heartbeat)
.with(namespace: metadata.namespace, task_token: metadata.task_token, details: { foo: :bar })
end
Expand All @@ -48,7 +48,7 @@
subject.heartbeat(iteration: i)
end

expect(client)
expect(connection)
.to have_received(:record_activity_task_heartbeat)
.with(namespace: metadata.namespace, task_token: metadata.task_token, details: { iteration: 0 })
.once
Expand All @@ -67,7 +67,7 @@
# Shutdown to drain remaining threads
heartbeat_thread_pool.shutdown

expect(client)
expect(connection)
.to have_received(:record_activity_task_heartbeat)
.ordered
.with(namespace: metadata.namespace, task_token: metadata.task_token, details: { iteration: 1 })
Expand All @@ -80,7 +80,7 @@
config.timeouts = { max_heartbeat_throttle_interval: 0 }
subject.heartbeat

expect(client)
expect(connection)
.to have_received(:record_activity_task_heartbeat)
.with(namespace: metadata.namespace, task_token: metadata.task_token, details: nil)

Expand All @@ -90,17 +90,18 @@
end

describe '#last_heartbeat_throttled' do
before { allow(client).to receive(:record_activity_task_heartbeat).and_return(heartbeat_response) }
before { allow(connection).to receive(:record_activity_task_heartbeat).and_return(heartbeat_response) }

let(:metadata_hash) { Fabricate(:activity_metadata, heartbeat_timeout: 10).to_h }
let(:metadata_hash) { Fabricate(:activity_metadata, heartbeat_timeout: 3).to_h }

it 'true when throttled, false when not' do
subject.heartbeat(iteration: 1)
expect(subject.last_heartbeat_throttled).to be(false)
subject.heartbeat(iteration: 2)
expect(subject.last_heartbeat_throttled).to be(true)
subject.heartbeat(iteration: 3)
expect(subject.last_heartbeat_throttled).to be(true)

# Shutdown to drain remaining threads
heartbeat_thread_pool.shutdown
end
end

Expand All @@ -120,7 +121,7 @@

describe '#async?' do
subject { context.async? }
let(:context) { described_class.new(client, metadata, nil, nil) }
let(:context) { described_class.new(connection, metadata, nil, nil) }

context 'when context is sync' do
it { is_expected.to eq(false) }
Expand Down
14 changes: 10 additions & 4 deletions spec/unit/lib/temporal/activity/task_processor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
let(:connection) { instance_double('Temporal::Connection::GRPC') }
let(:middleware_chain) { Temporal::Middleware::Chain.new }
let(:config) { Temporal::Configuration.new }
let(:heartbeat_thread_pool) { Temporal::ScheduledThreadPool.new(2, {}) }
let(:heartbeat_thread_pool) { Temporal::ScheduledThreadPool.new(2, config, {}) }
let(:input) { %w[arg1 arg2] }

describe '#process' do
let(:heartbeat_check_scheduled) { nil }
let(:context) { instance_double('Temporal::Activity::Context', async?: false, heartbeat_check_scheduled: heartbeat_check_scheduled) }
let(:context) do
instance_double('Temporal::Activity::Context', async?: false,
heartbeat_check_scheduled: heartbeat_check_scheduled)
end

before do
allow(Temporal::Connection)
Expand All @@ -38,7 +41,8 @@
.to receive(:generate_activity_metadata)
.with(task, namespace)
.and_return(metadata)
allow(Temporal::Activity::Context).to receive(:new).with(connection, metadata, config, heartbeat_thread_pool).and_return(context)
allow(Temporal::Activity::Context).to receive(:new).with(connection, metadata, config,
heartbeat_thread_pool).and_return(context)

allow(connection).to receive(:respond_activity_task_completed)
allow(connection).to receive(:respond_activity_task_failed)
Expand Down Expand Up @@ -119,7 +123,9 @@
end

context 'when there is an outstanding scheduled heartbeat' do
let(:heartbeat_check_scheduled) { Temporal::ScheduledThreadPool::ScheduledItem.new(id: :foo, canceled: false) }
let(:heartbeat_check_scheduled) do
Temporal::ScheduledThreadPool::ScheduledItem.new(id: :foo, canceled: false)
end
it 'it gets canceled' do
subject.process

Expand Down
35 changes: 34 additions & 1 deletion spec/unit/lib/temporal/scheduled_thread_pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
allow(Temporal.metrics).to receive(:gauge)
end

let(:config) { Temporal::Configuration.new }
let(:size) { 2 }
let(:tags) { { foo: 'bar', bat: 'baz' } }
let(:thread_pool) { described_class.new(size, tags) }
let(:thread_pool) { described_class.new(size, config, tags) }

describe '#schedule' do
it 'executes one task with zero delay on a thread and exits' do
Expand Down Expand Up @@ -39,6 +40,38 @@
expect(answers.pop).to eq(:first)
expect(answers.pop).to eq(:second)
end

it 'error does not exit' do
times = 0

thread_pool.schedule(:foo, 0) do
times += 1
raise 'foo'
end

thread_pool.shutdown

expect(times).to eq(1)
end

it 'exception does exit' do
Thread.report_on_exception = false
times = 0

thread_pool.schedule(:foo, 0) do
times += 1
raise Exception, 'crash'
end

begin
thread_pool.shutdown
raise 'should not be reached'
rescue Exception => e
'ok'
end

expect(times).to eq(1)
end
end

describe '#cancel' do
Expand Down
34 changes: 33 additions & 1 deletion spec/unit/lib/temporal/thread_pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
allow(Temporal.metrics).to receive(:gauge)
end

let(:config) { Temporal::Configuration.new }
let(:size) { 2 }
let(:tags) { { foo: 'bar', bat: 'baz' } }
let(:thread_pool) { described_class.new(size, tags) }
let(:thread_pool) { described_class.new(size, config, tags) }

describe '#new' do
it 'executes one task on a thread and exits' do
Expand All @@ -22,6 +23,37 @@
expect(times).to eq(1)
end

it 'handles error without exiting' do
times = 0

thread_pool.schedule do
times += 1
raise 'failure'
end

thread_pool.shutdown

expect(times).to eq(1)
end

it 'handles exception with exiting' do
Thread.report_on_exception = false
times = 0

thread_pool.schedule do
times += 1
raise Exception, 'crash'
end

begin
thread_pool.shutdown
rescue Exception => e
'ok'
end

expect(times).to eq(1)
end

it 'reports thread available metrics' do
thread_pool.schedule do
end
Expand Down

0 comments on commit 6a11a81

Please sign in to comment.