[Backport 5.1] grpc: add interceptor that tracks the sizes of all messages sent by servers/clients (#55495)

Follow up to https://github.com/sourcegraph/sourcegraph/pull/55209 and
https://github.com/sourcegraph/sourcegraph/pull/55242.

This PR adds interceptors that record Prometheus metrics that observe:

- the individual size of each protobuf message **sent** by a server or
client
- the total amount of data sent over the course of a single RPC by a server
(responses) or client (requests)
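
As a rough illustration of the two measurements listed above, the following sketch wraps a send call, reports the size of every message, and reports the accumulated total when the RPC ends. Everything in it is hypothetical: `sender`, `sizeObserver`, `discardSender`, and the two callbacks are names invented here. The callbacks stand in for Prometheus histograms, and the length of a byte slice stands in for the encoded protobuf size. The interceptors in `internal/grpc/messagesize` may well be structured differently.

```go
package main

// sender is a hypothetical stand-in for the send half of a gRPC stream.
// It is not a grpc-go type and is not part of this commit.
type sender interface {
	SendMsg(m []byte) error
}

// sizeObserver wraps a sender and tracks message sizes.
// onSingle and onTotal stand in for Prometheus histogram observations.
type sizeObserver struct {
	inner    sender
	onSingle func(bytes int) // called once per successfully sent message
	onTotal  func(bytes int) // called once when the RPC finishes
	total    int
}

// SendMsg forwards the message and, on success, records its size.
// Assumption: failed sends are not counted; the real code may differ.
func (o *sizeObserver) SendMsg(m []byte) error {
	if err := o.inner.SendMsg(m); err != nil {
		return err
	}
	o.onSingle(len(m))
	o.total += len(m)
	return nil
}

// finish reports the accumulated total; call it once when the RPC ends.
func (o *sizeObserver) finish() {
	o.onTotal(o.total)
}

// discardSender accepts every message; it exists only to exercise the sketch.
type discardSender struct{}

func (discardSender) SendMsg([]byte) error { return nil }
```

A caller would wrap the underlying stream once per RPC, route every send through `SendMsg`, and call `finish` when the handler returns.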

This allows us to track the total amount of data returned by any of
our RPCs. In some cases, this can reveal opportunities for future
performance / stability improvements (Example: symbols'
[LocalCodeIntel method returning ~gigabyte sized responses that have to
be held all at once in
memory](https://github.com/sourcegraph/sourcegraph/pull/55242)).

This PR also provides new grafana dashboards that track this metric for
every gRPC service. See below for a screenshot of what this looks like
when I run the symbols service locally.

Co-authored-by: Geoffrey Gilmore <geoffrey@sourcegraph.com>
github-actions[bot] and ggilmore committed Aug 2, 2023
1 parent 71efffe commit aa4f838
Showing 21 changed files with 2,025 additions and 140 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -17,7 +17,11 @@ All notable changes to Sourcegraph are documented in this file.

### Added

- Experimental support for Azure OpenAI for the completions and embeddings provider has been added. [#55178](https://github.com/sourcegraph/sourcegraph/pull/55178)
- Added a feature flag for alternate GitLab project visibility resolution. This may solve some weird cases with not being able to see GitLab internal projects. [#54426](https://github.com/sourcegraph/sourcegraph/pull/54426)
  - To use this feature flag, create a Boolean feature flag named "gitLabProjectVisibilityExperimental" and set the value to True.
- It is now possible to add annotations to pods spawned by jobs created by the Kubernetes executor. [#55361](https://github.com/sourcegraph/sourcegraph/pull/55361)
- New Prometheus metrics have been added to track the response / request sizes of gRPC calls. [#55381](https://github.com/sourcegraph/sourcegraph/pull/55381)
- A new embeddings site configuration setting `excludeChunkOnError` allows embedding jobs to complete job execution despite chunks of code or text that fail. When enabled the chunks are skipped after failed retries but the index can continue being populated. When disabled the entire job fails and the index is not saved. This setting is enabled by default. Embedding job statistics now capture `code_chunks_excluded` and `text_chunks_excluded` for successfully completed jobs. Total excluded chunks and file names for excluded chunks are logged as warnings. [#55180](https://github.com/sourcegraph/sourcegraph/pull/55180)

### Changed
740 changes: 685 additions & 55 deletions doc/admin/observability/dashboards.md

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion internal/grpc/defaults/defaults.go
@@ -13,6 +13,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/grpc/contextconv"
+ "github.com/sourcegraph/sourcegraph/internal/grpc/messagesize"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@@ -22,7 +23,6 @@ import (
"github.com/sourcegraph/sourcegraph/internal/env"
internalgrpc "github.com/sourcegraph/sourcegraph/internal/grpc"
"github.com/sourcegraph/sourcegraph/internal/grpc/internalerrs"
- "github.com/sourcegraph/sourcegraph/internal/grpc/messagesize"
"github.com/sourcegraph/sourcegraph/internal/grpc/propagator"
"github.com/sourcegraph/sourcegraph/internal/requestclient"
"github.com/sourcegraph/sourcegraph/internal/trace/policy"
@@ -58,6 +58,7 @@ func DialOptions(logger log.Logger, additionalOptions ...grpc.DialOption) []grpc
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainStreamInterceptor(
grpc_prometheus.StreamClientInterceptor(metrics),
+ messagesize.StreamClientInterceptor,
propagator.StreamClientPropagator(actor.ActorPropagator{}),
propagator.StreamClientPropagator(policy.ShouldTracePropagator{}),
propagator.StreamClientPropagator(requestclient.Propagator{}),
@@ -68,6 +69,7 @@ func DialOptions(logger log.Logger, additionalOptions ...grpc.DialOption) []grpc
),
grpc.WithChainUnaryInterceptor(
grpc_prometheus.UnaryClientInterceptor(metrics),
+ messagesize.UnaryClientInterceptor,
propagator.UnaryClientPropagator(actor.ActorPropagator{}),
propagator.UnaryClientPropagator(policy.ShouldTracePropagator{}),
propagator.UnaryClientPropagator(requestclient.Propagator{}),
@@ -115,6 +117,7 @@ func ServerOptions(logger log.Logger, additionalOptions ...grpc.ServerOption) []
internalgrpc.NewStreamPanicCatcher(logger),
internalerrs.LoggingStreamServerInterceptor(logger),
grpc_prometheus.StreamServerInterceptor(metrics),
+ messagesize.StreamServerInterceptor,
propagator.StreamServerPropagator(requestclient.Propagator{}),
propagator.StreamServerPropagator(actor.ActorPropagator{}),
propagator.StreamServerPropagator(policy.ShouldTracePropagator{}),
@@ -125,6 +128,7 @@ func ServerOptions(logger log.Logger, additionalOptions ...grpc.ServerOption) []
internalgrpc.NewUnaryPanicCatcher(logger),
internalerrs.LoggingUnaryServerInterceptor(logger),
grpc_prometheus.UnaryServerInterceptor(metrics),
+ messagesize.UnaryServerInterceptor,
propagator.UnaryServerPropagator(requestclient.Propagator{}),
propagator.UnaryServerPropagator(actor.ActorPropagator{}),
propagator.UnaryServerPropagator(policy.ShouldTracePropagator{}),
16 changes: 16 additions & 0 deletions internal/grpc/grpcutil/BUILD.bazel

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions internal/grpc/grpcutil/util.go
@@ -0,0 +1,14 @@
package grpcutil

import "strings"

// SplitMethodName splits a full gRPC method name (e.g. "/package.service/method") into its individual components (service, method).
//
// Copied from github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/reporter.go
func SplitMethodName(fullMethod string) (string, string) {
	fullMethod = strings.TrimPrefix(fullMethod, "/") // remove leading slash
	if i := strings.Index(fullMethod, "/"); i >= 0 {
		return fullMethod[:i], fullMethod[i+1:]
	}
	return "unknown", "unknown"
}
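
One plausible use of the returned pair is as label values for per-method metrics. The sketch below assumes a hypothetical in-memory `callCounts` map in place of a Prometheus counter vector, and carries a local copy of the function (named `splitMethodName` here) only so that the example compiles on its own.

```go
package main

import "strings"

// splitMethodName is a local copy of grpcutil.SplitMethodName,
// reproduced so this sketch is self-contained.
func splitMethodName(fullMethod string) (string, string) {
	fullMethod = strings.TrimPrefix(fullMethod, "/") // remove leading slash
	if i := strings.Index(fullMethod, "/"); i >= 0 {
		return fullMethod[:i], fullMethod[i+1:]
	}
	return "unknown", "unknown"
}

// callCounts is a hypothetical stand-in for a counter keyed by
// (service, method) labels; it is not part of this commit.
type callCounts map[[2]string]int

// observe increments the counter for the service and method
// extracted from fullMethod.
func (c callCounts) observe(fullMethod string) {
	service, method := splitMethodName(fullMethod)
	c[[2]string{service, method}]++
}
```

Inputs without a service/method separator all land under the `unknown`/`unknown` key, so malformed names stay visible instead of being dropped.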
59 changes: 59 additions & 0 deletions internal/grpc/grpcutil/util_test.go
@@ -0,0 +1,59 @@
package grpcutil

import (
	"testing"

	"github.com/google/go-cmp/cmp"
)

func TestSplitMethodName(t *testing.T) {
	testCases := []struct {
		name string

		fullMethod  string
		wantService string
		wantMethod  string
	}{
		{
			name: "full method with service and method",

			fullMethod:  "/package.service/method",
			wantService: "package.service",
			wantMethod:  "method",
		},
		{
			name: "method without leading slash",

			fullMethod:  "package.service/method",
			wantService: "package.service",
			wantMethod:  "method",
		},
		{
			name: "service without method",

			fullMethod:  "/package.service/",
			wantService: "package.service",
			wantMethod:  "",
		},
		{
			name: "empty input",

			fullMethod:  "",
			wantService: "unknown",
			wantMethod:  "unknown",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			service, method := SplitMethodName(tc.fullMethod)
			// cmp.Diff(want, got) so that the output matches the "(-want +got)" label.
			if diff := cmp.Diff(tc.wantService, service); diff != "" {
				t.Errorf("SplitMethodName(%q) service (-want +got):\n%s", tc.fullMethod, diff)
			}

			if diff := cmp.Diff(tc.wantMethod, method); diff != "" {
				t.Errorf("SplitMethodName(%q) method (-want +got):\n%s", tc.fullMethod, diff)
			}
		})
	}
}
1 change: 1 addition & 0 deletions internal/grpc/internalerrs/BUILD.bazel

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 0 additions & 11 deletions internal/grpc/internalerrs/common.go
@@ -185,17 +185,6 @@ func gRPCUnexpectedContentTypeChecker(s *status.Status) bool {
return s.Code() != codes.OK && strings.Contains(s.Message(), "transport: received unexpected content-type")
}

// splitMethodName splits a full gRPC method name in to its components (service, method)
//
// Copied from github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/reporter.go
func splitMethodName(fullMethod string) (string, string) {
fullMethod = strings.TrimPrefix(fullMethod, "/") // remove leading slash
if i := strings.Index(fullMethod, "/"); i >= 0 {
return fullMethod[:i], fullMethod[i+1:]
}
return "unknown", "unknown"
}

// findNonUTF8StringFields returns a list of field names that contain invalid UTF-8 strings
// in the given proto message.
//
52 changes: 0 additions & 52 deletions internal/grpc/internalerrs/common_test.go
@@ -367,58 +367,6 @@ func TestGRPCUnexpectedContentTypeChecker(t *testing.T) {
}
}

func TestSplitMethodName(t *testing.T) {
testCases := []struct {
name string

fullMethod string
wantService string
wantMethod string
}{
{
name: "full method with service and method",

fullMethod: "/package.service/method",
wantService: "package.service",
wantMethod: "method",
},
{
name: "method without leading slash",

fullMethod: "package.service/method",
wantService: "package.service",
wantMethod: "method",
},
{
name: "service without method",

fullMethod: "/package.service/",
wantService: "package.service",
wantMethod: "",
},
{
name: "empty input",

fullMethod: "",
wantService: "unknown",
wantMethod: "unknown",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
service, method := splitMethodName(tc.fullMethod)
if diff := cmp.Diff(service, tc.wantService); diff != "" {
t.Errorf("splitMethodName(%q) service (-want +got):\n%s", tc.fullMethod, diff)
}

if diff := cmp.Diff(method, tc.wantMethod); diff != "" {
t.Errorf("splitMethodName(%q) method (-want +got):\n%s", tc.fullMethod, diff)
}
})
}
}

func TestFindNonUTF8StringFields(t *testing.T) {
// Create instances of the BinaryAttachment and KeyValueAttachment messages
invalidBinaryAttachment := &newspb.BinaryAttachment{
9 changes: 5 additions & 4 deletions internal/grpc/internalerrs/logging.go
@@ -8,6 +8,7 @@ import (
"strings"

"github.com/dustin/go-humanize"
+ "github.com/sourcegraph/sourcegraph/internal/grpc/grpcutil"

"google.golang.org/grpc/codes"
"google.golang.org/protobuf/proto"
@@ -49,7 +50,7 @@ func LoggingUnaryClientInterceptor(l log.Logger) grpc.UnaryClientInterceptor {
return func(ctx context.Context, fullMethod string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
err := invoker(ctx, fullMethod, req, reply, cc, opts...)
if err != nil {
- serviceName, methodName := splitMethodName(fullMethod)
+ serviceName, methodName := grpcutil.SplitMethodName(fullMethod)

var initialRequest proto.Message
if m, ok := req.(proto.Message); ok {
@@ -77,7 +78,7 @@ func LoggingStreamClientInterceptor(l log.Logger) grpc.StreamClientInterceptor {
logger = logger.Scoped("streamingMethod", "errors that originated from a streaming method")

return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, fullMethod string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
- serviceName, methodName := splitMethodName(fullMethod)
+ serviceName, methodName := grpcutil.SplitMethodName(fullMethod)

stream, err := streamer(ctx, desc, cc, fullMethod, opts...)
if err != nil {
@@ -111,7 +112,7 @@ func LoggingUnaryServerInterceptor(l log.Logger) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
response, err := handler(ctx, req)
if err != nil {
- serviceName, methodName := splitMethodName(info.FullMethod)
+ serviceName, methodName := grpcutil.SplitMethodName(info.FullMethod)

var initialRequest proto.Message
if m, ok := req.(proto.Message); ok {
@@ -139,7 +140,7 @@ func LoggingStreamServerInterceptor(l log.Logger) grpc.StreamServerInterceptor {
logger = logger.Scoped("streamingMethod", "errors that originated from a streaming method")

return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
- serviceName, methodName := splitMethodName(info.FullMethod)
+ serviceName, methodName := grpcutil.SplitMethodName(info.FullMethod)

stream := newLoggingServerStream(ss, logger, serviceName, methodName)
return handler(srv, stream)
5 changes: 3 additions & 2 deletions internal/grpc/internalerrs/prometheus.go
@@ -7,6 +7,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
+ "github.com/sourcegraph/sourcegraph/internal/grpc/grpcutil"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
@@ -26,7 +27,7 @@ var metricGRPCMethodStatus = promauto.NewCounterVec(prometheus.CounterOpts{
// PrometheusUnaryClientInterceptor returns a grpc.UnaryClientInterceptor that observes the result of
// the RPC and records it as a Prometheus metric ("src_grpc_method_status").
func PrometheusUnaryClientInterceptor(ctx context.Context, fullMethod string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
- serviceName, methodName := splitMethodName(fullMethod)
+ serviceName, methodName := grpcutil.SplitMethodName(fullMethod)

err := invoker(ctx, fullMethod, req, reply, cc, opts...)
doObservation(serviceName, methodName, err)
@@ -39,7 +40,7 @@ func PrometheusUnaryClientInterceptor(ctx context.Context, fullMethod string, re
// If any errors are encountered during the stream, the first error is recorded. Otherwise, the
// final status of the stream is recorded.
func PrometheusStreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, fullMethod string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
- serviceName, methodName := splitMethodName(fullMethod)
+ serviceName, methodName := grpcutil.SplitMethodName(fullMethod)

s, err := streamer(ctx, desc, cc, fullMethod, opts...)
if err != nil {
42 changes: 40 additions & 2 deletions internal/grpc/messagesize/BUILD.bazel

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion internal/grpc/messagesize/messagesize.go
@@ -2,9 +2,10 @@ package messagesize

import (
"fmt"
- "google.golang.org/grpc"
"math"

+ "google.golang.org/grpc"

"github.com/dustin/go-humanize"
"github.com/sourcegraph/sourcegraph/internal/env"
)