[exporter/elasticsearch] Add span event to traces OTel mapping mode
carsonip committed Aug 30, 2024
1 parent 0f63b5a commit 001999c
Showing 6 changed files with 124 additions and 9 deletions.
29 changes: 29 additions & 0 deletions .chloggen/elasticsearchexporter_otel-mode-traces-span-events.yaml
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add span event support to traces OTel mapping mode

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34831]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  Span events are now supported in OTel mapping mode.
  They will be routed to `logs-${data_stream.dataset}-${data_stream.namespace}` if `traces_dynamic_index::enabled` is `true`.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
8 changes: 5 additions & 3 deletions exporter/elasticsearchexporter/README.md
@@ -135,7 +135,7 @@ This can be customised through the following settings:

 - `traces_dynamic_index` (optional): uses resource, scope, or span attributes to dynamically construct index name.
   - `enabled`(default=false): Enable/Disable dynamic index for trace spans. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: span attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `traces-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if
-    `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > span attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `traces-generic-default`, and `traces_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields.
+    `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > span attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `traces-generic-default`, and `traces_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. There is an exception for span events under OTel mapping mode (`mapping::mode: otel`), where span event attributes instead of span attributes are considered, and `data_stream.type` is always `logs` instead of `traces` such that documents are routed to `logs-${data_stream.dataset}-${data_stream.namespace}`.

 - `logstash_format` (optional): Logstash format compatibility. Logs, metrics and traces can be written into an index in Logstash format.
   - `enabled`(default=false): Enable/disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `(logs|metrics|traces)_index` or `(logs|metrics|traces)_dynamic_index` as prefix and the date as suffix,
@@ -155,8 +155,10 @@ behaviours, which may be configured through the following settings:
   - `none`: Use original fields and event structure from the OTLP event.
   - `ecs`: Try to map fields to [Elastic Common Schema (ECS)][ECS]
   - `otel`: Elastic's preferred "OTel-native" mapping mode. Uses original fields and event structure from the OTLP event.
-    :warning: This mode's behavior is unstable, it is currently is experimental and undergoing changes.
-    There's a special treatment for the following attributes: `data_stream.type`, `data_stream.dataset`, `data_stream.namespace`. Instead of serializing these values under the `*attributes.*` namespace, they're put at the root of the document, to conform with the conventions of the data stream naming scheme that maps these as `constant_keyword` fields.
+    - :warning: This mode's behavior is unstable; it is currently experimental and undergoing changes.
+    - There's a special treatment for the following attributes: `data_stream.type`, `data_stream.dataset`, `data_stream.namespace`. Instead of serializing these values under the `*attributes.*` namespace, they're put at the root of the document, to conform with the conventions of the data stream naming scheme that maps these as `constant_keyword` fields.
+    - `data_stream.dataset` will always be appended with `.otel`. It is recommended to use with `*_dynamic_index.enabled: true` to route documents to data stream `${data_stream.type}-${data_stream.dataset}-${data_stream.namespace}`.
+    - Span events are stored in separate documents. They will be routed with `data_stream.type` set to `logs` if `traces_dynamic_index::enabled` is `true`.

   - `raw`: Omit the `Attributes.` string prefixed to field names for log and
     span attributes as well as omit the `Events.` string prefixed to
16 changes: 15 additions & 1 deletion exporter/elasticsearchexporter/data_stream_router.go
@@ -11,7 +11,7 @@ import (
 	"go.opentelemetry.io/collector/pdata/ptrace"
 )

-func routeWithDefaults(defaultDSType, defaultDSDataset, defaultDSNamespace string) func(
+func routeWithDefaults(defaultDSType, defaultDSDataset, defaultDSNamespace string) func( // nolint:unparam
 	pcommon.Map,
 	pcommon.Map,
 	pcommon.Map,
@@ -91,3 +91,17 @@ func routeSpan(
 	route := routeWithDefaults(defaultDataStreamTypeTraces, defaultDataStreamDataset, defaultDataStreamNamespace)
 	return route(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel)
 }
+
+// routeSpanEvent returns the name of the index to send the span event to according to data stream routing attributes.
+// This function may mutate record attributes.
+func routeSpanEvent(
+	spanEvent ptrace.SpanEvent,
+	scope pcommon.InstrumentationScope,
+	resource pcommon.Resource,
+	fIndex string,
+	otel bool,
+) string {
+	// span events are sent to logs-*, not traces-*
+	route := routeWithDefaults(defaultDataStreamTypeLogs, defaultDataStreamDataset, defaultDataStreamNamespace)
+	return route(spanEvent.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel)
+}
40 changes: 40 additions & 0 deletions exporter/elasticsearchexporter/exporter.go
@@ -361,6 +361,12 @@ func (e *elasticsearchExporter) pushTraceData(
 					}
 					errs = append(errs, err)
 				}
+				for ii := 0; ii < span.Events().Len(); ii++ {
+					spanEvent := span.Events().At(ii)
+					if err := e.pushSpanEvent(ctx, resource, il.SchemaUrl(), span, spanEvent, scope, scopeSpan.SchemaUrl(), session); err != nil {
+						errs = append(errs, err)
+					}
+				}
 			}
 		}
 	}
@@ -402,3 +408,37 @@ func (e *elasticsearchExporter) pushTraceRecord(
 	}
 	return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil)
 }
+
+func (e *elasticsearchExporter) pushSpanEvent(
+	ctx context.Context,
+	resource pcommon.Resource,
+	resourceSchemaURL string,
+	span ptrace.Span,
+	spanEvent ptrace.SpanEvent,
+	scope pcommon.InstrumentationScope,
+	scopeSchemaURL string,
+	bulkIndexerSession bulkIndexerSession,
+) error {
+	fIndex := e.index
+	if e.dynamicIndex {
+		fIndex = routeSpanEvent(spanEvent, scope, resource, fIndex, e.otel)
+	}
+
+	if e.logstashFormat.Enabled {
+		formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now())
+		if err != nil {
+			return err
+		}
+		fIndex = formattedIndex
+	}
+
+	document := e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL)
+	if document == nil {
+		return nil
+	}
+	docBytes, err := e.model.encodeDocument(*document)
+	if err != nil {
+		return err
+	}
+	return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(docBytes), nil)
+}
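The control flow added to this file can be summarised as a fan-out: each span yields one document, and each of its events yields an extra document only when the encoder returns a non-nil result. The sketch below models that convention with hypothetical types (`span`, `encodeEvent`, `collectDocs`) and strings in place of real documents; it is an illustration of the pattern under those assumptions, not the exporter's implementation.

```go
package main

import "fmt"

// span is a hypothetical stand-in for ptrace.Span: a name plus event names.
type span struct {
	name   string
	events []string
}

// encodeEvent mimics the nil-document convention of encodeSpanEvent:
// only OTel mapping mode produces a separate span event document.
func encodeEvent(otelMode bool, eventName string) *string {
	if !otelMode {
		return nil // other modes keep events inside the span document
	}
	doc := "event:" + eventName
	return &doc
}

// collectDocs returns one entry per span and one per encoded span event.
func collectDocs(otelMode bool, spans []span) []string {
	var docs []string
	for _, s := range spans {
		docs = append(docs, "span:"+s.name)
		for _, ev := range s.events {
			if doc := encodeEvent(otelMode, ev); doc != nil {
				docs = append(docs, *doc)
			}
		}
	}
	return docs
}

func main() {
	spans := []span{{name: "checkout", events: []string{"exception"}}}
	fmt.Println(collectDocs(true, spans))
	fmt.Println(collectDocs(false, spans))
}
```

Returning `nil` for "nothing to index" keeps the caller's loop unconditional, at the cost of the caller having to remember the nil check before encoding.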
11 changes: 10 additions & 1 deletion exporter/elasticsearchexporter/exporter_test.go
@@ -1060,6 +1060,11 @@ func TestExporterTraces(t *testing.T) {
 		span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(3600, 0)))
 		span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Unix(7200, 0)))

+		event := span.Events().AppendEmpty()
+		event.SetName("exception")
+		event.Attributes().PutStr("event.attr.foo", "event.attr.bar")
+		event.SetDroppedAttributesCount(1)
+
 		scopeAttr := span.Attributes()
 		fillResourceAttributeMap(scopeAttr, map[string]string{
 			"attr.foo": "attr.bar",
@@ -1082,13 +1087,17 @@ func TestExporterTraces(t *testing.T) {

 		mustSendTraces(t, exporter, traces)

-		rec.WaitItems(1)
+		rec.WaitItems(2)

 		expected := []itemRequest{
 			{
 				Action:   []byte(`{"create":{"_index":"traces-generic.otel-default"}}`),
 				Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","attributes":{"attr.foo":"attr.bar"},"data_stream":{"dataset":"generic.otel","namespace":"default","type":"traces"},"dropped_attributes_count":2,"dropped_events_count":3,"dropped_links_count":4,"duration":3600000000000,"kind":"Unspecified","links":[{"attributes":{"link.attr.foo":"link.attr.bar"},"dropped_attributes_count":11,"span_id":"","trace_id":"","trace_state":"bar"}],"name":"name","resource":{"attributes":{"resource.foo":"resource.bar"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"status":{"code":"Unset"},"trace_state":"foo"}`),
 			},
+			{
+				Action:   []byte(`{"create":{"_index":"logs-generic.otel-default"}}`),
+				Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"event.attr.foo":"event.attr.bar","event.name":"exception"},"data_stream":{"dataset":"generic.otel","namespace":"default","type":"logs"},"dropped_attributes_count":1,"resource":{"attributes":{"resource.foo":"resource.bar"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`),
+			},
 		}

 		assertItemsEqual(t, expected, rec.Items(), false)
29 changes: 25 additions & 4 deletions exporter/elasticsearchexporter/model.go
@@ -66,6 +66,7 @@ var resourceAttrsToPreserve = map[string]bool{
 type mappingModel interface {
 	encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string) ([]byte, error)
 	encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string) ([]byte, error)
+	encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string) *objmodel.Document
 	upsertMetricDataPointValue(map[uint32]objmodel.Document, pcommon.Resource, string, pcommon.InstrumentationScope, string, pmetric.Metric, dataPoint, pcommon.Value) error
 	encodeDocument(objmodel.Document) ([]byte, error)
 }
@@ -463,7 +464,9 @@ func (m *encodeModel) encodeScopeOTelMode(document *objmodel.Document, scope pco
 }

 func (m *encodeModel) encodeAttributesOTelMode(document *objmodel.Document, attributeMap pcommon.Map) {
-	attributeMap.RemoveIf(func(key string, val pcommon.Value) bool {
+	attrsCopy := pcommon.NewMap() // Copy to avoid mutating original map
+	attributeMap.CopyTo(attrsCopy)
+	attrsCopy.RemoveIf(func(key string, val pcommon.Value) bool {
 		switch key {
 		case dataStreamType, dataStreamDataset, dataStreamNamespace:
 			// At this point the data_stream attributes are expected to be in the record attributes,
@@ -474,7 +477,7 @@ func (m *encodeModel) encodeAttributesOTelMode(document *objmodel.Document, attr
 		}
 		return false
 	})
-	document.AddAttributes("attributes", attributeMap)
+	document.AddAttributes("attributes", attrsCopy)
 }

 func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string) ([]byte, error) {
@@ -529,8 +532,6 @@ func (m *encodeModel) encodeSpanOTelMode(resource pcommon.Resource, resourceSche
 	m.encodeResourceOTelMode(&document, resource, resourceSchemaURL)
 	m.encodeScopeOTelMode(&document, scope, scopeSchemaURL)

-	// TODO: add span events to log data streams
-
 	return document
 }

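The change to `encodeAttributesOTelMode` earlier in this file replaces an in-place `RemoveIf` with a copy followed by a filter, so the caller's attribute map is left untouched. The sketch below shows the same idea with a plain Go map; `attributesForDocument` is a hypothetical helper, and the three `data_stream.*` keys are the ones the README says are lifted to the document root rather than serialized under `attributes`.

```go
package main

import "fmt"

// attributesForDocument returns a filtered copy of attrs without the
// data_stream.* routing keys. The input map is never modified.
func attributesForDocument(attrs map[string]string) map[string]string {
	out := make(map[string]string, len(attrs))
	for k, v := range attrs {
		switch k {
		case "data_stream.type", "data_stream.dataset", "data_stream.namespace":
			continue // routing keys live at the document root, not under attributes
		}
		out[k] = v
	}
	return out
}

func main() {
	original := map[string]string{
		"data_stream.dataset": "checkout",
		"event.attr.foo":      "event.attr.bar",
	}
	filtered := attributesForDocument(original)
	fmt.Println(len(original), len(filtered))
}
```

Copying costs one extra allocation per encoded record, which is the trade-off accepted here for leaving the caller's attributes intact.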
@@ -554,6 +555,26 @@ func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptra
 	return document
 }

+func (m *encodeModel) encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string) *objmodel.Document {
+	if m.mode != MappingOTel {
+		// Currently span events are stored separately only in OTel mapping mode.
+		// In other modes, they are stored within the span document.
+		return nil
+	}
+	var document objmodel.Document
+	document.AddTimestamp("@timestamp", spanEvent.Timestamp())
+	document.AddString("attributes.event.name", spanEvent.Name())
+	document.AddSpanID("span_id", span.SpanID())
+	document.AddTraceID("trace_id", span.TraceID())
+	document.AddInt("dropped_attributes_count", int64(spanEvent.DroppedAttributesCount()))
+
+	m.encodeAttributesOTelMode(&document, spanEvent.Attributes())
+	m.encodeResourceOTelMode(&document, resource, resourceSchemaURL)
+	m.encodeScopeOTelMode(&document, scope, scopeSchemaURL)
+
+	return &document
+}
+
 func (m *encodeModel) encodeAttributes(document *objmodel.Document, attributes pcommon.Map) {
 	key := "Attributes"
 	if m.mode == MappingRaw {
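For orientation, the following sketch builds a document with roughly the shape `encodeSpanEvent` produces, using only the standard library. `spanEventDoc` is a hypothetical helper, the timestamp layout is an assumption chosen to match the test fixture in this commit, and the resource, scope, `data_stream`, `span_id` and `trace_id` fields are omitted for brevity. How the real encoder resolves a clash between the event name and an existing `event.name` attribute is not shown in the diff; here the event name simply wins.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// spanEventDoc assembles a simplified span event document. The event name is
// stored alongside the event's own attributes under "attributes".
func spanEventDoc(unixNano int64, name string, attrs map[string]string, dropped uint32) map[string]any {
	merged := make(map[string]any, len(attrs)+1)
	for k, v := range attrs {
		merged[k] = v
	}
	merged["event.name"] = name // assumption: the event name takes precedence

	return map[string]any{
		// Assumed layout: UTC with fixed nanosecond precision.
		"@timestamp":               time.Unix(0, unixNano).UTC().Format("2006-01-02T15:04:05.000000000Z"),
		"attributes":               merged,
		"dropped_attributes_count": int64(dropped),
	}
}

func main() {
	doc := spanEventDoc(0, "exception", map[string]string{"event.attr.foo": "event.attr.bar"}, 1)
	encoded, err := json.Marshal(doc) // json.Marshal emits map keys in sorted order
	if err != nil {
		panic(err)
	}
	fmt.Println(string(encoded))
}
```

Because the event is indexed as its own document, it carries the same resource and scope context as its parent span in the real encoder, which is what lets it be queried from a `logs-*` data stream without joining back to the span.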