Skip to content

Commit

Permalink
Issue 3457 - Feature Request: Clear ha group management and workload …
Browse files Browse the repository at this point in the history
…tables (#3551)

Signed-off-by: linggao <linggao@us.ibm.com>

Signed-off-by: linggao <linggao@us.ibm.com>
  • Loading branch information
linggao authored Nov 15, 2022
1 parent bcb9560 commit 177bbbd
Show file tree
Hide file tree
Showing 12 changed files with 284 additions and 176 deletions.
109 changes: 100 additions & 9 deletions agreementbot/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,11 @@ func (a *API) listen(apiListen string) {
router.HandleFunc("/cache/deploymentpol", a.ListDeploy).Methods("GET", "OPTIONS")
router.HandleFunc("/cache/deploymentpol/{org}", a.ListDeploy).Methods("GET", "OPTIONS")
router.HandleFunc("/cache/deploymentpol/{org}/{name}", a.ListDeploy).Methods("GET", "OPTIONS")
router.HandleFunc("/ha/upgradingwlu", a.ha_upgrading_wlu).Methods("GET", "OPTIONS")
router.HandleFunc("/ha/upgradingnode", a.ha_upgrading_node).Methods("GET", "OPTIONS")
router.HandleFunc("/ha/upgradingwlu", a.ha_upgrading_wlu).Methods("GET", "DELETE", "OPTIONS")
router.HandleFunc("/ha/upgradingwlu/{org}/{group_name}", a.ha_upgrading_wlu).Methods("GET", "DELETE", "OPTIONS")
router.HandleFunc("/ha/upgradingwlu/{org}/{group_name}/{policy_name}", a.ha_upgrading_wlu).Methods("GET", "DELETE", "OPTIONS")
router.HandleFunc("/ha/upgradingnode", a.ha_upgrading_node).Methods("GET", "DELETE", "OPTIONS")
router.HandleFunc("/ha/upgradingnode/{org}/{group_name}", a.ha_upgrading_node).Methods("GET", "DELETE", "OPTIONS")

if err := http.ListenAndServe(apiListen, nocache(router)); err != nil {
glog.Fatalf(APIlogString(fmt.Sprintf("failed to start listener on %v, error %v", apiListen, err)))
Expand Down Expand Up @@ -849,21 +852,72 @@ func (a *API) partition(w http.ResponseWriter, r *http.Request) {
}
}

// List all the entries in the ha_workload_upgrade table. They are the HA nodes
// List or delete the entries in the ha_workload_upgrade table. They are the HA nodes
// in which the workload is being upgraded.
func (a *API) ha_upgrading_wlu(w http.ResponseWriter, r *http.Request) {

glog.V(5).Infof(APIlogString(fmt.Sprintf("Handling %v on HA upgrading workloads.", r.Method)))

switch r.Method {
case "GET":
if ha_wlu, err := a.db.ListAllHAUpgradingWorkloads(); err != nil {
glog.Error(APIlogString(fmt.Sprintf("error finding all HA nodes in which the workload is being upgraded, error: %v", err)))
pathVars := mux.Vars(r)
orgID := pathVars["org"]
groupName := pathVars["group_name"]
policyName := pathVars["policy_name"]

var err error
var ha_wlu []persistence.UpgradingHAGroupWorkload
if orgID == "" {
ha_wlu, err = a.db.ListAllHAUpgradingWorkloads()
} else if policyName == "" {
ha_wlu, err = a.db.ListHAUpgradingWorkloadsByGroupName(orgID, groupName)
} else {
tmp_wlu, err := a.db.GetHAUpgradingWorkload(orgID, groupName, policyName)
if err == nil {
if tmp_wlu == nil {
ha_wlu = []persistence.UpgradingHAGroupWorkload{}
} else {
ha_wlu = []persistence.UpgradingHAGroupWorkload{*tmp_wlu}
}
}
}

if err != nil {
glog.Error(APIlogString(fmt.Sprintf("error finding HA nodes in which the workload is being upgraded, error: %v", err)))
http.Error(w, "Internal server error", http.StatusInternalServerError)
} else {
// write output
writeResponse(w, ha_wlu, http.StatusOK)
}
case "DELETE":
pathVars := mux.Vars(r)
orgID := pathVars["org"]
groupName := pathVars["group_name"]
policyName := pathVars["policy_name"]

var err error
if orgID == "" {
err = a.db.DeleteAllHAUpgradingWorkload()
} else if policyName == "" {
err = a.db.DeleteHAUpgradingWorkloadsByGroupName(orgID, groupName)
} else {
var tmp_wlu *persistence.UpgradingHAGroupWorkload
tmp_wlu, err = a.db.GetHAUpgradingWorkload(orgID, groupName, policyName)
if err == nil && tmp_wlu != nil {
err = a.db.DeleteHAUpgradingWorkload(*tmp_wlu)
}
}

if err != nil {
glog.Error(APIlogString(fmt.Sprintf("error deleting HA nodes in which the workload is being upgraded, error: %v", err)))
http.Error(w, "Internal server error", http.StatusInternalServerError)
} else {
// write output
w.WriteHeader(http.StatusOK)
}

case "OPTIONS":
w.Header().Set("Allow", "GET, OPTIONS")
w.Header().Set("Allow", "GET, DELETE, OPTIONS")
w.WriteHeader(http.StatusOK)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
Expand All @@ -874,17 +928,54 @@ func (a *API) ha_upgrading_wlu(w http.ResponseWriter, r *http.Request) {
// the nodes that are being upgraded when the node is in a HA group.
func (a *API) ha_upgrading_node(w http.ResponseWriter, r *http.Request) {

glog.V(5).Infof(APIlogString(fmt.Sprintf("Handling %v on HA upgrading nodes.", r.Method)))

pathVars := mux.Vars(r)
orgID := pathVars["org"]
groupName := pathVars["group_name"]

switch r.Method {
case "GET":
if ha_nodes, err := a.db.ListAllUpgradingHANode(); err != nil {
glog.Error(APIlogString(fmt.Sprintf("error finding all upgrading HA nodes, error: %v", err)))
var err error
var ha_nodes []persistence.UpgradingHAGroupNode
if orgID == "" {
ha_nodes, err = a.db.ListAllUpgradingHANode()
} else {
var tmp_node *persistence.UpgradingHAGroupNode
tmp_node, err = a.db.ListUpgradingNodeInGroup(orgID, groupName)
if err == nil {
if tmp_node == nil {
ha_nodes = []persistence.UpgradingHAGroupNode{}
} else {
ha_nodes = []persistence.UpgradingHAGroupNode{*tmp_node}
}
}
}

if err != nil {
glog.Error(APIlogString(fmt.Sprintf("error finding upgrading HA nodes, error: %v", err)))
http.Error(w, "Internal server error", http.StatusInternalServerError)
} else {
// write output
writeResponse(w, ha_nodes, http.StatusOK)
}
case "DELETE":
var err error
if orgID == "" {
err = a.db.DeleteAllUpgradingHANode()
} else {
err = a.db.DeleteHAUpgradeNodeByGroup(orgID, groupName)
}

if err != nil {
glog.Error(APIlogString(fmt.Sprintf("error deleting upgrading HA nodes, error: %v", err)))
http.Error(w, "Internal server error", http.StatusInternalServerError)
} else {
// write output
w.WriteHeader(http.StatusOK)
}
case "OPTIONS":
w.Header().Set("Allow", "GET, OPTIONS")
w.Header().Set("Allow", "GET, DELETE, OPTIONS")
w.WriteHeader(http.StatusOK)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
Expand Down
12 changes: 7 additions & 5 deletions agreementbot/consumer_protocol_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChanged(cmd *PolicyChangedComm
if ag.Pattern == "" {
agStillValid = b.HandlePolicyChangeForAgreement(ag, cph)
}

if !agStillValid {
glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("agreement %v has a policy %v that has changed incompatibly. Cancelling agreement: %v", ag.CurrentAgreementId, pol.Header.Name, err)))
b.CancelAgreement(ag, TERM_REASON_POLICY_CHANGED, cph)
Expand Down Expand Up @@ -535,18 +535,20 @@ func (b *BaseConsumerProtocolHandler) CancelAgreement(ag persistence.Agreement,

// put this workload in HA workload upgrading table
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("inserting HA upgrading workloads with hagroup %v, org: %v, policyName: %v deviceId: %v", theDev.HAGroup, ag.Org, ag.PolicyName, ag.DeviceId)))
if err = b.db.InsertHAUpgradingWorkloadForGroupAndPolicy(deviceAndGroupOrg, theDev.HAGroup, ag.PolicyName, ag.DeviceId); err != nil {
// might not be an error if the entry is already added by another agbot
glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("unable to insert HA upgrading workloads with hagroup %v, org: %v, policyName: %v deviceId: %v, error: %v", theDev.HAGroup, ag.Org, ag.PolicyName, ag.DeviceId, err)))
if currentNodeId, err := b.db.InsertHAUpgradingWorkloadForGroupAndPolicy(deviceAndGroupOrg, theDev.HAGroup, ag.PolicyName, ag.DeviceId); err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("unable to insert HA upgrading workloads with hagroup %v, org: %v, policyName: %v deviceId: %v, error: %v", theDev.HAGroup, ag.Org, ag.PolicyName, ag.DeviceId, err)))
return
} else {
} else if currentNodeId == ag.DeviceId {
glog.V(5).Infof(BCPHlogstring(b.Name(), fmt.Sprintf("delete workloadusage and cancel agreement for: org: %v, hagroup: %v, policyName: %v deviceId: %v", ag.Org, theDev.HAGroup, ag.PolicyName, ag.DeviceId)))
if err := b.db.DeleteWorkloadUsage(ag.DeviceId, ag.PolicyName); err != nil {
glog.Warningf(BCPHlogstring(b.Name(), fmt.Sprintf("error deleting workload usage for %v using policy %v, error: %v", ag.DeviceId, ag.PolicyName, err)))
}
agreementWork := NewCancelAgreement(ag.CurrentAgreementId, ag.AgreementProtocol, cph.GetTerminationCode(reason), 0)
cph.WorkQueue().InboundHigh() <- &agreementWork
return
} else {
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("unable to insert HA upgrading workloads with hagroup %v, org: %v, policyName: %v deviceId: %v because there is another node %v exists already in the table.", theDev.HAGroup, ag.Org, ag.PolicyName, ag.DeviceId, currentNodeId)))
return
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions agreementbot/governance.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,12 @@ func (w *AgreementBotWorker) governHAPartners() {
} else if currentUpgradingWorkloadForGroup == nil {
glog.V(5).Infof(logString(fmt.Sprintf("no workload is upgrading for hagroup %v, now upgrade the workload: %v.", device.HAGroup, wlu.String())))
// insert this workload into ha workload upgrade table, then upgrade this workload
if err = w.db.InsertHAUpgradingWorkloadForGroupAndPolicy(org, haGroupName, wlu.PolicyName, wlu.DeviceId); err != nil {
// might not be an error if the insertion was done by another agbot before this one
if currentNodeId, err := w.db.InsertHAUpgradingWorkloadForGroupAndPolicy(org, haGroupName, wlu.PolicyName, wlu.DeviceId); err != nil {
glog.Warningf(logString(fmt.Sprintf("unable to insert HA upgrading workloads with hagroup %v, org: %v, policyName: %v deviceId: %v. %v", haGroupName, org, wlu.PolicyName, wlu.DeviceId, err)))
} else {
} else if currentNodeId == wlu.DeviceId {
w.UpgradeWorkload(wlu)
} else {
glog.Infof(logString(fmt.Sprintf("unable to insert HA upgrading workloads with hagroup %v, org: %v, policyName: %v deviceId: %v because there is another node %v exists already in the table.", haGroupName, org, wlu.PolicyName, wlu.DeviceId, currentNodeId)))
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions agreementbot/persistence/bolt/ha_group_node_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,20 @@ func (db *AgbotBoltDB) CheckIfGroupPresentAndUpdateHATable(requestingNode persis
return &updatedDBNode, dbErr
}

// DeleteAllUpgradingHANode removes every upgrading HA node entry that
// ListAllUpgradingHANode returns, deleting them one at a time through
// DeleteHAUpgradeNode. It stops at the first failure and returns that error;
// entries deleted before the failure are not restored by this function.
// It returns nil when there is nothing to delete or every deletion succeeds.
func (db *AgbotBoltDB) DeleteAllUpgradingHANode() error {
	nodes, err := db.ListAllUpgradingHANode()
	if err != nil {
		return err
	}

	// Ranging over an empty (or nil) slice is a no-op, so no length check is needed.
	for _, node := range nodes {
		if delErr := db.DeleteHAUpgradeNode(node); delErr != nil {
			return delErr
		}
	}
	return nil
}

func (db *AgbotBoltDB) DeleteHAUpgradeNode(nodeToDelete persistence.UpgradingHAGroupNode) error {
return db.db.Update(func(tx *bolt.Tx) error {
if b := tx.Bucket([]byte(HABUCKET)); b == nil {
Expand Down
41 changes: 25 additions & 16 deletions agreementbot/persistence/bolt/ha_group_workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,8 @@ import (

const HA_WORKLOAD_USAGE_BUCKET = "ha_workload_usage"

// DeleteHAUpgradingWorkload removes the entry for the given workload from the
// HA_WORKLOAD_USAGE_BUCKET bucket. The key is built by haWLUId from the
// workload's OrgId, GroupName and PolicyName. An error is returned when the
// bucket does not exist or when the delete itself fails.
func (db *AgbotBoltDB) DeleteHAUpgradingWorkload(workloadToDelete persistence.UpgradingHAGroupWorkload) error {
	return db.db.Update(func(tx *bolt.Tx) error {
		bucket := tx.Bucket([]byte(HA_WORKLOAD_USAGE_BUCKET))
		if bucket == nil {
			return fmt.Errorf("Unknown bucket %v", HA_WORKLOAD_USAGE_BUCKET)
		}

		key := haWLUId(workloadToDelete.OrgId, workloadToDelete.GroupName, workloadToDelete.PolicyName)
		return bucket.Delete([]byte(key))
	})
}

func (db *AgbotBoltDB) DeleteHAUpgradingWorkloadsByGroupName(org string, haGroupName string) error {
if upgradingHAWorkloads, err := db.FindHAUpgradeWorkloadsWithFilters([]persistence.HAWorkloadUpgradeFilter{persistence.HAWorkloadUpgradeGroupFilter(org, haGroupName)}); err != nil {
func (db *AgbotBoltDB) DeleteAllHAUpgradingWorkload() error {
if upgradingHAWorkloads, err := db.ListAllHAUpgradingWorkloads(); err != nil {
return err
} else if len(upgradingHAWorkloads) != 0 {
for _, upgrupgradingHAWorkload := range upgradingHAWorkloads {
Expand All @@ -34,8 +24,18 @@ func (db *AgbotBoltDB) DeleteHAUpgradingWorkloadsByGroupName(org string, haGroup
return nil
}

func (db *AgbotBoltDB) DeleteHAUpgradingWorkloadsByGroupNameAndDeviceId(org string, haGroupName string, deviceId string) error {
if upgradingHAWorkloads, err := db.FindHAUpgradeWorkloadsWithFilters([]persistence.HAWorkloadUpgradeFilter{persistence.HAWorkloadUpgradeGroupAndNodeFilter(org, haGroupName, deviceId)}); err != nil {
// DeleteHAUpgradingWorkload removes the entry for the given workload from the
// HA_WORKLOAD_USAGE_BUCKET bucket. The key is built by haWLUId from the
// workload's OrgId, GroupName and PolicyName. An error is returned when the
// bucket does not exist or when the delete itself fails.
func (db *AgbotBoltDB) DeleteHAUpgradingWorkload(workloadToDelete persistence.UpgradingHAGroupWorkload) error {
	return db.db.Update(func(tx *bolt.Tx) error {
		bucket := tx.Bucket([]byte(HA_WORKLOAD_USAGE_BUCKET))
		if bucket == nil {
			return fmt.Errorf("Unknown bucket %v", HA_WORKLOAD_USAGE_BUCKET)
		}

		key := haWLUId(workloadToDelete.OrgId, workloadToDelete.GroupName, workloadToDelete.PolicyName)
		return bucket.Delete([]byte(key))
	})
}

func (db *AgbotBoltDB) DeleteHAUpgradingWorkloadsByGroupName(org string, haGroupName string) error {
if upgradingHAWorkloads, err := db.FindHAUpgradeWorkloadsWithFilters([]persistence.HAWorkloadUpgradeFilter{persistence.HAWorkloadUpgradeGroupFilter(org, haGroupName)}); err != nil {
return err
} else if len(upgradingHAWorkloads) != 0 {
for _, upgrupgradingHAWorkload := range upgradingHAWorkloads {
Expand Down Expand Up @@ -156,14 +156,23 @@ func (db *AgbotBoltDB) UpdateHAUpgradingWorkloadForGroupAndPolicy(org string, ha
}
}

func (db *AgbotBoltDB) InsertHAUpgradingWorkloadForGroupAndPolicy(org string, haGroupName string, policyName string, deviceId string) error {
// Check if there is an entry for the given haGroupName, org, policyName. If exists, return the node id of the existing row. If not, insert a new row.
func (db *AgbotBoltDB) InsertHAUpgradingWorkloadForGroupAndPolicy(org string, haGroupName string, policyName string, deviceId string) (string, error) {
key := haWLUId(org, haGroupName, policyName)
newNodeId := deviceId
dbErr := db.db.Update(func(tx *bolt.Tx) error {
if b, err := tx.CreateBucketIfNotExists([]byte(HA_WORKLOAD_USAGE_BUCKET)); err != nil {
return err
} else {
current := b.Get([]byte(key))
if current != nil {
// get the node id
var mod persistence.UpgradingHAGroupWorkload
if err := json.Unmarshal(current, &mod); err != nil {
return fmt.Errorf("Failed to unmarshal ha upgrading workload DB data: %v. Error: %v", string(current), err)
} else {
newNodeId = mod.NodeId
}
// if the entry already exists, do nothing (be consistent with postgresql/ha_group_workload.go)
return nil
} else {
Expand All @@ -180,7 +189,7 @@ func (db *AgbotBoltDB) InsertHAUpgradingWorkloadForGroupAndPolicy(org string, ha
}
return nil
})
return dbErr
return newNodeId, dbErr
}

func haWLUId(orgId string, groupName string, policyName string) string {
Expand Down
5 changes: 3 additions & 2 deletions agreementbot/persistence/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,18 +96,19 @@ type AgbotDatabase interface {

// Functions related to persistence of the state of nodes in ha groups executing node management upgrades.
CheckIfGroupPresentAndUpdateHATable(requestingNode UpgradingHAGroupNode) (*UpgradingHAGroupNode, error)
DeleteAllUpgradingHANode() error
DeleteHAUpgradeNode(nodeToDelete UpgradingHAGroupNode) error
ListUpgradingNodeInGroup(orgId string, groupName string) (*UpgradingHAGroupNode, error)
ListAllUpgradingHANode() ([]UpgradingHAGroupNode, error)
DeleteHAUpgradeNodeByGroup(orgId string, groupName string) error

// Functions related to persistence of the state of workload in ha groups executing service upgrades.
DeleteAllHAUpgradingWorkload() error
DeleteHAUpgradingWorkload(workloadToDelete UpgradingHAGroupWorkload) error
DeleteHAUpgradingWorkloadsByGroupName(org string, haGroupName string) error
DeleteHAUpgradingWorkloadsByGroupNameAndDeviceId(org string, haGroupName string, deviceId string) error
ListHAUpgradingWorkloadsByGroupName(org string, haGroupName string) ([]UpgradingHAGroupWorkload, error)
ListAllHAUpgradingWorkloads() ([]UpgradingHAGroupWorkload, error)
GetHAUpgradingWorkload(org string, haGroupName string, policyName string) (*UpgradingHAGroupWorkload, error)
UpdateHAUpgradingWorkloadForGroupAndPolicy(org string, haGroupName string, policyName string, deviceId string) error
InsertHAUpgradingWorkloadForGroupAndPolicy(org string, haGroupName string, policyName string, deviceId string) error
InsertHAUpgradingWorkloadForGroupAndPolicy(org string, haGroupName string, policyName string, deviceId string) (string, error)
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ END $$ LANGUAGE plpgsql;`

const HA_GROUP_ADD_IF_NOT_PRESENT_BY_FUNCTION = `SELECT * FROM ha_group_add_if_not_present($1,$2,$3,$4);`

// HA_GROUP_DELETE_NODE_ALL deletes every row from the ha_group_updates table (no WHERE clause).
const HA_GROUP_DELETE_NODE_ALL = `DELETE FROM ha_group_updates `

const HA_GROUP_DELETE_NODE = `DELETE FROM ha_group_updates WHERE group_name = $1 AND org_id = $2 AND node_id = $3 AND nmp_id = $4 `

const HA_GROUP_DELETE_NODE_BY_GROUP = `DELETE FROM ha_group_updates WHERE group_name = $1 AND org_id = $2 `
Expand All @@ -66,6 +68,11 @@ func (db *AgbotPostgresqlDB) CheckIfGroupPresentAndUpdateHATable(requestingNode
}
}

// DeleteAllUpgradingHANode executes the HA_GROUP_DELETE_NODE_ALL statement,
// which deletes all rows from the ha_group_updates table. It returns the
// error from the Exec call, or nil on success.
func (db *AgbotPostgresqlDB) DeleteAllUpgradingHANode() error {
	if _, err := db.db.Exec(HA_GROUP_DELETE_NODE_ALL); err != nil {
		return err
	}
	return nil
}

func (db *AgbotPostgresqlDB) DeleteHAUpgradeNode(nodeToDelete persistence.UpgradingHAGroupNode) error {
_, qerr := db.db.Exec(HA_GROUP_DELETE_NODE, nodeToDelete.GroupName, nodeToDelete.OrgId, nodeToDelete.NodeId, nodeToDelete.NMPName)
return qerr
Expand Down
Loading

0 comments on commit 177bbbd

Please sign in to comment.