Skip to content

Commit

Permalink
Temp: Using ADS client from GCP
Browse files Browse the repository at this point in the history
Signed-off-by: Renuka Fernando <renukapiyumal@gmail.com>
  • Loading branch information
renuka-fernando committed Oct 28, 2022
1 parent be6c107 commit e8e9494
Showing 1 changed file with 13 additions and 62 deletions.
75 changes: 13 additions & 62 deletions src/provider/xds_grpc_sotw_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@ import (
"fmt"
"io"

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/golang/protobuf/ptypes/any"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
logger "github.com/sirupsen/logrus"
"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -24,6 +21,7 @@ import (
"google.golang.org/grpc/codes"
grpcStatus "google.golang.org/grpc/status"

"github.com/envoyproxy/go-control-plane/pkg/adsclient/sotw/v3"
rls_conf_v3 "github.com/envoyproxy/go-control-plane/ratelimit/config/ratelimit/v3"
)

Expand All @@ -38,25 +36,22 @@ type XdsGrpcSotwProvider struct {
configUpdateEventChan chan ConfigUpdateEvent
statsManager stats.Manager
ctx context.Context
// xdsStream is the ADS stream client
xdsStream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient
// lastAckedResponse is the last response acked by the rate limit service
lastAckedResponse *discovery.DiscoveryResponse
// lastReceivedResponse is the last response received from xDS server
lastReceivedResponse *discovery.DiscoveryResponse
adsClient sotw.AdsClient
// connectionRetryChannel is the channel which trigger true for connection issues
connectionRetryChannel chan bool
}

// NewXdsGrpcSotwProvider initializes xDS listener and returns the xDS provider.
func NewXdsGrpcSotwProvider(settings settings.Settings, statsManager stats.Manager) RateLimitConfigProvider {
ctx := context.Background()
p := &XdsGrpcSotwProvider{
settings: settings,
statsManager: statsManager,
ctx: context.Background(),
ctx: ctx,
configUpdateEventChan: make(chan ConfigUpdateEvent),
connectionRetryChannel: make(chan bool),
loader: config.NewRateLimitConfigLoaderImpl(),
adsClient: sotw.NewAdsClient(ctx, settings.ConfigGrpcXdsNodeId, configTypeURL),
}
go p.initXdsClient()
return p
Expand Down Expand Up @@ -92,27 +87,14 @@ func (p *XdsGrpcSotwProvider) initializeAndWatch() *grpc.ClientConn {
p.connectionRetryChannel <- true
return nil
}
p.adsClient.InitConnect(conn)
go p.watchConfigs()

var lastAppliedVersion string
if p.lastAckedResponse != nil {
// If the connection is interrupted in the middle, apply the previously applied version
lastAppliedVersion = p.lastAckedResponse.VersionInfo
} else {
lastAppliedVersion = ""
}
discoveryRequest := &discovery.DiscoveryRequest{
Node: &core.Node{Id: p.settings.ConfigGrpcXdsNodeId},
VersionInfo: lastAppliedVersion,
TypeUrl: configTypeURL,
}
p.xdsStream.Send(discoveryRequest)
return conn
}

func (p *XdsGrpcSotwProvider) watchConfigs() {
for {
discoveryResponse, err := p.xdsStream.Recv()
resp, err := p.adsClient.Recv()
if err == io.EOF {
// reinitialize again, if stream ends
logger.Error("EOF is received from xDS Configuration Server")
Expand All @@ -130,12 +112,10 @@ func (p *XdsGrpcSotwProvider) watchConfigs() {
}
logger.Errorf("Error while xDS communication; errorCode: %s errorMessage: %s",
errStatus.Code().String(), errStatus.Message())
p.nack(errStatus.Message())
p.adsClient.Nack(errStatus.Message())
} else {
p.lastReceivedResponse = discoveryResponse
logger.Debugf("Discovery response is received from xDS Configuration Server with response version: %s", discoveryResponse.VersionInfo)
logger.Tracef("Discovery response received from xDS Configuration Server: %v", discoveryResponse)
p.sendConfigs(discoveryResponse.Resources)
logger.Tracef("Discovery response received from xDS Configuration Server: %v", resp)
p.sendConfigs(resp.Resources)
}
}
}
Expand All @@ -146,7 +126,6 @@ func (p *XdsGrpcSotwProvider) initConnection() (*grpc.ClientConn, error) {
logger.Errorf("Error initializing gRPC connection to xDS Configuration Server: %s", err.Error())
return nil, err
}
p.xdsStream, err = discovery.NewAggregatedDiscoveryServiceClient(conn).StreamAggregatedResources(context.Background())
if err != nil {
logger.Errorf("Error initializing gRPC stream to xDS Configuration Server: %s", err.Error())
return nil, err
Expand Down Expand Up @@ -184,7 +163,7 @@ func (p *XdsGrpcSotwProvider) sendConfigs(resources []*any.Any) {
defer func() {
if e := recover(); e != nil {
p.configUpdateEventChan <- &ConfigUpdateEventImpl{err: e}
p.nack(fmt.Sprint(e))
p.adsClient.Nack(fmt.Sprint(e))
}
}()

Expand All @@ -194,7 +173,7 @@ func (p *XdsGrpcSotwProvider) sendConfigs(resources []*any.Any) {
err := anypb.UnmarshalTo(res, confPb, proto.UnmarshalOptions{}) // err := ptypes.UnmarshalAny(res, config)
if err != nil {
logger.Errorf("Error while unmarshalling config from xDS Configuration Server: %s", err.Error())
p.nack(err.Error())
p.adsClient.Nack(err.Error())
return
}

Expand All @@ -204,33 +183,5 @@ func (p *XdsGrpcSotwProvider) sendConfigs(resources []*any.Any) {
rlSettings := settings.NewSettings()
rlsConf := p.loader.Load(conf, p.statsManager, rlSettings.MergeDomainConfigurations)
p.configUpdateEventChan <- &ConfigUpdateEventImpl{config: rlsConf}
p.ack()
}

func (p *XdsGrpcSotwProvider) ack() {
p.lastAckedResponse = p.lastReceivedResponse
discoveryRequest := &discovery.DiscoveryRequest{
Node: &core.Node{Id: p.settings.ConfigGrpcXdsNodeId},
VersionInfo: p.lastAckedResponse.VersionInfo,
TypeUrl: configTypeURL,
ResponseNonce: p.lastReceivedResponse.Nonce,
}
p.xdsStream.Send(discoveryRequest)
}

func (p *XdsGrpcSotwProvider) nack(errorMessage string) {
discoveryRequest := &discovery.DiscoveryRequest{
Node: &core.Node{Id: p.settings.ConfigGrpcXdsNodeId},
TypeUrl: configTypeURL,
ErrorDetail: &status.Status{
Message: errorMessage,
},
}
if p.lastAckedResponse != nil {
discoveryRequest.VersionInfo = p.lastAckedResponse.VersionInfo
}
if p.lastReceivedResponse != nil {
discoveryRequest.ResponseNonce = p.lastReceivedResponse.Nonce
}
p.xdsStream.Send(discoveryRequest)
p.adsClient.Ack()
}

0 comments on commit e8e9494

Please sign in to comment.