Skip to content

Commit

Permalink
Creating ControlPlaneOperationsService (#224)
Browse files Browse the repository at this point in the history
* Move PulsarConfig into common/config (#217) (#3907)

* ARMADA-2848 Move PulsarConfig into commonconfig

* Update test name TestValidateHasJobSetID->Id

* Revert unintended changes to yarn.lock file

* fix import order

Co-authored-by: Eleanor Pratt <Eleanor.Pratt@gresearch.co.uk>

(cherry picked from commit 35cb59f)
Signed-off-by: mustaily891 <mustafa.ilyas@gresearch.co.uk>

* Adding ControlPlaneEventsTopic to pulsar config

---------

Signed-off-by: mustaily891 <mustafa.ilyas@gresearch.co.uk>
Co-authored-by: Eleanor Pratt <Eleanor.Pratt@gresearch.co.uk>
  • Loading branch information
2 people authored and GitHub Enterprise committed Sep 6, 2024
1 parent af31592 commit 40b9dc2
Show file tree
Hide file tree
Showing 27 changed files with 2,331 additions and 51 deletions.
1 change: 1 addition & 0 deletions config/armada/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ submission:
pulsar:
URL: "pulsar://pulsar:6650"
jobsetEventsTopic: "events"
controlPlaneEventsTopic: "control-plane"
maxConnectionsPerBroker: 1
compressionType: zlib
compressionLevel: faster
Expand Down
2 changes: 2 additions & 0 deletions internal/common/config/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type PulsarConfig struct {
JwtTokenPath string
// The pulsar topic that Jobset Events will be published to
JobsetEventsTopic string
// The pulsar topic that Control Plane Events will be published to
ControlPlaneEventsTopic string
// Compression to use. Valid values are "None", "LZ4", "Zlib", "Zstd". Default is "None"
CompressionType pulsar.CompressionType
// Compression Level to use. Valid values are "Default", "Better", "Faster". Default is "Default"
Expand Down
55 changes: 46 additions & 9 deletions internal/common/eventutil/eventutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/armadaevents"
"github.com/armadaproject/armada/pkg/controlplaneevents"
)

// UnmarshalEventSequence returns an EventSequence object contained in a byte buffer
Expand Down Expand Up @@ -174,15 +175,15 @@ func K8sObjectMetaFromLogObjectMeta(meta *armadaevents.ObjectMeta) *metav1.Objec
}
}

