Skip to content

Commit

Permalink
Make UserCaches respond to gappy memberships
Browse files Browse the repository at this point in the history
Remove redundant comment

Improve errmsg
  • Loading branch information
David Robertson committed Oct 10, 2023
1 parent 32479e2 commit db7e94d
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 16 deletions.
7 changes: 0 additions & 7 deletions sync3/caches/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,10 +769,3 @@ func (u *UserCache) ShouldIgnore(userID string) bool {
_, ignored := u.ignoredUsers[userID]
return ignored
}

func (u *UserCache) OnInvalidateRoom(ctx context.Context, roomID string) {
// Nothing for now. In UserRoomData the fields dependant on room state are
// IsDM, IsInvite, HasLeft, Invite, CanonicalisedName, ResolvedAvatarURL, Spaces.
// Not clear to me if we need to reload these or if we will inherit any changes from
// the global cache.
}
73 changes: 65 additions & 8 deletions sync3/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ type Receiver interface {
OnNewEvent(ctx context.Context, event *caches.EventData)
OnReceipt(ctx context.Context, receipt internal.Receipt)
OnEphemeralEvent(ctx context.Context, roomID string, ephEvent json.RawMessage)
OnInvalidateRoom(ctx context.Context, roomID string)
// OnRegistered is called after a successful call to Dispatcher.Register
OnRegistered(ctx context.Context) error
}
Expand Down Expand Up @@ -264,22 +263,80 @@ func (d *Dispatcher) notifyListeners(ctx context.Context, ed *caches.EventData,
}
}

