Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: parallelism plus denylist #71

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ buildGoModule {
'';

# update: set value to an empty string and run `nix build`. This will download Go, fetch the dependencies and calculates their hash.
vendorHash = "sha256-ceToA2DC1bhmg9WIeNSAfoNoU7sk9PrQqgqt5UbpivQ=";
vendorHash = "sha256-Vh7O0iMPG6nAvcyv92h5TVZS2awnR0vz75apyzJeu4c=";

nativeBuildInputs = [ installShellFiles ];
doCheck = false;
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/alicebob/miniredis v2.5.0+incompatible
github.com/deckarep/golang-set v1.7.1
github.com/go-redis/redis/v7 v7.4.1
github.com/go-redis/redis/v8 v8.11.5
github.com/gorilla/websocket v1.4.2
github.com/juju/mgo/v2 v2.0.0-20210302023703-70d5d206e208
github.com/juju/replicaset v0.0.0-20210302050932-0303c8575745
Expand All @@ -29,10 +30,10 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/golang/protobuf v1.4.3 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/gomodule/redigo v1.8.5 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/juju/clock v0.0.0-20190205081909-9c5c9712527c // indirect
github.com/juju/errors v0.0.0-20200330140219-3fe23663418f // indirect
github.com/juju/loggo v0.0.0-20200526014432-9ce3a2e09b5e // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
Expand Down
88 changes: 88 additions & 0 deletions integration-tests/acceptance/denylist_http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package main

import (
"bytes"
"encoding/json"
"io"
"net/http"
"os"
"reflect"
"testing"
)

func doRequest(method string, path string, t *testing.T, expectedCode int) interface{} {
req, err := http.NewRequest(method, os.Getenv("OTR_URL")+path, &bytes.Buffer{})
if err != nil {
t.Fatalf("Error creating req: %s", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := (&http.Client{}).Do(req)
if err != nil {
t.Fatalf("Error sending request: %s", err)
}

defer resp.Body.Close()

respBody, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Error eceiving response body: %s", err)
}

if resp.StatusCode != expectedCode {
t.Fatalf("Expected status code %d, but got %d.\nBody was: %s", expectedCode, resp.StatusCode, respBody)
}

if expectedCode == 200 {
var data interface{}
err = json.Unmarshal(respBody, &data)
if err != nil {
t.Fatalf("Error parsing JSON response: %s", err)
}

return data
}
return nil
}

// Test the /denylist HTTP operations
func TestDenyList(t *testing.T) {
// GET empty list of rules
data := doRequest("GET", "/denylist", t, 200)
if !reflect.DeepEqual(data, []interface{}{}) {
t.Fatalf("Expected empty list from blank GET, but got %#v", data)
}
// PUT new rule
doRequest("PUT", "/denylist/abc", t, 201)
// GET list with new rule in it
data = doRequest("GET", "/denylist", t, 200)
if !reflect.DeepEqual(data, []interface{}{"abc"}) {
t.Fatalf("Expected singleton from GET, but got %#v", data)
}
// GET existing rule
data = doRequest("GET", "/denylist/abc", t, 200)
if !reflect.DeepEqual(data, "abc") {
t.Fatalf("Expected matched body from GET, but got %#v", data)
}
// PUT second rule
doRequest("PUT", "/denylist/def", t, 201)
// GET second rule
data = doRequest("GET", "/denylist/def", t, 200)
if !reflect.DeepEqual(data, "def") {
t.Fatalf("Expected matched body from GET, but got %#v", data)
}
// GET list with both rules
data = doRequest("GET", "/denylist", t, 200)
// check both permutations, in case the server reordered them
if !reflect.DeepEqual(data, []interface{}{"abc", "def"}) && !reflect.DeepEqual(data, []interface{}{"def", "abc"}) {
t.Fatalf("Expected doubleton from GET, but got %#v", data)
}
// DELETE first rule
doRequest("DELETE", "/denylist/abc", t, 204)
// GET first rule
doRequest("GET", "/denylist/abc", t, 404)
// GET list with only second rule
data = doRequest("GET", "/denylist", t, 200)
if !reflect.DeepEqual(data, []interface{}{"def"}) {
t.Fatalf("Expected singleton from GET, but got %#V", data)
}
}
72 changes: 72 additions & 0 deletions integration-tests/acceptance/denylist_oplog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package main

import (
"context"
"testing"

"github.com/tulip/oplogtoredis/integration-tests/helpers"
"go.mongodb.org/mongo-driver/bson"
)

func TestDenyOplog(t *testing.T) {
harness := startHarness()
defer harness.stop()

_, err := harness.mongoClient.Collection("Foo").InsertOne(context.Background(), bson.M{
"_id": "id1",
"f": "1",
})
if err != nil {
panic(err)
}

expectedMessage1 := helpers.OTRMessage{
Event: "i",
Document: map[string]interface{}{
"_id": "id1",
},
Fields: []string{"_id", "f"},
}

harness.verify(t, map[string][]helpers.OTRMessage{
"tests.Foo": {expectedMessage1},
"tests.Foo::id1": {expectedMessage1},
})

doRequest("PUT", "/denylist/tests", t, 201)

_, err = harness.mongoClient.Collection("Foo").InsertOne(context.Background(), bson.M{
"_id": "id2",
"g": "2",
})
if err != nil {
panic(err)
}

// second message should not have been received, since it got denied
harness.verify(t, map[string][]helpers.OTRMessage{})

doRequest("DELETE", "/denylist/tests", t, 204)

_, err = harness.mongoClient.Collection("Foo").InsertOne(context.Background(), bson.M{
"_id": "id3",
"h": "3",
})
if err != nil {
panic(err)
}

expectedMessage3 := helpers.OTRMessage{
Event: "i",
Document: map[string]interface{}{
"_id": "id3",
},
Fields: []string{"_id", "h"},
}

// back to normal now that the deny rule is gone
harness.verify(t, map[string][]helpers.OTRMessage{
"tests.Foo": {expectedMessage3},
"tests.Foo::id3": {expectedMessage3},
})
}
11 changes: 10 additions & 1 deletion lib/config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
package config

import (
"time"
"strings"
"time"

"github.com/kelseyhightower/envconfig"
)

Expand All @@ -21,6 +22,7 @@ type oplogtoredisConfiguration struct {
MongoConnectTimeout time.Duration `default:"10s" split_words:"true"`
MongoQueryTimeout time.Duration `default:"5s" split_words:"true"`
OplogV2ExtractSubfieldChanges bool `default:"false" envconfig:"OPLOG_V2_EXTRACT_SUBFIELD_CHANGES"`
WriteParallelism int `default:"1" split_words:"true"`
}

var globalConfig *oplogtoredisConfiguration
Expand Down Expand Up @@ -131,6 +133,13 @@ func OplogV2ExtractSubfieldChanges() bool {
return globalConfig.OplogV2ExtractSubfieldChanges
}

// WriteParallelism controls how many parallel write loops will be run (sharded based on a hash
// of the database name.) Each parallel loop has its own redis connection and internal buffer.
// Healthz endpoint will report fail if anyone of them dies.
func WriteParallelism() int {
return globalConfig.WriteParallelism
}

// ParseEnv parses the current environment variables and updates the stored
// configuration. It is *not* threadsafe, and should just be called once
// at the start of the program.
Expand Down
97 changes: 97 additions & 0 deletions lib/denylist/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package denylist

import (
"encoding/json"
"net/http"
"sync"
)

// CollectionEndpoint serves the endpoints for the whole Denylist at /denylist
func CollectionEndpoint(denylist *sync.Map) func(http.ResponseWriter, *http.Request) {
return func(response http.ResponseWriter, request *http.Request) {
switch request.Method {
case "GET":
listDenylistKeys(response, denylist)
default:
http.Error(response, http.StatusText(http.StatusNotFound), http.StatusNotFound)
}
}
}

// SingleEndpoint serves the endpoints for particular Denylist entries at /denylist/...
func SingleEndpoint(denylist *sync.Map) func(http.ResponseWriter, *http.Request) {
return func(response http.ResponseWriter, request *http.Request) {
switch request.Method {
case "GET":
getDenylistEntry(response, request, denylist)
case "PUT":
createDenylistEntry(response, request, denylist)
case "DELETE":
deleteDenylistEntry(response, request, denylist)
default:
http.Error(response, http.StatusText(http.StatusNotFound), http.StatusNotFound)
}
}
}

// GET /denylist
func listDenylistKeys(response http.ResponseWriter, denylist *sync.Map) {
keys := []interface{}{}

denylist.Range(func(key interface{}, value interface{}) bool {
keys = append(keys, key)
return true
})

response.Header().Set("Content-Type", "application/json")
response.WriteHeader(http.StatusOK)
err := json.NewEncoder(response).Encode(keys)
if err != nil {
http.Error(response, "couldn't encode result", http.StatusInternalServerError)
return
}
}

// GET /denylist/...
func getDenylistEntry(response http.ResponseWriter, request *http.Request, denylist *sync.Map) {
id := request.URL.Path
_, exists := denylist.Load(id)
if !exists {
http.Error(response, "denylist entry not found with that id", http.StatusNotFound)
return
}

response.Header().Set("Content-Type", "application/json")
response.WriteHeader(http.StatusOK)
err := json.NewEncoder(response).Encode(id)
if err != nil {
http.Error(response, "couldn't encode result", http.StatusInternalServerError)
return
}
}

// PUT /denylist/...
func createDenylistEntry(response http.ResponseWriter, request *http.Request, denylist *sync.Map) {
id := request.URL.Path
_, exists := denylist.Load(id)
if exists {
response.WriteHeader(http.StatusNoContent)
return
}

denylist.Store(id, true)
response.WriteHeader(http.StatusCreated)
}

// DELETE /denylist/...
func deleteDenylistEntry(response http.ResponseWriter, request *http.Request, denylist *sync.Map) {
id := request.URL.Path
_, exists := denylist.Load(id)
if !exists {
http.Error(response, "denylist entry not found with that id", http.StatusNotFound)
return
}

denylist.Delete(id)
response.WriteHeader(http.StatusNoContent)
}
16 changes: 15 additions & 1 deletion lib/oplog/processor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package oplog

import (
"bytes"
"crypto/sha256"
"encoding/binary"
"encoding/json"
"strings"

Expand Down Expand Up @@ -78,6 +81,16 @@ func processOplogEntry(op *oplogEntry) (*redispub.Publication, error) {
return nil, errors.Wrap(err, "marshalling outgoing message")
}

hash := sha256.Sum256([]byte(op.Database))
intSlice := hash[len(hash)-8:]

var hashInt uint64

err = binary.Read(bytes.NewReader(intSlice), binary.LittleEndian, &hashInt)
if err != nil {
panic(errors.Wrap(err, "decoding database hash as uint64"))
}

// We need to publish on both the full-collection channel and the
// single-document channel
return &redispub.Publication{
Expand All @@ -92,7 +105,8 @@ func processOplogEntry(op *oplogEntry) (*redispub.Publication, error) {
Msg: msgJSON,
OplogTimestamp: op.Timestamp,

TxIdx: op.TxIdx,
TxIdx: op.TxIdx,
ParallelismKey: int(hashInt),
}, nil
}

Expand Down
Loading
Loading