Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle gappy state blocks in a performant way #329

Closed
wants to merge 62 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
d4eace7
Tweak existing E2E test
Sep 21, 2023
1bd4f12
Write integration test too
Sep 21, 2023
44ef0c3
move keys() helper to internal
Oct 4, 2023
c1b41f4
New Storage functions
Oct 4, 2023
638554c
Handle state in Initialise after first snapshot
Sep 21, 2023
2f6a5cf
Emit cache invalidation for a replaced snapshot
Sep 22, 2023
d08f82a
Reminder for me in the future
Oct 3, 2023
32479e2
Move NewEventData to free function
Oct 4, 2023
db7e94d
Make UserCaches respond to gappy memberships
Sep 27, 2023
c2f99a6
Fix stupid syntax
Oct 10, 2023
19859fe
Fixes to the test
Oct 10, 2023
4cc3c31
Add comments for our future selves
Oct 10, 2023
6e03c2d
Select less data
Oct 10, 2023
899000c
Fix stupid bug in internal.Keys
Oct 10, 2023
08d1034
Merge remote-tracking branch 'origin/main' into dmr/resnapshot-2
Oct 11, 2023
a1ded0c
Update tests. prevBatches are broken
Oct 12, 2023
abeb273
Add nil guards to loops
Oct 12, 2023
ac17cb4
Merge branch 'main' into dmr/resnapshot-2
kegsay Oct 17, 2023
c3dfd66
Change the test to have a timeline with 10 msgs
Oct 13, 2023
ae9777d
ConnState: only call load once
Oct 13, 2023
4b3a617
WIP big-ass table test
Oct 17, 2023
5a661f7
Merge remote-tracking branch 'origin/main' into dmr/resnapshot-2
Oct 17, 2023
d82be3a
Fixup TestConnStateInitial
Oct 17, 2023
10b38ab
Fix a segfault when reloading the JRT
Oct 17, 2023
0e3b294
Queue up the post-gap membership
Oct 17, 2023
cada224
WIP
Oct 18, 2023
e2761be
Matchers
Oct 18, 2023
66dc048
Fix missing locks in v3 test
Oct 18, 2023
7205340
WIP snapshot
Oct 18, 2023
1da56ae
MatchRoomSubscription: report all errors
Oct 18, 2023
193e45d
Improve comments
Oct 18, 2023
0e21262
More matcher improvements
Oct 18, 2023
ea8506f
Improve tests again
Oct 18, 2023
51fc05c
Remove redundant bit
Oct 18, 2023
ec95409
Add in the required state check again
Oct 18, 2023
30f2d2d
Add in the sentinel check
Oct 18, 2023
c2045bf
Minimised test case
Oct 19, 2023
8c88f76
Merge remote-tracking branch 'origin/main' into dmr/resnapshot-2
Oct 19, 2023
b12c672
Remove unused chris and chrisToken
Oct 19, 2023
540006e
Additional comment
Oct 19, 2023
a54de00
Don't change Alice's room subscription.
Oct 25, 2023
4230323
Merge branch 'main' into dmr/resnapshot-2
Oct 31, 2023
c872511
Implement latest plan
Oct 31, 2023
4abc811
Track no. conns nuked due to room invalidation
Oct 31, 2023
def9e28
TODO note
Oct 31, 2023
3b3f092
Re-track gappy state block sizes
Oct 31, 2023
1f31550
Test rework part 1 of N
Nov 1, 2023
530ef78
2 of N
Nov 1, 2023
94f6518
3 of N
Nov 1, 2023
110bb81
4 of N
Nov 1, 2023
faf8caa
Extra assertion of N
Nov 1, 2023
3f2644e
6 of N
Nov 1, 2023
4d40f41
Minor formatting
Nov 1, 2023
5b0c8e3
Update existing integration tests
Nov 1, 2023
ebe2641
Merge remote-tracking branch 'origin/main' into dmr/resnapshot-2
Nov 2, 2023
9a40fb7
Fix test typo
Nov 2, 2023
8ba3661
Also test that bob sees prev batch
Nov 2, 2023
b72d682
Simplify FetchMembership
Nov 2, 2023
518f1a6
Merge branch 'main' into dmr/resnapshot-2
Nov 2, 2023
eaeff79
Merge remote-tracking branch 'origin/main' into dmr/resnapshot-2
Nov 2, 2023
fe7c5a2
Fix bad merge
Nov 3, 2023
b8e34a7
Remove uninsteresting gofmt change
Nov 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 175 additions & 96 deletions state/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,69 +141,62 @@ type InitialiseResult struct {
// AddedEvents is true iff this call to Initialise added new state events to the DB.
AddedEvents bool
// SnapshotID is the ID of the snapshot which incorporates all added events.
// It has no meaning if AddedEvents is False.
// It has no meaning if AddedEvents is false.
SnapshotID int64
// PrependTimelineEvents is empty if the room was not initialised prior to this call.
// Otherwise, it is an order-preserving subset of the `state` argument to Initialise
// containing all events that were not persisted prior to the Initialise call. These
// should be prepended to the room timeline by the caller.
PrependTimelineEvents []json.RawMessage
// ReplacedExistingSnapshot is true when we created a new snapshot for the room and
// there a pre-existing room snapshot. It has no meaning if AddedEvents is false.
ReplacedExistingSnapshot bool
}

