Skip to content

Commit

Permalink
Regression test for element-hq/synapse#16928
Browse files Browse the repository at this point in the history
And some drive-by logging improvements, along with a new
`ServerRoom.WaiterForEvent(eventID)` function which makes
waiting for an event to arrive on the Complement server
much much easier to write in code.
  • Loading branch information
kegsay committed Feb 16, 2024
1 parent 6b3745a commit b0f2451
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 5 deletions.
40 changes: 40 additions & 0 deletions federation/server_room.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/matrix-org/complement/b"
"github.com/matrix-org/complement/ct"
"github.com/matrix-org/complement/helpers"
)

type Event struct {
Expand Down Expand Up @@ -40,6 +41,8 @@ type ServerRoom struct {
TimelineMutex sync.RWMutex
ForwardExtremities []string
Depth int64
waiters map[string][]*helpers.Waiter // room ID -> []Waiter
waitersMu *sync.Mutex
}

// newRoom creates an empty room structure with no events
Expand All @@ -49,6 +52,8 @@ func newRoom(roomVer gomatrixserverlib.RoomVersion, roomId string) *ServerRoom {
Version: roomVer,
State: make(map[string]gomatrixserverlib.PDU),
ForwardExtremities: make([]string, 0),
waiters: make(map[string][]*helpers.Waiter),
waitersMu: &sync.Mutex{},
}
}

Expand All @@ -66,6 +71,41 @@ func (r *ServerRoom) AddEvent(ev gomatrixserverlib.PDU) {
r.Depth = ev.Depth()
}
r.ForwardExtremities = []string{ev.EventID()}

// inform waiters
r.waitersMu.Lock()
defer r.waitersMu.Unlock()
for _, w := range r.waiters[ev.EventID()] {
w.Finish()
}
delete(r.waiters, ev.EventID()) // clear the waiters
}

// WaiterForEvent creates a Waiter which waits until the given event ID is added to the room.
// This can be used as a synchronisation point to wait until the server under test has sent
// a given PDU in a /send transaction to the Complement server. This is the equivalent to listening
// for the PDU in the /send transaction and then unblocking the Waiter. Note that calling
// this function doesn't actually block. Call .Wait(time.Duration) on the waiter to block.
//
// Note: you must still add HandleTransactionRequests(nil,nil) to your server for the server to
// automatically add events to the room.
func (r *ServerRoom) WaiterForEvent(eventID string) *helpers.Waiter {
// we need to lock the timeline so we can check the timeline without racing. We need to check
// the timeline so we can immediately finish the waiter if the event ID is already in the timeline.
r.TimelineMutex.Lock()
defer r.TimelineMutex.Unlock()
w := helpers.NewWaiter()
r.waitersMu.Lock()
r.waiters[eventID] = append(r.waiters[eventID], w)
r.waitersMu.Unlock()
// check if the event is already there and if so immediately end the wait
for _, ev := range r.Timeline {
if ev.EventID() == eventID {
w.Finish()
break
}
}
return w
}

