Skip to content

Commit

Permalink
update 2
Browse files Browse the repository at this point in the history
Signed-off-by: Le Zhang <zhangl@us.ibm.com>
  • Loading branch information
LiilyZhang committed Jul 17, 2023
1 parent c5fa0bb commit a9bcda8
Showing 1 changed file with 37 additions and 31 deletions.
68 changes: 37 additions & 31 deletions agreementbot/consumer_protocol_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,14 +325,18 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChanged(cmd *PolicyChangedComm
agStillValid := false
policyMatches := true
noNewPriority := false
clusterNSNotChange := true

if ag.Pattern == "" {
policyMatches, noNewPriority = b.HandlePolicyChangeForAgreement(ag, cmd.Msg.OldPolicy(), cph)
policyMatches, noNewPriority, clusterNSNotChange = b.HandlePolicyChangeForAgreement(ag, cmd.Msg.OldPolicy(), cph)
agStillValid = policyMatches && noNewPriority
if ag.GetDeviceType() == persistence.DEVICE_TYPE_CLUSTER {
agStillValid = agStillValid && clusterNSNotChange
}
}

if glog.V(5) {
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for current agreement %v: agStillValid: %v, policyMatches: %v, noNewPriority: %v", ag.CurrentAgreementId, agStillValid, policyMatches, noNewPriority)))
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for current agreement %v: agStillValid: %v, policyMatches: %v, noNewPriority: %v, clusterNSNotChange: %v", ag.CurrentAgreementId, agStillValid, policyMatches, noNewPriority, clusterNSNotChange)))
}

if !agStillValid {
Expand Down Expand Up @@ -380,8 +384,9 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChanged(cmd *PolicyChangedComm

// first bool is true if the policy still matches, false otherwise
// second bool is true unless a higher priority workload than the current one has been added or changed
// third bool is true if the cluster namespace is not changed, this return value should be check only when device type is cluster
// if an error occurs, both will be false
func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persistence.Agreement, oldPolicy *policy.Policy, cph ConsumerProtocolHandler) (bool, bool) {
func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persistence.Agreement, oldPolicy *policy.Policy, cph ConsumerProtocolHandler) (bool, bool, bool) {
if glog.V(5) {
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("attempting to update agreement %v due to change in policy", ag.CurrentAgreementId)))
}
Expand All @@ -391,7 +396,7 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
for _, svcId := range ag.ServiceId {
if svcPol, err := exchange.GetServicePolicyWithId(b, svcId); err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get service policy for %v from the exchange: %v", svcId, err)))
return false, false
return false, false, false
} else if svcPol != nil {
svcAllPol.MergeWith(&svcPol.ExternalPolicy, false)
}
Expand All @@ -403,23 +408,23 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
_, busPol, err := compcheck.GetBusinessPolicy(busPolHandler, ag.PolicyName, true, msgPrinter)
if err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get business policy %v/%v from the exchange: %v", ag.Org, ag.PolicyName, err)))
return false, false
return false, false, false
}

nodePolHandler := exchange.GetHTTPNodePolicyHandler(b)
_, nodePol, err := compcheck.GetNodePolicy(nodePolHandler, ag.DeviceId, msgPrinter)
if err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get node policy for %v from the exchange.", ag.DeviceId)))
return false, false
return false, false, false
}

dev, err := exchange.GetExchangeDevice(b.GetHTTPFactory(), ag.DeviceId, b.GetExchangeId(), b.GetExchangeToken(), b.GetExchangeURL())
if err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get node %v from the exchange.", ag.DeviceId)))
return false, false
return false, false, false
} else if dev == nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("Device %v does not exist in the exchange.", ag.DeviceId)))
return false, false
return false, false, false
}

nodeArch := dev.Arch
Expand All @@ -435,19 +440,19 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
// skip for now if not all built-in properties are in the node policy
// this will get called again after the node updates its policy with the built-ins
if !externalpolicy.ContainsAllBuiltInNodeProps(&nodePol.Properties, swVers, dev.GetNodeType()) {
return true, true
return true, true, true
}

