Skip to content

Commit

Permalink
Addressed comments.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Jan 20, 2020
1 parent 0d48d75 commit 462d607
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 54 deletions.
84 changes: 49 additions & 35 deletions pkg/block/indexheader/binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"path/filepath"
"sort"
"time"
"unsafe"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -39,6 +40,8 @@ const (
MagicIndex = 0xBAAAD792

symbolFactor = 32

postingLengthFieldSize = 4
)

// The table gets initialized with sync.Once but may still cause a race
Expand All @@ -60,7 +63,7 @@ func newCRC32() hash.Hash32 {
type BinaryTOC struct {
// Symbols holds start to the same symbols section as index related to this index header.
Symbols uint64
// PostingsTable holds start to the the same Postings Offset Table section as index related to this index header.
// PostingsOffsetTable holds the start of the same Postings Offset Table section as the index related to this index header.
PostingsOffsetTable uint64
}

Expand Down Expand Up @@ -383,6 +386,11 @@ func (w *binaryWriter) Close() error {
return w.f.Close()
}

// postingValueOffsets holds, for a single label name, the positions of a
// subset of its values in the postings offset table, together with the end
// offset of the postings list for the label's last value.
type postingValueOffsets struct {
// offsets lists selected label values and their offsets in the postings
// offset table. Based on the surrounding code, only every symbolFactor-th
// value is kept (plus the last one) — confirm against newFileBinaryReader.
offsets []postingOffset
// lastValOffset is the end offset of the postings for this label name's
// last value; set from the next entry's start minus crc32.Size, i.e. it
// excludes the trailing CRC32 checksum.
lastValOffset int64
}

type postingOffset struct {
// label value.
value string
Expand All @@ -399,7 +407,7 @@ type BinaryReader struct {

// Map of LabelName to a list of some LabelValues's position in the offset table.
// The first and last values for each name are always present.
postings map[string][]postingOffset
postings map[string]*postingValueOffsets
// For the v1 format, labelname -> labelvalue -> offset.
postingsV1 map[string]map[string]index.Range

Expand All @@ -422,13 +430,14 @@ func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.Bucket
return br, nil
}

level.Warn(logger).Log("msg", "failed to read index-header from disk; recreating", "path", binfn, "err", err)
level.Debug(logger).Log("msg", "failed to read index-header from disk; recreating", "path", binfn, "err", err)

start := time.Now()
if err := WriteBinary(ctx, bkt, id, binfn); err != nil {
return nil, errors.Wrap(err, "write index header")
}

level.Debug(logger).Log("msg", "build index-header file", "path", binfn, "err", err)
level.Debug(logger).Log("msg", "built index-header file", "path", binfn, "elapsed", time.Since(start))

return newFileBinaryReader(binfn)
}
Expand All @@ -447,7 +456,7 @@ func newFileBinaryReader(path string) (bw *BinaryReader, err error) {
r := &BinaryReader{
b: realByteSlice(f.Bytes()),
c: f,
postings: map[string][]postingOffset{},
postings: map[string]*postingValueOffsets{},
}

// Verify header.
Expand Down Expand Up @@ -487,9 +496,9 @@ func newFileBinaryReader(path string) (bw *BinaryReader, err error) {
if len(key) != 2 {
return errors.Errorf("unexpected key length for posting table %d", len(key))
}
// TODO(bwplotka): This is wrong, probably we have to sort.

if lastKey != nil {
prevRng.End = int64(off + 4)
prevRng.End = int64(off - crc32.Size)
r.postingsV1[lastKey[0]][lastKey[1]] = prevRng
}

Expand All @@ -499,13 +508,13 @@ func newFileBinaryReader(path string) (bw *BinaryReader, err error) {
}

lastKey = key
prevRng = index.Range{Start: int64(off + 4)}
prevRng = index.Range{Start: int64(off + postingLengthFieldSize)}
return nil
}); err != nil {
return nil, errors.Wrap(err, "read postings table")
}
if lastKey != nil {
prevRng.End = r.indexLastPostingEnd + 4
prevRng.End = r.indexLastPostingEnd - crc32.Size
r.postingsV1[lastKey[0]][lastKey[1]] = prevRng
}
} else {
Expand All @@ -521,34 +530,41 @@ func newFileBinaryReader(path string) (bw *BinaryReader, err error) {

if _, ok := r.postings[key[0]]; !ok {
// Next label name.
r.postings[key[0]] = []postingOffset{}
r.postings[key[0]] = &postingValueOffsets{}
if lastKey != nil {
// Always include last value for each label name.
r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], tableOff: lastTableOff})
if valueCount%symbolFactor != 0 {
// Always include last value for each label name.
r.postings[lastKey[0]].offsets = append(r.postings[lastKey[0]].offsets, postingOffset{value: lastKey[1], tableOff: lastTableOff})
}
r.postings[lastKey[0]].lastValOffset = int64(off - crc32.Size)
lastKey = nil
}
valueCount = 0
}

