[dbnode] Concurrent time series indexing within a single batch #2146

Merged: 11 commits, Mar 5, 2020
1 change: 1 addition & 0 deletions src/dbnode/storage/index.go
@@ -751,6 +751,7 @@ func (i *nsIndex) Flush(
if err != nil {
return err
}
defer builder.Close()
Collaborator: Good catch.


var evicted int
for _, block := range flushable {
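
The `defer builder.Close()` added here matters because, after this PR, the segment builder owns per-shard index queues and worker goroutines (see src/m3ninx/index/segment/builder/builder.go below); without closing it, the flush path would leak them. The following stand-in sketch is not the real nsIndex or builder code, it only shows why a goroutine-owning resource needs the deferred Close:

package main

import (
	"fmt"
	"sync"
)

// workerOwner mimics the new segment builder: it owns a buffered queue and a
// goroutine that drains it, so it must be closed to release both.
type workerOwner struct {
	queue chan int
	wg    sync.WaitGroup
}

func newWorkerOwner() *workerOwner {
	w := &workerOwner{queue: make(chan int, 1024)}
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		for job := range w.queue {
			_ = job // pretend to index the job
		}
	}()
	return w
}

// Close closes the queue so the worker's range loop terminates.
func (w *workerOwner) Close() error {
	close(w.queue)
	w.wg.Wait()
	return nil
}

func main() {
	w := newWorkerOwner()
	defer w.Close() // mirrors the added `defer builder.Close()` in nsIndex.Flush
	w.queue <- 42
	fmt.Println("queued work, worker released on Close")
}

The real Close in this diff only closes its queues and marks the builder closed; the WaitGroup here is just to make the toy example deterministic.
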
26 changes: 16 additions & 10 deletions src/dbnode/storage/index/block.go
@@ -706,18 +706,24 @@ func (b *block) cleanupForegroundCompactWithLock() {
b.foregroundSegments = nil

// Free compactor resources.
if b.compact.foregroundCompactor == nil {
return
if b.compact.foregroundCompactor != nil {
if err := b.compact.foregroundCompactor.Close(); err != nil {
instrument.EmitAndLogInvariantViolation(b.iopts, func(l *zap.Logger) {
l.Error("error closing index block foreground compactor", zap.Error(err))
})
}
b.compact.foregroundCompactor = nil
}

if err := b.compact.foregroundCompactor.Close(); err != nil {
instrument.EmitAndLogInvariantViolation(b.iopts, func(l *zap.Logger) {
l.Error("error closing index block foreground compactor", zap.Error(err))
})
// Free segment builder resources.
if b.compact.segmentBuilder != nil {
if err := b.compact.segmentBuilder.Close(); err != nil {
instrument.EmitAndLogInvariantViolation(b.iopts, func(l *zap.Logger) {
l.Error("error closing index block segment builder", zap.Error(err))
})
}
b.compact.segmentBuilder = nil
}

b.compact.foregroundCompactor = nil
b.compact.segmentBuilder = nil
}

func (b *block) executorWithRLock() (search.Executor, error) {
@@ -1484,7 +1490,7 @@ func (b *block) writeBatchErrorInvalidState(state blockState) error {

// blockCompact has several lazily allocated compaction components.
type blockCompact struct {
segmentBuilder segment.DocumentsBuilder
segmentBuilder segment.CloseableDocumentsBuilder
foregroundCompactor *compaction.Compactor
backgroundCompactor *compaction.Compactor
compactingForeground bool
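
With this change blockCompact holds a segment.CloseableDocumentsBuilder rather than a segment.DocumentsBuilder, which is what allows cleanupForegroundCompactWithLock above to free the builder. The interface definition itself is not part of this diff; a plausible shape, assuming it only layers cleanup on top of the existing builder interface, would be:

// Assumed definition, not shown in this PR's diff; the real one lives in the
// m3ninx segment package and may differ in detail.
type CloseableDocumentsBuilder interface {
	DocumentsBuilder

	// Close frees the builder's sharded index queues and worker goroutines.
	Close() error
}
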
13 changes: 13 additions & 0 deletions src/m3ninx/index/batch.go
@@ -24,6 +24,7 @@ import (
"bytes"
"errors"
"fmt"
"sync"

"github.com/m3db/m3/src/m3ninx/doc"
)
@@ -79,6 +80,8 @@ func NewBatch(docs []doc.Document, opts ...BatchOption) Batch {
// BatchPartialError indicates an error was encountered inserting some documents in a batch.
// It is not safe for concurrent use.
type BatchPartialError struct {
sync.Mutex

errs []BatchError
}

@@ -138,6 +141,16 @@ func (e *BatchPartialError) Add(err BatchError) {
e.errs = append(e.errs, err)
}

// AddWithLock adds an error to e with a lock. Any nil errors are ignored.
func (e *BatchPartialError) AddWithLock(err BatchError) {
if err.Err == nil {
return
}
e.Lock()
e.errs = append(e.errs, err)
e.Unlock()
}

// Errs returns the errors with the indexes of the documents in the batch
// which were not indexed.
func (e *BatchPartialError) Errs() []BatchError {
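
Add remains the single-goroutine accumulation path, while the new AddWithLock is what the concurrent index workers use to report failures into one shared BatchPartialError. A small usage sketch, relying only on the exported pieces visible in this diff (NewBatchPartialError, BatchError, AddWithLock, IsEmpty and Errs):

package main

import (
	"errors"
	"fmt"
	"sync"

	"github.com/m3db/m3/src/m3ninx/index"
)

func main() {
	var (
		wg       sync.WaitGroup
		batchErr = index.NewBatchPartialError()
	)
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			// AddWithLock serializes the appends, so several workers can
			// record errors against the same batch without racing.
			batchErr.AddWithLock(index.BatchError{
				Err: errors.New("simulated index failure"),
				Idx: idx,
			})
		}(i)
	}
	wg.Wait()

	if !batchErr.IsEmpty() {
		fmt.Println("documents not indexed:", len(batchErr.Errs()))
	}
}
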
189 changes: 132 additions & 57 deletions src/m3ninx/index/segment/builder/builder.go
@@ -23,18 +23,43 @@ package builder
import (
"errors"
"fmt"
"sync"

"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/index"
"github.com/m3db/m3/src/m3ninx/index/segment"
"github.com/m3db/m3/src/m3ninx/postings"
"github.com/m3db/m3/src/m3ninx/util"

"github.com/cespare/xxhash"
)

var (
errDocNotFound = errors.New("doc not found")
errClosed = errors.New("builder closed")
)

const (
// Slightly buffer the work to avoid blocking the main thread.
indexQueueSize = 2 << 9 // 1024
)

type indexJob struct {
wg *sync.WaitGroup

id postings.ID
field doc.Field

shard int
idx int
batchErr *index.BatchPartialError
}

type builderStatus struct {
sync.RWMutex
closed bool
}

type builder struct {
opts Options
newUUIDFn util.NewUUIDFn
@@ -44,29 +69,47 @@ type builder struct {
batchSizeOne index.Batch
docs []doc.Document
idSet *IDsMap
fields *fieldsMap
uniqueFields [][]byte
fields *shardedFieldsMap
uniqueFields [][][]byte

indexQueues []chan indexJob
status builderStatus
}

// NewBuilderFromDocuments returns a builder from documents, it is
// not thread safe and is optimized for insertion speed and a
// final build step when documents are indexed.
func NewBuilderFromDocuments(opts Options) (segment.DocumentsBuilder, error) {
return &builder{
func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, error) {
concurrency := opts.Concurrency()
b := &builder{
opts: opts,
newUUIDFn: opts.NewUUIDFn(),
batchSizeOne: index.Batch{
Docs: make([]doc.Document, 1),
AllowPartialUpdates: false,
Docs: make([]doc.Document, 1),
},
idSet: NewIDsMap(IDsMapOptions{
InitialSize: opts.InitialCapacity(),
}),
fields: newFieldsMap(fieldsMapOptions{
InitialSize: opts.InitialCapacity(),
}),
uniqueFields: make([][]byte, 0, opts.InitialCapacity()),
}, nil
uniqueFields: make([][][]byte, 0, concurrency),
indexQueues: make([]chan indexJob, 0, concurrency),
}

for i := 0; i < concurrency; i++ {
indexQueue := make(chan indexJob, indexQueueSize)
b.indexQueues = append(b.indexQueues, indexQueue)
go b.indexWorker(indexQueue)

// Give each shard a fraction of the configured initial capacity.
shardInitialCapacity := opts.InitialCapacity()
if shardInitialCapacity > 0 {
shardInitialCapacity /= concurrency
}
shardUniqueFields := make([][]byte, 0, shardInitialCapacity)
b.uniqueFields = append(b.uniqueFields, shardUniqueFields)
b.fields = newShardedFieldsMap(concurrency, shardInitialCapacity)
}

return b, nil
}

func (b *builder) Reset(offset postings.ID) {
@@ -83,15 +126,15 @@ func (b *builder) Reset(offset postings.ID) {
b.idSet.Reset()

// Keep fields around, just reset the terms set for each one.
for _, entry := range b.fields.Iter() {
entry.Value().reset()
}
b.fields.ResetTermsSets()

// Reset the unique fields slice
for i := range b.uniqueFields {
b.uniqueFields[i] = nil
for i, shardUniqueFields := range b.uniqueFields {
for i := range shardUniqueFields {
shardUniqueFields[i] = nil
}
b.uniqueFields[i] = shardUniqueFields[:0]
}
b.uniqueFields = b.uniqueFields[:0]
}

func (b *builder) Insert(d doc.Document) ([]byte, error) {
@@ -107,15 +150,20 @@ func (b *builder) Insert(d doc.Document) ([]byte, error) {
}

func (b *builder) InsertBatch(batch index.Batch) error {
b.status.RLock()
defer b.status.RUnlock()

if b.status.closed {
return errClosed
}

// NB(r): This is all kept in a single method to make the
// insertion path fast.
var wg sync.WaitGroup
batchErr := index.NewBatchPartialError()
for i, d := range batch.Docs {
// Validate doc
if err := d.Validate(); err != nil {
if !batch.AllowPartialUpdates {
return err
}
batchErr.Add(index.BatchError{Err: err, Idx: i})
continue
}
@@ -124,9 +172,6 @@ func (b *builder) InsertBatch(batch index.Batch) error {
if !d.HasID() {
id, err := b.newUUIDFn()
if err != nil {
if !batch.AllowPartialUpdates {
return err
}
batchErr.Add(index.BatchError{Err: err, Idx: i})
continue
}
@@ -139,9 +184,6 @@ func (b *builder) InsertBatch(batch index.Batch) error {

// Avoid duplicates.
if _, ok := b.idSet.Get(d.ID); ok {
if !batch.AllowPartialUpdates {
return index.ErrDuplicateID
}
batchErr.Add(index.BatchError{Err: index.ErrDuplicateID, Idx: i})
continue
}
@@ -158,50 +200,73 @@ func (b *builder) InsertBatch(batch index.Batch) error {

// Index the terms.
for _, f := range d.Fields {
if err := b.index(postings.ID(postingsListID), f); err != nil {
if !batch.AllowPartialUpdates {
return err
}
batchErr.Add(index.BatchError{Err: err, Idx: i})
}
b.index(&wg, postings.ID(postingsListID), f, i, batchErr)
}
if err := b.index(postings.ID(postingsListID), doc.Field{
b.index(&wg, postings.ID(postingsListID), doc.Field{
Name: doc.IDReservedFieldName,
Value: d.ID,
}); err != nil {
if !batch.AllowPartialUpdates {
return err
}
batchErr.Add(index.BatchError{Err: err, Idx: i})
}
}, i, batchErr)
}

// Wait for all the concurrent indexing jobs to finish.
wg.Wait()

if !batchErr.IsEmpty() {
return batchErr
}
return nil
}
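
With the work fanned out to the index queues, the per-document early returns for non-partial batches are gone: every failure is accumulated into the shared BatchPartialError and only inspected after wg.Wait(). A caller can therefore separate a partial failure from a hard one roughly as below (a hypothetical helper, not code from this PR; it assumes fmt and the m3ninx index package are imported):

// handleInsertBatchErr shows how a caller of builder.InsertBatch might treat
// a *index.BatchPartialError (some documents skipped) differently from a hard
// failure such as errClosed after the builder has been closed.
func handleInsertBatchErr(err error) error {
	if err == nil {
		return nil
	}
	if partialErr, ok := err.(*index.BatchPartialError); ok {
		for _, docErr := range partialErr.Errs() {
			fmt.Printf("doc %d not indexed: %v\n", docErr.Idx, docErr.Err)
		}
		return nil
	}
	return err
}
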

func (b *builder) index(id postings.ID, f doc.Field) error {
terms, ok := b.fields.Get(f.Name)
if !ok {
terms = newTerms(b.opts)
b.fields.SetUnsafe(f.Name, terms, fieldsMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: true,
})
func (b *builder) index(
wg *sync.WaitGroup,
id postings.ID,
f doc.Field,
i int,
batchErr *index.BatchPartialError,
) {
wg.Add(1)
// NB(bodu): To avoid locking inside of the terms, we shard the work
// by field name.
shard := b.calculateShard(f.Name)
b.indexQueues[shard] <- indexJob{
wg: wg,
id: id,
field: f,
shard: shard,
idx: i,
batchErr: batchErr,
}
}

// If empty field, track insertion of this key into the fields
// collection for correct response when retrieving all fields.
newField := terms.size() == 0
if err := terms.post(f.Value, id); err != nil {
return err
}
if newField {
b.uniqueFields = append(b.uniqueFields, f.Name)
func (b *builder) indexWorker(indexQueue chan indexJob) {
for job := range indexQueue {
terms, ok := b.fields.ShardedGet(job.shard, job.field.Name)
if !ok {
// NB(bodu): Check again within the lock to make sure we aren't making concurrent map writes.
terms = newTerms(b.opts)
b.fields.ShardedSetUnsafe(job.shard, job.field.Name, terms, fieldsMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: true,
})
}

// If empty field, track insertion of this key into the fields
// collection for correct response when retrieving all fields.
newField := terms.size() == 0
// NB(bodu): Bulk of the cpu time during insertion is spent inside of terms.post().
err := terms.post(job.field.Value, job.id)
if err != nil {
job.batchErr.AddWithLock(index.BatchError{Err: err, Idx: job.idx})
}
if err == nil && newField {
b.uniqueFields[job.shard] = append(b.uniqueFields[job.shard], job.field.Name)

Collaborator: Should this happen if the post failed?

Contributor (author): Ah, good call, moving around the logic here.

}
job.wg.Done()
}
return nil
}

func (b *builder) calculateShard(field []byte) int {
return int(xxhash.Sum64(field) % uint64(len(b.indexQueues)))
}

func (b *builder) AllDocs() (index.IDDocIterator, error) {
@@ -236,7 +301,7 @@ func (b *builder) Fields() (segment.FieldsIterator, error) {
}

func (b *builder) Terms(field []byte) (segment.TermsIterator, error) {
terms, ok := b.fields.Get(field)
terms, ok := b.fields.ShardedGet(b.calculateShard(field), field)
if !ok {
return nil, fmt.Errorf("field not found: %s", string(field))
}
@@ -247,3 +312,13 @@ func (b *builder) Terms(field []byte) (segment.TermsIterator, error) {

return newTermsIter(terms.uniqueTerms), nil
}

func (b *builder) Close() error {
b.status.Lock()
defer b.status.Unlock()
for _, q := range b.indexQueues {
close(q)
}
b.status.closed = true
return nil
}
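
Taken together, builder.go now hashes each field name to a fixed shard, hands the work to that shard's queue, and lets a single goroutine per shard own its slice of the fields map, so no per-term locking is needed. The self-contained sketch below distills that pattern; it uses plain map[string][]int shards instead of the real fieldsMap and terms structures and is not the m3ninx implementation:

package main

import (
	"fmt"
	"sync"

	"github.com/cespare/xxhash"
)

type job struct {
	wg    *sync.WaitGroup
	field string
	docID int
}

type shardedIndexer struct {
	queues []chan job
	shards []map[string][]int // one postings map per shard, owned by one worker
	done   sync.WaitGroup
}

func newShardedIndexer(concurrency int) *shardedIndexer {
	s := &shardedIndexer{
		queues: make([]chan job, concurrency),
		shards: make([]map[string][]int, concurrency),
	}
	for i := range s.queues {
		s.queues[i] = make(chan job, 1024) // buffered, like indexQueueSize
		s.shards[i] = make(map[string][]int)
		s.done.Add(1)
		go s.worker(i)
	}
	return s
}

// worker is the only goroutine that touches shards[i], so no lock is needed.
func (s *shardedIndexer) worker(i int) {
	defer s.done.Done()
	for j := range s.queues[i] {
		s.shards[i][j.field] = append(s.shards[i][j.field], j.docID)
		j.wg.Done()
	}
}

// shardFor maps a field name to the same shard every time, mirroring the
// builder's calculateShard.
func (s *shardedIndexer) shardFor(field string) int {
	return int(xxhash.Sum64String(field) % uint64(len(s.queues)))
}

func (s *shardedIndexer) index(wg *sync.WaitGroup, field string, docID int) {
	wg.Add(1)
	s.queues[s.shardFor(field)] <- job{wg: wg, field: field, docID: docID}
}

func (s *shardedIndexer) close() {
	for _, q := range s.queues {
		close(q)
	}
	s.done.Wait()
}

func main() {
	s := newShardedIndexer(4)

	var wg sync.WaitGroup
	for docID, field := range []string{"city", "host", "city", "service"} {
		s.index(&wg, field, docID)
	}
	wg.Wait() // same completion signal the real InsertBatch waits on
	s.close()

	fmt.Println("indexed shards:", s.shards)
}

Because a given field name always hashes to the same shard, reads such as Terms() can use the same calculation to find the owning map, which is the property the real builder relies on for both fields and uniqueFields.
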