Skip to content

Commit

Permalink
Merge pull request #3824 from LiilyZhang/zhangl/Issue3814
Browse files Browse the repository at this point in the history
Issue #3814 - MultiNamespace: Agent should utilize IS_NAM…
  • Loading branch information
LiilyZhang authored Aug 1, 2023
2 parents adb20f7 + 86b7651 commit 58217e7
Show file tree
Hide file tree
Showing 26 changed files with 546 additions and 156 deletions.
2 changes: 1 addition & 1 deletion agreementbot/agreementworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ func (b *BaseAgreementWorker) InitiateNewAgreement(cph ConsumerProtocolHandler,
// for cluster type and check for namespace compatibility
consumerNamespace := ""
if nodeType == persistence.DEVICE_TYPE_CLUSTER {
t_comp, consumerNamespace, t_reason = compcheck.CheckClusterNamespaceCompatibility(nodeType, exchangeDev.ClusterNamespace, wi.ConsumerPolicy.ClusterNamespace, topSvcDef.GetClusterDeployment(), false, msgPrinter)
t_comp, consumerNamespace, t_reason = compcheck.CheckClusterNamespaceCompatibility(nodeType, exchangeDev.ClusterNamespace, exchangeDev.IsNamespaceScoped, wi.ConsumerPolicy.ClusterNamespace, topSvcDef.GetClusterDeployment(), wi.ConsumerPolicy.PatternId, false, msgPrinter)
if !t_comp {
glog.Warningf(BAWlogstring(workerId, fmt.Sprintf("cannot make agreement with node %v for service %v/%v %v. %v", wi.Device.Id, workload.Org, workload.WorkloadURL, workload.Version, t_reason)))
return
Expand Down
63 changes: 44 additions & 19 deletions agreementbot/consumer_protocol_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,13 +319,24 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChanged(cmd *PolicyChangedComm
}
continue
} else if err := b.pm.MatchesMine(cmd.Msg.Org(), pol); err != nil {
if glog.V(5) {
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("cmd msg org matches mine for agreement %v", ag.CurrentAgreementId)))
}
agStillValid := false
policyMatches := true
noNewPriority := false
clusterNSNotChange := true

if ag.Pattern == "" {
policyMatches, noNewPriority = b.HandlePolicyChangeForAgreement(ag, cmd.Msg.OldPolicy(), cph)
policyMatches, noNewPriority, clusterNSNotChange = b.HandlePolicyChangeForAgreement(ag, cmd.Msg.OldPolicy(), cph)
agStillValid = policyMatches && noNewPriority
if ag.GetDeviceType() == persistence.DEVICE_TYPE_CLUSTER {
agStillValid = agStillValid && clusterNSNotChange
}
}

if glog.V(5) {
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for current agreement %v: agStillValid: %v, policyMatches: %v, noNewPriority: %v, clusterNSNotChange: %v", ag.CurrentAgreementId, agStillValid, policyMatches, noNewPriority, clusterNSNotChange)))
}

if !agStillValid {
Expand Down Expand Up @@ -373,8 +384,9 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChanged(cmd *PolicyChangedComm

// first bool is true if the policy still matches, false otherwise
// second bool is true unless a higher priority workload than the current one has been added or changed
// third bool is true if the cluster namespace is not changed, this return value should be check only when device type is cluster
// if an error occurs, both will be false
func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persistence.Agreement, oldPolicy *policy.Policy, cph ConsumerProtocolHandler) (bool, bool) {
func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persistence.Agreement, oldPolicy *policy.Policy, cph ConsumerProtocolHandler) (bool, bool, bool) {
if glog.V(5) {
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("attempting to update agreement %v due to change in policy", ag.CurrentAgreementId)))
}
Expand All @@ -384,7 +396,7 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
for _, svcId := range ag.ServiceId {
if svcPol, err := exchange.GetServicePolicyWithId(b, svcId); err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get service policy for %v from the exchange: %v", svcId, err)))
return false, false
return false, false, false
} else if svcPol != nil {
svcAllPol.MergeWith(&svcPol.ExternalPolicy, false)
}
Expand All @@ -396,23 +408,23 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
_, busPol, err := compcheck.GetBusinessPolicy(busPolHandler, ag.PolicyName, true, msgPrinter)
if err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get business policy %v/%v from the exchange: %v", ag.Org, ag.PolicyName, err)))
return false, false
return false, false, false
}

