Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support topic sequence page #540

Merged
merged 6 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 107 additions & 69 deletions internal/protos/cachepubsub.pb.go

Large diffs are not rendered by default.

35 changes: 25 additions & 10 deletions internal/protos/cachepubsub.proto
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,20 @@ message _SubscriptionRequest {
// The literal topic name to which you want to subscribe.
string topic = 2;

// --> Providing this is not required. <--
//
// If provided, attempt to reconnect to the topic and replay messages starting from
// the provided sequence number. You may get a discontinuity if some (or all) of the
// messages are not available.
// If provided at 1, you may receive some messages leading up to whatever the
// newest message is. The exact amount is unspecified and subject to change.
// If not provided (or 0), the subscription will begin with the latest messages.
uint64 resume_at_topic_sequence_number = 3;

// Determined by the service when a topic is created. This clarifies the intent of
// a subscription, and ensures the right messages are sent for a given
// `resume_at_topic_sequence_number`.
// Include this in your Subscribe() calls when you are reconnecting. The right value
// is the last sequence_page you received.
uint64 sequence_page = 4;
}

// Possible message kinds from a topic. They can be items when they're from you, or
Expand All @@ -101,19 +108,22 @@ message _SubscriptionItem {

// Your subscription has yielded an item you previously published. Here it is!
message _TopicItem {
// Topic sequence numbers are **best-effort** and **informational**.
// They are not transactional.
// They exist:
// * to help reconnect to an existing topic while trying to avoid missing items.
// * to facilitate richer monitoring and logging.
// * to provide a best-effort awareness of stream contiguity, or lack thereof,
// in case you need to know.
// You can safely ignore them if none of that matters to you!
// Topic sequence numbers give an order of messages per-topic.
// All subscribers to a topic will receive messages in the same order, with the same sequence numbers.
uint64 topic_sequence_number = 1;

// The value you previously published to this topic.
_TopicValue value = 2;

// Authenticated id from Publisher's disposable token
string publisher_id = 3;

// Sequence pages exist to determine which sequence number range a message belongs to. On a topic reset,
// the sequence numbers reset and a new sequence_page is given.
// For a given sequence_page, the next message in a topic is topic_sequence_number + 1.
//
// Later sequence pages are numbered greater than earlier pages, but they don't go one-by-one.
uint64 sequence_page = 4;
}

// A value in a topic - published, duplicated and received in a subscription.
Expand All @@ -135,6 +145,11 @@ message _Discontinuity {
uint64 last_topic_sequence = 1;
// The new topic sequence number after which TopicItems will ostensibly resume.
uint64 new_topic_sequence = 2;
// The new topic sequence_page. If you had one before and this one is different, then your topic reset.
// If you didn't have one, then this is just telling you what the sequence page is expected to be.
// If you had one before, and this one is the same, then it's just telling you that you missed some messages
// in the topic. Probably your client is consuming messages a little too slowly in this case!
uint64 new_sequence_page = 3;
}

// A message from Momento for when we want to reassure clients or frameworks that a
Expand Down
1 change: 1 addition & 0 deletions momento/pubsub_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (client *pubSubClient) topicSubscribe(ctx context.Context, request *TopicSu
CacheName: request.CacheName,
Topic: request.TopicName,
ResumeAtTopicSequenceNumber: request.ResumeAtTopicSequenceNumber,
SequencePage: request.SequencePage,
})

if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions momento/topic_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,21 @@ func (c defaultTopicClient) Subscribe(ctx context.Context, request *TopicSubscri
}

topicManager, clientStream, cancelContext, cancelFunction, err = c.pubSubClient.topicSubscribe(ctx, &TopicSubscribeRequest{
CacheName: request.CacheName,
TopicName: request.TopicName,
CacheName: request.CacheName,
TopicName: request.TopicName,
ResumeAtTopicSequenceNumber: request.ResumeAtTopicSequenceNumber,
SequencePage: request.SequencePage,
})
if err != nil {
return nil, err
}

if request.ResumeAtTopicSequenceNumber == 0 && request.SequencePage == 0 {
c.log.Debug("Starting new subscription from latest messages.")
} else {
c.log.Debug("Resuming subscription from sequence number %d and sequence page %d.", request.ResumeAtTopicSequenceNumber, request.SequencePage)
}

