Skip to content

Commit

Permalink
[registry] relocates tag cleanup locking
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiealquiza committed Feb 8, 2022
1 parent 09cdbd2 commit 8a90044
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 22 deletions.
32 changes: 21 additions & 11 deletions registry/server/tag_cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,20 @@ func (tc *TagCleaner) RunTagCleanup(s *Server, ctx context.Context, c Config) {
for tc.running {
<-t.C

if err := s.Locking.Lock(ctx); err != nil {
log.Println(err)
if err := s.MarkForDeletion(ctx, time.Now); err != nil {
log.Println("error marking tags for deletion: ", err)
continue
}
defer s.Locking.UnlockLogError(ctx)

err := s.MarkForDeletion(time.Now)
if err != nil {
log.Println(err)
continue
if err := s.DeleteStaleTags(ctx, time.Now, c); err != nil {
log.Println("error deleting stale tags: ", err)
}

s.DeleteStaleTags(time.Now, c)
}
}

// MarkForDeletion marks stored tags that have been stranded without an associated
// kafka resource.
func (s *Server) MarkForDeletion(now func() time.Time) error {
func (s *Server) MarkForDeletion(ctx context.Context, now func() time.Time) error {
markTimeMinutes := fmt.Sprint(now().Unix())

// Get all brokers from ZK.
Expand All @@ -54,6 +49,12 @@ func (s *Server) MarkForDeletion(now func() time.Time) error {
return ErrFetchingBrokers
}

// Lock.
if err := s.Locking.Lock(ctx); err != nil {
return err
}
defer s.Locking.UnlockLogError(ctx)

// Get all topics from ZK
topics, err := s.ZK.GetTopics([]*regexp.Regexp{topicRegex})
topicSet := TopicSetFromSlice(topics)
Expand Down Expand Up @@ -110,8 +111,15 @@ func (s *Server) MarkForDeletion(now func() time.Time) error {
}

// DeleteStaleTags deletes any tags that have not had a kafka resource associated with them.
func (s *Server) DeleteStaleTags(now func() time.Time, c Config) {
func (s *Server) DeleteStaleTags(ctx context.Context, now func() time.Time, c Config) error {
sweepTime := now().Unix()

// Lock.
if err := s.Locking.Lock(ctx); err != nil {
return err
}
defer s.Locking.UnlockLogError(ctx)

allTags, _ := s.Tags.Store.GetAllTags()

for kafkaObject, tags := range allTags {
Expand All @@ -132,6 +140,8 @@ func (s *Server) DeleteStaleTags(now func() time.Time, c Config) {
log.Printf("deleted tags for non-existent %s %s\n", kafkaObject.Type, kafkaObject.ID)
}
}

return nil
}

// TopicSetFromSlice converts a slice into a TopicSet for convenience
Expand Down
36 changes: 25 additions & 11 deletions registry/server/tag_cleanup_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package server

import (
"context"
"fmt"
"github.com/DataDog/kafka-kit/v3/kafkazk"
"testing"
"time"

"github.com/DataDog/kafka-kit/v3/kafkazk"
)

func TestMarkStaleTags(t *testing.T) {
Expand All @@ -21,13 +23,16 @@ func TestMarkStaleTags(t *testing.T) {

zk := kafkazk.NewZooKeeperStub()
th := testTagHandler()
s := Server{Tags: th, ZK: zk}

s := testServer()
s.Tags = th
s.ZK = zk

// WHEN
th.Store.SetTags(topic, tt)
th.Store.SetTags(broker, bt)
th.Store.SetTags(noBroker, nbt)
s.MarkForDeletion(time.Now)
s.MarkForDeletion(context.Background(), time.Now)

// THEN
nbtags, _ := th.Store.GetTags(noBroker)
Expand All @@ -54,13 +59,16 @@ func TestDeleteStaleTags(t *testing.T) {
bt := TagSet{"foo": "bar", TagMarkTimeKey: fmt.Sprint(markTime.Unix())}
broker := KafkaObject{Type: "broker", ID: "not found"}

th := testTagHandler()
zk := kafkazk.NewZooKeeperStub()
s := Server{Tags: th, ZK: zk}
th := testTagHandler()

s := testServer()
s.Tags = th
s.ZK = zk

//WHEN
th.Store.SetTags(broker, bt)
s.DeleteStaleTags(func() time.Time { return sweepTime }, Config{TagAllowedStalenessMinutes: 10})
s.DeleteStaleTags(context.Background(), func() time.Time { return sweepTime }, Config{TagAllowedStalenessMinutes: 10})

//THEN
btags, _ := th.Store.GetTags(broker)
Expand All @@ -77,13 +85,16 @@ func TestUnmarkedTagsAreSafe(t *testing.T) {
bt := TagSet{"foo": "bar"}
broker := KafkaObject{Type: "broker", ID: "not found"}

th := testTagHandler()
zk := kafkazk.NewZooKeeperStub()
s := Server{Tags: th, ZK: zk}
th := testTagHandler()

s := testServer()
s.Tags = th
s.ZK = zk

//WHEN
th.Store.SetTags(broker, bt)
s.DeleteStaleTags(func() time.Time { return sweepTime }, Config{TagAllowedStalenessMinutes: 10})
s.DeleteStaleTags(context.Background(), func() time.Time { return sweepTime }, Config{TagAllowedStalenessMinutes: 10})

//THEN
btags, _ := th.Store.GetTags(broker)
Expand All @@ -102,12 +113,15 @@ func TestKafkaObjectComesBack(t *testing.T) {

zk := kafkazk.NewZooKeeperStub()
th := testTagHandler()
s := Server{Tags: th, ZK: zk}

s := testServer()
s.Tags = th
s.ZK = zk

// WHEN
th.Store.SetTags(broker, bt)
th.Store.SetTags(topic, tt)
s.MarkForDeletion(time.Now)
s.MarkForDeletion(context.Background(), time.Now)

// THEN
btags, _ := th.Store.GetTags(broker)
Expand Down

0 comments on commit 8a90044

Please sign in to comment.