Skip to content

Commit

Permalink
Added partitions support to Distributor.UserStats()
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci committed Feb 16, 2024
1 parent 6e6a002 commit c217503
Show file tree
Hide file tree
Showing 3 changed files with 767 additions and 83 deletions.
128 changes: 86 additions & 42 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1940,7 +1940,7 @@ func (cm *labelValuesCardinalityConcurrentMap) toLabelValuesCardinalityResponse(
labelValueSeriesCountMap := make(map[string]uint64, len(labelValueSeriesCountMapByZone))

for labelValue, seriesCountMapByZone := range labelValueSeriesCountMapByZone {
labelValueSeriesCountMap[labelValue] = approximateFromZones(zoneCount, replicationFactor, seriesCountMapByZone)
labelValueSeriesCountMap[labelValue] = approximateFromZones(zoneCount > 1, replicationFactor, seriesCountMapByZone)
}

cardinalityItems = append(cardinalityItems, &ingester_client.LabelValueSeriesCount{
Expand Down Expand Up @@ -2137,13 +2137,16 @@ func (r *activeSeriesResponse) result() []labels.Labels {

// approximateFromZones computes a zonal value (e.g. series cardinality or ingestion rate)
// while factoring in replication.
func approximateFromZones[T ~float64 | ~uint64](zoneCount int, replicationFactor int, seriesCountMapByZone map[string]T) T {
//
// If Mimir isn't deployed in a multi-zone configuration, approximateFromZones
// divides the sum of all values by the replication factor to come up with an approximation.
func approximateFromZones[T ~float64 | ~uint64](isMultiZone bool, replicationFactor int, seriesCountMapByZone map[string]T) T {
// If we have more than one zone, we return the max value across zones.
// Values can be different across zones due to incomplete replication or
// other issues. Any inconsistency should always be an underestimation of
// the real value, so we take the max to get the best available
// approximation.
if zoneCount > 1 {
if isMultiZone {
var max T
for _, seriesCount := range seriesCountMapByZone {
if seriesCount > max {
Expand All @@ -2162,7 +2165,7 @@ func approximateFromZones[T ~float64 | ~uint64](zoneCount int, replicationFactor
for _, seriesCount := range seriesCountMapByZone {
sum += seriesCount
}
return sum / T(replicationFactor)
return T(math.Round(float64(sum) / float64(replicationFactor)))
}

// LabelNames returns the names of all labels from series with samples timestamp between from and to, and matching
Expand Down Expand Up @@ -2281,66 +2284,107 @@ func (d *Distributor) MetricsMetadata(ctx context.Context, req *ingester_client.

// UserStats returns statistics about the current user.
func (d *Distributor) UserStats(ctx context.Context, countMethod cardinality.CountMethod) (*UserStats, error) {
//nolint:staticcheck
replicationSet, err := d.getIngesterReplicationSetForQuery(ctx)
replicationSets, err := d.getIngesterReplicationSetsForQuery(ctx)
if err != nil {
return nil, err
}

// If we have a single zone, we can't tolerate any errors.
if replicationSet.ZoneCount() == 1 {
replicationSet.MaxErrors = 0
}

type zonedUserStatsResponse struct {
zone string
resp *ingester_client.UserStatsResponse
}

// When ingest storage is disabled, if ingesters are running in a single zone we can't tolerate any errors.
// In this case we expect exactly 1 replication set.
if !d.cfg.IngestStorageConfig.Enabled && len(replicationSets) == 1 && replicationSets[0].ZoneCount() == 1 {
replicationSets[0].MaxErrors = 0
}

ingesterCountMethod, err := toIngesterCountMethod(countMethod)
if err != nil {
return nil, err
}
req := &ingester_client.UserStatsRequest{
CountMethod: ingesterCountMethod,
}
resps, err := ring.DoUntilQuorum[zonedUserStatsResponse](ctx, replicationSet, d.queryQuorumConfig(ctx, replicationSet), func(ctx context.Context, desc *ring.InstanceDesc) (zonedUserStatsResponse, error) {
poolClient, err := d.ingesterPool.GetClientForInstance(*desc)
if err != nil {
return zonedUserStatsResponse{}, err
}

client := poolClient.(ingester_client.IngesterClient)
resp, err := client.UserStats(ctx, req)
var (
req = &ingester_client.UserStatsRequest{CountMethod: ingesterCountMethod}
quorumConfig = d.queryQuorumConfigForReplicationSets(ctx, replicationSets)

responsesByReplicationSetMx = sync.Mutex{}
responsesByReplicationSet = make(map[int][]zonedUserStatsResponse, len(replicationSets))
)

// Fetch user stats from each ingester and collect responses by ReplicationSet.
//
// When ingest storage is enabled we expect 1 ReplicationSet for each partition. A series is sharded only to 1
// partition and the number of successful responses for each partition (ReplicationSet) may be different when
// ingesters request minimization is disabled. For this reason, we collect responses by ReplicationSet, so that
// we can later estimate the number of series with a higher accuracy.
err = concurrency.ForEachJob(ctx, len(replicationSets), 0, func(ctx context.Context, replicationSetIdx int) error {
replicationSet := replicationSets[replicationSetIdx]

resps, err := ring.DoUntilQuorum[zonedUserStatsResponse](ctx, replicationSet, quorumConfig, func(ctx context.Context, desc *ring.InstanceDesc) (zonedUserStatsResponse, error) {
poolClient, err := d.ingesterPool.GetClientForInstance(*desc)
if err != nil {
return zonedUserStatsResponse{}, err
}

client := poolClient.(ingester_client.IngesterClient)
resp, err := client.UserStats(ctx, req)
if err != nil {
return zonedUserStatsResponse{}, err
}

return zonedUserStatsResponse{zone: desc.Zone, resp: resp}, nil
}, func(zonedUserStatsResponse) {})

if err != nil {
return zonedUserStatsResponse{}, err
return err
}
return zonedUserStatsResponse{zone: desc.Zone, resp: resp}, nil
}, func(zusr zonedUserStatsResponse) {})

// Collect the response.
responsesByReplicationSetMx.Lock()
responsesByReplicationSet[replicationSetIdx] = resps
responsesByReplicationSetMx.Unlock()

return nil
})

if err != nil {
return nil, err
}

var (
zoneIngestionRate = map[string]float64{}
zoneAPIIngestionRate = map[string]float64{}
zoneRuleIngestionRate = map[string]float64{}
zoneNumSeries = map[string]uint64{}
)
// We need to take the lock because ring.DoUntilQuorum() returns as soon as quorum is reached,
// but there's no guarantee that no callback functions are still in flight once it returns.
responsesByReplicationSetMx.Lock()
defer responsesByReplicationSetMx.Unlock()

// collect responses by zone
for _, r := range resps {
zoneIngestionRate[r.zone] += r.resp.IngestionRate
zoneAPIIngestionRate[r.zone] += r.resp.ApiIngestionRate
zoneRuleIngestionRate[r.zone] += r.resp.RuleIngestionRate
zoneNumSeries[r.zone] += r.resp.NumSeries
}
totalStats := &UserStats{}

for replicationSetIdx, resps := range responsesByReplicationSet {
var (
replicationSet = replicationSets[replicationSetIdx]
zoneIngestionRate = map[string]float64{}
zoneAPIIngestionRate = map[string]float64{}
zoneRuleIngestionRate = map[string]float64{}
zoneNumSeries = map[string]uint64{}
)

// Collect responses by zone.
for _, r := range resps {
zoneIngestionRate[r.zone] += r.resp.IngestionRate
zoneAPIIngestionRate[r.zone] += r.resp.ApiIngestionRate
zoneRuleIngestionRate[r.zone] += r.resp.RuleIngestionRate
zoneNumSeries[r.zone] += r.resp.NumSeries
}

// When the ingest storage is enabled a partition is owned by only 1 ingester per zone,
// so regardless of the number of zones it behaves as if multi-zone is always enabled.
isMultiZone := d.cfg.IngestStorageConfig.Enabled || replicationSet.ZoneCount() > 1

totalStats := &UserStats{
IngestionRate: approximateFromZones(replicationSet.ZoneCount(), d.ingestersRing.ReplicationFactor(), zoneIngestionRate),
APIIngestionRate: approximateFromZones(replicationSet.ZoneCount(), d.ingestersRing.ReplicationFactor(), zoneAPIIngestionRate),
RuleIngestionRate: approximateFromZones(replicationSet.ZoneCount(), d.ingestersRing.ReplicationFactor(), zoneRuleIngestionRate),
NumSeries: approximateFromZones(replicationSet.ZoneCount(), d.ingestersRing.ReplicationFactor(), zoneNumSeries),
totalStats.IngestionRate += approximateFromZones(isMultiZone, d.ingestersRing.ReplicationFactor(), zoneIngestionRate)
totalStats.APIIngestionRate += approximateFromZones(isMultiZone, d.ingestersRing.ReplicationFactor(), zoneAPIIngestionRate)
totalStats.RuleIngestionRate += approximateFromZones(isMultiZone, d.ingestersRing.ReplicationFactor(), zoneRuleIngestionRate)
totalStats.NumSeries += approximateFromZones(isMultiZone, d.ingestersRing.ReplicationFactor(), zoneNumSeries)
}

return totalStats, nil
Expand Down
Loading

0 comments on commit c217503

Please sign in to comment.