Skip to content

Commit

Permalink
Add pub/sub
Browse files Browse the repository at this point in the history
  • Loading branch information
minhduc140583 committed Jul 21, 2024
1 parent 6a45d0d commit 7e6d707
Show file tree
Hide file tree
Showing 8 changed files with 512 additions and 0 deletions.
149 changes: 149 additions & 0 deletions pubsub/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package pubsub

import (
"cloud.google.com/go/pubsub"
"context"
"fmt"
"google.golang.org/api/option"
"google.golang.org/api/transport"
"log"
"os"
"reflect"
"strconv"
"time"
)

func NewPubSubClientWithRetries(ctx context.Context, credentials []byte, retries []time.Duration, options ...string) (*pubsub.Client, error) {
var projectId string
if len(options) > 0 && len(options[0]) > 0 {
projectId = options[0]
}
if credentials != nil && len(credentials) > 0 {
opts := option.WithCredentialsJSON(credentials)
creds, er0 := transport.Creds(ctx, opts)
if er0 != nil {
return nil, er0
}
if len(projectId) == 0 {
projectId = creds.ProjectID
}
c, er1 := pubsub.NewClient(ctx, projectId, opts)
if er1 == nil {
return c, er1
}
i := 0
err := Retry(retries, func() (err error) {
i = i + 1
c2, er2 := pubsub.NewClient(ctx, projectId, opts)
if er2 == nil {
c = c2
}
return er2
})
if err != nil {
log.Printf("Failed to new pubsub client: %s.", err.Error())
}
return c, err
} else {
log.Println("empty credentials")
return pubsub.NewClient(ctx, projectId)
}
}
func NewPubSubClientWithFile(ctx context.Context, projectId string, keyFilename string) (*pubsub.Client, error) {
if len(keyFilename) > 0 && existFile(keyFilename) {
log.Println("key file exists")
return pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(keyFilename))
} else {
log.Println("key file doesn't exists")
return pubsub.NewClient(ctx, projectId)
}
}
func NewPubSubClient(ctx context.Context, credentials []byte, options ...string) (*pubsub.Client, error) {
opts := option.WithCredentialsJSON(credentials)
var projectId string
if len(options) > 0 && len(options[0]) > 0 {
projectId = options[0]
} else {
creds, err := transport.Creds(ctx, opts)
projectId = creds.ProjectID
if err != nil {
panic("Credentials Error: " + err.Error())
}
if creds == nil {
panic("Error: creds is nil")
}
}
if credentials != nil && len(credentials) > 0 {
return pubsub.NewClient(ctx, projectId, opts)
} else {
log.Println("empty credentials")
return pubsub.NewClient(ctx, projectId)
}
}

func existFile(filename string) bool {
if _, err := os.Stat(filename); err == nil {
return true
} else if os.IsNotExist(err) {
return false
} else {
log.Println(err.Error())
}
return false
}

func MakeDurations(vs []int64) []time.Duration {
durations := make([]time.Duration, 0)
for _, v := range vs {
d := time.Duration(v) * time.Second
durations = append(durations, d)
}
return durations
}
func MakeArray(v interface{}, prefix string, max int) []int64 {
var ar []int64
v2 := reflect.Indirect(reflect.ValueOf(v))
for i := 1; i <= max; i++ {
fn := prefix + strconv.Itoa(i)
v3 := v2.FieldByName(fn).Interface().(int64)
if v3 > 0 {
ar = append(ar, v3)
} else {
return ar
}
}
return ar
}
func DurationsFromValue(v interface{}, prefix string, max int) []time.Duration {
arr := MakeArray(v, prefix, max)
return MakeDurations(arr)
}

