Skip to content

Commit

Permalink
Remove PrependStateEvents
Browse files Browse the repository at this point in the history
  • Loading branch information
David Robertson committed Sep 21, 2023
1 parent ba90161 commit 6352304
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 26 deletions.
6 changes: 3 additions & 3 deletions sync2/handler2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,20 +369,20 @@ func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID strin
return nil
}

func (h *Handler) Initialise(ctx context.Context, roomID string, state []json.RawMessage) ([]json.RawMessage, error) {
func (h *Handler) Initialise(ctx context.Context, roomID string, state []json.RawMessage) error {
res, err := h.Store.Initialise(roomID, state)
if err != nil {
logger.Err(err).Int("state", len(state)).Str("room", roomID).Msg("V2: failed to initialise room")
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
return nil, err
return err
}
if res.AddedEvents {
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2Initialise{
RoomID: roomID,
SnapshotNID: res.SnapshotID,
})
}
return res.PrependTimelineEvents, nil
return nil
}

func (h *Handler) SetTyping(ctx context.Context, pollerID sync2.PollerID, roomID string, ephEvent json.RawMessage) {
Expand Down
27 changes: 4 additions & 23 deletions sync2/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type V2DataReceiver interface {
// Initialise the room, if it hasn't been already. This means the state section of the v2 response.
// If given a state delta from an incremental sync, returns the slice of all state events unknown to the DB.
// Return an error to stop the since token advancing.
Initialise(ctx context.Context, roomID string, state []json.RawMessage) ([]json.RawMessage, error) // snapshot ID?
Initialise(ctx context.Context, roomID string, state []json.RawMessage) error // snapshot ID?
// SetTyping indicates which users are typing.
SetTyping(ctx context.Context, pollerID PollerID, roomID string, ephEvent json.RawMessage)
// Sent when there is a new receipt
Expand Down Expand Up @@ -320,11 +320,11 @@ func (h *PollerMap) Accumulate(ctx context.Context, userID, deviceID, roomID str
wg.Wait()
return
}
func (h *PollerMap) Initialise(ctx context.Context, roomID string, state []json.RawMessage) (result []json.RawMessage, err error) {
func (h *PollerMap) Initialise(ctx context.Context, roomID string, state []json.RawMessage) (err error) {
var wg sync.WaitGroup
wg.Add(1)
h.executor <- func() {
result, err = h.callbacks.Initialise(ctx, roomID, state)
err = h.callbacks.Initialise(ctx, roomID, state)
wg.Done()
}
wg.Wait()
Expand Down Expand Up @@ -775,30 +775,11 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
for roomID, roomData := range res.Rooms.Join {
if len(roomData.State.Events) > 0 {
stateCalls++
prependStateEvents, err := p.receiver.Initialise(ctx, roomID, roomData.State.Events)
err := p.receiver.Initialise(ctx, roomID, roomData.State.Events)
if err != nil {
lastErrs = append(lastErrs, fmt.Errorf("Initialise[%s]: %w", roomID, err))
continue
}
if len(prependStateEvents) > 0 {
// The poller has just learned of these state events due to an
// incremental poller sync; we must have missed the opportunity to see
// these down /sync in a timeline. As a workaround, inject these into
// the timeline now so that future events are received under the
// correct room state.
const warnMsg = "parseRoomsResponse: prepending state events to timeline after gappy poll"
logger.Warn().Str("room_id", roomID).Int("prependStateEvents", len(prependStateEvents)).Msg(warnMsg)
hub := internal.GetSentryHubFromContextOrDefault(ctx)
hub.WithScope(func(scope *sentry.Scope) {
scope.SetContext(internal.SentryCtxKey, map[string]interface{}{
"room_id": roomID,
"num_prepend_state_events": len(prependStateEvents),
})
hub.CaptureMessage(warnMsg)
})
p.trackGappyStateSize(len(prependStateEvents))
roomData.Timeline.Events = append(prependStateEvents, roomData.Timeline.Events...)
}
}
// process typing/receipts before events so we seed the caches correctly for when we return the room
for _, ephEvent := range roomData.Ephemeral.Events {
Expand Down

0 comments on commit 6352304

Please sign in to comment.