follow-up: implement tracing
sbueringer committed Jul 10, 2023
1 parent 27d6257 commit 9fde59c
Showing 41 changed files with 1,132 additions and 9 deletions.
6 changes: 6 additions & 0 deletions controllers/alias.go
@@ -20,6 +20,7 @@ import (
"context"
"time"

oteltrace "go.opentelemetry.io/otel/trace"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -65,6 +66,7 @@ type MachineReconciler struct {
UnstructuredCachingClient client.Client
APIReader client.Reader
Tracker *remote.ClusterCacheTracker
TraceProvider oteltrace.TracerProvider

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
@@ -79,6 +81,7 @@ func (r *MachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag
UnstructuredCachingClient: r.UnstructuredCachingClient,
APIReader: r.APIReader,
Tracker: r.Tracker,
TraceProvider: r.TraceProvider,
WatchFilterValue: r.WatchFilterValue,
NodeDrainClientTimeout: r.NodeDrainClientTimeout,
}).SetupWithManager(ctx, mgr, options)
@@ -150,6 +153,8 @@ type ClusterTopologyReconciler struct {

RuntimeClient runtimeclient.Client

TraceProvider oteltrace.TracerProvider

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string

@@ -164,6 +169,7 @@ func (r *ClusterTopologyReconciler) SetupWithManager(ctx context.Context, mgr ct
APIReader: r.APIReader,
RuntimeClient: r.RuntimeClient,
UnstructuredCachingClient: r.UnstructuredCachingClient,
TraceProvider: r.TraceProvider,
WatchFilterValue: r.WatchFilterValue,
}).SetupWithManager(ctx, mgr, options)
}
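The files above only thread a TraceProvider through the reconciler structs; this excerpt does not show how a provider is built. A minimal sketch of constructing one at startup, assuming an OTLP/gRPC exporter (the exporter choice, its default endpoint, and the newTraceProvider/setup names are assumptions, not part of this commit):

package setup // hypothetical wiring helper, for illustration only

import (
    "context"

    "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    oteltrace "go.opentelemetry.io/otel/trace"
)

// newTraceProvider returns a TracerProvider backed by an OTLP/gRPC exporter.
// The result would be assigned to fields such as MachineReconciler.TraceProvider
// before calling SetupWithManager.
func newTraceProvider(ctx context.Context) (oteltrace.TracerProvider, error) {
    // otlptracegrpc.New talks to the exporter's default endpoint unless
    // configured otherwise.
    exporter, err := otlptracegrpc.New(ctx)
    if err != nil {
        return nil, err
    }
    // WithBatcher exports finished spans asynchronously in batches.
    return sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter)), nil
}

Leaving TraceProvider unset keeps tracing disabled: the defaults added elsewhere in this commit substitute a no-op provider.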
7 changes: 7 additions & 0 deletions controllers/external/util.go
@@ -28,10 +28,14 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
traceutil "sigs.k8s.io/cluster-api/internal/util/trace"
)

// Get uses the client and reference to get an external, unstructured object.
func Get(ctx context.Context, c client.Reader, ref *corev1.ObjectReference, namespace string) (*unstructured.Unstructured, error) {
ctx, span := traceutil.Start(ctx, "external.Get")
defer span.End()

if ref == nil {
return nil, errors.Errorf("cannot get object - object reference not set")
}
@@ -48,6 +52,9 @@ func Get(ctx context.Context, c client.Reader, ref *corev1.ObjectReference, name

// Delete uses the client and reference to delete an external, unstructured object.
func Delete(ctx context.Context, c client.Writer, ref *corev1.ObjectReference) error {
ctx, span := traceutil.Start(ctx, "external.Delete")
defer span.End()

obj := new(unstructured.Unstructured)
obj.SetAPIVersion(ref.APIVersion)
obj.SetKind(ref.Kind)
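Both helpers above call traceutil.Start, but the traceutil package (sigs.k8s.io/cluster-api/internal/util/trace) is not among the files shown here. A minimal sketch of a compatible Start helper, assuming it derives the tracer from the span already carried by the context (the tracer name is an assumption):

package trace // sketch of internal/util/trace; the real implementation may differ

import (
    "context"

    oteltrace "go.opentelemetry.io/otel/trace"
)

// Start opens a span with the given name and returns the derived context.
// Callers are expected to follow up with `defer span.End()`.
func Start(ctx context.Context, name string) (context.Context, oteltrace.Span) {
    // SpanFromContext returns a no-op span when ctx carries none, so calling
    // this is safe even if no TracerProvider was ever configured.
    tracer := oteltrace.SpanFromContext(ctx).TracerProvider().Tracer("sigs.k8s.io/cluster-api")
    return tracer.Start(ctx, name)
}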
12 changes: 12 additions & 0 deletions controllers/remote/cluster_cache_tracker.go
@@ -27,6 +27,7 @@ import (

"github.com/go-logr/logr"
"github.com/pkg/errors"
oteltrace "go.opentelemetry.io/otel/trace"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -97,6 +98,8 @@ type ClusterCacheTracker struct {
// This information will be used to detect if the controller is running on a workload cluster, so
// that we can then access the apiserver directly.
controllerPodMetadata *metav1.ObjectMeta

traceProvider oteltrace.TracerProvider
}

// ClusterCacheTrackerOptions defines options to configure
@@ -121,6 +124,8 @@ type ClusterCacheTrackerOptions struct {
// This is used to calculate the user agent string.
// If not set, it defaults to "cluster-cache-tracker".
ControllerName string

TraceProvider oteltrace.TracerProvider
}

func setDefaultOptions(opts *ClusterCacheTrackerOptions) {
@@ -135,6 +140,10 @@ func setDefaultOptions(opts *ClusterCacheTrackerOptions) {
&corev1.Secret{},
}
}

if opts.TraceProvider == nil {
opts.TraceProvider = oteltrace.NewNoopTracerProvider()
}
}

// NewClusterCacheTracker creates a new ClusterCacheTracker.
@@ -166,6 +175,7 @@ func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOpt
controllerPodMetadata: controllerPodMetadata,
log: *options.Log,
clientUncachedObjects: options.ClientUncachedObjects,
traceProvider: options.TraceProvider,
client: manager.GetClient(),
secretCachingClient: options.SecretCachingClient,
scheme: manager.GetScheme(),
@@ -294,6 +304,8 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
if err != nil {
return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String())
}
// FIXME: this seems to lead to problems with spans (random 10s spans in the trace)
// config.Wrap(tracing.WrapperFor(t.traceProvider)) //nolint:gocritic

// Create a client and a cache for the cluster.
c, uncachedClient, cache, err := t.createClient(ctx, config, cluster, indexes)
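Because setDefaultOptions falls back to a no-op provider, passing a TraceProvider to the tracker remains optional. A hedged usage sketch (the newTracker helper, its arguments, and the controller name are illustrative assumptions):

package setup // hypothetical, for illustration only

import (
    oteltrace "go.opentelemetry.io/otel/trace"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"

    "sigs.k8s.io/cluster-api/controllers/remote"
)

func newTracker(mgr ctrl.Manager, secretCachingClient client.Client, tp oteltrace.TracerProvider) (*remote.ClusterCacheTracker, error) {
    return remote.NewClusterCacheTracker(mgr, remote.ClusterCacheTrackerOptions{
        SecretCachingClient: secretCachingClient,
        ControllerName:      "cluster-api-controller",
        // If TraceProvider is nil, setDefaultOptions substitutes
        // oteltrace.NewNoopTracerProvider(), so tracing stays opt-in.
        TraceProvider: tp,
    })
}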
3 changes: 3 additions & 0 deletions controlplane/kubeadm/controllers/alias.go
@@ -20,6 +20,7 @@ import (
"context"
"time"

"go.opentelemetry.io/otel/trace"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -33,6 +34,7 @@ type KubeadmControlPlaneReconciler struct {
Client client.Client
SecretCachingClient client.Client
Tracker *remote.ClusterCacheTracker
TraceProvider trace.TracerProvider

EtcdDialTimeout time.Duration
EtcdCallTimeout time.Duration
@@ -47,6 +49,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mg
Client: r.Client,
SecretCachingClient: r.SecretCachingClient,
Tracker: r.Tracker,
TraceProvider: r.TraceProvider,
EtcdDialTimeout: r.EtcdDialTimeout,
EtcdCallTimeout: r.EtcdCallTimeout,
WatchFilterValue: r.WatchFilterValue,
16 changes: 16 additions & 0 deletions controlplane/kubeadm/internal/cluster.go
@@ -32,6 +32,7 @@ import (
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/remote"
expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
traceutil "sigs.k8s.io/cluster-api/internal/util/trace"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/secret"
)
@@ -84,11 +85,17 @@ func (m *Management) List(ctx context.Context, list client.ObjectList, opts ...c
// GetMachinesForCluster returns a list of machines that can be filtered or not.
// If no filter is supplied then all machines associated with the target cluster are returned.
func (m *Management) GetMachinesForCluster(ctx context.Context, cluster *clusterv1.Cluster, filters ...collections.Func) (collections.Machines, error) {
ctx, span := traceutil.Start(ctx, "Management.GetMachinesForCluster")
defer span.End()

return collections.GetFilteredMachinesForCluster(ctx, m.Client, cluster, filters...)
}

// GetMachinePoolsForCluster returns a list of machine pools owned by the cluster.
func (m *Management) GetMachinePoolsForCluster(ctx context.Context, cluster *clusterv1.Cluster) (*expv1.MachinePoolList, error) {
ctx, span := traceutil.Start(ctx, "Management.GetMachinesForCluster")
defer span.End()

selectors := []client.ListOption{
client.InNamespace(cluster.GetNamespace()),
client.MatchingLabels{
@@ -103,6 +110,9 @@ func (m *Management) GetMachinePoolsForCluster(ctx context.Context, cluster *clu
// GetWorkloadCluster builds a cluster object.
// The cluster comes with an etcd client generator to connect to any etcd pod living on a managed machine.
func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.ObjectKey) (WorkloadCluster, error) {
ctx, span := traceutil.Start(ctx, "Management.GetWorkloadCluster")
defer span.End()

// TODO(chuckha): Inject this dependency.
// TODO(chuckha): memoize this function. The workload client only exists as long as a reconciliation loop.
restConfig, err := m.Tracker.GetRESTConfig(ctx, clusterKey)
@@ -178,6 +188,9 @@ func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.O
}

func (m *Management) getEtcdCAKeyPair(ctx context.Context, clusterKey client.ObjectKey) ([]byte, []byte, error) {
ctx, span := traceutil.Start(ctx, "Management.getEtcdCAKeyPair")
defer span.End()

etcdCASecret := &corev1.Secret{}
etcdCAObjectKey := client.ObjectKey{
Namespace: clusterKey.Namespace,
@@ -207,6 +220,9 @@ func (m *Management) getEtcdCAKeyPair(ctx context.Context, clusterKey client.Obj
}

func (m *Management) getAPIServerEtcdClientCert(ctx context.Context, clusterKey client.ObjectKey) (tls.Certificate, error) {
ctx, span := traceutil.Start(ctx, "Management.getAPIServerEtcdClientCert")
defer span.End()

apiServerEtcdClientCertificateSecret := &corev1.Secret{}
apiServerEtcdClientCertificateObjectKey := client.ObjectKey{
Namespace: clusterKey.Namespace,
7 changes: 7 additions & 0 deletions controlplane/kubeadm/internal/control_plane.go
@@ -30,6 +30,7 @@ import (
bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/external"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
traceutil "sigs.k8s.io/cluster-api/internal/util/trace"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/failuredomains"
"sigs.k8s.io/cluster-api/util/patch"
@@ -58,6 +59,9 @@ type ControlPlane struct {

// NewControlPlane returns an instantiated ControlPlane.
func NewControlPlane(ctx context.Context, managementCluster ManagementCluster, client client.Client, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, ownedMachines collections.Machines) (*ControlPlane, error) {
ctx, span := traceutil.Start(ctx, "NewControlPlane")
defer span.End()

infraObjects, err := getInfraResources(ctx, client, ownedMachines)
if err != nil {
return nil, err
@@ -255,6 +259,9 @@ func (c *ControlPlane) HasUnhealthyMachine() bool {

// PatchMachines patches all the machines conditions.
func (c *ControlPlane) PatchMachines(ctx context.Context) error {
ctx, span := traceutil.Start(ctx, "ControlPlane.PatchMachines")
defer span.End()

errList := []error{}
for i := range c.Machines {
machine := c.Machines[i]
38 changes: 37 additions & 1 deletion controlplane/kubeadm/internal/controllers/controller.go
@@ -24,6 +24,7 @@ import (

"github.com/blang/semver"
"github.com/pkg/errors"
oteltrace "go.opentelemetry.io/otel/trace"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -47,6 +48,7 @@ import (
"sigs.k8s.io/cluster-api/feature"
"sigs.k8s.io/cluster-api/internal/contract"
"sigs.k8s.io/cluster-api/internal/util/ssa"
traceutil "sigs.k8s.io/cluster-api/internal/util/trace"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/collections"
@@ -76,6 +78,7 @@ type KubeadmControlPlaneReconciler struct {
controller controller.Controller
recorder record.EventRecorder
Tracker *remote.ClusterCacheTracker
TraceProvider oteltrace.TracerProvider

EtcdDialTimeout time.Duration
EtcdCallTimeout time.Duration
@@ -95,6 +98,10 @@ type KubeadmControlPlaneReconciler struct {
}

func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
if r.TraceProvider == nil {
r.TraceProvider = oteltrace.NewNoopTracerProvider()
}
tr := traceutil.Reconciler(r, r.TraceProvider, "kubeadmcontrolplane", "KubeadmControlPlane")
c, err := ctrl.NewControllerManagedBy(mgr).
For(&controlplanev1.KubeadmControlPlane{}).
Owns(&clusterv1.Machine{}).
@@ -109,7 +116,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(ctx context.Context, mg
predicates.ClusterUnpausedAndInfrastructureReady(ctrl.LoggerFrom(ctx)),
),
),
).Build(r)
).Build(tr)
if err != nil {
return errors.Wrap(err, "failed setting up with a controller manager")
}
@@ -301,6 +308,9 @@ func (r *KubeadmControlPlaneReconciler) initControlPlaneScope(ctx context.Contex
}

func patchKubeadmControlPlane(ctx context.Context, patchHelper *patch.Helper, kcp *controlplanev1.KubeadmControlPlane) error {
ctx, span := traceutil.Start(ctx, "patchKubeadmControlPlane")
defer span.End()

// Always update the readyCondition by summarizing the state of other conditions.
conditions.SetSummary(kcp,
conditions.WithConditions(
@@ -332,6 +342,9 @@ func patchKubeadmControlPlane(ctx context.Context, patchHelper *patch.Helper, kc

// reconcile handles KubeadmControlPlane reconciliation.
func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, controlPlane *internal.ControlPlane) (res ctrl.Result, reterr error) {
ctx, span := traceutil.Start(ctx, "kubeadmcontrolplane.Reconciler.reconcile")
defer span.End()

log := ctrl.LoggerFrom(ctx)
log.Info("Reconcile KubeadmControlPlane")

@@ -507,6 +520,9 @@ func (r *KubeadmControlPlaneReconciler) reconcileClusterCertificates(ctx context
// The implementation does not take non-control plane workloads into consideration. This may or may not change in the future.
// Please see https://github.com/kubernetes-sigs/cluster-api/issues/2064.
func (r *KubeadmControlPlaneReconciler) reconcileDelete(ctx context.Context, controlPlane *internal.ControlPlane) (ctrl.Result, error) {
ctx, span := traceutil.Start(ctx, "kubeadmcontrolplane.Reconciler.reconcileDelete")
defer span.End()

log := ctrl.LoggerFrom(ctx)
log.Info("Reconcile KubeadmControlPlane deletion")

@@ -594,6 +610,9 @@ func (r *KubeadmControlPlaneReconciler) ClusterToKubeadmControlPlane(_ context.C
// Otherwise, fields would be co-owned by our "old" "manager" and "capi-kubeadmcontrolplane" and then we would not be
// able to e.g. drop labels and annotations.
func (r *KubeadmControlPlaneReconciler) syncMachines(ctx context.Context, controlPlane *internal.ControlPlane) error {
ctx, span := traceutil.Start(ctx, "kubeadmcontrolplane.Reconciler.syncMachines")
defer span.End()

patchHelpers := map[string]*patch.Helper{}
for machineName := range controlPlane.Machines {
m := controlPlane.Machines[machineName]
@@ -677,6 +696,9 @@ func (r *KubeadmControlPlaneReconciler) syncMachines(ctx context.Context, contro
// reconcileControlPlaneConditions is responsible for reconciling conditions reporting the status of static pods and
// the status of the etcd cluster.
func (r *KubeadmControlPlaneReconciler) reconcileControlPlaneConditions(ctx context.Context, controlPlane *internal.ControlPlane) error {
ctx, span := traceutil.Start(ctx, "kubeadmcontrolplane.Reconciler.reconcileControlPlaneConditions")
defer span.End()

// If the cluster is not yet initialized, there is no way to connect to the workload cluster and fetch information
// for updating conditions. Return early.
if !controlPlane.KCP.Status.Initialized {
@@ -706,6 +728,9 @@ func (r *KubeadmControlPlaneReconciler) reconcileControlPlaneConditions(ctx cont
//
// NOTE: this func uses KCP conditions, it is required to call reconcileControlPlaneConditions before this.
func (r *KubeadmControlPlaneReconciler) reconcileEtcdMembers(ctx context.Context, controlPlane *internal.ControlPlane) error {
ctx, span := traceutil.Start(ctx, "kubeadmcontrolplane.Reconciler.reconcileEtcdMembers")
defer span.End()

log := ctrl.LoggerFrom(ctx)

// If etcd is not managed by KCP this is a no-op.
@@ -758,6 +783,9 @@ func (r *KubeadmControlPlaneReconciler) reconcileEtcdMembers(ctx context.Context
}

func (r *KubeadmControlPlaneReconciler) reconcileCertificateExpiries(ctx context.Context, controlPlane *internal.ControlPlane) error {
ctx, span := traceutil.Start(ctx, "kubeadmcontrolplane.Reconciler.reconcileCertificateExpiries")
defer span.End()

log := ctrl.LoggerFrom(ctx)

// Return if there are no KCP-owned control-plane machines.
@@ -828,6 +856,9 @@ func (r *KubeadmControlPlaneReconciler) reconcileCertificateExpiries(ctx context
}

func (r *KubeadmControlPlaneReconciler) adoptMachines(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane, machines collections.Machines, cluster *clusterv1.Cluster) error {
ctx, span := traceutil.Start(ctx, "kubeadmcontrolplane.Reconciler.adoptMachines")
defer span.End()

// We do an uncached full quorum read against the KCP to avoid re-adopting Machines the garbage collector just intentionally orphaned
// See https://github.com/kubernetes/kubernetes/issues/42639
uncached := controlplanev1.KubeadmControlPlane{}
@@ -905,6 +936,9 @@ func (r *KubeadmControlPlaneReconciler) adoptMachines(ctx context.Context, kcp *
}

func (r *KubeadmControlPlaneReconciler) adoptOwnedSecrets(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane, currentOwner *bootstrapv1.KubeadmConfig, clusterName string) error {
ctx, span := traceutil.Start(ctx, "kubeadmcontrolplane.Reconciler.adoptOwnedSecrets")
defer span.End()

secrets := corev1.SecretList{}
if err := r.Client.List(ctx, &secrets, client.InNamespace(kcp.Namespace), client.MatchingLabels{clusterv1.ClusterNameLabel: clusterName}); err != nil {
return errors.Wrap(err, "error finding secrets for adoption")
@@ -941,6 +975,8 @@ func (r *KubeadmControlPlaneReconciler) adoptOwnedSecrets(ctx context.Context, k

// ensureCertificatesOwnerRef ensures an ownerReference to the owner is added on the Secrets holding certificates.
func (r *KubeadmControlPlaneReconciler) ensureCertificatesOwnerRef(ctx context.Context, certificates secret.Certificates, owner metav1.OwnerReference) error {
ctx, span := traceutil.Start(ctx, "kubeadmcontrolplane.Reconciler.ensureCertificatesOwnerRef")
defer span.End()

for _, c := range certificates {
if c.Secret == nil {
continue
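traceutil.Reconciler, used in SetupWithManager above to wrap the reconciler handed to Build, is also outside this excerpt. A minimal sketch of such a wrapper, assuming it only opens a span around each Reconcile call (the span-name format is an assumption):

package trace // sketch of internal/util/trace; the real implementation may differ

import (
    "context"
    "fmt"

    oteltrace "go.opentelemetry.io/otel/trace"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// Reconciler wraps inner so that every Reconcile call runs inside its own span.
func Reconciler(inner reconcile.Reconciler, tp oteltrace.TracerProvider, controllerName, kind string) reconcile.Reconciler {
    return &tracingReconciler{
        inner:  inner,
        tracer: tp.Tracer(controllerName),
        kind:   kind,
    }
}

type tracingReconciler struct {
    inner  reconcile.Reconciler
    tracer oteltrace.Tracer
    kind   string
}

func (r *tracingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    ctx, span := r.tracer.Start(ctx, fmt.Sprintf("%s.Reconcile %s", r.kind, req.NamespacedName))
    defer span.End()
    return r.inner.Reconcile(ctx, req)
}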