Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributor push wrapper should only receive unforwarded samples. #2980

Merged
merged 2 commits into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 5 additions & 13 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ type Config struct {
// This allows downstream projects to wrap the distributor push function
// and access the deserialized write requests before/after they are pushed.
// This function will only receive samples that don't get forwarded to an
// alternative remote_write endpoint by the distributor's forwarding feature.
// alternative remote_write endpoint by the distributor's forwarding feature,
// or dropped by HA deduplication.
DistributorPushWrapper DistributorPushWrapper `yaml:"-"`

// The CustomConfigHandler allows for providing a different handler for the
Expand All @@ -84,15 +85,6 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.PrometheusHTTPPrefix, prefix+"http.prometheus-http-prefix", "/prometheus", "HTTP URL path under which the Prometheus api will be served.")
}

// Push either wraps the distributor push function as configured or returns the distributor push directly.
func (cfg *Config) wrapDistributorPush(next push.Func) push.Func {
if cfg.DistributorPushWrapper != nil {
return cfg.DistributorPushWrapper(next)
}

return next
}

type API struct {
AuthMiddleware middleware.Interface

Expand Down Expand Up @@ -242,9 +234,9 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc, userL
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, reg prometheus.Registerer) {
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

wrappedPush := a.cfg.wrapDistributorPush(d.PushWithMiddlewares)
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, wrappedPush), true, false, "POST")
a.RegisterRoute("/otlp/v1/metrics", push.OTLPHandler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, reg, wrappedPush), true, false, "POST")
pushFn := d.GetPushFunc(a.cfg.DistributorPushWrapper)
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, pushFn), true, false, "POST")
a.RegisterRoute("/otlp/v1/metrics", push.OTLPHandler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, reg, pushFn), true, false, "POST")

a.indexPage.AddLinks(defaultWeight, "Distributor", []IndexPageLink{
{Desc: "Ring status", Path: "/distributor/ring"},
Expand Down
19 changes: 13 additions & 6 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,6 @@ type Distributor struct {
sampleValidationMetrics *validation.SampleValidationMetrics
exemplarValidationMetrics *validation.ExemplarValidationMetrics
metadataValidationMetrics *validation.MetadataValidationMetrics

PushWithMiddlewares push.Func
}

// Config contains the configuration required to
Expand Down Expand Up @@ -427,8 +425,6 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
subservices = append(subservices, d.forwarder)
}

d.PushWithMiddlewares = d.wrapPushWithMiddlewares(d.push)

subservices = append(subservices, d.ingesterPool, d.activeUsers)
d.subservices, err = services.NewManager(subservices...)
if err != nil {
Expand Down Expand Up @@ -666,7 +662,8 @@ func (d *Distributor) validateSeries(nowt time.Time, ts mimirpb.PreallocTimeseri
return nil
}

func (d *Distributor) wrapPushWithMiddlewares(next push.Func) push.Func {
// wrapPushWithMiddlewares returns push function wrapped in all Distributor's middlewares.
func (d *Distributor) wrapPushWithMiddlewares(externalMiddleware func(next push.Func) push.Func, next push.Func) push.Func {
var middlewares []func(push.Func) push.Func

// The middlewares will be applied to the request (!) in the specified order, from first to last.
Expand All @@ -678,6 +675,9 @@ func (d *Distributor) wrapPushWithMiddlewares(next push.Func) push.Func {
middlewares = append(middlewares, d.prePushRelabelMiddleware)
middlewares = append(middlewares, d.prePushValidationMiddleware)
middlewares = append(middlewares, d.prePushForwardingMiddleware)
if externalMiddleware != nil {
middlewares = append(middlewares, externalMiddleware)
}

for ix := len(middlewares) - 1; ix >= 0; ix-- {
next = middlewares[ix](next)
Expand Down Expand Up @@ -1119,7 +1119,14 @@ func (d *Distributor) forwardSamples(ctx context.Context, userID string, ts []mi
func (d *Distributor) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
pushReq := push.NewParsedRequest(req)
pushReq.AddCleanup(func() { mimirpb.ReuseSlice(req.Timeseries) })
return d.PushWithMiddlewares(ctx, pushReq)

return d.GetPushFunc(nil)(ctx, pushReq)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This rewrap the middlewares for each Push() request (previously we were not doing it). Distributor.Push() is called by the ruler. We should fix it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done here: #3462

}

// GetPushFunc returns push.Func that can be used by push handler.
// Wrapper, if not nil, is added to the list of distributor middlewares.
func (d *Distributor) GetPushFunc(externalMiddleware func(next push.Func) push.Func) push.Func {
return d.wrapPushWithMiddlewares(externalMiddleware, d.push)
}

// push takes a write request and distributes it to ingesters using the ring.
Expand Down
6 changes: 3 additions & 3 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2965,7 +2965,7 @@ func TestInstanceLimitsBeforeHaDedupe(t *testing.T) {
enableTracker: true,
maxInflightRequests: 1,
})
wrappedMockPush := ds[0].wrapPushWithMiddlewares(mockPush)
wrappedMockPush := ds[0].wrapPushWithMiddlewares(nil, mockPush)

// Make sure first request hits the limit.
ds[0].inflightPushRequests.Inc()
Expand Down Expand Up @@ -3176,7 +3176,7 @@ func TestHaDedupeAndRelabelBeforeForwarding(t *testing.T) {
forwarding: true,
getForwarder: getForwarder,
})
wrappedMockPush := ds[0].wrapPushWithMiddlewares(mockPush)
wrappedMockPush := ds[0].wrapPushWithMiddlewares(nil, mockPush)

// Submit the two write requests into the wrapped mock push function, it should:
// 1) Perform HA-deduplication
Expand Down Expand Up @@ -3310,7 +3310,7 @@ func TestValidationBeforeForwarding(t *testing.T) {
forwarding: true,
getForwarder: getForwarder,
})
wrappedMockPush := ds[0].wrapPushWithMiddlewares(mockPush)
wrappedMockPush := ds[0].wrapPushWithMiddlewares(nil, mockPush)

// Submit the write request into the wrapped mock push function,
// before samples get forwarded the invalid ones should be removed.
Expand Down