Skip to content

Commit

Permalink
Add validation on ballast size between memorylimiter and ballastexten…
Browse files Browse the repository at this point in the history
…sion (open-telemetry#3532)

* Add validation to make sure memorylimiter and ballastextension have the same setting on ballast size in config
Add BallastSizePercentage support in memory_limiter processor config

* Deprecate `ballast_size_mib` in memory limiter and use the ballast size set by `ballast extension`
  • Loading branch information
mxiamxia committed Jul 14, 2021
1 parent 7e7ee8f commit 0546fac
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 16 deletions.
8 changes: 7 additions & 1 deletion extension/ballastextension/memory_ballast.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (

const megaBytes = 1024 * 1024

var ballastSizeBytes uint64

type memoryBallast struct {
cfg *Config
logger *zap.Logger
Expand All @@ -32,7 +34,6 @@ type memoryBallast struct {
}

func (m *memoryBallast) Start(_ context.Context, _ component.Host) error {
var ballastSizeBytes uint64
// absolute value supersedes percentage setting
if m.cfg.SizeMiB > 0 {
ballastSizeBytes = m.cfg.SizeMiB * megaBytes
Expand Down Expand Up @@ -66,3 +67,8 @@ func newMemoryBallast(cfg *Config, logger *zap.Logger, getTotalMem func() (uint6
getTotalMem: getTotalMem,
}
}

// GetBallastSize returns the current ballast memory setting in bytes
func GetBallastSize() uint64 {
return ballastSizeBytes
}
20 changes: 8 additions & 12 deletions processor/memorylimiter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,19 @@ value (otherwise memory usage may exceed the hard limit - even if temporarily).
A good starting point for `spike_limit_mib` is 20% of the hard limit. Bigger
`spike_limit_mib` values may be necessary for spiky traffic or for longer check intervals.

In addition, if the command line option `mem-ballast-size-mib` is used to specify a
ballast (see command line help for details), the same value that is provided via the
command line must also be defined in the memory_limiter processor using `ballast_size_mib`
config option. If the command line option value and config option value don't match
the behavior of the memory_limiter processor will be unpredictable.
In addition, if the ballast size is specified in [ballastextension](../../extension/ballastextension),
the same value that is provided via the `ballastextension` will be used in `memory_limitor` for
calculating the total allocated memory for the collector.
The `memory_limiter.ballast_size_mib` config has been deprecated and will be removed soon.

Note that while the processor can help mitigate out of memory situations,
it is not a replacement for properly sizing and configuring the
collector. Keep in mind that if the soft limit is crossed, the collector will
return errors to all receive operations until enough memory is freed. This will
result in dropped data.

It is highly recommended to configure the ballast command line option as well as the
memory_limiter processor on every collector. The ballast should be configured to
It is highly recommended to configure `ballastextension` as well as the
`memory_limiter` processor on every collector. The ballast should be configured to
be 1/3 to 1/2 of the memory allocated to the collector. The memory_limiter
processor should be the first processor defined in the pipeline (immediately after
the receivers). This is to ensure that backpressure can be sent to applicable
Expand Down Expand Up @@ -79,16 +78,14 @@ This option is used to calculate `spike_limit_mib` from the total available memo
For instance setting of 25% with the total memory of 1GiB will result in the spike limit of 250MiB.
This option is intended to be used only with `limit_percentage`.

The following configuration options can also be modified:
- `ballast_size_mib` (default = 0): Must match the `mem-ballast-size-mib`
command line option.
The `ballast_size_mib` configuration has been deprecated and replaced by `ballast_extension`.
- <del>`ballast_size_mib` (default = 0): Must match the value of `ballast_size_mib` in `ballastextension` config</del>

Examples:

```yaml
processors:
memory_limiter:
ballast_size_mib: 2000
check_interval: 1s
limit_mib: 4000
spike_limit_mib: 800
Expand All @@ -97,7 +94,6 @@ processors:
```yaml
processors:
memory_limiter:
ballast_size_mib: 2000
check_interval: 1s
limit_percentage: 50
spike_limit_percentage: 30
Expand Down
1 change: 1 addition & 0 deletions processor/memorylimiter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Config struct {

// BallastSizeMiB is the size, in MiB, of the ballast size being used by the
// process.
// Deprecated: use the ballast size configuration in `ballastextension` component instead.
BallastSizeMiB uint32 `mapstructure:"ballast_size_mib"`

// MemoryLimitPercentage is the maximum amount of memory, in %, targeted to be
Expand Down
7 changes: 4 additions & 3 deletions processor/memorylimiter/memorylimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/extension/ballastextension"
"go.opentelemetry.io/collector/internal/iruntime"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/obsreport"
Expand Down Expand Up @@ -88,7 +89,8 @@ const minGCIntervalWhenSoftLimited = 10 * time.Second

// newMemoryLimiter returns a new memorylimiter processor.
func newMemoryLimiter(logger *zap.Logger, cfg *Config) (*memoryLimiter, error) {
ballastSize := uint64(cfg.BallastSizeMiB) * mibBytes
// Get ballast size in bytes from ballastextension
ballastSize := ballastextension.GetBallastSize()

if cfg.CheckInterval <= 0 {
return nil, errCheckIntervalOutOfRange
Expand Down Expand Up @@ -213,8 +215,7 @@ func (ml *memoryLimiter) readMemStats() *runtime.MemStats {
} else if !ml.configMismatchedLogged {
// This indicates misconfiguration. Log it once.
ml.configMismatchedLogged = true
ml.logger.Warn(typeStr + " is likely incorrectly configured. " + ballastSizeMibKey +
" must be set equal to --mem-ballast-size-mib command line option.")
ml.logger.Warn(ballastSizeMibKey + " in ballast extension is likely incorrectly configured.")
}

return ms
Expand Down
39 changes: 39 additions & 0 deletions processor/memorylimiter/memorylimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/extension/ballastextension"
"go.opentelemetry.io/collector/internal/iruntime"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/obsreport"
Expand Down Expand Up @@ -409,3 +411,40 @@ func TestDropDecision(t *testing.T) {
})
}
}

func TestBallastSizeMiB(t *testing.T) {
ctx := context.Background()
ballastExtFactory := ballastextension.NewFactory()
ballastExtCfg := ballastExtFactory.CreateDefaultConfig().(*ballastextension.Config)
ballastExtCfg.SizeMiB = 100
extCreateSet := componenttest.NewNopExtensionCreateSettings()

tests := []struct {
name string
ballastExtBallastSizeSetting uint64
expectedMemLimiterBallastSize uint64
expectResult bool
}{
{
name: "ballast size matched",
ballastExtBallastSizeSetting: 100,
expectedMemLimiterBallastSize: 100,
expectResult: true,
},
{
name: "ballast size not matched",
ballastExtBallastSizeSetting: 1000,
expectedMemLimiterBallastSize: 100,
expectResult: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ballastExtCfg.SizeMiB = tt.ballastExtBallastSizeSetting
ballastExt, _ := ballastExtFactory.CreateExtension(ctx, extCreateSet, ballastExtCfg)
ballastExt.Start(ctx, nil)
assert.Equal(t, tt.expectResult, tt.expectedMemLimiterBallastSize*mibBytes == ballastextension.GetBallastSize())
})
}
}
8 changes: 8 additions & 0 deletions service/defaultcomponents/default_extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/extension/ballastextension"
"go.opentelemetry.io/collector/extension/bearertokenauthextension"
"go.opentelemetry.io/collector/extension/healthcheckextension"
"go.opentelemetry.io/collector/extension/pprofextension"
Expand Down Expand Up @@ -74,6 +75,13 @@ func TestDefaultExtensions(t *testing.T) {
return cfg
},
},
{
extension: "memory_ballast",
getConfigFn: func() config.Extension {
cfg := extFactories["memory_ballast"].CreateDefaultConfig().(*ballastextension.Config)
return cfg
},
},
}

// we have one more extension that we can't test here: the OIDC Auth extension requires
Expand Down
2 changes: 2 additions & 0 deletions service/defaultcomponents/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/collector/exporter/prometheusexporter"
"go.opentelemetry.io/collector/exporter/prometheusremotewriteexporter"
"go.opentelemetry.io/collector/exporter/zipkinexporter"
"go.opentelemetry.io/collector/extension/ballastextension"
"go.opentelemetry.io/collector/extension/bearertokenauthextension"
"go.opentelemetry.io/collector/extension/healthcheckextension"
"go.opentelemetry.io/collector/extension/oidcauthextension"
Expand Down Expand Up @@ -63,6 +64,7 @@ func Components() (
oidcauthextension.NewFactory(),
pprofextension.NewFactory(),
zpagesextension.NewFactory(),
ballastextension.NewFactory(),
)
if err != nil {
errs = append(errs, err)
Expand Down

0 comments on commit 0546fac

Please sign in to comment.