Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make compact lifecycle more flexible to be overridden for sharded compaction #5964

Merged
merged 26 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
c61ac66
POC on sharded compaction
alexqyle Dec 14, 2022
45a4cfa
Refactored partition info to be part of meta
alexqyle Jan 3, 2023
05f1048
Merge branch 'main' into sharded-compaction
alexqyle Jan 3, 2023
9dbe33e
Override prometheus populate block func
alexqyle Mar 7, 2023
712e401
Merge branch 'main' into sharded-compaction
alexqyle Apr 11, 2023
c71ed9d
Make block populator plugable through CompactionLifecycleCallback
alexqyle Apr 13, 2023
554b19d
Uncommented code
alexqyle Apr 13, 2023
1fa6b60
Merge branch 'main' into sharded-compaction
alexqyle Apr 13, 2023
66f60d8
refactor
alexqyle Apr 13, 2023
42dbe8a
rename
alexqyle Apr 13, 2023
3408c8f
Merge branch 'main' into sharded-compaction
alexqyle Apr 20, 2023
ed90f79
make Group.partitionInfo nil-able
alexqyle Apr 20, 2023
390496b
Merge branch 'main' into sharded-compaction
alexqyle Apr 26, 2023
53b6992
Merge branch 'main' into sharded-compaction
alexqyle May 1, 2023
ed28417
Merge branch 'main' into sharded-compaction
alexqyle May 9, 2023
1766f20
Merge branch 'main' into sharded-compaction
alexqyle May 10, 2023
bc2be70
add extension field to thanos meta
alexqyle Jun 8, 2023
a8d2b0b
Merge branch 'main' into sharded-compaction
alexqyle Jun 8, 2023
28927a3
Merge branch 'main' into sharded-compaction
alexqyle Jul 11, 2023
ff552b1
fixed merge issue
alexqyle Jul 11, 2023
d20a9a4
Merge branch 'main' into sharded-compaction
alexqyle Jul 11, 2023
8b75147
Clean up
alexqyle Jul 12, 2023
8fe0d04
Merge branch 'main' into sharded-compaction
alexqyle Jul 12, 2023
fba2e9f
Merge branch 'main' into sharded-compaction
alexqyle Jul 13, 2023
c40a67f
Merge branch 'main' into sharded-compaction
alexqyle Jul 14, 2023
169f38e
Added comment to ConvertExtensions func
alexqyle Jul 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,24 +586,28 @@ func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*
return nil
}

var _ MetadataFilter = &DeduplicateFilter{}
var _ MetadataFilter = &DefaultDeduplicateFilter{}

// DeduplicateFilter is a BaseFetcher filter that filters out older blocks that have exactly the same data.
type DeduplicateFilter interface {
DuplicateIDs() []ulid.ULID
}