nodePolHandler := exchange.GetHTTPNodePolicyHandler(b)
_, nodePol, err := compcheck.GetNodePolicy(nodePolHandler, ag.DeviceId, msgPrinter)
if err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get node policy for %v from the exchange.", ag.DeviceId)))
return false, false
return false, false, false
}

dev, err := exchange.GetExchangeDevice(b.GetHTTPFactory(), ag.DeviceId, b.GetExchangeId(), b.GetExchangeToken(), b.GetExchangeURL())
if err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("failed to get node %v from the exchange.", ag.DeviceId)))
return false, false
return false, false, false
} else if dev == nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("Device %v does not exist in the exchange.", ag.DeviceId)))
return false, false
return false, false, false
}

nodeArch := dev.Arch
Expand All @@ -428,30 +440,31 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
// skip for now if not all built-in properties are in the node policy
// this will get called again after the node updates its policy with the built-ins
if !externalpolicy.ContainsAllBuiltInNodeProps(&nodePol.Properties, swVers, dev.GetNodeType()) {
return true, true
return true, true, true
}

match, reason, producerPol, consumerPol, err := compcheck.CheckPolicyCompatiblility(nodePol, busPol, &svcAllPol, nodeArch, nil)

if !match {
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v is not longer in policy. Reason is: %v", ag.CurrentAgreementId, reason)))
return false, true
return false, true, false
}

// don't send an update if the agreement is not finalized yet
if ag.AgreementFinalizedTime == 0 {
return true, true
return true, true, true
}

// for every priority (in order highest to lowest) in the new policy with priority lower than the current wl
// if it's not in the old policy, cancel
choice := -1

nextPriority := policy.GetNextWorkloadChoice(busPol.Workloads, choice)
wl := nextPriority

wlUsage, err := b.db.FindSingleWorkloadUsageByDeviceAndPolicyName(ag.DeviceId, ag.PolicyName)
if err != nil {
return false, false
return false, false, false
}
// wlUsage is nil if no prioriy is set in the previous policy
wlUsagePriority := 0
Expand All @@ -462,7 +475,7 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
if currentWL := policy.GetWorkloadWithPriority(busPol.Workloads, wlUsagePriority); currentWL == nil {
// the current workload priority is no longer in the deployment policy
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("current workload priority %v is no longer in policy for agreement %v", wlUsagePriority, ag.CurrentAgreementId)))
return true, false
return true, false, false
} else {
wl = currentWL
}
Expand All @@ -473,10 +486,22 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
matchingWL := policy.GetWorkloadWithPriority(oldPolicy.Workloads, choice)
if matchingWL == nil || !matchingWL.IsSame(*nextPriority) {
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("Higher priority version added or modified. Cancelling agreement %v", ag.CurrentAgreementId)))
return true, false
return true, false, false
}
nextPriority = policy.GetNextWorkloadChoice(busPol.Workloads, choice)
}

// check if cluster namespace is changed in new policy
if dev.NodeType == persistence.DEVICE_TYPE_CLUSTER && busPol.ClusterNamespace != oldPolicy.ClusterNamespace {
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("cluster namespace is changed from %v to %v in busiess policy for agreement %v, checking cluster namespace compatibility ...", oldPolicy.ClusterNamespace, busPol.ClusterNamespace, ag.CurrentAgreementId)))
t_comp, consumerNamespace, t_reason := compcheck.CheckClusterNamespaceCompatibility(dev.NodeType, dev.ClusterNamespace, dev.IsNamespaceScoped, busPol.ClusterNamespace, wl.ClusterDeployment, ag.Pattern, false, msgPrinter)
if !t_comp {
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("cluster namespace %v is not longer compatible for agreement %v. Reason is: %v", consumerNamespace, ag.CurrentAgreementId, t_reason)))

}
// new cluster namespace is still compatible
return true, true, false
}
}

