Skip to content

Commit

Permalink
otelcol.exporter.otlphttp: new component (grafana#2429)
Browse files Browse the repository at this point in the history
Signed-off-by: Paschalis Tsilias <paschalis.tsilias@grafana.com>
  • Loading branch information
tpaschalis committed Oct 26, 2022
1 parent 6436b35 commit 2f178bb
Show file tree
Hide file tree
Showing 11 changed files with 549 additions and 67 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ Main (unreleased)
- `otelcol.exporter.otlp` accepts data from `otelcol` components and sends
it to a gRPC server using the OTLP protocol. (@rfratto)

- `otelcol.exporter.otlphttp` accepts data from `otelcol` components and
sends it to an HTTP server using the OTLP protocol. (@tpaschalis)

- `otelcol.auth.basic` performs basic authentication for `otelcol`
components which support authentication extensions. (@rfratto)

Expand Down
1 change: 1 addition & 0 deletions component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
_ "github.com/grafana/agent/component/local/file" // Import local.file
_ "github.com/grafana/agent/component/otelcol/auth/basic" // Import otelcol.auth.basic
_ "github.com/grafana/agent/component/otelcol/exporter/otlp" // Import otelcol.exporter.otlp
_ "github.com/grafana/agent/component/otelcol/exporter/otlphttp" // Import otelcol.exporter.otlphttp
_ "github.com/grafana/agent/component/otelcol/processor/batch" // Import otelcol.processor.batch
_ "github.com/grafana/agent/component/otelcol/processor/memorylimiter" // Import otelcol.processor.memory_limiter
_ "github.com/grafana/agent/component/otelcol/receiver/jaeger" // Import otelcol.receiver.jaeger
Expand Down
72 changes: 72 additions & 0 deletions component/otelcol/config_http.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package otelcol

import (
"time"

"github.com/alecthomas/units"
"github.com/grafana/agent/component/otelcol/auth"
otelcomponent "go.opentelemetry.io/collector/component"
otelconfig "go.opentelemetry.io/collector/config"
otelconfigauth "go.opentelemetry.io/collector/config/configauth"
otelconfighttp "go.opentelemetry.io/collector/config/confighttp"
)

Expand Down Expand Up @@ -63,3 +69,69 @@ func (args *CORSArguments) Convert() *otelconfighttp.CORSSettings {
MaxAge: args.MaxAge,
}
}

// HTTPClientArguments holds shared HTTP settings for components which launch
// HTTP clients.
type HTTPClientArguments struct {
Endpoint string `river:"endpoint,attr"`

Compression CompressionType `river:"compression,attr,optional"`

TLS TLSClientArguments `river:"tls,block,optional"`

ReadBufferSize units.Base2Bytes `river:"read_buffer_size,attr,optional"`
WriteBufferSize units.Base2Bytes `river:"write_buffer_size,attr,optional"`
Timeout time.Duration `river:"timeout,attr,optional"`
Headers map[string]string `river:"headers,attr,optional"`
// CustomRoundTripper func(next http.RoundTripper) (http.RoundTripper, error) TODO (@tpaschalis)
MaxIdleConns *int `river:"max_idle_conns,attr,optional"`
MaxIdleConnsPerHost *int `river:"max_idle_conns_per_host,attr,optional"`
MaxConnsPerHost *int `river:"max_conns_per_host,attr,optional"`
IdleConnTimeout *time.Duration `river:"idle_conn_timeout,attr,optional"`

// Auth is a binding to an otelcol.auth.* component extension which handles
// authentication.
Auth *auth.Handler `river:"auth,attr,optional"`
}

