Skip to content

Commit

Permalink
[Server] fix tagrecorder check
Browse files Browse the repository at this point in the history
  • Loading branch information
roryye authored and SongZhen0704 committed Apr 19, 2024
1 parent 33f8c4a commit 56780f6
Showing 1 changed file with 37 additions and 9 deletions.
46 changes: 37 additions & 9 deletions server/controller/tagrecorder/check/tagrecorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

logging "github.com/op/go-logging"
"gorm.io/gorm"
"gorm.io/gorm/clause"

"github.com/deepflowio/deepflow/server/controller/config"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
Expand Down Expand Up @@ -62,14 +63,17 @@ type TagRecorder struct {

func (c *TagRecorder) Check() {
go func() {
log.Info("tagrecorder health check data run")
t := time.Now()
if err := mysql.GetDBs().DoOnAllDBs(func(db *mysql.DB) error {
return c.check(db)
t := time.Now()
log.Infof("ORG(id=%d database=%s) tagrecorder health check data run", db.ORGID, db.Name)
if err := c.check(db); err != nil {
log.Infof("ORG(id=%d database=%s) tagrecorder health check failed: %v", db.ORGID, db.Name, err.Error())
}
log.Infof("ORG(id=%d database=%s) tagrecorder health check data end, time since: %v", db.ORGID, db.Name, time.Since(t))
return nil
}); err != nil {
log.Error(err)
}
log.Infof("tagrecorder health check data end, time since: %v", time.Since(t))
}()
}

Expand Down Expand Up @@ -182,8 +186,9 @@ func compareAndCheck[CT MySQLChModel](db *mysql.DB, oldItems, newItems []CT) err
return err
}
var t CT
tableName := reflect.TypeOf(t)
log.Infof("check tagrecorder table(%v), old len(%v) hash(%v), new len(%v) hash(%v)", tableName, len(oldItems), oldHash, len(newItems), newHash)
tableName := reflect.TypeOf(t).String()
log.Infof("ORG(id=%d database=%s) check tagrecorder table(%v), old len(%v) hash(%v), new len(%v) hash(%v)",
db.ORGID, db.Name, tableName, len(oldItems), oldHash, len(newItems), newHash)
if oldHash == newHash {
return nil
}
Expand All @@ -199,16 +204,39 @@ func compareAndCheck[CT MySQLChModel](db *mysql.DB, oldItems, newItems []CT) err
addItems = append(addItems, item)
}
if len(addItems) > 0 {
if err := tx.Create(&addItems).Error; err != nil {
return fmt.Errorf("add data(len:%d) to table(%s) failed, %v", len(newItems), tableName, err)
if err := addBatch(tx, addItems, db.ORGID, db.Name, tableName); err != nil {
return fmt.Errorf("add data to table(%s) failed, %v", tableName, err)
}
}
return nil
})
log.Infof("truncate table(%v)", tableName)
log.Infof("ORG(id=%d database=%s) truncate table(%v)", db.ORGID, db.Name, tableName)
return err
}

func addBatch[CT MySQLChModel](tx *gorm.DB, toAdd []CT, orgID int, database, resourceType string) error {
count := len(toAdd)
offset := 1000
pages := count/offset + 1
if count%offset == 0 {
pages = count / offset
}
for i := 0; i < pages; i++ {
start := i * offset
end := (i + 1) * offset
if end > count {
end = count
}
oneP := toAdd[start:end]
err := tx.Clauses(clause.Returning{}).Create(&oneP).Error
if err != nil {
return err
}
log.Infof("ORG(id=%d database=%s) add %d %s[%d, %d] success", orgID, database, len(oneP), resourceType, start, end)
}
return nil
}

func genH64[CT MySQLChModel](oldItems, newItems []CT) (oldHash, newHash uint64, err error) {
var newStrByte []byte
newStr := make([]string, len(newItems))
Expand Down

0 comments on commit 56780f6

Please sign in to comment.