Commit

Merge branch 'main' into dmr/resnapshot-2
David Robertson authored Oct 4, 2023
2 parents a46eedd + 4d8cbb2 commit 9055551
Showing 5 changed files with 125 additions and 72 deletions.
8 changes: 8 additions & 0 deletions sync2/poller.go
@@ -552,6 +552,14 @@ func (p *poller) poll(ctx context.Context, s *pollLoopState) error {
p.totalNumPolls.Inc()
}
if s.failCount > 0 {
if s.failCount > 1000 {
// 3s * 1000 = 3000s = 50 minutes
errMsg := "poller: access token has failed >1000 times to /sync, terminating loop"
p.logger.Warn().Msg(errMsg)
p.receiver.OnExpiredToken(ctx, hashToken(p.accessToken), p.userID, p.deviceID)
p.Terminate()
return fmt.Errorf(errMsg)
}
// don't backoff when doing v2 syncs because the response is only in the cache for a short
// period of time (on massive accounts on matrix.org) such that if you wait 2,4,8min between
// requests it might force the server to do the work all over again :(
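The guard added above bounds how long a failing poller keeps retrying: with the fixed 3-second wait used when backoff is disabled, 1000 consecutive failures amounts to roughly 50 minutes before the loop terminates and the token is treated as expired. The following is a minimal, self-contained sketch of that retry-then-give-up pattern; the function and parameter names are illustrative and not the project's actual poller.

// --- illustrative sketch, not part of this commit ---
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// pollLoop retries doSync with a fixed delay between attempts and gives up
// after maxFails consecutive failures, mirroring the failCount guard above.
func pollLoop(ctx context.Context, doSync func(context.Context) error, delay time.Duration, maxFails int) error {
	failCount := 0
	for {
		if err := doSync(ctx); err != nil {
			failCount++
			if failCount > maxFails {
				return fmt.Errorf("sync failed %d times in a row, terminating loop: %w", failCount, err)
			}
			select {
			case <-time.After(delay): // fixed wait, no exponential backoff
			case <-ctx.Done():
				return ctx.Err()
			}
			continue
		}
		failCount = 0 // reset the counter on any successful poll
	}
}

func main() {
	// Tiny delay and threshold so the example terminates immediately.
	err := pollLoop(context.Background(),
		func(context.Context) error { return errors.New("gateway timeout") },
		time.Millisecond, 5)
	fmt.Println(err)
}
// --- end sketch ---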
36 changes: 36 additions & 0 deletions sync2/poller_test.go
@@ -553,6 +553,42 @@ func mustEqualSince(t *testing.T, gotSince, expectedSince string) {
}
}

func TestPollerGivesUpEventually(t *testing.T) {
deviceID := "FOOBAR"
hasPolledSuccessfully := make(chan struct{})
accumulator, client := newMocks(func(authHeader, since string) (*SyncResponse, int, error) {
return nil, 524, fmt.Errorf("gateway timeout")
})
timeSleep = func(d time.Duration) {
// actually sleep to make sure async actions can happen if any
time.Sleep(1 * time.Microsecond)
}
defer func() { // reset the value after the test runs
timeSleep = time.Sleep
}()
var wg sync.WaitGroup
wg.Add(1)
poller := newPoller(PollerID{UserID: "@alice:localhost", DeviceID: deviceID}, "Authorization: hello world", client, accumulator, zerolog.New(os.Stderr), false)
go func() {
defer wg.Done()
poller.Poll("")
}()
go func() {
poller.WaitUntilInitialSync()
close(hasPolledSuccessfully)
}()
wg.Wait()
select {
case <-hasPolledSuccessfully:
case <-time.After(100 * time.Millisecond):
break
}
// poller should be in the terminated state
if !poller.terminated.Load() {
t.Errorf("poller was not terminated")
}
}
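
The test above depends on the sync2 package exposing its sleep call as a package-level variable (timeSleep) so the multi-second backoff can be stubbed. A stripped-down sketch of that seam, with illustrative names:

// --- illustrative sketch, not part of this commit ---
package retry

import "time"

// timeSleep is a test seam: production code calls it instead of time.Sleep
// directly, so tests can substitute a no-op and avoid real multi-second waits.
var timeSleep = time.Sleep

// backoff waits 2, 4, 8, ... seconds for successive consecutive failures.
func backoff(failCount int) {
	timeSleep(time.Duration(1<<uint(failCount)) * time.Second)
}
// --- end sketch ---

A test assigns timeSleep = func(time.Duration) {} (or a microsecond sleep, as above) and restores time.Sleep in a deferred cleanup so later tests see the real behaviour.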

// Tests that the poller backs off in 2,4,8,etc second increments to a variety of errors
func TestPollerBackoff(t *testing.T) {
deviceID := "FOOBAR"
64 changes: 35 additions & 29 deletions sync3/caches/user.go
@@ -42,10 +42,6 @@ type UserRoomData struct {
HighlightCount int
Invite *InviteData

// this field is set by LazyLoadTimelines and is per-function call, and is not persisted in-memory.
// The zero value of this is safe to use (0 latest nid, no prev batch, no timeline).
RequestedLatestEvents state.LatestEvents

// TODO: should CanonicalisedName really be in RoomConMetadata? It's only set in SetRoom AFAICS
CanonicalisedName string // stripped leading symbols like #, all in lower case
// Set of spaces this room is a part of, from the perspective of this user. This is NOT global room data
@@ -182,18 +178,18 @@ type UserCacheListener interface {
// Tracks data specific to a given user. Specifically, this is the map of room ID to UserRoomData.
// This data is user-scoped, not global or connection scoped.
type UserCache struct {
LazyRoomDataOverride func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData
UserID string
roomToData map[string]UserRoomData
roomToDataMu *sync.RWMutex
listeners map[int]UserCacheListener
listenersMu *sync.RWMutex
id int
store *state.Storage
globalCache *GlobalCache
txnIDs TransactionIDFetcher
ignoredUsers map[string]struct{}
ignoredUsersMu *sync.RWMutex
LazyLoadTimelinesOverride func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents
UserID string
roomToData map[string]UserRoomData
roomToDataMu *sync.RWMutex
listeners map[int]UserCacheListener
listenersMu *sync.RWMutex
id int
store *state.Storage
globalCache *GlobalCache
txnIDs TransactionIDFetcher
ignoredUsers map[string]struct{}
ignoredUsersMu *sync.RWMutex
}

func NewUserCache(userID string, globalCache *GlobalCache, store *state.Storage, txnIDs TransactionIDFetcher) *UserCache {
@@ -309,34 +305,29 @@ func (c *UserCache) OnRegistered(ctx context.Context) error {
return nil
}

// Load timelines from the database. Uses cached UserRoomData for metadata purposes only.
func (c *UserCache) LazyLoadTimelines(ctx context.Context, loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]UserRoomData {
// LazyLoadTimelines loads up to `maxTimelineEvents` from the database, plus other
// timeline-related data. Events from senders ignored by this user are dropped.
// Returns nil on error.
func (c *UserCache) LazyLoadTimelines(ctx context.Context, loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents {
_, span := internal.StartSpan(ctx, "LazyLoadTimelines")
defer span.End()
if c.LazyRoomDataOverride != nil {
return c.LazyRoomDataOverride(loadPos, roomIDs, maxTimelineEvents)
if c.LazyLoadTimelinesOverride != nil {
return c.LazyLoadTimelinesOverride(loadPos, roomIDs, maxTimelineEvents)
}
result := make(map[string]UserRoomData)
result := make(map[string]state.LatestEvents)
roomIDToLatestEvents, err := c.store.LatestEventsInRooms(c.UserID, roomIDs, loadPos, maxTimelineEvents)
if err != nil {
logger.Err(err).Strs("rooms", roomIDs).Msg("failed to get LatestEventsInRooms")
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
return nil
}
c.roomToDataMu.Lock()
for _, requestedRoomID := range roomIDs {
latestEvents := roomIDToLatestEvents[requestedRoomID]
urd, ok := c.roomToData[requestedRoomID]
if !ok {
urd = NewUserRoomData()
}
if latestEvents != nil {
latestEvents.DiscardIgnoredMessages(c.ShouldIgnore)
urd.RequestedLatestEvents = *latestEvents
result[requestedRoomID] = *latestEvents
}
result[requestedRoomID] = urd
}
c.roomToDataMu.Unlock()
return result
}
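
As the new doc comment says, timelines come back with events from ignored senders dropped (DiscardIgnoredMessages is passed c.ShouldIgnore above). Below is a minimal sketch of that kind of sender filter, assuming events are raw JSON with a top-level sender field; the helper name is hypothetical.

// --- illustrative sketch, not part of this commit ---
package sketch

import (
	"encoding/json"

	"github.com/tidwall/gjson"
)

// dropIgnored returns a copy of the timeline with events from ignored senders
// removed, preserving the order of the remaining events.
func dropIgnored(timeline []json.RawMessage, isIgnored func(userID string) bool) []json.RawMessage {
	kept := make([]json.RawMessage, 0, len(timeline))
	for _, ev := range timeline {
		if isIgnored(gjson.GetBytes(ev, "sender").Str) {
			continue
		}
		kept = append(kept, ev)
	}
	return kept
}
// --- end sketch ---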

@@ -350,6 +341,21 @@ func (c *UserCache) LoadRoomData(roomID string) UserRoomData {
return data
}

// LoadRooms is a batch version of LoadRoomData. Returns a map keyed by roomID.
func (c *UserCache) LoadRooms(roomIDs ...string) map[string]UserRoomData {
result := make(map[string]UserRoomData, len(roomIDs))
c.roomToDataMu.RLock()
defer c.roomToDataMu.RUnlock()
for _, roomID := range roomIDs {
data, ok := c.roomToData[roomID]
if !ok {
data = NewUserRoomData()
}
result[roomID] = data
}
return result
}
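
LoadRooms deliberately takes the read lock once for the whole batch and fills in defaults for rooms it has no data for, rather than calling LoadRoomData per room. A generic sketch of that batch-with-defaults pattern, under illustrative names:

// --- illustrative sketch, not part of this commit ---
package main

import (
	"fmt"
	"sync"
)

// batchLoad mirrors the LoadRooms pattern above: acquire the read lock once
// for the whole batch and substitute a default value for any missing key.
func batchLoad[V any](mu *sync.RWMutex, m map[string]V, newDefault func() V, keys ...string) map[string]V {
	result := make(map[string]V, len(keys))
	mu.RLock()
	defer mu.RUnlock()
	for _, k := range keys {
		v, ok := m[k]
		if !ok {
			v = newDefault()
		}
		result[k] = v
	}
	return result
}

func main() {
	var mu sync.RWMutex
	m := map[string]int{"!a:example.org": 3}
	fmt.Println(batchLoad(&mu, m, func() int { return 0 }, "!a:example.org", "!b:example.org"))
}
// --- end sketch ---

The batch form avoids re-acquiring the lock per room, which matters when a request covers many rooms at once.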

type roomUpdateCache struct {
roomID string
// globalRoomData is a snapshot of the global metadata for this room immediately
54 changes: 28 additions & 26 deletions sync3/handler/connstate.go
@@ -554,50 +554,54 @@ func (s *ConnState) lazyLoadTypingMembers(ctx context.Context, response *sync3.R
func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSubscription, bumpEventTypes []string, roomIDs ...string) map[string]sync3.Room {
ctx, span := internal.StartSpan(ctx, "getInitialRoomData")
defer span.End()
rooms := make(map[string]sync3.Room, len(roomIDs))

// 0. Load room metadata and timelines.
// We want to grab the user room data and the room metadata for each room ID. We use the globally
// highest NID we've seen to act as an anchor for the request. This anchor does not guarantee that
events returned here have already been seen - the position is not globally ordered - so the fact that
// room A has a position of 6 and B has 7 (so the highest is 7) does not mean that this connection
// has seen 6, as concurrent room updates cause A and B to race. This is why we then go through the
// response to this call to assign new load positions for each room.
roomIDToUserRoomData := s.userCache.LazyLoadTimelines(ctx, s.anchorLoadPosition, roomIDs, int(roomSub.TimelineLimit))
roomMetadatas := s.globalCache.LoadRooms(ctx, roomIDs...)
// prepare lazy loading data structures, txn IDs
roomToUsersInTimeline := make(map[string][]string, len(roomIDToUserRoomData))
userRoomDatas := s.userCache.LoadRooms(roomIDs...)
timelines := s.userCache.LazyLoadTimelines(ctx, s.anchorLoadPosition, roomIDs, int(roomSub.TimelineLimit))

// 1. Prepare lazy loading data structures, txn IDs.
roomToUsersInTimeline := make(map[string][]string, len(timelines))
roomToTimeline := make(map[string][]json.RawMessage)
for roomID, urd := range roomIDToUserRoomData {
set := make(map[string]struct{})
for _, ev := range urd.RequestedLatestEvents.Timeline {
set[gjson.GetBytes(ev, "sender").Str] = struct{}{}
}
userIDs := make([]string, len(set))
i := 0
for userID := range set {
userIDs[i] = userID
i++
}
roomToUsersInTimeline[roomID] = userIDs
roomToTimeline[roomID] = urd.RequestedLatestEvents.Timeline
for roomID, latestEvents := range timelines {
senders := make(map[string]struct{})
for _, ev := range latestEvents.Timeline {
senders[gjson.GetBytes(ev, "sender").Str] = struct{}{}
}
roomToUsersInTimeline[roomID] = keys(senders)

[CI check failure on line 577 in sync3/handler/connstate.go: GitHub Actions jobs end_to_end (1), upgrade-test, and integration all report "undefined: keys"]
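
The CI annotations above report that keys is undefined at this call site in this commit. A generic helper matching how it is called here might look like the sketch below; this is purely illustrative and the actual definition or follow-up fix may differ.

// --- illustrative sketch, not part of this commit ---
// keys returns the keys of a map in unspecified order; at the call site above
// it would collect the set of timeline senders. Requires Go 1.18+ generics.
func keys[K comparable, V any](m map[K]V) []K {
	out := make([]K, 0, len(m))
	for k := range m {
		out = append(out, k)
	}
	return out
}
// --- end sketch ---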
roomToTimeline[roomID] = latestEvents.Timeline
// remember what we just loaded so if we see these events down the live stream we know to ignore them.
// This means that requesting a direct room subscription causes the connection to jump ahead to whatever
// is in the database at the time of the call, rather than gradually converging by consuming live data.
// This is fine, so long as we jump ahead on a per-room basis. We need to make sure (ideally) that the
// room state is also pinned to the load position here, else you could see weird things in individual
// responses such as an updated room.name without the associated m.room.name event (though this will
// come through on the next request -> it converges to the right state so it isn't critical).
s.loadPositions[roomID] = urd.RequestedLatestEvents.LatestNID
s.loadPositions[roomID] = latestEvents.LatestNID
}
roomToTimeline = s.userCache.AnnotateWithTransactionIDs(ctx, s.userID, s.deviceID, roomToTimeline)

// 2. Load required state events.
rsm := roomSub.RequiredStateMap(s.userID)
if rsm.IsLazyLoading() {
for roomID, userIDs := range roomToUsersInTimeline {
s.lazyCache.Add(roomID, userIDs...)
}
}

internal.Logf(ctx, "connstate", "getInitialRoomData for %d rooms, RequiredStateMap: %#v", len(roomIDs), rsm)

// Filter out rooms we are only invited to, as we don't need to fetch the state
// since we'll be using the invite_state only.
loadRoomIDs := make([]string, 0, len(roomIDs))
for _, roomID := range roomIDs {
userRoomData, ok := roomIDToUserRoomData[roomID]
userRoomData, ok := userRoomDatas[roomID]
if !ok || !userRoomData.IsInvite {
loadRoomIDs = append(loadRoomIDs, roomID)
}
@@ -610,8 +614,11 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu
if roomIDToState == nil { // e.g no required_state
roomIDToState = make(map[string][]json.RawMessage)
}

// 3. Build sync3.Room structs to return to clients.
rooms := make(map[string]sync3.Room, len(roomIDs))
for _, roomID := range roomIDs {
userRoomData, ok := roomIDToUserRoomData[roomID]
userRoomData, ok := userRoomDatas[roomID]
if !ok {
userRoomData = caches.NewUserRoomData()
}
Expand Down Expand Up @@ -677,7 +684,7 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu
IsDM: userRoomData.IsDM,
JoinedCount: metadata.JoinCount,
InvitedCount: &metadata.InviteCount,
PrevBatch: userRoomData.RequestedLatestEvents.PrevBatch,
PrevBatch: timelines[roomID].PrevBatch,
Timestamp: maxTs,
}
if roomSub.IncludeHeroes() && calculated {
Expand All @@ -686,11 +693,6 @@ func (s *ConnState) getInitialRoomData(ctx context.Context, roomSub sync3.RoomSu
rooms[roomID] = room
}

if rsm.IsLazyLoading() {
for roomID, userIDs := range roomToUsersInTimeline {
s.lazyCache.Add(roomID, userIDs...)
}
}
return rooms
}

35 changes: 18 additions & 17 deletions sync3/handler/connstate_test.go
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/matrix-org/sliding-sync/state"
"reflect"
"testing"
"time"
@@ -44,12 +45,12 @@ func newRoomMetadata(roomID string, lastMsgTimestamp spec.Timestamp) internal.Ro
return *m
}

func mockLazyRoomOverride(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]caches.UserRoomData {
result := make(map[string]caches.UserRoomData)
func mockLazyRoomOverride(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents {
result := make(map[string]state.LatestEvents)
for _, roomID := range roomIDs {
u := caches.NewUserRoomData()
u.RequestedLatestEvents.Timeline = []json.RawMessage{[]byte(`{}`)}
result[roomID] = u
result[roomID] = state.LatestEvents{
Timeline: []json.RawMessage{[]byte(`{}`)},
}
}
return result
}
@@ -98,12 +99,12 @@ func TestConnStateInitial(t *testing.T) {
userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
dispatcher.Register(context.Background(), userCache.UserID, userCache)
dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache)
userCache.LazyRoomDataOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]caches.UserRoomData {
result := make(map[string]caches.UserRoomData)
userCache.LazyLoadTimelinesOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents {
result := make(map[string]state.LatestEvents)
for _, roomID := range roomIDs {
u := caches.NewUserRoomData()
u.RequestedLatestEvents.Timeline = []json.RawMessage{timeline[roomID]}
result[roomID] = u
result[roomID] = state.LatestEvents{
Timeline: []json.RawMessage{timeline[roomID]},
}
}
return result
}
@@ -269,7 +270,7 @@ func TestConnStateMultipleRanges(t *testing.T) {
return 1, roomMetadata, joinTimings, nil, nil
}
userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
userCache.LazyRoomDataOverride = mockLazyRoomOverride
userCache.LazyLoadTimelinesOverride = mockLazyRoomOverride
dispatcher.Register(context.Background(), userCache.UserID, userCache)
dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache)
cs := NewConnState(userID, deviceID, userCache, globalCache, &NopExtensionHandler{}, &NopJoinTracker{}, nil, nil, 1000, 0)
@@ -448,7 +449,7 @@ func TestBumpToOutsideRange(t *testing.T) {

}
userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
userCache.LazyRoomDataOverride = mockLazyRoomOverride
userCache.LazyLoadTimelinesOverride = mockLazyRoomOverride
dispatcher.Register(context.Background(), userCache.UserID, userCache)
dispatcher.Register(context.Background(), sync3.DispatcherAllUsers, globalCache)
cs := NewConnState(userID, deviceID, userCache, globalCache, &NopExtensionHandler{}, &NopJoinTracker{}, nil, nil, 1000, 0)
@@ -551,12 +552,12 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
}, nil, nil
}
userCache := caches.NewUserCache(userID, globalCache, nil, &NopTransactionFetcher{})
userCache.LazyRoomDataOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]caches.UserRoomData {
result := make(map[string]caches.UserRoomData)
userCache.LazyLoadTimelinesOverride = func(loadPos int64, roomIDs []string, maxTimelineEvents int) map[string]state.LatestEvents {
result := make(map[string]state.LatestEvents)
for _, roomID := range roomIDs {
u := caches.NewUserRoomData()
u.RequestedLatestEvents.Timeline = []json.RawMessage{timeline[roomID]}
result[roomID] = u
result[roomID] = state.LatestEvents{
Timeline: []json.RawMessage{timeline[roomID]},
}
}
return result
}
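All of the test updates above use the same seam: UserCache exposes LazyLoadTimelinesOverride as an optional function field, and tests assign it to return canned timelines instead of hitting a database. A stripped-down sketch of that pattern, with hypothetical names:

// --- illustrative sketch, not part of this commit ---
package sketch

// Timeline stands in for state.LatestEvents in this sketch.
type Timeline struct {
	Events []string
}

// Cache exposes an optional function-field override as a test seam.
type Cache struct {
	// LoadTimelinesOverride, when non-nil, replaces the database-backed path.
	LoadTimelinesOverride func(roomIDs []string) map[string]Timeline
}

// LoadTimelines uses the override if a test installed one.
func (c *Cache) LoadTimelines(roomIDs []string) map[string]Timeline {
	if c.LoadTimelinesOverride != nil {
		return c.LoadTimelinesOverride(roomIDs)
	}
	// A real implementation would query storage here.
	return map[string]Timeline{}
}
// --- end sketch ---

Production callers never set the field, so the real path runs; tests install the override before driving the code under test, as the files above do.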
