Skip to content

Commit

Permalink
store/cache: remove iteration limit and reset on internal inconsisten…
Browse files Browse the repository at this point in the history
…cy (#1274)
  • Loading branch information
abursavich authored and bwplotka committed Jun 25, 2019
1 parent a18df4a commit 2431318
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 28 deletions.
32 changes: 12 additions & 20 deletions pkg/store/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,6 @@ func (c *IndexCache) set(typ string, key cacheKey, val []byte) {
// ensureFits tries to make sure that the passed slice will fit into the LRU cache.
// Returns true if it will fit.
func (c *IndexCache) ensureFits(size uint64, typ string) bool {
const saneMaxIterations = 500

if size > c.maxItemSizeBytes {
level.Debug(c.logger).Log(
"msg", "item bigger than maxItemSizeBytes. Ignoring..",
Expand All @@ -249,36 +247,30 @@ func (c *IndexCache) ensureFits(size uint64, typ string) bool {
return false
}

for i := 0; c.curSize+size > c.maxSizeBytes; i++ {
if i >= saneMaxIterations {
level.Error(c.logger).Log(
"msg", "After max sane iterations of LRU evictions, we still cannot allocate the item. Ignoring.",
"maxItemSizeBytes", c.maxItemSizeBytes,
"maxSizeBytes", c.maxSizeBytes,
"curSize", c.curSize,
"itemSize", size,
"cacheType", typ,
"iterations", i,
)
return false
}

_, _, ok := c.lru.RemoveOldest()
if !ok {
for c.curSize+size > c.maxSizeBytes {
if _, _, ok := c.lru.RemoveOldest(); !ok {
level.Error(c.logger).Log(
"msg", "LRU has nothing more to evict, but we still cannot allocate the item. Ignoring.",
"msg", "LRU has nothing more to evict, but we still cannot allocate the item. Resetting cache.",
"maxItemSizeBytes", c.maxItemSizeBytes,
"maxSizeBytes", c.maxSizeBytes,
"curSize", c.curSize,
"itemSize", size,
"cacheType", typ,
)
return false
c.reset()
}
}
return true
}

func (c *IndexCache) reset() {
c.lru.Purge()
c.current.Reset()
c.currentSize.Reset()
c.totalCurrentSize.Reset()
c.curSize = 0
}

// SetPostings sets the postings identfied by the ulid and label to the value v,
// if the postings already exists in the cache it is not mutated.
func (c *IndexCache) SetPostings(b ulid.ULID, l labels.Label, v []byte) {
Expand Down
17 changes: 9 additions & 8 deletions pkg/store/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,26 @@ func TestIndexCache_AvoidsDeadlock(t *testing.T) {
testutil.Ok(t, err)

l, err := simplelru.NewLRU(math.MaxInt64, func(key, val interface{}) {
// Hack LRU to simulate broken accounting: evictions do not reduce current size.
size := cache.curSize
cache.onEvict(key, val)

// We hack LRU to add back entry on eviction to simulate broken evictions.
cache.lru.Add(key, val)
cache.curSize += sliceHeaderSize + uint64(len(val.([]byte))) // Slice header + bytes.
cache.curSize = size
})
testutil.Ok(t, err)
cache.lru = l

cache.SetPostings(ulid.MustNew(0, nil), labels.Label{Name: "test2", Value: "1"}, []byte{42, 33, 14, 67, 11})

testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries)))
testutil.Equals(t, uint64(sliceHeaderSize+5), cache.curSize)
testutil.Equals(t, float64(cache.curSize), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings)))

// This triggers deadlock logic.
cache.SetPostings(ulid.MustNew(0, nil), labels.Label{Name: "test1", Value: "1"}, []byte{42})

testutil.Equals(t, float64(1), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries)))
testutil.Equals(t, uint64(sliceHeaderSize+1), cache.curSize)
testutil.Equals(t, float64(cache.curSize), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings)))
}

func TestIndexCache_UpdateItem(t *testing.T) {
Expand Down

0 comments on commit 2431318

Please sign in to comment.