From a9bcda89c890a27ac1fe0945c4054fa22dce5bcc Mon Sep 17 00:00:00 2001 From: Le Zhang Date: Mon, 17 Jul 2023 10:43:44 -0400 Subject: [PATCH] update 2 Signed-off-by: Le Zhang --- agreementbot/consumer_protocol_handler.go | 68 ++++++++++++----------- 1 file changed, 37 insertions(+), 31 deletions(-) diff --git a/agreementbot/consumer_protocol_handler.go b/agreementbot/consumer_protocol_handler.go index 4b3c32785..755c63d3e 100644 --- a/agreementbot/consumer_protocol_handler.go +++ b/agreementbot/consumer_protocol_handler.go @@ -325,14 +325,18 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChanged(cmd *PolicyChangedComm agStillValid := false policyMatches := true noNewPriority := false + clusterNSNotChange := true if ag.Pattern == "" { - policyMatches, noNewPriority = b.HandlePolicyChangeForAgreement(ag, cmd.Msg.OldPolicy(), cph) + policyMatches, noNewPriority, clusterNSNotChange = b.HandlePolicyChangeForAgreement(ag, cmd.Msg.OldPolicy(), cph) agStillValid = policyMatches && noNewPriority + if ag.GetDeviceType() == persistence.DEVICE_TYPE_CLUSTER { + agStillValid = agStillValid && clusterNSNotChange + } } if glog.V(5) { - glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for current agreement %v: agStillValid: %v, policyMatches: %v, noNewPriority: %v", ag.CurrentAgreementId, agStillValid, policyMatches, noNewPriority))) + glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for current agreement %v: agStillValid: %v, policyMatches: %v, noNewPriority: %v, clusterNSNotChange: %v", ag.CurrentAgreementId, agStillValid, policyMatches, noNewPriority, clusterNSNotChange))) } if !agStillValid { @@ -380,8 +384,9 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChanged(cmd *PolicyChangedComm // first bool is true if the policy still matches, false otherwise // second bool is true unless a higher priority workload than the current one has been added or changed +// third bool is true if the cluster namespace is not changed, this return value should be check only when device type is cluster // if an error occurs, both will be false -func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persistence.Agreement, oldPolicy *policy.Policy, cph ConsumerProtocolHandler) (bool, bool) { +func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persistence.Agreement, oldPolicy *policy.Policy, cph ConsumerProtocolHandler) (bool, bool, bool) { if glog.V(5) { glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("attempting to update agreement %v due to change in policy", ag.CurrentAgreementId))) } @@ -391,7 +396,7 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste for _, svcId := range ag.ServiceId { if svcPol, err := exchange.GetServicePolicyWithId(b, svcId); err != nil { glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get service policy for %v from the exchange: %v", svcId, err))) - return false, false + return false, false, false } else if svcPol != nil { svcAllPol.MergeWith(&svcPol.ExternalPolicy, false) } @@ -403,23 +408,23 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste _, busPol, err := compcheck.GetBusinessPolicy(busPolHandler, ag.PolicyName, true, msgPrinter) if err != nil { glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get business policy %v/%v from the exchange: %v", ag.Org, ag.PolicyName, err))) - return false, false + return false, false, false } nodePolHandler := exchange.GetHTTPNodePolicyHandler(b) _, nodePol, err := compcheck.GetNodePolicy(nodePolHandler, ag.DeviceId, msgPrinter) if err != nil { glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get node policy for %v from the exchange.", ag.DeviceId))) - return false, false + return false, false, false } dev, err := exchange.GetExchangeDevice(b.GetHTTPFactory(), ag.DeviceId, b.GetExchangeId(), b.GetExchangeToken(), b.GetExchangeURL()) if err != nil { glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get node %v from the exchange.", ag.DeviceId))) - return false, false + return false, false, false } else if dev == nil { glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("Device %v does not exist in the exchange.", ag.DeviceId))) - return false, false + return false, false, false } nodeArch := dev.Arch @@ -435,19 +440,19 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste // skip for now if not all built-in properties are in the node policy // this will get called again after the node updates its policy with the built-ins if !externalpolicy.ContainsAllBuiltInNodeProps(&nodePol.Properties, swVers, dev.GetNodeType()) { - return true, true + return true, true, true } match, reason, producerPol, consumerPol, err := compcheck.CheckPolicyCompatiblility(nodePol, busPol, &svcAllPol, nodeArch, nil) if !match { glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v is not longer in policy. Reason is: %v", ag.CurrentAgreementId, reason))) - return false, true + return false, true, false } // don't send an update if the agreement is not finalized yet if ag.AgreementFinalizedTime == 0 { - return true, true + return true, true, true } // for every priority (in order highest to lowest) in the new policy with priority lower than the current wl @@ -463,7 +468,7 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste wlUsage, err := b.db.FindSingleWorkloadUsageByDeviceAndPolicyName(ag.DeviceId, ag.PolicyName) // ag.DeviceId: userdev/an12345, ag.PolicyName: "userdev/bp_location_2.0.6 if err != nil { - return false, false + return false, false, false } // wlUsage is nil if no prioriy is set in the previous policy wlUsagePriority := 0 @@ -474,7 +479,7 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste if currentWL := policy.GetWorkloadWithPriority(busPol.Workloads, wlUsagePriority); currentWL == nil { // the current workload priority is no longer in the deployment policy glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("current workload priority %v is no longer in policy for agreement %v", wlUsagePriority, ag.CurrentAgreementId))) - return true, false + return true, false, false } else { glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for agreement %v, get current wl with priority is %v", ag.CurrentAgreementId, currentWL))) // currentWL: Priority: PriorityValue: 0, Retries: 0, RetryDurationS: 0, VerifiedDurationS: 0, Deployment: , DeploymentSignature: , DeploymentUserInfo: , Workload Password: , ClusterDeployment: , ClusterDeploymentSignature: , Workload URL: https://bluehorizon.network/services/location, Org: e2edev@somecomp.com, Version: 2.0.7, Arch: amd64, Deployment Overrides: , Deployment Overrides Signature: @@ -493,11 +498,23 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste matchingWL := policy.GetWorkloadWithPriority(oldPolicy.Workloads, choice) if matchingWL == nil || !matchingWL.IsSame(*nextPriority) { glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("Higher priority version added or modified. Cancelling agreement %v", ag.CurrentAgreementId))) - return true, false + return true, false, false } nextPriority = policy.GetNextWorkloadChoice(busPol.Workloads, choice) glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for agreement %v inside priority loop, get next workload choice from business policy workload, choice: %v, nextPriority: %v", ag.CurrentAgreementId, choice, nextPriority))) } + + // check if cluster namespace is changed in new policy + if dev.NodeType == persistence.DEVICE_TYPE_CLUSTER && busPol.ClusterNamespace != oldPolicy.ClusterNamespace { + glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("cluster namespace is changed from %v to %v in busiess policy for agreement %v, checking cluster namespace compatibility ...", oldPolicy.ClusterNamespace, busPol.ClusterNamespace, ag.CurrentAgreementId))) + t_comp, consumerNamespace, t_reason := compcheck.CheckClusterNamespaceCompatibility(dev.NodeType, dev.ClusterNamespace, dev.IsNamespaceScoped, busPol.ClusterNamespace, wl.ClusterDeployment, ag.Pattern, false, msgPrinter) + if !t_comp { + glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("cluster namespace %v is not longer compatible for agreement %v. Reason is: %v", consumerNamespace, ag.CurrentAgreementId, t_reason))) + + } + // new cluster namespace is still compatible + return true, true, false + } } if wl.Arch == "" || wl.Arch == "*" { @@ -507,10 +524,10 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste // populate the workload with the deployment string if svcDef, _, err := exchange.GetHTTPServiceHandler(b)(wl.WorkloadURL, wl.Org, wl.Version, wl.Arch); err != nil { glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error getting service '%v' from the exchange, error: %v", wl, err))) - return false, false + return false, false, false } else if svcDef == nil { glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("Service %v not found in the exchange.", wl))) - return false, false + return false, false, false } else { if dev.NodeType == persistence.DEVICE_TYPE_CLUSTER { wl.ClusterDeployment = svcDef.GetClusterDeploymentString() @@ -521,28 +538,17 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste } } - if dev.NodeType == persistence.DEVICE_TYPE_CLUSTER && busPol.ClusterNamespace != oldPolicy.ClusterNamespace { - glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("cluster namespace is changed from %v to %v in busiess policy for agreement %v, checking cluster namespace compatibility ...", oldPolicy.ClusterNamespace, busPol.ClusterNamespace, ag.CurrentAgreementId))) - t_comp, consumerNamespace, t_reason := compcheck.CheckClusterNamespaceCompatibility(dev.NodeType, dev.ClusterNamespace, dev.IsNamespaceScoped, busPol.ClusterNamespace, wl.ClusterDeployment, ag.Pattern, false, msgPrinter) - if !t_comp { - glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("cluster namespace %v is not longer compatible for agreement %v. Reason is: %v", consumerNamespace, ag.CurrentAgreementId, t_reason))) - return false, false - } else { - // still compatible, if consumerNamespace is changed - } - } - newTsCs, err := policy.Create_Terms_And_Conditions(producerPol, consumerPol, wl, ag.CurrentAgreementId, b.config.AgreementBot.DefaultWorkloadPW, b.config.AgreementBot.NoDataIntervalS, basicprotocol.PROTOCOL_CURRENT_VERSION) if err != nil { glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error creating new terms and conditions: %v", err))) - return false, false + return false, false, false } ag.LastPolicyUpdateTime = uint64(time.Now().Unix()) b.UpdateAgreement(&ag, basicprotocol.MsgUpdateTypePolicyChange, newTsCs, cph) - return true, true + return true, true, true } func (b *BaseConsumerProtocolHandler) HandlePolicyDeleted(cmd *PolicyDeletedCommand, cph ConsumerProtocolHandler) { @@ -613,7 +619,7 @@ func (b *BaseConsumerProtocolHandler) HandleServicePolicyChanged(cmd *ServicePol if agreements, err := b.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter(), InProgress()}, cph.Name()); err == nil { for _, ag := range agreements { if ag.Pattern == "" && ag.PolicyName == fmt.Sprintf("%v/%v", cmd.Msg.BusinessPolOrg, cmd.Msg.BusinessPolName) && ag.ServiceId[0] == cmd.Msg.ServiceId { - policyMatches, noNewPriority := b.HandlePolicyChangeForAgreement(ag, nil, cph) + policyMatches, noNewPriority, _ := b.HandlePolicyChangeForAgreement(ag, nil, cph) agStillValid := policyMatches && noNewPriority if !agStillValid { glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a service policy %v that has changed.", ag.CurrentAgreementId, ag.ServiceId))) @@ -638,7 +644,7 @@ func (b *BaseConsumerProtocolHandler) HandleNodePolicyChanged(cmd *NodePolicyCha if agreements, err := b.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter(), InProgress()}, cph.Name()); err == nil { for _, ag := range agreements { if ag.Pattern == "" && ag.DeviceId == cutil.FormOrgSpecUrl(cmd.Msg.NodeId, cmd.Msg.NodePolOrg) { - policyMatches, noNewPriority := b.HandlePolicyChangeForAgreement(ag, nil, cph) + policyMatches, noNewPriority, _ := b.HandlePolicyChangeForAgreement(ag, nil, cph) agStillValid := policyMatches && noNewPriority if !agStillValid { glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a node policy %v that has changed.", ag.CurrentAgreementId, ag.ServiceId)))