Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eventloop: gloo snapshots #83

Merged
merged 9 commits into from
May 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .circleci/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,7 @@ RUN go get -v github.com/golang/protobuf/...
RUN mkdir -p ${GOPATH}/src/k8s.io && \
git clone https://github.com/kubernetes/code-generator ${GOPATH}/src/k8s.io/code-generator

RUN git clone https://github.com/kubernetes/apimachinery ${GOPATH}/src/k8s.io/apimachinery


CMD ["/bin/bash"]
7 changes: 3 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,12 @@ hackrun: $(BINARY)
./hack/run-local.sh

unit:
ginkgo -r -v pkg/ internal/
ginkgo -r pkg/ internal/

e2e:
ginkgo -r -v test/

test: e2e unit
ginkgo -r test/

test: unit e2e



Expand Down
4 changes: 1 addition & 3 deletions cmd/control-plane/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ var rootCmd = &cobra.Command{
if err != nil {
return errors.Wrap(err, "setting up event loop")
}
if err := eventLoop.Run(stop); err != nil {
return errors.Wrap(err, "failed running event loop")
}
eventLoop.Run(stop)
return nil
},
}
Expand Down
7 changes: 4 additions & 3 deletions internal/control-plane/configwatcher/config_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/solo-io/gloo/pkg/api/types/v1"
"github.com/solo-io/gloo/pkg/log"
"github.com/solo-io/gloo/pkg/storage"
"github.com/gogo/protobuf/proto"
)

