Skip to content

Commit

Permalink
Merge branch 'development' into feature/204-add-variables-payload-to-…
Browse files Browse the repository at this point in the history
…trigger-by-uuid
  • Loading branch information
lucamrgs authored Aug 8, 2024
2 parents 456b249 + cc5330e commit 34da841
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 120 deletions.
261 changes: 166 additions & 95 deletions internal/reporter/downstream_reporter/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,105 +3,116 @@ package cache
import (
b64 "encoding/base64"
"errors"
"reflect"
"soarca/logger"
"fmt"
"slices"
"soarca/models/cacao"
cache_report "soarca/models/cache"
itime "soarca/utils/time"
"sync"
"time"

"github.com/google/uuid"
)

var component = reflect.TypeOf(Cache{}).PkgPath()
var log *logger.Log

func init() {
log = logger.Logger(component, logger.Info, "", logger.Json)
}

const MaxExecutions int = 10

type Cache struct {
Size int
timeUtil itime.ITime
Cache map[string]cache_report.ExecutionEntry // Cached up to max
fifoRegister []string // Used for O(1) FIFO cache management
mutex sync.Mutex
}

func New(timeUtil itime.ITime, maxExecutions int) *Cache {
return &Cache{
Size: maxExecutions,
Cache: make(map[string]cache_report.ExecutionEntry),
timeUtil: timeUtil,
mutex: sync.Mutex{},
}
}

// ############################### Atomic cache access operations (mutex-protection)

func (cacheReporter *Cache) getAllExecutions() ([]cache_report.ExecutionEntry, error) {
executions := make([]cache_report.ExecutionEntry, 0)
// NOTE: fetched via fifo register key reference as is ordered array,
// this is needed to test and report back ordered executions stored

// Lock
cacheReporter.mutex.Lock()
defer cacheReporter.mutex.Unlock()
for _, executionEntryKey := range cacheReporter.fifoRegister {
// NOTE: cached executions are passed by reference, so they must not be modified
entry, ok := cacheReporter.Cache[executionEntryKey]
if !ok {
// Unlock
return []cache_report.ExecutionEntry{}, errors.New("internal error. cache fifo register and cache executions mismatch")
}
executions = append(executions, entry)
}

// Unlocked
return executions, nil
}

func (cacheReporter *Cache) getExecution(executionKey uuid.UUID) (cache_report.ExecutionEntry, error) {

executionKeyStr := executionKey.String()
// No need for mutex as is one-line access
executionEntry, ok := cacheReporter.Cache[executionKeyStr]

if !ok {
err := errors.New("execution is not in cache")
log.Warning("execution is not in cache. consider increasing cache size.")
err := errors.New("execution is not in cache. consider increasing cache size")
return cache_report.ExecutionEntry{}, err
// TODO Retrieve from database
// TODO Retrieve from database and push to cache
}
return executionEntry, nil
}
func (cacheReporter *Cache) getExecutionStep(executionKey uuid.UUID, stepKey string) (cache_report.StepResult, error) {
executionEntry, err := cacheReporter.getExecution(executionKey)
if err != nil {
return cache_report.StepResult{}, err
}
executionStep, ok := executionEntry.StepResults[stepKey]
if !ok {
err := errors.New("execution step is not in cache")
return cache_report.StepResult{}, err
}
return executionStep, nil
}

