diff --git a/sync2/poller.go b/sync2/poller.go
index 6a20e659..b2cae811 100644
--- a/sync2/poller.go
+++ b/sync2/poller.go
@@ -552,6 +552,14 @@ func (p *poller) poll(ctx context.Context, s *pollLoopState) error {
 		p.totalNumPolls.Inc()
 	}
 	if s.failCount > 0 {
+		if s.failCount > 1000 {
+			// 3s * 1000 = 3000s = 50 minutes
+			errMsg := "poller: access token has failed >1000 times on /sync, terminating loop"
+			p.logger.Warn().Msg(errMsg)
+			p.receiver.OnExpiredToken(ctx, hashToken(p.accessToken), p.userID, p.deviceID)
+			p.Terminate()
+			return fmt.Errorf("%s", errMsg)
+		}
 		// don't backoff when doing v2 syncs because the response is only in the cache for a short
 		// period of time (on massive accounts on matrix.org) such that if you wait 2,4,8min between
 		// requests it might force the server to do the work all over again :(
diff --git a/sync2/poller_test.go b/sync2/poller_test.go
index e51771af..ac8e4b2d 100644
--- a/sync2/poller_test.go
+++ b/sync2/poller_test.go
@@ -553,6 +553,42 @@ func mustEqualSince(t *testing.T, gotSince, expectedSince string) {
 	}
 }
 
+func TestPollerGivesUpEventually(t *testing.T) {
+	deviceID := "FOOBAR"
+	hasPolledSuccessfully := make(chan struct{})
+	accumulator, client := newMocks(func(authHeader, since string) (*SyncResponse, int, error) {
+		return nil, 524, fmt.Errorf("gateway timeout")
+	})
+	timeSleep = func(d time.Duration) {
+		// actually sleep to make sure async actions can happen if any
+		time.Sleep(1 * time.Microsecond)
+	}
+	defer func() { // reset the value after the test runs
+		timeSleep = time.Sleep
+	}()
+	var wg sync.WaitGroup
+	wg.Add(1)
+	poller := newPoller(PollerID{UserID: "@alice:localhost", DeviceID: deviceID}, "Authorization: hello world", client, accumulator, zerolog.New(os.Stderr), false)
+	go func() {
+		defer wg.Done()
+		poller.Poll("")
+	}()
+	go func() {
+		poller.WaitUntilInitialSync()
+		close(hasPolledSuccessfully)
+	}()
+	wg.Wait()
+	select {
+	case <-hasPolledSuccessfully:
+	case <-time.After(100 * time.Millisecond):
+		break
+	}
+	// poller should be in the terminated state
+	if !poller.terminated.Load() {
+		t.Errorf("poller was not terminated")
+	}
+}
+
 // Tests that the poller backs off in 2,4,8,etc second increments to a variety of errors
 func TestPollerBackoff(t *testing.T) {
 	deviceID := "FOOBAR"
diff --git a/sync3/caches/user.go b/sync3/caches/user.go
index 51bd6b02..185d9ba7 100644
--- a/sync3/caches/user.go
+++ b/sync3/caches/user.go
@@ -42,10 +42,6 @@ type UserRoomData struct {
 	HighlightCount int
 	Invite         *InviteData
 
-	// this field is set by LazyLoadTimelines and is per-function call, and is not persisted in-memory.
-	// The zero value of this safe to use (0 latest nid, no prev batch, no timeline).
-	RequestedLatestEvents state.LatestEvents
-
 	// TODO: should CanonicalisedName really be in RoomConMetadata? It's only set in SetRoom AFAICS
 	CanonicalisedName string // stripped leading symbols like #, all in lower case
 	// Set of spaces this room is a part of, from the perspective of this user. This is NOT global room data
@@ -182,18 +178,18 @@ type UserCacheListener interface {
 // Tracks data specific to a given user. Specifically, this is the map of room ID to UserRoomData.
 // This data is user-scoped, not global or connection scoped.
 type UserCache struct {
-	LazyRoomDataOverride func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData
-	UserID               string
-	roomToData           map[string]UserRoomData
-	roomToDataMu         *sync.RWMutex
-	listeners            map[int]UserCacheListener
-	listenersMu          *sync.RWMutex
-	id                   int
-	store                *state.Storage
-	globalCache          *GlobalCache
-	txnIDs               TransactionIDFetcher
-	ignoredUsers         map[string]struct{}
-	ignoredUsersMu       *sync.RWMutex
+	LazyLoadTimelinesOverride func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents
+	UserID                    string
+	roomToData                map[string]UserRoomData
+	roomToDataMu              *sync.RWMutex
+	listeners                 map[int]UserCacheListener
+	listenersMu               *sync.RWMutex
+	id                        int
+	store                     *state.Storage
+	globalCache               *GlobalCache
+	txnIDs                    TransactionIDFetcher
+	ignoredUsers              map[string]struct{}
+	ignoredUsersMu            *sync.RWMutex
 }
 
 func NewUserCache(userID string, globalCache *GlobalCache, store *state.Storage, txnIDs TransactionIDFetcher) *UserCache {
@@ -309,34 +305,29 @@ func (c *UserCache) OnRegistered(ctx context.Context) error {
 	return nil
 }
 
-// Load timelines from the database. Uses cached UserRoomData for metadata purposes only.
-func (c *UserCache) LazyLoadTimelines(ctx context.Context, loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData {
+// LazyLoadTimelines loads up to `maxTimelineEvents` events per room from the database, plus other
+// timeline-related data. Events from senders ignored by this user are dropped.
+// Returns nil on error.
+func (c *UserCache) LazyLoadTimelines(ctx context.Context, loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents {
 	_, span := internal.StartSpan(ctx, "LazyLoadTimelines")
 	defer span.End()
-	if c.LazyRoomDataOverride != nil {
-		return c.LazyRoomDataOverride(loadPos, roomIDs, maxTimelineEvents)
+	if c.LazyLoadTimelinesOverride != nil {
+		return c.LazyLoadTimelinesOverride(loadPos, roomIDs, maxTimelineEvents)
 	}
-	result := make(map[string]UserRoomData)
+	result := make(map[string]state.LatestEvents)
 	roomIDToLatestEvents, err := c.store.LatestEventsInRooms(c.UserID, roomIDs, loadPos, maxTimelineEvents)
 	if err != nil {
 		logger.Err(err).Strs("rooms", roomIDs).Msg("failed to get LatestEventsInRooms")
 		internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
 		return nil
 	}
-	c.roomToDataMu.Lock()
 	for _, requestedRoomID := range roomIDs {
 		latestEvents := roomIDToLatestEvents[requestedRoomID]
-		urd, ok := c.roomToData[requestedRoomID]
-		if !ok {
-			urd = NewUserRoomData()
-		}
 		if latestEvents != nil {
 			latestEvents.DiscardIgnoredMessages(c.ShouldIgnore)
-			urd.RequestedLatestEvents = *latestEvents
+			result[requestedRoomID] = *latestEvents
 		}
-		result[requestedRoomID] = urd
 	}
-	c.roomToDataMu.Unlock()
 	return result
 }
 
@@ -350,6 +341,21 @@ func (c *UserCache) LoadRoomData(roomID string) UserRoomData {
 	return data
 }
 
+// LoadRooms is a batch version of LoadRoomData. Returns a map keyed by roomID.
+func (c *UserCache) LoadRooms(roomIDs ...string) map[string]UserRoomData {
+	result := make(map[string]UserRoomData, len(roomIDs))
+	c.roomToDataMu.RLock()
+	defer c.roomToDataMu.RUnlock()
+	for _, roomID := range roomIDs {
+		data, ok := c.roomToData[roomID]
+		if !ok {
+			data = NewUserRoomData()
+		}
+		result[roomID] = data
+	}
+	return result
+}
+
 type roomUpdateCache struct {
 	roomID string
 	// globalRoomData is a snapshot of the global metadata for this room immediately
diff --git a/sync3/handler/connstate.go b/sync3/handler/connstate.go
index d8bc451d..3cc11af2 100644
--- a/sync3/handler/connstate.go
+++ b/sync3/handler/connstate.go
@@ -554,31 +554,28 @@ func (s *ConnState) lazyLoadTypingMembers(ctx context.Context, response *sync3.R
 func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSubscription, bumpEventTypes []string, roomIDs ...string) map[string]sync3.Room {
 	ctx, span := internal.StartSpan(ctx, "getInitialRoomData")
 	defer span.End()
-	rooms := make(map[string]sync3.Room, len(roomIDs))
+
+	// 0. Load room metadata and timelines.
 	// We want to grab the user room data and the room metadata for each room ID. We use the globally
 	// highest NID we've seen to act as an anchor for the request. This anchor does not guarantee that
 	// events returned here have already been seen - the position is not globally ordered - so because
 	// room A has a position of 6 and B has 7 (so the highest is 7) does not mean that this connection
 	// has seen 6, as concurrent room updates cause A and B to race. This is why we then go through the
 	// response to this call to assign new load positions for each room.
-	roomIDToUserRoomData := s.userCache.LazyLoadTimelines(ctx, s.anchorLoadPosition, roomIDs, int(roomSub.TimelineLimit))
 	roomMetadatas := s.globalCache.LoadRooms(ctx, roomIDs...)
-	// prepare lazy loading data structures, txn IDs
-	roomToUsersInTimeline := make(map[string][]string, len(roomIDToUserRoomData))
+	userRoomDatas := s.userCache.LoadRooms(roomIDs...)
+	timelines := s.userCache.LazyLoadTimelines(ctx, s.anchorLoadPosition, roomIDs, int(roomSub.TimelineLimit))
+
+	// 1. Prepare lazy loading data structures, txn IDs.
+	roomToUsersInTimeline := make(map[string][]string, len(timelines))
 	roomToTimeline := make(map[string][]json.RawMessage)
-	for roomID, urd := range roomIDToUserRoomData {
-		set := make(map[string]struct{})
-		for _, ev := range urd.RequestedLatestEvents.Timeline {
-			set[gjson.GetBytes(ev, "sender").Str] = struct{}{}
-		}
-		userIDs := make([]string, len(set))
-		i := 0
-		for userID := range set {
-			userIDs[i] = userID
-			i++
-		}
-		roomToUsersInTimeline[roomID] = userIDs
-		roomToTimeline[roomID] = urd.RequestedLatestEvents.Timeline
+	for roomID, latestEvents := range timelines {
+		senders := make(map[string]struct{})
+		for _, ev := range latestEvents.Timeline {
+			senders[gjson.GetBytes(ev, "sender").Str] = struct{}{}
+		}
+		roomToUsersInTimeline[roomID] = internal.Keys(senders)
+		roomToTimeline[roomID] = latestEvents.Timeline
 		// remember what we just loaded so if we see these events down the live stream we know to ignore them.
 		// This means that requesting a direct room subscription causes the connection to jump ahead to whatever
 		// is in the database at the time of the call, rather than gradually converging by consuming live data.
@@ -586,10 +583,17 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu
 		// room state is also pinned to the load position here, else you could see weird things in individual
 		// responses such as an updated room.name without the associated m.room.name event (though this will
 		// come through on the next request -> it converges to the right state so it isn't critical).
-		s.loadPositions[roomID] = urd.RequestedLatestEvents.LatestNID
+		s.loadPositions[roomID] = latestEvents.LatestNID
 	}
 	roomToTimeline = s.userCache.AnnotateWithTransactionIDs(ctx, s.userID, s.deviceID, roomToTimeline)
+
+	// 2. Load required state events.
 	rsm := roomSub.RequiredStateMap(s.userID)
+	if rsm.IsLazyLoading() {
+		for roomID, userIDs := range roomToUsersInTimeline {
+			s.lazyCache.Add(roomID, userIDs...)
+		}
+	}
 
 	internal.Logf(ctx, "connstate", "getInitialRoomData for %d rooms, RequiredStateMap: %#v",
 		len(roomIDs), rsm)
@@ -597,7 +601,7 @@
 	// since we'll be using the invite_state only.
 	loadRoomIDs := make([]string, 0, len(roomIDs))
 	for _, roomID := range roomIDs {
-		userRoomData, ok := roomIDToUserRoomData[roomID]
+		userRoomData, ok := userRoomDatas[roomID]
 		if !ok || !userRoomData.IsInvite {
 			loadRoomIDs = append(loadRoomIDs, roomID)
 		}
@@ -610,8 +614,11 @@
 	if roomIDToState == nil { // e.g no required_state
 		roomIDToState = make(map[string][]json.RawMessage)
 	}
+
+	// 3. Build sync3.Room structs to return to clients.
+	rooms := make(map[string]sync3.Room, len(roomIDs))
 	for _, roomID := range roomIDs {
-		userRoomData, ok := roomIDToUserRoomData[roomID]
+		userRoomData, ok := userRoomDatas[roomID]
 		if !ok {
 			userRoomData = caches.NewUserRoomData()
 		}
@@ -677,7 +684,7 @@
 			IsDM:          userRoomData.IsDM,
 			JoinedCount:   metadata.JoinCount,
 			InvitedCount:  &metadata.InviteCount,
-			PrevBatch:     userRoomData.RequestedLatestEvents.PrevBatch,
+			PrevBatch:     timelines[roomID].PrevBatch,
 			Timestamp:     maxTs,
 		}
 		if roomSub.IncludeHeroes() && calculated {
@@ -686,11 +693,6 @@
 		rooms[roomID] = room
 	}
 
-	if rsm.IsLazyLoading() {
-		for roomID, userIDs := range roomToUsersInTimeline {
-			s.lazyCache.Add(roomID, userIDs...)
-		}
-	}
 	return rooms
 }
 
diff --git a/sync3/handler/connstate_test.go b/sync3/handler/connstate_test.go
index edd98ad3..d1c8ffdd 100644
--- a/sync3/handler/connstate_test.go
+++ b/sync3/handler/connstate_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"github.com/matrix-org/sliding-sync/state"
 	"reflect"
 	"testing"
 	"time"
@@ -44,12 +45,12 @@ func newRoomMetadata(roomID string, lastMsgTimestamp spec.Timestamp) internal.Ro
 	return *m
 }
 
-func mockLazyRoomOverride(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]caches.UserRoomData {
-	result := make(map[string]caches.UserRoomData)
+func mockLazyRoomOverride(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents {
+	result := make(map[string]state.LatestEvents)
 	for _, roomID := range roomIDs {
-		u := caches.NewUserRoomData()
-		u.RequestedLatestEvents.Timeline = []json.RawMessage{[]byte(`{}`)}
-		result[roomID] = u
+		result[roomID] = state.LatestEvents{
+			Timeline: []json.RawMessage{[]byte(`{}`)},
+		}
 	}
 	return result
 }
@@ -98,12 +99,12 @@ func TestConnStateInitial(t *testing.T) {
 	userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
 	dispatcher.Register(context.Background(), userCache.UserID, userCache)
 	dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache)
-	userCache.LazyRoomDataOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]caches.UserRoomData {
-		result := make(map[string]caches.UserRoomData)
+	userCache.LazyLoadTimelinesOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents {
+		result := make(map[string]state.LatestEvents)
 		for _, roomID := range roomIDs {
-			u := caches.NewUserRoomData()
-			u.RequestedLatestEvents.Timeline = []json.RawMessage{timeline[roomID]}
-			result[roomID] = u
+			result[roomID] = state.LatestEvents{
+				Timeline: []json.RawMessage{timeline[roomID]},
+			}
 		}
 		return result
 	}
@@ -269,7 +270,7 @@ func TestConnStateMultipleRanges(t *testing.T) {
 		return 1, roomMetadata, joinTimings, nil, nil
 	}
 	userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
-	userCache.LazyRoomDataOverride = mockLazyRoomOverride
+	userCache.LazyLoadTimelinesOverride = mockLazyRoomOverride
 	dispatcher.Register(context.Background(), userCache.UserID, userCache)
 	dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache)
 	cs := NewConnState(userID, deviceID, userCache, globalCache, &NopExtensionHandler{}, &NopJoinTracker{}, nil, nil, 1000, 0)
@@ -448,7 +449,7 @@ func TestBumpToOutsideRange(t *testing.T) {
 	}
 
 	userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
-	userCache.LazyRoomDataOverride = mockLazyRoomOverride
+	userCache.LazyLoadTimelinesOverride = mockLazyRoomOverride
 	dispatcher.Register(context.Background(), userCache.UserID, userCache)
 	dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache)
 	cs := NewConnState(userID, deviceID, userCache, globalCache, &NopExtensionHandler{}, &NopJoinTracker{}, nil, nil, 1000, 0)
@@ -551,12 +552,12 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
 		}, nil, nil
 	}
 	userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
-	userCache.LazyRoomDataOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]caches.UserRoomData {
-		result := make(map[string]caches.UserRoomData)
+	userCache.LazyLoadTimelinesOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents {
+		result := make(map[string]state.LatestEvents)
 		for _, roomID := range roomIDs {
-			u := caches.NewUserRoomData()
-			u.RequestedLatestEvents.Timeline = []json.RawMessage{timeline[roomID]}
-			result[roomID] = u
+			result[roomID] = state.LatestEvents{
+				Timeline: []json.RawMessage{timeline[roomID]},
+			}
 		}
 		return result
 	}
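Note: the getInitialRoomData hunk above replaces a hand-rolled collect-map-keys loop with a call to internal.Keys, whose definition is not part of this diff. A minimal generic helper consistent with that call site (a sketch; the repo's actual implementation may differ) is:

	// Keys returns the keys of m as a slice, in no particular order.
	// Keys of a nil map is nil.
	func Keys[K comparable, V any](m map[K]V) []K {
		if m == nil {
			return nil
		}
		keys := make([]K, 0, len(m))
		for k := range m {
			keys = append(keys, k)
		}
		return keys
	}

At the call site, internal.Keys(senders) converts the map[string]struct{} set of timeline senders into the []string stored in roomToUsersInTimeline; the iteration order of the resulting user IDs is unspecified.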