Pull/374 review #4

Merged
Changes from all commits
28 commits
83a4ef1
Typo fixed: "inlcuding" -> "including" (#462)
JoeWrightss Dec 3, 2018
bac9cbe
no overlapping on compaction when an existing block is not within def…
krasi-georgiev Dec 4, 2018
b4a2103
Release v0.3.0 (#464)
krasi-georgiev Dec 4, 2018
c8c03ff
fix some typos (#466)
JoeWrightss Dec 7, 2018
cbfda5a
Fixs typo: "compltely" to "completely" (#470)
JoeWrightss Dec 11, 2018
a2779cc
fix flaky tests: TestDisableAutoCompactions,TestBlockRanges (#472)
krasi-georgiev Dec 12, 2018
2962202
fix windows tests (#469)
krasi-georgiev Dec 13, 2018
fd24a1c
re add the missing check_license test (#476)
krasi-georgiev Dec 13, 2018
79aa611
release 0.3.1 (#477)
krasi-georgiev Dec 13, 2018
79869d9
fix race for minValidTime (#479)
krasi-georgiev Dec 14, 2018
520ab7d
re-add the missing prometheus_tsdb_wal_corruptions_total (#473)
krasi-georgiev Dec 18, 2018
22e3aeb
Add WALSegmentSize as an option of tsdb creation (#450)
glutamatt Dec 18, 2018
6d489a1
fix TestWALSegmentSizeOption for windows. (#482)
krasi-georgiev Dec 19, 2018
915d7cf
Add a tsdbutil command to analyse churn etc. (#478)
brian-brazil Dec 28, 2018
2e0571c
remove unused WALFlushInterval option and NopWAL struct (#468)
krasi-georgiev Dec 28, 2018
090b685
remove unused `PrefixMatcher` (#474)
krasi-georgiev Dec 28, 2018
a90a719
createPopulatedBlock returns the block dir instead of the block (#487)
krasi-georgiev Dec 29, 2018
eb6586f
rename createPopulatedBlock to createBlock and use it instead ot crea…
krasi-georgiev Dec 29, 2018
48c439d
fix statick check errors (#475)
krasi-georgiev Jan 2, 2019
b2d7bbd
Move series fetches out of inner loop of SortedPostings. (#485)
brian-brazil Jan 3, 2019
296f943
More efficient Merge implementation. (#486)
brian-brazil Jan 3, 2019
8d991bd
Delete temp checkpoint folder on error. (#415)
krasi-georgiev Jan 7, 2019
c065fa6
Exposed helper methods for reading index bytes. (#492)
bwplotka Jan 11, 2019
a360aa3
change variable name metrics to labels (#496)
Jan 14, 2019
bff5aa4
Missing the len of crc32 when calculating maxLen in WriteChunks (#494)
naivewong Jan 14, 2019
ebf5d74
Added storage size based retention method and new metrics (#343)
mknapphrt Jan 16, 2019
98988fd
Merge remote-tracking branch 'upstream/master' into pull/374-review
Jan 16, 2019
e0bb757
rebased
Jan 16, 2019
11 changes: 9 additions & 2 deletions .travis.yml
@@ -2,16 +2,23 @@
sudo: required
dist: trusty
language: go
os:
- windows
- linux
- osx

go:
- 1.10.x
- 1.11.x

go_import_path: github.com/prometheus/tsdb

before_install:
- if [[ "$TRAVIS_OS_NAME" == "windows" ]]; then choco install make; fi

install:
- go get -v -t ./...
- make deps

script:
# `staticcheck` target is omitted due to linting errors
- make check_license style unused test
- if [[ "$TRAVIS_OS_NAME" == "windows" ]]; then make test; else make; fi
23 changes: 19 additions & 4 deletions CHANGELOG.md
@@ -1,6 +1,21 @@
## master / unreleased
- [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller WAL files, for example with tmpfs on a Raspberry Pi to minimise SD card wear from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.
- [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374)
- [FEATURE] Size based retention through `Options.MaxBytes`. As part of this change:
- added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total`
- new public interface `SizeReader: Size() int64`
- `OpenBlock` signature changed to take a logger.
- [REMOVED] `PrefixMatcher` is considered unused and was removed.
- [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere.

- `LastCheckpoint` used to return just the segment name and now it returns the full relative path.
- `NewSegmentsRangeReader` can now read over multiple wal ranges by using the new `SegmentRange` struct.
- `CorruptionErr` now also exposes the Segment `Dir` which is added when displaying any errors.
- [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374)
## 0.3.1
- [BUGFIX] Fixed most windows tests and some actual bugs with unclosed file readers.

## 0.3.0
- [CHANGE] `LastCheckpoint()` used to return just the segment name and now it returns the full relative path.
- [CHANGE] `NewSegmentsRangeReader()` can now read over multiple wal ranges by using the new `SegmentRange{}` struct.
- [CHANGE] `CorruptionErr{}` now also exposes the Segment `Dir` which is added when displaying any errors.
- [CHANGE] `Head.Init()` is changed to `Head.Init(minValidTime int64)`
- [CHANGE] `SymbolTable()` renamed to `SymbolTableSize()` to make the name consistent with the `Block{ symbolTableSize uint64 }` field.
- [CHANGE] `wal.Reader{}` now exposes `Segment()` for the current segment being read and `Offset()` for the current offset.
- [FEATURE] tsdbutil analyze subcommand to find churn, high cardinality, etc.
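
For reviewers who want to exercise the two new options described above (`WALSegmentSize` and size-based retention via `Options.MaxBytes`), here is a minimal, hedged sketch. It assumes the usual `tsdb.Open(dir, logger, registerer, opts)` signature and that `DefaultOptions` is an exported `*Options`; the byte values are placeholders, not recommendations.

```go
package main

import (
	kitlog "github.com/go-kit/kit/log"
	"github.com/prometheus/tsdb"
)

func main() {
	// Copy the exported defaults, then override the knobs added in this PR.
	opts := *tsdb.DefaultOptions
	opts.WALSegmentSize = 8 * 1024 * 1024 // smaller WAL segments, e.g. for tmpfs on a Raspberry Pi
	opts.MaxBytes = 512 * 1024 * 1024     // size-based retention: cap block storage at ~512 MiB

	db, err := tsdb.Open("data", kitlog.NewNopLogger(), nil, &opts)
	if err != nil {
		panic(err)
	}
	defer db.Close()
}
```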
8 changes: 6 additions & 2 deletions Makefile
@@ -18,11 +18,15 @@ TSDB_BENCHMARK_NUM_METRICS ?= 1000
TSDB_BENCHMARK_DATASET ?= "$(TSDB_PROJECT_DIR)/testdata/20kseries.json"
TSDB_BENCHMARK_OUTPUT_DIR ?= "$(TSDB_CLI_DIR)/benchout"

STATICCHECK_IGNORE =
include Makefile.common

.PHONY: deps
deps:
@echo ">> getting dependencies"
GO111MODULE=$(GO111MODULE) $(GO) get $(GOOPTS) -t ./...

build:
@$(GO) build -o $(TSDB_BIN) $(TSDB_CLI_DIR)
GO111MODULE=$(GO111MODULE) $(GO) build -o $(TSDB_BIN) $(TSDB_CLI_DIR)

bench: build
@echo ">> running benchmark, writing result to $(TSDB_BENCHMARK_OUTPUT_DIR)"
38 changes: 36 additions & 2 deletions block.go
@@ -21,6 +21,8 @@ import (
"path/filepath"
"sync"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/chunkenc"
@@ -140,6 +142,12 @@ type Appendable interface {
Appender() Appender
}

// SizeReader returns the size of the object in bytes.
type SizeReader interface {
// Size returns the size in bytes.
Size() int64
}

// BlockMeta provides meta information about a block.
type BlockMeta struct {
// Unique identifier for the block and its contents. Changes on compaction.
@@ -166,6 +174,7 @@ type BlockStats struct {
NumSeries uint64 `json:"numSeries,omitempty"`
NumChunks uint64 `json:"numChunks,omitempty"`
NumTombstones uint64 `json:"numTombstones,omitempty"`
NumBytes int64 `json:"numBytes,omitempty"`
}

// BlockDesc describes a block by ULID and time range.
@@ -260,7 +269,10 @@ type Block struct {

// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
// to instantiate chunk structs.
func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error) {
if logger == nil {
logger = log.NewNopLogger()
}
meta, err := readMetaFile(dir)
if err != nil {
return nil, err
@@ -275,11 +287,20 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
return nil, err
}

tr, err := readTombstones(dir)
tr, tsr, err := readTombstones(dir)
if err != nil {
return nil, err
}

// TODO refactor to set this at block creation time as
// that would be the logical place for a block size to be calculated.
bs := blockSize(cr, ir, tsr)
meta.Stats.NumBytes = bs
err = writeMetaFile(dir, meta)
if err != nil {
level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err)
}

pb := &Block{
dir: dir,
meta: *meta,
@@ -291,6 +312,16 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
return pb, nil
}

func blockSize(rr ...SizeReader) int64 {
var total int64
for _, r := range rr {
if r != nil {
total += r.Size()
}
}
return total
}

// Close closes the on-disk block. It blocks as long as there are readers reading from the block.
func (pb *Block) Close() error {
pb.mtx.Lock()
@@ -318,6 +349,9 @@ func (pb *Block) Dir() string { return pb.dir }
// Meta returns meta information about the block.
func (pb *Block) Meta() BlockMeta { return pb.meta }

// Size returns the number of bytes that the block takes up.
func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes }

// ErrClosing is returned when a block is in the process of being closed.
var ErrClosing = errors.New("block is closing")

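
Since `OpenBlock` now takes a logger and persists the computed size into `meta.json`, a quick way to sanity-check the new `Size()` accessor is a sketch like the one below. The block directory path is illustrative, and `chunkenc.NewPool()` is assumed as the chunk pool (a nil pool also appears to be accepted, as the tests below pass `nil`).

```go
package main

import (
	"fmt"

	kitlog "github.com/go-kit/kit/log"
	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/chunkenc"
)

func main() {
	// Hypothetical block directory; any valid ULID block dir works.
	b, err := tsdb.OpenBlock(kitlog.NewNopLogger(), "data/01D2XYZEXAMPLE", chunkenc.NewPool())
	if err != nil {
		panic(err)
	}
	defer b.Close()

	// Size() satisfies the new SizeReader interface and reflects meta.Stats.NumBytes.
	fmt.Println("block size in bytes:", b.Size())
}
```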
55 changes: 16 additions & 39 deletions block_test.go
@@ -21,7 +21,6 @@ import (
"testing"

"github.com/go-kit/kit/log"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
)
@@ -46,60 +45,40 @@ func TestSetCompactionFailed(t *testing.T) {
testutil.Ok(t, err)
defer os.RemoveAll(tmpdir)

b := createEmptyBlock(t, tmpdir, &BlockMeta{Version: 2})

blockDir := createBlock(t, tmpdir, 0, 0, 0)
b, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
testutil.Equals(t, false, b.meta.Compaction.Failed)
testutil.Ok(t, b.setCompactionFailed())
testutil.Equals(t, true, b.meta.Compaction.Failed)
testutil.Ok(t, b.Close())

b, err = OpenBlock(tmpdir, nil)
b, err = OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
testutil.Equals(t, true, b.meta.Compaction.Failed)
testutil.Ok(t, b.Close())
}

// createEmpty block creates a block with the given meta but without any data.
func createEmptyBlock(t *testing.T, dir string, meta *BlockMeta) *Block {
testutil.Ok(t, os.MkdirAll(dir, 0777))

testutil.Ok(t, writeMetaFile(dir, meta))

ir, err := index.NewWriter(filepath.Join(dir, indexFilename))
testutil.Ok(t, err)
testutil.Ok(t, ir.Close())

testutil.Ok(t, os.MkdirAll(chunkDir(dir), 0777))

testutil.Ok(t, writeTombstoneFile(dir, newMemTombstones()))

b, err := OpenBlock(dir, nil)
testutil.Ok(t, err)
return b
}

// createPopulatedBlock creates a block with nSeries series, and nSamples samples.
func createPopulatedBlock(tb testing.TB, dir string, nSeries, nSamples int) *Block {
// createBlock creates a block with nSeries series, filled with
// samples of the given mint,maxt time range and returns its dir.
func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) string {
head, err := NewHead(nil, nil, nil, 2*60*60*1000)
testutil.Ok(tb, err)
defer head.Close()

lbls, err := labels.ReadLabels(filepath.Join("testdata", "20kseries.json"), nSeries)
testutil.Ok(tb, err)
refs := make([]uint64, nSeries)
var ref uint64

for n := 0; n < nSamples; n++ {
for ts := mint; ts <= maxt; ts++ {
app := head.Appender()
ts := n * 1000
for i, lbl := range lbls {
if refs[i] != 0 {
err := app.AddFast(refs[i], int64(ts), rand.Float64())
if err == nil {
continue
}
for _, lbl := range lbls {
err := app.AddFast(ref, ts, rand.Float64())
if err == nil {
continue
}
ref, err := app.Add(lbl, int64(ts), rand.Float64())
ref, err = app.Add(lbl, int64(ts), rand.Float64())
testutil.Ok(tb, err)
refs[i] = ref
}
err := app.Commit()
testutil.Ok(tb, err)
@@ -113,7 +92,5 @@ func createPopulatedBlock(tb testing.TB, dir string, nSeries, nSamples int) *Blo
ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime(), nil)
testutil.Ok(tb, err)

blk, err := OpenBlock(filepath.Join(dir, ulid.String()), nil)
testutil.Ok(tb, err)
return blk
return filepath.Join(dir, ulid.String())
}
10 changes: 8 additions & 2 deletions checkpoint.go
@@ -35,7 +35,7 @@ type CheckpointStats struct {
DroppedSamples int
DroppedTombstones int
TotalSeries int // Processed series including dropped ones.
TotalSamples int // Processed samples inlcuding dropped ones.
TotalSamples int // Processed samples including dropped ones.
TotalTombstones int // Processed tombstones including dropped ones.
}

@@ -128,7 +128,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
defer sgmReader.Close()
}

cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", to))
cpdir := filepath.Join(w.Dir(), fmt.Sprintf(checkpointPrefix+"%06d", to))
cpdirtmp := cpdir + ".tmp"

if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
@@ -139,6 +139,12 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
return nil, errors.Wrap(err, "open checkpoint")
}

// Ensures that an early return caused by an error doesn't leave any tmp files.
defer func() {
cp.Close()
os.RemoveAll(cpdirtmp)
}()

r := wal.NewReader(sgmReader)

var (
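
The deferred cleanup added above follows a common create-in-tmp, remove-on-exit, rename-on-success pattern. A generic, hedged sketch of that pattern (with a hypothetical `writeCheckpointLike` helper, not part of this package) is:

```go
package main

import (
	"io/ioutil"
	"os"
	"path/filepath"
)

// writeCheckpointLike is a hypothetical helper illustrating the cleanup pattern:
// write into a ".tmp" directory, always remove it on return, and only rename it
// into its final place once everything has succeeded.
func writeCheckpointLike(dir string, write func(tmpDir string) error) error {
	final := filepath.Join(dir, "checkpoint.000001")
	tmp := final + ".tmp"

	if err := os.MkdirAll(tmp, 0777); err != nil {
		return err
	}
	// An early error return no longer leaves the .tmp directory behind;
	// after a successful rename this RemoveAll is a harmless no-op.
	defer os.RemoveAll(tmp)

	if err := write(tmp); err != nil {
		return err
	}
	return os.Rename(tmp, final)
}

func main() {
	dir, err := ioutil.TempDir("", "cp-pattern-demo")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	_ = writeCheckpointLike(dir, func(tmpDir string) error {
		return ioutil.WriteFile(filepath.Join(tmpDir, "00000000"), []byte("data"), 0666)
	})
}
```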
34 changes: 32 additions & 2 deletions checkpoint_test.go
@@ -15,11 +15,14 @@
package tsdb

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"

"github.com/pkg/errors"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
@@ -31,11 +34,11 @@ func TestLastCheckpoint(t *testing.T) {
testutil.Ok(t, err)
defer os.RemoveAll(dir)

s, k, err := LastCheckpoint(dir)
_, _, err = LastCheckpoint(dir)
testutil.Equals(t, ErrNotFound, err)

testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.0000"), 0777))
s, k, err = LastCheckpoint(dir)
s, k, err := LastCheckpoint(dir)
testutil.Ok(t, err)
testutil.Equals(t, filepath.Join(dir, "checkpoint.0000"), s)
testutil.Equals(t, 0, k)
@@ -180,3 +183,30 @@ func TestCheckpoint(t *testing.T) {
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
}, series)
}

func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
// Create a new wal with an invalid record.
dir, err := ioutil.TempDir("", "test_checkpoint")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
w, err := wal.NewSize(nil, nil, dir, 64*1024)
testutil.Ok(t, err)
testutil.Ok(t, w.Log([]byte{99}))
w.Close()

// Run the checkpoint; since the wal contains an invalid record this should return an error.
_, err = Checkpoint(w, 0, 1, nil, 0)
testutil.NotOk(t, err)

// Walk the wal dir to make sure no tmp folders are left behind after the error.
err = filepath.Walk(w.Dir(), func(path string, info os.FileInfo, err error) error {
if err != nil {
return errors.Wrapf(err, "access err %q: %v\n", path, err)
}
if info.IsDir() && strings.HasSuffix(info.Name(), ".tmp") {
return fmt.Errorf("wal dir contains temporary folder:%s", info.Name())
}
return nil
})
testutil.Ok(t, err)
}