if wl.Arch == "" || wl.Arch == "*" {
Expand All @@ -486,10 +511,10 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
// populate the workload with the deployment string
if svcDef, _, err := exchange.GetHTTPServiceHandler(b)(wl.WorkloadURL, wl.Org, wl.Version, wl.Arch); err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error getting service '%v' from the exchange, error: %v", wl, err)))
return false, false
return false, false, false
} else if svcDef == nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("Service %v not found in the exchange.", wl)))
return false, false
return false, false, false
} else {
if dev.NodeType == persistence.DEVICE_TYPE_CLUSTER {
wl.ClusterDeployment = svcDef.GetClusterDeploymentString()
Expand All @@ -503,14 +528,14 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
newTsCs, err := policy.Create_Terms_And_Conditions(producerPol, consumerPol, wl, ag.CurrentAgreementId, b.config.AgreementBot.DefaultWorkloadPW, b.config.AgreementBot.NoDataIntervalS, basicprotocol.PROTOCOL_CURRENT_VERSION)
if err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error creating new terms and conditions: %v", err)))
return false, false
return false, false, false
}

ag.LastPolicyUpdateTime = uint64(time.Now().Unix())

b.UpdateAgreement(&ag, basicprotocol.MsgUpdateTypePolicyChange, newTsCs, cph)

return true, true
return true, true, true
}