// CompactEventSequences converts a []*armadaevents.EventSequence into a []*armadaevents.EventSequence of minimal length.
// CompactJobSetEventSequences converts a []*armadaevents.EventSequence into a []*armadaevents.EventSequence of minimal length.
// In particular, it moves events with equal (queue, jobSetName, userId, groups) into a single sequence
// when doing so is possible without changing the order of events within job sets.
//
// For example, three sequences [A, B, C], [D, E], [F, G]
// could result in the following two sequences [A, B, C, F, G], [D, E],
// if sequence 1 and 3 share the same (queue, jobSetName, userId, groups)
// and if the sequence [D, E] is for a different job set.
func CompactEventSequences(sequences []*armadaevents.EventSequence) []*armadaevents.EventSequence {
func CompactJobSetEventSequences(sequences []*armadaevents.EventSequence) []*armadaevents.EventSequence {
// We may change ordering between job sets but not within job sets.
// To ensure the order of the resulting compacted sequences is deterministic,
// store a slice of all unique jobSetNames in the order they occur in sequences.
Expand Down Expand Up @@ -249,7 +250,7 @@ func groupsEqual(g1, g2 []string) bool {
return true
}

func LimitSequencesEventMessageCount(sequences []*armadaevents.EventSequence, maxEventsPerSequence int) []*armadaevents.EventSequence {
func LimitJobSetSequencesEventMessageCount(sequences []*armadaevents.EventSequence, maxEventsPerSequence int) []*armadaevents.EventSequence {
rv := make([]*armadaevents.EventSequence, 0, len(sequences))
for _, sequence := range sequences {
if len(sequence.Events) > maxEventsPerSequence {
Expand All @@ -272,12 +273,12 @@ func LimitSequencesEventMessageCount(sequences []*armadaevents.EventSequence, ma
return rv
}

// LimitSequencesByteSize calls LimitSequenceByteSize for each of the provided sequences
// LimitJobSetEventSequencesByteSize calls LimitJobSetSequenceByteSize for each of the provided sequences
// and returns all resulting sequences.
func LimitSequencesByteSize(sequences []*armadaevents.EventSequence, sizeInBytes uint, strict bool) ([]*armadaevents.EventSequence, error) {
func LimitJobSetEventSequencesByteSize(sequences []*armadaevents.EventSequence, sizeInBytes uint, strict bool) ([]*armadaevents.EventSequence, error) {
rv := make([]*armadaevents.EventSequence, 0, len(sequences))
for _, sequence := range sequences {
limitedSequences, err := LimitSequenceByteSize(sequence, sizeInBytes, strict)
limitedSequences, err := LimitJobSetSequenceByteSize(sequence, sizeInBytes, strict)
if err != nil {
return nil, err
}
Expand All @@ -287,15 +288,15 @@ func LimitSequencesByteSize(sequences []*armadaevents.EventSequence, sizeInBytes
}

// This is an (over)estimate of the byte overhead used to represent the list EventSequence.Events
// We need this get a safe estimate for the headerSize in LimitSequenceByteSize
// We need this get a safe estimate for the headerSize in LimitJobSetSequenceByteSize
// We cannot simply rely on proto.Size on an EventSequence with an empty Event list,
// as proto is smart enough to realise it is empty and just nils it out for 0 bytes
const sequenceEventListOverheadSizeBytes = 100

// LimitSequenceByteSize returns a slice of sequences produced by breaking up sequence.Events into separate sequences
// LimitJobSetSequenceByteSize returns a slice of sequences produced by breaking up sequence.Events into separate sequences
// If strict is true, each sequence will be at most sizeInBytes bytes in size
// If strict is false, sizeInBytes can be exceeded by at most the size of a single sequence.Event
func LimitSequenceByteSize(sequence *armadaevents.EventSequence, sizeInBytes uint, strict bool) ([]*armadaevents.EventSequence, error) {
func LimitJobSetSequenceByteSize(sequence *armadaevents.EventSequence, sizeInBytes uint, strict bool) ([]*armadaevents.EventSequence, error) {
// Compute the size of the sequence without events.
events := sequence.Events
sequence.Events = make([]*armadaevents.EventSequence_Event, 0)
Expand Down Expand Up @@ -334,3 +335,39 @@ func LimitSequenceByteSize(sequence *armadaevents.EventSequence, sizeInBytes uin
}
return sequences, nil
}

// ValidateControlPlaneEventsByteSize validates that each event is at most sizeInBytes bytes in size. Unlike JobSet events,
// control plane events are not a sequence.
func ValidateControlPlaneEventsByteSize(events []*controlplaneevents.ControlPlaneEventV1, sizeInBytes uint) error {
for _, event := range events {
eventSize := uint(proto.Size(event))
if eventSize > sizeInBytes {
return errors.WithStack(&armadaerrors.ErrInvalidArgument{
Name: "controlplaneevent",
Value: event,
Message: fmt.Sprintf(
"event of %d bytes is larger than the sequence size limit of %d",
eventSize,
sizeInBytes,
),
})
}
}

return nil
}

func MessageKeyFromControlPlaneEvent(event *controlplaneevents.ControlPlaneEventV1) (string, error) {
switch ev := event.Event.(type) {
case *controlplaneevents.ControlPlaneEventV1_CordonExecutor:
return proto.MessageName(ev.CordonExecutor), nil
case *controlplaneevents.ControlPlaneEventV1_UncordonExecutor:
return proto.MessageName(ev.UncordonExecutor), nil
default:
return "", errors.WithStack(&armadaerrors.ErrInvalidArgument{
Name: "controlplaneevent",
Value: ev,
Message: fmt.Sprintf("unknown event type %T", ev),
})
}
}
18 changes: 9 additions & 9 deletions internal/common/eventutil/eventutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestCompactSequences_Basic(t *testing.T) {
},
}

actual := CompactEventSequences(sequences)
actual := CompactJobSetEventSequences(sequences)
assert.Equal(t, expected, actual)
}

Expand Down Expand Up @@ -166,7 +166,7 @@ func TestCompactSequences_JobSetOrder(t *testing.T) {
},
}

actual := CompactEventSequences(sequences)
actual := CompactJobSetEventSequences(sequences)
assert.Equal(t, expected, actual)
}

Expand Down Expand Up @@ -253,7 +253,7 @@ func TestCompactSequences_Groups(t *testing.T) {
},
}

actual := CompactEventSequences(sequences)
actual := CompactJobSetEventSequences(sequences)
assert.Equal(t, expected, actual)
}

Expand Down Expand Up @@ -343,7 +343,7 @@ func TestLimitSequencesEventMessageCount(t *testing.T) {
},
}

result := LimitSequencesEventMessageCount(input, 2)
result := LimitJobSetSequencesEventMessageCount(input, 2)
assert.Len(t, result, 3)
assert.Equal(t, expected, result)
}
Expand All @@ -370,16 +370,16 @@ func TestLimitSequenceByteSize(t *testing.T) {
})
}

actual, err := LimitSequenceByteSize(sequence, 1000, true)
actual, err := LimitJobSetSequenceByteSize(sequence, 1000, true)
if !assert.NoError(t, err) {
return
}
assert.Equal(t, []*armadaevents.EventSequence{sequence}, actual)

_, err = LimitSequenceByteSize(sequence, 1, true)
_, err = LimitJobSetSequenceByteSize(sequence, 1, true)
assert.Error(t, err)

