Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add TTL functionality #1

Merged
merged 2 commits into from
Nov 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 31 additions & 17 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ limitations under the License.

package galaxycache

import "time"

// Codec includes both the BinaryMarshaler and BinaryUnmarshaler
// interfaces
type Codec interface {
MarshalBinary() ([]byte, error)
UnmarshalBinary(data []byte) error
MarshalBinary() ([]byte, time.Time, error)
UnmarshalBinary(data []byte, expire time.Time) error
}

// Note: to ensure that unmarshaling is a read-only operation, bytes
Expand All @@ -32,48 +34,60 @@ func cloneBytes(b []byte) []byte {
}

// ByteCodec is a byte slice type that implements Codec
type ByteCodec []byte
type ByteCodec struct {
bytes []byte
expire time.Time
}

// MarshalBinary on a ByteCodec returns the bytes
func (c *ByteCodec) MarshalBinary() ([]byte, error) {
return *c, nil
func (c *ByteCodec) MarshalBinary() ([]byte, time.Time, error) {
return c.bytes, c.expire, nil
}

// UnmarshalBinary on a ByteCodec sets the ByteCodec to
// a copy of the provided data
func (c *ByteCodec) UnmarshalBinary(data []byte) error {
*c = cloneBytes(data)
func (c *ByteCodec) UnmarshalBinary(data []byte, expire time.Time) error {
c.bytes = cloneBytes(data)
c.expire = expire
return nil
}

// CopyingByteCodec is a byte slice type that implements Codec
// and returns a copy of the bytes when marshaled
type CopyingByteCodec []byte
type CopyingByteCodec struct {
bytes []byte
expire time.Time
}

// MarshalBinary on a CopyingByteCodec returns a copy of the bytes
func (c *CopyingByteCodec) MarshalBinary() ([]byte, error) {
return cloneBytes(*c), nil
func (c *CopyingByteCodec) MarshalBinary() ([]byte, time.Time, error) {
return cloneBytes(c.bytes), c.expire, nil
}

// UnmarshalBinary on a CopyingByteCodec sets the ByteCodec to
// a copy of the provided data
func (c *CopyingByteCodec) UnmarshalBinary(data []byte) error {
*c = cloneBytes(data)
func (c *CopyingByteCodec) UnmarshalBinary(data []byte, expire time.Time) error {
c.bytes = cloneBytes(data)
c.expire = expire
return nil
}

// StringCodec is a string type that implements Codec
type StringCodec string
type StringCodec struct {
str string
expire time.Time
}

// MarshalBinary on a StringCodec returns the bytes underlying
// the string
func (c *StringCodec) MarshalBinary() ([]byte, error) {
return []byte(*c), nil
func (c *StringCodec) MarshalBinary() ([]byte, time.Time, error) {
return []byte(c.str), c.expire, nil
}

// UnmarshalBinary on a StringCodec sets the StringCodec to
// a stringified copy of the provided data
func (c *StringCodec) UnmarshalBinary(data []byte) error {
*c = StringCodec(data)
func (c *StringCodec) UnmarshalBinary(data []byte, expire time.Time) error {
c.str = string(data)
c.expire = expire
return nil
}
13 changes: 10 additions & 3 deletions codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package galaxycache
import (
"bytes"
"testing"
"time"
)

const testBytes = "some bytes"
Expand Down Expand Up @@ -51,25 +52,31 @@ func TestCodec(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) {
inBytes := []byte(testBytes)
tc.codec.UnmarshalBinary(inBytes)
tc.codec.UnmarshalBinary(inBytes, time.Time{})
inBytes[0] = 'a' // change the original byte slice to ensure copy was made
marshaledBytes, err := tc.codec.MarshalBinary()
marshaledBytes, expTm, err := tc.codec.MarshalBinary()
if err != nil {
t.Errorf("Error marshaling from byteCodec: %s", err)
}
if !expTm.Equal(time.Time{}) {
t.Errorf("Expected empty expiration time")
}
if string(marshaledBytes) != testBytes {
t.Errorf("Unmarshal/Marshal resulted in %q; want %q", marshaledBytes, testBytes)
}

if tc.checkCopy {
marshaledBytes[0] = 'a' // change marshaled bytes to ensure full copy on marshal
secondMarshaledBytes, errM := tc.codec.MarshalBinary()
secondMarshaledBytes, expTm, errM := tc.codec.MarshalBinary()
if errM != nil {
t.Errorf("Error marshaling from byteCodec: %s", errM)
}
if bytes.Equal(marshaledBytes, secondMarshaledBytes) {
t.Errorf("Marshaling did not copy the bytes")
}
if !expTm.Equal(time.Time{}) {
t.Errorf("Expected empty expiration time")
}
}
})
}
Expand Down
29 changes: 15 additions & 14 deletions galaxycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error {
value.stats.touch()
g.recordRequest(ctx, hlvl, false)
g.recordStats(ctx, nil, MValueLength.M(int64(len(value.data))))
return dest.UnmarshalBinary(value.data)
return dest.UnmarshalBinary(value.data, value.expire)
}

span.Annotatef([]trace.Attribute{trace.BoolAttribute("cache_hit", false)}, "Cache miss")
Expand All @@ -456,7 +456,7 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error {
if destPopulated {
return nil
}
return dest.UnmarshalBinary(value.data)
return dest.UnmarshalBinary(value.data, value.expire)
}

type valWithLevel struct {
Expand Down Expand Up @@ -525,7 +525,7 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi
// probably boring (normal task movement), so not
// worth logging I imagine.
}
data, err := g.getLocally(ctx, key, dest)
data, expTm, err := g.getLocally(ctx, key, dest)
if err != nil {
g.Stats.BackendLoadErrors.Add(1)
g.recordStats(ctx, nil, MBackendLoadErrors.M(1))
Expand All @@ -535,7 +535,7 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi
g.Stats.CoalescedBackendLoads.Add(1)
g.recordStats(ctx, nil, MCoalescedBackendLoads.M(1))
destPopulated = true // only one caller of load gets this return value
value = newValWithStat(data, nil)
value = newValWithStat(data, nil, expTm)
g.populateCache(ctx, key, value, &g.mainCache)
return &valWithLevel{value, hitBackend, authoritative, peerErr, err}, nil
})
Expand All @@ -548,22 +548,22 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi
return
}

func (g *Galaxy) getLocally(ctx context.Context, key string, dest Codec) ([]byte, error) {
func (g *Galaxy) getLocally(ctx context.Context, key string, dest Codec) ([]byte, time.Time, error) {
err := g.getter.Get(ctx, key, dest)
if err != nil {
return nil, err
return nil, time.Time{}, err
}
return dest.MarshalBinary()
}

func (g *Galaxy) getFromPeer(ctx context.Context, peer RemoteFetcher, key string) (*valWithStat, error) {
data, err := peer.Fetch(ctx, g.name, key)
data, expire, err := peer.Fetch(ctx, g.name, key)
if err != nil {
return nil, err
}
vi, ok := g.candidateCache.get(key)
if !ok {
vi = g.addNewToCandidateCache(key)
vi = g.addNewToCandidateCache(key, expire)
}

g.maybeUpdateHotCacheStats() // will update if at least a second has passed since the last update
Expand All @@ -573,7 +573,7 @@ func (g *Galaxy) getFromPeer(ctx context.Context, peer RemoteFetcher, key string
KeyQPS: kStats.val(),
HCStats: g.hcStatsWithTime.hcs,
}
value := newValWithStat(data, kStats)
value := newValWithStat(data, kStats, expire)
if g.opts.promoter.ShouldPromote(key, value.data, stats) {
g.populateCache(ctx, key, value, &g.hotCache)
}
Expand Down Expand Up @@ -692,8 +692,9 @@ func (c *cache) stats() CacheStats {
}

type valWithStat struct {
data []byte
stats *keyStats
data []byte
stats *keyStats
expire time.Time
}

// sizeOfValWithStats returns the total size of the value in the hot/main
Expand All @@ -704,21 +705,21 @@ func (v *valWithStat) size() int64 {
return int64(unsafe.Sizeof(*v.stats)) + int64(len(v.data)) + int64(unsafe.Sizeof(v)) + int64(unsafe.Sizeof(*v))
}

func (c *cache) setLRUOnEvicted(f func(key string, kStats *keyStats)) {
func (c *cache) setLRUOnEvicted(f func(key string, kStats *keyStats, ttl time.Time)) {
c.lru.OnEvicted = func(key lru.Key, value interface{}) {
val := value.(*valWithStat)
c.nbytes -= int64(len(key.(string))) + val.size()
c.nevict++
if f != nil {
f(key.(string), val.stats)
f(key.(string), val.stats, val.expire)
}
}
}

func (c *cache) add(key string, value *valWithStat) {
c.mu.Lock()
defer c.mu.Unlock()
c.lru.Add(key, value)
c.lru.Add(key, value, value.expire)
c.nbytes += int64(len(key)) + value.size()
}

Expand Down
Loading