Skip to content

Commit

Permalink
ambient: support explicit EndpointSlice endpoints (istio#51591)
Browse files Browse the repository at this point in the history
* ambient: support reading workloads from EndpointSlice

This is for direct endpoint-slice objects. Note this is required after istio/ztunnel#1137
otherwise kubernetes.default.svc is broken. However, this is useful
beyond that for correctness.

* add comments
  • Loading branch information
howardjohn committed Jun 24, 2024
1 parent cc059c4 commit 99d3d4c
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strings"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"sigs.k8s.io/gateway-api/apis/v1beta1"

networkingclient "istio.io/client-go/pkg/apis/networking/v1alpha3"
Expand Down Expand Up @@ -149,6 +150,10 @@ func New(options Options) Index {
// TODO: Should this go ahead and transform the full ns into some intermediary with just the details we care about?
Namespaces := krt.NewInformer[*v1.Namespace](options.Client, krt.WithName("Namespaces"))

EndpointSlices := krt.NewInformerFiltered[*discovery.EndpointSlice](options.Client, kclient.Filter{
ObjectFilter: options.Client.ObjectFilter(),
}, krt.WithName("EndpointSlices"))

MeshConfig := MeshConfigCollection(ConfigMaps, options)
Waypoints := WaypointsCollection(Gateways, GatewayClasses, Pods)

Expand Down Expand Up @@ -202,7 +207,7 @@ func New(options Options) Index {
WorkloadServices,
WorkloadEntries,
ServiceEntries,
AllPolicies,
EndpointSlices,
Namespaces,
)
WorkloadAddressIndex := krt.NewIndex[model.WorkloadInfo, networkAddress](Workloads, networkAddressFromWorkload)
Expand Down
149 changes: 147 additions & 2 deletions pilot/pkg/serviceregistry/kube/controller/ambient/workloads.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"net/netip"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"istio.io/api/label"
Expand All @@ -27,16 +28,19 @@ import (
securityclient "istio.io/client-go/pkg/apis/security/v1beta1"
"istio.io/istio/pilot/pkg/features"
"istio.io/istio/pilot/pkg/model"
"istio.io/istio/pilot/pkg/serviceregistry/kube"
labelutil "istio.io/istio/pilot/pkg/serviceregistry/util/label"
"istio.io/istio/pkg/config/constants"
"istio.io/istio/pkg/config/labels"
"istio.io/istio/pkg/config/schema/gvk"
"istio.io/istio/pkg/config/schema/kind"
kubeutil "istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/krt"
kubelabels "istio.io/istio/pkg/kube/labels"
"istio.io/istio/pkg/log"
"istio.io/istio/pkg/ptr"
"istio.io/istio/pkg/slices"
"istio.io/istio/pkg/util/sets"
"istio.io/istio/pkg/workloadapi"
)

Expand All @@ -50,7 +54,7 @@ func (a *index) WorkloadsCollection(
WorkloadServices krt.Collection[model.ServiceInfo],
WorkloadEntries krt.Collection[*networkingclient.WorkloadEntry],
ServiceEntries krt.Collection[*networkingclient.ServiceEntry],
AllPolicies krt.Collection[model.WorkloadAuthorization],
EndpointSlices krt.Collection[*discovery.EndpointSlice],
Namespaces krt.Collection[*v1.Namespace],
) krt.Collection[model.WorkloadInfo] {
WorkloadServicesNamespaceIndex := krt.NewNamespaceIndex(WorkloadServices)
Expand All @@ -69,7 +73,17 @@ func (a *index) WorkloadsCollection(
a.serviceEntryWorkloadBuilder(MeshConfig, AuthorizationPolicies, PeerAuths, Waypoints, Namespaces),
krt.WithName("ServiceEntryWorkloads"),
)
Workloads := krt.JoinCollection([]krt.Collection[model.WorkloadInfo]{PodWorkloads, WorkloadEntryWorkloads, ServiceEntryWorkloads}, krt.WithName("Workloads"))
EndpointSliceWorkloads := krt.NewManyCollection(
EndpointSlices,
a.endpointSlicesBuilder(MeshConfig, WorkloadServices),
krt.WithName("EndpointSliceWorkloads"))

Workloads := krt.JoinCollection([]krt.Collection[model.WorkloadInfo]{
PodWorkloads,
WorkloadEntryWorkloads,
ServiceEntryWorkloads,
EndpointSliceWorkloads,
}, krt.WithName("Workloads"))
return Workloads
}

Expand Down Expand Up @@ -337,6 +351,137 @@ func (a *index) serviceEntryWorkloadBuilder(
}
}

func (a *index) endpointSlicesBuilder(
MeshConfig krt.Singleton[MeshConfig],
WorkloadServices krt.Collection[model.ServiceInfo],
) krt.TransformationMulti[*discovery.EndpointSlice, model.WorkloadInfo] {
return func(ctx krt.HandlerContext, es *discovery.EndpointSlice) []model.WorkloadInfo {
// EndpointSlices carry port information and a list of IPs.
// We only care about EndpointSlices that are for a Service.
// Otherwise, it is just an arbitrary bag of IP addresses for some user-specific purpose, which doesn't have a clear
// usage for us (if it had some additional info like service account, etc, then perhaps it would be useful).
serviceName, f := es.Labels[discovery.LabelServiceName]
if !f {
return nil
}
if es.AddressType == discovery.AddressTypeFQDN {
// Currently we do not support FQDN. In theory, we could, but its' support in Kubernetes entirely is questionable and
// may be removed in the near future.
return nil
}
var res []model.WorkloadInfo
seen := sets.New[string]()
meshCfg := krt.FetchOne(ctx, MeshConfig.AsCollection())

// The slice must be for a single service, based on the label above.
serviceKey := es.Namespace + "/" + string(kube.ServiceHostname(serviceName, es.Namespace, a.DomainSuffix))
svcs := krt.Fetch(ctx, WorkloadServices, krt.FilterKey(serviceKey), krt.FilterGeneric(func(a any) bool {
// Only find Service, not Service Entry
return a.(model.ServiceInfo).Source == kind.Service
}))
if len(svcs) == 0 {
// no service found
return nil
}
// There SHOULD only be one. This is only Service which has unique hostnames.
svc := svcs[0]

// Translate slice ports to our port.
pl := &workloadapi.PortList{Ports: make([]*workloadapi.Port, 0, len(es.Ports))}
for _, p := range es.Ports {
// We must have name and port (Kubernetes should always set these)
if p.Name == nil {
continue
}
if p.Port == nil {
continue
}
// We only support TCP for now
if p.Protocol == nil || *p.Protocol != v1.ProtocolTCP {
continue
}
// Endpoint slice port has name (service port name, not containerPort) and port (targetPort)
// We need to join with the Service port list to translate the port name to
for _, svcPort := range svc.Ports {
portName := svc.PortNames[int32(svcPort.ServicePort)]
if portName.PortName != *p.Name {
continue
}
pl.Ports = append(pl.Ports, &workloadapi.Port{
ServicePort: svcPort.ServicePort,
TargetPort: uint32(*p.Port),
})
break
}
}
services := map[string]*workloadapi.PortList{
serviceKey: pl,
}

// Each endpoint in the slice is going to create a Workload
for _, ep := range es.Endpoints {
if ep.TargetRef != nil && ep.TargetRef.Kind == gvk.Pod.Kind {
// Normal case; this is a slice for a pod. We already handle pods, with much more information, so we can skip them
continue
}
// This should not be possible
if len(ep.Addresses) == 0 {
continue
}
// We currently only support 1 address. Kubernetes will never set more (IPv4 and IPv6 will be two slices), so its mostly undefined.
key := ep.Addresses[0]
if seen.InsertContains(key) {
// Shouldn't happen. Make sure our UID is actually unique
log.Warnf("IP address %v seen twice in %v/%v", key, es.Namespace, es.Name)
continue
}
health := workloadapi.WorkloadStatus_UNHEALTHY
if ep.Conditions.Ready == nil || *ep.Conditions.Ready {
health = workloadapi.WorkloadStatus_HEALTHY
}
// Translate our addresses.
// Note: users may put arbitrary addresses here. It is recommended by Kubernetes to not
// give untrusted users EndpointSlice write access.
addresses, err := slices.MapErr(ep.Addresses, func(e string) ([]byte, error) {
n, err := netip.ParseAddr(e)
if err != nil {
log.Warnf("invalid address in endpointslice %v: %v", e, err)
return nil, err
}
return n.AsSlice(), nil
})
if err != nil {
// If any invalid, skip
continue
}
w := &workloadapi.Workload{
Uid: a.ClusterID.String() + "/discovery.k8s.io/EndpointSlice/" + es.Namespace + "/" + es.Name + "/" + key,
Name: es.Name,
Namespace: es.Namespace,
Addresses: addresses,
Hostname: "",
Network: a.Network(key, nil).String(),
TrustDomain: pickTrustDomain(meshCfg),
Services: services,
Status: health,
ClusterId: string(a.ClusterID),
AuthorizationPolicies: nil, // Not support. This can only be used for outbound, so not relevant
ServiceAccount: "", // Unknown. TODO: make this possible to express in ztunnel
Waypoint: nil, // Not supported. In theory, we could allow it as an EndpointSlice label, but there is no real use case.
Locality: nil, // Not supported. We could maybe, there is a "zone", but it doesn't seem to be well supported
}
res = append(res, model.WorkloadInfo{
Workload: w,
Labels: nil,
Source: kind.EndpointSlice,
CreationTime: es.CreationTimestamp.Time,
})
}

return res
}
}

func setTunnelProtocol(labels, annotations map[string]string, w *workloadapi.Workload) {
if annotations[constants.AmbientRedirection] == constants.AmbientRedirectionEnabled {
// Configured for override
Expand Down

0 comments on commit 99d3d4c

Please sign in to comment.