From e8e94944a41b429ccef0208403504384ad867ccc Mon Sep 17 00:00:00 2001 From: Renuka Fernando Date: Fri, 28 Oct 2022 14:39:34 +0530 Subject: [PATCH] Temp: Using ADS client from GCP Signed-off-by: Renuka Fernando --- src/provider/xds_grpc_sotw_provider.go | 75 +++++--------------------- 1 file changed, 13 insertions(+), 62 deletions(-) diff --git a/src/provider/xds_grpc_sotw_provider.go b/src/provider/xds_grpc_sotw_provider.go index c5b4a2074..553b31cd4 100644 --- a/src/provider/xds_grpc_sotw_provider.go +++ b/src/provider/xds_grpc_sotw_provider.go @@ -5,12 +5,9 @@ import ( "fmt" "io" - core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/golang/protobuf/ptypes/any" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" logger "github.com/sirupsen/logrus" - "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -24,6 +21,7 @@ import ( "google.golang.org/grpc/codes" grpcStatus "google.golang.org/grpc/status" + "github.com/envoyproxy/go-control-plane/pkg/adsclient/sotw/v3" rls_conf_v3 "github.com/envoyproxy/go-control-plane/ratelimit/config/ratelimit/v3" ) @@ -38,25 +36,22 @@ type XdsGrpcSotwProvider struct { configUpdateEventChan chan ConfigUpdateEvent statsManager stats.Manager ctx context.Context - // xdsStream is the ADS stream client - xdsStream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient - // lastAckedResponse is the last response acked by the rate limit service - lastAckedResponse *discovery.DiscoveryResponse - // lastReceivedResponse is the last response received from xDS server - lastReceivedResponse *discovery.DiscoveryResponse + adsClient sotw.AdsClient // connectionRetryChannel is the channel which trigger true for connection issues connectionRetryChannel chan bool } // NewXdsGrpcSotwProvider initializes xDS listener and returns the xDS provider. func NewXdsGrpcSotwProvider(settings settings.Settings, statsManager stats.Manager) RateLimitConfigProvider { + ctx := context.Background() p := &XdsGrpcSotwProvider{ settings: settings, statsManager: statsManager, - ctx: context.Background(), + ctx: ctx, configUpdateEventChan: make(chan ConfigUpdateEvent), connectionRetryChannel: make(chan bool), loader: config.NewRateLimitConfigLoaderImpl(), + adsClient: sotw.NewAdsClient(ctx, settings.ConfigGrpcXdsNodeId, configTypeURL), } go p.initXdsClient() return p @@ -92,27 +87,14 @@ func (p *XdsGrpcSotwProvider) initializeAndWatch() *grpc.ClientConn { p.connectionRetryChannel <- true return nil } + p.adsClient.InitConnect(conn) go p.watchConfigs() - - var lastAppliedVersion string - if p.lastAckedResponse != nil { - // If the connection is interrupted in the middle, apply the previously applied version - lastAppliedVersion = p.lastAckedResponse.VersionInfo - } else { - lastAppliedVersion = "" - } - discoveryRequest := &discovery.DiscoveryRequest{ - Node: &core.Node{Id: p.settings.ConfigGrpcXdsNodeId}, - VersionInfo: lastAppliedVersion, - TypeUrl: configTypeURL, - } - p.xdsStream.Send(discoveryRequest) return conn } func (p *XdsGrpcSotwProvider) watchConfigs() { for { - discoveryResponse, err := p.xdsStream.Recv() + resp, err := p.adsClient.Recv() if err == io.EOF { // reinitialize again, if stream ends logger.Error("EOF is received from xDS Configuration Server") @@ -130,12 +112,10 @@ func (p *XdsGrpcSotwProvider) watchConfigs() { } logger.Errorf("Error while xDS communication; errorCode: %s errorMessage: %s", errStatus.Code().String(), errStatus.Message()) - p.nack(errStatus.Message()) + p.adsClient.Nack(errStatus.Message()) } else { - p.lastReceivedResponse = discoveryResponse - logger.Debugf("Discovery response is received from xDS Configuration Server with response version: %s", discoveryResponse.VersionInfo) - logger.Tracef("Discovery response received from xDS Configuration Server: %v", discoveryResponse) - p.sendConfigs(discoveryResponse.Resources) + logger.Tracef("Discovery response received from xDS Configuration Server: %v", resp) + p.sendConfigs(resp.Resources) } } } @@ -146,7 +126,6 @@ func (p *XdsGrpcSotwProvider) initConnection() (*grpc.ClientConn, error) { logger.Errorf("Error initializing gRPC connection to xDS Configuration Server: %s", err.Error()) return nil, err } - p.xdsStream, err = discovery.NewAggregatedDiscoveryServiceClient(conn).StreamAggregatedResources(context.Background()) if err != nil { logger.Errorf("Error initializing gRPC stream to xDS Configuration Server: %s", err.Error()) return nil, err @@ -184,7 +163,7 @@ func (p *XdsGrpcSotwProvider) sendConfigs(resources []*any.Any) { defer func() { if e := recover(); e != nil { p.configUpdateEventChan <- &ConfigUpdateEventImpl{err: e} - p.nack(fmt.Sprint(e)) + p.adsClient.Nack(fmt.Sprint(e)) } }() @@ -194,7 +173,7 @@ func (p *XdsGrpcSotwProvider) sendConfigs(resources []*any.Any) { err := anypb.UnmarshalTo(res, confPb, proto.UnmarshalOptions{}) // err := ptypes.UnmarshalAny(res, config) if err != nil { logger.Errorf("Error while unmarshalling config from xDS Configuration Server: %s", err.Error()) - p.nack(err.Error()) + p.adsClient.Nack(err.Error()) return } @@ -204,33 +183,5 @@ func (p *XdsGrpcSotwProvider) sendConfigs(resources []*any.Any) { rlSettings := settings.NewSettings() rlsConf := p.loader.Load(conf, p.statsManager, rlSettings.MergeDomainConfigurations) p.configUpdateEventChan <- &ConfigUpdateEventImpl{config: rlsConf} - p.ack() -} - -func (p *XdsGrpcSotwProvider) ack() { - p.lastAckedResponse = p.lastReceivedResponse - discoveryRequest := &discovery.DiscoveryRequest{ - Node: &core.Node{Id: p.settings.ConfigGrpcXdsNodeId}, - VersionInfo: p.lastAckedResponse.VersionInfo, - TypeUrl: configTypeURL, - ResponseNonce: p.lastReceivedResponse.Nonce, - } - p.xdsStream.Send(discoveryRequest) -} - -func (p *XdsGrpcSotwProvider) nack(errorMessage string) { - discoveryRequest := &discovery.DiscoveryRequest{ - Node: &core.Node{Id: p.settings.ConfigGrpcXdsNodeId}, - TypeUrl: configTypeURL, - ErrorDetail: &status.Status{ - Message: errorMessage, - }, - } - if p.lastAckedResponse != nil { - discoveryRequest.VersionInfo = p.lastAckedResponse.VersionInfo - } - if p.lastReceivedResponse != nil { - discoveryRequest.ResponseNonce = p.lastReceivedResponse.Nonce - } - p.xdsStream.Send(discoveryRequest) + p.adsClient.Ack() }