Skip to content

Commit

Permalink
Remove tenant shipper after pruning (#6207)
Browse files Browse the repository at this point in the history
* Remove tenant shipper after pruning

We noticed another edge case with tenant pruning where the shipper would
keep syncing blocks even after the tenant TSDB was removed. This happens
because the shipper runs in parallel in a different goroutine and is not
stopped when the TSDB is gone.

This leads to empty shipper files being created on disk once a tenant is pruned,
and the orphaned TSDBs alert from the mixin starts to fire.

This commit modifies the pruning logic to remove all components from a tenant
once eviction conditions have been met.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Add CHANGELOG entry

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Run make docs

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

---------

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski authored Mar 16, 2023
1 parent 9b5f1fe commit e6bbef2
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6171](https://github.com/thanos-io/thanos/pull/6171) Store: fix error handling on limits.
- [#6183](https://github.com/thanos-io/thanos/pull/6183) Receiver: fix off by one in multitsdb flush that will result in empty blocks if the head only contains one sample
- [#6197](https://github.com/thanos-io/thanos/pull/6197) Exemplar OTel: Fix exemplar for otel to use traceId instead of spanId and sample only if trace is sampled
- [#6207](https://github.com/thanos-io/thanos/pull/6207) Receive: Remove the shipper once a tenant has been pruned.

### Changed
- [#6168](https://github.com/thanos-io/thanos/pull/6168) Receiver: Make ketama hashring fail early when configured with number of nodes lower than the replication factor.
Expand Down
15 changes: 12 additions & 3 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,14 @@ func (t *tenant) shipper() *shipper.Shipper {
// set marks the tenant TSDB as ready and installs the per-tenant
// components (store, shipper, exemplars) under the tenant mutex.
// The readiness switch is flipped before the components are stored;
// component reads elsewhere must go through t.mtx.
func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) {
	t.readyS.Set(tenantTSDB)
	t.mtx.Lock()
	// defer guarantees the unlock even if setComponents panics.
	defer t.mtx.Unlock()
	t.setComponents(storeTSDB, ship, exemplarsTSDB)
}

// setComponents stores the tenant's store, shipper and exemplars
// components. It performs no locking of its own: callers must hold
// t.mtx (set() locks around this call, and pruneTSDB calls it while
// holding the tenant lock with a deferred unlock).
//
// The previous version called t.mtx.Unlock() here despite never
// locking the mutex. Since both callers unlock themselves, that
// resulted in a double unlock and a "sync: unlock of unlocked mutex"
// panic. The unlock is removed so locking stays entirely with callers.
func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) {
	t.storeTSDB = storeTSDB
	t.ship = ship
	t.exemplarsTSDB = exemplarsTSDB
}

func (t *MultiTSDB) Open() error {
Expand Down Expand Up @@ -350,6 +354,10 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
tenantTSDB.mtx.Lock()
defer tenantTSDB.mtx.Unlock()

// Lock the entire tenant to make sure the shipper is not running in parallel.
tenantInstance.mtx.Lock()
defer tenantInstance.mtx.Unlock()

sinceLastAppendMillis = time.Since(time.UnixMilli(head.MaxTime())).Milliseconds()
if sinceLastAppendMillis <= compactThreshold {
return false, nil
Expand All @@ -365,8 +373,8 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
}

level.Info(logger).Log("msg", "Pruning tenant")
if tenantInstance.shipper() != nil {
uploaded, err := tenantInstance.shipper().Sync(ctx)
if tenantInstance.ship != nil {
uploaded, err := tenantInstance.ship.Sync(ctx)
if err != nil {
return false, err
}
Expand All @@ -385,6 +393,7 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
}

tenantInstance.readyS.set(nil)
tenantInstance.setComponents(nil, nil, nil)

return true, nil
}
Expand Down
28 changes: 25 additions & 3 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func TestMultiTSDBPrune(t *testing.T) {
name: "prune tsdbs with object storage",
bucket: objstore.NewInMemBucket(),
expectedTenants: 2,
expectedUploads: 1,
expectedUploads: 2,
},
}

Expand Down Expand Up @@ -454,9 +454,17 @@ func TestMultiTSDBPrune(t *testing.T) {
}
testutil.Equals(t, 3, len(m.TSDBLocalClients()))

testutil.Ok(t, m.Prune(context.Background()))
testutil.Equals(t, test.expectedTenants, len(m.TSDBLocalClients()))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if test.bucket != nil {
go func() {
testutil.Ok(t, syncTSDBs(ctx, m, 10*time.Millisecond))
}()
}

testutil.Ok(t, m.Prune(ctx))
testutil.Equals(t, test.expectedTenants, len(m.TSDBLocalClients()))
var shippedBlocks int
if test.bucket != nil {
testutil.Ok(t, test.bucket.Iter(context.Background(), "", func(s string) error {
Expand All @@ -469,6 +477,20 @@ func TestMultiTSDBPrune(t *testing.T) {
}
}

// syncTSDBs triggers a MultiTSDB sync every interval until ctx is
// cancelled. Cancellation is treated as a clean shutdown and returns
// nil; the first Sync error aborts the loop and is returned.
func syncTSDBs(ctx context.Context, m *MultiTSDB, interval time.Duration) error {
	// Reuse one Ticker rather than calling time.After inside the
	// select: time.After allocates a fresh timer on every iteration.
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			if _, err := m.Sync(ctx); err != nil {
				return err
			}
		}
	}
}

func TestMultiTSDBRecreatePrunedTenant(t *testing.T) {
dir := t.TempDir()

Expand Down

0 comments on commit e6bbef2

Please sign in to comment.