// Initialise starts a new sync accumulator for the given room using the given state as a baseline.
// Initialise processes the state block of a V2 sync response for a particular room. If
// the state of the room has changed, we persist any new state events and create a new
// "snapshot" of its entire state.
//
// This will only take effect if this is the first time the v3 server has seen this room, and it wasn't
// possible to get all events up to the create event (e.g Matrix HQ).
// This function:
// - Stores these events
// - Sets up the current snapshot based on the state list given.
// Summary of the logic:
//
// If the v3 server has seen this room before, this function
// - queries the DB to determine which state events are known to th server,
// - returns (via InitialiseResult.PrependTimelineEvents) a slice of unknown state events,
// 0. Ensure the state block is not empty.
//
// and otherwise does nothing.
// 1. Capture the current snapshot ID, possibly zero. If it is zero, ensure that the
// state block contains a `create event`.
//
// 2. Insert the events. If there are no newly inserted events, bail. If there are new
// events, then the state block has definitely changed. Note: we ignore cases where
// the state has only changed to a known subset of state events (i.e in the case of
// state resets, slow pollers) as it is impossible to then reconcile that state with
// any new events, as any "catchup" state will be ignored due to the events already
// existing.
//
// 3. Fetch the current state of the room, as a map from (type, state_key) to event.
// If there is no existing state snapshot, this map is the empty map.
// If the state hasn't altered, bail.
//
// 4. Create new snapshot. Update the map from (3) with the events in `state`.
// (There is similar logic for this in Accumulate.)
// Store the snapshot. Mark the room's current state as being this snapshot.
//
// 5. Any other processing of the new state events.
//
// 6. Return an "AddedEvents" bool (if true, emit an Initialise payload) and a
// "ReplacedSnapshot" bool (if true, emit a cache invalidation payload).

