Skip to content

Commit

Permalink
Implement updateContextDecisionJob
Browse files Browse the repository at this point in the history
  • Loading branch information
sevein committed May 1, 2024
1 parent e792571 commit c76d71b
Show file tree
Hide file tree
Showing 10 changed files with 440 additions and 92 deletions.
6 changes: 6 additions & 0 deletions hack/ccp/internal/controller/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ type chain struct {
choices any // TODO: see `generated_choices` in `chain.py`.
}

func (c *chain) update(kvs map[string]string) {
for k, v := range kvs {
c.pCtx.Set(k, string(v))
}
}

// iterator carries a package through all its workflow.
type iterator struct {
logger logr.Logger
Expand Down
167 changes: 144 additions & 23 deletions hack/ccp/internal/controller/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"maps"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -317,22 +318,22 @@ var _ jobRunner = (*updateContextDecisionJob)(nil)
// TODO: this should be defined in the workflow, not hardcoded here.
//
// nolint: unused
var updateContextDecisionJobChoiceMapping = map[string]string{
var updateContextDecisionJobChoiceMapping = map[uuid.UUID]uuid.UUID{
// Decision point "Assign UUIDs to directories?".
"8882bad4-561c-4126-89c9-f7f0c083d5d7": "bd899573-694e-4d33-8c9b-df0af802437d",
"e10a31c3-56df-4986-af7e-2794ddfe8686": "bd899573-694e-4d33-8c9b-df0af802437d",
"d6f6f5db-4cc2-4652-9283-9ec6a6d181e5": "bd899573-694e-4d33-8c9b-df0af802437d",
"1563f22f-f5f7-4dfe-a926-6ab50d408832": "bd899573-694e-4d33-8c9b-df0af802437d",
uuid.MustParse("8882bad4-561c-4126-89c9-f7f0c083d5d7"): uuid.MustParse("bd899573-694e-4d33-8c9b-df0af802437d"),
uuid.MustParse("e10a31c3-56df-4986-af7e-2794ddfe8686"): uuid.MustParse("bd899573-694e-4d33-8c9b-df0af802437d"),
uuid.MustParse("d6f6f5db-4cc2-4652-9283-9ec6a6d181e5"): uuid.MustParse("bd899573-694e-4d33-8c9b-df0af802437d"),
uuid.MustParse("1563f22f-f5f7-4dfe-a926-6ab50d408832"): uuid.MustParse("bd899573-694e-4d33-8c9b-df0af802437d"),
// Decision "Yes" (for "Assign UUIDs to directories?").
"7e4cf404-e62d-4dc2-8d81-6141e390f66f": "2dc3f487-e4b0-4e07-a4b3-6216ed24ca14",
"2732a043-b197-4cbc-81ab-4e2bee9b74d3": "2dc3f487-e4b0-4e07-a4b3-6216ed24ca14",
"aa793efa-1b62-498c-8f92-cab187a99a2a": "2dc3f487-e4b0-4e07-a4b3-6216ed24ca14",
"efd98ddb-80a6-4206-80bf-81bf00f84416": "2dc3f487-e4b0-4e07-a4b3-6216ed24ca14",
uuid.MustParse("7e4cf404-e62d-4dc2-8d81-6141e390f66f"): uuid.MustParse("2dc3f487-e4b0-4e07-a4b3-6216ed24ca14"),
uuid.MustParse("2732a043-b197-4cbc-81ab-4e2bee9b74d3"): uuid.MustParse("2dc3f487-e4b0-4e07-a4b3-6216ed24ca14"),
uuid.MustParse("aa793efa-1b62-498c-8f92-cab187a99a2a"): uuid.MustParse("2dc3f487-e4b0-4e07-a4b3-6216ed24ca14"),
uuid.MustParse("efd98ddb-80a6-4206-80bf-81bf00f84416"): uuid.MustParse("2dc3f487-e4b0-4e07-a4b3-6216ed24ca14"),
// Decision "No" (for "Assign UUIDs to directories?").
"0053c670-3e61-4a3e-a188-3a2dd1eda426": "891f60d0-1ba8-48d3-b39e-dd0934635d29",
"8e93e523-86bb-47e1-a03a-4b33e13f8c5e": "891f60d0-1ba8-48d3-b39e-dd0934635d29",
"6dfbeff8-c6b1-435b-833a-ed764229d413": "891f60d0-1ba8-48d3-b39e-dd0934635d29",
"dc0ee6b6-ed5f-42a3-bc8f-c9c7ead03ed1": "891f60d0-1ba8-48d3-b39e-dd0934635d29",
uuid.MustParse("0053c670-3e61-4a3e-a188-3a2dd1eda426"): uuid.MustParse("891f60d0-1ba8-48d3-b39e-dd0934635d29"),
uuid.MustParse("8e93e523-86bb-47e1-a03a-4b33e13f8c5e"): uuid.MustParse("891f60d0-1ba8-48d3-b39e-dd0934635d29"),
uuid.MustParse("6dfbeff8-c6b1-435b-833a-ed764229d413"): uuid.MustParse("891f60d0-1ba8-48d3-b39e-dd0934635d29"),
uuid.MustParse("dc0ee6b6-ed5f-42a3-bc8f-c9c7ead03ed1"): uuid.MustParse("891f60d0-1ba8-48d3-b39e-dd0934635d29"),
}

func newUpdateContextDecisionJob(j *job) (*updateContextDecisionJob, error) {
Expand All @@ -347,14 +348,20 @@ func newUpdateContextDecisionJob(j *job) (*updateContextDecisionJob, error) {
}, nil
}

func (l *updateContextDecisionJob) exec(ctx context.Context) (_ uuid.UUID, err error) {
func (l *updateContextDecisionJob) exec(ctx context.Context) (linkID uuid.UUID, err error) {
defer func() {
if err != nil {
err = fmt.Errorf("nextChainDecisionJob: %v", err)
return
}
if e := l.j.markComplete(ctx); e != nil {
err = e
return
}
if id := l.j.wl.ExitCodes[0].LinkID; id == nil || *id == uuid.Nil {
err = fmt.Errorf("nextChainDecisionJob: linkID undefined")
} else {
linkID = *id
}
}()

Expand All @@ -365,19 +372,133 @@ func (l *updateContextDecisionJob) exec(ctx context.Context) (_ uuid.UUID, err e
return uuid.Nil, fmt.Errorf("save: %v", err)
}

panic("not implemented")
// Load new context from the database (DashboardSettings).
// TODO: split this out? Workflow items with no replacements configured
// seems like a different case.
if len(l.config.Replacements) == 0 {
if dict, err := l.loadDatabaseContext(ctx); err != nil {
return uuid.Nil, fmt.Errorf("load dict from db: %v", err)
} else if dict != nil {
l.j.chain.update(dict)
return uuid.Nil, nil
}
}

// TODO:
// - Load settings from DashboardSetting to update context.
// - Or load preconfigured choice to update context.
// - Or mark as awaiting.
// Load new context from processing configuration.
if dict, err := l.loadPreconfiguredContext(); err != nil {
return uuid.Nil, fmt.Errorf("load context with preconfigured choice: %v", err)
} else if dict != nil {
l.j.chain.update(dict)
return uuid.Nil, nil
}

id := l.j.wl.ExitCodes[0].LinkID
if id == nil || *id == uuid.Nil {
return uuid.Nil, errors.New("ops")
// Build decision point and await resolution.
opts := make([]option, len(l.config.Replacements))
for i, item := range l.config.Replacements {
opts[i] = option(item.Description.String())
}

return *id, nil
return l.await(ctx, opts)
}

// loadDatabaseContext loads the context dictionary from the database.
func (l *updateContextDecisionJob) loadDatabaseContext(ctx context.Context) (map[string]string, error) {
ln, ok := l.j.wf.Links[l.j.wl.FallbackLinkID]
if !ok {
return nil, nil
}
cfg, ok := ln.Config.(workflow.LinkStandardTaskConfig)
if !ok {
return nil, nil
}
if cfg.Execute == "" {
return nil, nil
}

ret, err := l.j.pkg.store.ReadDict(ctx, cfg.Execute)
if err != nil {
return nil, err
}

return l.formatChoices(ret), nil
}

// loadPreconfiguredContext loads the context dictionary from the workflow.
func (l *updateContextDecisionJob) loadPreconfiguredContext() (map[string]string, error) {
var normalizedChoice uuid.UUID
if v, ok := updateContextDecisionJobChoiceMapping[l.j.wl.ID]; ok {
normalizedChoice = v
} else {
normalizedChoice = l.j.wl.ID
}

choices, err := l.j.pkg.parseProcessingConfig()
if err != nil {
return nil, err
}

for _, choice := range choices {
if choice.AppliesTo != normalizedChoice.String() {
continue
}
desiredChoice, err := uuid.Parse(choice.GoToChain)
if err != nil {
return nil, err
}
if v, ok := updateContextDecisionJobChoiceMapping[desiredChoice]; ok {
desiredChoice = v
}
ln, ok := l.j.wf.Links[normalizedChoice]
if !ok {
return nil, nil // fmt.Errorf("desired choice not found: %s", desiredChoice)
}
config, ok := ln.Config.(workflow.LinkMicroServiceChoiceReplacementDic)
if !ok {
return nil, fmt.Errorf("desired choice doesn't have the expected type: %s", desiredChoice)
}
for _, replacement := range config.Replacements {
if replacement.ID == desiredChoice.String() {
choices := maps.Clone(replacement.Items)
return l.formatChoices(choices), nil
}
}
}

return nil, nil
}

func (l *updateContextDecisionJob) formatChoices(choices map[string]string) map[string]string {
for k, v := range choices {
delete(choices, k)
choices[fmt.Sprintf("%%%s%%", k)] = v
}

return choices
}

func (l *updateContextDecisionJob) await(ctx context.Context, opts []option) (_ uuid.UUID, err error) {
defer func() {
if err != nil {
err = fmt.Errorf("await: %v", err)
return
}
}()

if err := l.j.markAwaitingDecision(ctx); err != nil {
return uuid.Nil, err
}

decision, err := l.j.pkg.AwaitDecision(ctx, opts)
if err != nil {
return uuid.Nil, err
}

// TODO: decision here should be an integer.
// https://github.com/artefactual/archivematica/blob/2dd5a2366bf0529c193a19a5546087ed9a0b5534/src/MCPServer/lib/server/jobs/decisions.py#L286-L298

panic("not implemented")

return decision.uuid(), nil
}

// directoryClientScriptJob.
Expand Down
44 changes: 29 additions & 15 deletions hack/ccp/internal/controller/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,25 @@ func (p *Package) String() string {
return p.Name()
}

// parseProcessingConfig returns a list of preconfigured choices. A missing
// configuration file is a non-error, i.e. returns an empty slice of choices.
func (p *Package) parseProcessingConfig() ([]workflow.Choice, error) {
f, err := os.Open(filepath.Join(p.path, "processingMCP.xml"))
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, err
}

choices, err := workflow.ParseConfig(f)
if err != nil {
return nil, fmt.Errorf("parse: %v", err)
}

return choices, nil
}

// PreconfiguredChoice looks up a pre-configured choice in the processing
// configuration file that is part of the package.
func (p *Package) PreconfiguredChoice(linkID uuid.UUID) (uuid.UUID, error) {
Expand All @@ -118,19 +137,11 @@ func (p *Package) PreconfiguredChoice(linkID uuid.UUID) (uuid.UUID, error) {
return chainID, nil
}

f, err := os.Open(filepath.Join(p.path, "processingMCP.xml"))
if err != nil {
if os.IsNotExist(err) {
return uuid.Nil, nil
}
return uuid.Nil, err
}

// TODO: this could be cached if the file isn't going to change, but
// Archivematica is not doing any caching.
choices, err := workflow.ParseConfig(f)
choices, err := p.parseProcessingConfig()
if err != nil {
return uuid.Nil, err
} else if len(choices) == 0 {
return uuid.Nil, nil
}

var chainID uuid.UUID
Expand Down Expand Up @@ -194,7 +205,7 @@ func (p *Package) Files(ctx context.Context, filterFilenameEnd, filterSubdir str
if err != nil {
return nil, err
}
ret := make([]replacementMapping, len(files))
ret := make([]replacementMapping, 0, len(files))
seen := make(map[string]struct{}, len(files))

for _, f := range files {
Expand All @@ -218,6 +229,9 @@ func (p *Package) Files(ctx context.Context, filterFilenameEnd, filterSubdir str
if err != nil {
return err
}
if d.IsDir() {
return nil
}
fname := d.Name()
if filterFilenameEnd != "" && !strings.HasPrefix(fname, filterFilenameEnd) {
return nil
Expand All @@ -227,7 +241,7 @@ func (p *Package) Files(ctx context.Context, filterFilenameEnd, filterSubdir str
}
ret = append(ret, map[string]replacement{
"%relativeLocation": replacement(path),
"%fileUUID%": replacement(""),
"%fileUUID%": replacement("None"),
"%fileGrpUse%": replacement(""),
})
return nil
Expand Down Expand Up @@ -546,8 +560,8 @@ func fileReplacements(pkg *Package, f *store.File) replacementMapping {
ext := filepath.Ext(f.CurrentLocation)
extWithDot := "." + ext
name := filepath.Base(strings.TrimSuffix(f.CurrentLocation, ext))
absolutePath := strings.ReplaceAll(f.CurrentLocation, "%SIPDirectory%", pkg.Path())
absolutePath = strings.ReplaceAll(absolutePath, "%transferDirectory%", pkg.Path())
absolutePath := strings.ReplaceAll(f.CurrentLocation, "%SIPDirectory%", joinPath(pkg.Path(), ""))
absolutePath = strings.ReplaceAll(absolutePath, "%transferDirectory%", joinPath(pkg.Path(), ""))

maps.Copy(mapping, map[string]replacement{
"%fileUUID%": replacement(f.ID.String()),
Expand Down
27 changes: 16 additions & 11 deletions hack/ccp/internal/controller/package_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,22 @@ import (
)

func TestReplacements(t *testing.T) {
pCtx := &packageContext{OrderedMap: orderedmap.NewOrderedMap[string, string]()}
pCtx.Set("%path%", "/mnt/disk")
pCtx.Set("%name%", `Dr. Evelyn "The Innovator" O'Neill: The Complete Digital Archives`)
t.Parallel()

rm := replacementMapping(map[string]replacement{
"%uuid%": "91354225-f28b-433c-8280-cf6a5edea2ff",
"%job%": `cool \\stuff`,
}).update(pCtx)
t.Run("Updates itself with a given packageContext", func(t *testing.T) {
t.Parallel()
pCtx := &packageContext{OrderedMap: orderedmap.NewOrderedMap[string, string]()}
pCtx.Set("%path%", "/mnt/disk")
pCtx.Set("%name%", `Dr. Evelyn "The Innovator" O'Neill: The Complete Digital Archives`)

assert.Equal(t,
rm.replaceValues(`%name% with path="%path%" and uuid="%uuid%" did: %job%`),
`Dr. Evelyn \"The Innovator\" O'Neill: The Complete Digital Archives with path="/mnt/disk" and uuid="91354225-f28b-433c-8280-cf6a5edea2ff" did: cool \\\\\\\\stuff`,
)
rm := replacementMapping(map[string]replacement{
"%uuid%": "91354225-f28b-433c-8280-cf6a5edea2ff",
"%job%": `cool \\stuff`,
}).update(pCtx)

assert.Equal(t,
rm.replaceValues(`%name% with path="%path%" and uuid="%uuid%" did: %job%`),
`Dr. Evelyn \"The Innovator\" O'Neill: The Complete Digital Archives with path="/mnt/disk" and uuid="91354225-f28b-433c-8280-cf6a5edea2ff" did: cool \\\\\\\\stuff`,
)
})
}
Loading

0 comments on commit c76d71b

Please sign in to comment.