diff --git a/go.mod b/go.mod index 98e33c8..e4a9afd 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 github.com/influxdata/influxdb-client-go/v2 v2.12.3 - github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129082755-59b3c0c34fe0 + github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241206090214-bf18be49e1a9 github.com/instill-ai/usage-client v0.3.0-alpha.0.20240319060111-4a3a39f2fd61 github.com/instill-ai/x v0.3.0-alpha.0.20231219052200-6230a89e386c github.com/knadh/koanf v1.5.0 diff --git a/go.sum b/go.sum index 3f14fbe..b685b77 100644 --- a/go.sum +++ b/go.sum @@ -344,8 +344,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0 github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= -github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129082755-59b3c0c34fe0 h1:Fok/s7GQoNMUA++1WbDdiZ6Ut8AXSuWNkTU2Q/0G9QA= -github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129082755-59b3c0c34fe0/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY= +github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241206090214-bf18be49e1a9 h1:TVCoJt9PQjqTSCNyjagQHV5kteCs8Y8Qh1QBhSdZtac= +github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241206090214-bf18be49e1a9/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY= github.com/instill-ai/usage-client v0.3.0-alpha.0.20240319060111-4a3a39f2fd61 h1:smPTvmXDhn/QC7y/TPXyMTqbbRd0gvzmFgWBChwTfhE= github.com/instill-ai/usage-client v0.3.0-alpha.0.20240319060111-4a3a39f2fd61/go.mod h1:/TAHs4ybuylk5icuy+MQtHRc4XUnIyXzeNKxX9qDFhw= github.com/instill-ai/x v0.3.0-alpha.0.20231219052200-6230a89e386c h1:a2RVkpIV2QcrGnSHAou+t/L+vBsaIfFvk5inVg5Uh4s= diff --git a/pkg/handler/knowledgebasefiles.go b/pkg/handler/knowledgebasefiles.go index 8a2dc47..c6dcf88 100644 --- a/pkg/handler/knowledgebasefiles.go +++ b/pkg/handler/knowledgebasefiles.go @@ -2,6 +2,7 @@ package handler import ( "context" + "encoding/base64" "fmt" "strings" @@ -219,6 +220,108 @@ func checkUploadKnowledgeBaseFileRequest(req *artifactpb.UploadCatalogFileReques return nil } +// MoveFileToCatalog moves a file from one catalog to another within the same namespace. +// It copies the file content and metadata to the target catalog and deletes +// the file from the source catalog. +func (ph *PublicHandler) MoveFileToCatalog(ctx context.Context, req *artifactpb.MoveFileToCatalogRequest) (*artifactpb.MoveFileToCatalogResponse, error) { + log, _ := logger.GetZapLogger(ctx) + + // Validate authentication and request parameters + _, err := getUserUIDFromContext(ctx) + if err != nil { + err := fmt.Errorf("failed to get user uid from header: %v. err: %w", err, customerror.ErrUnauthenticated) + return nil, err + } + if req.FileUid == "" { + return nil, fmt.Errorf("file uid is required. err: %w", customerror.ErrInvalidArgument) + } + if req.ToCatalogId == "" { + return nil, fmt.Errorf("to catalog id is required. err: %w", customerror.ErrInvalidArgument) + } + + // Step 1: Verify source file exists and check namespace permissions + sourceFiles, err := ph.service.Repository.GetKnowledgeBaseFilesByFileUIDs(ctx, []uuid.UUID{uuid.FromStringOrNil(req.FileUid)}) + if err != nil || len(sourceFiles) == 0 { + log.Error("file not found", zap.Error(err)) + return nil, fmt.Errorf("file not found. err: %w", customerror.ErrNotFound) + } + + sourceFile := sourceFiles[0] + // Verify namespace exists and get its details + reqNamespace, err := ph.service.GetNamespaceByNsID(ctx, req.NamespaceId) + if err != nil { + log.Error("failed to get namespace uid from source file", zap.Error(err)) + return nil, fmt.Errorf("failed to get namespace uid from source file. err: %w", err) + } + // Ensure file movement occurs within the same namespace + if reqNamespace.NsUID.String() != sourceFile.Owner.String() { + return nil, fmt.Errorf("source file is not in the same namespace. err: %w", customerror.ErrInvalidArgument) + } + + // Step 2: Verify target catalog exists + targetCatalog, err := ph.service.Repository.GetKnowledgeBaseByOwnerAndKbID(ctx, reqNamespace.NsUID, req.ToCatalogId) + if err != nil { + log.Error("target catalog not found", zap.Error(err)) + return nil, fmt.Errorf("target catalog not found. err: %w", err) + } + + // Step 3: Retrieve file content from MinIO storage + fileContent, err := ph.service.MinIO.GetFile(ctx, minio.KnowledgeBaseBucketName, sourceFile.Destination) + if err != nil { + log.Error("failed to get file content from MinIO", zap.Error(err)) + return nil, fmt.Errorf("failed to get file content from MinIO. err: %w", err) + } + + // Prepare file content and metadata for upload + fileContentBase64 := base64.StdEncoding.EncodeToString(fileContent) + fileType := artifactpb.FileType(artifactpb.FileType_value[sourceFile.Type]) + externalMetadata := sourceFile.ExternalMetadataUnmarshal + + // Step 4: Create file in target catalog + uploadReq := &artifactpb.UploadCatalogFileRequest{ + NamespaceId: req.NamespaceId, + CatalogId: targetCatalog.KbID, + File: &artifactpb.File{ + Name: sourceFile.Name, + Content: fileContentBase64, + Type: fileType, + ExternalMetadata: externalMetadata, + }, + } + + uploadResp, err := ph.UploadCatalogFile(ctx, uploadReq) + if err != nil { + log.Error("failed to upload file to target catalog", zap.Error(err)) + return nil, fmt.Errorf("failed to upload file to target catalog. err: %w", err) + } + + // process the file + processReq := &artifactpb.ProcessCatalogFilesRequest{ + FileUids: []string{uploadResp.File.FileUid}, + } + _, err = ph.ProcessCatalogFiles(ctx, processReq) + if err != nil { + log.Error("failed to process file", zap.Error(err)) + return nil, fmt.Errorf("failed to process file. err: %w", err) + } + + // Step 5: delete the source file + deleteReq := &artifactpb.DeleteCatalogFileRequest{ + FileUid: sourceFile.UID.String(), + } + _, err = ph.DeleteCatalogFile(ctx, deleteReq) + if err != nil { + log.Error("failed to delete file from original catalog", + zap.String("file_uid", sourceFile.UID.String()), + zap.Error(err)) + } + + // Return the UID of the newly created file + return &artifactpb.MoveFileToCatalogResponse{ + FileUid: uploadResp.File.FileUid, + }, nil +} + func (ph *PublicHandler) ListCatalogFiles(ctx context.Context, req *artifactpb.ListCatalogFilesRequest) (*artifactpb.ListCatalogFilesResponse, error) { log, _ := logger.GetZapLogger(ctx) @@ -364,7 +467,7 @@ func (ph *PublicHandler) DeleteCatalogFile( } startSignal := make(chan bool) - // TODO: need to use clean worker in the future + // TODO: need to use clean worker to prevent the service from being restarted before the file is deleted go utils.GoRecover( func() { // Create a new context to prevent the parent context from being cancelled diff --git a/pkg/repository/knowledgebasefile.go b/pkg/repository/knowledgebasefile.go index e5727d2..fae050f 100644 --- a/pkg/repository/knowledgebasefile.go +++ b/pkg/repository/knowledgebasefile.go @@ -64,6 +64,7 @@ type KnowledgeBaseFile struct { Name string `gorm:"column:name;size:255;not null" json:"name"` // Type is defined in the grpc proto file Type string `gorm:"column:type;not null" json:"type"` + // Destination is the path in the MinIO bucket Destination string `gorm:"column:destination;size:255;not null" json:"destination"` // Process status is defined in the grpc proto file ProcessStatus string `gorm:"column:process_status;size:100;not null" json:"process_status"`