Skip to content

Commit

Permalink
[metric] #3 Reset declared_resource to 0 when commit updates
Browse files Browse the repository at this point in the history
The current OpenCensus library keeps a metric stream that is uniquely identified by commit alive even though no new values are issued.

This makes the Otel Collector metricstransform processor receives all time value when calculating the max and gives false output.

The temporary work around is to reset the stream of previous commit to 0 when parser sees new commit.

b/321875474
  • Loading branch information
tiffanny29631 committed Jun 3, 2024
1 parent 83b1a23 commit 75eb21f
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 8 deletions.
71 changes: 71 additions & 0 deletions e2e/testcases/otel_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,54 @@ func TestGCMMetrics(t *testing.T) {
if err != nil {
nt.T.Fatal(err)
}

nt.T.Log("Adding test namespace")
namespace := fake.NamespaceObject("foo")
nt.Must(nt.RootRepos[configsync.RootSyncName].Add("acme/ns.yaml", namespace))
nt.Must(nt.RootRepos[configsync.RootSyncName].CommitAndPush("Adding foo namespace"))
if err := nt.WatchForAllSyncs(); err != nil {
nt.T.Fatal(err)
}

nt.T.Log("Checking resource related metrics after adding test resource")
resourceMetrics := []string{
csmetrics.DeclaredResourcesName,
rgmetrics.ResourceCountName,
rgmetrics.ReadyResourceCountName,
}
_, err = retry.Retry(60*time.Second, func() error {
var err error
for _, metricType := range resourceMetrics {
descriptor := fmt.Sprintf("%s/%s", GCMMetricPrefix, metricType)
it := listMetricInGCM(ctx, nt, client, startTime, descriptor)
err = multierr.Append(err, validateMetricInGCM(nt, it, descriptor, nt.ClusterName, metricHasValue(3)))
}
return err
})
if err != nil {
nt.T.Fatal(err)
}

nt.T.Log("Remove the test resource")
nt.Must(nt.RootRepos[configsync.RootSyncName].Remove("acme/ns.yaml"))
nt.Must(nt.RootRepos[configsync.RootSyncName].CommitAndPush("Remove the test namespace"))
if err := nt.WatchForAllSyncs(); err != nil {
nt.T.Fatal(err)
}

nt.T.Log("Checking resource related metrics after removing test resource")
_, err = retry.Retry(60*time.Second, func() error {
var err error
for _, metricType := range resourceMetrics {
descriptor := fmt.Sprintf("%s/%s", GCMMetricPrefix, metricType)
it := listMetricInGCM(ctx, nt, client, startTime, descriptor)
err = multierr.Append(err, validateMetricInGCM(nt, it, descriptor, nt.ClusterName, metricHasLastestValue(2)))
}
return err
})
if err != nil {
nt.T.Fatal(err)
}
}

// TestOtelCollectorGCMLabelAggregation validates that Google Cloud Monitoring
Expand Down Expand Up @@ -451,6 +499,29 @@ func metricDoesNotHaveLabel(label string) metricValidatorFunc {
}
}

func metricHasValue(value int64) metricValidatorFunc {
return func(serie *monitoringpb.TimeSeries) error {
points := serie.GetPoints()
for _, point := range points {
if point.GetValue().GetInt64Value() == value {
return nil
}
}
return fmt.Errorf("expected metric to have value %v but did not find in response", value)
}
}

func metricHasLastestValue(value int64) metricValidatorFunc {
return func(serie *monitoringpb.TimeSeries) error {
points := serie.GetPoints()
lastPoint := points[len(points)-1]
if lastPoint.GetValue().GetInt64Value() == value {
return nil
}
return fmt.Errorf("expected metric to have latest value %v but did not find in response", value)
}
}

