Skip to content

Commit

Permalink
Merge pull request #211 from Maple-pro/dev
Browse files Browse the repository at this point in the history
fix(message): ChatGPT and mq
  • Loading branch information
liaosunny123 authored Aug 31, 2023
2 parents 89ca60a + e898a19 commit 1480e75
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 256 deletions.
13 changes: 9 additions & 4 deletions src/constant/strings/service.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
package strings

const (
// Exchange name
VideoExchange = "video_exchange"
EventExchange = "event"
MessageExchange = "message_exchange"

VideoPicker = "video_picker"
VideoSummary = "video_summary"
// Queue name
VideoPicker = "video_picker"
VideoSummary = "video_summary"
MessageCommon = "message_common"
MessageGPT = "message_gpt"

// Routing key
FavoriteActionEvent = "video.favorite.action"
VideoGetEvent = "video.get.action"
VideoCommentEvent = "video.comment.action"
VideoPublishEvent = "video.publish.action"

MessageActionEvent = "message.send"
MessageGptActionEvent = "message.gpt.send"
MessageActionEvent = "message.common"
MessageGptActionEvent = "message.gpt"
)
115 changes: 47 additions & 68 deletions src/services/message/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import (
"context"
"encoding/json"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"

grpc2 "GuGoTik/src/utils/grpc"
"time"

"github.com/go-redis/redis_rate/v10"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/robfig/cron/v3"
"gorm.io/gorm"

Expand All @@ -44,81 +44,34 @@ type MessageServiceImpl struct {
var conn *amqp.Connection
var channel *amqp.Channel

//输出

// 输出
func failOnError(err error, msg string) {
//打日志
logging.Logger.WithFields(logrus.Fields{
"err": err,
}).Errorf(msg)
if err != nil {
logging.Logger.WithFields(logrus.Fields{
"err": err,
}).Errorf(msg)
}
}

func (c MessageServiceImpl) New() {

var err error

conn, err = amqp.Dial(rabbitmq.BuildMQConnAddr())
if err != nil {
failOnError(err, "Failed to connect to RabbitMQ")
}
failOnError(err, "Failed to connect to RabbitMQ")

channel, err = conn.Channel()
if err != nil {
failOnError(err, "Failed to open a channel")
}
failOnError(err, "Failed to open a channel")

err = channel.ExchangeDeclare(
strings.MessageExchange,
"x-delayed-message",
true, false, false, false,
amqp.Table{
"x-delayed-type": "direct",
"x-delayed-type": "topic",
},
)
if err != nil {
failOnError(err, "Failed to get exchange")
}

_, err = channel.QueueDeclare(
strings.MessageActionEvent,
true, false, false, false,
nil,
)

if err != nil {
failOnError(err, "Failed to define queue")
}

_, err = channel.QueueDeclare(
strings.MessageGptActionEvent,
true, false, false, false,
nil,
)

if err != nil {
failOnError(err, "Failed to define queue")
}

err = channel.QueueBind(
strings.MessageActionEvent,
strings.MessageActionEvent,
strings.MessageExchange,
false,
nil,
)
if err != nil {
failOnError(err, "Failed to bind queue to exchange")
}

err = channel.QueueBind(
strings.MessageGptActionEvent,
strings.MessageGptActionEvent,
strings.MessageExchange,
false,
nil,
)
if err != nil {
failOnError(err, "Failed to bind queue to exchange")
}
failOnError(err, "Failed to get exchange")

userRpcConn := grpc2.Connect(config.UserRpcServerName)

Expand Down Expand Up @@ -243,7 +196,7 @@ func (c MessageServiceImpl) ChatAction(ctx context.Context, request *chat.Action
"user_id": request.UserId,
"action_type": request.ActionType,
"content_text": request.Content,
}).Errorf("database insert error")
}).Errorf("database insert error")
logging.SetSpanError(span, err)
return res, err
}
Expand Down Expand Up @@ -380,43 +333,69 @@ func addMessage(ctx context.Context, fromUserId uint32, toUserId uint32, Context
ConversationId: conversationId,
}

//TO_DO 后面写mq?

body, err := json.Marshal(message)

if err != nil {
resp = &chat.ActionResponse{
StatusCode: strings.UnableToAddMessageErrorCode,
StatusMsg: strings.UnableToAddMessageError,
}
return
}
headers := rabbitmq.InjectAMQPHeaders(ctx)

headers := rabbitmq.InjectAMQPHeaders(ctx)
if message.ToUserId == config.EnvCfg.MagicUserId {
err = channel.PublishWithContext(ctx,
"", strings.MessageGptActionEvent, false, false,
logging.Logger.WithFields(logrus.Fields{
"routing": strings.MessageGptActionEvent,
"message": message,
}).Debugf("Publishing message to %s", strings.MessageGptActionEvent)
err = channel.PublishWithContext(
ctx,
strings.MessageExchange,
strings.MessageGptActionEvent,
false,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: body,
Headers: headers,
})
if err != nil {
logging.Logger.WithFields(logrus.Fields{
"err": err,
}).Errorf("Failed to publish message to %s", strings.MessageGptActionEvent)
}

} else {

err = channel.PublishWithContext(ctx, "", strings.MessageActionEvent, false, false,
logging.Logger.WithFields(logrus.Fields{
"routing": strings.MessageActionEvent,
"message": message,
}).Debugf("Publishing message to %s", strings.MessageActionEvent)
err = channel.PublishWithContext(
ctx,
strings.MessageExchange,
strings.MessageActionEvent,
false,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: body,
Headers: headers,
})
if err != nil {
logging.Logger.WithFields(logrus.Fields{
"err": err,
}).Errorf("Failed to publish message to %s", strings.MessageActionEvent)
}
}

// result := database.Client.WithContext(ctx).Create(&message)

if err != nil {
logging.Logger.WithFields(logrus.Fields{
"err": err,
}).Errorf("Error when publishing the message to mq")
resp = &chat.ActionResponse{
StatusCode: strings.UnableToAddMessageErrorCode,
StatusMsg: strings.UnableToAddMessageError,
Expand Down
Loading

0 comments on commit 1480e75

Please sign in to comment.