// Adding executions in FIFO logic
func (cacheReporter *Cache) addExecution(newExecutionEntry cache_report.ExecutionEntry) error {
func (cacheReporter *Cache) addExecutionFIFO(newExecutionEntry cache_report.ExecutionEntry) error {

if !(len(cacheReporter.fifoRegister) == len(cacheReporter.Cache)) {
return errors.New("cache fifo register and content are desynchronized")
}

newExecutionEntryKey := newExecutionEntry.ExecutionId.String()

// Lock
cacheReporter.mutex.Lock()
defer cacheReporter.mutex.Unlock()

if _, ok := cacheReporter.Cache[newExecutionEntryKey]; ok {
return errors.New("there is already an execution in the cache with the same execution id")
}
if len(cacheReporter.fifoRegister) >= cacheReporter.Size {

firstExecution := cacheReporter.fifoRegister[0]
cacheReporter.fifoRegister = cacheReporter.fifoRegister[1:]
delete(cacheReporter.Cache, firstExecution)

cacheReporter.fifoRegister = append(cacheReporter.fifoRegister, newExecutionEntryKey)
cacheReporter.Cache[newExecutionEntryKey] = newExecutionEntry

return nil
// Unlocked
}

cacheReporter.fifoRegister = append(cacheReporter.fifoRegister, newExecutionEntryKey)
cacheReporter.Cache[newExecutionEntryKey] = newExecutionEntry
return nil
}

func (cacheReporter *Cache) ReportWorkflowStart(executionId uuid.UUID, playbook cacao.Playbook) error {
newExecutionEntry := cache_report.ExecutionEntry{
ExecutionId: executionId,
PlaybookId: playbook.ID,
Started: cacheReporter.timeUtil.Now(),
Ended: time.Time{},
StepResults: map[string]cache_report.StepResult{},
Status: cache_report.Ongoing,
}
err := cacheReporter.addExecution(newExecutionEntry)
if err != nil {
return err
}
return nil
// Unlocked
}

func (cacheReporter *Cache) ReportWorkflowEnd(executionId uuid.UUID, playbook cacao.Playbook, workflowError error) error {
func (cacheReporter *Cache) upateEndExecutionWorkflow(executionId uuid.UUID, workflowError error) error {
// The cache should stay locked for the whole modification period
// in order to prevent e.g. the execution data being popped-out due to FIFO
// while its status or some of its steps are being updated

// Lock
cacheReporter.mutex.Lock()
defer cacheReporter.mutex.Unlock()

executionEntry, err := cacheReporter.getExecution(executionId)
if err != nil {
Expand All @@ -118,70 +129,64 @@ func (cacheReporter *Cache) ReportWorkflowEnd(executionId uuid.UUID, playbook ca
cacheReporter.Cache[executionId.String()] = executionEntry

return nil
// Unlocked
}

func (cacheReporter *Cache) ReportStepStart(executionId uuid.UUID, step cacao.Step, variables cacao.Variables) error {
func (cacheReporter *Cache) addStartExecutionStep(executionId uuid.UUID, newStepData cache_report.StepResult) error {
// Locked
cacheReporter.mutex.Lock()
defer cacheReporter.mutex.Unlock()

executionEntry, err := cacheReporter.getExecution(executionId)
if err != nil {
return err
}

if executionEntry.Status != cache_report.Ongoing {
return errors.New("trying to report on the execution of a step for an already reported completed or failed execution")
return errors.New("trying to report on the execution of a step for an already reportedly terminated playbook execution")
}

_, alreadyThere := executionEntry.StepResults[step.ID]
_, alreadyThere := executionEntry.StepResults[newStepData.StepId]
if alreadyThere {
log.Warning("a step execution was already reported for this step. overwriting.")
// TODO: must fix: all steps should start empty values but already present. Check should be
// done on Step.Started > 0 time
//
// Should divide between instanciation of step, and modification of step,
// with respective checks step status
return errors.New("a step execution start was already reported for this step. ignoring")
}

// TODO: must test
commandsB64 := []string{}
isAutomated := true
for _, cmd := range step.Commands {
if cmd.Type == cacao.CommandTypeManual {
isAutomated = false
}
if cmd.CommandB64 != "" {
commandsB64 = append(commandsB64, cmd.CommandB64)
} else {
cmdB64 := b64.StdEncoding.EncodeToString([]byte(cmd.Command))
commandsB64 = append(commandsB64, cmdB64)
}
}
executionEntry.StepResults[newStepData.StepId] = newStepData
// New code
cacheReporter.Cache[executionId.String()] = executionEntry

newStepEntry := cache_report.StepResult{
ExecutionId: executionId,
StepId: step.ID,
Started: cacheReporter.timeUtil.Now(),
Ended: time.Time{},
Variables: variables,
CommandsB64: commandsB64,
Status: cache_report.Ongoing,
Error: nil,
IsAutomated: isAutomated,
}
executionEntry.StepResults[step.ID] = newStepEntry
return nil
// Unlocked
}

func (cacheReporter *Cache) ReportStepEnd(executionId uuid.UUID, step cacao.Step, returnVars cacao.Variables, stepError error) error {
func (cacheReporter *Cache) upateEndExecutionStep(executionId uuid.UUID, stepId string, returnVars cacao.Variables, stepError error, acceptedStepStati []cache_report.Status) error {
// Locked
cacheReporter.mutex.Lock()
defer cacheReporter.mutex.Unlock()

executionEntry, err := cacheReporter.getExecution(executionId)
if err != nil {
return err
}

if executionEntry.Status != cache_report.Ongoing {
return errors.New("trying to report on the execution of a step for an already reported completed or failed execution")
return errors.New("trying to report on the execution of a step for an already reportedly terminated playbook execution")
// Unlocked
}

executionStepResult, err := cacheReporter.getExecutionStep(executionId, step.ID)
if err != nil {
return err
executionStepResult, ok := executionEntry.StepResults[stepId]
if !ok {
// TODO: must fix: all steps should start empty values but already present. Check should be
// done on Step.Started > 0 time
return errors.New("trying to update a step which was not (yet?) recorded in the cache")
// Unlocked
}

if executionStepResult.Status != cache_report.Ongoing {
return errors.New("trying to report on the execution of a step that was already reported completed or failed")
if !slices.Contains(acceptedStepStati, executionStepResult.Status) {
return fmt.Errorf("step status precondition not met for step update [step status: %s]", executionStepResult.Status.String())
}

if stepError != nil {
Expand All @@ -192,27 +197,22 @@ func (cacheReporter *Cache) ReportStepEnd(executionId uuid.UUID, step cacao.Step
}
executionStepResult.Ended = cacheReporter.timeUtil.Now()
executionStepResult.Variables = returnVars
executionEntry.StepResults[step.ID] = executionStepResult
executionEntry.StepResults[stepId] = executionStepResult
cacheReporter.Cache[executionId.String()] = executionEntry

return nil
// Unlocked
}

// ############################### Informer interface

func (cacheReporter *Cache) GetExecutions() ([]cache_report.ExecutionEntry, error) {
executions := make([]cache_report.ExecutionEntry, 0)
// NOTE: fetched via fifo register key reference as is ordered array,
// needed to test and report back ordered executions stored
for _, executionEntryKey := range cacheReporter.fifoRegister {
// NOTE: cached executions are passed by reference, so they must not be modified
entry, present := cacheReporter.Cache[executionEntryKey]
if !present {
return []cache_report.ExecutionEntry{}, errors.New("internal error. cache fifo register and cache executions mismatch.")
}
executions = append(executions, entry)
}
return executions, nil
executions, err := cacheReporter.getAllExecutions()
return executions, err
}

func (cacheReporter *Cache) GetExecutionReport(executionKey uuid.UUID) (cache_report.ExecutionEntry, error) {

executionEntry, err := cacheReporter.getExecution(executionKey)
if err != nil {
return cache_report.ExecutionEntry{}, err
Expand All @@ -221,3 +221,74 @@ func (cacheReporter *Cache) GetExecutionReport(executionKey uuid.UUID) (cache_re

return report, nil
}

// ############################### Reporting interface

func (cacheReporter *Cache) ReportWorkflowStart(executionId uuid.UUID, playbook cacao.Playbook) error {

newExecutionEntry := cache_report.ExecutionEntry{
ExecutionId: executionId,
PlaybookId: playbook.ID,
Started: cacheReporter.timeUtil.Now(),
Ended: time.Time{},
StepResults: map[string]cache_report.StepResult{},
Status: cache_report.Ongoing,
}
err := cacheReporter.addExecutionFIFO(newExecutionEntry)
if err != nil {
return err
}
return nil
}

func (cacheReporter *Cache) ReportWorkflowEnd(executionId uuid.UUID, playbook cacao.Playbook, workflowError error) error {

err := cacheReporter.upateEndExecutionWorkflow(executionId, workflowError)
return err
}

func (cacheReporter *Cache) ReportStepStart(executionId uuid.UUID, step cacao.Step, variables cacao.Variables) error {

commandsB64 := []string{}
isAutomated := true
for _, cmd := range step.Commands {
if cmd.Type == cacao.CommandTypeManual {
isAutomated = false
}
if cmd.CommandB64 != "" {
commandsB64 = append(commandsB64, cmd.CommandB64)
} else {
cmdB64 := b64.StdEncoding.EncodeToString([]byte(cmd.Command))
commandsB64 = append(commandsB64, cmdB64)
}
}

newStep := cache_report.StepResult{
ExecutionId: executionId,
StepId: step.ID,
Started: cacheReporter.timeUtil.Now(),
Ended: time.Time{},
Variables: variables,
CommandsB64: commandsB64,
Status: cache_report.Ongoing,
Error: nil,
IsAutomated: isAutomated,
}

err := cacheReporter.addStartExecutionStep(executionId, newStep)

return err
}

func (cacheReporter *Cache) ReportStepEnd(executionId uuid.UUID, step cacao.Step, returnVars cacao.Variables, stepError error) error {

// stepId, err := uuid.Parse(step.ID)
// if err != nil {
// return fmt.Errorf("could not parse to uuid the step id: %s", step.ID)
// }

acceptedStepStati := []cache_report.Status{cache_report.Ongoing}
err := cacheReporter.upateEndExecutionStep(executionId, step.ID, returnVars, stepError, acceptedStepStati)

return err
}
Loading

0 comments on commit 34da841

Please sign in to comment.