Use partitions ring in write path and ingesters consumption #7376

Merged · 10 commits · Feb 15, 2024
go.mod (2 changes: 1 addition & 1 deletion)

@@ -20,7 +20,7 @@ require (
 	github.com/golang/snappy v0.0.4
 	github.com/google/gopacket v1.1.19
 	github.com/gorilla/mux v1.8.1
-	github.com/grafana/dskit v0.0.0-20240213103939-80881aa4a62f
+	github.com/grafana/dskit v0.0.0-20240214175438-3cabc9619ece
 	github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc
 	github.com/hashicorp/golang-lru v1.0.2 // indirect
 	github.com/json-iterator/go v1.1.12
go.sum (4 changes: 2 additions & 2 deletions)

@@ -495,8 +495,8 @@ github.com/gosimple/slug v1.1.1 h1:fRu/digW+NMwBIP+RmviTK97Ho/bEj/C9swrCspN3D4=
 github.com/gosimple/slug v1.1.1/go.mod h1:ER78kgg1Mv0NQGlXiDe57DpCyfbNywXXZ9mIorhxAf0=
 github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85Tnn+WEvr8fDpfwibmEPgfgFEaC87G24=
 github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4=
-github.com/grafana/dskit v0.0.0-20240213103939-80881aa4a62f h1:SU2XpJOzuclXYls6LyMgmQhv2TVLe7Oj6UbezYxZeM0=
-github.com/grafana/dskit v0.0.0-20240213103939-80881aa4a62f/go.mod h1:x5DMwyr1kyirtHOxoFSZ7RnyOgHdGh03ZruupdPetQM=
+github.com/grafana/dskit v0.0.0-20240214175438-3cabc9619ece h1:4t5jKvI8Fq13ozG9gCkOMlvECl82EbO2COnniHUP+Uc=
+github.com/grafana/dskit v0.0.0-20240214175438-3cabc9619ece/go.mod h1:x5DMwyr1kyirtHOxoFSZ7RnyOgHdGh03ZruupdPetQM=
 github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM=
 github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI=
 github.com/grafana/goautoneg v0.0.0-20231010094147-47ce5e72a9ae h1:Yxbw9jKGJVC6qAK5Ubzzb/qZwM6rRMMqaDc/d4Vp3pM=
integration/query_frontend_test.go (11 changes: 11 additions & 0 deletions)

@@ -317,6 +317,17 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
 		labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
 		labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

+	// When using the ingest storage, wait until partitions are ACTIVE in the ring.
+	if flags["-ingest-storage.enabled"] == "true" {
+		require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_partition_ring_partitions"}, e2e.WithLabelMatchers(
+			labels.MustNewMatcher(labels.MatchEqual, "name", "ingester-partitions"),
+			labels.MustNewMatcher(labels.MatchEqual, "state", "Active"))))
+
+		require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_partition_ring_partitions"}, e2e.WithLabelMatchers(
+			labels.MustNewMatcher(labels.MatchEqual, "name", "ingester-partitions"),
+			labels.MustNewMatcher(labels.MatchEqual, "state", "Active"))))
+	}
+
 	// Push a series for each user to Mimir.
 	now := time.Now()
 	expectedVectors := make([]model.Vector, numUsers)
integration/ruler_test.go (9 changes: 9 additions & 0 deletions)

@@ -1152,6 +1152,15 @@ func TestRulerRemoteEvaluation_ShouldEnforceStrongReadConsistencyForDependentRul
 	// The distributor should have 512 tokens for the ingester ring and 1 for the distributor ring.
 	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512+1), "cortex_ring_tokens_total"))

+	// Wait until partitions are ACTIVE in the ring.
+	require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_partition_ring_partitions"}, e2e.WithLabelMatchers(
+		labels.MustNewMatcher(labels.MatchEqual, "name", "ingester-partitions"),
+		labels.MustNewMatcher(labels.MatchEqual, "state", "Active"))))
+
+	require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_partition_ring_partitions"}, e2e.WithLabelMatchers(
+		labels.MustNewMatcher(labels.MatchEqual, "name", "ingester-partitions"),
+		labels.MustNewMatcher(labels.MatchEqual, "state", "Active"))))
+
 	client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", userID)
 	require.NoError(t, err)

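The ACTIVE-partition wait added above is identical in the query-frontend and ruler tests. A helper along the following lines could express the pattern once. This is a hypothetical refactor sketch, not part of the PR: it assumes the components are *e2emimir.MimirService values as elsewhere in Mimir's integration suite, lives in a test file that already has the testing, require, e2e, e2emimir, and labels imports, and only uses the e2e calls already visible in the diff.

// waitForActivePartitions is a hypothetical helper: it blocks until the given
// component reports `count` partitions in the Active state of the
// ingester-partitions ring, mirroring the inline waits added by this PR.
func waitForActivePartitions(t *testing.T, svc *e2emimir.MimirService, count float64) {
	t.Helper()
	require.NoError(t, svc.WaitSumMetricsWithOptions(e2e.Equals(count), []string{"cortex_partition_ring_partitions"}, e2e.WithLabelMatchers(
		labels.MustNewMatcher(labels.MatchEqual, "name", "ingester-partitions"),
		labels.MustNewMatcher(labels.MatchEqual, "state", "Active"))))
}

// Usage in the tests above:
//   waitForActivePartitions(t, distributor, 1)
//   waitForActivePartitions(t, querier, 1)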
pkg/distributor/distributor.go (51 changes: 34 additions & 17 deletions)

@@ -12,6 +12,7 @@ import (
 	"io"
 	"math"
 	"net/http"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -156,13 +157,16 @@ type Distributor struct {
 	// Pool of []byte used when marshalling write requests.
 	writeRequestBytePool sync.Pool

-	// ingesterDoBatchPushWorkers is the Go function passed to ring.DoBatchWithOptions.
+	// doBatchPushWorkers is the Go function passed to ring.DoBatchWithOptions.
 	// It can be nil, in which case a simple `go f()` will be used.
 	// See Config.ReusableIngesterPushWorkers on how to configure this.
-	ingesterDoBatchPushWorkers func(func())
+	doBatchPushWorkers func(func())

 	// ingestStorageWriter is the writer used when ingest storage is enabled.
 	ingestStorageWriter *ingest.Writer
+
+	// partitionsRing is the hash ring holding ingester partitions. It's used when ingest storage is enabled.
+	partitionsRing *ring.PartitionInstanceRing
 }

 // Config contains the configuration required to

Author review comment on the doBatchPushWorkers field: "Note to reviewers: renamed because it's also used for partitions."
@@ -248,7 +252,7 @@ const (
 )

 // New constructs a new Distributor
-func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, ingestersRing ring.ReadRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
+func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
 	clientMetrics := ingester_client.NewMetrics(reg)
 	if cfg.IngesterClientFactory == nil {
 		cfg.IngesterClientFactory = ring_client.PoolInstFunc(func(inst ring.InstanceDesc) (ring_client.PoolClient, error) {
@@ -270,6 +274,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
 		cfg:            cfg,
 		log:            log,
 		ingestersRing:  ingestersRing,
+		partitionsRing: partitionsRing,
 		ingesterPool:   NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
 		healthyInstancesCount: atomic.NewUint32(0),
 		limits:         limits,
@@ -462,7 +467,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove

 	if cfg.ReusableIngesterPushWorkers > 0 {
 		wp := concurrency.NewReusableGoroutinesPool(cfg.ReusableIngesterPushWorkers)
-		d.ingesterDoBatchPushWorkers = wp.Go
+		d.doBatchPushWorkers = wp.Go
 		// Closing the pool doesn't stop the workload it's running, we're doing this just to avoid leaking goroutines in tests.
 		subservices = append(subservices, services.NewBasicService(
 			nil,
@@ -1344,24 +1349,36 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {

 	// Get both series and metadata keys in one slice.
 	keys, initialMetadataIndex := getSeriesAndMetadataTokens(userID, req)
-	// Get a subring if tenant has shuffle shard size configured.
-	subRing := d.ingestersRing.ShuffleShard(userID, d.limits.IngestionTenantShardSize(userID))
+
+	// Get the tenant's subring to use to either write to ingesters or partitions.
+	var tenantRing ring.DoBatchRing
+	if d.cfg.IngestStorageConfig.Enabled {
+		subring, err := d.partitionsRing.ShuffleShard(userID, d.limits.IngestionPartitionsTenantShardSize(userID))
+		if err != nil {
+			return err
+		}
+
+		tenantRing = ring.NewActivePartitionBatchRing(subring.PartitionRing())
+	} else {
+		tenantRing = d.ingestersRing.ShuffleShard(userID, d.limits.IngestionTenantShardSize(userID))
+	}

 	// we must not re-use buffers now until all DoBatch goroutines have finished,
 	// so set this flag false and pass cleanup() to DoBatch.
 	cleanupInDefer = false
-	err = ring.DoBatchWithOptions(ctx, ring.WriteNoExtend, subRing, keys,
-		func(ingester ring.InstanceDesc, indexes []int) error {
+
+	err = ring.DoBatchWithOptions(ctx, ring.WriteNoExtend, tenantRing, keys,
+		func(instance ring.InstanceDesc, indexes []int) error {
 			req := req.ForIndexes(indexes, initialMetadataIndex)

 			// Do not cancel the remoteRequestContext in this callback:
 			// there are more callbacks using it at the same time.
 			localCtx, _ := remoteRequestContext()
 			var err error
 			if d.cfg.IngestStorageConfig.Enabled {
-				err = d.sendToStorage(localCtx, userID, ingester, req)
+				err = d.sendToStorage(localCtx, userID, instance, req)
 			} else {
-				err = d.sendToIngester(localCtx, instance, req)
+				err = d.sendToIngester(localCtx, instance, req)
 			}

 			if errors.Is(err, context.DeadlineExceeded) {
Expand All @@ -1376,7 +1393,7 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
cancel()
},
IsClientError: isClientError,
Go: d.ingesterDoBatchPushWorkers,
Go: d.doBatchPushWorkers,
},
)

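The two hunks above are the heart of the change: the batching code no longer cares whether it is fanning out to ingesters or to partitions, because both subrings satisfy the same batch-ring abstraction and the push callback picks the transport. The following standalone sketch uses simplified stand-in types (not the actual dskit API) to illustrate the shape of that polymorphism:

package main

import "fmt"

// instanceDesc stands in for dskit's ring.InstanceDesc. For a partition ring,
// Id carries the decimal partition ID instead of an ingester address.
type instanceDesc struct{ Id string }

// doBatchRing stands in for ring.DoBatchRing: anything that can map a series
// token to the set of instances (or partitions) that must receive it.
type doBatchRing interface {
	Get(token uint32) []instanceDesc
}

type ingesterSubring struct{}

func (ingesterSubring) Get(token uint32) []instanceDesc {
	return []instanceDesc{{Id: "ingester-zone-a-0"}, {Id: "ingester-zone-b-0"}}
}

type activePartitionSubring struct{}

func (activePartitionSubring) Get(token uint32) []instanceDesc {
	return []instanceDesc{{Id: "7"}} // one ACTIVE partition owns the token
}

// push mirrors the structure of Distributor.push: the ring decides where a
// token goes; the callback decides how to send (ingester RPC vs ingest storage).
func push(r doBatchRing, token uint32, send func(instanceDesc)) {
	for _, inst := range r.Get(token) {
		send(inst)
	}
}

func main() {
	push(ingesterSubring{}, 42, func(i instanceDesc) { fmt.Println("gRPC push to", i.Id) })          // classic write path
	push(activePartitionSubring{}, 42, func(i instanceDesc) { fmt.Println("write to partition", i.Id) }) // ingest-storage write path
}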
@@ -1455,16 +1472,16 @@ func (d *Distributor) sendToIngester(ctx context.Context, ingester ring.Instance
 	return wrapIngesterPushError(err, ingester.Id)
 }

-// sendToStorage sends received data to the object storage, computing the partition based on the input ingester.
-// This function is used when ingest storage is enabled.
-func (d *Distributor) sendToStorage(ctx context.Context, userID string, ingester ring.InstanceDesc, req *mimirpb.WriteRequest) error {
-	//nolint:staticcheck
-	partitionID, err := ingest.IngesterZonalPartition(ingester.Id)
+// sendToStorage sends received data to the configured ingest storage. This function is used when ingest storage is enabled.
+func (d *Distributor) sendToStorage(ctx context.Context, userID string, partition ring.InstanceDesc, req *mimirpb.WriteRequest) error {
+	// The partition ID is stored in the ring.InstanceDesc Id.
+	partitionID, err := strconv.ParseUint(partition.Id, 10, 31)
 	if err != nil {
 		return err
 	}

-	return d.ingestStorageWriter.WriteSync(ctx, partitionID, userID, req)
+	err = d.ingestStorageWriter.WriteSync(ctx, int32(partitionID), userID, req)
+	return wrapPartitionPushError(err, int32(partitionID))
 }

 // forReplicationSet runs f, in parallel, for all ingesters in the input replication set.
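The new sendToStorage relies on a simple convention: the partition ring publishes each partition as a ring.InstanceDesc whose Id field is the decimal partition ID, and the write path recovers it with strconv.ParseUint capped at 31 bits, so the conversion to the int32 expected by the ingest storage writer can never overflow. A minimal self-contained sketch of that round trip (simplified stand-in type, not the dskit API):

package main

import (
	"fmt"
	"strconv"
)

// instanceDesc stands in for ring.InstanceDesc; only the Id field matters here.
type instanceDesc struct{ Id string }

// partitionIDOf mirrors the parsing in sendToStorage: Id is parsed as an
// unsigned integer that must fit in 31 bits, so int32(id) is always safe.
func partitionIDOf(inst instanceDesc) (int32, error) {
	id, err := strconv.ParseUint(inst.Id, 10, 31)
	if err != nil {
		return 0, fmt.Errorf("instance ID %q does not encode a partition ID: %w", inst.Id, err)
	}
	return int32(id), nil
}

func main() {
	for _, inst := range []instanceDesc{{Id: "0"}, {Id: "42"}, {Id: "ingester-zone-a-1"}} {
		id, err := partitionIDOf(inst)
		fmt.Println(id, err) // the last one fails: a classic ingester ID is not a partition ID
	}
}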