Skip to content

Commit

Permalink
Merge pull request #1 from Project-IPCA/develop
Browse files Browse the repository at this point in the history
[APP-0001] Fixed worker double ack to channel and add handle when l…
  • Loading branch information
CheIby authored Nov 28, 2024
2 parents cfc823b + 13433ba commit 048625e
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 72 deletions.
Binary file modified main
Binary file not shown.
157 changes: 88 additions & 69 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,89 +16,108 @@ import (
"gorm.io/gorm"
)

func main(){
func main() {
cfg := config.NewConfig()

db_pool := db.Init(cfg)
pubsub := redis_client.RedisClient(cfg)
rabbit := rabbitmq_client.RabbitMQClient(cfg)

pythonConsumer(db_pool,pubsub,rabbit,cfg)
}

func pythonConsumer(db_pool *gorm.DB, pubsub *redis.Client, rabbit *amqp.Connection,cfg *config.Config) {
for {
ch, err := rabbit.Channel()
if err != nil {
fmt.Printf("Failed to create a RabbitMQ channel: %v", err)
time.Sleep(5 * time.Second)
return
}
defer ch.Close()

// Assert the queue
_, err = ch.QueueDeclare(
cfg.RabbitMQ.QueueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
fmt.Printf("Failed to declare the RabbitMQ queue: %v", err)
if err := pythonConsumer(db_pool, pubsub, cfg); err != nil {
fmt.Printf("Consumer error: %v, retrying in 5 seconds...\n", err)
time.Sleep(5 * time.Second)
continue
}
}
}

err = ch.Qos(1, 0, false)
if err != nil {
fmt.Printf("Failed to set QoS: %v", err)
time.Sleep(5 * time.Second)
continue
}
func pythonConsumer(db_pool *gorm.DB, pubsub *redis.Client, cfg *config.Config) error {
rabbit, err := rabbitmq_client.RabbitMQClient(cfg)
if err != nil {
return fmt.Errorf("failed to connect to RabbitMQ: %v", err)
}
defer rabbit.Close()

msgs, err := ch.Consume(
cfg.RabbitMQ.QueueName, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
fmt.Printf("Failed to register RabbitMQ consumer: %v", err)
time.Sleep(5 * time.Second)
continue
}
ch, err := rabbit.Channel()
if err != nil {
return fmt.Errorf("failed to create channel: %v", err)
}
defer ch.Close()

_, err = ch.QueueDeclare(
cfg.RabbitMQ.QueueName,
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return fmt.Errorf("failed to declare queue: %v", err)
}

err = ch.Qos(1, 0, false)
if err != nil {
return fmt.Errorf("failed to set QoS: %v", err)
}

msgs, err := ch.Consume(
cfg.RabbitMQ.QueueName,
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return fmt.Errorf("failed to start consuming: %v", err)
}

connCloseChan := make(chan *amqp.Error)
channelCloseChan := make(chan *amqp.Error)

ch.NotifyClose(channelCloseChan)
rabbit.NotifyClose(connCloseChan)

fmt.Println("Connected to RabbitMQ. Waiting for messages...")

for {
select {
case err := <-connCloseChan:
return fmt.Errorf("connection closed: %v", err)

fmt.Println("Waiting for messages...")
case err := <-channelCloseChan:
return fmt.Errorf("channel closed: %v", err)

for msg := range msgs {
fmt.Println(msgs)
var msgBody models.ReciveMessage
err := json.Unmarshal([]byte(msg.Body), &msgBody)
if err != nil {
fmt.Printf("Failed to parse message: %v", err)
msg.Nack(false, true)
continue
case msg, ok := <-msgs:
if !ok {
return fmt.Errorf("message channel closed")
}

fmt.Printf("%+v\n", msgBody)
jobType := msgBody.JobType

switch jobType {
case "upsert-testcase":
service.AddAndUpdateTestCase(ch,db_pool,msg,msgBody,pubsub)
msg.Ack(true)
case "exercise-submit":
service.RunSubmission(ch,db_pool,msg,msgBody,pubsub)
default:
fmt.Printf("job_type not wtf")
msg.Ack(true)
if err := processMessage(ch, db_pool, msg, pubsub); err != nil {
fmt.Printf("Error processing message: %v\n", err)
}
msg.Ack(false)
}
}
}
}

func processMessage(ch *amqp.Channel, db_pool *gorm.DB, msg amqp.Delivery, pubsub *redis.Client) error {
var msgBody models.ReciveMessage
if err := json.Unmarshal(msg.Body, &msgBody); err != nil {
msg.Nack(false, true)
return fmt.Errorf("failed to parse message: %v", err)
}

fmt.Printf("Processing message: %+v\n", msgBody)

switch msgBody.JobType {
case "upsert-testcase":
service.AddAndUpdateTestCase(ch, db_pool, msg, msgBody, pubsub)
case "exercise-submit":
service.RunSubmission(ch, db_pool, msg, msgBody, pubsub)
default:
fmt.Printf("Unknown job type: %s\n", msgBody.JobType)
msg.Ack(false)
}
return nil
}
6 changes: 3 additions & 3 deletions rabbitmq_client/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (
amqp "github.com/rabbitmq/amqp091-go"
)

func RabbitMQClient(cfg *config.Config) *amqp.Connection {
func RabbitMQClient(cfg *config.Config) (*amqp.Connection,error) {
url := fmt.Sprintf("amqp://%s:%s@%s:%s/", cfg.RabbitMQ.User, cfg.RabbitMQ.Password, cfg.RabbitMQ.Host, cfg.RabbitMQ.Port)

conn, err := amqp.Dial(url)
if err != nil {
panic("Failed to connect to RabbitMQ: " + err.Error())
return nil,fmt.Errorf("Failed to connect to RabbitMQ : %v" ,err.Error())
}

return conn
return conn,nil
}

0 comments on commit 048625e

Please sign in to comment.