Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Introduce a new stream for the appservice consumer (#3277)
Browse files Browse the repository at this point in the history
This introduces a new stream the syncAPI produces to once it processed a
`OutputRoomEvent` and the appservices consumes.
This is to work around a race condition where appservices receive an
event before the syncAPI has handled it, this can result in e.g. calls
to `/joined_members` returning a wrong membership list.
  • Loading branch information
S7evinK authored Dec 12, 2023
1 parent 185ad6b commit 1555b35
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 2 deletions.
197 changes: 197 additions & 0 deletions appservice/appservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,18 @@ import (
"testing"
"time"

"github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi"
uapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
"github.com/tidwall/gjson"

"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/appservice/api"
Expand Down Expand Up @@ -407,3 +417,190 @@ func TestRoomserverConsumerOneInvite(t *testing.T) {
close(evChan)
})
}

// Note: If this test panics, it is because we timed out waiting for the
// join event to come through to the appservice and we close the DB/shutdown Dendrite. This makes the
// syncAPI unhappy, as it is unable to write to the database.
func TestOutputAppserviceEvent(t *testing.T) {
alice := test.NewUser(t)
bob := test.NewUser(t)

test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
cfg, processCtx, closeDB := testrig.CreateConfig(t, dbType)
defer closeDB()
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
natsInstance := &jetstream.NATSInstance{}

evChan := make(chan struct{})

caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
// Create required internal APIs
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)

// Create the router, so we can hit `/joined_members`
routers := httputil.NewRouters()

accessTokens := map[*test.User]userDevice{
bob: {},
}

usrAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
clientapi.AddPublicRoutes(processCtx, routers, cfg, natsInstance, nil, rsAPI, nil, nil, nil, usrAPI, nil, nil, caching.DisableMetrics)
createAccessTokens(t, accessTokens, usrAPI, processCtx.Context(), routers)

room := test.NewRoom(t, alice)

// Invite Bob
room.CreateAndInsert(t, alice, spec.MRoomMember, map[string]interface{}{
"membership": "invite",
}, test.WithStateKey(bob.ID))

// create a dummy AS url, handling the events
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var txn consumers.ApplicationServiceTransaction
err := json.NewDecoder(r.Body).Decode(&txn)
if err != nil {
t.Fatal(err)
}
for _, ev := range txn.Events {
if ev.Type != spec.MRoomMember {
continue
}
if ev.StateKey != nil && *ev.StateKey == bob.ID {
membership := gjson.GetBytes(ev.Content, "membership").Str
t.Logf("Processing membership: %s", membership)
switch membership {
case spec.Invite:
// Accept the invite
joinEv := room.CreateAndInsert(t, bob, spec.MRoomMember, map[string]interface{}{
"membership": "join",
}, test.WithStateKey(bob.ID))

if err := rsapi.SendEvents(context.Background(), rsAPI, rsapi.KindNew, []*types.HeaderedEvent{joinEv}, "test", "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}
case spec.Join: // the AS has received the join event, now hit `/joined_members` to validate that
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, "/_matrix/client/v3/rooms/"+room.ID+"/joined_members", nil)
req.Header.Set("Authorization", "Bearer "+accessTokens[bob].accessToken)
routers.Client.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("expected HTTP 200, got %d: %s", rec.Code, rec.Body.String())
}

// Both Alice and Bob should be joined. If not, we have a race condition
if !gjson.GetBytes(rec.Body.Bytes(), "joined."+alice.ID).Exists() {
t.Errorf("Alice is not joined to the room") // in theory should not happen
}
if !gjson.GetBytes(rec.Body.Bytes(), "joined."+bob.ID).Exists() {
t.Errorf("Bob is not joined to the room")
}
evChan <- struct{}{}
default:
t.Fatalf("Unexpected membership: %s", membership)
}
}
}
}))
defer srv.Close()

