diff --git a/internal/docker/deployment.go b/internal/docker/deployment.go index f0e8a44d..bce78d28 100644 --- a/internal/docker/deployment.go +++ b/internal/docker/deployment.go @@ -246,6 +246,7 @@ func (d *Deployment) Restart(t *testing.T) error { func (d *Deployment) StartServer(t *testing.T, hsName string) { t.Helper() + t.Logf("StartServer %s", hsName) hsDep := d.HS[hsName] if hsDep == nil { t.Fatalf("StartServer: %s does not exist in this deployment", hsName) @@ -257,6 +258,7 @@ func (d *Deployment) StartServer(t *testing.T, hsName string) { func (d *Deployment) StopServer(t *testing.T, hsName string) { t.Helper() + t.Logf("StopServer %s", hsName) hsDep := d.HS[hsName] if hsDep == nil { t.Fatalf("StopServer: %s does not exist in this deployment", hsName) @@ -268,6 +270,7 @@ func (d *Deployment) StopServer(t *testing.T, hsName string) { func (d *Deployment) PauseServer(t *testing.T, hsName string) { t.Helper() + t.Logf("PauseServer %s", hsName) hsDep := d.HS[hsName] if hsDep == nil { t.Fatalf("PauseServer: %s does not exist in this deployment", hsName) @@ -279,6 +282,7 @@ func (d *Deployment) PauseServer(t *testing.T, hsName string) { func (d *Deployment) UnpauseServer(t *testing.T, hsName string) { t.Helper() + t.Logf("UnpauseServer %s", hsName) hsDep := d.HS[hsName] if hsDep == nil { t.Fatalf("UnpauseServer: %s does not exist in this deployment", hsName) diff --git a/tests/federation_to_device_test.go b/tests/federation_to_device_test.go new file mode 100644 index 00000000..eae36020 --- /dev/null +++ b/tests/federation_to_device_test.go @@ -0,0 +1,129 @@ +package tests + +import ( + "reflect" + "sync/atomic" + "testing" + "time" + + "github.com/matrix-org/complement" + "github.com/matrix-org/complement/client" + "github.com/matrix-org/complement/helpers" + "github.com/tidwall/gjson" +) + +// Test that to-device messages can go from one homeserver to another. +func TestToDeviceMessagesOverFederation(t *testing.T) { + deployment := complement.Deploy(t, 2) + defer deployment.Destroy(t) + + testCases := []struct { + name string + makeUnreachable func(t *testing.T) + makeReachable func(t *testing.T) + }{ + { + name: "good connectivity", + makeUnreachable: func(t *testing.T) {}, + makeReachable: func(t *testing.T) {}, + }, + { + // cut networking but keep in-memory state + name: "interrupted connectivity", + makeUnreachable: func(t *testing.T) { + deployment.StopServer(t, "hs2") + }, + makeReachable: func(t *testing.T) { + deployment.StartServer(t, "hs2") + }, + }, + { + // interesting because this nukes memory + name: "stopped server", + makeUnreachable: func(t *testing.T) { + deployment.StopServer(t, "hs2") + }, + makeReachable: func(t *testing.T) { + // kick over the sending server first to see if the server + // remembers to resend on startup + deployment.StopServer(t, "hs1") + deployment.StartServer(t, "hs1") + // now make the receiving server reachable. + deployment.StartServer(t, "hs2") + }, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{ + LocalpartSuffix: "alice", + }) + bob := deployment.Register(t, "hs2", helpers.RegistrationOpts{ + LocalpartSuffix: "bob", + }) + // it might take a while for retries, so keep on syncing! + bob.SyncUntilTimeout = 30 * time.Second + + _, bobSince := bob.MustSync(t, client.SyncReq{TimeoutMillis: "0"}) + + content := map[string]interface{}{ + "my_key": "my_value", + } + + tc.makeUnreachable(t) + + alice.MustSendToDeviceMessages(t, "my.test.type", map[string]map[string]map[string]interface{}{ + bob.UserID: { + bob.DeviceID: content, + }, + }) + + checkEvent := func(result gjson.Result) bool { + if result.Get("type").Str != "my.test.type" { + return false + } + + evContentRes := result.Get("content") + + if !evContentRes.Exists() || !evContentRes.IsObject() { + return false + } + + evContent := evContentRes.Value() + + return reflect.DeepEqual(evContent, content) + } + // just in case the server returns 200 OK before flushing to disk, give it a grace period. + // This is too nice of us given in the real world no grace is provided.. + time.Sleep(time.Second) + + tc.makeReachable(t) + + // servers may need to be poked with another to-device msg. This isn't great. + // See https://github.com/matrix-org/synapse/issues/16680 + // bob has a sync timeout of 30s set, so if the test has not yet passed, we are kicking the server + // after 10s to ensure the server processes the previous sent to-device message. + var completed atomic.Bool + go func() { + time.Sleep(10 * time.Second) + if completed.Load() { + return + } + // maybe kicking the server will make things work if we're still waiting after 10s + alice.MustSendToDeviceMessages(t, "kick.type", map[string]map[string]map[string]interface{}{ + bob.UserID: { + bob.DeviceID: content, + }, + }) + }() + + bob.MustSyncUntil(t, client.SyncReq{Since: bobSince}, func(clientUserID string, topLevelSyncJSON gjson.Result) error { + t.Logf("%s", topLevelSyncJSON.Raw) + return client.SyncToDeviceHas(alice.UserID, checkEvent)(clientUserID, topLevelSyncJSON) + }) + completed.Store(true) + }) + } +}