Skip to content

Commit

Permalink
[extension/healthcheckv2] Add HTTP service (#33528)
Browse files Browse the repository at this point in the history
**Description:**

This PR is the third in a series to decompose
#30673
into more manageable pieces for review. This PR introduces the HTTP
service which builds upon the aggregation logic added in the [previous
PR](#32695).
Following this will be a PR to add a gRPC health check service.

A summary of the changes is below:
- http service based on component status
- supports legacy behavior and config; to be deprecated
- overall collector health can be monitored as well as pipeline health
- additionally the verbosity of the response can be controlled by
passing a `verbose` query parameter
- adds optional endpoint to retrieve the config of the running collector
	- this is currently unredacted JSON and is opt-in

Note, that there will be a follow up PR to add the gRPC service. This
will be relevant when reviewing the extension code. It is setup to
manage a slice of subcomponents. The HTTP and gRPC services are the
subcomponents to be managed, and both implement the `Component`
interface. See the [reference
PR](#30673)
for details.

I've provided some examples below to help get an idea of what the
responses look like when serialized as JSON.

**Collector Health Example Response**

Below is an example verbose response for the overall collector health.
Note, the top level fields correspond to the collector overall, the next
level corresponds to the pipelines, and the level below that is the
health of the individual components in each pipeline.

```json
{
    "start_time": "2024-01-18T17:27:12.570394-08:00",
    "healthy": true,
    "status": "StatusRecoverableError",
    "error": "rpc error: code = ResourceExhausted desc = resource exhausted",
    "status_time": "2024-01-18T17:27:32.572301-08:00",
    "components": {
        "extensions": {
            "healthy": true,
            "status": "StatusOK",
            "status_time": "2024-01-18T17:27:12.570428-08:00",
            "components": {
                "extension:healthcheckv2": {
                    "healthy": true,
                    "status": "StatusOK",
                    "status_time": "2024-01-18T17:27:12.570428-08:00"
                }
            }
        },
        "pipeline:metrics/grpc": {
            "healthy": true,
            "status": "StatusRecoverableError",
            "error": "rpc error: code = ResourceExhausted desc = resource exhausted",
            "status_time": "2024-01-18T17:27:32.572301-08:00",
            "components": {
                "exporter:otlp/staging": {
                    "healthy": true,
                    "status": "StatusRecoverableError",
                    "error": "rpc error: code = ResourceExhausted desc = resource exhausted",
                    "status_time": "2024-01-18T17:27:32.572301-08:00"
                },
                "processor:batch": {
                    "healthy": true,
                    "status": "StatusOK",
                    "status_time": "2024-01-18T17:27:12.571132-08:00"
                },
                "receiver:otlp": {
                    "healthy": true,
                    "status": "StatusOK",
                    "status_time": "2024-01-18T17:27:12.571576-08:00"
                }
            }
        },
        "pipeline:traces/http": {
            "healthy": true,
            "status": "StatusOK",
            "status_time": "2024-01-18T17:27:12.571625-08:00",
            "components": {
                "exporter:otlphttp/staging": {
                    "healthy": true,
                    "status": "StatusOK",
                    "status_time": "2024-01-18T17:27:12.571615-08:00"
                },
                "processor:batch": {
                    "healthy": true,
                    "status": "StatusOK",
                    "status_time": "2024-01-18T17:27:12.571621-08:00"
                },
                "receiver:otlp": {
                    "healthy": true,
                    "status": "StatusOK",
                    "status_time": "2024-01-18T17:27:12.571625-08:00"
                }
            }
        }
    }
}
```

**Pipeline Health Example Response**

This is an example verbose response for the `traces/http` pipeline. The
top level corresponds to the health of the pipeline, and the second
level contains the health of the individual components that make up the
pipeline.

```json
{
    "start_time": "2024-01-18T17:27:12.570394-08:00",
    "healthy": true,
    "status": "StatusRecoverableError",
    "error": "rpc error: code = ResourceExhausted desc = resource exhausted",
    "status_time": "2024-01-18T17:27:32.572301-08:00",
    "components": {
        "extensions": {
            "healthy": true,
            "status": "StatusOK",
            "status_time": "2024-01-18T17:27:12.570428-08:00",
            "components": {
                "extension:healthcheckv2": {
                    "healthy": true,
                    "status": "StatusOK",
                    "status_time": "2024-01-18T17:27:12.570428-08:00"
                }
            }
        },
        "pipeline:metrics/grpc": {
            "healthy": true,
            "status": "StatusRecoverableError",
            "error": "rpc error: code = ResourceExhausted desc = resource exhausted",
            "status_time": "2024-01-18T17:27:32.572301-08:00",
            "components": {
                "exporter:otlp/staging": {
                    "healthy": true,
                    "status": "StatusRecoverableError",
                    "error": "rpc error: code = ResourceExhausted desc = resource exhausted",
                    "status_time": "2024-01-18T17:27:32.572301-08:00"
                },
                "processor:batch": {
                    "healthy": true,
                    "status": "StatusOK",
                    "status_time": "2024-01-18T17:27:12.571132-08:00"
                },
                "receiver:otlp": {
                    "healthy": true,
                    "status": "StatusOK",
                    "status_time": "2024-01-18T17:27:12.571576-08:00"
                }
            }
        },
        "pipeline:traces/http": {
            "healthy": true,
            "status": "StatusOK",
            "status_time": "2024-01-18T17:27:12.571625-08:00",
            "components": {
                "exporter:otlphttp/staging": {
                    "healthy": true,
                    "status": "StatusOK",
                    "status_time": "2024-01-18T17:27:12.571615-08:00"
                },
                "processor:batch": {
                    "healthy": true,
                    "status": "StatusOK",
                    "status_time": "2024-01-18T17:27:12.571621-08:00"
                },
                "receiver:otlp": {
                    "healthy": true,
                    "status": "StatusOK",
                    "status_time": "2024-01-18T17:27:12.571625-08:00"
                }
            }
        }
    }
}
```

**Link to tracking Issue:** #26661

**Testing:** Units / manual

**Documentation:** Comments, etc

---------

Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com>
Co-authored-by: Alex Boten <223565+codeboten@users.noreply.github.com>
  • Loading branch information
mwear and codeboten committed Jul 10, 2024
1 parent e42da8b commit d53bb4e
Show file tree
Hide file tree
Showing 11 changed files with 3,674 additions and 3 deletions.
27 changes: 27 additions & 0 deletions .chloggen/healthcheckv2-http.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: 'healthcheckv2extension'

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add HTTP service to healthcheckv2

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26661]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
9 changes: 6 additions & 3 deletions extension/healthcheckv2extension/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,19 @@ func TestCreateDefaultConfig(t *testing.T) {
}, cfg)

assert.NoError(t, componenttest.CheckConfigStruct(cfg))
ext, err := createExtension(context.Background(), extensiontest.NewNopSettings(), cfg)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ext, err := createExtension(ctx, extensiontest.NewNopSettings(), cfg)
require.NoError(t, err)
require.NotNil(t, ext)
}

