From c1ad129e2bec7d799c35fbf2c2ae6285f5dd5fc7 Mon Sep 17 00:00:00 2001 From: "luca.morgese@tno.nl" Date: Fri, 2 Aug 2024 16:47:40 +0200 Subject: [PATCH] separate cache access functions from interface functions --- .../downstream_reporter/cache/cache.go | 238 +++++++++++------- .../downstream_reporter/cache_test.go | 6 +- 2 files changed, 155 insertions(+), 89 deletions(-) diff --git a/internal/reporter/downstream_reporter/cache/cache.go b/internal/reporter/downstream_reporter/cache/cache.go index 8c299dd2..51ad69c8 100644 --- a/internal/reporter/downstream_reporter/cache/cache.go +++ b/internal/reporter/downstream_reporter/cache/cache.go @@ -3,6 +3,8 @@ package cache import ( b64 "encoding/base64" "errors" + "fmt" + "slices" "soarca/models/cacao" cache_report "soarca/models/cache" itime "soarca/utils/time" @@ -31,10 +33,35 @@ func New(timeUtil itime.ITime, maxExecutions int) *Cache { } } -// Util for retrieval +// ############################### Atomic cache access operations (mutex-protection) + +func (cacheReporter *Cache) getAllExecutions() ([]cache_report.ExecutionEntry, error) { + executions := make([]cache_report.ExecutionEntry, 0) + // NOTE: fetched via fifo register key reference as is ordered array, + // this is needed to test and report back ordered executions stored + + // Lock + cacheReporter.mutex.Lock() + for _, executionEntryKey := range cacheReporter.fifoRegister { + // NOTE: cached executions are passed by reference, so they must not be modified + entry, ok := cacheReporter.Cache[executionEntryKey] + if !ok { + return []cache_report.ExecutionEntry{}, errors.New("internal error. cache fifo register and cache executions mismatch") + } + executions = append(executions, entry) + } + cacheReporter.mutex.Unlock() + // Unlocked + + return executions, nil +} + func (cacheReporter *Cache) getExecution(executionKey uuid.UUID) (cache_report.ExecutionEntry, error) { + executionKeyStr := executionKey.String() + // No need for mutex as is one-line access executionEntry, ok := cacheReporter.Cache[executionKeyStr] + if !ok { err := errors.New("execution is not in cache. consider increasing cache size") return cache_report.ExecutionEntry{}, err @@ -52,6 +79,13 @@ func (cacheReporter *Cache) addExecutionFIFO(newExecutionEntry cache_report.Exec newExecutionEntryKey := newExecutionEntry.ExecutionId.String() + // Lock + cacheReporter.mutex.Lock() + defer cacheReporter.mutex.Unlock() + + if _, ok := cacheReporter.Cache[newExecutionEntryKey]; ok { + return errors.New("there is already an execution in the cache with the same execution id") + } if len(cacheReporter.fifoRegister) >= cacheReporter.Size { firstExecution := cacheReporter.fifoRegister[0] @@ -61,35 +95,123 @@ func (cacheReporter *Cache) addExecutionFIFO(newExecutionEntry cache_report.Exec cacheReporter.Cache[newExecutionEntryKey] = newExecutionEntry return nil + // Unlocked } - cacheReporter.fifoRegister = append(cacheReporter.fifoRegister, newExecutionEntryKey) cacheReporter.Cache[newExecutionEntryKey] = newExecutionEntry + return nil + // Unlocked } -func (cacheReporter *Cache) GetExecutions() ([]cache_report.ExecutionEntry, error) { +func (cacheReporter *Cache) upateEndExecutionWorkflow(executionId uuid.UUID, workflowError error) error { + // The cache should stay locked for the whole modification period + // in order to prevent e.g. the execution data being popped-out due to FIFO + // while its status or some of its steps are being updated + + // Lock cacheReporter.mutex.Lock() defer cacheReporter.mutex.Unlock() - executions := make([]cache_report.ExecutionEntry, 0) - // NOTE: fetched via fifo register key reference as is ordered array, - // needed to test and report back ordered executions stored - for _, executionEntryKey := range cacheReporter.fifoRegister { - // NOTE: cached executions are passed by reference, so they must not be modified - entry, ok := cacheReporter.Cache[executionEntryKey] - if !ok { - return []cache_report.ExecutionEntry{}, errors.New("internal error. cache fifo register and cache executions mismatch") - } - executions = append(executions, entry) + executionEntry, err := cacheReporter.getExecution(executionId) + if err != nil { + return err } - return executions, nil + + if workflowError != nil { + executionEntry.Error = workflowError + executionEntry.Status = cache_report.Failed + } else { + executionEntry.Status = cache_report.SuccessfullyExecuted + } + executionEntry.Ended = cacheReporter.timeUtil.Now() + cacheReporter.Cache[executionId.String()] = executionEntry + + return nil + // Unlocked } -func (cacheReporter *Cache) GetExecutionReport(executionKey uuid.UUID) (cache_report.ExecutionEntry, error) { +func (cacheReporter *Cache) addStartExecutionStep(executionId uuid.UUID, newStepData cache_report.StepResult) error { + // Locked cacheReporter.mutex.Lock() defer cacheReporter.mutex.Unlock() + executionEntry, err := cacheReporter.getExecution(executionId) + if err != nil { + return err + } + + if executionEntry.Status != cache_report.Ongoing { + return errors.New("trying to report on the execution of a step for an already reportedly terminated playbook execution") + } + _, alreadyThere := executionEntry.StepResults[newStepData.StepId] + if alreadyThere { + // TODO: must fix: all steps should start empty values but already present. Check should be + // done on Step.Started > 0 time + // + // Should divide between instanciation of step, and modification of step, + // with respective checks step status + return errors.New("a step execution start was already reported for this step. ignoring") + } + + executionEntry.StepResults[newStepData.StepId] = newStepData + // New code + cacheReporter.Cache[executionId.String()] = executionEntry + + return nil + // Unlocked +} + +func (cacheReporter *Cache) upateEndExecutionStep(executionId uuid.UUID, stepId string, returnVars cacao.Variables, stepError error, acceptedStepStati []cache_report.Status) error { + // Locked + cacheReporter.mutex.Lock() + defer cacheReporter.mutex.Unlock() + + executionEntry, err := cacheReporter.getExecution(executionId) + if err != nil { + return err + } + + if executionEntry.Status != cache_report.Ongoing { + return errors.New("trying to report on the execution of a step for an already reportedly terminated playbook execution") + // Unlocked + } + executionStepResult, ok := executionEntry.StepResults[stepId] + if !ok { + // TODO: must fix: all steps should start empty values but already present. Check should be + // done on Step.Started > 0 time + return errors.New("trying to update a step which was not (yet?) recorded in the cache") + // Unlocked + } + + if !slices.Contains(acceptedStepStati, executionStepResult.Status) { + return fmt.Errorf("step status precondition not met for step update [step status: %s]", executionStepResult.Status.String()) + } + + if stepError != nil { + executionStepResult.Error = stepError + executionStepResult.Status = cache_report.ServerSideError + } else { + executionStepResult.Status = cache_report.SuccessfullyExecuted + } + executionStepResult.Ended = cacheReporter.timeUtil.Now() + executionStepResult.Variables = returnVars + executionEntry.StepResults[stepId] = executionStepResult + cacheReporter.Cache[executionId.String()] = executionEntry + + return nil + // Unlocked +} + +// ############################### Informer interface + +func (cacheReporter *Cache) GetExecutions() ([]cache_report.ExecutionEntry, error) { + executions, err := cacheReporter.getAllExecutions() + return executions, err +} + +func (cacheReporter *Cache) GetExecutionReport(executionKey uuid.UUID) (cache_report.ExecutionEntry, error) { + executionEntry, err := cacheReporter.getExecution(executionKey) if err != nil { return cache_report.ExecutionEntry{}, err @@ -99,11 +221,9 @@ func (cacheReporter *Cache) GetExecutionReport(executionKey uuid.UUID) (cache_re return report, nil } -// ############################### Reporting +// ############################### Reporting interface func (cacheReporter *Cache) ReportWorkflowStart(executionId uuid.UUID, playbook cacao.Playbook) error { - cacheReporter.mutex.Lock() - defer cacheReporter.mutex.Unlock() newExecutionEntry := cache_report.ExecutionEntry{ ExecutionId: executionId, @@ -121,43 +241,12 @@ func (cacheReporter *Cache) ReportWorkflowStart(executionId uuid.UUID, playbook } func (cacheReporter *Cache) ReportWorkflowEnd(executionId uuid.UUID, playbook cacao.Playbook, workflowError error) error { - cacheReporter.mutex.Lock() - defer cacheReporter.mutex.Unlock() - - executionEntry, err := cacheReporter.getExecution(executionId) - if err != nil { - return err - } - - if workflowError != nil { - executionEntry.Error = workflowError - executionEntry.Status = cache_report.Failed - } else { - executionEntry.Status = cache_report.SuccessfullyExecuted - } - executionEntry.Ended = cacheReporter.timeUtil.Now() - cacheReporter.Cache[executionId.String()] = executionEntry - return nil + err := cacheReporter.upateEndExecutionWorkflow(executionId, workflowError) + return err } func (cacheReporter *Cache) ReportStepStart(executionId uuid.UUID, step cacao.Step, variables cacao.Variables) error { - cacheReporter.mutex.Lock() - defer cacheReporter.mutex.Unlock() - - executionEntry, err := cacheReporter.getExecution(executionId) - if err != nil { - return err - } - - if executionEntry.Status != cache_report.Ongoing { - return errors.New("trying to report on the execution of a step for an already reported completed or failed execution") - } - - _, alreadyThere := executionEntry.StepResults[step.ID] - if alreadyThere { - return errors.New("a step execution start was already reported for this step. ignoring") - } commandsB64 := []string{} isAutomated := true @@ -173,7 +262,7 @@ func (cacheReporter *Cache) ReportStepStart(executionId uuid.UUID, step cacao.St } } - newStepEntry := cache_report.StepResult{ + newStep := cache_report.StepResult{ ExecutionId: executionId, StepId: step.ID, Started: cacheReporter.timeUtil.Now(), @@ -184,44 +273,21 @@ func (cacheReporter *Cache) ReportStepStart(executionId uuid.UUID, step cacao.St Error: nil, IsAutomated: isAutomated, } - executionEntry.StepResults[step.ID] = newStepEntry - // New code - cacheReporter.Cache[executionId.String()] = executionEntry - return nil -} -func (cacheReporter *Cache) ReportStepEnd(executionId uuid.UUID, step cacao.Step, returnVars cacao.Variables, stepError error) error { - cacheReporter.mutex.Lock() - defer cacheReporter.mutex.Unlock() + err := cacheReporter.addStartExecutionStep(executionId, newStep) - executionEntry, err := cacheReporter.getExecution(executionId) - if err != nil { - return err - } + return err +} - if executionEntry.Status != cache_report.Ongoing { - return errors.New("trying to report on the execution of a step for an already reported completed or failed execution") - } +func (cacheReporter *Cache) ReportStepEnd(executionId uuid.UUID, step cacao.Step, returnVars cacao.Variables, stepError error) error { - executionStepResult, ok := executionEntry.StepResults[step.ID] - if !ok { - return errors.New("cannot report step end. step was not found in execution") - } + // stepId, err := uuid.Parse(step.ID) + // if err != nil { + // return fmt.Errorf("could not parse to uuid the step id: %s", step.ID) + // } - if executionStepResult.Status != cache_report.Ongoing { - return errors.New("trying to report on the execution of a step that was already reported completed or failed") - } + acceptedStepStati := []cache_report.Status{cache_report.Ongoing} + err := cacheReporter.upateEndExecutionStep(executionId, step.ID, returnVars, stepError, acceptedStepStati) - if stepError != nil { - executionStepResult.Error = stepError - executionStepResult.Status = cache_report.ServerSideError - } else { - executionStepResult.Status = cache_report.SuccessfullyExecuted - } - executionStepResult.Ended = cacheReporter.timeUtil.Now() - executionStepResult.Variables = returnVars - executionEntry.StepResults[step.ID] = executionStepResult - // New code - cacheReporter.Cache[executionId.String()] = executionEntry - return nil + return err } diff --git a/test/unittest/reporters/downstream_reporter/cache_test.go b/test/unittest/reporters/downstream_reporter/cache_test.go index 3050a271..30f58d0b 100644 --- a/test/unittest/reporters/downstream_reporter/cache_test.go +++ b/test/unittest/reporters/downstream_reporter/cache_test.go @@ -6,7 +6,7 @@ import ( "soarca/internal/reporter/downstream_reporter/cache" "soarca/models/cacao" cache_model "soarca/models/cache" - "soarca/test/unittest/mocks/mock_utils/time" + mock_time "soarca/test/unittest/mocks/mock_utils/time" "testing" "time" @@ -841,7 +841,7 @@ func TestInvalidStepReportAfterExecutionEnd(t *testing.T) { t.Fail() } - expectedErr := errors.New("trying to report on the execution of a step for an already reported completed or failed execution") + expectedErr := errors.New("trying to report on the execution of a step for an already reportedly terminated playbook execution") assert.Equal(t, err, expectedErr) mock_time.AssertExpectations(t) } @@ -930,7 +930,7 @@ func TestInvalidStepReportAfterStepEnd(t *testing.T) { t.Fail() } - expectedErr := errors.New("trying to report on the execution of a step that was already reported completed or failed") + expectedErr := errors.New("step status precondition not met for step update [step status: successfully_executed]") assert.Equal(t, err, expectedErr) mock_time.AssertExpectations(t) }