Skip to content

Commit

Permalink
[dbnode] Add wide filter (#2949)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola authored Nov 30, 2020
1 parent cee5f09 commit 9036832
Show file tree
Hide file tree
Showing 22 changed files with 275 additions and 90 deletions.
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ linters:
# New line required before return would require a large fraction of the
# code base to need updating, it's not worth the perceived benefit.
- nlreturn
# Opinionated and sometimes wrong.
- paralleltest
disable-all: false
presets:
# bodyclose, errcheck, gosec, govet, scopelint, staticcheck, typecheck
Expand Down
17 changes: 9 additions & 8 deletions src/dbnode/persist/fs/fs_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions src/dbnode/persist/fs/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"time"

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist/schema"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/storage/limits"
Expand Down Expand Up @@ -314,7 +315,7 @@ func (r *blockRetriever) filterAndCompleteWideReqs(
retrieverResources.dataReqs = append(retrieverResources.dataReqs, req)

case streamWideEntryReq:
entry, err := seeker.SeekWideEntry(req.id, seekerResources)
entry, err := seeker.SeekWideEntry(req.id, req.wideFilter, seekerResources)
if err != nil {
if errors.Is(err, errSeekIDNotFound) {
// Missing, return empty result, successful lookup.
Expand All @@ -329,7 +330,7 @@ func (r *blockRetriever) filterAndCompleteWideReqs(

// Enqueue for fetch in batch in offset ascending order.
req.wideEntry = entry
entry.Shard = req.shard
req.wideEntry.Shard = req.shard
retrieverResources.appendWideEntryReq(req)

default:
Expand Down Expand Up @@ -663,10 +664,12 @@ func (r *blockRetriever) StreamWideEntry(
shard uint32,
id ident.ID,
startTime time.Time,
filter schema.WideEntryFilter,
nsCtx namespace.Context,
) (block.StreamedWideEntry, error) {
req := r.reqPool.Get()
req.streamReqType = streamWideEntryReq
req.wideFilter = filter

found, err := r.streamRequest(ctx, req, shard, id, startTime, nsCtx)
if err != nil {
Expand Down Expand Up @@ -788,6 +791,7 @@ type retrieveRequest struct {
streamReqType streamReqType
indexEntry IndexEntry
wideEntry xio.WideEntry
wideFilter schema.WideEntryFilter
reader xio.SegmentReader

err error
Expand Down Expand Up @@ -965,6 +969,7 @@ func (req *retrieveRequest) resetForReuse() {
req.streamReqType = streamInvalidReq
req.indexEntry = IndexEntry{}
req.wideEntry = xio.WideEntry{}
req.wideFilter = nil
req.reader = nil
req.err = nil
req.notFound = false
Expand Down
11 changes: 11 additions & 0 deletions src/dbnode/persist/fs/seek.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ func (s *seeker) SeekIndexEntry(
// far we know it does not exist.
func (s *seeker) SeekWideEntry(
id ident.ID,
filter schema.WideEntryFilter,
resources ReusableSeekerResources,
) (xio.WideEntry, error) {
offset, err := s.indexLookup.getNearestIndexFileOffset(id, resources)
Expand Down Expand Up @@ -508,6 +509,16 @@ func (s *seeker) SeekWideEntry(
return xio.WideEntry{}, instrument.InvariantErrorf(err.Error())
}

if filter != nil {
filtered, err := filter(entry)
if err != nil || filtered {
// NB: this entry is not being taken, can free memory.
resources.decodeIndexEntryBytesPool.Put(entry.ID)
resources.decodeIndexEntryBytesPool.Put(entry.EncodedTags)
return xio.WideEntry{}, err
}
}

if status != xmsgpack.MatchedLookupStatus {
// No longer being used so we can return to the pool.
resources.decodeIndexEntryBytesPool.Put(entry.ID)
Expand Down
13 changes: 11 additions & 2 deletions src/dbnode/persist/fs/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs/msgpack"
"github.com/m3db/m3/src/dbnode/persist/schema"
"github.com/m3db/m3/src/dbnode/runtime"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage/block"
Expand Down Expand Up @@ -221,7 +222,11 @@ type DataFileSetSeeker interface {

// SeekWideEntry seeks in a manner similar to SeekIndexEntry, but
// instead yields a wide entry checksum of the series.
SeekWideEntry(id ident.ID, resources ReusableSeekerResources) (xio.WideEntry, error)
SeekWideEntry(
id ident.ID,
filter schema.WideEntryFilter,
resources ReusableSeekerResources,
) (xio.WideEntry, error)

// Range returns the time range associated with data in the volume
Range() xtime.Range
Expand Down Expand Up @@ -259,7 +264,11 @@ type ConcurrentDataFileSetSeeker interface {
SeekIndexEntry(id ident.ID, resources ReusableSeekerResources) (IndexEntry, error)

// SeekWideEntry is the same as in DataFileSetSeeker.
SeekWideEntry(id ident.ID, resources ReusableSeekerResources) (xio.WideEntry, error)
SeekWideEntry(
id ident.ID,
filter schema.WideEntryFilter,
resources ReusableSeekerResources,
) (xio.WideEntry, error)

// ConcurrentIDBloomFilter is the same as in DataFileSetSeeker.
ConcurrentIDBloomFilter() *ManagedConcurrentBloomFilter
Expand Down
3 changes: 3 additions & 0 deletions src/dbnode/persist/schema/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ type WideEntry struct {
MetadataChecksum int64
}

// WideEntryFilter provides a filter for wide entries.
type WideEntryFilter func(entry WideEntry) (bool, error)

// IndexEntryHasher hashes an index entry.
type IndexEntryHasher interface {
// HashIndexEntry computes a hash value for this IndexEntry using its ID, tags,
Expand Down
17 changes: 9 additions & 8 deletions src/dbnode/storage/block/block_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion src/dbnode/storage/block/retriever_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist/schema"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/x/context"
Expand Down Expand Up @@ -116,10 +117,11 @@ func (r *shardBlockRetriever) StreamWideEntry(
ctx context.Context,
id ident.ID,
blockStart time.Time,
filter schema.WideEntryFilter,
nsCtx namespace.Context,
) (StreamedWideEntry, error) {
return r.DatabaseBlockRetriever.StreamWideEntry(ctx, r.shard, id,
blockStart, nsCtx)
blockStart, filter, nsCtx)
}

type shardBlockRetrieverManager struct {
Expand Down
3 changes: 3 additions & 0 deletions src/dbnode/storage/block/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist/schema"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/dbnode/ts"
Expand Down Expand Up @@ -312,6 +313,7 @@ type DatabaseBlockRetriever interface {
shard uint32,
id ident.ID,
blockStart time.Time,
filter schema.WideEntryFilter,
nsCtx namespace.Context,
) (StreamedWideEntry, error)

Expand All @@ -336,6 +338,7 @@ type DatabaseShardBlockRetriever interface {
ctx context.Context,
id ident.ID,
blockStart time.Time,
filter schema.WideEntryFilter,
nsCtx namespace.Context,
) (StreamedWideEntry, error)
}
Expand Down
12 changes: 5 additions & 7 deletions src/dbnode/storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,12 +941,9 @@ func (d *db) ReadEncoded(
return n.ReadEncoded(ctx, id, start, end)
}

// batchProcessWideQuery runs the given query against the namespace index,
// iterating in a batchwise fashion across all matching IDs, applying the given
// IDBatchProcessor batch processing function to each ID discovered.
func (d *db) batchProcessWideQuery(
func (d *db) BatchProcessWideQuery(
ctx context.Context,
n databaseNamespace,
n Namespace,
query index.Query,
batchProcessor IDBatchProcessor,
opts index.WideQueryOptions,
Expand Down Expand Up @@ -1029,8 +1026,9 @@ func (d *db) WideQuery(
streamedWideEntries := make([]block.StreamedWideEntry, 0, batchSize)
indexChecksumProcessor := func(batch *ident.IDBatch) error {
streamedWideEntries = streamedWideEntries[:0]

for _, shardID := range batch.ShardIDs {
streamedWideEntry, err := n.FetchWideEntry(ctx, shardID.ID, start)
streamedWideEntry, err := n.FetchWideEntry(ctx, shardID.ID, start, nil)
if err != nil {
return err
}
Expand All @@ -1050,7 +1048,7 @@ func (d *db) WideQuery(
return nil
}

err = d.batchProcessWideQuery(ctx, n, query, indexChecksumProcessor, opts)
err = d.BatchProcessWideQuery(ctx, n, query, indexChecksumProcessor, opts)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/storage/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ func TestWideQuery(t *testing.T) {
ns *MockdatabaseNamespace, d *db, q index.Query,
now time.Time, shards []uint32, iterOpts index.IterationOptions) {
ns.EXPECT().FetchWideEntry(gomock.Any(),
ident.StringID("foo"), gomock.Any()).
ident.StringID("foo"), gomock.Any(), nil).
Return(block.EmptyStreamedWideEntry, nil)

_, err := d.WideQuery(ctx, ident.StringID("testns"), q, now, shards, iterOpts)
Expand Down
4 changes: 3 additions & 1 deletion src/dbnode/storage/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/persist/schema"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
Expand Down Expand Up @@ -922,6 +923,7 @@ func (n *dbNamespace) FetchWideEntry(
ctx context.Context,
id ident.ID,
blockStart time.Time,
filter schema.WideEntryFilter,
) (block.StreamedWideEntry, error) {
callStart := n.nowFn()
shard, nsCtx, err := n.readableShardFor(id)
Expand All @@ -931,7 +933,7 @@ func (n *dbNamespace) FetchWideEntry(
return block.EmptyStreamedWideEntry, err
}

res, err := shard.FetchWideEntry(ctx, id, blockStart, nsCtx)
res, err := shard.FetchWideEntry(ctx, id, blockStart, filter, nsCtx)
n.metrics.read.ReportSuccessOrError(err, n.nowFn().Sub(callStart))

return res, err
Expand Down
Loading

0 comments on commit 9036832

Please sign in to comment.