Skip to content

Commit

Permalink
cache: detach Set operations from given context (#6180)
Browse files Browse the repository at this point in the history
All remote index caches support timeouts by themselves so detach the
given context from the Set operation because the given context is
attached to the original request.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS authored Mar 15, 2023
1 parent 2ab0ec2 commit 9b5f1fe
Show file tree
Hide file tree
Showing 23 changed files with 74 additions and 71 deletions.
3 changes: 2 additions & 1 deletion pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ type Cache interface {
// Store data into the cache.
//
// Note that individual byte buffers may be retained by the cache!
Store(ctx context.Context, data map[string][]byte, ttl time.Duration)
// Cache by itself needs to support write timeouts by itself.
Store(data map[string][]byte, ttl time.Duration)

// Fetch multiple keys from cache. Returns map of input keys to data.
// If key isn't in the map, data for given key was not found.
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/groupcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (c *unsafeByteCodec) UnmarshalBinary(data []byte, expire time.Time) error {
return nil
}

func (c *Groupcache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
func (c *Groupcache) Store(data map[string][]byte, ttl time.Duration) {
// Noop since cache is already filled during fetching.
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (c *InMemoryCache) reset() {
c.curSize = 0
}

func (c *InMemoryCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
func (c *InMemoryCache) Store(data map[string][]byte, ttl time.Duration) {
for key, val := range data {
c.set(key, val, ttl)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/inmemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ max_item_size: 2KB

// Store the postings expected before running the test.
ctx := context.Background()
c.Store(ctx, testData.setup, testData.ttl)
c.Store(testData.setup, testData.ttl)

// Add delay to test expiry functionality.
if testData.testTTL {
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ func NewMemcachedCache(name string, logger log.Logger, memcached cacheutil.Remot
// Store data identified by keys.
// The function enqueues the request and returns immediately: the entry will be
// asynchronously stored in the cache.
func (c *MemcachedCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
func (c *MemcachedCache) Store(data map[string][]byte, ttl time.Duration) {
var (
firstErr error
failed int
)

for key, val := range data {
if err := c.memcached.SetAsync(ctx, key, val, ttl); err != nil {
if err := c.memcached.SetAsync(key, val, ttl); err != nil {
failed++
if firstErr == nil {
firstErr = err
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/memcached_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestMemcachedCache(t *testing.T) {

// Store the postings expected before running the test.
ctx := context.Background()
c.Store(ctx, testData.setup, time.Hour)
c.Store(testData.setup, time.Hour)

// Fetch postings from cached and assert on it.
hits := c.Fetch(ctx, testData.fetchKeys)
Expand Down Expand Up @@ -118,7 +118,7 @@ func (c *mockedMemcachedClient) GetMulti(_ context.Context, keys []string) map[s
return hits
}

func (c *mockedMemcachedClient) SetAsync(_ context.Context, key string, value []byte, _ time.Duration) error {
func (c *mockedMemcachedClient) SetAsync(key string, value []byte, _ time.Duration) error {
c.cache[key] = value
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func NewRedisCache(name string, logger log.Logger, redisClient *cacheutil.RedisC
}

// Store data identified by keys.
func (c *RedisCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
c.redisClient.SetMulti(ctx, data, ttl)
func (c *RedisCache) Store(data map[string][]byte, ttl time.Duration) {
c.redisClient.SetMulti(data, ttl)
}

// Fetch fetches multiple keys and returns a map containing cache hits, along with a list of missing keys.
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestRedisCache(t *testing.T) {
c := NewRedisCache(tt.name, logger, c, reg)
// Store the cache expected before running the test.
ctx := context.Background()
c.Store(ctx, tt.args.data, time.Hour)
c.Store(tt.args.data, time.Hour)

// Fetch postings from cached and assert on it.
hits := c.Fetch(ctx, tt.args.fetchKeys)
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/tracing_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func NewTracingCache(cache Cache) Cache {
return TracingCache{c: cache}
}

func (t TracingCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
t.c.Store(ctx, data, ttl)
func (t TracingCache) Store(data map[string][]byte, ttl time.Duration) {
t.c.Store(data, ttl)
}

func (t TracingCache) Fetch(ctx context.Context, keys []string) (result map[string][]byte) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/cacheutil/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type RemoteCacheClient interface {
// SetAsync enqueues an asynchronous operation to store a key into memcached.
// Returns an error in case it fails to enqueue the operation. In case the
// underlying async operation will fail, the error will be tracked/logged.
SetAsync(ctx context.Context, key string, value []byte, ttl time.Duration) error
SetAsync(key string, value []byte, ttl time.Duration) error

// Stop client and release underlying resources.
Stop()
Expand Down Expand Up @@ -384,7 +384,7 @@ func (c *memcachedClient) Stop() {
c.workers.Wait()
}

func (c *memcachedClient) SetAsync(_ context.Context, key string, value []byte, ttl time.Duration) error {
func (c *memcachedClient) SetAsync(key string, value []byte, ttl time.Duration) error {
// Skip hitting memcached at all if the item is bigger than the max allowed size.
if c.config.MaxItemSize > 0 && uint64(len(value)) > uint64(c.config.MaxItemSize) {
c.skipped.WithLabelValues(opSet, reasonMaxItemSize).Inc()
Expand Down
10 changes: 5 additions & 5 deletions pkg/cacheutil/memcached_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ func TestMemcachedClient_SetAsync(t *testing.T) {
testutil.Ok(t, err)
defer client.Stop()

testutil.Ok(t, client.SetAsync(ctx, "key-1", []byte("value-1"), time.Second))
testutil.Ok(t, client.SetAsync(ctx, "key-2", []byte("value-2"), time.Second))
testutil.Ok(t, client.SetAsync("key-1", []byte("value-1"), time.Second))
testutil.Ok(t, client.SetAsync("key-2", []byte("value-2"), time.Second))
testutil.Ok(t, backendMock.waitItems(2))

actual, err := client.getMultiSingle(ctx, []string{"key-1", "key-2"})
Expand All @@ -166,8 +166,8 @@ func TestMemcachedClient_SetAsyncWithCustomMaxItemSize(t *testing.T) {
testutil.Ok(t, err)
defer client.Stop()

testutil.Ok(t, client.SetAsync(ctx, "key-1", []byte("value-1"), time.Second))
testutil.Ok(t, client.SetAsync(ctx, "key-2", []byte("value-2-too-long-to-be-stored"), time.Second))
testutil.Ok(t, client.SetAsync("key-1", []byte("value-1"), time.Second))
testutil.Ok(t, client.SetAsync("key-2", []byte("value-2-too-long-to-be-stored"), time.Second))
testutil.Ok(t, backendMock.waitItems(1))

actual, err := client.getMultiSingle(ctx, []string{"key-1", "key-2"})
Expand Down Expand Up @@ -384,7 +384,7 @@ func TestMemcachedClient_GetMulti(t *testing.T) {

// Populate memcached with the initial items.
for _, item := range testData.initialItems {
testutil.Ok(t, client.SetAsync(ctx, item.Key, item.Value, time.Second))
testutil.Ok(t, client.SetAsync(item.Key, item.Value, time.Second))
}

// Wait until initial items have been added.
Expand Down
8 changes: 4 additions & 4 deletions pkg/cacheutil/redis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,9 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient
}

// SetAsync implement RemoteCacheClient.
func (c *RedisClient) SetAsync(ctx context.Context, key string, value []byte, ttl time.Duration) error {
func (c *RedisClient) SetAsync(key string, value []byte, ttl time.Duration) error {
start := time.Now()
if err := c.client.Do(ctx, c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error(); err != nil {
if err := c.client.Do(context.Background(), c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error(); err != nil {
level.Warn(c.logger).Log("msg", "failed to set item into redis", "err", err, "key", key, "value_size", len(value))
return nil
}
Expand All @@ -264,7 +264,7 @@ func (c *RedisClient) SetAsync(ctx context.Context, key string, value []byte, tt
}

// SetMulti set multiple keys and value.
func (c *RedisClient) SetMulti(ctx context.Context, data map[string][]byte, ttl time.Duration) {
func (c *RedisClient) SetMulti(data map[string][]byte, ttl time.Duration) {
if len(data) == 0 {
return
}
Expand All @@ -274,7 +274,7 @@ func (c *RedisClient) SetMulti(ctx context.Context, data map[string][]byte, ttl
for k, v := range data {
sets = append(sets, c.client.B().Setex().Key(k).Seconds(ittl).Value(rueidis.BinaryString(v)).Build())
}
for _, resp := range c.client.DoMulti(ctx, sets...) {
for _, resp := range c.client.DoMulti(context.Background(), sets...) {
if err := resp.Error(); err != nil {
level.Warn(c.logger).Log("msg", "failed to set multi items from redis", "err", err, "items", len(data))
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/cacheutil/redis_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestRedisClient(t *testing.T) {
defer c.Stop()
defer s.FlushAll()
ctx := context.Background()
c.SetMulti(ctx, tt.args.data, time.Hour)
c.SetMulti(tt.args.data, time.Hour)
hits := c.GetMulti(ctx, tt.args.fetchKeys)
testutil.Equals(t, tt.want.hits, hits)
})
Expand Down
8 changes: 4 additions & 4 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,12 +368,12 @@ func (s *BucketStore) validate() error {

type noopCache struct{}

func (noopCache) StorePostings(context.Context, ulid.ULID, labels.Label, []byte) {}
func (noopCache) StorePostings(ulid.ULID, labels.Label, []byte) {}
func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) {
return map[labels.Label][]byte{}, keys
}

func (noopCache) StoreSeries(context.Context, ulid.ULID, storage.SeriesRef, []byte) {}
func (noopCache) StoreSeries(ulid.ULID, storage.SeriesRef, []byte) {}
func (noopCache) FetchMultiSeries(_ context.Context, _ ulid.ULID, ids []storage.SeriesRef) (map[storage.SeriesRef][]byte, []storage.SeriesRef) {
return map[storage.SeriesRef][]byte{}, ids
}
Expand Down Expand Up @@ -2446,7 +2446,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
// Truncate first 4 bytes which are length of posting.
output[p.keyID] = newBigEndianPostings(pBytes[4:])

r.block.indexCache.StorePostings(ctx, r.block.meta.ULID, keys[p.keyID], dataToCache)
r.block.indexCache.StorePostings(r.block.meta.ULID, keys[p.keyID], dataToCache)

// If we just fetched it we still have to update the stats for touched postings.
r.stats.postingsTouched++
Expand Down Expand Up @@ -2607,7 +2607,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
c = c[n : n+int(l)]
r.mtx.Lock()
r.loadedSeries[id] = c
r.block.indexCache.StoreSeries(ctx, r.block.meta.ULID, id, c)
r.block.indexCache.StoreSeries(r.block.meta.ULID, id, c)
r.mtx.Unlock()
}
return nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,16 @@ func (c *swappableCache) SwapWith(ptr2 storecache.IndexCache) {
c.ptr = ptr2
}

func (c *swappableCache) StorePostings(ctx context.Context, blockID ulid.ULID, l labels.Label, v []byte) {
c.ptr.StorePostings(ctx, blockID, l, v)
func (c *swappableCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {
c.ptr.StorePostings(blockID, l, v)
}

func (c *swappableCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) {
return c.ptr.FetchMultiPostings(ctx, blockID, keys)
}

func (c *swappableCache) StoreSeries(ctx context.Context, blockID ulid.ULID, id storage.SeriesRef, v []byte) {
c.ptr.StoreSeries(ctx, blockID, id, v)
func (c *swappableCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) {
c.ptr.StoreSeries(blockID, id, v)
}

func (c *swappableCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (map[storage.SeriesRef][]byte, []storage.SeriesRef) {
Expand Down
8 changes: 6 additions & 2 deletions pkg/store/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@ var (
)

// IndexCache is the interface exported by index cache backends.
// Store operations do not support context.Context, deadlines need to be
// supported by the backends themselves. This is because Set operations are
// run async and it does not make sense to attach same context
// (potentially with a deadline) as in the original user's request.
type IndexCache interface {
// StorePostings stores postings for a single series.
StorePostings(ctx context.Context, blockID ulid.ULID, l labels.Label, v []byte)
StorePostings(blockID ulid.ULID, l labels.Label, v []byte)

// FetchMultiPostings fetches multiple postings - each identified by a label -
// and returns a map containing cache hits, along with a list of missing keys.
FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label)

// StoreSeries stores a single series.
StoreSeries(ctx context.Context, blockID ulid.ULID, id storage.SeriesRef, v []byte)
StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte)

// FetchMultiSeries fetches multiple series - each identified by ID - from the cache
// and returns a map containing cache hits, along with a list of missing IDs.
Expand Down
18 changes: 9 additions & 9 deletions pkg/store/cache/caching_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (cb *CachingBucket) Iter(ctx context.Context, dir string, f func(string) er
if err == nil && remainingTTL > 0 {
data, encErr := cfg.Codec.Encode(list)
if encErr == nil {
cfg.Cache.Store(ctx, map[string][]byte{key: data}, remainingTTL)
cfg.Cache.Store(map[string][]byte{key: data}, remainingTTL)
return nil
}
level.Warn(cb.logger).Log("msg", "failed to encode Iter result", "key", key, "err", encErr)
Expand Down Expand Up @@ -194,13 +194,13 @@ func (cb *CachingBucket) Exists(ctx context.Context, name string) (bool, error)
existsTime := time.Now()
ok, err := cb.Bucket.Exists(ctx, name)
if err == nil {
storeExistsCacheEntry(ctx, key, ok, existsTime, cfg.Cache, cfg.ExistsTTL, cfg.DoesntExistTTL)
storeExistsCacheEntry(key, ok, existsTime, cfg.Cache, cfg.ExistsTTL, cfg.DoesntExistTTL)
}

return ok, err
}

func storeExistsCacheEntry(ctx context.Context, cachingKey string, exists bool, ts time.Time, cache cache.Cache, existsTTL, doesntExistTTL time.Duration) {
func storeExistsCacheEntry(cachingKey string, exists bool, ts time.Time, cache cache.Cache, existsTTL, doesntExistTTL time.Duration) {
var ttl time.Duration
if exists {
ttl = existsTTL - time.Since(ts)
Expand All @@ -209,7 +209,7 @@ func storeExistsCacheEntry(ctx context.Context, cachingKey string, exists bool,
}

if ttl > 0 {
cache.Store(ctx, map[string][]byte{cachingKey: []byte(strconv.FormatBool(exists))}, ttl)
cache.Store(map[string][]byte{cachingKey: []byte(strconv.FormatBool(exists))}, ttl)
}
}

Expand Down Expand Up @@ -245,13 +245,13 @@ func (cb *CachingBucket) Get(ctx context.Context, name string) (io.ReadCloser, e
if err != nil {
if cb.Bucket.IsObjNotFoundErr(err) {
// Cache that object doesn't exist.
storeExistsCacheEntry(ctx, existsKey, false, getTime, cfg.Cache, cfg.ExistsTTL, cfg.DoesntExistTTL)
storeExistsCacheEntry(existsKey, false, getTime, cfg.Cache, cfg.ExistsTTL, cfg.DoesntExistTTL)
}

return nil, err
}

storeExistsCacheEntry(ctx, existsKey, true, getTime, cfg.Cache, cfg.ExistsTTL, cfg.DoesntExistTTL)
storeExistsCacheEntry(existsKey, true, getTime, cfg.Cache, cfg.ExistsTTL, cfg.DoesntExistTTL)
return &getReader{
c: cfg.Cache,
ctx: ctx,
Expand Down Expand Up @@ -314,7 +314,7 @@ func (cb *CachingBucket) cachedAttributes(ctx context.Context, name, cfgName str
}

if raw, err := json.Marshal(attrs); err == nil {
cache.Store(ctx, map[string][]byte{key: raw}, ttl)
cache.Store(map[string][]byte{key: raw}, ttl)
} else {
level.Warn(cb.logger).Log("msg", "failed to encode cached Attributes result", "key", key, "err", err)
}
Expand Down Expand Up @@ -466,7 +466,7 @@ func (cb *CachingBucket) fetchMissingSubranges(ctx context.Context, name string,

if storeToCache {
cb.fetchedGetRangeBytes.WithLabelValues(originBucket, cfgName).Add(float64(len(subrangeData)))
cfg.Cache.Store(gctx, map[string][]byte{key: subrangeData}, cfg.SubrangeTTL)
cfg.Cache.Store(map[string][]byte{key: subrangeData}, cfg.SubrangeTTL)
} else {
cb.refetchedGetRangeBytes.WithLabelValues(originCache, cfgName).Add(float64(len(subrangeData)))
}
Expand Down Expand Up @@ -594,7 +594,7 @@ func (g *getReader) Read(p []byte) (n int, err error) {
if err == io.EOF && g.buf != nil {
remainingTTL := g.ttl - time.Since(g.startTime)
if remainingTTL > 0 {
g.c.Store(g.ctx, map[string][]byte{g.cacheKey: g.buf.Bytes()}, remainingTTL)
g.c.Store(map[string][]byte{g.cacheKey: g.buf.Bytes()}, remainingTTL)
}
// Clear reference, to avoid doing another Store on next read.
g.buf = nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/cache/caching_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func newMockCache() *mockCache {
return c
}

func (m *mockCache) Store(_ context.Context, data map[string][]byte, ttl time.Duration) {
func (m *mockCache) Store(data map[string][]byte, ttl time.Duration) {
m.mu.Lock()
defer m.mu.Unlock()

Expand Down
4 changes: 2 additions & 2 deletions pkg/store/cache/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func copyToKey(l labels.Label) cacheKeyPostings {

// StorePostings sets the postings identified by the ulid and label to the value v,
// if the postings already exists in the cache it is not mutated.
func (c *InMemoryIndexCache) StorePostings(_ context.Context, blockID ulid.ULID, l labels.Label, v []byte) {
func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {
c.set(cacheTypePostings, cacheKey{block: blockID, key: copyToKey(l)}, v)
}

Expand All @@ -312,7 +312,7 @@ func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.

// StoreSeries sets the series identified by the ulid and id to the value v,
// if the series already exists in the cache it is not mutated.
func (c *InMemoryIndexCache) StoreSeries(_ context.Context, blockID ulid.ULID, id storage.SeriesRef, v []byte) {
func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) {
c.set(cacheTypeSeries, cacheKey{blockID, cacheKeySeries(id)}, v)
}

Expand Down
Loading

0 comments on commit 9b5f1fe

Please sign in to comment.