From 971b953242944bbb09410c51fc3cc68c470ecb3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=BB=8B=E6=B6=A6?= <807046079@qq.com> Date: Mon, 18 Nov 2024 00:00:06 +0800 Subject: [PATCH] v1.0.1 --- .github/config.yaml | 54 +++ .github/workflows/CI.yml | 4 +- .../workflows/common-test-and-update-docs.yml | 76 +++- .github/workflows/develop-CI.yml | 2 + README.en.md | 2 +- README.md | 2 +- api/v1/account.go | 8 +- api/v1/user.go | 5 +- docs/beforeDocsMake/renameModel/main.go | 7 +- global/constant/constant.go | 50 ++- global/cron/enter.go | 11 +- global/nats/manager/consumer.go | 188 +++++++++ global/nats/manager/customer.go | 27 -- global/nats/manager/dlq.go | 197 +++++----- global/nats/manager/enter.go | 41 +- global/nats/manager/event.go | 78 ++-- global/nats/manager/manager.go | 94 ++--- global/nats/manager/manager_test.go | 367 ++++++++++++------ global/nats/manager/task.go | 55 ++- global/nats/publicFunc_test.go | 294 ++++++++------ initialize/initialize.go | 5 +- initialize/logger.go | 21 +- initialize/nats.go | 2 - model/product/enter.go | 6 +- router/websocket/websocket.go | 17 +- service/category/enter.go | 16 - service/product/bill/reader.go | 22 +- service/template/enter.go | 6 +- service/thirdparty/email.go | 14 +- service/thirdparty/email/enter.go | 10 +- service/thirdparty/enter.go | 12 - service/thirdparty/task.go | 15 +- service/transaction/enter.go | 18 +- util/log/log.go | 30 -- 34 files changed, 1080 insertions(+), 676 deletions(-) create mode 100644 .github/config.yaml create mode 100644 global/nats/manager/consumer.go delete mode 100644 global/nats/manager/customer.go delete mode 100644 util/log/log.go diff --git a/.github/config.yaml b/.github/config.yaml new file mode 100644 index 00000000..decab05b --- /dev/null +++ b/.github/config.yaml @@ -0,0 +1,54 @@ +# 服务初始化的配置文件 执行代码详见 ./initialize/initialize.go +# This is the service initialization configuration file execution code see ./initialize/initialize.go + +# can set `debug` or `production` +# `debug` mode can print mysql and redis logs and `/public/swagger/index.html` api +Mode: debug + +System: + Addr: 8080 + RouterPrefix: "" + JwtKey: "" + # Some apis use symmetric signature keys that can be configured to improve security + ClientSignKey: "" + +Redis: + Addr: "localhost:6379" + Password: "" + Db: 0 + LockDb: 1 + +Mysql: + Path: "localhost" + Port: "3306" + Config: "parseTime=True&loc=Local" + DbName: "leap_ledger" + Username: "root" + Password: "" + +Nats: + ServerUrl: localhost:4222 + # This is the topic that the consumer server needs to subscribe to, such as createTourist, statisticUpdate, transactionSync. + # subjects see ./global/nats/nats.go + Subjects: [all] + +Captcha: + KeyLong: 6 + ImgWidth: 180 + ImgHeight: 50 + OpenCaptcha: 0 + OpenCaptchaTimeout: 3600 + EmailCaptcha: 0 + EmailCaptchaTimeOut: 3600 + +ThirdParty: + # WeCom used to send domain email + WeCom: + CorpId: "" + CorpSecret: "" + # Ai is used to obtain Chinese similarity, which is used to match transaction types + # https://huggingface.co/google-bert/bert-base-chinese + Ai: + Host: "" # leap-ledger-ai-server + Port: "" # 5000 + MinSimilarity: 0.85 \ No newline at end of file diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 16bdaf66..05c56cd8 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -10,7 +10,9 @@ jobs: uses: ./.github/workflows/common-test-and-update-docs.yml with: ref: ${{ github.ref }} - + secrets: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + push-image: needs: test-and-update-docs uses: ./.github/workflows/common-push-image.yml diff --git a/.github/workflows/common-test-and-update-docs.yml b/.github/workflows/common-test-and-update-docs.yml index f5b9824f..eb942a7e 100644 --- a/.github/workflows/common-test-and-update-docs.yml +++ b/.github/workflows/common-test-and-update-docs.yml @@ -6,6 +6,11 @@ on: ref: required: true type: string + secrets: + CODECOV_TOKEN: + required: true +permissions: + contents: read jobs: test-and-update-docs: @@ -15,25 +20,49 @@ jobs: - name: Checkout code uses: actions/checkout@v4 - - name: Build image - run: | - docker build -t xiaozirun/leap-ledger:latest -f docker/Dockerfile . + - name: Set up Go + uses: actions/setup-go@v2 + with: + go-version: 1.23.0 - name: Set up Docker Compose run: | sudo curl -L "https://github.com/docker/compose/releases/latest/download/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose sudo chmod +x /usr/local/bin/docker-compose - - name: Start server + - name: Copy config.yaml + run: cp -f .github/config.yaml config.yaml + + - name: Start mysql + run: | + docker-compose -f docker-compose.yaml up -d leap-ledger-mysql leap-ledger-nats leap-ledger-redis + + - name: Wait for MySQL to be ready run: | - docker-compose -f docker-compose.yaml up -d leap-ledger-mysql && docker-compose -f docker-compose.yaml up -d + # Set the maximum retry count and interval (e.g., 30 retries, 5 seconds each) + max_retries=30 + retries=0 + until docker exec leap-ledger-mysql mysqladmin -u root -p"$MYSQL_ROOT_PASSWORD" ping --silent; do + if [ $retries -ge $max_retries ]; then + echo "MySQL failed to start after $max_retries retries!" + exit 1 + fi + retries=$((retries + 1)) + echo "Waiting for MySQL to be ready... Retry $retries/$max_retries" + sleep 5 + done - - name: Run tests and make report + - name: Go mod run: | - docker exec leap-ledger-server sh -c "go install github.com/jstemmer/go-junit-report@latest" - docker exec leap-ledger-server sh -c "go test -v 2>&1 ./... -coverprofile=docs/coverage.out | go-junit-report > docs/test-report.xml" - docker exec leap-ledger-server sh -c "go tool cover -html=docs/coverage.out -o docs/coverage.html" - continue-on-error: false + sudo chmod -R 666 ./docker + go mod download + go mod tidy + + - name: Exec test + run: | + go install github.com/jstemmer/go-junit-report/v2@latest + go test -timeout 5m -v 2>&1 ./... -coverprofile=docs/coverage.out | tee docs/test-process.log + go-junit-report < docs/test-process.log > docs/test-report.xml - name: Upload test-report.xml uses: actions/upload-artifact@v4 @@ -41,17 +70,28 @@ jobs: name: test-report.xml path: docs/test-report.xml - - name: Upload coverage.html - uses: actions/upload-artifact@v4 + - name: Upload results to Codecov + uses: codecov/codecov-action@v4 with: - name: coverage-report - path: docs/coverage.html + files: docs/coverage.out + env: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - - name: Execute updateDocs.sh in container - run: docker exec leap-ledger-server sh "./docs/updateDocs.sh" + - name: Upload test results to Codecov + if: ${{ !cancelled() }} + uses: codecov/test-results-action@v1 + with: + files: docs/test-report.xml + env: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + + - name: Update docs + run: | + sudo chmod -R 777 ./ + go install github.com/swaggo/swag/cmd/swag@latest + swag init -p pascalcase - name: Commit and push changes - # https://github.com/actions/checkout/discussions/479 run: | git config --global user.name "github-actions[bot]" git config --global user.email "github-actions[bot]@users.noreply.github.com" @@ -61,7 +101,7 @@ jobs: else git commit -m "update docs" || echo "No changes to commit." git push origin ${{ inputs.ref }} - fi + continue-on-error: true - name: Docker Compose Down run: | diff --git a/.github/workflows/develop-CI.yml b/.github/workflows/develop-CI.yml index 25f91eb0..8e29437e 100644 --- a/.github/workflows/develop-CI.yml +++ b/.github/workflows/develop-CI.yml @@ -10,6 +10,8 @@ jobs: uses: ./.github/workflows/common-test-and-update-docs.yml with: ref: ${{ github.ref }} + secrets: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} push-image: needs: test-and-update-docs diff --git a/README.en.md b/README.en.md index 7ed309c4..2d0f440b 100644 --- a/README.en.md +++ b/README.en.md @@ -89,7 +89,7 @@ docker-compose up -d ``` Access http://localhost:8080/public/health to verify the service. -For custom configurations, refer to: [./config.yaml](./config.yaml) +If you don't want to rely on Docker, you can modify the request addresses of mysql, nats, and redis in the [./config.yaml](./config.yaml) file and run it locally For client packaging details, visit: https://github.com/ZiRunHua/LeapLedger-App diff --git a/README.md b/README.md index ddaa66f7..5f8fe279 100644 --- a/README.md +++ b/README.md @@ -94,7 +94,7 @@ docker-compose up -d ``` 访问http://localhost:8080/public/health 验证服务 -自定义配置详见:[./config.yaml](./config.yaml) +如果不想依赖Docker,可以修改[./config.yaml](./config.yaml)文件中的mysql、nats和redis的请求地址等信息后直接在本地运行 客户端打包详见:https://github.com/ZiRunHua/LeapLedger-App diff --git a/api/v1/account.go b/api/v1/account.go index 85a20bcf..3178e860 100644 --- a/api/v1/account.go +++ b/api/v1/account.go @@ -3,6 +3,7 @@ package v1 import ( "context" "errors" + "golang.org/x/sync/errgroup" "time" "github.com/ZiRunHua/LeapLedger/api/request" @@ -16,7 +17,6 @@ import ( userModel "github.com/ZiRunHua/LeapLedger/model/user" "github.com/ZiRunHua/LeapLedger/util/timeTool" "github.com/gin-gonic/gin" - "github.com/songzhibin97/gkit/egroup" "gorm.io/gorm" ) @@ -570,7 +570,7 @@ func (a *AccountApi) GetUserInfo(ctx *gin.Context) { return } accountUser, account, nowTime := contextFunc.GetAccountUser(ctx), contextFunc.GetAccount(ctx), contextFunc.GetNowTime(ctx) - group := egroup.WithContext(ctx) + var group errgroup.Group var todayTotal, monthTotal *global.IEStatisticWithTime var recentTrans *response.TransactionDetailList for _, infoType := range requestData.Types { @@ -990,9 +990,9 @@ func (a *AccountApi) GetInfo(ctx *gin.Context) { return nil } // process and response - var group *egroup.Group + var group *errgroup.Group if len(types) > 1 { - group = egroup.WithContext(ctx) + group = &errgroup.Group{} } for i := range types { t := types[i] diff --git a/api/v1/user.go b/api/v1/user.go index e5b81e84..1565d321 100644 --- a/api/v1/user.go +++ b/api/v1/user.go @@ -1,6 +1,7 @@ package v1 import ( + "golang.org/x/sync/errgroup" "time" "github.com/ZiRunHua/LeapLedger/api/request" @@ -17,7 +18,6 @@ import ( "github.com/gin-gonic/gin" "github.com/golang-jwt/jwt/v5" "github.com/pkg/errors" - "github.com/songzhibin97/gkit/egroup" ) type UserApi struct { @@ -458,8 +458,7 @@ func (u *UserApi) Home(ctx *gin.Context) { if false == pass { return } - - group := egroup.WithContext(ctx) + var group errgroup.Group nowTime, timeLocation := account.GetNowTime(), account.GetTimeLocation() year, month, day := nowTime.Date() var todayData, yesterdayData, weekData, monthData, yearData response.TransactionStatistic diff --git a/docs/beforeDocsMake/renameModel/main.go b/docs/beforeDocsMake/renameModel/main.go index e0987785..cf678229 100644 --- a/docs/beforeDocsMake/renameModel/main.go +++ b/docs/beforeDocsMake/renameModel/main.go @@ -2,7 +2,6 @@ package main import ( "bufio" - "github.com/ZiRunHua/LeapLedger/global/constant" "go/ast" "go/parser" "go/token" @@ -10,11 +9,13 @@ import ( "os" "path/filepath" "strings" + + "github.com/ZiRunHua/LeapLedger/global/constant" ) func main() { - handleDir(constant.WORK_PATH + "/api/request/") - handleDir(constant.WORK_PATH + "/api/response/") + handleDir(filepath.Clean(constant.RootDir + "/api/request/")) + handleDir(filepath.Clean(constant.RootDir + "/api/response/")) } func handleDir(path string) { _ = filepath.Walk( diff --git a/global/constant/constant.go b/global/constant/constant.go index 385ff918..e55ca95c 100644 --- a/global/constant/constant.go +++ b/global/constant/constant.go @@ -1,19 +1,26 @@ package constant -type ServerMode string - -var Debug, Production ServerMode = "debug", "production" +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" +) -const WORK_PATH = "/go/LeapLedger" -const RUNTIME_DATA_PATH = WORK_PATH + "/runtime/data" +type ServerMode string -const LOG_PATH = WORK_PATH + "/log" -const DATA_PATH = WORK_PATH + "/data" +const Debug, Production ServerMode = "debug", "production" -var ExampleAccountJsonPath = DATA_PATH + "/template/account/example.json" +var ( + RootDir = getRootDir() + LogPath = filepath.Join(RootDir, "log") + DataPath = filepath.Join(RootDir, "data") + ExampleAccountJsonPath = filepath.Clean(DataPath + "/template/account/example.json") +) // IncomeExpense 收支类型 -type IncomeExpense string //@name IncomeExpense `example:"expense" enums:"income,expense" swaggertype:"string"` +type IncomeExpense string // @name IncomeExpense `example:"expense" enums:"income,expense" swaggertype:"string"` const ( Income IncomeExpense = "income" @@ -82,3 +89,28 @@ const ( // nats type Subject string + +func getRootDir() string { + // `os.Getwd()` is avoided here because, during tests, the working directory is set to the test file’s directory. + // This command retrieves the module's root directory instead. + // Source of `go list` usage: https://stackoverflow.com/a/75943840/23658318 + rootDir, err := exec.Command("go", "list", "-m", "-f", "{{.Dir}}").Output() + if err == nil { + return strings.TrimSpace(string(rootDir)) + } + // If `go list` fails, it may indicate the absence of a Go environment. + // In such cases, this suggests we are not in a test environment, so fall back to `os.Getwd()` to set `RootDir`. + workDir, err := os.Getwd() + if err != nil { + panic(err) + } + // Validate that the directory exists + _, err = os.Stat(workDir) + if err != nil { + if os.IsNotExist(err) { + panic(fmt.Sprintf("Path:%s does not exists", workDir)) + } + panic(err) + } + return workDir +} diff --git a/global/cron/enter.go b/global/cron/enter.go index 5f4e45a8..4b1f6d6c 100644 --- a/global/cron/enter.go +++ b/global/cron/enter.go @@ -2,24 +2,25 @@ package cron import ( "context" + "path/filepath" + + "github.com/ZiRunHua/LeapLedger/global" "github.com/ZiRunHua/LeapLedger/global/constant" "github.com/ZiRunHua/LeapLedger/global/nats" "github.com/ZiRunHua/LeapLedger/initialize" - "github.com/ZiRunHua/LeapLedger/util/log" "go.uber.org/zap" ) -const logPath = constant.LOG_PATH + "/cron.log" - var ( - logger *zap.Logger + logPath = filepath.Join(constant.LogPath, "cron.log") + logger *zap.Logger Scheduler = initialize.Scheduler ) func init() { var err error - logger, err = log.GetNewZapLogger(logPath) + logger, err = global.Config.Logger.New(logPath) if err != nil { panic(err) } diff --git a/global/nats/manager/consumer.go b/global/nats/manager/consumer.go new file mode 100644 index 00000000..076e64a0 --- /dev/null +++ b/global/nats/manager/consumer.go @@ -0,0 +1,188 @@ +package manager + +import ( + "context" + "errors" + "runtime/debug" + "strings" + "time" + + "github.com/ZiRunHua/LeapLedger/util/dataTool" + "github.com/nats-io/nats.go/jetstream" + "go.uber.org/zap" +) + +type ( + // ConsumerManger is used to manage the consumer group in the stream + // It records the consumption method of the consumption group, + // which will help with the retry of messages in the dead letter queue + ConsumerManger interface { + NewConsumer(context.Context, func(*jetstream.ConsumerConfig) error, consumerMessageHandler) ( + jetstream.Consumer, error, + ) + Consume(context.Context, jetstream.Consumer, consumerMessageHandler) error + ReConsume(ctx context.Context, consumerName string, msg *jetstream.RawStreamMsg) error + UpdateAllConsumerConfig(func(*jetstream.ConsumerConfig) error, context.Context) error + } + + consumerManger struct { + stream jetstream.Stream + // consumer is main consumer,other consumers will be based on the configuration of this consumer + consumer jetstream.Consumer + consumerMessageHandler dataTool.Map[string, consumerMessageHandler] + + logger *zap.Logger + } + consumerMessageHandler func(subject string, payload []byte) error +) + +func NewConsumerManger(_ context.Context, stream jetstream.Stream, consumer jetstream.Consumer, logger *zap.Logger) ( + ConsumerManger, error, +) { + var cm consumerManger + cm.stream, cm.consumer, cm.logger = stream, consumer, logger + cm.consumerMessageHandler = dataTool.NewSyncMap[string, consumerMessageHandler]() + return &cm, nil +} + +func (cm *consumerManger) ReConsume(_ context.Context, consumerName string, msg *jetstream.RawStreamMsg) error { + handler, exist := cm.consumerMessageHandler.Load(consumerName) + if !exist { + return errors.New("consumer message handler not found") + } + return handler(msg.Subject, msg.Data) +} + +func (cm *consumerManger) createOrUpdateConsumer(ctx context.Context, config jetstream.ConsumerConfig) ( + consumer jetstream.Consumer, err error, +) { + return cm.stream.CreateOrUpdateConsumer(ctx, config) +} + +func (cm *consumerManger) Consume( + ctx context.Context, consumer jetstream.Consumer, handle consumerMessageHandler, +) (err error) { + info, err := consumer.Info(ctx) + if err != nil { + return err + } + _, err = consumer.Consume(cm.ReceiveMsg(info.Name, handle)) + if err != nil { + return err + } + cm.consumerMessageHandler.Store(info.Name, handle) + return nil +} + +func (cm *consumerManger) ReceiveMsg(name string, handle consumerMessageHandler) jetstream.MessageHandler { + return func(msg jetstream.Msg) { + var err error + defer func() { + if r := recover(); r != nil { + cm.logger.Panic( + msg.Subject(), zap.String("consumer", name), zap.ByteString("data", msg.Data()), + zap.Any("panic", r), zap.Stack(string(debug.Stack())), + ) + err = msg.Nak() + } else if err != nil { + cm.logger.Error(msg.Subject(), zap.String("consumer", name), zap.ByteString("data", msg.Data()), + zap.Error(err)) + err = msg.Nak() + } else { + err = msg.Ack() + } + if err != nil { + cm.logger.Error(msg.Subject(), zap.String("consumer", name), zap.ByteString("data", msg.Data()), + zap.Error(err)) + } + }() + err = handle(msg.Subject(), msg.Data()) + } +} + +func (cm *consumerManger) NewConsumer( + ctx context.Context, + setConfig func(*jetstream.ConsumerConfig) error, + handler consumerMessageHandler) (jetstream.Consumer, error) { + info, err := cm.consumer.Info(ctx) + if err != nil { + return nil, err + } + config := info.Config + err = setConfig(&config) + if err != nil { + return nil, err + } + if strings.Compare(config.Name, info.Config.Name) == 0 || + strings.Compare(config.Durable, info.Config.Durable) == 0 { + return nil, errors.New("new consumer has the same name as the main consumer") + } + consumer, err := cm.createOrUpdateConsumer(ctx, config) + if err != nil { + return nil, err + } + if handler == nil { + handler = func(_ string, _ []byte) error { return nil } + } + _, err = consumer.Consume(cm.ReceiveMsg(config.Name, handler)) + if err != nil { + return nil, err + } + cm.consumerMessageHandler.Store(config.Name, handler) + return consumer, nil +} + +func (cm *consumerManger) iterateConsumers( + ctx context.Context, +) (func(yield func(*jetstream.ConsumerInfo) bool), error) { + consumersList := cm.stream.ListConsumers(ctx) + if err := consumersList.Err(); err != nil { + return nil, err + } + return func(yield func(*jetstream.ConsumerInfo) bool) { + for info := range consumersList.Info() { + if !yield(info) { + return + } + } + }, nil +} + +func (cm *consumerManger) UpdateAllConsumerConfig( + handler func(*jetstream.ConsumerConfig) error, ctx context.Context, +) error { + consumers, err := cm.iterateConsumers(ctx) + if err != nil { + return err + } + for info := range consumers { + err = handler(&info.Config) + if err != nil { + return err + } + _, err = cm.createOrUpdateConsumer(ctx, info.Config) + if err != nil { + return err + } + } + return nil +} + +type pullConsumer struct { + consumer jetstream.Consumer +} + +func (mi *pullConsumer) updateConfig( + js jetstream.JetStream, + streamName string, + config jetstream.ConsumerConfig, +) (err error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + mi.consumer, err = js.CreateOrUpdateConsumer(ctx, streamName, config) + return err +} + +func (mi *pullConsumer) fetchMsg(batch int) (jetstream.MessageBatch, error) { + return mi.consumer.FetchNoWait(batch) +} diff --git a/global/nats/manager/customer.go b/global/nats/manager/customer.go deleted file mode 100644 index d6ec8c11..00000000 --- a/global/nats/manager/customer.go +++ /dev/null @@ -1,27 +0,0 @@ -package manager - -import ( - "context" - "time" - - "github.com/nats-io/nats.go/jetstream" -) - -type pullCustomer struct { - consumer jetstream.Consumer -} - -func (mi *pullCustomer) updateConfig( - js jetstream.JetStream, - streamName string, - config jetstream.ConsumerConfig, -) (err error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) - defer cancel() - mi.consumer, err = js.CreateOrUpdateConsumer(ctx, streamName, config) - return err -} - -func (mi *pullCustomer) fetchMsg(batch int) (jetstream.MessageBatch, error) { - return mi.consumer.FetchNoWait(batch) -} diff --git a/global/nats/manager/dlq.go b/global/nats/manager/dlq.go index 9b6105a6..daeeb725 100644 --- a/global/nats/manager/dlq.go +++ b/global/nats/manager/dlq.go @@ -2,46 +2,58 @@ package manager // dead letter queue import ( + "encoding/json" "errors" "fmt" + "path/filepath" + "runtime/debug" - "github.com/ZiRunHua/LeapLedger/util/dataTool" natsServer "github.com/nats-io/nats-server/v2/server" + + "context" + "github.com/ZiRunHua/LeapLedger/util/dataTool" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" "go.uber.org/zap" ) -import ( - "context" - "encoding/json" -) - // The DlqManager is used to manage dead letters. // Use a dead letter queue by passing in a jetstream.Stream registration. // Messages in DlqManager will use stream_seq to query the complete message on the registered stream, // provided that the message still exists, and the message will be published as a new message on the registered stream. -type DlqManager interface { - RepublishBatch(batch int, ctx context.Context) (int, error) -} +type ( + DlqManager interface { + RepublishBatch(batch int, ctx context.Context) (msgNum int, err error) + } -const dlqName = "dlq" -const dlqPrefix = "dlq" -const dlqLogPath = natsLogPath + "dlq.log" + dlqManager struct { + DlqManager -type dlqManager struct { - DlqManager + manageInitializers + register dlqStreamRegister + pullConsumer pullConsumer + } - manageInitializers - dlqMsgHandler - register dlqStreamRegister - pullCustomer pullCustomer -} + dlqRegisterStream interface { + getStreamName() string + reConsume(ctx context.Context, consumer string, streamSeq uint64) error + } +) -func (dm *dlqManager) init(js jetstream.JetStream, registerStream []jetstream.Stream, logger *zap.Logger) (err error) { - dm.logger, dm.register.streamMap = logger, dataTool.NewSyncMap[string, jetstream.Stream]() +const ( + dlqName = "dlq" + dlqPrefix = "dlq" +) - err = dm.register.register(registerStream...) +var ( + dlqLogPath = filepath.Join(natsLogPath, "dlq.log") +) + +func (dm *dlqManager) init( + js jetstream.JetStream, registerStreams []dlqRegisterStream, logger *zap.Logger) (err error) { + dm.register.streamMap = dataTool.NewSyncMap[string, dlqRegisterStream]() + + err = dm.register.register(registerStreams...) if err != nil { return err } @@ -54,21 +66,21 @@ func (dm *dlqManager) init(js jetstream.JetStream, registerStream []jetstream.St Subjects: subjects, Retention: jetstream.InterestPolicy, } - customerConfig := jetstream.ConsumerConfig{ - Name: dlqPrefix + "_customer", - Durable: dlqPrefix + "_customer", + consumerConfig := jetstream.ConsumerConfig{ + Name: dlqPrefix + "_consumer", + Durable: dlqPrefix + "_consumer", AckPolicy: jetstream.AckExplicitPolicy, MaxDeliver: 0, } - err = dm.manageInitializers.init(js, streamConfig, customerConfig) + err = dm.manageInitializers.init(js, streamConfig, consumerConfig, logger) if err != nil { return err } - err = dm.pullCustomer.updateConfig( + err = dm.pullConsumer.updateConfig( js, streamConfig.Name, jetstream.ConsumerConfig{ - Name: dlqPrefix + "_pull_customer", - Durable: dlqPrefix + "_pull_customer", + Name: dlqPrefix + "_pull_consumer", + Durable: dlqPrefix + "_pull_consumer", AckPolicy: jetstream.AckExplicitPolicy, MaxDeliver: 0, }, @@ -76,93 +88,79 @@ func (dm *dlqManager) init(js jetstream.JetStream, registerStream []jetstream.St if err != nil { return err } - _, err = dm.consumer.Consume(dm.receiveMsg) - return err + return dm.manageInitializers.setMainConsumerConsume( + context.TODO(), func(subject string, payload []byte) error { + dm.logger.Info( + "msg", zap.String("subject", subject), + zap.ByteString("data", payload), + ) + return nil + }, + ) } -func (dm *dlqManager) RepublishBatch(batch int, ctx context.Context) (int, error) { - msgBatch, err := dm.pullCustomer.fetchMsg(batch) +func (dm *dlqManager) RepublishBatch(batch int, ctx context.Context) (msgNum int, err error) { + msgBatch, err := dm.pullConsumer.fetchMsg(batch) if err != nil { if errors.Is(err, nats.ErrMsgNotFound) { return 0, nil } return 0, err } - var count int + for msg := range msgBatch.Messages() { - count++ - err = dm.republishDieMsg(msg, ctx) - if err != nil { - dm.logger.Error("republishDieMsg err", zap.Error(err)) - err = msg.Nak() - if err != nil { - dm.logger.Error("Republish nck", zap.Error(err)) - } - } else { - err = msg.Ack() - if err != nil { - dm.logger.Error("Republish ack", zap.Error(err)) - } - } + msgNum++ + dm.republishDieMsg(ctx, msg) } - return count, nil + return msgNum, err } -func (dm *dlqManager) republishDieMsg(msg jetstream.Msg, ctx context.Context) (err error) { +func (dm *dlqManager) republishDieMsg(ctx context.Context, msg jetstream.Msg) (isSuccess bool) { + var err error + defer func() { + if r := recover(); r != nil { + dm.logger.Panic( + "reConsume", zap.String("subject", msg.Subject()), zap.ByteString("data", msg.Data()), + zap.Any("panic", r), zap.Stack(string(debug.Stack())), + ) + return + } + if err == nil { + err = msg.Ack() + } + if err != nil { + dm.logger.Error( + "reConsume", zap.String("subject", msg.Subject()), zap.ByteString("data", msg.Data()), + zap.Error(err), + ) + } + }() var advisory natsServer.JSConsumerDeliveryExceededAdvisory err = json.Unmarshal(msg.Data(), &advisory) if err != nil { - return err - } - var republishMsg *nats.Msg - republishMsg, err = dm.getMsgByAdvisory(advisory, ctx) - if err != nil { - return err + return } - _, err = dm.js.PublishMsg(ctx, republishMsg) - return err -} - -func (dm *dlqManager) getMsgByAdvisory(advisory natsServer.JSConsumerDeliveryExceededAdvisory, ctx context.Context) ( - *nats.Msg, error, -) { - streamRawMsg, err := dm.register.selectMsgByDeliveryExceededAdvisory(advisory, ctx) - if err != nil { - return nil, err + stream, exist := dm.register.streamMap.Load(advisory.Stream) + if !exist { + err = ErrStreamNotExist + return } - return &nats.Msg{ - Subject: streamRawMsg.Subject, - Data: streamRawMsg.Data, - Header: streamRawMsg.Header, - }, nil -} - -type dlqMsgHandler struct { - logger *zap.Logger -} - -func (dmh *dlqMsgHandler) receiveMsg(msg jetstream.Msg) { - dmh.logMsg(msg) - err := msg.Ack() - if err != nil { - dmh.logger.Error("receive msg", zap.Error(err)) + err = stream.reConsume(ctx, advisory.Consumer, advisory.StreamSeq) + if errors.Is(err, jetstream.ErrMsgNotFound) { + // ignore + err = nil + return false } -} - -func (dmh *dlqMsgHandler) logMsg(msg jetstream.Msg) { - dmh.logger.Info( - "msg", zap.String(msgHeaderKeySubject, msg.Headers().Get(msgHeaderKeySubject)), - zap.String("data", string(msg.Data())), - ) + return err == nil } type dlqStreamRegister struct { - streamMap dataTool.Map[string, jetstream.Stream] + streamMap dataTool.Map[string, dlqRegisterStream] } -func (dsr *dlqStreamRegister) register(streams ...jetstream.Stream) error { +func (dsr *dlqStreamRegister) register(streams ...dlqRegisterStream) error { for _, stream := range streams { - dsr.streamMap.LoadOrStore(stream.CachedInfo().Config.Name, stream) + dsr.streamMap.LoadOrStore(stream.getStreamName(), stream) } return nil } @@ -170,11 +168,11 @@ func (dsr *dlqStreamRegister) register(streams ...jetstream.Stream) error { func (dsr *dlqStreamRegister) getMaxDeliveriesEvents() ([]string, error) { var events []string dsr.streamMap.Range( - func(_ string, stream jetstream.Stream) bool { + func(_ string, stream dlqRegisterStream) bool { events = append( events, fmt.Sprintf( "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.%s.*", - stream.(jetstream.Stream).CachedInfo().Config.Name, + stream.getStreamName(), ), ) return true @@ -182,14 +180,3 @@ func (dsr *dlqStreamRegister) getMaxDeliveriesEvents() ([]string, error) { ) return events, nil } - -func (dsr *dlqStreamRegister) selectMsgByDeliveryExceededAdvisory( - advisory natsServer.JSConsumerDeliveryExceededAdvisory, - ctx context.Context, -) (*jetstream.RawStreamMsg, error) { - stream, exist := dsr.streamMap.Load(advisory.Stream) - if !exist { - return nil, ErrStreamNotExist - } - return stream.(jetstream.Stream).GetMsg(ctx, advisory.StreamSeq) -} diff --git a/global/nats/manager/enter.go b/global/nats/manager/enter.go index da164e7a..0ab01c3d 100644 --- a/global/nats/manager/enter.go +++ b/global/nats/manager/enter.go @@ -1,12 +1,13 @@ package manager import ( + "path/filepath" + + "github.com/ZiRunHua/LeapLedger/global" "github.com/ZiRunHua/LeapLedger/global/constant" "github.com/ZiRunHua/LeapLedger/initialize" - "github.com/ZiRunHua/LeapLedger/util/log" "github.com/nats-io/nats.go/jetstream" "go.uber.org/zap" - "runtime/debug" ) var ( @@ -24,7 +25,7 @@ var ( DlqManage DlqManager ) -const natsLogPath = constant.LOG_PATH + "/nats/" +var natsLogPath = filepath.Join(constant.LogPath, "nats") var ( taskLogger *zap.Logger @@ -33,24 +34,20 @@ var ( ) func init() { - initManager() -} - -func initManager() { var err error js, err = jetstream.New(natsConn) if err != nil { panic(err) } - taskLogger, err = log.GetNewZapLogger(natsTaskLogPath) + taskLogger, err = global.Config.Logger.New(natsTaskLogPath) if err != nil { panic(err) } - eventLogger, err = log.GetNewZapLogger(natsEventLogPath) + eventLogger, err = global.Config.Logger.New(natsEventLogPath) if err != nil { panic(err) } - dlqLogger, err = log.GetNewZapLogger(dlqLogPath) + dlqLogger, err = global.Config.Logger.New(dlqLogPath) if err != nil { panic(err) } @@ -79,30 +76,8 @@ func initManager() { dlqManage = &dlqManager{} DlqManage = dlqManage - err = dlqManage.init(js, []jetstream.Stream{taskManage.stream, eventManage.stream}, dlqLogger) + err = dlqManage.init(js, []dlqRegisterStream{taskManage, eventManage}, dlqLogger) if err != nil { panic(err) } } - -func receiveMsg(msg jetstream.Msg, handle func(msg jetstream.Msg) error, logger *zap.Logger) { - var err error - defer func() { - r := recover() - if r == nil { - if err != nil { - logger.Error("receiveMsg err", zap.Error(err)) - err = msg.Nak() - } else { - err = msg.Ack() - } - } else { - logger.Error("receiveMsg panic", zap.Any("panic", r), zap.Stack(string(debug.Stack()))) - err = msg.Nak() - } - if err != nil { - logger.Error("receiveMsg ack err", zap.Error(err)) - } - }() - err = handle(msg) -} diff --git a/global/nats/manager/event.go b/global/nats/manager/event.go index b4c010f1..ae7179a2 100644 --- a/global/nats/manager/event.go +++ b/global/nats/manager/event.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "path/filepath" "runtime" "time" @@ -16,26 +17,28 @@ import ( // EventManager is used to manage events. // EventManager has a dedicated consumer group that publishes tasks to TaskManage when events are triggered, -// and new consumer groups can be created to consume events. +// and new consumer groups can be created to Consume events. // These consumer groups use the same stream. type EventManager interface { Publish(event Event, payload []byte) bool Subscribe(event Event, triggerTask Task, fetchTaskData func(eventData []byte) ([]byte, error)) SubscribeToNewConsumer(event Event, name string, handler MessageHandler) + updateAllConsumerConfig(func(*jetstream.ConsumerConfig) error, context.Context) error } const ( - natsEventName = "event" - natsEventPrefix = "event" - natsEventLogPath = natsLogPath + "event.log" + natsEventName = "event" + natsEventPrefix = "event" +) + +var ( + natsEventLogPath = filepath.Join(natsLogPath, "event.log") ) type Event string func (t Event) subject() string { return fmt.Sprintf("%s.subject_%s", natsEventPrefix, t) } -func (t Event) queue() string { return fmt.Sprintf("%s.queue_%s", natsEventPrefix, t) } - const EventRetryTriggerTask Event = "retry_trigger_task" type RetryTriggerTask struct { @@ -47,10 +50,12 @@ type eventManager struct { EventManager manageInitializers eventMsgHandler + + dlqRegisterStream } func (em *eventManager) init(js jetstream.JetStream, taskManage *taskManager, logger *zap.Logger) error { - em.eventMsgHandler.init(logger) + em.eventMsgHandler.init() em.taskManage = taskManage streamConfig := jetstream.StreamConfig{ Name: natsEventName, @@ -58,20 +63,19 @@ func (em *eventManager) init(js jetstream.JetStream, taskManage *taskManager, lo Retention: jetstream.InterestPolicy, MaxAge: 24 * time.Hour * 7, } - customerConfig := jetstream.ConsumerConfig{ - Name: natsEventPrefix + "_customer", - Durable: natsEventPrefix + "_customer", + consumerConfig := jetstream.ConsumerConfig{ + Name: natsEventPrefix + "_consumer", + Durable: natsEventPrefix + "_consumer", AckPolicy: jetstream.AckExplicitPolicy, BackOff: backOff, MaxDeliver: len(backOff) + 1, MaxAckPending: runtime.GOMAXPROCS(0) * 3, } - err := em.manageInitializers.init(js, streamConfig, customerConfig) + err := em.manageInitializers.init(js, streamConfig, consumerConfig, logger) if err != nil { return err } - _, err = em.consumer.Consume(em.receiveMsg) - return err + return em.manageInitializers.setMainConsumerConsume(context.TODO(), em.msgHandle) } func (em *eventManager) Publish(event Event, payload []byte) bool { @@ -83,6 +87,7 @@ func (em *eventManager) Publish(event Event, payload []byte) bool { return true } +// Subscribe sets up the task triggered by an event. func (em *eventManager) Subscribe(event Event, triggerTask Task, fetchTaskData func(eventData []byte) ([]byte, error)) { taskMap, _ := em.eventToTask.LoadOrStore(event, dataTool.NewSyncMap[Task, MessageHandler]()) taskMap.Store( @@ -115,6 +120,7 @@ func (em *eventManager) Subscribe(event Event, triggerTask Task, fetchTaskData f }, ) } + func (em *eventManager) SubscribeToNewConsumer(event Event, name string, handler MessageHandler) { em.msgHandlerMap.LoadOrStore( event.subject(), func(payload []byte) error { @@ -122,48 +128,50 @@ func (em *eventManager) SubscribeToNewConsumer(event Event, name string, handler return nil }, ) - ctx := context.Background() - config, err := em.getCustomerConfig(ctx) - if err != nil { - panic(err) - } - config.Name, config.Durable, config.FilterSubjects = name, name, []string{event.subject()} - customer, err := em.newCustomer(ctx, config) - if err != nil { - panic(err) - } - _, err = customer.Consume( - func(msg jetstream.Msg) { - receiveMsg(msg, func(msg jetstream.Msg) error { return handler(msg.Data()) }, em.logger) + _, err := em.consumerManger.NewConsumer( + context.TODO(), + func(config *jetstream.ConsumerConfig) error { + config.Name, config.Durable, config.FilterSubjects = name, name, []string{event.subject()} + return nil }, + func(_ string, payload []byte) error { return handler(payload) }, ) if err != nil { panic(err) } } +func (em *eventManager) updateAllConsumerConfig( + handle func(*jetstream.ConsumerConfig) error, ctx context.Context, +) error { + return em.consumerManger.UpdateAllConsumerConfig(handle, ctx) +} + +func (em *eventManager) getStreamName() string { return natsEventName } + +func (em *eventManager) reConsume(ctx context.Context, consumer string, streamSeq uint64) error { + rawMsg, err := em.stream.GetMsg(ctx, streamSeq) + if err != nil { + return err + } + return em.consumerManger.ReConsume(ctx, consumer, rawMsg) +} + type eventMsgHandler struct { eventToTask dataTool.Map[Event, dataTool.Map[Task, MessageHandler]] msgHandlerMap dataTool.Map[string, MessageHandler] msgManger - logger *zap.Logger - taskManage *taskManager } -func (em *eventMsgHandler) init(logger *zap.Logger) { - em.logger = logger +func (em *eventMsgHandler) init() { em.eventToTask = dataTool.NewSyncMap[Event, dataTool.Map[Task, MessageHandler]]() em.msgHandlerMap = dataTool.NewSyncMap[string, MessageHandler]() } -func (em *eventMsgHandler) receiveMsg(msg jetstream.Msg) { - receiveMsg(msg, func(msg jetstream.Msg) error { return em.msgHandle(msg.Subject(), msg.Data()) }, em.logger) -} - func (em *eventMsgHandler) getHandler(subject string) (MessageHandler, error) { - if subject == string(EventRetryTriggerTask) { + if subject == EventRetryTriggerTask.subject() { return func(payload []byte) error { var data RetryTriggerTask err := json.Unmarshal(payload, &data) diff --git a/global/nats/manager/manager.go b/global/nats/manager/manager.go index 2c0eacdb..d96c66b4 100644 --- a/global/nats/manager/manager.go +++ b/global/nats/manager/manager.go @@ -2,6 +2,7 @@ package manager import ( "context" + "go.uber.org/zap" "log" "sync" "time" @@ -55,71 +56,74 @@ var testBackOff = []time.Duration{ time.Second, } +var updateTestBackOffOnce sync.Once + // UpdateTestBackOff // Most test samples are suspended for 30 seconds to wait for message consumption, // and test whether retry and dead letter queues work properly, // so to ensure that the test samples execute properly, // you need to retry at least 10 times within 30 seconds. - -var once sync.Once - func UpdateTestBackOff() { - once.Do( - func() { - backOff = testBackOff - initManager() - time.Sleep(time.Second * 3) - log.Print("update test back off finish") - }, - ) + updateFunc := func() { + ctx := context.TODO() + backOff = testBackOff + err := eventManage.updateAllConsumerConfig( + func(config *jetstream.ConsumerConfig) error { + config.BackOff = backOff + config.MaxDeliver = len(backOff) + 1 + return nil + }, ctx, + ) + if err != nil { + panic(err) + } + info, err := taskManage.consumer.Info(ctx) + if err != nil { + panic(err) + } + info.Config.BackOff = backOff + info.Config.MaxDeliver = len(backOff) + 1 + _, err = taskManage.stream.UpdateConsumer(ctx, info.Config) + if err != nil { + panic(err) + } + time.Sleep(time.Second * 3) + log.Print("update test back off finish") + } + updateTestBackOffOnce.Do(updateFunc) } type manageInitializers struct { - js jetstream.JetStream - stream jetstream.Stream - consumer jetstream.Consumer + js jetstream.JetStream + stream jetstream.Stream + consumer jetstream.Consumer + consumerManger ConsumerManger + + logger *zap.Logger } func (mi *manageInitializers) init( - js jetstream.JetStream, streamConfig jetstream.StreamConfig, customerConfig jetstream.ConsumerConfig, + js jetstream.JetStream, streamConfig jetstream.StreamConfig, consumerConfig jetstream.ConsumerConfig, + logger *zap.Logger, ) (err error) { - mi.js = js + mi.js, mi.logger = js, logger ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - err = mi.updateStreamConfig(ctx, streamConfig) + mi.stream, err = mi.js.CreateOrUpdateStream(ctx, streamConfig) if err != nil { return err } - return mi.updateCustomerConfig(ctx, customerConfig) -} - -func (mi *manageInitializers) updateStreamConfig( - ctx context.Context, - streamConfig jetstream.StreamConfig, -) (err error) { - mi.stream, err = mi.js.CreateOrUpdateStream(ctx, streamConfig) - return err -} - -func (mi *manageInitializers) updateCustomerConfig( - ctx context.Context, - customerConfig jetstream.ConsumerConfig, -) (err error) { - mi.consumer, err = mi.js.CreateOrUpdateConsumer(ctx, mi.stream.CachedInfo().Config.Name, customerConfig) - return err -} - -func (mi *manageInitializers) getCustomerConfig(ctx context.Context) (config jetstream.ConsumerConfig, err error) { - info, err := mi.consumer.Info(ctx) + mi.consumer, err = mi.stream.CreateOrUpdateConsumer(ctx, consumerConfig) if err != nil { - return config, err + return err + } + mi.consumerManger, err = NewConsumerManger(ctx, mi.stream, mi.consumer, logger) + if err != nil { + return err } - config = info.Config - return config, err + return nil } -func (mi *manageInitializers) newCustomer(ctx context.Context, config jetstream.ConsumerConfig) ( - jetstream.Consumer, error, -) { - return mi.js.CreateOrUpdateConsumer(ctx, mi.stream.CachedInfo().Config.Name, config) +func (mi *manageInitializers) setMainConsumerConsume(ctx context.Context, handler consumerMessageHandler) (err error) { + return mi.consumerManger.Consume(ctx, mi.consumer, handler) } diff --git a/global/nats/manager/manager_test.go b/global/nats/manager/manager_test.go index e8fa82ac..35720d5d 100644 --- a/global/nats/manager/manager_test.go +++ b/global/nats/manager/manager_test.go @@ -5,27 +5,41 @@ import ( "errors" "reflect" "strconv" - "sync/atomic" "testing" "time" + "github.com/ZiRunHua/LeapLedger/util/rand" "github.com/google/uuid" + "golang.org/x/sync/errgroup" ) func init() { UpdateTestBackOff() } func TestSubscribeAndPublish(t *testing.T) { - var task = Task(t.Name() + uuid.NewString()) - taskList := []Task{task + "_1", task + "_2", task + "_3"} - var count atomic.Int32 - for _, name := range taskList { + var taskPrefix = Task(t.Name() + uuid.NewString()) + taskList := []struct { + Task Task + Msg []byte + }{ + { + taskPrefix + "_1", []byte("msg1"), + }, + { + taskPrefix + "_2", []byte("123"), + }, + { + taskPrefix + "_3", []byte("msg3"), + }, + } + msgChan := make(chan []byte, len(taskList)) + for _, task := range taskList { taskManage.Subscribe( - name, func(payload []byte) error { - if !reflect.DeepEqual(payload, []byte("1")) { + task.Task, func(payload []byte) error { + if !reflect.DeepEqual(payload, task.Msg) { t.Fail() } - count.Add(1) + msgChan <- payload return nil }, ) @@ -33,15 +47,20 @@ func TestSubscribeAndPublish(t *testing.T) { t.Run( "publish", func(t *testing.T) { t.Parallel() - time.Sleep(time.Second * 3) - for _, name := range taskList { - taskManage.Publish(name, []byte("1")) + for _, task := range taskList { + taskManage.Publish(task.Task, task.Msg) } - time.Sleep(time.Second * 10) - if count.Load() != int32(len(taskList)) { - t.Fatal("count not is ", count.Load()) + count := 0 + for true { + count++ + if count == len(taskList) { + if len(msgChan) == 0 { + break + } else { + t.Fatal() + } + } } - t.Log("count is:", count.Load()) }, ) } @@ -54,145 +73,255 @@ func TestEventSubscribeAndPublish(t *testing.T) { for i := 1; i <= 3; i++ { taskMap[taskPrefix+Task("_"+strconv.Itoa(i))] = false } - + msgChan := make(chan Task) for task := range taskMap { taskManage.Subscribe( task, func(payload []byte) error { - taskMap[task] = true + msgChan <- task return nil }, ) eventManage.Subscribe(event, task, func(eventData []byte) ([]byte, error) { return eventData, nil }) } t.Run( - "publish", func(t *testing.T) { + "event publish", func(t *testing.T) { t.Parallel() - time.Sleep(time.Second * 3) eventManage.Publish(event, []byte("test")) - time.Sleep(time.Second * 30) - for task, b := range taskMap { - if !b { - t.Fatal(task, "fail") + count := 0 + for true { + taskMap[<-msgChan] = true + count++ + if count == len(taskMap) { + if len(msgChan) == 0 { + for task, result := range taskMap { + if !result { + t.Fatal(task, " not trigger") + } + } + break + } else { + t.Fatal() + } } } - t.Log("task trigger info", taskMap) }, ) } func TestDql(t *testing.T) { - taskM := taskManage - var task = Task(t.Name()) - var retryCount = 1 - taskM.Subscribe( + var task, event = Task(t.Name() + rand.String(12)), Event(t.Name() + rand.String(12)) + msgChan, msg := make(chan []byte), []byte(rand.String(12)) + taskManage.Subscribe( task, func(payload []byte) error { - retryCount++ + msgChan <- payload return errors.New("test dql") }, ) - t.Run( - "publish", func(t *testing.T) { - t.Parallel() - time.Sleep(time.Second * 3) - taskM.Publish(task, []byte("test")) - var backOffTime time.Duration - for _, duration := range backOff { - backOffTime += duration + taskManage.Publish(task, msg) + count := 0 + for true { + <-msgChan + count++ + if count == len(backOff)+1 { + if len(msgChan) == 0 { + break + } else { + t.Fatal("try too many times :", count+len(msgChan)) } - time.Sleep(time.Second*3 + backOffTime) - batch, err := dlqManage.consumer.Fetch(10) - if err != nil { - t.Error(err) - } - var processCount int - for msg := range batch.Messages() { - processCount++ - err = msg.Ack() - if err != nil { - t.Error(err) - } - } - t.Log("retry count", retryCount, "process count", processCount) - }, - ) -} - -func TestDqlRepublish(t *testing.T) { - var task = Task(t.Name() + uuid.NewString()) - var num = 3 - var retryCount atomic.Uint32 + } + } taskManage.Subscribe( task, func(payload []byte) error { - retryCount.Add(1) - return errors.New("test dql") + msgChan <- payload + return nil }, ) + time.Sleep(backOff[len(backOff)-1]) + _, err := dlqManage.RepublishBatch(1, context.TODO()) + if err != nil { + t.Fatal(err) + } + republishPayload := <-msgChan + if !reflect.DeepEqual(republishPayload, msg) { + t.Fatal("reConsume payload not compare", string(republishPayload), msg) + } t.Run( - "republish die msg", func(t *testing.T) { - time.Sleep(time.Second) - for i := 0; i < num; i++ { - taskManage.Publish(task, []byte("test_"+strconv.FormatInt(int64(i), 10))) - } - var backOffTime time.Duration - for _, duration := range backOff { - backOffTime += duration + "event and task dql", func(t *testing.T) { + type TestEvent struct { + Event Event + EventData []byte + TriggerTasks []struct { + task Task + fetchTaskData func(eventData []byte) ([]byte, error) + retryCount int + } + ExecConsumers []struct { + name string + retryCount int + } } - t.Log("sleep", backOffTime) - time.Sleep(time.Second*3 + backOffTime) - var count atomic.Int32 - count.Add(int32(num)) - taskManage.Subscribe( - task, func(payload []byte) error { - count.Add(-1) - return nil + var testEvent = TestEvent{ + Event: event + Event(rand.String(12)), + EventData: []byte("event_data_" + rand.String(12)), + TriggerTasks: []struct { + task Task + fetchTaskData func(eventData []byte) ([]byte, error) + retryCount int + }{ + { + task: task + "_0", + fetchTaskData: func(eventData []byte) ([]byte, error) { return eventData, nil }, + retryCount: 0, + }, + { + task: task + "_1", + fetchTaskData: func(eventData []byte) ([]byte, error) { return eventData, nil }, + retryCount: (len(backOff)) / 2, + }, + { + task: task + "_2", + fetchTaskData: func(eventData []byte) ([]byte, error) { return eventData, nil }, + retryCount: len(backOff) + 1, + }, + }, + ExecConsumers: []struct { + name string + retryCount int + }{ + { + name: string(task) + "_4", + retryCount: len(backOff) / 2, + }, + { + name: string(task) + "_5", + retryCount: len(backOff) + 1, + }, + { + name: string(task) + "_3", + retryCount: 0, + }, }, - ) - time.Sleep(time.Second * 3) - t.Log("retry count", retryCount.Load()) - _, err := dlqManage.RepublishBatch(num*10, context.TODO()) - - if err != nil { - t.Fatal(err) } - time.Sleep(time.Second * 30) - if 0 != count.Load() { - t.Fatal("die msg Remaining:", count.Load()) + + msgChan = make(chan []byte, 10) + var needToReConsume, noNeedToReConsume int + for _, triggerTask := range testEvent.TriggerTasks { + retryCount := 0 + taskManage.Subscribe( + triggerTask.task, func(payload []byte) error { + if retryCount == triggerTask.retryCount { + t.Log(triggerTask.task, "finish") + msgChan <- payload + return nil + } else if retryCount > triggerTask.retryCount { + t.Fatal( + "too many retries or re-consuming errors,task:", triggerTask.task, retryCount, ">", + triggerTask.retryCount, + ) + return nil + } + retryCount++ + return errors.New("test dql") + }, + ) + eventManage.Subscribe( + testEvent.Event, triggerTask.task, func(eventData []byte) ([]byte, error) { return eventData, nil }, + ) + if triggerTask.retryCount >= len(backOff)+1 { + needToReConsume++ + } else { + noNeedToReConsume++ + } } - }, - ) -} -func BenchmarkDql(b *testing.B) { - taskM := taskManage - var task Task = Task(uuid.NewString()) - var count = b.N - taskM.Subscribe( - task, func(payload []byte) error { - return errors.New("test dql") - }, - ) - time.Sleep(time.Second * 5) - for i := 0; i < b.N; i++ { - taskM.Publish(task, []byte("test_"+strconv.FormatInt(int64(i), 10))) - } - time.Sleep(time.Second * 20) - b.Run( - "republish", func(b *testing.B) { - taskM.Subscribe( - task, func(payload []byte) error { - count-- - return nil + for _, execConsumer := range testEvent.ExecConsumers { + retryCount := 0 + eventManage.SubscribeToNewConsumer( + testEvent.Event, + execConsumer.name, func(payload []byte) error { + if retryCount == execConsumer.retryCount { + t.Log(execConsumer.name, "finish") + msgChan <- payload + return nil + } else if retryCount > execConsumer.retryCount { + t.Fatal( + "too many retries or re-consuming errors,consumer:", execConsumer.name, retryCount, + ">", + execConsumer.retryCount, + ) + return nil + } + retryCount++ + return errors.New("test dql") + }, + ) + if execConsumer.retryCount >= len(backOff)+1 { + needToReConsume++ + } else { + noNeedToReConsume++ + } + } + eventManage.Publish(testEvent.Event, testEvent.EventData) + var successMsg int + for true { + msg = <-msgChan + successMsg++ + if !reflect.DeepEqual(msg, testEvent.EventData) { + t.Fatal("payload not equal", msg, testEvent.EventData) + } + if successMsg >= noNeedToReConsume { + if noNeedToReConsume == successMsg && len(msgChan) == 0 { + break + } + t.Fatal("success msg to much:", noNeedToReConsume, successMsg+len(msgChan)) + } + } + t.Run( + "test reConsume", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30) + defer cancel() + g, _ := errgroup.WithContext(ctx) + g.Go( + func() error { + time.Sleep(backOff[len(backOff)-1]) + for true { + count, err := dlqManage.RepublishBatch(10, context.TODO()) + if err != nil { + t.Fatal(err) + } + if count == 0 { + break + } + } + return nil + }, + ) + g.Go( + func() error { + successMsg = 0 + for true { + msg = <-msgChan + successMsg++ + if !reflect.DeepEqual(msg, testEvent.EventData) { + t.Fatal("payload not equal", msg, testEvent.EventData) + } + if successMsg >= needToReConsume { + if needToReConsume == successMsg && len(msgChan) == 0 { + return nil + } + t.Fatal("success msg to much:", needToReConsume, successMsg, len(msgChan)) + } + } + t.Fatal("success msg to much:", needToReConsume, successMsg, len(msgChan)) + return nil + }, + ) + err := g.Wait() + if err != nil { + t.Fatal(err) + } }, ) - - _, err := dlqManage.RepublishBatch(b.N, context.Background()) - if err != nil { - b.Error(err) - } }, ) - time.Sleep(time.Second * 20) - if count != 0 { - b.Fatal("msg lose Publish:", b.N, " republish:", count) - } } diff --git a/global/nats/manager/task.go b/global/nats/manager/task.go index 9c7dc8d9..e1624b35 100644 --- a/global/nats/manager/task.go +++ b/global/nats/manager/task.go @@ -1,7 +1,9 @@ package manager import ( + "context" "fmt" + "path/filepath" "runtime" "time" @@ -20,9 +22,12 @@ type TaskManager interface { } const ( - natsTaskName = "task" - natsTaskPrefix = "task" - natsTaskLogPath = natsLogPath + "task.log" + natsTaskName = "task" + natsTaskPrefix = "task" +) + +var ( + natsTaskLogPath = filepath.Join(natsLogPath, "task.log") ) type Task string @@ -34,31 +39,32 @@ type taskManager struct { TaskManager manageInitializers taskMsgHandler + + dlqRegisterStream } func (tm *taskManager) init(js jetstream.JetStream, logger *zap.Logger) error { - tm.taskMsgHandler.init(logger) + tm.taskMsgHandler.init() streamConfig := jetstream.StreamConfig{ Name: natsTaskName, Subjects: []string{natsTaskPrefix + ".*"}, - Retention: jetstream.InterestPolicy, + Retention: jetstream.LimitsPolicy, MaxAge: 24 * time.Hour * 7, } - customerConfig := jetstream.ConsumerConfig{ - Name: natsTaskPrefix + "_customer", - Durable: natsTaskPrefix + "_customer", + consumerConfig := jetstream.ConsumerConfig{ + Name: natsTaskPrefix + "_consumer", + Durable: natsTaskPrefix + "_consumer", AckPolicy: jetstream.AckExplicitPolicy, BackOff: backOff, MaxDeliver: len(backOff) + 1, MaxAckPending: runtime.GOMAXPROCS(0) * 3, } - err := tm.manageInitializers.init(js, streamConfig, customerConfig) + err := tm.manageInitializers.init(js, streamConfig, consumerConfig, logger) if err != nil { return err } - _, err = tm.consumer.Consume(tm.receiveMsg) - return err + return tm.manageInitializers.setMainConsumerConsume(context.TODO(), tm.msgHandle) } func (tm *taskManager) Publish(task Task, payload []byte) bool { @@ -85,21 +91,34 @@ func (tm *taskManager) GetMessageHandler(task Task) (MessageHandler, error) { return tm.getHandler(task.subject()) } +func (tm *taskManager) getStreamName() string { return natsTaskName } + +func (tm *taskManager) reConsume(ctx context.Context, _ string, streamSeq uint64) error { + rawMsg, err := tm.stream.GetMsg(ctx, streamSeq) + + if err != nil { + return err + } + // taskManager has only one consumer group, so it uses the PublishMsgAsync method directly + _, err = tm.js.PublishMsgAsync( + &nats.Msg{ + Subject: rawMsg.Subject, + Data: rawMsg.Data, + Header: rawMsg.Header, + }, + ) + return err +} + type taskMsgHandler struct { msgHandlerMap dataTool.Map[string, MessageHandler] msgManger - - logger *zap.Logger } -func (tm *taskMsgHandler) init(logger *zap.Logger) { - tm.logger = logger +func (tm *taskMsgHandler) init() { tm.msgHandlerMap = dataTool.NewSyncMap[string, MessageHandler]() } -func (tm *taskMsgHandler) receiveMsg(msg jetstream.Msg) { - receiveMsg(msg, func(msg jetstream.Msg) error { return tm.msgHandle(msg.Subject(), msg.Data()) }, tm.logger) -} func (tm *taskMsgHandler) getHandler(subject string) (MessageHandler, error) { handler, exist := tm.msgHandlerMap.Load(subject) if !exist { diff --git a/global/nats/publicFunc_test.go b/global/nats/publicFunc_test.go index 9d2eddd7..0f1c10f4 100644 --- a/global/nats/publicFunc_test.go +++ b/global/nats/publicFunc_test.go @@ -31,219 +31,262 @@ func newTaskData() taskData { } func TestTaskPublishAndSubscribe(t *testing.T) { task := Task(t.Name() + uuid.NewString()) - success := false + msgChan := make(chan struct{}) SubscribeTask( task, func(ctx context.Context) error { - success = true + msgChan <- struct{}{} return nil }, ) t.Run( "Publish task", func(t *testing.T) { t.Parallel() - time.Sleep(time.Second * 3) if !PublishTask(task) { t.Error("Publish fail") } - time.Sleep(time.Second * 3) - if !success { - t.Fail() - } + <-msgChan + close(msgChan) }, ) withPayloadTask, data := Task(t.Name()+uuid.NewString()), newTaskData() - var withPayloadTaskSuccess bool + msgWithDataChan := make(chan struct{ Data taskData }) SubscribeTaskWithPayloadAndProcessInTransaction( withPayloadTask, func(pushData taskData, ctx context.Context) error { - withPayloadTaskSuccess = reflect.DeepEqual(data, pushData) + msgWithDataChan <- struct{ Data taskData }{Data: pushData} return nil }, ) t.Run( "Publish task With payload", func(t *testing.T) { t.Parallel() - time.Sleep(time.Second * 3) if !PublishTaskWithPayload(withPayloadTask, data) { t.Error("Publish fail") } - time.Sleep(time.Second * 3) - if !withPayloadTaskSuccess { - t.Fail() + msg := <-msgWithDataChan + if !reflect.DeepEqual(msg.Data, data) { + t.Fatal("push data not equal:", msg.Data, data) } + close(msgWithDataChan) }, ) } func TestEventPublishAndSubscribe(t *testing.T) { + var taskCount = 10 taskMap, event := make(map[Task]int), Event(uuid.NewString()) - for i := 0; i < 10; i++ { + taskChan := make(chan struct{ Task Task }, taskCount) + for i := 0; i < taskCount; i++ { task := Task("task_" + uuid.NewString()) - taskMap[task] = 0 SubscribeTask( task, func(ctx context.Context) error { - taskMap[task]++ + taskChan <- struct{ Task Task }{task} return nil }, ) - } - time.Sleep(time.Second) - for task := range taskMap { + taskMap[task]++ BindTaskToEvent(event, task) } t.Run( "publish", func(t *testing.T) { - time.Sleep(time.Second * 3) + t.Parallel() PublishEvent(event) - time.Sleep(time.Second * 10) - for task, value := range taskMap { - if value != 1 { - t.Log(task, "fail trigger count", value) + for true { + msg, open := <-taskChan + if !open { + t.Fatal(errors.New("chan close")) + } + taskMap[msg.Task]-- + if taskMap[msg.Task] == 0 { + delete(taskMap, msg.Task) + } else if taskMap[msg.Task] < 0 { + t.Fatal(msg, errors.New("msg repeat")) + } + if len(taskMap) != 0 { + continue } + if len(taskChan) != 0 { + t.Fatal(taskChan, errors.New("msg repeat")) + } + close(taskChan) + return } - t.Log("task trigger info", taskMap) + }, ) } func TestSubscribeEvent(t *testing.T) { name := t.Name() + uuid.NewString() - event := Event(name) - var count atomic.Int32 - count.Add(10) var retryCount atomic.Int32 - SubscribeEvent( - event, name, func(v int, ctx context.Context) error { - if retryCount.Add(1) < 10 { - return errors.New("test retry") - } - count.Add(int32(v)) - return nil - }, - ) - task := Task(name + "_task_1") - BindTaskToEvent(event, task) - SubscribeTaskWithPayload( - task, func(v int, ctx context.Context) error { - if retryCount.Add(1) < 10 { - return errors.New("test retry") - } - count.Add(int32(v)) - return nil - }, - ) - task = Task(name + "_task_2") - BindTaskToEvent(event, task) - SubscribeTaskWithPayload( - task, func(v int, ctx context.Context) error { - if retryCount.Add(1) < 10 { - return errors.New("test retry") - } - count.Add(int32(v)) - return nil + eventToTask := map[Event]map[Task]struct{}{ + Event(name + "_event_1"): { + Task(name + "_task_1"): struct{}{}, + Task(name + "_task_2"): struct{}{}, + Task(name + "_task_3"): struct{}{}, }, - ) - t.Run( - "publish", func(t *testing.T) { - t.Parallel() - time.Sleep(3 * time.Second) - PublishEventWithPayload(event, 1) - time.Sleep(30 * time.Second) - if count.Load() != 13 { - t.Fatal(count.Load()) - } - }, - ) + } + eventSubmitCount := make(map[Event]int) + msgChan := make(chan struct{ Event Event }) + // subscribe + for event, tasks := range eventToTask { + eventSubmitCount[event] = 1 + SubscribeEvent( + event, string(event)+"_new_customer_group", func(v int, ctx context.Context) error { + if retryCount.Add(1) < 10 { + return errors.New("test retry") + } + msgChan <- struct{ Event Event }{event} + return nil + }, + ) + for task := range tasks { + eventSubmitCount[event]++ + BindTaskToEvent(event, task) + SubscribeTaskWithPayload( + task, func(v int, ctx context.Context) error { + if retryCount.Add(1) < 10 { + return errors.New("test retry") + } + msgChan <- struct{ Event Event }{event} + return nil + }, + ) + } + } + for event := range eventToTask { + PublishEventWithPayload(event, 1) + } + for true { + msg, open := <-msgChan + if !open { + t.Fatal(errors.New("chan close")) + } + eventSubmitCount[msg.Event]-- + if eventSubmitCount[msg.Event] == 0 { + delete(eventSubmitCount, msg.Event) + } else if eventSubmitCount[msg.Event] < 0 { + t.Fatal(msg, errors.New("msg repeat")) + } + if len(eventSubmitCount) == 0 { + close(msgChan) + t.Log("finish") + return + } + } } func TestOutboxTask(t *testing.T) { - taskMap := make(map[Task]*atomic.Int32) - var retryCount int32 = 2 - for i := 0; i < 3; i++ { + var retryCount, taskNumber = 2, 3 + msgChan, taskRetryCount := make(chan struct{}, taskNumber), make(map[Task]*atomic.Int32) + for i := 0; i < taskNumber; i++ { task := Task(t.Name() + "task_" + uuid.NewString()) - taskMap[task] = new(atomic.Int32) + taskRetryCount[task] = new(atomic.Int32) + taskRetryCount[task].Add(int32(retryCount)) SubscribeTaskWithPayload( task, func(data int32, ctx context.Context) error { - if taskMap[task].Add(-1) > -retryCount { + if taskRetryCount[task].Add(-1) >= 0 { return errors.New("test retry") } - taskMap[task].Add(data) + msgChan <- struct{}{} return nil }, ) } - t.Run( - "publish", func(t *testing.T) { - t.Parallel() - time.Sleep(time.Second * 3) - for task := range taskMap { - err := db.Transaction( - context.TODO(), func(ctx *cus.TxContext) error { - return PublishTaskToOutboxWithPayload(ctx, task, retryCount+1) - }, - ) - if err != nil { - t.Fatal(err) - } - } - time.Sleep(time.Second * 30) - for task, i := range taskMap { - if i.Load() != 1 { - t.Fatal(task, i) - } + for task := range taskRetryCount { + err := db.Transaction( + context.TODO(), func(ctx *cus.TxContext) error { + return PublishTaskToOutboxWithPayload(ctx, task, 1) + }, + ) + if err != nil { + t.Fatal(err) + } + } + for true { + _, open := <-msgChan + if open { + taskNumber-- + if taskNumber == 0 { + close(msgChan) + return } - }, - ) + } else { + t.Fatal(errors.New("chan close")) + } + } } func TestOutboxEvent(t *testing.T) { - eventMap := make(map[Event]*atomic.Int32) - eventToTask := make(map[Event][]Task) - for i := 0; i < 10; i++ { + const eventNumber, taskNumber = 10, 3 + eventToTask, eventChan := make(map[Event]map[Task]struct{}), make( + chan struct { + Event Event + Task Task + }, eventNumber*taskNumber, + ) + for i := 0; i < eventNumber; i++ { event := Event("event_" + uuid.NewString()) - eventMap[event] = &atomic.Int32{} - for j := 0; j < 3; j++ { + eventToTask[event] = make(map[Task]struct{}) + for j := 0; j < taskNumber; j++ { task := Task("task_" + uuid.NewString()) - eventToTask[event] = append(eventToTask[event], task) + eventToTask[event][task] = struct{}{} SubscribeTaskWithPayload( task, func(t int, ctx context.Context) error { - eventMap[event].Add(3) + eventChan <- struct { + Event Event + Task Task + }{Event: event, Task: task} return nil }, ) BindTaskToEvent(event, task) } } - t.Run( - "public", func(t *testing.T) { - t.Parallel() - time.Sleep(time.Second * 3) - for event := range eventMap { - err := db.Transaction( - context.TODO(), func(ctx *cus.TxContext) error { - return PublishEventToOutboxWithPayload(ctx, event, 3) - }, - ) - if err != nil { - t.Fatal(err) - } + for event := range eventToTask { + err := db.Transaction( + context.TODO(), func(ctx *cus.TxContext) error { + return PublishEventToOutboxWithPayload(ctx, event, 3) + }, + ) + if err != nil { + t.Fatal(err) + } + } + for true { + msg, open := <-eventChan + if open { + if _, exist := eventToTask[msg.Event][msg.Task]; !exist { + close(eventChan) + t.Fatal(msg, errors.New("msg repeat")) } - time.Sleep(time.Second * 20) - for event, num := range eventMap { - if num.Load() != 9 { - t.Fatal(event, num.Load()) + delete(eventToTask[msg.Event], msg.Task) + if len(eventToTask[msg.Event]) == 0 { + delete(eventToTask, msg.Event) + } + if len(eventToTask) == 0 { + if len(eventChan) > 0 { + close(eventChan) + t.Fatal(eventChan, errors.New("msg repeat")) } + t.Log("finish") + close(eventChan) + return } - }, - ) + } else { + t.Fatal(errors.New("chan close")) + } + } } func TestCustomerProcessingTimeout(t *testing.T) { var count atomic.Int32 task := manager.Task(t.Name()) + msgChan := make(chan struct{}) taskManage.Subscribe( task, func(payload []byte) error { count.Add(1) time.Sleep(time.Second * 20) + msgChan <- struct{}{} return nil }, ) @@ -253,15 +296,16 @@ func TestCustomerProcessingTimeout(t *testing.T) { t.Parallel() err := db.Transaction( context.TODO(), func(ctx *cus.TxContext) error { - return PublishTaskToOutbox(ctx, Task(task)) + return PublishTaskToOutbox(ctx, task) }, ) if err != nil { t.Fatal(err) } - time.Sleep(time.Second * 31) + <-msgChan + close(msgChan) if count.Load() != 1 { - t.Fatal("count not is 1,count:", count.Load()) + t.Fatal("msg repeat, repeat count:", count.Load()-1) } }, ) diff --git a/initialize/initialize.go b/initialize/initialize.go index 187d69d8..641f966a 100644 --- a/initialize/initialize.go +++ b/initialize/initialize.go @@ -3,6 +3,7 @@ package initialize import ( "context" "os" + "path/filepath" "time" "github.com/ZiRunHua/LeapLedger/global/constant" @@ -61,14 +62,12 @@ func init() { } } -const _configDirectoryPath = constant.WORK_PATH - func initConfig() error { configFileName := os.Getenv("CONFIG_FILE_NAME") if len(configFileName) == 0 { configFileName = "config.yaml" } - configPath := _configDirectoryPath + "/" + configFileName + configPath := filepath.Join(constant.RootDir, configFileName) yamlFile, err := os.ReadFile(configPath) if err != nil { return err diff --git a/initialize/logger.go b/initialize/logger.go index 4f2fbf64..c6a6170b 100644 --- a/initialize/logger.go +++ b/initialize/logger.go @@ -2,6 +2,7 @@ package initialize import ( "os" + "path/filepath" "github.com/ZiRunHua/LeapLedger/global/constant" "go.uber.org/zap" @@ -12,22 +13,22 @@ type _logger struct { encoder zapcore.Encoder } -const ( - _requestLogPath = constant.WORK_PATH + "/log/request.log" - _errorLogPath = constant.WORK_PATH + "/log/error.log" - _panicLogPath = constant.WORK_PATH + "/log/panic.log" +var ( + _requestLogPath = filepath.Join(constant.RootDir, "log", "request.log") + _errorLogPath = filepath.Join(constant.RootDir, "log", "error.log") + _panicLogPath = filepath.Join(constant.RootDir, "log", "panic.log") ) func (l *_logger) do() error { l.setEncoder() var err error - if RequestLogger, err = l.initLogger(_requestLogPath); err != nil { + if RequestLogger, err = l.New(_requestLogPath); err != nil { return err } - if ErrorLogger, err = l.initLogger(_errorLogPath); err != nil { + if ErrorLogger, err = l.New(_errorLogPath); err != nil { return err } - if PanicLogger, err = l.initLogger(_panicLogPath); err != nil { + if PanicLogger, err = l.New(_panicLogPath); err != nil { return err } return nil @@ -40,7 +41,11 @@ func (l *_logger) setEncoder() { l.encoder = zapcore.NewConsoleEncoder(encoderConfig) } -func (l *_logger) initLogger(path string) (*zap.Logger, error) { +func (l *_logger) New(path string) (*zap.Logger, error) { + err := os.MkdirAll(filepath.Dir(path), os.ModePerm) + if err != nil { + return nil, err + } file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) if err != nil { return nil, err diff --git a/initialize/nats.go b/initialize/nats.go index e36877a9..980d03ba 100644 --- a/initialize/nats.go +++ b/initialize/nats.go @@ -15,8 +15,6 @@ type _nats struct { // NatsDb is used to record and retry failure messages // Enabled in consumer server -const nastStoreDir = constant.RUNTIME_DATA_PATH + "/nats" - func (n *_nats) do() error { err := n.init() if err != nil { diff --git a/model/product/enter.go b/model/product/enter.go index 333fbbcd..c844ef71 100644 --- a/model/product/enter.go +++ b/model/product/enter.go @@ -2,16 +2,18 @@ package productModel import ( "context" + "os" + "path/filepath" + "github.com/ZiRunHua/LeapLedger/global/constant" "github.com/ZiRunHua/LeapLedger/global/cus" "github.com/ZiRunHua/LeapLedger/global/db" "github.com/ZiRunHua/LeapLedger/util/fileTool" "gorm.io/gorm" "gorm.io/gorm/logger" - "os" ) -var initSqlFile = constant.DATA_PATH + "/database/product.sql" +var initSqlFile = filepath.Clean(constant.DataPath + "/database/product.sql") func init() { // table diff --git a/router/websocket/websocket.go b/router/websocket/websocket.go index 6d2c3956..0dbec885 100644 --- a/router/websocket/websocket.go +++ b/router/websocket/websocket.go @@ -1,10 +1,11 @@ package websocket import ( + "github.com/ZiRunHua/LeapLedger/global" + "go.uber.org/zap" "net" "time" - "github.com/ZiRunHua/LeapLedger/api/v1/ws/msg" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" ) @@ -18,8 +19,7 @@ func Use(handler func(conn *websocket.Conn, ctx *gin.Context) error) gin.Handler return func(ctx *gin.Context) { conn, err := upgrader.Upgrade(ctx.Writer, ctx.Request, nil) if err != nil { - ctx.JSONP(500, "error") - return + panic(err) } conn.SetPingHandler( func(message string) error { @@ -33,18 +33,11 @@ func Use(handler func(conn *websocket.Conn, ctx *gin.Context) error) gin.Handler }, ) conn.SetPongHandler(nil) - conn.SetCloseHandler( - func(code int, text string) error { - return nil - }, - ) + conn.SetCloseHandler(nil) defer conn.Close() err = handler(conn, ctx) if err != nil { - err = msg.SendError(conn, err) - if err != nil { - panic(err) - } + global.ErrorLogger.Error("websocket err", zap.Error(err)) } } } diff --git a/service/category/enter.go b/service/category/enter.go index 8c43e051..57e76ded 100644 --- a/service/category/enter.go +++ b/service/category/enter.go @@ -1,10 +1,7 @@ package categoryService import ( - "github.com/ZiRunHua/LeapLedger/global/constant" _thirdpartyService "github.com/ZiRunHua/LeapLedger/service/thirdparty" - _log "github.com/ZiRunHua/LeapLedger/util/log" - "go.uber.org/zap" ) type Group struct { @@ -14,17 +11,4 @@ type Group struct { var GroupApp = new(Group) -var task = &_task{} var aiService = _thirdpartyService.GroupApp.Ai -var errorLog *zap.Logger - -// 初始化 -func init() { - initLog() -} -func initLog() { - var err error - if errorLog, err = _log.GetNewZapLogger(constant.LOG_PATH + "/service/category/error.log"); err != nil { - panic(err) - } -} diff --git a/service/product/bill/reader.go b/service/product/bill/reader.go index e6e9a888..3e4ce2b6 100644 --- a/service/product/bill/reader.go +++ b/service/product/bill/reader.go @@ -9,28 +9,28 @@ package bill import ( + "context" + "path/filepath" + "strings" + + "github.com/ZiRunHua/LeapLedger/global" "github.com/ZiRunHua/LeapLedger/global/constant" + "github.com/ZiRunHua/LeapLedger/global/db" accountModel "github.com/ZiRunHua/LeapLedger/model/account" productModel "github.com/ZiRunHua/LeapLedger/model/product" transactionModel "github.com/ZiRunHua/LeapLedger/model/transaction" - "github.com/ZiRunHua/LeapLedger/util/log" + "github.com/pkg/errors" "go.uber.org/zap" - "strings" ) -import ( - "context" - "github.com/ZiRunHua/LeapLedger/global/db" - "github.com/pkg/errors" +var ( + logPath = filepath.Clean(constant.LogPath + "/service/product/bill.log") + logger *zap.Logger ) -const logPath = constant.LOG_PATH + "/service/product/bill.log" - -var logger *zap.Logger - func init() { var err error - if logger, err = log.GetNewZapLogger(logPath); err != nil { + if logger, err = global.Config.Logger.New(logPath); err != nil { panic(err) } } diff --git a/service/template/enter.go b/service/template/enter.go index e962a21e..18f5b3ca 100644 --- a/service/template/enter.go +++ b/service/template/enter.go @@ -3,7 +3,9 @@ package templateService import ( "context" "errors" + "path/filepath" + "github.com/ZiRunHua/LeapLedger/global" "github.com/ZiRunHua/LeapLedger/global/constant" "github.com/ZiRunHua/LeapLedger/global/cus" "github.com/ZiRunHua/LeapLedger/global/db" @@ -14,7 +16,6 @@ import ( _productService "github.com/ZiRunHua/LeapLedger/service/product" _userService "github.com/ZiRunHua/LeapLedger/service/user" "github.com/ZiRunHua/LeapLedger/util" - _log "github.com/ZiRunHua/LeapLedger/util/log" "github.com/ZiRunHua/LeapLedger/util/rand" "go.uber.org/zap" "gorm.io/gorm" @@ -48,7 +49,8 @@ var ( func init() { var err error - if errorLog, err = _log.GetNewZapLogger(constant.LOG_PATH + "/service/template/error.log"); err != nil { + logPath := filepath.Clean(constant.LogPath + "/service/template/error.log") + if errorLog, err = global.Config.Logger.New(logPath); err != nil { panic(err) } diff --git a/service/thirdparty/email.go b/service/thirdparty/email.go index 1fa82cdc..2443dead 100644 --- a/service/thirdparty/email.go +++ b/service/thirdparty/email.go @@ -3,28 +3,30 @@ package thirdpartyService import ( "bytes" "fmt" + "os" + "path/filepath" + "time" + "github.com/ZiRunHua/LeapLedger/global" "github.com/ZiRunHua/LeapLedger/global/constant" userModel "github.com/ZiRunHua/LeapLedger/model/user" commonService "github.com/ZiRunHua/LeapLedger/service/common" "github.com/ZiRunHua/LeapLedger/util/rand" "github.com/pkg/errors" - "os" - "time" ) var emailTemplate map[constant.Notification][]byte var emailTemplateFilePath = map[constant.Notification]string{ - constant.NotificationOfCaptcha: "/template/email/captcha.html", - constant.NotificationOfRegistrationSuccess: "/template/email/registerSuccess.html", - constant.NotificationOfUpdatePassword: "/template/email/updatePassword.html", + constant.NotificationOfCaptcha: filepath.Clean("/template/email/captcha.html"), + constant.NotificationOfRegistrationSuccess: filepath.Clean("/template/email/registerSuccess.html"), + constant.NotificationOfUpdatePassword: filepath.Clean("/template/email/updatePassword.html"), } func init() { emailTemplate = make(map[constant.Notification][]byte, len(emailTemplateFilePath)) var err error for notification, path := range emailTemplateFilePath { - if emailTemplate[notification], err = os.ReadFile(constant.DATA_PATH + path); err != nil { + if emailTemplate[notification], err = os.ReadFile(filepath.Clean(constant.DataPath + path)); err != nil { panic(err) } } diff --git a/service/thirdparty/email/enter.go b/service/thirdparty/email/enter.go index a83aa218..e25e649f 100644 --- a/service/thirdparty/email/enter.go +++ b/service/thirdparty/email/enter.go @@ -2,6 +2,7 @@ package email import ( "fmt" + "github.com/ZiRunHua/LeapLedger/global/cron" ) @@ -43,7 +44,14 @@ func (e *thirdPartyResponseError) Error() string { func init() { _, err := cron.Scheduler.Every(30).Minute().Do( - cron.MakeJobFunc(Service.getToken), + cron.MakeJobFunc( + func() error { + if false == ServiceStatus { + return nil + } + return Service.getToken() + }, + ), ) if err != nil { panic(err) diff --git a/service/thirdparty/enter.go b/service/thirdparty/enter.go index 77d84dfc..915281d4 100644 --- a/service/thirdparty/enter.go +++ b/service/thirdparty/enter.go @@ -1,10 +1,7 @@ package thirdpartyService import ( - "github.com/ZiRunHua/LeapLedger/global/constant" "github.com/ZiRunHua/LeapLedger/service/thirdparty/email" - _log "github.com/ZiRunHua/LeapLedger/util/log" - "go.uber.org/zap" ) type Group struct { @@ -13,14 +10,5 @@ type Group struct { var ( GroupApp = new(Group) - log *zap.Logger emailServer = email.Service ) - -// 初始化 -func init() { - var err error - if log, err = _log.GetNewZapLogger(constant.LOG_PATH + "/service/thirdparty/email.log"); err != nil { - panic(err) - } -} diff --git a/service/thirdparty/task.go b/service/thirdparty/task.go index ca6a1a1b..0c9af14a 100644 --- a/service/thirdparty/task.go +++ b/service/thirdparty/task.go @@ -2,6 +2,9 @@ package thirdpartyService import ( "context" + "errors" + + "github.com/ZiRunHua/LeapLedger/global" "github.com/ZiRunHua/LeapLedger/global/nats" userModel "github.com/ZiRunHua/LeapLedger/model/user" ) @@ -9,7 +12,11 @@ import ( func init() { nats.SubscribeTaskWithPayload( nats.TaskSendCaptchaEmail, func(t nats.PayloadSendCaptchaEmail, ctx context.Context) error { - return GroupApp.sendCaptchaEmail(t.Email, t.Action) + err := GroupApp.sendCaptchaEmail(t.Email, t.Action) + if errors.Is(err, global.ErrServiceClosed) { + return nil + } + return err }, ) nats.SubscribeTaskWithPayload( @@ -18,7 +25,11 @@ func init() { if err != nil { return err } - return GroupApp.sendNotificationEmail(user, t.Notification) + err = GroupApp.sendNotificationEmail(user, t.Notification) + if errors.Is(err, global.ErrServiceClosed) { + return nil + } + return err }, ) } diff --git a/service/transaction/enter.go b/service/transaction/enter.go index 00453228..82f71b04 100644 --- a/service/transaction/enter.go +++ b/service/transaction/enter.go @@ -1,11 +1,5 @@ package transactionService -import ( - "github.com/ZiRunHua/LeapLedger/global/constant" - _log "github.com/ZiRunHua/LeapLedger/util/log" - "go.uber.org/zap" -) - type Group struct { Transaction Timing Timing @@ -14,15 +8,5 @@ type Group struct { var ( GroupApp = new(Group) - errorLog *zap.Logger - task = &_task{} - server = &Transaction{} + server = &Transaction{} ) - -// 初始化 -func init() { - var err error - if errorLog, err = _log.GetNewZapLogger(constant.LOG_PATH + "/service/transaction/error.log"); err != nil { - panic(err) - } -} diff --git a/util/log/log.go b/util/log/log.go deleted file mode 100644 index 69ee65c7..00000000 --- a/util/log/log.go +++ /dev/null @@ -1,30 +0,0 @@ -package log - -import ( - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "os" - "path/filepath" -) - -func GetNewZapLogger(logPath string) (*zap.Logger, error) { - logDir := filepath.Dir(logPath) - err := os.MkdirAll(logDir, os.ModePerm) - if err != nil { - return nil, err - } - logFile, err := os.Create(logPath) - if err != nil { - return nil, err - } - encoderConfig := zap.NewProductionEncoderConfig() - encoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder - encoder := zapcore.NewJSONEncoder(encoderConfig) - core := zapcore.NewCore( - encoder, - zapcore.AddSync(logFile), - zap.NewAtomicLevelAt(zap.InfoLevel), - ) - logger := zap.New(core, zap.AddCaller()) - return logger, nil -}