type RetryConfig struct {
Retry1 int64 `mapstructure:"1" json:"retry1,omitempty" gorm:"column:retry1" bson:"retry1,omitempty" dynamodbav:"retry1,omitempty" firestore:"retry1,omitempty"`
Retry2 int64 `mapstructure:"2" json:"retry2,omitempty" gorm:"column:retry2" bson:"retry2,omitempty" dynamodbav:"retry2,omitempty" firestore:"retry2,omitempty"`
Retry3 int64 `mapstructure:"3" json:"retry3,omitempty" gorm:"column:retry3" bson:"retry3,omitempty" dynamodbav:"retry3,omitempty" firestore:"retry3,omitempty"`
Retry4 int64 `mapstructure:"4" json:"retry4,omitempty" gorm:"column:retry4" bson:"retry4,omitempty" dynamodbav:"retry4,omitempty" firestore:"retry4,omitempty"`
Retry5 int64 `mapstructure:"5" json:"retry5,omitempty" gorm:"column:retry5" bson:"retry5,omitempty" dynamodbav:"retry5,omitempty" firestore:"retry5,omitempty"`
Retry6 int64 `mapstructure:"6" json:"retry6,omitempty" gorm:"column:retry6" bson:"retry6,omitempty" dynamodbav:"retry6,omitempty" firestore:"retry6,omitempty"`
Retry7 int64 `mapstructure:"7" json:"retry7,omitempty" gorm:"column:retry7" bson:"retry7,omitempty" dynamodbav:"retry7,omitempty" firestore:"retry7,omitempty"`
Retry8 int64 `mapstructure:"8" json:"retry8,omitempty" gorm:"column:retry8" bson:"retry8,omitempty" dynamodbav:"retry8,omitempty" firestore:"retry8,omitempty"`
Retry9 int64 `mapstructure:"9" json:"retry9,omitempty" gorm:"column:retry9" bson:"retry9,omitempty" dynamodbav:"retry9,omitempty" firestore:"retry9,omitempty"`
}

func Retry(sleeps []time.Duration, f func() error) (err error) {
attempts := len(sleeps)
for i := 0; ; i++ {
log.Printf("Retrying %d of %d ", i+1, attempts)
err = f()
if err == nil {
return
}
if i >= (attempts - 1) {
break
}
time.Sleep(sleeps[i])
log.Printf("Retrying %d of %d after error: %s", i+1, attempts, err.Error())
}
return fmt.Errorf("after %d attempts, last error: %s", attempts, err)
}
7 changes: 7 additions & 0 deletions pubsub/client_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package pubsub

type ClientConfig struct {
ProjectId string `yaml:"project_id" mapstructure:"project_id" json:"projectId,omitempty" gorm:"column:projectid" bson:"projectId,omitempty" dynamodbav:"projectId,omitempty" firestore:"projectId,omitempty"`
Credentials string `yaml:"credentials" mapstructure:"credentials" json:"credentials,omitempty" gorm:"column:credentials" bson:"credentials,omitempty" dynamodbav:"credentials,omitempty" firestore:"credentials,omitempty"`
KeyFilename string `yaml:"key_filename" mapstructure:"key_filename" json:"keyFilename,omitempty" gorm:"column:keyfilename" bson:"keyFilename,omitempty" dynamodbav:"keyFilename,omitempty" firestore:"keyFilename,omitempty"`
}
77 changes: 77 additions & 0 deletions pubsub/health_checker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package pubsub

import (
"cloud.google.com/go/pubsub"
"context"
"fmt"
"time"
)

type PermissionType int

const (
PermissionPublish PermissionType = 0
PermissionSubscribe PermissionType = 1
)

type HealthChecker struct {
name string
client *pubsub.Client
timeout time.Duration
permissionType PermissionType
resourceId string
}

