Skip to content

Commit

Permalink
Fix #192: ignore unseen old events
Browse files Browse the repository at this point in the history
  • Loading branch information
kegsay committed Jul 11, 2023
1 parent ea25b81 commit e947612
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 108 deletions.
125 changes: 98 additions & 27 deletions state/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,34 +293,20 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia
// - Else it creates a new room state snapshot if the timeline contains state events (as this now represents the current state)
// - It adds entries to the membership log for membership events.
func (a *Accumulator) Accumulate(txn *sqlx.Tx, roomID string, prevBatch string, timeline []json.RawMessage) (numNew int, timelineNIDs []int64, err error) {
// Insert the events. Check for duplicates which can happen in the real world when joining
// Matrix HQ on Synapse.
dedupedEvents := make([]Event, 0, len(timeline))
seenEvents := make(map[string]struct{})
for i := range timeline {
e := Event{
JSON: timeline[i],
RoomID: roomID,
}
if err := e.ensureFieldsSetOnEvent(); err != nil {
return 0, nil, fmt.Errorf("event malformed: %s", err)
}
if _, ok := seenEvents[e.ID]; ok {
logger.Warn().Str("event_id", e.ID).Str("room_id", roomID).Msg(
"Accumulator.Accumulate: seen the same event ID twice, ignoring",
)
continue
}
if i == 0 && prevBatch != "" {
// tag the first timeline event with the prev batch token
e.PrevBatch = sql.NullString{
String: prevBatch,
Valid: true,
}
}
dedupedEvents = append(dedupedEvents, e)
seenEvents[e.ID] = struct{}{}
// The first stage of accumulating events is mostly around validation around what the upstream HS sends us. For accumulation to work correctly
// we expect:
// - there to be no duplicate events
// - if there are new events, they are always new.
// Both of these assumptions can be false for different reasons
dedupedEvents, err := a.filterAndParseTimelineEvents(txn, roomID, timeline, prevBatch)
if err != nil {
err = fmt.Errorf("filterTimelineEvents: %w", err)
return
}
if len(dedupedEvents) == 0 {
return 0, nil, err // nothing to do
}

eventIDToNID, err := a.eventsTable.Insert(txn, dedupedEvents, false)
if err != nil {
return 0, nil, err
Expand Down Expand Up @@ -413,6 +399,91 @@ func (a *Accumulator) Accumulate(txn *sqlx.Tx, roomID string, prevBatch string,
return numNew, timelineNIDs, nil
}

// filterAndParseTimelineEvents takes a raw timeline array from sync v2 and applies sanity to it:
// - removes duplicate events: this is just a bug which has been seen on Synapse on matrix.org
// - removes old events: this is an edge case when joining rooms over federation, see https://github.com/matrix-org/sliding-sync/issues/192
// - parses it and returns Event structs.
// - check which events are unknown. If all events are known, filter them all out.
func (a *Accumulator) filterAndParseTimelineEvents(txn *sqlx.Tx, roomID string, timeline []json.RawMessage, prevBatch string) ([]Event, error) {
// Check for duplicates which can happen in the real world when joining
// Matrix HQ on Synapse, as well as when you join rooms for the first time over federation.
dedupedEvents := make([]Event, 0, len(timeline))
seenEvents := make(map[string]struct{})
for i := range timeline {
e := Event{
JSON: timeline[i],
RoomID: roomID,
}
if err := e.ensureFieldsSetOnEvent(); err != nil {
return nil, fmt.Errorf("event malformed: %s", err)
}
if _, ok := seenEvents[e.ID]; ok {
logger.Warn().Str("event_id", e.ID).Str("room_id", roomID).Msg(
"Accumulator.filterAndParseTimelineEvents: seen the same event ID twice, ignoring",
)
continue
}
if i == 0 && prevBatch != "" {
// tag the first timeline event with the prev batch token
e.PrevBatch = sql.NullString{
String: prevBatch,
Valid: true,
}
}
dedupedEvents = append(dedupedEvents, e)
seenEvents[e.ID] = struct{}{}
}

// if we only have a single timeline event we cannot determine if it is old or not, as we rely on already seen events
// being after (higher index) than it.
if len(dedupedEvents) <= 1 {
return dedupedEvents, nil
}

// Figure out which of these events are unseen and hence brand new live events.
// In some cases, we may have unseen OLD events - see https://github.com/matrix-org/sliding-sync/issues/192
// in which case we need to drop those events.
dedupedEventIDs := make([]string, 0, len(seenEvents))
for evID := range seenEvents {
dedupedEventIDs = append(dedupedEventIDs, evID)
}
unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, dedupedEventIDs)
if err != nil {
return nil, fmt.Errorf("filterAndParseTimelineEvents: failed to SelectUnknownEventIDs: %w", err)
}

if len(unknownEventIDs) == 0 {
// every event has been seen already, no work to do
return nil, nil
}

// In the happy case, we expect to see timeline arrays like this: (SEEN=S, UNSEEN=U)
// [S,S,U,U] -> want last 2
// [U,U,U] -> want all
// In the backfill edge case, we might see:
// [U,S,S,S] -> want none
// [U,S,S,U] -> want last 1
// We should never see scenarios like:
// [U,S,S,U,S,S] <- we should only see 1 contiguous block of seen events.
// If we do, we'll just ignore all unseen events less than the highest seen event.

// The algorithm starts at the end and just looks for the first S event, returning the subslice after that S event (which may be [])
seenIndex := -1
for i := len(dedupedEvents) - 1; i >= 0; i-- {
_, unseen := unknownEventIDs[dedupedEvents[i].ID]
if !unseen {
seenIndex = i
break
}
}
// seenIndex can be -1 if all are unseen, or len-1 if all are seen, either way if we +1 this slices correctly:
// no seen events s[A,B,C] => s[-1+1:] => [A,B,C]
// C is seen event s[A,B,C] => s[2+1:] => []
// B is seen event s[A,B,C] => s[1+1:] => [C]
// A is seen event s[A,B,C] => s[0+1:] => [B,C]
return dedupedEvents[seenIndex+1:], nil
}

// Delta returns a list of events of at most `limit` for the room not including `lastEventNID`.
// Returns the latest NID of the last event (most recent)
func (a *Accumulator) Delta(roomID string, lastEventNID int64, limit int) (eventsJSON []json.RawMessage, latest int64, err error) {
Expand Down
81 changes: 0 additions & 81 deletions state/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/jmoiron/sqlx"
"github.com/matrix-org/sliding-sync/sqlutil"
"github.com/matrix-org/sliding-sync/sync2"
"github.com/matrix-org/sliding-sync/testutils"
"github.com/tidwall/gjson"
)

Expand Down Expand Up @@ -417,86 +416,6 @@ func TestAccumulatorDupeEvents(t *testing.T) {
}
}

// Regression test for corrupt state snapshots.
// This seems to have happened in the wild, whereby the snapshot exhibited 2 things:
// - A message event having a event_replaces_nid. This should be impossible as messages are not state.
// - Duplicate events in the state snapshot.
//
// We can reproduce a message event having a event_replaces_nid by doing the following:
// - Create a room with initial state A,C
// - Accumulate events D, A, B(msg). This should be impossible because we already got A initially but whatever, roll with it, blame state resets or something.
// - This leads to A,B being processed and D ignored if you just take the newest results.
//
// This can then be tested by:
// - Query the current room snapshot. This will include B(msg) when it shouldn't.
func TestAccumulatorMisorderedGraceful(t *testing.T) {
alice := "@alice:localhost"
bob := "@bob:localhost"

eventA := testutils.NewStateEvent(t, "m.room.member", alice, alice, map[string]interface{}{"membership": "join"})
eventC := testutils.NewStateEvent(t, "m.room.create", "", alice, map[string]interface{}{})
eventD := testutils.NewStateEvent(
t, "m.room.member", bob, "join", map[string]interface{}{"membership": "join"},
)
eventBMsg := testutils.NewEvent(
t, "m.room.message", bob, map[string]interface{}{"body": "hello"},
)
t.Logf("A=member-alice, B=msg, C=create, D=member-bob")

db, close := connectToDB(t)
defer close()
accumulator := NewAccumulator(db)
roomID := "!TestAccumulatorStateReset:localhost"
// Create a room with initial state A,C
_, err := accumulator.Initialise(roomID, []json.RawMessage{
eventA, eventC,
})
if err != nil {
t.Fatalf("failed to Initialise accumulator: %s", err)
}

// Accumulate events D, A, B(msg).
err = sqlutil.WithTransaction(accumulator.db, func(txn *sqlx.Tx) error {
_, _, err = accumulator.Accumulate(txn, roomID, "", []json.RawMessage{eventD, eventA, eventBMsg})
return err
})
if err != nil {
t.Fatalf("failed to Accumulate: %s", err)
}

eventIDs := []string{
gjson.GetBytes(eventA, "event_id").Str,
gjson.GetBytes(eventBMsg, "event_id").Str,
gjson.GetBytes(eventC, "event_id").Str,
gjson.GetBytes(eventD, "event_id").Str,
}
t.Logf("Events A,B,C,D: %v", eventIDs)
txn := accumulator.db.MustBeginTx(context.Background(), nil)
idsToNIDs, err := accumulator.eventsTable.SelectNIDsByIDs(txn, eventIDs)
if err != nil {
t.Fatalf("Failed to SelectNIDsByIDs: %s", err)
}
if len(idsToNIDs) != len(eventIDs) {
t.Errorf("SelectNIDsByIDs: asked for %v got %v", eventIDs, idsToNIDs)
}
t.Logf("Events: %v", idsToNIDs)

wantEventNIDs := []int64{
idsToNIDs[eventIDs[0]], idsToNIDs[eventIDs[2]], idsToNIDs[eventIDs[3]],
}
sort.Slice(wantEventNIDs, func(i, j int) bool {
return wantEventNIDs[i] < wantEventNIDs[j]
})
// Query the current room snapshot
gotSnapshotEvents := currentSnapshotNIDs(t, accumulator.snapshotTable, roomID)
if len(gotSnapshotEvents) != len(wantEventNIDs) { // events A,C,D
t.Errorf("corrupt snapshot, got %v want %v", gotSnapshotEvents, wantEventNIDs)
}
if !reflect.DeepEqual(wantEventNIDs, gotSnapshotEvents) {
t.Errorf("got %v want %v", gotSnapshotEvents, wantEventNIDs)
}
}

// Regression test for corrupt state snapshots.
// This seems to have happened in the wild, whereby the snapshot exhibited 2 things:
// - A message event having a event_replaces_nid. This should be impossible as messages are not state.
Expand Down
112 changes: 112 additions & 0 deletions tests-integration/regressions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package syncv3

import (
"encoding/json"
"testing"
"time"

"github.com/matrix-org/sliding-sync/sync2"
"github.com/matrix-org/sliding-sync/sync3"
"github.com/matrix-org/sliding-sync/testutils"
"github.com/matrix-org/sliding-sync/testutils/m"
)

// catch all file for any kind of regression test which doesn't fall into a unique category

// Regression test for https://github.com/matrix-org/sliding-sync/issues/192
// - Bob on his server invites Alice to a room.
// - Alice joins the room first over federation. Proxy does the right thing and sets her membership to join. There is no timeline though due to not having backfilled.
// - Alice's client backfills in the room which pulls in the invite event, but the SS proxy doesn't see it as it's backfill, not /sync.
// - Charlie joins the same room via SS, which makes the SS proxy see 50 timeline events, which includes the invite.
// As the proxy has never seen this invite event before, it assumes it is newer than the join event and inserts it, corrupting state.
//
// Manually confirmed this can happen with 3x Element clients. We need to make sure we drop those earlier events.
// The first join over federation presents itself as a single join event in the timeline, with the create event, etc in state.
func TestBackfillInviteDoesntCorruptState(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
// setup code
v2 := runTestV2Server(t)
v3 := runTestServer(t, v2, pqString)
defer v2.close()
defer v3.close()

fedBob := "@bob:over_federation"
charlie := "@charlie:localhost"
charlieToken := "CHARLIE_TOKEN"
joinEvent := testutils.NewJoinEvent(t, alice)

room := roomEvents{
roomID: "!TestBackfillInviteDoesntCorruptState:localhost",
events: []json.RawMessage{
joinEvent,
},
state: createRoomState(t, fedBob, time.Now()),
}
v2.addAccount(t, alice, aliceToken)
v2.queueResponse(alice, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(room),
},
})

