From db7e94dd0fec55338638c6dd49b9989c252e9296 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 27 Sep 2023 18:19:57 +0100 Subject: [PATCH] Make UserCaches respond to gappy memberships Remove redundant comment Improve errmsg --- sync3/caches/user.go | 7 ---- sync3/dispatcher.go | 73 +++++++++++++++++++++++++++++++++++----- sync3/handler/handler.go | 23 ++++++++++++- sync3/tracker.go | 41 ++++++++++++++++++++++ sync3/tracker_test.go | 39 +++++++++++++++++++++ 5 files changed, 167 insertions(+), 16 deletions(-) diff --git a/sync3/caches/user.go b/sync3/caches/user.go index 62138055..185d9ba7 100644 --- a/sync3/caches/user.go +++ b/sync3/caches/user.go @@ -769,10 +769,3 @@ func (u *UserCache) ShouldIgnore(userID string) bool { _, ignored := u.ignoredUsers[userID] return ignored } - -func (u *UserCache) OnInvalidateRoom(ctx context.Context, roomID string) { - // Nothing for now. In UserRoomData the fields dependant on room state are - // IsDM, IsInvite, HasLeft, Invite, CanonicalisedName, ResolvedAvatarURL, Spaces. - // Not clear to me if we need to reload these or if we will inherit any changes from - // the global cache. -} diff --git a/sync3/dispatcher.go b/sync3/dispatcher.go index 90d8df0a..e0a167f8 100644 --- a/sync3/dispatcher.go +++ b/sync3/dispatcher.go @@ -23,7 +23,6 @@ type Receiver interface { OnNewEvent(ctx context.Context, event *caches.EventData) OnReceipt(ctx context.Context, receipt internal.Receipt) OnEphemeralEvent(ctx context.Context, roomID string, ephEvent json.RawMessage) - OnInvalidateRoom(ctx context.Context, roomID string) // OnRegistered is called after a successful call to Dispatcher.Register OnRegistered(ctx context.Context) error } @@ -264,22 +263,80 @@ func (d *Dispatcher) notifyListeners(ctx context.Context, ed *caches.EventData, } } -func (d *Dispatcher) OnInvalidateRoom(ctx context.Context, roomID string) { +func (d *Dispatcher) OnInvalidateRoom( + ctx context.Context, + roomID string, + joins map[string]*caches.EventData, + invites map[string][]json.RawMessage, + leaves map[string]json.RawMessage, +) { + // XXX: This is all a messy hack and doesn't feel very satisfying. Other than + // fetching the userCaches, the dispatcher isn't doing very much for us here. + // First dispatch to the global cache. - receiver, ok := d.userToReceiver[DispatcherAllUsers] - if !ok { + receiver := d.userToReceiver[DispatcherAllUsers] + gc := receiver.(*caches.GlobalCache) + if gc == nil { logger.Error().Msgf("No receiver for global cache") + return } - receiver.OnInvalidateRoom(ctx, roomID) + gc.OnInvalidateRoom(ctx, roomID) + + // Reset the joined room tracker. + d.jrt.ReloadMembershipsForRoom(roomID, internal.Keys(joins), internal.Keys(invites)) // Then dispatch to any users who are joined to that room. - joinedUsers, _ := d.jrt.JoinedUsersForRoom(roomID, nil) + // Invalidation is currently triggered by a state change or a redaction. + // + // Ensure we update the following fields on UserRoomData: + // + // * IsInvite and HasLeave: can be altered by state changes + // * Invite: May need to remove this if we're no longer invited after a state change, + // but shouldn't alter otherwise + // * JoinTiming: can change if our membership has changed. + // + // Do this by fetching current membership and calling the appropriate callback. + + // We should, but don't currently update these fields: + // + // * Spaces: can grow or shrink if m.space.parent state events have changed. + // Also, if this room is a space and its m.space.child state events have changed, + // we would need to update other rooms' Spaces maps. + + // We ignore the following fields because invalidation cannot change them: + // + // * IsDM: can't be affected by either of the invalidation causes. + // * NotificationCount and HighlightCount: might be altered by redactions... but we + // don't have enough info to compute counts, so ignore these. + // * CanonicalisedName and ResolvedAvatarURL: not tracked by the UserCache, these + // are updated after global metadata changes in SetRoom. + // * Tags: account data can't be changed by the invalidation triggers. + d.userToReceiverMu.RLock() defer d.userToReceiverMu.RUnlock() - for _, userID := range joinedUsers { + + // TODO: if there is a state reset, users can leave without having a leave event. + // We would still need to mark those users as having left their rooms. + for userID, leaveEvent := range leaves { + receiver = d.userToReceiver[userID] + uc := receiver.(*caches.UserCache) + if uc != nil { + uc.OnLeftRoom(ctx, roomID, leaveEvent) + } + } + + for userID, inviteState := range invites { + receiver = d.userToReceiver[userID] + uc := receiver.(*caches.UserCache) + if uc != nil { + uc.OnInvite(ctx, roomID, inviteState) + } + } + + for userID, joinData := range joins { receiver = d.userToReceiver[userID] if receiver != nil { - receiver.OnInvalidateRoom(ctx, roomID) + receiver.OnNewEvent(ctx, joinData) } } } diff --git a/sync3/handler/handler.go b/sync3/handler/handler.go index 5e6b80e3..948dcc6a 100644 --- a/sync3/handler/handler.go +++ b/sync3/handler/handler.go @@ -807,7 +807,28 @@ func (h *SyncLiveHandler) OnInvalidateRoom(p *pubsub.V2InvalidateRoom) { ctx, task := internal.StartTask(context.Background(), "OnInvalidateRoom") defer task.End() - h.Dispatcher.OnInvalidateRoom(ctx, p.RoomID) + joins, invites, leaves, err := h.Storage.FetchMemberships(p.RoomID) + if err != nil { + hub := internal.GetSentryHubFromContextOrDefault(ctx) + hub.WithScope(func(scope *sentry.Scope) { + scope.SetContext(internal.SentryCtxKey, map[string]any{ + "room_id": p.RoomID, + }) + hub.CaptureException(err) + }) + logger.Err(err). + Str("room_id", p.RoomID). + Msg("Failed to fetch members after cache invalidation") + } + + joinEventDatas := make(map[string]*caches.EventData, len(joins)) + for userID, event := range joins { + ed := caches.NewEventData(event.JSON, p.RoomID, event.NID) + ed.AlwaysProcess = true + joinEventDatas[userID] = ed + } + + h.Dispatcher.OnInvalidateRoom(ctx, p.RoomID, joinEventDatas, invites, leaves) } func parseIntFromQuery(u *url.URL, param string) (result int64, err *internal.HandlerError) { diff --git a/sync3/tracker.go b/sync3/tracker.go index 3a1c73cf..2ec20203 100644 --- a/sync3/tracker.go +++ b/sync3/tracker.go @@ -184,3 +184,44 @@ func (t *JoinedRoomsTracker) NumInvitedUsersForRoom(roomID string) int { defer t.mu.RUnlock() return len(t.roomIDToInvitedUsers[roomID]) } + +// ReloadMembershipsForRoom overwrites the JoinedRoomsTracker state for one room to the +// given list of joined and invited users. It returns the list of users who were joined +// or invited prior to this call, but are no longer joined nor invited. +func (t *JoinedRoomsTracker) ReloadMembershipsForRoom(roomID string, joined, invited []string) { + newJoined := make(set, len(joined)) + newInvited := make(set, len(invited)) + for _, member := range joined { + newJoined[member] = struct{}{} + } + for _, member := range invited { + newInvited[member] = struct{}{} + } + + t.mu.Lock() + defer t.mu.Unlock() + + // 1. Overwrite the room's memberships with the given arguments. + oldJoined := t.roomIDToJoinedUsers[roomID] + t.roomIDToJoinedUsers[roomID] = newJoined + t.roomIDToInvitedUsers[roomID] = newInvited + + // 2. Mark the joined users as being joined to this room. + for userID := range newJoined { + _, userAlreadyTracked := t.userIDToJoinedRooms[userID] + if !userAlreadyTracked { + t.userIDToJoinedRooms[userID] = make(set) + } + t.userIDToJoinedRooms[userID][roomID] = struct{}{} + } + + // 3. Scan the old joined list for users who are no longer joined, and mark them as such. + for userID := range oldJoined { + _, stillJoined := newJoined[userID] + if !stillJoined { + delete(t.userIDToJoinedRooms[userID], roomID) + } + } + + return +} diff --git a/sync3/tracker_test.go b/sync3/tracker_test.go index 7be2336a..39b12d77 100644 --- a/sync3/tracker_test.go +++ b/sync3/tracker_test.go @@ -82,6 +82,45 @@ func TestTrackerStartup(t *testing.T) { assertInt(t, jrt.NumInvitedUsersForRoom(roomC), 0) } +func TestTrackerReload(t *testing.T) { + roomA := "!a" + roomB := "!b" + roomC := "!c" + alice := "@alice" + bob := "@bob" + chris := "@chris" + jrt := NewJoinedRoomsTracker() + jrt.Startup(map[string][]string{ + roomA: {alice, bob}, + roomB: {bob}, + roomC: {alice}, + }) + + t.Log("Chris joins room C.") + jrt.ReloadMembershipsForRoom(roomC, []string{alice, chris}, nil) + members, _ := jrt.JoinedUsersForRoom(roomC, nil) + assertEqualSlices(t, "roomC joined members", members, []string{alice, chris}) + assertEqualSlices(t, "alice's rooms", jrt.JoinedRoomsForUser(alice), []string{roomA, roomC}) + assertEqualSlices(t, "chris's rooms", jrt.JoinedRoomsForUser(chris), []string{roomC}) + assertInt(t, jrt.NumInvitedUsersForRoom(roomC), 0) + + t.Log("Bob leaves room B.") + jrt.ReloadMembershipsForRoom(roomB, nil, nil) + members, _ = jrt.JoinedUsersForRoom(roomB, nil) + assertEqualSlices(t, "roomB joined members", members, nil) + assertEqualSlices(t, "bob's rooms", jrt.JoinedRoomsForUser(bob), []string{roomA}) + assertInt(t, jrt.NumInvitedUsersForRoom(roomB), 0) + + t.Log("Chris joins room A. Alice and Bob leave it, but Chris reinvites Bob.") + jrt.ReloadMembershipsForRoom(roomA, []string{chris}, []string{bob}) + members, _ = jrt.JoinedUsersForRoom(roomA, nil) + assertEqualSlices(t, "roomA joined members", members, []string{chris}) + assertEqualSlices(t, "alice's rooms", jrt.JoinedRoomsForUser(alice), []string{roomC}) + assertEqualSlices(t, "bob's rooms", jrt.JoinedRoomsForUser(bob), nil) + assertEqualSlices(t, "chris's rooms", jrt.JoinedRoomsForUser(chris), []string{roomA, roomC}) + assertInt(t, jrt.NumInvitedUsersForRoom(roomA), 1) +} + func assertBool(t *testing.T, msg string, got, want bool) { t.Helper() if got != want {