// Ping the stream to provide a nice error message if the cache does not exist.
err = clientStream.RecvMsg(firstMsg)
if err != nil {
Expand Down
16 changes: 14 additions & 2 deletions momento/topic_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func (TopicHeartbeat) isTopicEvent() {}
type TopicDiscontinuity struct {
lastKnownSequenceNumber uint64
newSequenceNumber uint64
newSequencePage uint64
}

func (d TopicDiscontinuity) GetLastKnownSequenceNumber() uint64 {
Expand All @@ -24,10 +25,15 @@ func (d TopicDiscontinuity) GetNewSequenceNumber() uint64 {
return d.newSequenceNumber
}

func NewTopicDiscontinuity(lastKnownSequenceNumber uint64, newSequenceNumber uint64) TopicDiscontinuity {
func (d TopicDiscontinuity) GetNewSequencePage() uint64 {
return d.newSequencePage
}

func NewTopicDiscontinuity(lastKnownSequenceNumber uint64, newSequenceNumber uint64, newSequencePage uint64) TopicDiscontinuity {
return TopicDiscontinuity{
lastKnownSequenceNumber: lastKnownSequenceNumber,
newSequenceNumber: newSequenceNumber,
newSequencePage: newSequencePage,
}
}

Expand All @@ -37,6 +43,7 @@ type TopicItem struct {
message TopicValue
publisherId String
topicSequenceNumber uint64
topicSequencePage uint64
}

func (m TopicItem) isTopicEvent() {}
Expand All @@ -53,10 +60,15 @@ func (m TopicItem) GetTopicSequenceNumber() uint64 {
return m.topicSequenceNumber
}

func NewTopicItem(message TopicValue, publisherId String, topicSequenceNumber uint64) TopicItem {
func (m TopicItem) GetTopicSequencePage() uint64 {
return m.topicSequencePage
}

func NewTopicItem(message TopicValue, publisherId String, topicSequenceNumber uint64, topicSequencePage uint64) TopicItem {
return TopicItem{
message: message,
publisherId: publisherId,
topicSequenceNumber: topicSequenceNumber,
topicSequencePage: topicSequencePage,
}
}
1 change: 1 addition & 0 deletions momento/topic_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ type TopicSubscribeRequest struct {
CacheName string
TopicName string
ResumeAtTopicSequenceNumber uint64
SequencePage uint64
}

func (r TopicSubscribeRequest) cacheName() string { return r.CacheName }
11 changes: 7 additions & 4 deletions momento/topic_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type topicSubscription struct {
topicName string
log logger.MomentoLogger
lastKnownSequenceNumber uint64
lastKnownSequencePage uint64
cancelContext context.Context
cancelFunction context.CancelFunc
}
Expand Down Expand Up @@ -133,18 +134,19 @@ func (s *topicSubscription) Event(ctx context.Context) (TopicEvent, error) {
switch typedMsg := rawMsg.Kind.(type) {
case *pb.XSubscriptionItem_Discontinuity:
s.log.Debug("received discontinuity item: %+v", typedMsg.Discontinuity)
return NewTopicDiscontinuity(typedMsg.Discontinuity.LastTopicSequence, typedMsg.Discontinuity.NewTopicSequence), nil
return NewTopicDiscontinuity(typedMsg.Discontinuity.LastTopicSequence, typedMsg.Discontinuity.NewTopicSequence, typedMsg.Discontinuity.NewSequencePage), nil
case *pb.XSubscriptionItem_Item:
s.lastKnownSequenceNumber = typedMsg.Item.GetTopicSequenceNumber()
s.lastKnownSequencePage = typedMsg.Item.GetSequencePage()
publisherId := typedMsg.Item.GetPublisherId()

s.log.Trace("received item with sequence number %d and publisher Id %s", s.lastKnownSequenceNumber, publisherId)
s.log.Trace("received item with sequence number %d, sequence page %d, and publisher Id %s", s.lastKnownSequenceNumber, s.lastKnownSequencePage, publisherId)

switch subscriptionItem := typedMsg.Item.Value.Kind.(type) {
case *pb.XTopicValue_Text:
return NewTopicItem(String(subscriptionItem.Text), String(publisherId), s.lastKnownSequenceNumber), nil
return NewTopicItem(String(subscriptionItem.Text), String(publisherId), s.lastKnownSequenceNumber, s.lastKnownSequencePage), nil
case *pb.XTopicValue_Binary:
return NewTopicItem(Bytes(subscriptionItem.Binary), String(publisherId), s.lastKnownSequenceNumber), nil
return NewTopicItem(Bytes(subscriptionItem.Binary), String(publisherId), s.lastKnownSequenceNumber, s.lastKnownSequencePage), nil
}
case *pb.XSubscriptionItem_Heartbeat:
s.log.Trace("received heartbeat item")
Expand All @@ -167,6 +169,7 @@ func (s *topicSubscription) attemptReconnect(ctx context.Context) {
CacheName: s.cacheName,
TopicName: s.topicName,
ResumeAtTopicSequenceNumber: s.lastKnownSequenceNumber,
SequencePage: s.lastKnownSequencePage,
malandis marked this conversation as resolved.
Show resolved Hide resolved
})

if err != nil {
Expand Down
Loading