Skip to content

Commit

Permalink
Implement endpoint groups
Browse files Browse the repository at this point in the history
Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Nov 2, 2022
1 parent 5898fb8 commit 7a6c1e3
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 3 deletions.
26 changes: 26 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"strings"
"time"

"google.golang.org/grpc"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
Expand Down Expand Up @@ -126,6 +128,9 @@ func registerQuery(app *extkingpin.App) {
endpoints := extkingpin.Addrs(cmd.Flag("endpoint", "Addresses of statically configured Thanos API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect Thanos API servers through respective DNS lookups.").
PlaceHolder("<endpoint>"))

endpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group", "DNS name of statically configured Thanos API server groups (repeatable). Targets resolved from the DNS name will be queried in a round-robin instead of a fanout manner. This flag should be used when connecting a Thanos Query to HA groups of Thanos components.").
PlaceHolder("<endpoint-group>"))

stores := extkingpin.Addrs(cmd.Flag("store", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Addresses of statically configured store API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups.").
PlaceHolder("<store>"))

Expand All @@ -149,6 +154,9 @@ func registerQuery(app *extkingpin.App) {
strictEndpoints := cmd.Flag("endpoint-strict", "Addresses of only statically configured Thanos API servers that are always used, even if the health check fails. Useful if you have a caching layer on top.").
PlaceHolder("<staticendpoint>").Strings()

strictEndpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group-strict", "DNS name of statically configured Thanos API server groups (repeatable) that are always used, even if the health check fails.").
PlaceHolder("<endpoint-group-strict>"))

fileSDFiles := cmd.Flag("store.sd-files", "Path to files that contain addresses of store API servers. The path can be a glob pattern (repeatable).").
PlaceHolder("<path>").Strings()

Expand Down Expand Up @@ -289,6 +297,7 @@ func registerQuery(app *extkingpin.App) {
selectorLset,
getFlagsMap(cmd.Flags()),
*endpoints,
*endpointGroups,
*stores,
*ruleEndpoints,
*targetEndpoints,
Expand All @@ -310,6 +319,7 @@ func registerQuery(app *extkingpin.App) {
*defaultMetadataTimeRange,
*strictStores,
*strictEndpoints,
*strictEndpointGroups,
*webDisableCORS,
enableQueryPushdown,
*alertQueryURL,
Expand Down Expand Up @@ -365,6 +375,7 @@ func runQuery(
selectorLset labels.Labels,
flagsMap map[string]string,
endpointAddrs []string,
endpointGroupAddrs []string,
storeAddrs []string,
ruleAddrs []string,
targetAddrs []string,
Expand All @@ -386,6 +397,7 @@ func runQuery(
defaultMetadataTimeRange time.Duration,
strictStores []string,
strictEndpoints []string,
strictEndpointGroups []string,
disableCORS bool,
enableQueryPushdown bool,
alertQueryURL string,
Expand Down Expand Up @@ -498,6 +510,20 @@ func runQuery(
specs = append(specs, tmpSpecs...)
}

for _, eg := range endpointGroupAddrs {
addr := fmt.Sprintf("dns:///%s", eg)
spec := query.NewGRPCEndpointSpec(addr, false, extgrpc.EndpointGroupGRPCOpts()...)
specs = append(specs, spec)
}

for _, eg := range strictEndpointGroups {
grpc.WithDisableRetry()

addr := fmt.Sprintf("dns:///%s", eg)
spec := query.NewGRPCEndpointSpec(addr, true, extgrpc.EndpointGroupGRPCOpts()...)
specs = append(specs, spec)
}

return specs
},
dialOpts,
Expand Down
11 changes: 11 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,17 @@ Flags:
prefixed with 'dns+' or 'dnssrv+' to detect
Thanos API servers through respective DNS
lookups.
--endpoint-group=<endpoint-group> ...
DNS name of statically configured Thanos API
server groups (repeatable). Targets resolved
from the DNS name will be queried in a
round-robin instead of a fanout manner. This
flag should be used when connecting a Thanos
Query to HA groups of Thanos components.
--endpoint-group-strict=<endpoint-group-strict> ...
DNS name of statically configured Thanos API
server groups (repeatable) that are always
used, even if the health check fails.
--endpoint-strict=<staticendpoint> ...
Addresses of only statically configured Thanos
API servers that are always used, even if
Expand Down
21 changes: 21 additions & 0 deletions pkg/extgrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,27 @@ import (
"github.com/thanos-io/thanos/pkg/tracing"
)

// EndpointGroupGRPCOpts creates gRPC dial options for connecting to endpoint groups.
// For details on retry capabilities, see https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy-capabilities
func EndpointGroupGRPCOpts() []grpc.DialOption {
serviceConfig := `
{
"loadBalancingPolicy":"round_robin",
"retryPolicy": {
"maxAttempts": 3,
"initialBackoff": "0.1s",
"backoffMultiplier": 2,
"retryableStatusCodes": [
"UNAVAILABLE"
]
}
}`

return []grpc.DialOption{
grpc.WithDefaultServiceConfig(serviceConfig),
}
}

// StoreClientGRPCOpts creates gRPC dial options for connecting to a store client.
func StoreClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure, skipVerify bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) {
grpcMets := grpc_prometheus.NewClientMetrics()
Expand Down
12 changes: 9 additions & 3 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,19 @@ const (
type GRPCEndpointSpec struct {
addr string
isStrictStatic bool
dialOpts []grpc.DialOption
}

const externalLabelLimit = 1000

// NewGRPCEndpointSpec creates gRPC endpoint spec.
// It uses InfoAPI to get Metadata.
func NewGRPCEndpointSpec(addr string, isStrictStatic bool) *GRPCEndpointSpec {
return &GRPCEndpointSpec{addr: addr, isStrictStatic: isStrictStatic}
func NewGRPCEndpointSpec(addr string, isStrictStatic bool, dialOpts ...grpc.DialOption) *GRPCEndpointSpec {
return &GRPCEndpointSpec{
addr: addr,
isStrictStatic: isStrictStatic,
dialOpts: dialOpts,
}
}

func (es *GRPCEndpointSpec) Addr() string {
Expand Down Expand Up @@ -618,7 +623,8 @@ type endpointRef struct {
// newEndpointRef creates a new endpointRef with a gRPC channel to the given the IP address.
// The call to newEndpointRef will return an error if establishing the channel fails.
func (e *EndpointSet) newEndpointRef(ctx context.Context, spec *GRPCEndpointSpec) (*endpointRef, error) {
conn, err := grpc.DialContext(ctx, spec.Addr(), e.dialOpts...)
dialOpts := append(e.dialOpts, spec.dialOpts...)
conn, err := grpc.DialContext(ctx, spec.Addr(), dialOpts...)
if err != nil {
return nil, errors.Wrap(err, "dialing connection")
}
Expand Down

0 comments on commit 7a6c1e3

Please sign in to comment.