From c32c6f78ce03763236a77a4376928e9f49230c37 Mon Sep 17 00:00:00 2001
From: alex-goodisman <42395598+alex-goodisman@users.noreply.github.com>
Date: Mon, 29 Apr 2024 12:04:55 -0400
Subject: [PATCH] OplogToRedis: Improve metrics handling for parallelism (#73)

Collection of a few metrics improvements:
- the staleness metric is now recorded on the publisher side in addition to the tail side
- added an additional staleness metric at the handoff between read and write
- staleness metrics are now sharded by write parallelism (labeled by ordinal)
- the last-processed timestamp is now computed separately per write-parallelism ordinal, and the minimum is taken when determining the start time
- use the proper loop variable reference when spinning up parallel goroutines in main to guarantee correct logs
---
 lib/oplog/tail.go                      | 23 +++++++--------
 lib/oplog/tail_test.go                 |  4 +--
 lib/redispub/lastProcessedTime.go      | 24 ++++++++++++++--
 lib/redispub/lastProcessedTime_test.go | 12 ++++----
 lib/redispub/publisher.go              | 39 ++++++++++++++++++--------
 lib/redispub/publisher_test.go         |  4 +--
 main.go                                | 36 +++++++++++-------------
 7 files changed, 89 insertions(+), 53 deletions(-)

diff --git a/lib/oplog/tail.go b/lib/oplog/tail.go
index 1bdce758..59f9793a 100644
--- a/lib/oplog/tail.go
+++ b/lib/oplog/tail.go
@@ -95,19 +95,19 @@ var (
 		},
 	}, []string{"database", "status"})
 
-	metricLastOplogEntryStaleness = promauto.NewGauge(prometheus.GaugeOpts{
-		Namespace: "otr",
-		Subsystem: "oplog",
-		Name:      "last_entry_staleness_seconds",
-		Help:      "Gauge recording the difference between this server's clock and the timestamp on the last read oplog entry.",
-	})
-
 	metricOplogEntriesFiltered = promauto.NewCounterVec(prometheus.CounterOpts{
 		Namespace: "otr",
 		Subsystem: "oplog",
 		Name:      "entries_filtered",
 		Help:      "Oplog entries filtered by denylist",
 	}, []string{"database"})
+
+	metricLastReceivedStaleness = promauto.NewGauge(prometheus.GaugeOpts{
+		Namespace: "otr",
+		Subsystem: "oplog",
+		Name:      "last_received_staleness",
+		Help:      "Gauge recording the difference between this server's clock and the timestamp on the last read oplog entry.",
+	})
 )
 
 func init() {
@@ -151,7 +151,7 @@ func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan b
 
 	oplogCollection := session.Client().Database("local").Collection("oplog.rs")
 
-	startTime := tailer.getStartTime(func() (*primitive.Timestamp, error) {
+	startTime := tailer.getStartTime(parallelismSize-1, func() (*primitive.Timestamp, error) {
 		// Get the timestamp of the last entry in the oplog (as a position to
 		// start from if we don't have a last-written timestamp from Redis)
 		var entry rawOplogEntry
@@ -357,6 +357,7 @@ func (tailer *Tailer) unmarshalEntry(rawData bson.Raw, denylist *sync.Map) (time
 	status := "ignored"
 	database := "(no database)"
 	messageLen := float64(len(rawData))
+	metricLastReceivedStaleness.Set(float64(time.Since(time.Unix(int64(timestamp.T), 0))))
 
 	defer func() {
 		// TODO: remove these in a future version
@@ -365,7 +366,6 @@ func (tailer *Tailer) unmarshalEntry(rawData bson.Raw, denylist *sync.Map) (time
 
 		metricOplogEntriesBySize.WithLabelValues(database, status).Observe(messageLen)
 		metricMaxOplogEntryByMinute.Report(messageLen, database, status)
-		metricLastOplogEntryStaleness.Set(float64(time.Since(time.Unix(int64(timestamp.T), 0))))
 	}()
 
 	if len(entries) > 0 {
@@ -422,8 +422,9 @@ func (tailer *Tailer) unmarshalEntry(rawData bson.Raw, denylist *sync.Map) (time
 // We take the function to get the timestamp of the last oplog entry (as a
 // fallback if we don't have a latest timestamp from Redis) as an arg instead
 // of using tailer.mongoClient directly so we can unit test this function
-func (tailer *Tailer) getStartTime(getTimestampOfLastOplogEntry func() (*primitive.Timestamp, error)) primitive.Timestamp {
-	ts, tsTime, redisErr := redispub.LastProcessedTimestamp(tailer.RedisClients[0], tailer.RedisPrefix)
+func (tailer *Tailer) getStartTime(maxOrdinal int, getTimestampOfLastOplogEntry func() (*primitive.Timestamp, error)) primitive.Timestamp {
+	// Get the earliest "last processed time" for each shard. This assumes that the number of shards is constant.
+	ts, tsTime, redisErr := redispub.FirstLastProcessedTimestamp(tailer.RedisClients[0], tailer.RedisPrefix, maxOrdinal)
 
 	if redisErr == nil {
 		// we have a last write time, check that it's not too far in the
diff --git a/lib/oplog/tail_test.go b/lib/oplog/tail_test.go
index 81fcd0b4..651e346d 100644
--- a/lib/oplog/tail_test.go
+++ b/lib/oplog/tail_test.go
@@ -74,7 +74,7 @@ func TestGetStartTime(t *testing.T) {
 				panic(err)
 			}
 			defer redisServer.Close()
-			require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry", strconv.FormatInt(int64(test.redisTimestamp.T), 10)))
+			require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry.0", strconv.FormatInt(int64(test.redisTimestamp.T), 10)))
 
 			redisClient := []redis.UniversalClient{redis.NewUniversalClient(&redis.UniversalOptions{
 				Addrs: []string{redisServer.Addr()},
@@ -87,7 +87,7 @@ func TestGetStartTime(t *testing.T) {
 				Denylist: &sync.Map{},
 			}
 
-			actualResult := tailer.getStartTime(func() (*primitive.Timestamp, error) {
+			actualResult := tailer.getStartTime(0, func() (*primitive.Timestamp, error) {
 				if test.mongoEndOfOplogErr != nil {
 					return nil, test.mongoEndOfOplogErr
 				}
diff --git a/lib/redispub/lastProcessedTime.go b/lib/redispub/lastProcessedTime.go
index dbfa44ba..f320b641 100644
--- a/lib/redispub/lastProcessedTime.go
+++ b/lib/redispub/lastProcessedTime.go
@@ -2,6 +2,7 @@ package redispub
 
 import (
 	"context"
+	"strconv"
 	"time"
 
 	"github.com/go-redis/redis/v8"
@@ -17,8 +18,8 @@ import (
 //
 // If oplogtoredis has not processed any messages, returns redis.Nil as an
 // error.
-func LastProcessedTimestamp(redisClient redis.UniversalClient, metadataPrefix string) (primitive.Timestamp, time.Time, error) {
-	str, err := redisClient.Get(context.Background(), metadataPrefix+"lastProcessedEntry").Result()
+func LastProcessedTimestamp(redisClient redis.UniversalClient, metadataPrefix string, ordinal int) (primitive.Timestamp, time.Time, error) {
+	str, err := redisClient.Get(context.Background(), metadataPrefix+"lastProcessedEntry."+strconv.Itoa(ordinal)).Result()
 	if err != nil {
 		return primitive.Timestamp{}, time.Unix(0, 0), err
 	}
@@ -31,3 +32,22 @@ func LastProcessedTimestamp(redisClient redis.UniversalClient, metadataPrefix st
 	time := mongoTimestampToTime(ts)
 	return ts, time, nil
 }
+
+// FirstLastProcessedTimestamp runs LastProcessedTimestamp for each ordinal up to the provided count,
+// then returns the earliest such timestamp obtained. If any ordinal produces an error, that error is returned.
+func FirstLastProcessedTimestamp(redisClient redis.UniversalClient, metadataPrefix string, maxOrdinal int) (primitive.Timestamp, time.Time, error) {
+	var minTs primitive.Timestamp
+	var minTime time.Time
+	for i := 0; i <= maxOrdinal; i++ {
+		ts, time, err := LastProcessedTimestamp(redisClient, metadataPrefix, i)
+		if err != nil {
+			return ts, time, err
+		}
+
+		if i == 0 || ts.Before(minTs) {
+			minTs = ts
+			minTime = time
+		}
+	}
+	return minTs, minTime, nil
+}
diff --git a/lib/redispub/lastProcessedTime_test.go b/lib/redispub/lastProcessedTime_test.go
index ffb4b185..acd689dc 100644
--- a/lib/redispub/lastProcessedTime_test.go
+++ b/lib/redispub/lastProcessedTime_test.go
@@ -31,9 +31,9 @@ func TestLastProcessedTimestampSuccess(t *testing.T) {
 	redisServer, redisClient := startMiniredis()
 	defer redisServer.Close()
 
-	require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry", encodeMongoTimestamp(nowTS)))
+	require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry.0", encodeMongoTimestamp(nowTS)))
 
-	gotTS, gotTime, err := LastProcessedTimestamp(redisClient, "someprefix.")
+	gotTS, gotTime, err := LastProcessedTimestamp(redisClient, "someprefix.", 0)
 
 	if err != nil {
 		t.Errorf("Got unexpected error: %s", err)
@@ -52,7 +52,7 @@ func TestLastProcessedTimestampNoRecord(t *testing.T) {
 	redisServer, redisClient := startMiniredis()
 	defer redisServer.Close()
 
-	_, _, err := LastProcessedTimestamp(redisClient, "someprefix.")
+	_, _, err := LastProcessedTimestamp(redisClient, "someprefix.", 0)
 
 	if err == nil {
 		t.Errorf("Expected redis.Nil error, got no error")
@@ -64,9 +64,9 @@ func TestLastProcessedTimestampNoRecord(t *testing.T) {
 func TestLastProcessedTimestampInvalidRecord(t *testing.T) {
 	redisServer, redisClient := startMiniredis()
 	defer redisServer.Close()
-	require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry", "not a number"))
+	require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry.0", "not a number"))
 
-	_, _, err := LastProcessedTimestamp(redisClient, "someprefix.")
+	_, _, err := LastProcessedTimestamp(redisClient, "someprefix.", 0)
 
 	if err == nil {
 		t.Errorf("Expected strconv error, got no error")
@@ -80,7 +80,7 @@ func TestLastProcessedTimestampRedisError(t *testing.T) {
 		Addrs: []string{"not a server"},
 	})
 
-	_, _, err := LastProcessedTimestamp(redisClient, "someprefix.")
+	_, _, err := LastProcessedTimestamp(redisClient, "someprefix.", 0)
 
 	if err == nil {
 		t.Errorf("Expected TCP error, got no error")
diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go
index f6363eac..6236c588 100644
--- a/lib/redispub/publisher.go
+++ b/lib/redispub/publisher.go
@@ -7,6 +7,7 @@ package redispub
 import (
 	"context"
 	"fmt"
+	"strconv"
 	"strings"
 	"time"
 
@@ -61,28 +62,43 @@ var metricLastCommandDuration = promauto.NewGauge(prometheus.GaugeOpts{
 	Help:      "The round trip time in seconds of the most recent write to Redis.",
})
 
+var metricStalenessPreRetries = promauto.NewGaugeVec(prometheus.GaugeOpts{
+	Namespace: "otr",
+	Subsystem: "redispub",
+	Name:      "pre_retry_staleness",
+	Help:      "Gauge recording the staleness on receiving a message from the tailing routine.",
+}, []string{"ordinal"})
+
+var metricLastOplogEntryStaleness = promauto.NewGaugeVec(prometheus.GaugeOpts{
+	Namespace: "otr",
+	Subsystem: "redispub",
+	Name:      "last_entry_staleness_seconds",
+	Help:      "Gauge recording the difference between this server's clock and the timestamp on the last published oplog entry.",
+}, []string{"ordinal"})
+
 // PublishStream reads Publications from the given channel and publishes them
 // to Redis.
-func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts *PublishOpts, stop <-chan bool) {
+func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts *PublishOpts, stop <-chan bool, ordinal int) {
+
 	// Start up a background goroutine for periodically updating the last-processed
 	// timestamp
 	timestampC := make(chan primitive.Timestamp)
 
-	for _,client := range clients {
-		go periodicallyUpdateTimestamp(client, timestampC, opts)
+	for _, client := range clients {
+		go periodicallyUpdateTimestamp(client, timestampC, opts, ordinal)
 	}
 
 	// Redis expiration is in integer seconds, so we have to convert the
 	// time.Duration
 	dedupeExpirationSeconds := int(opts.DedupeExpiration.Seconds())
 
-	type PubFn func(*Publication)error
+	type PubFn func(*Publication) error
 
 	var publishFns []PubFn
-	for _,client := range clients {
+	for _, client := range clients {
 		client := client
 		publishFn := func(p *Publication) error {
-			return publishSingleMessage(p, client, opts.MetadataPrefix, dedupeExpirationSeconds)
+			return publishSingleMessage(p, client, opts.MetadataPrefix, dedupeExpirationSeconds, ordinal)
 		}
 		publishFns = append(publishFns, publishFn)
 	}
@@ -97,11 +113,11 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts
 			return
 
 		case p := <-in:
+			metricStalenessPreRetries.WithLabelValues(strconv.Itoa(ordinal)).Set(float64(time.Since(time.Unix(int64(p.OplogTimestamp.T), 0)).Seconds()))
 
-			for i,publishFn := range publishFns {
+			for i, publishFn := range publishFns {
 				err := publishSingleMessageWithRetries(p, 30, time.Second, publishFn)
 				log.Log.Debugw("Published to", "idx", i)
-
 				if err != nil {
 					metricSendFailed.Inc()
 					log.Log.Errorw("Permanent error while trying to publish message; giving up",
@@ -146,8 +162,9 @@ func publishSingleMessageWithRetries(p *Publication, maxRetries int, sleepTime t
 	return errors.Errorf("sending message (retried %v times)", maxRetries)
 }
 
-func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix string, dedupeExpirationSeconds int) error {
+func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix string, dedupeExpirationSeconds int, ordinal int) error {
 	start := time.Now()
+	metricLastOplogEntryStaleness.WithLabelValues(strconv.Itoa(ordinal)).Set(float64(time.Since(time.Unix(int64(p.OplogTimestamp.T), 0)).Seconds()))
 
 	_, err := publishDedupe.Run(
 		context.Background(),
@@ -181,14 +198,14 @@ func formatKey(p *Publication, prefix string) string {
 // channel, and this function throttles that to only update occasionally.
 //
 // This blocks forever; it should be run in a goroutine
-func periodicallyUpdateTimestamp(client redis.UniversalClient, timestamps <-chan primitive.Timestamp, opts *PublishOpts) {
+func periodicallyUpdateTimestamp(client redis.UniversalClient, timestamps <-chan primitive.Timestamp, opts *PublishOpts, ordinal int) {
 	var lastFlush time.Time
 	var mostRecentTimestamp primitive.Timestamp
 	var needFlush bool
 
 	flush := func() {
 		if needFlush {
-			client.Set(context.Background(), opts.MetadataPrefix+"lastProcessedEntry", encodeMongoTimestamp(mostRecentTimestamp), 0)
+			client.Set(context.Background(), opts.MetadataPrefix+"lastProcessedEntry."+strconv.Itoa(ordinal), encodeMongoTimestamp(mostRecentTimestamp), 0)
 			lastFlush = time.Now()
 			needFlush = false
 		}
diff --git a/lib/redispub/publisher_test.go b/lib/redispub/publisher_test.go
index 0d7f39fa..66553f22 100644
--- a/lib/redispub/publisher_test.go
+++ b/lib/redispub/publisher_test.go
@@ -121,11 +121,11 @@ func TestPeriodicallyUpdateTimestamp(t *testing.T) {
 		periodicallyUpdateTimestamp(redisClient, timestampC, &PublishOpts{
 			MetadataPrefix: "someprefix.",
 			FlushInterval:  testSpeed,
-		})
+		}, 0)
 		waitGroup.Done()
 	}()
 
-	key := "someprefix.lastProcessedEntry"
+	key := "someprefix.lastProcessedEntry.0"
 
 	// Key should be unset
 	if redisServer.Exists(key) {
diff --git a/main.go b/main.go
index 1aab503d..f4b5ad0b 100644
--- a/main.go
+++ b/main.go
@@ -10,6 +10,7 @@ import (
 	"net/http/pprof"
 	"os"
 	"os/signal"
+	"strconv"
 	"strings"
 	"sync"
 
@@ -69,16 +70,16 @@ func main() {
 		if err != nil {
 			panic(fmt.Sprintf("[%d] Error initializing Redis client: %s", i, err.Error()))
 		}
-		defer func() {
+		defer func(ordinal int) {
 			for _, redisClient := range redisClients {
 				redisCloseErr := redisClient.Close()
 				if redisCloseErr != nil {
 					log.Log.Errorw("Error closing Redis client",
 						"error", redisCloseErr,
-						"i", i)
+						"i", ordinal)
 				}
 			}
-		}()
+		}(i)
 		log.Log.Infow("Initialized connection to Redis", "i", i)
 
 		aggregatedRedisClients[i] = redisClients
@@ -98,30 +99,27 @@ func main() {
 
 		stopRedisPub := make(chan bool)
 		waitGroup.Add(1)
-		go func() {
+		go func(ordinal int) {
 			redispub.PublishStream(redisClients, redisPubs, &redispub.PublishOpts{
 				FlushInterval:    config.TimestampFlushInterval(),
 				DedupeExpiration: config.RedisDedupeExpiration(),
 				MetadataPrefix:   config.RedisMetadataPrefix(),
-			}, stopRedisPub)
-			log.Log.Infow("Redis publisher completed", "i", i)
+			}, stopRedisPub, ordinal)
+			log.Log.Infow("Redis publisher completed", "i", ordinal)
 			waitGroup.Done()
-		}()
+		}(i)
 
 		log.Log.Info("Started up processing goroutines")
 		stopRedisPubs[i] = stopRedisPub
-	}
-	promauto.NewGaugeFunc(prometheus.GaugeOpts{
-		Namespace: "otr",
-		Name:      "buffer_available",
-		Help:      "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.",
-	}, func() float64 {
-		total := 0
-		for _, redisPubs := range aggregatedRedisPubs {
-			total += (bufferSize - len(redisPubs))
-		}
-		return float64(total)
-	})
+		promauto.NewGaugeFunc(prometheus.GaugeOpts{
+			Namespace:   "otr",
+			Name:        "buffer_available",
+			Help:        "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.",
+			ConstLabels: prometheus.Labels{"ordinal": strconv.Itoa(i)},
+		}, func() float64 {
+			return float64(bufferSize - len(redisPubs))
+		})
+	}
 
 	stopOplogTail := make(chan bool)
 	waitGroup.Add(1)
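
Editor's note (not part of the patch): the last commit-message bullet refers to Go's loop-variable capture behavior. Before Go 1.22, every iteration of a for loop reuses the same loop variable, so goroutines or deferred closures that capture i directly may all observe its final value by the time they run; passing the index as an argument, as main.go now does with go func(ordinal int) { ... }(i) and defer func(ordinal int) { ... }(i), gives each one its own copy. The standalone sketch below illustrates the pattern; the names in it are illustrative only.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup

	for i := 0; i < 3; i++ {
		wg.Add(1)
		// Pass the loop index as an argument so each goroutine receives its
		// own copy ("ordinal"), mirroring the go func(ordinal int) { ... }(i)
		// pattern in main.go. Capturing i directly could log a stale index on
		// Go versions before 1.22.
		go func(ordinal int) {
			defer wg.Done()
			fmt.Println("publisher goroutine", ordinal)
		}(i)
	}

	wg.Wait()
}

Relatedly, FirstLastProcessedTimestamp resumes tailing from the earliest per-ordinal timestamp: no publisher has advanced past that point, and entries that faster shards already wrote simply get published again, which the deduplication logic in publishSingleMessage is designed to make safe.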