match, reason, producerPol, consumerPol, err := compcheck.CheckPolicyCompatiblility(nodePol, busPol, &svcAllPol, nodeArch, nil)

if !match {
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v is not longer in policy. Reason is: %v", ag.CurrentAgreementId, reason)))
return false, true
return false, true, false
}

// don't send an update if the agreement is not finalized yet
if ag.AgreementFinalizedTime == 0 {
return true, true
return true, true, true
}

// for every priority (in order highest to lowest) in the new policy with priority lower than the current wl
Expand All @@ -463,7 +468,7 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste

wlUsage, err := b.db.FindSingleWorkloadUsageByDeviceAndPolicyName(ag.DeviceId, ag.PolicyName) // ag.DeviceId: userdev/an12345, ag.PolicyName: "userdev/bp_location_2.0.6
if err != nil {
return false, false
return false, false, false
}
// wlUsage is nil if no prioriy is set in the previous policy
wlUsagePriority := 0
Expand All @@ -474,7 +479,7 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
if currentWL := policy.GetWorkloadWithPriority(busPol.Workloads, wlUsagePriority); currentWL == nil {
// the current workload priority is no longer in the deployment policy
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("current workload priority %v is no longer in policy for agreement %v", wlUsagePriority, ag.CurrentAgreementId)))
return true, false
return true, false, false
} else {
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for agreement %v, get current wl with priority is %v", ag.CurrentAgreementId, currentWL)))
// currentWL: Priority: PriorityValue: 0, Retries: 0, RetryDurationS: 0, VerifiedDurationS: 0, Deployment: , DeploymentSignature: , DeploymentUserInfo: , Workload Password: , ClusterDeployment: , ClusterDeploymentSignature: , Workload URL: https://bluehorizon.network/services/location, Org: e2edev@somecomp.com, Version: 2.0.7, Arch: amd64, Deployment Overrides: , Deployment Overrides Signature:
Expand All @@ -493,11 +498,23 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
matchingWL := policy.GetWorkloadWithPriority(oldPolicy.Workloads, choice)
if matchingWL == nil || !matchingWL.IsSame(*nextPriority) {
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("Higher priority version added or modified. Cancelling agreement %v", ag.CurrentAgreementId)))
return true, false
return true, false, false
}
nextPriority = policy.GetNextWorkloadChoice(busPol.Workloads, choice)
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for agreement %v inside priority loop, get next workload choice from business policy workload, choice: %v, nextPriority: %v", ag.CurrentAgreementId, choice, nextPriority)))
}

// check if cluster namespace is changed in new policy
if dev.NodeType == persistence.DEVICE_TYPE_CLUSTER && busPol.ClusterNamespace != oldPolicy.ClusterNamespace {
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("cluster namespace is changed from %v to %v in busiess policy for agreement %v, checking cluster namespace compatibility ...", oldPolicy.ClusterNamespace, busPol.ClusterNamespace, ag.CurrentAgreementId)))
t_comp, consumerNamespace, t_reason := compcheck.CheckClusterNamespaceCompatibility(dev.NodeType, dev.ClusterNamespace, dev.IsNamespaceScoped, busPol.ClusterNamespace, wl.ClusterDeployment, ag.Pattern, false, msgPrinter)
if !t_comp {
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("cluster namespace %v is not longer compatible for agreement %v. Reason is: %v", consumerNamespace, ag.CurrentAgreementId, t_reason)))

}
// new cluster namespace is still compatible
return true, true, false
}
}

