Skip to content

Commit

Permalink
small changes and renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
amirylm committed Nov 14, 2023
1 parent 082cdf6 commit b3faec5
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 19 deletions.
2 changes: 1 addition & 1 deletion core/ctrl.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Controller struct {
mdnsSvc mdns.Service
pubsub *pubsub.PubSub

manager pubsubManager
psManager pubsubManager
denylist pubsub.Blacklist
subFilter pubsub.SubscriptionFilter

Expand Down
34 changes: 16 additions & 18 deletions core/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

var (
inspectInterval = time.Minute
peerScoreInspectionInterval = time.Minute
)

func (c *Controller) setupPubsubRouter(ctx context.Context, cfg commons.Config) error {
Expand All @@ -34,7 +34,7 @@ func (c *Controller) setupPubsubRouter(ctx context.Context, cfg commons.Config)

if cfg.Pubsub.Scoring != nil {
opts = append(opts, pubsub.WithPeerScore(gossip.PeerScores(*cfg.Pubsub)))
opts = append(opts, pubsub.WithPeerScoreInspect(c.inspectPeerScores, inspectInterval))
opts = append(opts, pubsub.WithPeerScoreInspect(c.inspectPeerScores, peerScoreInspectionInterval))
}

if cfg.Pubsub.MaxMessageSize > 0 {
Expand All @@ -48,8 +48,6 @@ func (c *Controller) setupPubsubRouter(ctx context.Context, cfg commons.Config)
denylist := pubsub.NewMapBlacklist()
opts = append(opts, pubsub.WithBlacklist(denylist))

// pubsub.WithDefaultValidator() // TODO: check

if cfg.Pubsub.SubFilter != nil {
re, err := regexp.Compile(cfg.Pubsub.SubFilter.Pattern)
if err != nil {
Expand All @@ -73,7 +71,7 @@ func (c *Controller) setupPubsubRouter(ctx context.Context, cfg commons.Config)
}
c.pubsub = ps
c.denylist = denylist
c.manager.topics = make(map[string]*topicWrapper)
c.psManager.topics = make(map[string]*topicWrapper)

return nil
}
Expand All @@ -88,7 +86,7 @@ func (c *Controller) Publish(ctx context.Context, topicName string, data []byte)
}

func (c *Controller) Leave(topicName string) error {
tw := c.manager.getTopicWrapper(topicName)
tw := c.psManager.getTopicWrapper(topicName)
state := tw.state.Load()
switch state {
case topicStateJoined, topicStateErr:
Expand All @@ -104,7 +102,7 @@ func (c *Controller) Leave(topicName string) error {
}

func (c *Controller) Unsubscribe(topicName string) error {
tw := c.manager.getTopicWrapper(topicName)
tw := c.psManager.getTopicWrapper(topicName)
if tw.state.Load() == topicStateUnknown {
return nil // TODO: topic not found?
}
Expand Down Expand Up @@ -162,6 +160,14 @@ func (c *Controller) listenSubscription(ctx context.Context, sub *pubsub.Subscri
}

func (c *Controller) tryJoin(topicName string) (*pubsub.Topic, error) {
topicW := c.psManager.getTopicWrapper(topicName)
if topicW != nil {
if topicW.state.Load() == topicStateJoining {
return nil, fmt.Errorf("already tring to join topic %s", topicName)
}
return topicW.topic, nil
}
c.psManager.joiningTopic(topicName)
opts := []pubsub.TopicOpt{}
cfg, ok := c.cfg.Pubsub.GetTopicConfig(topicName)
if ok {
Expand All @@ -170,19 +176,11 @@ func (c *Controller) tryJoin(topicName string) (*pubsub.Topic, error) {
opts = append(opts, pubsub.WithTopicMessageIdFn(msgID))
}
}
topicW := c.manager.getTopicWrapper(topicName)
if topicW != nil {
if topicW.state.Load() == topicStateJoining {
return nil, fmt.Errorf("already tring to join topic %s", topicName)
}
return topicW.topic, nil
}
c.manager.joiningTopic(topicName)
topic, err := c.pubsub.Join(topicName, opts...)
if err != nil {
return nil, err
}
c.manager.upgradeTopic(topicName, topic)
c.psManager.upgradeTopic(topicName, topic)

if cfg.MsgValidator != nil || c.cfg.Pubsub.MsgValidator != nil {
msgValConfig := commons.MsgValidationConfig{}.Defaults(c.cfg.Pubsub.MsgValidator)
Expand All @@ -204,7 +202,7 @@ func (c *Controller) tryJoin(topicName string) (*pubsub.Topic, error) {

func (c *Controller) trySubscribe(topic *pubsub.Topic) (sub *pubsub.Subscription, err error) {
topicName := topic.String()
sub = c.manager.getSub(topicName)
sub = c.psManager.getSub(topicName)
if sub != nil {
return nil, nil
}
Expand All @@ -219,7 +217,7 @@ func (c *Controller) trySubscribe(topic *pubsub.Topic) (sub *pubsub.Subscription
if err != nil {
return nil, err
}
c.manager.addSub(topicName, sub)
c.psManager.addSub(topicName, sub)
return sub, nil
}

Expand Down

0 comments on commit b3faec5

Please sign in to comment.