func (b *BaseConsumerProtocolHandler) HandlePolicyDeleted(cmd *PolicyDeletedCommand, cph ConsumerProtocolHandler) {
Expand Down Expand Up @@ -581,7 +606,7 @@ func (b *BaseConsumerProtocolHandler) HandleServicePolicyChanged(cmd *ServicePol
if agreements, err := b.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter(), InProgress()}, cph.Name()); err == nil {
for _, ag := range agreements {
if ag.Pattern == "" && ag.PolicyName == fmt.Sprintf("%v/%v", cmd.Msg.BusinessPolOrg, cmd.Msg.BusinessPolName) && ag.ServiceId[0] == cmd.Msg.ServiceId {
policyMatches, noNewPriority := b.HandlePolicyChangeForAgreement(ag, nil, cph)
policyMatches, noNewPriority, _ := b.HandlePolicyChangeForAgreement(ag, nil, cph)
agStillValid := policyMatches && noNewPriority
if !agStillValid {
glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a service policy %v that has changed.", ag.CurrentAgreementId, ag.ServiceId)))
Expand All @@ -606,7 +631,7 @@ func (b *BaseConsumerProtocolHandler) HandleNodePolicyChanged(cmd *NodePolicyCha
if agreements, err := b.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter(), InProgress()}, cph.Name()); err == nil {
for _, ag := range agreements {
if ag.Pattern == "" && ag.DeviceId == cutil.FormOrgSpecUrl(cmd.Msg.NodeId, cmd.Msg.NodePolOrg) {
policyMatches, noNewPriority := b.HandlePolicyChangeForAgreement(ag, nil, cph)
policyMatches, noNewPriority, _ := b.HandlePolicyChangeForAgreement(ag, nil, cph)
agStillValid := policyMatches && noNewPriority
if !agStillValid {
glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a node policy %v that has changed.", ag.CurrentAgreementId, ag.ServiceId)))
Expand Down
11 changes: 10 additions & 1 deletion api/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type HorizonDevice struct {
Name *string `json:"name,omitempty"`
NodeType *string `json:"nodeType,omitempty"`
ClusterNamespace *string `json:"clusterNamespace"`
NamespaceScoped *bool `json:"NamespaceScoped,omitempty"`
Token *string `json:"token,omitempty"`
TokenLastValidTime *uint64 `json:"token_last_valid_time,omitempty"`
TokenValid *bool `json:"token_valid,omitempty"`
Expand Down Expand Up @@ -69,6 +70,11 @@ func (h HorizonDevice) String() string {
clusterNs = *h.ClusterNamespace
}

isNS := false
if h.NamespaceScoped != nil {
isNS = *h.NamespaceScoped
}

ha_group := ""
if h.HAGroup != nil {
ha_group = *h.HAGroup
Expand All @@ -89,7 +95,7 @@ func (h HorizonDevice) String() string {
tv = *h.TokenValid
}

return fmt.Sprintf("Id: %v, Org: %v, Pattern: %v, Name: %v, NodeType: %v, ClusterNamespace: %v, HAGroup: %v, Token: [%v], TokenLastValidTime: %v, TokenValid: %v, %v", id, org, pat, name, nodeType, clusterNs, ha_group, cred, tlvt, tv, h.Config)
return fmt.Sprintf("Id: %v, Org: %v, Pattern: %v, Name: %v, NodeType: %v, ClusterNamespace: %v, NamespaceScoped: %v, HAGroup: %v, Token: [%v], TokenLastValidTime: %v, TokenValid: %v, %v", id, org, pat, name, nodeType, clusterNs, isNS, ha_group, cred, tlvt, tv, h.Config)
}

// This is a type conversion function but note that the token field within the persistent
Expand Down Expand Up @@ -121,6 +127,9 @@ func ConvertFromPersistentHorizonDevice(pDevice *persistence.ExchangeDevice) *Ho
if pDevice.NodeType == persistence.DEVICE_TYPE_CLUSTER {
ns := cutil.GetClusterNamespace()
hDevice.ClusterNamespace = &ns

isNS := cutil.IsNamespaceScoped()
hDevice.NamespaceScoped = &isNS
}

return &hDevice
Expand Down
8 changes: 8 additions & 0 deletions api/path_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,14 @@ func CreateHorizonDevice(device *HorizonDevice,
if err := patchDeviceHandler(deviceId, *device.Token, &pdr); err != nil {
return errorhandler(NewSystemError(fmt.Sprintf("error updating cluster namespace for the exchange node. %v", err))), nil, nil
}

pdr = exchange.PatchDeviceRequest{}
isNS := cutil.IsNamespaceScoped()
pdr.IsNamespaceScoped = &isNS
if err := patchDeviceHandler(deviceId, *device.Token, &pdr); err != nil {
return errorhandler(NewSystemError(fmt.Sprintf("error updating cluster agent scope for the exchange node. %v", err))), nil, nil
}

}

// Return 2 device objects, the first is the fully populated newly created device object. The second is a device
Expand Down
17 changes: 10 additions & 7 deletions api/path_node_configstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/open-horizon/anax/persistence"
"github.com/open-horizon/anax/policy"
"github.com/open-horizon/anax/semanticversion"
"os"
"strings"
)

Expand Down Expand Up @@ -358,15 +357,19 @@ func getSpecRefsForPattern(nodeType string, patName string,

glog.V(5).Infof(apiLogString(fmt.Sprintf("working with pattern definition %v", patternDef)))

nodeNamespace := os.Getenv("AGENT_NAMESPACE")
// Uncomment this section after exchange add "isNamespaceScoped" field
nodeNamespace := cutil.GetClusterNamespace()
isNamespaceScoped := cutil.IsNamespaceScoped()

if nodeType == persistence.DEVICE_TYPE_CLUSTER {
if nodeNamespace == "" {
// TODO: a better way to detect cluster agent namespace
nodeNamespace = externalpolicy.DEFAULT_NODE_K8S_NAMESPACE
}
if nodeNamespace != externalpolicy.DEFAULT_NODE_K8S_NAMESPACE {
if patternDef.ClusterNamespace != "" && patternDef.ClusterNamespace != nodeNamespace {
return nil, nil, NewSystemError(fmt.Sprintf("Pattern cluster namespace is different from agent namespace. Cluster namespace in pattern is %v, agent namespace is %v", patternDef.ClusterNamespace, nodeNamespace))
}

glog.V(5).Infof(apiLogString(fmt.Sprintf("checking cluster namespace comptibility in pattern %v", patId)))
if err := compcheck.ValidatePatternClusterNamespace(isNamespaceScoped, nodeNamespace, patternDef.ClusterNamespace, patId, nil); err != nil {
return nil, nil, NewSystemError(err.Error())
}
}

Expand Down Expand Up @@ -416,7 +419,7 @@ func getSpecRefsForPattern(nodeType string, patName string,

if nodeType == persistence.DEVICE_TYPE_CLUSTER {
// Ignore service that has namespace conflict
if compatible, _, reason := compcheck.CheckClusterNamespaceCompatibility(nodeType, nodeNamespace, patternDef.ClusterNamespace, serviceDef.ClusterDeployment, true, nil); !compatible {
if compatible, _, reason := compcheck.CheckClusterNamespaceCompatibility(nodeType, cutil.GetClusterNamespace(), cutil.IsNamespaceScoped(), patternDef.ClusterNamespace, serviceDef.ClusterDeployment, patId, true, nil); !compatible {
// warning
glog.Infof(apiLogString(fmt.Sprintf("skipping service %v/%v because %v", service.ServiceOrg, service.ServiceURL, reason)))
continue
Expand Down
Loading

0 comments on commit 58217e7

Please sign in to comment.