Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cache: rework snapshot api #484

Merged
merged 4 commits into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed h1:OZmjad4L3H8ncOIR8rnb5MREYqG8ixi5+WbeUsquF0c=
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158 h1:CevA8fI91PAnP8vpnXuB8ZYAZ5wqY86nAbxfgK8tWO4=
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
Expand Down
15 changes: 6 additions & 9 deletions internal/example/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,11 @@ func makeConfigSource() *core.ConfigSource {
}

func GenerateSnapshot() cache.Snapshot {
return cache.NewSnapshot(
"1",
[]types.Resource{}, // endpoints
[]types.Resource{makeCluster(ClusterName)},
[]types.Resource{makeRoute(RouteName, ClusterName)},
[]types.Resource{makeHTTPListener(ListenerName, RouteName)},
[]types.Resource{}, // runtimes
[]types.Resource{}, // secrets
[]types.Resource{}, // extension configs
return cache.NewSnapshot("1",
map[resource.Type][]types.Resource{
resource.ClusterType: {makeCluster(ClusterName)},
resource.RouteType: {makeRoute(RouteName, ClusterName)},
resource.ListenerType: {makeHTTPListener(ListenerName, RouteName)},
},
)
}
2 changes: 1 addition & 1 deletion pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func TestConcurrentSetDeltaWatch(t *testing.T) {
id := fmt.Sprintf("%d", i%2)
responses := make(chan cache.DeltaResponse, 1)
if i < 25 {
snap := cache.Snapshot{}
snap := cache.NewSnapshot("", map[rsrc.Type][]types.Resource{})
snap.Resources[types.Endpoint] = cache.NewResources(version, []types.Resource{resource.MakeEndpoint(clusterName, uint32(i))})
if err := c.SetSnapshot(context.Background(), key, snap); err != nil {
t.Fatalf("snapshot failed: %s", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/v3/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
)

// GetResponseType returns the enumeration for a valid xDS type URL
func GetResponseType(typeURL string) types.ResponseType {
func GetResponseType(typeURL resource.Type) types.ResponseType {
switch typeURL {
case resource.EndpointType:
return types.Endpoint
Expand Down
47 changes: 47 additions & 0 deletions pkg/cache/v3/resources.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package cache

import "github.com/envoyproxy/go-control-plane/pkg/cache/types"

// Resources is a versioned group of resources.
type Resources struct {
// Version information.
Version string

// Items in the group indexed by name.
Items map[string]types.ResourceWithTTL
}

// IndexResourcesByName creates a map from the resource name to the resource.
func IndexResourcesByName(items []types.ResourceWithTTL) map[string]types.ResourceWithTTL {
indexed := make(map[string]types.ResourceWithTTL)
for _, item := range items {
indexed[GetResourceName(item.Resource)] = item
}
return indexed
}

// IndexRawResourcesByName creates a map from the resource name to the resource.
func IndexRawResourcesByName(items []types.Resource) map[string]types.Resource {
indexed := make(map[string]types.Resource)
for _, item := range items {
indexed[GetResourceName(item)] = item
}
return indexed
}

// NewResources creates a new resource group.
func NewResources(version string, items []types.Resource) Resources {
itemsWithTTL := []types.ResourceWithTTL{}
for _, item := range items {
itemsWithTTL = append(itemsWithTTL, types.ResourceWithTTL{Resource: item})
}
return NewResourcesWithTTL(version, itemsWithTTL)
}

// NewResourcesWithTTL creates a new resource group.
func NewResourcesWithTTL(version string, items []types.ResourceWithTTL) Resources {
return Resources{
Version: version,
Items: IndexResourcesByName(items),
}
}
84 changes: 84 additions & 0 deletions pkg/cache/v3/resources_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package cache_test

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
)

func TestIndexResourcesByName(t *testing.T) {
tests := []struct {
name string
resources []types.ResourceWithTTL
want map[string]types.ResourceWithTTL
}{
{
name: "empty",
resources: nil,
want: map[string]types.ResourceWithTTL{},
},
{
name: "more than one",
resources: []types.ResourceWithTTL{
{Resource: testEndpoint, TTL: &ttl},
{Resource: testRoute, TTL: &ttl},
},
want: map[string]types.ResourceWithTTL{
"cluster0": {Resource: testEndpoint, TTL: &ttl},
"route0": {Resource: testRoute, TTL: &ttl},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := cache.IndexResourcesByName(tt.resources)
assert.Equal(t, tt.want, got)
})
}
}

func TestIndexRawResourceByName(t *testing.T) {
tests := []struct {
name string
resources []types.Resource
want map[string]types.Resource
}{
{
name: "empty",
resources: nil,
want: map[string]types.Resource{},
},
{
name: "more than one",
resources: []types.Resource{
testEndpoint,
testRoute,
},
want: map[string]types.Resource{
"cluster0": testEndpoint,
"route0": testRoute,
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := cache.IndexRawResourcesByName(tt.resources)
assert.Equal(t, tt.want, got)
})
}
}

func TestNewResources(t *testing.T) {
resources := cache.NewResources("x", []types.Resource{
testEndpoint,
testRoute,
})

assert.NotNil(t, resources.Items)
assert.Equal(t, "x", resources.Version)
}
8 changes: 4 additions & 4 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) {
for id, watch := range info.watches {
// Respond with the current version regardless of whether the version has changed.
version := snapshot.GetVersion(watch.Request.TypeUrl)
resources := snapshot.GetResourcesAndTtl(watch.Request.TypeUrl)
resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl)

// TODO(snowp): Construct this once per type instead of once per watch.
resourcesWithTTL := map[string]types.ResourceWithTTL{}
Expand Down Expand Up @@ -201,7 +201,7 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh
if cache.log != nil {
cache.log.Debugf("respond open watch %d%v with new version %q", id, watch.Request.ResourceNames, version)
}
resources := snapshot.GetResourcesAndTtl(watch.Request.TypeUrl)
resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl)
err := cache.respond(ctx, watch.Request, watch.Response, resources, version, false)
if err != nil {
return err
Expand Down Expand Up @@ -321,7 +321,7 @@ func (cache *snapshotCache) CreateWatch(request *Request, value chan Response) f
}

// otherwise, the watch may be responded immediately
resources := snapshot.GetResourcesAndTtl(request.TypeUrl)
resources := snapshot.GetResourcesAndTTL(request.TypeUrl)
_ = cache.respond(context.Background(), request, value, resources, version, false)

return nil
Expand Down Expand Up @@ -492,7 +492,7 @@ func (cache *snapshotCache) Fetch(ctx context.Context, request *Request) (Respon
return nil, &types.SkipFetchError{}
}

resources := snapshot.GetResourcesAndTtl(request.TypeUrl)
resources := snapshot.GetResourcesAndTTL(request.TypeUrl)
out := createResponse(ctx, request, resources, version, false)
return out, nil
}
Expand Down
58 changes: 30 additions & 28 deletions pkg/cache/v3/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,26 @@ var (
version = "x"
version2 = "y"

snapshot = cache.NewSnapshot(version,
[]types.Resource{testEndpoint},
[]types.Resource{testCluster},
[]types.Resource{testRoute},
[]types.Resource{testListener},
[]types.Resource{testRuntime},
[]types.Resource{testSecret[0]},
[]types.Resource{testExtensionConfig})

ttl = 2 * time.Second

snapshotWithTTL = cache.NewSnapshotWithTtls(version,
[]types.ResourceWithTTL{{Resource: testEndpoint, TTL: &ttl}},
[]types.ResourceWithTTL{{Resource: testCluster}},
[]types.ResourceWithTTL{{Resource: testRoute}},
[]types.ResourceWithTTL{{Resource: testListener}},
[]types.ResourceWithTTL{{Resource: testRuntime}},
[]types.ResourceWithTTL{{Resource: testSecret[0]}})
snapshot = cache.NewSnapshot(version, map[rsrc.Type][]types.Resource{
rsrc.EndpointType: {testEndpoint},
rsrc.ClusterType: {testCluster},
rsrc.RouteType: {testRoute},
rsrc.ListenerType: {testListener},
rsrc.RuntimeType: {testRuntime},
rsrc.SecretType: {testSecret[0]},
rsrc.ExtensionConfigType: {testExtensionConfig},
})

ttl = 2 * time.Second
snapshotWithTTL = cache.NewSnapshotWithTTLs(version, map[rsrc.Type][]types.ResourceWithTTL{
rsrc.EndpointType: {{Resource: testEndpoint, TTL: &ttl}},
rsrc.ClusterType: {{Resource: testCluster}},
rsrc.RouteType: {{Resource: testRoute}},
rsrc.ListenerType: {{Resource: testListener}},
rsrc.RuntimeType: {{Resource: testRuntime}},
rsrc.SecretType: {{Resource: testSecret[0]}},
rsrc.ExtensionConfigType: {{Resource: testExtensionConfig}},
})

names = map[string][]string{
rsrc.EndpointType: {clusterName},
Expand Down Expand Up @@ -94,7 +96,7 @@ func (log logger) Infof(format string, args ...interface{}) { log.t.Logf(format
func (log logger) Warnf(format string, args ...interface{}) { log.t.Logf(format, args...) }
func (log logger) Errorf(format string, args ...interface{}) { log.t.Logf(format, args...) }

func TestSnapshotCacheWithTtl(t *testing.T) {
func TestSnapshotCacheWithTTL(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := cache.NewSnapshotCacheWithHeartbeating(ctx, true, group{}, logger{t: t}, time.Second)
Expand Down Expand Up @@ -128,8 +130,8 @@ func TestSnapshotCacheWithTtl(t *testing.T) {
if gotVersion, _ := out.GetVersion(); gotVersion != version {
t.Errorf("got version %q, want %q", gotVersion, version)
}
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTTL.GetResourcesAndTtl(typ)) {
t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTTL.GetResourcesAndTtl(typ))
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTTL.GetResourcesAndTTL(typ)) {
t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTTL.GetResourcesAndTTL(typ))
}
case <-time.After(2 * time.Second):
t.Errorf("failed to receive snapshot response")
Expand All @@ -156,11 +158,11 @@ func TestSnapshotCacheWithTtl(t *testing.T) {
if gotVersion, _ := out.GetVersion(); gotVersion != version {
t.Errorf("got version %q, want %q", gotVersion, version)
}
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTTL.GetResourcesAndTtl(typ)) {
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTTL.GetResourcesAndTTL(typ)) {
t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTTL.GetResources(typ))
}

if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTTL.GetResourcesAndTtl(typ)) {
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTTL.GetResourcesAndTTL(typ)) {
t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTTL.GetResources(typ))
}

Expand Down Expand Up @@ -222,8 +224,8 @@ func TestSnapshotCache(t *testing.T) {
if gotVersion, _ := out.GetVersion(); gotVersion != version {
t.Errorf("got version %q, want %q", gotVersion, version)
}
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTtl(typ)) {
t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTtl(typ))
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTTL(typ)) {
t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTTL(typ))
}
case <-time.After(time.Second):
t.Fatal("failed to receive snapshot response")
Expand Down Expand Up @@ -280,8 +282,8 @@ func TestSnapshotCacheWatch(t *testing.T) {
if gotVersion, _ := out.GetVersion(); gotVersion != version {
t.Errorf("got version %q, want %q", gotVersion, version)
}
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTtl(typ)) {
t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTtl(typ))
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTTL(typ)) {
t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTTL(typ))
}
case <-time.After(time.Second):
t.Fatal("failed to receive snapshot response")
Expand Down Expand Up @@ -315,7 +317,7 @@ func TestSnapshotCacheWatch(t *testing.T) {
t.Errorf("got version %q, want %q", gotVersion, version2)
}
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot2.Resources[types.Endpoint].Items) {
t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot2.Resources[types.Endpoint].Items)
t.Errorf("got resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot2.Resources[types.Endpoint].Items)
}
case <-time.After(time.Second):
t.Fatal("failed to receive snapshot response")
Expand Down
Loading