diff --git a/cli/ctl/trisolaris_check.go b/cli/ctl/trisolaris_check.go index 4ec3f3e94518..7e8fc7adb6db 100644 --- a/cli/ctl/trisolaris_check.go +++ b/cli/ctl/trisolaris_check.go @@ -41,17 +41,18 @@ import ( ) type ParamData struct { - CtrlIP string - CtrlMac string - GroupID string - ClusterID string - TeamID string - RpcIP string - RpcPort string - Type string - PluginType string - PluginName string - OrgID uint32 + CtrlIP string + CtrlMac string + GroupID string + ClusterID string + TeamID string + KubernetesWatchPolicy string + RpcIP string + RpcPort string + Type string + PluginType string + PluginName string + OrgID uint32 } type SortedAcls []*trident.FlowAcl @@ -227,6 +228,7 @@ func RegisterTrisolarisCommand() *cobra.Command { trisolarisCmd.PersistentFlags().StringVarP(¶mData.GroupID, "gid", "", "", "agent group ID") trisolarisCmd.PersistentFlags().StringVarP(¶mData.TeamID, "tid", "", "", "agent team ID") trisolarisCmd.PersistentFlags().StringVarP(¶mData.ClusterID, "cid", "", "", "agent k8s cluster ID") + trisolarisCmd.PersistentFlags().StringVarP(¶mData.KubernetesWatchPolicy, "kwp", "", "", "agent k8s watch policy") trisolarisCmd.PersistentFlags().StringVarP(¶mData.Type, "type", "", "trident", "request type trdient/analyzer") trisolarisCmd.PersistentFlags().StringVarP(¶mData.PluginType, "ptype", "", "wasm", "request plugin type") trisolarisCmd.PersistentFlags().StringVarP(¶mData.PluginName, "pname", "", "", "request plugin name") @@ -262,6 +264,7 @@ func initCmd(cmd *cobra.Command, cmds []CmdExecute) { defer conn.Close() var name, groupID, clusterID, teamID string var orgID uint32 + var kubernetesWatchPolicy trident.KubernetesWatchPolicy switch paramData.Type { case "trident": name = paramData.Type @@ -276,16 +279,25 @@ func initCmd(cmd *cobra.Command, cmds []CmdExecute) { fmt.Printf("type(%s) muste be in [trident, analyzer]", paramData.Type) return } + switch paramData.KubernetesWatchPolicy { + case "disabled": + kubernetesWatchPolicy = trident.KubernetesWatchPolicy_KWP_WATCH_DISABLED + case "only": + kubernetesWatchPolicy = trident.KubernetesWatchPolicy_KWP_WATCH_ONLY + default: + kubernetesWatchPolicy = trident.KubernetesWatchPolicy_KWP_NORMAL + } fmt.Printf("request trisolaris(%s), params(%+v)\n", conn.Target(), paramData) c := trident.NewSynchronizerClient(conn) reqData := &trident.SyncRequest{ - CtrlIp: ¶mData.CtrlIP, - CtrlMac: ¶mData.CtrlMac, - VtapGroupIdRequest: &groupID, - KubernetesClusterId: &clusterID, - ProcessName: &name, - TeamId: &teamID, - OrgId: &orgID, + CtrlIp: ¶mData.CtrlIP, + CtrlMac: ¶mData.CtrlMac, + VtapGroupIdRequest: &groupID, + KubernetesClusterId: &clusterID, + KubernetesWatchPolicy: &kubernetesWatchPolicy, + ProcessName: &name, + TeamId: &teamID, + OrgId: &orgID, } var response *trident.SyncResponse var err error diff --git a/cli/go.mod b/cli/go.mod index 126c2c2df475..40188823aeb8 100644 --- a/cli/go.mod +++ b/cli/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( github.com/bitly/go-simplejson v0.5.0 - github.com/deepflowio/deepflow/message v0.0.0-20240913001216-68c04e44504b + github.com/deepflowio/deepflow/message v0.0.0-20240924113131-ec9660ac2e46 github.com/deepflowio/deepflow/server v0.0.0-20240913001216-68c04e44504b github.com/golang/protobuf v1.5.4 github.com/mattn/go-runewidth v0.0.14 diff --git a/cli/go.sum b/cli/go.sum index c17513edd775..98ec7a5c1b96 100644 --- a/cli/go.sum +++ b/cli/go.sum @@ -74,8 +74,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/deepflowio/deepflow/message v0.0.0-20240913001216-68c04e44504b h1:9BFasJn1uQhu/aAzCZeoBYeawzH6LAj4qzx3LFi/2GI= -github.com/deepflowio/deepflow/message v0.0.0-20240913001216-68c04e44504b/go.mod h1:e+1lUMMlycCvFRKvlwt/y/0vxJnF8wVss3GyR1ARXY0= +github.com/deepflowio/deepflow/message v0.0.0-20240924113131-ec9660ac2e46 h1:uoBpoM3HHixjjz4aELXSGc0rHjMRhqbrF52aI+QhaKg= +github.com/deepflowio/deepflow/message v0.0.0-20240924113131-ec9660ac2e46/go.mod h1:e+1lUMMlycCvFRKvlwt/y/0vxJnF8wVss3GyR1ARXY0= github.com/deepflowio/deepflow/server v0.0.0-20240913001216-68c04e44504b h1:sfsFOMxr0zACQjtzHuee3i/tDDnDHStKApyf6mPnpbY= github.com/deepflowio/deepflow/server v0.0.0-20240913001216-68c04e44504b/go.mod h1:0FBQRNqe8wEbd17MR/cBedYnwZmGIaA0+23RwB98axo= github.com/deepflowio/deepflow/server/controller/db/mysql/migrator/edition v0.0.0-20240913001216-68c04e44504b h1:SuS5rJsqJBupkI1rJdhp2I+5aFjzONlM+/PhAkgDISY= diff --git a/server/controller/trisolaris/common/common.go b/server/controller/trisolaris/common/common.go index 8dd0d8bba5a1..b34eb1b9dbc4 100644 --- a/server/controller/trisolaris/common/common.go +++ b/server/controller/trisolaris/common/common.go @@ -16,6 +16,10 @@ package common +import ( + api "github.com/deepflowio/deepflow/message/trident" +) + const ( // VTAP VTAP_CONTROLLER_EXCEPTIONS_MASK = 0xFFFFFFFF00000000 @@ -111,3 +115,9 @@ const ( DISABLED = 0 ENABLED = 1 ) + +var ( + KWP_NORMAL = api.KubernetesWatchPolicy_KWP_NORMAL + KWP_WATCH_ONLY = api.KubernetesWatchPolicy_KWP_WATCH_ONLY + KWP_WATCH_DISABLED = api.KubernetesWatchPolicy_KWP_WATCH_DISABLED +) diff --git a/server/controller/trisolaris/services/grpc/synchronize/vtap.go b/server/controller/trisolaris/services/grpc/synchronize/vtap.go index 196deba80cda..2fc403e26efb 100644 --- a/server/controller/trisolaris/services/grpc/synchronize/vtap.go +++ b/server/controller/trisolaris/services/grpc/synchronize/vtap.go @@ -339,13 +339,16 @@ func (e *VTapEvent) Sync(ctx context.Context, in *api.SyncRequest) (*api.SyncRes log.Errorf("ctrlIp is %s, ctrlMac is %s, not team_id refuse(%v) register", ctrlIP, ctrlMac, trisolaris.GetIsRefused(), logger.NewORGPrefix(orgID)) return e.GetFailedResponse(in, nil), nil } - log.Warningf("vtap (ctrl_ip: %s, ctrl_mac: %s, team_id: (str=%s,int=%d), host_ips: %s, kubernetes_cluster_id: %s, kubernetes_force_watch: %t, group_id: %s) not found in cache. "+ + log.Warningf("vtap (ctrl_ip: %s, ctrl_mac: %s, team_id: (str=%s,int=%d), host_ips: %s, kubernetes_cluster_id: %s, kubernetes_force_watch: %t, kubernetes_watch_policy: %d, group_id: %s) not found in cache. "+ "NAME:%s REVISION:%s BOOT_TIME:%d", - ctrlIP, ctrlMac, teamIDStr, teamIDInt, in.GetHostIps(), in.GetKubernetesClusterId(), in.GetKubernetesForceWatch(), + ctrlIP, ctrlMac, teamIDStr, teamIDInt, in.GetHostIps(), + in.GetKubernetesClusterId(), in.GetKubernetesForceWatch(), in.GetKubernetesWatchPolicy(), in.GetVtapGroupIdRequest(), in.GetProcessName(), in.GetRevision(), in.GetBootTime(), logger.NewORGPrefix(orgID)) + // kubernetes_force_watch field is compatibility for old version agent // If the kubernetes_force_watch field is true, the ctrl_ip and ctrl_mac of the vtap will not change, + // If the kubernetes_watch_policy field is KWP_WATCH_ONLY, the ctrl_ip and ctrl_mac of the vtap will not change, // resulting in unsuccessful registration and a large number of error logs. - if !in.GetKubernetesForceWatch() { + if !in.GetKubernetesForceWatch() || in.GetKubernetesWatchPolicy() != KWP_WATCH_ONLY { gVTapInfo.Register( int(in.GetTapMode()), in.GetCtrlIp(), @@ -445,14 +448,21 @@ func (e *VTapEvent) Sync(ctx context.Context, in *api.SyncRequest) (*api.SyncRes } configInfo := e.generateConfigInfo(vtapCache, in.GetKubernetesClusterId(), gVTapInfo, orgID) - // 携带信息有cluster_id时选择一个采集器开启云平台同步开关 - if in.GetKubernetesClusterId() != "" && isOpenK8sSyn(vtapCache.GetVTapType()) == true { - value := gVTapInfo.GetKubernetesClusterID(in.GetKubernetesClusterId(), vtapCacheKey, in.GetKubernetesForceWatch()) + // 携带信息有cluster_id && watch_policy != disabled 时选择一个采集器开启云平台同步开关 + if in.GetKubernetesClusterId() != "" && + in.GetKubernetesWatchPolicy() != KWP_WATCH_DISABLED && + isOpenK8sSyn(vtapCache.GetVTapType()) == true { + value := gVTapInfo.GetKubernetesClusterID( + in.GetKubernetesClusterId(), + vtapCacheKey, + in.GetKubernetesForceWatch(), + int(in.GetKubernetesWatchPolicy()), + ) if value == vtapCacheKey { log.Infof( - "open cluster(%s) kubernetes_api_enabled VTap(ctrl_ip: %s, ctrl_mac: %s, team_id: (str=%s,int=%d), kubernetes_force_watch: %t)", + "open cluster(%s) kubernetes_api_enabled VTap(ctrl_ip: %s, ctrl_mac: %s, team_id: (str=%s,int=%d), kubernetes_force_watch: %t, kubernetes_watch_policy: %d)", in.GetKubernetesClusterId(), ctrlIP, ctrlMac, - teamIDStr, teamIDInt, in.GetKubernetesForceWatch(), logger.NewORGPrefix(orgID)) + teamIDStr, teamIDInt, in.GetKubernetesForceWatch(), in.GetKubernetesWatchPolicy(), logger.NewORGPrefix(orgID)) configInfo.KubernetesApiEnabled = proto.Bool(true) } } @@ -602,12 +612,21 @@ func (e *VTapEvent) noVTapResponse(in *api.SyncRequest, orgID int) *api.SyncResp } configInfo.TridentType = &tridentType configInfo.Enabled = proto.Bool(false) - value := gVTapInfo.GetKubernetesClusterID(in.GetKubernetesClusterId(), vtapCacheKey, in.GetKubernetesForceWatch()) - if value == vtapCacheKey { - configInfo.KubernetesApiEnabled = proto.Bool(true) - log.Infof( - "open cluster(%s) kubernetes_api_enabled VTap(ctrl_ip: %s, ctrl_mac: %s, kubernetes_force_watch: %t)", - in.GetKubernetesClusterId(), ctrlIP, ctrlMac, in.GetKubernetesForceWatch(), logger.NewORGPrefix(orgID)) + if in.GetKubernetesWatchPolicy() != KWP_WATCH_DISABLED { + value := gVTapInfo.GetKubernetesClusterID( + in.GetKubernetesClusterId(), + vtapCacheKey, + in.GetKubernetesForceWatch(), + int(in.GetKubernetesWatchPolicy()), + ) + if value == vtapCacheKey { + configInfo.KubernetesApiEnabled = proto.Bool(true) + log.Infof( + "open cluster(%s) kubernetes_api_enabled "+ + "VTap(ctrl_ip: %s, ctrl_mac: %s, kubernetes_force_watch: %t, kubernetes_watch_policy: %d)", + in.GetKubernetesClusterId(), ctrlIP, ctrlMac, + in.GetKubernetesForceWatch(), in.GetKubernetesWatchPolicy(), logger.NewORGPrefix(orgID)) + } } return &api.SyncResponse{ Status: &STATUS_SUCCESS, @@ -753,13 +772,22 @@ func (e *VTapEvent) pushResponse(in *api.SyncRequest, all bool) (*api.SyncRespon } configInfo := e.generateConfigInfo(vtapCache, in.GetKubernetesClusterId(), gVTapInfo, orgID) - // 携带信息有cluster_id时选择一个采集器开启云平台同步开关 - if in.GetKubernetesClusterId() != "" && isOpenK8sSyn(vtapCache.GetVTapType()) == true { - value := gVTapInfo.GetKubernetesClusterID(in.GetKubernetesClusterId(), vtapCacheKey, in.GetKubernetesForceWatch()) + // 携带信息有cluster_id && watch_policy != disabled 时选择一个采集器开启云平台同步开关 + if in.GetKubernetesClusterId() != "" && + in.GetKubernetesWatchPolicy() != KWP_WATCH_DISABLED && + isOpenK8sSyn(vtapCache.GetVTapType()) == true { + value := gVTapInfo.GetKubernetesClusterID( + in.GetKubernetesClusterId(), + vtapCacheKey, + in.GetKubernetesForceWatch(), + int(in.GetKubernetesWatchPolicy()), + ) if value == vtapCacheKey { log.Infof( - "open cluster(%s) kubernetes_api_enabled VTap(ctrl_ip: %s, ctrl_mac: %s, team_id: (str=%s,int=%d), kubernetes_force_watch: %t)", - in.GetKubernetesClusterId(), ctrlIP, ctrlMac, teamIDStr, teamIDInt, in.GetKubernetesForceWatch(), logger.NewORGPrefix(orgID)) + "open cluster(%s) kubernetes_api_enabled "+ + "VTap(ctrl_ip: %s, ctrl_mac: %s, team_id: (str=%s,int=%d), kubernetes_force_watch: %t, kubernetes_watch_policy %d)", + in.GetKubernetesClusterId(), ctrlIP, ctrlMac, teamIDStr, teamIDInt, + in.GetKubernetesForceWatch(), in.GetKubernetesWatchPolicy(), logger.NewORGPrefix(orgID)) configInfo.KubernetesApiEnabled = proto.Bool(true) } } diff --git a/server/controller/trisolaris/vtap/vtap.go b/server/controller/trisolaris/vtap/vtap.go index dd0db21a6b46..62fd46713d37 100644 --- a/server/controller/trisolaris/vtap/vtap.go +++ b/server/controller/trisolaris/vtap/vtap.go @@ -996,14 +996,19 @@ func (v *VTapInfo) getVTapPodDomains(c *VTapCache) []string { return result } -func (v *VTapInfo) GetKubernetesClusterID(clusterID string, value string, force bool) string { +func (v *VTapInfo) GetKubernetesClusterID(clusterID string, value string, force bool, watchPolicy int) string { + log.Info("##########") + log.Info(watchPolicy) + log.Info(int(KWP_WATCH_ONLY)) if v == nil { - if force { + log.Info("##########11111") + // force field is for compatibility old version agent + if force || (watchPolicy == int(KWP_WATCH_ONLY)) { return value } return "" } - return v.kcData.getClusterID(clusterID, value, force) + return v.kcData.getClusterID(clusterID, value, force || (watchPolicy == int(KWP_WATCH_ONLY))) } func (v *VTapInfo) putChVTapChangedForPD() { diff --git a/server/go.mod b/server/go.mod index e90cd626a474..2ee92a89def4 100644 --- a/server/go.mod +++ b/server/go.mod @@ -41,7 +41,7 @@ require ( github.com/cornelk/hashmap v1.0.8 github.com/deckarep/golang-set v1.8.0 github.com/deckarep/golang-set/v2 v2.1.0 - github.com/deepflowio/deepflow/message v0.0.0-20240725065348-535fb6f1efdc + github.com/deepflowio/deepflow/message v0.0.0-20240924113131-ec9660ac2e46 github.com/deepflowio/deepflow/server/controller/cloud/kubernetes_gather/expand v0.0.0-00010101000000-000000000000 github.com/deepflowio/deepflow/server/controller/cloud/platform v0.0.0-00010101000000-000000000000 github.com/deepflowio/deepflow/server/controller/cloud/tencent/expand v0.0.0-00010101000000-000000000000 diff --git a/server/go.sum b/server/go.sum index 13be41a0c90c..71c8887c4f9d 100644 --- a/server/go.sum +++ b/server/go.sum @@ -167,8 +167,8 @@ github.com/deckarep/golang-set v1.8.0 h1:sk9/l/KqpunDwP7pSjUg0keiOOLEnOBHzykLrsP github.com/deckarep/golang-set v1.8.0/go.mod h1:5nI87KwE7wgsBU1F4GKAw2Qod7p5kyS383rP6+o6qqo= github.com/deckarep/golang-set/v2 v2.1.0 h1:g47V4Or+DUdzbs8FxCCmgb6VYd+ptPAngjM6dtGktsI= github.com/deckarep/golang-set/v2 v2.1.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= -github.com/deepflowio/deepflow/message v0.0.0-20240725065348-535fb6f1efdc h1:B5hQ+ItZ4+dy4APMWZQeSV187j4dzDsxkAJfuEg1BLA= -github.com/deepflowio/deepflow/message v0.0.0-20240725065348-535fb6f1efdc/go.mod h1:e+1lUMMlycCvFRKvlwt/y/0vxJnF8wVss3GyR1ARXY0= +github.com/deepflowio/deepflow/message v0.0.0-20240924113131-ec9660ac2e46 h1:uoBpoM3HHixjjz4aELXSGc0rHjMRhqbrF52aI+QhaKg= +github.com/deepflowio/deepflow/message v0.0.0-20240924113131-ec9660ac2e46/go.mod h1:e+1lUMMlycCvFRKvlwt/y/0vxJnF8wVss3GyR1ARXY0= github.com/deepflowio/tempopb v0.0.0-20230215110519-15853baf3a79 h1:erRwXyZiUZxxZX/Q1QHesZXgxjiunZUFy+ggCRimkD4= github.com/deepflowio/tempopb v0.0.0-20230215110519-15853baf3a79/go.mod h1:h2rkZ319TExIUGuK8a2dlcGd8qc6wdhsrcpXWaJQaQE= github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE=