add comments for exported structs and functions
shuaizhang committed Nov 10, 2019
1 parent eb25aef commit cfb819c
Showing 7 changed files with 77 additions and 60 deletions.
6 changes: 1 addition & 5 deletions cmd/thanos/compact.go
@@ -274,7 +274,7 @@ func runCompact(
deduper := dedup.NewBucketDeduper(logger, reg, bkt, dedupDir, dedupReplicaLabel, consistencyDelay, blockSyncConcurrency)

f := func() error {
if isEnableDedup(enableDedup, dedupReplicaLabel) {
if enableDedup && len(dedupReplicaLabel) > 0 {
if err := deduper.Dedup(ctx); err != nil {
return errors.Wrap(err, "dedup failed")
}
@@ -484,7 +484,3 @@ func generateIndexCacheFile(
}
return nil
}

func isEnableDedup(enableDedup bool, dedupReplicaLabel string) bool {
return enableDedup && len(dedupReplicaLabel) > 0
}
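
The change above folds the one-line isEnableDedup helper into its only call site. For illustration only, a runnable sketch of the same gate as a standalone function (the real check now lives inline in runCompact):

package main

import "fmt"

// dedupEnabled mirrors the condition inlined in runCompact above:
// deduplication runs only when the flag is on and a replica label is set.
func dedupEnabled(enableDedup bool, dedupReplicaLabel string) bool {
	return enableDedup && len(dedupReplicaLabel) > 0
}

func main() {
	fmt.Println(dedupEnabled(true, "replica"))  // true
	fmt.Println(dedupEnabled(true, ""))         // false: no replica label configured
	fmt.Println(dedupEnabled(false, "replica")) // false: dedup disabled
}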
1 change: 1 addition & 0 deletions pkg/compact/compact.go
@@ -687,6 +687,7 @@ type RetryError struct {
err error
}

// Retry wraps the passed non-HaltError as a RetryError
func Retry(err error) error {
if IsHaltError(err) {
return err
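
The new comment on Retry points at the compactor's halt/retry error split: halt errors abort the run, everything else can be wrapped as retryable. A simplified, self-contained sketch of that pattern; the types and bodies below are illustrative stand-ins, not the actual pkg/compact implementations:

package main

import (
	"errors"
	"fmt"
)

// haltError marks failures that should stop compaction entirely;
// retryError marks failures worth retrying on the next iteration.
type haltError struct{ err error }
type retryError struct{ err error }

func (e haltError) Error() string  { return "halt: " + e.err.Error() }
func (e retryError) Error() string { return "retry: " + e.err.Error() }

func isHaltError(err error) bool {
	var he haltError
	return errors.As(err, &he)
}

// retry wraps err as retryable unless it is already a halt error,
// mirroring the behavior the Retry comment above describes.
func retry(err error) error {
	if err == nil {
		return nil
	}
	if isHaltError(err) {
		return err
	}
	return retryError{err: err}
}

func main() {
	fmt.Println(retry(errors.New("transient upload failure")))
	fmt.Println(retry(haltError{err: errors.New("corrupted block")}))
}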
16 changes: 10 additions & 6 deletions pkg/compact/dedup/dedup.go
@@ -12,22 +12,23 @@ import (
"github.com/thanos-io/thanos/pkg/objstore"
)

type BucketDeduper struct {
type bucketDeduper struct {
logger log.Logger
dedupDir string
replicaLabelName string
bkt objstore.Bucket

metrics *DedupMetrics

syncer *ReplicaSyncer
merger *ReplicaMerger
syncer *replicaSyncer
merger *replicaMerger
}

// NewBucketDeduper returns a new bucketDeduper for the given bucket, replica label, and directory.
func NewBucketDeduper(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, dedupDir, replicaLabelName string,
consistencyDelay time.Duration, blockSyncConcurrency int) *BucketDeduper {
consistencyDelay time.Duration, blockSyncConcurrency int) *bucketDeduper {
metrics := NewDedupMetrics(reg)
return &BucketDeduper{
return &bucketDeduper{
logger: logger,
dedupDir: dedupDir,
replicaLabelName: replicaLabelName,
@@ -38,7 +39,10 @@ func NewBucketDeduper(logger log.Logger, reg prometheus.Registerer, bkt objstore
}
}

func (d *BucketDeduper) Dedup(ctx context.Context) error {
// Dedup groups the blocks by the replica label in their metadata
// and merges the blocks of different replicas that cover the same time range into one block.
// The dedup process runs from the oldest blocks to the newest ones.
func (d *bucketDeduper) Dedup(ctx context.Context) error {
level.Info(d.logger).Log("msg", "start of deduplication")
start := time.Now()
if err := os.RemoveAll(d.dedupDir); err != nil {
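
Dedup's new comment summarizes the pipeline: group blocks by the replica label in their metadata, then merge each group time range by time range. A self-contained sketch of the grouping phase under that reading; blockMeta is a hypothetical stand-in for metadata.Meta with only the fields this step needs:

package main

import "fmt"

// blockMeta is a hypothetical stand-in for metadata.Meta.
type blockMeta struct {
	ULID             string
	MinTime, MaxTime int64
	Labels           map[string]string
}

// groupByReplica buckets blocks by the value of the replica label,
// the first phase described in the Dedup comment above.
func groupByReplica(blocks []blockMeta, replicaLabel string) map[string][]blockMeta {
	groups := make(map[string][]blockMeta)
	for _, b := range blocks {
		replica := b.Labels[replicaLabel]
		groups[replica] = append(groups[replica], b)
	}
	return groups
}

func main() {
	blocks := []blockMeta{
		{ULID: "01A", MinTime: 0, MaxTime: 100, Labels: map[string]string{"replica": "r0"}},
		{ULID: "01B", MinTime: 0, MaxTime: 100, Labels: map[string]string{"replica": "r1"}},
		{ULID: "01C", MinTime: 100, MaxTime: 200, Labels: map[string]string{"replica": "r0"}},
	}
	for replica, group := range groupByReplica(blocks, "replica") {
		fmt.Println(replica, len(group))
	}
}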
43 changes: 23 additions & 20 deletions pkg/compact/dedup/merger.go
@@ -26,6 +26,7 @@ import (
"github.com/thanos-io/thanos/pkg/query"
)

// NewTimeRange returns a new TimeRange with the given minTime and maxTime
func NewTimeRange(minTime, maxTime int64) *tsdb.TimeRange {
return &tsdb.TimeRange{Min: minTime, Max: maxTime}
}
@@ -55,6 +56,7 @@ func NewBlockGroup(tr *tsdb.TimeRange, blocks []*metadata.Meta) *BlockGroup {

type BlockGroups []*BlockGroup

// NewBlockGroups returns a new BlockGroups with the given replicas
func NewBlockGroups(replicas Replicas) BlockGroups {
if len(replicas) == 0 {
return nil
@@ -117,16 +119,17 @@ func getBlockGroup(blocks []*metadata.Meta, tr *tsdb.TimeRange) *BlockGroup {
return NewBlockGroup(tr, target)
}

type ReplicaMerger struct {
type replicaMerger struct {
logger log.Logger
metrics *DedupMetrics
bkt objstore.Bucket
dir string
replicaLabel string
}

func NewReplicaMerger(logger log.Logger, metrics *DedupMetrics, bkt objstore.Bucket, dir string, replicaLabel string) *ReplicaMerger {
return &ReplicaMerger{
// NewReplicaMerger returns a new replicaMerger for the given bucket, replica label, and directory
func NewReplicaMerger(logger log.Logger, metrics *DedupMetrics, bkt objstore.Bucket, dir string, replicaLabel string) *replicaMerger {
return &replicaMerger{
logger: logger,
metrics: metrics,
bkt: bkt,
@@ -136,7 +139,7 @@ func NewReplicaMerger(logger log.Logger, metrics *DedupMetrics, bkt objstore.Buc
}

// Do the merge process on the replicas with the same resolution
func (rm *ReplicaMerger) Merge(ctx context.Context, resolution int64, replicas Replicas) error {
func (rm *replicaMerger) Merge(ctx context.Context, resolution int64, replicas Replicas) error {
groups := rm.plan(ctx, replicas)

for _, group := range groups {
@@ -157,7 +160,7 @@ func (rm *ReplicaMerger) Merge(ctx context.Context, resolution int64, replicas R
return nil
}

func (rm *ReplicaMerger) plan(ctx context.Context, replicas Replicas) BlockGroups {
func (rm *replicaMerger) plan(ctx context.Context, replicas Replicas) BlockGroups {
if len(replicas) < 2 {
return nil
}
@@ -173,7 +176,7 @@ func (rm *ReplicaMerger) plan(ctx context.Context, replicas Replicas) BlockGroup
return target
}

func (rm *ReplicaMerger) prepare(ctx context.Context, group *BlockGroup) error {
func (rm *replicaMerger) prepare(ctx context.Context, group *BlockGroup) error {
var wg sync.WaitGroup
defer wg.Wait()

@@ -206,7 +209,7 @@ func (rm *ReplicaMerger) prepare(ctx context.Context, group *BlockGroup) error {
return nil
}

func (rm *ReplicaMerger) download(ctx context.Context, b *metadata.Meta) error {
func (rm *replicaMerger) download(ctx context.Context, b *metadata.Meta) error {
blockDir := filepath.Join(rm.dir, b.ULID.String())
if err := rm.deleteLocalBlock(&b.ULID); err != nil {
return compact.Retry(errors.Wrapf(err, "clean up block dir: %s", blockDir))
@@ -220,12 +223,12 @@ func (rm *ReplicaMerger) download(ctx context.Context, b *metadata.Meta) error {
return nil
}

func (rm *ReplicaMerger) merge(ctx context.Context, resolution int64, group *BlockGroup) (*ulid.ULID, error) {
func (rm *replicaMerger) merge(ctx context.Context, resolution int64, group *BlockGroup) (*ulid.ULID, error) {
if len(group.blocks) == 0 {
return nil, nil
}
baseBlock := group.blocks[0]
readers := make([]*BlockReader, 0, len(group.blocks))
readers := make([]*blockReader, 0, len(group.blocks))

defer func() {
for _, reader := range readers {
@@ -257,7 +260,7 @@ func (rm *ReplicaMerger) merge(ctx context.Context, resolution int64, group *Blo
return &newId, nil
}

func (rm *ReplicaMerger) newMeta(baseMeta *metadata.Meta, newId ulid.ULID, tr *tsdb.TimeRange) *metadata.Meta {
func (rm *replicaMerger) newMeta(baseMeta *metadata.Meta, newId ulid.ULID, tr *tsdb.TimeRange) *metadata.Meta {
newMeta := *baseMeta
newMeta.ULID = newId
newMeta.MinTime = tr.Min
@@ -279,7 +282,7 @@ func (rm *ReplicaMerger) newMeta(baseMeta *metadata.Meta, newId ulid.ULID, tr *t
return &newMeta
}

func (rm *ReplicaMerger) write(readers []*BlockReader, blockDir string, meta *metadata.Meta, tr *tsdb.TimeRange, resolution int64) error {
func (rm *replicaMerger) write(readers []*blockReader, blockDir string, meta *metadata.Meta, tr *tsdb.TimeRange, resolution int64) error {
symbols, err := rm.getMergedSymbols(readers)
if err != nil {
return err
@@ -289,7 +292,7 @@ func (rm *ReplicaMerger) write(readers []*BlockReader, blockDir string, meta *me
return err
}

buf := make([]*SampleReader, len(readers))
buf := make([]*sampleReader, len(readers))

running := true
for running {
@@ -342,7 +345,7 @@ func (rm *ReplicaMerger) write(readers []*BlockReader, blockDir string, meta *me
return nil
}

func (rm *ReplicaMerger) getMergedSymbols(readers []*BlockReader) (map[string]struct{}, error) {
func (rm *replicaMerger) getMergedSymbols(readers []*blockReader) (map[string]struct{}, error) {
result := make(map[string]struct{})
for _, reader := range readers {
symbols, err := reader.Symbols()
@@ -358,8 +361,8 @@ func (rm *ReplicaMerger) getMergedSymbols(readers []*BlockReader) (map[string]st
return result, nil
}

func (rm *ReplicaMerger) getMergedChunkSeries(readers []*SampleReader, tr *tsdb.TimeRange, resolution int64) (*ChunkSeries, error) {
buf := make([]*SampleReader, len(readers))
func (rm *replicaMerger) getMergedChunkSeries(readers []*sampleReader, tr *tsdb.TimeRange, resolution int64) (*ChunkSeries, error) {
buf := make([]*sampleReader, len(readers))
copy(buf, readers)

sort.Slice(buf, func(i, j int) bool {
@@ -402,7 +405,7 @@ func (rm *ReplicaMerger) getMergedChunkSeries(readers []*SampleReader, tr *tsdb.
return NewSampleSeries(lset, mergedData, resolution).ToChunkSeries()
}

func (rm *ReplicaMerger) mergeSamples(a, b map[downsample.AggrType][]*Sample, res int64) map[downsample.AggrType][]*Sample {
func (rm *replicaMerger) mergeSamples(a, b map[downsample.AggrType][]*Sample, res int64) map[downsample.AggrType][]*Sample {
result := make(map[downsample.AggrType][]*Sample)
if len(a) == 0 {
return b
@@ -431,7 +434,7 @@ func (rm *ReplicaMerger) mergeSamples(a, b map[downsample.AggrType][]*Sample, re
return result
}

func (rm *ReplicaMerger) upload(ctx context.Context, group *BlockGroup, newId *ulid.ULID) error {
func (rm *replicaMerger) upload(ctx context.Context, group *BlockGroup, newId *ulid.ULID) error {
blockDir := filepath.Join(rm.dir, newId.String())
if err := block.VerifyIndex(rm.logger, filepath.Join(blockDir, block.IndexFilename), group.tr.Min, group.tr.Max); err != nil {
return errors.Wrapf(err, "agg block index not valid: %s", newId)
@@ -445,7 +448,7 @@ func (rm *ReplicaMerger) upload(ctx context.Context, group *BlockGroup, newId *u
return nil
}

func (rm *ReplicaMerger) clean(ctx context.Context, group *BlockGroup, newId *ulid.ULID) error {
func (rm *replicaMerger) clean(ctx context.Context, group *BlockGroup, newId *ulid.ULID) error {
// delete blocks in remote storage
for _, b := range group.blocks {
if b.MaxTime > group.tr.Max {
Expand All @@ -470,7 +473,7 @@ func (rm *ReplicaMerger) clean(ctx context.Context, group *BlockGroup, newId *ul
return nil
}

func (rm *ReplicaMerger) deleteRemoteBlock(id *ulid.ULID) error {
func (rm *replicaMerger) deleteRemoteBlock(id *ulid.ULID) error {
if id == nil {
return nil
}
@@ -484,7 +487,7 @@ func (rm *ReplicaMerger) deleteRemoteBlock(id *ulid.ULID) error {
return nil
}

func (rm *ReplicaMerger) deleteLocalBlock(id *ulid.ULID) error {
func (rm *replicaMerger) deleteLocalBlock(id *ulid.ULID) error {
if id == nil {
return nil
}
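
mergeSamples above combines per-aggregate sample slices from two replicas. Its core is a two-pointer walk over timestamp-sorted slices that keeps a single copy when both replicas carry a sample at the same timestamp. A simplified, self-contained sketch of that merge; sample is a hypothetical stand-in for the package's Sample type, and the real method additionally handles per-aggregate maps and resolution:

package main

import "fmt"

// sample is a hypothetical stand-in for dedup.Sample.
type sample struct {
	T int64
	V float64
}

// mergeSorted merges two timestamp-sorted slices; on a timestamp
// collision only the first replica's sample is kept.
func mergeSorted(a, b []sample) []sample {
	out := make([]sample, 0, len(a)+len(b))
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i].T < b[j].T:
			out = append(out, a[i])
			i++
		case a[i].T > b[j].T:
			out = append(out, b[j])
			j++
		default: // same timestamp: drop the duplicate
			out = append(out, a[i])
			i++
			j++
		}
	}
	out = append(out, a[i:]...)
	out = append(out, b[j:]...)
	return out
}

func main() {
	a := []sample{{T: 1, V: 1.0}, {T: 3, V: 3.0}}
	b := []sample{{T: 1, V: 1.1}, {T: 2, V: 2.0}}
	fmt.Println(mergeSorted(a, b)) // [{1 1} {2 2} {3 3}]
}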