Skip to content

Commit

Permalink
Draft copyFromTransferSources
Browse files Browse the repository at this point in the history
  • Loading branch information
sevein committed May 14, 2024
1 parent 770e4a3 commit c808227
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 76 deletions.
2 changes: 1 addition & 1 deletion hack/ccp/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/tailscale/hujson v0.0.0-20221223112325-20486734a56a
github.com/testcontainers/testcontainers-go v0.30.0
github.com/testcontainers/testcontainers-go/modules/mysql v0.30.0
go.artefactual.dev/ssclient v0.2.1
go.artefactual.dev/ssclient v0.2.3
go.artefactual.dev/tools v0.10.0
go.nhat.io/httpmock v0.11.0
go.starlark.net v0.0.0-20240411212711-9b43f0afd521
Expand Down
4 changes: 2 additions & 2 deletions hack/ccp/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,8 @@ github.com/yuin/goldmark-emoji v1.0.2 h1:c/RgTShNgHTtc6xdz2KKI74jJr6rWi7FPgnP9GA
github.com/yuin/goldmark-emoji v1.0.2/go.mod h1:RhP/RWpexdp+KHs7ghKnifRoIs/Bq4nDS7tRbCkOwKY=
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.artefactual.dev/ssclient v0.2.1 h1:PKS7o8D7Q7XU+g1YpjWtGzD0ZknOj8ZPdh0KV9jI0cY=
go.artefactual.dev/ssclient v0.2.1/go.mod h1:ImaAHtgGIbKlnrOUzczBMmltNVbhYkKZ7ujUjfBtUj8=
go.artefactual.dev/ssclient v0.2.3 h1:tVkjQEycs6420Ky++1aZZUBoz94wWS2FoiVzqBNH2gM=
go.artefactual.dev/ssclient v0.2.3/go.mod h1:ImaAHtgGIbKlnrOUzczBMmltNVbhYkKZ7ujUjfBtUj8=
go.artefactual.dev/tools v0.10.0 h1:+LeZS5oHupAQBXvLQ4aGIuZyqf7zCpD7s3UpyDl9zn4=
go.artefactual.dev/tools v0.10.0/go.mod h1:PIy0RtC45gC4sASb4r26g0aCU24kSWIp+mcV1p2gtpY=
go.nhat.io/httpmock v0.11.0 h1:GSADjr4/sn1HXqnyluPr9PYpSmMh/h3ty0O7lEozD3c=
Expand Down
89 changes: 78 additions & 11 deletions hack/ccp/internal/controller/source.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package controller

import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/google/uuid"

