Skip to content

Commit

Permalink
fix: Fix bloom deleter PR after merge (#13167)
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts committed Jun 7, 2024
1 parent 9c96d26 commit c996349
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 19 deletions.
7 changes: 2 additions & 5 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,10 +365,7 @@ func (p *Planner) processTenantTaskResults(
}

combined := append(originalMetas, newMetas...)
outdated, err := outdatedMetas(combined)
if err != nil {
return fmt.Errorf("failed to find outdated metas: %w", err)
}
outdated := outdatedMetas(combined)
level.Debug(logger).Log("msg", "found outdated metas", "outdated", len(outdated))

if err := p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, outdated); err != nil {
Expand Down Expand Up @@ -476,7 +473,7 @@ func (p *Planner) loadTenantWork(

// If this is the first this we see this table, initialize the map
if tenantTableWork[table] == nil {
tenantTableWork[table] = make(map[string][]v1.FingerprintBounds, tenants.Len())
tenantTableWork[table] = make(map[string][]v1.FingerprintBounds, tenants.Remaining())
}

for tenants.Next() && tenants.Err() == nil && ctx.Err() == nil {
Expand Down
38 changes: 26 additions & 12 deletions pkg/bloombuild/planner/versioned_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,17 +210,30 @@ func (t tsdbTokenRange) reassemble(from int) tsdbTokenRange {
return t[:len(t)-(reassembleTo-from)]
}

func outdatedMetas(metas []bloomshipper.Meta) (outdated []bloomshipper.Meta, err error) {
func outdatedMetas(metas []bloomshipper.Meta) []bloomshipper.Meta {
var outdated []bloomshipper.Meta

// Sort metas descending by most recent source when checking
// for outdated metas (older metas are discarded if they don't change the range).
sort.Slice(metas, func(i, j int) bool {
a, err := metas[i].MostRecentSource()
if err != nil {
panic(err.Error())
a, aExists := metas[i].MostRecentSource()
b, bExists := metas[j].MostRecentSource()

if !aExists && !bExists {
// stable sort two sourceless metas by their bounds (easier testing)
return metas[i].Bounds.Less(metas[j].Bounds)
}
b, err := metas[j].MostRecentSource()
if err != nil {
panic(err.Error())

if !aExists {
// If a meta has no sources, it's out of date by definition.
// By convention we sort it to the beginning of the list and will mark it for removal later
return true
}

if !bExists {
// if a exists but b does not, mark b as lesser, sorting b to the
// front
return false
}
return !a.TS.Before(b.TS)
})
Expand All @@ -231,9 +244,11 @@ func outdatedMetas(metas []bloomshipper.Meta) (outdated []bloomshipper.Meta, err
)

for _, meta := range metas {
mostRecent, err := meta.MostRecentSource()
if err != nil {
return nil, err
mostRecent, exists := meta.MostRecentSource()
if !exists {
// if the meta exists but does not reference a TSDB, it's out of date
// TODO(owen-d): this shouldn't happen, figure out why
outdated = append(outdated, meta)
}
version := int(model.TimeFromUnixNano(mostRecent.TS.UnixNano()))
tokenRange, added = tokenRange.Add(version, meta.Bounds)
Expand All @@ -242,6 +257,5 @@ func outdatedMetas(metas []bloomshipper.Meta) (outdated []bloomshipper.Meta, err
}
}

return outdated, nil

return outdated
}
3 changes: 1 addition & 2 deletions pkg/bloombuild/planner/versioned_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,7 @@ func Test_OutdatedMetas(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
outdated, err := outdatedMetas(tc.metas)
require.NoError(t, err)
outdated := outdatedMetas(tc.metas)
require.Equal(t, tc.exp, outdated)
})
}
Expand Down

0 comments on commit c996349

Please sign in to comment.