Skip to content

Commit

Permalink
separate cache access functions from interface functions
Browse files Browse the repository at this point in the history
  • Loading branch information
lucamrgs committed Aug 2, 2024
1 parent c0557a9 commit c1ad129
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 89 deletions.
238 changes: 152 additions & 86 deletions internal/reporter/downstream_reporter/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package cache
import (
b64 "encoding/base64"
"errors"
"fmt"
"slices"
"soarca/models/cacao"
cache_report "soarca/models/cache"
itime "soarca/utils/time"
Expand Down Expand Up @@ -31,10 +33,35 @@ func New(timeUtil itime.ITime, maxExecutions int) *Cache {
}
}

// Util for retrieval
// ############################### Atomic cache access operations (mutex-protection)

func (cacheReporter *Cache) getAllExecutions() ([]cache_report.ExecutionEntry, error) {
executions := make([]cache_report.ExecutionEntry, 0)
// NOTE: fetched via fifo register key reference as is ordered array,
// this is needed to test and report back ordered executions stored

// Lock
cacheReporter.mutex.Lock()
for _, executionEntryKey := range cacheReporter.fifoRegister {
// NOTE: cached executions are passed by reference, so they must not be modified
entry, ok := cacheReporter.Cache[executionEntryKey]
if !ok {
return []cache_report.ExecutionEntry{}, errors.New("internal error. cache fifo register and cache executions mismatch")
}
executions = append(executions, entry)
}
cacheReporter.mutex.Unlock()
// Unlocked

return executions, nil
}

