Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenTelemetry Protocol with Apache Arrow Receiver component #32015

Merged
merged 28 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
c7caed8
Raw import @ c86a6c7
jmacd Mar 27, 2024
c0d325f
lint
jmacd Mar 27, 2024
1c3d4c3
ArrowSettings -> ArrowConfig
jmacd Mar 27, 2024
e6556ed
ConfigValidator update
jmacd Mar 27, 2024
0835778
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Mar 27, 2024
b201147
tiny revert
jmacd Mar 27, 2024
af2fbbf
tiny revert2
jmacd Mar 27, 2024
a64bdee
tiny revert3
jmacd Mar 27, 2024
35d5e46
chlog
jmacd Mar 27, 2024
bbd31a3
chlog
jmacd Mar 27, 2024
df250fd
tidy
jmacd Mar 27, 2024
6c22cfb
restore deps
jmacd Mar 28, 2024
c4bc81a
versions
jmacd Mar 28, 2024
00f5f07
crosslink
jmacd Mar 28, 2024
7eedbb8
gen
jmacd Mar 28, 2024
1b34d24
GOWORK=off
jmacd Mar 28, 2024
a25f09e
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Apr 22, 2024
c73fd92
from otel-arrow@14c63d1eaac7c53585e6b9195d09f1f9703869ed
jmacd Apr 22, 2024
f00f6b3
from codeboten
jmacd Apr 22, 2024
456444e
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Apr 22, 2024
9fd6d58
remove two
jmacd Apr 22, 2024
435ab69
Update cmd/otelcontribcol/go.mod
codeboten Apr 23, 2024
ea02f75
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Apr 23, 2024
4f6452a
make tidygenetc
jmacd Apr 23, 2024
42e95ab
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Apr 23, 2024
2c995b8
update go.mod toolchain
codeboten Apr 24, 2024
af13423
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collec…
jmacd Apr 24, 2024
04b9782
Merge branch 'jmacd/arrow_receiver' of github.com:jmacd/opentelemetry…
jmacd Apr 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Raw import @ c86a6c7
  • Loading branch information
jmacd committed Mar 27, 2024
commit c7caed8e4ef6a51e0935c0d4ea827a08a6b67c98
21 changes: 20 additions & 1 deletion receiver/otelarrowreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,19 @@ import (
"github.com/open-telemetry/otel-arrow/collector/compression/zstd"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/confmap"
)

const (
// Confmap values.
protoGRPC = "protocols::grpc"
protoArrowMemoryLimitMiB = "protocols::arrow::memory_limit_mib"
)

// Protocols is the configuration for the supported protocols.
type Protocols struct {
GRPC configgrpc.ServerConfig `mapstructure:"grpc"`
Arrow ArrowSettings `mapstructure:"arrow"`
Arrow ArrowSettings `mapstructure:"arrow"`
}

// ArrowSettings support configuring the Arrow receiver.
Expand All @@ -35,6 +42,7 @@ type Config struct {
}

var _ component.Config = (*Config)(nil)
var _ confmap.Unmarshaler = (*Config)(nil)

// Validate checks the receiver configuration is valid
func (cfg *Config) Validate() error {
Expand All @@ -50,3 +58,14 @@ func (cfg *ArrowSettings) Validate() error {
}
return nil
}

// Unmarshal a confmap.Conf into the config struct.
func (cfg *Config) Unmarshal(conf *confmap.Conf) error {
// first load the config normally
err := conf.Unmarshal(cfg)
if err != nil {
return err
}

return nil
}
25 changes: 16 additions & 9 deletions receiver/otelarrowreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"testing"
"time"