as := &config.ApplicationService{
ID: "someID",
URL: srv.URL,
ASToken: "",
HSToken: "",
SenderLocalpart: "senderLocalPart",
NamespaceMap: map[string][]config.ApplicationServiceNamespace{
"users": {{RegexpObject: regexp.MustCompile(bob.ID)}},
"aliases": {{RegexpObject: regexp.MustCompile(room.ID)}},
},
}
as.CreateHTTPClient(cfg.AppServiceAPI.DisableTLSValidation)

// Create a dummy application service
cfg.AppServiceAPI.Derived.ApplicationServices = []config.ApplicationService{*as}

// Prepare AS Streams on the old topic to validate that they get deleted
jsCtx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)

token := jetstream.Tokenise(as.ID)
if err := jetstream.JetStreamConsumer(
processCtx.Context(), jsCtx, cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent),
cfg.Global.JetStream.Durable("Appservice_"+token),
50, // maximum number of events to send in a single transaction
func(ctx context.Context, msgs []*nats.Msg) bool {
return true
},
); err != nil {
t.Fatal(err)
}

// Start the syncAPI to have `/joined_members` available
syncapi.AddPublicRoutes(processCtx, routers, cfg, cm, natsInstance, usrAPI, rsAPI, caches, caching.DisableMetrics)

// start the consumer
appservice.NewInternalAPI(processCtx, cfg, natsInstance, usrAPI, rsAPI)

// At this point, the old JetStream consumers should be deleted
for consumer := range jsCtx.Consumers(cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)) {
if consumer.Name == cfg.Global.JetStream.Durable("Appservice_"+token)+"Pull" {
t.Fatalf("Consumer still exists")
}
}

// Create the room, this triggers the AS to receive an invite for Bob.
if err := rsapi.SendEvents(context.Background(), rsAPI, rsapi.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}

select {
// Pretty generous timeout duration...
case <-time.After(time.Millisecond * 1000): // wait for the AS to process the events
t.Errorf("Timed out waiting for join event")
case <-evChan:
}
close(evChan)
})
}

type userDevice struct {
accessToken string
deviceID string
password string
}

func createAccessTokens(t *testing.T, accessTokens map[*test.User]userDevice, userAPI uapi.UserInternalAPI, ctx context.Context, routers httputil.Routers) {
t.Helper()
for u := range accessTokens {
localpart, serverName, _ := gomatrixserverlib.SplitID('@', u.ID)
userRes := &uapi.PerformAccountCreationResponse{}
password := util.RandomString(8)
if err := userAPI.PerformAccountCreation(ctx, &uapi.PerformAccountCreationRequest{
AccountType: u.AccountType,
Localpart: localpart,
ServerName: serverName,
Password: password,
}, userRes); err != nil {
t.Errorf("failed to create account: %s", err)
}
req := test.NewRequest(t, http.MethodPost, "/_matrix/client/v3/login", test.WithJSONBody(t, map[string]interface{}{
"type": authtypes.LoginTypePassword,
"identifier": map[string]interface{}{
"type": "m.id.user",
"user": u.ID,
},
"password": password,
}))
rec := httptest.NewRecorder()
routers.Client.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("failed to login: %s", rec.Body.String())
}
accessTokens[u] = userDevice{
accessToken: gjson.GetBytes(rec.Body.Bytes(), "access_token").String(),
deviceID: gjson.GetBytes(rec.Body.Bytes(), "device_id").String(),
password: password,
}
}
}
12 changes: 11 additions & 1 deletion appservice/consumers/roomserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,14 @@ func NewOutputRoomEventConsumer(
ctx: process.Context(),
cfg: cfg,
jetstream: js,
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputAppserviceEvent),
rsAPI: rsAPI,
}
}

// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
durableNames := make([]string, 0, len(s.cfg.Derived.ApplicationServices))
for _, as := range s.cfg.Derived.ApplicationServices {
appsvc := as
state := &appserviceState{
Expand All @@ -95,6 +96,15 @@ func (s *OutputRoomEventConsumer) Start() error {
); err != nil {
return fmt.Errorf("failed to create %q consumer: %w", token, err)
}
durableNames = append(durableNames, s.cfg.Matrix.JetStream.Durable("Appservice_"+token))
}
// Cleanup any consumers still existing on the OutputRoomEvent stream
// to avoid messages not being deleted
for _, consumerName := range durableNames {
err := s.jetstream.DeleteConsumer(s.cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), consumerName+"Pull")
if err != nil && err != nats.ErrConsumerNotFound {
return err
}
}
return nil
}
Expand Down
6 changes: 6 additions & 0 deletions setup/jetstream/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var (
InputDeviceListUpdate = "InputDeviceListUpdate"
InputSigningKeyUpdate = "InputSigningKeyUpdate"
OutputRoomEvent = "OutputRoomEvent"
OutputAppserviceEvent = "OutputAppserviceEvent"
OutputSendToDeviceEvent = "OutputSendToDeviceEvent"
OutputKeyChangeEvent = "OutputKeyChangeEvent"
OutputTypingEvent = "OutputTypingEvent"
Expand Down Expand Up @@ -65,6 +66,11 @@ var streams = []*nats.StreamConfig{
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
{
Name: OutputAppserviceEvent,
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
{
Name: OutputSendToDeviceEvent,
Retention: nats.InterestPolicy,
Expand Down
9 changes: 9 additions & 0 deletions syncapi/consumers/roomserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/synctypes"
Expand All @@ -55,6 +56,7 @@ type OutputRoomEventConsumer struct {
inviteStream streams.StreamProvider
notifier *notifier.Notifier
fts fulltext.Indexer
asProducer *producers.AppserviceEventProducer
}

// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
Expand All @@ -68,6 +70,7 @@ func NewOutputRoomEventConsumer(
inviteStream streams.StreamProvider,
rsAPI api.SyncRoomserverAPI,
fts *fulltext.Search,
asProducer *producers.AppserviceEventProducer,
) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{
ctx: process.Context(),
Expand All @@ -81,6 +84,7 @@ func NewOutputRoomEventConsumer(
inviteStream: inviteStream,
rsAPI: rsAPI,
fts: fts,
asProducer: asProducer,
}
}

Expand Down Expand Up @@ -119,6 +123,11 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
}
}
err = s.onNewRoomEvent(s.ctx, *output.NewRoomEvent)
if err == nil && s.asProducer != nil {
if err = s.asProducer.ProduceRoomEvents(msg); err != nil {
log.WithError(err).Warn("failed to produce OutputAppserviceEvent")
}
}
case api.OutputTypeOldRoomEvent:
err = s.onOldRoomEvent(s.ctx, *output.OldRoomEvent)
case api.OutputTypeNewInviteEvent:
Expand Down
33 changes: 33 additions & 0 deletions syncapi/producers/appservices.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package producers

import (
"github.com/nats-io/nats.go"
)

// AppserviceEventProducer produces events for the appservice API to consume
type AppserviceEventProducer struct {
Topic string
JetStream nats.JetStreamContext
}

func (a *AppserviceEventProducer) ProduceRoomEvents(
msg *nats.Msg,
) error {
msg.Subject = a.Topic
_, err := a.JetStream.PublishMsg(msg)
return err
}
9 changes: 8 additions & 1 deletion syncapi/syncapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,16 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start key change consumer")
}

var asProducer *producers.AppserviceEventProducer
if len(dendriteCfg.AppServiceAPI.Derived.ApplicationServices) > 0 {
asProducer = &producers.AppserviceEventProducer{
JetStream: js, Topic: dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputAppserviceEvent),
}
}

roomConsumer := consumers.NewOutputRoomEventConsumer(
processContext, &dendriteCfg.SyncAPI, js, syncDB, notifier, streams.PDUStreamProvider,
streams.InviteStreamProvider, rsAPI, fts,
streams.InviteStreamProvider, rsAPI, fts, asProducer,
)
if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer")
Expand Down

0 comments on commit 1555b35

Please sign in to comment.