From 781b52768cf3b344fb76d6c44e1eaf09184209d5 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 7 Jul 2023 17:03:12 +0100 Subject: [PATCH 1/3] Set `max_age` for roomserver input stream to avoid excessive interior deletes --- setup/jetstream/nats.go | 17 +++++++++-------- setup/jetstream/streams.go | 1 + 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index e440879c08..a81701fe68 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -125,15 +125,16 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc // with the subject "Foo", "Foo.Bar" or "Foo.Bar.Baz" etc. subjects = []string{name, name + ".>"} } - if info != nil { - switch { - case !reflect.DeepEqual(info.Config.Subjects, subjects): - fallthrough - case info.Config.Retention != stream.Retention: - fallthrough - case info.Config.Storage != stream.Storage: + if info != nil && !reflect.DeepEqual(info.Config, stream) { + // If the stream config doesn't match what we expect, try to update + // it. If that doesn't work then try to blow it away and we'll then + // recreate it in the next section. + if info, err = s.UpdateStream(stream); err != nil { + logrus.WithError(err).Warnf("Unable to update stream %q, recreating...", name) + // We failed to update the stream, this is a last attempt to get + // things working but may result in data loss. if err = s.DeleteStream(name); err != nil { - logrus.WithError(err).Fatal("Unable to delete stream") + logrus.WithError(err).Fatalf("Unable to delete stream %q", name) } info = nil } diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index 590f0cbd9d..741407926d 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -48,6 +48,7 @@ var streams = []*nats.StreamConfig{ Name: InputRoomEvent, Retention: nats.InterestPolicy, Storage: nats.FileStorage, + MaxAge: time.Hour * 24, }, { Name: InputDeviceListUpdate, From 2452b5d79d2e5ebdbb7ce1626a67e83272477910 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Fri, 7 Jul 2023 17:23:35 +0100 Subject: [PATCH 2/3] Revert `DeepEqual` part as it does the wrong thing with stream defaults --- setup/jetstream/nats.go | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index a81701fe68..5d1360e9cd 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -125,18 +125,33 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc // with the subject "Foo", "Foo.Bar" or "Foo.Bar.Baz" etc. subjects = []string{name, name + ".>"} } - if info != nil && !reflect.DeepEqual(info.Config, stream) { + if info != nil { // If the stream config doesn't match what we expect, try to update // it. If that doesn't work then try to blow it away and we'll then // recreate it in the next section. - if info, err = s.UpdateStream(stream); err != nil { - logrus.WithError(err).Warnf("Unable to update stream %q, recreating...", name) - // We failed to update the stream, this is a last attempt to get - // things working but may result in data loss. - if err = s.DeleteStream(name); err != nil { - logrus.WithError(err).Fatalf("Unable to delete stream %q", name) + // Each specific option that we set must be checked by hand, as if + // you DeepEqual the whole config struct, it will always show that + // there's a difference because the NATS Server will return defaults + // in the stream info. + switch { + case !reflect.DeepEqual(info.Config.Subjects, subjects): + fallthrough + case info.Config.Retention != stream.Retention: + fallthrough + case info.Config.Storage != stream.Storage: + fallthrough + case info.Config.MaxAge != stream.MaxAge: + // Try updating the stream first, as many things can be updated + // non-destructively. + if info, err = s.UpdateStream(stream); err != nil { + logrus.WithError(err).Warnf("Unable to update stream %q, recreating...", name) + // We failed to update the stream, this is a last attempt to get + // things working but may result in data loss. + if err = s.DeleteStream(name); err != nil { + logrus.WithError(err).Fatalf("Unable to delete stream %q", name) + } + info = nil } - info = nil } } if info == nil { From bec088bf8da802e5887b4045ee1f0e1de42b4618 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Fri, 7 Jul 2023 17:28:12 +0100 Subject: [PATCH 3/3] Ain't nobody got time for `gocyclo` --- setup/jetstream/nats.go | 1 + 1 file changed, 1 insertion(+) diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 5d1360e9cd..8820e86b2b 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -87,6 +87,7 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS return js, nc } +// nolint:gocyclo func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) { if nc == nil { var err error