Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert #3755 #3842

Merged
merged 1 commit into from
Jan 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 10 additions & 16 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.uber.org/multierr"
"golang.org/x/exp/maps"

"go.temporal.io/server/api/adminservice/v1"
Expand Down Expand Up @@ -1446,7 +1445,8 @@ func (s *ContextImpl) handleReadError(err error) error {
case *persistence.ShardOwnershipLostError:
// Shard is stolen, trigger shutdown of history engine.
// Handling of max read level doesn't matter here.
return multierr.Combine(err, s.transition(contextRequestStop{}))
_ = s.transition(contextRequestStop{})
return err

default:
return err
Expand Down Expand Up @@ -1480,7 +1480,8 @@ func (s *ContextImpl) handleWriteErrorAndUpdateMaxReadLevelLocked(err error, new
case *persistence.ShardOwnershipLostError:
// Shard is stolen, trigger shutdown of history engine.
// Handling of max read level doesn't matter here.
return multierr.Combine(err, s.transition(contextRequestStop{}))
_ = s.transition(contextRequestStop{})
return err

default:
// We have no idea if the write failed or will eventually make it to persistence. Try to re-acquire
Expand All @@ -1489,7 +1490,8 @@ func (s *ContextImpl) handleWriteErrorAndUpdateMaxReadLevelLocked(err error, new
// reliably check the outcome by performing a read. If we fail, we'll shut down the shard.
// Note that reacquiring the shard will cause the max read level to be updated
// to the new range (i.e. past newMaxReadLevel).
return multierr.Combine(err, s.transition(contextRequestLost{}))
_ = s.transition(contextRequestLost{})
return err
}
}

Expand All @@ -1512,24 +1514,18 @@ func (s *ContextImpl) createEngine() Engine {

// start should only be called by the controller.
func (s *ContextImpl) start() {
if err := s.transition(contextRequestAcquire{}); err != nil {
s.contextTaggedLogger.Error("Failed to start shard", tag.Error(err))
}
_ = s.transition(contextRequestAcquire{})
}

func (s *ContextImpl) Unload() {
if err := s.transition(contextRequestStop{}); err != nil {
s.contextTaggedLogger.Error("Failed to unload shard", tag.Error(err))
}
_ = s.transition(contextRequestStop{})
}

// finishStop should only be called by the controller.
func (s *ContextImpl) finishStop() {
// After this returns, engineFuture.Set may not be called anymore, so if we don't get see
// an Engine here, we won't ever have one.
if err := s.transition(contextRequestFinishStop{}); err != nil {
s.contextTaggedLogger.Error("Failed to stop shard", tag.Error(err))
}
_ = s.transition(contextRequestFinishStop{})

// use a context that we know is cancelled so that this doesn't block
engine, _ := s.engineFuture.Get(s.lifecycleCtx)
Expand Down Expand Up @@ -2034,9 +2030,7 @@ func (s *ContextImpl) acquireShard() {

// On any error, initiate shutting down the shard. If we already changed state
// because we got a ShardOwnershipLostError, this won't do anything.
if err := s.transition(contextRequestStop{}); err != nil {
s.contextTaggedLogger.Error("Error stopping shard", tag.Error(err))
}
_ = s.transition(contextRequestStop{})
}
}

Expand Down
4 changes: 1 addition & 3 deletions service/history/shard/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,9 +790,7 @@ func (s *controllerSuite) TestShardControllerFuzz() {
shardID := int32(rand.Intn(int(s.config.NumberOfShards))) + 1
switch rand.Intn(5) {
case 0:
if _, err := s.shardController.GetShardByID(shardID); err != nil {
return err
}
_, _ = s.shardController.GetShardByID(shardID)
case 1:
if shard, err := s.shardController.GetShardByID(shardID); err == nil {
_, _ = shard.GetEngine(ctx)
Expand Down