Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement endpoint groups #5548

Merged
merged 4 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ func registerQuery(app *extkingpin.App) {
endpoints := extkingpin.Addrs(cmd.Flag("endpoint", "Addresses of statically configured Thanos API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect Thanos API servers through respective DNS lookups.").
PlaceHolder("<endpoint>"))

endpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group", "Experimental: DNS name of statically configured Thanos API server groups (repeatable). Targets resolved from the DNS name will be queried in a round-robin, instead of a fanout manner. This flag should be used when connecting a Thanos Query to HA groups of Thanos components.").
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HA groups of Thanos components. means store only?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be any component, querier, store, exemplar etc.. as long as they implement the same APIs. So a group should not have a mix of components.

PlaceHolder("<endpoint-group>"))

stores := extkingpin.Addrs(cmd.Flag("store", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Addresses of statically configured store API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups.").
PlaceHolder("<store>"))

Expand All @@ -151,6 +154,9 @@ func registerQuery(app *extkingpin.App) {
strictEndpoints := cmd.Flag("endpoint-strict", "Addresses of only statically configured Thanos API servers that are always used, even if the health check fails. Useful if you have a caching layer on top.").
PlaceHolder("<staticendpoint>").Strings()

strictEndpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group-strict", "Experimental: DNS name of statically configured Thanos API server groups (repeatable) that are always used, even if the health check fails.").
PlaceHolder("<endpoint-group-strict>"))

fileSDFiles := cmd.Flag("store.sd-files", "Path to files that contain addresses of store API servers. The path can be a glob pattern (repeatable).").
PlaceHolder("<path>").Strings()

Expand Down Expand Up @@ -291,6 +297,7 @@ func registerQuery(app *extkingpin.App) {
selectorLset,
getFlagsMap(cmd.Flags()),
*endpoints,
*endpointGroups,
*stores,
*ruleEndpoints,
*targetEndpoints,
Expand All @@ -312,6 +319,7 @@ func registerQuery(app *extkingpin.App) {
*defaultMetadataTimeRange,
*strictStores,
*strictEndpoints,
*strictEndpointGroups,
*webDisableCORS,
enableQueryPushdown,
*alertQueryURL,
Expand Down Expand Up @@ -367,6 +375,7 @@ func runQuery(
selectorLset labels.Labels,
flagsMap map[string]string,
endpointAddrs []string,
endpointGroupAddrs []string,
storeAddrs []string,
ruleAddrs []string,
targetAddrs []string,
Expand All @@ -388,6 +397,7 @@ func runQuery(
defaultMetadataTimeRange time.Duration,
strictStores []string,
strictEndpoints []string,
strictEndpointGroups []string,
disableCORS bool,
enableQueryPushdown bool,
alertQueryURL string,
Expand Down Expand Up @@ -500,6 +510,18 @@ func runQuery(
specs = append(specs, tmpSpecs...)
}

for _, eg := range endpointGroupAddrs {
addr := fmt.Sprintf("dns:///%s", eg)
spec := query.NewGRPCEndpointSpec(addr, false, extgrpc.EndpointGroupGRPCOpts()...)
specs = append(specs, spec)
}

for _, eg := range strictEndpointGroups {
addr := fmt.Sprintf("dns:///%s", eg)
spec := query.NewGRPCEndpointSpec(addr, true, extgrpc.EndpointGroupGRPCOpts()...)
specs = append(specs, spec)
}

return specs
},
dialOpts,
Expand Down
11 changes: 11 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,17 @@ Flags:
prefixed with 'dns+' or 'dnssrv+' to detect
Thanos API servers through respective DNS
lookups.
--endpoint-group=<endpoint-group> ...
Experimental: DNS name of statically configured
Thanos API server groups (repeatable). Targets
resolved from the DNS name will be queried in
a round-robin, instead of a fanout manner.
This flag should be used when connecting a
Thanos Query to HA groups of Thanos components.
--endpoint-group-strict=<endpoint-group-strict> ...
Experimental: DNS name of statically configured
Thanos API server groups (repeatable) that are
always used, even if the health check fails.
--endpoint-strict=<staticendpoint> ...
Addresses of only statically configured Thanos
API servers that are always used, even if
Expand Down
21 changes: 21 additions & 0 deletions pkg/extgrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,27 @@ import (
"github.com/thanos-io/thanos/pkg/tracing"
)

// EndpointGroupGRPCOpts creates gRPC dial options for connecting to endpoint groups.
// For details on retry capabilities, see https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy-capabilities
func EndpointGroupGRPCOpts() []grpc.DialOption {
serviceConfig := `
{
"loadBalancingPolicy":"round_robin",
"retryPolicy": {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps this should be configurable via flags?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this could be a good idea. The new endpoint config would also help here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add this flag in this PR, with this particular config as the default? 🙂

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This policy is really an implementation detail, I don't think it makes sense to expose it to the end user. The retry always has to be there so that the client will re-resolve dns endpoints when the one it's connected to goes away. This is how HA is implemented in gRPC.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see!

"maxAttempts": 3,
"initialBackoff": "0.1s",
"backoffMultiplier": 2,
"retryableStatusCodes": [
"UNAVAILABLE"
]
}
}`

return []grpc.DialOption{
grpc.WithDefaultServiceConfig(serviceConfig),
}
}

// StoreClientGRPCOpts creates gRPC dial options for connecting to a store client.
func StoreClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure, skipVerify bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) {
grpcMets := grpc_prometheus.NewClientMetrics()
Expand Down
12 changes: 9 additions & 3 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,19 @@ const (
type GRPCEndpointSpec struct {
addr string
isStrictStatic bool
dialOpts []grpc.DialOption
}

const externalLabelLimit = 1000

// NewGRPCEndpointSpec creates gRPC endpoint spec.
// It uses InfoAPI to get Metadata.
func NewGRPCEndpointSpec(addr string, isStrictStatic bool) *GRPCEndpointSpec {
return &GRPCEndpointSpec{addr: addr, isStrictStatic: isStrictStatic}
func NewGRPCEndpointSpec(addr string, isStrictStatic bool, dialOpts ...grpc.DialOption) *GRPCEndpointSpec {
return &GRPCEndpointSpec{
addr: addr,
isStrictStatic: isStrictStatic,
dialOpts: dialOpts,
}
}

func (es *GRPCEndpointSpec) Addr() string {
Expand Down Expand Up @@ -622,7 +627,8 @@ type endpointRef struct {
// newEndpointRef creates a new endpointRef with a gRPC channel to the given the IP address.
// The call to newEndpointRef will return an error if establishing the channel fails.
func (e *EndpointSet) newEndpointRef(ctx context.Context, spec *GRPCEndpointSpec) (*endpointRef, error) {
conn, err := grpc.DialContext(ctx, spec.Addr(), e.dialOpts...)
dialOpts := append(e.dialOpts, spec.dialOpts...)
conn, err := grpc.DialContext(ctx, spec.Addr(), dialOpts...)
fpetkovski marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, errors.Wrap(err, "dialing connection")
}
Expand Down