Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/loadbalancing] Improve the performance when merging traces belonging to the same backend #32032

Merged
merged 3 commits into from
May 8, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
loadbalancingexporter: use pdata MoveAndAppendTo()
no need to reimplement that in an extremely allocation-inefficient
fashion.

I'm actually not sure why mergeTraces() and mergeMetrics() need to exist
in the first place; all the other exporters coupled with the batch
processor work just fine, not sure why loadbalancing would be special.
#30141
seems to imply they were implemented to improve performance, but I don't
really understand why batch processor would not have been sufficient for
that improvement originally.

benchmarks before:
	goos: darwin
	goarch: arm64
	pkg: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter
	BenchmarkMergeTraces_X100-8     	   50214	     23507 ns/op
	BenchmarkMergeTraces_X500-8     	   10000	    113952 ns/op
	BenchmarkMergeTraces_X1000-8    	    5208	    226062 ns/op
	BenchmarkMergeMetrics_X100-8    	   64933	     18540 ns/op
	BenchmarkMergeMetrics_X500-8    	   12885	     91418 ns/op
	BenchmarkMergeMetrics_X1000-8   	    6590	    184584 ns/op
	PASS
	ok  	github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter	9.783s

and after:
	goos: darwin
	goarch: arm64
	pkg: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter
	BenchmarkMergeTraces_X100-8     	295886529	         3.836 ns/op
	BenchmarkMergeTraces_X500-8     	309865370	         3.833 ns/op
	BenchmarkMergeTraces_X1000-8    	310739948	         3.800 ns/op
	BenchmarkMergeMetrics_X100-8    	315567813	         3.841 ns/op
	BenchmarkMergeMetrics_X500-8    	310341650	         3.849 ns/op
	BenchmarkMergeMetrics_X1000-8   	314292003	         3.830 ns/op
	PASS
	ok  	github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter	10.733s
  • Loading branch information
lauri-paypay authored and jpkrohling committed May 8, 2024
commit da2e7d0b3cf7c2d936614894a1e2f2bacedf46fa
110 changes: 4 additions & 106 deletions exporter/loadbalancingexporter/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,114 +10,12 @@ import (

// mergeTraces concatenates two ptrace.Traces into a single ptrace.Traces.
func mergeTraces(t1 ptrace.Traces, t2 ptrace.Traces) ptrace.Traces {
mergedTraces := ptrace.NewTraces()

if t1.SpanCount() == 0 && t2.SpanCount() == 0 {
return mergedTraces
}

// Iterate over the first trace and append spans to the merged traces
for i := 0; i < t1.ResourceSpans().Len(); i++ {
rs := t1.ResourceSpans().At(i)
newRS := mergedTraces.ResourceSpans().AppendEmpty()

rs.Resource().MoveTo(newRS.Resource())
newRS.SetSchemaUrl(rs.SchemaUrl())

for j := 0; j < rs.ScopeSpans().Len(); j++ {
ils := rs.ScopeSpans().At(j)

newILS := newRS.ScopeSpans().AppendEmpty()
ils.Scope().MoveTo(newILS.Scope())
newILS.SetSchemaUrl(ils.SchemaUrl())

for k := 0; k < ils.Spans().Len(); k++ {
span := ils.Spans().At(k)
newSpan := newILS.Spans().AppendEmpty()
span.MoveTo(newSpan)
}
}
}

// Iterate over the second trace and append spans to the merged traces
for i := 0; i < t2.ResourceSpans().Len(); i++ {
rs := t2.ResourceSpans().At(i)
newRS := mergedTraces.ResourceSpans().AppendEmpty()

rs.Resource().MoveTo(newRS.Resource())
newRS.SetSchemaUrl(rs.SchemaUrl())

for j := 0; j < rs.ScopeSpans().Len(); j++ {
ils := rs.ScopeSpans().At(j)

newILS := newRS.ScopeSpans().AppendEmpty()
ils.Scope().MoveTo(newILS.Scope())
newILS.SetSchemaUrl(ils.SchemaUrl())

for k := 0; k < ils.Spans().Len(); k++ {
span := ils.Spans().At(k)
newSpan := newILS.Spans().AppendEmpty()
span.MoveTo(newSpan)
}
}
}

return mergedTraces
t2.ResourceSpans().MoveAndAppendTo(t1.ResourceSpans())
return t1
}

// mergeMetrics concatenates two pmetric.Metrics into a single pmetric.Metrics.
func mergeMetrics(m1 pmetric.Metrics, m2 pmetric.Metrics) pmetric.Metrics {
mergedMetrics := pmetric.NewMetrics()

if m1.MetricCount() == 0 && m2.MetricCount() == 0 {
return mergedMetrics
}

// Iterate over the first metric and append metrics to the merged metrics
for i := 0; i < m1.ResourceMetrics().Len(); i++ {
rs := m1.ResourceMetrics().At(i)
newRS := mergedMetrics.ResourceMetrics().AppendEmpty()

rs.Resource().MoveTo(newRS.Resource())
newRS.SetSchemaUrl(rs.SchemaUrl())

for j := 0; j < rs.ScopeMetrics().Len(); j++ {
ils := rs.ScopeMetrics().At(j)

newILS := newRS.ScopeMetrics().AppendEmpty()
ils.Scope().MoveTo(newILS.Scope())
newILS.SetSchemaUrl(ils.SchemaUrl())

for k := 0; k < ils.Metrics().Len(); k++ {
metric := ils.Metrics().At(k)
newMetric := newILS.Metrics().AppendEmpty()
metric.MoveTo(newMetric)
}
}
}

// Iterate over the second metric and append metrics to the merged metrics
for i := 0; i < m2.ResourceMetrics().Len(); i++ {
rs := m2.ResourceMetrics().At(i)
newRS := mergedMetrics.ResourceMetrics().AppendEmpty()

rs.Resource().MoveTo(newRS.Resource())
newRS.SetSchemaUrl(rs.SchemaUrl())

for j := 0; j < rs.ScopeMetrics().Len(); j++ {
ils := rs.ScopeMetrics().At(j)

newILS := newRS.ScopeMetrics().AppendEmpty()
ils.Scope().MoveTo(newILS.Scope())
newILS.SetSchemaUrl(ils.SchemaUrl())

for k := 0; k < ils.Metrics().Len(); k++ {
metric := ils.Metrics().At(k)
newMetric := newILS.Metrics().AppendEmpty()
metric.MoveTo(newMetric)
}
}
}

return mergedMetrics
m2.ResourceMetrics().MoveAndAppendTo(m1.ResourceMetrics())
return m1
}