Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenTelemetry Protocol with Apache Arrow Receiver component #32015

Merged
merged 28 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
c7caed8
Raw import @ c86a6c7
jmacd Mar 27, 2024
c0d325f
lint
jmacd Mar 27, 2024
1c3d4c3
ArrowSettings -> ArrowConfig
jmacd Mar 27, 2024
e6556ed
ConfigValidator update
jmacd Mar 27, 2024
0835778
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Mar 27, 2024
b201147
tiny revert
jmacd Mar 27, 2024
af2fbbf
tiny revert2
jmacd Mar 27, 2024
a64bdee
tiny revert3
jmacd Mar 27, 2024
35d5e46
chlog
jmacd Mar 27, 2024
bbd31a3
chlog
jmacd Mar 27, 2024
df250fd
tidy
jmacd Mar 27, 2024
6c22cfb
restore deps
jmacd Mar 28, 2024
c4bc81a
versions
jmacd Mar 28, 2024
00f5f07
crosslink
jmacd Mar 28, 2024
7eedbb8
gen
jmacd Mar 28, 2024
1b34d24
GOWORK=off
jmacd Mar 28, 2024
a25f09e
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Apr 22, 2024
c73fd92
from otel-arrow@14c63d1eaac7c53585e6b9195d09f1f9703869ed
jmacd Apr 22, 2024
f00f6b3
from codeboten
jmacd Apr 22, 2024
456444e
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Apr 22, 2024
9fd6d58
remove two
jmacd Apr 22, 2024
435ab69
Update cmd/otelcontribcol/go.mod
codeboten Apr 23, 2024
ea02f75
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Apr 23, 2024
4f6452a
make tidygenetc
jmacd Apr 23, 2024
42e95ab
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Apr 23, 2024
2c995b8
update go.mod toolchain
codeboten Apr 24, 2024
af13423
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Apr 24, 2024
04b9782
Merge branch 'jmacd/arrow_receiver' of github.com:jmacd/opentelemetry…
jmacd Apr 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/otelarrowreceiver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: OpenTelemetry Protocol with Apache Arrow Receiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Implementation copied from opentelemetry/otel-arrow repository @v0.20.0.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26491]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 1 addition & 1 deletion cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/cmd/otelcontrib

go 1.21.0

toolchain go1.21.9
toolchain go1.22.2
codeboten marked this conversation as resolved.
Show resolved Hide resolved

require (
github.com/open-telemetry/opentelemetry-collector-contrib/connector/countconnector v0.99.0
Expand Down
17 changes: 5 additions & 12 deletions receiver/otelarrowreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (
// Protocols is the configuration for the supported protocols.
type Protocols struct {
GRPC configgrpc.ServerConfig `mapstructure:"grpc"`
Arrow ArrowSettings `mapstructure:"arrow"`
Arrow ArrowConfig `mapstructure:"arrow"`
}

// ArrowSettings support configuring the Arrow receiver.
type ArrowSettings struct {
// ArrowConfig support configuring the Arrow receiver.
type ArrowConfig struct {
// MemoryLimitMiB is the size of a shared memory region used
// by all Arrow streams, in MiB. When too much load is
// passing through, they will see ResourceExhausted errors.
Expand All @@ -35,16 +35,9 @@ type Config struct {
}

var _ component.Config = (*Config)(nil)
var _ component.ConfigValidator = (*ArrowConfig)(nil)

// Validate checks the receiver configuration is valid
func (cfg *Config) Validate() error {
if err := cfg.Arrow.Validate(); err != nil {
return err
}
return nil
}

func (cfg *ArrowSettings) Validate() error {
func (cfg *ArrowConfig) Validate() error {
if err := cfg.Zstd.Validate(); err != nil {
return fmt.Errorf("zstd decoder: invalid configuration: %w", err)
}
Expand Down
20 changes: 13 additions & 7 deletions receiver/otelarrowreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"testing"
"time"

"github.com/open-telemetry/otel-arrow/collector/compression/zstd"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -77,11 +76,8 @@ func TestUnmarshalConfig(t *testing.T) {
},
},
},
Arrow: ArrowSettings{
Arrow: ArrowConfig{
MemoryLimitMiB: 123,
Zstd: zstd.DecoderConfig{
MemoryLimitMiB: 8,
},
},
},
}, cfg)
Expand All @@ -104,13 +100,21 @@ func TestUnmarshalConfigUnix(t *testing.T) {
},
ReadBufferSize: 512 * 1024,
},
Arrow: ArrowSettings{
Arrow: ArrowConfig{
MemoryLimitMiB: defaultMemoryLimitMiB,
},
},
}, cfg)
}

func TestUnmarshalConfigTypoDefaultProtocol(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "typo_default_proto_config.yaml"))
require.NoError(t, err)
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.EqualError(t, component.UnmarshalConfig(cm, cfg), "1 error(s) decoding:\n\n* 'protocols' has invalid keys: htttp")
}

