From 6a11a81a25498a2f3bd30819f68c979f774a7884 Mon Sep 17 00:00:00 2001 From: Jeff Schoner Date: Mon, 30 Oct 2023 07:08:13 -0700 Subject: [PATCH] Improve thread pool error handling (#273) * Guard nil context * Set abort_on_exception for thread pool threads * Logging and handling of errors in thread top level * Set abort_on_exception in poller threads * New thread pool tests for error cases * Clean up/fix related tests --------- Co-authored-by: Jeff Schoner --- lib/temporal/activity/poller.rb | 5 +++ lib/temporal/activity/task_processor.rb | 2 +- lib/temporal/scheduled_thread_pool.rb | 16 +++++++-- lib/temporal/thread_pool.rb | 16 +++++++-- lib/temporal/workflow/poller.rb | 4 +++ .../lib/temporal/activity/context_spec.rb | 29 +++++++-------- .../temporal/activity/task_processor_spec.rb | 14 +++++--- .../temporal/scheduled_thread_pool_spec.rb | 35 ++++++++++++++++++- spec/unit/lib/temporal/thread_pool_spec.rb | 34 +++++++++++++++++- 9 files changed, 130 insertions(+), 25 deletions(-) diff --git a/lib/temporal/activity/poller.rb b/lib/temporal/activity/poller.rb index 55271593..40259f16 100644 --- a/lib/temporal/activity/poller.rb +++ b/lib/temporal/activity/poller.rb @@ -61,6 +61,9 @@ def shutting_down? end def poll_loop + # Prevent the poller thread from silently dying + Thread.current.abort_on_exception = true + last_poll_time = Time.now metrics_tags = { namespace: namespace, task_queue: task_queue }.freeze @@ -115,6 +118,7 @@ def poll_retry_seconds def thread_pool @thread_pool ||= ThreadPool.new( options[:thread_pool_size], + @config, { pool_name: 'activity_task_poller', namespace: namespace, @@ -126,6 +130,7 @@ def thread_pool def heartbeat_thread_pool @heartbeat_thread_pool ||= ScheduledThreadPool.new( options[:thread_pool_size], + @config, { pool_name: 'heartbeat', namespace: namespace, diff --git a/lib/temporal/activity/task_processor.rb b/lib/temporal/activity/task_processor.rb index 34847b93..51ae5408 100644 --- a/lib/temporal/activity/task_processor.rb +++ b/lib/temporal/activity/task_processor.rb @@ -47,7 +47,7 @@ def process respond_failed(error) ensure - unless context.heartbeat_check_scheduled.nil? + unless context&.heartbeat_check_scheduled.nil? heartbeat_thread_pool.cancel(context.heartbeat_check_scheduled) end diff --git a/lib/temporal/scheduled_thread_pool.rb b/lib/temporal/scheduled_thread_pool.rb index 99b70a10..5e9025af 100644 --- a/lib/temporal/scheduled_thread_pool.rb +++ b/lib/temporal/scheduled_thread_pool.rb @@ -9,11 +9,12 @@ class ScheduledThreadPool ScheduledItem = Struct.new(:id, :job, :fire_at, :canceled, keyword_init: true) - def initialize(size, metrics_tags) + def initialize(size, config, metrics_tags) @size = size @metrics_tags = metrics_tags @queue = Queue.new @mutex = Mutex.new + @config = config @available_threads = size @occupied_threads = {} @pool = Array.new(size) do |_i| @@ -66,6 +67,8 @@ class CancelError < StandardError; end EXIT_SYMBOL = :exit def poll + Thread.current.abort_on_exception = true + loop do item = @queue.pop if item == EXIT_SYMBOL @@ -90,7 +93,16 @@ def poll # reliably be stopped once running. It's still in the begin/rescue block # so that it won't be executed if the thread gets canceled. if !item.canceled - item.job.call + begin + item.job.call + rescue StandardError => e + Temporal.logger.error('Error reached top of thread pool thread', { error: e.inspect }) + Temporal::ErrorHandler.handle(e, @config) + rescue Exception => ex + Temporal.logger.error('Exception reached top of thread pool thread', { error: ex.inspect }) + Temporal::ErrorHandler.handle(ex, @config) + raise + end end rescue CancelError end diff --git a/lib/temporal/thread_pool.rb b/lib/temporal/thread_pool.rb index 7ff2a2fe..3febbf82 100644 --- a/lib/temporal/thread_pool.rb +++ b/lib/temporal/thread_pool.rb @@ -11,11 +11,12 @@ module Temporal class ThreadPool attr_reader :size - def initialize(size, metrics_tags) + def initialize(size, config, metrics_tags) @size = size @metrics_tags = metrics_tags @queue = Queue.new @mutex = Mutex.new + @config = config @availability = ConditionVariable.new @available_threads = size @pool = Array.new(size) do |_i| @@ -55,10 +56,21 @@ def shutdown EXIT_SYMBOL = :exit def poll + Thread.current.abort_on_exception = true + catch(EXIT_SYMBOL) do loop do job = @queue.pop - job.call + begin + job.call + rescue StandardError => e + Temporal.logger.error('Error reached top of thread pool thread', { error: e.inspect }) + Temporal::ErrorHandler.handle(e, @config) + rescue Exception => ex + Temporal.logger.error('Exception reached top of thread pool thread', { error: ex.inspect }) + Temporal::ErrorHandler.handle(ex, @config) + raise + end @mutex.synchronize do @available_threads += 1 @availability.signal diff --git a/lib/temporal/workflow/poller.rb b/lib/temporal/workflow/poller.rb index 0ec5aaba..89fed958 100644 --- a/lib/temporal/workflow/poller.rb +++ b/lib/temporal/workflow/poller.rb @@ -63,6 +63,9 @@ def shutting_down? end def poll_loop + # Prevent the poller thread from silently dying + Thread.current.abort_on_exception = true + last_poll_time = Time.now metrics_tags = { namespace: namespace, task_queue: task_queue }.freeze @@ -117,6 +120,7 @@ def process(task) def thread_pool @thread_pool ||= ThreadPool.new( options[:thread_pool_size], + @config, { pool_name: 'workflow_task_poller', namespace: namespace, diff --git a/spec/unit/lib/temporal/activity/context_spec.rb b/spec/unit/lib/temporal/activity/context_spec.rb index 0ebe7020..e9bc274b 100644 --- a/spec/unit/lib/temporal/activity/context_spec.rb +++ b/spec/unit/lib/temporal/activity/context_spec.rb @@ -3,23 +3,23 @@ require 'temporal/scheduled_thread_pool' describe Temporal::Activity::Context do - let(:client) { instance_double('Temporal::Client::GRPCClient') } + let(:connection) { instance_double('Temporal::Connection::GRPC') } let(:metadata_hash) { Fabricate(:activity_metadata).to_h } let(:metadata) { Temporal::Metadata::Activity.new(**metadata_hash) } let(:config) { Temporal::Configuration.new } let(:task_token) { SecureRandom.uuid } - let(:heartbeat_thread_pool) { Temporal::ScheduledThreadPool.new(1, {}) } + let(:heartbeat_thread_pool) { Temporal::ScheduledThreadPool.new(1, config, {}) } let(:heartbeat_response) { Fabricate(:api_record_activity_heartbeat_response) } - subject { described_class.new(client, metadata, config, heartbeat_thread_pool) } + subject { described_class.new(connection, metadata, config, heartbeat_thread_pool) } describe '#heartbeat' do - before { allow(client).to receive(:record_activity_task_heartbeat).and_return(heartbeat_response) } + before { allow(connection).to receive(:record_activity_task_heartbeat).and_return(heartbeat_response) } it 'records heartbeat' do subject.heartbeat - expect(client) + expect(connection) .to have_received(:record_activity_task_heartbeat) .with(namespace: metadata.namespace, task_token: metadata.task_token, details: nil) end @@ -27,7 +27,7 @@ it 'records heartbeat with details' do subject.heartbeat(foo: :bar) - expect(client) + expect(connection) .to have_received(:record_activity_task_heartbeat) .with(namespace: metadata.namespace, task_token: metadata.task_token, details: { foo: :bar }) end @@ -48,7 +48,7 @@ subject.heartbeat(iteration: i) end - expect(client) + expect(connection) .to have_received(:record_activity_task_heartbeat) .with(namespace: metadata.namespace, task_token: metadata.task_token, details: { iteration: 0 }) .once @@ -67,7 +67,7 @@ # Shutdown to drain remaining threads heartbeat_thread_pool.shutdown - expect(client) + expect(connection) .to have_received(:record_activity_task_heartbeat) .ordered .with(namespace: metadata.namespace, task_token: metadata.task_token, details: { iteration: 1 }) @@ -80,7 +80,7 @@ config.timeouts = { max_heartbeat_throttle_interval: 0 } subject.heartbeat - expect(client) + expect(connection) .to have_received(:record_activity_task_heartbeat) .with(namespace: metadata.namespace, task_token: metadata.task_token, details: nil) @@ -90,17 +90,18 @@ end describe '#last_heartbeat_throttled' do - before { allow(client).to receive(:record_activity_task_heartbeat).and_return(heartbeat_response) } + before { allow(connection).to receive(:record_activity_task_heartbeat).and_return(heartbeat_response) } - let(:metadata_hash) { Fabricate(:activity_metadata, heartbeat_timeout: 10).to_h } + let(:metadata_hash) { Fabricate(:activity_metadata, heartbeat_timeout: 3).to_h } it 'true when throttled, false when not' do subject.heartbeat(iteration: 1) expect(subject.last_heartbeat_throttled).to be(false) subject.heartbeat(iteration: 2) expect(subject.last_heartbeat_throttled).to be(true) - subject.heartbeat(iteration: 3) - expect(subject.last_heartbeat_throttled).to be(true) + + # Shutdown to drain remaining threads + heartbeat_thread_pool.shutdown end end @@ -120,7 +121,7 @@ describe '#async?' do subject { context.async? } - let(:context) { described_class.new(client, metadata, nil, nil) } + let(:context) { described_class.new(connection, metadata, nil, nil) } context 'when context is sync' do it { is_expected.to eq(false) } diff --git a/spec/unit/lib/temporal/activity/task_processor_spec.rb b/spec/unit/lib/temporal/activity/task_processor_spec.rb index afb1344a..41ea952f 100644 --- a/spec/unit/lib/temporal/activity/task_processor_spec.rb +++ b/spec/unit/lib/temporal/activity/task_processor_spec.rb @@ -22,12 +22,15 @@ let(:connection) { instance_double('Temporal::Connection::GRPC') } let(:middleware_chain) { Temporal::Middleware::Chain.new } let(:config) { Temporal::Configuration.new } - let(:heartbeat_thread_pool) { Temporal::ScheduledThreadPool.new(2, {}) } + let(:heartbeat_thread_pool) { Temporal::ScheduledThreadPool.new(2, config, {}) } let(:input) { %w[arg1 arg2] } describe '#process' do let(:heartbeat_check_scheduled) { nil } - let(:context) { instance_double('Temporal::Activity::Context', async?: false, heartbeat_check_scheduled: heartbeat_check_scheduled) } + let(:context) do + instance_double('Temporal::Activity::Context', async?: false, + heartbeat_check_scheduled: heartbeat_check_scheduled) + end before do allow(Temporal::Connection) @@ -38,7 +41,8 @@ .to receive(:generate_activity_metadata) .with(task, namespace) .and_return(metadata) - allow(Temporal::Activity::Context).to receive(:new).with(connection, metadata, config, heartbeat_thread_pool).and_return(context) + allow(Temporal::Activity::Context).to receive(:new).with(connection, metadata, config, + heartbeat_thread_pool).and_return(context) allow(connection).to receive(:respond_activity_task_completed) allow(connection).to receive(:respond_activity_task_failed) @@ -119,7 +123,9 @@ end context 'when there is an outstanding scheduled heartbeat' do - let(:heartbeat_check_scheduled) { Temporal::ScheduledThreadPool::ScheduledItem.new(id: :foo, canceled: false) } + let(:heartbeat_check_scheduled) do + Temporal::ScheduledThreadPool::ScheduledItem.new(id: :foo, canceled: false) + end it 'it gets canceled' do subject.process diff --git a/spec/unit/lib/temporal/scheduled_thread_pool_spec.rb b/spec/unit/lib/temporal/scheduled_thread_pool_spec.rb index 74f73018..56fab272 100644 --- a/spec/unit/lib/temporal/scheduled_thread_pool_spec.rb +++ b/spec/unit/lib/temporal/scheduled_thread_pool_spec.rb @@ -5,9 +5,10 @@ allow(Temporal.metrics).to receive(:gauge) end + let(:config) { Temporal::Configuration.new } let(:size) { 2 } let(:tags) { { foo: 'bar', bat: 'baz' } } - let(:thread_pool) { described_class.new(size, tags) } + let(:thread_pool) { described_class.new(size, config, tags) } describe '#schedule' do it 'executes one task with zero delay on a thread and exits' do @@ -39,6 +40,38 @@ expect(answers.pop).to eq(:first) expect(answers.pop).to eq(:second) end + + it 'error does not exit' do + times = 0 + + thread_pool.schedule(:foo, 0) do + times += 1 + raise 'foo' + end + + thread_pool.shutdown + + expect(times).to eq(1) + end + + it 'exception does exit' do + Thread.report_on_exception = false + times = 0 + + thread_pool.schedule(:foo, 0) do + times += 1 + raise Exception, 'crash' + end + + begin + thread_pool.shutdown + raise 'should not be reached' + rescue Exception => e + 'ok' + end + + expect(times).to eq(1) + end end describe '#cancel' do diff --git a/spec/unit/lib/temporal/thread_pool_spec.rb b/spec/unit/lib/temporal/thread_pool_spec.rb index 20659367..5de5b03a 100644 --- a/spec/unit/lib/temporal/thread_pool_spec.rb +++ b/spec/unit/lib/temporal/thread_pool_spec.rb @@ -5,9 +5,10 @@ allow(Temporal.metrics).to receive(:gauge) end + let(:config) { Temporal::Configuration.new } let(:size) { 2 } let(:tags) { { foo: 'bar', bat: 'baz' } } - let(:thread_pool) { described_class.new(size, tags) } + let(:thread_pool) { described_class.new(size, config, tags) } describe '#new' do it 'executes one task on a thread and exits' do @@ -22,6 +23,37 @@ expect(times).to eq(1) end + it 'handles error without exiting' do + times = 0 + + thread_pool.schedule do + times += 1 + raise 'failure' + end + + thread_pool.shutdown + + expect(times).to eq(1) + end + + it 'handles exception with exiting' do + Thread.report_on_exception = false + times = 0 + + thread_pool.schedule do + times += 1 + raise Exception, 'crash' + end + + begin + thread_pool.shutdown + rescue Exception => e + 'ok' + end + + expect(times).to eq(1) + end + it 'reports thread available metrics' do thread_pool.schedule do end