lastKey = key
if valueCount%symbolFactor == 0 {
r.postings[key[0]] = append(r.postings[key[0]], postingOffset{value: key[1], tableOff: tableOff})
lastKey = nil
r.postings[key[0]].offsets = append(r.postings[key[0]].offsets, postingOffset{value: key[1], tableOff: tableOff})
return nil
}
lastKey = key

lastTableOff = tableOff
valueCount++
return nil
}); err != nil {
return nil, errors.Wrap(err, "read postings table")
}
if lastKey != nil {
r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], tableOff: lastTableOff})
if valueCount%symbolFactor != 0 {
r.postings[lastKey[0]].offsets = append(r.postings[lastKey[0]].offsets, postingOffset{value: lastKey[1], tableOff: lastTableOff})
}
r.postings[lastKey[0]].lastValOffset = r.indexLastPostingEnd - crc32.Size
}
// Trim any extra space in the slices.
for k, v := range r.postings {
l := make([]postingOffset, len(v))
copy(l, v)
r.postings[k] = l
l := make([]postingOffset, len(v.offsets))
copy(l, v.offsets)
r.postings[k].offsets = l
}
}

Expand Down Expand Up @@ -637,7 +653,7 @@ func (r BinaryReader) postingsOffset(name string, values ...string) ([]index.Ran

skip := 0
valueIndex := 0
for valueIndex < len(values) && values[valueIndex] < e[0].value {
for valueIndex < len(values) && values[valueIndex] < e.offsets[0].value {
// Discard values before the start.
valueIndex++
}
Expand All @@ -646,19 +662,19 @@ func (r BinaryReader) postingsOffset(name string, values ...string) ([]index.Ran
for valueIndex < len(values) {
value := values[valueIndex]

i := sort.Search(len(e), func(i int) bool { return e[i].value >= value })
if i == len(e) {
i := sort.Search(len(e.offsets), func(i int) bool { return e.offsets[i].value >= value })
if i == len(e.offsets) {
// We're past the end.
break
}
if i > 0 && e[i].value != value {
if i > 0 && e.offsets[i].value != value {
// Need to look from previous entry.
i--
}
// Don't Crc32 the entire postings offset table, this is very slow
// so hope any issues were caught at startup.
d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsOffsetTable), nil)
d.Skip(e[i].tableOff)
d.Skip(e.offsets[i].tableOff)

tmpRngs = tmpRngs[:0]
// Iterate on the offset table.
Expand All @@ -677,31 +693,29 @@ func (r BinaryReader) postingsOffset(name string, values ...string) ([]index.Ran
postingOffset := int64(d.Uvarint64()) // Offset.
for string(v) >= value {
if string(v) == value {
// Actual posting is 4 bytes after offset, which includes length.
tmpRngs = append(tmpRngs, index.Range{Start: postingOffset + 4})
tmpRngs = append(tmpRngs, index.Range{Start: postingOffset + postingLengthFieldSize})
}
valueIndex++
if valueIndex == len(values) {
break
}
value = values[valueIndex]
}
if i+1 == len(e) {
if i+1 == len(e.offsets) {
for i := range tmpRngs {
tmpRngs[i].End = r.indexLastPostingEnd
tmpRngs[i].End = e.lastValOffset
}
rngs = append(rngs, tmpRngs...)
// Need to go to a later postings offset entry, if there is one.
break
}

if value >= e[i+1].value || valueIndex == len(values) {
if value >= e.offsets[i+1].value || valueIndex == len(values) {
d.Skip(skip)
d.UvarintBytes() // Label value.
postingOffset := int64(d.Uvarint64()) // Offset.
for j := range tmpRngs {
// Actual posting end is 4 bytes before next offset.
tmpRngs[j].End = postingOffset - 4
tmpRngs[j].End = postingOffset - crc32.Size
}
rngs = append(rngs, tmpRngs...)
// Need to go to a later postings offset entry, if there is one.
Expand Down Expand Up @@ -748,14 +762,14 @@ func (r BinaryReader) LabelValues(name string) ([]string, error) {
if !ok {
return nil, nil
}
if len(e) == 0 {
if len(e.offsets) == 0 {
return nil, nil
}
values := make([]string, 0, len(e)*symbolFactor)
values := make([]string, 0, len(e.offsets)*symbolFactor)

d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsOffsetTable), nil)
d.Skip(e[0].tableOff)
lastVal := e[len(e)-1].value
d.Skip(e.offsets[0].tableOff)
lastVal := e.offsets[len(e.offsets)-1].value

skip := 0
for d.Err() == nil {
Expand Down
6 changes: 4 additions & 2 deletions pkg/block/indexheader/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Reader interface {
IndexVersion() int

// PostingsOffset returns start and end offsets of postings for given name and value.
// end offset might be bigger than actual posting ending, but not larger then the whole index file.
// The end offset might be bigger than the actual posting ending, but not larger than the whole index file.
// NotFoundRangeErr is returned when no index can be found for given name and value.
// TODO(bwplotka): Move to PostingsOffsets(name string, value ...string) []index.Range and benchmark.
PostingsOffset(name string, value string) (index.Range, error)
Expand All @@ -27,7 +27,9 @@ type Reader interface {
// An error is returned if the symbol can't be found.
LookupSymbol(o uint32) (string, error)

// LabelValues returns all label values for given label name or error if not found.
// LabelValues returns all label values for the given label name, or an error.
// If no values are found for the label name, or the label name does not exist,
// an empty slice is returned with no error.
LabelValues(name string) ([]string, error)

// LabelNames returns all label names.
Expand Down
33 changes: 18 additions & 15 deletions pkg/block/indexheader/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,15 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe

minStart := int64(math.MaxInt64)
maxEnd := int64(math.MinInt64)
for _, lname := range expLabelNames {
for il, lname := range expLabelNames {
expectedLabelVals, err := indexReader.LabelValues(lname)
testutil.Ok(t, err)

vals, err := headerReader.LabelValues(lname)
testutil.Ok(t, err)
testutil.Equals(t, expectedLabelVals, vals)

for i, v := range vals {
for iv, v := range vals {
if minStart > expRanges[labels.Label{Name: lname, Value: v}].Start {
minStart = expRanges[labels.Label{Name: lname, Value: v}].Start
}
Expand All @@ -195,32 +195,35 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe

// For index-cache those values are exact.
//
// For binary they are exact except:
// * formatV2: last item posting offset. It's good enough if the value is larger than exact posting ending.
// * formatV1: all items.
if i == len(vals)-1 || indexReader.Version() == index.FormatV1 {
testutil.Equals(t, expRanges[labels.Label{Name: lname, Value: v}].Start, ptr.Start)
testutil.Assert(t, expRanges[labels.Label{Name: lname, Value: v}].End <= ptr.End, "got offset %v earlier than actual posting end %v ", ptr.End, expRanges[labels.Label{Name: lname, Value: v}].End)
continue
// For binary they are exact except last item posting offset. It's good enough if the value is larger than exact posting ending.
if indexReader.Version() == index.FormatV2 {
if iv == len(vals)-1 && il == len(expLabelNames)-1 {
testutil.Equals(t, expRanges[labels.Label{Name: lname, Value: v}].Start, ptr.Start)
testutil.Assert(t, expRanges[labels.Label{Name: lname, Value: v}].End <= ptr.End, "got offset %v earlier than actual posting end %v ", ptr.End, expRanges[labels.Label{Name: lname, Value: v}].End)
continue
}
} else {
// For index formatV1 the last one does not mean literally last value, as postings were not sorted.
// Account for that. We know it's 40 label value.
if v == "40" {
testutil.Equals(t, expRanges[labels.Label{Name: lname, Value: v}].Start, ptr.Start)
testutil.Assert(t, expRanges[labels.Label{Name: lname, Value: v}].End <= ptr.End, "got offset %v earlier than actual posting end %v ", ptr.End, expRanges[labels.Label{Name: lname, Value: v}].End)
continue
}
}
testutil.Equals(t, expRanges[labels.Label{Name: lname, Value: v}], ptr)
}
}

ptr, err := headerReader.PostingsOffset(index.AllPostingsKey())
testutil.Ok(t, err)
// For AllPostingsKey ending has also too large ending which is well handled further on.
testutil.Equals(t, expRanges[labels.Label{Name: "", Value: ""}].Start, ptr.Start)
testutil.Assert(t, expRanges[labels.Label{Name: "", Value: ""}].End <= ptr.End, "got offset %v earlier than actual posting end %v ", ptr.End, expRanges[labels.Label{Name: "", Value: ""}].End)
testutil.Equals(t, expRanges[labels.Label{Name: "", Value: ""}].End, ptr.End)

vals, err := indexReader.LabelValues("not-existing")
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), vals)

vals, err = headerReader.LabelValues("not-existing")
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), vals)

_, err = headerReader.PostingsOffset("not-existing", "1")
testutil.NotOk(t, err)
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1565,8 +1565,13 @@ func resizePostings(b []byte) ([]byte, error) {
if d.Err() != nil {
return nil, errors.Wrap(d.Err(), "read postings list")
}
// 4 for posting length, then n * 4, foreach each big endian posting.
return b[:4+n*4], nil

// 4 bytes for the number of postings entries, then 4 bytes for each big endian posting.
size := 4 + n*4
if len(b) <= size {
return nil, encoding.ErrInvalidSize
}
return b[:size], nil
}

// bigEndianPostings implements the Postings interface over a byte stream of
Expand Down

0 comments on commit 462d607

Please sign in to comment.