func NewHealthChecker(name string, client *pubsub.Client, resourceId string, permissionType PermissionType, timeout ...time.Duration) *HealthChecker {
if len(timeout) >= 1 {
return &HealthChecker{name: name, client: client, permissionType: permissionType, resourceId: resourceId, timeout: timeout[0]}
}
return &HealthChecker{name: name, client: client, permissionType: permissionType, resourceId: resourceId, timeout: 4 * time.Second}
}
func NewPubHealthChecker(name string, client *pubsub.Client, resourceId string, timeout ...time.Duration) *HealthChecker {
if len(timeout) >= 1 {
return &HealthChecker{name: name, client: client, permissionType: PermissionPublish, resourceId: resourceId, timeout: timeout[0]}
}
return &HealthChecker{name: name, client: client, permissionType: PermissionPublish, resourceId: resourceId, timeout: 4 * time.Second}
}
func NewSubHealthChecker(name string, client *pubsub.Client, resourceId string, timeout ...time.Duration) *HealthChecker {
if len(timeout) >= 1 {
return &HealthChecker{name: name, client: client, permissionType: PermissionSubscribe, resourceId: resourceId, timeout: timeout[0]}
}
return &HealthChecker{name: name, client: client, permissionType: PermissionSubscribe, resourceId: resourceId, timeout: 4 * time.Second}
}
func (h *HealthChecker) Name() string {
return h.name
}

func (h *HealthChecker) Check(ctx context.Context) (map[string]interface{}, error) {
res := make(map[string]interface{})
var permissions []string
var err error

timeoutCtx, _ := context.WithTimeout(ctx, h.timeout)
if h.permissionType == PermissionPublish {
permissions, err = h.client.Topic(h.resourceId).IAM().TestPermissions(timeoutCtx, []string{"pubsub.topics.publish"})
} else if h.permissionType == PermissionSubscribe {
permissions, err = h.client.Subscription(h.resourceId).IAM().TestPermissions(timeoutCtx, []string{"pubsub.subscriptions.consume"})
}

if err != nil {
return res, err
} else if len(permissions) != 1 {
return res, fmt.Errorf("invalid permissions: %v", permissions)
} else {
return res, nil
}
}

func (h *HealthChecker) Build(ctx context.Context, data map[string]interface{}, err error) map[string]interface{} {
if err == nil {
return data
}
if data == nil {
data = make(map[string]interface{}, 0)
}
data["error"] = err.Error()
return data
}
98 changes: 98 additions & 0 deletions pubsub/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package pubsub

import (
"cloud.google.com/go/iam"
"cloud.google.com/go/pubsub"
"context"
"log"
"time"
)

var CheckTopicPermission = CheckPermission

type Publisher struct {
Client *pubsub.Client
Topic *pubsub.Topic
Convert func(context.Context, []byte) ([]byte, error)
}

func NewPublisher(ctx context.Context, client *pubsub.Client, topicId string, c *TopicConfig, options ...func(context.Context, []byte) ([]byte, error)) *Publisher {
topic := client.Topic(topicId)
CheckTopicPermission(ctx, topic.IAM(), "pubsub.topics.publish")
var convert func(context.Context, []byte) ([]byte, error)
if len(options) > 0 {
convert = options[0]
}
return &Publisher{Client: client, Topic: ConfigureTopic(topic, c), Convert: convert}
}

func NewPublisherByConfig(ctx context.Context, c PublisherConfig, options ...func(context.Context, []byte) ([]byte, error)) (*Publisher, error) {
if c.Retry.Retry1 <= 0 {
client, err := NewPubSubClient(ctx, []byte(c.Client.Credentials), c.Client.ProjectId)
if err != nil {
return nil, err
}
return NewPublisher(ctx, client, c.TopicId, c.Topic, options...), nil
} else {
durations := DurationsFromValue(c.Retry, "Retry", 9)
client, err := NewPubSubClientWithRetries(ctx, []byte(c.Client.Credentials), durations, c.Client.ProjectId)
if err != nil {
return nil, err
}
return NewPublisher(ctx, client, c.TopicId, c.Topic, options...), nil
}
}

