Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add to-device event tests over federation; incl. connectivity tests #694

Merged
merged 4 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions internal/docker/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ func (d *Deployment) Restart(t *testing.T) error {

func (d *Deployment) StartServer(t *testing.T, hsName string) {
t.Helper()
t.Logf("StartServer %s", hsName)
hsDep := d.HS[hsName]
if hsDep == nil {
t.Fatalf("StartServer: %s does not exist in this deployment", hsName)
Expand All @@ -257,6 +258,7 @@ func (d *Deployment) StartServer(t *testing.T, hsName string) {

func (d *Deployment) StopServer(t *testing.T, hsName string) {
t.Helper()
t.Logf("StopServer %s", hsName)
hsDep := d.HS[hsName]
if hsDep == nil {
t.Fatalf("StopServer: %s does not exist in this deployment", hsName)
Expand All @@ -268,6 +270,7 @@ func (d *Deployment) StopServer(t *testing.T, hsName string) {

func (d *Deployment) PauseServer(t *testing.T, hsName string) {
t.Helper()
t.Logf("PauseServer %s", hsName)
hsDep := d.HS[hsName]
if hsDep == nil {
t.Fatalf("PauseServer: %s does not exist in this deployment", hsName)
Expand All @@ -279,6 +282,7 @@ func (d *Deployment) PauseServer(t *testing.T, hsName string) {

func (d *Deployment) UnpauseServer(t *testing.T, hsName string) {
t.Helper()
t.Logf("UnpauseServer %s", hsName)
hsDep := d.HS[hsName]
if hsDep == nil {
t.Fatalf("UnpauseServer: %s does not exist in this deployment", hsName)
Expand Down
129 changes: 129 additions & 0 deletions tests/federation_to_device_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package tests

import (
"reflect"
"sync/atomic"
"testing"
"time"

"github.com/matrix-org/complement"
"github.com/matrix-org/complement/client"
"github.com/matrix-org/complement/helpers"
"github.com/tidwall/gjson"
)

// Test that to-device messages can go from one homeserver to another.
func TestToDeviceMessagesOverFederation(t *testing.T) {
deployment := complement.Deploy(t, 2)
defer deployment.Destroy(t)

testCases := []struct {
name string
makeUnreachable func(t *testing.T)
makeReachable func(t *testing.T)
}{
{
name: "good connectivity",
makeUnreachable: func(t *testing.T) {},
makeReachable: func(t *testing.T) {},
},
{
// cut networking but keep in-memory state
name: "interrupted connectivity",
makeUnreachable: func(t *testing.T) {
deployment.StopServer(t, "hs2")
},
makeReachable: func(t *testing.T) {
deployment.StartServer(t, "hs2")
},
},
{
// interesting because this nukes memory
name: "stopped server",
makeUnreachable: func(t *testing.T) {
deployment.StopServer(t, "hs2")
},
makeReachable: func(t *testing.T) {
// kick over the sending server first to see if the server
// remembers to resend on startup
deployment.StopServer(t, "hs1")
deployment.StartServer(t, "hs1")
// now make the receiving server reachable.
deployment.StartServer(t, "hs2")
},
},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{
LocalpartSuffix: "alice",
})
bob := deployment.Register(t, "hs2", helpers.RegistrationOpts{
LocalpartSuffix: "bob",
})
// it might take a while for retries, so keep on syncing!
bob.SyncUntilTimeout = 30 * time.Second

_, bobSince := bob.MustSync(t, client.SyncReq{TimeoutMillis: "0"})

content := map[string]interface{}{
"my_key": "my_value",
}

tc.makeUnreachable(t)

alice.MustSendToDeviceMessages(t, "my.test.type", map[string]map[string]map[string]interface{}{
bob.UserID: {
bob.DeviceID: content,
},
})

checkEvent := func(result gjson.Result) bool {
if result.Get("type").Str != "my.test.type" {
return false
}

evContentRes := result.Get("content")

if !evContentRes.Exists() || !evContentRes.IsObject() {
return false
}

evContent := evContentRes.Value()

return reflect.DeepEqual(evContent, content)
}
// just in case the server returns 200 OK before flushing to disk, give it a grace period.
// This is too nice of us given in the real world no grace is provided..
time.Sleep(time.Second)

tc.makeReachable(t)

// servers may need to be poked with another to-device msg. This isn't great.
// See https://github.com/matrix-org/synapse/issues/16680
kegsay marked this conversation as resolved.
Show resolved Hide resolved
// bob has a sync timeout of 30s set, so if the test has not yet passed, we are kicking the server
// after 10s to ensure the server processes the previous sent to-device message.
var completed atomic.Bool
go func() {
time.Sleep(10 * time.Second)
if completed.Load() {
return
}
// maybe kicking the server will make things work if we're still waiting after 10s
alice.MustSendToDeviceMessages(t, "kick.type", map[string]map[string]map[string]interface{}{
bob.UserID: {
bob.DeviceID: content,
},
})
}()

bob.MustSyncUntil(t, client.SyncReq{Since: bobSince}, func(clientUserID string, topLevelSyncJSON gjson.Result) error {
t.Logf("%s", topLevelSyncJSON.Raw)
return client.SyncToDeviceHas(alice.UserID, checkEvent)(clientUserID, topLevelSyncJSON)
})
completed.Store(true)
})
}
}
Loading