diff --git a/agreementbot/agreementbot.go b/agreementbot/agreementbot.go index 92bde07d9..1a37cf00a 100644 --- a/agreementbot/agreementbot.go +++ b/agreementbot/agreementbot.go @@ -1046,6 +1046,8 @@ func (w *AgreementBotWorker) syncOnInit() error { if agreements, err := w.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter()}, agp); err == nil { neededBCInstances := make(map[string]map[string]map[string]bool) + bPolicyCheckingMap := make(map[string]bool) + bPolicyMessageMap := make(map[string]events.Message) for _, ag := range agreements { @@ -1061,11 +1063,13 @@ func (w *AgreementBotWorker) syncOnInit() error { } neededBCInstances[bcOrg][bcType][bcName] = true + var pol *policy.Policy + // If the agreement has received a reply then we just need to make sure that the policy manager's agreement counts // are correct. Even for already timedout agreements, the governance process will cleanup old and outdated agreements, // so we don't need to do anything here. if ag.AgreementCreationTime != 0 { - if pol, err := policy.DemarshalPolicy(ag.Policy); err != nil { + if pol, err = policy.DemarshalPolicy(ag.Policy); err != nil { glog.Errorf(AWlogString(fmt.Sprintf("unable to demarshal policy for agreement %v, error %v", ag.CurrentAgreementId, err))) } else if existingPol := w.pm.GetPolicy(ag.Org, pol.Header.Name); existingPol == nil { glog.Errorf(AWlogString(fmt.Sprintf("agreement %v has a policy %v that doesn't exist anymore", ag.CurrentAgreementId, pol.Header.Name))) @@ -1119,6 +1123,39 @@ func (w *AgreementBotWorker) syncOnInit() error { glog.V(3).Infof(AWlogString(fmt.Sprintf("added agreement %v to policy agreement counter.", ag.CurrentAgreementId))) } + // After checking the policy, add it in to a map. In Each for loop which iterate the agreements, checking if current policy inside agreement has been handled or not + if pol != nil && !bPolicyCheckingMap[pol.Header.Name] { + glog.V(3).Infof(AWlogString(fmt.Sprintf("checking policy against exchange for agreement %v.", ag.CurrentAgreementId))) + if exchPols, err := exchange.GetBusinessPolicies(w, exchange.GetOrg(pol.Header.Name), exchange.GetId(pol.Header.Name)); err != nil { + glog.Errorf(AWlogString(fmt.Sprintf("error getting business policies from exchange for org: %v, policy name: %v error: %v", ag.Org, pol.Header.Name, err))) + } else if len(exchPols) == 0 { + glog.V(3).Infof(AWlogString(fmt.Sprintf("business policy %v from agreement %v is not found from exchange.", pol.Header.Name, ag.CurrentAgreementId))) + // Need to cancel the agreement + policyDeletedMsg := events.NewPolicyDeletedMessage(events.DELETED_POLICY, "", pol.Header.Name, exchange.GetOrg(pol.Header.Name), ag.Policy) + bPolicyMessageMap[pol.Header.Name] = policyDeletedMsg + } else { + for polId, exchPol := range exchPols { + bPol := exchPol.GetBusinessPolicy() + if exPolicy, err := bPol.GenPolicyFromBusinessPolicy(polId); err != nil { + glog.Errorf(AWlogString(fmt.Sprintf("error generating internal business policies for org: %v, policy name: %v from %v, error: %v", ag.Org, pol.Header.Name, exchPol.String(), err))) + } else if exPolicy == nil { + glog.Errorf(AWlogString(fmt.Sprintf("the generated internal business policies is nil for org: %v, policy name: %v from %v", ag.Org, pol.Header.Name, exchPol.String()))) + } else if exPolicyString, err := policy.MarshalPolicy(exPolicy); err != nil { + glog.Errorf(fmt.Sprintf("Error trying to marshal internal business policy %v error: %v", exPolicy, err)) + } else { + // If business policy has been changed during a restart, handle it + glog.V(3).Infof(AWlogString(fmt.Sprintf("need re-evaluate the agreement %v for policy %v", ag.CurrentAgreementId, pol.Header.Name))) + + policyChangedMsg := events.NewPolicyChangedMessage(events.CHANGED_POLICY, "", pol.Header.Name, ag.Org, exPolicyString, pol) + bPolicyMessageMap[pol.Header.Name] = policyChangedMsg + } + } + } + bPolicyCheckingMap[pol.Header.Name] = true + } else { + glog.V(3).Infof(AWlogString(fmt.Sprintf("skip checking policy %v for agreement %v", pol, ag.CurrentAgreementId))) + } + // This state should never occur, but could if there was an error along the way. It means that a DB record // was created for this agreement but the record was never updated with the creation time, which is supposed to occur // immediately following creation of the record. Further, if this were to occur, then the exchange should not have been @@ -1142,6 +1179,11 @@ func (w *AgreementBotWorker) syncOnInit() error { } } + glog.V(3).Infof(AWlogString(fmt.Sprintf("policies that might have changed: %v", bPolicyMessageMap))) + for _, msg := range bPolicyMessageMap { + w.queuePolicyCommand(msg) + } + } else { return errors.New(AWlogString(fmt.Sprintf("error searching database: %v", err))) } @@ -1151,6 +1193,23 @@ func (w *AgreementBotWorker) syncOnInit() error { return nil } +func (w *AgreementBotWorker) queuePolicyCommand(message events.Message) { + switch message.(type) { + case *events.PolicyChangedMessage: + pcm, _ := message.(*events.PolicyChangedMessage) + pcCmd := NewPolicyChangedCommand(*pcm) + w.Commands <- pcCmd + + case *events.PolicyDeletedMessage: + pdm, _ := message.(*events.PolicyDeletedMessage) + pdCmd := NewPolicyDeletedCommand(*pdm) + w.Commands <- pdCmd + + default: //nothing + } + +} + func (w *AgreementBotWorker) recordConsumerAgreementState(agreementId string, pol *policy.Policy, org string, state string) error { workload := pol.Workloads[0].WorkloadURL @@ -1159,7 +1218,7 @@ func (w *AgreementBotWorker) recordConsumerAgreementState(agreementId string, po as := new(exchange.PutAgbotAgreementState) as.Service = exchange.WorkloadAgreement{ - Org: exchange.GetOrg(pol.PatternId), + Org: org, Pattern: exchange.GetId(pol.PatternId), URL: workload, } @@ -1478,7 +1537,6 @@ func (w *AgreementBotWorker) databaseHeartBeat() int { // Ask the database to check for stale partitions and move them into our partition if one is found. func (w *AgreementBotWorker) stalePartitions() int { - // Dont try to grab a stale partition if we are unable to heartbeat. now := uint64(time.Now().Unix()) if hb, err := w.db.GetHeartbeat(); err != nil { diff --git a/agreementbot/persistence/postgresql/partition.go b/agreementbot/persistence/postgresql/partition.go index b78188f0c..7060a1502 100644 --- a/agreementbot/persistence/postgresql/partition.go +++ b/agreementbot/persistence/postgresql/partition.go @@ -262,7 +262,9 @@ func (db *AgbotPostgresqlDB) MovePartition(timeout uint64) (bool, error) { return false, err } else if _, err := tx.Exec(db.GetWorkloadUsagePartitionMove(fromPartition, db.PrimaryPartition())); err != nil { return false, err - } else if _, err := tx.Exec(db.GetSecretPartitionMove(fromPartition, db.PrimaryPartition())); err != nil { + } else if _, err := tx.Exec(db.GetSecretPartitionMovePattern(fromPartition, db.PrimaryPartition())); err != nil { + return false, err + } else if _, err := tx.Exec(db.GetSecretPartitionMovePolicy(fromPartition, db.PrimaryPartition())); err != nil { return false, err } else if _, err := tx.Exec(db.GetAgreementPartitionTableDrop(fromPartition)); err != nil { return false, err diff --git a/agreementbot/persistence/postgresql/secrets.go b/agreementbot/persistence/postgresql/secrets.go index fb0040f93..ea24d5967 100644 --- a/agreementbot/persistence/postgresql/secrets.go +++ b/agreementbot/persistence/postgresql/secrets.go @@ -63,11 +63,18 @@ const SECRET_UPDATE_TIME_PATTERN = `UPDATE "secrets_pattern_ SET last_update_che const SECRET_EXISTS_UPDATE_TIME_PATTERN = `UPDATE "secrets_pattern_ SET last_update_check = $1, secret_exists = false WHERE secret_org = $2 AND secret_name = $3 AND secret_exists = $4;` const SECRET_DELETE_PATTERN = `DELETE FROM "secrets_pattern_ WHERE secret_org = $1 AND secret_name = $2 AND pattern_org = $3 AND pattern_name = $4;` -const SECRET_MOVE = `WITH moved_rows AS ( +const SECRET_MOVE_POLICY = `WITH moved_rows AS ( + DELETE FROM "secrets_policy_ a + RETURNING a.secret_org, a.secret_name, a.policy_org, a.policy_name, a.last_update_check, a.secret_exists +) +INSERT INTO "secrets_policy_ (secret_org, secret_name, policy_org, policy_name, last_update_check, secret_exists, partition) SELECT secret_org, secret_name, policy_org, policy_name, last_update_check, secret_exists, 'partition_name' FROM moved_rows WHERE secret_org <> policy_org ON CONFLICT DO NOTHING; +` + +const SECRET_MOVE_PATTERN = `WITH moved_rows AS ( DELETE FROM "secrets_pattern_ a - RETURNING a.secret_org, a.secret_name, a.pattern_org, a.pattern_name, a.last_update_check + RETURNING a.secret_org, a.secret_name, a.pattern_org, a.pattern_name, a.last_update_check, a.secret_exists ) -INSERT INTO "secrets_pattern_ (secret_org, secret_name, pattern_org, pattern_name, last_update_check, partition) SELECT secret_org, secret_name, pattern_org, pattern_name, last_update_check, 'partition_name' FROM moved_rows WHERE secret_org <> pattern_org ON CONFLICT DO NOTHING; +INSERT INTO "secrets_pattern_ (secret_org, secret_name, pattern_org, pattern_name, last_update_check, secret_exists, partition) SELECT secret_org, secret_name, pattern_org, pattern_name, last_update_check, secret_exists, 'partition_name' FROM moved_rows WHERE secret_org <> pattern_org ON CONFLICT DO NOTHING; ` const SECRET_DROP_PARTITION_POLICY = `DROP TABLE "secrets_policy_;` @@ -224,13 +231,21 @@ func (db *AgbotPostgresqlDB) GetDeleteSecretPattern() string { } // The partition table name replacement scheme used in this function is slightly different from the others above. -func (db *AgbotPostgresqlDB) GetSecretPartitionMove(fromPartition string, toPartition string) string { - sql := strings.Replace(SECRET_MOVE, SECRET_TABLE_NAME_ROOT_PATTERN, db.GetSecretPartitionTableNamePattern(toPartition), 2) +func (db *AgbotPostgresqlDB) GetSecretPartitionMovePattern(fromPartition string, toPartition string) string { + sql := strings.Replace(SECRET_MOVE_PATTERN, SECRET_TABLE_NAME_ROOT_PATTERN, db.GetSecretPartitionTableNamePattern(toPartition), 2) sql = strings.Replace(sql, db.GetSecretPartitionTableNamePattern(toPartition), db.GetSecretPartitionTableNamePattern(fromPartition), 1) sql = strings.Replace(sql, SECRET_PARTITION_FILLIN, toPartition, 1) return sql } +// The partition table name replacement scheme used in this function is slightly different from the others above. +func (db *AgbotPostgresqlDB) GetSecretPartitionMovePolicy(fromPartition string, toPartition string) string { + sql := strings.Replace(SECRET_MOVE_POLICY, SECRET_TABLE_NAME_ROOT_POLICY, db.GetSecretPartitionTableNamePolicy(toPartition), 2) + sql = strings.Replace(sql, db.GetSecretPartitionTableNamePolicy(toPartition), db.GetSecretPartitionTableNamePolicy(fromPartition), 1) + sql = strings.Replace(sql, SECRET_PARTITION_FILLIN, toPartition, 1) + return sql +} + func (db *AgbotPostgresqlDB) GetManagedPolicySecretNames(policyOrg, policyName string) ([]string, error) { sql := "" if policyOrg == "" {