Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialise: make snapshots instead of prepending state events #310

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 146 additions & 87 deletions state/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,19 +149,33 @@ type InitialiseResult struct {
PrependTimelineEvents []json.RawMessage
}

// Initialise starts a new sync accumulator for the given room using the given state as a baseline.
// Initialise processes the state block of a V2 sync response for a particular room. If
// the state of the room has changed, we persist any new state events and create a new
// "snapshot" of its entire state.
//
// This will only take effect if this is the first time the v3 server has seen this room, and it wasn't
// possible to get all events up to the create event (e.g Matrix HQ).
// This function:
// - Stores these events
// - Sets up the current snapshot based on the state list given.
// Summary of the logic:
//
// If the v3 server has seen this room before, this function
// - queries the DB to determine which state events are known to th server,
// - returns (via InitialiseResult.PrependTimelineEvents) a slice of unknown state events,
// 0. Ensure the state block is not empty.
//
// and otherwise does nothing.
// 1. Capture the current snapshot ID, possibly zero. If it is zero, ensure that the
// state block contains a `create event`.
//
// 2. Insert the events and determine if the state block alters the current state of
// the room from the proxy's POV. This is the case iff some event in the state block
// is unknown to the proxy. (We choose to ignore the possibility that a poller is
// slow and gives us e.g. a subset of current room state.)
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
//
// 2. Fetch the current state of the room, as a map from (type, state_key) to event.
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
// If there is no existing state snapshot, this map is the empty map.
//
// If the state hasn't altered, bail.
//
// 4. Create new snapshot.
// Update the map from (2) with the events in in `state`. (There must be similar logic already in A ccumulate for this?)
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
// Store the snapshot. Mark the room's current state as being this snapshot.
//
// 5. If the starting snapshot ID was not zero, emit a cache invalidation payload.
Copy link
Contributor Author

@DMRobertson DMRobertson Sep 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I seem to have forgotten to do this... but the room name changes are seen in the tests. Does that mean the cache invalidation payload isn't needed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those tests are only testing cache invalidation via redaction, not cache invalidation via joining a room after a long period of no one being in the room. The latter case is what we're trying to catch here.

Copy link
Contributor Author

@DMRobertson DMRobertson Sep 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't follow. I specifically mean TestGappyState and TestGappyStateDoesNotAccumulateTheStateBlock, neither of which emit a redaction event.