func TestCreateExtension(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Endpoint = testutil.GetAvailableLocalAddress(t)

ext, err := createExtension(context.Background(), extensiontest.NewNopSettings(), cfg)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ext, err := createExtension(ctx, extensiontest.NewNopSettings(), cfg)
require.NoError(t, err)
require.NotNil(t, ext)
}
44 changes: 44 additions & 0 deletions extension/healthcheckv2extension/internal/http/handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package http // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/http"

import (
"net/http"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status"
)

func (s *Server) statusHandler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
pipeline := r.URL.Query().Get("pipeline")
verbose := r.URL.Query().Has("verbose") && r.URL.Query().Get("verbose") != "false"
st, ok := s.aggregator.AggregateStatus(status.Scope(pipeline), status.Verbosity(verbose))

if !ok {
w.WriteHeader(http.StatusNotFound)
return
}

if err := s.responder.respond(st, w); err != nil {
s.telemetry.Logger.Warn(err.Error())
}
})
}

func (s *Server) configHandler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
conf := s.colconf.Load()

if conf == nil {
w.WriteHeader(http.StatusServiceUnavailable)
return
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
if _, err := w.Write(conf.([]byte)); err != nil {
s.telemetry.Logger.Warn(err.Error())
}
})
}
14 changes: 14 additions & 0 deletions extension/healthcheckv2extension/internal/http/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package http // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/http"

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
156 changes: 156 additions & 0 deletions extension/healthcheckv2extension/internal/http/responders.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package http // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/http"

import (
"encoding/json"
"fmt"
"net/http"
"time"

"go.opentelemetry.io/collector/component"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension/internal/status"
)

var responseCodes = map[component.Status]int{
component.StatusNone: http.StatusServiceUnavailable,
component.StatusStarting: http.StatusServiceUnavailable,
component.StatusOK: http.StatusOK,
component.StatusRecoverableError: http.StatusOK,
component.StatusPermanentError: http.StatusOK,
component.StatusFatalError: http.StatusInternalServerError,
component.StatusStopping: http.StatusServiceUnavailable,
component.StatusStopped: http.StatusServiceUnavailable,
}

type serializationErr struct {
ErrorMessage string `json:"error_message"`
}

