Skip to content

Commit

Permalink
Upgrade Pulsar Client to v0.11 (armadaproject#2896)
Browse files Browse the repository at this point in the history
* update

* update pulsar client

* Fix bug causing server spinning

* Abstract out the retry until success logic for testing (armadaproject#2901)

* Respond to review

---------

Co-authored-by: Chris Martin <chris@cmartinit.co.uk>
Co-authored-by: Daniel Rastelli <rastellidani@gmail.com>
  • Loading branch information
3 people authored Aug 22, 2023
1 parent c964465 commit 08149dd
Show file tree
Hide file tree
Showing 15 changed files with 212 additions and 118 deletions.
11 changes: 10 additions & 1 deletion cmd/eventsprinter/logic/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/gogo/protobuf/proto"
v1 "k8s.io/api/core/v1"

"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/pkg/armadaevents"
)

Expand All @@ -31,7 +32,15 @@ func PrintEvents(url, topic, subscription string, verbose bool) error {
time.Sleep(time.Second)
continue
}
consumer.Ack(msg)

util.RetryUntilSuccess(
ctx,
func() error { return consumer.Ack(msg) },
func(err error) {
fmt.Println(err)
time.Sleep(time.Second)
},
)

sequence := &armadaevents.EventSequence{}
err = proto.Unmarshal(msg.Payload(), sequence)
Expand Down
6 changes: 5 additions & 1 deletion e2e/pulsar_test/pulsar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,11 @@ func receiveJobSetSequencesWithEventFilter(
fmt.Println("Pulsar receive error", err)
continue
}
consumer.Ack(msg)
err = consumer.Ack(msg)
if err != nil {
fmt.Println("Pulsar ack error", err)
continue
}

sequence := &armadaevents.EventSequence{}
err = proto.Unmarshal(msg.Payload(), sequence)
Expand Down
8 changes: 3 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ replace github.com/AthenZ/athenz v1.10.39 => github.com/AthenZ/athenz v1.10.4
require (
github.com/alexbrainman/sspi v0.0.0-20180613141037-e580b900e9f5
github.com/alicebob/miniredis v2.5.0+incompatible
github.com/apache/pulsar-client-go v0.8.1-0.20220429133321-5ee63303d43e
github.com/apache/pulsar-client-go v0.11.0
github.com/avast/retry-go v3.0.0+incompatible
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f
Expand Down Expand Up @@ -102,11 +102,11 @@ require (
github.com/Azure/go-ntlmssp v0.0.0-20220621081337-cb9428e4ac1e // indirect
github.com/DataDog/zstd v1.5.0 // indirect
github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 // indirect
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20220120090717-25e59572242e // indirect
github.com/ardielle/ardielle-go v1.5.2 // indirect
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
github.com/aymanbagabas/go-osc52 v1.2.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.4.0 // indirect
github.com/blang/semver v3.5.1+incompatible // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/charmbracelet/lipgloss v0.6.1-0.20220911181249-6304a734e792 // indirect
Expand Down Expand Up @@ -168,8 +168,6 @@ require (
github.com/mtibben/percent v0.2.1 // indirect
github.com/muesli/reflow v0.3.0 // indirect
github.com/muesli/termenv v0.14.0 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.19.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down Expand Up @@ -197,7 +195,7 @@ require (
golang.org/x/text v0.7.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.28.1 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
Expand Down
69 changes: 6 additions & 63 deletions go.sum

Large diffs are not rendered by default.

13 changes: 11 additions & 2 deletions internal/armada/server/eventsprinter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/armadaproject/armada/internal/common/logging"
"github.com/armadaproject/armada/internal/common/pulsarutils/pulsarrequestid"
"github.com/armadaproject/armada/internal/common/requestid"
"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/pkg/armadaevents"
)

Expand Down Expand Up @@ -73,16 +74,24 @@ func (srv *EventsPrinter) Run(ctx context.Context) error {
default:

// Get a message from Pulsar, which consists of a sequence of events (i.e., state transitions).
ctxWithTimeout, _ := context.WithTimeout(ctx, 10*time.Second)
ctxWithTimeout, cancel := context.WithTimeout(ctx, 10*time.Second)
msg, err := consumer.Receive(ctxWithTimeout)
cancel()
if errors.Is(err, context.DeadlineExceeded) { // expected
log.Info("no new messages from Pulsar (or another instance holds the subscription)")
break
} else if err != nil {
logging.WithStacktrace(log, err).Warnf("receiving from Pulsar failed")
break
}
consumer.Ack(msg)
util.RetryUntilSuccess(
context.Background(),
func() error { return consumer.Ack(msg) },
func(err error) {
logging.WithStacktrace(log, err).Warnf("acking pulsar message failed")
time.Sleep(time.Second) // Not sure what the right backoff is here
},
)

sequence := &armadaevents.EventSequence{}
if err := proto.Unmarshal(msg.Payload(), sequence); err != nil {
Expand Down
19 changes: 16 additions & 3 deletions internal/armada/server/submit_from_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (srv *SubmitFromLog) Run(ctx context.Context) error {
// If this message isn't for us we can simply ack it
// and go to the next message
if !schedulers.ForLegacyScheduler(msg) {
srv.Consumer.Ack(msg)
srv.ack(ctx, msg)
break
}

Expand All @@ -137,7 +137,7 @@ func (srv *SubmitFromLog) Run(ctx context.Context) error {
// Unmarshal and validate the message.
sequence, err := eventutil.UnmarshalEventSequence(ctxWithLogger, msg.Payload())
if err != nil {
srv.Consumer.Ack(msg)
srv.ack(ctx, msg)
logging.WithStacktrace(messageLogger, err).Warnf("processing message failed; ignoring")
numErrored++
break
Expand All @@ -146,7 +146,7 @@ func (srv *SubmitFromLog) Run(ctx context.Context) error {
messageLogger.WithField("numEvents", len(sequence.Events)).Info("processing sequence")
// TODO: Improve retry logic.
srv.ProcessSequence(ctxWithLogger, sequence)
srv.Consumer.Ack(msg)
srv.ack(ctx, msg)
}
}
}
Expand Down Expand Up @@ -766,3 +766,16 @@ func (srv *SubmitFromLog) ReprioritizeJobSet(

return true, nil
}

// ack acknowledges msg on srv.Consumer, delegating the retry loop to
// util.RetryUntilSuccess. After every failed attempt a warning is logged and
// the goroutine sleeps for one second before the next attempt is made.
func (srv *SubmitFromLog) ack(ctx context.Context, msg pulsar.Message) {
	// A single ack attempt.
	attempt := func() error {
		return srv.Consumer.Ack(msg)
	}
	// Invoked after each failed attempt: log, then back off for a fixed second.
	backoff := func(err error) {
		logrus.WithError(err).Warnf("Error acking pulsar message")
		time.Sleep(time.Second)
	}
	util.RetryUntilSuccess(ctx, attempt, backoff)
}
10 changes: 9 additions & 1 deletion internal/common/ingest/ingestion_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/armadaproject/armada/internal/common/eventutil"
commonmetrics "github.com/armadaproject/armada/internal/common/ingest/metrics"
"github.com/armadaproject/armada/internal/common/pulsarutils"
"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/pkg/armadaevents"
)

Expand Down Expand Up @@ -204,7 +205,14 @@ func (ingester *IngestionPipeline[T]) Run(ctx context.Context) error {
break
} else {
for _, msgId := range msg.GetMessageIDs() {
ingester.consumer.AckID(msgId)
util.RetryUntilSuccess(
context.Background(),
func() error { return ingester.consumer.AckID(msgId) },
func(err error) {
log.WithError(err).Warnf("Pulsar ack failed; backing off for %s", ingester.pulsarConfig.BackoffTime)
time.Sleep(ingester.pulsarConfig.BackoffTime)
},
)
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion internal/common/ingest/ingestion_pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,13 @@ func (p *mockPulsarConsumer) Receive(ctx context.Context) (pulsar.Message, error
}
}

func (p *mockPulsarConsumer) AckID(messageId pulsar.MessageID) {
// AckID marks messageId as acked and increments the received counter. When
// the counter is no longer below the number of messages held by the mock,
// p.cancelFn is called. The returned error is always nil.
func (p *mockPulsarConsumer) AckID(messageId pulsar.MessageID) error {
	p.acked[messageId] = true
	p.received++
	if total := len(p.messages); total <= p.received {
		p.cancelFn()
	}
	return nil
}

func (p *mockPulsarConsumer) assertDidAck(messages []pulsar.Message) {
Expand Down
3 changes: 1 addition & 2 deletions internal/common/mocks/mock_executorapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions internal/common/mocks/mock_pulsar.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 13 additions & 2 deletions internal/common/pulsarutils/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/sirupsen/logrus"

"github.com/armadaproject/armada/internal/common/logging"
"github.com/armadaproject/armada/internal/common/util"
)

// ConsumerMessageId wraps a pulsar message id and an identifier for the consumer which originally received the
Expand Down Expand Up @@ -108,7 +109,7 @@ func Receive(
// Ack will ack all pulsar messages coming in on the msgs channel. The incoming messages contain a consumer id which
// corresponds to the index of the consumer that should be used to perform the ack. In theory, the acks could be done
// in parallel; however, it's unlikely that they will be a performance bottleneck
func Ack(ctx context.Context, consumers []pulsar.Consumer, msgs chan []*ConsumerMessageId, wg *sync.WaitGroup) {
func Ack(ctx context.Context, consumers []pulsar.Consumer, msgs chan []*ConsumerMessageId, backoffTime time.Duration, wg *sync.WaitGroup) {
for msg := range msgs {
for _, id := range msg {
if id.ConsumerId < 0 || id.ConsumerId >= len(consumers) {
Expand All @@ -118,7 +119,17 @@ func Ack(ctx context.Context, consumers []pulsar.Consumer, msgs chan []*Consumer
"Asked to ack message belonging to consumer %d, however this is outside the bounds of the consumers array which is of length %d",
id.ConsumerId, len(consumers)))
}
consumers[id.ConsumerId].AckID(id.MessageId)
util.RetryUntilSuccess(
ctx,
func() error { return consumers[id.ConsumerId].AckID(id.MessageId) },
func(err error) {
logging.
WithStacktrace(msgLogger, err).
WithField("lastMessageId", id.MessageId).
Warnf("Pulsar ack failed; backing off for %s", backoffTime)
time.Sleep(backoffTime)
},
)
}
}
msgLogger.Info("Shutting down Ackker")
Expand Down
5 changes: 3 additions & 2 deletions internal/common/pulsarutils/async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ type mockConsumer struct {
ackedIds []pulsar.MessageID
}

func (c *mockConsumer) AckID(message pulsar.MessageID) {
// AckID appends message to c.ackedIds so tests can inspect what was acked.
// It always returns nil.
func (c *mockConsumer) AckID(message pulsar.MessageID) error {
c.ackedIds = append(c.ackedIds, message)
return nil
}

func (c *mockConsumer) Receive(ctx context.Context) (pulsar.Message, error) {
Expand Down Expand Up @@ -70,7 +71,7 @@ func TestAcks(t *testing.T) {
consumers := []pulsar.Consumer{&mockConsumer}
wg := sync.WaitGroup{}
wg.Add(1)
go Ack(ctx.Background(), consumers, input, &wg)
go Ack(ctx.Background(), consumers, input, 1*time.Second, &wg)
input <- []*ConsumerMessageId{
{NewMessageId(1), 0, 0}, {NewMessageId(2), 0, 0},
}
Expand Down
35 changes: 0 additions & 35 deletions internal/common/pulsarutils/pulsar_to_channel.go

This file was deleted.

19 changes: 19 additions & 0 deletions internal/common/util/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package util

import "golang.org/x/net/context"

// RetryUntilSuccess calls performAction repeatedly until it returns nil or ctx
// is done.
//
// ctx is checked before every attempt; if it is already done the function
// returns without calling performAction. After each failed attempt onError is
// invoked with the returned error and the next attempt follows immediately,
// so any backoff (for example a sleep) has to happen inside onError. Neither
// performAction nor onError is interrupted when ctx is cancelled; cancellation
// is only observed between attempts.
//
// Nothing is returned, so a caller that needs to tell success apart from
// cancellation must inspect ctx itself.
func RetryUntilSuccess(ctx context.Context, performAction func() error, onError func(error)) {
	for {
		// Non-blocking cancellation check before each attempt.
		select {
		case <-ctx.Done():
			return
		default:
		}

		err := performAction()
		if err == nil {
			return
		}
		onError(err)
	}
}
Loading

0 comments on commit 08149dd

Please sign in to comment.