func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (InitialiseResult, error) {
var res InitialiseResult
if len(state) == 0 {
Expand All @@ -172,36 +186,8 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia
// we don't race with multiple calls to Initialise with the same room ID.
snapshotID, err := a.roomsTable.CurrentAfterSnapshotID(txn, roomID)
if err != nil {
return fmt.Errorf("error fetching snapshot id for room %s: %s", roomID, err)
return fmt.Errorf("error fetching snapshot id for room %s: %w", roomID, err)
}
if snapshotID > 0 {
// Poller A has received a gappy sync v2 response with a state block, and
// we have seen this room before. If we knew for certain that there is some
// other active poller B in this room then we could safely skip this logic.

// Log at debug for now. If we find an unknown event, we'll return it so
// that the poller can log a warning.
logger.Debug().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg("Accumulator.Initialise called with incremental state but current snapshot already exists.")
eventIDs := make([]string, len(state))
eventIDToRawEvent := make(map[string]json.RawMessage, len(state))
for i := range state {
eventID := gjson.ParseBytes(state[i]).Get("event_id")
if !eventID.Exists() || eventID.Type != gjson.String {
return fmt.Errorf("Event %d lacks an event ID", i)
}
eventIDToRawEvent[eventID.Str] = state[i]
eventIDs[i] = eventID.Str
}
unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, eventIDs)
if err != nil {
return fmt.Errorf("error determing which event IDs are unknown: %s", err)
}
for unknownEventID := range unknownEventIDs {
res.PrependTimelineEvents = append(res.PrependTimelineEvents, eventIDToRawEvent[unknownEventID])
}
return nil
}

// We don't have a snapshot for this room. Parse the events first.
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
events := make([]Event, len(state))
for i := range events {
Expand All @@ -213,71 +199,65 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia
}
events = filterAndEnsureFieldsSet(events)
if len(events) == 0 {
return fmt.Errorf("failed to insert events, all events were filtered out: %w", err)
return fmt.Errorf("failed to parse state block, all events were filtered out: %w", err)
}

// Before proceeding further, ensure that we have "proper" state and not just a
// single stray event by looking for the create event.
hasCreate := false
for _, e := range events {
if e.Type == "m.room.create" && e.StateKey == "" {
hasCreate = true
break
if snapshotID == 0 {
// Ensure that we have "proper" state and not "stray" events from Synapse.
if err = ensureStateHasCreateEvent(events); err != nil {
return err
}
}
if !hasCreate {
const errMsg = "cannot create first snapshot without a create event"
sentry.WithScope(func(scope *sentry.Scope) {
scope.SetContext(internal.SentryCtxKey, map[string]interface{}{
"room_id": roomID,
"len_state": len(events),
})
sentry.CaptureMessage(errMsg)
})
logger.Warn().
Str("room_id", roomID).
Int("len_state", len(events)).
Msg(errMsg)
// the HS gave us bad data so there's no point retrying => return DataError
return internal.NewDataError(errMsg)
}

// Insert the events.
eventIDToNID, err := a.eventsTable.Insert(txn, events, false)
// Insert the events and determine which ones are new.
newEventIDToNID, err := a.eventsTable.Insert(txn, events, false)
if err != nil {
return fmt.Errorf("failed to insert events: %w", err)
}
if len(eventIDToNID) == 0 {
// we don't have a current snapshot for this room but yet no events are new,
// no idea how this should be handled.
const errMsg = "Accumulator.Initialise: room has no current snapshot but also no new inserted events, doing nothing. This is probably a bug."
logger.Error().Str("room_id", roomID).Msg(errMsg)
sentry.CaptureException(fmt.Errorf(errMsg))
if len(newEventIDToNID) == 0 {
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
if snapshotID == 0 {
// we don't have a current snapshot for this room but yet no events are new,
// no idea how this should be handled.
const errMsg = "Accumulator.Initialise: room has no current snapshot but also no new inserted events, doing nothing. This is probably a bug."
logger.Error().Str("room_id", roomID).Msg(errMsg)
sentry.CaptureException(fmt.Errorf(errMsg))
}
// Otherwise we do have a current snapshot and there are no new events in
// the state block. We assume that the room state has not changed. (This
// could wrong if there was a state rollback or reset. But sync v2 doesn't
// communicate those, so there's no point trying to account for this.)
return nil
}

// pull out the event NIDs we just inserted
membershipEventIDs := make(map[string]struct{}, len(events))
newEvents := make([]Event, 0, len(newEventIDToNID))
for _, event := range events {
if event.Type == "m.room.member" {
membershipEventIDs[event.ID] = struct{}{}
newNid, isNew := newEventIDToNID[event.ID]
if isNew {
event.NID = newNid
newEvents = append(newEvents, event)
}
}
memberNIDs := make([]int64, 0, len(eventIDToNID))
otherNIDs := make([]int64, 0, len(eventIDToNID))
for evID, nid := range eventIDToNID {
if _, exists := membershipEventIDs[evID]; exists {
memberNIDs = append(memberNIDs, int64(nid))
} else {
otherNIDs = append(otherNIDs, int64(nid))

// Fetch the current state of the room.
var currentState stateMap
if snapshotID > 0 {
currentState, err = a.stateMapAtSnapshot(txn, snapshotID)
if err != nil {
return fmt.Errorf("failed to load state map: %w", err)
}
} else {
currentState = stateMap{
Memberships: make(map[string]int64, len(events)),
Other: make(map[[2]string]int64, len(events)),
}
}

// Make a current snapshot
for _, ev := range newEvents {
currentState.Ingest(ev)
}
memberNIDs, otherNIDs := currentState.NIDs()
snapshot := &SnapshotRow{
RoomID: roomID,
MembershipEvents: pq.Int64Array(memberNIDs),
OtherEvents: pq.Int64Array(otherNIDs),
MembershipEvents: memberNIDs,
OtherEvents: otherNIDs,
}
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
err = a.snapshotTable.Insert(txn, snapshot)
if err != nil {
Expand Down Expand Up @@ -650,3 +630,82 @@ func (a *Accumulator) filterToNewTimelineEvents(txn *sqlx.Tx, dedupedEvents []Ev
// A is seen event s[A,B,C] => s[0+1:] => [B,C]
return dedupedEvents[seenIndex+1:], nil
}

func ensureStateHasCreateEvent(events []Event) error {
hasCreate := false
for _, e := range events {
if e.Type == "m.room.create" && e.StateKey == "" {
hasCreate = true
break
}
}
if !hasCreate {
const errMsg = "cannot create first snapshot without a create event"
sentry.WithScope(func(scope *sentry.Scope) {
scope.SetContext(internal.SentryCtxKey, map[string]interface{}{
"room_id": events[0].RoomID,
"len_state": len(events),
})
sentry.CaptureMessage(errMsg)
})
logger.Warn().
Str("room_id", events[0].RoomID).
Int("len_state", len(events)).
Msg(errMsg)
// the HS gave us bad data so there's no point retrying => return DataError
return internal.NewDataError(errMsg)
}
return nil
}

type stateMap struct {
// state_key (user id) -> NID
Memberships map[string]int64
// type, state_key -> NID
Other map[[2]string]int64
kegsay marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *stateMap) Ingest(e Event) (replacedNID int64) {
if e.Type == "m.room.member" {
replacedNID = s.Memberships[e.StateKey]
s.Memberships[e.StateKey] = e.NID
} else {
key := [2]string{e.Type, e.StateKey}
replacedNID = s.Other[key]
s.Other[key] = e.NID
}
return
}
Comment on lines +677 to +687
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't actually use the return value here. At some point I was more ambitious and wanted to reuse this machinery in Accumulate... but then I decided that was not really getting us anything and left it for a rainy day.


func (s *stateMap) NIDs() (membershipNIDs, otherNIDs []int64) {
membershipNIDs = make([]int64, 0, len(s.Memberships))
otherNIDs = make([]int64, 0, len(s.Other))
for _, nid := range s.Memberships {
membershipNIDs = append(membershipNIDs, nid)
}
for _, nid := range s.Other {
otherNIDs = append(otherNIDs, nid)
}
return
}
Comment on lines +689 to +699
Copy link
Contributor Author

@DMRobertson DMRobertson Sep 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels a little inefficient to convert from []Event to maps and then back to []int64 again if we're only changing the state by one event. But we're optimising for leaving and rejoining large rooms here, with large gappy state blocks. We've seen gappy state blocks of size multiple thousands on the m.org deployment!


func (a *Accumulator) stateMapAtSnapshot(txn *sqlx.Tx, snapID int64) (stateMap, error) {
snapshot, err := a.snapshotTable.Select(txn, snapID)
if err != nil {
return stateMap{}, err
}
// pull stripped events as this may be huge (think Matrix HQ)
events, err := a.eventsTable.SelectStrippedEventsByNIDs(txn, true, append(snapshot.MembershipEvents, snapshot.OtherEvents...))
if err != nil {
return stateMap{}, err
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only want event NID, type and state key here, so we could use a more specialised query here to fetch less data.

}

state := stateMap{
Memberships: make(map[string]int64, len(snapshot.MembershipEvents)),
Other: make(map[[2]string]int64, len(snapshot.OtherEvents)),
}
for _, e := range events {
state.Ingest(e)
}
return state, nil
}
6 changes: 3 additions & 3 deletions sync2/handler2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,20 +369,20 @@ func (h *Handler) Accumulate(ctx context.Context, userID, deviceID, roomID strin
return nil
}

func (h *Handler) Initialise(ctx context.Context, roomID string, state []json.RawMessage) ([]json.RawMessage, error) {
func (h *Handler) Initialise(ctx context.Context, roomID string, state []json.RawMessage) error {
res, err := h.Store.Initialise(roomID, state)
if err != nil {
logger.Err(err).Int("state", len(state)).Str("room", roomID).Msg("V2: failed to initialise room")
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
return nil, err
return err
}
if res.AddedEvents {
h.v2Pub.Notify(pubsub.ChanV2, &pubsub.V2Initialise{
RoomID: roomID,
SnapshotNID: res.SnapshotID,
})
}
return res.PrependTimelineEvents, nil
return nil
}

func (h *Handler) SetTyping(ctx context.Context, pollerID sync2.PollerID, roomID string, ephEvent json.RawMessage) {
Expand Down
27 changes: 4 additions & 23 deletions sync2/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type V2DataReceiver interface {
// Initialise the room, if it hasn't been already. This means the state section of the v2 response.
// If given a state delta from an incremental sync, returns the slice of all state events unknown to the DB.
// Return an error to stop the since token advancing.
Initialise(ctx context.Context, roomID string, state []json.RawMessage) ([]json.RawMessage, error) // snapshot ID?
Initialise(ctx context.Context, roomID string, state []json.RawMessage) error // snapshot ID?
// SetTyping indicates which users are typing.
SetTyping(ctx context.Context, pollerID PollerID, roomID string, ephEvent json.RawMessage)
// Sent when there is a new receipt
Expand Down Expand Up @@ -320,11 +320,11 @@ func (h *PollerMap) Accumulate(ctx context.Context, userID, deviceID, roomID str
wg.Wait()
return
}
func (h *PollerMap) Initialise(ctx context.Context, roomID string, state []json.RawMessage) (result []json.RawMessage, err error) {
func (h *PollerMap) Initialise(ctx context.Context, roomID string, state []json.RawMessage) (err error) {
var wg sync.WaitGroup
wg.Add(1)
h.executor <- func() {
result, err = h.callbacks.Initialise(ctx, roomID, state)
err = h.callbacks.Initialise(ctx, roomID, state)
wg.Done()
}
wg.Wait()
Expand Down Expand Up @@ -775,30 +775,11 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
for roomID, roomData := range res.Rooms.Join {
if len(roomData.State.Events) > 0 {
stateCalls++
prependStateEvents, err := p.receiver.Initialise(ctx, roomID, roomData.State.Events)
err := p.receiver.Initialise(ctx, roomID, roomData.State.Events)
if err != nil {
lastErrs = append(lastErrs, fmt.Errorf("Initialise[%s]: %w", roomID, err))
continue
}
if len(prependStateEvents) > 0 {
// The poller has just learned of these state events due to an
// incremental poller sync; we must have missed the opportunity to see
// these down /sync in a timeline. As a workaround, inject these into
// the timeline now so that future events are received under the
// correct room state.
const warnMsg = "parseRoomsResponse: prepending state events to timeline after gappy poll"
logger.Warn().Str("room_id", roomID).Int("prependStateEvents", len(prependStateEvents)).Msg(warnMsg)
hub := internal.GetSentryHubFromContextOrDefault(ctx)
hub.WithScope(func(scope *sentry.Scope) {
scope.SetContext(internal.SentryCtxKey, map[string]interface{}{
"room_id": roomID,
"num_prepend_state_events": len(prependStateEvents),
})
hub.CaptureMessage(warnMsg)
})
p.trackGappyStateSize(len(prependStateEvents))
roomData.Timeline.Events = append(prependStateEvents, roomData.Timeline.Events...)
}
}
// process typing/receipts before events so we seed the caches correctly for when we return the room
for _, ephEvent := range roomData.Ephemeral.Events {
Expand Down
Loading
Loading