diff --git a/agreementbot/agreementbot.go b/agreementbot/agreementbot.go index 47c7e6402..05d96d917 100644 --- a/agreementbot/agreementbot.go +++ b/agreementbot/agreementbot.go @@ -351,7 +351,7 @@ func (w *AgreementBotWorker) Initialize() bool { // to initiate the protocol. for protocolName, _ := range w.pm.GetAllAgreementProtocols() { if policy.SupportedAgreementProtocol(protocolName) { - cph := CreateConsumerPH(protocolName, w.BaseWorker.Manager.Config, w.db, w.pm, w.BaseWorker.Manager.Messages, w.MMSObjectPM, w.retryAgreements) + cph := CreateConsumerPH(protocolName, w.BaseWorker.Manager.Config, w.db, w.pm, w.BaseWorker.Manager.Messages, w.MMSObjectPM) cph.Initialize() w.consumerPH.Add(protocolName, cph) } else { @@ -424,7 +424,7 @@ func (w *AgreementBotWorker) CommandHandler(command worker.Command) bool { // Update the protocol handler map and make sure there are workers available if the policy has a new protocol in it. if !w.consumerPH.Has(agp.Name) { glog.V(3).Infof("AgreementBotWorker creating worker pool for new agreement protocol %v", agp.Name) - cph := CreateConsumerPH(agp.Name, w.BaseWorker.Manager.Config, w.db, w.pm, w.BaseWorker.Manager.Messages, w.MMSObjectPM, w.retryAgreements) + cph := CreateConsumerPH(agp.Name, w.BaseWorker.Manager.Config, w.db, w.pm, w.BaseWorker.Manager.Messages, w.MMSObjectPM) cph.Initialize() w.consumerPH.Add(agp.Name, cph) } @@ -709,35 +709,20 @@ func (w *AgreementBotWorker) NoWorkHandler() { // pending state. glog.V(4).Infof("AgreementBotWorker Looking for pending agreements before shutting down.") - // Look at all agreements across all protocols. + agreementPendingFilter := func() persistence.AFilter { + return func(a persistence.Agreement) bool { return a.AgreementFinalizedTime == 0 && a.AgreementTimedout == 0 } + } + + // Look at all agreements across all protocols, foundPending := false for _, agp := range policy.AllAgreementProtocols() { // Find all agreements that are in progress, agreements that are not archived and dont have either a finalized time or a timeeout time. - nextAgreementId := "" - for { - lastAgreementId, agreements, err := w.db.FindAgreementsPage([]persistence.AgbotDBFilter{w.db.GetPendingFilter()}, agp, nextAgreementId, w.Config.GetAgbotDBLimit()) - if err != nil { - glog.Errorf("AgreementBotWorker unable to read agreements from database, error: %v", err) - w.Messages() <- events.NewNodeShutdownCompleteMessage(events.AGBOT_QUIESCE_COMPLETE, err.Error()) - break - } - - // There are pending agreements, so exit the scan loop. - if len(agreements) != 0 { - foundPending = true - break - } - - // If there are no more agreements to iterate, then break out of the loop. - if lastAgreementId == "" { - break - } else { - nextAgreementId = lastAgreementId - } - } - - if foundPending { + if agreements, err := w.db.FindAgreements([]persistence.AFilter{agreementPendingFilter(), persistence.UnarchivedAFilter()}, agp); err != nil { + glog.Errorf("AgreementBotWorker unable to read agreements from database, error: %v", err) + w.Messages() <- events.NewNodeShutdownCompleteMessage(events.AGBOT_QUIESCE_COMPLETE, err.Error()) + } else if len(agreements) != 0 { + foundPending = true break } @@ -838,6 +823,26 @@ func (w *AgreementBotWorker) searchNodesAndMakeAgreements(consumerPolicy *policy return err } else { + // Get all the agreements for this policy that are still active. + pendingAgreementFilter := func() persistence.AFilter { + return func(a persistence.Agreement) bool { + return a.PolicyName == consumerPolicy.Header.Name && a.AgreementTimedout == 0 + } + } + + ags := make(map[string][]persistence.Agreement) + + // The agreements with this policy could be part of any supported agreement protocol. + for _, agp := range policy.AllAgreementProtocols() { + // Find all agreements that are in progress. They might be waiting for a reply or not yet finalized. + // TODO: To support more than 1 agreement (maxagreements > 1) with this device for this policy, we need to adjust this logic. + if agreements, err := w.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter(), pendingAgreementFilter()}, agp); err != nil { + glog.Errorf("AgreementBotWorker received error trying to find pending agreements for protocol %v: %v", agp, err) + } else { + ags[agp] = agreements + } + } + for _, dev := range *devices { if filter != nil && filter(dev.Id) { @@ -847,6 +852,12 @@ func (w *AgreementBotWorker) searchNodesAndMakeAgreements(consumerPolicy *policy glog.V(3).Infof("AgreementBotWorker picked up %v for policy %v.", dev.ShortString(), consumerPolicy.Header.Name) glog.V(5).Infof("AgreementBotWorker picked up %v", dev) + // Check for agreements already in progress with this device + if found := w.alreadyMakingAgreementWith(&dev, consumerPolicy, ags); found { + glog.V(5).Infof("AgreementBotWorker skipping device id %v, agreement attempt already in progress with %v", dev.Id, consumerPolicy.Header.Name) + continue + } + // If the device is not ready to make agreements yet, then skip it. if dev.PublicKey == "" { glog.V(5).Infof("AgreementBotWorker skipping device id %v, node is not ready to exchange messages", dev.Id) @@ -893,6 +904,28 @@ func (w *AgreementBotWorker) searchNodesAndMakeAgreements(consumerPolicy *policy } +// Check all agreement protocol buckets to see if there are any agreements with this device. +// Return true if there is already an agreement for this node and policy. +func (w *AgreementBotWorker) alreadyMakingAgreementWith(dev *exchange.SearchResultDevice, consumerPolicy *policy.Policy, allAgreements map[string][]persistence.Agreement) bool { + + // Check to see if we're already doing something with this device. + for _, ags := range allAgreements { + // Look for any agreements with the current node. + for _, ag := range ags { + if ag.DeviceId == dev.Id { + if ag.AgreementFinalizedTime != 0 { + glog.V(5).Infof("AgreementBotWorker sending agreement verify for %v", ag.CurrentAgreementId) + w.consumerPH.Get(ag.AgreementProtocol).VerifyAgreement(&ag, w.consumerPH.Get(ag.AgreementProtocol)) + w.retryAgreements.AddRetry(consumerPolicy.Header.Name, dev.Id) + } + return true + } + } + } + return false + +} + func (w *AgreementBotWorker) policyWatcher(name string, quit chan bool) { worker.GetWorkerStatusManager().SetSubworkerStatus(w.GetName(), name, worker.STATUS_STARTED) @@ -1125,22 +1158,10 @@ func (w *AgreementBotWorker) syncOnInit() error { // Search all agreement protocol buckets for _, agp := range policy.AllAgreementProtocols() { - neededBCInstances := make(map[string]map[string]map[string]bool) - // Loop through our database and check each record for accuracy with the exchange and the blockchain - nextAgreementId := "" - for { - lastAgreementId, agreements, err := w.db.FindAgreementsPage([]persistence.AgbotDBFilter{w.db.GetUnarchivedFilter()}, agp, nextAgreementId, w.Config.GetAgbotDBLimit()) - if err != nil { - return errors.New(AWlogString(fmt.Sprintf("error searching database: %v", err))) - } + if agreements, err := w.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter()}, agp); err == nil { - // If there are no more agreements to iterate, then break out of the loop. - if lastAgreementId == "" { - break - } else { - nextAgreementId = lastAgreementId - } + neededBCInstances := make(map[string]map[string]map[string]bool) for _, ag := range agreements { @@ -1223,17 +1244,20 @@ func (w *AgreementBotWorker) syncOnInit() error { } } - } - // Fire off start requests for each BC client that we need running. The blockchain worker and the container worker will tolerate - // a start request for containers that are already running. - glog.V(3).Infof(AWlogString(fmt.Sprintf("discovered BC instances in DB %v", neededBCInstances))) - for org, typeMap := range neededBCInstances { - for typeName, instMap := range typeMap { - for instName, _ := range instMap { - w.Messages() <- events.NewNewBCContainerMessage(events.NEW_BC_CLIENT, typeName, instName, org, w.GetExchangeURL(), w.GetExchangeId(), w.GetExchangeToken()) + // Fire off start requests for each BC client that we need running. The blockchain worker and the container worker will tolerate + // a start request for containers that are already running. + glog.V(3).Infof(AWlogString(fmt.Sprintf("discovered BC instances in DB %v", neededBCInstances))) + for org, typeMap := range neededBCInstances { + for typeName, instMap := range typeMap { + for instName, _ := range instMap { + w.Messages() <- events.NewNewBCContainerMessage(events.NEW_BC_CLIENT, typeName, instName, org, w.GetExchangeURL(), w.GetExchangeId(), w.GetExchangeToken()) + } } } + + } else { + return errors.New(AWlogString(fmt.Sprintf("error searching database: %v", err))) } } diff --git a/agreementbot/agreementworker.go b/agreementbot/agreementworker.go index fedfcb341..d16058c1c 100644 --- a/agreementbot/agreementworker.go +++ b/agreementbot/agreementworker.go @@ -155,17 +155,15 @@ type HandleWorkloadUpgrade struct { AgreementId string Protocol string Device string - Org string PolicyName string } -func NewHandleWorkloadUpgrade(agId string, protocol string, device string, org string, policyName string) AgreementWork { +func NewHandleWorkloadUpgrade(agId string, protocol string, device string, policyName string) AgreementWork { return HandleWorkloadUpgrade{ workType: WORKLOAD_UPGRADE, AgreementId: agId, Device: device, Protocol: protocol, - Org: org, PolicyName: policyName, } } @@ -224,15 +222,14 @@ type AgreementWorker interface { } type BaseAgreementWorker struct { - pm *policy.PolicyManager - db persistence.AgbotDatabase - config *config.HorizonConfig - alm *AgreementLockManager - workerID string - httpClient *http.Client - ec *worker.BaseExchangeContext - mmsObjMgr *MMSObjectPolicyManager - retryAgreements *RetryAgreements + pm *policy.PolicyManager + db persistence.AgbotDatabase + config *config.HorizonConfig + alm *AgreementLockManager + workerID string + httpClient *http.Client + ec *worker.BaseExchangeContext + mmsObjMgr *MMSObjectPolicyManager } // A local implementation of the ExchangeContext interface because Agbot agreement workers are not full featured workers. @@ -280,40 +277,8 @@ func (b *BaseAgreementWorker) AgreementLockManager() *AgreementLockManager { return b.alm } -// Return true if there is already an agreement for this node and policy. -func (b *BaseAgreementWorker) alreadyMakingAgreementWith(workerId string, cph ConsumerProtocolHandler, wi *InitiateAgreement) bool { - - // If there is at least 1 agreement that matches the node and policy, that's enough to skip the node. - _, ags, err := b.db.FindAgreementsPage([]persistence.AgbotDBFilter{b.db.GetNodeFilter(wi.Device.Id), b.db.GetPolicyFilter(wi.Org, wi.ConsumerPolicy.Header.Name), b.db.GetActiveFilter()}, cph.Name(), "", 1) - if err != nil { - glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("error searching for existing agreements for %v, %v, error: %v", wi.Device.Id, wi.ConsumerPolicy.Header.Name, err))) - return false - } - - // If an in-progress agreement was returned from the query, check to make sure it is not finalized. If it's finalized, then - // the exchange should not have returned the node in the search results. The agbot might be out of sync, so send an - // agreement verify request to the node to find out if the agreement is valid or not. - if ags != nil && len(ags) > 0 { - if ags[0].AgreementFinalizedTime != 0 { - glog.V(5).Infof(BAWlogstring(workerId, fmt.Sprintf("sending agreement verify for %v", ags[0].CurrentAgreementId))) - cph.VerifyAgreement(&ags[0], cph) - b.retryAgreements.AddRetry(wi.ConsumerPolicy.Header.Name, wi.Device.Id) - } - return true - } - - return false -} - func (b *BaseAgreementWorker) InitiateNewAgreement(cph ConsumerProtocolHandler, wi *InitiateAgreement, random *rand.Rand, workerId string) { - // Check the database to see if the node already has an agreement with the policy. This check is done here on the protocol worker thread - // in order to offload the DB call from the main agbot thread. - if found := b.alreadyMakingAgreementWith(workerId, cph, wi); found { - glog.Infof(BAWlogstring(workerId, fmt.Sprintf("skipping device id %v, agreement attempt already in progress with %v", wi.Device.Id, wi.ConsumerPolicy.Header.Name))) - return - } - // Generate an agreement ID agreementIdString, aerr := cutil.GenerateAgreementId() if aerr != nil { @@ -753,7 +718,7 @@ func (b *BaseAgreementWorker) HandleAgreementReply(cph ConsumerProtocolHandler, // Find the saved agreement in the database. The returned agreement might be archived. If it's archived, then it is our agreement // so we will delete the protocol msg. - if agreement, err := b.db.FindSingleAgreementByAgreementId(reply.AgreementId(), cph.Name(), []persistence.AgbotDBFilter{}); err != nil { + if agreement, err := b.db.FindSingleAgreementByAgreementId(reply.AgreementId(), cph.Name(), []persistence.AFilter{}); err != nil { // A DB error occurred so we dont know if this is our agreement or not. Leave it alone until the agbot is restarted // or until the DB error is resolved. glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("error querying pending agreement %v, error: %v", reply.AgreementId(), err))) @@ -950,7 +915,7 @@ func (b *BaseAgreementWorker) HandleDataReceivedAck(cph ConsumerProtocolHandler, // The agreement might be archived in this agbot's partition. If it's archived, then it is our agreement // so we will delete the protocol msg, but we will ignore the ack msg. - if ag, err := b.db.FindSingleAgreementByAgreementId(drAck.AgreementId(), cph.Name(), []persistence.AgbotDBFilter{}); err != nil { + if ag, err := b.db.FindSingleAgreementByAgreementId(drAck.AgreementId(), cph.Name(), []persistence.AFilter{}); err != nil { glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("error querying agreement %v, error: %v", drAck.AgreementId(), err))) deleteMessage = false } else if ag != nil && ag.Archived { @@ -989,7 +954,7 @@ func (b *BaseAgreementWorker) HandleWorkloadUpgrade(cph ConsumerProtocolHandler, // grab the agreement id lock, cancel the agreement and delete the workload usage record. if wi.AgreementId == "" { - if _, ags, err := b.db.FindAgreementsPage([]persistence.AgbotDBFilter{b.db.GetNodeFilter(wi.Device), b.db.GetPolicyFilter(wi.Org, wi.PolicyName)}, cph.Name(), "", 999); err != nil { + if ags, err := b.db.FindAgreements([]persistence.AFilter{persistence.DevPolAFilter(wi.Device, wi.PolicyName)}, cph.Name()); err != nil { glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("error finding agreement for device %v and policyName %v, error: %v", wi.Device, wi.PolicyName, err))) } else if len(ags) == 0 { // If there is no agreement found, is it a problem? We could have caught the system in a state where there is no @@ -1117,7 +1082,7 @@ func (b *BaseAgreementWorker) ExternalCancel(cph ConsumerProtocolHandler, agreem glog.V(3).Infof(BAWlogstring(workerId, fmt.Sprintf("starting deferred cancel for %v", agreementId))) // Find the agreement record - if ag, err := b.db.FindSingleAgreementByAgreementId(agreementId, cph.Name(), []persistence.AgbotDBFilter{}); err != nil { + if ag, err := b.db.FindSingleAgreementByAgreementId(agreementId, cph.Name(), []persistence.AFilter{}); err != nil { glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("error querying agreement %v from database, error: %v", agreementId, err))) } else if ag == nil { glog.V(3).Infof(BAWlogstring(workerId, fmt.Sprintf("nothing to terminate for agreement %v, no database record.", agreementId))) diff --git a/agreementbot/api.go b/agreementbot/api.go index 8b7c984a4..5fc0b242d 100644 --- a/agreementbot/api.go +++ b/agreementbot/api.go @@ -208,7 +208,7 @@ func (a *API) agreement(w http.ResponseWriter, r *http.Request) { id := pathVars["id"] if id != "" { - if ag, err := a.db.FindSingleAgreementByAgreementIdAllProtocols(id, policy.AllAgreementProtocols(), []persistence.AgbotDBFilter{}); err != nil { + if ag, err := a.db.FindSingleAgreementByAgreementIdAllProtocols(id, policy.AllAgreementProtocols(), []persistence.AFilter{}); err != nil { glog.Error(APIlogString(fmt.Sprintf("error finding agreement %v, error: %v", id, err))) http.Error(w, "Internal server error", http.StatusInternalServerError) } else if ag == nil { @@ -228,22 +228,11 @@ func (a *API) agreement(w http.ResponseWriter, r *http.Request) { wrap[agreementsKey][activeKey] = []persistence.Agreement{} for _, agp := range policy.AllAgreementProtocols() { - - nextAgreementId := "" - for { - lastAgreementId, ags, err := a.db.FindAgreementsPage([]persistence.AgbotDBFilter{}, agp, nextAgreementId, a.Config.GetAgbotDBLimit()) - if err != nil { - glog.Error(APIlogString(fmt.Sprintf("error finding all agreements, error: %v", err))) - http.Error(w, "Internal server error", http.StatusInternalServerError) - return - } - - // If there are no more agreements to iterate, then break out of the loop. - if lastAgreementId == "" { - break - } else { - nextAgreementId = lastAgreementId - } + if ags, err := a.db.FindAgreements([]persistence.AFilter{}, agp); err != nil { + glog.Error(APIlogString(fmt.Sprintf("error finding all agreements, error: %v", err))) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } else { for _, agreement := range ags { // The archived agreements and the agreements being terminated are returned as archived. @@ -275,7 +264,7 @@ func (a *API) agreement(w http.ResponseWriter, r *http.Request) { } glog.V(3).Infof(APIlogString(fmt.Sprintf("handling DELETE of agreement: %v", r))) - if ag, err := a.db.FindSingleAgreementByAgreementIdAllProtocols(id, policy.AllAgreementProtocols(), []persistence.AgbotDBFilter{a.db.GetUnarchivedFilter()}); err != nil { + if ag, err := a.db.FindSingleAgreementByAgreementIdAllProtocols(id, policy.AllAgreementProtocols(), []persistence.AFilter{persistence.UnarchivedAFilter()}); err != nil { glog.Error(APIlogString(fmt.Sprintf("error finding agreement %v, error: %v", id, err))) w.WriteHeader(http.StatusInternalServerError) } else if ag == nil { @@ -399,7 +388,7 @@ func (a *API) policy(w http.ResponseWriter, r *http.Request) { protocol := "" // The body is syntacticly correct, verify that the agreement id matches up with the device id and policy name. if upgrade.AgreementId != "" { - if ag, err := a.db.FindSingleAgreementByAgreementIdAllProtocols(upgrade.AgreementId, policy.AllAgreementProtocols(), []persistence.AgbotDBFilter{a.db.GetUnarchivedFilter()}); err != nil { + if ag, err := a.db.FindSingleAgreementByAgreementIdAllProtocols(upgrade.AgreementId, policy.AllAgreementProtocols(), []persistence.AFilter{persistence.UnarchivedAFilter()}); err != nil { glog.Error(APIlogString(fmt.Sprintf("error finding agreement %v, error: %v", upgrade.AgreementId, err))) w.WriteHeader(http.StatusInternalServerError) return @@ -438,7 +427,7 @@ func (a *API) policy(w http.ResponseWriter, r *http.Request) { } // If we got this far, begin workload upgrade processing. - a.Messages() <- events.NewABApiWorkloadUpgradeMessage(events.WORKLOAD_UPGRADE, protocol, upgrade.AgreementId, upgrade.Device, upgrade.Org, policyName) + a.Messages() <- events.NewABApiWorkloadUpgradeMessage(events.WORKLOAD_UPGRADE, protocol, upgrade.AgreementId, upgrade.Device, policyName) w.WriteHeader(http.StatusOK) case "OPTIONS": diff --git a/agreementbot/basic_agreement_worker.go b/agreementbot/basic_agreement_worker.go index 60fecb52d..d3edb527d 100644 --- a/agreementbot/basic_agreement_worker.go +++ b/agreementbot/basic_agreement_worker.go @@ -19,7 +19,7 @@ type BasicAgreementWorker struct { protocolHandler *BasicProtocolHandler } -func NewBasicAgreementWorker(c *BasicProtocolHandler, cfg *config.HorizonConfig, db persistence.AgbotDatabase, pm *policy.PolicyManager, alm *AgreementLockManager, mmsObjMgr *MMSObjectPolicyManager, retryAgs *RetryAgreements) *BasicAgreementWorker { +func NewBasicAgreementWorker(c *BasicProtocolHandler, cfg *config.HorizonConfig, db persistence.AgbotDatabase, pm *policy.PolicyManager, alm *AgreementLockManager, mmsObjMgr *MMSObjectPolicyManager) *BasicAgreementWorker { id, err := uuid.NewV4() if err != nil { @@ -28,15 +28,14 @@ func NewBasicAgreementWorker(c *BasicProtocolHandler, cfg *config.HorizonConfig, p := &BasicAgreementWorker{ BaseAgreementWorker: &BaseAgreementWorker{ - pm: pm, - db: db, - config: cfg, - alm: alm, - workerID: id.String(), - httpClient: cfg.Collaborators.HTTPClientFactory.NewHTTPClient(nil), - ec: worker.NewExchangeContext(cfg.AgreementBot.ExchangeId, cfg.AgreementBot.ExchangeToken, cfg.AgreementBot.ExchangeURL, cfg.GetAgbotCSSURL(), cfg.Collaborators.HTTPClientFactory), - mmsObjMgr: mmsObjMgr, - retryAgreements: retryAgs, + pm: pm, + db: db, + config: cfg, + alm: alm, + workerID: id.String(), + httpClient: cfg.Collaborators.HTTPClientFactory.NewHTTPClient(nil), + ec: worker.NewExchangeContext(cfg.AgreementBot.ExchangeId, cfg.AgreementBot.ExchangeToken, cfg.AgreementBot.ExchangeURL, cfg.GetAgbotCSSURL(), cfg.Collaborators.HTTPClientFactory), + mmsObjMgr: mmsObjMgr, }, protocolHandler: c, } @@ -190,7 +189,7 @@ func (a *BasicAgreementWorker) start(work *PrioritizedWorkQueue, random *rand.Ra exists := false deleteMessage := true sendReply := true - if agreement, err := a.db.FindSingleAgreementByAgreementId(wi.Verify.AgreementId(), a.protocolHandler.Name(), []persistence.AgbotDBFilter{}); err != nil { + if agreement, err := a.db.FindSingleAgreementByAgreementId(wi.Verify.AgreementId(), a.protocolHandler.Name(), []persistence.AFilter{}); err != nil { glog.Errorf(bwlogstring(a.workerID, fmt.Sprintf("error querying agreement %v, error: %v", wi.Verify.AgreementId(), err))) sendReply = false } else if agreement != nil && agreement.Archived { @@ -228,7 +227,7 @@ func (a *BasicAgreementWorker) start(work *PrioritizedWorkQueue, random *rand.Ra cancel := false deleteMessage := true - if agreement, err := a.db.FindSingleAgreementByAgreementId(wi.VerifyReply.AgreementId(), a.protocolHandler.Name(), []persistence.AgbotDBFilter{}); err != nil { + if agreement, err := a.db.FindSingleAgreementByAgreementId(wi.VerifyReply.AgreementId(), a.protocolHandler.Name(), []persistence.AFilter{}); err != nil { glog.Errorf(bwlogstring(a.workerID, fmt.Sprintf("error querying agreement %v, error: %v", wi.VerifyReply.AgreementId(), err))) } else if agreement != nil && agreement.Archived { // The agreement is not active and it is archived, so this message belongs to this agbot. diff --git a/agreementbot/basic_protocol_handler.go b/agreementbot/basic_protocol_handler.go index abac32a5c..842876eaa 100644 --- a/agreementbot/basic_protocol_handler.go +++ b/agreementbot/basic_protocol_handler.go @@ -23,7 +23,7 @@ type BasicProtocolHandler struct { Work *PrioritizedWorkQueue } -func NewBasicProtocolHandler(name string, cfg *config.HorizonConfig, db persistence.AgbotDatabase, pm *policy.PolicyManager, messages chan events.Message, mmsObjMgr *MMSObjectPolicyManager, retryAgs *RetryAgreements) *BasicProtocolHandler { +func NewBasicProtocolHandler(name string, cfg *config.HorizonConfig, db persistence.AgbotDatabase, pm *policy.PolicyManager, messages chan events.Message, mmsObjMgr *MMSObjectPolicyManager) *BasicProtocolHandler { if name == basicprotocol.PROTOCOL_NAME { return &BasicProtocolHandler{ BaseConsumerProtocolHandler: &BaseConsumerProtocolHandler{ @@ -37,7 +37,6 @@ func NewBasicProtocolHandler(name string, cfg *config.HorizonConfig, db persiste deferredCommands: make([]AgreementWork, 0, 10), messages: messages, mmsObjMgr: mmsObjMgr, - retryAgreements: retryAgs, }, agreementPH: basicprotocol.NewProtocolHandler(cfg.Collaborators.HTTPClientFactory.NewHTTPClient(nil), pm), // Allow the main agbot thread to distribute protocol msgs and agreement handling to the worker pool. @@ -67,7 +66,7 @@ func (c *BasicProtocolHandler) Initialize() { // Set up agreement worker pool based on the current technical config. for ix := 0; ix < c.config.AgreementBot.AgreementWorkers; ix++ { - agw := NewBasicAgreementWorker(c, c.config, c.db, c.pm, agreementLockMgr, c.mmsObjMgr, c.retryAgreements) + agw := NewBasicAgreementWorker(c, c.config, c.db, c.pm, agreementLockMgr, c.mmsObjMgr) go agw.start(c.Work, random) } diff --git a/agreementbot/changes_worker.go b/agreementbot/changes_worker.go index bd7915092..966602b9d 100644 --- a/agreementbot/changes_worker.go +++ b/agreementbot/changes_worker.go @@ -66,9 +66,6 @@ func (w *ChangesWorker) NewEvent(incoming events.Message) { msg, _ := incoming.(*events.NodeShutdownCompleteMessage) switch msg.Event().Id { case events.UNCONFIGURE_COMPLETE: - w.Commands <- worker.NewBeginShutdownCommand() - w.Commands <- worker.NewTerminateCommand("shutdown") - case events.AGBOT_QUIESCE_COMPLETE: w.Commands <- worker.NewTerminateCommand("shutdown") } diff --git a/agreementbot/consumer_protocol_handler.go b/agreementbot/consumer_protocol_handler.go index 39c74a1fc..37d5810d6 100644 --- a/agreementbot/consumer_protocol_handler.go +++ b/agreementbot/consumer_protocol_handler.go @@ -15,12 +15,11 @@ import ( "github.com/open-horizon/anax/policy" "github.com/open-horizon/anax/worker" "net/http" - "sync" "time" ) -func CreateConsumerPH(name string, cfg *config.HorizonConfig, db persistence.AgbotDatabase, pm *policy.PolicyManager, msgq chan events.Message, mmsObjMgr *MMSObjectPolicyManager, retryAgs *RetryAgreements) ConsumerProtocolHandler { - if handler := NewBasicProtocolHandler(name, cfg, db, pm, msgq, mmsObjMgr, retryAgs); handler != nil { +func CreateConsumerPH(name string, cfg *config.HorizonConfig, db persistence.AgbotDatabase, pm *policy.PolicyManager, msgq chan events.Message, mmsObjMgr *MMSObjectPolicyManager) ConsumerProtocolHandler { + if handler := NewBasicProtocolHandler(name, cfg, db, pm, msgq, mmsObjMgr); handler != nil { return handler } // Add new consumer side protocol handlers here return nil @@ -87,10 +86,8 @@ type BaseConsumerProtocolHandler struct { agbotId string token string deferredCommands []AgreementWork // The agreement related work that has to be deferred and retried - dcLock sync.Mutex // The lock that protects the deferred command queue messages chan events.Message mmsObjMgr *MMSObjectPolicyManager - retryAgreements *RetryAgreements } func (b *BaseConsumerProtocolHandler) GetSendMessage() func(mt interface{}, pay []byte) error { @@ -199,7 +196,7 @@ func (b *BaseConsumerProtocolHandler) DispatchProtocolMessage(cmd *NewProtocolMe glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("queued data received ack message"))) } else if can, cerr := cph.AgreementProtocolHandler("", "", "").ValidateCancel(string(cmd.Message)); cerr == nil { // Before dispatching the cancel to a worker thread, make sure it's a valid cancel - if ag, err := b.db.FindSingleAgreementByAgreementId(can.AgreementId(), can.Protocol(), []persistence.AgbotDBFilter{}); err != nil { + if ag, err := b.db.FindSingleAgreementByAgreementId(can.AgreementId(), can.Protocol(), []persistence.AFilter{}); err != nil { glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error finding agreement %v in the db", can.AgreementId()))) } else if ag == nil { glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("cancel ignored, cannot find agreement %v in the db", can.AgreementId()))) @@ -233,71 +230,68 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChanged(cmd *PolicyChangedComm glog.V(5).Infof(BCPHlogstring(b.Name(), "received policy changed command.")) - nextAgreementId := "" - for { - lastAgreementId, agreements, err := b.db.FindAgreementsPage([]persistence.AgbotDBFilter{b.db.GetActiveFilter(), b.db.GetUnarchivedFilter(), b.db.GetPolicyFilter(cmd.Msg.Org(), cmd.Msg.PolicyName())}, cph.Name(), nextAgreementId, b.config.GetAgbotDBLimit()) - if err != nil { - glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error searching database: %v", err))) - break - } + if eventPol, err := policy.DemarshalPolicy(cmd.Msg.PolicyString()); err != nil { + glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error demarshalling change policy event %v, error: %v", cmd.Msg.PolicyString(), err))) + } else { - // If there are no more agreements to iterate, then break out of the loop. - if lastAgreementId == "" { - break - } else { - nextAgreementId = lastAgreementId + InProgress := func() persistence.AFilter { + return func(e persistence.Agreement) bool { return e.AgreementCreationTime != 0 && e.AgreementTimedout == 0 } } - for _, ag := range agreements { + if agreements, err := b.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter(), InProgress()}, cph.Name()); err == nil { + for _, ag := range agreements { + + if pol, err := policy.DemarshalPolicy(ag.Policy); err != nil { + glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("unable to demarshal policy for agreement %v, error %v", ag.CurrentAgreementId, err))) + + } else if eventPol.Header.Name != pol.Header.Name { + // This agreement is using a policy different from the one that changed. + glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("policy change handler skipping agreement %v because it is using a policy that did not change.", ag.CurrentAgreementId))) + continue + } else if err := b.pm.MatchesMine(cmd.Msg.Org(), pol); err != nil { + glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a policy %v that has changed: %v", ag.CurrentAgreementId, pol.Header.Name, err))) + b.CancelAgreement(ag, TERM_REASON_POLICY_CHANGED, cph) + } else { + glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for agreement %v, no policy content differences detected", ag.CurrentAgreementId))) + } - if pol, err := policy.DemarshalPolicy(ag.Policy); err != nil { - glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("unable to demarshal policy for agreement %v, error %v", ag.CurrentAgreementId, err))) - } else if err := b.pm.MatchesMine(cmd.Msg.Org(), pol); err != nil { - glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a policy %v that has changed: %v", ag.CurrentAgreementId, pol.Header.Name, err))) - b.CancelAgreement(ag, TERM_REASON_POLICY_CHANGED, cph) - } else { - glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("for agreement %v, no policy content differences detected", ag.CurrentAgreementId))) } - + } else { + glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error searching database: %v", err))) } } - } func (b *BaseConsumerProtocolHandler) HandlePolicyDeleted(cmd *PolicyDeletedCommand, cph ConsumerProtocolHandler) { glog.V(5).Infof(BCPHlogstring(b.Name(), "received policy deleted command.")) - nextAgreementId := "" - for { - lastAgreementId, agreements, err := b.db.FindAgreementsPage([]persistence.AgbotDBFilter{b.db.GetActiveFilter(), b.db.GetUnarchivedFilter(), b.db.GetPolicyFilter(cmd.Msg.Org(), cmd.Msg.PolicyName())}, cph.Name(), nextAgreementId, b.config.GetAgbotDBLimit()) - if err != nil { - glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error searching database: %v", err))) - break - } - - // If there are no more agreements to iterate, then break out of the loop. - if lastAgreementId == "" { - break - } else { - nextAgreementId = lastAgreementId - } + InProgress := func() persistence.AFilter { + return func(e persistence.Agreement) bool { return e.AgreementCreationTime != 0 && e.AgreementTimedout == 0 } + } + if agreements, err := b.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter(), InProgress()}, cph.Name()); err == nil { for _, ag := range agreements { - if existingPol := b.pm.GetPolicy(cmd.Msg.Org(), cmd.Msg.PolicyName()); existingPol == nil { - glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a policy %v that doesn't exist anymore", ag.CurrentAgreementId, cmd.Msg.PolicyName()))) + if pol, err := policy.DemarshalPolicy(ag.Policy); err != nil { + glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("unable to demarshal policy for agreement %v, error %v", ag.CurrentAgreementId, err))) + } else if cmd.Msg.Org() == ag.Org { + if existingPol := b.pm.GetPolicy(cmd.Msg.Org(), pol.Header.Name); existingPol == nil { + glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a policy %v that doesn't exist anymore", ag.CurrentAgreementId, pol.Header.Name))) - // Remove any workload usage records so that a new agreement will be made starting from the highest priority workload. - if err := b.db.DeleteWorkloadUsage(ag.DeviceId, ag.PolicyName); err != nil { - glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("error deleting workload usage for %v using policy %v, error: %v", ag.DeviceId, ag.PolicyName, err))) - } + // Remove any workload usage records so that a new agreement will be made starting from the highest priority workload. + if err := b.db.DeleteWorkloadUsage(ag.DeviceId, ag.PolicyName); err != nil { + glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("error deleting workload usage for %v using policy %v, error: %v", ag.DeviceId, ag.PolicyName, err))) + } - // Queue up a cancellation command for this agreement. - agreementWork := NewCancelAgreement(ag.CurrentAgreementId, ag.AgreementProtocol, cph.GetTerminationCode(TERM_REASON_POLICY_CHANGED), 0) - cph.WorkQueue().InboundHigh() <- &agreementWork + // Queue up a cancellation command for this agreement. + agreementWork := NewCancelAgreement(ag.CurrentAgreementId, ag.AgreementProtocol, cph.GetTerminationCode(TERM_REASON_POLICY_CHANGED), 0) + cph.WorkQueue().InboundHigh() <- &agreementWork + } } } + } else { + glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error searching database: %v", err))) } } @@ -305,76 +299,49 @@ func (b *BaseConsumerProtocolHandler) HandleServicePolicyChanged(cmd *ServicePol glog.V(5).Infof(BCPHlogstring(b.Name(), "received service policy changed command: %v. cmd")) - nextAgreementId := "" - for { - filters := []persistence.AgbotDBFilter{ - b.db.GetActiveFilter(), - b.db.GetUnarchivedFilter(), - b.db.GetNoPatternFilter(), - b.db.GetServiceIdFilter(cmd.Msg.ServiceId), - b.db.GetPolicyFilter(cmd.Msg.BusinessPolOrg, fmt.Sprintf("%v/%v", cmd.Msg.BusinessPolOrg, cmd.Msg.BusinessPolName)), - } - - lastAgreementId, agreements, err := b.db.FindAgreementsPage(filters, cph.Name(), nextAgreementId, b.config.GetAgbotDBLimit()) - if err != nil { - glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error searching database: %v", err))) - break - } - - // If there are no more agreements to iterate, then break out of the loop. - if lastAgreementId == "" { - break - } else { - nextAgreementId = lastAgreementId - } + InProgress := func() persistence.AFilter { + return func(e persistence.Agreement) bool { return e.AgreementCreationTime != 0 && e.AgreementTimedout == 0 } + } + if agreements, err := b.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter(), InProgress()}, cph.Name()); err == nil { for _, ag := range agreements { - glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a service policy %v that has changed.", ag.CurrentAgreementId, ag.ServiceId))) - b.CancelAgreement(ag, TERM_REASON_POLICY_CHANGED, cph) + if ag.Pattern == "" && ag.PolicyName == fmt.Sprintf("%v/%v", cmd.Msg.BusinessPolOrg, cmd.Msg.BusinessPolName) && ag.ServiceId[0] == cmd.Msg.ServiceId { + + glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a service policy %v that has changed.", ag.CurrentAgreementId, ag.ServiceId))) + b.CancelAgreement(ag, TERM_REASON_POLICY_CHANGED, cph) + } } + } else { + glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error searching database: %v", err))) } } func (b *BaseConsumerProtocolHandler) HandleServicePolicyDeleted(cmd *ServicePolicyDeletedCommand, cph ConsumerProtocolHandler) { - glog.V(5).Infof(BCPHlogstring(b.Name(), "received service policy deleted command.")) - - nextAgreementId := "" - for { - filters := []persistence.AgbotDBFilter{ - b.db.GetActiveFilter(), - b.db.GetUnarchivedFilter(), - b.db.GetNoPatternFilter(), - b.db.GetServiceIdFilter(cmd.Msg.ServiceId), - b.db.GetPolicyFilter(cmd.Msg.BusinessPolOrg, fmt.Sprintf("%v/%v", cmd.Msg.BusinessPolOrg, cmd.Msg.BusinessPolName)), - } - - lastAgreementId, agreements, err := b.db.FindAgreementsPage(filters, cph.Name(), nextAgreementId, b.config.GetAgbotDBLimit()) - if err != nil { - glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error searching database: %v", err))) - break - } + glog.V(5).Infof(BCPHlogstring(b.Name(), "received policy deleted command.")) - // If there are no more agreements to iterate, then break out of the loop. - if lastAgreementId == "" { - break - } else { - nextAgreementId = lastAgreementId - } + InProgress := func() persistence.AFilter { + return func(e persistence.Agreement) bool { return e.AgreementCreationTime != 0 && e.AgreementTimedout == 0 } + } + if agreements, err := b.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter(), InProgress()}, cph.Name()); err == nil { for _, ag := range agreements { - glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a service policy %v that doesn't exist anymore", ag.CurrentAgreementId, ag.ServiceId))) + if ag.Pattern == "" && ag.PolicyName == fmt.Sprintf("%v/%v", cmd.Msg.BusinessPolOrg, cmd.Msg.BusinessPolName) && ag.ServiceId[0] == cmd.Msg.ServiceId { + glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a service policy %v that doesn't exist anymore", ag.CurrentAgreementId, ag.ServiceId))) - // Remove any workload usage records so that a new agreement will be made starting from the highest priority workload. - if err := b.db.DeleteWorkloadUsage(ag.DeviceId, ag.PolicyName); err != nil { - glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("error deleting workload usage for %v using policy %v, error: %v", ag.DeviceId, ag.PolicyName, err))) - } + // Remove any workload usage records so that a new agreement will be made starting from the highest priority workload. + if err := b.db.DeleteWorkloadUsage(ag.DeviceId, ag.PolicyName); err != nil { + glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("error deleting workload usage for %v using policy %v, error: %v", ag.DeviceId, ag.PolicyName, err))) + } - // Queue up a cancellation command for this agreement. - agreementWork := NewCancelAgreement(ag.CurrentAgreementId, ag.AgreementProtocol, cph.GetTerminationCode(TERM_REASON_POLICY_CHANGED), 0) - cph.WorkQueue().InboundHigh() <- &agreementWork + // Queue up a cancellation command for this agreement. + agreementWork := NewCancelAgreement(ag.CurrentAgreementId, ag.AgreementProtocol, cph.GetTerminationCode(TERM_REASON_POLICY_CHANGED), 0) + cph.WorkQueue().InboundHigh() <- &agreementWork + } } + } else { + glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error searching database: %v", err))) } } @@ -419,7 +386,7 @@ func (b *BaseConsumerProtocolHandler) CancelAgreement(ag persistence.Agreement, func (b *BaseConsumerProtocolHandler) HandleWorkloadUpgrade(cmd *WorkloadUpgradeCommand, cph ConsumerProtocolHandler) { glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("received workload upgrade command."))) - upgradeWork := NewHandleWorkloadUpgrade(cmd.Msg.AgreementId, cmd.Msg.AgreementProtocol, cmd.Msg.DeviceId, cmd.Msg.Org, cmd.Msg.PolicyName) + upgradeWork := NewHandleWorkloadUpgrade(cmd.Msg.AgreementId, cmd.Msg.AgreementProtocol, cmd.Msg.DeviceId, cmd.Msg.PolicyName) cph.WorkQueue().InboundHigh() <- &upgradeWork glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("queued workload upgrade command."))) } @@ -578,14 +545,10 @@ func (b *BaseConsumerProtocolHandler) getDevice(deviceId string, workerId string } func (b *BaseConsumerProtocolHandler) DeferCommand(cmd AgreementWork) { - b.dcLock.Lock() - defer b.dcLock.Unlock() b.deferredCommands = append(b.deferredCommands, cmd) } func (b *BaseConsumerProtocolHandler) GetDeferredCommands() []AgreementWork { - b.dcLock.Lock() - defer b.dcLock.Unlock() res := b.deferredCommands b.deferredCommands = make([]AgreementWork, 0, 10) return res diff --git a/agreementbot/governance.go b/agreementbot/governance.go index 63b152bd8..ba09c3859 100644 --- a/agreementbot/governance.go +++ b/agreementbot/governance.go @@ -16,7 +16,7 @@ import ( func (w *AgreementBotWorker) GovernAgreements() int { - unarchived := []persistence.AgbotDBFilter{w.db.GetUnarchivedFilter()} + unarchived := []persistence.AFilter{persistence.UnarchivedAFilter()} // The length of time this governance routine waits is based on several factors. The data verification check rate // of any agreements that are being maintained and the default time specified in the agbot config. Assume that we @@ -32,6 +32,11 @@ func (w *AgreementBotWorker) GovernAgreements() int { w.GovTiming.dvSkip = uint64(0) // Number of times to skip data verification checks before actually doing the check. w.GovTiming.nhSkip = uint64(0) // Number of times to skip node health checks before actually doing the check. + // A filter for limiting the returned set of agreements just to those that are in progress and not yet timed out. + notYetFinalFilter := func() persistence.AFilter { + return func(a persistence.Agreement) bool { return a.AgreementCreationTime != 0 && a.AgreementTimedout == 0 } + } + // Reset the updated status of the Node Health manager. This ensures that the agbot will attempt to get updated status // info from the exchange. The exchange might return no updates, but at least the agbot asked for updates. w.NHManager.ResetUpdateStatus() @@ -41,171 +46,150 @@ func (w *AgreementBotWorker) GovernAgreements() int { protocolHandler := w.consumerPH.Get(agp) - // For the given agreement protocol, get the node orgs in use by the set of all agreements, so that the node health manager - // can be queried later for node status. - if nodeOrgs, err := w.db.GetNodeOrgs([]persistence.AgbotDBFilter{w.db.GetUnarchivedFilter(), w.db.GetActiveFilter()}, agp); err != nil { - glog.Errorf(logString(fmt.Sprintf("unable to get node orgs from agreements in database, error: %v", err))) - continue - } else { - // Ensure the Node Health manager has up to date info. - glog.V(5).Infof("AgreementBot Governance saving the node orgs %v to the node health manager for all active agreements under %v protocol.", nodeOrgs, agp) - w.NHManager.SetNodeOrgs(nodeOrgs) - } - - nextAgreementId := "" - // Process agreements in batches of at most w.Config.GetAgbotDBLimit(). This ensure that we dont consume too much memory. - for { - // Find all agreements that are in progress. They might be waiting for a reply or not yet finalized on blockchain. - // The result set might have fewer agreements than the result set limit size. - if lastAgreementId, agreements, err := w.db.FindAgreementsPage([]persistence.AgbotDBFilter{w.db.GetActiveFilter(), w.db.GetUnarchivedFilter()}, agp, nextAgreementId, w.Config.GetAgbotDBLimit()); err == nil { - - // If there are no more agreements to iterate, then break out of the loop. - if lastAgreementId == "" { - break - } else { - nextAgreementId = lastAgreementId - } + // Find all agreements that are in progress. They might be waiting for a reply or not yet finalized on blockchain. + if agreements, err := w.db.FindAgreements([]persistence.AFilter{notYetFinalFilter(), persistence.UnarchivedAFilter()}, agp); err == nil { + activeDataVerification := true + allActiveAgreements := make(map[string][]string) - activeDataVerification := true - allActiveAgreements := make(map[string][]string) + // set the node orgs for the given agreement protocol + glog.V(5).Infof("AgreementBot Governance saving the node orgs to the node health manager for all active agreements under %v protocol.", agp) + w.NHManager.SetNodeOrgs(agreements, agp) - for _, ag := range agreements { + for _, ag := range agreements { - // Govern agreements that have seen a reply from the device - if protocolHandler.AlreadyReceivedReply(&ag) { + // Govern agreements that have seen a reply from the device + if protocolHandler.AlreadyReceivedReply(&ag) { - // For agreements that havent seen a blockchain write yet, check timeout - if ag.AgreementFinalizedTime == 0 { + // For agreements that havent seen a blockchain write yet, check timeout + if ag.AgreementFinalizedTime == 0 { - glog.V(5).Infof("AgreementBot Governance detected agreement %v not yet final.", ag.CurrentAgreementId) - now := uint64(time.Now().Unix()) - if ag.AgreementCreationTime+w.BaseWorker.Manager.Config.AgreementBot.AgreementTimeoutS < now { - // Start timing out the agreement - w.TerminateAgreement(&ag, protocolHandler.GetTerminationCode(TERM_REASON_NOT_FINALIZED_TIMEOUT)) - } + glog.V(5).Infof("AgreementBot Governance detected agreement %v not yet final.", ag.CurrentAgreementId) + now := uint64(time.Now().Unix()) + if ag.AgreementCreationTime+w.BaseWorker.Manager.Config.AgreementBot.AgreementTimeoutS < now { + // Start timing out the agreement + w.TerminateAgreement(&ag, protocolHandler.GetTerminationCode(TERM_REASON_NOT_FINALIZED_TIMEOUT)) } + } - // Do DV check only if not skipping it this time. - if w.GovTiming.dvSkip == 0 { + // Do DV check only if not skipping it this time. + if w.GovTiming.dvSkip == 0 { - // Check for the receipt of data in the data ingest system (if necessary) - if !ag.DisableDataVerificationChecks { + // Check for the receipt of data in the data ingest system (if necessary) + if !ag.DisableDataVerificationChecks { - // Capture the data verification check rate for later - if discoveredDVWaitTime == 0 || (discoveredDVWaitTime != 0 && uint64(ag.DataVerificationCheckRate) < discoveredDVWaitTime) { - discoveredDVWaitTime = uint64(ag.DataVerificationCheckRate) - } + // Capture the data verification check rate for later + if discoveredDVWaitTime == 0 || (discoveredDVWaitTime != 0 && uint64(ag.DataVerificationCheckRate) < discoveredDVWaitTime) { + discoveredDVWaitTime = uint64(ag.DataVerificationCheckRate) + } - // First check to see if this agreement is just not sending data. If so, terminate the agreement. - now := uint64(time.Now().Unix()) - noDataLimit := w.BaseWorker.Manager.Config.AgreementBot.NoDataIntervalS - if ag.DataVerificationNoDataInterval != 0 { - noDataLimit = uint64(ag.DataVerificationNoDataInterval) - } - if now-ag.DataVerifiedTime >= noDataLimit { - // No data is being received, terminate the agreement - glog.V(3).Infof(logString(fmt.Sprintf("cancelling agreement %v due to lack of data", ag.CurrentAgreementId))) - w.TerminateAgreement(&ag, protocolHandler.GetTerminationCode(TERM_REASON_NO_DATA_RECEIVED)) - - } else if activeDataVerification { - // Otherwise make sure the device is still sending data - if ag.DataVerifiedTime+uint64(ag.DataVerificationCheckRate) > now { - // It's not time to check again - continue - } else if activeAgreements, err := GetActiveAgreements(allActiveAgreements, ag, w.BaseWorker.Manager.Config); err != nil { - glog.Errorf(logString(fmt.Sprintf("unable to retrieve active agreement list. Terminating data verification loop early, error: %v", err))) - activeDataVerification = false - } else if ActiveAgreementsContains(activeAgreements, ag, w.Config.AgreementBot.DVPrefix) { - if _, err := w.db.DataVerified(ag.CurrentAgreementId, agp); err != nil { - glog.Errorf(logString(fmt.Sprintf("unable to record data verification, error: %v", err))) + // First check to see if this agreement is just not sending data. If so, terminate the agreement. + now := uint64(time.Now().Unix()) + noDataLimit := w.BaseWorker.Manager.Config.AgreementBot.NoDataIntervalS + if ag.DataVerificationNoDataInterval != 0 { + noDataLimit = uint64(ag.DataVerificationNoDataInterval) + } + if now-ag.DataVerifiedTime >= noDataLimit { + // No data is being received, terminate the agreement + glog.V(3).Infof(logString(fmt.Sprintf("cancelling agreement %v due to lack of data", ag.CurrentAgreementId))) + w.TerminateAgreement(&ag, protocolHandler.GetTerminationCode(TERM_REASON_NO_DATA_RECEIVED)) + + } else if activeDataVerification { + // Otherwise make sure the device is still sending data + if ag.DataVerifiedTime+uint64(ag.DataVerificationCheckRate) > now { + // It's not time to check again + continue + } else if activeAgreements, err := GetActiveAgreements(allActiveAgreements, ag, w.BaseWorker.Manager.Config); err != nil { + glog.Errorf(logString(fmt.Sprintf("unable to retrieve active agreement list. Terminating data verification loop early, error: %v", err))) + activeDataVerification = false + } else if ActiveAgreementsContains(activeAgreements, ag, w.Config.AgreementBot.DVPrefix) { + if _, err := w.db.DataVerified(ag.CurrentAgreementId, agp); err != nil { + glog.Errorf(logString(fmt.Sprintf("unable to record data verification, error: %v", err))) + } + + if ag.DataNotificationSent == 0 { + // Get message address of the device from the exchange. The device ensures that the exchange is kept current. + // If the address happens to be invalid, that should be a temporary condition. We will keep sending until + // we get an ack to our verification message. + if whisperTo, pubkeyTo, err := protocolHandler.GetDeviceMessageEndpoint(ag.DeviceId, "Governance"); err != nil { + glog.Errorf(logString(fmt.Sprintf("error obtaining message target for data notification: %v", err))) + } else if mt, err := exchange.CreateMessageTarget(ag.DeviceId, nil, pubkeyTo, whisperTo); err != nil { + glog.Errorf(logString(fmt.Sprintf("error creating message target: %v", err))) + } else if err := protocolHandler.AgreementProtocolHandler("", "", "").NotifyDataReceipt(ag.CurrentAgreementId, mt, protocolHandler.GetSendMessage()); err != nil { + glog.Errorf(logString(fmt.Sprintf("unable to send data notification, error: %v", err))) } + } - if ag.DataNotificationSent == 0 { - // Get message address of the device from the exchange. The device ensures that the exchange is kept current. - // If the address happens to be invalid, that should be a temporary condition. We will keep sending until - // we get an ack to our verification message. - if whisperTo, pubkeyTo, err := protocolHandler.GetDeviceMessageEndpoint(ag.DeviceId, "Governance"); err != nil { - glog.Errorf(logString(fmt.Sprintf("error obtaining message target for data notification: %v", err))) + // Check to see if it's time to send a metering notification + // Create Metering notification. If the policy is empty, there's nothing to do. + mp := policy.Meter{Tokens: ag.MeteringTokens, PerTimeUnit: ag.MeteringPerTimeUnit, NotificationIntervalS: ag.MeteringNotificationInterval} + if mp.IsEmpty() { + continue + } else if ag.MeteringNotificationSent == 0 || (ag.MeteringNotificationSent != 0 && (ag.MeteringNotificationSent+uint64(ag.MeteringNotificationInterval)) <= now) { + // Grab the blockchain info from the agreement if there is any + + bcType, bcName, bcOrg := protocolHandler.GetKnownBlockchain(&ag) + glog.V(5).Info(logString(fmt.Sprintf("metering on %v %v", bcType, bcName))) + + // If we can write to the blockchain then we have all the info we need to do metering. + if protocolHandler.IsBlockchainWritable(bcType, bcName, bcOrg) && protocolHandler.CanSendMeterRecord(&ag) { + if mn, err := protocolHandler.CreateMeteringNotification(mp, &ag); err != nil { + glog.Errorf(logString(fmt.Sprintf("unable to create metering notification, error: %v", err))) + } else if whisperTo, pubkeyTo, err := protocolHandler.GetDeviceMessageEndpoint(ag.DeviceId, "Governance"); err != nil { + glog.Errorf(logString(fmt.Sprintf("error obtaining message target for metering notification: %v", err))) } else if mt, err := exchange.CreateMessageTarget(ag.DeviceId, nil, pubkeyTo, whisperTo); err != nil { glog.Errorf(logString(fmt.Sprintf("error creating message target: %v", err))) - } else if err := protocolHandler.AgreementProtocolHandler("", "", "").NotifyDataReceipt(ag.CurrentAgreementId, mt, protocolHandler.GetSendMessage()); err != nil { - glog.Errorf(logString(fmt.Sprintf("unable to send data notification, error: %v", err))) - } - } - - // Check to see if it's time to send a metering notification - // Create Metering notification. If the policy is empty, there's nothing to do. - mp := policy.Meter{Tokens: ag.MeteringTokens, PerTimeUnit: ag.MeteringPerTimeUnit, NotificationIntervalS: ag.MeteringNotificationInterval} - if mp.IsEmpty() { - continue - } else if ag.MeteringNotificationSent == 0 || (ag.MeteringNotificationSent != 0 && (ag.MeteringNotificationSent+uint64(ag.MeteringNotificationInterval)) <= now) { - // Grab the blockchain info from the agreement if there is any - - bcType, bcName, bcOrg := protocolHandler.GetKnownBlockchain(&ag) - glog.V(5).Info(logString(fmt.Sprintf("metering on %v %v", bcType, bcName))) - - // If we can write to the blockchain then we have all the info we need to do metering. - if protocolHandler.IsBlockchainWritable(bcType, bcName, bcOrg) && protocolHandler.CanSendMeterRecord(&ag) { - if mn, err := protocolHandler.CreateMeteringNotification(mp, &ag); err != nil { - glog.Errorf(logString(fmt.Sprintf("unable to create metering notification, error: %v", err))) - } else if whisperTo, pubkeyTo, err := protocolHandler.GetDeviceMessageEndpoint(ag.DeviceId, "Governance"); err != nil { - glog.Errorf(logString(fmt.Sprintf("error obtaining message target for metering notification: %v", err))) - } else if mt, err := exchange.CreateMessageTarget(ag.DeviceId, nil, pubkeyTo, whisperTo); err != nil { - glog.Errorf(logString(fmt.Sprintf("error creating message target: %v", err))) - } else if msg, err := protocolHandler.AgreementProtocolHandler(bcType, bcName, bcOrg).NotifyMetering(ag.CurrentAgreementId, mn, mt, protocolHandler.GetSendMessage()); err != nil { - glog.Errorf(logString(fmt.Sprintf("unable to send metering notification, error: %v", err))) - } else if _, err := w.db.MeteringNotification(ag.CurrentAgreementId, agp, msg); err != nil { - glog.Errorf(logString(fmt.Sprintf("unable to record metering notification, error: %v", err))) - } + } else if msg, err := protocolHandler.AgreementProtocolHandler(bcType, bcName, bcOrg).NotifyMetering(ag.CurrentAgreementId, mn, mt, protocolHandler.GetSendMessage()); err != nil { + glog.Errorf(logString(fmt.Sprintf("unable to send metering notification, error: %v", err))) + } else if _, err := w.db.MeteringNotification(ag.CurrentAgreementId, agp, msg); err != nil { + glog.Errorf(logString(fmt.Sprintf("unable to record metering notification, error: %v", err))) } } + } - // Data verification has occured. If it has been maintained for the specified duration then we can turn off the - // workload rollback retry checking feature. - if wlUsage, err := w.db.FindSingleWorkloadUsageByDeviceAndPolicyName(ag.DeviceId, ag.PolicyName); err != nil { - glog.Errorf(logString(fmt.Sprintf("unable to find workload usage record, error: %v", err))) - } else if wlUsage != nil && !wlUsage.DisableRetry { - if wlUsage.VerifiedDurationS == 0 || (wlUsage.VerifiedDurationS != 0 && ag.DataNotificationSent != 0 && ag.DataVerifiedTime != ag.AgreementCreationTime && (ag.DataVerifiedTime > ag.DataNotificationSent) && ((ag.DataVerifiedTime - ag.DataNotificationSent) >= uint64(wlUsage.VerifiedDurationS))) { - glog.V(5).Infof(logString(fmt.Sprintf("disabling workload rollback for %v after %v seconds", ag.CurrentAgreementId, (ag.DataVerifiedTime - ag.DataNotificationSent)))) - if _, err := w.db.DisableRollbackChecking(ag.DeviceId, ag.PolicyName); err != nil { - glog.Errorf(logString(fmt.Sprintf("unable to disable workload rollback retries, error: %v", err))) - } + // Data verification has occured. If it has been maintained for the specified duration then we can turn off the + // workload rollback retry checking feature. + if wlUsage, err := w.db.FindSingleWorkloadUsageByDeviceAndPolicyName(ag.DeviceId, ag.PolicyName); err != nil { + glog.Errorf(logString(fmt.Sprintf("unable to find workload usage record, error: %v", err))) + } else if wlUsage != nil && !wlUsage.DisableRetry { + if wlUsage.VerifiedDurationS == 0 || (wlUsage.VerifiedDurationS != 0 && ag.DataNotificationSent != 0 && ag.DataVerifiedTime != ag.AgreementCreationTime && (ag.DataVerifiedTime > ag.DataNotificationSent) && ((ag.DataVerifiedTime - ag.DataNotificationSent) >= uint64(wlUsage.VerifiedDurationS))) { + glog.V(5).Infof(logString(fmt.Sprintf("disabling workload rollback for %v after %v seconds", ag.CurrentAgreementId, (ag.DataVerifiedTime - ag.DataNotificationSent)))) + if _, err := w.db.DisableRollbackChecking(ag.DeviceId, ag.PolicyName); err != nil { + glog.Errorf(logString(fmt.Sprintf("unable to disable workload rollback retries, error: %v", err))) } } - - } else if _, err := w.db.DataNotVerified(ag.CurrentAgreementId, agp); err != nil { - glog.Errorf(logString(fmt.Sprintf("unable to record data not verified, error: %v", err))) } + + } else if _, err := w.db.DataNotVerified(ag.CurrentAgreementId, agp); err != nil { + glog.Errorf(logString(fmt.Sprintf("unable to record data not verified, error: %v", err))) } } } + } - // Do node health check only if not skipping it this time. - if w.GovTiming.nhSkip == 0 { - // Check for agreement termination based on node health issues. Checking node health might require an expensive - // call to the exchange for batch node status, so only do the health checks if we have to. - if checkrate, err := w.VerifyNodeHealth(&ag, protocolHandler); err != nil { - glog.Errorf(logString(fmt.Sprintf("unable to verify node health for %v, error: %v", ag.CurrentAgreementId, err))) - } else if checkrate != 0 && (discoveredNHWaitTime == 0 || (discoveredNHWaitTime != 0 && uint64(checkrate) < discoveredNHWaitTime)) { - discoveredNHWaitTime = uint64(checkrate) - } + // Do node health check only if not skipping it this time. + if w.GovTiming.nhSkip == 0 { + // Check for agreement termination based on node health issues. Checking node health might require an expensive + // call to the exchange for batch node status, so only do the health checks if we have to. + if checkrate, err := w.VerifyNodeHealth(&ag, protocolHandler); err != nil { + glog.Errorf(logString(fmt.Sprintf("unable to verify node health for %v, error: %v", ag.CurrentAgreementId, err))) + } else if checkrate != 0 && (discoveredNHWaitTime == 0 || (discoveredNHWaitTime != 0 && uint64(checkrate) < discoveredNHWaitTime)) { + discoveredNHWaitTime = uint64(checkrate) } + } - // Govern agreements that havent seen a proposal reply yet - } else { - // We are waiting for a reply - glog.V(5).Infof("AgreementBot Governance waiting for reply to %v.", ag.CurrentAgreementId) - now := uint64(time.Now().Unix()) - if ag.AgreementCreationTime+w.BaseWorker.Manager.Config.AgreementBot.ProtocolTimeoutS < now { - w.retryAgreements.AddRetry(ag.PolicyName, ag.DeviceId) - w.TerminateAgreement(&ag, protocolHandler.GetTerminationCode(TERM_REASON_NO_REPLY)) - } + // Govern agreements that havent seen a proposal reply yet + } else { + // We are waiting for a reply + glog.V(5).Infof("AgreementBot Governance waiting for reply to %v.", ag.CurrentAgreementId) + now := uint64(time.Now().Unix()) + if ag.AgreementCreationTime+w.BaseWorker.Manager.Config.AgreementBot.ProtocolTimeoutS < now { + w.retryAgreements.AddRetry(ag.PolicyName, ag.DeviceId) + w.TerminateAgreement(&ag, protocolHandler.GetTerminationCode(TERM_REASON_NO_REPLY)) } } - } else { - glog.Errorf(logString(fmt.Sprintf("unable to read agreements from database, error: %v", err))) - break } + } else { + glog.Errorf(logString(fmt.Sprintf("unable to read agreements from database, error: %v", err))) } } @@ -358,7 +342,7 @@ func (w *AgreementBotWorker) checkWorkloadUsageAgreement(partnerWLU *persistence partnerUpgrading := "" upgradedPartnerFound := "" - if ag, err := w.db.FindSingleAgreementByAgreementIdAllProtocols(partnerWLU.CurrentAgreementId, policy.AllAgreementProtocols(), []persistence.AgbotDBFilter{w.db.GetUnarchivedFilter()}); err != nil { + if ag, err := w.db.FindSingleAgreementByAgreementIdAllProtocols(partnerWLU.CurrentAgreementId, policy.AllAgreementProtocols(), []persistence.AFilter{persistence.UnarchivedAFilter()}); err != nil { glog.Errorf(logString(fmt.Sprintf("unable to read agreement %v from database, error: %v", partnerWLU.CurrentAgreementId, err))) } else if ag == nil { // If we dont find an agreement for a partner, then it is because a previous agreement with that partner has failed and we @@ -486,25 +470,17 @@ func (w *AgreementBotWorker) GovernArchivedAgreements() int { glog.V(5).Infof(logString(fmt.Sprintf("archive purge scanning for agreements archived more than %v hour(s) ago.", ageLimit))) + // A filter for limiting the returned set of agreements to just those that are too old. + agedOutFilter := func(now int64, limitH int) persistence.AFilter { + return func(a persistence.Agreement) bool { + return a.AgreementTimedout != 0 && (a.AgreementTimedout+uint64(limitH*3600) <= uint64(now)) + } + } + // Find all archived agreements that are old enough and delete them. for _, agp := range policy.AllAgreementProtocols() { now := time.Now().Unix() - - nextAgreementId := "" - for { - lastAgreementId, agreements, err := w.db.FindAgreementsPage([]persistence.AgbotDBFilter{w.db.GetAgedOutFilter(now, ageLimit), w.db.GetArchivedFilter()}, agp, nextAgreementId, w.Config.GetAgbotDBLimit()) - if err != nil { - glog.Errorf(logString(fmt.Sprintf("unable to read archived agreements from database for protocol %v, error: %v", agp, err))) - break - } - - // If there are no more agreements to iterate, then break out of the loop. - if lastAgreementId == "" { - break - } else { - nextAgreementId = lastAgreementId - } - + if agreements, err := w.db.FindAgreements([]persistence.AFilter{persistence.ArchivedAFilter(), agedOutFilter(now, ageLimit)}, agp); err == nil { for _, ag := range agreements { if err := w.db.DeleteAgreement(ag.CurrentAgreementId, agp); err != nil { glog.Error(logString(fmt.Sprintf("error deleting archived agreement %v, error: %v", ag.CurrentAgreementId, err))) @@ -512,6 +488,9 @@ func (w *AgreementBotWorker) GovernArchivedAgreements() int { glog.V(3).Infof(logString(fmt.Sprintf("archive purge deleted %v", ag.CurrentAgreementId))) } } + + } else { + glog.Errorf(logString(fmt.Sprintf("unable to read archived agreements from database for protocol %v, error: %v", agp, err))) } } return 0 @@ -531,22 +510,7 @@ func (w *AgreementBotWorker) GovernBlockchainNeeds() int { // Make a map of all blockchain names that we need to have running neededBCs := make(map[string]map[string]bool) - - nextAgreementId := "" - for { - lastAgreementId, agreements, err := w.db.FindAgreementsPage([]persistence.AgbotDBFilter{w.db.GetUnarchivedFilter()}, agp, nextAgreementId, w.Config.GetAgbotDBLimit()) - if err != nil { - glog.Errorf(logString(fmt.Sprintf("unable to read agreements from database for protocol %v, error: %v", agp, err))) - break - } - - // If there are no more agreements to iterate, then break out of the loop. - if lastAgreementId == "" { - break - } else { - nextAgreementId = lastAgreementId - } - + if agreements, err := w.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter()}, agp); err == nil { for _, ag := range agreements { _, bcName, bcOrg := w.consumerPH.Get(agp).GetKnownBlockchain(&ag) if bcName != "" { @@ -556,11 +520,14 @@ func (w *AgreementBotWorker) GovernBlockchainNeeds() int { neededBCs[bcOrg][bcName] = true } } - } - // If we captured any needed blockchains, inform the blockchain worker - if len(neededBCs) != 0 { - w.Messages() <- events.NewReportNeededBlockchainsMessage(events.BC_NEEDED, bcType, neededBCs) + // If we captured any needed blockchains, inform the blockchain worker + if len(neededBCs) != 0 { + w.Messages() <- events.NewReportNeededBlockchainsMessage(events.BC_NEEDED, bcType, neededBCs) + } + + } else { + glog.Errorf(logString(fmt.Sprintf("unable to read agreements from database for protocol %v, error: %v", agp, err))) } } diff --git a/agreementbot/nodehealth_manager.go b/agreementbot/nodehealth_manager.go index 192b434be..f0d5a7f5b 100644 --- a/agreementbot/nodehealth_manager.go +++ b/agreementbot/nodehealth_manager.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "github.com/golang/glog" + "github.com/open-horizon/anax/agreementbot/persistence" "github.com/open-horizon/anax/cutil" "github.com/open-horizon/anax/exchange" "time" @@ -182,8 +183,24 @@ func (m *NodeHealthManager) setNewStatus(pattern string, org string, lastCall st } // set the node orgs for patterns for current active agreements under the given agreement protocol -func (m *NodeHealthManager) SetNodeOrgs(nodeOrgs map[string][]string) { - m.NodeOrgs = nodeOrgs +func (m *NodeHealthManager) SetNodeOrgs(agreements []persistence.Agreement, agreementProtocol string) { + + tmpNodeOrgs := map[string][]string{} + + for _, ag := range agreements { + patternKey := getKey(ag.Pattern, ag.Org) + nodeOrg := exchange.GetOrg(ag.DeviceId) + if nodeOrgs, ok := tmpNodeOrgs[patternKey]; !ok { + tmpNodeOrgs[patternKey] = []string{nodeOrg} + } else { + if !stringSliceContains(nodeOrgs, nodeOrg) { + nodeOrgs = append(nodeOrgs, nodeOrg) + tmpNodeOrgs[patternKey] = nodeOrgs + } + } + } + + m.NodeOrgs = tmpNodeOrgs } // check if a slice contains a string diff --git a/agreementbot/nodehealth_manager_test.go b/agreementbot/nodehealth_manager_test.go index eff8f6403..dcbc00933 100644 --- a/agreementbot/nodehealth_manager_test.go +++ b/agreementbot/nodehealth_manager_test.go @@ -4,8 +4,10 @@ package agreementbot import ( "flag" + "github.com/open-horizon/anax/agreementbot/persistence" "github.com/open-horizon/anax/cutil" "github.com/open-horizon/anax/exchange" + "github.com/open-horizon/anax/policy" "testing" "time" ) @@ -246,27 +248,20 @@ func Test_SetNodeOrgs(t *testing.T) { t.Errorf("patterns map should be empty") } - // ag1, _ := persistence.NewAgreement("agreement_id1", "pattern_org1", "node_org1/device1", "device", "", "", "", "", "basic", "pattern_org1/sall", []string{""}, policy.NodeHealth{}) - // ag2, _ := persistence.NewAgreement("agreement_id2", "pattern_org1", "node_org1/device2", "device", "", "", "", "", "basic", "pattern_org1/sall", []string{""}, policy.NodeHealth{}) - // ag3, _ := persistence.NewAgreement("agreement_id3", "pattern_org1", "node_org2/device1", "device", "", "", "", "", "basic", "pattern_org1/sall", []string{""}, policy.NodeHealth{}) - // ag4, _ := persistence.NewAgreement("agreement_id4", "pattern_org1", "node_org2/device2", "device", "", "", "", "", "basic", "pattern_org1/sall", []string{""}, policy.NodeHealth{}) - // ag5, _ := persistence.NewAgreement("agreement_id5", "pattern_org2", "node_org1/device1", "device", "", "", "", "", "basic", "pattern_org2/sall", []string{""}, policy.NodeHealth{}) - // ag6, _ := persistence.NewAgreement("agreement_id6", "pattern_org2", "node_org1/device1", "device", "", "", "", "", "basic", "pattern_org2/sall", []string{""}, policy.NodeHealth{}) - // ag7, _ := persistence.NewAgreement("agreement_id7", "pattern_org1", "node_org1/device2", "device", "", "", "", "", "basic", "pattern_org1/netspeed", []string{""}, policy.NodeHealth{}) - // ag8, _ := persistence.NewAgreement("agreement_id8", "pattern_org1", "node_org2/device2", "device", "", "", "", "", "basic", "pattern_org1/netspeed", []string{""}, policy.NodeHealth{}) - // ag9, _ := persistence.NewAgreement("agreement_id9", "org1", "node_org2/device3", "device", "", "", "", "", "basic", "", []string{""}, policy.NodeHealth{}) - // ag10, _ := persistence.NewAgreement("agreement_id10", "org1", "node_org3/device3", "device", "", "", "", "", "basic", "", []string{""}, policy.NodeHealth{}) - - // agreements := []persistence.Agreement{*ag1, *ag2, *ag3, *ag4, *ag5, *ag6, *ag7, *ag8, *ag9, *ag10} - - nodeOrgs := map[string][]string{ - "pattern_org1/sall": []string{"node_org1", "node_org2"}, - "pattern_org2/sall": []string{"node_org1"}, - "pattern_org1/netspeed": []string{"node_org1", "node_org2"}, - "org1": []string{"node_org2", "node_org3"}, - } + ag1, _ := persistence.NewAgreement("agreement_id1", "pattern_org1", "node_org1/device1", "device", "", "", "", "", "basic", "pattern_org1/sall", []string{""}, policy.NodeHealth{}) + ag2, _ := persistence.NewAgreement("agreement_id2", "pattern_org1", "node_org1/device2", "device", "", "", "", "", "basic", "pattern_org1/sall", []string{""}, policy.NodeHealth{}) + ag3, _ := persistence.NewAgreement("agreement_id3", "pattern_org1", "node_org2/device1", "device", "", "", "", "", "basic", "pattern_org1/sall", []string{""}, policy.NodeHealth{}) + ag4, _ := persistence.NewAgreement("agreement_id4", "pattern_org1", "node_org2/device2", "device", "", "", "", "", "basic", "pattern_org1/sall", []string{""}, policy.NodeHealth{}) + ag5, _ := persistence.NewAgreement("agreement_id5", "pattern_org2", "node_org1/device1", "device", "", "", "", "", "basic", "pattern_org2/sall", []string{""}, policy.NodeHealth{}) + ag6, _ := persistence.NewAgreement("agreement_id6", "pattern_org2", "node_org1/device1", "device", "", "", "", "", "basic", "pattern_org2/sall", []string{""}, policy.NodeHealth{}) + ag7, _ := persistence.NewAgreement("agreement_id7", "pattern_org1", "node_org1/device2", "device", "", "", "", "", "basic", "pattern_org1/netspeed", []string{""}, policy.NodeHealth{}) + ag8, _ := persistence.NewAgreement("agreement_id8", "pattern_org1", "node_org2/device2", "device", "", "", "", "", "basic", "pattern_org1/netspeed", []string{""}, policy.NodeHealth{}) + ag9, _ := persistence.NewAgreement("agreement_id9", "org1", "node_org2/device3", "device", "", "", "", "", "basic", "", []string{""}, policy.NodeHealth{}) + ag10, _ := persistence.NewAgreement("agreement_id10", "org1", "node_org3/device3", "device", "", "", "", "", "basic", "", []string{""}, policy.NodeHealth{}) + + agreements := []persistence.Agreement{*ag1, *ag2, *ag3, *ag4, *ag5, *ag6, *ag7, *ag8, *ag9, *ag10} - nhm.SetNodeOrgs(nodeOrgs) + nhm.SetNodeOrgs(agreements, "basic") patternNodeOrgs := nhm.NodeOrgs @@ -312,7 +307,7 @@ func Test_SetNodeOrgs(t *testing.T) { t.Errorf("expected org1 has node org called node_org3 in map but not.") } - nhm.SetNodeOrgs(map[string][]string{}) + nhm.SetNodeOrgs([]persistence.Agreement{}, "basic") if len(nhm.NodeOrgs) != 0 { t.Errorf("expected patternNodeOrgs has 0 keys but found %v", len(nhm.NodeOrgs)) } diff --git a/agreementbot/persistence/agreement.go b/agreementbot/persistence/agreement.go index 4a8d02154..c5743f3e9 100644 --- a/agreementbot/persistence/agreement.go +++ b/agreementbot/persistence/agreement.go @@ -434,23 +434,30 @@ func ValidateStateTransition(mod *Agreement, update *Agreement) { } } -// Utility function to run a list of filters on an agreement, and decide if the agreement -// should be kept in the result set or not. -func RunFilters(ag *Agreement, filters []AgbotDBFilter) *Agreement { - for _, filter := range filters { - if !filter.KeepResult(ag) { +// Filters used by the caller to control what comes back from the database. +type AFilter func(Agreement) bool + +func UnarchivedAFilter() AFilter { + return func(e Agreement) bool { return !e.Archived } +} + +func ArchivedAFilter() AFilter { + return func(e Agreement) bool { return e.Archived } +} + +func IdAFilter(id string) AFilter { + return func(a Agreement) bool { return a.CurrentAgreementId == id } +} + +func DevPolAFilter(deviceId string, policyName string) AFilter { + return func(a Agreement) bool { return a.DeviceId == deviceId && a.PolicyName == policyName } +} + +func RunFilters(ag *Agreement, filters []AFilter) *Agreement { + for _, filterFn := range filters { + if !filterFn(*ag) { return nil } } return ag } - -// Utility function to add SQL filters on the given query. This function assumes that the input SQL -// statement already has a WHERE clause on it. -func RunSQLFilters(sqlIn string, filters []AgbotDBFilter) string { - sql := sqlIn - for _, filter := range filters { - sql = filter.ConditionSQL(sql) - } - return sql -} diff --git a/agreementbot/persistence/bolt/agreement.go b/agreementbot/persistence/bolt/agreement.go index 9baf5b773..82a4c5683 100644 --- a/agreementbot/persistence/bolt/agreement.go +++ b/agreementbot/persistence/bolt/agreement.go @@ -6,8 +6,6 @@ import ( "github.com/boltdb/bolt" "github.com/golang/glog" "github.com/open-horizon/anax/agreementbot/persistence" - "github.com/open-horizon/anax/cutil" - "github.com/open-horizon/anax/exchange" "github.com/open-horizon/anax/policy" ) @@ -33,9 +31,9 @@ func (db *AgbotBoltDB) String() string { func (db *AgbotBoltDB) GetAgreementCount(partition string) (int64, int64, error) { var activeNum, archivedNum int64 for _, protocol := range policy.AllAgreementProtocols() { - if _, activeAgreements, err := db.FindAgreementsPage([]persistence.AgbotDBFilter{db.GetUnarchivedFilter()}, protocol, "", 0); err != nil { + if activeAgreements, err := db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter()}, protocol); err != nil { return 0, 0, err - } else if _, archivedAgreements, err := db.FindAgreementsPage([]persistence.AgbotDBFilter{db.GetArchivedFilter()}, protocol, "", 0); err != nil { + } else if archivedAgreements, err := db.FindAgreements([]persistence.AFilter{persistence.ArchivedAFilter()}, protocol); err != nil { return 0, 0, err } else { activeNum += int64(len(activeAgreements)) @@ -45,19 +43,9 @@ func (db *AgbotBoltDB) GetAgreementCount(partition string) (int64, int64, error) return activeNum, archivedNum, nil } -// Return all the agreements in the database according to the provided filters. The limit parameter is ignored for the bolt plugin. -func (db *AgbotBoltDB) FindAgreementsPage(filters []persistence.AgbotDBFilter, protocol string, nextId string, limit int) (string, []persistence.Agreement, error) { - +func (db *AgbotBoltDB) FindAgreements(filters []persistence.AFilter, protocol string) ([]persistence.Agreement, error) { agreements := make([]persistence.Agreement, 0) - lastId := "" - - // The Bolt DB plugin doesnt support pagination, if the caller is giving us a nextId, then just return an empty - // result set because we know we already gave the full result set on the previous call to this function. - if nextId != "" { - return "", agreements, nil - } - // Read all the records from the DB. readErr := db.db.View(func(tx *bolt.Tx) error { if b := tx.Bucket([]byte(bucketName(protocol))); b != nil { @@ -68,11 +56,16 @@ func (db *AgbotBoltDB) FindAgreementsPage(filters []persistence.AgbotDBFilter, p if err := json.Unmarshal(v, &a); err != nil { glog.Errorf("Unable to deserialize db record: %v", v) } else { - if lastId == "" || a.CurrentAgreementId > lastId { - lastId = a.CurrentAgreementId - } - if agPassed := persistence.RunFilters(&a, filters); agPassed != nil { + if !a.Archived { glog.V(5).Infof("Demarshalled agreement in DB: %v", a) + } + exclude := false + for _, filterFn := range filters { + if !filterFn(a) { + exclude = true + } + } + if !exclude { agreements = append(agreements, a) } } @@ -84,47 +77,10 @@ func (db *AgbotBoltDB) FindAgreementsPage(filters []persistence.AgbotDBFilter, p }) if readErr != nil { - return "", nil, readErr + return nil, readErr } else { - return lastId, agreements, nil - } - -} - -func (db *AgbotBoltDB) GetNodeOrgs(filters []persistence.AgbotDBFilter, protocol string) (map[string][]string, error) { - res := make(map[string][]string, 5) - - // Get all the agreements out of the DB. - _, ags, err := db.FindAgreementsPage(filters, protocol, "", 0) - if err != nil { - return res, err + return agreements, nil } - - // Iterate through each agreement and pull out the node org info that the caller is requesting. - for _, ag := range ags { - // Construct the response object based on the result set. For each pattern or policy org, collect the - // set of node orgs in use by each one. - - // The key to each map entry is the pattern or policy org. - key := ag.Pattern - if key == "" { - key = ag.Org - } - - // The value of each key is an array of node orgs. - nodeOrg := exchange.GetOrg(ag.DeviceId) - if nodeOrgs, ok := res[key]; !ok { - res[key] = []string{nodeOrg} - } else { - if !cutil.SliceContains(nodeOrgs, nodeOrg) { - nodeOrgs = append(nodeOrgs, nodeOrg) - res[key] = nodeOrgs - } - } - - } - - return res, nil } func (db *AgbotBoltDB) AgreementAttempt(agreementid string, org string, deviceid string, deviceType string, policyName string, bcType string, bcName string, bcOrg string, agreementProto string, pattern string, serviceId []string, nhPolicy policy.NodeHealth) error { @@ -182,10 +138,10 @@ func (db *AgbotBoltDB) ArchiveAgreement(agreementid string, protocol string, rea } // no error on not found, only nil -func (db *AgbotBoltDB) FindSingleAgreementByAgreementId(agreementid string, protocol string, filters []persistence.AgbotDBFilter) (*persistence.Agreement, error) { - filters = append(filters, db.GetAgreementIdFilter(agreementid)) +func (db *AgbotBoltDB) FindSingleAgreementByAgreementId(agreementid string, protocol string, filters []persistence.AFilter) (*persistence.Agreement, error) { + filters = append(filters, persistence.IdAFilter(agreementid)) - if _, agreements, err := db.FindAgreementsPage(filters, protocol, "", 0); err != nil { + if agreements, err := db.FindAgreements(filters, protocol); err != nil { return nil, err } else if len(agreements) > 1 { return nil, fmt.Errorf("Expected only one record for agreementid: %v, but retrieved: %v", agreementid, agreements) @@ -197,22 +153,25 @@ func (db *AgbotBoltDB) FindSingleAgreementByAgreementId(agreementid string, prot } // no error on not found, only nil -func (db *AgbotBoltDB) FindSingleAgreementByAgreementIdAllProtocols(agreementid string, protocols []string, filters []persistence.AgbotDBFilter) (*persistence.Agreement, error) { +func (db *AgbotBoltDB) FindSingleAgreementByAgreementIdAllProtocols(agreementid string, protocols []string, filters []persistence.AFilter) (*persistence.Agreement, error) { + filters = append(filters, persistence.IdAFilter(agreementid)) for _, protocol := range protocols { - if ag, err := db.FindSingleAgreementByAgreementId(agreementid, protocol, filters); err != nil { + if agreements, err := db.FindAgreements(filters, protocol); err != nil { return nil, err - } else if ag == nil { + } else if len(agreements) > 1 { + return nil, fmt.Errorf("Expected only one record for agreementid: %v, but retrieved: %v", agreementid, agreements) + } else if len(agreements) == 0 { continue } else { - return ag, nil + return &agreements[0], nil } } return nil, nil } func (db *AgbotBoltDB) SingleAgreementUpdate(agreementid string, protocol string, fn func(persistence.Agreement) *persistence.Agreement) (*persistence.Agreement, error) { - if agreement, err := db.FindSingleAgreementByAgreementId(agreementid, protocol, []persistence.AgbotDBFilter{}); err != nil { + if agreement, err := db.FindSingleAgreementByAgreementId(agreementid, protocol, []persistence.AFilter{}); err != nil { return nil, err } else if agreement == nil { return nil, fmt.Errorf("Unable to locate agreement id: %v", agreementid) diff --git a/agreementbot/persistence/bolt/filter.go b/agreementbot/persistence/bolt/filter.go deleted file mode 100644 index fcc1ad535..000000000 --- a/agreementbot/persistence/bolt/filter.go +++ /dev/null @@ -1,186 +0,0 @@ -package bolt - -import ( - "github.com/open-horizon/anax/agreementbot/persistence" -) - -// Filter implementations specific to bolt - -// Include unarchived agreements in the result set. -type UnarchivedFilter struct { -} - -func (f *UnarchivedFilter) ConditionSQL(sqlStr string) string { - return sqlStr -} - -func (f *UnarchivedFilter) KeepResult(a *persistence.Agreement) bool { - return a.Archived == false -} - -func (db *AgbotBoltDB) GetUnarchivedFilter() persistence.AgbotDBFilter { - return &UnarchivedFilter{} -} - -// Include archived agreements in the result set. -type ArchivedFilter struct { -} - -func (f *ArchivedFilter) ConditionSQL(sqlStr string) string { - return sqlStr -} - -func (f *ArchivedFilter) KeepResult(a *persistence.Agreement) bool { - return a.Archived == true -} - -func (db *AgbotBoltDB) GetArchivedFilter() persistence.AgbotDBFilter { - return &ArchivedFilter{} -} - -// Include active agreements in the result set. -type ActiveFilter struct { -} - -func (f *ActiveFilter) ConditionSQL(sqlStr string) string { - return sqlStr -} - -func (f *ActiveFilter) KeepResult(a *persistence.Agreement) bool { - return a.AgreementCreationTime != 0 && a.AgreementTimedout == 0 -} - -func (db *AgbotBoltDB) GetActiveFilter() persistence.AgbotDBFilter { - return &ActiveFilter{} -} - -// Exclude pattern based agreements from the result set. -type NoPatternFilter struct { -} - -func (f *NoPatternFilter) ConditionSQL(sqlStr string) string { - return sqlStr -} - -func (f *NoPatternFilter) KeepResult(a *persistence.Agreement) bool { - return a.Pattern == "" -} - -func (db *AgbotBoltDB) GetNoPatternFilter() persistence.AgbotDBFilter { - return &NoPatternFilter{} -} - -// Include old agreements that have been around for a certain period of time. -type AgedOutFilter struct { - now int64 - ageLimit int -} - -func (f *AgedOutFilter) ConditionSQL(sqlStr string) string { - return sqlStr -} - -func (f *AgedOutFilter) KeepResult(a *persistence.Agreement) bool { - return a.AgreementTimedout != 0 && (a.AgreementTimedout+uint64(f.ageLimit*3600) <= uint64(f.now)) -} - -func (db *AgbotBoltDB) GetAgedOutFilter(now int64, ageLimit int) persistence.AgbotDBFilter { - return &AgedOutFilter{ - now: now, - ageLimit: ageLimit, - } -} - -// Include agreements with a specific node. -type NodeFilter struct { - id string -} - -func (f *NodeFilter) ConditionSQL(sqlStr string) string { - return sqlStr -} - -func (f *NodeFilter) KeepResult(a *persistence.Agreement) bool { - return a.DeviceId == f.id -} - -func (db *AgbotBoltDB) GetNodeFilter(id string) persistence.AgbotDBFilter { - return &NodeFilter{ - id: id, - } -} - -// Include agreements with a specific policy. -type PolicyFilter struct { - org string - name string -} - -func (f *PolicyFilter) ConditionSQL(sqlStr string) string { - return sqlStr -} - -func (f *PolicyFilter) KeepResult(a *persistence.Agreement) bool { - return a.Org == f.org && a.PolicyName == f.name -} - -func (db *AgbotBoltDB) GetPolicyFilter(org string, name string) persistence.AgbotDBFilter { - return &PolicyFilter{ - org: org, - name: name, - } -} - -// Include agreements that are in the process of forming. -type PendingFilter struct { -} - -func (f *PendingFilter) ConditionSQL(sqlStr string) string { - return sqlStr -} - -func (f *PendingFilter) KeepResult(a *persistence.Agreement) bool { - return a.AgreementFinalizedTime == 0 && a.AgreementTimedout == 0 -} - -func (db *AgbotBoltDB) GetPendingFilter() persistence.AgbotDBFilter { - return &PendingFilter{} -} - -// Include only a specific agreement id. -type AgreementIdFilter struct { - id string -} - -func (f *AgreementIdFilter) ConditionSQL(sqlStr string) string { - return sqlStr -} - -func (f *AgreementIdFilter) KeepResult(a *persistence.Agreement) bool { - return a.CurrentAgreementId == f.id -} - -func (db *AgbotBoltDB) GetAgreementIdFilter(id string) persistence.AgbotDBFilter { - return &AgreementIdFilter{ - id: id, - } -} - -// Include only agreements for a specific service id. -type ServiceIdFilter struct { - id string -} - -func (f *ServiceIdFilter) ConditionSQL(sqlStr string) string { - return sqlStr -} - -func (f *ServiceIdFilter) KeepResult(a *persistence.Agreement) bool { - return a.ServiceId[0] == f.id -} - -func (db *AgbotBoltDB) GetServiceIdFilter(id string) persistence.AgbotDBFilter { - return &ServiceIdFilter{ - id: id, - } -} diff --git a/agreementbot/persistence/database.go b/agreementbot/persistence/database.go index 4c082fa0f..6a93b3716 100644 --- a/agreementbot/persistence/database.go +++ b/agreementbot/persistence/database.go @@ -26,10 +26,9 @@ type AgbotDatabase interface { MovePartition(timeout uint64) (bool, error) // Persistent agreement related functions - FindAgreementsPage(filters []AgbotDBFilter, protocol string, nextId string, limit int) (string, []Agreement, error) - GetNodeOrgs(filters []AgbotDBFilter, protocol string) (map[string][]string, error) - FindSingleAgreementByAgreementId(agreementid string, protocol string, filters []AgbotDBFilter) (*Agreement, error) - FindSingleAgreementByAgreementIdAllProtocols(agreementid string, protocols []string, ilters []AgbotDBFilter) (*Agreement, error) + FindAgreements(filters []AFilter, protocol string) ([]Agreement, error) + FindSingleAgreementByAgreementId(agreementid string, protocol string, filters []AFilter) (*Agreement, error) + FindSingleAgreementByAgreementIdAllProtocols(agreementid string, protocols []string, filters []AFilter) (*Agreement, error) GetAgreementCount(partition string) (int64, int64, error) @@ -51,18 +50,6 @@ type AgbotDatabase interface { DeleteAgreement(pk string, protocol string) error ArchiveAgreement(agreementid string, protocol string, reason uint, desc string) (*Agreement, error) - // Filters related to persistent agreement objects - GetUnarchivedFilter() AgbotDBFilter - GetArchivedFilter() AgbotDBFilter - GetActiveFilter() AgbotDBFilter - GetNoPatternFilter() AgbotDBFilter - GetAgedOutFilter(now int64, ageLimit int) AgbotDBFilter - GetNodeFilter(id string) AgbotDBFilter - GetPolicyFilter(org string, name string) AgbotDBFilter - GetPendingFilter() AgbotDBFilter - GetAgreementIdFilter(id string) AgbotDBFilter - GetServiceIdFilter(id string) AgbotDBFilter - // Workoad usage related functions NewWorkloadUsage(deviceId string, hapartners []string, policy string, policyName string, priority int, retryDurationS int, verifiedDurationS int, reqsNotMet bool, agid string) error FindSingleWorkloadUsageByDeviceAndPolicyName(deviceid string, policyName string) (*WorkloadUsage, error) @@ -81,10 +68,3 @@ type AgbotDatabase interface { DeleteWorkloadUsage(deviceid string, policyName string) error } - -type AgbotDBFilter interface { - // Modify the SQL statement to condition the result set. - ConditionSQL(sqlStr string) string - // Keep rows in the result set, if this function returns true. - KeepResult(ag *Agreement) bool -} diff --git a/agreementbot/persistence/postgresql/agreement.go b/agreementbot/persistence/postgresql/agreement.go index 8e412d78e..f341a43c6 100644 --- a/agreementbot/persistence/postgresql/agreement.go +++ b/agreementbot/persistence/postgresql/agreement.go @@ -7,8 +7,6 @@ import ( "fmt" "github.com/golang/glog" "github.com/open-horizon/anax/agreementbot/persistence" - "github.com/open-horizon/anax/cutil" - "github.com/open-horizon/anax/exchange" "github.com/open-horizon/anax/policy" "strings" ) @@ -105,13 +103,6 @@ const AGREEMENT_PARTITION_FILLIN = `partition_name` const AGREEMENT_QUERY = `SELECT agreement FROM "agreements_ WHERE agreement_id = $1 AND protocol = $2;` const ALL_AGREEMENTS_QUERY = `SELECT agreement FROM "agreements_ WHERE protocol = $1;` const AGREEMENT_PARTITION_EMPTY = `SELECT agreement_id FROM "agreements_;` -const AGREEMENT_PAGE1_QUERY_PART1 = `SELECT agreement FROM "agreements_ WHERE protocol = $1;` -const AGREEMENT_PAGE1_QUERY_PART2 = ` ORDER BY agreement_id limit $2;` -const AGREEMENT_PAGEn_QUERY_PART1 = `SELECT agreement FROM "agreements_ WHERE agreement_id > $1 AND protocol = $2;` -const AGREEMENT_PAGEn_QUERY_PART2 = ` ORDER BY agreement_id limit $3;` - -// Used to extract the node orgs in use by all agreements. -const AGREEMENT_NODEORGS = `SELECT DISTINCT agreement->>'device_id' AS deviceid,agreement->>'org' AS org,agreement->>'pattern' as pattern FROM "agreements_ WHERE protocol = $1;` const AGREEMENT_COUNT = `SELECT agreement FROM "agreements_;` @@ -130,8 +121,6 @@ const AGREEMENT_PARTITIONS = `SELECT partition FROM agreements;` const AGREEMENT_DROP_PARTITION = `DROP TABLE "agreements_;` -const AGREEMENT_BYTE_SIZE = 1024 * 5 - // The fields in this object are initialized in the Initialize method in this package. type AgbotPostgresqlDB struct { identity string // The identity of this agbot in the partitions table. @@ -263,130 +252,52 @@ func (db *AgbotPostgresqlDB) GetAgreementCount(partition string) (int64, int64, return activeNum, archivedNum, nil } -// Retrieve a page of agreement from the database and filter them out based on the input filters. -func (db *AgbotPostgresqlDB) FindAgreementsPage(filters []persistence.AgbotDBFilter, protocol string, nextId string, limit int) (string, []persistence.Agreement, error) { +// Retrieve all agreements from the database and filter them out based on the input filters. +func (db *AgbotPostgresqlDB) FindAgreements(filters []persistence.AFilter, protocol string) ([]persistence.Agreement, error) { ags := make([]persistence.Agreement, 0, 100) - lastId := "" for _, currentPartition := range db.AllPartitions() { - - // Find all the agreement objects, allowing the database to do the filtering. If there is no nextId, - // then the caller wants to start at the beginning of the table. - - sqlStr := "" - if nextId == "" { - sqlStr = strings.Replace(AGREEMENT_PAGE1_QUERY_PART1, AGREEMENT_TABLE_NAME_ROOT, db.GetAgreementPartitionTableName(currentPartition), 1) - sqlStr = persistence.RunSQLFilters(sqlStr, filters) - sqlStr = strings.Replace(sqlStr, ";", AGREEMENT_PAGE1_QUERY_PART2, 1) - } else { - sqlStr = strings.Replace(AGREEMENT_PAGEn_QUERY_PART1, AGREEMENT_TABLE_NAME_ROOT, db.GetAgreementPartitionTableName(currentPartition), 1) - sqlStr = persistence.RunSQLFilters(sqlStr, filters) - sqlStr = strings.Replace(sqlStr, ";", AGREEMENT_PAGEn_QUERY_PART2, 1) - } - glog.V(3).Infof("Find agreements using SQL: %v for partition %v", sqlStr, currentPartition) - - var err error - var rows *sql.Rows - if nextId == "" { - rows, err = db.db.Query(sqlStr, protocol, limit) - } else { - rows, err = db.db.Query(sqlStr, nextId, protocol, limit) - } - + // Find all the agreement objects, read them in and run them through the filters (after unmarshalling the blob into an + // in memory agreement object). + sql := strings.Replace(ALL_AGREEMENTS_QUERY, AGREEMENT_TABLE_NAME_ROOT, db.GetAgreementPartitionTableName(currentPartition), 1) + glog.V(5).Infof("Find agreements using SQL: %v for partition %v", sql, currentPartition) + rows, err := db.db.Query(sql, protocol) if err != nil { - return "", nil, errors.New(fmt.Sprintf("error querying for agreements error: %v", err)) + return nil, errors.New(fmt.Sprintf("error querying for agreements error: %v", err)) } // If the rows object doesnt get closed, memory and connections will grow and/or leak. defer rows.Close() - - // Keep track of the last agreement id returned in the result set, so that the caller knows where to begin the next search. for rows.Next() { - agBytes := make([]byte, 0, AGREEMENT_BYTE_SIZE) + agBytes := make([]byte, 0, 2048) ag := new(persistence.Agreement) if err := rows.Scan(&agBytes); err != nil { - return "", nil, errors.New(fmt.Sprintf("error scanning row: %v", err)) + return nil, errors.New(fmt.Sprintf("error scanning row: %v", err)) } else if err := json.Unmarshal(agBytes, ag); err != nil { - return "", nil, errors.New(fmt.Sprintf("error demarshalling row: %v, error: %v", string(agBytes), err)) + return nil, errors.New(fmt.Sprintf("error demarshalling row: %v, error: %v", string(agBytes), err)) } else { - lastId = ag.CurrentAgreementId + if !ag.Archived { + glog.V(5).Infof("Demarshalled agreement in partition %v from DB: %v", currentPartition, ag) + } if agPassed := persistence.RunFilters(ag, filters); agPassed != nil { ags = append(ags, *ag) - glog.V(5).Infof("Demarshalled agreement in partition %v from DB: %v", currentPartition, ag) } } } // The rows.Next() function will exit with false when done or an error occurred. Get any error encountered during iteration. if err = rows.Err(); err != nil { - return "", nil, errors.New(fmt.Sprintf("error iterating: %v", err)) + return nil, errors.New(fmt.Sprintf("error iterating: %v", err)) } } - return lastId, ags, nil - -} - -// Return a map of patterns and policy orgs that are in use for current agreements, and include the list of node orgs -// in use by each pattern and policyorg. -func (db *AgbotPostgresqlDB) GetNodeOrgs(filters []persistence.AgbotDBFilter, protocol string) (map[string][]string, error) { - - res := make(map[string][]string, 5) - - for _, currentPartition := range db.AllPartitions() { - - // Construct the query string by providing the correct table partition name and adding any filters. - sqlStr := strings.Replace(AGREEMENT_NODEORGS, AGREEMENT_TABLE_NAME_ROOT, db.GetAgreementPartitionTableName(currentPartition), 1) - sqlStr = persistence.RunSQLFilters(sqlStr, filters) - glog.V(5).Infof("Find node orgs using SQL: %v for partition %v", sqlStr, currentPartition) - - // Query the database. - rows, err := db.db.Query(sqlStr, protocol) - if err != nil { - return nil, errors.New(fmt.Sprintf("error querying for agreement node orgs error: %v", err)) - } - - // If the rows object doesnt get closed, memory and connections will grow and/or leak. - defer rows.Close() - - // Construct the response object based on the result set. For each pattern or policy org, collect the - // set of node orgs in use by each one. - for rows.Next() { - var deviceid, org, pattern string - if err := rows.Scan(&deviceid, &org, &pattern); err != nil { - return nil, errors.New(fmt.Sprintf("error scanning row: %v", err)) - } else { - // The key to each map entry is the pattern or policy org. - key := pattern - if key == "" { - key = org - } - - // The value of each key is an array of node orgs. - nodeOrg := exchange.GetOrg(deviceid) - if nodeOrgs, ok := res[key]; !ok { - res[key] = []string{nodeOrg} - } else { - if !cutil.SliceContains(nodeOrgs, nodeOrg) { - nodeOrgs = append(nodeOrgs, nodeOrg) - res[key] = nodeOrgs - } - } - } - } - - // The rows.Next() function will exit with false when done or an error occurred. Get any error encountered during iteration. - if err = rows.Err(); err != nil { - return nil, errors.New(fmt.Sprintf("error iterating nodeorg result set: %v", err)) - } - } + return ags, nil - return res, nil } // Find a specific agreement in the database. The input filters are ignored for this query. They are needed by the bolt implementation. -func (db *AgbotPostgresqlDB) internalFindSingleAgreementByAgreementId(tx *sql.Tx, agreementId string, protocol string, filters []persistence.AgbotDBFilter) (*persistence.Agreement, string, error) { +func (db *AgbotPostgresqlDB) internalFindSingleAgreementByAgreementId(tx *sql.Tx, agreementId string, protocol string, filters []persistence.AFilter) (*persistence.Agreement, string, error) { agBytes := make([]byte, 0, 2048) ag := new(persistence.Agreement) @@ -421,20 +332,23 @@ func (db *AgbotPostgresqlDB) internalFindSingleAgreementByAgreementId(tx *sql.Tx } -func (db *AgbotPostgresqlDB) FindSingleAgreementByAgreementId(agreementId string, protocol string, filters []persistence.AgbotDBFilter) (*persistence.Agreement, error) { +func (db *AgbotPostgresqlDB) FindSingleAgreementByAgreementId(agreementId string, protocol string, filters []persistence.AFilter) (*persistence.Agreement, error) { ag, _, err := db.internalFindSingleAgreementByAgreementId(nil, agreementId, protocol, filters) return ag, err } -func (db *AgbotPostgresqlDB) FindSingleAgreementByAgreementIdAllProtocols(agreementid string, protocols []string, filters []persistence.AgbotDBFilter) (*persistence.Agreement, error) { +func (db *AgbotPostgresqlDB) FindSingleAgreementByAgreementIdAllProtocols(agreementid string, protocols []string, filters []persistence.AFilter) (*persistence.Agreement, error) { + filters = append(filters, persistence.IdAFilter(agreementid)) for _, protocol := range protocols { - if ag, err := db.FindSingleAgreementByAgreementId(agreementid, protocol, filters); err != nil { + if agreements, err := db.FindAgreements(filters, protocol); err != nil { return nil, err - } else if ag == nil { + } else if len(agreements) > 1 { + return nil, fmt.Errorf("Expected only one record for agreementid: %v, but retrieved: %v", agreementid, agreements) + } else if len(agreements) == 0 { continue } else { - return ag, nil + return &agreements[0], nil } } return nil, nil @@ -520,7 +434,7 @@ func (db *AgbotPostgresqlDB) Close() { // to be updated (the query is done in it's own transaction), then calls the input function to update the agreement in // memory, and finally calls wrapTransaction to start a transaction that will actually perform the update. func (db *AgbotPostgresqlDB) SingleAgreementUpdate(agreementid string, protocol string, fn func(persistence.Agreement) *persistence.Agreement) (*persistence.Agreement, error) { - if agreement, err := db.FindSingleAgreementByAgreementId(agreementid, protocol, []persistence.AgbotDBFilter{}); err != nil { + if agreement, err := db.FindSingleAgreementByAgreementId(agreementid, protocol, []persistence.AFilter{}); err != nil { return nil, err } else if agreement == nil { return nil, errors.New(fmt.Sprintf("unable to locate agreement id: %v", agreementid)) @@ -548,7 +462,7 @@ func (db *AgbotPostgresqlDB) wrapTransaction(agreementid string, protocol string // agreement object contains valid state transitions, and then write the updated agreement back to the database. func (db *AgbotPostgresqlDB) persistUpdatedAgreement(tx *sql.Tx, agreementid string, protocol string, update *persistence.Agreement) error { - if mod, partition, err := db.internalFindSingleAgreementByAgreementId(tx, agreementid, protocol, []persistence.AgbotDBFilter{}); err != nil { + if mod, partition, err := db.internalFindSingleAgreementByAgreementId(tx, agreementid, protocol, []persistence.AFilter{}); err != nil { return err } else if mod == nil { return errors.New(fmt.Sprintf("No agreement with given id available to update: %v", agreementid)) @@ -598,7 +512,7 @@ func (db *AgbotPostgresqlDB) deleteAgreement(tx *sql.Tx, agreementId string, pro // check to see if the partition specific table is now empty. checkTableDeletion := false - _, partition, err := db.internalFindSingleAgreementByAgreementId(tx, agreementId, protocol, []persistence.AgbotDBFilter{}) + _, partition, err := db.internalFindSingleAgreementByAgreementId(tx, agreementId, protocol, []persistence.AFilter{}) if err != nil { return err } else if partition != db.PrimaryPartition() { diff --git a/agreementbot/persistence/postgresql/filter.go b/agreementbot/persistence/postgresql/filter.go deleted file mode 100644 index 26d9f3813..000000000 --- a/agreementbot/persistence/postgresql/filter.go +++ /dev/null @@ -1,209 +0,0 @@ -package postgresql - -import ( - "fmt" - "github.com/open-horizon/anax/agreementbot/persistence" - "strconv" - "strings" -) - -// SQL constants used by these filters -const UNARCHIVED_AGREEMENT = `CAST(agreement->>'archived' AS BOOLEAN) = false` -const ARCHIVED_AGREEMENT = `CAST(agreement->>'archived' AS BOOLEAN) = true` -const ACTIVE_AGREEMENT = `CAST(agreement->>'agreement_creation_time' AS INTEGER) != 0 AND CAST(agreement->>'agreement_timeout' AS INTEGER) = 0` -const NOT_PATTERN = `agreement->>'pattern' = ''` -const AGEDOUT_AGREEMENT = `CAST(agreement->>'agreement_timeout' AS INTEGER) != 0 AND CAST(agreement->>'agreement_timeout' AS INTEGER) + $1 <= $2` -const NODE_AGREEMENT = `agreement->>'device_id' = '$1'` -const POLICY_AGREEMENT = `agreement->>'org' = '$1' AND agreement->>'policy_name' = '$2'` -const PENDING_AGREEMENT = `CAST(agreement->>'agreement_finalized_time' AS INTEGER) = 0 AND CAST(agreement->>'agreement_timeout' AS INTEGER) = 0` -const SERVICE_ID_AGREEMENT = `(agreement->>'service_id')::jsonb->>0 = '$1'` - -// Filter implementations specific to Postgresql - -// Include unarchived agreements in the result set. -type UnarchivedFilter struct { -} - -func (f *UnarchivedFilter) ConditionSQL(sqlStr string) string { - return strings.Replace(sqlStr, ";", fmt.Sprintf(" AND %s;", UNARCHIVED_AGREEMENT), 1) -} - -func (f *UnarchivedFilter) KeepResult(a *persistence.Agreement) bool { - return true -} - -func (db *AgbotPostgresqlDB) GetUnarchivedFilter() persistence.AgbotDBFilter { - return &UnarchivedFilter{} -} - -// Include archived agreements in the result set. -type ArchivedFilter struct { -} - -func (f *ArchivedFilter) ConditionSQL(sqlStr string) string { - return strings.Replace(sqlStr, ";", fmt.Sprintf(" AND %s;", ARCHIVED_AGREEMENT), 1) -} - -func (f *ArchivedFilter) KeepResult(a *persistence.Agreement) bool { - return true -} - -func (db *AgbotPostgresqlDB) GetArchivedFilter() persistence.AgbotDBFilter { - return &ArchivedFilter{} -} - -// Include active agreements in the result set. -type ActiveFilter struct { -} - -func (f *ActiveFilter) ConditionSQL(sqlStr string) string { - return strings.Replace(sqlStr, ";", fmt.Sprintf(" AND %s;", ACTIVE_AGREEMENT), 1) -} - -func (f *ActiveFilter) KeepResult(a *persistence.Agreement) bool { - return true -} - -func (db *AgbotPostgresqlDB) GetActiveFilter() persistence.AgbotDBFilter { - return &ActiveFilter{} -} - -// Exclude pattern based agreements from the result set. -type NoPatternFilter struct { -} - -func (f *NoPatternFilter) ConditionSQL(sqlStr string) string { - return strings.Replace(sqlStr, ";", fmt.Sprintf(" AND %s;", NOT_PATTERN), 1) -} - -func (f *NoPatternFilter) KeepResult(a *persistence.Agreement) bool { - return true -} - -func (db *AgbotPostgresqlDB) GetNoPatternFilter() persistence.AgbotDBFilter { - return &NoPatternFilter{} -} - -// Include old agreements that have been around for a certain period of time. -type AgedOutFilter struct { - now int64 - ageLimit int -} - -func (f *AgedOutFilter) ConditionSQL(sqlStr string) string { - sql := strings.Replace(AGEDOUT_AGREEMENT, "$1", strconv.Itoa(f.ageLimit*3600), 1) - sql = strings.Replace(sql, "$2", strconv.FormatInt(f.now, 10), 1) - - sql = strings.Replace(sqlStr, ";", fmt.Sprintf(" AND %s;", sql), 1) - return sql -} - -func (f *AgedOutFilter) KeepResult(a *persistence.Agreement) bool { - return true -} - -func (db *AgbotPostgresqlDB) GetAgedOutFilter(now int64, ageLimit int) persistence.AgbotDBFilter { - return &AgedOutFilter{ - now: now, - ageLimit: ageLimit, - } -} - -// Include agreements with a specific node. -type NodeFilter struct { - id string -} - -func (f *NodeFilter) ConditionSQL(sqlStr string) string { - sql := strings.Replace(NODE_AGREEMENT, "$1", f.id, 1) - - sql = strings.Replace(sqlStr, ";", fmt.Sprintf(" AND %s;", sql), 1) - return sql -} - -func (f *NodeFilter) KeepResult(a *persistence.Agreement) bool { - return true -} - -func (db *AgbotPostgresqlDB) GetNodeFilter(id string) persistence.AgbotDBFilter { - return &NodeFilter{ - id: id, - } -} - -// Include agreements with a specific policy. -type PolicyFilter struct { - org string - name string -} - -func (f *PolicyFilter) ConditionSQL(sqlStr string) string { - sql := strings.Replace(POLICY_AGREEMENT, "$1", f.org, 1) - sql = strings.Replace(string(sql), "$2", f.name, 1) - - sql = strings.Replace(sqlStr, ";", fmt.Sprintf(" AND %s;", sql), 1) - return sql -} - -func (f *PolicyFilter) KeepResult(a *persistence.Agreement) bool { - return true -} - -func (db *AgbotPostgresqlDB) GetPolicyFilter(org string, name string) persistence.AgbotDBFilter { - return &PolicyFilter{ - org: org, - name: name, - } -} - -// Include agreements that are in the process of forming. -type PendingFilter struct { -} - -func (f *PendingFilter) ConditionSQL(sqlStr string) string { - return strings.Replace(sqlStr, ";", fmt.Sprintf(" AND %s;", PENDING_AGREEMENT), 1) -} - -func (f *PendingFilter) KeepResult(a *persistence.Agreement) bool { - return true -} - -func (db *AgbotPostgresqlDB) GetPendingFilter() persistence.AgbotDBFilter { - return &PendingFilter{} -} - -// Include only a specific agreement id. For the Postgresql DB plugin, this does nothing. -type AgreementIdFilter struct { -} - -func (f *AgreementIdFilter) ConditionSQL(sqlStr string) string { - return sqlStr -} - -func (f *AgreementIdFilter) KeepResult(a *persistence.Agreement) bool { - return true -} - -func (db *AgbotPostgresqlDB) GetAgreementIdFilter(id string) persistence.AgbotDBFilter { - return &AgreementIdFilter{} -} - -// Include only agreements for a specific service id. -type ServiceIdFilter struct { - id string -} - -func (f *ServiceIdFilter) ConditionSQL(sqlStr string) string { - sql := strings.Replace(SERVICE_ID_AGREEMENT, "$1", f.id, 1) - return strings.Replace(sqlStr, ";", fmt.Sprintf(" AND %s;", sql), 1) -} - -func (f *ServiceIdFilter) KeepResult(a *persistence.Agreement) bool { - return true -} - -func (db *AgbotPostgresqlDB) GetServiceIdFilter(id string) persistence.AgbotDBFilter { - return &ServiceIdFilter{ - id: id, - } -} diff --git a/agreementbot/persistence/postgresql/workload_usage.go b/agreementbot/persistence/postgresql/workload_usage.go index 9103ac352..4429110fc 100644 --- a/agreementbot/persistence/postgresql/workload_usage.go +++ b/agreementbot/persistence/postgresql/workload_usage.go @@ -274,7 +274,7 @@ func (db *AgbotPostgresqlDB) UpdateWUAgreementId(deviceid string, policyName str // move the workload usage record. if wlUsage, wlPartition, err := db.internalFindSingleWorkloadUsageByDeviceAndPolicyName(nil, deviceid, policyName); err != nil { return nil, err - } else if _, agPartition, err := db.internalFindSingleAgreementByAgreementId(nil, agid, protocol, []persistence.AgbotDBFilter{}); err != nil { + } else if _, agPartition, err := db.internalFindSingleAgreementByAgreementId(nil, agid, protocol, []persistence.AFilter{}); err != nil { return nil, err } else if wlPartition != agPartition { // Move the existing record to the new partition. Inserts are always done in the primary partition. diff --git a/agreementbot/policy.go b/agreementbot/policy.go index 53e2a7e51..b33fb030a 100644 --- a/agreementbot/policy.go +++ b/agreementbot/policy.go @@ -192,145 +192,143 @@ func (w *BaseAgreementWorker) HandleMMSObjectPolicy(cph ConsumerProtocolHandler, glog.V(5).Infof(BAWlogstring(workerId, fmt.Sprintf("Object Policy current dest nodes: %v", destNodes))) + inProgress := func() persistence.AFilter { + return func(e persistence.Agreement) bool { return e.AgreementCreationTime != 0 && e.AgreementTimedout == 0 } + } + + notPattern := func() persistence.AFilter { + return func(e persistence.Agreement) bool { return e.Pattern == "" } + } + + // Find all policy related agreements that are in progress. + agreements, err := w.db.FindAgreements([]persistence.AFilter{inProgress(), notPattern(), persistence.UnarchivedAFilter()}, cph.Name()) + if err != nil { + glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("Object Policy unable to read agreements, error %v", err))) + return + } + + // The main logic in the function can be summarized as follows. The top half of the algorithm verifies that any policy changes + // in the object's constraints are checked. The bottom half of the algorithm verifies that changes to the object policy's + // service list are handled correctly. + // + // for all agreements in this agbot: (this ensures that the agbot only considers agreements in it's scope) + // if the agreement is for a service that is compatible (including arch and version range) with a service in the new policy + // if agreement's node's policy is compatible with new object policy + // if agreement's node is NOT in current obj dest list, then + // add the agreement's node to object's destination list + // else + // nothing to do, the object is already on the agreement's node + // else + // if agreement's node is in the object's destination list, then + // remove the agreement's node from obj destination list + // else + // nothing to do, the agreement's node is not in the object's destination list + + // else (we might have to remove the node from the object's destination list - if the object policy's service list changed) + // if the policy change event includes an old/previous Policy (which is the policy before the change) + // if the old Policy's service list is different from newPolicy service list (then a service list change has occurred, so more checks are required) + // if the agreement is for a service that is compatible with a service in the old policy (this agreement's node might need to be removed from the object's destination list) + // if the agreement's node is in current object's destination list, then (it needs to be removed if there are no other services on the node which are compatible with the object policy) + // find ALL the services running on the node (even the services for which this agbot doesnt have an agreement) + // if none of them are in new policy then + // remove the node from the dest list + // else + // nothing to do, assume that the agbot which owns the agreement for the other services will handle this same policy change event appropriately + // else + // nothing to do, node is not in object's destination list + // else + // nothing to do, this agreement is irrelevant because it contains services in neither the old nor new object policy + // else (the new policy service list has not changed, and since it doesnt match a service in this agreement, there is nothing to do) + // nothing to do + // else (no old policy so the object policy's service list hasnt changed) + // nothing to do, the new policy didn't match any services of agreements owned by this agbot. + objPolicies := new(exchange.ObjectDestinationPolicies) (*objPolicies) = append((*objPolicies), newPolicy) - // Find all policy related agreements that are in progress. Agreements are scanned in pages so that we dont consume too much memory. - nextAgreementId := "" - for { - lastAgreementId, agreements, err := w.db.FindAgreementsPage([]persistence.AgbotDBFilter{w.db.GetActiveFilter(), w.db.GetNoPatternFilter(), w.db.GetUnarchivedFilter()}, cph.Name(), nextAgreementId, w.config.GetAgbotDBLimit()) - if err != nil { - glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("Object Policy unable to read agreements, error %v", err))) - return - } + for _, agreement := range agreements { - // If there are no more agreements to iterate, then break out of the loop. - if lastAgreementId == "" { - break - } else { - nextAgreementId = lastAgreementId - } + // if the agreement is for a service that is compatible (including arch and version range) with a service in the new policy + if w.findCompatibleServices(&agreement, &newPolicy, workerId, w.config.ArchSynonyms) { + + _, nodePolicy, err := compcheck.GetNodePolicy(exchange.GetHTTPNodePolicyHandler(w), agreement.DeviceId, nil) - // The main logic in the function can be summarized as follows. The top half of the algorithm verifies that any policy changes - // in the object's constraints are checked. The bottom half of the algorithm verifies that changes to the object policy's - // service list are handled correctly. - // - // for all agreements in this agbot: (this ensures that the agbot only considers agreements in it's scope) - // if the agreement is for a service that is compatible (including arch and version range) with a service in the new policy - // if agreement's node's policy is compatible with new object policy - // if agreement's node is NOT in current obj dest list, then - // add the agreement's node to object's destination list - // else - // nothing to do, the object is already on the agreement's node - // else - // if agreement's node is in the object's destination list, then - // remove the agreement's node from obj destination list - // else - // nothing to do, the agreement's node is not in the object's destination list - - // else (we might have to remove the node from the object's destination list - if the object policy's service list changed) - // if the policy change event includes an old/previous Policy (which is the policy before the change) - // if the old Policy's service list is different from newPolicy service list (then a service list change has occurred, so more checks are required) - // if the agreement is for a service that is compatible with a service in the old policy (this agreement's node might need to be removed from the object's destination list) - // if the agreement's node is in current object's destination list, then (it needs to be removed if there are no other services on the node which are compatible with the object policy) - // find ALL the services running on the node (even the services for which this agbot doesnt have an agreement) - // if none of them are in new policy then - // remove the node from the dest list - // else - // nothing to do, assume that the agbot which owns the agreement for the other services will handle this same policy change event appropriately - // else - // nothing to do, node is not in object's destination list - // else - // nothing to do, this agreement is irrelevant because it contains services in neither the old nor new object policy - // else (the new policy service list has not changed, and since it doesnt match a service in this agreement, there is nothing to do) - // nothing to do - // else (no old policy so the object policy's service list hasnt changed) - // nothing to do, the new policy didn't match any services of agreements owned by this agbot. - - for _, agreement := range agreements { - - // if the agreement is for a service that is compatible (including arch and version range) with a service in the new policy - if w.findCompatibleServices(&agreement, &newPolicy, workerId, w.config.ArchSynonyms) { - - _, nodePolicy, err := compcheck.GetNodePolicy(exchange.GetHTTPNodePolicyHandler(w), agreement.DeviceId, nil) + if err != nil { + glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("Object Policy error %v", err))) + } else if nodePolicy == nil { + glog.Errorf(BAWlogstring(workerId, fmt.Errorf("No node policy found for %v", agreement.DeviceId))) + } else { + // if agreement's node's policy is compatible with new object policy + // if agreement's node is NOT in current obj dest list, then + // add the agreement's node to object's destination list + added, err := AssignObjectToNode(w, objPolicies, agreement.DeviceId, nodePolicy, false) if err != nil { glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("Object Policy error %v", err))) - } else if nodePolicy == nil { - glog.Errorf(BAWlogstring(workerId, fmt.Errorf("No node policy found for %v", agreement.DeviceId))) - } else { - - // if agreement's node's policy is compatible with new object policy - // if agreement's node is NOT in current obj dest list, then - // add the agreement's node to object's destination list - added, err := AssignObjectToNode(w, objPolicies, agreement.DeviceId, nodePolicy, false) + } else if !added { + // else + // if agreement's node is in the object's destination list, then + // remove the agreement's node from obj destination list + err := UnassignObjectFromNode(w, &newPolicy, agreement.DeviceId) if err != nil { glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("Object Policy error %v", err))) - } else if !added { - // else - // if agreement's node is in the object's destination list, then - // remove the agreement's node from obj destination list - err := UnassignObjectFromNode(w, &newPolicy, agreement.DeviceId) - if err != nil { - glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("Object Policy error %v", err))) - } } } - continue } + continue + } - // else (we might have to remove the node from the object's destination list - if the object policy's service list changed) - // if the policy change event includes an old/previous Policy (which is the policy before the change) - if wi.Event.OldPolicy != nil { + // else (we might have to remove the node from the object's destination list - if the object policy's service list changed) + // if the policy change event includes an old/previous Policy (which is the policy before the change) + if wi.Event.OldPolicy != nil { - // if the old Policy's service list is different from newPolicy service list (then a service list change has occurred, so more checks are required) - if hasDifferentServiceLists(&newPolicy, &oldPolicy) { + // if the old Policy's service list is different from newPolicy service list (then a service list change has occurred, so more checks are required) + if hasDifferentServiceLists(&newPolicy, &oldPolicy) { - // if the agreement is for a service that is compatible with a service in the old policy (this agreement's node might need to be removed - // from the object's destination list) - if w.findCompatibleServices(&agreement, &oldPolicy, workerId, w.config.ArchSynonyms) { + // if the agreement is for a service that is compatible with a service in the old policy (this agreement's node might need to be removed + // from the object's destination list) + if w.findCompatibleServices(&agreement, &oldPolicy, workerId, w.config.ArchSynonyms) { - // if the agreement's node is in current object's destination list, then (it needs to be removed if there are no other services on - // the node which are compatible with the object policy) - if cutil.SliceContains(destNodes, exchange.GetId(agreement.DeviceId)) { + // if the agreement's node is in current object's destination list, then (it needs to be removed if there are no other services on + // the node which are compatible with the object policy) + if cutil.SliceContains(destNodes, exchange.GetId(agreement.DeviceId)) { - // find ALL the services running on the node (even the services for which this agbot doesnt have an agreement) - ns, err := exchange.GetHTTPNodeStatusHandler(w)(agreement.DeviceId) + // find ALL the services running on the node (even the services for which this agbot doesnt have an agreement) + ns, err := exchange.GetHTTPNodeStatusHandler(w)(agreement.DeviceId) + if err != nil { + glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("Object Policy unable to get node status, error %v", err))) + continue + } + glog.V(3).Infof(BAWlogstring(workerId, fmt.Sprintf("Object Policy got node status: %v", ns.RunningServices))) + + // if none of them are in new policy then + if !hasRunningService(ns.RunningServices, &newPolicy, workerId, w.config.ArchSynonyms) { + // remove the node from the dest list + err := UnassignObjectFromNode(w, &newPolicy, agreement.DeviceId) if err != nil { - glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("Object Policy unable to get node status, error %v", err))) - continue - } - glog.V(3).Infof(BAWlogstring(workerId, fmt.Sprintf("Object Policy got node status: %v", ns.RunningServices))) - - // if none of them are in new policy then - if !hasRunningService(ns.RunningServices, &newPolicy, workerId, w.config.ArchSynonyms) { - // remove the node from the dest list - err := UnassignObjectFromNode(w, &newPolicy, agreement.DeviceId) - if err != nil { - glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("Object Policy error %v", err))) - } - } else { - // else - // nothing to do, assume that the agbot which owns the agreement for the other services will handle this same policy change event appropriately + glog.Errorf(BAWlogstring(workerId, fmt.Sprintf("Object Policy error %v", err))) } } else { // else - // nothing to do, node is not in object's destination list + // nothing to do, assume that the agbot which owns the agreement for the other services will handle this same policy change event appropriately } } else { // else - // nothing to do, this agreement is irrelevant because it contains services in neither the old nor new object policy + // nothing to do, node is not in object's destination list } } else { - // else (the new policy service list has not changed, and since it doesnt match a service in this agreement, there is nothing to do) - // nothing to do + // else + // nothing to do, this agreement is irrelevant because it contains services in neither the old nor new object policy } } else { - // else (no old policy so the object policy's service list hasnt changed) - // nothing to do, the new policy didn't match any services of agreements owned by this agbot. + // else (the new policy service list has not changed, and since it doesnt match a service in this agreement, there is nothing to do) + // nothing to do } - + } else { + // else (no old policy so the object policy's service list hasnt changed) + // nothing to do, the new policy didn't match any services of agreements owned by this agbot. } + } glog.V(3).Infof(BAWlogstring(workerId, fmt.Sprintf("done with MMS Object Policy event: %v", wi))) diff --git a/config/config.go b/config/config.go index d5b79be82..ca67a71d3 100644 --- a/config/config.go +++ b/config/config.go @@ -111,7 +111,6 @@ type AGConfig struct { AgreementBatchSize uint64 // The number of nodes that the agbot will process in a batch. FullRescanS uint64 // The number of seconds between policy scans when there have been no changes reported by the exchange. MaxExchangeChanges int // The maximum number of exchange changes to request on a given call the exchange /changes API. - DBPagingLimit int // The maximum number of agreements obtained from the DB at any one time, when iterating all agreements. } func (c *HorizonConfig) UserPublicKeyPath() string { @@ -158,10 +157,6 @@ func (c *HorizonConfig) GetAgbotFullRescan() uint64 { return c.AgreementBot.FullRescanS } -func (c *HorizonConfig) GetAgbotDBLimit() int { - return c.AgreementBot.DBPagingLimit -} - func getDefaultBase() string { basePath := os.Getenv("HZN_VAR_BASE") if basePath == "" { @@ -231,7 +226,6 @@ func Read(file string) (*HorizonConfig, error) { AgreementBatchSize: AgbotAgreementBatchSize_DEFAULT, FullRescanS: AgbotFullRescan_DEFAULT, MaxExchangeChanges: AgbotMaxChanges_DEFAULT, - DBPagingLimit: AgbotAgreementDBLimit_DEFAULT, }, } diff --git a/config/constants.go b/config/constants.go index a737f16d9..05c9764e1 100644 --- a/config/constants.go +++ b/config/constants.go @@ -80,6 +80,3 @@ const AgbotFullRescan_DEFAULT = 600 // The maximum number of changes to retrieve at once from the exchange const AgbotMaxChanges_DEFAULT = 1000 - -// The maximum number of agreements returned from the DB when iterating all of them. -const AgbotAgreementDBLimit_DEFAULT = 1000 diff --git a/events/events.go b/events/events.go index 907df152e..5f571f568 100644 --- a/events/events.go +++ b/events/events.go @@ -1043,7 +1043,6 @@ type ABApiWorkloadUpgradeMessage struct { AgreementProtocol string AgreementId string DeviceId string - Org string PolicyName string } @@ -1052,14 +1051,14 @@ func (m *ABApiWorkloadUpgradeMessage) Event() Event { } func (m ABApiWorkloadUpgradeMessage) String() string { - return fmt.Sprintf("Event: %v, AgreementProtocol: %v, AgreementId: %v, DeviceId: %v, Org: %v, PolicyName: %v", m.event, m.AgreementProtocol, m.AgreementId, m.DeviceId, m.Org, m.PolicyName) + return fmt.Sprintf("Event: %v, AgreementProtocol: %v, AgreementId: %v, DeviceId: %v, PolicyName: %v", m.event, m.AgreementProtocol, m.AgreementId, m.DeviceId, m.PolicyName) } func (m ABApiWorkloadUpgradeMessage) ShortString() string { return m.String() } -func NewABApiWorkloadUpgradeMessage(id EventId, protocol string, agreementId string, deviceId string, org string, policyName string) *ABApiWorkloadUpgradeMessage { +func NewABApiWorkloadUpgradeMessage(id EventId, protocol string, agreementId string, deviceId string, policyName string) *ABApiWorkloadUpgradeMessage { return &ABApiWorkloadUpgradeMessage{ event: Event{ Id: id, @@ -1067,7 +1066,6 @@ func NewABApiWorkloadUpgradeMessage(id EventId, protocol string, agreementId str AgreementProtocol: protocol, AgreementId: agreementId, DeviceId: deviceId, - Org: org, PolicyName: policyName, } } diff --git a/exchange/rpc.go b/exchange/rpc.go index 2b135e628..8d8292c5d 100644 --- a/exchange/rpc.go +++ b/exchange/rpc.go @@ -737,7 +737,7 @@ func GetNodeHealthStatus(httpClientFactory *config.HTTPClientFactory, pattern st } } else { status := resp.(*NodeHealthStatus) - glog.V(3).Infof(rpclogString(fmt.Sprintf("found node health status for %v, status %v", pattern, status))) + glog.V(3).Infof(rpclogString(fmt.Sprintf("found nodehealth status for %v, status %v", pattern, status))) return status, nil } } diff --git a/test/Makefile b/test/Makefile index 26e8a7c54..5cf01a9ac 100644 --- a/test/Makefile +++ b/test/Makefile @@ -2,7 +2,7 @@ SHELL := /bin/bash #Versioning -DOCKER_EXCH_TAG ?= 2.23.0 +DOCKER_EXCH_TAG := 2.23.0 PKG_URL_BASE := http://169.45.88.181/linux/ubuntu PKGS_URL := $(PKG_URL_BASE)/dists/xenial-testing/main/binary-amd64/Packages DEB_PKGS_URL := $(PKG_URL_BASE)/pool/main/h/horizon diff --git a/test/gov/service_apireg.sh b/test/gov/service_apireg.sh index 59aaa8d8e..caa049216 100755 --- a/test/gov/service_apireg.sh +++ b/test/gov/service_apireg.sh @@ -962,6 +962,55 @@ echo -e "Register gps service pattern $VERS:" results "$RES" +# shelm test pattern +# if [ "${EXCH_APP_HOST}" = "http://exchange-api:8080/v1" ]; then +# MHI=90 +# CAS=60 +# else +# MHI=600 +# CAS=600 +# fi + +# VERS="1.0.0" +# read -d '' sdef <