
Proposal for pipeline branching in the OpenTelemetry Collector #5414

Closed
siliconbrain opened this issue May 24, 2022 · 9 comments

siliconbrain commented May 24, 2022

This document describes the notion of pipeline branching in the context of the OpenTelemetry Collector, and argues for and demonstrates its usefulness in different use cases, supported by real-world examples.

What is pipeline branching?

In the context of the OpenTelemetry Collector, a pipeline is "a path the data follows in the Collector starting from reception, then further processing or modification and finally exiting the Collector via exporters". [source]
A pipeline may have multiple receivers sending data to its first processor, and multiple exporters receiving data from its last processor, but there can be no forks between the first and last processor.
In other words, the processors of a pipeline constitute a linear (directed) graph.

This linear structure does not fit well with cases where the incoming data needs to be processed in different ways (and exported with different exporters) based on its features (attributes, timestamps, content, name, etc.).
To make handling these cases easier, we want to introduce pipeline branching: allow for branches —like forks in a road— in the processing part of a pipeline.
More precisely, we want to allow the outdegree of any processor to be greater than 1, or at least enable creating an equivalent structure in some other manner.

See our motivating examples if this explanation seems too dry or you want to see what problems branching can solve.

Current options

Feel free to skip this section if you have a good understanding of processors available for the OpenTelemetry Collector.

Processors currently available in the OpenTelemetry Collector (including the contrib repository) can only handle a limited set of these cases and/or —as we'll see later— introduce unnecessary overhead.

Conditional processing of data in a pipeline can be implemented by using a filter processor.
The filter processor can be configured to include or exclude data from the remainder of the pipeline based on a few features of metrics or logs (it does not support traces at the moment).
This is useful as a tool for guarding part of the pipeline from unintended data, but it requires duplicating the parts of the pipeline that precede the filter processor in order to handle complementary cases (i.e. cases not matching the filter's condition).
This duplication introduces unwanted redundancy in the Collector's configuration that hinders modification.
Also, defining the complementary pipeline(s) is a non-trivial problem that becomes more complex when multiple branches are introduced, and even more complex when using multiple levels of filters.
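
As a minimal sketch of what this duplication looks like in practice (the filter names and their configurations are placeholders, while the other component names are reused from the example configs later in this document):

processors:
  filter/json: {...}      # keeps only records matching the condition
  filter/not_json: {...}  # the hand-written complement of filter/json
service:
  pipelines:
    logs/json:
      receivers: [filelog]
      processors: [memory_limiter, batch, filter/json, json_parser/docker]
      exporters: [loki]
    logs/not_json:
      receivers: [filelog]
      processors: [memory_limiter, batch, filter/not_json, regex_parser/crio]
      exporters: [elasticsearch]

Everything before the filters (memory_limiter, batch) has to be repeated in every branch-like pipeline, and filter/not_json has to be kept in sync with filter/json by hand.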

Another processor related to conditional forwarding of data is the routing processor.
This processor can selectively route data to different exporters using a configurable routing table indexed with an attribute of the data.
It's a useful but limited tool. One of its limitations is that indexing requires an exact match between the table key and the attribute value. This can be worked around by using one or more processors earlier in the pipeline to create a synthetic attribute just for routing —but then this attribute is exported with your data, which you might not want.
Another, more limiting restriction is that it can only route directly to exporters, thus no further processing can occur after routing.
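
For reference, a routing processor configuration looks roughly like the following sketch (the keys are the same ones used in the backup proposal's example config further below); note that the table entries can only point at exporters:

processors:
  routing:
    from_attribute: app
    default_exporters: [loki]
    table:
    - value: elasticsearch
      exporters: [elasticsearch]   # routes straight to exporters; no further processors can follow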

There are other processors which only forward part of the data they receive based on some condition, but those cannot be used to implement the kind of branching behavior we discuss here.

Our proposal

The opentelemetry-log-collection project, originally known as Stanza, has been contributed by observIQ to "accelerate development of the OpenTelemetry Collector's log collection capabilities".
At its core, this project has a similar architecture to the Collector's: it has a notion of pipelines which consist of input, output and transformer operators connected as a directed acyclic graph.
The main difference lies in how these connections are configured: in a Stanza pipeline, operators connect to each other using IDs.
This enables every operator to reference —and thus send data to— any other operator in the pipeline.
This is exploited by the project's router operator, which serves a similar purpose to the Collector's previously mentioned routing processor, but has a more advanced solution, employing the Expr library, for determining whether some data matches a specific route.
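
As a rough sketch (the exact operator and expression syntax may differ between versions), a router operator inside a filelog receiver's operator list could look like this, reusing the regex from the example config further below:

receivers:
  filelog:
    include: [/var/log/containers/*.log]
    operators:
    - type: router
      routes:
      - expr: 'body matches "^\\{"'
        output: json_parser
      - expr: 'body matches "^[^ Z]+ "'
        output: regex_parser
    - type: json_parser
      id: json_parser
    - type: regex_parser
      id: regex_parser
      regex: '^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) (?P<log>.*)$'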

We propose to create a new routing processor —or maybe modify the existing one— with capabilities similar to this router operator: it has advanced data matching capabilities and can forward data to one or many of its configured outputs, which can be either other processors or exporters.
Unfortunately, this latter feature requires some modifications to the core OpenTelemetry Collector code.
In the current architecture, there is no way for a processor to access a collection of available processors to send data to.
This could be solved by extending the Host interface with a GetProcessors method, similar to the existing GetExporters method, that would return the available processors.

An open question is whether processors from one pipeline should be able to send data to processors in another pipeline.
If they should, then in addition to processor and exporter IDs, a pipeline ID can also be specified to address components of another pipeline.

  • Pros:
    • New router can support more advanced rules (expressions, etc...)
    • Simple architecture (does not require extra steps)
  • Cons:
    • Cyclical flows are possible and hard to detect on startup with the current architecture (Stanza architecture is more advanced, allows checking for cycles in the graph.)
    • Makes following a flow of data in the configuration harder
    • Can lead to creation of unused components without further validation

Example code for solution

package routerprocessor

import (
	"context"
	"errors"

	"github.com/antonmedv/expr"
	"github.com/antonmedv/expr/vm"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/config"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/model/pdata"
	"go.uber.org/multierr"
)

var _ component.LogsProcessor = (*RouterProcessor)(nil)

type RouterProcessor struct {
	cfg           *Config
	branches      []Branch
	defaultOutput consumer.Logs
}

func (*RouterProcessor) Capabilities() (c consumer.Capabilities) {
	return
}

func (p *RouterProcessor) ConsumeLogs(ctx context.Context, logsIn pdata.Logs) error {
	logsOutForBranch := make([]pdata.Logs, len(p.branches))
	for i := range logsOutForBranch {
		logsOutForBranch[i] = pdata.NewLogs()
	}
	logsOutForDefault := pdata.NewLogs()

	rls := logsIn.ResourceLogs()
	for i := 0; i < rls.Len(); i++ {
		rl := rls.At(i)
		ills := rl.InstrumentationLibraryLogs()
		for j := 0; j < ills.Len(); j++ {
			ill := ills.At(j)
			lrs := ill.LogRecords()
			for k := 0; k < lrs.Len(); k++ {
				lr := lrs.At(k)

				// appendRecord (omitted for brevity) copies rl, ill and lr into the target pdata.Logs
				matched := false
				for b := range p.branches {
					if p.branches[b].Match(lr) {
						appendRecord(logsOutForBranch[b], rl, ill, lr)
						matched = true
					}
				}
				if !matched {
					// records matching no branch go to the default output
					appendRecord(logsOutForDefault, rl, ill, lr)
				}
			}
		}
	}

	var errs error
	for i := range logsOutForBranch {
		if output, logsOut := p.branches[i].output, logsOutForBranch[i]; output != nil && logsOut.LogRecordCount() > 0 {
			errs = multierr.Append(errs, output.ConsumeLogs(ctx, logsOut))
		}
	}
	if output, logsOut := p.defaultOutput, logsOutForDefault; output != nil && logsOut.LogRecordCount() > 0 {
		errs = multierr.Append(errs, output.ConsumeLogs(ctx, logsOut))
	}
	return errs
}

func (p *RouterProcessor) Start(_ context.Context, host component.Host) error {
	pipelines := host.GetProcessors() // uses the proposed Host extension below
	for _, branchCfg := range p.cfg.Branches {
		prog, err := expr.Compile(branchCfg.Expr, expr.AsBool)
		if err != nil {
			return err
		}
		pipelineID, err := config.NewComponentIDFromString(branchCfg.Output.Pipeline)
		if err != nil {
			return err
		}
		pipeline, ok := pipelines[pipelineID]
		if !ok {
			return errors.New("missing pipeline")
		}
		processorID, err := config.NewComponentIDFromString(branchCfg.Output.Processor)
		if err != nil {
			return err
		}
		processor, ok := pipeline[processorID]
		if !ok {
			return errors.New("missing processor")
		}
		output, ok := processor.(consumer.Logs)
		if !ok {
			return errors.New("not a log processor")
		}
		p.branches = append(p.branches, Branch{
			prog: prog,
			output: output,
		})
	}
	return nil
}

func (*RouterProcessor) Shutdown(context.Context) error {
	return nil
}

type Branch struct {
	prog *vm.Program
	output consumer.Logs
}

func (b *Branch) Match(lr pdata.LogRecord) bool {
	output, _ := expr.Run(b.prog, envFromLogRecord(lr))
	result, ok := output.(bool)
	return ok && result
}

func envFromLogRecord(lr pdata.LogRecord) map[string]interface{} {
	env := make(map[string]interface{})
	env["$"] = lr.Body()
	// ...
	return env
}

type Config struct {
	Branches []BranchConfig
}

type BranchConfig struct {
	Expr string
	Output struct {
		Pipeline string
		Processor string
	}
}
// extend Host with a new method in go.opentelemetry.io/collector/component/host.go

type Host interface {
	// ...
	GetProcessors() map[config.ComponentID]map[config.ComponentID]component.Processor
}
// add something like this to go.opentelemetry.io/collector/service/service.go

func (srv *service) GetProcessors() map[config.ComponentID]map[config.ComponentID]component.Processor {
	res := make(map[config.ComponentID]map[config.ComponentID]component.Processor)
	for pipelineID, pipeline := range srv.builtPipelines {
		res[pipelineID] = pipeline.GetProcessors()
	}
	return res
}

Example config for solution

exporters:
  loki: {}
  elasticsearch: {}
extensions:
  health_check: {}
processors:
  batch: {}
  memory_limiter:
    check_interval: 5s
    limit_mib: 409
    spike_limit_mib: 128
  routes:
  - expr: $$body matches "^\\{"
    output:
      pipeline: logs/loki
      processor: json_parser/docker
  - expr: $$body matches "^[^ Z]+ "
    output:
      pipeline: logs/branch
      processor: regex_parser/crio
  regex_parser/crio:
    regex: ^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) (?P<log>.*)$
    timestamp:
      layout: "2006-01-02T15:04:05.000000000-07:00"
      layout_type: gotime
      parse_from: time
  json_parser/docker:
    timestamp:
      layout: '%Y-%m-%dT%H:%M:%S.%LZ'
      parse_from: time

receivers:
  filelog: {...}

service:
  pipelines:
    logs/main:
      exporters: []
      processors:
      - memory_limiter
      - batch
      - routes
      receivers:
      - filelog
    logs/loki:
      exporters:
      - loki
      processors:
      - json_parser/docker
      receivers: []
    logs/branch:
      exporters:
      - elasticsearch
      processors:
      - regex_parser/crio
      receivers: []

Our backup proposal

This Stanza-esque router, while providing significant flexibility, requires changes to the core API and design of the OTel Collector which might not be something the community wants.
For this reason, we also present a more conservative solution that only exploits already available features of the Collector.

The routing processor's last mentioned missing feature —that further processing of data cannot occur after routing— could be overcome by exporting the data in such a way that it's picked up by another pipeline that continues its processing.
With the currently available array of exporters and receivers, the simplest way to do this is to export the data with the otlp exporter to the otlp receiver of another pipeline in the same Collector, using ports on the same machine. This solution, however, wastes resources unnecessarily.
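
For completeness, such a loopback would look roughly like the following sketch (the endpoint and port values are placeholders): the first pipeline exports to an otlp exporter that sends to the otlp receiver feeding the second pipeline over a local port.

exporters:
  otlp/loopback:
    endpoint: localhost:4317
    tls:
      insecure: true
receivers:
  otlp/loopback:
    protocols:
      grpc:
        endpoint: localhost:4317
service:
  pipelines:
    logs/first:
      receivers: [filelog]
      processors: [routing]
      exporters: [otlp/loopback]
    logs/second:
      receivers: [otlp/loopback]
      processors: [batch]
      exporters: [loki]

Every record crossing the pipeline boundary is serialized to protobuf, sent over the loopback interface and deserialized again, which is exactly the overhead the in-process pair described below avoids.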

We propose to create a new pair of exporter and receiver that communicate in-process, thus using the absolute minimum of extra resources.
In-process receivers are configured to look up their exporter pairs by component ID, taking advantage of the same mechanism used by the routing processor.
They register themselves as targets with these exporters and the exporters forward any incoming data to their registered targets.

Branch Flow

  • Pros:
    • Fits in the current architecture (receivers, processors, exporters), no core code modification required
    • Allows creating more complex pipelines
  • Cons:
    • Cyclical flows are possible and hard to detect on startup with the current architecture (Stanza architecture is more advanced, allows checking for cycles in the graph.)
    • Makes following a flow of data in the configuration harder

Example code for solution

package inprocessexporter

import (
	"context"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/model/pdata"
	"go.uber.org/multierr"
)

var _ component.LogsExporter = (*Exporter)(nil)

type Exporter struct {
	logConsumers []consumer.Logs
}

func (e *Exporter) Capabilities() (c consumer.Capabilities) {
	return
}

func (e *Exporter) ConsumeLogs(ctx context.Context, ld pdata.Logs) (errs error) {
	for _, lc := range e.logConsumers {
		ld := ld
		if lc.Capabilities().MutatesData {
			// clone the data for consumers that mutate it, so other consumers still see the original
			ld = ld.Clone()
		}
		errs = multierr.Append(errs, lc.ConsumeLogs(ctx, ld))
	}
	return
}

func (e *Exporter) RegisterLogConsumer(lc consumer.Logs) {
	if lc == nil {
		return
	}
	e.logConsumers = append(e.logConsumers, lc)
}

func (*Exporter) Start(context.Context, component.Host) error {
	return nil
}

func (*Exporter) Shutdown(context.Context) error {
	return nil
}

package inprocessreceiver

import (
	"context"
	"errors"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/config"
	"go.opentelemetry.io/collector/consumer"
	"go.uber.org/multierr"
)

var _ component.LogsReceiver = (*Receiver)(nil)

type Receiver struct {
	cfg  *Config
	next consumer.Logs
}

func (r *Receiver) Start(_ context.Context, host component.Host) error {
	exporters := host.GetExporters()
	logExporters := make(map[string]component.Exporter)
	for id, exp := range exporters[config.LogsDataType] {
		logExporters[id.String()] = exp
	}
	for _, expID := range r.cfg.LogExporters {
		exp, ok := logExporters[expID]
		if !ok {
			return errors.New("missing exporter")
		}
		ex, ok := exp.(interface{ RegisterLogConsumer(consumer.Logs) })
		if !ok {
			return errors.New("unsupported exporter type")
		}
		ex.RegisterLogConsumer(r.next)
	}
	return nil
}

func (*Receiver) Shutdown(context.Context) error {
	return nil
}

type Config struct {
	LogExporters []string
}

Example config for solution

exporters:
  loki: {}
  elasticsearch: {}
  in_process/elasticsearch: {}
  in_process/loki: {}
extensions:
  health_check: {}
processors:
  batch: {}
  memory_limiter:
    check_interval: 5s
    limit_mib: 409
    spike_limit_mib: 128
  routing/main:
    from_attribute: app
    default_exporters: []
    table:
    - value: loki
      exporters: [in_process/loki]
    - value: elasticsearch
      exporters: [in_process/elasticsearch]
receivers:
  filelog: {...}
  in_process/elasticsearch:
    exporters:
    - in_process/elasticsearch
  in_process/loki:
    exporters:
    - in_process/loki
service:
  pipelines:
    logs/main:
      exporters: []
      processors:
      - memory_limiter
      - batch
      - routing/main
      receivers:
      - filelog
    logs/loki:
      exporters:
      - loki
      processors: []
      receivers:
      - in_process/loki
    logs/branch:
      exporters:
      - elasticsearch
      processors: []
      receivers:
      - in_process/elasticsearch

Detecting cycles at runtime

Both of our proposed solutions suffer from the problem of enabling the creation of cyclic flows of data that, when unchecked, can lead to infinite loops.
With the current architecture, this problem cannot be detected at startup time, but can be handled at runtime.
We can mark data that has been processed by one of our components —the router processor or the in-process exporter— using the context.
The key of the mark identifies the component —or checkpoint— the data is passing through, and the value is the number of times the data has passed through the checkpoint.
Components can detect cycles by checking this special value in the context and handle cases according to their configuration.

Example code

// mark the data as having passed through this checkpoint one more time
const key = "in_process/loki" // checkpoint ID identifying this component instance
cnt, _ := ctx.Value(key).(int)
ctx = context.WithValue(ctx, key, cnt+1)

Example config

max_seen_counter: 1
seen_counter_action: [drop|error|panic|...]
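
A hypothetical sketch of how a component could combine the counter with such a configuration; the Config fields and the Check helper are illustrative only and not part of any existing API:

package checkpoint

import (
	"context"
	"fmt"
)

// checkpointKey avoids collisions with other context values.
type checkpointKey string

type Config struct {
	MaxSeenCounter    int
	SeenCounterAction string // one of: drop, error, panic
}

// Check increments the pass counter stored in the context for the given
// checkpoint ID and decides what to do once the configured limit is exceeded.
// It returns the updated context, whether the data may still be forwarded,
// and an error when the configured action is "error".
func Check(ctx context.Context, id string, cfg Config) (context.Context, bool, error) {
	key := checkpointKey(id)
	cnt, _ := ctx.Value(key).(int)
	if cnt >= cfg.MaxSeenCounter {
		switch cfg.SeenCounterAction {
		case "drop":
			return ctx, false, nil
		case "error":
			return ctx, false, fmt.Errorf("cycle detected at checkpoint %q", id)
		case "panic":
			panic("cycle detected at checkpoint " + id)
		}
	}
	return context.WithValue(ctx, key, cnt+1), true, nil
}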

Alternative ideas

An alternative solution for applying only some processors of a pipeline to a specific piece of data would be to allow specifying a condition for every processor, which would determine whether that processor is applied to a given piece of data or whether the data is just passed on to the next stage unmodified.
The same idea is used for example in GitHub Actions.
We did not explore this further because with our use-cases it suffers from some of the same problems mentioned for the filter processor.
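
As a purely hypothetical sketch of what such a configuration could look like (this syntax is not supported by the Collector; the expressions reuse the ones from the router example above):

service:
  pipelines:
    logs:
      receivers: [filelog]
      processors:
      - memory_limiter
      - name: json_parser/docker
        if: $$body matches "^\\{"
      - name: regex_parser/crio
        if: $$body matches "^[^ Z]+ "
      - batch
      exporters: [loki, elasticsearch]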

If you have any additional ideas that you feel would support our use-cases while fitting the OpenTelemetry Collector better, please let us know!

Motivating examples

Here are some examples that articulate why pipeline branching is a useful and sometimes unavoidable tool that should be part of the Collector.
If you recognize another use case that would be enabled by or benefit from pipeline branching, please reach out to us so that we can add it to this collection — especially if it's related to metrics or tracing.

Better support for logs

Logs are the newest addition to the types of data OpenTelemetry can handle and thus they are not as well supported as traces or metrics.
Code donated by the Stanza project in opentelemetry-log-collection gave a huge boost to the number of receivers available for ingesting logs, but log forwarding and more importantly, log processing still has a long way to go.
Strong processing capabilities are especially important with logs since they are highly heterogeneous with varying levels of structure.
While newer applications seem to have embraced the move toward structured logging, most older applications still emit logs that can only be ingested as a blob of plain text data with almost no structured metadata attached.
That blob of text almost always contains a plethora of useful information waiting to be extracted; however, industry standards for log line syntax abound, and many developers still just roll their own.
This means that advanced processing capabilities might be needed to process and transform logs into one standard format.
This processing might be handled by a third-party service, but the OpenTelemetry Collector already sits in the perfect place(s) and has the necessary infrastructure for processing logs; it only lacks some tools.

If you take a look at the features available for transforming and parsing logs in the Stanza codebase mentioned previously, you can see that there are many.
The problem is that these cannot be mixed with the OpenTelemetry Collector's processors, since Stanza's operators are only available inside the receiver components implemented with them.
The main barrier for reimplementing Stanza's operators as OpenTelemetry processors seems to be that OTel's pipeline architecture is missing some features exploited by these operators.

Reimplementing the Logging Operator using OTel Collector

Lately, we've been looking into replacing FluentBit and Fluentd with the OpenTelemetry Collector inside Logging Operator.
To achieve this, we need to examine how features of Fluent(Bit|d) translate to features of the OTel Collector.

One of the main features of the Logging Operator is that it can route logs based on labels and each route can have its own transformation pipeline.
When we tried to replicate the logic behind this feature in the OTel Collector with the available tools, we ran into an obstacle.
Take a look at the following picture. The main difference is how the different log flows are treated by the tools.

Logging Operator vs OTel

Logging Operator:

  1. Collect logs
  2. Enrich with metadata
  3. Route based on metadata or content
  4. Apply filters/transformations
  5. Send to output(s)

OTel Collector:

  1. Collect logs (stanza)
  2. Add primary identifiers to metadata (stanza)
  3. Enrich with more metadata based on primary identifiers (otel)
  4. Apply filters/transformations
  5. Route logs to one or more outputs

As you can see, Logging Operator routes logs after augmenting them with some additional metadata retrieved from the Kubernetes cluster —much like the k8sattributes processor does— and then applies some user defined transformations.
With the OTel Collector however, routing can only happen just before the end of the pipeline, thus allowing no more transformation to take place after routing.

If you take a closer look at the code from the Stanza project, you can see that they also have a routing facility implemented which even has more advanced capabilities for matching records and can route them to any operator in the Stanza pipeline.
However, this cannot be used as a routing solution in our case: it can only run as part of a receiver component, while the Kubernetes metadata addition that needs to happen before routing can be done at the earliest in the first processor of an OTel Collector pipeline, which runs only after the receiver.
As you can see, it's a case of the chicken and egg problem.

@mx-psi transferred this issue from open-telemetry/opentelemetry-collector-contrib May 25, 2022
@djaglowski (Member)

@siliconbrain, thank you for this detailed proposal. I think you've laid all of this out in fantastic detail and made a compelling argument that some form of routing is necessary. I am one of the original authors of stanza and continue to maintain the donated codebase within the OTel project, so I can certainly appreciate the use cases you are targeting here.

As you can imagine, some of these same concerns were discussed when debating exactly how to integrate stanza into the OTel collector. It is strongly preferred that expectations around how data flows through pipelines be respected. However, this of course does not preclude composition of pipelines, such as in your backup proposal.

I think you would be interested to compare your proposal to #2336. The explicit motivation for that proposal was to enable translation between signals, but the proposed implementation follows from the same line of thought as your backup proposal. I took it one step further and suggested that the exporter/receiver pair be formalized into a new type of component called a connector. Unfortunately, this has not yet been implemented, but it is already accepted in concept.

You've also made a very lucid point about the distinction between a true routing component vs a "duplicate and filter" strategy. I think this may be worth exploring further. That said, I'm curious to first hear your thoughts on the design proposed in #2336, and whether you think it would be sufficient for your use cases.

@siliconbrain (Author)

@djaglowski, your comment is very much appreciated!

It is strongly preferred that expectations around how data flows through pipelines be respected.

I had a hunch it would be so, hence the backup proposal. 🙂 Could you give me some insight as to why that is? Also, to me, the routing processor seems to clearly break those expectations.

I think you would be interested to compare your proposal to #2336.

It was pleasant to read your proposal and see that we're not alone with missing a feature like this. Indeed, your proposal suggests something very similar to our backup. I quite like the notion of combining the exporter-receiver pair into its own entity as this eliminates the redundancy of defining pairs and I also think the connector is a very descriptive name for it. 👍 However, my argument against it is that this requires the topological sorting of connected pipelines, while with the exporter-receiver pair this is not necessary. Maybe we could have the best of both worlds by implementing connectors as a kind of "syntactic sugar" for the configuration and generating exporter-receiver pairs underneath?

Another question that comes to mind (and this might be better asked in your proposal's discussion, but anyway) is what the definition of a translating connector would look like in the configuration, especially with regard to indicating its input and output signal types?

You've also made a very lucid point about the distinction between a true routing component vs a "duplicate and filter" strategy. I think this may be worth exploring further.

What do you mean about exploring further?

@djaglowski (Member)

Could you give me some insight as to why that is? Also, to me, the routing processor seems to clearly break those expectations.

Basically because the design is easy to understand and is already in place, works for most use cases, and it appears to be sufficient to build upon for other use cases (via connectors or other similar mechanism). I agree that the routing processor is not really conformant to the expected data flow. I believe I've heard this sentiment from others as well, and there is a desire to reconcile this eventually.

However, my argument against it is that this requires the topological sorting of connected pipelines, while with the exporter-receiver pair this is not necessary.

Isn't the need for topological sorting just a consequence of the decision to disallow cycles? Technically cycles are always possible via other means (e.g. export back to the same pipeline via an otlp exporter-receiver pair), but I would think we should prevent them where possible.

Another question that comes to mind (and this might be better asked in your proposal's discussion, but anyway) is how would the definition of a translating connector look like in the configuration, especially with regards to indicating its input and output signal type?

I believe this could be implicit:

receivers:
  filelog:
    
exporters:
  otlp:

connectors: # must be used as both exporter and receiver
  guesswhichkind:

service:
  pipelines:
    logs:
      receivers: [filelog]
      exporters: [guesswhichkind]
    metrics:
      receivers: [guesswhichkind]
      exporters: [otlp]

What do you mean about exploring further?

I need to give this some more thought, but perhaps it's possible to implement a routing mechanism within a receiver. Currently it's possible to use a single receiver instance in multiple pipelines. Rather than requiring each pipeline to apply a filter, perhaps the receiver can selectively emit to its pipelines. I don't believe the current interface allows this, but it seems possible. In theory this could mitigate some work (eg. a receiver that emits to only 1 of 100 pipelines would not need to duplicate the signal and would prevent 99 pipelines from having to filter out the signal).

@gfonseca-tc

I would love to see something like this in the collector. My current approach is creating otlp receiver/exporters to connect the pipelines; it's not the most elegant solution, but it works...

@jpkrohling (Member)

jpkrohling commented Aug 22, 2022

I could only catch up with this proposal now, and I like it. The current state of the routing processor is MVP-like, the minimal solution that works. What you outlined in the backup proposal is mainly what I had in mind during the development: "... communicate in-process thus using the absolute minimum of extra resources". I don't think I had a new receiver in mind for the in-process, but an evolution of the current lookup.

In any case, keep me posted on the evolution of this!

@suiluj

suiluj commented Dec 5, 2022

I like your proposal! :)

@kamalmarhubi

Is anyone likely to work on this in the near future? I constantly feel like my observability pipeline is waiting for either the otel collector to implement this, or for vector to properly support otel traces.

@siliconbrain (Author)

@kamalmarhubi AFAIK, @djaglowski is working on implementing his proposal which is a superset of ours, supporting even more use-cases.

@codeboten (Contributor)

As per @siliconbrain's last comment, closing this issue as addressed in an alternative proposal.