// Convert converts args into the upstream type.
func (args *HTTPClientArguments) Convert() *otelconfighttp.HTTPClientSettings {
if args == nil {
return nil
}

// Configure the authentication if args.Auth is set.
var auth *otelconfigauth.Authentication
if args.Auth != nil {
auth = &otelconfigauth.Authentication{AuthenticatorID: args.Auth.ID}
}

return &otelconfighttp.HTTPClientSettings{
Endpoint: args.Endpoint,

Compression: args.Compression.Convert(),

TLSSetting: *args.TLS.Convert(),

ReadBufferSize: int(args.ReadBufferSize),
WriteBufferSize: int(args.WriteBufferSize),
Timeout: args.Timeout,
Headers: args.Headers,
// CustomRoundTripper: func(http.RoundTripper) (http.RoundTripper, error) { panic("not implemented") }, TODO (@tpaschalis)
MaxIdleConns: args.MaxIdleConns,
MaxIdleConnsPerHost: args.MaxIdleConnsPerHost,
MaxConnsPerHost: args.MaxConnsPerHost,
IdleConnTimeout: args.IdleConnTimeout,

Auth: auth,
}
}

// Extensions exposes extensions used by args.
func (args *HTTPClientArguments) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension {
m := make(map[otelconfig.ComponentID]otelcomponent.Extension)
if args.Auth != nil {
m[args.Auth.ID] = args.Auth.Extension
}
return m
}
122 changes: 122 additions & 0 deletions component/otelcol/exporter/otlphttp/otlphttp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Package otlphttp provides an otelcol.exporter.otlphttp component.
package otlphttp

import (
"errors"
"time"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/exporter"
"github.com/grafana/agent/pkg/river"
otelcomponent "go.opentelemetry.io/collector/component"
otelconfig "go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/exporter/otlphttpexporter"
)

func init() {
component.Register(component.Registration{
Name: "otelcol.exporter.otlphttp",
Args: Arguments{},
Exports: otelcol.ConsumerExports{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := otlphttpexporter.NewFactory()
return exporter.New(opts, fact, args.(Arguments))
},
})
}

// Arguments configures the otelcol.exporter.otlphttp component.
type Arguments struct {
Client HTTPClientArguments `river:"client,block"`
Queue otelcol.QueueArguments `river:"sending_queue,block,optional"`
Retry otelcol.RetryArguments `river:"retry_on_failure,block,optional"`

// The URLs to send metrics/logs/traces to. If omitted the exporter will
// use Client.Endpoint by appending "/v1/metrics", "/v1/logs" or
// "/v1/traces", respectively. If set, these settings override
// Client.Endpoint for the corresponding signal.
TracesEndpoint string `river:"traces_endpoint,attr,optional"`
MetricsEndpoint string `river:"metrics_endpoint,attr,optional"`
LogsEndpoint string `river:"logs_endpoint,attr,optional"`
}

var (
_ river.Unmarshaler = (*Arguments)(nil)
_ river.Unmarshaler = (*HTTPClientArguments)(nil)
_ exporter.Arguments = Arguments{}
)

// DefaultArguments holds default values for Arguments.
var DefaultArguments = Arguments{
Queue: otelcol.DefaultQueueArguments,
Retry: otelcol.DefaultRetryArguments,
Client: DefaultHTTPClientArguments,
}

// UnmarshalRiver implements river.Unmarshaler.
func (args *Arguments) UnmarshalRiver(f func(interface{}) error) error {
*args = DefaultArguments
type arguments Arguments
err := f((*arguments)(args))
if err != nil {
return err
}
return args.Validate()
}

// Convert implements exporter.Arguments.
func (args Arguments) Convert() otelconfig.Exporter {
return &otlphttpexporter.Config{
ExporterSettings: otelconfig.NewExporterSettings(otelconfig.NewComponentID("otlp")),
HTTPClientSettings: *(*otelcol.HTTPClientArguments)(&args.Client).Convert(),
QueueSettings: *args.Queue.Convert(),
RetrySettings: *args.Retry.Convert(),
}
}

// Extensions implements exporter.Arguments.
func (args Arguments) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension {
return (*otelcol.HTTPClientArguments)(&args.Client).Extensions()
}

// Exporters implements exporter.Arguments.
func (args Arguments) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {
return nil
}

