
ep
stevenctl committed Aug 27, 2024
1 parent 7de5786 commit 08a5322
Showing 5 changed files with 267 additions and 50 deletions.
2 changes: 2 additions & 0 deletions projects/gloo/pkg/plugins/registry/registry.go
@@ -54,6 +54,7 @@ import (
"github.com/solo-io/gloo/projects/gloo/pkg/plugins/tunneling"
"github.com/solo-io/gloo/projects/gloo/pkg/plugins/upstreamconn"
"github.com/solo-io/gloo/projects/gloo/pkg/plugins/virtualhost"
"github.com/solo-io/gloo/projects/gloo/pkg/upstreams/serviceentry"
"github.com/solo-io/gloo/projects/gloo/pkg/utils"
"github.com/solo-io/go-utils/contextutils"
)
@@ -114,6 +115,7 @@ func Plugins(opts bootstrap.Opts) []plugins.Plugin {

if opts.KubeClient != nil {
glooPlugins = append(glooPlugins, kubernetes.NewPlugin(opts.KubeClient, opts.KubeCoreCache))
glooPlugins = append(glooPlugins, serviceentry.NewPlugin(opts.KubeClient, opts.KubeCoreCache))
}
if opts.Consul.ConsulWatcher != nil {
glooPlugins = append(glooPlugins, consul.NewPlugin(opts.Consul.ConsulWatcher, consul.NewConsulDnsResolver(opts.Consul.DnsServer), opts.Consul.DnsPollingInterval))
207 changes: 203 additions & 4 deletions projects/gloo/pkg/upstreams/serviceentry/se_ep_plugin.go
@@ -1,19 +1,218 @@
package serviceentry

import (
"context"
"strings"

"github.com/solo-io/gloo/pkg/utils/settingsutil"
"github.com/solo-io/gloo/projects/gloo/pkg/discovery"
"github.com/solo-io/go-utils/contextutils"

v1 "github.com/solo-io/gloo/projects/gloo/pkg/api/v1"
"github.com/solo-io/solo-kit/pkg/api/v1/clients"
core "github.com/solo-io/solo-kit/pkg/api/v1/resources/core"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"

v1 "github.com/solo-io/gloo/projects/gloo/pkg/api/v1"
networkingclient "istio.io/client-go/pkg/apis/networking/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var _ discovery.DiscoveryPlugin = &sePlugin{}

// TODO informers
// type seNsInformer struct {
// serviceEntryInformer cache.SharedIndexInformer
// podInformer cache.SharedIndexInformer
// // TODO workloadentry
// }
//
// type seInformers struct {
// perNamespace map[string]seNsInformer
// }

func (s *sePlugin) listEndpoints(
ctx context.Context,
writeNamespace string,
listOpts metav1.ListOptions,
watchNamespaces []string,
upstreams v1.UpstreamList,
) (v1.EndpointList, error) {
serviceEntries := make(map[string][]*networkingclient.ServiceEntry, len(watchNamespaces))
pods := make(map[string][]corev1.Pod, len(watchNamespaces))
for _, ns := range watchNamespaces {
seList, err := s.istio.NetworkingV1beta1().ServiceEntries(ns).List(ctx, listOpts)
if err != nil {
return nil, err
}
serviceEntries[ns] = append(serviceEntries[ns], seList.Items...)
podList, err := s.kube.CoreV1().Pods(ns).List(ctx, listOpts)
if err != nil {
return nil, err
}
pods[ns] = append(pods[ns], podList.Items...)
}

var out v1.EndpointList
for _, us := range upstreams {
kubeUs := us.GetKube()
if kubeUs == nil {
// we pre-filtered, this should never happen
contextutils.LoggerFrom(ctx).DPanicw(
"upstream was not kube",
"namespace", us.GetMetadata().GetNamespace(),
"name", us.GetMetadata().GetName(),
)
continue
}
if !strings.HasPrefix(us.GetMetadata().Name, UpstreamNamePrefix) {
continue
}
var upstreamServiceEntry *networkingclient.ServiceEntry
for _, se := range serviceEntries[us.GetMetadata().GetNamespace()] {
if se.Name == kubeUs.ServiceName && se.Namespace == kubeUs.ServiceNamespace {
upstreamServiceEntry = se
break
}
}
if upstreamServiceEntry == nil {
contextutils.LoggerFrom(ctx).Warn("could not find the associated service entry for this upstream")
continue
}
for _, p := range pods[us.GetMetadata().Namespace] {
selector := labels.Set(kubeUs.Selector).AsSelector()
if !selector.Matches(labels.Set(p.Labels)) {
continue
}
out = append(out, buildPodEndpoint(us, upstreamServiceEntry, p))
}
// TODO workloadentry
}
return out, nil
}

func buildPodEndpoint(us *v1.Upstream, se *networkingclient.ServiceEntry, pod corev1.Pod) *v1.Endpoint {
port := us.GetKube().GetServicePort()
for _, p := range se.Spec.GetPorts() {
if p.GetTargetPort() != 0 && p.GetNumber() == us.GetKube().GetServicePort() {
port = p.GetTargetPort()
break
}
}
return &v1.Endpoint{
Upstreams: []*core.ResourceRef{{
// TODO re-use the same endpoint selected by multiple upstreams
Name: us.GetMetadata().GetName(),
Namespace: us.GetMetadata().GetNamespace(),
}},
Address: pod.Status.PodIP,
Port: port,
// Hostname: "",
Metadata: &core.Metadata{},
}
}
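
For illustration only (not part of this commit): a minimal, self-contained sketch of the port resolution in buildPodEndpoint. The resolvePort helper and sample ports are hypothetical; the endpoint prefers a ServiceEntry targetPort mapped to the upstream's service port and otherwise keeps the service port number.

package main

import (
"fmt"

networking "istio.io/api/networking/v1beta1"
)

// resolvePort mirrors the loop above: use targetPort when the ServiceEntry
// maps the upstream's service port, otherwise keep the port unchanged.
func resolvePort(servicePort uint32, ports []*networking.ServicePort) uint32 {
for _, p := range ports {
if p.GetTargetPort() != 0 && p.GetNumber() == servicePort {
return p.GetTargetPort()
}
}
return servicePort
}

func main() {
ports := []*networking.ServicePort{{Number: 80, TargetPort: 8080, Name: "http"}}
fmt.Println(resolvePort(80, ports))  // 8080: the targetPort mapping wins
fmt.Println(resolvePort(443, ports)) // 443: no mapping, keep the service port
}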

func (s *sePlugin) WatchEndpoints(writeNamespace string, upstreamsToTrack v1.UpstreamList, opts clients.WatchOpts) (<-chan v1.EndpointList, <-chan error, error) {
watchNamespaces := s.getWatchNamespaces(upstreamsToTrack)
watchNamespacesSet := sets.NewString(watchNamespaces...)

// TODO informers/cache reads
listOpts := buildListOptions(opts.ExpressionSelector, opts.Selector)
podWatch, err := s.kube.CoreV1().Pods(metav1.NamespaceAll).Watch(opts.Ctx, listOpts)
if err != nil {
return nil, nil, err
}
seWatch, err := s.istio.NetworkingV1beta1().ServiceEntries(metav1.NamespaceAll).Watch(opts.Ctx, listOpts)
if err != nil {
return nil, nil, err
}
// TODO workloadentry

endpointsChan := make(chan v1.EndpointList)
errs := make(chan error)

updateResourceList := func() {
list, err := s.listEndpoints(opts.Ctx, writeNamespace, listOpts, watchNamespaces, upstreamsToTrack)
if err != nil {
errs <- err
return
}
select {
case <-opts.Ctx.Done():
return
case endpointsChan <- list:
}
}

go func() {
defer seWatch.Stop()
defer podWatch.Stop()
defer close(endpointsChan)
defer close(errs)

updateResourceList()
for {
select {
case ev, ok := <-seWatch.ResultChan():
if !ok {
return
}
if !watchNamespacesSet.Has(getNamespace(ev.Object)) {
continue
}
updateResourceList()
case ev, ok := <-podWatch.ResultChan():
if !ok {
return
}
if !watchNamespacesSet.Has(getNamespace(ev.Object)) {
continue
}
updateResourceList()
case <-opts.Ctx.Done():
return
}
}
}()

return endpointsChan, errs, nil
}
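
A hedged consumer sketch, not part of this commit: WatchEndpoints closes both channels when its watch goroutine exits, so a caller should select over the context and both channels. The consumeEndpoints name and the "gloo-system" write namespace are assumptions for illustration; the types come from imports already in this file.

func consumeEndpoints(plugin discovery.DiscoveryPlugin, upstreams v1.UpstreamList, opts clients.WatchOpts) error {
endpoints, errs, err := plugin.WatchEndpoints("gloo-system", upstreams, opts)
if err != nil {
return err
}
for {
select {
case <-opts.Ctx.Done():
return nil
case list, ok := <-endpoints:
if !ok {
return nil
}
contextutils.LoggerFrom(opts.Ctx).Infof("received %d endpoints", len(list))
case watchErr, ok := <-errs:
if !ok {
return nil
}
contextutils.LoggerFrom(opts.Ctx).Warnf("endpoint watch error: %v", watchErr)
}
}
}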

func getNamespace(obj runtime.Object) string {
metaObj, err := meta.Accessor(obj)
if err != nil {
return ""
}
return metaObj.GetNamespace()
}

func (s *sePlugin) getWatchNamespaces(upstreamsToTrack v1.UpstreamList) []string {
var namespaces []string
if settingsutil.IsAllNamespacesFromSettings(s.settings) {
namespaces = []string{metav1.NamespaceAll}
} else {
nsSet := map[string]bool{}
for _, upstream := range upstreamsToTrack {
svcNs := upstream.GetKube().GetServiceNamespace()
// only care about kube upstreams
if svcNs == "" {
continue
}
nsSet[svcNs] = true
}
for ns := range nsSet {
namespaces = append(namespaces, ns)
}
}

// If there are no upstreams to watch (eg: if discovery is disabled), namespaces remains an empty list.
// When creating the InformerFactory, by convention, an empty namespace list means watch all namespaces.
// To ensure that we only watch what we are supposed to, fallback to WatchNamespaces if namespaces is an empty list.
if len(namespaces) == 0 {
namespaces = s.settings.GetWatchNamespaces()
}

return namespaces
}

// Satisfy the interface but don't implement; only the hybrid client path for ServiceEntry Upstreams is implemented.
2 changes: 1 addition & 1 deletion projects/gloo/pkg/upstreams/serviceentry/se_plugin.go
@@ -22,7 +22,7 @@ type sePlugin struct {
settings *v1.Settings
}

func NewDiscoveryPlugin(kube kubernetes.Interface, kubeCoreCache corecache.KubeCoreCache) plugins.Plugin {
func NewPlugin(kube kubernetes.Interface, kubeCoreCache corecache.KubeCoreCache) plugins.Plugin {
return &sePlugin{
istio: mustBuildIstioClient(),
kube: kube,
46 changes: 1 addition & 45 deletions projects/gloo/pkg/upstreams/serviceentry/se_us_client.go
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"slices"
"strings"
"time"

v1 "github.com/solo-io/gloo/projects/gloo/pkg/api/v1"
@@ -20,9 +19,6 @@ import (
networking "istio.io/api/networking/v1beta1"
networkingclient "istio.io/client-go/pkg/apis/networking/v1beta1"
istioclient "istio.io/client-go/pkg/clientset/versioned"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
)

const UpstreamNamePrefix = "istio-se:"
@@ -61,6 +57,7 @@ func (s *serviceEntryClient) Watch(namespace string, opts skclients.WatchOpts) (
return
}
upstreams := ConvertUpstreamList(list.Items)
// TODO only push if upstreams != previous
unchanged := !initialized && slices.EqualFunc(upstreams, previous, func(a, b *v1.Upstream) bool {
return a.Equal(b)
})
@@ -110,22 +107,6 @@ func (s *serviceEntryClient) Watch(namespace string, opts skclients.WatchOpts) (
return us, errs, nil
}

func buildListOptions(expressionSelector string, selector map[string]string) metav1.ListOptions {
if expressionSelector != "" {
return metav1.ListOptions{
LabelSelector: expressionSelector,
}
}
sel := labels.NewSelector()
for k, v := range selector {
req, _ := labels.NewRequirement(k, selection.Equals, strings.Split(v, ","))
sel = sel.Add(*req)
}
return metav1.ListOptions{
LabelSelector: sel.String(),
}
}

func ConvertUpstreamList(serviceEntries []*networkingclient.ServiceEntry) v1.UpstreamList {
var out v1.UpstreamList
for _, se := range serviceEntries {
@@ -197,38 +178,13 @@ func buildStatic(se *networkingclient.ServiceEntry, port *networking.ServicePort
}
}

// parseIstioProtocol always gives the all-caps protocol part of a port name.
// Example: http-foobar would be HTTP.
func parseIstioProtocol(protocol string) string {
protocol = strings.ToUpper(protocol)
if idx := strings.Index(protocol, "-"); idx != -1 {
protocol = protocol[:idx]
}
return protocol
}

func isProtocolTLS(protocol string) bool {
p := parseIstioProtocol(protocol)
return p == "HTTPS" || p == "TLS"
}

func wePort(we *networking.WorkloadEntry, svcPort *networking.ServicePort) uint32 {
if we.Ports == nil {
return 0
}
return we.Ports[svcPort.Name]
}

func targetPort(base, svc, ep uint32) uint32 {
if ep > 0 {
return ep
}
if svc > 0 {
return svc
}
return base
}

// We don't actually use the following in this shim, but we must satisfy the UpstreamClient interface.

const notImplementedErrMsg = "this operation is not supported by this client"
60 changes: 60 additions & 0 deletions projects/gloo/pkg/upstreams/serviceentry/util.go
@@ -0,0 +1,60 @@
package serviceentry

import (
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
)

func filterSlice[T any](items []T, f func(T) bool) []T {
var result []T
for _, item := range items {
if f(item) {
result = append(result, item)
}
}
return result
}

func targetPort(base, svc, ep uint32) uint32 {
if ep > 0 {
return ep
}
if svc > 0 {
return svc
}
return base
}
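
A test-style sketch (not in this commit) pinning the precedence encoded above: the endpoint-level port wins, then the service target port, then the base port. It assumes a hypothetical _test.go file in this package importing "testing".

func TestTargetPortPrecedence(t *testing.T) {
if got := targetPort(80, 8080, 9090); got != 9090 {
t.Errorf("endpoint port should win, got %d", got)
}
if got := targetPort(80, 8080, 0); got != 8080 {
t.Errorf("service target port is the next fallback, got %d", got)
}
if got := targetPort(80, 0, 0); got != 80 {
t.Errorf("base port is the last resort, got %d", got)
}
}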

// parseIstioProtocol always gives the all-caps protocol part of a port name.
// Example: http-foobar would be HTTP.
func parseIstioProtocol(protocol string) string {
protocol = strings.ToUpper(protocol)
if idx := strings.Index(protocol, "-"); idx != -1 {
protocol = protocol[:idx]
}
return protocol
}

func isProtocolTLS(protocol string) bool {
p := parseIstioProtocol(protocol)
return p == "HTTPS" || p == "TLS"
}
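
Illustrative calls in the same hypothetical test file, grounded in the doc comment above; not part of the commit.

func TestProtocolParsing(t *testing.T) {
if got := parseIstioProtocol("http-foobar"); got != "HTTP" {
t.Errorf("expected HTTP, got %q", got)
}
if !isProtocolTLS("https-web") || !isProtocolTLS("tls-passthrough") {
t.Error("HTTPS and TLS port names should count as TLS")
}
if isProtocolTLS("tcp-db") {
t.Error("plain TCP port names should not count as TLS")
}
}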

func buildListOptions(expressionSelector string, selector map[string]string) metav1.ListOptions {
if expressionSelector != "" {
return metav1.ListOptions{
LabelSelector: expressionSelector,
}
}
sel := labels.NewSelector()
for k, v := range selector {
// Equals requirements accept exactly one value; use In for comma-separated
// lists so NewRequirement doesn't fail and leave req a nil pointer.
vals := strings.Split(v, ",")
op := selection.Equals
if len(vals) > 1 {
op = selection.In
}
if req, err := labels.NewRequirement(k, op, vals); err == nil {
sel = sel.Add(*req)
}
}
return metav1.ListOptions{
LabelSelector: sel.String(),
}
}
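
A hedged illustration of the selector translation (keys and values hypothetical, "fmt" assumed for output): a single-value entry becomes an equality requirement, a comma-separated value becomes a set-membership requirement, and an explicit expression selector bypasses the map entirely.

func exampleListOptions() {
opts := buildListOptions("", map[string]string{"app": "reviews"})
fmt.Println(opts.LabelSelector) // app=reviews

opts = buildListOptions("", map[string]string{"version": "v1,v2"})
fmt.Println(opts.LabelSelector) // a set requirement such as: version in (v1,v2)

opts = buildListOptions("app in (reviews,ratings)", nil)
fmt.Println(opts.LabelSelector) // the expression selector is passed through verbatim
}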