_, err = LimitSequenceByteSize(sequence, 1, false)
_, err = LimitJobSetSequenceByteSize(sequence, 1, false)
assert.NoError(t, err)

expected := make([]*armadaevents.EventSequence, numEvents)
Expand All @@ -400,7 +400,7 @@ func TestLimitSequenceByteSize(t *testing.T) {
},
}
}
actual, err = LimitSequenceByteSize(sequence, 65+sequenceEventListOverheadSizeBytes, true)
actual, err = LimitJobSetSequenceByteSize(sequence, 65+sequenceEventListOverheadSizeBytes, true)
if !assert.NoError(t, err) {
return
}
Expand Down Expand Up @@ -435,7 +435,7 @@ func TestLimitSequencesByteSize(t *testing.T) {
sequences = append(sequences, sequence)
}

actual, err := LimitSequencesByteSize(sequences, 65+sequenceEventListOverheadSizeBytes, true)
actual, err := LimitJobSetEventSequencesByteSize(sequences, 65+sequenceEventListOverheadSizeBytes, true)
if !assert.NoError(t, err) {
return
}
Expand Down
67 changes: 67 additions & 0 deletions internal/common/mocks/controlplaneevents/mock_publisher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion internal/common/mocks/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ package mocks

// Mock implementations used by tests
//go:generate mockgen -destination=./mock_pulsar.go -package=mocks "github.com/apache/pulsar-client-go/pulsar" Client,Producer,Message
//go:generate mockgen -destination=./mock_publisher.go -package=mocks "github.com/armadaproject/armada/internal/common/pulsarutils" Publisher
//go:generate mockgen -destination=./mock_executorapi.go -package=mocks "github.com/armadaproject/armada/pkg/executorapi" ExecutorApiClient,ExecutorApi_LeaseJobRunsClient
//go:generate mockgen -destination=./controlplaneevents/mock_publisher.go -package=controlplaneevents "github.com/armadaproject/armada/internal/common/pulsarutils/controlplaneevents" Publisher
//go:generate mockgen -destination=./jobsetevents/mock_publisher.go -package=jobsetevents "github.com/armadaproject/armada/internal/common/pulsarutils/jobsetevents" Publisher

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

102 changes: 102 additions & 0 deletions internal/common/pulsarutils/controlplaneevents/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package controlplaneevents

import (
"sync"
"time"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/eventutil"
"github.com/armadaproject/armada/internal/common/logging"
"github.com/armadaproject/armada/pkg/controlplaneevents"
)

// Publisher is an interface to be implemented by structs that handle publishing messages to pulsar
type Publisher interface {
PublishMessages(ctx *armadacontext.Context, events ...*controlplaneevents.ControlPlaneEventV1) error
Close()
}

// PulsarPublisher is the default implementation of Publisher
type PulsarPublisher struct {
// Used to send messages to pulsar
producer pulsar.Producer
// Maximum size (in bytes) of produced pulsar messages.
// This must be below 4MB which is the pulsar message size limit
maxAllowedMessageSize uint
// Timeout after which async messages sends will be considered failed
sendTimeout time.Duration
}

func NewPulsarPublisher(
pulsarClient pulsar.Client,
producerOptions pulsar.ProducerOptions,
maxAllowedMessageSize uint,
sendTimeout time.Duration,
) (*PulsarPublisher, error) {
producer, err := pulsarClient.CreateProducer(producerOptions)
if err != nil {
return nil, errors.WithStack(err)
}
return &PulsarPublisher{
producer: producer,
maxAllowedMessageSize: maxAllowedMessageSize,
sendTimeout: sendTimeout,
}, nil
}

// PublishMessages publishes control plane event sequences to pulsar.
func (p *PulsarPublisher) PublishMessages(ctx *armadacontext.Context, events ...*controlplaneevents.ControlPlaneEventV1) error {
err := eventutil.ValidateControlPlaneEventsByteSize(events, p.maxAllowedMessageSize)
if err != nil {
return err
}
msgs := make([]*pulsar.ProducerMessage, len(events))
for i, event := range events {
bytes, err := proto.Marshal(event)
if err != nil {
return err
}

messageKey, err := eventutil.MessageKeyFromControlPlaneEvent(event)
if err != nil {
return err
}
msgs[i] = &pulsar.ProducerMessage{
Payload: bytes,
Key: messageKey,
}
}

wg := sync.WaitGroup{}
wg.Add(len(msgs))

// Send messages
sendCtx, cancel := armadacontext.WithTimeout(ctx, p.sendTimeout)
errored := false
for _, msg := range msgs {
p.producer.SendAsync(sendCtx, msg, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) {
if err != nil {
logging.
WithStacktrace(ctx, err).
Error("error sending message to Pulsar")
errored = true
}
wg.Done()
})
}
wg.Wait()
cancel()
if errored {
return errors.New("One or more messages failed to send to Pulsar")
}

return nil
}

func (p *PulsarPublisher) Close() {
p.producer.Close()
}
Loading

0 comments on commit 40b9dc2

Please sign in to comment.