Skip to content

Commit

Permalink
feat: support k8s_watch_policy env
Browse files Browse the repository at this point in the history
  • Loading branch information
SongZhen0704 committed Sep 25, 2024
1 parent 581ca82 commit 5e23338
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 46 deletions.
48 changes: 30 additions & 18 deletions cli/ctl/trisolaris_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,18 @@ import (
)

type ParamData struct {
CtrlIP string
CtrlMac string
GroupID string
ClusterID string
TeamID string
RpcIP string
RpcPort string
Type string
PluginType string
PluginName string
OrgID uint32
CtrlIP string
CtrlMac string
GroupID string
ClusterID string
TeamID string
KubernetesWatchPolicy string
RpcIP string
RpcPort string
Type string
PluginType string
PluginName string
OrgID uint32
}

type SortedAcls []*trident.FlowAcl
Expand Down Expand Up @@ -227,6 +228,7 @@ func RegisterTrisolarisCommand() *cobra.Command {
trisolarisCmd.PersistentFlags().StringVarP(&paramData.GroupID, "gid", "", "", "agent group ID")
trisolarisCmd.PersistentFlags().StringVarP(&paramData.TeamID, "tid", "", "", "agent team ID")
trisolarisCmd.PersistentFlags().StringVarP(&paramData.ClusterID, "cid", "", "", "agent k8s cluster ID")
trisolarisCmd.PersistentFlags().StringVarP(&paramData.KubernetesWatchPolicy, "kwp", "", "", "agent k8s watch policy")
trisolarisCmd.PersistentFlags().StringVarP(&paramData.Type, "type", "", "trident", "request type trdient/analyzer")
trisolarisCmd.PersistentFlags().StringVarP(&paramData.PluginType, "ptype", "", "wasm", "request plugin type")
trisolarisCmd.PersistentFlags().StringVarP(&paramData.PluginName, "pname", "", "", "request plugin name")
Expand Down Expand Up @@ -262,6 +264,7 @@ func initCmd(cmd *cobra.Command, cmds []CmdExecute) {
defer conn.Close()
var name, groupID, clusterID, teamID string
var orgID uint32
var kubernetesWatchPolicy trident.KubernetesWatchPolicy
switch paramData.Type {
case "trident":
name = paramData.Type
Expand All @@ -276,16 +279,25 @@ func initCmd(cmd *cobra.Command, cmds []CmdExecute) {
fmt.Printf("type(%s) muste be in [trident, analyzer]", paramData.Type)
return
}
switch paramData.KubernetesWatchPolicy {
case "disabled":
kubernetesWatchPolicy = trident.KubernetesWatchPolicy_KWP_WATCH_DISABLED
case "only":
kubernetesWatchPolicy = trident.KubernetesWatchPolicy_KWP_WATCH_ONLY
default:
kubernetesWatchPolicy = trident.KubernetesWatchPolicy_KWP_NORMAL
}
fmt.Printf("request trisolaris(%s), params(%+v)\n", conn.Target(), paramData)
c := trident.NewSynchronizerClient(conn)
reqData := &trident.SyncRequest{
CtrlIp: &paramData.CtrlIP,
CtrlMac: &paramData.CtrlMac,
VtapGroupIdRequest: &groupID,
KubernetesClusterId: &clusterID,
ProcessName: &name,
TeamId: &teamID,
OrgId: &orgID,
CtrlIp: &paramData.CtrlIP,
CtrlMac: &paramData.CtrlMac,
VtapGroupIdRequest: &groupID,
KubernetesClusterId: &clusterID,
KubernetesWatchPolicy: &kubernetesWatchPolicy,
ProcessName: &name,
TeamId: &teamID,
OrgId: &orgID,
}
var response *trident.SyncResponse
var err error
Expand Down
2 changes: 1 addition & 1 deletion cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.18

require (
github.com/bitly/go-simplejson v0.5.0
github.com/deepflowio/deepflow/message v0.0.0-20240913001216-68c04e44504b
github.com/deepflowio/deepflow/message v0.0.0-20240924113131-ec9660ac2e46
github.com/deepflowio/deepflow/server v0.0.0-20240913001216-68c04e44504b
github.com/golang/protobuf v1.5.4
github.com/mattn/go-runewidth v0.0.14
Expand Down
4 changes: 2 additions & 2 deletions cli/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deepflowio/deepflow/message v0.0.0-20240913001216-68c04e44504b h1:9BFasJn1uQhu/aAzCZeoBYeawzH6LAj4qzx3LFi/2GI=
github.com/deepflowio/deepflow/message v0.0.0-20240913001216-68c04e44504b/go.mod h1:e+1lUMMlycCvFRKvlwt/y/0vxJnF8wVss3GyR1ARXY0=
github.com/deepflowio/deepflow/message v0.0.0-20240924113131-ec9660ac2e46 h1:uoBpoM3HHixjjz4aELXSGc0rHjMRhqbrF52aI+QhaKg=
github.com/deepflowio/deepflow/message v0.0.0-20240924113131-ec9660ac2e46/go.mod h1:e+1lUMMlycCvFRKvlwt/y/0vxJnF8wVss3GyR1ARXY0=
github.com/deepflowio/deepflow/server v0.0.0-20240913001216-68c04e44504b h1:sfsFOMxr0zACQjtzHuee3i/tDDnDHStKApyf6mPnpbY=
github.com/deepflowio/deepflow/server v0.0.0-20240913001216-68c04e44504b/go.mod h1:0FBQRNqe8wEbd17MR/cBedYnwZmGIaA0+23RwB98axo=
github.com/deepflowio/deepflow/server/controller/db/mysql/migrator/edition v0.0.0-20240913001216-68c04e44504b h1:SuS5rJsqJBupkI1rJdhp2I+5aFjzONlM+/PhAkgDISY=
Expand Down
10 changes: 10 additions & 0 deletions server/controller/trisolaris/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package common

import (
api "github.com/deepflowio/deepflow/message/trident"
)

const (
// VTAP
VTAP_CONTROLLER_EXCEPTIONS_MASK = 0xFFFFFFFF00000000
Expand Down Expand Up @@ -111,3 +115,9 @@ const (
DISABLED = 0
ENABLED = 1
)

var (
KWP_NORMAL = api.KubernetesWatchPolicy_KWP_NORMAL
KWP_WATCH_ONLY = api.KubernetesWatchPolicy_KWP_WATCH_ONLY
KWP_WATCH_DISABLED = api.KubernetesWatchPolicy_KWP_WATCH_DISABLED
)
66 changes: 47 additions & 19 deletions server/controller/trisolaris/services/grpc/synchronize/vtap.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,13 +339,16 @@ func (e *VTapEvent) Sync(ctx context.Context, in *api.SyncRequest) (*api.SyncRes
log.Errorf("ctrlIp is %s, ctrlMac is %s, not team_id refuse(%v) register", ctrlIP, ctrlMac, trisolaris.GetIsRefused(), logger.NewORGPrefix(orgID))
return e.GetFailedResponse(in, nil), nil
}
log.Warningf("vtap (ctrl_ip: %s, ctrl_mac: %s, team_id: (str=%s,int=%d), host_ips: %s, kubernetes_cluster_id: %s, kubernetes_force_watch: %t, group_id: %s) not found in cache. "+
log.Warningf("vtap (ctrl_ip: %s, ctrl_mac: %s, team_id: (str=%s,int=%d), host_ips: %s, kubernetes_cluster_id: %s, kubernetes_force_watch: %t, kubernetes_watch_policy: %d, group_id: %s) not found in cache. "+
"NAME:%s REVISION:%s BOOT_TIME:%d",
ctrlIP, ctrlMac, teamIDStr, teamIDInt, in.GetHostIps(), in.GetKubernetesClusterId(), in.GetKubernetesForceWatch(),
ctrlIP, ctrlMac, teamIDStr, teamIDInt, in.GetHostIps(),
in.GetKubernetesClusterId(), in.GetKubernetesForceWatch(), in.GetKubernetesWatchPolicy(),
in.GetVtapGroupIdRequest(), in.GetProcessName(), in.GetRevision(), in.GetBootTime(), logger.NewORGPrefix(orgID))
// kubernetes_force_watch field is compatibility for old version agent
// If the kubernetes_force_watch field is true, the ctrl_ip and ctrl_mac of the vtap will not change,
// If the kubernetes_watch_policy field is KWP_WATCH_ONLY, the ctrl_ip and ctrl_mac of the vtap will not change,
// resulting in unsuccessful registration and a large number of error logs.
if !in.GetKubernetesForceWatch() {
if !in.GetKubernetesForceWatch() || in.GetKubernetesWatchPolicy() != KWP_WATCH_ONLY {
gVTapInfo.Register(
int(in.GetTapMode()),
in.GetCtrlIp(),
Expand Down Expand Up @@ -445,14 +448,21 @@ func (e *VTapEvent) Sync(ctx context.Context, in *api.SyncRequest) (*api.SyncRes
}

configInfo := e.generateConfigInfo(vtapCache, in.GetKubernetesClusterId(), gVTapInfo, orgID)
// 携带信息有cluster_id时选择一个采集器开启云平台同步开关
if in.GetKubernetesClusterId() != "" && isOpenK8sSyn(vtapCache.GetVTapType()) == true {
value := gVTapInfo.GetKubernetesClusterID(in.GetKubernetesClusterId(), vtapCacheKey, in.GetKubernetesForceWatch())
// 携带信息有cluster_id && watch_policy != disabled 时选择一个采集器开启云平台同步开关
if in.GetKubernetesClusterId() != "" &&
in.GetKubernetesWatchPolicy() != KWP_WATCH_DISABLED &&
isOpenK8sSyn(vtapCache.GetVTapType()) == true {
value := gVTapInfo.GetKubernetesClusterID(
in.GetKubernetesClusterId(),
vtapCacheKey,
in.GetKubernetesForceWatch(),
int(in.GetKubernetesWatchPolicy()),
)
if value == vtapCacheKey {
log.Infof(
"open cluster(%s) kubernetes_api_enabled VTap(ctrl_ip: %s, ctrl_mac: %s, team_id: (str=%s,int=%d), kubernetes_force_watch: %t)",
"open cluster(%s) kubernetes_api_enabled VTap(ctrl_ip: %s, ctrl_mac: %s, team_id: (str=%s,int=%d), kubernetes_force_watch: %t, kubernetes_watch_policy: %d)",
in.GetKubernetesClusterId(), ctrlIP, ctrlMac,
teamIDStr, teamIDInt, in.GetKubernetesForceWatch(), logger.NewORGPrefix(orgID))
teamIDStr, teamIDInt, in.GetKubernetesForceWatch(), in.GetKubernetesWatchPolicy(), logger.NewORGPrefix(orgID))
configInfo.KubernetesApiEnabled = proto.Bool(true)
}
}
Expand Down Expand Up @@ -602,12 +612,21 @@ func (e *VTapEvent) noVTapResponse(in *api.SyncRequest, orgID int) *api.SyncResp
}
configInfo.TridentType = &tridentType
configInfo.Enabled = proto.Bool(false)
value := gVTapInfo.GetKubernetesClusterID(in.GetKubernetesClusterId(), vtapCacheKey, in.GetKubernetesForceWatch())
if value == vtapCacheKey {
configInfo.KubernetesApiEnabled = proto.Bool(true)
log.Infof(
"open cluster(%s) kubernetes_api_enabled VTap(ctrl_ip: %s, ctrl_mac: %s, kubernetes_force_watch: %t)",
in.GetKubernetesClusterId(), ctrlIP, ctrlMac, in.GetKubernetesForceWatch(), logger.NewORGPrefix(orgID))
if in.GetKubernetesWatchPolicy() != KWP_WATCH_DISABLED {
value := gVTapInfo.GetKubernetesClusterID(
in.GetKubernetesClusterId(),
vtapCacheKey,
in.GetKubernetesForceWatch(),
int(in.GetKubernetesWatchPolicy()),
)
if value == vtapCacheKey {
configInfo.KubernetesApiEnabled = proto.Bool(true)
log.Infof(
"open cluster(%s) kubernetes_api_enabled "+
"VTap(ctrl_ip: %s, ctrl_mac: %s, kubernetes_force_watch: %t, kubernetes_watch_policy: %d)",
in.GetKubernetesClusterId(), ctrlIP, ctrlMac,
in.GetKubernetesForceWatch(), in.GetKubernetesWatchPolicy(), logger.NewORGPrefix(orgID))
}
}
return &api.SyncResponse{
Status: &STATUS_SUCCESS,
Expand Down Expand Up @@ -753,13 +772,22 @@ func (e *VTapEvent) pushResponse(in *api.SyncRequest, all bool) (*api.SyncRespon
}

configInfo := e.generateConfigInfo(vtapCache, in.GetKubernetesClusterId(), gVTapInfo, orgID)
// 携带信息有cluster_id时选择一个采集器开启云平台同步开关
if in.GetKubernetesClusterId() != "" && isOpenK8sSyn(vtapCache.GetVTapType()) == true {
value := gVTapInfo.GetKubernetesClusterID(in.GetKubernetesClusterId(), vtapCacheKey, in.GetKubernetesForceWatch())
// 携带信息有cluster_id && watch_policy != disabled 时选择一个采集器开启云平台同步开关
if in.GetKubernetesClusterId() != "" &&
in.GetKubernetesWatchPolicy() != KWP_WATCH_DISABLED &&
isOpenK8sSyn(vtapCache.GetVTapType()) == true {
value := gVTapInfo.GetKubernetesClusterID(
in.GetKubernetesClusterId(),
vtapCacheKey,
in.GetKubernetesForceWatch(),
int(in.GetKubernetesWatchPolicy()),
)
if value == vtapCacheKey {
log.Infof(
"open cluster(%s) kubernetes_api_enabled VTap(ctrl_ip: %s, ctrl_mac: %s, team_id: (str=%s,int=%d), kubernetes_force_watch: %t)",
in.GetKubernetesClusterId(), ctrlIP, ctrlMac, teamIDStr, teamIDInt, in.GetKubernetesForceWatch(), logger.NewORGPrefix(orgID))
"open cluster(%s) kubernetes_api_enabled "+
"VTap(ctrl_ip: %s, ctrl_mac: %s, team_id: (str=%s,int=%d), kubernetes_force_watch: %t, kubernetes_watch_policy %d)",
in.GetKubernetesClusterId(), ctrlIP, ctrlMac, teamIDStr, teamIDInt,
in.GetKubernetesForceWatch(), in.GetKubernetesWatchPolicy(), logger.NewORGPrefix(orgID))
configInfo.KubernetesApiEnabled = proto.Bool(true)
}
}
Expand Down
11 changes: 8 additions & 3 deletions server/controller/trisolaris/vtap/vtap.go
Original file line number Diff line number Diff line change
Expand Up @@ -996,14 +996,19 @@ func (v *VTapInfo) getVTapPodDomains(c *VTapCache) []string {
return result
}

func (v *VTapInfo) GetKubernetesClusterID(clusterID string, value string, force bool) string {
func (v *VTapInfo) GetKubernetesClusterID(clusterID string, value string, force bool, watchPolicy int) string {
log.Info("##########")
log.Info(watchPolicy)
log.Info(int(KWP_WATCH_ONLY))
if v == nil {
if force {
log.Info("##########11111")
// force field is for compatibility old version agent
if force || (watchPolicy == int(KWP_WATCH_ONLY)) {
return value
}
return ""
}
return v.kcData.getClusterID(clusterID, value, force)
return v.kcData.getClusterID(clusterID, value, force || (watchPolicy == int(KWP_WATCH_ONLY)))
}

func (v *VTapInfo) putChVTapChangedForPD() {
Expand Down
2 changes: 1 addition & 1 deletion server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/cornelk/hashmap v1.0.8
github.com/deckarep/golang-set v1.8.0
github.com/deckarep/golang-set/v2 v2.1.0
github.com/deepflowio/deepflow/message v0.0.0-20240725065348-535fb6f1efdc
github.com/deepflowio/deepflow/message v0.0.0-20240924113131-ec9660ac2e46
github.com/deepflowio/deepflow/server/controller/cloud/kubernetes_gather/expand v0.0.0-00010101000000-000000000000
github.com/deepflowio/deepflow/server/controller/cloud/platform v0.0.0-00010101000000-000000000000
github.com/deepflowio/deepflow/server/controller/cloud/tencent/expand v0.0.0-00010101000000-000000000000
Expand Down
4 changes: 2 additions & 2 deletions server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ github.com/deckarep/golang-set v1.8.0 h1:sk9/l/KqpunDwP7pSjUg0keiOOLEnOBHzykLrsP
github.com/deckarep/golang-set v1.8.0/go.mod h1:5nI87KwE7wgsBU1F4GKAw2Qod7p5kyS383rP6+o6qqo=
github.com/deckarep/golang-set/v2 v2.1.0 h1:g47V4Or+DUdzbs8FxCCmgb6VYd+ptPAngjM6dtGktsI=
github.com/deckarep/golang-set/v2 v2.1.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4=
github.com/deepflowio/deepflow/message v0.0.0-20240725065348-535fb6f1efdc h1:B5hQ+ItZ4+dy4APMWZQeSV187j4dzDsxkAJfuEg1BLA=
github.com/deepflowio/deepflow/message v0.0.0-20240725065348-535fb6f1efdc/go.mod h1:e+1lUMMlycCvFRKvlwt/y/0vxJnF8wVss3GyR1ARXY0=
github.com/deepflowio/deepflow/message v0.0.0-20240924113131-ec9660ac2e46 h1:uoBpoM3HHixjjz4aELXSGc0rHjMRhqbrF52aI+QhaKg=
github.com/deepflowio/deepflow/message v0.0.0-20240924113131-ec9660ac2e46/go.mod h1:e+1lUMMlycCvFRKvlwt/y/0vxJnF8wVss3GyR1ARXY0=
github.com/deepflowio/tempopb v0.0.0-20230215110519-15853baf3a79 h1:erRwXyZiUZxxZX/Q1QHesZXgxjiunZUFy+ggCRimkD4=
github.com/deepflowio/tempopb v0.0.0-20230215110519-15853baf3a79/go.mod h1:h2rkZ319TExIUGuK8a2dlcGd8qc6wdhsrcpXWaJQaQE=
github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE=
Expand Down

0 comments on commit 5e23338

Please sign in to comment.