// AuthEvents returns the state event IDs of the auth events which authenticate this event
Expand Down
11 changes: 7 additions & 4 deletions match/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ func jsonCheckOffInternal(wantKey string, wantItems []interface{}, allowUnwanted
// }
// })
func JSONCheckOffAllowUnwanted(wantKey string, wantItems []interface{}, mapper func(gjson.Result) interface{}, fn func(interface{}, gjson.Result) error) JSON {
// TODO: it would be nicer if `mapper` was before `wantItems` as then it's easier to see what's happening here
// e.g select this array, map them into this format, then check they are in `wantItems`. The optional `fn` is spurious
// and should either be variadic or not here at all.
return jsonCheckOffInternal(wantKey, wantItems, true, mapper, fn)
}

Expand Down Expand Up @@ -232,10 +235,10 @@ func JSONArrayEach(wantKey string, fn func(gjson.Result) error) JSON {
}

if !body.Exists() {
return fmt.Errorf("missing key '%s'", wantKey)
return fmt.Errorf("JSONArrayEach: missing key '%s'", wantKey)
}
if !body.IsArray() {
return fmt.Errorf("key '%s' is not an array", wantKey)
return fmt.Errorf("JSONArrayEach: key '%s' is not an array", wantKey)
}
var err error
body.ForEach(func(_, val gjson.Result) bool {
Expand All @@ -252,10 +255,10 @@ func JSONMapEach(wantKey string, fn func(k, v gjson.Result) error) JSON {
return func(body gjson.Result) error {
res := body.Get(wantKey)
if !res.Exists() {
return fmt.Errorf("missing key '%s'", wantKey)
return fmt.Errorf("JSONMapEach: missing key '%s'", wantKey)
}
if !res.IsObject() {
return fmt.Errorf("key '%s' is not an object", wantKey)
return fmt.Errorf("JSONMapEach: key '%s' is not an object", wantKey)
}
var err error
res.ForEach(func(key, val gjson.Result) bool {
Expand Down
2 changes: 1 addition & 1 deletion should/should.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func MatchJSONBytes(rawJson []byte, matchers ...match.JSON) error {
body := gjson.ParseBytes(rawJson)
for _, jm := range matchers {
if err := jm(body); err != nil {
return fmt.Errorf("MatchJSONBytes %s with input = %v", err, string(rawJson))
return fmt.Errorf("MatchJSONBytes %s with input = %v", err, body.Get("@pretty").String())
}
}
return nil
Expand Down
124 changes: 124 additions & 0 deletions tests/federation_sync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package tests

import (
"encoding/json"
"fmt"
"testing"
"time"

"github.com/matrix-org/complement"
"github.com/matrix-org/complement/b"
"github.com/matrix-org/complement/client"
"github.com/matrix-org/complement/federation"
"github.com/matrix-org/complement/helpers"
"github.com/matrix-org/complement/match"
"github.com/matrix-org/complement/must"
"github.com/matrix-org/complement/runtime"
"github.com/tidwall/gjson"
)

// Tests https://github.com/element-hq/synapse/issues/16928
// To set up:
// - Alice and Bob join the same room, sends E1.
// - Alice sends event E3.
// - Bob forks the graph at E1 and sends S2.
// - Alice sends event E4 on the same fork as E3.
// - Alice sends event E5 merging the forks.
// - Alice sync with timeline_limit=1 and a filter that skips E5
func TestSyncOmitsStateChangeOnFilteredEvents(t *testing.T) {
runtime.SkipIf(t, runtime.Dendrite)
deployment := complement.Deploy(t, 1)
defer deployment.Destroy(t)
srv := federation.NewServer(t, deployment,
federation.HandleKeyRequests(),
federation.HandleMakeSendJoinRequests(),
federation.HandleTransactionRequests(nil, nil),
)
srv.UnexpectedRequestsAreErrors = false
cancel := srv.Listen()
defer cancel()

alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{})
bob := srv.UserID("bob")
ver := alice.GetDefaultRoomVersion(t)
serverRoom := srv.MustMakeRoom(t, ver, federation.InitialRoomEvents(ver, bob))

// Join Alice to the new room on the federation server and send E1.
alice.MustJoinRoom(t, serverRoom.RoomID, []string{srv.ServerName()})
e1 := alice.SendEventSynced(t, serverRoom.RoomID, b.Event{
Type: "m.room.message",
Content: map[string]interface{}{
"msgtype": "m.text",
"body": "E1",
},
})

// wait until bob's server sees e1
serverRoom.WaiterForEvent(e1).Waitf(t, 5*time.Second, "failed to see e1 (%s) on complement server", e1)

// create S2 but don't send it yet, prev_events will be set to [e1]
roomName := "I am the room name, S2"
s2 := srv.MustCreateEvent(t, serverRoom, federation.Event{
Type: "m.room.name",
StateKey: b.Ptr(""),
Sender: bob,
Content: map[string]interface{}{
"name": roomName,
},
})

// Alice sends E3 & E4
alice.SendEventSynced(t, serverRoom.RoomID, b.Event{
Type: "m.room.message",
Content: map[string]interface{}{
"msgtype": "m.text",
"body": "E3",
},
})
alice.SendEventSynced(t, serverRoom.RoomID, b.Event{
Type: "m.room.message",
Content: map[string]interface{}{
"msgtype": "m.text",
"body": "E4",
},
})

// fork the dag earlier at e1 and send s2
srv.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{s2.JSON()}, nil)

// wait until we see S2 to ensure the server has processed this.
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(serverRoom.RoomID, s2.EventID()))

// now send E5, merging the forks
alice.SendEventSynced(t, serverRoom.RoomID, b.Event{
Type: "please_filter_me",
Content: map[string]interface{}{
"msgtype": "m.text",
"body": "E5",
},
})

// now do a sync request which filters events of type 'please_filter_me', and ensure we still see the
// correct room name. Note we don't need to SyncUntil here as we have all the data in the right places
// already.
res, _ := alice.MustSync(t, client.SyncReq{
Filter: `{
"room": {
"timeline": {
"not_types": ["please_filter_me"],
"limit": 1
}
}
}`,
})
must.MatchGJSON(t, res, match.JSONCheckOffAllowUnwanted(
// look in this array
fmt.Sprintf("rooms.join.%s.state.events", client.GjsonEscape(serverRoom.RoomID)),
// for these items
[]interface{}{s2.EventID()},
// and map them first into this format
func(r gjson.Result) interface{} {
return r.Get("event_id").Str
}, nil,
))
}

0 comments on commit b0f2451

Please sign in to comment.