From c808227eded331447f0b7dde916e12e310e53c95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Tue, 14 May 2024 11:03:28 +0000 Subject: [PATCH] Draft copyFromTransferSources --- hack/ccp/go.mod | 2 +- hack/ccp/go.sum | 4 +- hack/ccp/internal/controller/source.go | 89 ++++++++++-- hack/ccp/internal/controller/source_test.go | 34 ++++- hack/ccp/internal/ssclient/ssclient.go | 151 ++++++++++++-------- hack/ccp/internal/ssclient/ssclient_test.go | 117 ++++++++++++++- 6 files changed, 321 insertions(+), 76 deletions(-) diff --git a/hack/ccp/go.mod b/hack/ccp/go.mod index 4adee8e08..6ccda5110 100644 --- a/hack/ccp/go.mod +++ b/hack/ccp/go.mod @@ -25,7 +25,7 @@ require ( github.com/tailscale/hujson v0.0.0-20221223112325-20486734a56a github.com/testcontainers/testcontainers-go v0.30.0 github.com/testcontainers/testcontainers-go/modules/mysql v0.30.0 - go.artefactual.dev/ssclient v0.2.1 + go.artefactual.dev/ssclient v0.2.3 go.artefactual.dev/tools v0.10.0 go.nhat.io/httpmock v0.11.0 go.starlark.net v0.0.0-20240411212711-9b43f0afd521 diff --git a/hack/ccp/go.sum b/hack/ccp/go.sum index bcd7d1900..64a2c4417 100644 --- a/hack/ccp/go.sum +++ b/hack/ccp/go.sum @@ -371,8 +371,8 @@ github.com/yuin/goldmark-emoji v1.0.2 h1:c/RgTShNgHTtc6xdz2KKI74jJr6rWi7FPgnP9GA github.com/yuin/goldmark-emoji v1.0.2/go.mod h1:RhP/RWpexdp+KHs7ghKnifRoIs/Bq4nDS7tRbCkOwKY= github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= -go.artefactual.dev/ssclient v0.2.1 h1:PKS7o8D7Q7XU+g1YpjWtGzD0ZknOj8ZPdh0KV9jI0cY= -go.artefactual.dev/ssclient v0.2.1/go.mod h1:ImaAHtgGIbKlnrOUzczBMmltNVbhYkKZ7ujUjfBtUj8= +go.artefactual.dev/ssclient v0.2.3 h1:tVkjQEycs6420Ky++1aZZUBoz94wWS2FoiVzqBNH2gM= +go.artefactual.dev/ssclient v0.2.3/go.mod h1:ImaAHtgGIbKlnrOUzczBMmltNVbhYkKZ7ujUjfBtUj8= go.artefactual.dev/tools v0.10.0 h1:+LeZS5oHupAQBXvLQ4aGIuZyqf7zCpD7s3UpyDl9zn4= 
go.artefactual.dev/tools v0.10.0/go.mod h1:PIy0RtC45gC4sASb4r26g0aCU24kSWIp+mcV1p2gtpY= go.nhat.io/httpmock v0.11.0 h1:GSADjr4/sn1HXqnyluPr9PYpSmMh/h3ty0O7lEozD3c= diff --git a/hack/ccp/internal/controller/source.go b/hack/ccp/internal/controller/source.go index cbb5aa543..7b849b363 100644 --- a/hack/ccp/internal/controller/source.go +++ b/hack/ccp/internal/controller/source.go @@ -1,12 +1,15 @@ package controller import ( + "context" "errors" "fmt" "os" "path/filepath" "strings" + "github.com/google/uuid" + "github.com/artefactual/archivematica/hack/ccp/internal/derrors" "github.com/artefactual/archivematica/hack/ccp/internal/ssclient" ) @@ -24,11 +27,11 @@ import ( // This method does not rely on the activeTransfer watched directory. It does // not prompt the user to accept the transfer because we go directly into the // next chain link. -func StartTransfer(ssclient ssclient.Client, sharedDir, tmpDir, name, path string) error { +func StartTransfer(ctx context.Context, ssclient ssclient.Client, sharedDir, tmpDir, name, path string) error { destRel, destAbs, src := determineTransferPaths(sharedDir, tmpDir, name, path) fmt.Println(destRel, destAbs, src) - copyFromTransferSources(ssclient, []string{path}, destRel) + _ = copyFromTransferSources(ctx, ssclient, []string{path}, destRel) tsrc, tdst := "", "" dst, err := moveToInternalSharedDir(sharedDir, tsrc, tdst) @@ -61,12 +64,14 @@ func StartTransferWithWatchedDir() { // update transfer.currentlocation with the new destination } -func locationPath(locPath string) (id, path string) { - if before, after, found := strings.Cut(locPath, ":"); found { - id = before +func locationPath(locPath string) (id uuid.UUID, path string) { + before, after, found := strings.Cut(locPath, ":") + + if found { + id, _ = uuid.Parse(before) path = after } else { - id = before + path = before } return id, path @@ -147,9 +152,71 @@ func moveToInternalSharedDir(sharedDir, path, dest string) (_ string, err error) } } -func 
copyFromTransferSources(c ssclient.Client, paths []string, destRel string) { - // - processing_location = storage_service.get_first_location(purpose="CP") - // - transfer_sources = storage_service.get_location(purpose="TS") - // - _default_transfer_source_location_uuid - // - storage_service.copy_files(location, processing_location, files) +func copyFromTransferSources(ctx context.Context, c ssclient.Client, paths []string, destRel string) (err error) { + derrors.Add(&err, "copyFromTransferSources()") + + // We'll use the default transfer source location when a request does not + // indicate its source. + defaultTransferSource, err := c.ReadDefaultLocation(ctx, "TS") + if err != nil { + return err + } + + // Look up the destination, which is our pipeline processing location. + currentlyProcessing, err := c.ReadProcessingLocation(ctx) + if err != nil { + return err + } + + // filesByLocID groups the copy operations that we'll be making, indexed by + // the identifier of the transfer source location they read from. 
+ transferSources, err := c.ListLocations(ctx, "", "TS") + if err != nil { + return err + } + type sourceFiles struct { + transferSource *ssclient.Location + files [][2]string // src, dst + } + filesByLocID := map[uuid.UUID]*sourceFiles{} + for _, loc := range transferSources { + filesByLocID[loc.ID] = &sourceFiles{ + transferSource: loc, + files: [][2]string{}, + } + } + + for _, item := range paths { + locID, path := locationPath(item) + if locID == uuid.Nil { + locID = defaultTransferSource.ID + } + ops, ok := filesByLocID[locID] + if !ok { + return fmt.Errorf("location %s is not associated with this pipeline", locID) + } + + source := strings.Replace(path, ops.transferSource.Path, "", 1) + source = strings.TrimPrefix(source, "/") + + var lastSegment string + if strings.HasSuffix(source, "/") { + lastSegment = joinPath(filepath.Base(strings.TrimSuffix(source, "/")), "") + } else { + lastSegment = filepath.Base(source) + } + + destination := filepath.Join(currentlyProcessing.Path, destRel, lastSegment) + destination = strings.Replace(destination, "%sharedPath%", "", 1) + + ops.files = append(ops.files, [2]string{source, destination}) + } + + for _, sf := range filesByLocID { + if copyErr := c.MoveFiles(ctx, sf.transferSource, currentlyProcessing, sf.files); copyErr != nil { + err = errors.Join(err, copyErr) + } + } + + return err } diff --git a/hack/ccp/internal/controller/source_test.go b/hack/ccp/internal/controller/source_test.go index ff04da6f3..b96b796ac 100644 --- a/hack/ccp/internal/controller/source_test.go +++ b/hack/ccp/internal/controller/source_test.go @@ -4,6 +4,7 @@ import ( "path/filepath" "testing" + "github.com/google/uuid" "gotest.tools/v3/assert" "gotest.tools/v3/fs" ) @@ -34,7 +35,7 @@ func TestDetermineTransferPaths(t *testing.T) { }, want{ destRel: "/tmp/tmp.12345", - destAbs: "/var/archivematica/sharedDirectory/tmp/tmp.12345", + destAbs: "/var/archivematica/sharedDirectory/tmp/tmp.12345/transfer.tar.gz", src: "/var/source/transfer.tar.gz", }, }, 
@@ -108,3 +109,34 @@ func TestMoveToInternalSharedDir(t *testing.T) { assert.Equal(t, dest, tmpDir.Join("sharedDir", "deposits", filepath.Base(dest))) assert.Assert(t, fs.Equal(dest, fs.Expected(t, fs.WithFile("MARBLES.TGA", "contents"), fs.MatchAnyFileMode))) } + +func TestLocationPath(t *testing.T) { + t.Parallel() + + tests := []struct { + arg string + id uuid.UUID + path string + }{ + { + arg: "c059a454-dafa-418e-a126-74d0c7219ce6:/tmp", + id: uuid.MustParse("c059a454-dafa-418e-a126-74d0c7219ce6"), + path: "/tmp", + }, + { + arg: "/tmp", + id: uuid.Nil, + path: "/tmp", + }, + { + arg: "12345:/tmp", + id: uuid.Nil, + path: "/tmp", + }, + } + for _, tc := range tests { + id, path := locationPath(tc.arg) + assert.Equal(t, id, tc.id) + assert.Equal(t, path, tc.path) + } +} diff --git a/hack/ccp/internal/ssclient/ssclient.go b/hack/ccp/internal/ssclient/ssclient.go index df3da827b..7b4e65c73 100644 --- a/hack/ccp/internal/ssclient/ssclient.go +++ b/hack/ccp/internal/ssclient/ssclient.go @@ -2,6 +2,7 @@ package ssclient import ( "context" + "errors" "fmt" "net/http" "strings" @@ -21,6 +22,8 @@ import ( "github.com/artefactual/archivematica/hack/ccp/internal/store" ) +var ErrLocationNotAvailable = errors.New("location not available") + type Pipeline struct { ID uuid.UUID URI string @@ -41,8 +44,13 @@ type Location struct { type Client interface { ReadPipeline(ctx context.Context, id uuid.UUID) (*Pipeline, error) ReadDefaultLocation(ctx context.Context, purpose string) (*Location, error) + ReadProcessingLocation(ctx context.Context) (*Location, error) ListLocations(ctx context.Context, path, purpose string) ([]*Location, error) - CopyFiles(ctx context.Context, l *Location, files []string) error + + // MoveFiles moves files between locations. `files` is a list of pairs + // holding the source path of each file and its destination path (each path + // must be relative to its own Location: src or dst, respectively). 
+ MoveFiles(ctx context.Context, src, dst *Location, files [][2]string) error } // clientImpl implements Client. @@ -88,6 +96,77 @@ func (c *clientImpl) ReadPipeline(ctx context.Context, id uuid.UUID) (_ *Pipelin return p, nil } +func (c *clientImpl) ReadDefaultLocation(ctx context.Context, purpose string) (_ *Location, err error) { + derrors.Add(&err, "ReadDefaultLocation(%s)", purpose) + + p, err := c.pipeline(ctx) + if err != nil { + return nil, err + } + + headerOptions := kiotahttp.NewHeadersInspectionOptions() + headerOptions.InspectResponseHeaders = true + + reqConfig := &api.V2LocationDefaultWithPurposeItemRequestBuilderGetRequestConfiguration{ + Options: []kiotaabs.RequestOption{headerOptions}, + } + if err := c.client.Api().V2().Location().DefaultEscaped().ByPurpose(purpose).Get(ctx, reqConfig); err != nil { + return nil, err + } + + uris := headerOptions.ResponseHeaders.Get("Location") + if len(uris) < 1 { + return nil, ErrLocationNotAvailable + } + uri := uris[0] + if uri == "" { + return nil, ErrLocationNotAvailable + } + + // Capture the UUID in the URI, e.g. "/api/v2/location/be68cfa8-d32a-44ba-a140-2ec5d6b903e0/". + id := strings.TrimSuffix(strings.TrimPrefix(uri, "/api/v2/location/"), "/") + + res, err := c.client.Api().V2().Location().ByUuid(id).Get(ctx, nil) + if err != nil { + return nil, err + } + + // Confirm that the default location has been made available to this pipeline. 
+ var match bool + for _, item := range res.GetPipeline() { + if item == p.URI { + match = true + break + } + } + if !match { + return nil, ErrLocationNotAvailable + } + + ret, err := convertLocation(res) + if err != nil { + return nil, err + } + + return ret, nil +} + +func (c *clientImpl) ReadProcessingLocation(ctx context.Context) (_ *Location, err error) { + derrors.Add(&err, "ReadProcessingLocation") + + res, err := c.ListLocations(ctx, "", models.CP_LOCATIONPURPOSE.String()) + if err != nil { + return nil, err + } + + if len(res) < 1 { + return nil, ErrLocationNotAvailable + } + + // We can have many but we'll return the first match. + return res[0], nil +} + func (c *clientImpl) ListLocations(ctx context.Context, path, purpose string) (_ []*Location, err error) { derrors.Add(&err, "ListLocations(%s, %s)", path, purpose) @@ -137,70 +216,30 @@ func (c *clientImpl) ListLocations(ctx context.Context, path, purpose string) (_ return ret, nil } -func (c *clientImpl) ReadDefaultLocation(ctx context.Context, purpose string) (_ *Location, err error) { - derrors.Add(&err, "ReadDefaultLocation(%s)", purpose) +func (c *clientImpl) MoveFiles(ctx context.Context, src, dst *Location, files [][2]string) (err error) { + derrors.Add(&err, "MoveFiles()") p, err := c.pipeline(ctx) if err != nil { - return nil, err - } - - headerOptions := kiotahttp.NewHeadersInspectionOptions() - headerOptions.InspectResponseHeaders = true - - reqConfig := &api.V2LocationDefaultWithPurposeItemRequestBuilderGetRequestConfiguration{ - Options: []kiotaabs.RequestOption{headerOptions}, - } - if err := c.client.Api().V2().Location().DefaultEscaped().ByPurpose(purpose).Get(ctx, reqConfig); err != nil { - return nil, err - } - - uris := headerOptions.ResponseHeaders.Get("Location") - if len(uris) < 1 { - return nil, fmt.Errorf("location not available") - } - uri := uris[0] - if uri == "" { - return nil, fmt.Errorf("location not available") - } - - // Capture the UUID in the URI, e.g. 
"/api/v2/location/be68cfa8-d32a-44ba-a140-2ec5d6b903e0/". - id := strings.TrimSuffix(strings.TrimPrefix(uri, "/api/v2/location/"), "/") - - res, err := c.client.Api().V2().Location().ByUuid(id).Get(ctx, nil) - if err != nil { - return nil, err + return err } - // Confirm that the default location has been made available to this pipeline. - var match bool - for _, item := range res.GetPipeline() { - if item == p.URI { - match = true - break - } - } - if !match { - return nil, fmt.Errorf("location not available") - } + body := models.NewMoveRequest() + body.SetPipeline(&p.URI) + body.SetOriginLocation(&src.URI) - ret, err := convertLocation(res) - if err != nil { - return nil, err + moves := make([]models.MoveFileable, 0, len(files)) + for i := range files { + m := models.NewMoveFile() + m.SetSource(&files[i][0]) + m.SetDestination(&files[i][1]) + moves = append(moves, m) } + body.SetFiles(moves) - return ret, nil -} - -func (c *clientImpl) CopyFiles(ctx context.Context, l *Location, files []string) (err error) { - derrors.Add(&err, "CopyFiles()") - - _, err = c.pipeline(ctx) - if err != nil { - return err - } + _, err = c.client.Api().V2().Location().ByUuid(dst.ID.String()).Post(ctx, body, nil) - return nil + return err } // pipeline returns the details of the current pipeline. diff --git a/hack/ccp/internal/ssclient/ssclient_test.go b/hack/ccp/internal/ssclient/ssclient_test.go index 7506a4eee..0057976be 100644 --- a/hack/ccp/internal/ssclient/ssclient_test.go +++ b/hack/ccp/internal/ssclient/ssclient_test.go @@ -2,6 +2,7 @@ package ssclient_test import ( "context" + "net/http" "testing" "github.com/google/uuid" @@ -113,6 +114,67 @@ func TestClient(t *testing.T) { }, }, + // + // ReadProcessingLocation + // + + "ReadProcessingLocation returns the CP location": { + store: func(rec *storemock.MockStoreMockRecorder) { + // It looks up the pipeline ID in the store. 
+ expectStoreReadPipelineID(rec) + }, + server: httpmock.New(func(s *httpmock.Server) { + // It looks up the pipeline details. + s.ExpectGet("/api/v2/pipeline/fb2b8866-6f39-4616-b6cd-fa73193a3b05"). + ReturnHeader("Content-Type", "application/json"). + ReturnJSON(map[string]any{ + "uuid": "fb2b8866-6f39-4616-b6cd-fa73193a3b05", + "resource_uri": "/api/v2/pipeline/fb2b8866-6f39-4616-b6cd-fa73193a3b05/", + }) + + s.ExpectGet("/api/v2/location?limit=100&pipeline__uuid=fb2b8866-6f39-4616-b6cd-fa73193a3b05&purpose=CP"). + ReturnHeader("Content-Type", "application/json"). + Return(`{ + "meta": { + "limit": 100, + "next": null, + "offset": 0, + "previous": null, + "total_count": 1 + }, + "objects": [ + { + "description": null, + "enabled": true, + "path": "/var/archivematica/sharedDirectory", + "pipeline": ["/api/v2/pipeline/fb2b8866-6f39-4616-b6cd-fa73193a3b05/"], + "purpose": "CP", + "quota": null, + "relative_path": "var/archivematica/sharedDirectory/", + "resource_uri": "/api/v2/location/df192133-3b13-4292-a219-50887d285cb3/", + "space": "/api/v2/space/b4785c92-74c5-44d0-8d48-7f776fa55da7/", + "used": 0, + "uuid": "df192133-3b13-4292-a219-50887d285cb3" + } + + ] + }`) + }), + client: func(t *testing.T, c ssclient.Client) { + ret, err := c.ReadProcessingLocation(context.Background()) + + assert.NilError(t, err) + assert.DeepEqual(t, ret, &ssclient.Location{ + ID: uuid.MustParse("df192133-3b13-4292-a219-50887d285cb3"), + URI: "/api/v2/location/df192133-3b13-4292-a219-50887d285cb3/", + Purpose: "CP", + Path: "/var/archivematica/sharedDirectory", + RelativePath: "var/archivematica/sharedDirectory/", + Pipelines: []string{"/api/v2/pipeline/fb2b8866-6f39-4616-b6cd-fa73193a3b05/"}, + }) + }, + }, + // // ListLocations // @@ -177,12 +239,57 @@ func TestClient(t *testing.T) { }, // - // CopyFiles + // MoveFiles // - "CopyFiles ...": { // TODO - server: nil, - client: func(t *testing.T, c ssclient.Client) {}, + "MoveFiles moves files between locations": { + store: func(rec 
*storemock.MockStoreMockRecorder) { + // It looks up the pipeline ID in the store. + expectStoreReadPipelineID(rec) + }, + server: httpmock.New(func(s *httpmock.Server) { + // It looks up the pipeline details. + s.ExpectGet("/api/v2/pipeline/fb2b8866-6f39-4616-b6cd-fa73193a3b05"). + ReturnHeader("Content-Type", "application/json"). + ReturnJSON(map[string]any{ + "uuid": "fb2b8866-6f39-4616-b6cd-fa73193a3b05", + "resource_uri": "/api/v2/pipeline/fb2b8866-6f39-4616-b6cd-fa73193a3b05/", + }) + + // It posts a list of files. + s.ExpectPost("/api/v2/location/df192133-3b13-4292-a219-50887d285cb3/"). + WithBodyJSON(map[string]any{ + "files": []map[string]any{ + { + "source": "test", + "destination": "test", + }, + }, + "origin_location": "/api/v2/location/5cbbf1f6-7abe-474e-8dda-9904083a1831/", + "pipeline": "/api/v2/pipeline/fb2b8866-6f39-4616-b6cd-fa73193a3b05/", + }) + }), + client: func(t *testing.T, c ssclient.Client) { + err := c.MoveFiles( + context.Background(), + &ssclient.Location{ + ID: uuid.MustParse("5cbbf1f6-7abe-474e-8dda-9904083a1831"), + URI: "/api/v2/location/5cbbf1f6-7abe-474e-8dda-9904083a1831/", + }, + &ssclient.Location{ + ID: uuid.MustParse("df192133-3b13-4292-a219-50887d285cb3"), + URI: "/api/v2/location/df192133-3b13-4292-a219-50887d285cb3/", + }, + [][2]string{ + { + "test", // => /home/test + "test", // => /var/archivematica/sharedDirectory/test" + }, + }, + ) + + assert.NilError(t, err) + }, }, } for name, tc := range tests { @@ -202,7 +309,7 @@ func TestClient(t *testing.T) { } config := ssclient.Config{srv.URL(), "username", "api-key"} - c, err := ssclient.NewClient(nil, store, config) + c, err := ssclient.NewClient(&http.Client{}, store, config) assert.NilError(t, err) tc.client(t, c)