Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable envoy configs to be grouped #75

Merged
merged 16 commits into from
May 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
name = "github.com/golang/glog"

[[constraint]]
branch = "master"
version = "v1.1.0"
name = "github.com/golang/protobuf"

[[constraint]]
Expand Down
4 changes: 2 additions & 2 deletions api/v1/virtualmesh.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import "metadata.proto";
* A Virtual Mesh is a container for a set of Virtual Services that will be used to generate a single proxy config
* to be applied to one or more Envoy nodes. The Virtual Mesh is best understood as an in-mesh application's localized view
* of the rest of the mesh.
* Each domains for each Virtual Services contained in a Virtual Mesh cannot appear more than once, or the Virtual Mesh
* Each domain for each Virtual Service contained in a Virtual Mesh cannot appear more than once, or the Virtual Mesh
* will be invalid.
*/
message VirtualMesh {
Expand All @@ -28,7 +28,7 @@ message VirtualMesh {
// One or more lowercase rfc1035/rfc1123 labels separated by '.' with a maximum length of 253 characters.
string name = 1;

// the list of names of the virtual services this vmesh includes.
// the list of names of the virtual services this vmesh encapsulates.
repeated string virtual_services = 2;

// Status indicates the validation status of the virtual mesh resource.
Expand Down
4 changes: 2 additions & 2 deletions docs/api.json
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@
"name": "VirtualMesh",
"longName": "VirtualMesh",
"fullName": "v1.VirtualMesh",
"description": "A Virtual Mesh is a container for a set of Virtual Services that will be used to generate a single proxy config\nto be applied to one or more Envoy nodes. The Virtual Mesh is best understood as an in-mesh application's localized view\nof the rest of the mesh.\nEach domains for each Virtual Services contained in a Virtual Mesh cannot appear more than once, or the Virtual Mesh\nwill be invalid.",
"description": "A Virtual Mesh is a container for a set of Virtual Services that will be used to generate a single proxy config\nto be applied to one or more Envoy nodes. The Virtual Mesh is best understood as an in-mesh application's localized view\nof the rest of the mesh.\nEach domain for each Virtual Service contained in a Virtual Mesh cannot appear more than once, or the Virtual Mesh\nwill be invalid.",
"hasExtensions": false,
"hasFields": true,
"extensions": [],
Expand All @@ -386,7 +386,7 @@
},
{
"name": "virtual_services",
"description": "the list of names of the virtual services this vmesh includes.",
"description": "the list of names of the virtual services this vmesh encapsulates.",
"label": "repeated",
"type": "string",
"longType": "string",
Expand Down
2 changes: 1 addition & 1 deletion example/nats/envoy.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

node:
cluster: ingress
id: ingress-1
id: ingress~1
static_resources:
clusters:
- name: xds_cluster
Expand Down
2 changes: 1 addition & 1 deletion hack/run-local-binaries.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ GLOO_IP=localhost
cat > ${CONFIG_DIR}/envoy.yaml <<EOF
node:
cluster: ingress
id: ingress
id: ingress~1

static_resources:
clusters:
Expand Down
2 changes: 1 addition & 1 deletion hack/run-local-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ CONTROL_PLANE_IP=localhost
cat > ${CONFIG_DIR}/envoy.yaml <<EOF
node:
cluster: ingress
id: ingress
id: ingress~1

static_resources:
clusters:
Expand Down
2 changes: 1 addition & 1 deletion hack/run-local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ GLOO_IP=127.0.0.1
cat > ${CONFIG_DIR}/envoy.yaml <<EOF
node:
cluster: ingress
id: ingress
id: ingress~1

static_resources:
clusters:
Expand Down
2 changes: 1 addition & 1 deletion install/helm/gloo/templates/ingress-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ data:
envoy.yaml: |
node:
cluster: ingress
id: NODE_ID_PLACE_HOLDER
id: ingress~NODE_ID_PLACE_HOLDER
static_resources:
clusters:
- name: xds_cluster
Expand Down
2 changes: 1 addition & 1 deletion install/kube/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ data:
envoy.yaml: |
node:
cluster: ingress
id: NODE_ID_PLACE_HOLDER
id: ingress~NODE_ID_PLACE_HOLDER
static_resources:
clusters:
- name: xds_cluster
Expand Down
2 changes: 1 addition & 1 deletion install/nomad/install.nomad
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ job "gloo" {
data = <<EOF
node:
cluster: ingress
id: {{ env "NOMAD_ALLOC_ID" }}
id: ingress~{{ env "NOMAD_ALLOC_ID" }}

static_resources:
clusters:
Expand Down
2 changes: 1 addition & 1 deletion install/openshift/install.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ data:
envoy.yaml: |
node:
cluster: ingress
id: NODE_ID_PLACE_HOLDER
id: ingress~NODE_ID_PLACE_HOLDER
static_resources:
clusters:
- name: xds_cluster
Expand Down
6 changes: 3 additions & 3 deletions internal/control-plane/bootstrap/flags/ingress_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

func AddIngressFlags(cmd *cobra.Command, opts *bootstrap.Options) {
// TODO ingress.bind-adress
cmd.PersistentFlags().StringVar(&opts.IngressOptions.BindAddress, "envoy.bind-adress", "", "The address that the ingress envoy should bind to.")
cmd.PersistentFlags().IntVar(&opts.IngressOptions.Port, "envoy.port", 8080, "The HTTP port envoy uses.")
cmd.PersistentFlags().IntVar(&opts.IngressOptions.SecurePort, "envoy.secure-port", 8443, "The HTTPS port envoy uses.")
cmd.PersistentFlags().StringVar(&opts.IngressOptions.BindAddress, "envoy.bind-adress", "::", "The address that the ingress envoy should bind to.")
cmd.PersistentFlags().Uint32Var(&opts.IngressOptions.Port, "envoy.port", 8080, "The HTTP port envoy uses.")
cmd.PersistentFlags().Uint32Var(&opts.IngressOptions.SecurePort, "envoy.secure-port", 8443, "The HTTPS port envoy uses.")
}
4 changes: 2 additions & 2 deletions internal/control-plane/bootstrap/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ type Options struct {

type IngressOptions struct {
BindAddress string
Port int
SecurePort int
Port uint32
SecurePort uint32
}
33 changes: 32 additions & 1 deletion internal/control-plane/configwatcher/config_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,17 @@ func NewConfigWatcher(storageClient storage.Interface) (*configWatcher, error) {
log.Warnf("Startup: failed to read virtual services from storage: %v", err)
initialVirtualServices = []*v1.VirtualService{}
}
initialVirtualMeshes, err := storageClient.V1().VirtualMeshes().List()
if err != nil {
log.Warnf("Startup: failed to read virtual services from storage: %v", err)
initialVirtualMeshes = []*v1.VirtualMesh{}
}
configs := make(chan *v1.Config)
// do a first time read
cache := &v1.Config{
Upstreams: initialUpstreams,
VirtualServices: initialVirtualServices,
VirtualMeshes: initialVirtualMeshes,
}
// throw it down the channel to get things going
go func() {
Expand Down Expand Up @@ -67,6 +73,7 @@ func NewConfigWatcher(storageClient storage.Interface) (*configWatcher, error) {
if err != nil {
return nil, errors.Wrap(err, "failed to create watcher for upstreams")
}

syncvServices := func(updatedList []*v1.VirtualService, _ *v1.VirtualService) {
sort.SliceStable(updatedList, func(i, j int) bool {
return updatedList[i].GetName() < updatedList[j].GetName()
Expand All @@ -90,8 +97,32 @@ func NewConfigWatcher(storageClient storage.Interface) (*configWatcher, error) {
return nil, errors.Wrap(err, "failed to create watcher for virtualservices")
}


syncvMeshes := func(updatedList []*v1.VirtualMesh, _ *v1.VirtualMesh) {
sort.SliceStable(updatedList, func(i, j int) bool {
return updatedList[i].GetName() < updatedList[j].GetName()
})

diff, equal := messagediff.PrettyDiff(cache.VirtualMeshes, updatedList)
if equal {
return
}
log.GreyPrintf("change detected in virtualservices: %v", diff)

cache.VirtualMeshes = updatedList
configs <- cache
}
vMeshWatcher, err := storageClient.V1().VirtualMeshes().Watch(&storage.VirtualMeshEventHandlerFuncs{
AddFunc: syncvMeshes,
UpdateFunc: syncvMeshes,
DeleteFunc: syncvMeshes,
})
if err != nil {
return nil, errors.Wrap(err, "failed to create watcher for virtualservices")
}

return &configWatcher{
watchers: []*storage.Watcher{vServiceWatcher, upstreamWatcher},
watchers: []*storage.Watcher{vServiceWatcher, vMeshWatcher, upstreamWatcher},
configs: configs,
errs: make(chan error),
}, nil
Expand Down
61 changes: 60 additions & 1 deletion internal/control-plane/configwatcher/crd_config_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,36 @@ var _ = Describe("KubeConfigWatcher", func() {
TeardownKube(namespace)
})
Describe("controller", func() {
It("watches kube crds", func() {
It("watches kube upstream crds", func() {
cfg, err := clientcmd.BuildConfigFromFlags(masterUrl, kubeconfigPath)
Expect(err).NotTo(HaveOccurred())

storageClient, err := crd.NewStorage(cfg, namespace, time.Second)
Expect(err).NotTo(HaveOccurred())

watcher, err := NewConfigWatcher(storageClient)
Must(err)
go func() { watcher.Run(make(chan struct{})) }()

upstream := NewTestUpstream1()
created, err := storageClient.V1().Upstreams().Create(upstream)
Expect(err).NotTo(HaveOccurred())

// give controller time to register
time.Sleep(time.Second * 2)

select {
case <-time.After(time.Second * 5):
Expect(fmt.Errorf("expected to have received resource event before 5s")).NotTo(HaveOccurred())
case cfg := <-watcher.Config():
Expect(len(cfg.Upstreams)).To(Equal(1))
Expect(cfg.Upstreams[0]).To(Equal(created))
Expect(cfg.Upstreams[0].Spec).To(Equal(created.Spec))
case err := <-watcher.Error():
Expect(err).NotTo(HaveOccurred())
}
})
It("watches kube virtual service crds", func() {
cfg, err := clientcmd.BuildConfigFromFlags(masterUrl, kubeconfigPath)
Expect(err).NotTo(HaveOccurred())

Expand Down Expand Up @@ -65,5 +94,35 @@ var _ = Describe("KubeConfigWatcher", func() {
Expect(err).NotTo(HaveOccurred())
}
})
It("watches kube virtual mesh crds", func() {
cfg, err := clientcmd.BuildConfigFromFlags(masterUrl, kubeconfigPath)
Expect(err).NotTo(HaveOccurred())

storageClient, err := crd.NewStorage(cfg, namespace, time.Second)
Expect(err).NotTo(HaveOccurred())

watcher, err := NewConfigWatcher(storageClient)
Must(err)
go func() { watcher.Run(make(chan struct{})) }()

virtualMesh := NewTestVirtualMesh("something", "foo")
created, err := storageClient.V1().VirtualMeshes().Create(virtualMesh)
Expect(err).NotTo(HaveOccurred())

// give controller time to register
time.Sleep(time.Second * 2)

select {
case <-time.After(time.Second * 5):
Expect(fmt.Errorf("expected to have received resource event before 5s")).NotTo(HaveOccurred())
case cfg := <-watcher.Config():
Expect(len(cfg.VirtualMeshes)).To(Equal(1))
Expect(cfg.VirtualMeshes[0]).To(Equal(created))
Expect(len(cfg.VirtualMeshes[0].VirtualServices)).To(Equal(1))
Expect(cfg.VirtualMeshes[0].VirtualServices[0]).To(Equal(created.VirtualServices[0]))
case err := <-watcher.Error():
Expect(err).NotTo(HaveOccurred())
}
})
})
})
21 changes: 8 additions & 13 deletions internal/control-plane/eventloop/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,6 @@ type eventLoop struct {
startFuncs []func() error
}

func translatorConfig(opts bootstrap.Options) translator.TranslatorConfig {
return translator.TranslatorConfig{
IngressBindAddress: opts.IngressOptions.BindAddress,
IngressPort: uint32(opts.IngressOptions.Port),
IngressSecurePort: uint32(opts.IngressOptions.SecurePort),
}
}

func Setup(opts bootstrap.Options, xdsPort int, stop <-chan struct{}) (*eventLoop, error) {
store, err := configstorage.Bootstrap(opts.Options)
if err != nil {
Expand All @@ -63,14 +55,17 @@ func Setup(opts bootstrap.Options, xdsPort int, stop <-chan struct{}) (*eventLoo
return nil, errors.Wrap(err, "failed to set up file watcher")
}

xdsConfig, _, err := xds.RunXDS(xdsPort)
// create a snapshot to give to misconfigured envoy instances
badNodeSnapshot := xds.BadNodeSnapshot(opts.IngressOptions.BindAddress, opts.IngressOptions.Port)

xdsConfig, _, err := xds.RunXDS(xdsPort, badNodeSnapshot)
if err != nil {
return nil, errors.Wrap(err, "failed to start xds server")
}

plugs := plugins.RegisteredPlugins()

trans := translator.NewTranslator(translatorConfig(opts), plugs)
trans := translator.NewTranslator(opts.IngressOptions, plugs)

e := &eventLoop{
configWatcher: cfgWatcher,
Expand Down Expand Up @@ -227,8 +222,8 @@ func (e *eventLoop) updateXds(cache *cache) {
}
}

log.Debugf("FINAL: XDS Snapshot: %v", snapshot)
e.xdsConfig.SetSnapshot(xds.NodeKey, *snapshot)
log.Debugf("Setting xDS Snapshot for Virtual Mesh %v: %v", "ingress", snapshot)
e.xdsConfig.SetSnapshot("ingress", *snapshot)
}

// fan out to cover all endpoint discovery services
Expand Down Expand Up @@ -268,7 +263,7 @@ func (e *eventLoop) errors() <-chan error {
for _, ed := range e.endpointDiscoveries {
go func() {
for err := range ed.Error() {
aggregatedErrorsChan <- err
aggregatedErrorsChan <- errors.Wrap(err, "endpoint discovery encountered an error")
}
}()
}
Expand Down
Loading