// Validate returns an error if the configuration is invalid.
func (args *Arguments) Validate() error {
if args.Client.Endpoint == "" && args.TracesEndpoint == "" && args.MetricsEndpoint == "" && args.LogsEndpoint == "" {
return errors.New("at least one endpoint must be specified")
}
return nil
}

// HTTPClientArguments is used to configure otelcol.exporter.otlphttp with
// component-specific defaults.
type HTTPClientArguments otelcol.HTTPClientArguments

// Default server settings.
var (
DefaultMaxIddleConns = 100
DefaultIdleConnTimeout = 90 * time.Second
DefaultHTTPClientArguments = HTTPClientArguments{
MaxIdleConns: &DefaultMaxIddleConns,
IdleConnTimeout: &DefaultIdleConnTimeout,

Timeout: 30 * time.Second,
Headers: map[string]string{},
Compression: otelcol.CompressionTypeGzip,
ReadBufferSize: 0,
WriteBufferSize: 512 * 1024,
}
)

// UnmarshalRiver implements river.Unmarshaler and supplies defaults.
func (args *HTTPClientArguments) UnmarshalRiver(f func(interface{}) error) error {
*args = DefaultHTTPClientArguments
type arguments HTTPClientArguments
return f((*arguments)(args))
}
114 changes: 114 additions & 0 deletions component/otelcol/exporter/otlphttp/otlphttp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package otlphttp_test

import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/go-kit/log/level"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/exporter/otlphttp"
"github.com/grafana/agent/pkg/flow/componenttest"
"github.com/grafana/agent/pkg/river"
"github.com/grafana/agent/pkg/util"
"github.com/grafana/dskit/backoff"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
)

// Test performs a basic integration test which runs the
// otelcol.exporter.otlphttp component and ensures that it can pass data to an
// OTLP HTTP server.
func Test(t *testing.T) {
ch := make(chan ptrace.Traces)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
b, _ := ioutil.ReadAll(r.Body)
trace, _ := ptrace.NewProtoUnmarshaler().UnmarshalTraces(b)
require.Equal(t, 1, trace.SpanCount())
name := trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Name()
require.Equal(t, "TestSpan", name)
ch <- trace
w.WriteHeader(http.StatusOK)
}))
defer srv.Close()

ctx := componenttest.TestContext(t)
l := util.TestLogger(t)

ctrl, err := componenttest.NewControllerFromID(l, "otelcol.exporter.otlphttp")
require.NoError(t, err)

cfg := fmt.Sprintf(`
client {
endpoint = "%s"
compression = "none"
tls {
insecure = true
insecure_skip_verify = true
}
}
`, srv.URL)
var args otlphttp.Arguments
require.NoError(t, river.Unmarshal([]byte(cfg), &args))

go func() {
err := ctrl.Run(ctx, args)
require.NoError(t, err)
}()

require.NoError(t, ctrl.WaitRunning(time.Second), "component never started")
require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything")

// Send traces in the background to our exporter.
go func() {
exports := ctrl.Exports().(otelcol.ConsumerExports)

bo := backoff.New(ctx, backoff.Config{
MinBackoff: 10 * time.Millisecond,
MaxBackoff: 100 * time.Millisecond,
})
for bo.Ongoing() {
err := exports.Input.ConsumeTraces(ctx, createTestTraces())
if err != nil {
level.Error(l).Log("msg", "failed to send traces", "err", err)
bo.Wait()
continue
}

return
}
}()

// Wait for our exporter to finish and pass data to our HTTP server.
select {
case <-time.After(time.Second):
require.FailNow(t, "failed waiting for traces")
case tr := <-ch:
require.Equal(t, 1, tr.SpanCount())
}
}

func createTestTraces() ptrace.Traces {
// Matches format from the protobuf definition:
// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto
var bb = `{
"resource_spans": [{
"scope_spans": [{
"spans": [{
"name": "TestSpan"
}]
}]
}]
}`

data, err := ptrace.NewJSONUnmarshaler().UnmarshalTraces([]byte(bb))
if err != nil {
panic(err)
}
return data
}
Loading

0 comments on commit 2f178bb

Please sign in to comment.