Skip to content

Commit

Permalink
Add internal/otelarrow package for shared code between otelarrow expo…
Browse files Browse the repository at this point in the history
…rter, receiver (#33579)

**Description:** The otelarrowexporter and otelarrowreceiver components
depend on several libraries in the open-telemetry/otel-arrow repository,
which would complicate release management in both repositories. This
change eliminates the dependency on `otel-arrow/collector` by moving
each of those packages into a subdirectory of `internal/otelarrow`.

**Link to tracking Issue:** #33567 

**Testing:** This brings new integration tests into this repository for
the two otelarrow components.

**Documentation:** Each of the three non-test packages exposed here has
a brief README.md explaining why it exists. Each of these could be
useful to non-Arrow components if helpful.

---------

Co-authored-by: Alex Boten <223565+codeboten@users.noreply.github.com>
Co-authored-by: Matthew Wear <matthew.wear@gmail.com>
  • Loading branch information
3 people committed Jul 17, 2024
1 parent f3fdea3 commit 7e3fc98
Show file tree
Hide file tree
Showing 32 changed files with 3,229 additions and 0 deletions.
27 changes: 27 additions & 0 deletions .chloggen/otelarrow-internals.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otelarrowreceiver, otelarrowexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: OTel-Arrow internal packages moved into this repository.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33567]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: New integration testing between otelarrowexporter and otelarrowreceiver.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ internal/k8stest/ @open-teleme
internal/kafka/ @open-telemetry/collector-contrib-approvers @pavolloffay @MovieStoreGuy
internal/kubelet/ @open-telemetry/collector-contrib-approvers @dmitryax
internal/metadataproviders/ @open-telemetry/collector-contrib-approvers @Aneurysm9 @dashpole
internal/otelarrow/ @open-telemetry/collector-contrib-approvers @jmacd @moh-osman3
internal/pdatautil/ @open-telemetry/collector-contrib-approvers @djaglowski
internal/sharedcomponent/ @open-telemetry/collector-contrib-approvers @open-telemetry/collector-approvers
internal/splunk/ @open-telemetry/collector-contrib-approvers @dmitryax
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ body:
- internal/kafka
- internal/kubelet
- internal/metadataproviders
- internal/otelarrow
- internal/pdatautil
- internal/sharedcomponent
- internal/splunk
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ body:
- internal/kafka
- internal/kubelet
- internal/metadataproviders
- internal/otelarrow
- internal/pdatautil
- internal/sharedcomponent
- internal/splunk
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ body:
- internal/kafka
- internal/kubelet
- internal/metadataproviders
- internal/otelarrow
- internal/pdatautil
- internal/sharedcomponent
- internal/splunk
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/unmaintained.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ body:
- internal/kafka
- internal/kubelet
- internal/metadataproviders
- internal/otelarrow
- internal/pdatautil
- internal/sharedcomponent
- internal/splunk
Expand Down
1 change: 1 addition & 0 deletions internal/otelarrow/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
20 changes: 20 additions & 0 deletions internal/otelarrow/admission/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Admission Package

## Overview

The admission package provides a BoundedQueue object which is a semaphore implementation that limits the number of bytes admitted into a collector pipeline. Additionally the BoundedQueue limits the number of waiters that can block on a call to `bq.Acquire(sz int64)`.

This package is an experiment to improve the behavior of Collector pipelines having their `exporterhelper` configured to apply backpressure. This package is meant to be used in receivers, via an interceptor or custom logic. Therefore, the BoundedQueue helps limit memory within the entire collector pipeline by limiting two dimensions that cause memory issues:
1. bytes: large requests that enter the collector pipeline can require large allocations even if downstream components will eventually limit or ratelimit the request.
2. waiters: limiting on bytes alone is not enough because requests that enter the pipeline and block on `bq.Acquire()` can still consume memory within the receiver. If there are enough waiters this can be a significant contribution to memory usage.

## Usage

Create a new BoundedQueue by calling `bq := admission.NewBoundedQueue(maxLimitBytes, maxLimitWaiters)`

Within the component call `bq.Acquire(ctx, requestSize)` which will either
1. succeed immediately if there is enough available memory
2. fail immediately if there are too many waiters
3. block until context cancellation or until enough bytes become available

Once a request has finished processing and has been sent downstream, call `bq.Release(requestSize)` to allow other waiters to be admitted for processing. Release fails only when more bytes are released than were previously acquired.
152 changes: 152 additions & 0 deletions internal/otelarrow/admission/boundedqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package admission // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission"

import (
"context"
"fmt"
"sync"

"github.com/google/uuid"
orderedmap "github.com/wk8/go-ordered-map/v2"
)

// ErrTooManyWaiters is returned by Acquire when the configured limit on
// concurrently blocked callers (maxLimitWaiters) has been reached.
var ErrTooManyWaiters = fmt.Errorf("rejecting request, too many waiters")

// BoundedQueue is a semaphore-like admission controller that bounds both
// the total bytes admitted into a pipeline and the number of callers
// blocked waiting for admission.
type BoundedQueue struct {
	maxLimitBytes   int64 // hard cap on bytes admitted at any one time
	maxLimitWaiters int64 // hard cap on concurrently blocked Acquire calls
	currentBytes    int64 // bytes currently admitted; guarded by lock
	currentWaiters  int64 // number of blocked waiters; guarded by lock
	lock            sync.Mutex
	waiters         *orderedmap.OrderedMap[uuid.UUID, waiter] // FIFO of blocked waiters; guarded by lock
}

