diff --git a/codec.go b/codec.go index 53be4c8d..13639ec3 100644 --- a/codec.go +++ b/codec.go @@ -16,11 +16,13 @@ limitations under the License. package galaxycache +import "time" + // Codec includes both the BinaryMarshaler and BinaryUnmarshaler // interfaces type Codec interface { - MarshalBinary() ([]byte, error) - UnmarshalBinary(data []byte) error + MarshalBinary() ([]byte, time.Time, error) + UnmarshalBinary(data []byte, expire time.Time) error } // Note: to ensure that unmarshaling is a read-only operation, bytes @@ -32,48 +34,60 @@ func cloneBytes(b []byte) []byte { } // ByteCodec is a byte slice type that implements Codec -type ByteCodec []byte +type ByteCodec struct { + bytes []byte + expire time.Time +} // MarshalBinary on a ByteCodec returns the bytes -func (c *ByteCodec) MarshalBinary() ([]byte, error) { - return *c, nil +func (c *ByteCodec) MarshalBinary() ([]byte, time.Time, error) { + return c.bytes, c.expire, nil } // UnmarshalBinary on a ByteCodec sets the ByteCodec to // a copy of the provided data -func (c *ByteCodec) UnmarshalBinary(data []byte) error { - *c = cloneBytes(data) +func (c *ByteCodec) UnmarshalBinary(data []byte, expire time.Time) error { + c.bytes = cloneBytes(data) + c.expire = expire return nil } // CopyingByteCodec is a byte slice type that implements Codec // and returns a copy of the bytes when marshaled -type CopyingByteCodec []byte +type CopyingByteCodec struct { + bytes []byte + expire time.Time +} // MarshalBinary on a CopyingByteCodec returns a copy of the bytes -func (c *CopyingByteCodec) MarshalBinary() ([]byte, error) { - return cloneBytes(*c), nil +func (c *CopyingByteCodec) MarshalBinary() ([]byte, time.Time, error) { + return cloneBytes(c.bytes), c.expire, nil } // UnmarshalBinary on a CopyingByteCodec sets the ByteCodec to // a copy of the provided data -func (c *CopyingByteCodec) UnmarshalBinary(data []byte) error { - *c = cloneBytes(data) +func (c *CopyingByteCodec) UnmarshalBinary(data []byte, expire time.Time) error { + c.bytes = cloneBytes(data) + c.expire = expire return nil } // StringCodec is a string type that implements Codec -type StringCodec string +type StringCodec struct { + str string + expire time.Time +} // MarshalBinary on a StringCodec returns the bytes underlying // the string -func (c *StringCodec) MarshalBinary() ([]byte, error) { - return []byte(*c), nil +func (c *StringCodec) MarshalBinary() ([]byte, time.Time, error) { + return []byte(c.str), c.expire, nil } // UnmarshalBinary on a StringCodec sets the StringCodec to // a stringified copy of the provided data -func (c *StringCodec) UnmarshalBinary(data []byte) error { - *c = StringCodec(data) +func (c *StringCodec) UnmarshalBinary(data []byte, expire time.Time) error { + c.str = string(data) + c.expire = expire return nil } diff --git a/codec_test.go b/codec_test.go index 76342233..0a4a9129 100644 --- a/codec_test.go +++ b/codec_test.go @@ -19,6 +19,7 @@ package galaxycache import ( "bytes" "testing" + "time" ) const testBytes = "some bytes" @@ -51,25 +52,31 @@ func TestCodec(t *testing.T) { for _, tc := range testCases { t.Run(tc.testName, func(t *testing.T) { inBytes := []byte(testBytes) - tc.codec.UnmarshalBinary(inBytes) + tc.codec.UnmarshalBinary(inBytes, time.Time{}) inBytes[0] = 'a' // change the original byte slice to ensure copy was made - marshaledBytes, err := tc.codec.MarshalBinary() + marshaledBytes, expTm, err := tc.codec.MarshalBinary() if err != nil { t.Errorf("Error marshaling from byteCodec: %s", err) } + if !expTm.Equal(time.Time{}) { + t.Errorf("Expected empty expiration time") + } if string(marshaledBytes) != testBytes { t.Errorf("Unmarshal/Marshal resulted in %q; want %q", marshaledBytes, testBytes) } if tc.checkCopy { marshaledBytes[0] = 'a' // change marshaled bytes to ensure full copy on marshal - secondMarshaledBytes, errM := tc.codec.MarshalBinary() + secondMarshaledBytes, expTm, errM := tc.codec.MarshalBinary() if errM != nil { t.Errorf("Error marshaling from byteCodec: %s", errM) } if bytes.Equal(marshaledBytes, secondMarshaledBytes) { t.Errorf("Marshaling did not copy the bytes") } + if !expTm.Equal(time.Time{}) { + t.Errorf("Expected empty expiration time") + } } }) } diff --git a/galaxycache.go b/galaxycache.go index 626db13d..e23a915f 100644 --- a/galaxycache.go +++ b/galaxycache.go @@ -436,7 +436,7 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error { value.stats.touch() g.recordRequest(ctx, hlvl, false) g.recordStats(ctx, nil, MValueLength.M(int64(len(value.data)))) - return dest.UnmarshalBinary(value.data) + return dest.UnmarshalBinary(value.data, value.expire) } span.Annotatef([]trace.Attribute{trace.BoolAttribute("cache_hit", false)}, "Cache miss") @@ -456,7 +456,7 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error { if destPopulated { return nil } - return dest.UnmarshalBinary(value.data) + return dest.UnmarshalBinary(value.data, value.expire) } type valWithLevel struct { @@ -525,7 +525,7 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi // probably boring (normal task movement), so not // worth logging I imagine. } - data, err := g.getLocally(ctx, key, dest) + data, expTm, err := g.getLocally(ctx, key, dest) if err != nil { g.Stats.BackendLoadErrors.Add(1) g.recordStats(ctx, nil, MBackendLoadErrors.M(1)) @@ -535,7 +535,7 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi g.Stats.CoalescedBackendLoads.Add(1) g.recordStats(ctx, nil, MCoalescedBackendLoads.M(1)) destPopulated = true // only one caller of load gets this return value - value = newValWithStat(data, nil) + value = newValWithStat(data, nil, expTm) g.populateCache(ctx, key, value, &g.mainCache) return &valWithLevel{value, hitBackend, authoritative, peerErr, err}, nil }) @@ -548,22 +548,22 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi return } -func (g *Galaxy) getLocally(ctx context.Context, key string, dest Codec) ([]byte, error) { +func (g *Galaxy) getLocally(ctx context.Context, key string, dest Codec) ([]byte, time.Time, error) { err := g.getter.Get(ctx, key, dest) if err != nil { - return nil, err + return nil, time.Time{}, err } return dest.MarshalBinary() } func (g *Galaxy) getFromPeer(ctx context.Context, peer RemoteFetcher, key string) (*valWithStat, error) { - data, err := peer.Fetch(ctx, g.name, key) + data, expire, err := peer.Fetch(ctx, g.name, key) if err != nil { return nil, err } vi, ok := g.candidateCache.get(key) if !ok { - vi = g.addNewToCandidateCache(key) + vi = g.addNewToCandidateCache(key, expire) } g.maybeUpdateHotCacheStats() // will update if at least a second has passed since the last update @@ -573,7 +573,7 @@ func (g *Galaxy) getFromPeer(ctx context.Context, peer RemoteFetcher, key string KeyQPS: kStats.val(), HCStats: g.hcStatsWithTime.hcs, } - value := newValWithStat(data, kStats) + value := newValWithStat(data, kStats, expire) if g.opts.promoter.ShouldPromote(key, value.data, stats) { g.populateCache(ctx, key, value, &g.hotCache) } @@ -692,8 +692,9 @@ func (c *cache) stats() CacheStats { } type valWithStat struct { - data []byte - stats *keyStats + data []byte + stats *keyStats + expire time.Time } // sizeOfValWithStats returns the total size of the value in the hot/main @@ -704,13 +705,13 @@ func (v *valWithStat) size() int64 { return int64(unsafe.Sizeof(*v.stats)) + int64(len(v.data)) + int64(unsafe.Sizeof(v)) + int64(unsafe.Sizeof(*v)) } -func (c *cache) setLRUOnEvicted(f func(key string, kStats *keyStats)) { +func (c *cache) setLRUOnEvicted(f func(key string, kStats *keyStats, ttl time.Time)) { c.lru.OnEvicted = func(key lru.Key, value interface{}) { val := value.(*valWithStat) c.nbytes -= int64(len(key.(string))) + val.size() c.nevict++ if f != nil { - f(key.(string), val.stats) + f(key.(string), val.stats, val.expire) } } } @@ -718,7 +719,7 @@ func (c *cache) setLRUOnEvicted(f func(key string, kStats *keyStats)) { func (c *cache) add(key string, value *valWithStat) { c.mu.Lock() defer c.mu.Unlock() - c.lru.Add(key, value) + c.lru.Add(key, value, value.expire) c.nbytes += int64(len(key)) + value.size() } diff --git a/galaxycache_test.go b/galaxycache_test.go index fac06279..fe67486e 100644 --- a/galaxycache_test.go +++ b/galaxycache_test.go @@ -54,7 +54,7 @@ func setupStringGalaxyTest(cacheFills *AtomicInt) (*Galaxy, context.Context, cha } cacheFills.Add(1) str := "ECHO:" + key - return dest.UnmarshalBinary([]byte(str)) + return dest.UnmarshalBinary([]byte(str), time.Now().Add(5*time.Minute)) })) return stringGalaxy, ctx, stringc } @@ -75,7 +75,13 @@ func TestGetDupSuppress(t *testing.T) { resc <- "ERROR:" + err.Error() return } - resc <- string(s) + + ret, _, err := s.MarshalBinary() + if err != nil { + resc <- "ERROR MARSHAL: " + err.Error() + return + } + resc <- string(ret) }() } @@ -150,7 +156,12 @@ func TestCacheEviction(t *testing.T) { var res StringCodec key := fmt.Sprintf("dummy-key-%d", bytesFlooded) stringGalaxy.Get(ctx, key, &res) - bytesFlooded += int64(len(key) + len(res)) + + ret, _, err := res.MarshalBinary() + if err != nil { + t.Fatalf("marshaling binary: %v", err.Error()) + } + bytesFlooded += int64(len(key) + len(ret)) } evicts := stringGalaxy.mainCache.nevict - evict0 if evicts <= 0 { @@ -179,12 +190,12 @@ func (fetcher *TestFetcher) Close() error { type testFetchers []RemoteFetcher -func (fetcher *TestFetcher) Fetch(ctx context.Context, galaxy string, key string) ([]byte, error) { +func (fetcher *TestFetcher) Fetch(ctx context.Context, galaxy string, key string) ([]byte, time.Time, error) { if fetcher.fail { - return nil, errors.New("simulated error from peer") + return nil, time.Time{}, errors.New("simulated error from peer") } fetcher.hits++ - return []byte("got:" + key), nil + return []byte("got:" + key), time.Time{}, nil } func (proto *TestProtocol) NewFetcher(url string) (RemoteFetcher, error) { @@ -271,7 +282,7 @@ func TestPeers(t *testing.T) { getter := func(_ context.Context, key string, dest Codec) error { // these are local hits testproto.TestFetchers["fetcher0"].hits++ - return dest.UnmarshalBinary([]byte("got:" + key)) + return dest.UnmarshalBinary([]byte("got:"+key), time.Now().Add(5*time.Minute)) } testGalaxy := universe.NewGalaxy("TestPeers-galaxy", tc.cacheSize, GetterFunc(getter), WithPromoter(&promoter.ProbabilisticPromoter{ProbDenominator: 10})) @@ -293,8 +304,14 @@ func TestPeers(t *testing.T) { t.Errorf("%s: error on key %q: %v", tc.testName, key, err) continue } - if string(got) != want { - t.Errorf("%s: for key %q, got %q; want %q", tc.testName, key, got, want) + + ret, _, err := got.MarshalBinary() + if err != nil { + t.Errorf("%s: error marshaling on key %q: %v", tc.testName, key, err) + continue + } + if string(ret) != want { + t.Errorf("%s: for key %q, got %q; want %q", tc.testName, key, ret, want) } } for name, fetcher := range testproto.TestFetchers { @@ -335,7 +352,7 @@ func TestNoDedup(t *testing.T) { const testkey = "testkey" const testval = "testval" g := universe.NewGalaxy("testgalaxy", 1024, GetterFunc(func(_ context.Context, key string, dest Codec) error { - return dest.UnmarshalBinary([]byte(testval)) + return dest.UnmarshalBinary([]byte(testval), time.Now().Add(5*time.Minute)) })) orderedGroup := &orderedFlightGroup{ @@ -359,7 +376,13 @@ func TestNoDedup(t *testing.T) { resc <- "ERROR:" + err.Error() return } - resc <- string(s) + + ret, _, err := s.MarshalBinary() + if err != nil { + resc <- "ERROR MARSHAL:" + err.Error() + return + } + resc <- string(ret) }() } @@ -386,7 +409,7 @@ func TestNoDedup(t *testing.T) { // upon entry, we would increment nbytes twice but the entry would // only be in the cache once. testKStats := keyStats{dQPS: dampedQPS{period: time.Second}} - testvws := newValWithStat([]byte(testval), &testKStats) + testvws := newValWithStat([]byte(testval), &testKStats, time.Now().Add(1*time.Second)) wantBytes := int64(len(testkey)) + testvws.size() if g.mainCache.nbytes != wantBytes { t.Errorf("cache has %d bytes, want %d", g.mainCache.nbytes, wantBytes) @@ -454,14 +477,14 @@ func TestHotcache(t *testing.T) { t.Run(tc.name, func(t *testing.T) { u := NewUniverse(&TestProtocol{}, "test-universe") g := u.NewGalaxy("test-galaxy", 1<<20, GetterFunc(func(_ context.Context, key string, dest Codec) error { - return dest.UnmarshalBinary([]byte("hello")) + return dest.UnmarshalBinary([]byte("hello"), time.Now().Add(5*time.Minute)) })) kStats := &keyStats{ dQPS: dampedQPS{ period: time.Second, }, } - value := newValWithStat([]byte("hello"), kStats) + value := newValWithStat([]byte("hello"), kStats, time.Now().Add(1*time.Second)) g.hotCache.add(keyToAdd, value) now := time.Now() // blast the key in the hotcache with a bunch of hypothetical gets every few seconds @@ -476,7 +499,7 @@ func TestHotcache(t *testing.T) { if math.Abs(val-tc.expectedBurstQPS) > val/100 { // ensure less than %1 error t.Errorf("QPS after bursts: %f, Wanted: %f", val, tc.expectedBurstQPS) } - value2 := newValWithStat([]byte("hello there"), nil) + value2 := newValWithStat([]byte("hello there"), nil, time.Now().Add(1*time.Second)) g.hotCache.add(keyToAdd+"2", value2) // ensure that hcStats are properly updated after adding g.maybeUpdateHotCacheStats() @@ -568,7 +591,7 @@ func TestPromotion(t *testing.T) { fetcher := &TestFetcher{} testProto := &TestProtocol{} getter := func(_ context.Context, key string, dest Codec) error { - return dest.UnmarshalBinary([]byte("got:" + key)) + return dest.UnmarshalBinary([]byte("got:"+key), time.Now().Add(5*time.Minute)) } universe := NewUniverse(testProto, "promotion-test") galaxy := universe.NewGalaxy("test-galaxy", tc.cacheSize, GetterFunc(getter), WithPromoter(tc.promoter)) @@ -594,7 +617,7 @@ func TestRecorder(t *testing.T) { meter.Register(testView) getter := func(_ context.Context, key string, dest Codec) error { - return dest.UnmarshalBinary([]byte("got:" + key)) + return dest.UnmarshalBinary([]byte("got:"+key), time.Now().Add(5*time.Minute)) } u := NewUniverse(&TestProtocol{}, "test-universe", WithRecorder(meter)) g := u.NewGalaxy("test", 1024, GetterFunc(getter)) diff --git a/galaxycachepb/galaxycache.pb.go b/galaxycachepb/galaxycache.pb.go index 116e3d79..401ed086 100644 --- a/galaxycachepb/galaxycache.pb.go +++ b/galaxycachepb/galaxycache.pb.go @@ -1,212 +1,256 @@ +// +//Copyright 2012 Google Inc. +// +//Licensed under the Apache License, Version 2.0 (the "License"); +//you may not use this file except in compliance with the License. +//You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, software +//distributed under the License is distributed on an "AS IS" BASIS, +//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//See the License for the specific language governing permissions and +//limitations under the License. + // Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.19.1 // source: galaxycache.proto package galaxycachepb import ( - context "context" - fmt "fmt" - proto "github.com/golang/protobuf/proto" - grpc "google.golang.org/grpc" - math "math" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" ) -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) type GetRequest struct { - Galaxy string `protobuf:"bytes,1,opt,name=galaxy,proto3" json:"galaxy,omitempty"` - Key string `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields -func (m *GetRequest) Reset() { *m = GetRequest{} } -func (m *GetRequest) String() string { return proto.CompactTextString(m) } -func (*GetRequest) ProtoMessage() {} -func (*GetRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_23bd509ca7b74957, []int{0} + Galaxy string `protobuf:"bytes,1,opt,name=galaxy,proto3" json:"galaxy,omitempty"` + Key string `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` // not actually required/guaranteed to be UTF-8 } -func (m *GetRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_GetRequest.Unmarshal(m, b) -} -func (m *GetRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_GetRequest.Marshal(b, m, deterministic) -} -func (m *GetRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_GetRequest.Merge(m, src) +func (x *GetRequest) Reset() { + *x = GetRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_galaxycache_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } } -func (m *GetRequest) XXX_Size() int { - return xxx_messageInfo_GetRequest.Size(m) + +func (x *GetRequest) String() string { + return protoimpl.X.MessageStringOf(x) } -func (m *GetRequest) XXX_DiscardUnknown() { - xxx_messageInfo_GetRequest.DiscardUnknown(m) + +func (*GetRequest) ProtoMessage() {} + +func (x *GetRequest) ProtoReflect() protoreflect.Message { + mi := &file_galaxycache_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) } -var xxx_messageInfo_GetRequest proto.InternalMessageInfo +// Deprecated: Use GetRequest.ProtoReflect.Descriptor instead. +func (*GetRequest) Descriptor() ([]byte, []int) { + return file_galaxycache_proto_rawDescGZIP(), []int{0} +} -func (m *GetRequest) GetGalaxy() string { - if m != nil { - return m.Galaxy +func (x *GetRequest) GetGalaxy() string { + if x != nil { + return x.Galaxy } return "" } -func (m *GetRequest) GetKey() string { - if m != nil { - return m.Key +func (x *GetRequest) GetKey() string { + if x != nil { + return x.Key } return "" } type GetResponse struct { - Value []byte `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` - MinuteQps float64 `protobuf:"fixed64,2,opt,name=minute_qps,json=minuteQps,proto3" json:"minute_qps,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields -func (m *GetResponse) Reset() { *m = GetResponse{} } -func (m *GetResponse) String() string { return proto.CompactTextString(m) } -func (*GetResponse) ProtoMessage() {} -func (*GetResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_23bd509ca7b74957, []int{1} + Value []byte `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` + MinuteQps float64 `protobuf:"fixed64,2,opt,name=minute_qps,json=minuteQps,proto3" json:"minute_qps,omitempty"` + Expire int64 `protobuf:"varint,3,opt,name=expire,proto3" json:"expire,omitempty"` } -func (m *GetResponse) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_GetResponse.Unmarshal(m, b) -} -func (m *GetResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_GetResponse.Marshal(b, m, deterministic) -} -func (m *GetResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_GetResponse.Merge(m, src) +func (x *GetResponse) Reset() { + *x = GetResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_galaxycache_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } } -func (m *GetResponse) XXX_Size() int { - return xxx_messageInfo_GetResponse.Size(m) + +func (x *GetResponse) String() string { + return protoimpl.X.MessageStringOf(x) } -func (m *GetResponse) XXX_DiscardUnknown() { - xxx_messageInfo_GetResponse.DiscardUnknown(m) + +func (*GetResponse) ProtoMessage() {} + +func (x *GetResponse) ProtoReflect() protoreflect.Message { + mi := &file_galaxycache_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) } -var xxx_messageInfo_GetResponse proto.InternalMessageInfo +// Deprecated: Use GetResponse.ProtoReflect.Descriptor instead. +func (*GetResponse) Descriptor() ([]byte, []int) { + return file_galaxycache_proto_rawDescGZIP(), []int{1} +} -func (m *GetResponse) GetValue() []byte { - if m != nil { - return m.Value +func (x *GetResponse) GetValue() []byte { + if x != nil { + return x.Value } return nil } -func (m *GetResponse) GetMinuteQps() float64 { - if m != nil { - return m.MinuteQps +func (x *GetResponse) GetMinuteQps() float64 { + if x != nil { + return x.MinuteQps } return 0 } -func init() { - proto.RegisterType((*GetRequest)(nil), "galaxycachepb.GetRequest") - proto.RegisterType((*GetResponse)(nil), "galaxycachepb.GetResponse") +func (x *GetResponse) GetExpire() int64 { + if x != nil { + return x.Expire + } + return 0 } -func init() { proto.RegisterFile("galaxycache.proto", fileDescriptor_23bd509ca7b74957) } +var File_galaxycache_proto protoreflect.FileDescriptor -var fileDescriptor_23bd509ca7b74957 = []byte{ - // 186 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4c, 0x4f, 0xcc, 0x49, - 0xac, 0xa8, 0x4c, 0x4e, 0x4c, 0xce, 0x48, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x45, - 0x12, 0x2a, 0x48, 0x52, 0x32, 0xe3, 0xe2, 0x72, 0x4f, 0x2d, 0x09, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, - 0x2e, 0x11, 0x12, 0xe3, 0x62, 0x83, 0x48, 0x4b, 0x30, 0x2a, 0x30, 0x6a, 0x70, 0x06, 0x41, 0x79, - 0x42, 0x02, 0x5c, 0xcc, 0xd9, 0xa9, 0x95, 0x12, 0x4c, 0x60, 0x41, 0x10, 0x53, 0xc9, 0x89, 0x8b, - 0x1b, 0xac, 0xaf, 0xb8, 0x20, 0x3f, 0xaf, 0x38, 0x55, 0x48, 0x84, 0x8b, 0xb5, 0x2c, 0x31, 0xa7, - 0x34, 0x15, 0xac, 0x8f, 0x27, 0x08, 0xc2, 0x11, 0x92, 0xe5, 0xe2, 0xca, 0xcd, 0xcc, 0x2b, 0x2d, - 0x49, 0x8d, 0x2f, 0x2c, 0x28, 0x06, 0xeb, 0x66, 0x0c, 0xe2, 0x84, 0x88, 0x04, 0x16, 0x14, 0x1b, - 0x85, 0x72, 0x71, 0xbb, 0x83, 0xcd, 0x77, 0x06, 0x39, 0x46, 0xc8, 0x0d, 0x6c, 0xa4, 0x5b, 0x51, - 0x7e, 0x6e, 0x40, 0x6a, 0x6a, 0x91, 0x90, 0xa4, 0x1e, 0x8a, 0x4b, 0xf5, 0x10, 0xce, 0x94, 0x92, - 0xc2, 0x26, 0x05, 0x71, 0x89, 0x12, 0x43, 0x12, 0x1b, 0xd8, 0xa3, 0xc6, 0x80, 0x00, 0x00, 0x00, - 0xff, 0xff, 0x84, 0x48, 0x7a, 0xc8, 0xfd, 0x00, 0x00, 0x00, +var file_galaxycache_proto_rawDesc = []byte{ + 0x0a, 0x11, 0x67, 0x61, 0x6c, 0x61, 0x78, 0x79, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x12, 0x0d, 0x67, 0x61, 0x6c, 0x61, 0x78, 0x79, 0x63, 0x61, 0x63, 0x68, 0x65, + 0x70, 0x62, 0x22, 0x36, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x16, 0x0a, 0x06, 0x67, 0x61, 0x6c, 0x61, 0x78, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x06, 0x67, 0x61, 0x6c, 0x61, 0x78, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x22, 0x5a, 0x0a, 0x0b, 0x47, 0x65, + 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, + 0x1d, 0x0a, 0x0a, 0x6d, 0x69, 0x6e, 0x75, 0x74, 0x65, 0x5f, 0x71, 0x70, 0x73, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x01, 0x52, 0x09, 0x6d, 0x69, 0x6e, 0x75, 0x74, 0x65, 0x51, 0x70, 0x73, 0x12, 0x16, + 0x0a, 0x06, 0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, + 0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x32, 0x55, 0x0a, 0x0b, 0x47, 0x61, 0x6c, 0x61, 0x78, 0x79, + 0x43, 0x61, 0x63, 0x68, 0x65, 0x12, 0x46, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x46, 0x72, 0x6f, 0x6d, + 0x50, 0x65, 0x65, 0x72, 0x12, 0x19, 0x2e, 0x67, 0x61, 0x6c, 0x61, 0x78, 0x79, 0x63, 0x61, 0x63, + 0x68, 0x65, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x1a, 0x2e, 0x67, 0x61, 0x6c, 0x61, 0x78, 0x79, 0x63, 0x61, 0x63, 0x68, 0x65, 0x70, 0x62, 0x2e, + 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x11, 0x5a, + 0x0f, 0x2e, 0x2f, 0x67, 0x61, 0x6c, 0x61, 0x78, 0x79, 0x63, 0x61, 0x63, 0x68, 0x65, 0x70, 0x62, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion4 - -// GalaxyCacheClient is the client API for GalaxyCache service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type GalaxyCacheClient interface { - GetFromPeer(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*GetResponse, error) -} - -type galaxyCacheClient struct { - cc *grpc.ClientConn -} +var ( + file_galaxycache_proto_rawDescOnce sync.Once + file_galaxycache_proto_rawDescData = file_galaxycache_proto_rawDesc +) -func NewGalaxyCacheClient(cc *grpc.ClientConn) GalaxyCacheClient { - return &galaxyCacheClient{cc} +func file_galaxycache_proto_rawDescGZIP() []byte { + file_galaxycache_proto_rawDescOnce.Do(func() { + file_galaxycache_proto_rawDescData = protoimpl.X.CompressGZIP(file_galaxycache_proto_rawDescData) + }) + return file_galaxycache_proto_rawDescData } -func (c *galaxyCacheClient) GetFromPeer(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*GetResponse, error) { - out := new(GetResponse) - err := c.cc.Invoke(ctx, "/galaxycachepb.GalaxyCache/GetFromPeer", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil +var file_galaxycache_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_galaxycache_proto_goTypes = []interface{}{ + (*GetRequest)(nil), // 0: galaxycachepb.GetRequest + (*GetResponse)(nil), // 1: galaxycachepb.GetResponse } - -// GalaxyCacheServer is the server API for GalaxyCache service. -type GalaxyCacheServer interface { - GetFromPeer(context.Context, *GetRequest) (*GetResponse, error) +var file_galaxycache_proto_depIdxs = []int32{ + 0, // 0: galaxycachepb.GalaxyCache.GetFromPeer:input_type -> galaxycachepb.GetRequest + 1, // 1: galaxycachepb.GalaxyCache.GetFromPeer:output_type -> galaxycachepb.GetResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name } -func RegisterGalaxyCacheServer(s *grpc.Server, srv GalaxyCacheServer) { - s.RegisterService(&_GalaxyCache_serviceDesc, srv) -} - -func _GalaxyCache_GetFromPeer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(GetRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(GalaxyCacheServer).GetFromPeer(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/galaxycachepb.GalaxyCache/GetFromPeer", +func init() { file_galaxycache_proto_init() } +func file_galaxycache_proto_init() { + if File_galaxycache_proto != nil { + return } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(GalaxyCacheServer).GetFromPeer(ctx, req.(*GetRequest)) + if !protoimpl.UnsafeEnabled { + file_galaxycache_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_galaxycache_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } - return interceptor(ctx, in, info, handler) -} - -var _GalaxyCache_serviceDesc = grpc.ServiceDesc{ - ServiceName: "galaxycachepb.GalaxyCache", - HandlerType: (*GalaxyCacheServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "GetFromPeer", - Handler: _GalaxyCache_GetFromPeer_Handler, + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_galaxycache_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, }, - }, - Streams: []grpc.StreamDesc{}, - Metadata: "galaxycache.proto", + GoTypes: file_galaxycache_proto_goTypes, + DependencyIndexes: file_galaxycache_proto_depIdxs, + MessageInfos: file_galaxycache_proto_msgTypes, + }.Build() + File_galaxycache_proto = out.File + file_galaxycache_proto_rawDesc = nil + file_galaxycache_proto_goTypes = nil + file_galaxycache_proto_depIdxs = nil } diff --git a/galaxycachepb/galaxycache.proto b/galaxycachepb/galaxycache.proto index a30fb340..00fdfc23 100644 --- a/galaxycachepb/galaxycache.proto +++ b/galaxycachepb/galaxycache.proto @@ -17,6 +17,8 @@ limitations under the License. syntax = "proto3"; package galaxycachepb; +option go_package = "./galaxycachepb"; + message GetRequest { string galaxy = 1; @@ -26,8 +28,9 @@ message GetRequest { message GetResponse { bytes value = 1; double minute_qps = 2; + int64 expire = 3; } service GalaxyCache { rpc GetFromPeer(GetRequest) returns (GetResponse) {} -} \ No newline at end of file +} diff --git a/galaxycachepb/galaxycache_grpc.pb.go b/galaxycachepb/galaxycache_grpc.pb.go new file mode 100644 index 00000000..3719ccef --- /dev/null +++ b/galaxycachepb/galaxycache_grpc.pb.go @@ -0,0 +1,101 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package galaxycachepb + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// GalaxyCacheClient is the client API for GalaxyCache service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type GalaxyCacheClient interface { + GetFromPeer(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*GetResponse, error) +} + +type galaxyCacheClient struct { + cc grpc.ClientConnInterface +} + +func NewGalaxyCacheClient(cc grpc.ClientConnInterface) GalaxyCacheClient { + return &galaxyCacheClient{cc} +} + +func (c *galaxyCacheClient) GetFromPeer(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*GetResponse, error) { + out := new(GetResponse) + err := c.cc.Invoke(ctx, "/galaxycachepb.GalaxyCache/GetFromPeer", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// GalaxyCacheServer is the server API for GalaxyCache service. +// All implementations must embed UnimplementedGalaxyCacheServer +// for forward compatibility +type GalaxyCacheServer interface { + GetFromPeer(context.Context, *GetRequest) (*GetResponse, error) + mustEmbedUnimplementedGalaxyCacheServer() +} + +// UnimplementedGalaxyCacheServer must be embedded to have forward compatible implementations. +type UnimplementedGalaxyCacheServer struct { +} + +func (UnimplementedGalaxyCacheServer) GetFromPeer(context.Context, *GetRequest) (*GetResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetFromPeer not implemented") +} +func (UnimplementedGalaxyCacheServer) mustEmbedUnimplementedGalaxyCacheServer() {} + +// UnsafeGalaxyCacheServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to GalaxyCacheServer will +// result in compilation errors. +type UnsafeGalaxyCacheServer interface { + mustEmbedUnimplementedGalaxyCacheServer() +} + +func RegisterGalaxyCacheServer(s grpc.ServiceRegistrar, srv GalaxyCacheServer) { + s.RegisterService(&GalaxyCache_ServiceDesc, srv) +} + +func _GalaxyCache_GetFromPeer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(GalaxyCacheServer).GetFromPeer(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/galaxycachepb.GalaxyCache/GetFromPeer", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(GalaxyCacheServer).GetFromPeer(ctx, req.(*GetRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// GalaxyCache_ServiceDesc is the grpc.ServiceDesc for GalaxyCache service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var GalaxyCache_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "galaxycachepb.GalaxyCache", + HandlerType: (*GalaxyCacheServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetFromPeer", + Handler: _GalaxyCache_GetFromPeer_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "galaxycache.proto", +} diff --git a/go.mod b/go.mod index 12af9092..75fb9276 100644 --- a/go.mod +++ b/go.mod @@ -7,4 +7,5 @@ require ( go.opencensus.io v0.22.5 golang.org/x/net v0.0.0-20210119194325-5f4716e94777 // indirect google.golang.org/grpc v1.35.0 + google.golang.org/protobuf v1.25.0 ) diff --git a/go.sum b/go.sum index 8f329b03..97d8b4f6 100644 --- a/go.sum +++ b/go.sum @@ -8,13 +8,13 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= @@ -27,30 +27,40 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= -github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210119194325-5f4716e94777 h1:003p0dJM77cxMSyCPFphvZf/Y5/NXf5fzg6ufd1/Oew= golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -58,10 +68,13 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd h1:r7DufRZuZbWB7j439YfAzP8RPDa9unLkpwQKUYbIMPI= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -73,18 +86,22 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb h1:i1Ppqkc3WQXikh8bXiwHqAN5Rv3/qDCcRk0/Otx73BY= google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= -google.golang.org/grpc v1.20.1 h1:Hz2g2wirWK7H0qIIhGIqRGTuMwTE8HEKFnDZZ7lm9NU= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= diff --git a/grpc/grpc_test.go b/grpc/grpc_test.go index 625aee7e..500387e3 100644 --- a/grpc/grpc_test.go +++ b/grpc/grpc_test.go @@ -24,6 +24,7 @@ import ( "strconv" "sync" "testing" + "time" gc "github.com/vimeo/galaxycache" @@ -79,10 +80,15 @@ func TestGRPCPeerServer(t *testing.T) { if err := g.Get(ctx, key, &value); err != nil { t.Fatal(err) } - if string(value) != ":"+key { - t.Errorf("Unexpected value: Get(%q) = %q, expected %q", key, value, ":"+key) + + ret, _, err := value.MarshalBinary() + if err != nil { + t.Fatal(err) + } + if string(ret) != ":"+key { + t.Errorf("Unexpected value: Get(%q) = %q, expected %q", key, ret, ":"+key) } - t.Logf("Get key=%q, value=%q (peer:key)", key, value) + t.Logf("Get key=%q, value=%q (peer:key)", key, ret) } cancel() wg.Wait() @@ -104,7 +110,7 @@ func runTestPeerGRPCServer(ctx context.Context, t testing.TB, addresses []string } getter := gc.GetterFunc(func(ctx context.Context, key string, dest gc.Codec) error { - dest.UnmarshalBinary([]byte(":" + key)) + dest.UnmarshalBinary([]byte(":"+key), time.Now().Add(5*time.Minute)) return nil }) universe.NewGalaxy("peerFetchTest", 1<<20, getter) diff --git a/grpc/grpcclient.go b/grpc/grpcclient.go index d39f4c94..de42e1f0 100644 --- a/grpc/grpcclient.go +++ b/grpc/grpcclient.go @@ -18,6 +18,7 @@ package grpc import ( "context" + "time" gc "github.com/vimeo/galaxycache" pb "github.com/vimeo/galaxycache/galaxycachepb" @@ -70,7 +71,7 @@ func (gp *GRPCFetchProtocol) NewFetcher(address string) (gc.RemoteFetcher, error // Fetch here implements the RemoteFetcher interface for // sending Gets to peers over an RPC connection -func (g *grpcFetcher) Fetch(ctx context.Context, galaxy string, key string) ([]byte, error) { +func (g *grpcFetcher) Fetch(ctx context.Context, galaxy string, key string) ([]byte, time.Time, error) { span := trace.FromContext(ctx) span.Annotatef(nil, "fetching from %s; connection state %s", g.address, g.conn.GetState()) resp, err := g.client.GetFromPeer(ctx, &pb.GetRequest{ @@ -78,10 +79,10 @@ func (g *grpcFetcher) Fetch(ctx context.Context, galaxy string, key string) ([]b Key: key, }) if err != nil { - return nil, status.Errorf(status.Code(err), "Failed to fetch from peer over RPC [%q, %q]: %s", galaxy, g.address, err) + return nil, time.Time{}, status.Errorf(status.Code(err), "Failed to fetch from peer over RPC [%q, %q]: %s", galaxy, g.address, err) } - return resp.Value, nil + return resp.Value, time.UnixMilli(resp.Expire), nil } // Close here implements the RemoteFetcher interface for diff --git a/grpc/grpcserver.go b/grpc/grpcserver.go index f4b8faf9..c1c80d67 100644 --- a/grpc/grpcserver.go +++ b/grpc/grpcserver.go @@ -20,6 +20,7 @@ import ( "context" gc "github.com/vimeo/galaxycache" + "github.com/vimeo/galaxycache/galaxycachepb" pb "github.com/vimeo/galaxycache/galaxycachepb" "google.golang.org/grpc" @@ -31,6 +32,7 @@ import ( // interface generated by the GalaxyCache pb service type serviceImpl struct { universe *gc.Universe + galaxycachepb.UnimplementedGalaxyCacheServer } // RegisterGRPCServer registers the given grpc.Server with @@ -58,5 +60,10 @@ func (gp *serviceImpl) GetFromPeer(ctx context.Context, req *pb.GetRequest) (*pb return nil, status.Errorf(status.Code(err), "Failed to retrieve [%s]: %v", req, err) } - return &pb.GetResponse{Value: value}, nil + ret, expTm, err := value.MarshalBinary() + if err != nil { + return nil, status.Errorf(status.Code(err), "Failed to marshal [%s]: %v", req, err) + } + + return &pb.GetResponse{Value: ret, Expire: expTm.UnixMilli()}, nil } diff --git a/grpc/unsafe_byte_codec.go b/grpc/unsafe_byte_codec.go index 1ff1d7d0..c215cd76 100644 --- a/grpc/unsafe_byte_codec.go +++ b/grpc/unsafe_byte_codec.go @@ -1,17 +1,23 @@ package grpc +import "time" + // unsafeByteCodec is a byte slice type that implements Codec -type unsafeByteCodec []byte +type unsafeByteCodec struct { + bytes []byte + expire time.Time +} // MarshalBinary returns the contained byte-slice -func (c *unsafeByteCodec) MarshalBinary() ([]byte, error) { - return *c, nil +func (c *unsafeByteCodec) MarshalBinary() ([]byte, time.Time, error) { + return c.bytes, c.expire, nil } // UnmarshalBinary to provided data so they share the same backing array // this is a generally unsafe performance optimization, but safe in the context // of the gRPC server. -func (c *unsafeByteCodec) UnmarshalBinary(data []byte) error { - *c = data +func (c *unsafeByteCodec) UnmarshalBinary(data []byte, expire time.Time) error { + c.bytes = data + c.expire = expire return nil } diff --git a/hotcache.go b/hotcache.go index 9521734a..f7a9de42 100644 --- a/hotcache.go +++ b/hotcache.go @@ -57,14 +57,15 @@ type keyStats struct { dQPS dampedQPS } -func newValWithStat(data []byte, kStats *keyStats) *valWithStat { +func newValWithStat(data []byte, kStats *keyStats, expire time.Time) *valWithStat { if kStats == nil { kStats = &keyStats{dampedQPS{period: time.Second}} } return &valWithStat{ - data: data, - stats: kStats, + data: data, + stats: kStats, + expire: expire, } } @@ -126,19 +127,19 @@ func (d *dampedQPS) val(now time.Time) float64 { return d.curDQPS } -func (g *Galaxy) addNewToCandidateCache(key string) *keyStats { +func (g *Galaxy) addNewToCandidateCache(key string, expire time.Time) *keyStats { kStats := &keyStats{ dQPS: dampedQPS{ period: time.Second, }, } - g.candidateCache.addToCandidateCache(key, kStats) + g.candidateCache.addToCandidateCache(key, kStats, expire) return kStats } -func (c *cache) addToCandidateCache(key string, kStats *keyStats) { +func (c *cache) addToCandidateCache(key string, kStats *keyStats, expire time.Time) { c.mu.Lock() defer c.mu.Unlock() - c.lru.Add(key, kStats) + c.lru.Add(key, kStats, expire) } diff --git a/http/http.go b/http/http.go index 928dad0d..3743bb64 100644 --- a/http/http.go +++ b/http/http.go @@ -22,7 +22,9 @@ import ( "io/ioutil" "net/http" "net/url" + "strconv" "strings" + "time" gc "github.com/vimeo/galaxycache" @@ -32,6 +34,9 @@ import ( const defaultBasePath = "/_galaxycache/" +// When the retrieved value should expire. Unix timestamp in milliseconds. +const ttlHeader = "X-Galaxycache-Expire" + // HTTPFetchProtocol specifies HTTP specific options for HTTP-based // peer communication type HTTPFetchProtocol struct { @@ -181,7 +186,13 @@ func (h *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } w.Header().Set("Content-Type", "application/octet-stream") - w.Write(value) + b, expTm, err := value.MarshalBinary() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set(ttlHeader, fmt.Sprintf("%d", expTm.UnixMilli())) + w.Write(b) } type httpFetcher struct { @@ -190,7 +201,7 @@ type httpFetcher struct { } // Fetch here implements the RemoteFetcher interface for sending a GET request over HTTP to a peer -func (h *httpFetcher) Fetch(ctx context.Context, galaxy string, key string) ([]byte, error) { +func (h *httpFetcher) Fetch(ctx context.Context, galaxy string, key string) ([]byte, time.Time, error) { u := fmt.Sprintf( "%v%v/%v", h.baseURL, @@ -199,21 +210,30 @@ func (h *httpFetcher) Fetch(ctx context.Context, galaxy string, key string) ([]b ) req, err := http.NewRequest("GET", u, nil) if err != nil { - return nil, err + return nil, time.Time{}, err } res, err := h.transport.RoundTrip(req.WithContext(ctx)) if err != nil { - return nil, err + return nil, time.Time{}, err } defer res.Body.Close() if res.StatusCode != http.StatusOK { - return nil, fmt.Errorf("server returned HTTP response status code: %v", res.Status) + return nil, time.Time{}, fmt.Errorf("server returned HTTP response status code: %v", res.Status) } data, err := ioutil.ReadAll(res.Body) if err != nil { - return nil, fmt.Errorf("reading response body: %v", err) + return nil, time.Time{}, fmt.Errorf("reading response body: %v", err) + } + + expireStr := res.Header[ttlHeader] + if len(expireStr) == 0 { + return nil, time.Time{}, fmt.Errorf("failed reading TTL header %s", ttlHeader) + } + expire, err := strconv.ParseInt(expireStr[0], 10, 64) + if err != nil { + return nil, time.Time{}, fmt.Errorf("parsing TTL header %s: %w", ttlHeader, err) } - return data, nil + return data, time.UnixMilli(expire), nil } // Close here implements the RemoteFetcher interface for closing (does nothing for HTTP) diff --git a/http/http_test.go b/http/http_test.go index 5c3371b1..46d0c4c5 100644 --- a/http/http_test.go +++ b/http/http_test.go @@ -25,6 +25,7 @@ import ( "strings" "sync" "testing" + "time" gc "github.com/vimeo/galaxycache" @@ -86,11 +87,21 @@ func TestHTTPHandler(t *testing.T) { if err := g.Get(ctx, key, &value); err != nil { t.Fatal(err) } - if suffix := ":" + key; !strings.HasSuffix(string(value), suffix) { - t.Errorf("Get(%q) = %q, want value ending in %q", key, value, suffix) + ret, expTm, err := value.MarshalBinary() + if err != nil { + t.Fatal(err) + } + if expTm.Equal(time.Time{}) { + t.Fatal("expiry time must be set") } - t.Logf("Get key=%q, value=%q (peer:key)", key, value) + if suffix := ":" + key; !strings.HasSuffix(string(ret), suffix) { + t.Errorf("Get(%q) = %q, want value ending in %q", key, ret, suffix) + } + t.Logf("Get key=%q, value=%q (peer:key)", key, ret) } + + currentExp := time.Now().Add(2 * time.Second) + time.Sleep(5 * time.Second) // Try it again, this time with a slash in the middle to ensure we're // handling those characters properly for _, key := range testKeys(nGets) { @@ -99,10 +110,19 @@ func TestHTTPHandler(t *testing.T) { if err := g.Get(ctx, testKey, &value); err != nil { t.Fatal(err) } - if suffix := ":" + testKey; !strings.HasSuffix(string(value), suffix) { - t.Errorf("Get(%q) = %q, want value ending in %q", key, value, suffix) + ret, expTm, err := value.MarshalBinary() + if err != nil { + t.Fatal(err) + } + + // Ensure that the keys were regenerated. + if expTm.Before(currentExp) { + t.Fatalf("expected key to expire after the current time i.e. it should be regenerated (%s, %s)", expTm, currentExp) + } + if suffix := ":" + testKey; !strings.HasSuffix(string(ret), suffix) { + t.Errorf("Get(%q) = %q, want value ending in %q", key, ret, suffix) } - t.Logf("Get key=%q, value=%q (peer:key)", testKey, value) + t.Logf("Get key=%q, value=%q (peer:key)", testKey, ret) } }) @@ -119,7 +139,7 @@ func makeHTTPServerUniverse(ctx context.Context, t testing.TB, galaxyName string t.Errorf("Error setting peers: %s", err) } getter := gc.GetterFunc(func(ctx context.Context, key string, dest gc.Codec) error { - dest.UnmarshalBinary([]byte(":" + key)) + dest.UnmarshalBinary([]byte(":"+key), time.Now().Add(2*time.Second)) return nil }) universe.NewGalaxy(galaxyName, 1<<20, getter) diff --git a/lru/lru.go b/lru/lru.go index a9f42c3f..11176448 100644 --- a/lru/lru.go +++ b/lru/lru.go @@ -19,6 +19,7 @@ package lru // import "github.com/vimeo/galaxycache/lru" import ( "container/list" + "time" ) // Cache is an LRU cache. It is not safe for concurrent access. @@ -39,8 +40,9 @@ type Cache struct { type Key interface{} type entry struct { - key Key - value interface{} + key Key + value interface{} + expire time.Time } // New creates a new Cache. @@ -55,7 +57,7 @@ func New(maxEntries int) *Cache { } // Add adds a value to the cache. -func (c *Cache) Add(key Key, value interface{}) { +func (c *Cache) Add(key Key, value interface{}, expire time.Time) { if c.cache == nil { c.cache = make(map[interface{}]*list.Element) c.ll = list.New() @@ -65,7 +67,7 @@ func (c *Cache) Add(key Key, value interface{}) { ele.Value.(*entry).value = value return } - ele := c.ll.PushFront(&entry{key, value}) + ele := c.ll.PushFront(&entry{key, value, expire}) c.cache[key] = ele if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries { c.RemoveOldest() @@ -78,8 +80,15 @@ func (c *Cache) Get(key Key) (value interface{}, ok bool) { return } if ele, hit := c.cache[key]; hit { + entry := ele.Value.(*entry) + // If the entry has expired, remove it from the cache + if !entry.expire.IsZero() && entry.expire.Before(time.Now()) { + c.removeElement(ele) + return nil, false + } + c.ll.MoveToFront(ele) - return ele.Value.(*entry).value, true + return entry.value, true } return } diff --git a/lru/lru_test.go b/lru/lru_test.go index 870fe38c..16622f59 100644 --- a/lru/lru_test.go +++ b/lru/lru_test.go @@ -19,6 +19,7 @@ package lru import ( "fmt" "testing" + "time" ) type simpleStruct struct { @@ -48,7 +49,7 @@ func TestGet(t *testing.T) { for _, tt := range getTests { lru := New(0) - lru.Add(tt.keyToAdd, 1234) + lru.Add(tt.keyToAdd, 1234, time.Time{}) val, ok := lru.Get(tt.keyToGet) if ok != tt.expectedOk { t.Fatalf("%s: cache hit = %v; want %v", tt.name, ok, !ok) @@ -60,7 +61,7 @@ func TestGet(t *testing.T) { func TestRemove(t *testing.T) { lru := New(0) - lru.Add("myKey", 1234) + lru.Add("myKey", 1234, time.Time{}) if val, ok := lru.Get("myKey"); !ok { t.Fatal("TestRemove returned no match") } else if val != 1234 { @@ -82,7 +83,7 @@ func TestEvict(t *testing.T) { lru := New(20) lru.OnEvicted = onEvictedFun for i := 0; i < 22; i++ { - lru.Add(fmt.Sprintf("myKey%d", i), 1234) + lru.Add(fmt.Sprintf("myKey%d", i), 1234, time.Time{}) } if len(evictedKeys) != 2 { @@ -95,3 +96,28 @@ func TestEvict(t *testing.T) { t.Fatalf("got %v in second evicted key; want %s", evictedKeys[1], "myKey1") } } + +func TestExpire(t *testing.T) { + var tests = []struct { + name string + key interface{} + expectedOk bool + expire time.Duration + wait time.Duration + }{ + {"not-expired", "myKey", true, time.Second * 1, time.Duration(0)}, + {"expired", "expiredKey", false, time.Millisecond * 100, time.Millisecond * 150}, + } + + for _, tt := range tests { + lru := New(0) + lru.Add(tt.key, 1234, time.Now().Add(tt.expire)) + time.Sleep(tt.wait) + val, ok := lru.Get(tt.key) + if ok != tt.expectedOk { + t.Fatalf("%s: cache hit = %v; want %v", tt.name, ok, !ok) + } else if ok && val != 1234 { + t.Fatalf("%s expected get to return 1234 but got %v", tt.name, val) + } + } +} diff --git a/peers.go b/peers.go index bf86733c..57f78cb9 100644 --- a/peers.go +++ b/peers.go @@ -28,6 +28,7 @@ import ( "errors" "fmt" "sync" + "time" "github.com/vimeo/galaxycache/consistenthash" ) @@ -38,7 +39,8 @@ const defaultReplicas = 50 // other peers; the PeerPicker contains a map of these fetchers corresponding // to each other peer address type RemoteFetcher interface { - Fetch(context context.Context, galaxy string, key string) ([]byte, error) + // The value and when it should expire. + Fetch(context context.Context, galaxy string, key string) ([]byte, time.Time, error) // Close closes a client-side connection (may be a nop) Close() error } @@ -166,8 +168,8 @@ func (n *NullFetchProtocol) NewFetcher(url string) (RemoteFetcher, error) { type nullFetchFetcher struct{} -func (n *nullFetchFetcher) Fetch(context context.Context, galaxy string, key string) ([]byte, error) { - return nil, errors.New("empty fetcher") +func (n *nullFetchFetcher) Fetch(context context.Context, galaxy string, key string) ([]byte, time.Time, error) { + return nil, time.Time{}, errors.New("empty fetcher") } // Close closes a client-side connection (may be a nop) diff --git a/proto.sh b/proto.sh new file mode 100644 index 00000000..4927f6d4 --- /dev/null +++ b/proto.sh @@ -0,0 +1,2 @@ +#!/bin/bash +protoc --go-grpc_out=. --proto_path=galaxycachepb --go_out=galaxycachepb --go_opt=paths=source_relative galaxycache.proto