Check entry empty state to ensure GC eligible (#3634)
rallen090 authored Aug 19, 2021
1 parent c815135 commit 926a478
Showing 41 changed files with 733 additions and 673 deletions.
2 changes: 1 addition & 1 deletion src/dbnode/generated/mocks/generate.go
@@ -24,7 +24,7 @@
//go:generate sh -c "mockgen -package=xio $PACKAGE/src/dbnode/x/xio SegmentReader,SegmentReaderPool | genclean -pkg $PACKAGE/src/dbnode/x/xio -out $GOPATH/src/$PACKAGE/src/dbnode/x/xio/io_mock.go"
//go:generate sh -c "mockgen -package=digest -destination=$GOPATH/src/$PACKAGE/src/dbnode/digest/digest_mock.go $PACKAGE/src/dbnode/digest ReaderWithDigest"
//go:generate sh -c "mockgen -package=series $PACKAGE/src/dbnode/storage/series DatabaseSeries,QueryableBlockRetriever | genclean -pkg $PACKAGE/src/dbnode/storage/series -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/series_mock.go"
//go:generate sh -c "mockgen -package=lookup $PACKAGE/src/dbnode/storage/series/lookup IndexWriter | genclean -pkg $PACKAGE/src/dbnode/storage/series/lookup -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/lookup/lookup_mock.go"
//go:generate sh -c "mockgen -package=storage $PACKAGE/src/dbnode/storage IndexWriter | genclean -pkg $PACKAGE/src/dbnode/storage -out $GOPATH/src/$PACKAGE/src/dbnode/storage/lookup_mock.go"

// mockgen rules for generating mocks for unexported interfaces (file mode)
//go:generate sh -c "mockgen -package=encoding -destination=$GOPATH/src/$PACKAGE/src/dbnode/encoding/encoding_mock.go -source=$GOPATH/src/$PACKAGE/src/dbnode/encoding/types.go"
2 changes: 1 addition & 1 deletion src/dbnode/integration/index_active_block_rotate_test.go
@@ -51,7 +51,7 @@ func TestIndexActiveBlockRotate(t *testing.T) {
numWrites = 50
numTags = 10
blockSize = 2 * time.Hour
indexBlockSize = blockSize
indexBlockSize = blockSize * 2
retentionPeriod = 12 * blockSize
bufferPast = 10 * time.Minute
rOpts = retention.NewOptions().
@@ -18,7 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package lookup
package storage

import (
"sync"
@@ -30,6 +30,7 @@ import (
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/storage/series"
"github.com/m3db/m3/src/dbnode/ts/writes"
"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/context"
xtime "github.com/m3db/m3/src/x/time"
@@ -58,18 +59,18 @@ type IndexWriter interface {
// members to track lifecycle and minimize indexing overhead.
// NB: users are expected to use `NewEntry` to construct these objects.
type Entry struct {
relookupAndIncrementReaderWriterCount func() (index.OnIndexSeries, bool)
Series series.DatabaseSeries
Index uint64
indexWriter IndexWriter
curReadWriters int32
reverseIndex entryIndexState
nowFn clock.NowFn
pendingIndexBatchSizeOne []writes.PendingIndexInsert
Shard Shard
Series series.DatabaseSeries
Index uint64
indexWriter IndexWriter
curReadWriters int32
reverseIndex entryIndexState
nowFn clock.NowFn
pendingIndexBatchSizeOne []writes.PendingIndexInsert
}

// ensure Entry satisfies the `index.OnIndexSeries` interface.
var _ index.OnIndexSeries = &Entry{}
// ensure Entry satisfies the `doc.OnIndexSeries` interface.
var _ doc.OnIndexSeries = &Entry{}

// ensure Entry satisfies the `bootstrap.SeriesRef` interface.
var _ bootstrap.SeriesRef = &Entry{}
@@ -79,11 +80,11 @@ var _ bootstrap.SeriesRefResolver = &Entry{}

// NewEntryOptions supplies options for a new entry.
type NewEntryOptions struct {
RelookupAndIncrementReaderWriterCount func() (index.OnIndexSeries, bool)
Series series.DatabaseSeries
Index uint64
IndexWriter IndexWriter
NowFn clock.NowFn
Shard Shard
Series series.DatabaseSeries
Index uint64
IndexWriter IndexWriter
NowFn clock.NowFn
}

// NewEntry returns a new Entry.
@@ -93,22 +94,17 @@ func NewEntry(opts NewEntryOptions) *Entry {
nowFn = opts.NowFn
}
entry := &Entry{
relookupAndIncrementReaderWriterCount: opts.RelookupAndIncrementReaderWriterCount,
Series: opts.Series,
Index: opts.Index,
indexWriter: opts.IndexWriter,
nowFn: nowFn,
pendingIndexBatchSizeOne: make([]writes.PendingIndexInsert, 1),
reverseIndex: newEntryIndexState(),
Shard: opts.Shard,
Series: opts.Series,
Index: opts.Index,
indexWriter: opts.IndexWriter,
nowFn: nowFn,
pendingIndexBatchSizeOne: make([]writes.PendingIndexInsert, 1),
reverseIndex: newEntryIndexState(),
}
return entry
}

// RelookupAndIncrementReaderWriterCount will relookup the entry.
func (entry *Entry) RelookupAndIncrementReaderWriterCount() (index.OnIndexSeries, bool) {
return entry.relookupAndIncrementReaderWriterCount()
}

// ReaderWriterCount returns the current ref count on the Entry.
func (entry *Entry) ReaderWriterCount() int32 {
return atomic.LoadInt32(&entry.curReadWriters)
@@ -124,6 +120,14 @@ func (entry *Entry) DecrementReaderWriterCount() {
atomic.AddInt32(&entry.curReadWriters, -1)
}

// IndexedBlockCount returns the count of indexed block states.
func (entry *Entry) IndexedBlockCount() int {
entry.reverseIndex.RLock()
count := len(entry.reverseIndex.states)
entry.reverseIndex.RUnlock()
return count
}

// IndexedForBlockStart returns a bool to indicate if the Entry has been successfully
// indexed for the given index blockstart.
func (entry *Entry) IndexedForBlockStart(indexBlockStart xtime.UnixNano) bool {
@@ -231,23 +235,17 @@ func (entry *Entry) IfAlreadyIndexedMarkIndexSuccessAndFinalize(
return successAlready
}

// RemoveIndexedForBlockStarts removes the entry for the index for all blockStarts.
func (entry *Entry) RemoveIndexedForBlockStarts(
blockStarts map[xtime.UnixNano]struct{},
) index.RemoveIndexedForBlockStartsResult {
var result index.RemoveIndexedForBlockStartsResult
entry.reverseIndex.Lock()
for k, state := range entry.reverseIndex.states {
_, ok := blockStarts[k]
if ok && state.success {
delete(entry.reverseIndex.states, k)
result.IndexedBlockStartsRemoved++
continue
}
result.IndexedBlockStartsRemaining++
// RelookupAndCheckIsEmpty looks up the series and checks if it is empty.
// The first result indicates if the series is empty.
// The second result indicates if the series can be looked up at all.
func (entry *Entry) RelookupAndCheckIsEmpty() (bool, bool) {
e, _, err := entry.Shard.TryRetrieveWritableSeries(entry.Series.ID())
if err != nil || e == nil {
return false, false
}
entry.reverseIndex.Unlock()
return result
defer entry.DecrementReaderWriterCount()

return entry.Series.IsEmpty(), true
}

// Write writes a new value.
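The new RelookupAndCheckIsEmpty works together with the reader/writer reference counting above: the lookup path increments the entry's reader/writer count, which is why the method defers DecrementReaderWriterCount once the lookup succeeds. Below is a minimal sketch of how a garbage-collection check might consume this method together with the new IndexedBlockCount accessor. It is meant to sit alongside the Entry type in package storage; the gcCheckable interface and gcEligible helper are illustrative stand-ins and are not part of this commit.

// gcCheckable captures only the Entry methods this sketch relies on.
type gcCheckable interface {
	RelookupAndCheckIsEmpty() (isEmpty, ok bool)
	IndexedBlockCount() int
}

// gcEligible sketches one plausible eligibility check: the entry is safe to
// collect if its series can no longer be looked up at all, or if the series
// holds no data and the entry is no longer indexed for any block.
func gcEligible(e gcCheckable) bool {
	isEmpty, ok := e.RelookupAndCheckIsEmpty()
	if !ok {
		// The series is gone from the shard; nothing is left to protect.
		return true
	}
	return isEmpty && e.IndexedBlockCount() == 0
}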
@@ -18,14 +18,13 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package lookup_test
package storage

import (
"sync"
"testing"
"time"

"github.com/m3db/m3/src/dbnode/storage/series/lookup"
xtime "github.com/m3db/m3/src/x/time"

"github.com/fortytw2/leaktest"
@@ -43,7 +42,7 @@ func newTime(n int) xtime.UnixNano {
}

func TestEntryReaderWriterCount(t *testing.T) {
e := lookup.NewEntry(lookup.NewEntryOptions{})
e := NewEntry(NewEntryOptions{})
require.Equal(t, int32(0), e.ReaderWriterCount())

e.IncrementReaderWriterCount()
@@ -54,7 +53,7 @@ func TestEntryReaderWriterCount(t *testing.T) {
}

func TestEntryIndexSuccessPath(t *testing.T) {
e := lookup.NewEntry(lookup.NewEntryOptions{})
e := NewEntry(NewEntryOptions{})
t0 := newTime(0)
require.False(t, e.IndexedForBlockStart(t0))

@@ -69,7 +68,7 @@ func TestEntryIndexSuccessPath(t *testing.T) {
}

func TestEntryIndexFailPath(t *testing.T) {
e := lookup.NewEntry(lookup.NewEntryOptions{})
e := NewEntry(NewEntryOptions{})
t0 := newTime(0)
require.False(t, e.IndexedForBlockStart(t0))

@@ -85,7 +84,7 @@ func TestEntryIndexFailPath(t *testing.T) {
func TestEntryMultipleGoroutinesRaceIndexUpdate(t *testing.T) {
defer leaktest.CheckTimeout(t, time.Second)()

e := lookup.NewEntry(lookup.NewEntryOptions{})
e := NewEntry(NewEntryOptions{})
t0 := newTime(0)
require.False(t, e.IndexedForBlockStart(t0))

@@ -18,7 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package lookup
package storage

import (
"testing"
@@ -35,22 +35,12 @@ import (
"github.com/stretchr/testify/require"
)

var (
initTime = time.Date(2018, time.May, 12, 15, 55, 0, 0, time.UTC)
testBlockSize = 24 * time.Hour
)

func newTime(n int) xtime.UnixNano {
t := initTime.Truncate(testBlockSize).Add(time.Duration(n) * testBlockSize)
return xtime.ToUnixNano(t)
}

func TestEntryIndexAttemptRotatesSlice(t *testing.T) {
e := NewEntry(NewEntryOptions{})
for i := 0; i < 10; i++ {
ti := newTime(i)
require.True(t, e.NeedsIndexUpdate(ti))
require.Equal(t, i+1, len(e.reverseIndex.states))
require.Equal(t, i+1, e.IndexedBlockCount())
}

// ensure only the latest ones are held on to
12 changes: 7 additions & 5 deletions src/dbnode/storage/flush.go
@@ -144,7 +144,7 @@ func (m *flushManager) Flush(startTime xtime.UnixNano) error {
// will attempt to snapshot blocks w/ unflushed data which would be wasteful if
// the block is already flushable.
multiErr := xerrors.NewMultiError()
if err = m.dataWarmFlush(namespaces, startTime); err != nil {
if err := m.dataWarmFlush(namespaces, startTime); err != nil {
multiErr = multiErr.Add(err)
}

Expand All @@ -159,7 +159,7 @@ func (m *flushManager) Flush(startTime xtime.UnixNano) error {
multiErr = multiErr.Add(fmt.Errorf("error rotating commitlog in mediator tick: %v", err))
}

if err = m.indexFlush(namespaces); err != nil {
if err := m.indexFlush(namespaces); err != nil {
multiErr = multiErr.Add(err)
}

@@ -187,8 +187,7 @@ func (m *flushManager) dataWarmFlush(
multiErr = multiErr.Add(err)
continue
}
err = m.flushNamespaceWithTimes(ns, flushTimes, flushPersist)
if err != nil {
if err := m.flushNamespaceWithTimes(ns, flushTimes, flushPersist); err != nil {
multiErr = multiErr.Add(err)
}
}
@@ -272,7 +271,10 @@ func (m *flushManager) indexFlush(
if !indexEnabled {
continue
}
multiErr = multiErr.Add(ns.FlushIndex(indexFlush))

if err := ns.FlushIndex(indexFlush); err != nil {
multiErr = multiErr.Add(err)
}
}
multiErr = multiErr.Add(indexFlush.DoneIndex())

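The Flush changes above keep the established pattern of accumulating per-step failures into a single multi-error rather than returning early, so a failed data flush no longer prevents the index flush from running. A self-contained sketch of that pattern follows, assuming xerrors is github.com/m3db/m3/src/x/errors as imported by flush.go; flushData and flushIndex are placeholder functions, and FinalError is assumed to return nil when no errors were added.

package main

import (
	"errors"
	"fmt"

	xerrors "github.com/m3db/m3/src/x/errors"
)

func flushData() error  { return nil }
func flushIndex() error { return errors.New("index flush failed") }

// flushAll mirrors the accumulate-then-report style used by flushManager.Flush:
// every step runs even if an earlier one failed, and all errors are reported.
func flushAll() error {
	multiErr := xerrors.NewMultiError()
	if err := flushData(); err != nil {
		multiErr = multiErr.Add(err)
	}
	if err := flushIndex(); err != nil {
		multiErr = multiErr.Add(err)
	}
	return multiErr.FinalError()
}

func main() {
	fmt.Println(flushAll())
}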
21 changes: 18 additions & 3 deletions src/dbnode/storage/flush_test.go
@@ -315,12 +315,16 @@ func TestFlushManagerSkipNamespaceIndexingDisabled(t *testing.T) {
defer ctrl.Finish()

nsOpts := defaultTestNs1Opts.SetIndexOptions(namespace.NewIndexOptions().SetEnabled(false))
s1 := NewMockdatabaseShard(ctrl)
s2 := NewMockdatabaseShard(ctrl)
ns := NewMockdatabaseNamespace(ctrl)
ns.EXPECT().Options().Return(nsOpts).AnyTimes()
ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes()
ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
s1.EXPECT().ID().Return(uint32(1)).AnyTimes()
s2.EXPECT().ID().Return(uint32(2)).AnyTimes()

var (
mockFlushPersist = persist.NewMockFlushPreparer(ctrl)
@@ -357,14 +361,25 @@ func TestFlushManagerNamespaceIndexingEnabled(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()

blocks := 24
nsOpts := defaultTestNs1Opts.SetIndexOptions(namespace.NewIndexOptions().SetEnabled(true))
s1 := NewMockdatabaseShard(ctrl)
s2 := NewMockdatabaseShard(ctrl)
ns := NewMockdatabaseNamespace(ctrl)
ns.EXPECT().Options().Return(nsOpts).AnyTimes()
ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes()
ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
ns.EXPECT().FlushIndex(gomock.Any()).Return(nil)
s1.EXPECT().ID().Return(uint32(1)).AnyTimes()
s2.EXPECT().ID().Return(uint32(2)).AnyTimes()

// Validate that the flush state is marked as successful only AFTER all prerequisite steps have been run.
// Order is important to avoid any edge case where data is GCed from memory without all flushing operations
// being completed.
gomock.InOrder(
ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return(nil).Times(blocks),
ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes(),
ns.EXPECT().FlushIndex(gomock.Any()).Return(nil),
)

var (
mockFlushPersist = persist.NewMockFlushPreparer(ctrl)
7 changes: 6 additions & 1 deletion src/dbnode/storage/fs.go
@@ -39,12 +39,17 @@ const (
fileOpFailed
)

type warmStatus struct {
DataFlushed fileOpStatus
IndexFlushed fileOpStatus
}

type fileOpState struct {
// WarmStatus is the status of data persistence for WarmWrites only.
// Each block will only be warm-flushed once, so not keeping track of a
// version here is okay. This is used in the buffer Tick to determine when
// a warm bucket is evictable from memory.
WarmStatus fileOpStatus
WarmStatus warmStatus
// ColdVersionRetrievable keeps track of data persistence for ColdWrites only.
// Each block can be cold-flushed multiple times, so this tracks which
// version of the flush completed successfully. This is ultimately used in
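Splitting WarmStatus into separate data and index statuses lets callers distinguish "data filesets flushed" from "index flushed" when deciding whether a block's contents are safe to evict from memory. A hedged sketch of a consumer is below; the fullyFlushed helper is not part of this commit, and it assumes a fileOpSuccess constant is defined alongside fileOpFailed in the const block above.

// fullyFlushed is an illustrative helper: a block is only treated as warm
// flushed, and therefore evictable, once both the data filesets and the index
// have been persisted.
func (s warmStatus) fullyFlushed() bool {
	return s.DataFlushed == fileOpSuccess && s.IndexFlushed == fileOpSuccess
}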