"github.com/open-telemetry/otel-arrow/collector/compression/zstd"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confignet"
Expand Down Expand Up @@ -51,10 +51,10 @@ func TestUnmarshalConfig(t *testing.T) {
GRPC: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: "0.0.0.0:4317",
Transport: confignet.TransportTypeTCP,
Transport: "tcp",
jmacd marked this conversation as resolved.
Show resolved Hide resolved
},
TLSSetting: &configtls.ServerConfig{
TLSSetting: configtls.Config{
TLSSetting: &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
CertFile: "test.crt",
KeyFile: "test.key",
},
Expand All @@ -79,9 +79,6 @@ func TestUnmarshalConfig(t *testing.T) {
},
Arrow: ArrowSettings{
MemoryLimitMiB: 123,
Zstd: zstd.DecoderConfig{
MemoryLimitMiB: 8,
},
},
},
}, cfg)
Expand All @@ -100,7 +97,7 @@ func TestUnmarshalConfigUnix(t *testing.T) {
GRPC: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: "/tmp/grpc_otlp.sock",
Transport: confignet.TransportTypeUnix,
Transport: "unix",
jmacd marked this conversation as resolved.
Show resolved Hide resolved
},
ReadBufferSize: 512 * 1024,
},
Expand All @@ -111,6 +108,14 @@ func TestUnmarshalConfigUnix(t *testing.T) {
}, cfg)
}

func TestUnmarshalConfigTypoDefaultProtocol(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "typo_default_proto_config.yaml"))
require.NoError(t, err)
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.EqualError(t, component.UnmarshalConfig(cm, cfg), "1 error(s) decoding:\n\n* 'protocols' has invalid keys: htttp")
}

