diff --git a/cmd/oteldb/app.go b/cmd/oteldb/app.go index b4053a32..48d48aca 100644 --- a/cmd/oteldb/app.go +++ b/cmd/oteldb/app.go @@ -11,14 +11,17 @@ import ( sdkapp "github.com/go-faster/sdk/app" "github.com/go-faster/sdk/zctx" "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/storage" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/confmap/converter/expandconverter" + "go.opentelemetry.io/collector/otelcol" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" "github.com/go-faster/oteldb/internal/httpmiddleware" "github.com/go-faster/oteldb/internal/logql" "github.com/go-faster/oteldb/internal/logql/logqlengine" - "github.com/go-faster/oteldb/internal/logstorage" "github.com/go-faster/oteldb/internal/lokiapi" "github.com/go-faster/oteldb/internal/lokihandler" "github.com/go-faster/oteldb/internal/otelreceiver" @@ -27,43 +30,43 @@ import ( "github.com/go-faster/oteldb/internal/tempoapi" "github.com/go-faster/oteldb/internal/tempohandler" "github.com/go-faster/oteldb/internal/traceql/traceqlengine" - "github.com/go-faster/oteldb/internal/tracestorage" ) // App contains application dependencies and services. type App struct { - services map[string]func(context.Context) error + cfg Config + services map[string]func(context.Context) error otelStorage lg *zap.Logger metrics *sdkapp.Metrics } -func newApp(ctx context.Context, m *sdkapp.Metrics) (_ *App, err error) { - var ( - storageType = strings.ToLower(os.Getenv("OTELDB_STORAGE")) - lg = zctx.From(ctx) - app = &App{ - services: map[string]func(context.Context) error{}, - lg: lg, - metrics: m, - } - ) +func newApp(ctx context.Context, cfg Config, m *sdkapp.Metrics) (_ *App, err error) { + cfg.setDefaults() + + app := &App{ + cfg: cfg, + services: map[string]func(context.Context) error{}, + lg: zctx.From(ctx), + metrics: m, + } - switch storageType { - case "ch": - store, err := setupCH(ctx, os.Getenv("CH_DSN"), lg, m) + { + dsn := os.Getenv("CH_DSN") + if dsn == "" { + dsn = cfg.DSN + } + store, err := setupCH(ctx, dsn, app.lg, m) if err != nil { - return nil, errors.Wrapf(err, "create storage %q", storageType) + return nil, errors.Wrapf(err, "create storage") } app.otelStorage = store - default: - return nil, errors.Errorf("unknown storage %q", storageType) } - if err := app.setupReceiver(); err != nil { - return nil, errors.Wrap(err, "otelreceiver") + if err := app.setupCollector(); err != nil { + return nil, errors.Wrap(err, "otelcol") } if err := app.trySetupTempo(); err != nil { return nil, errors.Wrap(err, "tempo") @@ -141,6 +144,8 @@ func (app *App) trySetupTempo() error { if q == nil { return nil } + cfg := app.cfg.Tempo + cfg.setDefaults() engine := traceqlengine.NewEngine(app.traceQuerier, traceqlengine.Options{ TracerProvider: app.metrics.TracerProvider(), @@ -155,7 +160,7 @@ func (app *App) trySetupTempo() error { return err } - addOgen[tempoapi.Route](app, "tempo", s, ":3200") + addOgen[tempoapi.Route](app, "tempo", s, cfg.Bind) return nil } @@ -164,12 +169,15 @@ func (app *App) trySetupLoki() error { if q == nil { return nil } + cfg := app.cfg.LokiConfig + cfg.setDefaults() engine := logqlengine.NewEngine(q, logqlengine.Options{ - TracerProvider: app.metrics.TracerProvider(), ParseOptions: logql.ParseOptions{ AllowDots: true, }, + LookbackDuration: cfg.LookbackDelta, + TracerProvider: app.metrics.TracerProvider(), }) loki := lokihandler.NewLokiAPI(q, engine) @@ -181,7 +189,7 @@ func (app *App) trySetupLoki() error { return err } - addOgen[lokiapi.Route](app, "loki", s, ":3100") + addOgen[lokiapi.Route](app, "loki", s, cfg.Bind) return nil } @@ -190,15 +198,18 @@ func (app *App) trySetupProm() error { if q == nil { return nil } + cfg := app.cfg.Prometheus + cfg.setDefaults() engine := promql.NewEngine(promql.EngineOpts{ - // These fields are zero by default, which makes + // NOTE: zero-value MaxSamples and Timeout makes // all queries to fail with error. - // - // TODO(tdakkota): make configurable. - Timeout: time.Minute, - MaxSamples: 1_000_000, - EnableNegativeOffset: true, + MaxSamples: cfg.MaxSamples, + Timeout: cfg.Timeout, + LookbackDelta: cfg.LookbackDelta, + EnableAtModifier: cfg.EnableAtModifier, + EnableNegativeOffset: *cfg.EnableNegativeOffset, + EnablePerStepStats: cfg.EnablePerStepStats, }) prom := promhandler.NewPromAPI(engine, q, q, promhandler.PromAPIOptions{}) @@ -211,36 +222,44 @@ func (app *App) trySetupProm() error { return err } - addOgen[promapi.Route](app, "prom", s, ":9090") + addOgen[promapi.Route](app, "prom", s, cfg.Bind) return nil } -func (app *App) setupReceiver() error { - var consumers otelreceiver.Consumers - if i := app.traceInserter; i != nil { - consumers.Traces = tracestorage.NewConsumer(i) - } - if i := app.logInserter; i != nil { - consumers.Logs = logstorage.NewConsumer(i) - } - if c := app.metricsConsumer; c != nil { - consumers.Metrics = c +func (app *App) setupCollector() error { + conf, err := otelcol.NewConfigProvider(otelcol.ConfigProviderSettings{ + ResolverSettings: confmap.ResolverSettings{ + URIs: []string{"oteldb:/"}, + Providers: map[string]confmap.Provider{ + "oteldb": otelreceiver.NewMapProvider("oteldb", app.cfg.Collector), + }, + Converters: []confmap.Converter{ + expandconverter.New(), + }, + }, + }) + if err != nil { + return errors.Wrap(err, "create otelcol config provider") } - recv, err := otelreceiver.NewReceiver( - consumers, - otelreceiver.ReceiverConfig{ - Logger: app.lg.Named("receiver"), - TracerProvider: app.metrics.TracerProvider(), - MeterProvider: app.metrics.MeterProvider(), + col, err := otelcol.NewCollector(otelcol.CollectorSettings{ + Factories: otelreceiver.Factories, + BuildInfo: component.NewDefaultBuildInfo(), + LoggingOptions: []zap.Option{ + zap.WrapCore(func(zapcore.Core) zapcore.Core { + return app.lg.Core() + }), }, - ) + DisableGracefulShutdown: false, + ConfigProvider: conf, + SkipSettingGRPCLogger: false, + }) if err != nil { - return errors.Wrap(err, "create OTEL receiver") + return errors.Wrap(err, "create collector") } - app.services["otelreceiver"] = func(ctx context.Context) error { - return recv.Run(ctx) + app.services["otelcol"] = func(ctx context.Context) error { + return col.Run(ctx) } return nil } @@ -256,18 +275,3 @@ func (app *App) Run(ctx context.Context) error { } return g.Wait() } - -type logQuerier interface { - logstorage.Querier - logqlengine.Querier -} - -type traceQuerier interface { - tracestorage.Querier - traceqlengine.Querier -} - -type metricQuerier interface { - storage.Queryable - storage.ExemplarQueryable -} diff --git a/cmd/oteldb/config.go b/cmd/oteldb/config.go new file mode 100644 index 00000000..c416db2c --- /dev/null +++ b/cmd/oteldb/config.go @@ -0,0 +1,137 @@ +package main + +import ( + "os" + "path/filepath" + "time" + + "github.com/go-faster/yaml" + "golang.org/x/exp/maps" +) + +// Config is a oteldb config. +type Config struct { + DSN string `json:"dsn" yaml:"dsn"` + Tempo TempoConfig `json:"tempo" yaml:"tempo"` + Prometheus PrometheusConfig `json:"prometheus" yaml:"prometheus"` + LokiConfig LokiConfig `json:"loki" yaml:"loki"` + + // Collector is an otelcol config. + Collector map[string]any `json:"otelcol" yaml:"otelcol"` +} + +func (cfg *Config) setDefaults() { + if cfg.DSN == "" { + cfg.DSN = "clickhouse://localhost:9000" + } + if cfg.Collector == nil { + var ( + receivers = map[string]any{ + "otlp": map[string]any{ + "protocols": map[string]any{ + "grpc": nil, + "http": nil, + }, + }, + "prometheusremotewrite": map[string]any{}, + } + receiverNames = maps.Keys(receivers) + + pipelines = map[string]any{} + ) + for _, name := range []string{ + "traces", + "metrics", + "logs", + } { + pipelines[name] = map[string]any{ + "receivers": receiverNames, + "exporters": []string{"oteldbexporter"}, + } + } + + cfg.Collector = map[string]any{ + "receivers": receivers, + "exporters": map[string]any{ + "oteldbexporter": map[string]any{ + "dsn": cfg.DSN, + }, + }, + "service": map[string]any{ + "pipelines": pipelines, + }, + } + } +} + +func loadConfig(name string) (cfg Config, _ error) { + if name == "" { + name = "oteldb.yml" + if _, err := os.Stat(name); err != nil { + return cfg, nil + } + } + + data, err := os.ReadFile(filepath.Clean(name)) + if err != nil { + return cfg, err + } + + if err := yaml.Unmarshal(data, &cfg); err != nil { + return cfg, err + } + + return cfg, nil +} + +// TempoConfig is Tempo API config. +type TempoConfig struct { + Bind string `json:"bind" yaml:"bind"` +} + +func (cfg *TempoConfig) setDefaults() { + if cfg.Bind == "" { + cfg.Bind = ":3200" + } +} + +// PrometheusConfig is Prometheus API config. +type PrometheusConfig struct { + Bind string `json:"bind" yaml:"bind"` + MaxSamples int `json:"max_samples" yaml:"max_samples"` + Timeout time.Duration `json:"timeout" yaml:"timeout"` + LookbackDelta time.Duration `json:"lookback_delta" yaml:"lookback_delta"` + EnableAtModifier bool `json:"enable_at_modifier" yaml:"enable_at_modifier"` + EnableNegativeOffset *bool `json:"enable_negative_offset" yaml:"enable_negative_offset"` + EnablePerStepStats bool `json:"enable_per_step_stats" yaml:"enable_per_step_stats"` +} + +func (cfg *PrometheusConfig) setDefaults() { + if cfg.Bind == "" { + cfg.Bind = ":9090" + } + if cfg.MaxSamples == 0 { + cfg.MaxSamples = 1_000_000 + } + if cfg.Timeout == 0 { + cfg.Timeout = time.Minute + } + setBool := func(p **bool, defaultValue bool) { + if *p == nil { + *p = &defaultValue + } + } + setBool(&cfg.EnableNegativeOffset, true) +} + +// LokiConfig is Loki API config. +type LokiConfig struct { + Bind string `json:"bind" yaml:"bind"` + LookbackDelta time.Duration `json:"lookback_delta" yaml:"lookback_delta"` +} + +func (cfg *LokiConfig) setDefaults() { + if cfg.Bind == "" { + cfg.Bind = ":3100" + } +} diff --git a/cmd/oteldb/oteldb.go b/cmd/oteldb/oteldb.go index e37e6466..8d82ac2a 100644 --- a/cmd/oteldb/oteldb.go +++ b/cmd/oteldb/oteldb.go @@ -2,6 +2,8 @@ package main import ( "context" + "flag" + "os" "github.com/go-faster/errors" "github.com/go-faster/sdk/app" @@ -23,7 +25,19 @@ func main() { if ctx, err = autologs.Setup(ctx, m); err != nil { return errors.Wrap(err, "setup logs") } - root, err := newApp(ctx, m) + + set := flag.NewFlagSet(os.Args[0], flag.ContinueOnError) + cfgPath := set.String("config", "", "Path to config (defaults to oteldb.yml)") + if err := set.Parse(os.Args[1:]); err != nil { + return err + } + + cfg, err := loadConfig(*cfgPath) + if err != nil { + return errors.Wrap(err, "load config") + } + + root, err := newApp(ctx, cfg, m) if err != nil { return errors.Wrap(err, "setup") } diff --git a/cmd/oteldb/storage.go b/cmd/oteldb/storage.go index 7e76923b..0c8cf84f 100644 --- a/cmd/oteldb/storage.go +++ b/cmd/oteldb/storage.go @@ -2,34 +2,38 @@ package main import ( "context" - "net/url" - "os" - "strings" - "time" - "github.com/ClickHouse/ch-go" - "github.com/ClickHouse/ch-go/chpool" - "github.com/cenkalti/backoff/v4" "github.com/go-faster/errors" "github.com/go-faster/sdk/app" + "github.com/prometheus/prometheus/storage" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "github.com/go-faster/oteldb/internal/chstorage" + "github.com/go-faster/oteldb/internal/logql/logqlengine" "github.com/go-faster/oteldb/internal/logstorage" - "github.com/go-faster/oteldb/internal/otelreceiver" + "github.com/go-faster/oteldb/internal/traceql/traceqlengine" "github.com/go-faster/oteldb/internal/tracestorage" ) type otelStorage struct { - logQuerier logQuerier - logInserter logstorage.Inserter + logQuerier logQuerier + traceQuerier traceQuerier + metricsQuerier metricQuerier +} + +type logQuerier interface { + logstorage.Querier + logqlengine.Querier +} - traceQuerier traceQuerier - traceInserter tracestorage.Inserter +type traceQuerier interface { + tracestorage.Querier + traceqlengine.Querier +} - metricsQuerier metricQuerier - metricsConsumer otelreceiver.MetricsConsumer +type metricQuerier interface { + storage.Queryable + storage.ExemplarQueryable } func setupCH( @@ -38,68 +42,15 @@ func setupCH( lg *zap.Logger, m *app.Metrics, ) (store otelStorage, _ error) { - u, err := url.Parse(dsn) - if err != nil { - return store, errors.Wrap(err, "parse DSN") - } - - pass, _ := u.User.Password() - chLogger := lg.Named("ch") - { - var lvl zapcore.Level - if v := os.Getenv("CH_LOG_LEVEL"); v != "" { - if err := lvl.UnmarshalText([]byte(v)); err != nil { - return store, errors.Wrap(err, "parse log level") - } - } else { - lvl = lg.Level() - } - chLogger = chLogger.WithOptions(zap.IncreaseLevel(lvl)) - } - opts := ch.Options{ - Logger: chLogger, - Address: u.Host, - Database: strings.TrimPrefix(u.Path, "/"), - User: u.User.Username(), - Password: pass, - MeterProvider: m.MeterProvider(), - TracerProvider: m.TracerProvider(), - - // Capture query body and other parameters. - OpenTelemetryInstrumentation: true, - } - - // First thing that every Yandex employee do is forgetting how to setup - // a docker liveness probe. - connectBackoff := backoff.NewExponentialBackOff() - connectBackoff.InitialInterval = 2 * time.Second - connectBackoff.MaxElapsedTime = time.Minute - c, err := backoff.RetryWithData(func() (*chpool.Pool, error) { - c, err := chpool.Dial(ctx, chpool.Options{ - ClientOptions: opts, - }) - if err != nil { - return nil, errors.Wrap(err, "dial") - } - return c, nil - }, connectBackoff) - if err != nil { - return store, errors.Wrap(err, "migrate") - } - - tables := chstorage.DefaultTables() - if err := tables.Create(ctx, c); err != nil { - return store, errors.Wrap(err, "create tables") - } - - inserter, err := chstorage.NewInserter(c, chstorage.InserterOptions{ - Tables: tables, + c, err := chstorage.Dial(ctx, dsn, chstorage.DialOptions{ MeterProvider: m.MeterProvider(), TracerProvider: m.TracerProvider(), + Logger: lg, }) if err != nil { - return store, errors.Wrap(err, "create inserter") + return store, errors.Wrap(err, "dial clickhouse") } + tables := chstorage.DefaultTables() querier, err := chstorage.NewQuerier(c, chstorage.QuerierOptions{ Tables: tables, @@ -111,11 +62,8 @@ func setupCH( } return otelStorage{ - logQuerier: querier, - logInserter: inserter, - traceQuerier: querier, - traceInserter: inserter, - metricsQuerier: querier, - metricsConsumer: inserter, + logQuerier: querier, + traceQuerier: querier, + metricsQuerier: querier, }, nil } diff --git a/go.mod b/go.mod index c4f7e8c8..e78e4cef 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/go-faster/errors v0.7.1 github.com/go-faster/jx v1.1.0 github.com/go-faster/sdk v0.12.0 + github.com/go-faster/yaml v0.4.6 github.com/go-logfmt/logfmt v0.6.0 github.com/gogo/protobuf v1.3.2 github.com/golang/snappy v0.0.4 @@ -36,9 +37,10 @@ require ( go.opentelemetry.io/collector/confmap v0.91.0 go.opentelemetry.io/collector/consumer v0.91.0 go.opentelemetry.io/collector/exporter v0.91.0 - go.opentelemetry.io/collector/extension v0.91.0 go.opentelemetry.io/collector/otelcol v0.91.0 go.opentelemetry.io/collector/pdata v1.0.0 + go.opentelemetry.io/collector/processor v0.91.0 + go.opentelemetry.io/collector/processor/batchprocessor v0.91.0 go.opentelemetry.io/collector/receiver v0.91.0 go.opentelemetry.io/collector/receiver/otlpreceiver v0.91.0 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 @@ -95,7 +97,6 @@ require ( github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-faster/city v1.0.1 // indirect - github.com/go-faster/yaml v0.4.6 // indirect github.com/go-kit/log v0.2.1 // indirect github.com/go-logr/logr v1.3.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -171,9 +172,9 @@ require ( go.opentelemetry.io/collector/config/configtelemetry v0.91.0 // indirect go.opentelemetry.io/collector/config/internal v0.91.0 // indirect go.opentelemetry.io/collector/connector v0.91.0 // indirect + go.opentelemetry.io/collector/extension v0.91.0 // indirect go.opentelemetry.io/collector/extension/auth v0.91.0 // indirect go.opentelemetry.io/collector/featuregate v1.0.0 // indirect - go.opentelemetry.io/collector/processor v0.91.0 // indirect go.opentelemetry.io/collector/semconv v0.91.0 // indirect go.opentelemetry.io/collector/service v0.91.0 // indirect go.opentelemetry.io/contrib/config v0.1.1 // indirect diff --git a/go.sum b/go.sum index 73175b2a..af08c53c 100644 --- a/go.sum +++ b/go.sum @@ -677,6 +677,8 @@ go.opentelemetry.io/collector/pdata v1.0.0 h1:ECP2jnLztewsHmL1opL8BeMtWVc7/oSlKN go.opentelemetry.io/collector/pdata v1.0.0/go.mod h1:TsDFgs4JLNG7t6x9D8kGswXUz4mme+MyNChHx8zSF6k= go.opentelemetry.io/collector/processor v0.91.0 h1:Xi52gYMXTG4zYmNhsqJ8ly/9f7b0n0crMhKxVVI9HpY= go.opentelemetry.io/collector/processor v0.91.0/go.mod h1:naTuusZNfzM5MSqoTVzkKbR1MaJ8oD8v5ginR5JreDE= +go.opentelemetry.io/collector/processor/batchprocessor v0.91.0 h1:YuPG52D7otNr4lNd8KGsIIBHvawAAaOqGoNTK9799ko= +go.opentelemetry.io/collector/processor/batchprocessor v0.91.0/go.mod h1:U2ZVSMwgr4OsaKKMfvX9OGaurG83zAPKjVdpTgmj0ok= go.opentelemetry.io/collector/receiver v0.91.0 h1:0TZF/0OXoJtxgm+mvOinRRXo9LgVyOsOgCQfWkNGXJA= go.opentelemetry.io/collector/receiver v0.91.0/go.mod h1:d5qo2mpovqKoi47hrMxj5BLdLzOXM0mUHL5CKrjfWNM= go.opentelemetry.io/collector/receiver/otlpreceiver v0.91.0 h1:1Eyc1uR8yr3heKkC4YXFoZip0JqgOXuOiN/tXvl9WUo= diff --git a/integration/prome2e/common_test.go b/integration/prome2e/common_test.go index 244748a4..c69ed6b9 100644 --- a/integration/prome2e/common_test.go +++ b/integration/prome2e/common_test.go @@ -12,14 +12,17 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" "github.com/go-faster/oteldb/integration/prome2e" - "github.com/go-faster/oteldb/internal/otelreceiver" "github.com/go-faster/oteldb/internal/promapi" "github.com/go-faster/oteldb/internal/promhandler" ) -type metricQuerier interface{} +// MetricsConsumer is metrics consumer. +type MetricsConsumer interface { + ConsumeMetrics(ctx context.Context, ld pmetric.Metrics) error +} func readBatchSet(p string) (s prome2e.BatchSet, _ error) { f, err := os.Open(p) @@ -36,7 +39,7 @@ func setupDB( ctx context.Context, t *testing.T, set prome2e.BatchSet, - consumer otelreceiver.MetricsConsumer, + consumer MetricsConsumer, querier storage.Queryable, exemplarQuerier storage.ExemplarQueryable, ) *promapi.Client { @@ -66,7 +69,7 @@ func setupDB( func runTest( ctx context.Context, t *testing.T, - consumer otelreceiver.MetricsConsumer, + consumer MetricsConsumer, querier storage.Queryable, exemplarQuerier storage.ExemplarQueryable, ) { diff --git a/internal/otelreceiver/config_provider.go b/internal/otelreceiver/config_provider.go index 2823b716..0ecae440 100644 --- a/internal/otelreceiver/config_provider.go +++ b/internal/otelreceiver/config_provider.go @@ -10,13 +10,58 @@ var _ confmap.Provider = (*mapProvider)(nil) // mapProvider is a confmap.Provider that returns a single confmap.Retrieved instance with a fixed map. type mapProvider struct { - raw map[string]any + scheme string + raw map[string]any } +// NewMapProvider creates new [mapProvider]. +func NewMapProvider(scheme string, raw map[string]any) confmap.Provider { + return &mapProvider{ + scheme: scheme, + raw: raw, + } +} + +// Retrieve goes to the configuration source and retrieves the selected data which +// contains the value to be injected in the configuration and the corresponding watcher that +// will be used to monitor for updates of the retrieved value. +// +// `uri` must follow the ":" format. This format is compatible +// with the URI definition (see https://datatracker.ietf.org/doc/html/rfc3986). The "" +// must be always included in the `uri`. The "" supported by any provider: +// - MUST consist of a sequence of characters beginning with a letter and followed by any +// combination of letters, digits, plus ("+"), period ("."), or hyphen ("-"). +// See https://datatracker.ietf.org/doc/html/rfc3986#section-3.1. +// - MUST be at least 2 characters long to avoid conflicting with a driver-letter identifier as specified +// in https://tools.ietf.org/id/draft-kerwin-file-scheme-07.html#syntax. +// - For testing, all implementation MUST check that confmaptest.ValidateProviderScheme returns no error. +// +// `watcher` callback is called when the config changes. watcher may be called from +// a different go routine. After watcher is called Retrieved.Get should be called +// to get the new config. See description of Retrieved for more details. +// watcher may be nil, which indicates that the caller is not interested in +// knowing about the changes. +// +// If ctx is canceled should return immediately with an error. +// Should never be called concurrently with itself or with Shutdown. func (m *mapProvider) Retrieve(context.Context, string, confmap.WatcherFunc) (*confmap.Retrieved, error) { return confmap.NewRetrieved(m.raw, []confmap.RetrievedOption{}...) } -func (m *mapProvider) Scheme() string { return "mock" } +// Scheme returns the location scheme used by Retrieve. +func (m *mapProvider) Scheme() string { + return m.scheme +} -func (m *mapProvider) Shutdown(context.Context) error { return nil } +// Shutdown signals that the configuration for which this Provider was used to +// retrieve values is no longer in use and the Provider should close and release +// any resources that it may have created. +// +// This method must be called when the Collector service ends, either in case of +// success or error. Retrieve cannot be called after Shutdown. +// +// Should never be called concurrently with itself or with Retrieve. +// If ctx is canceled should return immediately with an error. +func (m *mapProvider) Shutdown(context.Context) error { + return nil +} diff --git a/internal/otelreceiver/consumers.go b/internal/otelreceiver/consumers.go deleted file mode 100644 index 0ca4d66c..00000000 --- a/internal/otelreceiver/consumers.go +++ /dev/null @@ -1,31 +0,0 @@ -package otelreceiver - -import ( - "context" - - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" -) - -// TracesConsumer is traces consumer. -type TracesConsumer interface { - ConsumeTraces(ctx context.Context, td ptrace.Traces) error -} - -// MetricsConsumer is metrics consumer. -type MetricsConsumer interface { - ConsumeMetrics(ctx context.Context, ld pmetric.Metrics) error -} - -// LogsConsumer is logs consumer. -type LogsConsumer interface { - ConsumeLogs(ctx context.Context, ld plog.Logs) error -} - -// Consumers is a set of telemetry consumers. -type Consumers struct { - Traces TracesConsumer - Metrics MetricsConsumer - Logs LogsConsumer -} diff --git a/internal/otelreceiver/receiver.go b/internal/otelreceiver/receiver.go index 5f3d6b99..9dcc9d92 100644 --- a/internal/otelreceiver/receiver.go +++ b/internal/otelreceiver/receiver.go @@ -2,337 +2,58 @@ package otelreceiver import ( - "context" - "sync" - "time" - "github.com/go-faster/errors" - "github.com/go-faster/sdk/zctx" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/confmap" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/otelcol" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/batchprocessor" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/otlpreceiver" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/trace" - "go.uber.org/multierr" - "go.uber.org/zap" + "github.com/go-faster/oteldb/internal/otelreceiver/oteldbexporter" "github.com/go-faster/oteldb/internal/otelreceiver/prometheusremotewritereceiver" ) -// Receiver is a OpenTelemetry-compatible trace receiver. -type Receiver struct { - receivers []component.Component - - fatal chan struct{} - fatalOnce sync.Once - logger *zap.Logger -} - -var defaultReceivers = map[string]any{ - "otlp": map[string]any{ - "protocols": map[string]any{ - "grpc": nil, - "http": nil, - }, - }, - "prometheusremotewrite": map[string]any{}, -} - -// ReceiverConfig is a config struct for Receiver. -type ReceiverConfig struct { - OTELConfig map[string]any - TracerProvider trace.TracerProvider - MeterProvider metric.MeterProvider - Logger *zap.Logger -} - -func (cfg *ReceiverConfig) setDefaults() { - if cfg.OTELConfig == nil { - cfg.OTELConfig = defaultReceivers - } - if cfg.TracerProvider == nil { - cfg.TracerProvider = otel.GetTracerProvider() - } - if cfg.MeterProvider == nil { - cfg.MeterProvider = otel.GetMeterProvider() - } - if cfg.Logger == nil { - cfg.Logger = zap.NewNop() - } -} - -// NewReceiver setups trace receiver. -func NewReceiver(consumers Consumers, cfg ReceiverConfig) (*Receiver, error) { - cfg.setDefaults() - shim := &Receiver{ - fatal: make(chan struct{}), - logger: cfg.Logger.Named("shim"), - } - - receiverFactories, err := receiver.MakeFactoryMap( +func receiverFactoryMap() (map[component.Type]receiver.Factory, error) { + return receiver.MakeFactoryMap( otlpreceiver.NewFactory(), prometheusremotewritereceiver.NewFactory(), ) - if err != nil { - return nil, err - } - - receiverCfg := cfg.OTELConfig - receivers := make([]string, 0, len(receiverCfg)) - for k := range receiverCfg { - receivers = append(receivers, k) - } - - var ( - tracesConsumer consumer.Traces - metricsConsumer consumer.Metrics - logsConsumer consumer.Logs +} - pipelines = map[string]any{} +func processorFactoryMap() (map[component.Type]processor.Factory, error) { + return processor.MakeFactoryMap( + batchprocessor.NewFactory(), ) - if impl := consumers.Traces; impl != nil { - tracesConsumer, err = consumer.NewTraces(func(ctx context.Context, ld ptrace.Traces) error { - if err := impl.ConsumeTraces(ctx, ld); err != nil { - zctx.From(ctx).Error("Consume traces", zap.Error(err)) - } - return nil - }) - if err != nil { - return nil, errors.Wrap(err, "create traces consumer") - } - - pipelines["traces"] = map[string]any{ - "exporters": []string{"nop"}, // nop exporter to avoid errors - "receivers": receivers, - } - } - if impl := consumers.Metrics; impl != nil { - metricsConsumer, err = consumer.NewMetrics(func(ctx context.Context, ld pmetric.Metrics) error { - if err := impl.ConsumeMetrics(ctx, ld); err != nil { - zctx.From(ctx).Error("Consume metrics", zap.Error(err)) - } - return nil - }) - if err != nil { - return nil, errors.Wrap(err, "create metrics consumer") - } - - pipelines["metrics"] = map[string]any{ - "exporters": []string{"nop"}, // nop exporter to avoid errors - "receivers": receivers, - } - } - if impl := consumers.Logs; impl != nil { - logsConsumer, err = consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { - if err := impl.ConsumeLogs(ctx, ld); err != nil { - zctx.From(ctx).Error("Consume logs", zap.Error(err)) - } - return nil - }) - if err != nil { - return nil, errors.Wrap(err, "create logs consumer") - } +} - pipelines["logs"] = map[string]any{ - "exporters": []string{"nop"}, // nop exporter to avoid errors - "receivers": receivers, - } - } - if len(pipelines) == 0 { - return nil, errors.New("at least one consumer must be set") - } +func exporterFactoryMap() (map[component.Type]exporter.Factory, error) { + return exporter.MakeFactoryMap( + oteldbexporter.NewFactory(), + ) +} - // Creates a config provider with the given config map. - // The provider will be used to retrieve the actual config for the pipeline (although we only need the receivers). - pro, err := otelcol.NewConfigProvider(otelcol.ConfigProviderSettings{ - ResolverSettings: confmap.ResolverSettings{ - URIs: []string{"mock:/"}, - Providers: map[string]confmap.Provider{"mock": &mapProvider{raw: map[string]any{ - "receivers": receiverCfg, - "exporters": map[string]any{ - "nop": map[string]any{}, - }, - "service": map[string]any{ - "pipelines": pipelines, - }, - }}}, - }, - }) +// Factories returns oteldb factories list. +func Factories() (f otelcol.Factories, _ error) { + receivers, err := receiverFactoryMap() if err != nil { - return nil, err + return f, errors.Wrap(err, "get receiver factory map") } - // Creates the configuration for the pipeline. - // We only need the receivers, the rest of the configuration is not used. - conf, err := pro.Get(context.Background(), otelcol.Factories{ - Receivers: receiverFactories, - Exporters: map[component.Type]exporter.Factory{"nop": exportertest.NewNopFactory()}, // nop exporter to avoid errors - }) + processors, err := processorFactoryMap() if err != nil { - return nil, err + return f, errors.Wrap(err, "get processor factory map") } - ctx := context.Background() - for componentID, componentCfg := range conf.Receivers { - factoryBase := receiverFactories[componentID.Type()] - if factoryBase == nil { - return nil, errors.Errorf("receiver factory not found for type: %s", componentID.Type()) - } - - // Make sure that the headers are added to context. Required for Authentication. - if componentID.Type() == "otlp" { - otlpRecvCfg := componentCfg.(*otlpreceiver.Config) - - if otlpRecvCfg.HTTP != nil { - otlpRecvCfg.HTTP.IncludeMetadata = true - componentCfg = otlpRecvCfg - } - } - - logger := cfg.Logger - if name := string(componentID.Type()); name != "" { - logger = logger.Named(name) - } - - if c := tracesConsumer; c != nil { - lg := logger.Named("traces") - params := receiver.CreateSettings{ - TelemetrySettings: component.TelemetrySettings{ - Logger: lg, - TracerProvider: cfg.TracerProvider, - MeterProvider: cfg.MeterProvider, - ReportComponentStatus: logStatusEvent(lg), - }, - } - - recv, err := factoryBase.CreateTracesReceiver(ctx, params, componentCfg, c) - switch { - case errors.Is(err, component.ErrDataTypeIsNotSupported): - case err != nil: - return nil, errors.Wrap(err, "create traces receiver") - default: - shim.receivers = append(shim.receivers, recv) - } - } - if c := metricsConsumer; c != nil { - lg := logger.Named("metrics") - params := receiver.CreateSettings{ - TelemetrySettings: component.TelemetrySettings{ - Logger: lg, - TracerProvider: cfg.TracerProvider, - MeterProvider: cfg.MeterProvider, - ReportComponentStatus: logStatusEvent(lg), - }, - } - - recv, err := factoryBase.CreateMetricsReceiver(ctx, params, componentCfg, c) - switch { - case errors.Is(err, component.ErrDataTypeIsNotSupported): - case err != nil: - return nil, errors.Wrap(err, "create traces receiver") - default: - shim.receivers = append(shim.receivers, recv) - } - } - if c := logsConsumer; c != nil { - lg := logger.Named("logs") - params := receiver.CreateSettings{ - TelemetrySettings: component.TelemetrySettings{ - Logger: lg, - TracerProvider: cfg.TracerProvider, - MeterProvider: cfg.MeterProvider, - ReportComponentStatus: logStatusEvent(lg), - }, - } - - recv, err := factoryBase.CreateLogsReceiver(ctx, params, componentCfg, c) - switch { - case errors.Is(err, component.ErrDataTypeIsNotSupported): - case err != nil: - return nil, errors.Wrap(err, "create traces receiver") - default: - shim.receivers = append(shim.receivers, recv) - } - } - } - - return shim, nil -} - -// Run setups corresponding listeners. -func (r *Receiver) Run(ctx context.Context) (rerr error) { - var running []component.Component - defer func() { - multierr.AppendInto(&rerr, shutdown(running)) - }() - - for _, recv := range r.receivers { - if err := recv.Start(ctx, r); err != nil { - return err - } - running = append(running, recv) - } - - select { - case <-ctx.Done(): - case <-r.fatal: - } - - return nil -} - -func shutdown(receivers []component.Component) error { - shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - - var errs []error - for _, r := range receivers { - if err := r.Shutdown(shutdownCtx); err != nil { - errs = append(errs, err) - } + exporters, err := exporterFactoryMap() + if err != nil { + return f, errors.Wrap(err, "get exporter factory map") } - return multierr.Combine(errs...) -} - -// ReportFatalError implements component.Host -func (r *Receiver) ReportFatalError(err error) { - r.logger.Error("Fatal receiver error", zap.Error(err)) - r.fatalOnce.Do(func() { - close(r.fatal) - }) -} - -// GetFactory implements component.Host -func (r *Receiver) GetFactory(component.Kind, component.Type) component.Factory { - return nil -} - -// GetExtensions implements component.Host -func (r *Receiver) GetExtensions() map[component.ID]extension.Extension { return nil } - -// GetExporters implements component.Host -func (r *Receiver) GetExporters() map[component.DataType]map[component.ID]component.Component { - return nil -} - -func logStatusEvent(lg *zap.Logger) component.StatusFunc { - return func(se *component.StatusEvent) error { - lg.Info("Status change", - zap.Time("changed_at", se.Timestamp()), - zap.Stringer("status", se.Status()), - zap.Error(se.Err()), - ) - return nil - } + return otelcol.Factories{ + Receivers: receivers, + Processors: processors, + Exporters: exporters, + }, nil }