Skip to content

Commit

Permalink
Set max age for roomserver input stream to avoid excessive interior d…
Browse files Browse the repository at this point in the history
…eletes (#3145)

If old messages build up in the input stream and do not get processed
successfully, this can create a significant drift between the stream
first sequence and the consumer ack floors, which results in a slow and
expensive start-up when interest-based retention is in use.

If a message is sat in the stream for 24 hours, it's probably not going
to get processed successfully, so let NATS drop them instead. Dendrite
can reconcile by fetching missing events later if it needs to.

---------

Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
  • Loading branch information
neilalexander and neilalexander authored Jul 7, 2023
1 parent c08c740 commit e93bdd5
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 3 deletions.
23 changes: 20 additions & 3 deletions setup/jetstream/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
return js, nc
}

// nolint:gocyclo
func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
if nc == nil {
var err error
Expand Down Expand Up @@ -126,16 +127,32 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc
subjects = []string{name, name + ".>"}
}
if info != nil {
// If the stream config doesn't match what we expect, try to update
// it. If that doesn't work then try to blow it away and we'll then
// recreate it in the next section.
// Each specific option that we set must be checked by hand, as if
// you DeepEqual the whole config struct, it will always show that
// there's a difference because the NATS Server will return defaults
// in the stream info.
switch {
case !reflect.DeepEqual(info.Config.Subjects, subjects):
fallthrough
case info.Config.Retention != stream.Retention:
fallthrough
case info.Config.Storage != stream.Storage:
if err = s.DeleteStream(name); err != nil {
logrus.WithError(err).Fatal("Unable to delete stream")
fallthrough
case info.Config.MaxAge != stream.MaxAge:
// Try updating the stream first, as many things can be updated
// non-destructively.
if info, err = s.UpdateStream(stream); err != nil {
logrus.WithError(err).Warnf("Unable to update stream %q, recreating...", name)
// We failed to update the stream, this is a last attempt to get
// things working but may result in data loss.
if err = s.DeleteStream(name); err != nil {
logrus.WithError(err).Fatalf("Unable to delete stream %q", name)
}
info = nil
}
info = nil
}
}
if info == nil {
Expand Down
1 change: 1 addition & 0 deletions setup/jetstream/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var streams = []*nats.StreamConfig{
Name: InputRoomEvent,
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
MaxAge: time.Hour * 24,
},
{
Name: InputDeviceListUpdate,
Expand Down

0 comments on commit e93bdd5

Please sign in to comment.