Skip to content

Commit

Permalink
[Controller] genesis vinterface add team id
Browse files Browse the repository at this point in the history
  • Loading branch information
askyrie authored and SongZhen0704 committed May 22, 2024
1 parent 3556c60 commit f2cb002
Show file tree
Hide file tree
Showing 13 changed files with 122 additions and 55 deletions.
3 changes: 2 additions & 1 deletion cli/ctl/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func tableVip(response *simplejson.Json, table *tablewriter.Table) {
}

func tableVinterface(response *simplejson.Json, table *tablewriter.Table) {
table.SetHeader([]string{"MAC", "NAME", "TAP_MAC", "TAP_NAME", "IF_TYPE", "DEVICE_TYPE", "DEVICE_NAME", "HOST_IP", "AGENT_ID", "CLUSTER_ID", "NETNS_ID", "IP"})
table.SetHeader([]string{"MAC", "NAME", "TAP_MAC", "TAP_NAME", "IF_TYPE", "DEVICE_TYPE", "DEVICE_NAME", "HOST_IP", "AGENT_ID", "CLUSTER_ID", "NETNS_ID", "TEAM_ID", "IP"})

tableItems := [][]string{}
for i := range response.Get("DATA").MustArray() {
Expand All @@ -351,6 +351,7 @@ func tableVinterface(response *simplejson.Json, table *tablewriter.Table) {
tableItem = append(tableItem, strconv.Itoa(data.Get("VTAP_ID").MustInt()))
tableItem = append(tableItem, data.Get("KUBERNETES_CLUSTER_ID").MustString())
tableItem = append(tableItem, strconv.Itoa(data.Get("NETNS_ID").MustInt()))
tableItem = append(tableItem, strconv.Itoa(data.Get("TEAM_ID").MustInt()))
tableItem = append(tableItem, ip)
tableItems = append(tableItems, tableItem)
}
Expand Down
4 changes: 2 additions & 2 deletions cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ go 1.18

require (
github.com/bitly/go-simplejson v0.5.0
github.com/deepflowio/deepflow/message v0.0.0-20240507083143-eaca2bed10f2
github.com/deepflowio/deepflow/server v0.0.0-20240423024840-ece29545d0ac
github.com/deepflowio/deepflow/message v0.0.0-20240508092310-e45a3d549f9b
github.com/deepflowio/deepflow/server v0.0.0-20240521151831-c9de50d1a803
github.com/golang/protobuf v1.5.4
github.com/mattn/go-runewidth v0.0.14
github.com/olekukonko/tablewriter v0.0.5
Expand Down
8 changes: 4 additions & 4 deletions cli/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deepflowio/deepflow/message v0.0.0-20240507083143-eaca2bed10f2 h1:RaWabWp/uS3/uDCwsK7arzJ7/m5x4LeXC6EXpnl6ylI=
github.com/deepflowio/deepflow/message v0.0.0-20240507083143-eaca2bed10f2/go.mod h1:e+1lUMMlycCvFRKvlwt/y/0vxJnF8wVss3GyR1ARXY0=
github.com/deepflowio/deepflow/server v0.0.0-20240423024840-ece29545d0ac h1:5gysrosvC5kwK0gwH9R/1dbgeyJRKcQj371eB1tXqCk=
github.com/deepflowio/deepflow/server v0.0.0-20240423024840-ece29545d0ac/go.mod h1:EMICsEChD3sF/62DhAsGJ/uDUEJDqEMcZjtanDH+C2o=
github.com/deepflowio/deepflow/message v0.0.0-20240508092310-e45a3d549f9b h1:dDrPqgS+JP/2tc8zIIO58Xm6ulv8nOnezEpo2wgPSEc=
github.com/deepflowio/deepflow/message v0.0.0-20240508092310-e45a3d549f9b/go.mod h1:e+1lUMMlycCvFRKvlwt/y/0vxJnF8wVss3GyR1ARXY0=
github.com/deepflowio/deepflow/server v0.0.0-20240521151831-c9de50d1a803 h1:Dh1tTAiMOWrhOVp6idJydvV+hHCZgcfBoGi9tU2mI60=
github.com/deepflowio/deepflow/server v0.0.0-20240521151831-c9de50d1a803/go.mod h1:C4Tk88hF3MBM+MexSqbbwI+6UxA9Yo0OJede6ab4pTE=
github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
Expand Down
1 change: 1 addition & 0 deletions message/controller.proto
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ message GenesisSyncVinterface {
optional string last_seen = 14;
optional uint32 netns_id = 15;
optional string if_type = 16;
optional uint32 team_id = 17;
}

message GenesisSyncProcess {
Expand Down
1 change: 1 addition & 0 deletions server/controller/db/mysql/migration/rawsql/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1861,6 +1861,7 @@ CREATE TABLE IF NOT EXISTS genesis_vinterface (
last_seen DATETIME,
vtap_id INTEGER,
kubernetes_cluster_id CHAR(64),
team_id INTEGER DEFAULT 1,
PRIMARY KEY (`lcuuid`,`vtap_id`, `node_ip`)
) ENGINE=innodb DEFAULT CHARSET=utf8mb4;
TRUNCATE TABLE genesis_vinterface;
Expand Down
35 changes: 35 additions & 0 deletions server/controller/db/mysql/migration/rawsql/issu/6.5.1.35.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-- modify start, add upgrade sql
DROP PROCEDURE IF EXISTS AddColumnIfNotExists;

CREATE PROCEDURE AddColumnIfNotExists(
IN tableName VARCHAR(255),
IN colName VARCHAR(255),
IN afterCol VARCHAR(255)
)
BEGIN
DECLARE column_count INT;

-- 检查列是否存在
SELECT COUNT(*)
INTO column_count
FROM information_schema.columns
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = tableName
AND column_name = colName;

-- 如果列不存在,则添加列
IF column_count = 0 THEN
SET @sql = CONCAT('ALTER TABLE ', tableName, ' ADD COLUMN ', colName, ' INTEGER DEFAULT 1 AFTER ', afterCol);
PREPARE stmt FROM @sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
END IF;
END;

CALL AddColumnIfNotExists('genesis_vinterface', 'team_id', 'kubernetes_cluster_id');

DROP PROCEDURE AddColumnIfNotExists;

-- update db_version to latest, remeber update DB_VERSION_EXPECT in migrate/init.go
UPDATE db_version SET version='6.5.1.35';
-- modify end
2 changes: 1 addition & 1 deletion server/controller/db/mysql/migration/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ package migration

const (
DB_VERSION_TABLE = "db_version"
DB_VERSION_EXPECTED = "6.5.1.34"
DB_VERSION_EXPECTED = "6.5.1.35"
)
14 changes: 11 additions & 3 deletions server/controller/genesis/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ import (

var log = logging.MustGetLogger("genesis.common")

type TeamInfo struct {
OrgID int
TeamId int
}

type VifInfo struct {
MaskLen uint32
Address string
Expand Down Expand Up @@ -499,8 +504,8 @@ func RequestGet(url string, timeout int, queryStrings map[string]string) error {
return nil
}

func GetTeamIDToOrgID() (map[string]int, error) {
teamIDToOrgID := map[string]int{}
func GetTeamShortLcuuidToInfo() (map[string]TeamInfo, error) {
teamIDToOrgID := map[string]TeamInfo{}
orgIDs, err := mysql.GetORGIDs()
if err != nil {
return teamIDToOrgID, err
Expand All @@ -518,7 +523,10 @@ func GetTeamIDToOrgID() (map[string]int, error) {
continue
}
for _, team := range teams {
teamIDToOrgID[team.ShortLcuuid] = orgID
teamIDToOrgID[team.ShortLcuuid] = TeamInfo{
OrgID: orgID,
TeamId: team.TeamID,
}
}
}
return teamIDToOrgID, nil
Expand Down
1 change: 1 addition & 0 deletions server/controller/genesis/datatypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
type VIFRPCMessage struct {
orgID int
msgType int
teamID uint32
vtapID uint32
peer string
k8sClusterID string
Expand Down
1 change: 1 addition & 0 deletions server/controller/genesis/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ func (g *Genesis) GetGenesisSyncResponse(orgID int) (GenesisSyncDataResponse, er
HostIP: v.GetHostIp(),
KubernetesClusterID: v.GetKubernetesClusterId(),
NodeIP: v.GetNodeIp(),
TeamID: v.GetTeamId(),
LastSeen: vpLastSeen,
})
}
Expand Down
104 changes: 60 additions & 44 deletions server/controller/genesis/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ func isInterestedHost(tType tridentcommon.TridentType) bool {

type TridentStats struct {
OrgID int
TeamID int
VtapID uint32
TeamID string
TeamShortLcuuid string
IP string
Proxy string
K8sVersion uint64
Expand All @@ -70,7 +71,7 @@ type SynchronizerServer struct {
k8sQueue queue.QueueWriter
prometheusQueue queue.QueueWriter
genesisSyncQueue queue.QueueWriter
teamIDToOrgID sync.Map
teamShortLcuuidToInfo sync.Map
clusterIDToVersion sync.Map
prometheusClusterIDToVersion sync.Map
vtapToVersion sync.Map
Expand Down Expand Up @@ -128,29 +129,32 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *trident.G
return &trident.GenesisSyncResponse{Version: &version}, nil
}

var orgID int
teamID := request.GetTeamId()
if teamID == "" {
var orgID, teamID int
teamShortLcuuid := request.GetTeamId()
if teamShortLcuuid == "" {
orgID = mysqlcommon.DEFAULT_ORG_ID
teamID = mysqlcommon.DEFAULT_TEAM_ID
} else {
oID, ok := g.teamIDToOrgID.Load(teamID)
if !ok {
teamIDToOrgID, err := common.GetTeamIDToOrgID()
t, ok := g.teamShortLcuuidToInfo.Load(teamShortLcuuid)
if ok {
orgID = t.(common.TeamInfo).OrgID
teamID = t.(common.TeamInfo).TeamId
} else {
teamShortLcuuidToInfo, err := common.GetTeamShortLcuuidToInfo()
if err != nil {
log.Errorf("genesis sync from %s team_id %s vtap get org id failed: %s", remote, teamID, err.Error())
log.Errorf("genesis sync from %s team_id %s vtap get team info failed: %s", remote, teamShortLcuuid, err.Error())
return &trident.GenesisSyncResponse{Version: &version}, nil
}
oID, ok := teamIDToOrgID[teamID]
teamInfo, ok := teamShortLcuuidToInfo[teamShortLcuuid]
if !ok {
log.Errorf("genesis sync from %s team_id %s not found organization", remote, teamID)
log.Errorf("genesis sync from %s team_id %s not found team info", remote, teamShortLcuuid)
return &trident.GenesisSyncResponse{Version: &version}, nil
}
orgID = oID
for k, v := range teamIDToOrgID {
g.teamIDToOrgID.Store(k, v)
orgID = teamInfo.OrgID
teamID = teamInfo.TeamId
for k, v := range teamShortLcuuidToInfo {
g.teamShortLcuuidToInfo.Store(k, v)
}
} else {
orgID = oID.(int)
}
}
vtap := fmt.Sprintf("%d-%d", orgID, vtapID)
Expand Down Expand Up @@ -187,6 +191,7 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *trident.G
peer: remote,
vtapID: vtapID,
orgID: orgID,
teamID: uint32(teamID),
msgType: common.TYPE_RENEW,
message: request,
},
Expand All @@ -200,6 +205,7 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *trident.G
peer: remote,
vtapID: vtapID,
orgID: orgID,
teamID: uint32(teamID),
k8sClusterID: k8sClusterID,
msgType: common.TYPE_UPDATE,
message: request,
Expand All @@ -222,6 +228,7 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *trident.G
stats.SyncTridentType = tType
stats.SyncLastSeen = time.Now()
stats.K8sClusterID = k8sClusterID
stats.TeamShortLcuuid = teamShortLcuuid
stats.GenesisSyncProcessDataOperation = request.GetProcessData()
stats.GenesisSyncDataOperation = platformData
g.tridentStatsMap.Store(vtap, stats)
Expand Down Expand Up @@ -255,29 +262,32 @@ func (g *SynchronizerServer) KubernetesAPISync(ctx context.Context, request *tri
}
entries := request.GetEntries()

var orgID int
teamID := request.GetTeamId()
if teamID == "" {
var orgID, teamID int
teamShortLcuuid := request.GetTeamId()
if teamShortLcuuid == "" {
orgID = mysqlcommon.DEFAULT_ORG_ID
teamID = mysqlcommon.DEFAULT_TEAM_ID
} else {
oID, ok := g.teamIDToOrgID.Load(teamID)
if !ok {
teamIDToOrgID, err := common.GetTeamIDToOrgID()
t, ok := g.teamShortLcuuidToInfo.Load(teamShortLcuuid)
if ok {
orgID = t.(common.TeamInfo).OrgID
teamID = t.(common.TeamInfo).TeamId
} else {
teamShortLcuuidToInfo, err := common.GetTeamShortLcuuidToInfo()
if err != nil {
log.Errorf("kubernetes api sync from %s team_id %s vtap get org id failed: %s", remote, teamID, err.Error())
log.Errorf("kubernetes api sync from %s team_id %s vtap get team info failed: %s", remote, teamShortLcuuid, err.Error())
return &trident.KubernetesAPISyncResponse{}, nil
}
oID, ok := teamIDToOrgID[teamID]
teamInfo, ok := teamShortLcuuidToInfo[teamShortLcuuid]
if !ok {
log.Errorf("kubernetes api sync from %s team_id %s not found organization", remote, teamID)
log.Errorf("kubernetes api sync %s team_id %s not found team info", remote, teamShortLcuuid)
return &trident.KubernetesAPISyncResponse{}, nil
}
orgID = oID
for k, v := range teamIDToOrgID {
g.teamIDToOrgID.Store(k, v)
orgID = teamInfo.OrgID
teamID = teamInfo.TeamId
for k, v := range teamShortLcuuidToInfo {
g.teamShortLcuuidToInfo.Store(k, v)
}
} else {
orgID = oID.(int)
}
}
vtap := fmt.Sprintf("%d-%d", orgID, vtapID)
Expand All @@ -296,6 +306,7 @@ func (g *SynchronizerServer) KubernetesAPISync(ctx context.Context, request *tri
stats.K8sClusterID = clusterID
stats.K8sLastSeen = time.Now()
stats.K8sVersion = version
stats.TeamShortLcuuid = teamShortLcuuid
g.tridentStatsMap.Store(vtap, stats)
now := time.Now()
if vtapID != 0 {
Expand Down Expand Up @@ -371,29 +382,32 @@ func (g *SynchronizerServer) PrometheusAPISync(ctx context.Context, request *tri
}
entries := request.GetEntries()

var orgID int
teamID := request.GetTeamId()
if teamID == "" {
var orgID, teamID int
teamShortLcuuid := request.GetTeamId()
if teamShortLcuuid == "" {
orgID = mysqlcommon.DEFAULT_ORG_ID
teamID = mysqlcommon.DEFAULT_TEAM_ID
} else {
oID, ok := g.teamIDToOrgID.Load(teamID)
if !ok {
teamIDToOrgID, err := common.GetTeamIDToOrgID()
t, ok := g.teamShortLcuuidToInfo.Load(teamShortLcuuid)
if ok {
orgID = t.(common.TeamInfo).OrgID
teamID = t.(common.TeamInfo).TeamId
} else {
teamShortLcuuidToInfo, err := common.GetTeamShortLcuuidToInfo()
if err != nil {
log.Errorf("prometheus api sync from %s team_id %s vtap get org id failed: %s", remote, teamID, err.Error())
log.Errorf("prometheus api sync from %s team_id %s vtap get team info failed: %s", remote, teamShortLcuuid, err.Error())
return &trident.PrometheusAPISyncResponse{}, nil
}
oID, ok := teamIDToOrgID[teamID]
teamInfo, ok := teamShortLcuuidToInfo[teamShortLcuuid]
if !ok {
log.Errorf("prometheus api sync from %s team_id %s not found organization", remote, teamID)
log.Errorf("prometheus api sync %s team_id %s not found team info", remote, teamShortLcuuid)
return &trident.PrometheusAPISyncResponse{}, nil
}
orgID = oID
for k, v := range teamIDToOrgID {
g.teamIDToOrgID.Store(k, v)
orgID = teamInfo.OrgID
teamID = teamInfo.TeamId
for k, v := range teamShortLcuuidToInfo {
g.teamShortLcuuidToInfo.Store(k, v)
}
} else {
orgID = oID.(int)
}
}
vtap := fmt.Sprintf("%d-%d", orgID, vtapID)
Expand All @@ -410,6 +424,7 @@ func (g *SynchronizerServer) PrometheusAPISync(ctx context.Context, request *tri
stats.TeamID = teamID
stats.VtapID = vtapID
stats.PrometheusClusterID = clusterID
stats.TeamShortLcuuid = teamShortLcuuid
stats.PrometheusLastSeen = time.Now()
stats.PrometheusVersion = version
g.tridentStatsMap.Store(vtap, stats)
Expand Down Expand Up @@ -641,6 +656,7 @@ func (g *SynchronizerServer) GenesisSharingSync(ctx context.Context, request *co
HostIp: &vData.HostIP,
KubernetesClusterId: &vData.KubernetesClusterID,
NodeIp: &vData.NodeIP,
TeamId: &vData.TeamID,
LastSeen: &vLastSeen,
}
gSyncVinterfaces = append(gSyncVinterfaces, gVinterface)
Expand Down
2 changes: 2 additions & 0 deletions server/controller/genesis/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ func (v *GenesisSyncRpcUpdater) ParseVinterfaceInfo(info VIFRPCMessage, peer str
vIF.LastSeen = epoch
vIF.VtapID = vtapID
vIF.KubernetesClusterID = k8sClusterID
vIF.TeamID = info.teamID
VIFs = append(VIFs, vIF)
}
}
Expand Down Expand Up @@ -315,6 +316,7 @@ func (v *GenesisSyncRpcUpdater) ParseVinterfaceInfo(info VIFRPCMessage, peer str
vIF.LastSeen = epoch
vIF.VtapID = vtapID
vIF.KubernetesClusterID = k8sClusterID
vIF.TeamID = info.teamID
VIFs = append(VIFs, vIF)
}
return VIFs
Expand Down
1 change: 1 addition & 0 deletions server/controller/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,7 @@ func (GenesisPort) TableName() string {
}

type GenesisVinterface struct {
TeamID uint32 `gorm:"column:team_id;type:int;default:1" json:"TEAM_ID"`
NetnsID uint32 `gorm:"column:netns_id;type:int unsigned;default:0" json:"NETNS_ID"`
VtapID uint32 `gorm:"primaryKey;column:vtap_id;type:int" json:"VTAP_ID"`
Lcuuid string `gorm:"primaryKey;column:lcuuid;type:char(64)" json:"LCUUID"`
Expand Down

0 comments on commit f2cb002

Please sign in to comment.