"github.com/artefactual/archivematica/hack/ccp/internal/derrors"
"github.com/artefactual/archivematica/hack/ccp/internal/ssclient"
)
Expand All @@ -24,11 +27,11 @@ import (
// This method does not rely on the activeTransfer watched directory. It does
// not prompt the user to accept the transfer because we go directly into the
// next chain link.
func StartTransfer(ssclient ssclient.Client, sharedDir, tmpDir, name, path string) error {
func StartTransfer(ctx context.Context, ssclient ssclient.Client, sharedDir, tmpDir, name, path string) error {
destRel, destAbs, src := determineTransferPaths(sharedDir, tmpDir, name, path)
fmt.Println(destRel, destAbs, src)

copyFromTransferSources(ssclient, []string{path}, destRel)
_ = copyFromTransferSources(ctx, ssclient, []string{path}, destRel)

tsrc, tdst := "", ""
dst, err := moveToInternalSharedDir(sharedDir, tsrc, tdst)
Expand Down Expand Up @@ -61,12 +64,14 @@ func StartTransferWithWatchedDir() {
// update transfer.currentlocation with the new destination
}

func locationPath(locPath string) (id, path string) {
if before, after, found := strings.Cut(locPath, ":"); found {
id = before
func locationPath(locPath string) (id uuid.UUID, path string) {
before, after, found := strings.Cut(locPath, ":")

if found {
id, _ = uuid.Parse(before)
path = after
} else {
id = before
path = before
}

return id, path
Expand Down Expand Up @@ -147,9 +152,71 @@ func moveToInternalSharedDir(sharedDir, path, dest string) (_ string, err error)
}
}

func copyFromTransferSources(c ssclient.Client, paths []string, destRel string) {
// - processing_location = storage_service.get_first_location(purpose="CP")
// - transfer_sources = storage_service.get_location(purpose="TS")
// - _default_transfer_source_location_uuid
// - storage_service.copy_files(location, processing_location, files)
// copyFromTransferSources asks the Storage Service to move the given paths
// from their transfer source locations into the processing location returned
// by ReadProcessingLocation, placing them under destRel.
//
// Each item in paths is split by locationPath into a location identifier and
// a path. Items without a usable identifier fall back to the default "TS"
// location. An item whose location is not among the "TS" locations returned
// by ListLocations aborts the whole operation before any move is requested.
//
// One MoveFiles request is issued per transfer source that has files queued.
// Failures of individual requests do not stop the remaining ones; they are
// combined with errors.Join and returned together.
func copyFromTransferSources(ctx context.Context, c ssclient.Client, paths []string, destRel string) (err error) {
	// NOTE(review): derrors.Add is called eagerly here, while err is still
	// nil. If it is meant to decorate the returned error it presumably needs
	// to be deferred — confirm against the derrors package.
	derrors.Add(&err, "copyFromTransferSources()")

	// We'll use the default transfer source location when a request does not
	// indicate its source.
	defaultTransferSource, err := c.ReadDefaultLocation(ctx, "TS")
	if err != nil {
		return err
	}

	// Look up the destination, which is our pipeline processing location.
	currentlyProcessing, err := c.ReadProcessingLocation(ctx)
	if err != nil {
		return err
	}

	transferSources, err := c.ListLocations(ctx, "", "TS")
	if err != nil {
		return err
	}

	// filesByLocID holds all the copy operations that we'll be making,
	// indexed by the identifier of the transfer source location. The values
	// are pointers so that the appends below are recorded in the map entry
	// instead of in a throwaway copy of the struct.
	type sourceFiles struct {
		transferSource *ssclient.Location
		files          [][2]string // src, dst
	}
	filesByLocID := make(map[uuid.UUID]*sourceFiles, len(transferSources))
	for _, loc := range transferSources {
		filesByLocID[loc.ID] = &sourceFiles{transferSource: loc}
	}

	for _, item := range paths {
		locID, path := locationPath(item)
		if locID == uuid.Nil {
			locID = defaultTransferSource.ID
		}
		ops, ok := filesByLocID[locID]
		if !ok {
			return fmt.Errorf("location %s is not associated with this pipeline", locID)
		}

		// Strip the first occurrence of the location path from the item so
		// that the source is expressed without it.
		source := strings.Replace(path, ops.transferSource.Path, "", 1)
		source = strings.TrimPrefix(source, "/")

		// The destination keeps only the last segment of the source; a
		// trailing slash takes the joinPath branch.
		var lastSegment string
		if strings.HasSuffix(source, "/") {
			lastSegment = joinPath(filepath.Base(strings.TrimSuffix(source, "/")), "")
		} else {
			lastSegment = filepath.Base(source)
		}

		destination := filepath.Join(currentlyProcessing.Path, destRel, lastSegment)
		destination = strings.Replace(destination, "%sharedPath%", "", 1)

		ops.files = append(ops.files, [2]string{source, destination})
	}

	for _, sf := range filesByLocID {
		// Transfer sources that were not referenced have nothing to move.
		if len(sf.files) == 0 {
			continue
		}
		if copyErr := c.MoveFiles(ctx, sf.transferSource, currentlyProcessing, sf.files); copyErr != nil {
			err = errors.Join(err, copyErr)
		}
	}

	return err
}
34 changes: 33 additions & 1 deletion hack/ccp/internal/controller/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"path/filepath"
"testing"

"github.com/google/uuid"
"gotest.tools/v3/assert"
"gotest.tools/v3/fs"
)
Expand Down Expand Up @@ -34,7 +35,7 @@ func TestDetermineTransferPaths(t *testing.T) {
},
want{
destRel: "/tmp/tmp.12345",
destAbs: "/var/archivematica/sharedDirectory/tmp/tmp.12345",
destAbs: "/var/archivematica/sharedDirectory/tmp/tmp.12345/transfer.tar.gz",
src: "/var/source/transfer.tar.gz",
},
},
Expand Down Expand Up @@ -108,3 +109,34 @@ func TestMoveToInternalSharedDir(t *testing.T) {
assert.Equal(t, dest, tmpDir.Join("sharedDir", "deposits", filepath.Base(dest)))
assert.Assert(t, fs.Equal(dest, fs.Expected(t, fs.WithFile("MARBLES.TGA", "contents"), fs.MatchAnyFileMode)))
}

// TestLocationPath verifies how locationPath splits its argument into a
// location identifier and a path.
func TestLocationPath(t *testing.T) {
	t.Parallel()

	type testCase struct {
		arg  string
		id   uuid.UUID
		path string
	}

	cases := []testCase{
		// A valid UUID prefix is parsed and returned along with the path.
		{
			arg:  "c059a454-dafa-418e-a126-74d0c7219ce6:/tmp",
			id:   uuid.MustParse("c059a454-dafa-418e-a126-74d0c7219ce6"),
			path: "/tmp",
		},
		// Without a separator the whole argument is the path.
		{
			arg:  "/tmp",
			id:   uuid.Nil,
			path: "/tmp",
		},
		// A prefix that is not a UUID yields uuid.Nil but is still split off.
		{
			arg:  "12345:/tmp",
			id:   uuid.Nil,
			path: "/tmp",
		},
	}

	for i := range cases {
		gotID, gotPath := locationPath(cases[i].arg)
		assert.Equal(t, gotID, cases[i].id)
		assert.Equal(t, gotPath, cases[i].path)
	}
}
151 changes: 95 additions & 56 deletions hack/ccp/internal/ssclient/ssclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ssclient

import (
"context"
"errors"
"fmt"
"net/http"
"strings"
Expand All @@ -21,6 +22,8 @@ import (
"github.com/artefactual/archivematica/hack/ccp/internal/store"
)

var ErrLocationNotAvailable = errors.New("location not available")

type Pipeline struct {
ID uuid.UUID
URI string
Expand All @@ -41,8 +44,13 @@ type Location struct {
type Client interface {
ReadPipeline(ctx context.Context, id uuid.UUID) (*Pipeline, error)
ReadDefaultLocation(ctx context.Context, purpose string) (*Location, error)
ReadProcessingLocation(ctx context.Context) (*Location, error)
ListLocations(ctx context.Context, path, purpose string) ([]*Location, error)
CopyFiles(ctx context.Context, l *Location, files []string) error

// MoveFiles moves files between locations. `files` is a list of pairs
// holding the source path and the destination path of each file to be
// moved; both paths must be relative to their respective Location.
MoveFiles(ctx context.Context, src, dst *Location, files [][2]string) error
}

// clientImpl implements Client.
Expand Down Expand Up @@ -88,6 +96,77 @@ func (c *clientImpl) ReadPipeline(ctx context.Context, id uuid.UUID) (_ *Pipelin
return p, nil
}

// ReadDefaultLocation looks up the default location for the given purpose.
//
// The default-location endpoint is queried first and the location identifier
// is taken from the "Location" header of its response; the location itself is
// then fetched by that identifier. ErrLocationNotAvailable is returned when
// the header is missing or empty, or when the fetched location does not list
// the current pipeline's URI among its pipelines.
func (c *clientImpl) ReadDefaultLocation(ctx context.Context, purpose string) (_ *Location, err error) {
	derrors.Add(&err, "ReadDefaultLocation(%s)", purpose)

	currentPipeline, err := c.pipeline(ctx)
	if err != nil {
		return nil, err
	}

	// Request inspection of the response headers, since the answer we need
	// is read from a header.
	inspect := kiotahttp.NewHeadersInspectionOptions()
	inspect.InspectResponseHeaders = true
	cfg := &api.V2LocationDefaultWithPurposeItemRequestBuilderGetRequestConfiguration{
		Options: []kiotaabs.RequestOption{inspect},
	}

	if err := c.client.Api().V2().Location().DefaultEscaped().ByPurpose(purpose).Get(ctx, cfg); err != nil {
		return nil, err
	}

	locationHeader := inspect.ResponseHeaders.Get("Location")
	if len(locationHeader) == 0 || locationHeader[0] == "" {
		return nil, ErrLocationNotAvailable
	}

	// Capture the UUID in the URI, e.g. "/api/v2/location/be68cfa8-d32a-44ba-a140-2ec5d6b903e0/".
	id := strings.TrimPrefix(locationHeader[0], "/api/v2/location/")
	id = strings.TrimSuffix(id, "/")

	res, err := c.client.Api().V2().Location().ByUuid(id).Get(ctx, nil)
	if err != nil {
		return nil, err
	}

	// Confirm that the default location has been made available to this pipeline.
	available := false
	for _, pipelineURI := range res.GetPipeline() {
		if pipelineURI == currentPipeline.URI {
			available = true
			break
		}
	}
	if !available {
		return nil, ErrLocationNotAvailable
	}

	loc, err := convertLocation(res)
	if err != nil {
		return nil, err
	}

	return loc, nil
}

// ReadProcessingLocation returns the first location that ListLocations
// reports for the CP purpose. ErrLocationNotAvailable is returned when the
// listing is empty.
func (c *clientImpl) ReadProcessingLocation(ctx context.Context) (_ *Location, err error) {
	derrors.Add(&err, "ReadProcessingLocation")

	purpose := models.CP_LOCATIONPURPOSE.String()
	locations, err := c.ListLocations(ctx, "", purpose)
	switch {
	case err != nil:
		return nil, err
	case len(locations) == 0:
		return nil, ErrLocationNotAvailable
	}

	// Several locations may match; the first one wins.
	return locations[0], nil
}

func (c *clientImpl) ListLocations(ctx context.Context, path, purpose string) (_ []*Location, err error) {
derrors.Add(&err, "ListLocations(%s, %s)", path, purpose)

Expand Down Expand Up @@ -137,70 +216,30 @@ func (c *clientImpl) ListLocations(ctx context.Context, path, purpose string) (_
return ret, nil
}

func (c *clientImpl) ReadDefaultLocation(ctx context.Context, purpose string) (_ *Location, err error) {
derrors.Add(&err, "ReadDefaultLocation(%s)", purpose)
func (c *clientImpl) MoveFiles(ctx context.Context, src, dst *Location, files [][2]string) (err error) {
derrors.Add(&err, "MoveFiles()")

p, err := c.pipeline(ctx)
if err != nil {
return nil, err
}

headerOptions := kiotahttp.NewHeadersInspectionOptions()
headerOptions.InspectResponseHeaders = true

reqConfig := &api.V2LocationDefaultWithPurposeItemRequestBuilderGetRequestConfiguration{
Options: []kiotaabs.RequestOption{headerOptions},
}
if err := c.client.Api().V2().Location().DefaultEscaped().ByPurpose(purpose).Get(ctx, reqConfig); err != nil {
return nil, err
}

uris := headerOptions.ResponseHeaders.Get("Location")
if len(uris) < 1 {
return nil, fmt.Errorf("location not available")
}
uri := uris[0]
if uri == "" {
return nil, fmt.Errorf("location not available")
}

// Capture the UUID in the URI, e.g. "/api/v2/location/be68cfa8-d32a-44ba-a140-2ec5d6b903e0/".
id := strings.TrimSuffix(strings.TrimPrefix(uri, "/api/v2/location/"), "/")

res, err := c.client.Api().V2().Location().ByUuid(id).Get(ctx, nil)
if err != nil {
return nil, err
return err
}

// Confirm that the default location has been made available to this pipeline.
var match bool
for _, item := range res.GetPipeline() {
if item == p.URI {
match = true
break
}
}
if !match {
return nil, fmt.Errorf("location not available")
}
body := models.NewMoveRequest()
body.SetPipeline(&p.URI)
body.SetOriginLocation(&src.URI)

ret, err := convertLocation(res)
if err != nil {
return nil, err
moves := make([]models.MoveFileable, 0, len(files))
for _, f := range files {
m := models.NewMoveFile()
m.SetSource(&f[0])
m.SetDestination(&f[1])
moves = append(moves, m)
}
body.SetFiles(moves)

return ret, nil
}

func (c *clientImpl) CopyFiles(ctx context.Context, l *Location, files []string) (err error) {
derrors.Add(&err, "CopyFiles()")

_, err = c.pipeline(ctx)
if err != nil {
return err
}
_, err = c.client.Api().V2().Location().ByUuid(dst.ID.String()).Post(ctx, body, nil)

return nil
return err
}

// pipeline returns the details of the current pipeline.
Expand Down
Loading

0 comments on commit c808227

Please sign in to comment.