func ConfigureTopic(topic *pubsub.Topic, c *TopicConfig) *pubsub.Topic {
if c != nil {
if c.CountThreshold > 0 {
topic.PublishSettings.DelayThreshold = time.Duration(c.CountThreshold) * time.Millisecond
}
if c.DelayThreshold > 0 {
topic.PublishSettings.CountThreshold = c.DelayThreshold
}
if c.ByteThreshold > 0 {
topic.PublishSettings.ByteThreshold = c.ByteThreshold
}
if c.NumGoroutines > 0 {
topic.PublishSettings.NumGoroutines = c.NumGoroutines
}
}
return topic
}
func (p *Publisher) Publish(ctx context.Context, data []byte, attributes map[string]string) error {
msg := &pubsub.Message{Data: data}
if attributes != nil {
msg.Attributes = attributes
}
publishResult := p.Topic.Publish(ctx, msg)
_, err := publishResult.Get(ctx)
return err
}
func (p *Publisher) PublishData(ctx context.Context, data []byte) error {
msg := &pubsub.Message{Data: data}
publishResult := p.Topic.Publish(ctx, msg)
_, err := publishResult.Get(ctx)
return err
}
func (p *Publisher) PublishMessage(ctx context.Context, data []byte, attributes map[string]string) (string, error) {
msg := &pubsub.Message{Data: data}
if attributes != nil {
msg.Attributes = attributes
}
publishResult := p.Topic.Publish(ctx, msg)
return publishResult.Get(ctx)
}

func CheckPermission(ctx0 context.Context, iam *iam.Handle, permission string) {
ctx, _ := context.WithTimeout(ctx0, 30*time.Second)

log.Printf("Checking permission: %s", permission)
if permissions, err := iam.TestPermissions(ctx, []string{permission}); err != nil {
log.Printf("Can't check permission %v: %s", permission, err.Error())
} else if len(permissions) > 0 && permissions[0] == permission {
log.Printf("Permission %v valid", permission)
} else {
log.Printf("Permission %v invalid", permission)
}
}
15 changes: 15 additions & 0 deletions pubsub/publisher_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package pubsub

type PublisherConfig struct {
TopicId string `yaml:"topic_id" mapstructure:"topic_id" json:"topicId,omitempty" gorm:"column:topicid" bson:"topicId,omitempty" dynamodbav:"topicId,omitempty" firestore:"topicId,omitempty"`
Client ClientConfig `yaml:"client" mapstructure:"client" json:"client,omitempty" gorm:"column:client" bson:"client,omitempty" dynamodbav:"client,omitempty" firestore:"client,omitempty"`
Topic *TopicConfig `yaml:"topic" mapstructure:"topic" json:"topic,omitempty" gorm:"column:topic" bson:"topic,omitempty" dynamodbav:"topic,omitempty" firestore:"topic,omitempty"`
Retry RetryConfig `yaml:"retry" mapstructure:"retry" json:"retry,omitempty" gorm:"column:retry" bson:"retry,omitempty" dynamodbav:"retry,omitempty" firestore:"retry,omitempty"`
}

type TopicConfig struct {
DelayThreshold int `yaml:"delay_threshold" mapstructure:"delay_threshold" json:"delayThreshold,omitempty" gorm:"column:delaythreshold" bson:"delayThreshold,omitempty" dynamodbav:"delayThreshold,omitempty" firestore:"delayThreshold,omitempty"` // MaxMessages
CountThreshold int `yaml:"count_threshold" mapstructure:"" json:"countThreshold,omitempty" gorm:"column:countthreshold" bson:"countThreshold,omitempty" dynamodbav:"countThreshold,omitempty" firestore:"countThreshold,omitempty"` // MaxMilliseconds
ByteThreshold int `yaml:"byte_threshold" mapstructure:"byte_threshold" json:"byteThreshold,omitempty" gorm:"column:bytethreshold" bson:"byteThreshold,omitempty" dynamodbav:"byteThreshold,omitempty" firestore:"byteThreshold,omitempty"` // MaxBytes
NumGoroutines int `yaml:"num_goroutines" mapstructure:"num_goroutines" json:"numGoroutines,omitempty" gorm:"column:numgoroutines" bson:"numGoroutines,omitempty" dynamodbav:"numGoroutines,omitempty" firestore:"numGoroutines,omitempty"`
}
Loading

0 comments on commit 7e6d707

Please sign in to comment.