From cf48adba6355d13ed37541d582f8696fe48b05d3 Mon Sep 17 00:00:00 2001 From: Kegan Dougal <=> Date: Thu, 23 Nov 2023 15:14:44 +0000 Subject: [PATCH 1/4] Add to-device event tests over federation; incl. connectivity tests The connectivity tests form part of complement-crypto's test suite, specifically: > If a server cannot send device list updates over federation, it retries. --- internal/docker/deployment.go | 4 ++ tests/federation_to_device_test.go | 95 ++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+) create mode 100644 tests/federation_to_device_test.go diff --git a/internal/docker/deployment.go b/internal/docker/deployment.go index f0e8a44d..bce78d28 100644 --- a/internal/docker/deployment.go +++ b/internal/docker/deployment.go @@ -246,6 +246,7 @@ func (d *Deployment) Restart(t *testing.T) error { func (d *Deployment) StartServer(t *testing.T, hsName string) { t.Helper() + t.Logf("StartServer %s", hsName) hsDep := d.HS[hsName] if hsDep == nil { t.Fatalf("StartServer: %s does not exist in this deployment", hsName) @@ -257,6 +258,7 @@ func (d *Deployment) StartServer(t *testing.T, hsName string) { func (d *Deployment) StopServer(t *testing.T, hsName string) { t.Helper() + t.Logf("StopServer %s", hsName) hsDep := d.HS[hsName] if hsDep == nil { t.Fatalf("StopServer: %s does not exist in this deployment", hsName) @@ -268,6 +270,7 @@ func (d *Deployment) StopServer(t *testing.T, hsName string) { func (d *Deployment) PauseServer(t *testing.T, hsName string) { t.Helper() + t.Logf("PauseServer %s", hsName) hsDep := d.HS[hsName] if hsDep == nil { t.Fatalf("PauseServer: %s does not exist in this deployment", hsName) @@ -279,6 +282,7 @@ func (d *Deployment) PauseServer(t *testing.T, hsName string) { func (d *Deployment) UnpauseServer(t *testing.T, hsName string) { t.Helper() + t.Logf("UnpauseServer %s", hsName) hsDep := d.HS[hsName] if hsDep == nil { t.Fatalf("UnpauseServer: %s does not exist in this deployment", hsName) diff --git 
a/tests/federation_to_device_test.go b/tests/federation_to_device_test.go new file mode 100644 index 00000000..92628f4d --- /dev/null +++ b/tests/federation_to_device_test.go @@ -0,0 +1,95 @@ +package tests + +import ( + "reflect" + "testing" + + "github.com/matrix-org/complement" + "github.com/matrix-org/complement/client" + "github.com/matrix-org/complement/helpers" + "github.com/tidwall/gjson" +) + +// Test that to-device messages can go from one homeserver to another. +func TestToDeviceMessagesOverFederation(t *testing.T) { + deployment := complement.Deploy(t, 2) + defer deployment.Destroy(t) + + testCases := []struct { + name string + makeUnreachable func(t *testing.T) + makeReachable func(t *testing.T) + }{ + { + name: "good connectivity", + makeUnreachable: func(t *testing.T) {}, + makeReachable: func(t *testing.T) {}, + }, + { + // cut networking but keep in-memory state + name: "interrupted connectivity", + makeUnreachable: func(t *testing.T) { + deployment.PauseServer(t, "hs2") + }, + makeReachable: func(t *testing.T) { + deployment.UnpauseServer(t, "hs2") + }, + }, + { + // interesting because this nukes memory + name: "stopped server", + makeUnreachable: func(t *testing.T) { + deployment.StopServer(t, "hs2") + }, + makeReachable: func(t *testing.T) { + deployment.StartServer(t, "hs2") + }, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{ + LocalpartSuffix: "alice", + }) + bob := deployment.Register(t, "hs2", helpers.RegistrationOpts{ + LocalpartSuffix: "bob", + }) + + _, bobSince := bob.MustSync(t, client.SyncReq{TimeoutMillis: "0"}) + + content := map[string]interface{}{ + "my_key": "my_value", + } + + tc.makeUnreachable(t) + + alice.MustSendToDeviceMessages(t, "my.test.type", map[string]map[string]map[string]interface{}{ + bob.UserID: { + bob.DeviceID: content, + }, + }) + + checkEvent := func(result gjson.Result) bool { + if 
result.Get("type").Str != "my.test.type" { + return false + } + + evContentRes := result.Get("content") + + if !evContentRes.Exists() || !evContentRes.IsObject() { + return false + } + + evContent := evContentRes.Value() + + return reflect.DeepEqual(evContent, content) + } + + tc.makeReachable(t) + + bob.MustSyncUntil(t, client.SyncReq{Since: bobSince}, client.SyncToDeviceHas(alice.UserID, checkEvent)) + }) + } +} From de60903886f51fc4a8e442281bcb35b0b3b5f728 Mon Sep 17 00:00:00 2001 From: Kegan Dougal <=> Date: Thu, 23 Nov 2023 16:47:07 +0000 Subject: [PATCH 2/4] Test we retry on startup --- tests/federation_to_device_test.go | 36 +++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/tests/federation_to_device_test.go b/tests/federation_to_device_test.go index 92628f4d..c0bbb3b9 100644 --- a/tests/federation_to_device_test.go +++ b/tests/federation_to_device_test.go @@ -2,7 +2,9 @@ package tests import ( "reflect" + "sync/atomic" "testing" + "time" "github.com/matrix-org/complement" "github.com/matrix-org/complement/client" @@ -29,10 +31,10 @@ func TestToDeviceMessagesOverFederation(t *testing.T) { // cut networking but keep in-memory state name: "interrupted connectivity", makeUnreachable: func(t *testing.T) { - deployment.PauseServer(t, "hs2") + deployment.StopServer(t, "hs2") }, makeReachable: func(t *testing.T) { - deployment.UnpauseServer(t, "hs2") + deployment.StartServer(t, "hs2") }, }, { @@ -42,6 +44,11 @@ func TestToDeviceMessagesOverFederation(t *testing.T) { deployment.StopServer(t, "hs2") }, makeReachable: func(t *testing.T) { + // kick over the sending server first to see if the server + // remembers to resend on startup + deployment.StopServer(t, "hs1") + deployment.StartServer(t, "hs1") + // now make the receiving server reachable. 
deployment.StartServer(t, "hs2") }, }, @@ -56,6 +63,8 @@ func TestToDeviceMessagesOverFederation(t *testing.T) { bob := deployment.Register(t, "hs2", helpers.RegistrationOpts{ LocalpartSuffix: "bob", }) + // it might take a while for retries, so keep on syncing! + bob.SyncUntilTimeout = 30 * time.Second _, bobSince := bob.MustSync(t, client.SyncReq{TimeoutMillis: "0"}) @@ -86,10 +95,31 @@ func TestToDeviceMessagesOverFederation(t *testing.T) { return reflect.DeepEqual(evContent, content) } + // just in case the server returns 200 OK before flushing to disk, give it a grace period. + // This is too nice of us given in the real world no grace is provided.. + time.Sleep(time.Second) tc.makeReachable(t) - bob.MustSyncUntil(t, client.SyncReq{Since: bobSince}, client.SyncToDeviceHas(alice.UserID, checkEvent)) + var completed atomic.Bool + go func() { + time.Sleep(10 * time.Second) + if completed.Load() { + return + } + // maybe kicking the server will make things work if we're still waiting after 10s + alice.MustSendToDeviceMessages(t, "kick.type", map[string]map[string]map[string]interface{}{ + bob.UserID: { + bob.DeviceID: content, + }, + }) + }() + + bob.MustSyncUntil(t, client.SyncReq{Since: bobSince}, func(clientUserID string, topLevelSyncJSON gjson.Result) error { + t.Logf("%s", topLevelSyncJSON.Raw) + return client.SyncToDeviceHas(alice.UserID, checkEvent)(clientUserID, topLevelSyncJSON) + }) + completed.Store(true) }) } } From ddc17d6f09737dee56c918a858742796f15689c1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal <=> Date: Fri, 24 Nov 2023 17:10:46 +0000 Subject: [PATCH 3/4] Comment why we need to poke --- tests/federation_to_device_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/federation_to_device_test.go b/tests/federation_to_device_test.go index c0bbb3b9..ed8177ae 100644 --- a/tests/federation_to_device_test.go +++ b/tests/federation_to_device_test.go @@ -101,6 +101,8 @@ func TestToDeviceMessagesOverFederation(t *testing.T) { 
tc.makeReachable(t) + // servers may need to be poked with another to-device msg. This isn't great. + // See https://github.com/matrix-org/synapse/issues/16680 var completed atomic.Bool go func() { time.Sleep(10 * time.Second) From 96055c9a5b1b8333c08aab987c65b95bf886c4aa Mon Sep 17 00:00:00 2001 From: kegsay <7190048+kegsay@users.noreply.github.com> Date: Mon, 27 Nov 2023 16:07:41 +0000 Subject: [PATCH 4/4] Update tests/federation_to_device_test.go Co-authored-by: Till <2353100+S7evinK@users.noreply.github.com> --- tests/federation_to_device_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/federation_to_device_test.go b/tests/federation_to_device_test.go index ed8177ae..eae36020 100644 --- a/tests/federation_to_device_test.go +++ b/tests/federation_to_device_test.go @@ -103,6 +103,8 @@ func TestToDeviceMessagesOverFederation(t *testing.T) { // servers may need to be poked with another to-device msg. This isn't great. // See https://github.com/matrix-org/synapse/issues/16680 + // bob has a sync timeout of 30s set, so if the test has not yet passed, we are kicking the server + // after 10s to ensure the server processes the previously sent to-device message. var completed atomic.Bool go func() { time.Sleep(10 * time.Second)