func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (InitialiseResult, error) {
var res InitialiseResult
var startingSnapshotID int64

// 0. Ensure the state block is not empty.
if len(state) == 0 {
return res, nil
}
err := sqlutil.WithTransaction(a.db, func(txn *sqlx.Tx) error {
err := sqlutil.WithTransaction(a.db, func(txn *sqlx.Tx) (err error) {
// 1. Capture the current snapshot ID, checking for a create event if this is our first snapshot.

// Attempt to short-circuit. This has to be done inside a transaction to make sure
// we don't race with multiple calls to Initialise with the same room ID.
snapshotID, err := a.roomsTable.CurrentAfterSnapshotID(txn, roomID)
startingSnapshotID, err = a.roomsTable.CurrentAfterSnapshotID(txn, roomID)
if err != nil {
return fmt.Errorf("error fetching snapshot id for room %s: %s", roomID, err)
}
if snapshotID > 0 {
// Poller A has received a gappy sync v2 response with a state block, and
// we have seen this room before. If we knew for certain that there is some
// other active poller B in this room then we could safely skip this logic.

// Log at debug for now. If we find an unknown event, we'll return it so
// that the poller can log a warning.
logger.Debug().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg("Accumulator.Initialise called with incremental state but current snapshot already exists.")
eventIDs := make([]string, len(state))
eventIDToRawEvent := make(map[string]json.RawMessage, len(state))
for i := range state {
eventID := gjson.ParseBytes(state[i]).Get("event_id")
if !eventID.Exists() || eventID.Type != gjson.String {
return fmt.Errorf("Event %d lacks an event ID", i)
}
eventIDToRawEvent[eventID.Str] = state[i]
eventIDs[i] = eventID.Str
}
unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, eventIDs)
if err != nil {
return fmt.Errorf("error determing which event IDs are unknown: %s", err)
}
for unknownEventID := range unknownEventIDs {
res.PrependTimelineEvents = append(res.PrependTimelineEvents, eventIDToRawEvent[unknownEventID])
}
return nil
return fmt.Errorf("error fetching snapshot id for room %s: %w", roomID, err)
}

// We don't have a snapshot for this room. Parse the events first.
// Start by parsing the events in the state block.
events := make([]Event, len(state))
for i := range events {
events[i] = Event{
Expand All @@ -214,77 +207,76 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia
}
events = filterAndEnsureFieldsSet(events)
if len(events) == 0 {
return fmt.Errorf("failed to insert events, all events were filtered out: %w", err)
return fmt.Errorf("failed to parse state block, all events were filtered out: %w", err)
}

// Before proceeding further, ensure that we have "proper" state and not just a
// single stray event by looking for the create event.
hasCreate := false
for _, e := range events {
if e.Type == "m.room.create" && e.StateKey == "" {
hasCreate = true
break
if startingSnapshotID == 0 {
// Ensure that we have "proper" state and not "stray" events from Synapse.
if err = ensureStateHasCreateEvent(events); err != nil {
return err
}
}
if !hasCreate {
const errMsg = "cannot create first snapshot without a create event"
sentry.WithScope(func(scope *sentry.Scope) {
scope.SetContext(internal.SentryCtxKey, map[string]interface{}{
"room_id": roomID,
"len_state": len(events),
})
sentry.CaptureMessage(errMsg)
})
logger.Warn().
Str("room_id", roomID).
Int("len_state", len(events)).
Msg(errMsg)
// the HS gave us bad data so there's no point retrying => return DataError
return internal.NewDataError(errMsg)
}

// Insert the events.
eventIDToNID, err := a.eventsTable.Insert(txn, events, false)
// 2. Insert the events and determine which ones are new.
newEventIDToNID, err := a.eventsTable.Insert(txn, events, false)
if err != nil {
return fmt.Errorf("failed to insert events: %w", err)
}
if len(eventIDToNID) == 0 {
// we don't have a current snapshot for this room but yet no events are new,
// no idea how this should be handled.
const errMsg = "Accumulator.Initialise: room has no current snapshot but also no new inserted events, doing nothing. This is probably a bug."
logger.Error().Str("room_id", roomID).Msg(errMsg)
sentry.CaptureException(fmt.Errorf(errMsg))
if len(newEventIDToNID) == 0 {
if startingSnapshotID == 0 {
// we don't have a current snapshot for this room but yet no events are new,
// no idea how this should be handled.
const errMsg = "Accumulator.Initialise: room has no current snapshot but also no new inserted events, doing nothing. This is probably a bug."
logger.Error().Str("room_id", roomID).Msg(errMsg)
sentry.CaptureException(fmt.Errorf(errMsg))
}
// Note: we otherwise ignore cases where the state has only changed to a
// known subset of state events (i.e in the case of state resets, slow
// pollers) as it is impossible to then reconcile that state with
// any new events, as any "catchup" state will be ignored due to the events
// already existing.
return nil
}

// pull out the event NIDs we just inserted
membershipEventIDs := make(map[string]struct{}, len(events))
newEvents := make([]Event, 0, len(newEventIDToNID))
for _, event := range events {
if event.Type == "m.room.member" {
membershipEventIDs[event.ID] = struct{}{}
newNid, isNew := newEventIDToNID[event.ID]
if isNew {
event.NID = newNid
newEvents = append(newEvents, event)
}
}
memberNIDs := make([]int64, 0, len(eventIDToNID))
otherNIDs := make([]int64, 0, len(eventIDToNID))
for evID, nid := range eventIDToNID {
if _, exists := membershipEventIDs[evID]; exists {
memberNIDs = append(memberNIDs, int64(nid))
} else {
otherNIDs = append(otherNIDs, int64(nid))

// 3. Fetch the current state of the room.
var currentState stateMap
if startingSnapshotID > 0 {
currentState, err = a.stateMapAtSnapshot(txn, startingSnapshotID)
if err != nil {
return fmt.Errorf("failed to load state map: %w", err)
}
} else {
currentState = stateMap{
Memberships: make(map[string]int64, len(events)),
Other: make(map[[2]string]int64, len(events)),
}
}

// Make a current snapshot
// 4. Update the map from (3) with the new events to create a new snapshot.
for _, ev := range newEvents {
currentState.Ingest(ev)
}
memberNIDs, otherNIDs := currentState.NIDs()
snapshot := &SnapshotRow{
RoomID: roomID,
MembershipEvents: pq.Int64Array(memberNIDs),
OtherEvents: pq.Int64Array(otherNIDs),
MembershipEvents: memberNIDs,
OtherEvents: otherNIDs,
}
err = a.snapshotTable.Insert(txn, snapshot)
if err != nil {
return fmt.Errorf("failed to insert snapshot: %w", err)
}
res.AddedEvents = true

// 5. Any other processing of new state events.
latestNID := int64(0)
for _, nid := range otherNIDs {
if nid > latestNID {
Expand Down Expand Up @@ -313,8 +305,16 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia
// will have an associated state snapshot ID on the event.

// Set the snapshot ID as the current state
err = a.roomsTable.Upsert(txn, info, snapshot.SnapshotID, latestNID)
if err != nil {
return err
}

// 6. Tell the caller what happened, so they know what payloads to emit.
res.SnapshotID = snapshot.SnapshotID
return a.roomsTable.Upsert(txn, info, snapshot.SnapshotID, latestNID)
res.AddedEvents = true
res.ReplacedExistingSnapshot = startingSnapshotID > 0
return nil
})
return res, err
}
Expand Down Expand Up @@ -652,3 +652,82 @@ func (a *Accumulator) filterToNewTimelineEvents(txn *sqlx.Tx, dedupedEvents []Ev
// A is seen event s[A,B,C] => s[0+1:] => [B,C]
return dedupedEvents[seenIndex+1:], nil
}

func ensureStateHasCreateEvent(events []Event) error {
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
hasCreate := false
for _, e := range events {
if e.Type == "m.room.create" && e.StateKey == "" {
hasCreate = true
break
}
}
if !hasCreate {
const errMsg = "cannot create first snapshot without a create event"
sentry.WithScope(func(scope *sentry.Scope) {
scope.SetContext(internal.SentryCtxKey, map[string]interface{}{
"room_id": events[0].RoomID,
"len_state": len(events),
})
sentry.CaptureMessage(errMsg)
})
logger.Warn().
Str("room_id", events[0].RoomID).
Int("len_state", len(events)).
Msg(errMsg)
// the HS gave us bad data so there's no point retrying => return DataError
return internal.NewDataError(errMsg)
}
return nil
}

type stateMap struct {
// state_key (user id) -> NID
Memberships map[string]int64
// type, state_key -> NID
Other map[[2]string]int64
}

func (s *stateMap) Ingest(e Event) (replacedNID int64) {
if e.Type == "m.room.member" {
replacedNID = s.Memberships[e.StateKey]
s.Memberships[e.StateKey] = e.NID
} else {
key := [2]string{e.Type, e.StateKey}
replacedNID = s.Other[key]
s.Other[key] = e.NID
}
return
}

func (s *stateMap) NIDs() (membershipNIDs, otherNIDs []int64) {
membershipNIDs = make([]int64, 0, len(s.Memberships))
otherNIDs = make([]int64, 0, len(s.Other))
for _, nid := range s.Memberships {
membershipNIDs = append(membershipNIDs, nid)
}
for _, nid := range s.Other {
otherNIDs = append(otherNIDs, nid)
}
return
}

func (a *Accumulator) stateMapAtSnapshot(txn *sqlx.Tx, snapID int64) (stateMap, error) {
snapshot, err := a.snapshotTable.Select(txn, snapID)
if err != nil {
return stateMap{}, err
}
// pull stripped events as this may be huge (think Matrix HQ)
events, err := a.eventsTable.SelectStrippedEventsByNIDs(txn, true, append(snapshot.MembershipEvents, snapshot.OtherEvents...))
if err != nil {
return stateMap{}, err
}

state := stateMap{
Memberships: make(map[string]int64, len(snapshot.MembershipEvents)),
Other: make(map[[2]string]int64, len(snapshot.OtherEvents)),
}
for _, e := range events {
state.Ingest(e)
}
return state, nil
}
Loading
Loading