Skip to content

Commit

Permalink
Add an archival.Module to be used in both tests and prod
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Dec 21, 2022
1 parent 11b808a commit 207f2e9
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 6 deletions.
2 changes: 2 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,8 @@ const (
ArchivalProcessorArchiveDelay = "history.archivalProcessorArchiveDelay"
// ArchivalProcessorRetryWarningLimit is the number of times an archival task may be retried before we log a warning
ArchivalProcessorRetryWarningLimit = "history.archivalProcessorRetryLimitWarning"
// ArchivalBackendMaxRPS is the maximum rate of requests per second to the archival backend
ArchivalBackendMaxRPS = "history.archivalBackendMaxRPS"

// ReplicatorTaskBatchSize is batch size for ReplicatorProcessor
ReplicatorTaskBatchSize = "history.replicatorTaskBatchSize"
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/describeworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func Invoke(
}
}

relocatableAttributes, err := workflow.NewRelocatableAttributesFetcher(persistenceVisibilityMgr).Fetch(ctx, mutableState)
relocatableAttributes, err := workflow.RelocatableAttributesFetcherProvider(persistenceVisibilityMgr).Fetch(ctx, mutableState)
if err != nil {
shard.GetLogger().Error(
"Failed to fetch relocatable attributes",
Expand Down
35 changes: 34 additions & 1 deletion service/history/archival/archiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/fx"
"go.uber.org/multierr"

carchiver "go.temporal.io/server/common/archiver"
Expand All @@ -42,6 +43,7 @@ import (
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/common/testing/mocksdk"
"go.temporal.io/server/service/history/configs"
)

func TestArchiver(t *testing.T) {
Expand Down Expand Up @@ -280,7 +282,38 @@ func TestArchiver(t *testing.T) {
rateLimiter := quotas.NewMockRateLimiter(controller)
rateLimiter.EXPECT().WaitN(gomock.Any(), 2).Return(c.RateLimiterWaitErr)

archiver := NewArchiver(archiverProvider, logRecorder, metricsHandler, rateLimiter)
// we need this channel to get the Archiver which is created asynchronously
archivers := make(chan Archiver, 1)
// we make an app here so that we can test that the Module is working as intended
app := fx.New(
fx.Supply(fx.Annotate(archiverProvider, fx.As(new(provider.ArchiverProvider)))),
fx.Supply(fx.Annotate(logRecorder, fx.As(new(log.Logger)))),
fx.Supply(fx.Annotate(metricsHandler, fx.As(new(metrics.Handler)))),
fx.Supply(&configs.Config{
ArchivalBackendMaxRPS: func() float64 {
return 42.0
},
}),
Module,
fx.Decorate(func(rl quotas.RateLimiter) quotas.RateLimiter {
// we need to decorate the rate limiter so that we can use the mock
// we also verify that the rate being used is equal to the one in the config
assert.Equal(t, 42.0, rl.Rate())
return rateLimiter
}),
fx.Invoke(func(a Archiver) {
// after all parameters are provided, we get the Archiver and put it in the channel
// so that we can use it in the test
archivers <- a
}),
)
require.NoError(t, app.Err())
// we need to start the app for fx.Invoke to be called, so that we can get the Archiver
require.NoError(t, app.Start(ctx))
defer func() {
require.NoError(t, app.Stop(ctx))
}()
archiver := <-archivers
_, err = archiver.Archive(ctx, &Request{
HistoryURI: historyURI,
VisibilityURI: visibilityURI,
Expand Down
39 changes: 39 additions & 0 deletions service/history/archival/fx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package archival

import (
"go.uber.org/fx"

"go.temporal.io/server/common/quotas"
"go.temporal.io/server/service/history/configs"
)

var Module = fx.Options(
fx.Provide(NewArchiver),
fx.Provide(func(config *configs.Config) quotas.RateLimiter {
return quotas.NewDefaultOutgoingRateLimiter(quotas.RateFn(config.ArchivalBackendMaxRPS))
}),
)
2 changes: 1 addition & 1 deletion service/history/archival_queue_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
a,
shardContext,
workflowCache,
workflow.NewRelocatableAttributesFetcher(visibilityManager),
workflow.RelocatableAttributesFetcherProvider(visibilityManager),
p.MetricsHandler,
logger,
)
Expand Down
2 changes: 2 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ type Config struct {
ArchivalProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
ArchivalProcessorArchiveDelay dynamicconfig.DurationPropertyFn
ArchivalProcessorRetryWarningLimit dynamicconfig.IntPropertyFn
ArchivalBackendMaxRPS dynamicconfig.FloatPropertyFn
}

const (
Expand Down Expand Up @@ -525,6 +526,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
ArchivalProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorPollBackoffInterval, 5*time.Second),
ArchivalProcessorArchiveDelay: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorArchiveDelay, 5*time.Minute),
ArchivalProcessorRetryWarningLimit: dc.GetIntProperty(dynamicconfig.ArchivalProcessorRetryWarningLimit, 100),
ArchivalBackendMaxRPS: dc.GetFloat64Property(dynamicconfig.ArchivalBackendMaxRPS, 10000.0),
}

return cfg
Expand Down
2 changes: 2 additions & 0 deletions service/history/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/service"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/archival"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/events"
Expand All @@ -68,6 +69,7 @@ var Module = fx.Options(
workflow.Module,
shard.Module,
cache.Module,
archival.Module,
fx.Provide(dynamicconfig.NewCollection),
fx.Provide(ConfigProvider), // might be worth just using provider for configs.Config directly
fx.Provide(RetryableInterceptorProvider),
Expand Down
1 change: 1 addition & 0 deletions service/history/workflow/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ import (

var Module = fx.Options(
fx.Populate(&taskGeneratorProvider),
fx.Provide(RelocatableAttributesFetcherProvider),
)
4 changes: 2 additions & 2 deletions service/history/workflow/relocatable_attributes_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type RelocatableAttributesFetcher interface {
) (*RelocatableAttributes, error)
}

// NewRelocatableAttributesFetcher creates a new instance of a RelocatableAttributesFetcher.
// RelocatableAttributesFetcherProvider provides a new instance of a RelocatableAttributesFetcher.
// The manager.VisibilityManager parameter is used to fetch the relocatable attributes from the persistence backend iff
// we already moved them there out from the mutable state.
// The visibility manager is not used if the relocatable attributes are still in the mutable state.
Expand All @@ -52,7 +52,7 @@ type RelocatableAttributesFetcher interface {
// Currently, there is no cache, but you may provide a manager.VisibilityManager that supports caching to this function
// safely.
// TODO: Add a cache around the visibility manager for the relocatable attributes.
func NewRelocatableAttributesFetcher(
func RelocatableAttributesFetcherProvider(
visibilityManager manager.VisibilityManager,
) RelocatableAttributesFetcher {
return &relocatableAttributesFetcher{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestRelocatableAttributesFetcher_Fetch(t *testing.T) {
}
ctx := context.Background()

fetcher := NewRelocatableAttributesFetcher(visibilityManager)
fetcher := RelocatableAttributesFetcherProvider(visibilityManager)
info, err := fetcher.Fetch(ctx, mutableState)

if c.ExpectedErr != nil {
Expand Down

0 comments on commit 207f2e9

Please sign in to comment.