// Validates a metricType from a specific cluster_name can be found within given
// TimeSeries
func validateMetricInGCM(nt *nomostest.NT, it *monitoringv2.TimeSeriesIterator, metricType, clusterName string, valFns ...metricValidatorFunc) error {
Expand Down
13 changes: 13 additions & 0 deletions pkg/declared/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type Resources struct {
objectMap *orderedmap.OrderedMap[core.ID, *unstructured.Unstructured]
// commit of the source in which the resources were declared
commit string
// previousCommit is the preceding commit to the commit
previousCommit string
}

// Update performs an atomic update on the resource declaration set.
Expand All @@ -68,6 +70,17 @@ func (r *Resources) Update(ctx context.Context, objects []client.Object, commit

// Record the declared_resources metric, after parsing but before validation.
metrics.RecordDeclaredResources(ctx, commit, len(newObjects))
if r.previousCommit != commit && r.previousCommit != "" {
// Reset the stream value of previous commit to 0 to provide correct input
// for the Otel Collector metricstransform processor's MAX aggregation.
// This is a temporary fix for issue b/321875474, ensuring the metrics
// accurately reflect decreases in declared resources. Eventually we should
// migrate to otel-collector-go to use the async gauge so that older streams
// won't get updated when there's no longer new values.
// TODO: b/339722287
metrics.RecordDeclaredResources(ctx, r.previousCommit, 0)
}
r.previousCommit = commit

previousSet, _ := r.getObjectMap()
if err := deletesAllNamespaces(previousSet, newSet); err != nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/declared/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestUpdate(t *testing.T) {
commit := "1"
expectedIDs := getIDs(objects)

newObjects, err := dr.Update(context.Background(), objects, commit)
newObjects, err := dr.Update(context.Background(), objects, commit, "")
if err != nil {
t.Fatalf("unexpected error %v", err)
}
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestMutateImpossible(t *testing.T) {
o2.SetResourceVersion(wantResourceVersion)

expectedCommit := "example"
_, err := dr.Update(context.Background(), []client.Object{o1, o2}, expectedCommit)
_, err := dr.Update(context.Background(), []client.Object{o1, o2}, expectedCommit, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -136,7 +136,7 @@ func asUnstructured(t *testing.T, o client.Object) *unstructured.Unstructured {
func TestDeclarations(t *testing.T) {
dr := Resources{}
expectedCommit := "example"
objects, err := dr.Update(context.Background(), testSet, expectedCommit)
objects, err := dr.Update(context.Background(), testSet, expectedCommit, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -170,7 +170,7 @@ func TestDeclarations(t *testing.T) {
func TestGet(t *testing.T) {
dr := Resources{}
expectedCommit := "example"
_, err := dr.Update(context.Background(), testSet, expectedCommit)
_, err := dr.Update(context.Background(), testSet, expectedCommit, "")
if err != nil {
t.Fatal(err)
}
Expand All @@ -188,7 +188,7 @@ func TestGet(t *testing.T) {
func TestGVKSet(t *testing.T) {
dr := Resources{}
expectedCommit := "example"
_, err := dr.Update(context.Background(), testSet, expectedCommit)
_, err := dr.Update(context.Background(), testSet, expectedCommit, "")
if err != nil {
t.Fatal(err)
}
Expand All @@ -207,7 +207,7 @@ func TestGVKSet(t *testing.T) {
func TestResources_InternalErrorMetricValidation(t *testing.T) {
m := testmetrics.RegisterMetrics(metrics.InternalErrorsView)
dr := Resources{}
if _, err := dr.Update(context.Background(), nilSet, "unused"); err != nil {
if _, err := dr.Update(context.Background(), nilSet, "unused", ""); err != nil {
t.Fatal(err)
}
wantMetrics := []*view.Row{
Expand Down
2 changes: 1 addition & 1 deletion pkg/remediator/reconcile/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func TestRemediator_Reconcile_Metrics(t *testing.T) {
func makeDeclared(t *testing.T, commit string, objs ...client.Object) *declared.Resources {
t.Helper()
d := &declared.Resources{}
if _, err := d.Update(context.Background(), objs, commit); err != nil {
if _, err := d.Update(context.Background(), objs, commit, ""); err != nil {
// Test precondition; fail early.
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/remediator/watch/filteredwatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func TestFilteredWatcher(t *testing.T) {
} else {
ctx, cancel = context.WithCancel(ctx)
}
if _, err := dr.Update(ctx, tc.declared, "unused"); err != nil {
if _, err := dr.Update(ctx, tc.declared, "unused", ""); err != nil {
t.Fatalf("unexpected error %v", err)
}

Expand Down

0 comments on commit 75eb21f

Please sign in to comment.