Skip to content

Commit

Permalink
fixed global config issue
Browse files Browse the repository at this point in the history
  • Loading branch information
surendratiwari3 committed Oct 31, 2024
1 parent 0c857b1 commit e3def84
Show file tree
Hide file tree
Showing 13 changed files with 364 additions and 169 deletions.
61 changes: 0 additions & 61 deletions api/handlers/task_handler.go

This file was deleted.

43 changes: 0 additions & 43 deletions api/main.go

This file was deleted.

6 changes: 0 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,6 @@ func (cp *configProvider) GetConfig() *Config {
return &cp.applicationConfig
}

// SetConfigProvider sets the global configuration provider.
// This is useful for injecting custom configurations during tests.
func SetConfigProvider(provider ConfigProvider) {
globalConfigProvider = provider
}

// GetConfigProvider returns the global configuration provider.
func GetConfigProvider() ConfigProvider {
if globalConfigProvider == nil {
Expand Down
6 changes: 4 additions & 2 deletions config/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import "go.mongodb.org/mongo-driver/mongo"

// MongoDBConfig ...
type MongoDBConfig struct {
Client *mongo.Client
Database string
Client *mongo.Client
URI string
MaxPoolSize uint64 // Maximum number of connections in the pool
DbName string
}
54 changes: 50 additions & 4 deletions example/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ package main

import (
"context"
"encoding/json"
"github.com/sirupsen/logrus"
"github.com/surendratiwari3/paota/config"
"github.com/surendratiwari3/paota/schema"
"github.com/surendratiwari3/paota/workerpool"

//"github.com/surendratiwari3/paota/example/task"
"github.com/surendratiwari3/paota/logger"
"os"
)

type printWorker struct {
workerPool workerpool.Pool
}

func main() {
logrusLog := logrus.StandardLogger()
logrusLog.SetFormatter(&logrus.JSONFormatter{})
Expand All @@ -24,7 +28,7 @@ func main() {
//Store: "null",
TaskQueueName: "paota_task_queue",
AMQP: &config.AMQPConfig{
Url: "amqp://localhost:5672/",
Url: "amqp://guest:guest@localhost:5672/",

Check failure

Code scanning / SonarCloud

AMQP credentials should not be disclosed High

Make sure these Rabbit MQ credentials get revoked, changed, and removed from the code. See more on SonarCloud
Exchange: "paota_task_exchange",
ExchangeType: "direct",
BindingKey: "paota_task_binding_key",
Expand All @@ -47,10 +51,13 @@ func main() {
logger.ApplicationLogger.Info("workerPool is nil")
os.Exit(0)
}

printWorker := printWorker{workerPool: newWorkerPool}

logger.ApplicationLogger.Info("newWorkerPool created successfully")
// Register tasks
regTasks := map[string]interface{}{
"Print": Print,
"Print": printWorker.Print,
}
err = newWorkerPool.RegisterTasks(regTasks)
if err != nil {
Expand All @@ -65,7 +72,46 @@ func main() {
}
}

func Print(arg *schema.Signature) error {
func (wp printWorker) Publish() {
// UserRecord represents the structure of user records.
type UserRecord struct {
ID string `json:"id"`
Name string `json:"name"`
Email string `json:"email"`
// Add other fields as needed
}
// Replace this with the received user record
user := UserRecord{
ID: "1",
Name: "John Doe",
Email: "john.doe@example.com",
}

// Convert the struct to a JSON string
userJSON, err := json.Marshal(user)
if err != nil {
//
}

printJob := &schema.Signature{
Name: "Print",
Args: []schema.Arg{
{
Type: "string",
Value: string(userJSON),
},
},
RetryCount: 10,
RoutingKey: "consumer_publisher",
IgnoreWhenTaskNotRegistered: true,
}

wp.workerPool.SendTaskWithContext(context.Background(), printJob)

}

func (wp printWorker) Print(arg *schema.Signature) error {
logger.ApplicationLogger.Info("success")
wp.Publish()
return nil
}
124 changes: 124 additions & 0 deletions example/consumer_publisher/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package main

import (
"context"
"encoding/json"
"github.com/sirupsen/logrus"
"github.com/surendratiwari3/paota/config"
"github.com/surendratiwari3/paota/schema"
"github.com/surendratiwari3/paota/workerpool"
"sync"

//"github.com/surendratiwari3/paota/example/task"
"github.com/surendratiwari3/paota/logger"
"os"
)

type printWorker struct {
workerPool workerpool.Pool
}

func main() {
logrusLog := logrus.StandardLogger()
logrusLog.SetFormatter(&logrus.JSONFormatter{})
logrusLog.SetReportCaller(true)

logger.ApplicationLogger = logrusLog

cnf := config.Config{
Broker: "amqp",
//Store: "null",
TaskQueueName: "paota_task_queue",
AMQP: &config.AMQPConfig{
Url: "amqp://guest:guest@localhost:5672/",

Check failure

Code scanning / SonarCloud

AMQP credentials should not be disclosed High

Make sure these Rabbit MQ credentials get revoked, changed, and removed from the code. See more on SonarCloud
Exchange: "paota_task_exchange",
ExchangeType: "direct",
BindingKey: "consumer_publisher",
PrefetchCount: 100,
ConnectionPoolSize: 10,
DelayedQueue: "delay_consumer_publisher",
},
}

newWorkerPool, err := workerpool.NewWorkerPoolWithConfig(context.Background(), 10, "testWorker", cnf)
if err != nil {
logger.ApplicationLogger.Error("workerPool is not created", err)
os.Exit(0)
} else if newWorkerPool == nil {
logger.ApplicationLogger.Info("workerPool is nil")
os.Exit(0)
}

printWorker := printWorker{workerPool: newWorkerPool}

logger.ApplicationLogger.Info("newWorkerPool created successfully")
// Register tasks
regTasks := map[string]interface{}{
"Print": printWorker.Print,
}
err = newWorkerPool.RegisterTasks(regTasks)
if err != nil {
logger.ApplicationLogger.Info("error while registering task")
return
}
logger.ApplicationLogger.Info("Worker is also started")

err = newWorkerPool.Start()
if err != nil {
logger.ApplicationLogger.Error("error while starting worker")
}
}

func (wp printWorker) Publish() {
// UserRecord represents the structure of user records.
type UserRecord struct {
ID string `json:"id"`
Name string `json:"name"`
Email string `json:"email"`
// Add other fields as needed
}
// Replace this with the received user record
user := UserRecord{
ID: "1",
Name: "John Doe",
Email: "john.doe@example.com",
}

// Convert the struct to a JSON string
userJSON, err := json.Marshal(user)
if err != nil {
//
}

printJob := &schema.Signature{
Name: "Print",
Args: []schema.Arg{
{
Type: "string",
Value: string(userJSON),
},
},
RetryCount: 10,
RoutingKey: "consumer_publisher_",
IgnoreWhenTaskNotRegistered: true,
}

waitGrp := sync.WaitGroup{}
waitGrp.Add(1)
go func() {
for i := 0; i < 100; i++ {
wp.workerPool.SendTaskWithContext(context.Background(), printJob)
}
waitGrp.Done()
}()
}

waitGrp.Wait()

}

func (wp printWorker) Print(arg *schema.Signature) error {
logger.ApplicationLogger.Info("success")
wp.Publish()
return nil
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package store
package backend

import (
"github.com/surendratiwari3/paota/schema"
)

// Backend - a common interface for all result Store
type Backend interface {
// InsertTask Insert task state to result backend
InsertTask(signature schema.Signature) error
// StoreTask Insert task state to result backend
StoreTask(signature schema.Signature) error
// DeleteTask Delete task state from result backend
DeleteTask(taskId string) error
// GetTask Get task state from result backend
Expand Down
4 changes: 2 additions & 2 deletions internal/broker/amqp/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ func (b *AMQPBroker) isDirectExchange() bool {
// NewAMQPBroker creates a new instance of the AMQP broker
// It opens connections to RabbitMQ, declares an exchange, opens a channel,
// declares and binds the queue, and enables publish notifications
func NewAMQPBroker() (broker.Broker, error) {
cfg := config.GetConfigProvider().GetConfig()
func NewAMQPBroker(configProvider config.ConfigProvider) (broker.Broker, error) {
cfg := configProvider.GetConfig()
amqpErrorChannel := make(chan *amqp.Error, 1)
stopChannel := make(chan struct{})
doneStopChannel := make(chan struct{})
Expand Down
Loading

0 comments on commit e3def84

Please sign in to comment.