func TestUnmarshalConfigInvalidProtocol(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "bad_proto_config.yaml"))
require.NoError(t, err)
Expand All @@ -121,5 +125,7 @@ func TestUnmarshalConfigInvalidProtocol(t *testing.T) {

func TestUnmarshalConfigNoProtocols(t *testing.T) {
cfg := Config{}
assert.Error(t, component.ValidateConfig(cfg))
// This now produces an error due to breaking change.
// https://github.com/open-telemetry/opentelemetry-collector/pull/9385
assert.ErrorContains(t, component.ValidateConfig(cfg), "invalid transport type")
}
33 changes: 15 additions & 18 deletions receiver/otelarrowreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ package otelarrowreceiver // import "github.com/open-telemetry/opentelemetry-col
import (
"context"

"github.com/open-telemetry/otel-arrow/collector/sharedcomponent"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/metadata"
)

Expand Down Expand Up @@ -44,7 +44,7 @@ func createDefaultConfig() component.Config {
// We almost write 0 bytes, so no need to tune WriteBufferSize.
ReadBufferSize: 512 * 1024,
},
Arrow: ArrowSettings{
Arrow: ArrowConfig{
MemoryLimitMiB: defaultMemoryLimitMiB,
},
},
Expand All @@ -57,17 +57,16 @@ func createTraces(
set receiver.CreateSettings,
cfg component.Config,
nextConsumer consumer.Traces,
) (_ receiver.Traces, err error) {
) (receiver.Traces, error) {
oCfg := cfg.(*Config)
r := receivers.GetOrAdd(oCfg, func() (comp component.Component) {
comp, err = newOTelArrowReceiver(oCfg, set)
return
r, err := receivers.GetOrAdd(oCfg, func() (*otelArrowReceiver, error) {
return newOTelArrowReceiver(oCfg, set)
})
if err != nil {
return nil, err
}

r.Unwrap().(*otelArrowReceiver).registerTraceConsumer(nextConsumer)
r.Unwrap().registerTraceConsumer(nextConsumer)
return r, nil
}

Expand All @@ -77,17 +76,16 @@ func createMetrics(
set receiver.CreateSettings,
cfg component.Config,
consumer consumer.Metrics,
) (_ receiver.Metrics, err error) {
) (receiver.Metrics, error) {
oCfg := cfg.(*Config)
r := receivers.GetOrAdd(oCfg, func() (comp component.Component) {
comp, err = newOTelArrowReceiver(oCfg, set)
return comp
r, err := receivers.GetOrAdd(oCfg, func() (*otelArrowReceiver, error) {
return newOTelArrowReceiver(oCfg, set)
})
if err != nil {
return nil, err
}

r.Unwrap().(*otelArrowReceiver).registerMetricsConsumer(consumer)
r.Unwrap().registerMetricsConsumer(consumer)
return r, nil
}

Expand All @@ -97,17 +95,16 @@ func createLog(
set receiver.CreateSettings,
cfg component.Config,
consumer consumer.Logs,
) (_ receiver.Logs, err error) {
) (receiver.Logs, error) {
oCfg := cfg.(*Config)
r := receivers.GetOrAdd(oCfg, func() (comp component.Component) {
comp, err = newOTelArrowReceiver(oCfg, set)
return comp
r, err := receivers.GetOrAdd(oCfg, func() (*otelArrowReceiver, error) {
return newOTelArrowReceiver(oCfg, set)
})
if err != nil {
return nil, err
}

r.Unwrap().(*otelArrowReceiver).registerLogsConsumer(consumer)
r.Unwrap().registerLogsConsumer(consumer)
return r, nil
}

Expand All @@ -117,4 +114,4 @@ func createLog(
// create separate objects, they must use one otelArrowReceiver object per configuration.
// When the receiver is shutdown it should be removed from this map so the same configuration
// can be recreated successfully.
var receivers = sharedcomponent.NewSharedComponents()
var receivers = sharedcomponent.NewSharedComponents[*Config, *otelArrowReceiver]()
16 changes: 7 additions & 9 deletions receiver/otelarrowreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,29 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelar
go 1.21.0

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.99.0
github.com/open-telemetry/otel-arrow v0.18.0
github.com/open-telemetry/otel-arrow/collector v0.21.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector v0.99.0
go.opentelemetry.io/collector/component v0.99.0
go.opentelemetry.io/collector/config/configauth v0.99.0
go.opentelemetry.io/collector/config/configgrpc v0.99.0
go.opentelemetry.io/collector/config/confignet v0.99.0
go.opentelemetry.io/collector/config/configtelemetry v0.99.0
go.opentelemetry.io/collector/config/configtls v0.99.0
go.opentelemetry.io/collector/confmap v0.99.0
go.opentelemetry.io/collector/consumer v0.99.0
go.opentelemetry.io/collector/extension/auth v0.99.0
go.opentelemetry.io/collector/pdata v1.6.0
go.opentelemetry.io/collector/receiver v0.99.0
go.opentelemetry.io/otel v1.25.0
go.opentelemetry.io/otel/metric v1.25.0
go.opentelemetry.io/otel/trace v1.25.0
go.uber.org/goleak v1.3.0
go.uber.org/mock v0.4.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/net v0.23.0
google.golang.org/grpc v1.63.2
)

Expand Down Expand Up @@ -61,23 +67,17 @@ require (
github.com/prometheus/procfs v0.12.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opentelemetry.io/collector v0.99.0 // indirect
go.opentelemetry.io/collector/config/configauth v0.99.0 // indirect
go.opentelemetry.io/collector/config/configcompression v1.6.0 // indirect
go.opentelemetry.io/collector/config/configopaque v1.6.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.99.0 // indirect
go.opentelemetry.io/collector/config/internal v0.99.0 // indirect
go.opentelemetry.io/collector/exporter v0.99.0 // indirect
go.opentelemetry.io/collector/extension v0.99.0 // indirect
go.opentelemetry.io/collector/featuregate v1.6.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.50.0 // indirect
go.opentelemetry.io/otel v1.25.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.47.0 // indirect
go.opentelemetry.io/otel/sdk v1.25.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.25.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.14.0 // indirect
Expand All @@ -86,5 +86,3 @@ require (
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent
2 changes: 2 additions & 0 deletions receiver/otelarrowreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading