diff --git a/src/aggregator/aggregator/aggregator.go b/src/aggregator/aggregator/aggregator.go
index fe0ca30f0a..fba94f4506 100644
--- a/src/aggregator/aggregator/aggregator.go
+++ b/src/aggregator/aggregator/aggregator.go
@@ -52,6 +52,7 @@ import (
 const (
     uninitializedCutoverNanos = math.MinInt64
     uninitializedShardSetID   = 0
+    placementCheckInterval    = 10 * time.Second
 )
 
 var (
@@ -174,10 +175,50 @@ func (agg *aggregator) Open() error {
         agg.wg.Add(1)
         go agg.tick()
     }
+
+    agg.wg.Add(1)
+    go agg.placementTick()
     agg.state = aggregatorOpen
     return nil
 }
 
+func (agg *aggregator) placementTick() {
+    defer agg.wg.Done()
+
+    ticker := time.NewTicker(placementCheckInterval)
+    defer ticker.Stop()
+
+    m := agg.metrics.placement
+
+    for {
+        select {
+        case <-ticker.C:
+        case <-agg.placementManager.C():
+        case <-agg.doneCh:
+            return
+        }
+
+        placement, err := agg.placementManager.Placement()
+        if err != nil {
+            m.updateFailures.Inc(1)
+            continue
+        }
+
+        agg.RLock()
+        if !agg.shouldProcessPlacementWithLock(placement) {
+            agg.RUnlock()
+            continue
+        }
+        agg.RUnlock()
+
+        agg.Lock()
+        if err := agg.processPlacementWithLock(placement); err != nil {
+            m.updateFailures.Inc(1)
+        }
+        agg.Unlock()
+    }
+}
+
 func (agg *aggregator) AddUntimed(
     metric unaggregated.MetricUnion,
     metadatas metadata.StagedMetadatas,
@@ -280,12 +321,6 @@ func (agg *aggregator) AddPassthrough(
         return nil
     }
 
-    pw, err := agg.passWriter()
-    if err != nil {
-        agg.metrics.addPassthrough.ReportError(err)
-        return err
-    }
-
     mp := aggregated.ChunkedMetricWithStoragePolicy{
         ChunkedMetric: aggregated.ChunkedMetric{
             ChunkedID: id.ChunkedID{
@@ -298,7 +333,14 @@
         StoragePolicy: storagePolicy,
     }
 
-    if err := pw.Write(mp); err != nil {
+    agg.RLock()
+    defer agg.RUnlock()
+
+    if agg.state != aggregatorOpen {
+        return errAggregatorNotOpenOrClosed
+    }
+
+    if err := agg.passthroughWriter.Write(mp); err != nil {
         agg.metrics.addPassthrough.ReportError(err)
         return err
     }
@@ -342,21 +384,6 @@ func (agg *aggregator) Close() error {
     return nil
 }
 
-func (agg *aggregator) passWriter() (writer.Writer, error) {
-    agg.RLock()
-    defer agg.RUnlock()
-
-    if agg.state != aggregatorOpen {
-        return nil, errAggregatorNotOpenOrClosed
-    }
-
-    if agg.electionManager.ElectionState() == FollowerState {
-        return writer.NewBlackholeWriter(), nil
-    }
-
-    return agg.passthroughWriter, nil
-}
-
 func (agg *aggregator) shardFor(id id.RawID) (*aggregatorShard, error) {
     var (
         numShards = agg.currNumShards.Load()
@@ -368,52 +395,17 @@ func (agg *aggregator) shardFor(id id.RawID) (*aggregatorShard, error) {
     }
 
     agg.RLock()
-    shard, err := agg.shardForWithLock(id, shardID, noUpdateShards)
-    if err == nil || err != errActivePlacementChanged {
-        agg.RUnlock()
-        return shard, err
+    var shard *aggregatorShard
+    if int(shardID) < len(agg.shards) {
+        shard = agg.shards[shardID]
     }
     agg.RUnlock()
 
-    agg.Lock()
-    shard, err = agg.shardForWithLock(id, shardID, updateShards)
-    agg.Unlock()
-
-    return shard, err
-}
-
-func (agg *aggregator) shardForWithLock(
-    id id.RawID,
-    shardID uint32,
-    updateShardsType updateShardsType,
-) (*aggregatorShard, error) {
-    if agg.state != aggregatorOpen {
-        return nil, errAggregatorNotOpenOrClosed
-    }
-
-    placement, err := agg.placementManager.Placement()
-    if err != nil {
-        return nil, err
-    }
-
-    if agg.shouldProcessPlacementWithLock(placement) {
-        if updateShardsType == noUpdateShards {
-            return nil, errActivePlacementChanged
-        }
-        if err := agg.processPlacementWithLock(placement); err != nil {
-            return nil, err
-        }
-        // check if number of shards in placement changed, and recalculate shardID if needed
-        if int32(placement.NumShards()) != agg.currNumShards.Load() {
-            shardID = agg.shardFn(id, uint32(placement.NumShards()))
-        }
-    }
-
-    if int(shardID) >= len(agg.shards) || agg.shards[shardID] == nil {
+    if shard == nil {
         return nil, errShardNotOwned
     }
 
-    return agg.shards[shardID], nil
+    return shard, nil
 }
 
 func (agg *aggregator) processPlacementWithLock(
@@ -423,7 +415,13 @@
     if !agg.shouldProcessPlacementWithLock(newPlacement) {
         return nil
     }
-    var newShardSet shard.Shards
+
+    var (
+        metrics     = agg.metrics.placement
+        newShardSet shard.Shards
+    )
+
+    metrics.cutoverChanged.Inc(1)
     instance, err := agg.placementManager.InstanceFrom(newPlacement)
     if err == nil {
         newShardSet = instance.Shards()
@@ -452,7 +450,8 @@
         return err
     }
 
-    agg.metrics.placement.updated.Inc(1)
+    metrics.updated.Inc(1)
+
     return nil
 }
 
@@ -462,7 +461,6 @@
     // If there is no placement yet, or the placement has been updated,
     // process this placement.
     if agg.currPlacement == nil || agg.currPlacement != newPlacement {
-        agg.metrics.placement.cutoverChanged.Inc(1)
         return true
     }
     return false
 }
@@ -1001,12 +999,14 @@ func newAggregatorShardsMetrics(scope tally.Scope) aggregatorShardsMetrics {
 type aggregatorPlacementMetrics struct {
     cutoverChanged tally.Counter
     updated        tally.Counter
+    updateFailures tally.Counter
 }
 
 func newAggregatorPlacementMetrics(scope tally.Scope) aggregatorPlacementMetrics {
     return aggregatorPlacementMetrics{
         cutoverChanged: scope.Counter("placement-changed"),
         updated:        scope.Counter("updated"),
+        updateFailures: scope.Counter("update-failures"),
     }
 }
 
diff --git a/src/aggregator/aggregator/aggregator_mock.go b/src/aggregator/aggregator/aggregator_mock.go
index f647bf0987..7a4ad484ab 100644
--- a/src/aggregator/aggregator/aggregator_mock.go
+++ b/src/aggregator/aggregator/aggregator_mock.go
@@ -428,6 +428,20 @@ func (m *MockPlacementManager) EXPECT() *MockPlacementManagerMockRecorder {
     return m.recorder
 }
 
+// C mocks base method
+func (m *MockPlacementManager) C() <-chan struct{} {
+    m.ctrl.T.Helper()
+    ret := m.ctrl.Call(m, "C")
+    ret0, _ := ret[0].(<-chan struct{})
+    return ret0
+}
+
+// C indicates an expected call of C
+func (mr *MockPlacementManagerMockRecorder) C() *gomock.Call {
+    mr.mock.ctrl.T.Helper()
+    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "C", reflect.TypeOf((*MockPlacementManager)(nil).C))
+}
+
 // Close mocks base method
 func (m *MockPlacementManager) Close() error {
     m.ctrl.T.Helper()
diff --git a/src/aggregator/aggregator/aggregator_test.go b/src/aggregator/aggregator/aggregator_test.go
index 27fc98a6e9..8822c3cdf7 100644
--- a/src/aggregator/aggregator/aggregator_test.go
+++ b/src/aggregator/aggregator/aggregator_test.go
@@ -200,8 +200,9 @@ func TestAggregatorOpenInstanceNotInPlacement(t *testing.T) {
     testPlacement := placement.NewPlacement().SetCutoverNanos(5678)
 
     placementManager.EXPECT().Open().Return(nil)
+    placementManager.EXPECT().C().Return(make(chan struct{})).AnyTimes()
     placementManager.EXPECT().InstanceID().Return(agg.opts.PlacementManager().InstanceID())
-    placementManager.EXPECT().Placement().Return(testPlacement, nil)
+    placementManager.EXPECT().Placement().Return(testPlacement, nil).AnyTimes()
     placementManager.EXPECT().InstanceFrom(testPlacement).Return(nil, ErrInstanceNotFoundInPlacement)
 
     require.NoError(t, agg.Open())
@@ -324,7 +325,7 @@ func TestAggregatorAddUntimedNotOpen(t *testing.T) {
 
     agg, _ := testAggregator(t, ctrl)
     err := agg.AddUntimed(testUntimedMetric, testStagedMetadatas)
-    require.Equal(t, errAggregatorNotOpenOrClosed, err)
+    require.Equal(t, errShardNotOwned, err)
 }
 
 func TestAggregatorAddUntimedNotResponsibleForShard(t *testing.T) {
@@ -364,6 +365,8 @@ func TestAggregatorAddUntimedSuccessWithPlacementUpdate(t *testing.T) {
     require.NoError(t, agg.Open())
     require.Equal(t, int64(testPlacementCutover), agg.currPlacement.CutoverNanos())
 
+    existingShard := agg.shards[3]
+
     newShardAssignment := []shard.Shard{
         shard.NewShard(0).SetState(shard.Initializing).SetCutoverNanos(5000).SetCutoffNanos(20000),
         shard.NewShard(1).SetState(shard.Initializing).SetCutoverNanos(5500).SetCutoffNanos(25000),
@@ -385,7 +388,6 @@
         time.Sleep(100 * time.Millisecond)
     }
 
-    existingShard := agg.shards[3]
     err = agg.AddUntimed(testUntimedMetric, testStagedMetadatas)
     require.NoError(t, err)
     require.Equal(t, 5, len(agg.shards))
@@ -412,14 +414,15 @@
     }
     require.Equal(t, 1, len(agg.shards[1].metricMap.entries))
     require.Equal(t, newPlacementCutoverNanos, agg.currPlacement.CutoverNanos())
-
     for {
         existingShard.RLock()
         closed := existingShard.closed
         existingShard.RUnlock()
+
         if closed {
             break
         }
+
         time.Sleep(100 * time.Millisecond)
     }
 }
@@ -430,7 +433,7 @@ func TestAggregatorAddTimedNotOpen(t *testing.T) {
 
     agg, _ := testAggregator(t, ctrl)
     err := agg.AddTimed(testTimedMetric, testTimedMetadata)
-    require.Equal(t, errAggregatorNotOpenOrClosed, err)
+    require.Equal(t, errShardNotOwned, err)
 }
 
 func TestAggregatorAddTimedNotResponsibleForShard(t *testing.T) {
@@ -474,6 +477,8 @@ func TestAggregatorAddTimedSuccessWithPlacementUpdate(t *testing.T) {
     require.NoError(t, agg.Open())
     require.Equal(t, int64(testPlacementCutover), agg.currPlacement.CutoverNanos())
 
+    existingShard := agg.shards[3]
+
     newShardAssignment := []shard.Shard{
         shard.NewShard(0).SetState(shard.Initializing).SetCutoverNanos(5000).SetCutoffNanos(20000),
         shard.NewShard(1).SetState(shard.Initializing).SetCutoverNanos(5500).SetCutoffNanos(25000),
@@ -495,7 +500,6 @@
         time.Sleep(100 * time.Millisecond)
     }
 
-    existingShard := agg.shards[3]
     err = agg.AddTimed(testTimedMetric, testTimedMetadata)
     require.NoError(t, err)
     require.Equal(t, 5, len(agg.shards))
@@ -540,7 +544,7 @@ func TestAggregatorAddForwardedNotOpen(t *testing.T) {
 
     agg, _ := testAggregator(t, ctrl)
     err := agg.AddForwarded(testForwardedMetric, testForwardMetadata)
-    require.Equal(t, errAggregatorNotOpenOrClosed, err)
+    require.Equal(t, errShardNotOwned, err)
 }
 
 func TestAggregatorAddForwardedNotResponsibleForShard(t *testing.T) {
@@ -584,6 +588,8 @@ func TestAggregatorAddForwardedSuccessWithPlacementUpdate(t *testing.T) {
     require.NoError(t, agg.Open())
     require.Equal(t, int64(testPlacementCutover), agg.currPlacement.CutoverNanos())
 
+    existingShard := agg.shards[3]
+
     newShardAssignment := []shard.Shard{
         shard.NewShard(0).SetState(shard.Initializing).SetCutoverNanos(5000).SetCutoffNanos(20000),
         shard.NewShard(1).SetState(shard.Initializing).SetCutoverNanos(5500).SetCutoffNanos(25000),
@@ -605,7 +611,6 @@
         time.Sleep(100 * time.Millisecond)
     }
 
-    existingShard := agg.shards[3]
     err = agg.AddForwarded(testForwardedMetric, testForwardMetadata)
     require.NoError(t, err)
     require.Equal(t, 5, len(agg.shards))
@@ -1042,10 +1047,10 @@ func testAggregatorWithCustomPlacements(
     ctrl *gomock.Controller,
     proto *placementpb.PlacementSnapshots,
 ) (*aggregator, kv.Store) {
-    watcher, store := testWatcherWithPlacementProto(t, testPlacementKey, proto)
+    watcherOpts, store := testWatcherOptsWithPlacementProto(t, testPlacementKey, proto)
     placementManagerOpts := NewPlacementManagerOptions().
         SetInstanceID(testInstanceID).
-        SetWatcher(watcher)
+        SetWatcherOptions(watcherOpts)
     placementManager := NewPlacementManager(placementManagerOpts)
     opts := testOptions(ctrl).
         SetEntryCheckInterval(0).
@@ -1054,11 +1059,11 @@
 }
 
 // nolint: unparam
-func testWatcherWithPlacementProto(
+func testWatcherOptsWithPlacementProto(
     t *testing.T,
     placementKey string,
     proto *placementpb.PlacementSnapshots,
-) (placement.Watcher, kv.Store) {
+) (placement.WatcherOptions, kv.Store) {
     t.Helper()
     store := mem.NewStore()
     _, err := store.SetIfNotExists(placementKey, proto)
@@ -1066,8 +1071,7 @@
     placementWatcherOpts := placement.NewWatcherOptions().
         SetStagedPlacementKey(placementKey).
         SetStagedPlacementStore(store)
-    placementWatcher := placement.NewPlacementsWatcher(placementWatcherOpts)
-    return placementWatcher, store
+    return placementWatcherOpts, store
 }
 
 // nolint: unparam
diff --git a/src/aggregator/aggregator/placement_mgr.go b/src/aggregator/aggregator/placement_mgr.go
index 70c236e328..2950f08a35 100644
--- a/src/aggregator/aggregator/placement_mgr.go
+++ b/src/aggregator/aggregator/placement_mgr.go
@@ -63,6 +63,9 @@ type PlacementManager interface {
     // Shards returns the current shards owned by the instance.
     Shards() (shard.Shards, error)
 
+    // C returns a channel that can be used to subscribe for placement updates.
+    C() <-chan struct{}
+
     // Close closes the placement manager.
     Close() error
 }
@@ -70,12 +73,14 @@
 type placementManagerMetrics struct {
     activePlacementErrors tally.Counter
     instanceNotFound      tally.Counter
+    updates               tally.Counter
 }
 
 func newPlacementManagerMetrics(scope tally.Scope) placementManagerMetrics {
     return placementManagerMetrics{
         activePlacementErrors: scope.Counter("active-placement-errors"),
         instanceNotFound:      scope.Counter("instance-not-found"),
+        updates:               scope.Counter("placement-updates"),
     }
 }
 
@@ -96,17 +101,22 @@ type placementManager struct {
 
     state   placementManagerState
     metrics placementManagerMetrics
+    ch      chan struct{}
 }
 
 // NewPlacementManager creates a new placement manager.
 func NewPlacementManager(opts PlacementManagerOptions) PlacementManager {
     instrumentOpts := opts.InstrumentOptions()
-    return &placementManager{
-        nowFn:            opts.ClockOptions().NowFn(),
-        instanceID:       opts.InstanceID(),
-        placementWatcher: opts.Watcher(),
-        metrics:          newPlacementManagerMetrics(instrumentOpts.MetricsScope()),
+    mgr := &placementManager{
+        nowFn:      opts.ClockOptions().NowFn(),
+        instanceID: opts.InstanceID(),
+        ch:         make(chan struct{}, 1),
+        metrics:    newPlacementManagerMetrics(instrumentOpts.MetricsScope()),
     }
+    mgr.placementWatcher = placement.NewPlacementsWatcher(
+        opts.WatcherOptions().SetOnPlacementChangedFn(mgr.process))
+
+    return mgr
 }
 
 func (mgr *placementManager) Open() error {
@@ -120,9 +130,14 @@
         return err
     }
     mgr.state = placementManagerOpen
+
     return nil
 }
 
+func (mgr *placementManager) C() <-chan struct{} {
+    return mgr.ch
+}
+
 func (mgr *placementManager) InstanceID() string {
     mgr.RLock()
     value := mgr.instanceID
@@ -243,3 +258,10 @@
     }
     return instance, nil
 }
+
+func (mgr *placementManager) process(_, _ placement.Placement) {
+    select {
+    case mgr.ch <- struct{}{}:
+    default:
+    }
+}
diff --git a/src/aggregator/aggregator/placement_mgr_options.go b/src/aggregator/aggregator/placement_mgr_options.go
index 5a7ad10e7f..00ad43b0ec 100644
--- a/src/aggregator/aggregator/placement_mgr_options.go
+++ b/src/aggregator/aggregator/placement_mgr_options.go
@@ -50,18 +50,18 @@ type PlacementManagerOptions interface {
     // InstanceID returns the instance id.
     InstanceID() string
 
-    // SetWatcher sets the placement watcher.
-    SetWatcher(value placement.Watcher) PlacementManagerOptions
+    // SetWatcherOptions sets the placement watcher options.
+    SetWatcherOptions(value placement.WatcherOptions) PlacementManagerOptions
 
-    // Watcher returns the placement watcher.
-    Watcher() placement.Watcher
+    // WatcherOptions returns the placement watcher options.
+    WatcherOptions() placement.WatcherOptions
 }
 
 type placementManagerOptions struct {
-    clockOpts        clock.Options
-    instrumentOpts   instrument.Options
-    instanceID       string
-    placementWatcher placement.Watcher
+    clockOpts            clock.Options
+    instrumentOpts       instrument.Options
+    instanceID           string
+    placementWatcherOpts placement.WatcherOptions
 }
 
 // NewPlacementManagerOptions creates a new set of placement manager options.
@@ -103,12 +103,12 @@ func (o *placementManagerOptions) InstanceID() string {
     return o.instanceID
 }
 
-func (o *placementManagerOptions) SetWatcher(value placement.Watcher) PlacementManagerOptions {
+func (o *placementManagerOptions) SetWatcherOptions(value placement.WatcherOptions) PlacementManagerOptions {
     opts := *o
-    opts.placementWatcher = value
+    opts.placementWatcherOpts = value
     return &opts
 }
 
-func (o *placementManagerOptions) Watcher() placement.Watcher {
-    return o.placementWatcher
+func (o *placementManagerOptions) WatcherOptions() placement.WatcherOptions {
+    return o.placementWatcherOpts
 }
diff --git a/src/aggregator/aggregator/placement_mgr_test.go b/src/aggregator/aggregator/placement_mgr_test.go
index 66f0ed52a2..539f05753e 100644
--- a/src/aggregator/aggregator/placement_mgr_test.go
+++ b/src/aggregator/aggregator/placement_mgr_test.go
@@ -123,6 +123,30 @@ func TestPlacementManagerPlacement(t *testing.T) {
     require.Equal(t, []uint32{0, 1, 2, 3}, placement.Shards())
 }
 
+func TestPlacementManagerC(t *testing.T) {
+    mgr, store := testPlacementManager(t)
+    require.NoError(t, mgr.Open())
+    select {
+    case <-mgr.C():
+    case <-time.After(1 * time.Second):
+        t.Fatal("expected placement init to propagate within deadline")
+    }
+
+    require.Equal(t, 0, len(mgr.C()))
+    // Wait for change to propagate.
+    _, err := store.Set(testPlacementKey, testStagedPlacementProto)
+    require.NoError(t, err)
+    select {
+    case <-mgr.C():
+    case <-time.After(1 * time.Second):
+        t.Fatal("expected placement update to propagate within deadline")
+    }
+    placement, err := mgr.Placement()
+    require.NoError(t, err)
+    require.Equal(t, int64(10000), placement.CutoverNanos())
+    require.Equal(t, []uint32{0, 1, 2, 3}, placement.Shards())
+}
+
 func TestPlacementManagerInstanceNotFound(t *testing.T) {
     mgr, store := testPlacementManager(t)
     require.NoError(t, mgr.Open())
@@ -329,10 +353,10 @@ func TestPlacementClose(t *testing.T) {
 }
 
 func testPlacementManager(t *testing.T) (*placementManager, kv.Store) {
-    watcher, store := testWatcherWithPlacementProto(t, testPlacementKey, testStagedPlacementProto)
+    watcherOpts, store := testWatcherOptsWithPlacementProto(t, testPlacementKey, testStagedPlacementProto)
     opts := NewPlacementManagerOptions().
         SetInstanceID(testInstanceID).
-        SetWatcher(watcher)
+        SetWatcherOptions(watcherOpts)
     placementManager := NewPlacementManager(opts).(*placementManager)
     return placementManager, store
 }
diff --git a/src/aggregator/integration/setup.go b/src/aggregator/integration/setup.go
index f163c119f2..d80afed7fa 100644
--- a/src/aggregator/integration/setup.go
+++ b/src/aggregator/integration/setup.go
@@ -117,10 +117,9 @@ func newTestServerSetup(t *testing.T, opts testServerOptions) *testServerSetup {
     placementWatcherOpts := placement.NewWatcherOptions().
         SetStagedPlacementKey(opts.PlacementKVKey()).
         SetStagedPlacementStore(opts.KVStore())
-    placementWatcher := placement.NewPlacementsWatcher(placementWatcherOpts)
     placementManagerOpts := aggregator.NewPlacementManagerOptions().
         SetInstanceID(opts.InstanceID()).
-        SetWatcher(placementWatcher)
+        SetWatcherOptions(placementWatcherOpts)
     placementManager := aggregator.NewPlacementManager(placementManagerOpts)
     aggregatorOpts = aggregatorOpts.
         SetShardFn(opts.ShardFn()).
diff --git a/src/cluster/placement/placements_watcher.go b/src/cluster/placement/placements_watcher.go
index 11394d8b8d..3a42d8da45 100644
--- a/src/cluster/placement/placements_watcher.go
+++ b/src/cluster/placement/placements_watcher.go
@@ -148,11 +148,11 @@ func (t *placementsWatcher) process(newValue interface{}) error {
         oldPlacement = old.placement
     }
 
+    t.valuePayload.Store(payload{placement: newPlacement})
+
     if t.onPlacementChanged != nil {
         t.onPlacementChanged(oldPlacement, newPlacement)
     }
 
-    t.valuePayload.Store(payload{placement: newPlacement})
-
     return nil
 }
diff --git a/src/cmd/services/m3aggregator/config/aggregator.go b/src/cmd/services/m3aggregator/config/aggregator.go
index 54f4074da9..93db250d88 100644
--- a/src/cmd/services/m3aggregator/config/aggregator.go
+++ b/src/cmd/services/m3aggregator/config/aggregator.go
@@ -572,11 +572,10 @@ func (c placementManagerConfiguration) NewPlacementManager(
     scope := instrumentOpts.MetricsScope()
     iOpts := instrumentOpts.SetMetricsScope(scope.SubScope("placement-watcher"))
     placementWatcherOpts := c.Watcher.NewOptions(store, iOpts)
-    placementWatcher := placement.NewPlacementsWatcher(placementWatcherOpts)
     placementManagerOpts := aggregator.NewPlacementManagerOptions().
         SetInstrumentOptions(instrumentOpts).
         SetInstanceID(instanceID).
-        SetWatcher(placementWatcher)
+        SetWatcherOptions(placementWatcherOpts)
     return aggregator.NewPlacementManager(placementManagerOpts), nil
 }
 
diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go
index d5ff950055..456d12f7f0 100644
--- a/src/cmd/services/m3coordinator/downsample/options.go
+++ b/src/cmd/services/m3coordinator/downsample/options.go
@@ -1072,10 +1072,9 @@ func (o DownsamplerOptions) newAggregatorPlacementManager(
     placementWatcherOpts := placement.NewWatcherOptions().
         SetStagedPlacementKey(placementKVKey).
         SetStagedPlacementStore(localKVStore)
-    placementWatcher := placement.NewPlacementsWatcher(placementWatcherOpts)
    placementManagerOpts := aggregator.NewPlacementManagerOptions().
         SetInstanceID(instanceID).
-        SetWatcher(placementWatcher)
+        SetWatcherOptions(placementWatcherOpts)
     return aggregator.NewPlacementManager(placementManagerOpts), nil
 }
 
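Note for reviewers (not part of the patch): the sketch below shows how a caller might wire the new options-based constructor and consume the C() notification channel introduced in this change. The instance ID, placement key, and in-memory KV store are placeholders; the example assumes the store already holds a staged placement at that key before Open is called (as the tests arrange via SetIfNotExists), and uses only the APIs visible in this diff.

```go
package main

import (
	"fmt"
	"time"

	"github.com/m3db/m3/src/aggregator/aggregator"
	"github.com/m3db/m3/src/cluster/kv/mem"
	"github.com/m3db/m3/src/cluster/placement"
)

func main() {
	// Placeholder store, key, and instance ID; a real caller would pass its
	// production KV store and configured placement key, and the key must
	// already contain a placement before Open is called.
	store := mem.NewStore()
	watcherOpts := placement.NewWatcherOptions().
		SetStagedPlacementKey("/placement").
		SetStagedPlacementStore(store)

	// Callers now hand the watcher *options* to the placement manager, which
	// constructs its own watcher internally and hooks SetOnPlacementChangedFn
	// so that C() is signaled on every placement change.
	mgr := aggregator.NewPlacementManager(
		aggregator.NewPlacementManagerOptions().
			SetInstanceID("instance-0").
			SetWatcherOptions(watcherOpts))
	if err := mgr.Open(); err != nil {
		panic(err)
	}

	// Block on C() with a timeout fallback, mirroring how placementTick pairs
	// the channel with a ticker, then re-read the placement once notified.
	select {
	case <-mgr.C():
		p, err := mgr.Placement()
		if err == nil {
			fmt.Println("placement cutover:", p.CutoverNanos())
		}
	case <-time.After(10 * time.Second):
		fmt.Println("no placement update observed")
	}
}
```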