type configWatcher struct {
Expand Down Expand Up @@ -63,7 +64,7 @@ func NewConfigWatcher(storageClient storage.Interface) (*configWatcher, error) {
log.GreyPrintf("change detected in upstream: %v", diff)

cache.Upstreams = updatedList
configs <- cache
configs <- proto.Clone(cache).(*v1.Config)
}
upstreamWatcher, err := storageClient.V1().Upstreams().Watch(&storage.UpstreamEventHandlerFuncs{
AddFunc: syncUpstreams,
Expand All @@ -86,7 +87,7 @@ func NewConfigWatcher(storageClient storage.Interface) (*configWatcher, error) {
log.GreyPrintf("change detected in virtualservices: %v", diff)

cache.VirtualServices = updatedList
configs <- cache
configs <- proto.Clone(cache).(*v1.Config)
}
vServiceWatcher, err := storageClient.V1().VirtualServices().Watch(&storage.VirtualServiceEventHandlerFuncs{
AddFunc: syncvServices,
Expand All @@ -110,7 +111,7 @@ func NewConfigWatcher(storageClient storage.Interface) (*configWatcher, error) {
log.GreyPrintf("change detected in virtualservices: %v", diff)

cache.Roles = updatedList
configs <- cache
configs <- proto.Clone(cache).(*v1.Config)
}
roleWatcher, err := storageClient.V1().Roles().Watch(&storage.RoleEventHandlerFuncs{
AddFunc: syncroles,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func createKubeResources() {
}
upstreams = append(upstreams, kubeUpstreams...)
}

func createConsulResources() {
cfg := api.DefaultConfig()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@ func TestEndpointswatcher(t *testing.T) {

}

var opts = bootstrap.Options {
KubeOptions: bootstrap.KubeOptions{
KubeConfig: filepath.Join(os.Getenv("HOME"), ".kube", "config"),
},
}
var opts bootstrap.Options

// consul vars
var (
Expand All @@ -62,6 +58,13 @@ var _ = BeforeSuite(func() {
err = helpers.SetupKubeForTest(namespace)
helpers.Must(err)

opts = bootstrap.Options{
KubeOptions: bootstrap.KubeOptions{
Namespace: namespace,
KubeConfig: filepath.Join(os.Getenv("HOME"), ".kube", "config"),
},
}

cfg, err := clientcmd.BuildConfigFromFlags(opts.KubeOptions.MasterURL, opts.KubeOptions.KubeConfig)
Expect(err).NotTo(HaveOccurred())

Expand Down
181 changes: 35 additions & 146 deletions internal/control-plane/eventloop/eventloop.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package eventloop


import (
envoycache "github.com/envoyproxy/go-control-plane/pkg/cache"
"github.com/mitchellh/hashstructure"
"github.com/pkg/errors"

"github.com/solo-io/gloo/internal/control-plane/bootstrap"
Expand All @@ -16,22 +14,17 @@ import (
"github.com/solo-io/gloo/pkg/bootstrap/artifactstorage"
"github.com/solo-io/gloo/pkg/bootstrap/configstorage"
secretwatchersetup "github.com/solo-io/gloo/pkg/bootstrap/secretwatcher"
"github.com/solo-io/gloo/pkg/endpointdiscovery"
"github.com/solo-io/gloo/pkg/log"
"github.com/solo-io/gloo/pkg/plugins"
"github.com/solo-io/gloo/pkg/secretwatcher"
"github.com/solo-io/gloo/internal/control-plane/endpointswatcher"
"github.com/solo-io/gloo/internal/control-plane/snapshot"
)

type eventLoop struct {
configWatcher configwatcher.Interface
secretWatcher secretwatcher.Interface
fileWatcher filewatcher.Interface
endpointsWatcher endpointdiscovery.Interface
reporter reporter.Interface
translator *translator.Translator
xdsConfig envoycache.SnapshotCache
getDependencies func(cfg *v1.Config) []*plugins.Dependencies
snapshotEmitter *snapshot.Emitter
reporter reporter.Interface
translator *translator.Translator
xdsConfig envoycache.SnapshotCache
}

func Setup(opts bootstrap.Options, xdsPort int, stop <-chan struct{}) (*eventLoop, error) {
Expand Down Expand Up @@ -66,6 +59,9 @@ func Setup(opts bootstrap.Options, xdsPort int, stop <-chan struct{}) (*eventLoo

endpointsWatcher := endpointswatcher.NewEndpointsWatcher(opts.Options, edPlugins...)

snapshotEmitter := snapshot.NewEmitter(cfgWatcher, secretWatcher,
fileWatcher, endpointsWatcher, getDependenciesFor(plugs))

trans := translator.NewTranslator(opts.IngressOptions, plugs)

// create a snapshot to give to misconfigured envoy instances
Expand All @@ -77,14 +73,10 @@ func Setup(opts bootstrap.Options, xdsPort int, stop <-chan struct{}) (*eventLoo
}

e := &eventLoop{
configWatcher: cfgWatcher,
secretWatcher: secretWatcher,
fileWatcher: fileWatcher,
endpointsWatcher: endpointsWatcher,
translator: trans,
xdsConfig: xdsConfig,
getDependencies: getDependenciesFor(plugs),
reporter: reporter.NewReporter(store),
snapshotEmitter: snapshotEmitter,
translator: trans,
xdsConfig: xdsConfig,
reporter: reporter.NewReporter(store),
}

return e, nil
Expand Down Expand Up @@ -112,77 +104,40 @@ func setupFileWatcher(opts bootstrap.Options) (filewatcher.Interface, error) {
return filewatcher.NewFileWatcher(store)
}

func (e *eventLoop) Run(stop <-chan struct{}) error {
go e.configWatcher.Run(stop)
go e.fileWatcher.Run(stop)
go e.secretWatcher.Run(stop)
go e.endpointsWatcher.Run(stop)

workerErrors := e.errors()
func (e *eventLoop) Run(stop <-chan struct{}) {
go e.snapshotEmitter.Run(stop)

// cache the most recent read for any of these
var hash uint64
current := newCache()
sync := func(current *cache) {
newHash := current.hash()
if hash == newHash {
return
}
hash = newHash
e.updateXds(current)
}
var oldHash uint64
for {
select {
case cfg := <-e.configWatcher.Config():
log.Debugf("change triggered by config")
current.cfg = cfg
dependencies := e.getDependencies(cfg)
var secretRefs, fileRefs []string
for _, dep := range dependencies {
secretRefs = append(secretRefs, dep.SecretRefs...)
fileRefs = append(fileRefs, dep.FileRefs...)
}
// secrets for virtualservices
for _, vService := range cfg.VirtualServices {
if vService.SslConfig != nil && vService.SslConfig.SecretRef != "" {
secretRefs = append(secretRefs, vService.SslConfig.SecretRef)
}
case <-stop:
log.Printf("event loop shutting down")
return
case snap := <-e.snapshotEmitter.Snapshot():
newHash := snap.Hash()
log.Printf( "\nold hash: %v\nnew hash: %v", oldHash, newHash)
if newHash == oldHash {
continue
}
go e.secretWatcher.TrackSecrets(secretRefs)
go e.fileWatcher.TrackFiles(fileRefs)
go e.endpointsWatcher.TrackUpstreams(cfg.Upstreams)

sync(current)
case secrets := <-e.secretWatcher.Secrets():
log.Debugf("change triggered by secrets")
current.secrets = secrets
sync(current)
case files := <-e.fileWatcher.Files():
log.Debugf("change triggered by files")
current.files = files
sync(current)
case endpoints := <-e.endpointsWatcher.Endpoints():
log.Debugf("change triggered by endpoints")
current.endpoints = endpoints
sync(current)
case err := <-workerErrors:
log.Debugf("new snapshot received")
oldHash = newHash
e.updateXds(snap)
case err := <-e.snapshotEmitter.Error():
log.Warnf("error in control plane event loop: %v", err)
}
}
}

func (e *eventLoop) updateXds(cache *cache) {
if !cache.ready() {
log.Debugf("cache is not fully constructed to produce a first snapshot yet")
func (e *eventLoop) updateXds(snap *snapshot.Cache) {
if !snap.Ready() {
log.Debugf("snapshot is not ready for translation yet")
return
}

snapshot, reports, err := e.translator.Translate(translator.Inputs{
Cfg: cache.cfg,
Secrets: cache.secrets,
Files: cache.files,
Endpoints: cache.endpoints,
})
log.Debugf("Gloo Snapshot: %v", snap)

xdsSnapshot, reports, err := e.translator.Translate(snap)
if err != nil {
// TODO: panic or handle these internal errors smartly
log.Warnf("failed to translate based on the latest config: %v", err)
Expand All @@ -199,72 +154,6 @@ func (e *eventLoop) updateXds(cache *cache) {
}
}

log.Debugf("Setting xDS Snapshot for Role %v: %v", "ingress", snapshot)
e.xdsConfig.SetSnapshot("ingress", *snapshot)
}

// fan out to cover all channels that return errors
func (e *eventLoop) errors() <-chan error {
aggregatedErrorsChan := make(chan error)
go func() {
for err := range e.configWatcher.Error() {
aggregatedErrorsChan <- errors.Wrap(err, "config watcher encountered an error")
}
}()
go func() {
for err := range e.secretWatcher.Error() {
aggregatedErrorsChan <- errors.Wrap(err, "secret watcher encountered an error")
}
}()
go func() {
for err := range e.fileWatcher.Error() {
aggregatedErrorsChan <- errors.Wrap(err, "file watcher encountered an error")
}
}()
go func() {
for err := range e.endpointsWatcher.Error() {
aggregatedErrorsChan <- errors.Wrap(err, "endpoints watcher encountered an error")
}
}()
return aggregatedErrorsChan
}

// cache contains the latest "gloo snapshot"
type cache struct {
cfg *v1.Config
secrets secretwatcher.SecretMap
files filewatcher.Files
// need to separate endpoints by the service who discovered them
endpoints endpointdiscovery.EndpointGroups
}

func newCache() *cache {
return &cache{}
}

// ready doesn't necessarily tell us whetehr endpoints have been discovered yet
// but that's okay. envoy won't mind
func (c *cache) ready() bool {
return c.cfg != nil
}

func (c *cache) hash() uint64 {
h0, err := hashstructure.Hash(*c.cfg, nil)
if err != nil {
panic(err)
}
h1, err := hashstructure.Hash(c.secrets, nil)
if err != nil {
panic(err)
}
h2, err := hashstructure.Hash(c.endpoints, nil)
if err != nil {
panic(err)
}
h3, err := hashstructure.Hash(c.files, nil)
if err != nil {
panic(err)
}
h := h0 + h1 + h2 + h3
return h
log.Debugf("Setting xDS Snapshot for Role %v: %v", "ingress", xdsSnapshot)
e.xdsConfig.SetSnapshot("ingress", *xdsSnapshot)
}
Loading