Skip to content

Commit

Permalink
Differentiate Action Cache objects by instance name
Browse files Browse the repository at this point in the history
Fixes: buchgr#15
  • Loading branch information
gjasny committed Dec 28, 2019
1 parent 871dbe4 commit 79eba47
Show file tree
Hide file tree
Showing 11 changed files with 307 additions and 196 deletions.
18 changes: 9 additions & 9 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ type Cache interface {
// Put stores a stream of `size` bytes from `rdr` into the cache. If `hash` is
// not the empty string, and the contents don't match it, a non-nil error is
// returned.
Put(kind EntryKind, hash string, size int64, rdr io.Reader) error
Put(kind EntryKind, instanceName string, hash string, size int64, rdr io.Reader) error

// Get returns an io.ReadCloser with the content of the cache item stored under `hash`
// and the number of bytes that can be read from it. If the item is not found, `rdr` is
// nil. If some error occurred when processing the request, then it is returned.
Get(kind EntryKind, hash string) (rdr io.ReadCloser, sizeBytes int64, err error)
Get(kind EntryKind, instanceName string, hash string) (rdr io.ReadCloser, sizeBytes int64, err error)

// Contains returns true if the `hash` key exists in the cache.
Contains(kind EntryKind, hash string) (ok bool)
Contains(kind EntryKind, instanceName string, hash string) (ok bool)

// MaxSize returns the maximum cache size in bytes.
MaxSize() int64
Expand All @@ -78,8 +78,8 @@ type Cache interface {
// available in the CAS, return it and its serialized value.
// If not, return nil values.
// If something unexpected went wrong, return an error.
func GetValidatedActionResult(c Cache, hash string) (*pb.ActionResult, []byte, error) {
rdr, sizeBytes, err := c.Get(AC, hash)
func GetValidatedActionResult(c Cache, instanceName string, hash string) (*pb.ActionResult, []byte, error) {
rdr, sizeBytes, err := c.Get(AC, instanceName, hash)
if err != nil {
return nil, nil, err
}
Expand All @@ -101,26 +101,26 @@ func GetValidatedActionResult(c Cache, hash string) (*pb.ActionResult, []byte, e

for _, f := range result.OutputFiles {
if len(f.Contents) == 0 && f.Digest.SizeBytes > 0 {
if !c.Contains(CAS, f.Digest.Hash) {
if !c.Contains(CAS, instanceName, f.Digest.Hash) {
return nil, nil, nil // aka "not found"
}
}
}

for _, d := range result.OutputDirectories {
if !c.Contains(CAS, d.TreeDigest.Hash) {
if !c.Contains(CAS, instanceName, d.TreeDigest.Hash) {
return nil, nil, nil // aka "not found"
}
}

if result.StdoutDigest != nil {
if !c.Contains(CAS, result.StdoutDigest.Hash) {
if !c.Contains(CAS, instanceName, result.StdoutDigest.Hash) {
return nil, nil, nil // aka "not found"
}
}

if result.StderrDigest != nil {
if !c.Contains(CAS, result.StderrDigest.Hash) {
if !c.Contains(CAS, instanceName, result.StderrDigest.Hash) {
return nil, nil, nil // aka "not found"
}
}
Expand Down
28 changes: 16 additions & 12 deletions cache/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ func (c *diskCache) loadExistingFiles() error {
return nil
}

func (c *diskCache) Put(kind cache.EntryKind, hash string, expectedSize int64, r io.Reader) error {
func (c *diskCache) Put(kind cache.EntryKind, instanceName string, hash string, expectedSize int64, r io.Reader) error {
c.mu.Lock()

key := cacheKey(kind, hash)
key := cacheKey(kind, instanceName, hash)

// If there's an ongoing upload (i.e. cache key is present in uncommitted state),
// we drop the upload and discard the incoming stream. We do accept uploads
Expand Down Expand Up @@ -204,7 +204,7 @@ func (c *diskCache) Put(kind cache.EntryKind, hash string, expectedSize int64, r
}()

// Download to a temporary file
filePath := cacheFilePath(kind, c.dir, hash)
filePath := cacheFilePath(kind, c.dir, instanceName, hash)
tmpFilePath := filePath + ".tmp"
f, err := os.Create(tmpFilePath)
if err != nil {
Expand Down Expand Up @@ -260,12 +260,12 @@ func (c *diskCache) Put(kind cache.EntryKind, hash string, expectedSize int64, r
return err
}

func (c *diskCache) Get(kind cache.EntryKind, hash string) (rdr io.ReadCloser, sizeBytes int64, err error) {
if !c.Contains(kind, hash) {
func (c *diskCache) Get(kind cache.EntryKind, instanceName string, hash string) (rdr io.ReadCloser, sizeBytes int64, err error) {
if !c.Contains(kind, instanceName, hash) {
return nil, 0, nil
}

blobPath := cacheFilePath(kind, c.dir, hash)
blobPath := cacheFilePath(kind, c.dir, instanceName, hash)

fileInfo, err := os.Stat(blobPath)
if err != nil {
Expand All @@ -281,11 +281,11 @@ func (c *diskCache) Get(kind cache.EntryKind, hash string) (rdr io.ReadCloser, s
return rdr, sizeBytes, nil
}

func (c *diskCache) Contains(kind cache.EntryKind, hash string) (ok bool) {
func (c *diskCache) Contains(kind cache.EntryKind, instanceName string, hash string) (ok bool) {
c.mu.Lock()
defer c.mu.Unlock()

val, found := c.lru.Get(cacheKey(kind, hash))
val, found := c.lru.Get(cacheKey(kind, instanceName, hash))
// Uncommitted (i.e. uploading items) should be reported as not ok
return found && val.(*lruItem).committed
}
Expand All @@ -311,10 +311,14 @@ func ensureDirExists(path string) {
}
}

func cacheKey(kind cache.EntryKind, hash string) string {
return filepath.Join(kind.String(), hash[:2], hash)
func cacheKey(kind cache.EntryKind, instanceName string, hash string) string {
if kind == cache.CAS || instanceName == "" {
return filepath.Join(kind.String(), hash[:2], hash)
} else {
return filepath.Join(kind.String(), hash[:2], hash+"_"+instanceName)
}
}

func cacheFilePath(kind cache.EntryKind, cacheDir string, hash string) string {
return filepath.Join(cacheDir, cacheKey(kind, hash))
func cacheFilePath(kind cache.EntryKind, cacheDir string, instanceName string, hash string) string {
return filepath.Join(cacheDir, cacheKey(kind, instanceName, hash))
}
112 changes: 90 additions & 22 deletions cache/disk/disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"testing"
"time"

"github.com/buchgr/bazel-remote/utils"
testutils "github.com/buchgr/bazel-remote/utils"

"github.com/buchgr/bazel-remote/cache"
)
Expand Down Expand Up @@ -57,6 +57,7 @@ func checkItems(cache *diskCache, expSize int64, expNum int) error {
return nil
}

const NO_INSTANCE_NAME = ""
const KEY = "a-key"
const CONTENTS = "hello"
const CONTENTS_HASH = "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
Expand All @@ -72,7 +73,7 @@ func TestCacheBasics(t *testing.T) {
}

// Non-existing item
rdr, sizeBytes, err := testCache.Get(cache.CAS, CONTENTS_HASH)
rdr, sizeBytes, err := testCache.Get(cache.CAS, NO_INSTANCE_NAME, CONTENTS_HASH)
if err != nil {
t.Fatal(err)
}
Expand All @@ -81,7 +82,7 @@ func TestCacheBasics(t *testing.T) {
}

// Add an item
err = testCache.Put(cache.CAS, CONTENTS_HASH, int64(len(CONTENTS)), strings.NewReader(CONTENTS))
err = testCache.Put(cache.CAS, NO_INSTANCE_NAME, CONTENTS_HASH, int64(len(CONTENTS)), strings.NewReader(CONTENTS))
if err != nil {
t.Fatal(err)
}
Expand All @@ -94,7 +95,7 @@ func TestCacheBasics(t *testing.T) {
}

// Get the item back
rdr, sizeBytes, err = testCache.Get(cache.CAS, CONTENTS_HASH)
rdr, sizeBytes, err = testCache.Get(cache.CAS, NO_INSTANCE_NAME, CONTENTS_HASH)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -126,7 +127,7 @@ func TestCacheEviction(t *testing.T) {

for i, thisExp := range expectedSizesNumItems {
strReader := strings.NewReader(strings.Repeat("a", i))
err := testCache.Put(cache.AC, fmt.Sprintf("aa-%d", i), int64(i), strReader)
err := testCache.Put(cache.AC, NO_INSTANCE_NAME, fmt.Sprintf("aa-%d", i), int64(i), strReader)
if err != nil {
t.Fatal(err)
}
Expand All @@ -143,7 +144,7 @@ func TestCachePutWrongSize(t *testing.T) {
defer os.RemoveAll(cacheDir)
testCache := New(cacheDir, 100)

err := testCache.Put(cache.AC, "aa-aa", int64(10), strings.NewReader("hello"))
err := testCache.Put(cache.AC, NO_INSTANCE_NAME, "aa-aa", int64(10), strings.NewReader("hello"))
if err == nil {
t.Fatal("Expected error due to size being different")
}
Expand Down Expand Up @@ -174,15 +175,30 @@ func putGetCompare(kind cache.EntryKind, hash string, content string, testCache
}

func putGetCompareBytes(kind cache.EntryKind, hash string, data []byte, testCache cache.Cache) error {

r := bytes.NewReader(data)

err := testCache.Put(kind, hash, int64(len(data)), r)
err := putBytes(kind, NO_INSTANCE_NAME, hash, data, testCache)
if err != nil {
return err
}

rdr, sizeBytes, err := testCache.Get(kind, hash)
return getCompareBytes(kind, NO_INSTANCE_NAME, hash, data, testCache)
}

func put(kind cache.EntryKind, instanceName string, hash string, content string, testCache cache.Cache) error {
return putBytes(kind, instanceName, hash, []byte(content), testCache)
}

func putBytes(kind cache.EntryKind, instanceName string, hash string, data []byte, testCache cache.Cache) error {
r := bytes.NewReader(data)

return testCache.Put(kind, instanceName, hash, int64(len(data)), r)
}

func getCompare(kind cache.EntryKind, instanceName string, hash string, content string, testCache cache.Cache) error {
return getCompareBytes(kind, instanceName, hash, []byte(content), testCache)
}

func getCompareBytes(kind cache.EntryKind, instanceName string, hash string, data []byte, testCache cache.Cache) error {
rdr, sizeBytes, err := testCache.Get(kind, instanceName, hash)
if err != nil {
return err
}
Expand Down Expand Up @@ -230,6 +246,58 @@ func TestOverwrite(t *testing.T) {
}
}

func TestRespectInstanceName(t *testing.T) {
cacheDir := tempDir(t)
defer os.RemoveAll(cacheDir)
testCache := New(cacheDir, 30)

hash := hashStr("just some common hash")

var err error
err = put(cache.AC, NO_INSTANCE_NAME, hash, "content", testCache)
if err != nil {
t.Fatal(err)
}
err = put(cache.AC, "a", hash, "contentA", testCache)
if err != nil {
t.Fatal(err)
}
err = put(cache.AC, "b", hash, "contentB", testCache)
if err != nil {
t.Fatal(err)
}

// expect different content for different instance names

err = getCompare(cache.AC, NO_INSTANCE_NAME, hash, "content", testCache)
if err != nil {
t.Fatal(err)
}
err = getCompare(cache.AC, "a", hash, "contentA", testCache)
if err != nil {
t.Fatal(err)
}
err = getCompare(cache.AC, "b", hash, "contentB", testCache)
if err != nil {
t.Fatal(err)
}

anotherTestCache := New(cacheDir, 30)

err = getCompare(cache.AC, NO_INSTANCE_NAME, hash, "content", anotherTestCache)
if err != nil {
t.Fatal(err)
}
err = getCompare(cache.AC, "a", hash, "contentA", anotherTestCache)
if err != nil {
t.Fatal(err)
}
err = getCompare(cache.AC, "b", hash, "contentB", anotherTestCache)
if err != nil {
t.Fatal(err)
}
}

func TestCacheExistingFiles(t *testing.T) {
cacheDir := tempDir(t)
defer os.RemoveAll(cacheDir)
Expand Down Expand Up @@ -264,7 +332,7 @@ func TestCacheExistingFiles(t *testing.T) {
}

// Adding a new file should evict items[0] (the oldest)
err = testCache.Put(cache.CAS, CONTENTS_HASH, int64(len(CONTENTS)), strings.NewReader(CONTENTS))
err = testCache.Put(cache.CAS, NO_INSTANCE_NAME, CONTENTS_HASH, int64(len(CONTENTS)), strings.NewReader(CONTENTS))
if err != nil {
t.Fatal(err)
}
Expand All @@ -273,7 +341,7 @@ func TestCacheExistingFiles(t *testing.T) {
if err != nil {
t.Fatal(err)
}
found := testCache.Contains(cache.CAS, "f53b46209596d170f7659a414c9ff9f6b545cf77ffd6e1cbe9bcc57e1afacfbd")
found := testCache.Contains(cache.CAS, NO_INSTANCE_NAME, "f53b46209596d170f7659a414c9ff9f6b545cf77ffd6e1cbe9bcc57e1afacfbd")
if found {
t.Fatalf("%s should have been evicted", items[0])
}
Expand All @@ -288,7 +356,7 @@ func TestCacheBlobTooLarge(t *testing.T) {

for k := range []cache.EntryKind{cache.AC, cache.RAW} {
kind := cache.EntryKind(k)
err := testCache.Put(kind, hashStr("foo"), 10000, strings.NewReader(CONTENTS))
err := testCache.Put(kind, NO_INSTANCE_NAME, hashStr("foo"), 10000, strings.NewReader(CONTENTS))
if err == nil {
t.Fatal("Expected an error")
}
Expand All @@ -309,14 +377,14 @@ func TestCacheCorruptedCASBlob(t *testing.T) {
defer os.RemoveAll(cacheDir)
testCache := New(cacheDir, 1000)

err := testCache.Put(cache.CAS, hashStr("foo"), int64(len(CONTENTS)),
err := testCache.Put(cache.CAS, NO_INSTANCE_NAME, hashStr("foo"), int64(len(CONTENTS)),
strings.NewReader(CONTENTS))
if err == nil {
t.Fatal("expected hash mismatch error")
}

// We expect the upload to succeed without validation:
err = testCache.Put(cache.RAW, hashStr("foo"), int64(len(CONTENTS)),
err = testCache.Put(cache.RAW, NO_INSTANCE_NAME, hashStr("foo"), int64(len(CONTENTS)),
strings.NewReader(CONTENTS))
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -344,13 +412,13 @@ func TestMigrateFromOldDirectoryStructure(t *testing.T) {
if numItems != 3 {
t.Fatalf("Expected test cache size 3 but was %d", numItems)
}
if !testCache.Contains(cache.AC, acHash) {
if !testCache.Contains(cache.AC, NO_INSTANCE_NAME, acHash) {
t.Fatalf("Expected cache to contain AC entry '%s'", acHash)
}
if !testCache.Contains(cache.CAS, casHash1) {
if !testCache.Contains(cache.CAS, NO_INSTANCE_NAME, casHash1) {
t.Fatalf("Expected cache to contain CAS entry '%s'", casHash1)
}
if !testCache.Contains(cache.CAS, casHash2) {
if !testCache.Contains(cache.CAS, NO_INSTANCE_NAME, casHash2) {
t.Fatalf("Expected cache to contain CAS entry '%s'", casHash2)
}
}
Expand Down Expand Up @@ -382,13 +450,13 @@ func TestLoadExistingEntries(t *testing.T) {
t.Fatalf("Expected test cache size %d but was %d",
numBlobs, numItems)
}
if !testCache.Contains(cache.AC, acHash) {
if !testCache.Contains(cache.AC, NO_INSTANCE_NAME, acHash) {
t.Fatalf("Expected cache to contain AC entry '%s'", acHash)
}
if !testCache.Contains(cache.CAS, casHash) {
if !testCache.Contains(cache.CAS, NO_INSTANCE_NAME, casHash) {
t.Fatalf("Expected cache to contain CAS entry '%s'", casHash)
}
if !testCache.Contains(cache.RAW, rawHash) {
if !testCache.Contains(cache.RAW, NO_INSTANCE_NAME, rawHash) {
t.Fatalf("Expected cache to contain RAW entry '%s'", rawHash)
}
}
Expand Down
Loading

0 comments on commit 79eba47

Please sign in to comment.