[dbnode] Negate possibility of point in time segment rotation returning query error (#2432)
robskillington authored Jun 28, 2020
1 parent 333fea9 commit c373017
Showing 8 changed files with 431 additions and 113 deletions.
30 changes: 25 additions & 5 deletions src/dbnode/integration/index_helpers.go
@@ -171,25 +171,45 @@ func genIDTags(i int, j int, numTags int, opts ...genIDTagsOption) (ident.ID, id
}

func isIndexed(t *testing.T, s client.Session, ns ident.ID, id ident.ID, tags ident.TagIterator) bool {
result, err := isIndexedChecked(t, s, ns, id, tags)
if err != nil {
return false
}
return result
}

func isIndexedChecked(t *testing.T, s client.Session, ns ident.ID, id ident.ID, tags ident.TagIterator) (bool, error) {
q := newQuery(t, tags)
iter, _, err := s.FetchTaggedIDs(ns, index.Query{Query: q}, index.QueryOptions{
StartInclusive: time.Now(),
EndExclusive: time.Now(),
SeriesLimit: 10})
if err != nil {
return false
return false, err
}

defer iter.Finalize()

if !iter.Next() {
return false
return false, nil
}

cuNs, cuID, cuTag := iter.Current()
if err := iter.Err(); err != nil {
return false, fmt.Errorf("iter err: %v", err)
}

if ns.String() != cuNs.String() {
return false
return false, fmt.Errorf("namespace not matched")
}
if id.String() != cuID.String() {
return false
return false, fmt.Errorf("id not matched")
}
return ident.NewTagIterMatcher(tags).Matches(cuTag)
if !ident.NewTagIterMatcher(tags).Matches(cuTag) {
return false, fmt.Errorf("tags did not match")
}

return true, nil
}

func newQuery(t *testing.T, tags ident.TagIterator) idx.Query {
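
The split into isIndexed and isIndexedChecked lets callers distinguish "not indexed yet" from an actual query failure. As a minimal sketch (not part of the commit), a hypothetical polling helper inside this test package, assuming the xclock, zap, client, and ident imports these integration tests already use:

// waitUntilIndexed is a hypothetical helper: it polls the index and surfaces
// query errors instead of silently treating them as "series not indexed".
func waitUntilIndexed(
	t *testing.T,
	s client.Session,
	ns ident.ID,
	id ident.ID,
	tags ident.TagIterator,
	log *zap.Logger,
) bool {
	return xclock.WaitUntil(func() bool {
		found, err := isIndexedChecked(t, s, ns, id, tags)
		if err != nil {
			// A transient query error (e.g. during a segment rotation)
			// is not the same as "not indexed"; log it and keep polling.
			log.Error("query error while polling index", zap.Error(err))
			return false
		}
		return found
	}, 30*time.Second)
}
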
148 changes: 109 additions & 39 deletions src/dbnode/integration/index_single_node_high_concurrency_test.go
@@ -24,6 +24,7 @@ package integration

import (
"fmt"
"math/rand"
"sync"
"testing"
"time"
@@ -81,14 +82,39 @@ func TestIndexSingleNodeHighConcurrencyFewTagsHighCardinalitySkipWrites(t *testi
})
}

func TestIndexSingleNodeHighConcurrencyFewTagsHighCardinalityQueryDuringWrites(t *testing.T) {
if testing.Short() {
t.SkipNow() // Just skip if we're doing a short run
}

testIndexSingleNodeHighConcurrency(t, testIndexHighConcurrencyOptions{
concurrencyEnqueueWorker: 8,
concurrencyWrites: 5000,
enqueuePerWorker: 100000,
numTags: 2,
concurrencyQueryDuringWrites: 16,
skipVerify: true,
})
}

type testIndexHighConcurrencyOptions struct {
concurrencyEnqueueWorker int
concurrencyWrites int
enqueuePerWorker int
numTags int

// skipWrites will mix in skipped writes to make sure
// they don't interrupt the regular real-time ingestion pipeline.
skipWrites bool

// concurrencyQueryDuringWrites will issue queries while we
// are performing writes.
concurrencyQueryDuringWrites int

// skipVerify will skip verifying that the actual series were indexed,
// which is useful when just sanity checking that writes and reads can run
// concurrently without errors and that the stats look good.
skipVerify bool
}

func testIndexSingleNodeHighConcurrency(
@@ -188,8 +214,43 @@ func testIndexSingleNodeHighConcurrency(
}()
}

// If concurrent query load is enabled, also hit the node with queries while writing.
queryConcDuringWritesCloseCh := make(chan struct{}, 1)
numTotalQueryErrors := atomic.NewUint32(0)
if opts.concurrencyQueryDuringWrites == 0 {
log.Info("no concurrent queries during writes configured")
} else {
log.Info("starting concurrent queries during writes",
zap.Int("concurrency", opts.concurrencyQueryDuringWrites))
for i := 0; i < opts.concurrencyQueryDuringWrites; i++ {
go func() {
src := rand.NewSource(int64(i))
rng := rand.New(src)
for {
select {
case <-queryConcDuringWritesCloseCh:
return
default:
randI := rng.Intn(opts.concurrencyEnqueueWorker)
randJ := rng.Intn(opts.enqueuePerWorker)
id, tags := genIDTags(randI, randJ, opts.numTags)
_, err := isIndexedChecked(t, session, md.ID(), id, tags)
if err != nil {
if n := numTotalQueryErrors.Inc(); n < 10 {
// Log the first 10 errors for visibility, but do not flood the logs.
log.Error("sampled query error", zap.Error(err))
}
}
}
}
}()
}
}

// Wait for writes to at least be enqueued.
wg.Wait()

// Check no write errors.
require.Equal(t, int(0), int(numTotalErrors.Load()))

log.Info("test data written",
@@ -224,49 +285,58 @@ func testIndexSingleNodeHighConcurrency(
assert.True(t, indexProcess,
fmt.Sprintf("expected to index %d but processed %d", expectNumIndex, value))

// Now check all of them are individually indexed.
var (
fetchWg sync.WaitGroup
notIndexedErrs []error
notIndexedLock sync.Mutex
)
for i := 0; i < opts.concurrencyEnqueueWorker; i++ {
fetchWg.Add(1)
i := i
go func() {
defer fetchWg.Done()
// Allow concurrent queries issued during writes to finish.
close(queryConcDuringWritesCloseCh)

for j := 0; j < opts.enqueuePerWorker; j++ {
if opts.skipWrites && j%2 == 0 {
continue // not meant to be indexed.
}
// Check no query errors.
require.Equal(t, int(0), int(numTotalQueryErrors.Load()))

j := j
fetchWg.Add(1)
workerPool.Go(func() {
defer fetchWg.Done()

id, tags := genIDTags(i, j, opts.numTags)
indexed := xclock.WaitUntil(func() bool {
found := isIndexed(t, session, md.ID(), id, tags)
return found
}, 30*time.Second)
if !indexed {
err := fmt.Errorf("not indexed series: i=%d, j=%d", i, j)
notIndexedLock.Lock()
notIndexedErrs = append(notIndexedErrs, err)
notIndexedLock.Unlock()
if !opts.skipVerify {
log.Info("data indexing each series visible start")
// Now check all of them are individually indexed.
var (
fetchWg sync.WaitGroup
notIndexedErrs []error
notIndexedLock sync.Mutex
)
for i := 0; i < opts.concurrencyEnqueueWorker; i++ {
fetchWg.Add(1)
i := i
go func() {
defer fetchWg.Done()

for j := 0; j < opts.enqueuePerWorker; j++ {
if opts.skipWrites && j%2 == 0 {
continue // not meant to be indexed.
}
})
}
}()

j := j
fetchWg.Add(1)
workerPool.Go(func() {
defer fetchWg.Done()

id, tags := genIDTags(i, j, opts.numTags)
indexed := xclock.WaitUntil(func() bool {
found := isIndexed(t, session, md.ID(), id, tags)
return found
}, 30*time.Second)
if !indexed {
err := fmt.Errorf("not indexed series: i=%d, j=%d", i, j)
notIndexedLock.Lock()
notIndexedErrs = append(notIndexedErrs, err)
notIndexedLock.Unlock()
}
})
}
}()
}
fetchWg.Wait()
log.Info("data indexing verify done",
zap.Int("notIndexed", len(notIndexedErrs)),
zap.Duration("took", time.Since(start)))
require.Equal(t, 0, len(notIndexedErrs),
fmt.Sprintf("not indexed errors: %v", notIndexedErrs[:min(5, len(notIndexedErrs))]))
}
fetchWg.Wait()
log.Info("data indexing verify done",
zap.Int("notIndexed", len(notIndexedErrs)),
zap.Duration("took", time.Since(start)))
require.Equal(t, 0, len(notIndexedErrs),
fmt.Sprintf("not indexed errors: %v", notIndexedErrs[:min(5, len(notIndexedErrs))]))

// Make sure attempted total indexing = skipped + written.
counters = testSetup.Scope().Snapshot().Counters()
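
The query-during-writes workers added above are stopped through a close-channel broadcast. A stripped-down, self-contained sketch of that pattern (all names here are hypothetical, not part of the test):

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	stopCh := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(seed int64) {
			defer wg.Done()
			rng := rand.New(rand.NewSource(seed))
			for {
				select {
				case <-stopCh:
					return // stop signal received
				default:
					_ = rng.Intn(1000) // stand-in for issuing one query
				}
			}
		}(int64(i))
	}

	time.Sleep(100 * time.Millisecond) // stand-in for the write phase
	close(stopCh)                      // broadcast stop to every worker
	wg.Wait()
	fmt.Println("all query workers stopped")
}
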
8 changes: 4 additions & 4 deletions src/dbnode/storage/index/block.go
@@ -131,7 +131,7 @@ type block struct {
coldMutableSegments []*mutableSegments
shardRangesSegmentsByVolumeType shardRangesSegmentsByVolumeType
newFieldsAndTermsIteratorFn newFieldsAndTermsIteratorFn
newExecutorFn newExecutorFn
newExecutorWithRLockFn newExecutorFn
blockStart time.Time
blockEnd time.Time
blockSize time.Duration
@@ -234,7 +234,7 @@ func NewBlock(
queryStats: opts.QueryStats(),
}
b.newFieldsAndTermsIteratorFn = newFieldsAndTermsIterator
b.newExecutorFn = b.executorWithRLock
b.newExecutorWithRLockFn = b.executorWithRLock

return b, nil
}
@@ -423,7 +423,7 @@ func (b *block) queryWithSpan(
return false, ErrUnableToQueryBlockClosed
}

exec, err := b.newExecutorFn()
exec, err := b.newExecutorWithRLockFn()
if err != nil {
return false, err
}
@@ -513,8 +513,8 @@ func (b *block) queryWithSpan(
}

func (b *block) closeExecutorAsync(exec search.Executor) {
// Note: This only happens if closing the readers isn't clean.
if err := exec.Close(); err != nil {
// Note: This only happens if closing the readers isn't clean.
b.logger.Error("could not close search exec", zap.Error(err))
}
}
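
The rename from newExecutorFn to newExecutorWithRLockFn encodes a locking contract in the name: the function must only be invoked while the block's read lock is already held. A small sketch of that convention, using hypothetical types rather than the m3 block:

package main

import "sync"

type store struct {
	mu    sync.RWMutex
	items map[string]int
}

// getWithRLock must only be called while s.mu is held for reading;
// the suffix documents that contract, mirroring newExecutorWithRLockFn.
func (s *store) getWithRLock(key string) (int, bool) {
	v, ok := s.items[key]
	return v, ok
}

// Get acquires the read lock and then delegates to the *WithRLock helper.
func (s *store) Get(key string) (int, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.getWithRLock(key)
}

func main() {
	s := &store{items: map[string]int{"a": 1}}
	_, _ = s.Get("a")
}
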
24 changes: 11 additions & 13 deletions src/dbnode/storage/index/block_test.go
@@ -393,9 +393,7 @@ func TestBlockQueryExecutorError(t *testing.T) {
b, ok := blk.(*block)
require.True(t, ok)

b.newExecutorFn = func() (search.Executor, error) {
b.RLock() // ensures we call newExecutorFn with RLock, or this would deadlock
defer b.RUnlock()
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return nil, fmt.Errorf("random-err")
}

@@ -479,7 +477,7 @@ func TestBlockMockQueryExecutorExecError(t *testing.T) {

// dIter:= doc.NewMockIterator(ctrl)
exec := search.NewMockExecutor(ctrl)
b.newExecutorFn = func() (search.Executor, error) {
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return exec, nil
}
gomock.InOrder(
@@ -504,7 +502,7 @@ func TestBlockMockQueryExecutorExecIterErr(t *testing.T) {
require.True(t, ok)

exec := search.NewMockExecutor(ctrl)
b.newExecutorFn = func() (search.Executor, error) {
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return exec, nil
}

@@ -544,7 +542,7 @@ func TestBlockMockQueryExecutorExecLimit(t *testing.T) {
require.True(t, ok)

exec := search.NewMockExecutor(ctrl)
b.newExecutorFn = func() (search.Executor, error) {
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return exec, nil
}

@@ -594,7 +592,7 @@ func TestBlockMockQueryExecutorExecIterCloseErr(t *testing.T) {
require.True(t, ok)

exec := search.NewMockExecutor(ctrl)
b.newExecutorFn = func() (search.Executor, error) {
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return exec, nil
}

@@ -632,7 +630,7 @@ func TestBlockMockQuerySeriesLimitNonExhaustive(t *testing.T) {
require.True(t, ok)

exec := search.NewMockExecutor(ctrl)
b.newExecutorFn = func() (search.Executor, error) {
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return exec, nil
}

@@ -681,7 +679,7 @@ func TestBlockMockQuerySeriesLimitExhaustive(t *testing.T) {
require.True(t, ok)

exec := search.NewMockExecutor(ctrl)
b.newExecutorFn = func() (search.Executor, error) {
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return exec, nil
}

@@ -732,7 +730,7 @@ func TestBlockMockQueryDocsLimitNonExhaustive(t *testing.T) {
require.True(t, ok)

exec := search.NewMockExecutor(ctrl)
b.newExecutorFn = func() (search.Executor, error) {
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return exec, nil
}

@@ -781,7 +779,7 @@ func TestBlockMockQueryDocsLimitExhaustive(t *testing.T) {
require.True(t, ok)

exec := search.NewMockExecutor(ctrl)
b.newExecutorFn = func() (search.Executor, error) {
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return exec, nil
}

@@ -833,7 +831,7 @@ func TestBlockMockQueryMergeResultsMapLimit(t *testing.T) {
require.NoError(t, b.Seal())

exec := search.NewMockExecutor(ctrl)
b.newExecutorFn = func() (search.Executor, error) {
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return exec, nil
}

@@ -885,7 +883,7 @@ func TestBlockMockQueryMergeResultsDupeID(t *testing.T) {
require.True(t, ok)

exec := search.NewMockExecutor(ctrl)
b.newExecutorFn = func() (search.Executor, error) {
b.newExecutorWithRLockFn = func() (search.Executor, error) {
return exec, nil
}

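
These tests swap the executor constructor by assigning to the newExecutorWithRLockFn field, a common Go seam for injecting failures without mocks of the whole type. A minimal, self-contained sketch of the same technique with hypothetical names:

package main

import (
	"errors"
	"fmt"
)

type executor interface{ Execute() error }

type stubExecutor struct{ err error }

func (s stubExecutor) Execute() error { return s.err }

type block struct {
	// Stored as a field so tests can replace it with a stub.
	newExecutorWithRLockFn func() (executor, error)
}

func (b *block) query() error {
	exec, err := b.newExecutorWithRLockFn()
	if err != nil {
		return err
	}
	return exec.Execute()
}

func main() {
	b := &block{newExecutorWithRLockFn: func() (executor, error) {
		return nil, errors.New("random-err") // simulate constructor failure
	}}
	fmt.Println(b.query()) // prints: random-err
}
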
11 changes: 11 additions & 0 deletions src/dbnode/storage/index/mutable_segments.go
@@ -439,6 +439,17 @@ func (m *mutableSegments) backgroundCompactWithTask(
return err
}

// Add a read through cache for repeated expensive queries against
// background compacted segments since they can live for quite some
// time and accrue a large set of documents.
if immSeg, ok := compacted.(segment.ImmutableSegment); ok {
var (
plCache = m.opts.PostingsListCache()
readThroughOpts = m.opts.ReadThroughSegmentOptions()
)
compacted = NewReadThroughSegment(immSeg, plCache, readThroughOpts)
}

// Rotate out the replaced frozen segments and add the compacted one.
m.Lock()
defer m.Unlock()
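
The hunk above only wraps the compacted segment in a read-through cache when it satisfies the immutable-segment interface. A simplified sketch of that conditional-decorator pattern; the types below are hypothetical stand-ins, not the m3 segment API:

package main

import "fmt"

type Segment interface {
	Query(q string) ([]string, error)
}

type ImmutableSegment interface {
	Segment
	Immutable() // marker: contents never change, so results are cacheable
}

type cachedSegment struct {
	ImmutableSegment
	cache map[string][]string
}

func (c *cachedSegment) Query(q string) ([]string, error) {
	if res, ok := c.cache[q]; ok {
		return res, nil // served from the read-through cache
	}
	res, err := c.ImmutableSegment.Query(q)
	if err == nil {
		c.cache[q] = res
	}
	return res, err
}

// maybeWrap mirrors the shape of the change above: wrap only when immutable.
func maybeWrap(s Segment) Segment {
	if imm, ok := s.(ImmutableSegment); ok {
		return &cachedSegment{ImmutableSegment: imm, cache: map[string][]string{}}
	}
	return s
}

type memSegment struct{ docs map[string][]string }

func (m memSegment) Query(q string) ([]string, error) { return m.docs[q], nil }
func (m memSegment) Immutable()                       {}

func main() {
	seg := maybeWrap(memSegment{docs: map[string][]string{"foo": {"series-1"}}})
	res, _ := seg.Query("foo") // first call populates the cache
	fmt.Println(res)
}
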