Skip to content

Commit

Permalink
timer: timer inner loop auto recover from panic (pingcap#46412)
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao authored Aug 28, 2023
1 parent e7e9fb6 commit c7d5507
Show file tree
Hide file tree
Showing 6 changed files with 557 additions and 121 deletions.
4 changes: 3 additions & 1 deletion timer/runtime/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
deps = [
"//timer/api",
"//timer/metrics",
"//util",
"//util/logutil",
"//util/timeutil",
"@com_github_google_uuid//:uuid",
Expand All @@ -34,10 +35,11 @@ go_test(
embed = [":runtime"],
flaky = True,
race = "on",
shard_count = 20,
shard_count = 23,
deps = [
"//testkit/testsetup",
"//timer/api",
"//util",
"//util/mock",
"//util/timeutil",
"@com_github_google_uuid//:uuid",
Expand Down
31 changes: 31 additions & 0 deletions timer/runtime/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/pingcap/tidb/testkit/testsetup"
"github.com/pingcap/tidb/timer/api"
"github.com/pingcap/tidb/util"
"github.com/stretchr/testify/mock"
"go.uber.org/goleak"
)
Expand All @@ -38,6 +39,28 @@ type mockHook struct {
stopped chan struct{}
}

type newHookFn struct {
mock.Mock
}

func (n *newHookFn) OnFuncCall() *mock.Call {
return n.On("Func")
}

func (n *newHookFn) Func() api.Hook {
args := n.Called()
if v := args.Get(0); v != nil {
return v.(api.Hook)
}
return nil
}

func onlyOnceNewHook(hook api.Hook) func() api.Hook {
n := newHookFn{}
n.OnFuncCall().Return(hook).Once()
return n.Func
}

func newMockHook() *mockHook {
return &mockHook{
started: make(chan struct{}),
Expand Down Expand Up @@ -118,6 +141,14 @@ func waitDone(obj any, timeout time.Duration) {
ch = o
case <-chan struct{}:
ch = o
case *util.WaitGroupWrapper:
newCh := make(chan struct{})
ch = newCh

go func() {
o.Wait()
close(newCh)
}()
case *sync.WaitGroup:
newCh := make(chan struct{})
ch = newCh
Expand Down
73 changes: 57 additions & 16 deletions timer/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/google/uuid"
"github.com/pingcap/tidb/timer/api"
"github.com/pingcap/tidb/timer/metrics"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
Expand Down Expand Up @@ -64,6 +65,7 @@ func NewTimerRuntimeBuilder(groupID string, store *api.TimerStore) *TimerRuntime
// metrics
fullRefreshTimerCounter: metrics.TimerScopeCounter(fmt.Sprintf("runtime.%s", groupID), "full_refresh_timers"),
partialRefreshTimerCounter: metrics.TimerScopeCounter(fmt.Sprintf("runtime.%s", groupID), "partial_refresh_timers"),
retryLoopWait: 10 * time.Second,
},
}
}
Expand Down Expand Up @@ -91,7 +93,7 @@ type TimerGroupRuntime struct {
mu sync.Mutex
ctx context.Context
cancel func()
wg sync.WaitGroup
wg util.WaitGroupWrapper
logger *zap.Logger
cache *timersCache

Expand All @@ -108,6 +110,8 @@ type TimerGroupRuntime struct {
// metrics
fullRefreshTimerCounter prometheus.Counter
partialRefreshTimerCounter prometheus.Counter
// retryLoopWait indicates the wait time before restarting the loop after panic.
retryLoopWait time.Duration
}

// Start starts the TimerGroupRuntime
Expand All @@ -118,9 +122,10 @@ func (rt *TimerGroupRuntime) Start() {
return
}

rt.wg.Add(1)
rt.initCtx()
go rt.loop()
rt.wg.Run(func() {
withRecoverUntil(rt.ctx, rt.loop)
})
}

// Running returns whether the runtime is running
Expand All @@ -145,12 +150,16 @@ func (rt *TimerGroupRuntime) Stop() {
rt.wg.Wait()
}

func (rt *TimerGroupRuntime) loop() {
rt.logger.Info("TimerGroupRuntime loop started")
defer func() {
rt.logger.Info("TimerGroupRuntime loop exit")
rt.wg.Done()
}()
func (rt *TimerGroupRuntime) loop(totalPanic uint64) {
if totalPanic > 0 {
sleep(rt.ctx, rt.retryLoopWait)
rt.logger.Info("TimerGroupRuntime loop resumed from panic",
zap.Uint64("totalPanic", totalPanic),
zap.Duration("delay", rt.retryLoopWait))
} else {
rt.logger.Info("TimerGroupRuntime loop started")
}
defer rt.logger.Info("TimerGroupRuntime loop exit")

fullRefreshTimersTicker := time.NewTicker(fullRefreshTimersInterval)
defer fullRefreshTimersTicker.Stop()
Expand All @@ -167,7 +176,10 @@ func (rt *TimerGroupRuntime) loop() {
batchHandleResponsesTimer := time.NewTimer(batchProcessWatchRespInterval)
defer batchHandleResponsesTimer.Stop()

watchCh := rt.createWatchTimerChan()
watchCtx, cancelWatch := context.WithCancel(rt.ctx)
defer cancelWatch()

watchCh := rt.createWatchTimerChan(watchCtx)
batchResponses := make([]api.WatchTimerResponse, 0, 1)

var lastTryTriggerTime time.Time
Expand Down Expand Up @@ -211,7 +223,7 @@ func (rt *TimerGroupRuntime) loop() {
}
case <-reWatchTimer.C:
if watchCh == idleWatchChan {
watchCh = rt.createWatchTimerChan()
watchCh = rt.createWatchTimerChan(watchCtx)
}
}
}
Expand Down Expand Up @@ -385,13 +397,13 @@ func (rt *TimerGroupRuntime) partialRefreshTimers(timerIDs map[string]struct{})
return rt.cache.partialBatchUpdateTimers(timers)
}

func (rt *TimerGroupRuntime) createWatchTimerChan() api.WatchTimerChan {
func (rt *TimerGroupRuntime) createWatchTimerChan(ctx context.Context) api.WatchTimerChan {
watchSupported := rt.store.WatchSupported()
rt.logger.Info("create watch chan if possible for timer runtime",
zap.Bool("storeSupportWatch", watchSupported),
)
if watchSupported {
return rt.store.Watch(rt.ctx)
return rt.store.Watch(ctx)
}
return idleWatchChan
}
Expand Down Expand Up @@ -434,12 +446,15 @@ func (rt *TimerGroupRuntime) ensureWorker(hookClass string) (*hookWorker, bool)
return nil, false
}

var hook api.Hook
var hookFn func() api.Hook
if factory != nil {
hook = factory(hookClass, rt.cli)
cli := rt.cli
hookFn = func() api.Hook {
return factory(hookClass, cli)
}
}

worker = newHookWorker(rt.ctx, &rt.wg, rt.groupID, hookClass, hook, rt.nowFunc)
worker = newHookWorker(rt.ctx, &rt.wg, rt.groupID, hookClass, hookFn, rt.nowFunc)
rt.workers[hookClass] = worker
return worker, true
}
Expand Down Expand Up @@ -470,3 +485,29 @@ func resetTimer(t *time.Timer, interval time.Duration) {
}
t.Reset(interval)
}

func withRecoverUntil(ctx context.Context, fn func(uint64)) {
var i uint64
success := false
for ctx.Err() == nil && !success {
util.WithRecovery(func() {
fn(i)
}, func(r interface{}) {
if r == nil {
success = true
}
})
i++
}
}

func sleep(ctx context.Context, d time.Duration) {
if ctx == nil {
ctx = context.Background()
}

select {
case <-ctx.Done():
case <-time.After(d):
}
}
72 changes: 70 additions & 2 deletions timer/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package runtime
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -654,7 +655,7 @@ func TestCreateWatchTimerChan(t *testing.T) {
mockCore.On("Watch", mock.Anything).Return(retCh).Once()
mockCore.On("WatchSupported").Return(true).Once()

got := runtime.createWatchTimerChan()
got := runtime.createWatchTimerChan(context.Background())
require.True(t, got != idleWatchChan)
select {
case resp, ok := <-got:
Expand All @@ -667,7 +668,7 @@ func TestCreateWatchTimerChan(t *testing.T) {
mockCore.AssertExpectations(t)

mockCore.On("WatchSupported").Return(false).Once()
got = runtime.createWatchTimerChan()
got = runtime.createWatchTimerChan(context.Background())
require.True(t, got == idleWatchChan)
select {
case <-got:
Expand Down Expand Up @@ -879,3 +880,70 @@ func TestTimerFullProcess(t *testing.T) {
require.Equal(t, onSchedTimer.Load(), timer)
onSchedTimer.Store(nil)
}

func TestTimerRuntimeLoopPanicRecover(t *testing.T) {
mockCore, mockStore := newMockStore()
rt := NewTimerRuntimeBuilder("g1", mockStore).Build()

// start and panic two times, then normal
started := make(chan struct{})
mockCore.On("WatchSupported").Return(false).Times(3)
mockCore.On("List", mock.Anything, mock.Anything).Panic("store panic").Twice()
mockCore.On("List", mock.Anything, mock.Anything).Return([]*api.TimerRecord(nil), nil).Once().Run(func(args mock.Arguments) {
close(started)
})
rt.retryLoopWait = time.Millisecond
rt.Start()
waitDone(started, 5*time.Second)
mockCore.AssertExpectations(t)

// normal stop
stopped := make(chan struct{})
go func() {
rt.Stop()
close(stopped)
}()
waitDone(stopped, 5*time.Second)
mockCore.AssertExpectations(t)

// start and panic always
rt = NewTimerRuntimeBuilder("g1", mockStore).Build()
mockCore.On("WatchSupported").Return(false)
mockCore.On("List", mock.Anything, mock.Anything).Panic("store panic")
rt.retryLoopWait = time.Millisecond
rt.Start()
time.Sleep(10 * time.Millisecond)

// can also stop
stopped = make(chan struct{})
go func() {
rt.Stop()
close(stopped)
}()
waitDone(stopped, 5*time.Second)
mockCore.AssertExpectations(t)

// stop should stop immediately
mockCore, mockStore = newMockStore()
rt = NewTimerRuntimeBuilder("g1", mockStore).Build()
started = make(chan struct{})
var once sync.Once
mockCore.On("WatchSupported").Return(false).Once()
mockCore.On("List", mock.Anything, mock.Anything).Once().Run(func(args mock.Arguments) {
once.Do(func() {
close(started)
})
panic("store panic")
})
rt.retryLoopWait = time.Minute
rt.Start()
waitDone(started, 5*time.Second)
time.Sleep(time.Millisecond)
stopped = make(chan struct{})
go func() {
rt.Stop()
close(stopped)
}()
waitDone(stopped, 5*time.Second)
mockCore.AssertExpectations(t)
}
Loading

0 comments on commit c7d5507

Please sign in to comment.