Skip to content

Commit

Permalink
Fix NPE due to race with a closing series (#3056)
Browse files Browse the repository at this point in the history
* Fix NPE due to race with a closing series

* Concurrent read/write wired list test
  • Loading branch information
justinjc authored Mar 9, 2021
1 parent 4f45430 commit c0d6797
Show file tree
Hide file tree
Showing 8 changed files with 299 additions and 9 deletions.
21 changes: 21 additions & 0 deletions src/dbnode/integration/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ const (
// defaultTickMinimumInterval is the default minimum tick interval.
defaultTickMinimumInterval = 1 * time.Second

// defaultTickCancellationCheckInterval is the default tick cancellation check interval.
defaultTickCancellationCheckInterval = 1 * time.Second

// defaultUseTChannelClientForReading determines whether we use the tchannel client for reading by default.
defaultUseTChannelClientForReading = false

Expand Down Expand Up @@ -119,6 +122,12 @@ type TestOptions interface {
// TickMinimumInterval returns the tick interval.
TickMinimumInterval() time.Duration

// SetTickCancellationCheckInterval sets the tick cancellation check interval.
SetTickCancellationCheckInterval(value time.Duration) TestOptions

// TickCancellationCheckInterval returns the tick cancellation check interval.
TickCancellationCheckInterval() time.Duration

// SetHTTPClusterAddr sets the http cluster address.
SetHTTPClusterAddr(value string) TestOptions

Expand Down Expand Up @@ -319,6 +328,7 @@ type options struct {
nsInitializer namespace.Initializer
id string
tickMinimumInterval time.Duration
tickCancellationCheckInterval time.Duration
httpClusterAddr string
tchannelClusterAddr string
httpNodeAddr string
Expand Down Expand Up @@ -368,6 +378,7 @@ func NewTestOptions(t *testing.T) TestOptions {
namespaces: namespaces,
id: defaultID,
tickMinimumInterval: defaultTickMinimumInterval,
tickCancellationCheckInterval: defaultTickCancellationCheckInterval,
serverStateChangeTimeout: defaultServerStateChangeTimeout,
clusterConnectionTimeout: defaultClusterConnectionTimeout,
readRequestTimeout: defaultReadRequestTimeout,
Expand Down Expand Up @@ -427,6 +438,16 @@ func (o *options) TickMinimumInterval() time.Duration {
return o.tickMinimumInterval
}

// SetTickCancellationCheckInterval returns a copy of the options whose tick
// cancellation check interval is set to value; the receiver is not modified.
func (o *options) SetTickCancellationCheckInterval(value time.Duration) TestOptions {
	updated := *o
	updated.tickCancellationCheckInterval = value
	return &updated
}

// TickCancellationCheckInterval returns the tick cancellation check interval
// stored in these test options.
func (o *options) TickCancellationCheckInterval() time.Duration {
return o.tickCancellationCheckInterval
}

func (o *options) SetHTTPClusterAddr(value string) TestOptions {
opts := *o
opts.httpClusterAddr = value
Expand Down
216 changes: 216 additions & 0 deletions src/dbnode/integration/series_wired_list_panic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
// +build integration

// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package integration

import (
"fmt"
"testing"
"time"

"github.com/m3db/m3/src/dbnode/generated/thrift/rpc"
"github.com/m3db/m3/src/dbnode/integration/generate"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/x/ident"
xtime "github.com/m3db/m3/src/x/time"

"github.com/stretchr/testify/require"
)

const (
// numSeries is the number of distinct series IDs ("series-0" ...) that
// TestWiredListPanic generates and then repeatedly writes and fetches.
numSeries = 10
)

var (
// nsID identifies the namespace created by TestWiredListPanic; the write
// and read helpers in this file use it for batch writes, fileset checks and
// fetch requests.
nsID = ident.StringID("ns0")
)

// TestWiredListPanic is a long-running, normally skipped repro for
// https://github.com/m3db/m3/issues/2573. It starts a test server with a wired
// list of size one and very short tick intervals, then concurrently writes and
// fetches a fixed set of series so that wired list evictions race with series
// being closed by the tick.
func TestWiredListPanic(t *testing.T) {
	// This test is used to repro https://github.com/m3db/m3/issues/2573.
	// Unfortunately, the bug is due to a race condition and this test does not
	// reliably reproduce it in a short period of time. As such, the test is
	// configured to run for a very long duration to see if the repro occurs.
	// Comment out the below SkipNow() to actually run this.
	t.SkipNow()

	// Small increment to make race condition more likely.
	tickInterval := 5 * time.Millisecond

	nsOpts := namespace.NewOptions().
		SetRepairEnabled(false).
		SetRetentionOptions(DefaultIntegrationTestRetentionOpts).
		SetCacheBlocksOnRetrieve(true)
	ns, err := namespace.NewMetadata(nsID, nsOpts)
	require.NoError(t, err)
	testOpts := NewTestOptions(t).
		SetTickMinimumInterval(tickInterval).
		SetTickCancellationCheckInterval(tickInterval).
		SetNamespaces([]namespace.Metadata{ns}).
		// Wired list size of one means that if we query for two different IDs
		// alternating between each one, we'll evict from the wired list on
		// every query.
		SetMaxWiredBlocks(1)

	testSetup, err := NewTestSetup(t, testOpts, nil,
		func(opts storage.Options) storage.Options {
			return opts.SetMediatorTickInterval(tickInterval)
		},
		func(opts storage.Options) storage.Options {
			blockRetrieverMgr := block.NewDatabaseBlockRetrieverManager(
				func(
					md namespace.Metadata,
					shardSet sharding.ShardSet,
				) (block.DatabaseBlockRetriever, error) {
					retrieverOpts := fs.NewBlockRetrieverOptions().
						SetBlockLeaseManager(opts.BlockLeaseManager()).
						SetCacheBlocksOnRetrieve(true)
					retriever, err := fs.NewBlockRetriever(retrieverOpts,
						opts.CommitLogOptions().FilesystemOptions())
					if err != nil {
						return nil, err
					}

					if err := retriever.Open(md, shardSet); err != nil {
						return nil, err
					}
					return retriever, nil
				})
			return opts.SetDatabaseBlockRetrieverManager(blockRetrieverMgr)
		},
	)

	require.NoError(t, err)
	defer testSetup.Close()

	// Start the server.
	log := testSetup.StorageOpts().InstrumentOptions().Logger()
	require.NoError(t, testSetup.StartServer())
	log.Info("server is now up")

	// Stop the server.
	defer func() {
		require.NoError(t, testSetup.StopServer())
		log.Info("server is now down")
	}()

	md := testSetup.NamespaceMetadataOrFail(nsID)
	ropts := md.Options().RetentionOptions()
	blockSize := ropts.BlockSize()
	filePathPrefix := testSetup.StorageOpts().CommitLogOptions().FilesystemOptions().FilePathPrefix()

	seriesStrs := make([]string, 0, numSeries)
	for i := 0; i < numSeries; i++ {
		seriesStrs = append(seriesStrs, fmt.Sprintf("series-%d", i))
	}

	// doneCh is closed (rather than sent on) to signal shutdown: a close is
	// observed by every worker and never blocks, even if a worker has already
	// exited because a require assertion inside it failed.
	doneCh := make(chan struct{})

	start := testSetup.NowFn()()

	// Writer: each iteration writes one point per series and waits for the
	// corresponding fileset to be flushed.
	writesDone := make(chan struct{})
	go func() {
		// Deferred so completion is signaled on every exit path from this
		// goroutine.
		defer close(writesDone)
		for i := 0; ; i++ {
			select {
			case <-doneCh:
				return
			default:
				write(t, testSetup, blockSize, start, filePathPrefix, i, seriesStrs)
				time.Sleep(5 * time.Millisecond)
			}
		}
	}()

	// Reader: fetches every series in turn to force wired list evictions.
	readsDone := make(chan struct{})
	go func() {
		defer close(readsDone)
		for {
			select {
			case <-doneCh:
				return
			default:
				read(t, testSetup, blockSize, seriesStrs)
				time.Sleep(5 * time.Millisecond)
			}
		}
	}()

	time.Sleep(time.Hour)

	// Stop both workers and wait for them to finish before the deferred
	// teardown of the server and testSetup runs, so neither goroutine touches
	// testSetup while it is being shut down.
	close(doneCh)
	<-writesDone
	<-readsDone
}

// write performs the i-th write iteration: it moves the test clock to the
// start of a block (two block sizes further along per iteration), writes one
// generated data point for every series in seriesStrs, advances the clock past
// the block boundary and then waits up to a second for the block's fileset
// files to appear under filePathPrefix.
func write(
	t *testing.T,
	testSetup TestSetup,
	blockSize time.Duration,
	start time.Time,
	filePathPrefix string,
	i int,
	seriesStrs []string,
) {
	blockStart := start.Add(time.Duration(2*i) * blockSize)
	testSetup.SetNowFn(blockStart)

	testData := generate.Block(generate.BlockConfig{
		IDs:       seriesStrs,
		NumPoints: 1,
		Start:     blockStart,
	})
	require.NoError(t, testSetup.WriteBatch(nsID, testData))

	// Progress well past the block boundary so that the series gets flushed to
	// disk. This allows the next tick to purge the series from memory, closing
	// the series and thus making the id nil.
	testSetup.SetNowFn(blockStart.Add(blockSize * 3 / 2))

	expectedFilesets := []fs.FileSetFileIdentifier{
		{
			Namespace:   nsID,
			Shard:       1,
			BlockStart:  blockStart,
			VolumeIndex: 0,
		},
	}
	flushErr := waitUntilFileSetFilesExist(filePathPrefix, expectedFilesets, time.Second)
	require.NoError(t, flushErr)
}

// read fetches every series in seriesStrs, one request at a time, over a time
// range that ends at the test setup's current time and begins four block sizes
// before it. Any fetch error fails the test.
func read(
	t *testing.T,
	testSetup TestSetup,
	blockSize time.Duration,
	seriesStrs []string,
) {
	// After every write, "now" is progressed into the future so that the
	// series will be flushed to disk. This makes "now" a suitable RangeEnd for
	// the fetch request. The precise range does not matter so long as it
	// overlaps with the current retention.
	rangeEnd := testSetup.NowFn()()
	rangeStart := rangeEnd.Add(-4 * blockSize)

	fetchReq := rpc.NewFetchRequest()
	fetchReq.NameSpace = nsID.String()
	fetchReq.RangeStart = xtime.ToNormalizedTime(rangeStart, time.Second)
	fetchReq.RangeEnd = xtime.ToNormalizedTime(rangeEnd, time.Second)
	fetchReq.ResultTimeType = rpc.TimeType_UNIX_SECONDS

	// Fetching the series sequentially ensures that the wired list will have
	// evictions assuming that the list is configured with a size of 1.
	for idx := range seriesStrs {
		fetchReq.ID = seriesStrs[idx]
		_, fetchErr := testSetup.Fetch(fetchReq)
		require.NoError(t, fetchErr)
	}
}
1 change: 1 addition & 0 deletions src/dbnode/integration/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ func NewTestSetup(
runtimeOptsMgr := storageOpts.RuntimeOptionsManager()
runtimeOpts := runtimeOptsMgr.Get().
SetTickMinimumInterval(opts.TickMinimumInterval()).
SetTickCancellationCheckInterval(opts.TickCancellationCheckInterval()).
SetMaxWiredBlocks(opts.MaxWiredBlocks()).
SetWriteNewSeriesAsync(opts.WriteNewSeriesAsync())
if err := runtimeOptsMgr.Update(runtimeOpts); err != nil {
Expand Down
28 changes: 28 additions & 0 deletions src/dbnode/runtime/runtime_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions src/dbnode/runtime/runtime_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
defaultTickSeriesBatchSize = 512
defaultTickPerSeriesSleepDuration = 100 * time.Microsecond
defaultTickMinimumInterval = 10 * time.Second
defaultTickCancellationCheckInterval = time.Second
defaultMaxWiredBlocks = uint(1 << 16) // 65,536
)

Expand Down Expand Up @@ -71,6 +72,7 @@ type options struct {
clientBootstrapConsistencyLevel topology.ReadConsistencyLevel
clientReadConsistencyLevel topology.ReadConsistencyLevel
clientWriteConsistencyLevel topology.ConsistencyLevel
tickCancellationCheckInterval time.Duration
}

// NewOptions creates a new set of runtime options with defaults
Expand All @@ -87,6 +89,7 @@ func NewOptions() Options {
clientBootstrapConsistencyLevel: DefaultBootstrapConsistencyLevel,
clientReadConsistencyLevel: DefaultReadConsistencyLevel,
clientWriteConsistencyLevel: DefaultWriteConsistencyLevel,
tickCancellationCheckInterval: defaultTickCancellationCheckInterval,
}
}

Expand Down Expand Up @@ -234,3 +237,13 @@ func (o *options) SetClientWriteConsistencyLevel(value topology.ConsistencyLevel
// ClientWriteConsistencyLevel returns the client write consistency level
// stored in these options.
func (o *options) ClientWriteConsistencyLevel() topology.ConsistencyLevel {
return o.clientWriteConsistencyLevel
}

// SetTickCancellationCheckInterval returns a copy of the options with the
// interval for checking whether the tick has been canceled set to value; the
// receiver is left unchanged.
func (o *options) SetTickCancellationCheckInterval(value time.Duration) Options {
	updated := *o
	updated.tickCancellationCheckInterval = value
	return &updated
}

// TickCancellationCheckInterval returns the interval at which to check
// whether the tick has been canceled, as stored in these options.
func (o *options) TickCancellationCheckInterval() time.Duration {
return o.tickCancellationCheckInterval
}
8 changes: 8 additions & 0 deletions src/dbnode/runtime/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,14 @@ type Options interface {
// ClientWriteConsistencyLevel returns the client write consistency level
// used when fetching data from peers for coordinated writes
ClientWriteConsistencyLevel() topology.ConsistencyLevel

// SetTickCancellationCheckInterval sets the interval to check whether the tick
// has been canceled. This duration also affects the minimum tick duration.
SetTickCancellationCheckInterval(value time.Duration) Options

// TickCancellationCheckInterval is the interval to check whether the tick
// has been canceled. This duration also affects the minimum tick duration.
TickCancellationCheckInterval() time.Duration
}

// OptionsManager updates and supplies runtime options.
Expand Down
5 changes: 3 additions & 2 deletions src/dbnode/storage/series/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,8 +592,9 @@ func (s *dbSeries) OnEvictedFromWiredList(id ident.ID, blockStart time.Time) {
s.Lock()
defer s.Unlock()

// Should never happen
if !id.Equal(s.id) {
// id can be nil at this point if this dbSeries gets closed just before it
// gets evicted from the wiredlist.
if id == nil || s.id == nil || !id.Equal(s.id) {
return
}

Expand Down
Loading

0 comments on commit c0d6797

Please sign in to comment.