if wl.Arch == "" || wl.Arch == "*" {
Expand All @@ -507,10 +524,10 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
// populate the workload with the deployment string
if svcDef, _, err := exchange.GetHTTPServiceHandler(b)(wl.WorkloadURL, wl.Org, wl.Version, wl.Arch); err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error getting service '%v' from the exchange, error: %v", wl, err)))
return false, false
return false, false, false
} else if svcDef == nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("Service %v not found in the exchange.", wl)))
return false, false
return false, false, false
} else {
if dev.NodeType == persistence.DEVICE_TYPE_CLUSTER {
wl.ClusterDeployment = svcDef.GetClusterDeploymentString()
Expand All @@ -521,28 +538,17 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
}
}

if dev.NodeType == persistence.DEVICE_TYPE_CLUSTER && busPol.ClusterNamespace != oldPolicy.ClusterNamespace {
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("cluster namespace is changed from %v to %v in busiess policy for agreement %v, checking cluster namespace compatibility ...", oldPolicy.ClusterNamespace, busPol.ClusterNamespace, ag.CurrentAgreementId)))
t_comp, consumerNamespace, t_reason := compcheck.CheckClusterNamespaceCompatibility(dev.NodeType, dev.ClusterNamespace, dev.IsNamespaceScoped, busPol.ClusterNamespace, wl.ClusterDeployment, ag.Pattern, false, msgPrinter)
if !t_comp {
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("cluster namespace %v is not longer compatible for agreement %v. Reason is: %v", consumerNamespace, ag.CurrentAgreementId, t_reason)))
return false, false
} else {
// still compatible, if consumerNamespace is changed
}
}

newTsCs, err := policy.Create_Terms_And_Conditions(producerPol, consumerPol, wl, ag.CurrentAgreementId, b.config.AgreementBot.DefaultWorkloadPW, b.config.AgreementBot.NoDataIntervalS, basicprotocol.PROTOCOL_CURRENT_VERSION)
if err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error creating new terms and conditions: %v", err)))
return false, false
return false, false, false
}

ag.LastPolicyUpdateTime = uint64(time.Now().Unix())

b.UpdateAgreement(&ag, basicprotocol.MsgUpdateTypePolicyChange, newTsCs, cph)

return true, true
return true, true, true
}

func (b *BaseConsumerProtocolHandler) HandlePolicyDeleted(cmd *PolicyDeletedCommand, cph ConsumerProtocolHandler) {
Expand Down Expand Up @@ -613,7 +619,7 @@ func (b *BaseConsumerProtocolHandler) HandleServicePolicyChanged(cmd *ServicePol
if agreements, err := b.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter(), InProgress()}, cph.Name()); err == nil {
for _, ag := range agreements {
if ag.Pattern == "" && ag.PolicyName == fmt.Sprintf("%v/%v", cmd.Msg.BusinessPolOrg, cmd.Msg.BusinessPolName) && ag.ServiceId[0] == cmd.Msg.ServiceId {
policyMatches, noNewPriority := b.HandlePolicyChangeForAgreement(ag, nil, cph)
policyMatches, noNewPriority, _ := b.HandlePolicyChangeForAgreement(ag, nil, cph)
agStillValid := policyMatches && noNewPriority
if !agStillValid {
glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a service policy %v that has changed.", ag.CurrentAgreementId, ag.ServiceId)))
Expand All @@ -638,7 +644,7 @@ func (b *BaseConsumerProtocolHandler) HandleNodePolicyChanged(cmd *NodePolicyCha
if agreements, err := b.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter(), InProgress()}, cph.Name()); err == nil {
for _, ag := range agreements {
if ag.Pattern == "" && ag.DeviceId == cutil.FormOrgSpecUrl(cmd.Msg.NodeId, cmd.Msg.NodePolOrg) {
policyMatches, noNewPriority := b.HandlePolicyChangeForAgreement(ag, nil, cph)
policyMatches, noNewPriority, _ := b.HandlePolicyChangeForAgreement(ag, nil, cph)
agStillValid := policyMatches && noNewPriority
if !agStillValid {
glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a node policy %v that has changed.", ag.CurrentAgreementId, ag.ServiceId)))
Expand Down

0 comments on commit a9bcda8

Please sign in to comment.