// DefaultDeduplicateFilter is a BaseFetcher filter that filters out older blocks that have exactly the same data.
// Not go-routine safe.
type DeduplicateFilter struct {
type DefaultDeduplicateFilter struct {
duplicateIDs []ulid.ULID
concurrency int
mu sync.Mutex
}

// NewDeduplicateFilter creates DeduplicateFilter.
func NewDeduplicateFilter(concurrency int) *DeduplicateFilter {
return &DeduplicateFilter{concurrency: concurrency}
// NewDeduplicateFilter creates DefaultDeduplicateFilter.
func NewDeduplicateFilter(concurrency int) *DefaultDeduplicateFilter {
return &DefaultDeduplicateFilter{concurrency: concurrency}
}

// Filter filters out duplicate blocks that can be formed
// from two or more overlapping blocks that fully submatches the source blocks of the older blocks.
func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error {
func (f *DefaultDeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error {
f.duplicateIDs = f.duplicateIDs[:0]

var wg sync.WaitGroup
Expand Down Expand Up @@ -635,7 +639,7 @@ func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metad
return nil
}

func (f *DeduplicateFilter) filterGroup(metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec) {
func (f *DefaultDeduplicateFilter) filterGroup(metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec) {
sort.Slice(metaSlice, func(i, j int) bool {
ilen := len(metaSlice[i].Compaction.Sources)
jlen := len(metaSlice[j].Compaction.Sources)
Expand Down Expand Up @@ -677,8 +681,8 @@ childLoop:
f.mu.Unlock()
}

// DuplicateIDs returns slice of block ids that are filtered out by DeduplicateFilter.
func (f *DeduplicateFilter) DuplicateIDs() []ulid.ULID {
// DuplicateIDs returns slice of block ids that are filtered out by DefaultDeduplicateFilter.
func (f *DefaultDeduplicateFilter) DuplicateIDs() []ulid.ULID {
return f.duplicateIDs
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,34 @@ type Thanos struct {

// IndexStats contains stats info related to block index.
IndexStats IndexStats `json:"index_stats,omitempty"`

// Extensions are used for plugin any arbitrary additional information for block. Optional.
Extensions any `json:"extensions,omitempty"`
}

type IndexStats struct {
SeriesMaxSize int64 `json:"series_max_size,omitempty"`
ChunkMaxSize int64 `json:"chunk_max_size,omitempty"`
}

func (m *Thanos) ParseExtensions(v any) (any, error) {
return ConvertExtensions(m.Extensions, v)
}

func ConvertExtensions(extensions any, v any) (any, error) {
alexqyle marked this conversation as resolved.
Show resolved Hide resolved
if extensions == nil {
return nil, nil
}
extensionsContent, err := json.Marshal(extensions)
if err != nil {
return nil, err
}
if err = json.Unmarshal(extensionsContent, v); err != nil {
return nil, err
}
return v, nil
}

type Rewrite struct {
// ULIDs of all source head blocks that went into the block.
Sources []ulid.ULID `json:"sources,omitempty"`
Expand Down
123 changes: 123 additions & 0 deletions pkg/block/metadata/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,4 +221,127 @@ func TestMeta_ReadWrite(t *testing.T) {
m1.Thanos.Labels = map[string]string{}
testutil.Equals(t, m1, *retMeta)
})

t.Run("extensions write/read/write", func(t *testing.T) {
b := bytes.Buffer{}
m1 := Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(5, nil),
MinTime: 2424,
MaxTime: 134,
Version: 1,
Compaction: tsdb.BlockMetaCompaction{
Level: 123,
},
Stats: tsdb.BlockStats{NumChunks: 14, NumSamples: 245, NumSeries: 4},
},
Thanos: Thanos{
Labels: map[string]string{"ext": "lset1"},
Source: ReceiveSource,
Downsample: ThanosDownsample{
Resolution: 123144,
},
Extensions: &TestExtensions{
Field1: 1,
Field2: "test_string",
},
},
}
testutil.Ok(t, m1.Write(&b))
testutil.Equals(t, `{
"ulid": "00000000050000000000000000",
"minTime": 2424,
"maxTime": 134,
"stats": {
"numSamples": 245,
"numSeries": 4,
"numChunks": 14
},
"compaction": {
"level": 123
},
"version": 1,
"thanos": {
"labels": {
"ext": "lset1"
},
"downsample": {
"resolution": 123144
},
"source": "receive",
"index_stats": {},
"extensions": {
"field1": 1,
"field2": "test_string"
}
}
}
`, b.String())
retMeta, err := Read(io.NopCloser(&b))
testutil.Ok(t, err)
retExtensions, err := retMeta.Thanos.ParseExtensions(&TestExtensions{})
_, ok := retExtensions.(*TestExtensions)
testutil.Equals(t, true, ok)
testutil.Ok(t, err)
testutil.Equals(t, m1.Thanos.Extensions, retExtensions)
})

t.Run("empty extensions write/read/write", func(t *testing.T) {
b := bytes.Buffer{}
m1 := Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(5, nil),
MinTime: 2424,
MaxTime: 134,
Version: 1,
Compaction: tsdb.BlockMetaCompaction{
Level: 123,
},
Stats: tsdb.BlockStats{NumChunks: 14, NumSamples: 245, NumSeries: 4},
},
Thanos: Thanos{
Labels: map[string]string{"ext": "lset1"},
Source: ReceiveSource,
Downsample: ThanosDownsample{
Resolution: 123144,
},
},
}
testutil.Ok(t, m1.Write(&b))
testutil.Equals(t, `{
"ulid": "00000000050000000000000000",
"minTime": 2424,
"maxTime": 134,
"stats": {
"numSamples": 245,
"numSeries": 4,
"numChunks": 14
},
"compaction": {
"level": 123
},
"version": 1,
"thanos": {
"labels": {
"ext": "lset1"
},
"downsample": {
"resolution": 123144
},
"source": "receive",
"index_stats": {}
}
}
`, b.String())
retMeta, err := Read(io.NopCloser(&b))
testutil.Ok(t, err)
retExtensions, err := retMeta.Thanos.ParseExtensions(&TestExtensions{})
testutil.Ok(t, err)
testutil.Equals(t, m1.Thanos.Extensions, retExtensions)
})
}

type TestExtensions struct {
Field1 int `json:"field1"`
Field2 string `json:"field2"`
}
Loading
Loading