From fbca054af61c26fe045c64c5f3d2b5dae0cfe28c Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 19 Jan 2024 21:25:30 +0000 Subject: [PATCH 01/16] storage: don't wrap single querier in merge-queriers If given a single querier, just return it instead of constructing a complicated wrapper. The code in `mergeGenericQuerier` which skipped merging when there was only one is not needed any more. This change required a few tests to be tweaked, because they relied on the specific behaviour of `mergeGenericQuerier.Select()`. Signed-off-by: Bryan Boreham --- storage/merge.go | 26 +++++++++++++++++--------- storage/merge_test.go | 22 ++++++++++------------ 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/storage/merge.go b/storage/merge.go index 38897449b5..8f2dcb82ef 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -46,9 +46,15 @@ type mergeGenericQuerier struct { // // In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used. func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMergeFunc) Querier { - if len(primaries)+len(secondaries) == 0 { - return NoopQuerier() + switch { + case len(primaries)+len(secondaries) == 0: + return noopQuerier{} + case len(primaries) == 1 && len(secondaries) == 0: + return primaries[0] + case len(primaries) == 0 && len(secondaries) == 1: + return secondaries[0] } + queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries)) for _, q := range primaries { if _, ok := q.(noopQuerier); !ok && q != nil { @@ -78,6 +84,15 @@ func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMer // In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used. // TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670 func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergeFunc) ChunkQuerier { + switch { + case len(primaries) == 0 && len(secondaries) == 0: + return noopChunkQuerier{} + case len(primaries) == 1 && len(secondaries) == 0: + return primaries[0] + case len(primaries) == 0 && len(secondaries) == 1: + return secondaries[0] + } + queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries)) for _, q := range primaries { if _, ok := q.(noopChunkQuerier); !ok && q != nil { @@ -103,13 +118,6 @@ func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn Vertica // Select returns a set of series that matches the given label matchers. func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet { - if len(q.queriers) == 0 { - return noopGenericSeriesSet{} - } - if len(q.queriers) == 1 { - return q.queriers[0].Select(ctx, sortSeries, hints, matchers...) 
- } - seriesSets := make([]genericSeriesSet, 0, len(q.queriers)) if !q.concurrentSelect { for _, querier := range q.queriers { diff --git a/storage/merge_test.go b/storage/merge_test.go index 05e1c75278..f42869d8ea 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -180,9 +180,9 @@ func TestMergeQuerierWithChainMerger(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - var p Querier + var p []Querier if tc.primaryQuerierSeries != nil { - p = &mockQuerier{toReturn: tc.primaryQuerierSeries} + p = append(p, &mockQuerier{toReturn: tc.primaryQuerierSeries}) } var qs []Querier for _, in := range tc.querierSeries { @@ -190,7 +190,7 @@ func TestMergeQuerierWithChainMerger(t *testing.T) { } qs = append(qs, tc.extraQueriers...) - mergedQuerier := NewMergeQuerier([]Querier{p}, qs, ChainedSeriesMerge).Select(context.Background(), false, nil) + mergedQuerier := NewMergeQuerier(p, qs, ChainedSeriesMerge).Select(context.Background(), false, nil) // Get all merged series upfront to make sure there are no incorrectly retained shared // buffers causing bugs. @@ -355,9 +355,9 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - var p ChunkQuerier + var p []ChunkQuerier if tc.primaryChkQuerierSeries != nil { - p = &mockChunkQurier{toReturn: tc.primaryChkQuerierSeries} + p = append(p, &mockChunkQurier{toReturn: tc.primaryChkQuerierSeries}) } var qs []ChunkQuerier @@ -366,7 +366,7 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) { } qs = append(qs, tc.extraQueriers...) - merged := NewMergeChunkQuerier([]ChunkQuerier{p}, qs, NewCompactingChunkSeriesMerger(nil)).Select(context.Background(), false, nil) + merged := NewMergeChunkQuerier(p, qs, NewCompactingChunkSeriesMerger(nil)).Select(context.Background(), false, nil) for merged.Next() { require.True(t, tc.expected.Next(), "Expected Next() to be true") actualSeries := merged.At() @@ -1444,6 +1444,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { expectedErrs [4]error }{ { + // NewMergeQuerier will not create a mergeGenericQuerier + // with just one querier inside, but we can test it anyway. name: "one successful primary querier", queriers: []genericQuerier{&mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil}}, expectedSelectsSeries: []labels.Labels{ @@ -1552,12 +1554,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { for _, qr := range q.queriers { m := unwrapMockGenericQuerier(t, qr) - - exp := []bool{true} - if len(q.queriers) == 1 { - exp[0] = false - } - require.Equal(t, exp, m.sortedSeriesRequested) + // mergeGenericQuerier forces all Selects to be sorted. + require.Equal(t, []bool{true}, m.sortedSeriesRequested) } }) t.Run("LabelNames", func(t *testing.T) { From ea82b49c33a1440d91b6e33719ddba5062011807 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 9 May 2024 14:29:34 +0100 Subject: [PATCH 02/16] [ENHANCEMENT] PromQL: use Kahan summation for sum() This can give a more precise result, by keeping a separate running compensation value to accumulate small errors. 
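As an illustration (not part of the patch), a self-contained sketch of such a compensated summation step is shown below. The helper name mirrors the kahanSumInc call in the diff, but the body here is the textbook algorithm (Neumaier's variant), not necessarily a verbatim copy of the Prometheus helper; the example values match the new aggregators.test case.

package main

import (
	"fmt"
	"math"
)

// kahanSumInc performs one step of compensated summation: it returns the new
// running sum together with an updated compensation term that captures the
// low-order bits lost by the floating-point addition.
func kahanSumInc(inc, sum, c float64) (newSum, newC float64) {
	t := sum + inc
	if math.Abs(sum) >= math.Abs(inc) {
		c += (sum - t) + inc // low-order bits of inc were lost in t
	} else {
		c += (inc - t) + sum // low-order bits of sum were lost in t
	}
	return t, c
}

func main() {
	var sum, comp float64
	for _, v := range []float64{2, 8, 1e100, -1e100} {
		sum, comp = kahanSumInc(v, sum, comp)
	}
	fmt.Println(sum + comp) // 10, whereas naive left-to-right summation yields 0
}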
See https://en.wikipedia.org/wiki/Kahan_summation_algorithm Signed-off-by: Bryan Boreham --- promql/engine.go | 13 +++++++++---- promql/promqltest/testdata/aggregators.test | 12 ++++++++++++ 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index ea4bc1af85..24c616a835 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2730,7 +2730,7 @@ type groupedAggregation struct { hasHistogram bool // Has at least 1 histogram sample aggregated. floatValue float64 histogramValue *histogram.FloatHistogram - floatMean float64 + floatMean float64 // Mean, or "compensating value" for Kahan summation. groupCount int heap vectorByValueHeap } @@ -2758,11 +2758,13 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix *group = groupedAggregation{ seen: true, floatValue: f, - floatMean: f, groupCount: 1, } switch op { - case parser.SUM, parser.AVG: + case parser.AVG: + group.floatMean = f + fallthrough + case parser.SUM: if h == nil { group.hasFloat = true } else { @@ -2770,6 +2772,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix group.hasHistogram = true } case parser.STDVAR, parser.STDDEV: + group.floatMean = f group.floatValue = 0 case parser.QUANTILE: group.heap = make(vectorByValueHeap, 1) @@ -2792,7 +2795,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix // point in copying the histogram in that case. } else { group.hasFloat = true - group.floatValue += f + group.floatValue, group.floatMean = kahanSumInc(f, group.floatValue, group.floatMean) } case parser.AVG: @@ -2903,6 +2906,8 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix } if aggr.hasHistogram { aggr.histogramValue.Compact(0) + } else { + aggr.floatValue += aggr.floatMean // Add Kahan summation compensating term. } default: // For other aggregations, we already have the right value. diff --git a/promql/promqltest/testdata/aggregators.test b/promql/promqltest/testdata/aggregators.test index 8709b393b2..be689c65f6 100644 --- a/promql/promqltest/testdata/aggregators.test +++ b/promql/promqltest/testdata/aggregators.test @@ -503,6 +503,18 @@ eval instant at 1m avg(data{test="-big"}) eval instant at 1m avg(data{test="bigzero"}) {} 0 +# Test summing extreme values. +clear + +load 10s + data{test="ten",point="a"} 2 + data{test="ten",point="b"} 8 + data{test="ten",point="c"} 1e+100 + data{test="ten",point="d"} -1e100 + +eval instant at 1m sum(data{test="ten"}) + {} 10 + clear # Test that aggregations are deterministic. From 2aaf99dd0ad23266a09e8be87087fa08c89d3f3e Mon Sep 17 00:00:00 2001 From: akunszt <32456696+akunszt@users.noreply.github.com> Date: Thu, 20 Jun 2024 15:36:20 +0200 Subject: [PATCH 03/16] discovery: aws: expose Primary IPv6 addresses as label, partially fixes #7406 (#14156) * discovery: aws: expose Primary IPv6 addresses as label Add __meta_ec2_primary_ipv6_addresses label. This label contains the Primary IPv6 address for every ENI attached to the EC2 instance. It is ordered by the DeviceIndex and the missing elements (interface without Primary IPv6 address) are kept in the list. 
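To make the ordering concrete: if the ENIs at device indexes 0 and 2 carry a Primary IPv6 address and the ENI at device index 1 does not, the label keeps an empty slot for index 1. A small self-contained sketch follows (the addresses are hypothetical; the grow-then-assign loop and the leading/trailing separator framing mirror the code added below).

package main

import (
	"fmt"
	"strings"
)

func main() {
	const sep = "," // ec2LabelSeparator
	// Hypothetical primary IPv6 addresses keyed by ENI device index; index 1 has none.
	primary := map[int64]string{0: "2001:db8::1", 2: "2001:db8::3"}

	var addrs []string
	for idx, addr := range primary {
		// Grow the slice up to the device index, leaving empty placeholders in between.
		for int64(len(addrs)) <= idx {
			addrs = append(addrs, "")
		}
		addrs[idx] = addr
	}
	// Framed like the other list-valued EC2 labels: leading and trailing separator.
	fmt.Println(sep + strings.Join(addrs, sep) + sep) // ",2001:db8::1,,2001:db8::3,"
}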
--------- Signed-off-by: Arpad Kunszt Co-authored-by: Ayoub Mrini --- discovery/aws/ec2.go | 61 ++++++++++++++++++----------- docs/configuration/configuration.md | 1 + 2 files changed, 40 insertions(+), 22 deletions(-) diff --git a/discovery/aws/ec2.go b/discovery/aws/ec2.go index a6a0a82577..a44912481a 100644 --- a/discovery/aws/ec2.go +++ b/discovery/aws/ec2.go @@ -42,28 +42,29 @@ import ( ) const ( - ec2Label = model.MetaLabelPrefix + "ec2_" - ec2LabelAMI = ec2Label + "ami" - ec2LabelAZ = ec2Label + "availability_zone" - ec2LabelAZID = ec2Label + "availability_zone_id" - ec2LabelArch = ec2Label + "architecture" - ec2LabelIPv6Addresses = ec2Label + "ipv6_addresses" - ec2LabelInstanceID = ec2Label + "instance_id" - ec2LabelInstanceLifecycle = ec2Label + "instance_lifecycle" - ec2LabelInstanceState = ec2Label + "instance_state" - ec2LabelInstanceType = ec2Label + "instance_type" - ec2LabelOwnerID = ec2Label + "owner_id" - ec2LabelPlatform = ec2Label + "platform" - ec2LabelPrimarySubnetID = ec2Label + "primary_subnet_id" - ec2LabelPrivateDNS = ec2Label + "private_dns_name" - ec2LabelPrivateIP = ec2Label + "private_ip" - ec2LabelPublicDNS = ec2Label + "public_dns_name" - ec2LabelPublicIP = ec2Label + "public_ip" - ec2LabelRegion = ec2Label + "region" - ec2LabelSubnetID = ec2Label + "subnet_id" - ec2LabelTag = ec2Label + "tag_" - ec2LabelVPCID = ec2Label + "vpc_id" - ec2LabelSeparator = "," + ec2Label = model.MetaLabelPrefix + "ec2_" + ec2LabelAMI = ec2Label + "ami" + ec2LabelAZ = ec2Label + "availability_zone" + ec2LabelAZID = ec2Label + "availability_zone_id" + ec2LabelArch = ec2Label + "architecture" + ec2LabelIPv6Addresses = ec2Label + "ipv6_addresses" + ec2LabelInstanceID = ec2Label + "instance_id" + ec2LabelInstanceLifecycle = ec2Label + "instance_lifecycle" + ec2LabelInstanceState = ec2Label + "instance_state" + ec2LabelInstanceType = ec2Label + "instance_type" + ec2LabelOwnerID = ec2Label + "owner_id" + ec2LabelPlatform = ec2Label + "platform" + ec2LabelPrimaryIPv6Addresses = ec2Label + "primary_ipv6_addresses" + ec2LabelPrimarySubnetID = ec2Label + "primary_subnet_id" + ec2LabelPrivateDNS = ec2Label + "private_dns_name" + ec2LabelPrivateIP = ec2Label + "private_ip" + ec2LabelPublicDNS = ec2Label + "public_dns_name" + ec2LabelPublicIP = ec2Label + "public_ip" + ec2LabelRegion = ec2Label + "region" + ec2LabelSubnetID = ec2Label + "subnet_id" + ec2LabelTag = ec2Label + "tag_" + ec2LabelVPCID = ec2Label + "vpc_id" + ec2LabelSeparator = "," ) // DefaultEC2SDConfig is the default EC2 SD configuration. 
@@ -317,6 +318,7 @@ func (d *EC2Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error var subnets []string var ipv6addrs []string + var primaryipv6addrs []string subnetsMap := make(map[string]struct{}) for _, eni := range inst.NetworkInterfaces { if eni.SubnetId == nil { @@ -330,6 +332,15 @@ func (d *EC2Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error for _, ipv6addr := range eni.Ipv6Addresses { ipv6addrs = append(ipv6addrs, *ipv6addr.Ipv6Address) + if *ipv6addr.IsPrimaryIpv6 { + // we might have to extend the slice with more than one element + // that could leave empty strings in the list which is intentional + // to keep the position/device index information + for int64(len(primaryipv6addrs)) <= *eni.Attachment.DeviceIndex { + primaryipv6addrs = append(primaryipv6addrs, "") + } + primaryipv6addrs[*eni.Attachment.DeviceIndex] = *ipv6addr.Ipv6Address + } } } labels[ec2LabelSubnetID] = model.LabelValue( @@ -342,6 +353,12 @@ func (d *EC2Discovery) refresh(ctx context.Context) ([]*targetgroup.Group, error strings.Join(ipv6addrs, ec2LabelSeparator) + ec2LabelSeparator) } + if len(primaryipv6addrs) > 0 { + labels[ec2LabelPrimaryIPv6Addresses] = model.LabelValue( + ec2LabelSeparator + + strings.Join(primaryipv6addrs, ec2LabelSeparator) + + ec2LabelSeparator) + } } for _, t := range inst.Tags { diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 5df7dae3c0..164f426ad5 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -1229,6 +1229,7 @@ The following meta labels are available on targets during [relabeling](#relabel_ * `__meta_ec2_ipv6_addresses`: comma separated list of IPv6 addresses assigned to the instance's network interfaces, if present * `__meta_ec2_owner_id`: the ID of the AWS account that owns the EC2 instance * `__meta_ec2_platform`: the Operating System platform, set to 'windows' on Windows servers, absent otherwise +* `__meta_ec2_primary_ipv6_addresses`: comma separated list of the Primary IPv6 addresses of the instance, if present. The list is ordered based on the position of each corresponding network interface in the attachment order. * `__meta_ec2_primary_subnet_id`: the subnet ID of the primary network interface, if available * `__meta_ec2_private_dns_name`: the private DNS name of the instance, if available * `__meta_ec2_private_ip`: the private IP address of the instance, if present From dbd29df5df8c623783e756ebd521ec6804ed2541 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Mierzwa?= Date: Thu, 20 Jun 2024 18:25:44 +0100 Subject: [PATCH 04/16] Fix @goyacc invocation (#14324) goyacc is installed using 'install-goyacc' and ends up in GOPATH/bin. GOPATH isn't usually part of standard PATH, so when make tries to run goyacc it fails, unless PATH includes GOPATH/bin. Other Go tools, like golangci-lint, are also installed via go install into GOPATH/bin but they run correctly because make invocations for them use FIRST_GOPATH viriable to use full path. Call goyacc using FIRST_GOPATH/bin as well so it works without GOPATH being included in PATH. Signed-off-by: Lukasz Mierzwa --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 5dcebfd1af..f2bb3fcb7a 100644 --- a/Makefile +++ b/Makefile @@ -91,7 +91,7 @@ endif promql/parser/generated_parser.y.go: promql/parser/generated_parser.y @echo ">> running goyacc to generate the .go file." 
- @goyacc -l -o promql/parser/generated_parser.y.go promql/parser/generated_parser.y + @$(FIRST_GOPATH)/bin/goyacc -l -o promql/parser/generated_parser.y.go promql/parser/generated_parser.y .PHONY: clean-parser clean-parser: From d78253319daa62c8f28ed47e40bafcad2dd8b586 Mon Sep 17 00:00:00 2001 From: Piotr <17101802+thampiotr@users.noreply.github.com> Date: Fri, 21 Jun 2024 00:45:13 +0100 Subject: [PATCH 05/16] queue_manager: add histogram info to error logs (#14326) Signed-off-by: Piotr Gwizdala <17101802+thampiotr@users.noreply.github.com> --- storage/remote/queue_manager.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index b244b331b0..488485e385 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -16,6 +16,7 @@ package remote import ( "context" "errors" + "fmt" "math" "strconv" "sync" @@ -1224,12 +1225,16 @@ func (s *shards) stop() { // Force an unclean shutdown. s.hardShutdown() <-s.done - if dropped := s.samplesDroppedOnHardShutdown.Load(); dropped > 0 { - level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown", "count", dropped) - } - if dropped := s.exemplarsDroppedOnHardShutdown.Load(); dropped > 0 { - level.Error(s.qm.logger).Log("msg", "Failed to flush all exemplars on shutdown", "count", dropped) + + // Log error for any dropped samples, exemplars, or histograms. + logDroppedError := func(t string, counter atomic.Uint32) { + if dropped := counter.Load(); dropped > 0 { + level.Error(s.qm.logger).Log("msg", fmt.Sprintf("Failed to flush all %s on shutdown", t), "count", dropped) + } } + logDroppedError("samples", s.samplesDroppedOnHardShutdown) + logDroppedError("exemplars", s.exemplarsDroppedOnHardShutdown) + logDroppedError("histograms", s.histogramsDroppedOnHardShutdown) } // enqueue data (sample or exemplar). 
If the shard is full, shutting down, or @@ -1537,7 +1542,7 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s begin := time.Now() err := s.sendSamplesWithBackoff(ctx, samples, sampleCount, exemplarCount, histogramCount, pBuf, buf) if err != nil { - level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "err", err) + level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", sampleCount, "exemplarCount", exemplarCount, "histogramCount", histogramCount, "err", err) s.qm.metrics.failedSamplesTotal.Add(float64(sampleCount)) s.qm.metrics.failedExemplarsTotal.Add(float64(exemplarCount)) s.qm.metrics.failedHistogramsTotal.Add(float64(histogramCount)) From 0d25931049e2e1c3820a8cbf479ec58504094124 Mon Sep 17 00:00:00 2001 From: unknown Date: Sat, 20 Jan 2024 20:51:03 +0800 Subject: [PATCH 06/16] rebase main and adjust the configuration Signed-off-by: ouyang1204@gmail.com --- config/config_test.go | 1 + discovery/moby/docker.go | 58 +++- discovery/moby/docker_test.go | 253 +++++++++++++++++- .../testdata/dockerprom/containers/json.json | 100 +++++++ .../moby/testdata/dockerprom/networks.json | 54 ++++ docs/configuration/configuration.md | 3 + 6 files changed, 466 insertions(+), 3 deletions(-) diff --git a/config/config_test.go b/config/config_test.go index ff056a2676..d84059b48f 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -998,6 +998,7 @@ var expectedConf = &Config{ HostNetworkingHost: "localhost", RefreshInterval: model.Duration(60 * time.Second), HTTPClientConfig: config.DefaultHTTPClientConfig, + MatchFirstNetwork: true, }, }, }, diff --git a/discovery/moby/docker.go b/discovery/moby/docker.go index 6a2b2d9302..11445092ee 100644 --- a/discovery/moby/docker.go +++ b/discovery/moby/docker.go @@ -22,8 +22,10 @@ import ( "strconv" "time" + "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/api/types/network" "github.com/docker/docker/client" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -58,6 +60,7 @@ var DefaultDockerSDConfig = DockerSDConfig{ Filters: []Filter{}, HostNetworkingHost: "localhost", HTTPClientConfig: config.DefaultHTTPClientConfig, + MatchFirstNetwork: true, } func init() { @@ -73,7 +76,8 @@ type DockerSDConfig struct { Filters []Filter `yaml:"filters"` HostNetworkingHost string `yaml:"host_networking_host"` - RefreshInterval model.Duration `yaml:"refresh_interval"` + RefreshInterval model.Duration `yaml:"refresh_interval"` + MatchFirstNetwork bool `yaml:"match_first_network"` } // NewDiscovererMetrics implements discovery.Config. @@ -119,6 +123,7 @@ type DockerDiscovery struct { port int hostNetworkingHost string filters filters.Args + matchFirstNetwork bool } // NewDockerDiscovery returns a new DockerDiscovery which periodically refreshes its targets. 
@@ -131,6 +136,7 @@ func NewDockerDiscovery(conf *DockerSDConfig, logger log.Logger, metrics discove d := &DockerDiscovery{ port: conf.Port, hostNetworkingHost: conf.HostNetworkingHost, + matchFirstNetwork: conf.MatchFirstNetwork, } hostURL, err := url.Parse(conf.Host) @@ -202,6 +208,11 @@ func (d *DockerDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, er return nil, fmt.Errorf("error while computing network labels: %w", err) } + allContainers := make(map[string]types.Container) + for _, c := range containers { + allContainers[c.ID] = c + } + for _, c := range containers { if len(c.Names) == 0 { continue @@ -218,7 +229,50 @@ func (d *DockerDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, er commonLabels[dockerLabelContainerLabelPrefix+ln] = v } - for _, n := range c.NetworkSettings.Networks { + networks := c.NetworkSettings.Networks + containerNetworkMode := container.NetworkMode(c.HostConfig.NetworkMode) + if len(networks) == 0 { + // Try to lookup shared networks + for { + if containerNetworkMode.IsContainer() { + tmpContainer, exists := allContainers[containerNetworkMode.ConnectedContainer()] + if !exists { + break + } + networks = tmpContainer.NetworkSettings.Networks + containerNetworkMode = container.NetworkMode(tmpContainer.HostConfig.NetworkMode) + if len(networks) > 0 { + break + } + } else { + break + } + } + } + + if d.matchFirstNetwork && len(networks) > 1 { + // Match user defined network + if containerNetworkMode.IsUserDefined() { + networkMode := string(containerNetworkMode) + networks = map[string]*network.EndpointSettings{networkMode: networks[networkMode]} + } else { + // Get first network if container network mode has "none" value. + // This case appears under certain condition: + // 1. Container created with network set to "--net=none". + // 2. Disconnect network "none". + // 3. Reconnect network with user defined networks. 
+ var first string + for k, n := range networks { + if n != nil { + first = k + break + } + } + networks = map[string]*network.EndpointSettings{first: networks[first]} + } + } + + for _, n := range networks { var added bool for _, p := range c.Ports { diff --git a/discovery/moby/docker_test.go b/discovery/moby/docker_test.go index fec56d3e5f..c108ddf582 100644 --- a/discovery/moby/docker_test.go +++ b/discovery/moby/docker_test.go @@ -16,6 +16,7 @@ package moby import ( "context" "fmt" + "sort" "testing" "github.com/go-kit/log" @@ -59,7 +60,7 @@ host: %s tg := tgs[0] require.NotNil(t, tg) require.NotNil(t, tg.Targets) - require.Len(t, tg.Targets, 3) + require.Len(t, tg.Targets, 6) for i, lbls := range []model.LabelSet{ { @@ -113,9 +114,259 @@ host: %s "__meta_docker_container_network_mode": "host", "__meta_docker_network_ip": "", }, + { + "__address__": "172.20.0.2:3306", + "__meta_docker_container_id": "f9ade4b83199d6f83020b7c0bfd1e8281b19dbf9e6cef2cf89bc45c8f8d20fe8", + "__meta_docker_container_label_com_docker_compose_project": "dockersd", + "__meta_docker_container_label_com_docker_compose_service": "mysql", + "__meta_docker_container_label_com_docker_compose_version": "2.2.2", + "__meta_docker_container_name": "/dockersd_mysql", + "__meta_docker_container_network_mode": "dockersd_private", + "__meta_docker_network_id": "e804771e55254a360fdb70dfdd78d3610fdde231b14ef2f837a00ac1eeb9e601", + "__meta_docker_network_ingress": "false", + "__meta_docker_network_internal": "false", + "__meta_docker_network_ip": "172.20.0.2", + "__meta_docker_network_name": "dockersd_private", + "__meta_docker_network_scope": "local", + "__meta_docker_port_private": "3306", + }, + { + "__address__": "172.20.0.2:33060", + "__meta_docker_container_id": "f9ade4b83199d6f83020b7c0bfd1e8281b19dbf9e6cef2cf89bc45c8f8d20fe8", + "__meta_docker_container_label_com_docker_compose_project": "dockersd", + "__meta_docker_container_label_com_docker_compose_service": "mysql", + "__meta_docker_container_label_com_docker_compose_version": "2.2.2", + "__meta_docker_container_name": "/dockersd_mysql", + "__meta_docker_container_network_mode": "dockersd_private", + "__meta_docker_network_id": "e804771e55254a360fdb70dfdd78d3610fdde231b14ef2f837a00ac1eeb9e601", + "__meta_docker_network_ingress": "false", + "__meta_docker_network_internal": "false", + "__meta_docker_network_ip": "172.20.0.2", + "__meta_docker_network_name": "dockersd_private", + "__meta_docker_network_scope": "local", + "__meta_docker_port_private": "33060", + }, + { + "__address__": "172.20.0.2:9104", + "__meta_docker_container_id": "59bf76e8816af98856b90dd619c91027145ca501043b1c51756d03b085882e06", + "__meta_docker_container_label_com_docker_compose_project": "dockersd", + "__meta_docker_container_label_com_docker_compose_service": "mysqlexporter", + "__meta_docker_container_label_com_docker_compose_version": "2.2.2", + "__meta_docker_container_label_maintainer": "The Prometheus Authors ", + "__meta_docker_container_name": "/dockersd_mysql_exporter", + "__meta_docker_container_network_mode": "container:f9ade4b83199d6f83020b7c0bfd1e8281b19dbf9e6cef2cf89bc45c8f8d20fe8", + "__meta_docker_network_id": "e804771e55254a360fdb70dfdd78d3610fdde231b14ef2f837a00ac1eeb9e601", + "__meta_docker_network_ingress": "false", + "__meta_docker_network_internal": "false", + "__meta_docker_network_ip": "172.20.0.2", + "__meta_docker_network_name": "dockersd_private", + "__meta_docker_network_scope": "local", + "__meta_docker_port_private": "9104", + }, } { t.Run(fmt.Sprintf("item %d", i), 
func(t *testing.T) { require.Equal(t, lbls, tg.Targets[i]) }) } } + +func TestDockerSDRefreshMatchAllNetworks(t *testing.T) { + sdmock := NewSDMock(t, "dockerprom") + sdmock.Setup() + + e := sdmock.Endpoint() + url := e[:len(e)-1] + cfgString := fmt.Sprintf(` +--- +host: %s +`, url) + var cfg DockerSDConfig + require.NoError(t, yaml.Unmarshal([]byte(cfgString), &cfg)) + + cfg.MatchFirstNetwork = false + reg := prometheus.NewRegistry() + refreshMetrics := discovery.NewRefreshMetrics(reg) + metrics := cfg.NewDiscovererMetrics(reg, refreshMetrics) + require.NoError(t, metrics.Register()) + defer metrics.Unregister() + defer refreshMetrics.Unregister() + d, err := NewDockerDiscovery(&cfg, log.NewNopLogger(), metrics) + require.NoError(t, err) + + ctx := context.Background() + tgs, err := d.refresh(ctx) + require.NoError(t, err) + + require.Len(t, tgs, 1) + + tg := tgs[0] + require.NotNil(t, tg) + require.NotNil(t, tg.Targets) + require.Len(t, tg.Targets, 9) + + sortFunc := func(labelSets []model.LabelSet) { + sort.Slice(labelSets, func(i, j int) bool { + return labelSets[i]["__address__"] < labelSets[j]["__address__"] + }) + } + expected := []model.LabelSet{ + { + "__address__": "172.19.0.2:9100", + "__meta_docker_container_id": "c301b928faceb1a18fe379f6bc178727ef920bb30b0f9b8592b32b36255a0eca", + "__meta_docker_container_label_com_docker_compose_project": "dockersd", + "__meta_docker_container_label_com_docker_compose_service": "node", + "__meta_docker_container_label_com_docker_compose_version": "1.25.0", + "__meta_docker_container_label_maintainer": "The Prometheus Authors ", + "__meta_docker_container_label_prometheus_job": "node", + "__meta_docker_container_name": "/dockersd_node_1", + "__meta_docker_container_network_mode": "dockersd_default", + "__meta_docker_network_id": "7189986ab399e144e52a71b7451b4e04e2158c044b4cd2f3ae26fc3a285d3798", + "__meta_docker_network_ingress": "false", + "__meta_docker_network_internal": "false", + "__meta_docker_network_ip": "172.19.0.2", + "__meta_docker_network_label_com_docker_compose_network": "default", + "__meta_docker_network_label_com_docker_compose_project": "dockersd", + "__meta_docker_network_label_com_docker_compose_version": "1.25.0", + "__meta_docker_network_name": "dockersd_default", + "__meta_docker_network_scope": "local", + "__meta_docker_port_private": "9100", + }, + { + "__address__": "172.19.0.3:80", + "__meta_docker_container_id": "c301b928faceb1a18fe379f6bc178727ef920bb30b0f9b8592b32b36255a0eca", + "__meta_docker_container_label_com_docker_compose_project": "dockersd", + "__meta_docker_container_label_com_docker_compose_service": "noport", + "__meta_docker_container_label_com_docker_compose_version": "1.25.0", + "__meta_docker_container_label_maintainer": "The Prometheus Authors ", + "__meta_docker_container_label_prometheus_job": "noport", + "__meta_docker_container_name": "/dockersd_noport_1", + "__meta_docker_container_network_mode": "dockersd_default", + "__meta_docker_network_id": "7189986ab399e144e52a71b7451b4e04e2158c044b4cd2f3ae26fc3a285d3798", + "__meta_docker_network_ingress": "false", + "__meta_docker_network_internal": "false", + "__meta_docker_network_ip": "172.19.0.3", + "__meta_docker_network_label_com_docker_compose_network": "default", + "__meta_docker_network_label_com_docker_compose_project": "dockersd", + "__meta_docker_network_label_com_docker_compose_version": "1.25.0", + "__meta_docker_network_name": "dockersd_default", + "__meta_docker_network_scope": "local", + }, + { + "__address__": "localhost", + 
"__meta_docker_container_id": "54ed6cc5c0988260436cb0e739b7b6c9cad6c439a93b4c4fdbe9753e1c94b189", + "__meta_docker_container_label_com_docker_compose_project": "dockersd", + "__meta_docker_container_label_com_docker_compose_service": "host_networking", + "__meta_docker_container_label_com_docker_compose_version": "1.25.0", + "__meta_docker_container_name": "/dockersd_host_networking_1", + "__meta_docker_container_network_mode": "host", + "__meta_docker_network_ip": "", + }, + { + "__address__": "172.20.0.2:3306", + "__meta_docker_container_id": "f9ade4b83199d6f83020b7c0bfd1e8281b19dbf9e6cef2cf89bc45c8f8d20fe8", + "__meta_docker_container_label_com_docker_compose_project": "dockersd", + "__meta_docker_container_label_com_docker_compose_service": "mysql", + "__meta_docker_container_label_com_docker_compose_version": "2.2.2", + "__meta_docker_container_name": "/dockersd_mysql", + "__meta_docker_container_network_mode": "dockersd_private", + "__meta_docker_network_id": "e804771e55254a360fdb70dfdd78d3610fdde231b14ef2f837a00ac1eeb9e601", + "__meta_docker_network_ingress": "false", + "__meta_docker_network_internal": "false", + "__meta_docker_network_ip": "172.20.0.2", + "__meta_docker_network_name": "dockersd_private", + "__meta_docker_network_scope": "local", + "__meta_docker_port_private": "3306", + }, + { + "__address__": "172.20.0.2:33060", + "__meta_docker_container_id": "f9ade4b83199d6f83020b7c0bfd1e8281b19dbf9e6cef2cf89bc45c8f8d20fe8", + "__meta_docker_container_label_com_docker_compose_project": "dockersd", + "__meta_docker_container_label_com_docker_compose_service": "mysql", + "__meta_docker_container_label_com_docker_compose_version": "2.2.2", + "__meta_docker_container_name": "/dockersd_mysql", + "__meta_docker_container_network_mode": "dockersd_private", + "__meta_docker_network_id": "e804771e55254a360fdb70dfdd78d3610fdde231b14ef2f837a00ac1eeb9e601", + "__meta_docker_network_ingress": "false", + "__meta_docker_network_internal": "false", + "__meta_docker_network_ip": "172.20.0.2", + "__meta_docker_network_name": "dockersd_private", + "__meta_docker_network_scope": "local", + "__meta_docker_port_private": "33060", + }, + { + "__address__": "172.21.0.2:3306", + "__meta_docker_container_id": "f9ade4b83199d6f83020b7c0bfd1e8281b19dbf9e6cef2cf89bc45c8f8d20fe8", + "__meta_docker_container_label_com_docker_compose_project": "dockersd", + "__meta_docker_container_label_com_docker_compose_service": "mysql", + "__meta_docker_container_label_com_docker_compose_version": "2.2.2", + "__meta_docker_container_name": "/dockersd_mysql", + "__meta_docker_container_network_mode": "dockersd_private", + "__meta_docker_network_id": "bfcf66a6b64f7d518f009e34290dc3f3c66a08164257ad1afc3bd31d75f656e8", + "__meta_docker_network_ingress": "false", + "__meta_docker_network_internal": "false", + "__meta_docker_network_ip": "172.21.0.2", + "__meta_docker_network_name": "dockersd_private1", + "__meta_docker_network_scope": "local", + "__meta_docker_port_private": "3306", + }, + { + "__address__": "172.21.0.2:33060", + "__meta_docker_container_id": "f9ade4b83199d6f83020b7c0bfd1e8281b19dbf9e6cef2cf89bc45c8f8d20fe8", + "__meta_docker_container_label_com_docker_compose_project": "dockersd", + "__meta_docker_container_label_com_docker_compose_service": "mysql", + "__meta_docker_container_label_com_docker_compose_version": "2.2.2", + "__meta_docker_container_name": "/dockersd_mysql", + "__meta_docker_container_network_mode": "dockersd_private", + "__meta_docker_network_id": 
"bfcf66a6b64f7d518f009e34290dc3f3c66a08164257ad1afc3bd31d75f656e8", + "__meta_docker_network_ingress": "false", + "__meta_docker_network_internal": "false", + "__meta_docker_network_ip": "172.21.0.2", + "__meta_docker_network_name": "dockersd_private1", + "__meta_docker_network_scope": "local", + "__meta_docker_port_private": "33060", + }, + { + "__address__": "172.21.0.2:9104", + "__meta_docker_container_id": "59bf76e8816af98856b90dd619c91027145ca501043b1c51756d03b085882e06", + "__meta_docker_container_label_com_docker_compose_project": "dockersd", + "__meta_docker_container_label_com_docker_compose_service": "mysqlexporter", + "__meta_docker_container_label_com_docker_compose_version": "2.2.2", + "__meta_docker_container_label_maintainer": "The Prometheus Authors ", + "__meta_docker_container_name": "/dockersd_mysql_exporter", + "__meta_docker_container_network_mode": "container:f9ade4b83199d6f83020b7c0bfd1e8281b19dbf9e6cef2cf89bc45c8f8d20fe8", + "__meta_docker_network_id": "bfcf66a6b64f7d518f009e34290dc3f3c66a08164257ad1afc3bd31d75f656e8", + "__meta_docker_network_ingress": "false", + "__meta_docker_network_internal": "false", + "__meta_docker_network_ip": "172.21.0.2", + "__meta_docker_network_name": "dockersd_private1", + "__meta_docker_network_scope": "local", + "__meta_docker_port_private": "9104", + }, + { + "__address__": "172.20.0.2:9104", + "__meta_docker_container_id": "59bf76e8816af98856b90dd619c91027145ca501043b1c51756d03b085882e06", + "__meta_docker_container_label_com_docker_compose_project": "dockersd", + "__meta_docker_container_label_com_docker_compose_service": "mysqlexporter", + "__meta_docker_container_label_com_docker_compose_version": "2.2.2", + "__meta_docker_container_label_maintainer": "The Prometheus Authors ", + "__meta_docker_container_name": "/dockersd_mysql_exporter", + "__meta_docker_container_network_mode": "container:f9ade4b83199d6f83020b7c0bfd1e8281b19dbf9e6cef2cf89bc45c8f8d20fe8", + "__meta_docker_network_id": "e804771e55254a360fdb70dfdd78d3610fdde231b14ef2f837a00ac1eeb9e601", + "__meta_docker_network_ingress": "false", + "__meta_docker_network_internal": "false", + "__meta_docker_network_ip": "172.20.0.2", + "__meta_docker_network_name": "dockersd_private", + "__meta_docker_network_scope": "local", + "__meta_docker_port_private": "9104", + }, + } + + sortFunc(expected) + sortFunc(tg.Targets) + + for i, lbls := range expected { + t.Run(fmt.Sprintf("item %d", i), func(t *testing.T) { + require.Equal(t, lbls, tg.Targets[i]) + }) + } +} diff --git a/discovery/moby/testdata/dockerprom/containers/json.json b/discovery/moby/testdata/dockerprom/containers/json.json index 37f575d22c..ebfc56b6d5 100644 --- a/discovery/moby/testdata/dockerprom/containers/json.json +++ b/discovery/moby/testdata/dockerprom/containers/json.json @@ -128,5 +128,105 @@ } }, "Mounts": [] + }, + { + "Id": "f9ade4b83199d6f83020b7c0bfd1e8281b19dbf9e6cef2cf89bc45c8f8d20fe8", + "Names": [ + "/dockersd_mysql" + ], + "Image": "mysql:5.7.29", + "ImageID": "sha256:5d9483f9a7b21c87e0f5b9776c3e06567603c28c0062013eda127c968175f5e8", + "Command": "mysqld", + "Created": 1616273136, + "Ports": [ + { + "PrivatePort": 3306, + "Type": "tcp" + }, + { + "PrivatePort": 33060, + "Type": "tcp" + } + ], + "Labels": { + "com.docker.compose.project": "dockersd", + "com.docker.compose.service": "mysql", + "com.docker.compose.version": "2.2.2" + }, + "State": "running", + "Status": "Up 40 seconds", + "HostConfig": { + "NetworkMode": "dockersd_private" + }, + "NetworkSettings": { + "Networks": { + 
"dockersd_private": { + "IPAMConfig": null, + "Links": null, + "Aliases": null, + "NetworkID": "e804771e55254a360fdb70dfdd78d3610fdde231b14ef2f837a00ac1eeb9e601", + "EndpointID": "80f8a61b37701a9991bb98c75ddd23fd9b7c16b5575ca81343f6b44ff4a2a9d9", + "Gateway": "172.20.0.1", + "IPAddress": "172.20.0.2", + "IPPrefixLen": 16, + "IPv6Gateway": "", + "GlobalIPv6Address": "", + "GlobalIPv6PrefixLen": 0, + "MacAddress": "02:42:ac:14:00:0a", + "DriverOpts": null + }, + "dockersd_private1": { + "IPAMConfig": {}, + "Links": null, + "Aliases": [ + "mysql", + "mysql", + "f9ade4b83199" + ], + "NetworkID": "bfcf66a6b64f7d518f009e34290dc3f3c66a08164257ad1afc3bd31d75f656e8", + "EndpointID": "f80921d10e78c99a5907705aae75befea40c3d3e9f820e66ab392f7274be16b8", + "Gateway": "172.21.0.1", + "IPAddress": "172.21.0.2", + "IPPrefixLen": 24, + "IPv6Gateway": "", + "GlobalIPv6Address": "", + "GlobalIPv6PrefixLen": 0, + "MacAddress": "02:42:ac:15:00:02", + "DriverOpts": null + } + } + }, + "Mounts": [] + }, + { + "Id": "59bf76e8816af98856b90dd619c91027145ca501043b1c51756d03b085882e06", + "Names": [ + "/dockersd_mysql_exporter" + ], + "Image": "prom/mysqld-exporter:latest", + "ImageID": "sha256:121b8a7cd0525dd89aaec58ad7d34c3bb3714740e5a67daf6510ccf71ab219a9", + "Command": "/bin/mysqld_exporter", + "Created": 1616273136, + "Ports": [ + { + "PrivatePort": 9104, + "Type": "tcp" + } + ], + "Labels": { + "com.docker.compose.project": "dockersd", + "com.docker.compose.service": "mysqlexporter", + "com.docker.compose.version": "2.2.2", + "maintainer": "The Prometheus Authors " + }, + "State": "running", + "Status": "Up 40 seconds", + "HostConfig": { + "NetworkMode": "container:f9ade4b83199d6f83020b7c0bfd1e8281b19dbf9e6cef2cf89bc45c8f8d20fe8" + }, + "NetworkSettings": { + "Networks": {} + }, + "Mounts": [] } ] diff --git a/discovery/moby/testdata/dockerprom/networks.json b/discovery/moby/testdata/dockerprom/networks.json index 35facd3bb9..75d4442df8 100644 --- a/discovery/moby/testdata/dockerprom/networks.json +++ b/discovery/moby/testdata/dockerprom/networks.json @@ -111,5 +111,59 @@ "Containers": {}, "Options": {}, "Labels": {} + }, + { + "Name": "dockersd_private", + "Id": "e804771e55254a360fdb70dfdd78d3610fdde231b14ef2f837a00ac1eeb9e601", + "Created": "2022-03-25T09:21:17.718370976+08:00", + "Scope": "local", + "Driver": "bridge", + "EnableIPv6": false, + "IPAM": { + "Driver": "default", + "Options": null, + "Config": [ + { + "Subnet": "172.20.0.1/16" + } + ] + }, + "Internal": false, + "Attachable": false, + "Ingress": false, + "ConfigFrom": { + "Network": "" + }, + "ConfigOnly": false, + "Containers": {}, + "Options": {}, + "Labels": {} + }, + { + "Name": "dockersd_private1", + "Id": "bfcf66a6b64f7d518f009e34290dc3f3c66a08164257ad1afc3bd31d75f656e8", + "Created": "2022-03-25T09:21:17.718370976+08:00", + "Scope": "local", + "Driver": "bridge", + "EnableIPv6": false, + "IPAM": { + "Driver": "default", + "Options": null, + "Config": [ + { + "Subnet": "172.21.0.1/16" + } + ] + }, + "Internal": false, + "Attachable": false, + "Ingress": false, + "ConfigFrom": { + "Network": "" + }, + "ConfigOnly": false, + "Containers": {}, + "Options": {}, + "Labels": {} } ] diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 164f426ad5..86599c40e0 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -941,6 +941,9 @@ tls_config: # The host to use if the container is in host networking mode. 
[ host_networking_host: | default = "localhost" ] +# Match the first network if the container has multiple networks defined, thus avoiding collecting duplicate targets. +[ match_first_network: | default = true ] + # Optional filters to limit the discovery process to a subset of available # resources. # The available filters are listed in the upstream documentation: From 00b110c65c7af446e675ee27cb06b7971afc6e18 Mon Sep 17 00:00:00 2001 From: Martin Chodur Date: Fri, 21 Jun 2024 23:19:58 +0200 Subject: [PATCH 07/16] Fix data corruption in remote write if max_sample_age is applied (#14078) * fix: try to reproduce the bug from https://github.com/prometheus/prometheus/issues/13979 in a test case Signed-off-by: David Vavra * fix: data corruption in remote write if max_sample_age is applied Signed-off-by: David Vavra * add benchmark for buildTimeSeries which does the filtering Signed-off-by: Callum Styan --------- Signed-off-by: David Vavra Signed-off-by: Callum Styan Co-authored-by: David Vavra Co-authored-by: Callum Styan --- storage/remote/queue_manager.go | 8 +- storage/remote/queue_manager_test.go | 186 ++++++++++++++++++++++++--- 2 files changed, 176 insertions(+), 18 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 488485e385..dde78d35e5 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -1783,9 +1783,11 @@ func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeri if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp < lowest { lowest = ts.Histograms[0].Timestamp } - - // Move the current element to the write position and increment the write pointer - timeSeries[keepIdx] = timeSeries[i] + if i != keepIdx { + // We have to swap the kept timeseries with the one which should be dropped. + // Copying any elements within timeSeries could cause data corruptions when reusing the slice in a next batch (shards.populateTimeSeries). 
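// For example, if the batch is [drop0, keep1, keep2], the copy this replaces left the
// backing array as [keep1, keep2, keep2]: two slots then alias the same Labels/Samples
// slices, and when shards.populateTimeSeries reuses the buffer for the next batch,
// appending into one aliased slot clobbers data still referenced by the other.
// Swapping keeps every element of the backing array exactly once, so reuse stays safe.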
+ timeSeries[keepIdx], timeSeries[i] = timeSeries[i], timeSeries[keepIdx] + } keepIdx++ } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 06783167fb..4d299994bd 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "math" + "math/rand" "os" "runtime/pprof" "sort" @@ -29,6 +30,7 @@ import ( "github.com/go-kit/log" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" + "github.com/google/go-cmp/cmp" "github.com/prometheus/client_golang/prometheus" client_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" @@ -611,6 +613,30 @@ func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([ return samples, series } +func createProtoTimeseriesWithOld(numSamples, baseTs int64, extraLabels ...labels.Label) []prompb.TimeSeries { + samples := make([]prompb.TimeSeries, numSamples) + // use a fixed rand source so tests are consistent + r := rand.New(rand.NewSource(99)) + for j := int64(0); j < numSamples; j++ { + name := fmt.Sprintf("test_metric_%d", j) + + samples[j] = prompb.TimeSeries{ + Labels: []prompb.Label{{Name: "__name__", Value: name}}, + Samples: []prompb.Sample{ + { + Timestamp: baseTs + j, + Value: float64(j), + }, + }, + } + // 10% of the time use a ts that is too old + if r.Intn(10) == 0 { + samples[j].Samples[0].Timestamp = baseTs - 5 + } + } + return samples +} + func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) { exemplars := make([]record.RefExemplar, 0, numExemplars) series := make([]record.RefSeries, 0, numSeries) @@ -679,8 +705,8 @@ func createHistograms(numSamples, numSeries int, floatHistogram bool) ([]record. 
return histograms, nil, series } -func getSeriesNameFromRef(r record.RefSeries) string { - return r.Labels.Get("__name__") +func getSeriesIDFromRef(r record.RefSeries) string { + return r.Labels.String() } type TestWriteClient struct { @@ -698,6 +724,9 @@ type TestWriteClient struct { wg sync.WaitGroup mtx sync.Mutex buf []byte + + storeWait time.Duration + returnError error } func NewTestWriteClient() *TestWriteClient { @@ -706,6 +735,8 @@ func NewTestWriteClient() *TestWriteClient { receivedSamples: map[string][]prompb.Sample{}, expectedSamples: map[string][]prompb.Sample{}, receivedMetadata: map[string][]prompb.MetricMetadata{}, + storeWait: 0, + returnError: nil, } } @@ -720,12 +751,15 @@ func (c *TestWriteClient) expectSamples(ss []record.RefSample, series []record.R c.receivedSamples = map[string][]prompb.Sample{} for _, s := range ss { - seriesName := getSeriesNameFromRef(series[s.Ref]) - c.expectedSamples[seriesName] = append(c.expectedSamples[seriesName], prompb.Sample{ + tsID := getSeriesIDFromRef(series[s.Ref]) + c.expectedSamples[tsID] = append(c.expectedSamples[tsID], prompb.Sample{ Timestamp: s.T, Value: s.V, }) } + if !c.withWaitGroup { + return + } c.wg.Add(len(ss)) } @@ -740,13 +774,13 @@ func (c *TestWriteClient) expectExemplars(ss []record.RefExemplar, series []reco c.receivedExemplars = map[string][]prompb.Exemplar{} for _, s := range ss { - seriesName := getSeriesNameFromRef(series[s.Ref]) + tsID := getSeriesIDFromRef(series[s.Ref]) e := prompb.Exemplar{ Labels: LabelsToLabelsProto(s.Labels, nil), Timestamp: s.T, Value: s.V, } - c.expectedExemplars[seriesName] = append(c.expectedExemplars[seriesName], e) + c.expectedExemplars[tsID] = append(c.expectedExemplars[tsID], e) } c.wg.Add(len(ss)) } @@ -762,8 +796,8 @@ func (c *TestWriteClient) expectHistograms(hh []record.RefHistogramSample, serie c.receivedHistograms = map[string][]prompb.Histogram{} for _, h := range hh { - seriesName := getSeriesNameFromRef(series[h.Ref]) - c.expectedHistograms[seriesName] = append(c.expectedHistograms[seriesName], HistogramToHistogramProto(h.T, h.H)) + tsID := getSeriesIDFromRef(series[h.Ref]) + c.expectedHistograms[tsID] = append(c.expectedHistograms[tsID], HistogramToHistogramProto(h.T, h.H)) } c.wg.Add(len(hh)) } @@ -779,8 +813,8 @@ func (c *TestWriteClient) expectFloatHistograms(fhs []record.RefFloatHistogramSa c.receivedFloatHistograms = map[string][]prompb.Histogram{} for _, fh := range fhs { - seriesName := getSeriesNameFromRef(series[fh.Ref]) - c.expectedFloatHistograms[seriesName] = append(c.expectedFloatHistograms[seriesName], FloatHistogramToHistogramProto(fh.T, fh.FH)) + tsID := getSeriesIDFromRef(series[fh.Ref]) + c.expectedFloatHistograms[tsID] = append(c.expectedFloatHistograms[tsID], FloatHistogramToHistogramProto(fh.T, fh.FH)) } c.wg.Add(len(fhs)) } @@ -806,9 +840,27 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB) { } } +func (c *TestWriteClient) SetStoreWait(w time.Duration) { + c.mtx.Lock() + defer c.mtx.Unlock() + c.storeWait = w +} + +func (c *TestWriteClient) SetReturnError(err error) { + c.mtx.Lock() + defer c.mtx.Unlock() + c.returnError = err +} + func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error { c.mtx.Lock() defer c.mtx.Unlock() + if c.storeWait > 0 { + time.Sleep(c.storeWait) + } + if c.returnError != nil { + return c.returnError + } // nil buffers are ok for snappy, ignore cast error. 
if c.buf != nil { c.buf = c.buf[:cap(c.buf)] @@ -827,23 +879,23 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error { count := 0 for _, ts := range reqProto.Timeseries { labels := LabelProtosToLabels(&builder, ts.Labels) - seriesName := labels.Get("__name__") + tsID := labels.String() for _, sample := range ts.Samples { count++ - c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample) + c.receivedSamples[tsID] = append(c.receivedSamples[tsID], sample) } for _, ex := range ts.Exemplars { count++ - c.receivedExemplars[seriesName] = append(c.receivedExemplars[seriesName], ex) + c.receivedExemplars[tsID] = append(c.receivedExemplars[tsID], ex) } for _, histogram := range ts.Histograms { count++ if histogram.IsFloatHistogram() { - c.receivedFloatHistograms[seriesName] = append(c.receivedFloatHistograms[seriesName], histogram) + c.receivedFloatHistograms[tsID] = append(c.receivedFloatHistograms[tsID], histogram) } else { - c.receivedHistograms[seriesName] = append(c.receivedHistograms[seriesName], histogram) + c.receivedHistograms[tsID] = append(c.receivedHistograms[tsID], histogram) } } } @@ -1441,6 +1493,99 @@ func TestIsSampleOld(t *testing.T) { require.False(t, isSampleOld(currentTime, 60*time.Second, timestamp.FromTime(currentTime.Add(-59*time.Second)))) } +// Simulates scenario in which remote write endpoint is down and a subset of samples is dropped due to age limit while backoffing. +func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) { + maxSamplesPerSend := 10 + sampleAgeLimit := time.Second + + cfg := config.DefaultQueueConfig + cfg.MaxShards = 1 + cfg.SampleAgeLimit = model.Duration(sampleAgeLimit) + // Set the batch send deadline to 5 minutes to effectively disable it. + cfg.BatchSendDeadline = model.Duration(time.Minute * 5) + cfg.Capacity = 10 * maxSamplesPerSend // more than the amount of data we append in the test + cfg.MaxBackoff = model.Duration(time.Millisecond * 100) + cfg.MinBackoff = model.Duration(time.Millisecond * 100) + cfg.MaxSamplesPerSend = maxSamplesPerSend + metadataCfg := config.DefaultMetadataConfig + metadataCfg.Send = true + metadataCfg.SendInterval = model.Duration(time.Second * 60) + metadataCfg.MaxSamplesPerSend = maxSamplesPerSend + c := NewTestWriteClient() + c.withWaitGroup = false + m := newTestQueueManager(t, cfg, metadataCfg, time.Second, c) + + m.Start() + + batchID := 0 + expectedSamples := map[string][]prompb.Sample{} + + appendData := func(numberOfSeries int, timeAdd time.Duration, shouldBeDropped bool) { + t.Log(">>>> Appending series ", numberOfSeries, " as batch ID ", batchID, " with timeAdd ", timeAdd, " and should be dropped ", shouldBeDropped) + samples, series := createTimeseriesWithRandomLabelCount(strconv.Itoa(batchID), numberOfSeries, timeAdd, 9) + m.StoreSeries(series, batchID) + sent := m.Append(samples) + require.True(t, sent, "samples not sent") + if !shouldBeDropped { + for _, s := range samples { + tsID := getSeriesIDFromRef(series[s.Ref]) + expectedSamples[tsID] = append(c.expectedSamples[tsID], prompb.Sample{ + Timestamp: s.T, + Value: s.V, + }) + } + } + batchID++ + } + timeShift := -time.Millisecond * 5 + + c.SetReturnError(RecoverableError{context.DeadlineExceeded, defaultBackoff}) + + appendData(maxSamplesPerSend/2, timeShift, true) + time.Sleep(sampleAgeLimit) + appendData(maxSamplesPerSend/2, timeShift, true) + time.Sleep(sampleAgeLimit / 10) + appendData(maxSamplesPerSend/2, timeShift, true) + time.Sleep(2 * sampleAgeLimit) + appendData(2*maxSamplesPerSend, 
timeShift, false) + time.Sleep(sampleAgeLimit / 2) + c.SetReturnError(nil) + appendData(5, timeShift, false) + m.Stop() + + if diff := cmp.Diff(expectedSamples, c.receivedSamples); diff != "" { + t.Errorf("mismatch (-want +got):\n%s", diff) + } +} + +func createTimeseriesWithRandomLabelCount(id string, seriesCount int, timeAdd time.Duration, maxLabels int) ([]record.RefSample, []record.RefSeries) { + samples := []record.RefSample{} + series := []record.RefSeries{} + // use a fixed rand source so tests are consistent + r := rand.New(rand.NewSource(99)) + for i := 0; i < seriesCount; i++ { + s := record.RefSample{ + Ref: chunks.HeadSeriesRef(i), + T: time.Now().Add(timeAdd).UnixMilli(), + V: r.Float64(), + } + samples = append(samples, s) + labelsCount := r.Intn(maxLabels) + lb := labels.NewScratchBuilder(1 + labelsCount) + lb.Add("__name__", "batch_"+id+"_id_"+strconv.Itoa(i)) + for j := 1; j < labelsCount+1; j++ { + // same for both name and value + label := "batch_" + id + "_label_" + strconv.Itoa(j) + lb.Add(label, label) + } + series = append(series, record.RefSeries{ + Ref: chunks.HeadSeriesRef(i), + Labels: lb.Labels(), + }) + } + return samples, series +} + func createTimeseriesWithOldSamples(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSample, []record.RefSeries) { newSamples := make([]record.RefSample, 0, numSamples) samples := make([]record.RefSample, 0, numSamples) @@ -1668,3 +1813,14 @@ func TestBuildTimeSeries(t *testing.T) { }) } } + +func BenchmarkBuildTimeSeries(b *testing.B) { + // Send one sample per series, which is the typical remote_write case + const numSamples = 10000 + filter := func(ts prompb.TimeSeries) bool { return filterTsLimit(99, ts) } + for i := 0; i < b.N; i++ { + samples := createProtoTimeseriesWithOld(numSamples, 100, extraLabels...) 
+ _, _, result, _, _, _ := buildTimeSeries(samples, filter) + require.NotNil(b, result) + } +} From d902116b415b62f3d17ede478edb491b4974a562 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Mon, 24 Jun 2024 16:11:53 -0700 Subject: [PATCH 08/16] Fix various linting errors Signed-off-by: Arve Knudsen --- cmd/prometheus/query_log_test.go | 2 +- discovery/eureka/client.go | 1 + discovery/hetzner/robot.go | 1 + notifier/notifier.go | 1 + storage/remote/client.go | 3 +++ web/api/v1/api_test.go | 2 -- 6 files changed, 7 insertions(+), 3 deletions(-) diff --git a/cmd/prometheus/query_log_test.go b/cmd/prometheus/query_log_test.go index 9a7a3ed855..62e317bf8b 100644 --- a/cmd/prometheus/query_log_test.go +++ b/cmd/prometheus/query_log_test.go @@ -72,7 +72,7 @@ func (p *queryLogTest) waitForPrometheus() error { var err error for x := 0; x < 20; x++ { var r *http.Response - if r, err = http.Get(fmt.Sprintf("http://%s:%d%s/-/ready", p.host, p.port, p.prefix)); err == nil && r.StatusCode == 200 { + if r, err = http.Get(fmt.Sprintf("http://%s:%d%s/-/ready", p.host, p.port, p.prefix)); err == nil && r.StatusCode == http.StatusOK { break } time.Sleep(500 * time.Millisecond) diff --git a/discovery/eureka/client.go b/discovery/eureka/client.go index 52e8ce7b48..5a90968f1b 100644 --- a/discovery/eureka/client.go +++ b/discovery/eureka/client.go @@ -97,6 +97,7 @@ func fetchApps(ctx context.Context, server string, client *http.Client) (*Applic resp.Body.Close() }() + //nolint:usestdlibvars if resp.StatusCode/100 != 2 { return nil, fmt.Errorf("non 2xx status '%d' response during eureka service discovery", resp.StatusCode) } diff --git a/discovery/hetzner/robot.go b/discovery/hetzner/robot.go index 516470b05a..64155bfaed 100644 --- a/discovery/hetzner/robot.go +++ b/discovery/hetzner/robot.go @@ -87,6 +87,7 @@ func (d *robotDiscovery) refresh(context.Context) ([]*targetgroup.Group, error) resp.Body.Close() }() + //nolint:usestdlibvars if resp.StatusCode/100 != 2 { return nil, fmt.Errorf("non 2xx status '%d' response during hetzner service discovery with role robot", resp.StatusCode) } diff --git a/notifier/notifier.go b/notifier/notifier.go index eb83c45b07..cd00a4507d 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -616,6 +616,7 @@ func (n *Manager) sendOne(ctx context.Context, c *http.Client, url string, b []b }() // Any HTTP status 2xx is OK. + //nolint:usestdlibvars if resp.StatusCode/100 != 2 { return fmt.Errorf("bad response status %s", resp.Status) } diff --git a/storage/remote/client.go b/storage/remote/client.go index 140194ec71..e8791b643a 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -231,6 +231,7 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) error { httpResp.Body.Close() }() + //nolint:usestdlibvars if httpResp.StatusCode/100 != 2 { scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, maxErrMsgLen)) line := "" @@ -239,6 +240,7 @@ func (c *Client) Store(ctx context.Context, req []byte, attempt int) error { } err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line) } + //nolint:usestdlibvars if httpResp.StatusCode/100 == 5 || (c.retryOnRateLimit && httpResp.StatusCode == http.StatusTooManyRequests) { return RecoverableError{err, retryAfterDuration(httpResp.Header.Get("Retry-After"))} @@ -323,6 +325,7 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryRe return nil, fmt.Errorf("error reading response. 
HTTP status code: %s: %w", httpResp.Status, err) } + //nolint:usestdlibvars if httpResp.StatusCode/100 != 2 { return nil, fmt.Errorf("remote server %s returned HTTP status %s: %s", c.urlString, httpResp.Status, strings.TrimSpace(string(compressed))) } diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index b30890893b..74cd2239d5 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -2973,10 +2973,8 @@ func assertAPIError(t *testing.T, got *apiError, exp errorType) { t.Helper() if exp == errorNone { - //nolint:testifylint require.Nil(t, got) } else { - //nolint:testifylint require.NotNil(t, got) require.Equal(t, exp, got.typ, "(%q)", got) } From 0395b0441917b0de8062c32e0abad803c2f46252 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Mon, 24 Jun 2024 16:14:22 -0700 Subject: [PATCH 09/16] golangci-lint: Upgrade to v1.59.1 Signed-off-by: Arve Knudsen --- .github/workflows/ci.yml | 2 +- Makefile.common | 2 +- scripts/golangci-lint.yml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 978218dba2..8b3624383c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -174,7 +174,7 @@ jobs: with: args: --verbose # Make sure to sync this with Makefile.common and scripts/golangci-lint.yml. - version: v1.59.0 + version: v1.59.1 fuzzing: uses: ./.github/workflows/fuzzing.yml if: github.event_name == 'pull_request' diff --git a/Makefile.common b/Makefile.common index 1617292350..e3da72ab47 100644 --- a/Makefile.common +++ b/Makefile.common @@ -61,7 +61,7 @@ PROMU_URL := https://github.com/prometheus/promu/releases/download/v$(PROMU_ SKIP_GOLANGCI_LINT := GOLANGCI_LINT := GOLANGCI_LINT_OPTS ?= -GOLANGCI_LINT_VERSION ?= v1.59.0 +GOLANGCI_LINT_VERSION ?= v1.59.1 # golangci-lint only supports linux, darwin and windows platforms on i386/amd64/arm64. # windows isn't included here because of the path separator being different. ifeq ($(GOHOSTOS),$(filter $(GOHOSTOS),linux darwin)) diff --git a/scripts/golangci-lint.yml b/scripts/golangci-lint.yml index 8de7af6394..bb65d7f607 100644 --- a/scripts/golangci-lint.yml +++ b/scripts/golangci-lint.yml @@ -36,4 +36,4 @@ jobs: uses: golangci/golangci-lint-action@a4f60bb28d35aeee14e6880718e0c85ff1882e64 # v6.0.1 with: args: --verbose - version: v1.59.0 + version: v1.59.1 From 2c5e88748e58c09485f64049b82bcdd5b0f58aaf Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Tue, 25 Jun 2024 14:22:44 +1000 Subject: [PATCH 10/16] Fix issue where pending OOO read can be left dangling if creating querier fails Signed-off-by: Charles Korn --- tsdb/db.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tsdb/db.go b/tsdb/db.go index c44737c692..95250392e0 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -2077,6 +2077,9 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef) outOfOrderHeadQuerier, err := NewBlockChunkQuerier(rh, mint, maxt) if err != nil { + // If NewBlockQuerier() failed, make sure to clean up the pending read created by NewOOORangeHead. 
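// NewOOORangeHead (called a few lines above) opens an isolation state for reading
// out-of-order data; that state is normally released when the querier built on top
// of it is closed. Since the querier was never created on this error path, close the
// isolation state here so the pending read is not left dangling.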
+ rh.isoState.Close() + return nil, fmt.Errorf("open block chunk querier for ooo head %s: %w", rh, err) } From 5585a3c7e5e382cd6f81901cc24135b0e35c640e Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 25 Jun 2024 00:47:06 -0700 Subject: [PATCH 11/16] tsdb: expose hook to customize block querier (#14114) * expose hook for block querier Signed-off-by: Ben Ye * update comment Signed-off-by: Ben Ye * use defined type Signed-off-by: Ben Ye --------- Signed-off-by: Ben Ye --- tsdb/db.go | 54 ++++++++++++++++++++++++++--------- tsdb/db_test.go | 75 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+), 13 deletions(-) diff --git a/tsdb/db.go b/tsdb/db.go index c44737c692..61990a12f9 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -192,12 +192,22 @@ type Options struct { // NewCompactorFunc is a function that returns a TSDB compactor. NewCompactorFunc NewCompactorFunc + + // BlockQuerierFunc is a function to return storage.Querier from a BlockReader. + BlockQuerierFunc BlockQuerierFunc + + // BlockChunkQuerierFunc is a function to return storage.ChunkQuerier from a BlockReader. + BlockChunkQuerierFunc BlockChunkQuerierFunc } type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error) type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{} +type BlockQuerierFunc func(b BlockReader, mint, maxt int64) (storage.Querier, error) + +type BlockChunkQuerierFunc func(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) + // DB handles reads and writes of time series falling into // a hashed partition of a seriedb. type DB struct { @@ -244,6 +254,10 @@ type DB struct { writeNotified wlog.WriteNotified registerer prometheus.Registerer + + blockQuerierFunc BlockQuerierFunc + + blockChunkQuerierFunc BlockChunkQuerierFunc } type dbMetrics struct { @@ -559,10 +573,12 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue db.closers = append(db.closers, head) return &DB{ - dir: db.dir, - logger: db.logger, - blocks: blocks, - head: head, + dir: db.dir, + logger: db.logger, + blocks: blocks, + head: head, + blockQuerierFunc: NewBlockQuerier, + blockChunkQuerierFunc: NewBlockChunkQuerier, }, nil } @@ -870,6 +886,18 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs } db.compactCancel = cancel + if opts.BlockQuerierFunc == nil { + db.blockQuerierFunc = NewBlockQuerier + } else { + db.blockQuerierFunc = opts.BlockQuerierFunc + } + + if opts.BlockChunkQuerierFunc == nil { + db.blockChunkQuerierFunc = NewBlockChunkQuerier + } else { + db.blockChunkQuerierFunc = opts.BlockChunkQuerierFunc + } + var wal, wbl *wlog.WL segmentSize := wlog.DefaultSegmentSize // Wal is enabled. 
@@ -1964,7 +1992,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { if maxt >= db.head.MinTime() { rh := NewRangeHead(db.head, mint, maxt) var err error - inOrderHeadQuerier, err := NewBlockQuerier(rh, mint, maxt) + inOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt) if err != nil { return nil, fmt.Errorf("open block querier for head %s: %w", rh, err) } @@ -1981,7 +2009,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { } if getNew { rh := NewRangeHead(db.head, newMint, maxt) - inOrderHeadQuerier, err = NewBlockQuerier(rh, newMint, maxt) + inOrderHeadQuerier, err = db.blockQuerierFunc(rh, newMint, maxt) if err != nil { return nil, fmt.Errorf("open block querier for head while getting new querier %s: %w", rh, err) } @@ -1995,9 +2023,9 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef) var err error - outOfOrderHeadQuerier, err := NewBlockQuerier(rh, mint, maxt) + outOfOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt) if err != nil { - // If NewBlockQuerier() failed, make sure to clean up the pending read created by NewOOORangeHead. + // If BlockQuerierFunc() failed, make sure to clean up the pending read created by NewOOORangeHead. rh.isoState.Close() return nil, fmt.Errorf("open block querier for ooo head %s: %w", rh, err) @@ -2007,7 +2035,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { } for _, b := range blocks { - q, err := NewBlockQuerier(b, mint, maxt) + q, err := db.blockQuerierFunc(b, mint, maxt) if err != nil { return nil, fmt.Errorf("open querier for block %s: %w", b, err) } @@ -2045,7 +2073,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer if maxt >= db.head.MinTime() { rh := NewRangeHead(db.head, mint, maxt) - inOrderHeadQuerier, err := NewBlockChunkQuerier(rh, mint, maxt) + inOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt) if err != nil { return nil, fmt.Errorf("open querier for head %s: %w", rh, err) } @@ -2062,7 +2090,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer } if getNew { rh := NewRangeHead(db.head, newMint, maxt) - inOrderHeadQuerier, err = NewBlockChunkQuerier(rh, newMint, maxt) + inOrderHeadQuerier, err = db.blockChunkQuerierFunc(rh, newMint, maxt) if err != nil { return nil, fmt.Errorf("open querier for head while getting new querier %s: %w", rh, err) } @@ -2075,7 +2103,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef) - outOfOrderHeadQuerier, err := NewBlockChunkQuerier(rh, mint, maxt) + outOfOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt) if err != nil { return nil, fmt.Errorf("open block chunk querier for ooo head %s: %w", rh, err) } @@ -2084,7 +2112,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer } for _, b := range blocks { - q, err := NewBlockChunkQuerier(b, mint, maxt) + q, err := db.blockChunkQuerierFunc(b, mint, maxt) if err != nil { return nil, fmt.Errorf("open querier for block %s: %w", b, err) } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 3d2fb2d99d..1fb6d30d61 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -7159,3 
+7159,78 @@ func TestNewCompactorFunc(t *testing.T) { require.Len(t, ulids, 1) require.Equal(t, block2, ulids[0]) } + +func TestBlockQuerierAndBlockChunkQuerier(t *testing.T) { + opts := DefaultOptions() + opts.BlockQuerierFunc = func(b BlockReader, mint, maxt int64) (storage.Querier, error) { + // Only block with hints can be queried. + if len(b.Meta().Compaction.Hints) > 0 { + return NewBlockQuerier(b, mint, maxt) + } + return storage.NoopQuerier(), nil + } + opts.BlockChunkQuerierFunc = func(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) { + // Only level 4 compaction block can be queried. + if b.Meta().Compaction.Level == 4 { + return NewBlockChunkQuerier(b, mint, maxt) + } + return storage.NoopChunkedQuerier(), nil + } + + db := openTestDB(t, opts, nil) + defer func() { + require.NoError(t, db.Close()) + }() + + metas := []BlockMeta{ + {Compaction: BlockMetaCompaction{Hints: []string{"test-hint"}}}, + {Compaction: BlockMetaCompaction{Level: 4}}, + } + for i := range metas { + // Include blockID into series to identify which block got touched. + serieses := []storage.Series{storage.NewListSeries(labels.FromMap(map[string]string{"block": fmt.Sprintf("block-%d", i), labels.MetricName: "test_metric"}), []chunks.Sample{sample{t: 0, f: 1}})} + blockDir := createBlock(t, db.Dir(), serieses) + b, err := OpenBlock(db.logger, blockDir, db.chunkPool) + require.NoError(t, err) + + // Overwrite meta.json with compaction section for testing purpose. + b.meta.Compaction = metas[i].Compaction + _, err = writeMetaFile(db.logger, blockDir, &b.meta) + require.NoError(t, err) + require.NoError(t, b.Close()) + } + require.NoError(t, db.reloadBlocks()) + require.Len(t, db.Blocks(), 2) + + querier, err := db.Querier(0, 500) + require.NoError(t, err) + defer querier.Close() + matcher := labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric") + seriesSet := querier.Select(context.Background(), false, nil, matcher) + count := 0 + var lbls labels.Labels + for seriesSet.Next() { + count++ + lbls = seriesSet.At().Labels() + } + require.NoError(t, seriesSet.Err()) + require.Equal(t, 1, count) + // Make sure only block-0 is queried. + require.Equal(t, "block-0", lbls.Get("block")) + + chunkQuerier, err := db.ChunkQuerier(0, 500) + require.NoError(t, err) + defer chunkQuerier.Close() + css := chunkQuerier.Select(context.Background(), false, nil, matcher) + count = 0 + // Reset lbls variable. + lbls = labels.EmptyLabels() + for css.Next() { + count++ + lbls = css.At().Labels() + } + require.NoError(t, css.Err()) + require.Equal(t, 1, count) + // Make sure only block-1 is queried. 
+ require.Equal(t, "block-1", lbls.Get("block")) +} From 246b7c6a5c5d7d22874318c663dcf5b18a94b9cf Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 25 Jun 2024 01:21:48 -0700 Subject: [PATCH 12/16] TSDB: Change block populator to accept postings index function (#14213) Signed-off-by: Ben Ye --- tsdb/compact.go | 28 +++++++++++++------- tsdb/compact_test.go | 62 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 79 insertions(+), 11 deletions(-) diff --git a/tsdb/compact.go b/tsdb/compact.go index 3c921520f5..9ef42b339b 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -656,7 +656,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl } closers = append(closers, indexw) - if err := blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, indexw, chunkw); err != nil { + if err := blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, indexw, chunkw, AllSortedPostings); err != nil { return fmt.Errorf("populate block: %w", err) } @@ -722,7 +722,20 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl } type BlockPopulator interface { - PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error + PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter, postingsFunc IndexReaderPostingsFunc) error +} + +// IndexReaderPostingsFunc is a function to get a sorted posting iterator from a given index reader. +type IndexReaderPostingsFunc func(ctx context.Context, reader IndexReader) index.Postings + +// AllSortedPostings returns a sorted all posting iterator from the input index reader. +func AllSortedPostings(ctx context.Context, reader IndexReader) index.Postings { + k, v := index.AllPostingsKey() + all, err := reader.Postings(ctx, k, v) + if err != nil { + return index.ErrPostings(err) + } + return reader.SortedPostings(all) } type DefaultBlockPopulator struct{} @@ -730,7 +743,7 @@ type DefaultBlockPopulator struct{} // PopulateBlock fills the index and chunk writers with new data gathered as the union // of the provided blocks. It returns meta information for the new block. // It expects sorted blocks input by mint. 
-func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) { +func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter, postingsFunc IndexReaderPostingsFunc) (err error) { if len(blocks) == 0 { return errors.New("cannot populate block from no readers") } @@ -788,14 +801,9 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa } closers = append(closers, tombsr) - k, v := index.AllPostingsKey() - all, err := indexr.Postings(ctx, k, v) - if err != nil { - return err - } - all = indexr.SortedPostings(all) + postings := postingsFunc(ctx, indexr) // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. - sets = append(sets, NewBlockChunkSeriesSet(b.Meta().ULID, indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime-1, false)) + sets = append(sets, NewBlockChunkSeriesSet(b.Meta().ULID, indexr, chunkr, tombsr, postings, meta.MinTime, meta.MaxTime-1, false)) syms := indexr.Symbols() if i == 0 { symbols = syms diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 5ce163f1ef..0df6ca0505 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -38,6 +38,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/fileutil" + "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/tsdb/wlog" @@ -493,6 +494,7 @@ func TestCompaction_populateBlock(t *testing.T) { inputSeriesSamples [][]seriesSamples compactMinTime int64 compactMaxTime int64 // When not defined the test runner sets a default of math.MaxInt64. + irPostingsFunc IndexReaderPostingsFunc expSeriesSamples []seriesSamples expErr error }{ @@ -961,6 +963,60 @@ func TestCompaction_populateBlock(t *testing.T) { }, }, }, + { + title: "Populate from single block with index reader postings function selecting different series. Expect empty block.", + inputSeriesSamples: [][]seriesSamples{ + { + { + lset: map[string]string{"a": "b"}, + chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, + }, + }, + }, + irPostingsFunc: func(ctx context.Context, reader IndexReader) index.Postings { + p, err := reader.Postings(ctx, "a", "c") + if err != nil { + return index.EmptyPostings() + } + return reader.SortedPostings(p) + }, + }, + { + title: "Populate from single block with index reader postings function selecting one series. 
Expect partial block.", + inputSeriesSamples: [][]seriesSamples{ + { + { + lset: map[string]string{"a": "b"}, + chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, + }, + { + lset: map[string]string{"a": "c"}, + chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, + }, + { + lset: map[string]string{"a": "d"}, + chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, + }, + }, + }, + irPostingsFunc: func(ctx context.Context, reader IndexReader) index.Postings { + p, err := reader.Postings(ctx, "a", "c", "d") + if err != nil { + return index.EmptyPostings() + } + return reader.SortedPostings(p) + }, + expSeriesSamples: []seriesSamples{ + { + lset: map[string]string{"a": "c"}, + chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, + }, + { + lset: map[string]string{"a": "d"}, + chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, + }, + }, + }, } { t.Run(tc.title, func(t *testing.T) { blocks := make([]BlockReader, 0, len(tc.inputSeriesSamples)) @@ -982,7 +1038,11 @@ func TestCompaction_populateBlock(t *testing.T) { iw := &mockIndexWriter{} blockPopulator := DefaultBlockPopulator{} - err = blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, iw, nopChunkWriter{}) + irPostingsFunc := AllSortedPostings + if tc.irPostingsFunc != nil { + irPostingsFunc = tc.irPostingsFunc + } + err = blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, iw, nopChunkWriter{}, irPostingsFunc) if tc.expErr != nil { require.Error(t, err) require.Equal(t, tc.expErr.Error(), err.Error()) From 99355443c774b5e681e7d7f0cc1c213bfa55ce11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan-Otto=20Kr=C3=B6pke?= Date: Tue, 25 Jun 2024 13:25:39 +0200 Subject: [PATCH 13/16] remote write handler: reject samples with future timestamps (#14304) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(remote_write): reject samples with future timestamps * increase check to +10 minutes to allow for clock drift --------- Signed-off-by: Jan-Otto Kröpke Signed-off-by: Jan-Otto Kröpke Signed-off-by: Jan-Otto Kröpke Co-authored-by: Bryan Boreham --- storage/remote/write_handler.go | 67 ++++++++++-- storage/remote/write_handler_test.go | 147 ++++++++++++++++++--------- 2 files changed, 161 insertions(+), 53 deletions(-) diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index e7515a42b8..0832c65abe 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "net/http" + "time" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -25,7 +26,9 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite" @@ -38,6 +41,8 @@ type writeHandler struct { samplesWithInvalidLabelsTotal prometheus.Counter } +const maxAheadTime = 10 * time.Minute + // NewWriteHandler creates a http.Handler that accepts remote write requests and // writes them to the provided appendable. 
func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable) http.Handler { @@ -104,17 +109,22 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err outOfOrderExemplarErrs := 0 samplesWithInvalidLabels := 0 - app := h.appendable.Appender(ctx) + timeLimitApp := &timeLimitAppender{ + Appender: h.appendable.Appender(ctx), + maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)), + } + defer func() { if err != nil { - _ = app.Rollback() + _ = timeLimitApp.Rollback() return } - err = app.Commit() + err = timeLimitApp.Commit() }() b := labels.NewScratchBuilder(0) var exemplarErr error + for _, ts := range req.Timeseries { labels := LabelProtosToLabels(&b, ts.Labels) if !labels.IsValid() { @@ -124,7 +134,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err } var ref storage.SeriesRef for _, s := range ts.Samples { - ref, err = app.Append(ref, labels, s.Timestamp, s.Value) + ref, err = timeLimitApp.Append(ref, labels, s.Timestamp, s.Value) if err != nil { unwrappedErr := errors.Unwrap(err) if unwrappedErr == nil { @@ -140,7 +150,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err for _, ep := range ts.Exemplars { e := exemplarProtoToExemplar(&b, ep) - _, exemplarErr = app.AppendExemplar(0, labels, e) + _, exemplarErr = timeLimitApp.AppendExemplar(0, labels, e) exemplarErr = h.checkAppendExemplarError(exemplarErr, e, &outOfOrderExemplarErrs) if exemplarErr != nil { // Since exemplar storage is still experimental, we don't fail the request on ingestion errors. @@ -151,11 +161,12 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err for _, hp := range ts.Histograms { if hp.IsFloatHistogram() { fhs := FloatHistogramProtoToFloatHistogram(hp) - _, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs) + _, err = timeLimitApp.AppendHistogram(0, labels, hp.Timestamp, nil, fhs) } else { hs := HistogramProtoToHistogram(hp) - _, err = app.AppendHistogram(0, labels, hp.Timestamp, hs, nil) + _, err = timeLimitApp.AppendHistogram(0, labels, hp.Timestamp, hs, nil) } + if err != nil { unwrappedErr := errors.Unwrap(err) if unwrappedErr == nil { @@ -233,3 +244,45 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } + +type timeLimitAppender struct { + storage.Appender + + maxTime int64 +} + +func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + if t > app.maxTime { + return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds) + } + + ref, err := app.Appender.Append(ref, lset, t, v) + if err != nil { + return 0, err + } + return ref, nil +} + +func (app *timeLimitAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + if t > app.maxTime { + return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds) + } + + ref, err := app.Appender.AppendHistogram(ref, l, t, h, fh) + if err != nil { + return 0, err + } + return ref, nil +} + +func (app *timeLimitAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { + if e.Ts > app.maxTime { + return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds) + } + + ref, err := app.Appender.AppendExemplar(ref, l, e) + if err != nil { + return 
0, err + } + return ref, nil +} diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 1715e92c27..30dc1b3d69 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "io" + "math" "net/http" "net/http/httptest" "strconv" @@ -87,73 +88,127 @@ func TestRemoteWriteHandler(t *testing.T) { } func TestOutOfOrderSample(t *testing.T) { - buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ - Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, - Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, - }}, nil, nil, nil, nil) - require.NoError(t, err) + tests := []struct { + Name string + Timestamp int64 + }{ + { + Name: "historic", + Timestamp: 0, + }, + { + Name: "future", + Timestamp: math.MaxInt64, + }, + } - req, err := http.NewRequest("", "", bytes.NewReader(buf)) - require.NoError(t, err) + for _, tc := range tests { + t.Run(tc.Name, func(t *testing.T) { + buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ + Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, + Samples: []prompb.Sample{{Value: 1, Timestamp: tc.Timestamp}}, + }}, nil, nil, nil, nil) + require.NoError(t, err) - appendable := &mockAppendable{ - latestSample: 100, - } - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) + req, err := http.NewRequest("", "", bytes.NewReader(buf)) + require.NoError(t, err) - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) + appendable := &mockAppendable{ + latestSample: 100, + } + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) - resp := recorder.Result() - require.Equal(t, http.StatusBadRequest, resp.StatusCode) + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + require.Equal(t, http.StatusBadRequest, resp.StatusCode) + }) + } } // This test case currently aims to verify that the WriteHandler endpoint // don't fail on ingestion errors since the exemplar storage is // still experimental. 
func TestOutOfOrderExemplar(t *testing.T) { - buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ - Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, - Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}}, - }}, nil, nil, nil, nil) - require.NoError(t, err) + tests := []struct { + Name string + Timestamp int64 + }{ + { + Name: "historic", + Timestamp: 0, + }, + { + Name: "future", + Timestamp: math.MaxInt64, + }, + } - req, err := http.NewRequest("", "", bytes.NewReader(buf)) - require.NoError(t, err) + for _, tc := range tests { + t.Run(tc.Name, func(t *testing.T) { + buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ + Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, + Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: tc.Timestamp}}, + }}, nil, nil, nil, nil) + require.NoError(t, err) - appendable := &mockAppendable{ - latestExemplar: 100, - } - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) + req, err := http.NewRequest("", "", bytes.NewReader(buf)) + require.NoError(t, err) - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) + appendable := &mockAppendable{ + latestExemplar: 100, + } + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) - resp := recorder.Result() - // TODO: update to require.Equal(t, http.StatusConflict, resp.StatusCode) once exemplar storage is not experimental. - require.Equal(t, http.StatusNoContent, resp.StatusCode) + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + // TODO: update to require.Equal(t, http.StatusConflict, resp.StatusCode) once exemplar storage is not experimental. 
+ require.Equal(t, http.StatusNoContent, resp.StatusCode) + }) + } } func TestOutOfOrderHistogram(t *testing.T) { - buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ - Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, - Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat(nil))}, - }}, nil, nil, nil, nil) - require.NoError(t, err) + tests := []struct { + Name string + Timestamp int64 + }{ + { + Name: "historic", + Timestamp: 0, + }, + { + Name: "future", + Timestamp: math.MaxInt64, + }, + } - req, err := http.NewRequest("", "", bytes.NewReader(buf)) - require.NoError(t, err) + for _, tc := range tests { + t.Run(tc.Name, func(t *testing.T) { + buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{ + Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, + Histograms: []prompb.Histogram{HistogramToHistogramProto(tc.Timestamp, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat(nil))}, + }}, nil, nil, nil, nil) + require.NoError(t, err) - appendable := &mockAppendable{ - latestHistogram: 100, - } - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) + req, err := http.NewRequest("", "", bytes.NewReader(buf)) + require.NoError(t, err) - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) + appendable := &mockAppendable{ + latestHistogram: 100, + } + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable) - resp := recorder.Result() - require.Equal(t, http.StatusBadRequest, resp.StatusCode) + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + require.Equal(t, http.StatusBadRequest, resp.StatusCode) + }) + } } func BenchmarkRemoteWritehandler(b *testing.B) { From 1b5f65002a06ac9c762660def5bd80290819e0f4 Mon Sep 17 00:00:00 2001 From: Daniel Mellado Date: Tue, 25 Jun 2024 16:31:03 +0200 Subject: [PATCH 14/16] Bump go-retryablehttp to fix basic auth creds leak This PR updates go-retryablehttp to version 0.7.7, even if it's used as an indirect import. 
Versions previous to that can didn't sanitize urls, discussed at HDCSEC-2024-12 [1] [1] https://discuss.hashicorp.com/t/hcsec-2024-12-go-retryablehttp-can-leak-basic-auth-credentials-to-log-files/68027 Signed-off-by: Daniel Mellado --- go.mod | 4 ++-- go.sum | 9 ++++----- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index ac8b4f469d..ce2f0714a0 100644 --- a/go.mod +++ b/go.mod @@ -146,10 +146,10 @@ require ( github.com/hashicorp/cronexpr v1.1.2 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect - github.com/hashicorp/go-hclog v1.5.0 // indirect + github.com/hashicorp/go-hclog v1.6.3 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect - github.com/hashicorp/go-retryablehttp v0.7.4 // indirect + github.com/hashicorp/go-retryablehttp v0.7.7 // indirect github.com/hashicorp/go-rootcerts v1.0.2 // indirect github.com/hashicorp/golang-lru v0.6.0 // indirect github.com/hashicorp/serf v0.10.1 // indirect diff --git a/go.sum b/go.sum index 06db002f55..956b9d8949 100644 --- a/go.sum +++ b/go.sum @@ -369,9 +369,8 @@ github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtng github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= -github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= -github.com/hashicorp/go-hclog v1.5.0 h1:bI2ocEMgcVlz55Oj1xZNBsVi900c7II+fWDyV9o+13c= -github.com/hashicorp/go-hclog v1.5.0/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= +github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= +github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJFeZnpfm2KLowc= github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= @@ -383,8 +382,8 @@ github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+ github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= -github.com/hashicorp/go-retryablehttp v0.7.4 h1:ZQgVdpTdAL7WpMIwLzCfbalOcSUdkDZnpUv3/+BxzFA= -github.com/hashicorp/go-retryablehttp v0.7.4/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= +github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU= +github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU= github.com/hashicorp/go-rootcerts v1.0.2 h1:jzhAVGtqPKbwpyCPELlgNWhE1znq+qwJtW5Oi2viEzc= github.com/hashicorp/go-rootcerts v1.0.2/go.mod h1:pqUvnprVnM5bf7AOirdbb01K4ccR319Vf4pU3K5EGc8= From 2dd07fbb1bfe8ebeca8ea11f4623e0f5faed2236 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 26 Jun 2024 20:32:04 +1000 Subject: [PATCH 15/16] notifier: optionally drain queued 
notifications before shutting down (#14290) * Add draining of queued notifications to `notifier.Manager` Signed-off-by: Charles Korn * Update docs Signed-off-by: Charles Korn * Address PR feedback Signed-off-by: Charles Korn * Add more logging Signed-off-by: Charles Korn * Address offline feedback: remove timeout Signed-off-by: Charles Korn * Ensure stopping takes priority over further processing, make tests more robust Signed-off-by: Charles Korn * Make channel unbuffered Signed-off-by: Charles Korn * Update docs Signed-off-by: Charles Korn * Fix race in test Signed-off-by: Charles Korn * Remove unnecessary context Signed-off-by: Charles Korn * Make Stop safe to call multiple times Signed-off-by: Charles Korn --------- Signed-off-by: Charles Korn --- cmd/prometheus/main.go | 3 + docs/command-line/prometheus.md | 1 + notifier/notifier.go | 139 +++++++++++++++++++------- notifier/notifier_test.go | 170 ++++++++++++++++++++++++++++++++ 4 files changed, 277 insertions(+), 36 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index cd7f533d1c..7544f276a6 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -445,6 +445,9 @@ func main() { serverOnlyFlag(a, "alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications."). Default("10000").IntVar(&cfg.notifier.QueueCapacity) + serverOnlyFlag(a, "alertmanager.drain-notification-queue-on-shutdown", "Send any outstanding Alertmanager notifications when shutting down. If false, any outstanding Alertmanager notifications will be dropped when shutting down."). + Default("true").BoolVar(&cfg.notifier.DrainOnShutdown) + // TODO: Remove in Prometheus 3.0. alertmanagerTimeout := a.Flag("alertmanager.timeout", "[DEPRECATED] This flag has no effect.").Hidden().String() diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index aa9bf3bfb0..1fc032d09b 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -50,6 +50,7 @@ The Prometheus monitoring server | --rules.alert.resend-delay | Minimum amount of time to wait before resending an alert to Alertmanager. Use with server mode only. | `1m` | | --rules.max-concurrent-evals | Global concurrency limit for independent rules that can run concurrently. When set, "query.max-concurrency" may need to be adjusted accordingly. Use with server mode only. | `4` | | --alertmanager.notification-queue-capacity | The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` | +| --alertmanager.drain-notification-queue-on-shutdown | Send any outstanding Alertmanager notifications when shutting down. If false, any outstanding Alertmanager notifications will be dropped when shutting down. Use with server mode only. | `true` | | --query.lookback-delta | The maximum lookback duration for retrieving metrics during expression evaluations and federation. Use with server mode only. | `5m` | | --query.timeout | Maximum time a query may take before being aborted. Use with server mode only. | `2m` | | --query.max-concurrency | Maximum number of queries executed concurrently. Use with server mode only. 
| `20` | diff --git a/notifier/notifier.go b/notifier/notifier.go index cd00a4507d..68b0d4961e 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -110,10 +110,11 @@ type Manager struct { metrics *alertMetrics - more chan struct{} - mtx sync.RWMutex - ctx context.Context - cancel func() + more chan struct{} + mtx sync.RWMutex + + stopOnce *sync.Once + stopRequested chan struct{} alertmanagers map[string]*alertmanagerSet logger log.Logger @@ -121,9 +122,10 @@ type Manager struct { // Options are the configurable parameters of a Handler. type Options struct { - QueueCapacity int - ExternalLabels labels.Labels - RelabelConfigs []*relabel.Config + QueueCapacity int + DrainOnShutdown bool + ExternalLabels labels.Labels + RelabelConfigs []*relabel.Config // Used for sending HTTP requests to the Alertmanager. Do func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) @@ -217,8 +219,6 @@ func do(ctx context.Context, client *http.Client, req *http.Request) (*http.Resp // NewManager is the manager constructor. func NewManager(o *Options, logger log.Logger) *Manager { - ctx, cancel := context.WithCancel(context.Background()) - if o.Do == nil { o.Do = do } @@ -227,12 +227,12 @@ func NewManager(o *Options, logger log.Logger) *Manager { } n := &Manager{ - queue: make([]*Alert, 0, o.QueueCapacity), - ctx: ctx, - cancel: cancel, - more: make(chan struct{}, 1), - opts: o, - logger: logger, + queue: make([]*Alert, 0, o.QueueCapacity), + more: make(chan struct{}, 1), + stopRequested: make(chan struct{}), + stopOnce: &sync.Once{}, + opts: o, + logger: logger, } queueLenFunc := func() float64 { return float64(n.queueLen()) } @@ -298,40 +298,98 @@ func (n *Manager) nextBatch() []*Alert { return alerts } +// Run dispatches notifications continuously, returning once Stop has been called and all +// pending notifications have been drained from the queue (if draining is enabled). +// +// Dispatching of notifications occurs in parallel to processing target updates to avoid one starving the other. +// Refer to https://github.com/prometheus/prometheus/issues/13676 for more details. +func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) { + wg := sync.WaitGroup{} + wg.Add(2) + + go func() { + defer wg.Done() + n.targetUpdateLoop(tsets) + }() + + go func() { + defer wg.Done() + n.sendLoop() + n.drainQueue() + }() + + wg.Wait() + level.Info(n.logger).Log("msg", "Notification manager stopped") +} + // sendLoop continuously consumes the notifications queue and sends alerts to // the configured Alertmanagers. func (n *Manager) sendLoop() { for { + // If we've been asked to stop, that takes priority over sending any further notifications. select { - case <-n.ctx.Done(): + case <-n.stopRequested: return - case <-n.more: - } - alerts := n.nextBatch() + default: + select { + case <-n.stopRequested: + return - if !n.sendAll(alerts...) { - n.metrics.dropped.Add(float64(len(alerts))) - } - // If the queue still has items left, kick off the next iteration. - if n.queueLen() > 0 { - n.setMore() + case <-n.more: + n.sendOneBatch() + + // If the queue still has items left, kick off the next iteration. + if n.queueLen() > 0 { + n.setMore() + } + } } } } -// Run receives updates of target groups and triggers a reload. -// The dispatching of notifications occurs in the background to prevent blocking the receipt of target updates. -// Refer to https://github.com/prometheus/prometheus/issues/13676 for more details. 
-func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) { - go n.sendLoop() +// targetUpdateLoop receives updates of target groups and triggers a reload. +func (n *Manager) targetUpdateLoop(tsets <-chan map[string][]*targetgroup.Group) { for { + // If we've been asked to stop, that takes priority over processing any further target group updates. select { - case <-n.ctx.Done(): + case <-n.stopRequested: return - case ts := <-tsets: - n.reload(ts) + default: + select { + case <-n.stopRequested: + return + case ts := <-tsets: + n.reload(ts) + } + } + } +} + +func (n *Manager) sendOneBatch() { + alerts := n.nextBatch() + + if !n.sendAll(alerts...) { + n.metrics.dropped.Add(float64(len(alerts))) + } +} + +func (n *Manager) drainQueue() { + if !n.opts.DrainOnShutdown { + if n.queueLen() > 0 { + level.Warn(n.logger).Log("msg", "Draining remaining notifications on shutdown is disabled, and some notifications have been dropped", "count", n.queueLen()) + n.metrics.dropped.Add(float64(n.queueLen())) } + + return + } + + level.Info(n.logger).Log("msg", "Draining any remaining notifications...") + + for n.queueLen() > 0 { + n.sendOneBatch() } + + level.Info(n.logger).Log("msg", "Remaining notifications drained") } func (n *Manager) reload(tgs map[string][]*targetgroup.Group) { @@ -546,7 +604,7 @@ func (n *Manager) sendAll(alerts ...*Alert) bool { for _, am := range ams.ams { wg.Add(1) - ctx, cancel := context.WithTimeout(n.ctx, time.Duration(ams.cfg.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ams.cfg.Timeout)) defer cancel() go func(ctx context.Context, client *http.Client, url string, payload []byte, count int) { @@ -624,10 +682,19 @@ func (n *Manager) sendOne(ctx context.Context, c *http.Client, url string, b []b return nil } -// Stop shuts down the notification handler. +// Stop signals the notification manager to shut down and immediately returns. +// +// Run will return once the notification manager has successfully shut down. +// +// The manager will optionally drain any queued notifications before shutting down. +// +// Stop is safe to call multiple times. func (n *Manager) Stop() { level.Info(n.logger).Log("msg", "Stopping notification manager...") - n.cancel() + + n.stopOnce.Do(func() { + close(n.stopRequested) + }) } // Alertmanager holds Alertmanager endpoint information. diff --git a/notifier/notifier_test.go b/notifier/notifier_test.go index 03290a58ca..2cdaa9e06d 100644 --- a/notifier/notifier_test.go +++ b/notifier/notifier_test.go @@ -847,3 +847,173 @@ loop2: } } } + +func TestStop_DrainingDisabled(t *testing.T) { + releaseReceiver := make(chan struct{}) + receiverReceivedRequest := make(chan struct{}, 2) + alertsReceived := atomic.NewInt64(0) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Let the test know we've received a request. + receiverReceivedRequest <- struct{}{} + + var alerts []*Alert + + b, err := io.ReadAll(r.Body) + require.NoError(t, err) + + err = json.Unmarshal(b, &alerts) + require.NoError(t, err) + + alertsReceived.Add(int64(len(alerts))) + + // Wait for the test to release us. 
+ <-releaseReceiver + + w.WriteHeader(http.StatusOK) + })) + defer func() { + server.Close() + }() + + m := NewManager( + &Options{ + QueueCapacity: 10, + DrainOnShutdown: false, + }, + nil, + ) + + m.alertmanagers = make(map[string]*alertmanagerSet) + + am1Cfg := config.DefaultAlertmanagerConfig + am1Cfg.Timeout = model.Duration(time.Second) + + m.alertmanagers["1"] = &alertmanagerSet{ + ams: []alertmanager{ + alertmanagerMock{ + urlf: func() string { return server.URL }, + }, + }, + cfg: &am1Cfg, + } + + notificationManagerStopped := make(chan struct{}) + + go func() { + defer close(notificationManagerStopped) + m.Run(nil) + }() + + // Queue two alerts. The first should be immediately sent to the receiver, which should block until we release it later. + m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")}) + + select { + case <-receiverReceivedRequest: + // Nothing more to do. + case <-time.After(time.Second): + require.FailNow(t, "gave up waiting for receiver to receive notification of first alert") + } + + m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")}) + + // Stop the notification manager, pause to allow the shutdown to be observed, and then allow the receiver to proceed. + m.Stop() + time.Sleep(time.Second) + close(releaseReceiver) + + // Wait for the notification manager to stop and confirm only the first notification was sent. + // The second notification should be dropped. + select { + case <-notificationManagerStopped: + // Nothing more to do. + case <-time.After(time.Second): + require.FailNow(t, "gave up waiting for notification manager to stop") + } + + require.Equal(t, int64(1), alertsReceived.Load()) +} + +func TestStop_DrainingEnabled(t *testing.T) { + releaseReceiver := make(chan struct{}) + receiverReceivedRequest := make(chan struct{}, 2) + alertsReceived := atomic.NewInt64(0) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Let the test know we've received a request. + receiverReceivedRequest <- struct{}{} + + var alerts []*Alert + + b, err := io.ReadAll(r.Body) + require.NoError(t, err) + + err = json.Unmarshal(b, &alerts) + require.NoError(t, err) + + alertsReceived.Add(int64(len(alerts))) + + // Wait for the test to release us. + <-releaseReceiver + + w.WriteHeader(http.StatusOK) + })) + defer func() { + server.Close() + }() + + m := NewManager( + &Options{ + QueueCapacity: 10, + DrainOnShutdown: true, + }, + nil, + ) + + m.alertmanagers = make(map[string]*alertmanagerSet) + + am1Cfg := config.DefaultAlertmanagerConfig + am1Cfg.Timeout = model.Duration(time.Second) + + m.alertmanagers["1"] = &alertmanagerSet{ + ams: []alertmanager{ + alertmanagerMock{ + urlf: func() string { return server.URL }, + }, + }, + cfg: &am1Cfg, + } + + notificationManagerStopped := make(chan struct{}) + + go func() { + defer close(notificationManagerStopped) + m.Run(nil) + }() + + // Queue two alerts. The first should be immediately sent to the receiver, which should block until we release it later. + m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")}) + + select { + case <-receiverReceivedRequest: + // Nothing more to do. + case <-time.After(time.Second): + require.FailNow(t, "gave up waiting for receiver to receive notification of first alert") + } + + m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")}) + + // Stop the notification manager and allow the receiver to proceed. 
+ m.Stop() + close(releaseReceiver) + + // Wait for the notification manager to stop and confirm both notifications were sent. + select { + case <-notificationManagerStopped: + // Nothing more to do. + case <-time.After(200 * time.Millisecond): + require.FailNow(t, "gave up waiting for notification manager to stop") + } + + require.Equal(t, int64(2), alertsReceived.Load()) +} From ab6bd47f5fe04e168711bc145eb3dbef8477c32a Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Thu, 27 Jun 2024 13:06:57 +1000 Subject: [PATCH 16/16] Bump golangci-lint version Signed-off-by: Charles Korn --- .github/workflows/golangci-lint.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index d2fb1175c2..f13d513432 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -29,4 +29,4 @@ jobs: with: args: --verbose # Make sure to sync this with Makefile.common and scripts/golangci-lint.yml. - version: v1.59.0 + version: v1.59.1