type responder interface {
respond(*status.AggregateStatus, http.ResponseWriter) error
}

type responderFunc func(*status.AggregateStatus, http.ResponseWriter) error

func (f responderFunc) respond(st *status.AggregateStatus, w http.ResponseWriter) error {
return f(st, w)
}

func respondWithJSON(code int, content any, w http.ResponseWriter) error {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)

body, mErr := json.Marshal(content)
if mErr != nil {
body, _ = json.Marshal(&serializationErr{ErrorMessage: mErr.Error()})
}
_, wErr := w.Write(body)
return wErr
}

func defaultResponder(startTimestamp *time.Time) responderFunc {
return func(st *status.AggregateStatus, w http.ResponseWriter) error {
code := responseCodes[st.Status()]
sst := toSerializableStatus(st, &serializationOptions{
includeStartTime: true,
startTimestamp: startTimestamp,
})
return respondWithJSON(code, sst, w)
}
}

func componentHealthResponder(
startTimestamp *time.Time,
config *common.ComponentHealthConfig,
) responderFunc {
healthyFunc := func(now *time.Time) func(status.Event) bool {
return func(ev status.Event) bool {
if ev.Status() == component.StatusPermanentError {
return !config.IncludePermanent
}

if ev.Status() == component.StatusRecoverableError && config.IncludeRecoverable {
return now.Before(ev.Timestamp().Add(config.RecoveryDuration))
}

return ev.Status() != component.StatusFatalError
}
}
return func(st *status.AggregateStatus, w http.ResponseWriter) error {
now := time.Now()
sst := toSerializableStatus(
st,
&serializationOptions{
includeStartTime: true,
startTimestamp: startTimestamp,
healthyFunc: healthyFunc(&now),
},
)

code := responseCodes[st.Status()]
if !sst.Healthy {
code = http.StatusInternalServerError
}

return respondWithJSON(code, sst, w)
}
}

// Below are responders ported from the original healthcheck extension. We will
// keep them for backwards compatibility, but eventually deprecate and remove
// them.

// legacyResponseCodes match the current response code mapping with the exception
// of FatalError, which maps to 503 instead of 500.
var legacyResponseCodes = map[component.Status]int{
component.StatusNone: http.StatusServiceUnavailable,
component.StatusStarting: http.StatusServiceUnavailable,
component.StatusOK: http.StatusOK,
component.StatusRecoverableError: http.StatusOK,
component.StatusPermanentError: http.StatusOK,
component.StatusFatalError: http.StatusServiceUnavailable,
component.StatusStopping: http.StatusServiceUnavailable,
component.StatusStopped: http.StatusServiceUnavailable,
}

func legacyDefaultResponder(startTimestamp *time.Time) responderFunc {
type healthCheckResponse struct {
StatusMsg string `json:"status"`
UpSince time.Time `json:"upSince"`
Uptime string `json:"uptime"`
}

codeToMsgMap := map[int]string{
http.StatusOK: "Server available",
http.StatusServiceUnavailable: "Server not available",
}

return func(st *status.AggregateStatus, w http.ResponseWriter) error {
code := legacyResponseCodes[st.Status()]
resp := healthCheckResponse{
StatusMsg: codeToMsgMap[code],
}
if code == http.StatusOK {
resp.UpSince = *startTimestamp
resp.Uptime = fmt.Sprintf("%v", time.Since(*startTimestamp))
}
return respondWithJSON(code, resp, w)
}
}

func legacyCustomResponder(config *ResponseBodyConfig) responderFunc {
codeToMsgMap := map[int][]byte{
http.StatusOK: []byte(config.Healthy),
http.StatusServiceUnavailable: []byte(config.Unhealthy),
}
return func(st *status.AggregateStatus, w http.ResponseWriter) error {
code := legacyResponseCodes[st.Status()]
w.WriteHeader(code)
_, err := w.Write(codeToMsgMap[code])
return err
}
}
37 changes: 37 additions & 0 deletions extension/healthcheckv2extension/internal/http/responders_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package http

import (
"errors"
"io"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// var errUnserializable = errors.New("cannot marshal JSON")
var unserializableErrString = "cannot marshal unserializable"

type unserializable struct{}

func (*unserializable) MarshalJSON() ([]byte, error) {
return nil, errors.New(unserializableErrString)
}

func TestRespondWithJSON(t *testing.T) {
content := &unserializable{}
w := httptest.NewRecorder()
require.NoError(t, respondWithJSON(http.StatusOK, content, w))
resp := w.Result()
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, resp.Header.Get("Content-Type"), "application/json")

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
assert.Contains(t, string(body), unserializableErrString)
}
Loading

0 comments on commit d53bb4e

Please sign in to comment.