Fix errcheck in service/history/shard #3755

Merged 1 commit on Dec 28, 2022
26 changes: 16 additions & 10 deletions service/history/shard/context_impl.go
@@ -34,6 +34,7 @@ import (
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.uber.org/multierr"
"golang.org/x/exp/maps"

"go.temporal.io/server/api/adminservice/v1"
@@ -1436,8 +1437,7 @@ func (s *ContextImpl) handleReadError(err error) error {
case *persistence.ShardOwnershipLostError:
// Shard is stolen, trigger shutdown of history engine.
// Handling of max read level doesn't matter here.
-		s.transition(contextRequestStop{})
-		return err
+		return multierr.Combine(err, s.transition(contextRequestStop{}))
Review comment (Member):

I don't think we can/should combine the errors here (and two more places below).

  • Many places in our code path are still checking the error type directly rather than via errors.As, so those places might break.
  • To me, the error from s.transition is internal to the shard context impl, and the upper layers should not need to know about it.

cc @dnr Would you mind also taking a look?
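
To illustrate the first point, a minimal standalone sketch (hypothetical error type, with fmt.Errorf wrapping standing in for a combined error; not code from this PR): a direct type switch stops matching once the error is wrapped, while errors.As still finds it.

package main

import (
	"errors"
	"fmt"
)

// shardOwnershipLostError stands in for *persistence.ShardOwnershipLostError.
type shardOwnershipLostError struct{ msg string }

func (e *shardOwnershipLostError) Error() string { return e.msg }

func main() {
	base := &shardOwnershipLostError{msg: "shard ownership lost"}
	wrapped := fmt.Errorf("transition failed: %w", base) // wrapping stands in for combining

	for _, err := range []error{base, wrapped} {
		// Direct type switch: matches only the unwrapped error.
		switch err.(type) {
		case *shardOwnershipLostError:
			fmt.Println("type switch matched:", err)
		default:
			fmt.Println("type switch missed:", err)
		}
	}

	// errors.As unwraps, so it matches the wrapped error as well.
	var target *shardOwnershipLostError
	fmt.Println("errors.As on wrapped:", errors.As(wrapped, &target))
}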

Review comment (Member):

Yeah, this PR doesn't make much sense. I think you should revert these changes.

The error returned from transition is really only meaningful for contextRequestAcquired; the others will always return nil, so the result doesn't have to be checked. I know errcheck isn't smart enough to figure that out, but we can just manually ignore them.

As Yichao said, transition already logs, so callers should not log again.

And I agree the multierr stuff is not appropriate here.
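
A sketch of the direction the reviewers suggest for the ShardOwnershipLostError case (discard the transition result explicitly and keep returning the original persistence error; assumes transition always returns nil for contextRequestStop and logs internally, as stated above — not the merged code):

	case *persistence.ShardOwnershipLostError:
		// Shard is stolen, trigger shutdown of history engine.
		// Handling of max read level doesn't matter here.
		// transition is expected to return nil for contextRequestStop and to
		// log any invalid transition itself, so its result is discarded
		// explicitly to satisfy errcheck without changing the error seen by
		// callers.
		_ = s.transition(contextRequestStop{})
		return err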


default:
return err
@@ -1471,8 +1471,7 @@ func (s *ContextImpl) handleWriteErrorAndUpdateMaxReadLevelLocked(err error, new
case *persistence.ShardOwnershipLostError:
// Shard is stolen, trigger shutdown of history engine.
// Handling of max read level doesn't matter here.
-		s.transition(contextRequestStop{})
-		return err
+		return multierr.Combine(err, s.transition(contextRequestStop{}))

default:
// We have no idea if the write failed or will eventually make it to persistence. Try to re-acquire
@@ -1481,8 +1480,7 @@ func (s *ContextImpl) handleWriteErrorAndUpdateMaxReadLevelLocked(err error, new
// reliably check the outcome by performing a read. If we fail, we'll shut down the shard.
// Note that reacquiring the shard will cause the max read level to be updated
// to the new range (i.e. past newMaxReadLevel).
-		s.transition(contextRequestLost{})
-		return err
+		return multierr.Combine(err, s.transition(contextRequestLost{}))
}
}

@@ -1505,18 +1503,24 @@ func (s *ContextImpl) createEngine() Engine {

// start should only be called by the controller.
func (s *ContextImpl) start() {
-	s.transition(contextRequestAcquire{})
+	if err := s.transition(contextRequestAcquire{}); err != nil {
Review comment (Member):

s.transition() already emits logs if the transition is invalid, I believe.

s.contextTaggedLogger.Error("Failed to start shard", tag.Error(err))
}
}
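
If, as the comment above says, s.transition() already logs invalid transitions, a leaner variant of these callers (a sketch, not what was merged) would simply discard the result instead of logging a second time:

// start should only be called by the controller.
func (s *ContextImpl) start() {
	// transition logs invalid transitions itself, so the result is discarded
	// explicitly to satisfy errcheck.
	_ = s.transition(contextRequestAcquire{})
}

func (s *ContextImpl) Unload() {
	_ = s.transition(contextRequestStop{}) // always nil per the review discussion
}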

func (s *ContextImpl) Unload() {
-	s.transition(contextRequestStop{})
+	if err := s.transition(contextRequestStop{}); err != nil {
+		s.contextTaggedLogger.Error("Failed to unload shard", tag.Error(err))
+	}
}

// finishStop should only be called by the controller.
func (s *ContextImpl) finishStop() {
// After this returns, engineFuture.Set may not be called anymore, so if we don't get see
// an Engine here, we won't ever have one.
-	s.transition(contextRequestFinishStop{})
+	if err := s.transition(contextRequestFinishStop{}); err != nil {
+		s.contextTaggedLogger.Error("Failed to stop shard", tag.Error(err))
+	}

// use a context that we know is cancelled so that this doesn't block
engine, _ := s.engineFuture.Get(s.lifecycleCtx)
@@ -1969,7 +1973,9 @@ func (s *ContextImpl) acquireShard() {

// On any error, initiate shutting down the shard. If we already changed state
// because we got a ShardOwnershipLostError, this won't do anything.
-		s.transition(contextRequestStop{})
+		if err := s.transition(contextRequestStop{}); err != nil {
+			s.contextTaggedLogger.Error("Error stopping shard", tag.Error(err))
+		}
}
}

9 changes: 6 additions & 3 deletions service/history/shard/context_test.go
@@ -205,14 +205,17 @@ func (s *contextSuite) TestTimerMaxReadLevelUpdate_SingleProcessor() {
s.timeSource.Update(now)

// make sure the scheduledTaskMaxReadLevelMap has value for both current cluster and alternative cluster
-	s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, false)
-	s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestAlternativeClusterName, false)
+	_, err := s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, false)
+	s.NoError(err)
+	_, err = s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestAlternativeClusterName, false)
+	s.NoError(err)

now = time.Now().Add(time.Minute)
s.timeSource.Update(now)

// update in single processor mode
-	s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, true)
+	_, err = s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, true)
+	s.NoError(err)
scheduledTaskMaxReadLevelMap := s.mockShard.scheduledTaskMaxReadLevelMap
s.Len(scheduledTaskMaxReadLevelMap, 2)
s.True(scheduledTaskMaxReadLevelMap[cluster.TestCurrentClusterName].After(now))
4 changes: 3 additions & 1 deletion service/history/shard/controller_test.go
@@ -790,7 +790,9 @@ func (s *controllerSuite) TestShardControllerFuzz() {
shardID := int32(rand.Intn(int(s.config.NumberOfShards))) + 1
switch rand.Intn(5) {
case 0:
-			s.shardController.GetShardByID(shardID)
+			if _, err := s.shardController.GetShardByID(shardID); err != nil {
+				return err
Review comment (Member):

Returning defeats the purpose of this code, which is to generate load on the shard controller. This shouldn't return an error, but if it does, the worker should continue to run.

+			}
case 1:
if shard, err := s.shardController.GetShardByID(shardID); err == nil {
_, _ = shard.GetEngine(ctx)
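
Following the comment above, a sketch of a fuzz-worker-friendly variant for case 0 (keep generating load and explicitly discard the result rather than return; not the merged code):

		case 0:
			// The worker exists to generate load on the shard controller, so an
			// error here should not stop it; discarding the results keeps
			// errcheck satisfied without changing the worker's behavior.
			_, _ = s.shardController.GetShardByID(shardID)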