diff --git a/agreementbot/api.go b/agreementbot/api.go
index d718a9cbc..8382a48b9 100644
--- a/agreementbot/api.go
+++ b/agreementbot/api.go
@@ -354,8 +354,11 @@ func (a *API) listen(apiListen string) {
 	router.HandleFunc("/cache/deploymentpol", a.ListDeploy).Methods("GET", "OPTIONS")
 	router.HandleFunc("/cache/deploymentpol/{org}", a.ListDeploy).Methods("GET", "OPTIONS")
 	router.HandleFunc("/cache/deploymentpol/{org}/{name}", a.ListDeploy).Methods("GET", "OPTIONS")
-	router.HandleFunc("/ha/upgradingwlu", a.ha_upgrading_wlu).Methods("GET", "OPTIONS")
-	router.HandleFunc("/ha/upgradingnode", a.ha_upgrading_node).Methods("GET", "OPTIONS")
+	router.HandleFunc("/ha/upgradingwlu", a.ha_upgrading_wlu).Methods("GET", "DELETE", "OPTIONS")
+	router.HandleFunc("/ha/upgradingwlu/{org}/{group_name}", a.ha_upgrading_wlu).Methods("GET", "DELETE", "OPTIONS")
+	router.HandleFunc("/ha/upgradingwlu/{org}/{group_name}/{policy_name}", a.ha_upgrading_wlu).Methods("GET", "DELETE", "OPTIONS")
+	router.HandleFunc("/ha/upgradingnode", a.ha_upgrading_node).Methods("GET", "DELETE", "OPTIONS")
+	router.HandleFunc("/ha/upgradingnode/{org}/{group_name}", a.ha_upgrading_node).Methods("GET", "DELETE", "OPTIONS")

 	if err := http.ListenAndServe(apiListen, nocache(router)); err != nil {
 		glog.Fatalf(APIlogString(fmt.Sprintf("failed to start listener on %v, error %v", apiListen, err)))
@@ -849,21 +852,73 @@ func (a *API) partition(w http.ResponseWriter, r *http.Request) {
 	}
 }

-// List all the entries in the ha_workload_upgrade table. They are the HA nodes
+// List or delete the entries in the ha_workload_upgrade table. They are the HA nodes
 // in which the workload is being upgraded.
 func (a *API) ha_upgrading_wlu(w http.ResponseWriter, r *http.Request) {

+	glog.V(5).Infof(APIlogString(fmt.Sprintf("Handling %v on HA upgrading workloads.", r.Method)))
+
 	switch r.Method {
 	case "GET":
-		if ha_wlu, err := a.db.ListAllHAUpgradingWorkloads(); err != nil {
-			glog.Error(APIlogString(fmt.Sprintf("error finding all HA nodes in which the workload is being upgraded, error: %v", err)))
+		pathVars := mux.Vars(r)
+		orgID := pathVars["org"]
+		groupName := pathVars["group_name"]
+		policyName := pathVars["policy_name"]
+
+		var err error
+		var ha_wlu []persistence.UpgradingHAGroupWorkload
+		if orgID == "" {
+			ha_wlu, err = a.db.ListAllHAUpgradingWorkloads()
+		} else if policyName == "" {
+			ha_wlu, err = a.db.ListHAUpgradingWorkloadsByGroupName(orgID, groupName)
+		} else {
+			var tmp_wlu *persistence.UpgradingHAGroupWorkload
+			tmp_wlu, err = a.db.GetHAUpgradingWorkload(orgID, groupName, policyName)
+			if err == nil {
+				if tmp_wlu == nil {
+					ha_wlu = []persistence.UpgradingHAGroupWorkload{}
+				} else {
+					ha_wlu = []persistence.UpgradingHAGroupWorkload{*tmp_wlu}
+				}
+			}
+		}
+
+		if err != nil {
+			glog.Error(APIlogString(fmt.Sprintf("error finding HA nodes in which the workload is being upgraded, error: %v", err)))
 			http.Error(w, "Internal server error", http.StatusInternalServerError)
 		} else {
 			// write output
 			writeResponse(w, ha_wlu, http.StatusOK)
 		}
+	case "DELETE":
+		pathVars := mux.Vars(r)
+		orgID := pathVars["org"]
+		groupName := pathVars["group_name"]
+		policyName := pathVars["policy_name"]
+
+		var err error
+		if orgID == "" {
+			err = a.db.DeleteAllHAUpgradingWorkload()
+		} else if policyName == "" {
+			err = a.db.DeleteHAUpgradingWorkloadsByGroupName(orgID, groupName)
+		} else {
+			var tmp_wlu *persistence.UpgradingHAGroupWorkload
+			tmp_wlu, err = a.db.GetHAUpgradingWorkload(orgID, groupName, policyName)
+			if err == nil && tmp_wlu != nil {
+				err = a.db.DeleteHAUpgradingWorkload(*tmp_wlu)
+			}
+		}
+
+		if 
err != nil { + glog.Error(APIlogString(fmt.Sprintf("error deleting HA nodes in which the workload is being upgraded, error: %v", err))) + http.Error(w, "Internal server error", http.StatusInternalServerError) + } else { + // write output + w.WriteHeader(http.StatusOK) + } + case "OPTIONS": - w.Header().Set("Allow", "GET, OPTIONS") + w.Header().Set("Allow", "GET, DELETE, OPTIONS") w.WriteHeader(http.StatusOK) default: w.WriteHeader(http.StatusMethodNotAllowed) @@ -874,17 +928,54 @@ func (a *API) ha_upgrading_wlu(w http.ResponseWriter, r *http.Request) { // the nodes that are being upgraded when the node is in a HA group. func (a *API) ha_upgrading_node(w http.ResponseWriter, r *http.Request) { + glog.V(5).Infof(APIlogString(fmt.Sprintf("Handling %v on HA upgrading nodes.", r.Method))) + + pathVars := mux.Vars(r) + orgID := pathVars["org"] + groupName := pathVars["group_name"] + switch r.Method { case "GET": - if ha_nodes, err := a.db.ListAllUpgradingHANode(); err != nil { - glog.Error(APIlogString(fmt.Sprintf("error finding all upgrading HA nodes, error: %v", err))) + var err error + var ha_nodes []persistence.UpgradingHAGroupNode + if orgID == "" { + ha_nodes, err = a.db.ListAllUpgradingHANode() + } else { + var tmp_node *persistence.UpgradingHAGroupNode + tmp_node, err = a.db.ListUpgradingNodeInGroup(orgID, groupName) + if err == nil { + if tmp_node == nil { + ha_nodes = []persistence.UpgradingHAGroupNode{} + } else { + ha_nodes = []persistence.UpgradingHAGroupNode{*tmp_node} + } + } + } + + if err != nil { + glog.Error(APIlogString(fmt.Sprintf("error finding upgrading HA nodes, error: %v", err))) http.Error(w, "Internal server error", http.StatusInternalServerError) } else { // write output writeResponse(w, ha_nodes, http.StatusOK) } + case "DELETE": + var err error + if orgID == "" { + err = a.db.DeleteAllUpgradingHANode() + } else { + err = a.db.DeleteHAUpgradeNodeByGroup(orgID, groupName) + } + + if err != nil { + glog.Error(APIlogString(fmt.Sprintf("error deleting upgrading HA nodes, error: %v", err))) + http.Error(w, "Internal server error", http.StatusInternalServerError) + } else { + // write output + w.WriteHeader(http.StatusOK) + } case "OPTIONS": - w.Header().Set("Allow", "GET, OPTIONS") + w.Header().Set("Allow", "GET, DELETE, OPTIONS") w.WriteHeader(http.StatusOK) default: w.WriteHeader(http.StatusMethodNotAllowed) diff --git a/agreementbot/consumer_protocol_handler.go b/agreementbot/consumer_protocol_handler.go index 6c11dddeb..0f72983a7 100644 --- a/agreementbot/consumer_protocol_handler.go +++ b/agreementbot/consumer_protocol_handler.go @@ -314,7 +314,7 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChanged(cmd *PolicyChangedComm if ag.Pattern == "" { agStillValid = b.HandlePolicyChangeForAgreement(ag, cph) } - + if !agStillValid { glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a policy %v that has changed incompatibly. 
Cancelling agreement: %v", ag.CurrentAgreementId, pol.Header.Name, err)))
 				b.CancelAgreement(ag, TERM_REASON_POLICY_CHANGED, cph)
@@ -535,11 +535,10 @@ func (b *BaseConsumerProtocolHandler) CancelAgreement(ag persistence.Agreement,

 			// put this workload in HA workload upgrading table
 			glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("inserting HA upgrading workloads with hagroup %v, org: %v, policyName: %v deviceId: %v", theDev.HAGroup, ag.Org, ag.PolicyName, ag.DeviceId)))
-			if err = b.db.InsertHAUpgradingWorkloadForGroupAndPolicy(deviceAndGroupOrg, theDev.HAGroup, ag.PolicyName, ag.DeviceId); err != nil {
-				// might not be an error if the entry is already added by another agbot
-				glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("unable to insert HA upgrading workloads with hagroup %v, org: %v, policyName: %v deviceId: %v, error: %v", theDev.HAGroup, ag.Org, ag.PolicyName, ag.DeviceId, err)))
+			if currentNodeId, err := b.db.InsertHAUpgradingWorkloadForGroupAndPolicy(deviceAndGroupOrg, theDev.HAGroup, ag.PolicyName, ag.DeviceId); err != nil {
+				glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("unable to insert HA upgrading workloads with hagroup %v, org: %v, policyName: %v deviceId: %v, error: %v", theDev.HAGroup, ag.Org, ag.PolicyName, ag.DeviceId, err)))
 				return
-			} else {
+			} else if currentNodeId == ag.DeviceId {
 				glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("delete workloadusage and cancel agreement for: org: %v, hagroup: %v, policyName: %v deviceId: %v", ag.Org, theDev.HAGroup, ag.PolicyName, ag.DeviceId)))
 				if err := b.db.DeleteWorkloadUsage(ag.DeviceId, ag.PolicyName); err != nil {
 					glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("error deleting workload usage for %v using policy %v, error: %v", ag.DeviceId, ag.PolicyName, err)))
@@ -547,6 +546,9 @@
 				agreementWork := NewCancelAgreement(ag.CurrentAgreementId, ag.AgreementProtocol, cph.GetTerminationCode(reason), 0)
 				cph.WorkQueue().InboundHigh() <- &agreementWork
 				return
+			} else {
+				glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("unable to insert HA upgrading workloads with hagroup %v, org: %v, policyName: %v deviceId: %v because another node %v already exists in the table.", theDev.HAGroup, ag.Org, ag.PolicyName, ag.DeviceId, currentNodeId)))
+				return
 			}
 		}
 	}
diff --git a/agreementbot/governance.go b/agreementbot/governance.go
index c23cd3d03..cb8108d3c 100644
--- a/agreementbot/governance.go
+++ b/agreementbot/governance.go
@@ -372,11 +372,12 @@ func (w *AgreementBotWorker) governHAPartners() {
 				} else if currentUpgradingWorkloadForGroup == nil {
 					glog.V(5).Infof(logString(fmt.Sprintf("no workload is upgrading for hagroup %v, now upgrade the workload: %v.", device.HAGroup, wlu.String())))
 					// insert this workload into ha workload upgrade table, then upgrade this workload
-					if err = w.db.InsertHAUpgradingWorkloadForGroupAndPolicy(org, haGroupName, wlu.PolicyName, wlu.DeviceId); err != nil {
-						// might not be an error if the insertion was done by another agbot before this one
+					if currentNodeId, err := w.db.InsertHAUpgradingWorkloadForGroupAndPolicy(org, haGroupName, wlu.PolicyName, wlu.DeviceId); err != nil {
 						glog.Warningf(logString(fmt.Sprintf("unable to insert HA upgrading workloads with hagroup %v, org: %v, policyName: %v deviceId: %v. %v", haGroupName, org, wlu.PolicyName, wlu.DeviceId, err)))
-					} else {
+					} else if currentNodeId == wlu.DeviceId {
 						w.UpgradeWorkload(wlu)
+					} else {
+						glog.Infof(logString(fmt.Sprintf("unable to insert HA upgrading workloads with hagroup %v, org: %v, policyName: %v deviceId: %v because another node %v already exists in the table.", haGroupName, org, wlu.PolicyName, wlu.DeviceId, currentNodeId)))
 					}
 				}
 			}
diff --git a/agreementbot/persistence/bolt/ha_group_node_management.go b/agreementbot/persistence/bolt/ha_group_node_management.go
index 89deb08b6..7ad1b4a35 100644
--- a/agreementbot/persistence/bolt/ha_group_node_management.go
+++ b/agreementbot/persistence/bolt/ha_group_node_management.go
@@ -41,6 +41,20 @@ func (db *AgbotBoltDB) CheckIfGroupPresentAndUpdateHATable(requestingNode persis
 	return &updatedDBNode, dbErr
 }

+func (db *AgbotBoltDB) DeleteAllUpgradingHANode() error {
+	if upgradingHANodes, err := db.ListAllUpgradingHANode(); err != nil {
+		return err
+	} else if len(upgradingHANodes) != 0 {
+		for _, uNode := range upgradingHANodes {
+			// delete upgrading ha node
+			if err = db.DeleteHAUpgradeNode(uNode); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
 func (db *AgbotBoltDB) DeleteHAUpgradeNode(nodeToDelete persistence.UpgradingHAGroupNode) error {
 	return db.db.Update(func(tx *bolt.Tx) error {
 		if b := tx.Bucket([]byte(HABUCKET)); b == nil {
diff --git a/agreementbot/persistence/bolt/ha_group_workload.go b/agreementbot/persistence/bolt/ha_group_workload.go
index adbb7b56e..518376305 100644
--- a/agreementbot/persistence/bolt/ha_group_workload.go
+++ b/agreementbot/persistence/bolt/ha_group_workload.go
@@ -10,18 +10,8 @@ import (

 const HA_WORKLOAD_USAGE_BUCKET = "ha_workload_usage"

-func (db *AgbotBoltDB) DeleteHAUpgradingWorkload(workloadToDelete persistence.UpgradingHAGroupWorkload) error {
-	return db.db.Update(func(tx *bolt.Tx) error {
-		if b := tx.Bucket([]byte(HA_WORKLOAD_USAGE_BUCKET)); b == nil {
-			return fmt.Errorf("Unknown bucket %v", HA_WORKLOAD_USAGE_BUCKET)
-		} else {
-			return b.Delete([]byte(haWLUId(workloadToDelete.OrgId, workloadToDelete.GroupName, workloadToDelete.PolicyName)))
-		}
-	})
-}
-
-func (db *AgbotBoltDB) DeleteHAUpgradingWorkloadsByGroupName(org string, haGroupName string) error {
-	if upgradingHAWorkloads, err := db.FindHAUpgradeWorkloadsWithFilters([]persistence.HAWorkloadUpgradeFilter{persistence.HAWorkloadUpgradeGroupFilter(org, haGroupName)}); err != nil {
+func (db *AgbotBoltDB) DeleteAllHAUpgradingWorkload() error {
+	if upgradingHAWorkloads, err := db.ListAllHAUpgradingWorkloads(); err != nil {
 		return err
 	} else if len(upgradingHAWorkloads) != 0 {
 		for _, upgrupgradingHAWorkload := range upgradingHAWorkloads {
@@ -34,8 +24,18 @@ func (db *AgbotBoltDB) DeleteHAUpgradingWorkloadsByGroupName(org string, haGroup
 	return nil
 }

-func (db *AgbotBoltDB) DeleteHAUpgradingWorkloadsByGroupNameAndDeviceId(org string, haGroupName string, deviceId string) error {
-	if upgradingHAWorkloads, err := db.FindHAUpgradeWorkloadsWithFilters([]persistence.HAWorkloadUpgradeFilter{persistence.HAWorkloadUpgradeGroupAndNodeFilter(org, haGroupName, deviceId)}); err != nil {
+func (db *AgbotBoltDB) DeleteHAUpgradingWorkload(workloadToDelete persistence.UpgradingHAGroupWorkload) error {
+	return db.db.Update(func(tx *bolt.Tx) error {
+		if b := tx.Bucket([]byte(HA_WORKLOAD_USAGE_BUCKET)); b == nil {
+			return fmt.Errorf("Unknown bucket %v", HA_WORKLOAD_USAGE_BUCKET)
+		} else {
+			return b.Delete([]byte(haWLUId(workloadToDelete.OrgId, workloadToDelete.GroupName, 
workloadToDelete.PolicyName))) + } + }) +} + +func (db *AgbotBoltDB) DeleteHAUpgradingWorkloadsByGroupName(org string, haGroupName string) error { + if upgradingHAWorkloads, err := db.FindHAUpgradeWorkloadsWithFilters([]persistence.HAWorkloadUpgradeFilter{persistence.HAWorkloadUpgradeGroupFilter(org, haGroupName)}); err != nil { return err } else if len(upgradingHAWorkloads) != 0 { for _, upgrupgradingHAWorkload := range upgradingHAWorkloads { @@ -156,14 +156,23 @@ func (db *AgbotBoltDB) UpdateHAUpgradingWorkloadForGroupAndPolicy(org string, ha } } -func (db *AgbotBoltDB) InsertHAUpgradingWorkloadForGroupAndPolicy(org string, haGroupName string, policyName string, deviceId string) error { +// Check if there is an entry for the given haGroupName, org, policyName. If exists, return the node id of the existing row. If not, insert a new row. +func (db *AgbotBoltDB) InsertHAUpgradingWorkloadForGroupAndPolicy(org string, haGroupName string, policyName string, deviceId string) (string, error) { key := haWLUId(org, haGroupName, policyName) + newNodeId := deviceId dbErr := db.db.Update(func(tx *bolt.Tx) error { if b, err := tx.CreateBucketIfNotExists([]byte(HA_WORKLOAD_USAGE_BUCKET)); err != nil { return err } else { current := b.Get([]byte(key)) if current != nil { + // get the node id + var mod persistence.UpgradingHAGroupWorkload + if err := json.Unmarshal(current, &mod); err != nil { + return fmt.Errorf("Failed to unmarshal ha upgrading workload DB data: %v. Error: %v", string(current), err) + } else { + newNodeId = mod.NodeId + } // if already exit, do nothing (be consistent with postgresql/ha_group_workload.go) return nil } else { @@ -180,7 +189,7 @@ func (db *AgbotBoltDB) InsertHAUpgradingWorkloadForGroupAndPolicy(org string, ha } return nil }) - return dbErr + return newNodeId, dbErr } func haWLUId(orgId string, groupName string, policyName string) string { diff --git a/agreementbot/persistence/database.go b/agreementbot/persistence/database.go index c60b4233c..eaa47a83a 100644 --- a/agreementbot/persistence/database.go +++ b/agreementbot/persistence/database.go @@ -96,18 +96,19 @@ type AgbotDatabase interface { // Functions related to persistence of the state of nodes in ha groups executing node management upgrades. CheckIfGroupPresentAndUpdateHATable(requestingNode UpgradingHAGroupNode) (*UpgradingHAGroupNode, error) + DeleteAllUpgradingHANode() error DeleteHAUpgradeNode(nodeToDelete UpgradingHAGroupNode) error ListUpgradingNodeInGroup(orgId string, groupName string) (*UpgradingHAGroupNode, error) ListAllUpgradingHANode() ([]UpgradingHAGroupNode, error) DeleteHAUpgradeNodeByGroup(orgId string, groupName string) error // Functions related to persistence of the state of workload in ha groups executing service upgrades. 
+ DeleteAllHAUpgradingWorkload() error DeleteHAUpgradingWorkload(workloadToDelete UpgradingHAGroupWorkload) error DeleteHAUpgradingWorkloadsByGroupName(org string, haGroupName string) error - DeleteHAUpgradingWorkloadsByGroupNameAndDeviceId(org string, haGroupName string, deviceId string) error ListHAUpgradingWorkloadsByGroupName(org string, haGroupName string) ([]UpgradingHAGroupWorkload, error) ListAllHAUpgradingWorkloads() ([]UpgradingHAGroupWorkload, error) GetHAUpgradingWorkload(org string, haGroupName string, policyName string) (*UpgradingHAGroupWorkload, error) UpdateHAUpgradingWorkloadForGroupAndPolicy(org string, haGroupName string, policyName string, deviceId string) error - InsertHAUpgradingWorkloadForGroupAndPolicy(org string, haGroupName string, policyName string, deviceId string) error + InsertHAUpgradingWorkloadForGroupAndPolicy(org string, haGroupName string, policyName string, deviceId string) (string, error) } diff --git a/agreementbot/persistence/postgresql/ha_group_node_management.go b/agreementbot/persistence/postgresql/ha_group_node_management.go index 25d6b07df..da070d766 100644 --- a/agreementbot/persistence/postgresql/ha_group_node_management.go +++ b/agreementbot/persistence/postgresql/ha_group_node_management.go @@ -40,6 +40,8 @@ END $$ LANGUAGE plpgsql;` const HA_GROUP_ADD_IF_NOT_PRESENT_BY_FUNCTION = `SELECT * FROM ha_group_add_if_not_present($1,$2,$3,$4);` +const HA_GROUP_DELETE_NODE_ALL = `DELETE FROM ha_group_updates ` + const HA_GROUP_DELETE_NODE = `DELETE FROM ha_group_updates WHERE group_name = $1 AND org_id = $2 AND node_id = $3 AND nmp_id = $4 ` const HA_GROUP_DELETE_NODE_BY_GROUP = `DELETE FROM ha_group_updates WHERE group_name = $1 AND org_id = $2 ` @@ -66,6 +68,11 @@ func (db *AgbotPostgresqlDB) CheckIfGroupPresentAndUpdateHATable(requestingNode } } +func (db *AgbotPostgresqlDB) DeleteAllUpgradingHANode() error { + _, qerr := db.db.Exec(HA_GROUP_DELETE_NODE_ALL) + return qerr +} + func (db *AgbotPostgresqlDB) DeleteHAUpgradeNode(nodeToDelete persistence.UpgradingHAGroupNode) error { _, qerr := db.db.Exec(HA_GROUP_DELETE_NODE, nodeToDelete.GroupName, nodeToDelete.OrgId, nodeToDelete.NodeId, nodeToDelete.NMPName) return qerr diff --git a/agreementbot/persistence/postgresql/ha_group_workload.go b/agreementbot/persistence/postgresql/ha_group_workload.go index ad4fff4f3..180f25507 100644 --- a/agreementbot/persistence/postgresql/ha_group_workload.go +++ b/agreementbot/persistence/postgresql/ha_group_workload.go @@ -20,10 +20,33 @@ const CREATE_HA_WORKLOAD_UPGRADE_MAIN_TABLE = `CREATE TABLE IF NOT EXISTS ha_wor updated timestamp with time zone DEFAULT current_timestamp );` +// Check if the ha group is in the table. 
If not add the group, node, and policy name that it is upgrading with +// These operations are in the same transaction to prevent a situation where 2 agbots check for the group name, before either can add it to the table +const HA_WORKLOAD_ADD_IF_NOT_PRESENT = ` +CREATE OR REPLACE FUNCTION ha_workload_add_if_not_present( + ha_group_name CHARACTER VARYING, + ha_org_id CHARACTER VARYING, + ha_policy_name CHARACTER VARYING, + ha_node_id CHARACTER VARYING) + RETURNS TABLE(db_node_id text) AS $$ + +BEGIN +LOCK TABLE ha_workload_upgrade; + +IF NOT EXISTS (SELECT node_id FROM ha_workload_upgrade WHERE group_name = ha_group_name AND org_id = ha_org_id AND policy_name = ha_policy_name) THEN + INSERT INTO ha_workload_upgrade (group_name, org_id, policy_name, node_id) VALUES (ha_group_name, ha_org_id, ha_policy_name, ha_node_id); +END IF; + +RETURN QUERY SELECT node_id FROM ha_workload_upgrade WHERE group_name = ha_group_name AND org_id = ha_org_id AND policy_name = ha_policy_name; + +END $$ LANGUAGE plpgsql;` + const HA_WORKLOAD_ADD_IF_NOT_PRESENT_BY_FUNCTION = `SELECT * FROM ha_workload_add_if_not_present($1,$2,$3,$4);` const HA_WORKLOAD_DELETE = `DELETE FROM ha_workload_upgrade WHERE group_name = $1 AND org_id = $2 AND policy_name = $3 AND node_id = $4;` +const HA_WORKLOAD_DELETE_ALL = `DELETE FROM ha_workload_upgrade;` + const HA_WORKLOAD_DELETE_ALL_IN_HA_GROUP = `DELETE FROM ha_workload_upgrade WHERE group_name = $1 AND org_id = $2;` const HA_WORKLOAD_DELETE_ALL_BY_GROUP_AND_NODE = `DELETE FROM ha_workload_upgrade WHERE group_name = $1 AND org_id = $2 AND node_id =$3;` @@ -38,6 +61,11 @@ const HA_WORKLOAD_INSERT = `INSERT INTO ha_workload_upgrade (group_name, org_id, const HA_WORKLOAD_GET_ALL = `SELECT group_name, org_id, policy_name, node_id FROM ha_workload_upgrade;` +func (db *AgbotPostgresqlDB) DeleteAllHAUpgradingWorkload() error { + _, qerr := db.db.Exec(HA_WORKLOAD_DELETE_ALL) + return qerr +} + func (db *AgbotPostgresqlDB) DeleteHAUpgradingWorkload(workloadToDelete persistence.UpgradingHAGroupWorkload) error { _, qerr := db.db.Exec(HA_WORKLOAD_DELETE, workloadToDelete.GroupName, workloadToDelete.OrgId, workloadToDelete.PolicyName, workloadToDelete.NodeId) return qerr @@ -48,11 +76,6 @@ func (db *AgbotPostgresqlDB) DeleteHAUpgradingWorkloadsByGroupName(org string, h return qerr } -func (db *AgbotPostgresqlDB) DeleteHAUpgradingWorkloadsByGroupNameAndDeviceId(org string, haGroupName string, deviceId string) error { - _, qerr := db.db.Exec(HA_WORKLOAD_DELETE_ALL_BY_GROUP_AND_NODE, haGroupName, org, deviceId) - return qerr -} - func (db *AgbotPostgresqlDB) ListHAUpgradingWorkloadsByGroupName(org string, haGroupName string) ([]persistence.UpgradingHAGroupWorkload, error) { upgradingWorkloads := []persistence.UpgradingHAGroupWorkload{} rows, err := db.db.Query(HA_WORKLOAD_GET_ALL_IN_HA_GROUP, haGroupName, org) @@ -131,12 +154,21 @@ func (db *AgbotPostgresqlDB) UpdateHAUpgradingWorkloadForGroupAndPolicy(org stri return nil } -func (db *AgbotPostgresqlDB) InsertHAUpgradingWorkloadForGroupAndPolicy(org string, haGroupName string, policyName string, deviceId string) error { - if _, err := db.db.Exec(HA_WORKLOAD_INSERT, haGroupName, org, policyName, deviceId); err != nil { - return err - } else { - glog.V(2).Infof(fmt.Sprintf("Succeeded creating ha upgrading workload record: %v %v %v %v", org, haGroupName, policyName, deviceId)) +// Check if there is an entry for the given haGroupName, org, policyName. If exists, return the node id of the existing row. If not, insert a new row. 
+func (db *AgbotPostgresqlDB) InsertHAUpgradingWorkloadForGroupAndPolicy(org string, haGroupName string, policyName string, deviceId string) (string, error) { + var dbNodeId sql.NullString + qerr := db.db.QueryRow(HA_WORKLOAD_ADD_IF_NOT_PRESENT_BY_FUNCTION, haGroupName, org, policyName, deviceId).Scan(&dbNodeId) + + if qerr != nil && qerr != sql.ErrNoRows { + return deviceId, fmt.Errorf("error scanning row for ha workloads currently upgrading in group %v/%v for policy %v. %v", org, haGroupName, policyName, qerr) } - return nil + if !dbNodeId.Valid { + return deviceId, fmt.Errorf("node id returned from ha workload updates table search is not valid") + } + + if dbNodeId.String == deviceId { + glog.V(2).Infof(fmt.Sprintf("Succeeded inserting ha upgrading workload for node %v for %v/%v/%v.", deviceId, org, haGroupName, policyName)) + } + return dbNodeId.String, nil } diff --git a/agreementbot/persistence/postgresql/init.go b/agreementbot/persistence/postgresql/init.go index 742d6b8c4..d28303d4f 100644 --- a/agreementbot/persistence/postgresql/init.go +++ b/agreementbot/persistence/postgresql/init.go @@ -112,6 +112,8 @@ func (db *AgbotPostgresqlDB) Initialize(cfg *config.HorizonConfig) error { // Create the ha group service upgrade table. Do not partition it. if _, err := db.db.Exec(CREATE_HA_WORKLOAD_UPGRADE_MAIN_TABLE); err != nil { return fmt.Errorf("unable to create ha workload upgrade table, error: %v", err) + } else if _, err := db.db.Exec(HA_WORKLOAD_ADD_IF_NOT_PRESENT); err != nil { + return fmt.Errorf("unable to create ha workload add if not present function, error: %v", err) } glog.V(3).Infof("Postgresql primary partition database tables exist.") diff --git a/test/gov/ha_test.sh b/test/gov/ha_test.sh index 73de933f0..634b5a452 100755 --- a/test/gov/ha_test.sh +++ b/test/gov/ha_test.sh @@ -46,92 +46,28 @@ function verify_ha_group_name { function publish_new_netspeed_service { echo -e "\n${PREFIX} publish netspeed service 2.4.0..." - read -d '' sdef <&1) + if [ "${NOVAULT}" != "1" ]; then + NS_FILE_IBM="/root/service_defs/IBM/netspeed_2.3.0_secrets.json" + NS_FILE_E2EDEV="/root/service_defs/e2edev@somecomp.com/netspeed_2.3.0_secrets.json" + else + NS_FILE_IBM="/root/service_defs/IBM/netspeed_2.3.0.json" + NS_FILE_E2EDEV="/root/service_defs/e2edev@somecomp.com/netspeed_2.3.0.json" + fi + export VERS="2.4.0" + export ARCH=${ARCH} + export CPU_IMAGE_NAME="${DOCKER_CPU_INAME}" + export CPU_IMAGE_TAG="${DOCKER_CPU_TAG}" + + res=$(cat ${NS_FILE_IBM} | envsubst | hzn exchange service publish -f- -O -P -o IBM -u ${IBM_ADMIN_AUTH} 2>&1) + if [ $? -ne 0 ]; then + echo -e "\n${PREFIX} failed to create netspeed service version 2.4.0 for IBM org. $res" + exit 2 + fi + + + res=$(cat ${NS_FILE_E2EDEV} | envsubst | hzn exchange service publish -f- -O -P -o e2edev@somecomp.com -u ${E2EDEV_ADMIN_AUTH} 2>&1) if [ $? -ne 0 ]; then - echo -e "\n${PREFIX} failed to create netspeed service version 2.4.0. $res" + echo -e "\n${PREFIX} failed to create netspeed service version 2.4.0 for e2edev@somecomp.com org. $res" exit 2 fi } @@ -265,6 +201,10 @@ function update_sns_pattern { ] } EOF + if [ "${NOVAULT}" == "1" ]; then + sns=$(echo $sns |jq 'del(.secretBinding)') + fi + res=$(echo "$sns" | hzn exchange pattern publish -f- -p sns -o e2edev@somecomp.com -u ${E2EDEV_ADMIN_AUTH} 2>&1) if [ $? -ne 0 ]; then echo -e "\n${PREFIX} failed to update pattern sns with netspeed service 2.4.0. 
$res" @@ -275,18 +215,18 @@ EOF function update_ns_policy { echo -e "\n${PREFIX} updating deployment policy bp_netspeed with netspeed service 2.4.0..." read -d '' bp_ns <&1) if [ $? -ne 0 ]; then echo -e "\n${PREFIX} failed to update deployment policy bp_netspeed with netspeed service 2.4.0. $res" @@ -416,8 +373,8 @@ EOF function verify_rolling_upgrade { source ./utils.sh + NS_ORG=$1 NS_URL="https://bluehorizon.network/services/netspeed" - NS_ORG="IBM" NS_VERSION="2.4.0" ANAX_API1="http://localhost:8510" ANAX_API2="http://localhost:8511" @@ -465,7 +422,7 @@ if [ "$PATTERN" != "" ]; then update_sns_pattern # check service rolling upgrade - verify_rolling_upgrade + verify_rolling_upgrade "IBM" fi else # add new netspeed service version 2.4.0 to deployment policy bp_netspeed @@ -473,7 +430,7 @@ else update_ns_policy # check rolling upgrade - verify_rolling_upgrade + verify_rolling_upgrade "e2edev@somecomp.com" fi echo -e "${PREFIX} Done" diff --git a/test/gov/start_anax_loop.sh b/test/gov/start_anax_loop.sh index abe53f5fc..ac0731501 100755 --- a/test/gov/start_anax_loop.sh +++ b/test/gov/start_anax_loop.sh @@ -1,25 +1,31 @@ #!/bin/bash -export HZN_AGENT_PORT=8510 +num=$1 + +export HZN_AGENT_PORT=$((8509 + ${num})) + +if [[ "$num" == "1" ]]; then + num="" +fi while (true) do if [ "$OLDANAX" == "1" ] then - echo "Starting OLD Anax1 to run workloads." + echo "Starting OLD Anax to run workloads." if [ ${CERT_LOC} -eq "1" ]; then - /usr/bin/old-anax -v=5 -alsologtostderr=true -config /etc/colonus/anax-combined.config >/tmp/anax.log 2>&1 > /dev/null + /usr/bin/old-anax -v=5 -alsologtostderr=true -config /etc/colonus/anax-combined${num}.config >/tmp/anax${num}.log 2>&1 > /dev/null else - /usr/bin/old-anax -v=5 -alsologtostderr=true -config /etc/colonus/anax-combined-no-cert.config >/tmp/anax.log 2>&1 > /dev/null + /usr/bin/old-anax -v=5 -alsologtostderr=true -config /etc/colonus/anax-combined${num}-no-cert.config >/tmp/anax${num}.log 2>&1 > /dev/null fi else - echo "Starting Anax1 to run workloads." + echo "Starting Anax to run workloads." if [ ${CERT_LOC} -eq "1" ]; then - /usr/local/bin/anax -v=5 -alsologtostderr=true -config /etc/colonus/anax-combined.config >>/tmp/anax.log 2>&1 + /usr/local/bin/anax -v=5 -alsologtostderr=true -config /etc/colonus/anax-combined${num}.config >>/tmp/anax${num}.log 2>&1 else - /usr/local/bin/anax -v=5 -alsologtostderr=true -config /etc/colonus/anax-combined-no-cert.config >>/tmp/anax.log 2>&1 + /usr/local/bin/anax -v=5 -alsologtostderr=true -config /etc/colonus/anax-combined${num}-no-cert.config >>/tmp/anax${num}.log 2>&1 fi fi rc=$? - echo "anax exited with exit code $rc" + echo "${anax} exited with exit code $rc" done diff --git a/test/gov/start_node.sh b/test/gov/start_node.sh index 3afef7180..73061f2f0 100755 --- a/test/gov/start_node.sh +++ b/test/gov/start_node.sh @@ -38,7 +38,7 @@ EOF } -nohup ./start_anax_loop.sh &>/dev/null & +nohup ./start_anax_loop.sh 1 &>/dev/null & sleep 5 @@ -74,22 +74,8 @@ then export HZN_AGENT_PORT=8511 export ANAX_API="http://localhost:${HZN_AGENT_PORT}" - if [ "$OLDANAX" == "1" ] - then - echo "Starting OLD Anax2 to run workloads." - if [ ${CERT_LOC} -eq "1" ]; then - /usr/bin/old-anax -v=5 -alsologtostderr=true -config /etc/colonus/anax-combined2.config >/tmp/anax2.log 2>&1 & - else - /usr/bin/old-anax -v=5 -alsologtostderr=true -config /etc/colonus/anax-combined2-no-cert.config >/tmp/anax2.log 2>&1 & - fi - else - echo "Starting Anax2 to run workloads." 
- if [ ${CERT_LOC} -eq "1" ]; then - /usr/local/bin/anax -v=5 -alsologtostderr=true -config /etc/colonus/anax-combined2.config >/tmp/anax2.log 2>&1 & - else - /usr/local/bin/anax -v=5 -alsologtostderr=true -config /etc/colonus/anax-combined2-no-cert.config >/tmp/anax2.log 2>&1 & - fi - fi + # start anax2 + nohup ./start_anax_loop.sh 2 &>/dev/null & sleep 5
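
Note on the new debug routes: /ha/upgradingwlu and /ha/upgradingnode now accept GET to list and DELETE to clear entries, with optional {org}/{group_name} and, for workloads, {policy_name} path segments. Below is a minimal sketch of exercising them from a small Go client. Only the route shapes and methods come from this patch; the base URL (whatever address the agbot apiListen is configured with) and the "myorg"/"mygroup"/"mypolicy" names are placeholder assumptions.

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumption: substitute the real agbot API listen address here.
	base := "http://localhost:8046"

	// GET with no path variables lists every row in the ha_workload_upgrade table.
	resp, err := http.Get(base + "/ha/upgradingwlu")
	if err != nil {
		panic(err)
	}
	body, _ := io.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Printf("upgrading workloads: %s\n", body)

	// DELETE with org/group/policy removes a single upgrading-workload entry.
	req, _ := http.NewRequest(http.MethodDelete, base+"/ha/upgradingwlu/myorg/mygroup/mypolicy", nil)
	resp, err = http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
	fmt.Println("delete upgradingwlu status:", resp.StatusCode)

	// DELETE with org/group clears the upgrading-node marker for one HA group.
	req, _ = http.NewRequest(http.MethodDelete, base+"/ha/upgradingnode/myorg/mygroup", nil)
	resp, err = http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
	fmt.Println("delete upgradingnode status:", resp.StatusCode)
}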
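Note on the signature change to InsertHAUpgradingWorkloadForGroupAndPolicy: returning the node id that actually holds the row lets callers treat the insert as an ownership check, which is the pattern consumer_protocol_handler.go and governance.go now follow. A minimal sketch of that caller-side pattern, assuming the github.com/open-horizon/anax import path; the helper name and the main wiring are illustrative only.

package main

import (
	"fmt"

	"github.com/open-horizon/anax/agreementbot/persistence"
)

// upgradeIfOwner is a hypothetical helper: only the agbot whose device id comes
// back from the insert proceeds with the upgrade; any other caller backs off.
func upgradeIfOwner(db persistence.AgbotDatabase, org, group, policy, deviceId string) (bool, error) {
	currentNodeId, err := db.InsertHAUpgradingWorkloadForGroupAndPolicy(org, group, policy, deviceId)
	if err != nil {
		return false, err
	}
	if currentNodeId != deviceId {
		// Another node in the HA group already owns the upgrade slot for this policy.
		return false, nil
	}
	// This device owns the slot; the caller can safely start the workload upgrade.
	return true, nil
}

func main() {
	// Wiring up a real AgbotDatabase is out of scope here; this only shows the call shape.
	var db persistence.AgbotDatabase
	if db == nil {
		fmt.Println("no database configured; see agreementbot/persistence for implementations")
		return
	}
	if owner, err := upgradeIfOwner(db, "myorg", "mygroup", "mypolicy", "myorg/mynode"); err == nil && owner {
		fmt.Println("this agbot won the upgrade slot")
	}
}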