func TestUnmarshalConfigInvalidProtocol(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "bad_proto_config.yaml"))
require.NoError(t, err)
Expand All @@ -121,5 +126,7 @@ func TestUnmarshalConfigInvalidProtocol(t *testing.T) {

func TestUnmarshalConfigNoProtocols(t *testing.T) {
cfg := Config{}
assert.Error(t, component.ValidateConfig(cfg))
// This now produces an error due to breaking change.
// https://github.com/open-telemetry/opentelemetry-collector/pull/9385
assert.ErrorContains(t, component.ValidateConfig(cfg), "invalid transport type")
}
46 changes: 24 additions & 22 deletions receiver/otelarrowreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ package otelarrowreceiver // import "github.com/open-telemetry/opentelemetry-col
import (
"context"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/metadata"
"github.com/open-telemetry/otel-arrow/collector/sharedcomponent"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/metadata"
)

const (
Expand All @@ -22,10 +21,10 @@ const (
defaultMemoryLimitMiB = 128
)

// NewFactory creates a new OTel-Arrow receiver factory.
// NewFactory creates a new OTLP receiver factory.
jmacd marked this conversation as resolved.
Show resolved Hide resolved
func NewFactory() receiver.Factory {
return receiver.NewFactory(
metadata.Type,
component.MustNewType(metadata.Type),
createDefaultConfig,
receiver.WithTraces(createTraces, metadata.TracesStability),
receiver.WithMetrics(createMetrics, metadata.MetricsStability),
Expand All @@ -39,7 +38,7 @@ func createDefaultConfig() component.Config {
GRPC: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: defaultGRPCEndpoint,
Transport: confignet.TransportTypeTCP,
Transport: "tcp",
},
// We almost write 0 bytes, so no need to tune WriteBufferSize.
ReadBufferSize: 512 * 1024,
Expand All @@ -57,17 +56,18 @@ func createTraces(
set receiver.CreateSettings,
cfg component.Config,
nextConsumer consumer.Traces,
) (_ receiver.Traces, err error) {
) (receiver.Traces, error) {
oCfg := cfg.(*Config)
r := receivers.GetOrAdd(oCfg, func() (comp component.Component) {
comp, err = newOTelArrowReceiver(oCfg, set)
return
r, err := receivers.GetOrAdd(oCfg, func() (*otelArrowReceiver, error) {
return newOTelArrowReceiver(oCfg, set)
})
if err != nil {
return nil, err
}

r.Unwrap().(*otelArrowReceiver).registerTraceConsumer(nextConsumer)
if err = r.Unwrap().registerTraceConsumer(nextConsumer); err != nil {
return nil, err
}
return r, nil
}

Expand All @@ -77,17 +77,18 @@ func createMetrics(
set receiver.CreateSettings,
cfg component.Config,
consumer consumer.Metrics,
) (_ receiver.Metrics, err error) {
) (receiver.Metrics, error) {
oCfg := cfg.(*Config)
r := receivers.GetOrAdd(oCfg, func() (comp component.Component) {
comp, err = newOTelArrowReceiver(oCfg, set)
return comp
r, err := receivers.GetOrAdd(oCfg, func() (*otelArrowReceiver, error) {
return newOTelArrowReceiver(oCfg, set)
})
if err != nil {
return nil, err
}

r.Unwrap().(*otelArrowReceiver).registerMetricsConsumer(consumer)
if err = r.Unwrap().registerMetricsConsumer(consumer); err != nil {
return nil, err
}
return r, nil
}

Expand All @@ -97,17 +98,18 @@ func createLog(
set receiver.CreateSettings,
cfg component.Config,
consumer consumer.Logs,
) (_ receiver.Logs, err error) {
) (receiver.Logs, error) {
oCfg := cfg.(*Config)
r := receivers.GetOrAdd(oCfg, func() (comp component.Component) {
comp, err = newOTelArrowReceiver(oCfg, set)
return comp
r, err := receivers.GetOrAdd(oCfg, func() (*otelArrowReceiver, error) {
return newOTelArrowReceiver(oCfg, set)
})
if err != nil {
return nil, err
}

r.Unwrap().(*otelArrowReceiver).registerLogsConsumer(consumer)
if err = r.Unwrap().registerLogsConsumer(consumer); err != nil {
return nil, err
}
return r, nil
}

Expand All @@ -117,4 +119,4 @@ func createLog(
// create separate objects, they must use one otelArrowReceiver object per configuration.
// When the receiver is shutdown it should be removed from this map so the same configuration
// can be recreated successfully.
var receivers = sharedcomponent.NewSharedComponents()
var receivers = sharedcomponent.NewSharedComponents[*Config, *otelArrowReceiver]()
15 changes: 8 additions & 7 deletions receiver/otelarrowreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
"context"
"testing"

"github.com/open-telemetry/otel-arrow/collector/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/open-telemetry/otel-arrow/collector/testutil"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confignet"
Expand Down Expand Up @@ -45,7 +46,7 @@ func TestCreateTracesReceiver(t *testing.T) {
defaultGRPCSettings := configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: testutil.GetAvailableLocalAddress(t),
Transport: confignet.TransportTypeTCP,
Transport: "tcp",
},
}

Expand All @@ -69,7 +70,7 @@ func TestCreateTracesReceiver(t *testing.T) {
GRPC: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: "localhost:112233",
Transport: confignet.TransportTypeTCP,
Transport: "tcp",
},
},
},
Expand Down Expand Up @@ -101,7 +102,7 @@ func TestCreateMetricReceiver(t *testing.T) {
defaultGRPCSettings := configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: testutil.GetAvailableLocalAddress(t),
Transport: confignet.TransportTypeTCP,
Transport: "tcp",
},
}

Expand All @@ -125,7 +126,7 @@ func TestCreateMetricReceiver(t *testing.T) {
GRPC: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: "327.0.0.1:1122",
Transport: confignet.TransportTypeTCP,
Transport: "tcp",
},
},
},
Expand Down Expand Up @@ -156,7 +157,7 @@ func TestCreateLogReceiver(t *testing.T) {
defaultGRPCSettings := configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: testutil.GetAvailableLocalAddress(t),
Transport: confignet.TransportTypeTCP,
Transport: "tcp",
},
}

Expand All @@ -183,7 +184,7 @@ func TestCreateLogReceiver(t *testing.T) {
GRPC: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: "327.0.0.1:1122",
Transport: confignet.TransportTypeTCP,
Transport: "tcp",
},
},
},
Expand Down
75 changes: 0 additions & 75 deletions receiver/otelarrowreceiver/generated_component_test.go

This file was deleted.

Loading