Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: skip callback if xds client reports error #3240

Merged
merged 2 commits into from
Dec 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions xds/internal/balancer/xds_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package balancer

import (
"fmt"
"testing"
"time"

xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
Expand All @@ -34,9 +36,11 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/lrs"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"
"google.golang.org/grpc/xds/internal/client/fakexds"
"google.golang.org/grpc/xds/internal/testutils"
)

const (
Expand Down Expand Up @@ -164,6 +168,72 @@ func (s) TestEDSClientResponseHandling(t *testing.T) {
}
}

type testXDSClient struct {
clusterNameChan *testutils.Channel
edsCb func(*xdsclient.EDSUpdate, error)
}

func newTestXDSClient() *testXDSClient {
return &testXDSClient{
clusterNameChan: testutils.NewChannel(),
}
}

func (c *testXDSClient) WatchEDS(clusterName string, edsCb func(*xdsclient.EDSUpdate, error)) (cancel func()) {
c.clusterNameChan.Send(clusterName)
c.edsCb = edsCb
return func() {}
}

func (c *testXDSClient) ReportLoad(server string, clusterName string, loadStore lrs.Store) (cancel func()) {
panic("implement me 2")
}

func (c *testXDSClient) Close() {
panic("implement me 3")
}

// Test that error from the xds client is handled correctly.
func (s) TestEDSClientResponseErrorHandling(t *testing.T) {
td, cleanup := fakexds.StartServer(t)
defer cleanup()
edsRespChan := make(chan *xdsclient.EDSUpdate, 10)
newEDS := func(i *xdsclient.EDSUpdate) error {
edsRespChan <- i
return nil
}

client := newXDSClientWrapper(newEDS, func() {}, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil)
defer client.close()

// Create a client to be passed in attributes.
c := newTestXDSClient()
client.handleUpdate(&XDSConfig{
BalancerName: td.Address,
EDSServiceName: testEDSClusterName,
LrsLoadReportingServerName: nil,
}, attributes.New(xdsinternal.XDSClientID, c),
)

if gotClusterName, err := c.clusterNameChan.Receive(); err != nil || gotClusterName != testEDSClusterName {
t.Fatalf("got EDS watch clusterName %v, expected: %v", gotClusterName, testEDSClusterName)
}
c.edsCb(nil, fmt.Errorf("testing err"))

// The ballback is called with an error, expect no update from edsRespChan.
//
// TODO: check for loseContact() when errors indicating "lose contact" are
// handled correctly.

timer := time.NewTimer(time.Second)
defer timer.Stop()
select {
case <-timer.C:
case resp := <-edsRespChan:
t.Fatalf("unexpected resp: %v", resp)
}
}

// Test that if xds_client is in attributes, the xdsclientnew function will not
// be called, and the xds_client from attributes will be used.
//
Expand Down
7 changes: 5 additions & 2 deletions xds/internal/balancer/xds_client_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,11 @@ func (c *xdsclientWrapper) startEDSWatch(nameToWatch string) {

c.edsServiceName = nameToWatch
c.cancelEDSWatch = c.xdsclient.WatchEDS(c.edsServiceName, func(update *xdsclient.EDSUpdate, err error) {
// TODO: this should trigger a call to `c.loseContact`, when the error
// indicates "lose contact".
if err != nil {
// TODO: this should trigger a call to `c.loseContact`, when the
// error indicates "lose contact".
return
}
if err := c.newEDSUpdate(update); err != nil {
grpclog.Warningf("xds: processing new EDS update failed due to %v.", err)
}
Expand Down