Skip to content

Commit

Permalink
Distributor push wrapper should only receive unforwarded samples. (#2980
Browse files Browse the repository at this point in the history
)

* Distributor push wrapper should only receive unforwarded samples.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>
  • Loading branch information
pstibrany authored Nov 11, 2022
1 parent 5356edd commit 3d14b39
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 22 deletions.
18 changes: 5 additions & 13 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ type Config struct {
// This allows downstream projects to wrap the distributor push function
// and access the deserialized write requests before/after they are pushed.
// This function will only receive samples that don't get forwarded to an
// alternative remote_write endpoint by the distributor's forwarding feature.
// alternative remote_write endpoint by the distributor's forwarding feature,
// or dropped by HA deduplication.
DistributorPushWrapper DistributorPushWrapper `yaml:"-"`

// The CustomConfigHandler allows for providing a different handler for the
Expand All @@ -84,15 +85,6 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.PrometheusHTTPPrefix, prefix+"http.prometheus-http-prefix", "/prometheus", "HTTP URL path under which the Prometheus api will be served.")
}

// Push either wraps the distributor push function as configured or returns the distributor push directly.
func (cfg *Config) wrapDistributorPush(next push.Func) push.Func {
if cfg.DistributorPushWrapper != nil {
return cfg.DistributorPushWrapper(next)
}

return next
}

type API struct {
AuthMiddleware middleware.Interface

Expand Down Expand Up @@ -242,9 +234,9 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc, userL
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, reg prometheus.Registerer) {
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

wrappedPush := a.cfg.wrapDistributorPush(d.PushWithMiddlewares)
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, wrappedPush), true, false, "POST")
a.RegisterRoute("/otlp/v1/metrics", push.OTLPHandler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, reg, wrappedPush), true, false, "POST")
pushFn := d.GetPushFunc(a.cfg.DistributorPushWrapper)
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, pushFn), true, false, "POST")
a.RegisterRoute("/otlp/v1/metrics", push.OTLPHandler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, reg, pushFn), true, false, "POST")

a.indexPage.AddLinks(defaultWeight, "Distributor", []IndexPageLink{
{Desc: "Ring status", Path: "/distributor/ring"},
Expand Down
19 changes: 13 additions & 6 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,6 @@ type Distributor struct {
sampleValidationMetrics *validation.SampleValidationMetrics
exemplarValidationMetrics *validation.ExemplarValidationMetrics
metadataValidationMetrics *validation.MetadataValidationMetrics

PushWithMiddlewares push.Func
}

// Config contains the configuration required to
Expand Down Expand Up @@ -427,8 +425,6 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
subservices = append(subservices, d.forwarder)
}

d.PushWithMiddlewares = d.wrapPushWithMiddlewares(d.push)

subservices = append(subservices, d.ingesterPool, d.activeUsers)
d.subservices, err = services.NewManager(subservices...)
if err != nil {
Expand Down Expand Up @@ -666,7 +662,8 @@ func (d *Distributor) validateSeries(nowt time.Time, ts mimirpb.PreallocTimeseri
return nil
}

func (d *Distributor) wrapPushWithMiddlewares(next push.Func) push.Func {
// wrapPushWithMiddlewares returns push function wrapped in all Distributor's middlewares.
func (d *Distributor) wrapPushWithMiddlewares(externalMiddleware func(next push.Func) push.Func, next push.Func) push.Func {
var middlewares []func(push.Func) push.Func

// The middlewares will be applied to the request (!) in the specified order, from first to last.
Expand All @@ -678,6 +675,9 @@ func (d *Distributor) wrapPushWithMiddlewares(next push.Func) push.Func {
middlewares = append(middlewares, d.prePushRelabelMiddleware)
middlewares = append(middlewares, d.prePushValidationMiddleware)
middlewares = append(middlewares, d.prePushForwardingMiddleware)
if externalMiddleware != nil {
middlewares = append(middlewares, externalMiddleware)
}

for ix := len(middlewares) - 1; ix >= 0; ix-- {
next = middlewares[ix](next)
Expand Down Expand Up @@ -1119,7 +1119,14 @@ func (d *Distributor) forwardSamples(ctx context.Context, userID string, ts []mi
func (d *Distributor) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
pushReq := push.NewParsedRequest(req)
pushReq.AddCleanup(func() { mimirpb.ReuseSlice(req.Timeseries) })
return d.PushWithMiddlewares(ctx, pushReq)

return d.GetPushFunc(nil)(ctx, pushReq)
}

// GetPushFunc returns push.Func that can be used by push handler.
// Wrapper, if not nil, is added to the list of distributor middlewares.
func (d *Distributor) GetPushFunc(externalMiddleware func(next push.Func) push.Func) push.Func {
return d.wrapPushWithMiddlewares(externalMiddleware, d.push)
}

// push takes a write request and distributes it to ingesters using the ring.
Expand Down
6 changes: 3 additions & 3 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2965,7 +2965,7 @@ func TestInstanceLimitsBeforeHaDedupe(t *testing.T) {
enableTracker: true,
maxInflightRequests: 1,
})
wrappedMockPush := ds[0].wrapPushWithMiddlewares(mockPush)
wrappedMockPush := ds[0].wrapPushWithMiddlewares(nil, mockPush)

// Make sure first request hits the limit.
ds[0].inflightPushRequests.Inc()
Expand Down Expand Up @@ -3176,7 +3176,7 @@ func TestHaDedupeAndRelabelBeforeForwarding(t *testing.T) {
forwarding: true,
getForwarder: getForwarder,
})
wrappedMockPush := ds[0].wrapPushWithMiddlewares(mockPush)
wrappedMockPush := ds[0].wrapPushWithMiddlewares(nil, mockPush)

// Submit the two write requests into the wrapped mock push function, it should:
// 1) Perform HA-deduplication
Expand Down Expand Up @@ -3310,7 +3310,7 @@ func TestValidationBeforeForwarding(t *testing.T) {
forwarding: true,
getForwarder: getForwarder,
})
wrappedMockPush := ds[0].wrapPushWithMiddlewares(mockPush)
wrappedMockPush := ds[0].wrapPushWithMiddlewares(nil, mockPush)

// Submit the write request into the wrapped mock push function,
// before samples get forwarded the invalid ones should be removed.
Expand Down

0 comments on commit 3d14b39

Please sign in to comment.