-
Notifications
You must be signed in to change notification settings - Fork 36
/
dispatcher.go
286 lines (255 loc) · 8.64 KB
/
dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
package sync3
import (
"context"
"encoding/json"
"os"
"sync"
"github.com/matrix-org/sliding-sync/internal"
"github.com/matrix-org/sliding-sync/sync3/caches"
"github.com/rs/zerolog"
"github.com/tidwall/gjson"
)
// logger emits human-readable console logs with HH:MM:SS timestamps.
// NOTE(review): Output(...) appears to replace the writer passed to
// zerolog.New, so os.Stdout here is likely unused and output goes to
// os.Stderr via the ConsoleWriter — confirm against zerolog docs.
var logger = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{
	Out:        os.Stderr,
	TimeFormat: "15:04:05",
})
const DispatcherAllUsers = "-"
// Receiver represents the callbacks that a Dispatcher may fire.
type Receiver interface {
	// OnNewEvent is invoked for each new room event dispatched to this receiver.
	OnNewEvent(ctx context.Context, event *caches.EventData)
	// OnReceipt is invoked when a read receipt is dispatched for a room the user is joined to.
	OnReceipt(ctx context.Context, receipt internal.Receipt)
	// OnEphemeralEvent is invoked with the raw ephemeral event payload for roomID.
	OnEphemeralEvent(ctx context.Context, roomID string, ephEvent json.RawMessage)
	// OnRegistered is called after a successful call to Dispatcher.Register
	OnRegistered(ctx context.Context) error
}
// Dispatches live events to caches
type Dispatcher struct {
	// jrt tracks joined/invited users per room (join counts, invite counts).
	jrt *JoinedRoomsTracker
	// userToReceiver maps user ID (or DispatcherAllUsers) to the registered Receiver.
	userToReceiver map[string]Receiver
	// userToReceiverMu guards userToReceiver.
	userToReceiverMu *sync.RWMutex
}
// NewDispatcher creates a Dispatcher with an empty joined-rooms tracker and
// no registered receivers.
func NewDispatcher() *Dispatcher {
	d := &Dispatcher{
		jrt:              NewJoinedRoomsTracker(),
		userToReceiver:   map[string]Receiver{},
		userToReceiverMu: new(sync.RWMutex),
	}
	return d
}
// IsUserJoined reports whether the joined-rooms tracker currently considers
// userID to be joined to roomID.
func (d *Dispatcher) IsUserJoined(userID, roomID string) bool {
	return d.jrt.IsUserJoined(userID, roomID)
}
// Load joined members into the dispatcher.
// MUST BE CALLED BEFORE V2 POLL LOOPS START.
// roomToJoinedUsers maps room ID to the list of joined user IDs.
// Always returns nil.
func (d *Dispatcher) Startup(roomToJoinedUsers map[string][]string) error {
	// populate joined rooms tracker
	d.jrt.Startup(roomToJoinedUsers)
	return nil
}
// Unregister removes the Receiver registered for userID, if any. Subsequent
// dispatches will no longer be delivered to that user.
func (d *Dispatcher) Unregister(userID string) {
	d.userToReceiverMu.Lock()
	delete(d.userToReceiver, userID)
	d.userToReceiverMu.Unlock()
}
// Register associates r with userID, replacing (with a warning) any receiver
// already registered for that user, then invokes r.OnRegistered and returns
// its error. The write lock is held for the whole operation, including the
// OnRegistered callback, so registration and callback are atomic.
func (d *Dispatcher) Register(ctx context.Context, userID string, r Receiver) error {
	d.userToReceiverMu.Lock()
	defer d.userToReceiverMu.Unlock()
	_, alreadyRegistered := d.userToReceiver[userID]
	if alreadyRegistered {
		logger.Warn().Str("user", userID).Msg("Dispatcher.Register: receiver already registered")
	}
	d.userToReceiver[userID] = r
	return r.OnRegistered(ctx)
}
// ReceiverForUser returns the Receiver registered for userID, or nil if none.
func (d *Dispatcher) ReceiverForUser(userID string) Receiver {
	d.userToReceiverMu.RLock()
	r := d.userToReceiver[userID]
	d.userToReceiverMu.RUnlock()
	return r
}
// newEventData parses the raw event JSON and extracts the fields the caches
// care about (type, state_key, content, sender, origin_server_ts), attaching
// the event NID (latestPos). Join/invite counts are filled in later by callers.
func (d *Dispatcher) newEventData(event json.RawMessage, roomID string, latestPos int64) *caches.EventData {
	parsed := gjson.ParseBytes(event)
	// state_key may legitimately be the empty string, so track presence via a pointer
	var stateKey *string
	if sk := parsed.Get("state_key"); sk.Exists() {
		stateKey = &sk.Str
	}
	return &caches.EventData{
		Event:     event,
		RoomID:    roomID,
		EventType: parsed.Get("type").Str,
		StateKey:  stateKey,
		Content:   parsed.Get("content"),
		NID:       latestPos,
		Timestamp: parsed.Get("origin_server_ts").Uint(),
		Sender:    parsed.Get("sender").Str,
	}
}
// Called by v2 pollers when we receive an initial state block. Very similar to OnNewEvents but
// done in bulk for speed.
func (d *Dispatcher) OnNewInitialRoomState(ctx context.Context, roomID string, state []json.RawMessage) {
	// sanity check: the tracker should know nothing about this room yet
	_, joinCount := d.jrt.JoinedUsersForRoom(roomID, nil)
	if joinCount > 0 {
		logger.Warn().Int("join_count", joinCount).Str("room", roomID).Int("num_state", len(state)).Msg(
			"OnNewInitialRoomState but have entries in JoinedRoomsTracker already, this should be impossible. Degrading to live events",
		)
		for _, ev := range state {
			d.OnNewEvent(ctx, roomID, ev, 0)
		}
		return
	}
	// one pass: build event datas and collect membership changes
	eventDatas := make([]*caches.EventData, len(state))
	var joinedUsers, invitedUsers []string
	for i, ev := range state {
		ed := d.newEventData(ev, roomID, 0)
		eventDatas[i] = ed
		if ed.EventType != "m.room.member" || ed.StateKey == nil {
			continue
		}
		switch ed.Content.Get("membership").Str {
		case "invite":
			invitedUsers = append(invitedUsers, *ed.StateKey)
		case "join":
			joinedUsers = append(joinedUsers, *ed.StateKey)
		}
	}
	// bulk update joined room tracker
	forceInitial := d.jrt.UsersJoinedRoom(joinedUsers, roomID)
	d.jrt.UsersInvitedToRoom(invitedUsers, roomID)
	inviteCount := d.jrt.NumInvitedUsersForRoom(roomID)
	// work out who to notify: joined users with a registered receiver
	notifyIDs, numJoined := d.jrt.JoinedUsersForRoom(roomID, func(userID string) bool {
		if userID == DispatcherAllUsers {
			return false // safety guard to prevent dupe global callbacks
		}
		return d.ReceiverForUser(userID) != nil
	})
	// notify listeners, stamping counts onto every event first
	for _, ed := range eventDatas {
		ed.InviteCount = inviteCount
		ed.JoinCount = numJoined
		d.notifyListeners(ctx, ed, notifyIDs, "", forceInitial, "")
	}
}
// OnNewEvent processes a single live event: membership events update the
// joined-rooms tracker first, then all joined users with a registered
// receiver (plus the global receiver) are notified via notifyListeners.
func (d *Dispatcher) OnNewEvent(
	ctx context.Context, roomID string, event json.RawMessage, nid int64,
) {
	ed := d.newEventData(event, roomID, nid)
	// update the tracker for membership transitions
	var targetUser, membership string
	forceInitial := false
	if ed.EventType == "m.room.member" && ed.StateKey != nil {
		targetUser = *ed.StateKey
		membership = ed.Content.Get("membership").Str
		switch membership {
		case "invite":
			// we only do this to track invite counts correctly.
			d.jrt.UsersInvitedToRoom([]string{targetUser}, ed.RoomID)
		case "join":
			// a genuinely new join means the target needs initial room data
			forceInitial = d.jrt.UserJoinedRoom(targetUser, ed.RoomID)
		case "leave", "ban":
			d.jrt.UserLeftRoom(targetUser, ed.RoomID)
		}
		ed.InviteCount = d.jrt.NumInvitedUsersForRoom(ed.RoomID)
	}
	// notify all people in this room
	notifyIDs, numJoined := d.jrt.JoinedUsersForRoom(ed.RoomID, func(userID string) bool {
		if userID == DispatcherAllUsers {
			return false // safety guard to prevent dupe global callbacks
		}
		return d.ReceiverForUser(userID) != nil
	})
	ed.JoinCount = numJoined
	d.notifyListeners(ctx, ed, notifyIDs, targetUser, forceInitial, membership)
}
// OnEphemeralEvent fans an ephemeral event out to the global receiver first,
// then to every joined user in the room who has a registered receiver.
func (d *Dispatcher) OnEphemeralEvent(ctx context.Context, roomID string, ephEvent json.RawMessage) {
	targets, _ := d.jrt.JoinedUsersForRoom(roomID, func(userID string) bool {
		if userID == DispatcherAllUsers {
			return false // safety guard to prevent dupe global callbacks
		}
		return d.ReceiverForUser(userID) != nil
	})
	d.userToReceiverMu.RLock()
	defer d.userToReceiverMu.RUnlock()
	// global listeners (invoke before per-user listeners so caches can update)
	if global := d.userToReceiver[DispatcherAllUsers]; global != nil {
		global.OnEphemeralEvent(ctx, roomID, ephEvent)
	}
	// poke user caches OnEphemeralEvent which then pokes ConnState
	for _, userID := range targets {
		if r := d.userToReceiver[userID]; r != nil {
			r.OnEphemeralEvent(ctx, roomID, ephEvent)
		}
	}
}
// OnReceipt fans a read receipt out to the global receiver first, then to
// every joined user in the receipt's room who has a registered receiver.
func (d *Dispatcher) OnReceipt(ctx context.Context, receipt internal.Receipt) {
	targets, _ := d.jrt.JoinedUsersForRoom(receipt.RoomID, func(userID string) bool {
		if userID == DispatcherAllUsers {
			return false // safety guard to prevent dupe global callbacks
		}
		return d.ReceiverForUser(userID) != nil
	})
	d.userToReceiverMu.RLock()
	defer d.userToReceiverMu.RUnlock()
	// global listeners (invoke before per-user listeners so caches can update)
	if global := d.userToReceiver[DispatcherAllUsers]; global != nil {
		global.OnReceipt(ctx, receipt) // FIXME: redundant, it doesn't care about receipts
	}
	// poke user caches OnReceipt which then pokes ConnState
	for _, userID := range targets {
		if r := d.userToReceiver[userID]; r != nil {
			r.OnReceipt(ctx, receipt)
		}
	}
}
// notifyListeners delivers ed to the global receiver and then to each user in
// userIDs. Each per-user receiver gets its own shallow copy of ed so that
// ForceInitial can be set for the target user only. If targetUser (the subject
// of a membership event) was not in userIDs — e.g. they were invited, left, or
// were banned and so are not joined — they are notified separately afterwards,
// except for invites, which must arrive via the invitee's own poller.
func (d *Dispatcher) notifyListeners(ctx context.Context, ed *caches.EventData, userIDs []string, targetUser string, shouldForceInitial bool, membership string) {
	internal.Logf(ctx, "dispatcher", "%s: notify %d users (nid=%d,join_count=%d)", ed.RoomID, len(userIDs), ed.NID, ed.JoinCount)
	// invoke listeners
	d.userToReceiverMu.RLock()
	defer d.userToReceiverMu.RUnlock()
	// global listeners (invoke before per-user listeners so caches can update)
	listener := d.userToReceiver[DispatcherAllUsers]
	if listener != nil {
		listener.OnNewEvent(ctx, ed)
	}
	// per-user listeners
	notifiedTarget := false
	for _, userID := range userIDs {
		l := d.userToReceiver[userID]
		if l != nil {
			// shallow copy so ForceInitial on the target doesn't leak to other users
			edd := *ed
			if targetUser == userID {
				notifiedTarget = true
				if shouldForceInitial {
					edd.ForceInitial = true
				}
			}
			l.OnNewEvent(ctx, &edd)
		}
	}
	if targetUser != "" && !notifiedTarget { // e.g invites/leaves where you aren't joined yet but need to know about it
		// We expect invites to come down the invitee's poller, which triggers OnInvite code paths and
		// not normal event codepaths. We need the separate code path to ensure invite stripped state
		// is sent to the conn and not live data. Hence, if we get the invite event early from a different
		// connection, do not send it to the target, as they must wait for the invite on their poller.
		if membership != "invite" {
			if shouldForceInitial {
				// NOTE: this mutates the shared ed (no copy here), unlike the loop above
				ed.ForceInitial = true
			}
			l := d.userToReceiver[targetUser]
			if l != nil {
				l.OnNewEvent(ctx, ed)
			}
		}
	}
}