Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[extension/healthcheckv2] Add event aggregation logic #32695

Merged
merged 8 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Address review feedback
  • Loading branch information
mwear committed Jun 10, 2024
commit a3016a3fcff5a572f8c96bd93f03da8835a22858
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,18 @@ func TestEventTemporalOrder(t *testing.T) {
// Still returns first error
assert.Equal(t, st.ComponentStatusMap["c2"].Event, aggFunc(st))

// Clear first error
// Replace first error with later error
st.ComponentStatusMap["c2"] = &AggregateStatus{
Event: component.NewStatusEvent(component.StatusOK),
Event: component.NewRecoverableErrorEvent(assert.AnError),
}

// Returns second error now
assert.Equal(t, st.ComponentStatusMap["c3"].Event, aggFunc(st))

// Clear second error
// Clear errors
st.ComponentStatusMap["c2"] = &AggregateStatus{
Event: component.NewStatusEvent(component.StatusOK),
}
st.ComponentStatusMap["c3"] = &AggregateStatus{
Event: component.NewStatusEvent(component.StatusOK),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type subscription struct {
// Aggregator records individual status events for components and aggregates statuses for the
// pipelines they belong to and the collector overall.
type Aggregator struct {
// mu protects aggregateStatus and subscriptions from concurrent modification
mu sync.RWMutex
mwear marked this conversation as resolved.
Show resolved Hide resolved
aggregateStatus *AggregateStatus
subscriptions map[string][]*subscription
Expand Down
142 changes: 142 additions & 0 deletions extension/healthcheckv2extension/internal/status/aggregator_test.go
mwear marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,49 @@ func TestAggregateStatusVerbose(t *testing.T) {

}

func TestAggregateStatusPriorityRecoverable(t *testing.T) {
agg := status.NewAggregator(status.PriorityRecoverable)
traces := testhelpers.NewPipelineMetadata("traces")

testhelpers.SeedAggregator(agg, traces.InstanceIDs(), component.StatusOK)

t.Run("pipeline statuses all successful", func(t *testing.T) {
st, ok := agg.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
assert.Equal(t, component.StatusOK, st.Status())
})

agg.RecordStatus(
traces.ProcessorID,
component.NewPermanentErrorEvent(assert.AnError),
)

t.Run("pipeline with permanent error", func(t *testing.T) {
st, ok := agg.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
assertErrorEventsMatch(t,
component.StatusPermanentError,
assert.AnError,
st,
)
})

agg.RecordStatus(
traces.ExporterID,
component.NewRecoverableErrorEvent(assert.AnError),
)

t.Run("pipeline with recoverable error", func(t *testing.T) {
st, ok := agg.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
assertErrorEventsMatch(t,
component.StatusRecoverableError,
assert.AnError,
st,
)
})
}

func TestPipelineAggregateStatus(t *testing.T) {
agg := status.NewAggregator(status.PriorityPermanent)
traces := testhelpers.NewPipelineMetadata("traces")
Expand Down Expand Up @@ -209,6 +252,64 @@ func TestPipelineAggregateStatusVerbose(t *testing.T) {
})
}

func TestAggregateStatusExtensions(t *testing.T) {
agg := status.NewAggregator(status.PriorityPermanent)

extsID := component.MustNewID("extensions")
extInstanceID1 := &component.InstanceID{
ID: component.MustNewID("ext1"),
Kind: component.KindExtension,
PipelineIDs: map[component.ID]struct{}{
extsID: {},
},
}
extInstanceID2 := &component.InstanceID{
ID: component.MustNewID("ext2"),
Kind: component.KindExtension,
PipelineIDs: map[component.ID]struct{}{
extsID: {},
},
}
extInstanceIDs := []*component.InstanceID{extInstanceID1, extInstanceID2}

testhelpers.SeedAggregator(agg, extInstanceIDs, component.StatusOK)

t.Run("extension statuses all successful", func(t *testing.T) {
st, ok := agg.AggregateStatus(status.ScopeExtensions, status.Concise)
require.True(t, ok)
assert.Equal(t, component.StatusOK, st.Status())
})

agg.RecordStatus(
extInstanceID1,
component.NewRecoverableErrorEvent(assert.AnError),
)

t.Run("extension with recoverable error", func(t *testing.T) {
st, ok := agg.AggregateStatus(status.ScopeExtensions, status.Concise)
require.True(t, ok)
assertErrorEventsMatch(t,
component.StatusRecoverableError,
assert.AnError,
st,
)
})

agg.RecordStatus(
extInstanceID1,
component.NewStatusEvent(component.StatusOK),
)

t.Run("extensions recovered", func(t *testing.T) {
st, ok := agg.AggregateStatus(status.ScopeExtensions, status.Concise)
require.True(t, ok)
assertEventsMatch(t,
component.StatusOK,
st,
)
})
}

func TestStreaming(t *testing.T) {
agg := status.NewAggregator(status.PriorityPermanent)
defer agg.Close()
Expand Down Expand Up @@ -324,6 +425,37 @@ func TestStreamingVerbose(t *testing.T) {
})
}

func TestUnsubscribe(t *testing.T) {
agg := status.NewAggregator(status.PriorityPermanent)
defer agg.Close()

traces := testhelpers.NewPipelineMetadata("traces")

traceEvents := agg.Subscribe(status.Scope(traces.PipelineID.String()), status.Concise)
allEvents := agg.Subscribe(status.ScopeAll, status.Concise)

assert.Nil(t, <-traceEvents)
assert.NotNil(t, <-allEvents)

// Start pipeline
testhelpers.SeedAggregator(agg, traces.InstanceIDs(), component.StatusStarting)
assertEventsRecvdMatch(t, component.StatusStarting, traceEvents, allEvents)

agg.Unsubscribe(traceEvents)

// Pipeline OK
testhelpers.SeedAggregator(agg, traces.InstanceIDs(), component.StatusOK)
assertNoEventsRecvd(t, traceEvents)
assertEventsRecvdMatch(t, component.StatusOK, allEvents)

agg.Unsubscribe(allEvents)

// Stop pipeline
testhelpers.SeedAggregator(agg, traces.InstanceIDs(), component.StatusStopping)

assertNoEventsRecvd(t, traceEvents, allEvents)
}

// assertEventMatches ensures one or more events share the expected status and are
// otherwise equal, ignoring timestamp.
func assertEventsMatch(
Expand Down Expand Up @@ -403,3 +535,13 @@ func toComponentKey(id *component.InstanceID) string {
func toPipelineKey(id component.ID) string {
return fmt.Sprintf("pipeline:%s", id.String())
}

func assertNoEventsRecvd(t *testing.T, chans ...<-chan *status.AggregateStatus) {
for _, stCh := range chans {
select {
case <-stCh:
require.Fail(t, "Found unexpected event")
default:
}
}
}