Skip to content

Commit

Permalink
rename to storage
Browse files Browse the repository at this point in the history
add s3 factory
add sftp health check
add filter support
  • Loading branch information
Maksym Trofimenko committed Jun 9, 2024
1 parent 4581b6e commit 23a6dd0
Show file tree
Hide file tree
Showing 9 changed files with 325 additions and 165 deletions.
4 changes: 2 additions & 2 deletions src/jobservice/job/impl/replication/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ import (
_ "github.com/goharbor/harbor/src/pkg/reg/adapter/tencentcr"
// register the VolcEngine CR Registry adapter
_ "github.com/goharbor/harbor/src/pkg/reg/adapter/volcenginecr"
// register sftp adapter
_ "github.com/goharbor/harbor/src/pkg/reg/adapter/sftp"
// register storage adapter
_ "github.com/goharbor/harbor/src/pkg/reg/adapter/storage"
"github.com/goharbor/harbor/src/pkg/reg/model"
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,110 +1,125 @@
package sftp
package storage

import (
"context"
"errors"
"fmt"
"github.com/davecgh/go-spew/spew"
"github.com/docker/distribution"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/storage"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/pkg/reg/adapter"
sftpdriver "github.com/goharbor/harbor/src/pkg/reg/adapter/sftp/driver"
"github.com/goharbor/harbor/src/common/utils"
regadapter "github.com/goharbor/harbor/src/pkg/reg/adapter"
"github.com/goharbor/harbor/src/pkg/reg/adapter/storage/health"
"github.com/goharbor/harbor/src/pkg/reg/filter"
"github.com/goharbor/harbor/src/pkg/reg/model"
"github.com/opencontainers/go-digest"
"io"
"strings"
)

func init() {
err := adapter.RegisterFactory(model.RegistryTypeSFTP, &factory{})
if err != nil {
log.Errorf("failed to register factory for dtr: %v", err)
return
}
log.Infof("the factory of SFTP sftpAdapter was registered")
}

type factory struct {
}

// Create ...
func (f *factory) Create(r *model.Registry) (adapter.Adapter, error) {
driver := sftpdriver.NewDriver(r)
ns, err := storage.NewRegistry(context.TODO(), driver)
if err != nil {
return nil, err
}
return &sftpAdapter{
regModel: r,
registry: ns,
}, nil
}

// AdapterPattern ...
func (f *factory) AdapterPattern() *model.AdapterPattern {
return nil
}

var (
_ adapter.Adapter = (*sftpAdapter)(nil)
_ adapter.ArtifactRegistry = (*sftpAdapter)(nil)
_ regadapter.Adapter = (*adapter)(nil)
_ regadapter.ArtifactRegistry = (*adapter)(nil)
)

type sftpAdapter struct {
type adapter struct {
regModel *model.Registry
driver storagedriver.StorageDriver
registry distribution.Namespace
}

func (a *sftpAdapter) FetchArtifacts(_ []*model.Filter) ([]*model.Resource, error) {
func (a *adapter) FetchArtifacts(filters []*model.Filter) ([]*model.Resource, error) {
fmt.Println("Fetch artifacts")

spew.Dump("filters", filters)
ctx := context.Background()
var repos = make([]string, 1000)
var repoNames = make([]string, 1000)

_, err := a.registry.Repositories(ctx, repos, "")
_, err := a.registry.Repositories(ctx, repoNames, "")
if err != nil && !errors.Is(err, io.EOF) {
return nil, fmt.Errorf("unable to get repositories: %v", err)
}
var resources []*model.Resource

for _, r := range repos {
if r == "" {
continue
}
if len(repoNames) == 0 {
return nil, nil
}

named, err := reference.WithName(r)
if err != nil {
return nil, fmt.Errorf("ref %s error: %v", r, err)
}
repo, err := a.registry.Repository(ctx, named)
if err != nil {
return nil, fmt.Errorf("unable to get repo %s: %v", r, err)
}
tags, err := repo.Tags(ctx).All(ctx)
if err != nil {
return nil, fmt.Errorf("unable to get all tags for repo %s: %v", r, err)
}
var repositories []*model.Repository
for _, repoName := range repoNames {
repositories = append(repositories, &model.Repository{
Name: repoName,
})
}

repositories, err = filter.DoFilterRepositories(repositories, filters)
if err != nil {
return nil, err
}

runner := utils.NewLimitedConcurrentRunner(10)

var rawResources = make([]*model.Resource, len(repositories))
for i, r := range repositories {
repo := r
index := i

resources = append(resources, &model.Resource{
Type: model.ResourceTypeImage,
Registry: a.regModel,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: r,
runner.AddTask(func() error {
named, err := reference.WithName(repo.Name)
if err != nil {
return fmt.Errorf("ref %s error: %v", repo.Name, err)
}
repository, err := a.registry.Repository(ctx, named)
if err != nil {
return fmt.Errorf("unable to get repo %s: %v", repo.Name, err)
}

tags, err := repository.Tags(ctx).All(ctx)
if err != nil {
return fmt.Errorf("unable to get all tags for repo %s: %v", r, err)
}

artifacts := []*model.Artifact{
{
Tags: tags,
},
Artifacts: []*model.Artifact{
{
Tags: tags,
}

artifacts, err = filter.DoFilterArtifacts(artifacts, filters)
if err != nil {
return fmt.Errorf("failed to list artifacts of repository %s: %v", repo, err)
}

if len(artifacts) == 0 {
return nil
}

rawResources[index] = &model.Resource{
Type: model.ResourceTypeImage,
Registry: a.regModel,
Metadata: &model.ResourceMetadata{
Repository: &model.Repository{
Name: r.Name,
},
Artifacts: artifacts,
},
},
}
return nil
})
}

var resources []*model.Resource

for _, r := range rawResources {
if r == nil {
continue
}
resources = append(resources, r)
}
return resources, nil
}

func (a *sftpAdapter) ManifestExist(repository, ref string) (exist bool, desc *distribution.Descriptor, err error) {
func (a *adapter) ManifestExist(repository, ref string) (exist bool, desc *distribution.Descriptor, err error) {
ctx := context.Background()

repo, err := a.getRepo(ctx, repository, ref)
Expand Down Expand Up @@ -145,7 +160,7 @@ func (a *sftpAdapter) ManifestExist(repository, ref string) (exist bool, desc *d
return true, &descriptor, nil
}

func (a *sftpAdapter) PullManifest(repository, ref string, _ ...string) (distribution.Manifest, string, error) {
func (a *adapter) PullManifest(repository, ref string, _ ...string) (distribution.Manifest, string, error) {
ctx := context.Background()

repo, err := a.getRepo(ctx, repository, ref)
Expand Down Expand Up @@ -184,7 +199,7 @@ func (a *sftpAdapter) PullManifest(repository, ref string, _ ...string) (distrib
}

// PushManifest manifests are blobs actually
func (a *sftpAdapter) PushManifest(repository, ref, mediaType string, payload []byte) (string, error) {
func (a *adapter) PushManifest(repository, ref, mediaType string, payload []byte) (string, error) {
ctx := context.Background()

repo, err := a.getRepo(ctx, repository, ref)
Expand All @@ -207,7 +222,7 @@ func (a *sftpAdapter) PushManifest(repository, ref, mediaType string, payload []
return descriptor.Digest.String(), nil
}

func (a *sftpAdapter) DeleteManifest(repository, ref string) error {
func (a *adapter) DeleteManifest(repository, ref string) error {
ctx := context.Background()

named, err := reference.WithName(repository)
Expand All @@ -223,11 +238,10 @@ func (a *sftpAdapter) DeleteManifest(repository, ref string) error {
if err != nil {
return err
}

return manifests.Delete(ctx, digest.Digest(ref))
}

func (a *sftpAdapter) BlobExist(repository, d string) (exist bool, err error) {
func (a *adapter) BlobExist(repository, d string) (exist bool, err error) {
ctx := context.Background()

repo, err := a.getRepo(ctx, repository, d)
Expand All @@ -246,7 +260,7 @@ func (a *sftpAdapter) BlobExist(repository, d string) (exist bool, err error) {
return true, nil
}

func (a *sftpAdapter) PullBlob(repository, d string) (int64, io.ReadCloser, error) {
func (a *adapter) PullBlob(repository, d string) (int64, io.ReadCloser, error) {
ctx := context.Background()

repo, err := a.getRepo(ctx, repository, d)
Expand All @@ -268,17 +282,15 @@ func (a *sftpAdapter) PullBlob(repository, d string) (int64, io.ReadCloser, erro
return descriptor.Size, readSeeker, nil
}

func (a *sftpAdapter) PullBlobChunk(_, _ string, _, _, _ int64) (size int64, blob io.ReadCloser, err error) {
func (a *adapter) PullBlobChunk(_, _ string, _, _, _ int64) (size int64, blob io.ReadCloser, err error) {
return 0, nil, fmt.Errorf("PullBlobChunk is not implemented")
}

func (a *sftpAdapter) PushBlobChunk(_, _ string, _ int64, _ io.Reader, _, _ int64, _ string) (nextUploadLocation string, endRange int64, err error) {
func (a *adapter) PushBlobChunk(_, _ string, _ int64, _ io.Reader, _, _ int64, _ string) (nextUploadLocation string, endRange int64, err error) {
return "", 0, fmt.Errorf("PushBlobChunk is not implemented")
}

func (a *sftpAdapter) PushBlob(repository, d string, size int64, r io.Reader) error {
fmt.Printf("push blob %s %d", d, size)

func (a *adapter) PushBlob(repository, d string, size int64, r io.Reader) error {
ctx := context.Background()

repo, err := a.getRepo(ctx, repository, d)
Expand Down Expand Up @@ -311,15 +323,15 @@ func (a *sftpAdapter) PushBlob(repository, d string, size int64, r io.Reader) er
return nil
}

func (a *sftpAdapter) MountBlob(_, _, _ string) (err error) {
func (a *adapter) MountBlob(_, _, _ string) (err error) {
return fmt.Errorf("MountBlob is not implemented")
}

func (a *sftpAdapter) CanBeMount(_ string) (mount bool, repository string, err error) {
func (a *adapter) CanBeMount(_ string) (mount bool, repository string, err error) {
return false, "", nil
}

func (a *sftpAdapter) DeleteTag(r, tag string) error {
func (a *adapter) DeleteTag(r, tag string) error {
ctx := context.Background()
named, err := reference.WithName(r)
if err != nil {
Expand All @@ -332,7 +344,7 @@ func (a *sftpAdapter) DeleteTag(r, tag string) error {
return repo.Tags(ctx).Untag(ctx, tag)
}

func (a *sftpAdapter) ListTags(r string) ([]string, error) {
func (a *adapter) ListTags(r string) ([]string, error) {
ctx := context.Background()

named, err := reference.WithName(r)
Expand All @@ -350,15 +362,22 @@ func (a *sftpAdapter) ListTags(r string) ([]string, error) {
return tags, nil
}

func (a *sftpAdapter) PrepareForPush(_ []*model.Resource) error {
func (a *adapter) PrepareForPush(_ []*model.Resource) error {
return nil
}

func (a *sftpAdapter) HealthCheck() (string, error) {
func (a *adapter) HealthCheck() (string, error) {
checker, ok := a.driver.(health.Checker)
if !ok {
return model.Healthy, nil
}
if err := checker.Health(context.Background()); err != nil {
return model.Unhealthy, err
}
return model.Healthy, nil
}

func (a *sftpAdapter) getRepo(ctx context.Context, repository, ref string) (distribution.Repository, error) {
func (a *adapter) getRepo(ctx context.Context, repository, ref string) (distribution.Repository, error) {
named, err := reference.WithName(repository)
if err != nil {
return nil, err
Expand All @@ -375,13 +394,22 @@ func (a *sftpAdapter) getRepo(ctx context.Context, repository, ref string) (dist
return a.registry.Repository(ctx, named)
}

func (a *sftpAdapter) Info() (*model.RegistryInfo, error) {
func (a *adapter) Info() (*model.RegistryInfo, error) {
return &model.RegistryInfo{
Type: model.RegistryTypeSFTP,
SupportedResourceTypes: []string{
model.ResourceTypeImage,
},
SupportedResourceFilters: []*model.FilterStyle{},
SupportedResourceFilters: []*model.FilterStyle{
{
Type: model.FilterTypeName,
Style: model.FilterStyleTypeText,
},
{
Type: model.FilterTypeTag,
Style: model.FilterStyleTypeText,
},
},
SupportedTriggers: []string{
model.TriggerTypeManual,
model.TriggerTypeScheduled,
Expand Down
Loading

0 comments on commit 23a6dd0

Please sign in to comment.