func (cacheReporter *Cache) getExecution(executionKey uuid.UUID) (cache_report.ExecutionEntry, error) {

executionKeyStr := executionKey.String()
// No need for mutex as is one-line access
executionEntry, ok := cacheReporter.Cache[executionKeyStr]

if !ok {
err := errors.New("execution is not in cache. consider increasing cache size")
return cache_report.ExecutionEntry{}, err
Expand All @@ -52,6 +79,13 @@ func (cacheReporter *Cache) addExecutionFIFO(newExecutionEntry cache_report.Exec

newExecutionEntryKey := newExecutionEntry.ExecutionId.String()

// Lock
cacheReporter.mutex.Lock()
defer cacheReporter.mutex.Unlock()

if _, ok := cacheReporter.Cache[newExecutionEntryKey]; ok {
return errors.New("there is already an execution in the cache with the same execution id")
}
if len(cacheReporter.fifoRegister) >= cacheReporter.Size {

firstExecution := cacheReporter.fifoRegister[0]
Expand All @@ -61,35 +95,123 @@ func (cacheReporter *Cache) addExecutionFIFO(newExecutionEntry cache_report.Exec
cacheReporter.Cache[newExecutionEntryKey] = newExecutionEntry

return nil
// Unlocked
}

cacheReporter.fifoRegister = append(cacheReporter.fifoRegister, newExecutionEntryKey)
cacheReporter.Cache[newExecutionEntryKey] = newExecutionEntry

return nil
// Unlocked
}

func (cacheReporter *Cache) GetExecutions() ([]cache_report.ExecutionEntry, error) {
func (cacheReporter *Cache) upateEndExecutionWorkflow(executionId uuid.UUID, workflowError error) error {
// The cache should stay locked for the whole modification period
// in order to prevent e.g. the execution data being popped-out due to FIFO
// while its status or some of its steps are being updated

// Lock
cacheReporter.mutex.Lock()
defer cacheReporter.mutex.Unlock()

executions := make([]cache_report.ExecutionEntry, 0)
// NOTE: fetched via fifo register key reference as is ordered array,
// needed to test and report back ordered executions stored
for _, executionEntryKey := range cacheReporter.fifoRegister {
// NOTE: cached executions are passed by reference, so they must not be modified
entry, ok := cacheReporter.Cache[executionEntryKey]
if !ok {
return []cache_report.ExecutionEntry{}, errors.New("internal error. cache fifo register and cache executions mismatch")
}
executions = append(executions, entry)
executionEntry, err := cacheReporter.getExecution(executionId)
if err != nil {
return err
}
return executions, nil

if workflowError != nil {
executionEntry.Error = workflowError
executionEntry.Status = cache_report.Failed
} else {
executionEntry.Status = cache_report.SuccessfullyExecuted
}
executionEntry.Ended = cacheReporter.timeUtil.Now()
cacheReporter.Cache[executionId.String()] = executionEntry

return nil
// Unlocked
}

func (cacheReporter *Cache) GetExecutionReport(executionKey uuid.UUID) (cache_report.ExecutionEntry, error) {
func (cacheReporter *Cache) addStartExecutionStep(executionId uuid.UUID, newStepData cache_report.StepResult) error {
// Locked
cacheReporter.mutex.Lock()
defer cacheReporter.mutex.Unlock()

executionEntry, err := cacheReporter.getExecution(executionId)
if err != nil {
return err
}

if executionEntry.Status != cache_report.Ongoing {
return errors.New("trying to report on the execution of a step for an already reportedly terminated playbook execution")
}
_, alreadyThere := executionEntry.StepResults[newStepData.StepId]
if alreadyThere {
// TODO: must fix: all steps should start empty values but already present. Check should be
// done on Step.Started > 0 time
//
// Should divide between instanciation of step, and modification of step,
// with respective checks step status
return errors.New("a step execution start was already reported for this step. ignoring")
}

executionEntry.StepResults[newStepData.StepId] = newStepData
// New code
cacheReporter.Cache[executionId.String()] = executionEntry

return nil
// Unlocked
}

func (cacheReporter *Cache) upateEndExecutionStep(executionId uuid.UUID, stepId string, returnVars cacao.Variables, stepError error, acceptedStepStati []cache_report.Status) error {
// Locked
cacheReporter.mutex.Lock()
defer cacheReporter.mutex.Unlock()

executionEntry, err := cacheReporter.getExecution(executionId)
if err != nil {
return err
}

if executionEntry.Status != cache_report.Ongoing {
return errors.New("trying to report on the execution of a step for an already reportedly terminated playbook execution")
// Unlocked
}
executionStepResult, ok := executionEntry.StepResults[stepId]
if !ok {
// TODO: must fix: all steps should start empty values but already present. Check should be
// done on Step.Started > 0 time
return errors.New("trying to update a step which was not (yet?) recorded in the cache")
// Unlocked
}

if !slices.Contains(acceptedStepStati, executionStepResult.Status) {
return fmt.Errorf("step status precondition not met for step update [step status: %s]", executionStepResult.Status.String())
}

if stepError != nil {
executionStepResult.Error = stepError
executionStepResult.Status = cache_report.ServerSideError
} else {
executionStepResult.Status = cache_report.SuccessfullyExecuted
}
executionStepResult.Ended = cacheReporter.timeUtil.Now()
executionStepResult.Variables = returnVars
executionEntry.StepResults[stepId] = executionStepResult
cacheReporter.Cache[executionId.String()] = executionEntry

return nil
// Unlocked
}

// ############################### Informer interface

func (cacheReporter *Cache) GetExecutions() ([]cache_report.ExecutionEntry, error) {
executions, err := cacheReporter.getAllExecutions()
return executions, err
}

func (cacheReporter *Cache) GetExecutionReport(executionKey uuid.UUID) (cache_report.ExecutionEntry, error) {

executionEntry, err := cacheReporter.getExecution(executionKey)
if err != nil {
return cache_report.ExecutionEntry{}, err
Expand All @@ -99,11 +221,9 @@ func (cacheReporter *Cache) GetExecutionReport(executionKey uuid.UUID) (cache_re
return report, nil
}

// ############################### Reporting
// ############################### Reporting interface

func (cacheReporter *Cache) ReportWorkflowStart(executionId uuid.UUID, playbook cacao.Playbook) error {
cacheReporter.mutex.Lock()
defer cacheReporter.mutex.Unlock()

newExecutionEntry := cache_report.ExecutionEntry{
ExecutionId: executionId,
Expand All @@ -121,43 +241,12 @@ func (cacheReporter *Cache) ReportWorkflowStart(executionId uuid.UUID, playbook
}

func (cacheReporter *Cache) ReportWorkflowEnd(executionId uuid.UUID, playbook cacao.Playbook, workflowError error) error {
cacheReporter.mutex.Lock()
defer cacheReporter.mutex.Unlock()

executionEntry, err := cacheReporter.getExecution(executionId)
if err != nil {
return err
}

if workflowError != nil {
executionEntry.Error = workflowError
executionEntry.Status = cache_report.Failed
} else {
executionEntry.Status = cache_report.SuccessfullyExecuted
}
executionEntry.Ended = cacheReporter.timeUtil.Now()
cacheReporter.Cache[executionId.String()] = executionEntry

return nil
err := cacheReporter.upateEndExecutionWorkflow(executionId, workflowError)
return err
}

func (cacheReporter *Cache) ReportStepStart(executionId uuid.UUID, step cacao.Step, variables cacao.Variables) error {
cacheReporter.mutex.Lock()
defer cacheReporter.mutex.Unlock()

executionEntry, err := cacheReporter.getExecution(executionId)
if err != nil {
return err
}

if executionEntry.Status != cache_report.Ongoing {
return errors.New("trying to report on the execution of a step for an already reported completed or failed execution")
}

_, alreadyThere := executionEntry.StepResults[step.ID]
if alreadyThere {
return errors.New("a step execution start was already reported for this step. ignoring")
}

commandsB64 := []string{}
isAutomated := true
Expand All @@ -173,7 +262,7 @@ func (cacheReporter *Cache) ReportStepStart(executionId uuid.UUID, step cacao.St
}
}

newStepEntry := cache_report.StepResult{
newStep := cache_report.StepResult{
ExecutionId: executionId,
StepId: step.ID,
Started: cacheReporter.timeUtil.Now(),
Expand All @@ -184,44 +273,21 @@ func (cacheReporter *Cache) ReportStepStart(executionId uuid.UUID, step cacao.St
Error: nil,
IsAutomated: isAutomated,
}
executionEntry.StepResults[step.ID] = newStepEntry
// New code
cacheReporter.Cache[executionId.String()] = executionEntry
return nil
}

func (cacheReporter *Cache) ReportStepEnd(executionId uuid.UUID, step cacao.Step, returnVars cacao.Variables, stepError error) error {
cacheReporter.mutex.Lock()
defer cacheReporter.mutex.Unlock()
err := cacheReporter.addStartExecutionStep(executionId, newStep)

executionEntry, err := cacheReporter.getExecution(executionId)
if err != nil {
return err
}
return err
}

if executionEntry.Status != cache_report.Ongoing {
return errors.New("trying to report on the execution of a step for an already reported completed or failed execution")
}
func (cacheReporter *Cache) ReportStepEnd(executionId uuid.UUID, step cacao.Step, returnVars cacao.Variables, stepError error) error {

executionStepResult, ok := executionEntry.StepResults[step.ID]
if !ok {
return errors.New("cannot report step end. step was not found in execution")
}
// stepId, err := uuid.Parse(step.ID)
// if err != nil {
// return fmt.Errorf("could not parse to uuid the step id: %s", step.ID)
// }

if executionStepResult.Status != cache_report.Ongoing {
return errors.New("trying to report on the execution of a step that was already reported completed or failed")
}
acceptedStepStati := []cache_report.Status{cache_report.Ongoing}
err := cacheReporter.upateEndExecutionStep(executionId, step.ID, returnVars, stepError, acceptedStepStati)

if stepError != nil {
executionStepResult.Error = stepError
executionStepResult.Status = cache_report.ServerSideError
} else {
executionStepResult.Status = cache_report.SuccessfullyExecuted
}
executionStepResult.Ended = cacheReporter.timeUtil.Now()
executionStepResult.Variables = returnVars
executionEntry.StepResults[step.ID] = executionStepResult
// New code
cacheReporter.Cache[executionId.String()] = executionEntry
return nil
return err
}
6 changes: 3 additions & 3 deletions test/unittest/reporters/downstream_reporter/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"soarca/internal/reporter/downstream_reporter/cache"
"soarca/models/cacao"
cache_model "soarca/models/cache"
"soarca/test/unittest/mocks/mock_utils/time"
mock_time "soarca/test/unittest/mocks/mock_utils/time"
"testing"
"time"

Expand Down Expand Up @@ -841,7 +841,7 @@ func TestInvalidStepReportAfterExecutionEnd(t *testing.T) {
t.Fail()
}

expectedErr := errors.New("trying to report on the execution of a step for an already reported completed or failed execution")
expectedErr := errors.New("trying to report on the execution of a step for an already reportedly terminated playbook execution")
assert.Equal(t, err, expectedErr)
mock_time.AssertExpectations(t)
}
Expand Down Expand Up @@ -930,7 +930,7 @@ func TestInvalidStepReportAfterStepEnd(t *testing.T) {
t.Fail()
}

expectedErr := errors.New("trying to report on the execution of a step that was already reported completed or failed")
expectedErr := errors.New("step status precondition not met for step update [step status: successfully_executed]")
assert.Equal(t, err, expectedErr)
mock_time.AssertExpectations(t)
}

0 comments on commit c1ad129

Please sign in to comment.