Skip to content

Commit

Permalink
[Server] fix
Browse files Browse the repository at this point in the history
  • Loading branch information
roryye committed May 31, 2024
1 parent cb36a54 commit a95f94b
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 103 deletions.
2 changes: 1 addition & 1 deletion cli/ctl/agent_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func createAgentGroup(cmd *cobra.Command, args []string, groupID string) {
url := fmt.Sprintf("http://%s:%d/v1/vtap-groups/", server.IP, server.Port)

// 调用采集器组API,并输出返回结果
body := map[string]interface{}{"name": args[0], "group_id": groupID, "team_id": 1, "user_id": 1}
body := map[string]interface{}{"name": args[0], "group_id": groupID}
_, err := common.CURLPerform("POST", url, body, "",
[]common.HTTPOption{common.WithTimeout(common.GetTimeout(cmd)), common.WithORGID(common.GetORGID(cmd))}...)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions server/controller/db/mysql/migration/rawsql/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1536,8 +1536,8 @@ TRUNCATE TABLE policy_acl_group;

CREATE TABLE IF NOT EXISTS vtap_group_configuration(
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
user_id INTEGER DEFAULT 1,
team_id INTEGER DEFAULT 1,
user_id INTEGER DEFAULT 1,
team_id INTEGER DEFAULT 1,
max_collect_pps INTEGER DEFAULT NULL,
max_npb_bps BIGINT DEFAULT NULL COMMENT 'unit: bps',
max_cpus INTEGER DEFAULT NULL,
Expand Down
260 changes: 160 additions & 100 deletions server/controller/http/service/vtap_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/google/uuid"
"gorm.io/gorm"

"github.com/deepflowio/deepflow/server/agent_config"
agentconf "github.com/deepflowio/deepflow/server/agent_config"
"github.com/deepflowio/deepflow/server/controller/common"
"github.com/deepflowio/deepflow/server/controller/config"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
Expand Down Expand Up @@ -167,6 +167,20 @@ func (a *AgentGroup) Create(vtapGroupCreate model.VtapGroupCreate) (resp model.V
)
}

var allVTaps []mysql.VTap
if err = db.Where("lcuuid IN (?)", vtapGroupCreate.VtapLcuuids).Find(&allVTaps).Error; err != nil {
return model.VtapGroup{}, err
}
vtaps, err := getAgentByUser(userInfo, &a.cfg.FPermit, allVTaps)
if err != nil {
return model.VtapGroup{}, err
}
for _, vtap := range vtaps {
if vtap.TeamID != vtapGroupCreate.TeamID {
return model.VtapGroup{}, fmt.Errorf("agent team(%d) must equal to agent group team(%d)", vtap.TeamID, vtapGroupCreate.TeamID)
}
}

shortUUID := VTAP_GROUP_SHORT_UUID_PREFIX + common.GenerateShortUUID()
groupID := vtapGroupCreate.GroupID
// verify vtap group id in deepflow-ctl command model
Expand All @@ -183,17 +197,22 @@ func (a *AgentGroup) Create(vtapGroupCreate model.VtapGroupCreate) (resp model.V
vtapGroup.Name = vtapGroupCreate.Name
vtapGroup.TeamID = vtapGroupCreate.TeamID
vtapGroup.UserID = a.resourceAccess.userInfo.ID
db.Create(&vtapGroup)

var allVTaps []mysql.VTap
db.Where("lcuuid IN (?)", vtapGroupCreate.VtapLcuuids).Find(&allVTaps)
vtaps, err := getAgentByUser(userInfo, &a.cfg.FPermit, allVTaps)
err = db.Transaction(func(tx *gorm.DB) error {
if err := tx.Create(&vtapGroup).Error; err != nil {
return err
}
for _, vtap := range vtaps {
if err := tx.Model(&vtap).Updates(map[string]interface{}{"vtap_group_lcuuid": lcuuid,
"team_id": vtapGroupCreate.TeamID}).Error; err != nil {
return err
}
}
return nil
})
if err != nil {
return model.VtapGroup{}, err
}
for _, vtap := range vtaps {
db.Model(&vtap).Updates(map[string]interface{}{"vtap_group_lcuuid": lcuuid, "team_id": vtapGroupCreate.TeamID})
}

response, _ := a.Get(map[string]interface{}{"lcuuid": lcuuid})
refresh.RefreshCache(userInfo.ORGID, []common.DataChanged{common.DATA_CHANGED_VTAP})
Expand Down Expand Up @@ -232,108 +251,140 @@ func (a *AgentGroup) Update(lcuuid string, vtapGroupUpdate map[string]interface{
if err != nil {
return model.VtapGroup{}, err
}
db := dbInfo.DB

var vtapGroup mysql.VTapGroup
var dbUpdateMap = make(map[string]interface{})
if ret := db.Where("lcuuid = ?", lcuuid).First(&vtapGroup); ret.Error != nil {
return model.VtapGroup{}, NewError(httpcommon.RESOURCE_NOT_FOUND, fmt.Sprintf("vtap_group (%s) not found", lcuuid))
}
resourceUpdate := map[string]interface{}{
"team_id": vtapGroupUpdate["TEAM_ID"],
"owner_user_id": vtapGroupUpdate["USER_ID"],
}
if err := a.resourceAccess.CanUpdateResource(vtapGroup.TeamID,
common.SET_RESOURCE_TYPE_AGENT_GROUP, vtapGroup.Lcuuid, resourceUpdate); err != nil {
return model.VtapGroup{}, err
}

log.Infof("ORG(id=%d database=%s) update vtap_group (%s) config %v", dbInfo.ORGID, dbInfo.Name, vtapGroup.Name, vtapGroupUpdate)

// 修改名称
if _, ok := vtapGroupUpdate["NAME"]; ok {
dbUpdateMap["name"] = vtapGroupUpdate["NAME"]
}

// 修改状态
if _, ok := vtapGroupUpdate["STATE"]; ok {
db.Model(&mysql.VTap{}).Where("vtap_group_lcuuid = ?", lcuuid).Update("state", vtapGroupUpdate["STATE"])
}

// 注册采集器
if _, ok := vtapGroupUpdate["ENABLE"]; ok {
db.Model(&mysql.VTap{}).Where("vtap_group_lcuuid = ?", lcuuid).Update("enable", vtapGroupUpdate["ENABLE"])
}

if _, ok := vtapGroupUpdate["TEAM_ID"]; ok {
dbUpdateMap["team_id"] = vtapGroupUpdate["TEAM_ID"]
}
if _, ok := vtapGroupUpdate["USER_ID"]; ok {
dbUpdateMap["user_id"] = vtapGroupUpdate["USER_ID"]
}

// 修改组内采集器
if _, ok := vtapGroupUpdate["VTAP_LCUUIDS"]; ok {
if len(vtapGroupUpdate["VTAP_LCUUIDS"].([]interface{})) > cfg.Spec.VTapMaxPerGroup {
return model.VtapGroup{}, NewError(
httpcommon.SELECTED_RESOURCES_NUM_EXCEEDED,
fmt.Sprintf("vtap count exceeds (limit %d)", cfg.Spec.VTapMaxPerGroup),
)
db := dbInfo.DB
err = db.Transaction(func(tx *gorm.DB) error {
var vtapGroup mysql.VTapGroup
var vtapGroupTeamID int
if ret := tx.Where("lcuuid = ?", lcuuid).First(&vtapGroup); ret.Error != nil {
return fmt.Errorf("vtap_group (%s) not found", lcuuid)
}

var allOldVtaps []mysql.VTap
var allNewVtaps []mysql.VTap
db.Where("vtap_group_lcuuid IN (?)", vtapGroup.Lcuuid).Find(&allOldVtaps)
oldVtaps, err := getAgentByUser(userInfo, &a.cfg.FPermit, allOldVtaps)
if err != nil {
return model.VtapGroup{}, err
vtapGroupTeamID = vtapGroup.TeamID
resourceUpdate := map[string]interface{}{
"team_id": vtapGroupUpdate["TEAM_ID"],
"owner_user_id": vtapGroupUpdate["USER_ID"],
}
db.Where("lcuuid IN (?)", vtapGroupUpdate["VTAP_LCUUIDS"]).Find(&allNewVtaps)
newVtaps, err := getAgentByUser(userInfo, &a.cfg.FPermit, allNewVtaps)
if err != nil {
return model.VtapGroup{}, err
if err := a.resourceAccess.CanUpdateResource(vtapGroup.TeamID,
common.SET_RESOURCE_TYPE_AGENT_GROUP, vtapGroup.Lcuuid, resourceUpdate); err != nil {
return err
}

lcuuidToOldVtap := make(map[string]*mysql.VTap)
lcuuidToNewVtap := make(map[string]*mysql.VTap)
var oldVtapLcuuids = mapset.NewSet()
var newVtapLcuuids = mapset.NewSet()
var delVtapLcuuids = mapset.NewSet()
var addVtapLcuuids = mapset.NewSet()
for i, vtap := range oldVtaps {
lcuuidToOldVtap[vtap.Lcuuid] = &oldVtaps[i]
oldVtapLcuuids.Add(vtap.Lcuuid)
var vtapGroupConfigs []agentconf.AgentGroupConfigModel
db.Where("vtap_group_lcuuid = ?", lcuuid).Find(&vtapGroupConfigs)
// transfer agent group config
_, ok1 := vtapGroupUpdate["TEAM_ID"]
_, ok2 := vtapGroupUpdate["USER_ID"]
if ok1 && ok2 {
for _, vtapGroupConfig := range vtapGroupConfigs {
if err := a.resourceAccess.CanUpdateResource(vtapGroup.TeamID,
common.SET_RESOURCE_TYPE_AGENT_GROUP_CONFIG, *vtapGroupConfig.Lcuuid, resourceUpdate); err != nil {
return err
}
}
}
for i, vtap := range newVtaps {
lcuuidToNewVtap[vtap.Lcuuid] = &newVtaps[i]
newVtapLcuuids.Add(vtap.Lcuuid)

log.Infof("ORG(id=%d database=%s) update vtap_group (%s) config %v", dbInfo.ORGID, dbInfo.Name, vtapGroup.Name, vtapGroupUpdate)

var dbUpdateMap = make(map[string]interface{})
// 修改名称
if _, ok := vtapGroupUpdate["NAME"]; ok {
dbUpdateMap["name"] = vtapGroupUpdate["NAME"]
}

delVtapLcuuids = oldVtapLcuuids.Difference(newVtapLcuuids)
addVtapLcuuids = newVtapLcuuids.Difference(oldVtapLcuuids)
// 修改状态
if _, ok := vtapGroupUpdate["STATE"]; ok {
tx.Model(&mysql.VTap{}).Where("vtap_group_lcuuid = ?", lcuuid).Update("state", vtapGroupUpdate["STATE"])
}

var defaultVtapGroup mysql.VTapGroup
if ret := db.Where("id = ?", common.DEFAULT_VTAP_GROUP_ID).First(&defaultVtapGroup); ret.Error != nil {
return model.VtapGroup{}, NewError(httpcommon.RESOURCE_NOT_FOUND, "default vtap_group not found")
// 注册采集器
if _, ok := vtapGroupUpdate["ENABLE"]; ok {
tx.Model(&mysql.VTap{}).Where("vtap_group_lcuuid = ?", lcuuid).Update("enable", vtapGroupUpdate["ENABLE"])
}

for _, lcuuid := range delVtapLcuuids.ToSlice() {
vtap := lcuuidToOldVtap[lcuuid.(string)]
log.Infof("ORG(id=%d database=%s) update vtap group lcuuid(%s -> %s)",
dbInfo.ORGID, dbInfo.Name, vtap.VtapGroupLcuuid, defaultVtapGroup.Lcuuid)
db.Model(vtap).Updates(map[string]interface{}{"vtap_group_lcuuid": defaultVtapGroup.Lcuuid})
if _, ok := vtapGroupUpdate["TEAM_ID"]; ok {
dbUpdateMap["team_id"] = vtapGroupUpdate["TEAM_ID"]
vtapGroupTeamID, ok = vtapGroupUpdate["TEAM_ID"].(int)
if !ok {
return fmt.Errorf("team id must be int")
}
}
if _, ok := vtapGroupUpdate["USER_ID"]; ok {
dbUpdateMap["user_id"] = vtapGroupUpdate["USER_ID"]
}

for _, lcuuid := range addVtapLcuuids.ToSlice() {
vtap := lcuuidToNewVtap[lcuuid.(string)]
log.Infof("ORG(id=%d database=%s) update vtap group lcuuid(%s - > %s)",
dbInfo.ORGID, dbInfo.Name, vtap.VtapGroupLcuuid, vtapGroup.Lcuuid)
db.Model(vtap).Updates(map[string]interface{}{"vtap_group_lcuuid": vtapGroup.Lcuuid})
// 修改组内采集器
if _, ok := vtapGroupUpdate["VTAP_LCUUIDS"]; ok {
if len(vtapGroupUpdate["VTAP_LCUUIDS"].([]interface{})) > cfg.Spec.VTapMaxPerGroup {
return NewError(
httpcommon.SELECTED_RESOURCES_NUM_EXCEEDED,
fmt.Sprintf("vtap count exceeds (limit %d)", cfg.Spec.VTapMaxPerGroup),
)
}

var allOldVtaps []mysql.VTap
var allNewVtaps []mysql.VTap
tx.Where("vtap_group_lcuuid IN (?)", vtapGroup.Lcuuid).Find(&allOldVtaps)
oldVtaps, err := getAgentByUser(userInfo, &a.cfg.FPermit, allOldVtaps)
if err != nil {
return err
}
tx.Where("lcuuid IN (?)", vtapGroupUpdate["VTAP_LCUUIDS"]).Find(&allNewVtaps)
newVtaps, err := getAgentByUser(userInfo, &a.cfg.FPermit, allNewVtaps)
if err != nil {
return err
}

lcuuidToOldVtap := make(map[string]*mysql.VTap)
lcuuidToNewVtap := make(map[string]*mysql.VTap)
var oldVtapLcuuids = mapset.NewSet()
var newVtapLcuuids = mapset.NewSet()
var delVtapLcuuids = mapset.NewSet()
var addVtapLcuuids = mapset.NewSet()
for i, vtap := range oldVtaps {
lcuuidToOldVtap[vtap.Lcuuid] = &oldVtaps[i]
oldVtapLcuuids.Add(vtap.Lcuuid)
}
for i, vtap := range newVtaps {
lcuuidToNewVtap[vtap.Lcuuid] = &newVtaps[i]
newVtapLcuuids.Add(vtap.Lcuuid)
}

delVtapLcuuids = oldVtapLcuuids.Difference(newVtapLcuuids)
addVtapLcuuids = newVtapLcuuids.Difference(oldVtapLcuuids)

var defaultVtapGroup mysql.VTapGroup
if ret := tx.Where("id = ?", common.DEFAULT_VTAP_GROUP_ID).First(&defaultVtapGroup); ret.Error != nil {
return NewError(httpcommon.RESOURCE_NOT_FOUND, "default vtap_group not found")
}

for _, lcuuid := range delVtapLcuuids.ToSlice() {
vtap := lcuuidToOldVtap[lcuuid.(string)]
log.Infof("ORG(id=%d database=%s) update vtap group lcuuid(%s -> %s)",
dbInfo.ORGID, dbInfo.Name, vtap.VtapGroupLcuuid, defaultVtapGroup.Lcuuid)
if err = tx.Model(vtap).Updates(map[string]interface{}{"vtap_group_lcuuid": defaultVtapGroup.Lcuuid}).Error; err != nil {
return err
}
}

for _, lcuuid := range addVtapLcuuids.ToSlice() {
vtap := lcuuidToNewVtap[lcuuid.(string)]
if vtap.TeamID != vtapGroupTeamID {
return fmt.Errorf(
"agent(%s) team(%d) must equal to agent group team(%d)", vtap.Name, vtap.TeamID, vtapGroupTeamID)
}
log.Infof("ORG(id=%d database=%s) update vtap group lcuuid(%s - > %s)",
dbInfo.ORGID, dbInfo.Name, vtap.VtapGroupLcuuid, vtapGroup.Lcuuid)
if err = tx.Model(vtap).Updates(map[string]interface{}{"vtap_group_lcuuid": vtapGroup.Lcuuid}).Error; err != nil {
return err
}
}
}
}

// 更新vtap_group DB
db.Model(&vtapGroup).Updates(dbUpdateMap)
// 更新vtap_group DB
return tx.Model(&vtapGroup).Updates(dbUpdateMap).Error
})
if err != nil {
return model.VtapGroup{}, err
}

response, _ := a.Get(map[string]interface{}{"lcuuid": lcuuid})
refresh.RefreshCache(userInfo.ORGID, []common.DataChanged{common.DATA_CHANGED_VTAP})
Expand Down Expand Up @@ -362,11 +413,20 @@ func (a *AgentGroup) Delete(lcuuid string) (resp map[string]string, err error) {
}

log.Infof("ORG(id=%d database=%s) delete vtap_group (%s)", dbInfo.ORGID, dbInfo.Name, vtapGroup.Name)
err = db.Transaction(func(tx *gorm.DB) error {
if err = tx.Model(&mysql.VTap{}).Where("vtap_group_lcuuid = ?", lcuuid).Updates(map[string]interface{}{
"vtap_group_lcuuid": defaultVtapGroup.Lcuuid, "team_id": defaultVtapGroup.TeamID}).Error; err != nil {
return err
}
if err = db.Delete(&vtapGroup).Error; err != nil {
return err
}
return db.Where("vtap_group_lcuuid = ?", lcuuid).Delete(&agentconf.AgentGroupConfigModel{}).Error
})
if err != nil {
return nil, err
}

db.Model(&mysql.VTap{}).Where("vtap_group_lcuuid = ?", lcuuid).
Updates(map[string]interface{}{"vtap_group_lcuuid": defaultVtapGroup.Lcuuid, "team_id": defaultVtapGroup.TeamID})
db.Delete(&vtapGroup)
db.Where("vtap_group_lcuuid = ?", lcuuid).Delete(&agent_config.AgentGroupConfigModel{})
refresh.RefreshCache(orgID, []common.DataChanged{common.DATA_CHANGED_VTAP})
return map[string]string{"LCUUID": lcuuid}, nil
}

0 comments on commit a95f94b

Please sign in to comment.