OplogToRedis: Improve metrics handling for parallelism (#73)
Collection of a few metrics improvements:
- the staleness metric is now recorded on the publisher side in addition to the tail side
- added a staleness metric at the handoff between the read and write sides
- staleness metrics are now sharded by write-parallelism ordinal
- the last-processed timestamp is now computed separately per write-parallelism ordinal, and the minimum is taken when resuming
- use the proper loop-variable reference when spinning up parallel goroutines in main, so logs report the correct ordinal (see the sketch below)
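The last item refers to the usual Go pitfall of a goroutine closure capturing the loop variable itself rather than its value for that iteration. A minimal, self-contained sketch of the pattern used by the fix (illustrative only, not the actual main.go change, which passes the Redis client index the same way):

package main

import "fmt"

func main() {
	done := make(chan struct{})

	for i := 0; i < 3; i++ {
		// Before Go 1.22, all iterations share one `i`; passing it as an
		// argument gives each goroutine its own copy, so the value it logs
		// is the ordinal it was started with.
		go func(ordinal int) {
			fmt.Println("publisher goroutine", ordinal)
			done <- struct{}{}
		}(i)
	}

	for i := 0; i < 3; i++ {
		<-done
	}
}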
alex-goodisman authored Apr 29, 2024
1 parent 38e75cd commit c32c6f7
Showing 7 changed files with 89 additions and 53 deletions.
23 changes: 12 additions & 11 deletions lib/oplog/tail.go
@@ -95,19 +95,19 @@ var (
},
}, []string{"database", "status"})

	metricLastOplogEntryStaleness = promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: "otr",
		Subsystem: "oplog",
		Name:      "last_entry_staleness_seconds",
		Help:      "Gauge recording the difference between this server's clock and the timestamp on the last read oplog entry.",
	})

	metricOplogEntriesFiltered = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "otr",
		Subsystem: "oplog",
		Name:      "entries_filtered",
		Help:      "Oplog entries filtered by denylist",
	}, []string{"database"})

	metricLastReceivedStaleness = promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: "otr",
		Subsystem: "oplog",
		Name:      "last_received_staleness",
		Help:      "Gauge recording the difference between this server's clock and the timestamp on the last read oplog entry.",
	})
)

func init() {
@@ -151,7 +151,7 @@ func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan b

oplogCollection := session.Client().Database("local").Collection("oplog.rs")

startTime := tailer.getStartTime(func() (*primitive.Timestamp, error) {
startTime := tailer.getStartTime(parallelismSize-1, func() (*primitive.Timestamp, error) {
// Get the timestamp of the last entry in the oplog (as a position to
// start from if we don't have a last-written timestamp from Redis)
var entry rawOplogEntry
@@ -357,6 +357,7 @@ func (tailer *Tailer) unmarshalEntry(rawData bson.Raw, denylist *sync.Map) (time
status := "ignored"
database := "(no database)"
messageLen := float64(len(rawData))
metricLastReceivedStaleness.Set(float64(time.Since(time.Unix(int64(timestamp.T), 0))))

defer func() {
// TODO: remove these in a future version
@@ -365,7 +366,6 @@

metricOplogEntriesBySize.WithLabelValues(database, status).Observe(messageLen)
metricMaxOplogEntryByMinute.Report(messageLen, database, status)
metricLastOplogEntryStaleness.Set(float64(time.Since(time.Unix(int64(timestamp.T), 0))))
}()

if len(entries) > 0 {
@@ -422,8 +422,9 @@ func (tailer *Tailer) unmarshalEntry(rawData bson.Raw, denylist *sync.Map) (time
// We take the function to get the timestamp of the last oplog entry (as a
// fallback if we don't have a latest timestamp from Redis) as an arg instead
// of using tailer.mongoClient directly so we can unit test this function
func (tailer *Tailer) getStartTime(getTimestampOfLastOplogEntry func() (*primitive.Timestamp, error)) primitive.Timestamp {
ts, tsTime, redisErr := redispub.LastProcessedTimestamp(tailer.RedisClients[0], tailer.RedisPrefix)
func (tailer *Tailer) getStartTime(maxOrdinal int, getTimestampOfLastOplogEntry func() (*primitive.Timestamp, error)) primitive.Timestamp {
// Get the earliest "last processed time" for each shard. This assumes that the number of shards is constant.
ts, tsTime, redisErr := redispub.FirstLastProcessedTimestamp(tailer.RedisClients[0], tailer.RedisPrefix, maxOrdinal)

if redisErr == nil {
// we have a last write time, check that it's not too far in the
4 changes: 2 additions & 2 deletions lib/oplog/tail_test.go
@@ -74,7 +74,7 @@ func TestGetStartTime(t *testing.T) {
panic(err)
}
defer redisServer.Close()
require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry", strconv.FormatInt(int64(test.redisTimestamp.T), 10)))
require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry.0", strconv.FormatInt(int64(test.redisTimestamp.T), 10)))

redisClient := []redis.UniversalClient{redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: []string{redisServer.Addr()},
@@ -87,7 +87,7 @@
Denylist: &sync.Map{},
}

actualResult := tailer.getStartTime(func() (*primitive.Timestamp, error) {
actualResult := tailer.getStartTime(0, func() (*primitive.Timestamp, error) {
if test.mongoEndOfOplogErr != nil {
return nil, test.mongoEndOfOplogErr
}
24 changes: 22 additions & 2 deletions lib/redispub/lastProcessedTime.go
@@ -2,6 +2,7 @@ package redispub

import (
"context"
"strconv"
"time"

"github.com/go-redis/redis/v8"
@@ -17,8 +18,8 @@
//
// If oplogtoredis has not processed any messages, returns redis.Nil as an
// error.
func LastProcessedTimestamp(redisClient redis.UniversalClient, metadataPrefix string) (primitive.Timestamp, time.Time, error) {
str, err := redisClient.Get(context.Background(), metadataPrefix+"lastProcessedEntry").Result()
func LastProcessedTimestamp(redisClient redis.UniversalClient, metadataPrefix string, ordinal int) (primitive.Timestamp, time.Time, error) {
str, err := redisClient.Get(context.Background(), metadataPrefix+"lastProcessedEntry."+strconv.Itoa(ordinal)).Result()
if err != nil {
return primitive.Timestamp{}, time.Unix(0, 0), err
}
@@ -31,3 +32,22 @@
time := mongoTimestampToTime(ts)
return ts, time, nil
}

// FirstLastProcessedTimestamp runs LastProcessedTimestamp for each ordinal up to the provided count,
// then returns the earliest such timestamp obtained. If any ordinal produces an error, that error is returned.
func FirstLastProcessedTimestamp(redisClient redis.UniversalClient, metadataPrefix string, maxOrdinal int) (primitive.Timestamp, time.Time, error) {
	var minTs primitive.Timestamp
	var minTime time.Time
	for i := 0; i <= maxOrdinal; i++ {
		ts, time, err := LastProcessedTimestamp(redisClient, metadataPrefix, i)
		if err != nil {
			return ts, time, err
		}

		if i == 0 || ts.Before(minTs) {
			minTs = ts
			minTime = time
		}
	}
	return minTs, minTime, nil
}
12 changes: 6 additions & 6 deletions lib/redispub/lastProcessedTime_test.go
@@ -31,9 +31,9 @@ func TestLastProcessedTimestampSuccess(t *testing.T) {
redisServer, redisClient := startMiniredis()
defer redisServer.Close()

require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry", encodeMongoTimestamp(nowTS)))
require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry.0", encodeMongoTimestamp(nowTS)))

gotTS, gotTime, err := LastProcessedTimestamp(redisClient, "someprefix.")
gotTS, gotTime, err := LastProcessedTimestamp(redisClient, "someprefix.", 0)

if err != nil {
t.Errorf("Got unexpected error: %s", err)
@@ -52,7 +52,7 @@ func TestLastProcessedTimestampNoRecord(t *testing.T) {
redisServer, redisClient := startMiniredis()
defer redisServer.Close()

_, _, err := LastProcessedTimestamp(redisClient, "someprefix.")
_, _, err := LastProcessedTimestamp(redisClient, "someprefix.", 0)

if err == nil {
t.Errorf("Expected redis.Nil error, got no error")
@@ -64,9 +64,9 @@
func TestLastProcessedTimestampInvalidRecord(t *testing.T) {
redisServer, redisClient := startMiniredis()
defer redisServer.Close()
require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry", "not a number"))
require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry.0", "not a number"))

_, _, err := LastProcessedTimestamp(redisClient, "someprefix.")
_, _, err := LastProcessedTimestamp(redisClient, "someprefix.", 0)

if err == nil {
t.Errorf("Expected strconv error, got no error")
@@ -80,7 +80,7 @@ func TestLastProcessedTimestampRedisError(t *testing.T) {
Addrs: []string{"not a server"},
})

_, _, err := LastProcessedTimestamp(redisClient, "someprefix.")
_, _, err := LastProcessedTimestamp(redisClient, "someprefix.", 0)

if err == nil {
t.Errorf("Expected TCP error, got no error")
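A rough sketch of how the new FirstLastProcessedTimestamp helper could be covered in the same style as the tests above. The test name and timestamp values are illustrative; startMiniredis, encodeMongoTimestamp, and require are the helpers already used in this file. With maxOrdinal = 1 the helper reads ordinals 0 and 1 and should return the earlier of the two timestamps:

func TestFirstLastProcessedTimestampTakesMinimum(t *testing.T) {
	redisServer, redisClient := startMiniredis()
	defer redisServer.Close()

	earlier := primitive.Timestamp{T: 100}
	later := primitive.Timestamp{T: 200}

	// Each write-parallelism ordinal records its own last-processed entry.
	require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry.0", encodeMongoTimestamp(later)))
	require.NoError(t, redisServer.Set("someprefix.lastProcessedEntry.1", encodeMongoTimestamp(earlier)))

	gotTS, _, err := FirstLastProcessedTimestamp(redisClient, "someprefix.", 1)
	if err != nil {
		t.Errorf("Got unexpected error: %s", err)
	}

	// The earliest timestamp wins, so no shard's progress is skipped on resume.
	if gotTS != earlier {
		t.Errorf("Expected timestamp %v, got %v", earlier, gotTS)
	}
}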
39 changes: 28 additions & 11 deletions lib/redispub/publisher.go
@@ -7,6 +7,7 @@ package redispub
import (
"context"
"fmt"
"strconv"
"strings"
"time"

@@ -61,28 +62,43 @@ var metricLastCommandDuration = promauto.NewGauge(prometheus.GaugeOpts{
Help: "The round trip time in seconds of the most recent write to Redis.",
})

var metricStalenessPreRetries = promauto.NewGaugeVec(prometheus.GaugeOpts{
	Namespace: "otr",
	Subsystem: "redispub",
	Name:      "pre_retry_staleness",
	Help:      "Gauge recording the staleness on receiving a message from the tailing routine.",
}, []string{"ordinal"})

var metricLastOplogEntryStaleness = promauto.NewGaugeVec(prometheus.GaugeOpts{
	Namespace: "otr",
	Subsystem: "redispub",
	Name:      "last_entry_staleness_seconds",
	Help:      "Gauge recording the difference between this server's clock and the timestamp on the last published oplog entry.",
}, []string{"ordinal"})

// PublishStream reads Publications from the given channel and publishes them
// to Redis.
func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts *PublishOpts, stop <-chan bool) {
func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts *PublishOpts, stop <-chan bool, ordinal int) {

// Start up a background goroutine for periodically updating the last-processed
// timestamp
timestampC := make(chan primitive.Timestamp)
for _,client := range clients {
go periodicallyUpdateTimestamp(client, timestampC, opts)
for _, client := range clients {
go periodicallyUpdateTimestamp(client, timestampC, opts, ordinal)
}

// Redis expiration is in integer seconds, so we have to convert the
// time.Duration
dedupeExpirationSeconds := int(opts.DedupeExpiration.Seconds())

type PubFn func(*Publication)error
type PubFn func(*Publication) error

var publishFns []PubFn

for _,client := range clients {
for _, client := range clients {
client := client
publishFn := func(p *Publication) error {
return publishSingleMessage(p, client, opts.MetadataPrefix, dedupeExpirationSeconds)
return publishSingleMessage(p, client, opts.MetadataPrefix, dedupeExpirationSeconds, ordinal)
}
publishFns = append(publishFns, publishFn)
}
@@ -97,11 +113,11 @@ func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts
return

case p := <-in:
for i,publishFn := range publishFns {
metricStalenessPreRetries.WithLabelValues(strconv.Itoa(ordinal)).Set(float64(time.Since(time.Unix(int64(p.OplogTimestamp.T), 0)).Seconds()))
for i, publishFn := range publishFns {
err := publishSingleMessageWithRetries(p, 30, time.Second, publishFn)
log.Log.Debugw("Published to", "idx", i)


if err != nil {
metricSendFailed.Inc()
log.Log.Errorw("Permanent error while trying to publish message; giving up",
@@ -146,8 +162,9 @@ func publishSingleMessageWithRetries(p *Publication, maxRetries int, sleepTime t
return errors.Errorf("sending message (retried %v times)", maxRetries)
}

func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix string, dedupeExpirationSeconds int) error {
func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix string, dedupeExpirationSeconds int, ordinal int) error {
start := time.Now()
metricLastOplogEntryStaleness.WithLabelValues(strconv.Itoa(ordinal)).Set(float64(time.Since(time.Unix(int64(p.OplogTimestamp.T), 0)).Seconds()))

_, err := publishDedupe.Run(
context.Background(),
@@ -181,14 +198,14 @@ func formatKey(p *Publication, prefix string) string {
// channel, and this function throttles that to only update occasionally.
//
// This blocks forever; it should be run in a goroutine
func periodicallyUpdateTimestamp(client redis.UniversalClient, timestamps <-chan primitive.Timestamp, opts *PublishOpts) {
func periodicallyUpdateTimestamp(client redis.UniversalClient, timestamps <-chan primitive.Timestamp, opts *PublishOpts, ordinal int) {
var lastFlush time.Time
var mostRecentTimestamp primitive.Timestamp
var needFlush bool

flush := func() {
if needFlush {
client.Set(context.Background(), opts.MetadataPrefix+"lastProcessedEntry", encodeMongoTimestamp(mostRecentTimestamp), 0)
client.Set(context.Background(), opts.MetadataPrefix+"lastProcessedEntry."+strconv.Itoa(ordinal), encodeMongoTimestamp(mostRecentTimestamp), 0)
lastFlush = time.Now()
needFlush = false
}
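Both new publisher-side gauges compute staleness the same way: the T field of a MongoDB oplog timestamp is seconds since the Unix epoch, so staleness is the gap between that and the local clock. A standalone sketch of the computation (not part of the diff; the helper name is made up for illustration):

package main

import (
	"fmt"
	"time"

	"go.mongodb.org/mongo-driver/bson/primitive"
)

// stalenessSeconds mirrors the gauge computation above: convert the oplog
// timestamp's seconds component to wall-clock time and measure the lag.
func stalenessSeconds(ts primitive.Timestamp) float64 {
	return time.Since(time.Unix(int64(ts.T), 0)).Seconds()
}

func main() {
	// An entry written ~3 seconds ago should report ~3s of staleness.
	entry := primitive.Timestamp{T: uint32(time.Now().Add(-3 * time.Second).Unix())}
	fmt.Printf("staleness: %.1fs\n", stalenessSeconds(entry))
}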
4 changes: 2 additions & 2 deletions lib/redispub/publisher_test.go
@@ -121,11 +121,11 @@ func TestPeriodicallyUpdateTimestamp(t *testing.T) {
periodicallyUpdateTimestamp(redisClient, timestampC, &PublishOpts{
MetadataPrefix: "someprefix.",
FlushInterval: testSpeed,
})
}, 0)
waitGroup.Done()
}()

key := "someprefix.lastProcessedEntry"
key := "someprefix.lastProcessedEntry.0"

// Key should be unset
if redisServer.Exists(key) {
36 changes: 17 additions & 19 deletions main.go
@@ -10,6 +10,7 @@ import (
"net/http/pprof"
"os"
"os/signal"
"strconv"
"strings"
"sync"

@@ -69,16 +70,16 @@ func main() {
if err != nil {
panic(fmt.Sprintf("[%d] Error initializing Redis client: %s", i, err.Error()))
}
defer func() {
defer func(ordinal int) {
for _, redisClient := range redisClients {
redisCloseErr := redisClient.Close()
if redisCloseErr != nil {
log.Log.Errorw("Error closing Redis client",
"error", redisCloseErr,
"i", i)
"i", ordinal)
}
}
}()
}(i)
log.Log.Infow("Initialized connection to Redis", "i", i)

aggregatedRedisClients[i] = redisClients
@@ -98,30 +99,27 @@

stopRedisPub := make(chan bool)
waitGroup.Add(1)
go func() {
go func(ordinal int) {
redispub.PublishStream(redisClients, redisPubs, &redispub.PublishOpts{
FlushInterval: config.TimestampFlushInterval(),
DedupeExpiration: config.RedisDedupeExpiration(),
MetadataPrefix: config.RedisMetadataPrefix(),
}, stopRedisPub)
log.Log.Infow("Redis publisher completed", "i", i)
}, stopRedisPub, ordinal)
log.Log.Infow("Redis publisher completed", "i", ordinal)
waitGroup.Done()
}()
}(i)
log.Log.Info("Started up processing goroutines")
stopRedisPubs[i] = stopRedisPub
}

	promauto.NewGaugeFunc(prometheus.GaugeOpts{
		Namespace: "otr",
		Name:      "buffer_available",
		Help:      "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.",
	}, func() float64 {
		total := 0
		for _, redisPubs := range aggregatedRedisPubs {
			total += (bufferSize - len(redisPubs))
		}
		return float64(total)
	})
	promauto.NewGaugeFunc(prometheus.GaugeOpts{
		Namespace:   "otr",
		Name:        "buffer_available",
		Help:        "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.",
		ConstLabels: prometheus.Labels{"ordinal": strconv.Itoa(i)},
	}, func() float64 {
		return float64(bufferSize - len(redisPubs))
	})
}

stopOplogTail := make(chan bool)
waitGroup.Add(1)
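The replacement buffer_available gauge above is registered once per writer, using a ConstLabels entry keyed by ordinal so the series stay distinct while the metric name is unchanged. A condensed sketch of that pattern (the buffer size, ordinal count, and channel element type here are placeholders; the real code closes over the per-ordinal publish channel created in the loop):

package main

import (
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

const bufferSize = 10000

func main() {
	buffers := make([]chan int, 3)
	for i := range buffers {
		buffers[i] = make(chan int, bufferSize)
		buf := buffers[i] // capture this ordinal's channel for the closure

		// Same metric name each time; the "ordinal" const label keeps the
		// series distinct, giving one buffer_available gauge per writer.
		promauto.NewGaugeFunc(prometheus.GaugeOpts{
			Namespace:   "otr",
			Name:        "buffer_available",
			Help:        "Available space in the per-ordinal publish buffer.",
			ConstLabels: prometheus.Labels{"ordinal": strconv.Itoa(i)},
		}, func() float64 {
			return float64(bufferSize - len(buf))
		})
	}
}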
