Oplogtoredis: Add configurable Write Parallelism (#70)
This adds a new configuration option, OTR_WRITE_PARALLELISM: an integer that controls how many parallel writer loops deliver oplog messages to Redis. Each oplog message is routed to one of the loops based on a SHA256 hash of its database name. The value defaults to 1 if unspecified, so parallelism is off by default (a parallelism of 1 means a single writer, the same behavior as before).
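For illustration, here is a minimal standalone sketch of that routing rule (hypothetical names, not code from this commit; the actual implementation is split across lib/oplog/processor.go and lib/oplog/tail.go below):

package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// writerIndex routes a database name to one of n parallel writer loops:
// SHA256 the name, read the last 8 bytes of the digest as a little-endian
// integer, and reduce it modulo n (normalized to be non-negative).
func writerIndex(database string, n int) int {
	hash := sha256.Sum256([]byte(database))
	key := int(binary.LittleEndian.Uint64(hash[len(hash)-8:]))
	return (key%n + n) % n
}

func main() {
	// Every operation on a given database maps to the same writer, so
	// per-database ordering is preserved.
	fmt.Println(writerIndex("foo", 4))
}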
alex-goodisman authored Apr 25, 2024
1 parent 9b4148d commit 137014e
Showing 6 changed files with 131 additions and 69 deletions.
11 changes: 10 additions & 1 deletion lib/config/main.go
@@ -4,8 +4,9 @@
package config

import (
"time"
"strings"
"time"

"github.com/kelseyhightower/envconfig"
)

@@ -21,6 +22,7 @@ type oplogtoredisConfiguration struct {
MongoConnectTimeout time.Duration `default:"10s" split_words:"true"`
MongoQueryTimeout time.Duration `default:"5s" split_words:"true"`
OplogV2ExtractSubfieldChanges bool `default:"false" envconfig:"OPLOG_V2_EXTRACT_SUBFIELD_CHANGES"`
WriteParallelism int `default:"1" split_words:"true"`
}

var globalConfig *oplogtoredisConfiguration
@@ -131,6 +133,13 @@ func OplogV2ExtractSubfieldChanges() bool {
return globalConfig.OplogV2ExtractSubfieldChanges
}

// WriteParallelism controls how many parallel write loops are run (sharded based on a hash
// of the database name). Each parallel loop has its own Redis connection and internal buffer.
// The healthz endpoint will report failure if any one of them dies.
func WriteParallelism() int {
return globalConfig.WriteParallelism
}

// ParseEnv parses the current environment variables and updates the stored
// configuration. It is *not* threadsafe, and should just be called once
// at the start of the program.
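A usage note on the new option: with kelseyhightower/envconfig, the split_words:"true" tag splits the camel-cased field name, so WriteParallelism is populated from the WRITE_PARALLELISM variable under the process prefix, i.e. OTR_WRITE_PARALLELISM. A minimal standalone sketch, assuming the prefix "otr" (the prefix itself is not shown in this diff):

package main

import (
	"fmt"

	"github.com/kelseyhightower/envconfig"
)

type exampleConfig struct {
	// split_words:"true" maps this field to <PREFIX>_WRITE_PARALLELISM.
	WriteParallelism int `default:"1" split_words:"true"`
}

func main() {
	var cfg exampleConfig
	// With the assumed prefix "otr", running with OTR_WRITE_PARALLELISM=4
	// sets cfg.WriteParallelism to 4; it defaults to 1 when unset.
	if err := envconfig.Process("otr", &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.WriteParallelism)
}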
16 changes: 15 additions & 1 deletion lib/oplog/processor.go
@@ -1,6 +1,9 @@
package oplog

import (
"bytes"
"crypto/sha256"
"encoding/binary"
"encoding/json"
"strings"

@@ -78,6 +81,16 @@ func processOplogEntry(op *oplogEntry) (*redispub.Publication, error) {
return nil, errors.Wrap(err, "marshalling outgoing message")
}

// Compute a sharding key from the database name, so that all operations on
// a given database are routed to the same parallel writer.
hash := sha256.Sum256([]byte(op.Database))
intSlice := hash[len(hash)-8:]

var hashInt uint64

err = binary.Read(bytes.NewReader(intSlice), binary.LittleEndian, &hashInt)
if err != nil {
panic(errors.Wrap(err, "decoding database hash as uint64"))
}

// We need to publish on both the full-collection channel and the
// single-document channel
return &redispub.Publication{
@@ -92,7 +105,8 @@ func processOplogEntry(op *oplogEntry) (*redispub.Publication, error) {
Msg: msgJSON,
OplogTimestamp: op.Timestamp,

TxIdx: op.TxIdx,
TxIdx: op.TxIdx,
ParallelismKey: int(hashInt),
}, nil
}

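For context on the decode above: binary.Read over the last 8 bytes of the digest is equivalent to binary.LittleEndian.Uint64 on that slice, and for the database name "foo" the resulting key matches the fooHash constant used in the tests below. A standalone sketch demonstrating both:

package main

import (
	"bytes"
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

func main() {
	hash := sha256.Sum256([]byte("foo"))
	tail := hash[len(hash)-8:]

	// The approach used in processOplogEntry: decode via binary.Read.
	var viaRead uint64
	if err := binary.Read(bytes.NewReader(tail), binary.LittleEndian, &viaRead); err != nil {
		panic(err)
	}

	// Equivalent one-liner for a fixed-size slice.
	viaUint64 := binary.LittleEndian.Uint64(tail)

	fmt.Println(viaRead == viaUint64) // true
	fmt.Println(int(viaRead))         // -5843589418109203719 on 64-bit platforms
}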
22 changes: 16 additions & 6 deletions lib/oplog/processor_test.go
@@ -14,6 +14,9 @@ import (
"go.mongodb.org/mongo-driver/bson"
)

// expected ParallelismKey for the database name "foo" (last 8 bytes of its
// SHA256 hash, decoded as a little-endian integer)
const fooHash = -5843589418109203719

// nolint: gocyclo
func TestProcessOplogEntry(t *testing.T) {
// We can't compare raw publications because they contain JSON that can
@@ -25,9 +28,10 @@ func TestProcessOplogEntry(t *testing.T) {
Fields []string `json:"f"`
}
type decodedPublication struct {
Channels []string
Msg decodedPublicationMessage
OplogTimestamp primitive.Timestamp
Channels []string
Msg decodedPublicationMessage
OplogTimestamp primitive.Timestamp
ParallelismKey int
}

testObjectId, err := primitive.ObjectIDFromHex("deadbeefdeadbeefdeadbeef")
@@ -67,6 +71,7 @@
Fields: []string{"some"},
},
OplogTimestamp: primitive.Timestamp{T: 1234},
ParallelismKey: fooHash,
},
},
"Replacement update": {
@@ -92,6 +97,7 @@
Fields: []string{"some", "new"},
},
OplogTimestamp: primitive.Timestamp{T: 1234},
ParallelismKey: fooHash,
},
},
"Non-replacement update": {
@@ -123,6 +129,7 @@
Fields: []string{"a", "b", "c"},
},
OplogTimestamp: primitive.Timestamp{T: 1234},
ParallelismKey: fooHash,
},
},
"Delete": {
@@ -145,6 +152,7 @@
Fields: []string{},
},
OplogTimestamp: primitive.Timestamp{T: 1234},
ParallelismKey: fooHash,
},
},
"ObjectID id": {
@@ -172,6 +180,7 @@
Fields: []string{"some"},
},
OplogTimestamp: primitive.Timestamp{T: 1234},
ParallelismKey: fooHash,
},
},
"Unsupported id type": {
@@ -242,9 +251,10 @@
sort.Strings(msg.Fields)

return &decodedPublication{
Channels: pub.Channels,
Msg: msg,
OplogTimestamp: pub.OplogTimestamp,
Channels: pub.Channels,
Msg: msg,
OplogTimestamp: pub.OplogTimestamp,
ParallelismKey: pub.ParallelismKey,
}
}

15 changes: 9 additions & 6 deletions lib/oplog/tail.go
@@ -25,10 +25,10 @@ import (
// Tailer persistently tails the oplog of a Mongo cluster, handling
// reconnection and resumption of where it left off.
type Tailer struct {
MongoClient *mongo.Client
MongoClient *mongo.Client
RedisClients []redis.UniversalClient
RedisPrefix string
MaxCatchUp time.Duration
RedisPrefix string
MaxCatchUp time.Duration
}

// Raw oplog entry from Mongo
@@ -107,7 +107,7 @@ func init() {

// Tail begins tailing the oplog. It doesn't return unless it receives a message
// on the stop channel, in which case it wraps up its work and then returns.
func (tailer *Tailer) Tail(out chan<- *redispub.Publication, stop <-chan bool) {
func (tailer *Tailer) Tail(out []chan<- *redispub.Publication, stop <-chan bool) {
childStopC := make(chan bool)
wasStopped := false

@@ -131,7 +131,9 @@ func (tailer *Tailer) Tail(out chan<- *redispub.Publication, stop <-chan bool) {
}
}

func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan bool) {
func (tailer *Tailer) tailOnce(out []chan<- *redispub.Publication, stop <-chan bool) {
parallelismSize := len(out)

session, err := tailer.MongoClient.StartSession()
if err != nil {
log.Log.Errorw("Failed to start Mongo session", "error", err)
@@ -203,7 +205,8 @@ func (tailer *Tailer) tailOnce(out chan<- *redispub.Publication, stop <-chan bool) {

for _, pub := range pubs {
if pub != nil {
out <- pub
outIdx := (pub.ParallelismKey%parallelismSize + parallelismSize) % parallelismSize
out[outIdx] <- pub
} else {
log.Log.Error("Nil Redis publication")
}
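One note on the index computation above: Go's % operator takes the sign of the dividend, so a plain key % n is negative whenever ParallelismKey is negative (as fooHash is); adding n and reducing again normalizes the index into [0, n). A standalone sketch with hypothetical values:

package main

import "fmt"

func main() {
	key, n := -5843589418109203719, 4

	fmt.Println(key % n)         // -3: Go's % keeps the dividend's sign
	fmt.Println((key%n + n) % n) // 1: always in [0, n)
}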
4 changes: 4 additions & 0 deletions lib/redispub/publication.go
@@ -20,4 +20,8 @@ type Publication struct {

// TxIdx is the index of the operation within a transaction. Used to supplement OplogTimestamp in a transaction.
TxIdx uint

// ParallelismKey is a number representing which parallel write loop will process this message.
// It is a hash of the database name, assuming that a single database is the unit of ordering guarantee.
ParallelismKey int
}
132 changes: 77 additions & 55 deletions main.go
@@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
stdlog "log"
"net/http"
"net/http/pprof"
@@ -53,74 +54,91 @@ func main() {
}()
log.Log.Info("Initialized connection to Mongo")

redisClients, err := createRedisClients()
if err != nil {
panic("Error initializing Redis client: " + err.Error())
}
defer func() {
for _, redisClient := range redisClients {
redisCloseErr := redisClient.Close()
if redisCloseErr != nil {
log.Log.Errorw("Error closing Redis client",
"error", redisCloseErr)
}
}
}()
log.Log.Info("Initialized connection to Redis")
parallelism := config.WriteParallelism()
aggregatedRedisClients := make([][]redis.UniversalClient, parallelism)
aggregatedRedisPubs := make([]chan<- *redispub.Publication, parallelism)
stopRedisPubs := make([]chan bool, parallelism)

// We create two goroutines:
//
// The oplog.Tail goroutine reads messages from the oplog, and generates the
// messages that we need to write to redis. It then writes them to a
// buffered channel.
//
// The redispub.PublishStream goroutine reads messages from the buffered channel
// and sends them to Redis.
//
// TODO PERF: Use a leaky buffer (https://github.com/tulip/oplogtoredis/issues/2)
bufferSize := 10000
redisPubs := make(chan *redispub.Publication, bufferSize)
waitGroup := sync.WaitGroup{}

for i := 0; i < config.WriteParallelism(); i++ {
redisClients, err := createRedisClients()
if err != nil {
panic(fmt.Sprintf("[%d] Error initializing Redis client: %s", i, err.Error()))
}
defer func() {
for _, redisClient := range redisClients {
redisCloseErr := redisClient.Close()
if redisCloseErr != nil {
log.Log.Errorw("Error closing Redis client",
"error", redisCloseErr,
"i", i)
}
}
}()
log.Log.Infow("Initialized connection to Redis", "i", i)

aggregatedRedisClients[i] = redisClients

// We create two kinds of goroutines:
//
// The oplog.Tail goroutine reads messages from the oplog, and generates the
// messages that we need to write to redis. It then writes them to a
// buffered channel.
//
// Each redispub.PublishStream goroutine reads messages from its own buffered
// channel and sends them to Redis.
//
// TODO PERF: Use a leaky buffer (https://github.com/tulip/oplogtoredis/issues/2)
redisPubs := make(chan *redispub.Publication, bufferSize)
aggregatedRedisPubs[i] = redisPubs

stopRedisPub := make(chan bool)
waitGroup.Add(1)
go func() {
redispub.PublishStream(redisClients, redisPubs, &redispub.PublishOpts{
FlushInterval: config.TimestampFlushInterval(),
DedupeExpiration: config.RedisDedupeExpiration(),
MetadataPrefix: config.RedisMetadataPrefix(),
}, stopRedisPub)
log.Log.Infow("Redis publisher completed", "i", i)
waitGroup.Done()
}()
log.Log.Info("Started up processing goroutines")
stopRedisPubs[i] = stopRedisPub
}

promauto.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "otr",
Name: "buffer_available",
Help: "Gauge indicating the available space in the buffer of oplog entries waiting to be written to redis.",
}, func () float64 {
return float64(bufferSize - len(redisPubs))
}, func() float64 {
total := 0
for _, redisPubs := range aggregatedRedisPubs {
total += (bufferSize - len(redisPubs))
}
return float64(total)
})

waitGroup := sync.WaitGroup{}

stopOplogTail := make(chan bool)
waitGroup.Add(1)
go func() {
tailer := oplog.Tailer{
MongoClient: mongoSession,
RedisClients: redisClients,
RedisPrefix: config.RedisMetadataPrefix(),
MaxCatchUp: config.MaxCatchUp(),
RedisClients: aggregatedRedisClients[0], // the tailer goroutine needs a Redis client for determining the start timestamp;
// it doesn't really matter which one, since this isn't a meaningful amount of load, so just take the first one
RedisPrefix: config.RedisMetadataPrefix(),
MaxCatchUp: config.MaxCatchUp(),
}
tailer.Tail(redisPubs, stopOplogTail)
tailer.Tail(aggregatedRedisPubs, stopOplogTail)

log.Log.Info("Oplog tailer completed")
waitGroup.Done()
}()

stopRedisPub := make(chan bool)
waitGroup.Add(1)
go func() {
redispub.PublishStream(redisClients, redisPubs, &redispub.PublishOpts{
FlushInterval: config.TimestampFlushInterval(),
DedupeExpiration: config.RedisDedupeExpiration(),
MetadataPrefix: config.RedisMetadataPrefix(),
}, stopRedisPub)
log.Log.Info("Redis publisher completed")
waitGroup.Done()
}()
log.Log.Info("Started up processing goroutines")

// Start one more goroutine for the HTTP server
httpServer := makeHTTPServer(redisClients, mongoSession)
httpServer := makeHTTPServer(aggregatedRedisClients, mongoSession)
go func() {
httpErr := httpServer.ListenAndServe()
if httpErr != nil {
@@ -147,7 +165,9 @@
signal.Reset()

stopOplogTail <- true
stopRedisPub <- true
for _, stopRedisPub := range stopRedisPubs {
stopRedisPub <- true
}

err = httpServer.Shutdown(context.Background())
if err != nil {
@@ -226,17 +246,19 @@ func createRedisClients() ([]redis.UniversalClient, error) {
return ret, nil
}

func makeHTTPServer(clients []redis.UniversalClient, mongo *mongo.Client) *http.Server {
func makeHTTPServer(aggregatedClients [][]redis.UniversalClient, mongo *mongo.Client) *http.Server {
mux := http.NewServeMux()

mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
redisOK := true
for _, redis := range clients {
redisErr := redis.Ping(context.Background()).Err()
redisOK = (redisOK && (redisErr == nil))
if !redisOK {
log.Log.Errorw("Error connecting to Redis during healthz check",
"error", redisErr)
for _, clients := range aggregatedClients {
for _, redis := range clients {
redisErr := redis.Ping(context.Background()).Err()
redisOK = (redisOK && (redisErr == nil))
if !redisOK {
log.Log.Errorw("Error connecting to Redis during healthz check",
"error", redisErr)
}
}
}