// alice syncs and should see the room.
aliceRes := v3.mustDoV3Request(t, aliceToken, sync3.Request{
Lists: map[string]sync3.RequestList{
"a": {
Ranges: sync3.SliceRanges{{0, 20}},
RoomSubscription: sync3.RoomSubscription{
TimelineLimit: 5,
},
},
},
})
m.MatchResponse(t, aliceRes, m.MatchList("a", m.MatchV3Count(1), m.MatchV3Ops(m.MatchV3SyncOp(0, 0, []string{room.roomID}))))

// Alice's client "backfills" new data in, meaning the next user who joins is going to see a different set of timeline events
dummyMsg := testutils.NewMessageEvent(t, fedBob, "you didn't see this before joining")
charlieJoinEvent := testutils.NewJoinEvent(t, charlie)
backfilledTimelineEvents := append(
room.state, []json.RawMessage{
dummyMsg,
testutils.NewStateEvent(t, "m.room.member", alice, fedBob, map[string]interface{}{
"membership": "invite",
}),
joinEvent,
charlieJoinEvent,
}...,
)

// now charlie also joins the room, causing a different response from /sync v2
v2.addAccount(t, charlie, charlieToken)
v2.queueResponse(charlie, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(roomEvents{
roomID: room.roomID,
events: backfilledTimelineEvents,
}),
},
})

// and now charlie hits SS, which might corrupt membership state for alice.
charlieRes := v3.mustDoV3Request(t, charlieToken, sync3.Request{
Lists: map[string]sync3.RequestList{
"a": {
Ranges: sync3.SliceRanges{{0, 20}},
},
},
})
m.MatchResponse(t, charlieRes, m.MatchList("a", m.MatchV3Count(1), m.MatchV3Ops(m.MatchV3SyncOp(0, 0, []string{room.roomID}))))

// alice should not see dummyMsg or the invite
aliceRes = v3.mustDoV3RequestWithPos(t, aliceToken, aliceRes.Pos, sync3.Request{})
m.MatchResponse(t, aliceRes, m.MatchNoV3Ops(), m.LogResponse(t), m.MatchRoomSubscriptionsStrict(
map[string][]m.RoomMatcher{
room.roomID: {
m.MatchJoinCount(3), // alice, bob, charlie,
m.MatchNoInviteCount(),
m.MatchNumLive(1),
m.MatchRoomTimeline([]json.RawMessage{charlieJoinEvent}),
},
},
))
}

0 comments on commit e947612

Please sign in to comment.