// waiter represents one blocked Acquire call; readyCh is closed by
// Release once the waiter's pendingBytes have been admitted.
type waiter struct {
	readyCh      chan struct{} // closed to signal admission
	pendingBytes int64         // bytes this waiter is requesting
	ID           uuid.UUID     // key of this waiter in BoundedQueue.waiters
}

// NewBoundedQueue returns a BoundedQueue that admits at most
// maxLimitBytes bytes at a time and allows at most maxLimitWaiters
// callers to block in Acquire.
func NewBoundedQueue(maxLimitBytes, maxLimitWaiters int64) *BoundedQueue {
	bq := &BoundedQueue{
		waiters: orderedmap.New[uuid.UUID, waiter](),
	}
	bq.maxLimitBytes = maxLimitBytes
	bq.maxLimitWaiters = maxLimitWaiters
	return bq
}

// admit attempts to reserve pendingBytes without blocking. It returns
// (true, nil) when the bytes were admitted immediately, (false, nil)
// when the caller must block (a waiter slot has been reserved), and
// (false, err) when the request cannot be admitted at all.
func (bq *BoundedQueue) admit(pendingBytes int64) (bool, error) {
	bq.lock.Lock()
	defer bq.lock.Unlock()

	switch {
	case pendingBytes > bq.maxLimitBytes:
		// A request larger than the total limit can never succeed.
		return false, fmt.Errorf("rejecting request, request size larger than configured limit")
	case bq.currentBytes+pendingBytes <= bq.maxLimitBytes:
		// Fast path: capacity is available right now.
		bq.currentBytes += pendingBytes
		return true, nil
	case bq.currentWaiters >= bq.maxLimitWaiters:
		// The wait queue is full.
		return false, ErrTooManyWaiters
	default:
		// The caller must block; count it as a waiter before the
		// mutex is released so concurrent admits see the reservation.
		bq.currentWaiters++
		return false, nil
	}
}

// Acquire admits pendingBytes into the queue, blocking until capacity
// becomes available, the context is canceled, or the waiter limit is
// exceeded. On a nil return the caller owns pendingBytes and must hand
// them back via Release; on error nothing remains acquired.
func (bq *BoundedQueue) Acquire(ctx context.Context, pendingBytes int64) error {
	success, err := bq.admit(pendingBytes)
	if err != nil || success {
		return err
	}

	// admit reserved a waiter slot; register a waiter and block until
	// Release grants the bytes or the context is canceled.
	curWaiter := waiter{
		pendingBytes: pendingBytes,
		readyCh:      make(chan struct{}),
	}

	bq.lock.Lock()
	// Generate a key that is unique within the waiters map, and store
	// the fully-populated waiter (ID set before insertion).
	for {
		id := uuid.New()
		if _, keyExists := bq.waiters.Get(id); keyExists {
			continue
		}
		curWaiter.ID = id
		bq.waiters.Set(id, curWaiter)
		break
	}
	bq.lock.Unlock()

	select {
	case <-curWaiter.readyCh:
		return nil
	case <-ctx.Done():
		// Canceled before being admitted: remove the waiter.
		bq.lock.Lock()
		defer bq.lock.Unlock()

		if _, found := bq.waiters.Delete(curWaiter.ID); !found {
			// Release admitted this waiter after cancellation but
			// before we re-acquired the lock. The bytes are already
			// accounted to us, so report success rather than leak
			// them (the caller will Release as usual).
			return nil
		}

		bq.currentWaiters--
		return fmt.Errorf("context canceled: %w", ctx.Err())
	}
}

// Release returns pendingBytes to the queue and then admits as many of
// the oldest waiters as the freed capacity allows, in FIFO order. It
// fails when more bytes are released than were previously acquired,
// leaving the accounting unchanged in that case.
func (bq *BoundedQueue) Release(pendingBytes int64) error {
	bq.lock.Lock()
	defer bq.lock.Unlock()

	bq.currentBytes -= pendingBytes

	if bq.currentBytes < 0 {
		// Undo the subtraction so a caller bug cannot leave the
		// accounting negative and grant capacity that was never
		// acquired.
		bq.currentBytes += pendingBytes
		return fmt.Errorf("released more bytes than acquired")
	}

	// Wake waiters in FIFO order while capacity remains; stop at the
	// first waiter that does not fit to preserve ordering.
	for bq.waiters.Len() > 0 {
		next := bq.waiters.Oldest()
		nextWaiter := next.Value
		if bq.currentBytes+nextWaiter.pendingBytes > bq.maxLimitBytes {
			break
		}
		bq.currentBytes += nextWaiter.pendingBytes
		bq.currentWaiters--
		close(nextWaiter.readyCh)
		if _, found := bq.waiters.Delete(next.Key); !found {
			// Invariant violation: the oldest entry must be deletable.
			return fmt.Errorf("deleting waiter that doesn't exist")
		}
	}

	return nil
}

// TryAcquire attempts to admit pendingBytes without blocking and
// reports whether they were admitted. When it returns true the caller
// must later call Release with the same size.
func (bq *BoundedQueue) TryAcquire(pendingBytes int64) bool {
	bq.lock.Lock()
	defer bq.lock.Unlock()

	next := bq.currentBytes + pendingBytes
	if next > bq.maxLimitBytes {
		return false
	}
	bq.currentBytes = next
	return true
}
Loading

0 comments on commit 7e3fc98

Please sign in to comment.