func (d *Dispatcher) OnInvalidateRoom(ctx context.Context, roomID string) {
func (d *Dispatcher) OnInvalidateRoom(
ctx context.Context,
roomID string,
joins map[string]*caches.EventData,
invites map[string][]json.RawMessage,
leaves map[string]json.RawMessage,
) {
// XXX: This is all a messy hack and doesn't feel very satisfying. Other than
// fetching the userCaches, the dispatcher isn't doing very much for us here.

// First dispatch to the global cache.
receiver, ok := d.userToReceiver[DispatcherAllUsers]
if !ok {
receiver := d.userToReceiver[DispatcherAllUsers]
gc := receiver.(*caches.GlobalCache)
if gc == nil {
logger.Error().Msgf("No receiver for global cache")
return
}
receiver.OnInvalidateRoom(ctx, roomID)
gc.OnInvalidateRoom(ctx, roomID)

// Reset the joined room tracker.
d.jrt.ReloadMembershipsForRoom(roomID, internal.Keys(joins), internal.Keys(invites))

// Then dispatch to any users who are joined to that room.
joinedUsers, _ := d.jrt.JoinedUsersForRoom(roomID, nil)
// Invalidation is currently triggered by a state change or a redaction.
//
// Ensure we update the following fields on UserRoomData:
//
// * IsInvite and HasLeave: can be altered by state changes
// * Invite: May need to remove this if we're no longer invited after a state change,
// but shouldn't alter otherwise
// * JoinTiming: can change if our membership has changed.
//
// Do this by fetching current membership and calling the appropriate callback.

// We should, but don't currently update these fields:
//
// * Spaces: can grow or shrink if m.space.parent state events have changed.
// Also, if this room is a space and its m.space.child state events have changed,
// we would need to update other rooms' Spaces maps.

// We ignore the following fields because invalidation cannot change them:
//
// * IsDM: can't be affected by either of the invalidation causes.
// * NotificationCount and HighlightCount: might be altered by redactions... but we
// don't have enough info to compute counts, so ignore these.
// * CanonicalisedName and ResolvedAvatarURL: not tracked by the UserCache, these
// are updated after global metadata changes in SetRoom.
// * Tags: account data can't be changed by the invalidation triggers.

d.userToReceiverMu.RLock()
defer d.userToReceiverMu.RUnlock()
for _, userID := range joinedUsers {

// TODO: if there is a state reset, users can leave without having a leave event.
// We would still need to mark those users as having left their rooms.
for userID, leaveEvent := range leaves {
receiver = d.userToReceiver[userID]
uc := receiver.(*caches.UserCache)
if uc != nil {
uc.OnLeftRoom(ctx, roomID, leaveEvent)
}
}

for userID, inviteState := range invites {
receiver = d.userToReceiver[userID]
uc := receiver.(*caches.UserCache)
if uc != nil {
uc.OnInvite(ctx, roomID, inviteState)
}
}

for userID, joinData := range joins {
receiver = d.userToReceiver[userID]
if receiver != nil {
receiver.OnInvalidateRoom(ctx, roomID)
receiver.OnNewEvent(ctx, joinData)
}
}
}
23 changes: 22 additions & 1 deletion sync3/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,28 @@ func (h *SyncLiveHandler) OnInvalidateRoom(p *pubsub.V2InvalidateRoom) {
ctx, task := internal.StartTask(context.Background(), "OnInvalidateRoom")
defer task.End()

h.Dispatcher.OnInvalidateRoom(ctx, p.RoomID)
joins, invites, leaves, err := h.Storage.FetchMemberships(p.RoomID)
if err != nil {
hub := internal.GetSentryHubFromContextOrDefault(ctx)
hub.WithScope(func(scope *sentry.Scope) {
scope.SetContext(internal.SentryCtxKey, map[string]any{
"room_id": p.RoomID,
})
hub.CaptureException(err)
})
logger.Err(err).
Str("room_id", p.RoomID).
Msg("Failed to fetch members after cache invalidation")
}

joinEventDatas := make(map[string]*caches.EventData, len(joins))
for userID, event := range joins {
ed := caches.NewEventData(event.JSON, p.RoomID, event.NID)
ed.AlwaysProcess = true
joinEventDatas[userID] = ed
}

h.Dispatcher.OnInvalidateRoom(ctx, p.RoomID, joinEventDatas, invites, leaves)
}

func parseIntFromQuery(u *url.URL, param string) (result int64, err *internal.HandlerError) {
Expand Down
41 changes: 41 additions & 0 deletions sync3/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,44 @@ func (t *JoinedRoomsTracker) NumInvitedUsersForRoom(roomID string) int {
defer t.mu.RUnlock()
return len(t.roomIDToInvitedUsers[roomID])
}

// ReloadMembershipsForRoom overwrites the JoinedRoomsTracker state for one room to the
// given list of joined and invited users. It returns the list of users who were joined
// or invited prior to this call, but are no longer joined nor invited.
func (t *JoinedRoomsTracker) ReloadMembershipsForRoom(roomID string, joined, invited []string) {
newJoined := make(set, len(joined))
newInvited := make(set, len(invited))
for _, member := range joined {
newJoined[member] = struct{}{}
}
for _, member := range invited {
newInvited[member] = struct{}{}
}

t.mu.Lock()
defer t.mu.Unlock()

// 1. Overwrite the room's memberships with the given arguments.
oldJoined := t.roomIDToJoinedUsers[roomID]
t.roomIDToJoinedUsers[roomID] = newJoined
t.roomIDToInvitedUsers[roomID] = newInvited

// 2. Mark the joined users as being joined to this room.
for userID := range newJoined {
_, userAlreadyTracked := t.userIDToJoinedRooms[userID]
if !userAlreadyTracked {
t.userIDToJoinedRooms[userID] = make(set)
}
t.userIDToJoinedRooms[userID][roomID] = struct{}{}
}

// 3. Scan the old joined list for users who are no longer joined, and mark them as such.
for userID := range oldJoined {
_, stillJoined := newJoined[userID]
if !stillJoined {
delete(t.userIDToJoinedRooms[userID], roomID)
}
}

return
}
39 changes: 39 additions & 0 deletions sync3/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,45 @@ func TestTrackerStartup(t *testing.T) {
assertInt(t, jrt.NumInvitedUsersForRoom(roomC), 0)
}

func TestTrackerReload(t *testing.T) {
roomA := "!a"
roomB := "!b"
roomC := "!c"
alice := "@alice"
bob := "@bob"
chris := "@chris"
jrt := NewJoinedRoomsTracker()
jrt.Startup(map[string][]string{
roomA: {alice, bob},
roomB: {bob},
roomC: {alice},
})

t.Log("Chris joins room C.")
jrt.ReloadMembershipsForRoom(roomC, []string{alice, chris}, nil)
members, _ := jrt.JoinedUsersForRoom(roomC, nil)
assertEqualSlices(t, "roomC joined members", members, []string{alice, chris})
assertEqualSlices(t, "alice's rooms", jrt.JoinedRoomsForUser(alice), []string{roomA, roomC})
assertEqualSlices(t, "chris's rooms", jrt.JoinedRoomsForUser(chris), []string{roomC})
assertInt(t, jrt.NumInvitedUsersForRoom(roomC), 0)

t.Log("Bob leaves room B.")
jrt.ReloadMembershipsForRoom(roomB, nil, nil)
members, _ = jrt.JoinedUsersForRoom(roomB, nil)
assertEqualSlices(t, "roomB joined members", members, nil)
assertEqualSlices(t, "bob's rooms", jrt.JoinedRoomsForUser(bob), []string{roomA})
assertInt(t, jrt.NumInvitedUsersForRoom(roomB), 0)

t.Log("Chris joins room A. Alice and Bob leave it, but Chris reinvites Bob.")
jrt.ReloadMembershipsForRoom(roomA, []string{chris}, []string{bob})
members, _ = jrt.JoinedUsersForRoom(roomA, nil)
assertEqualSlices(t, "roomA joined members", members, []string{chris})
assertEqualSlices(t, "alice's rooms", jrt.JoinedRoomsForUser(alice), []string{roomC})
assertEqualSlices(t, "bob's rooms", jrt.JoinedRoomsForUser(bob), nil)
assertEqualSlices(t, "chris's rooms", jrt.JoinedRoomsForUser(chris), []string{roomA, roomC})
assertInt(t, jrt.NumInvitedUsersForRoom(roomA), 1)
}

func assertBool(t *testing.T, msg string, got, want bool) {
t.Helper()
if got != want {
Expand Down

0 comments on commit db7e94d

Please sign in to comment.