Skip to content

Commit

Permalink
feat(blob): implement object and object url repository (#117)
Browse files Browse the repository at this point in the history
Because

1. artifact has two tables, object and object_url
2. bucket name is not changed in env
3. we use goroutines to operate on MinIO (e.g. file deletion), which may
overwhelm MinIO when there are too many files at the same time.

This commit

1. implements the repository lib for the new tables
2. makes the bucket name a const in code
3. implements rate limiting for MinIO operations
  • Loading branch information
Yougigun authored Oct 18, 2024
1 parent e1cedf3 commit 4107ad1
Show file tree
Hide file tree
Showing 19 changed files with 12,315 additions and 7,730 deletions.
9 changes: 4 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,10 @@ type RegistryConfig struct {

// MinioConfig is the minio configuration.
type MinioConfig struct {
Host string `koanf:"host"`
Port string `koanf:"port"`
RootUser string `koanf:"rootuser"`
RootPwd string `koanf:"rootpwd"`
BucketName string `koanf:"bucketname"`
Host string `koanf:"host"`
Port string `koanf:"port"`
RootUser string `koanf:"rootuser"`
RootPwd string `koanf:"rootpwd"`
}

// MilvusConfig is the milvus configuration.
Expand Down
3 changes: 1 addition & 2 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ database:
host: pg-sql
port: 5432
name: artifact
version: 16
version: 17
timezone: Etc/UTC
pool:
idleconnections: 5
Expand Down Expand Up @@ -71,7 +71,6 @@ minio:
port: 9000
rootuser: minioadmin
rootpwd: minioadmin
bucketname: instill-ai-knowledge-bases
milvus:
host: milvus
port: 19530
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
github.com/influxdata/influxdb-client-go/v2 v2.12.3
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241001150423-8d8b9e2fa860
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241018045010-dcc80f850d9d
github.com/instill-ai/usage-client v0.3.0-alpha.0.20240319060111-4a3a39f2fd61
github.com/instill-ai/x v0.3.0-alpha.0.20231219052200-6230a89e386c
github.com/knadh/koanf v1.5.0
Expand Down Expand Up @@ -102,3 +102,5 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

// replace github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241001150423-8d8b9e2fa860 => ./protogen-go
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,10 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0
github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241001150423-8d8b9e2fa860 h1:HsOZQH0CMS5L1KYcXUrXHsgcmjFUS9mTBADSTHs1FSk=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241001150423-8d8b9e2fa860/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241012090311-e872dc0b511d h1:jf2RQtRFNxnPMkjTD0AAqXDXO8lHYOrWU3Hrr+yGEzY=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241012090311-e872dc0b511d/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241018045010-dcc80f850d9d h1:6/5voyjeqeeeYszZ7XjifG2pekRDYdqWD0bgeKz9LHw=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241018045010-dcc80f850d9d/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
github.com/instill-ai/usage-client v0.3.0-alpha.0.20240319060111-4a3a39f2fd61 h1:smPTvmXDhn/QC7y/TPXyMTqbbRd0gvzmFgWBChwTfhE=
github.com/instill-ai/usage-client v0.3.0-alpha.0.20240319060111-4a3a39f2fd61/go.mod h1:/TAHs4ybuylk5icuy+MQtHRc4XUnIyXzeNKxX9qDFhw=
github.com/instill-ai/x v0.3.0-alpha.0.20231219052200-6230a89e386c h1:a2RVkpIV2QcrGnSHAou+t/L+vBsaIfFvk5inVg5Uh4s=
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Down migration: removes the object_url and object tables.
-- Both drops run inside one transaction and use IF EXISTS, so the script
-- does not fail when either table is already absent.
BEGIN;

-- Drop object_url table
-- Dropped before object: the child table goes first, then the table it points to.
DROP TABLE IF EXISTS object_url;

-- Drop object table
DROP TABLE IF EXISTS object;
COMMIT;
60 changes: 60 additions & 0 deletions pkg/db/migration/000017_create_object_and_object_url_table.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
-- Up migration: creates the object and object_url tables, their indexes,
-- and a COMMENT ON COLUMN description for every column.
-- All statements run inside a single transaction; the CREATE statements use
-- IF NOT EXISTS so they are skipped when the table/index is already present.
BEGIN;
-- Create object table
CREATE TABLE IF NOT EXISTS object (
-- Primary key; generated by the database when the insert does not supply one
uid UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(1040) NOT NULL,
size BIGINT NOT NULL,
content_type VARCHAR(255) NOT NULL,
namespace_uid UUID NOT NULL,
creator_uid UUID NOT NULL,
-- New rows start as "not uploaded" unless the insert says otherwise
is_uploaded BOOLEAN NOT NULL DEFAULT FALSE,
-- destination, object_expire_days and last_modified_time are nullable
destination VARCHAR(255),
object_expire_days INTEGER,
last_modified_time TIMESTAMP,
-- NOTE(review): TIMESTAMP here is without time zone; presumably values are
-- written in UTC — confirm against the database/session timezone setting.
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- Nullable; presumably NULL means "not deleted" (soft delete) — confirm with the repository code
delete_time TIMESTAMP
);
-- Create indexes
-- Composite index on (namespace_uid, creator_uid)
CREATE INDEX IF NOT EXISTS idx_object_namespace_uid_creator ON object(namespace_uid, creator_uid);
-- Add comments for all columns
COMMENT ON COLUMN object.uid IS 'Unique identifier(uuid) for the object';
COMMENT ON COLUMN object.name IS 'Name of the object';
COMMENT ON COLUMN object.size IS 'Size of the object in bytes';
COMMENT ON COLUMN object.content_type IS 'MIME type of the object';
COMMENT ON COLUMN object.namespace_uid IS 'Namespace identifier(uuid) for the object';
COMMENT ON COLUMN object.creator_uid IS 'Creator of the object';
COMMENT ON COLUMN object.is_uploaded IS 'Flag indicating if the object is uploaded';
COMMENT ON COLUMN object.destination IS 'The destination of the object in the object storage';
COMMENT ON COLUMN object.object_expire_days IS 'The number of days the object will be expired';
COMMENT ON COLUMN object.last_modified_time IS 'Timestamp when the local file was last modified';
COMMENT ON COLUMN object.create_time IS 'Timestamp when the object was created';
COMMENT ON COLUMN object.update_time IS 'Timestamp when the object was last updated';
COMMENT ON COLUMN object.delete_time IS 'Timestamp when the object was deleted';
-- Create object_url table
-- Must be created after object because object_uid references object(uid).
CREATE TABLE IF NOT EXISTS object_url (
uid UUID PRIMARY KEY DEFAULT gen_random_uuid(),
namespace_uid UUID NOT NULL,
-- ON DELETE CASCADE: deleting an object row also deletes its object_url rows
object_uid UUID NOT NULL REFERENCES object(uid) ON DELETE CASCADE,
url_expire_at TIMESTAMP NOT NULL,
minio_url_path TEXT NOT NULL,
encoded_url_path TEXT NOT NULL,
-- Only the values 'upload' and 'download' pass the CHECK constraint
type VARCHAR(10) NOT NULL CHECK (type IN ('upload', 'download')),
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
delete_time TIMESTAMP
);
-- Add comments for all columns
COMMENT ON COLUMN object_url.uid IS 'Unique identifier(uuid) for the object url';
COMMENT ON COLUMN object_url.namespace_uid IS 'Namespace identifier(uuid) for the object url';
COMMENT ON COLUMN object_url.object_uid IS 'Object identifier(uuid) for the object url';
COMMENT ON COLUMN object_url.url_expire_at IS 'Timestamp when the object url will be expired';
COMMENT ON COLUMN object_url.minio_url_path IS 'The minio url path for the object';
COMMENT ON COLUMN object_url.encoded_url_path IS 'The encoded url path for the object';
COMMENT ON COLUMN object_url.type IS 'The type of the object url';
COMMENT ON COLUMN object_url.create_time IS 'Timestamp when the object url was created';
COMMENT ON COLUMN object_url.update_time IS 'Timestamp when the object url was last updated';
COMMENT ON COLUMN object_url.delete_time IS 'Timestamp when the object url was deleted';
-- Create indexes
-- Composite index on (namespace_uid, object_uid).
-- NOTE(review): unlike idx_object_namespace_uid_creator, this index name has no
-- table prefix. Index names share a namespace within a schema, and IF NOT EXISTS
-- would silently skip creation if another relation already uses this name —
-- confirm there is no collision.
CREATE INDEX IF NOT EXISTS idx_namespace_uid_object_uid ON object_url(namespace_uid, object_uid);
COMMIT;
5 changes: 3 additions & 2 deletions pkg/handler/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/gofrs/uuid"
"github.com/instill-ai/artifact-backend/pkg/customerror"
"github.com/instill-ai/artifact-backend/pkg/logger"
"github.com/instill-ai/artifact-backend/pkg/minio"
"github.com/instill-ai/artifact-backend/pkg/repository"
artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha"
"go.uber.org/zap"
Expand Down Expand Up @@ -86,7 +87,7 @@ func (ph *PublicHandler) GetFileCatalog(ctx context.Context, req *artifactpb.Get
}

// get the source file sourceContent from minIO using dest of source
sourceContent, err := ph.service.MinIO.GetFile(ctx, source.Dest)
sourceContent, err := ph.service.MinIO.GetFile(ctx, minio.KnowledgeBaseBucketName, source.Dest)
if err != nil {
log.Error("failed to get file from minio", zap.Error(err))
return nil, fmt.Errorf("failed to get file from minio. err: %w", err)
Expand Down Expand Up @@ -157,7 +158,7 @@ func (ph *PublicHandler) GetFileCatalog(ctx context.Context, req *artifactpb.Get
}

// Retrieve the original file content from MinIO
originalContent, err := ph.service.MinIO.GetFile(ctx, kbFile.Destination)
originalContent, err := ph.service.MinIO.GetFile(ctx, minio.KnowledgeBaseBucketName, kbFile.Destination)
if err != nil {
log.Error("failed to get original file from minio", zap.Error(err))
return nil, fmt.Errorf("failed to get original file from minio. err: %w", err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/handler/chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/gofrs/uuid"
"github.com/instill-ai/artifact-backend/pkg/customerror"
"github.com/instill-ai/artifact-backend/pkg/logger"
"github.com/instill-ai/artifact-backend/pkg/minio"
"github.com/instill-ai/artifact-backend/pkg/repository"
artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha"
"go.uber.org/zap"
Expand Down Expand Up @@ -173,7 +174,7 @@ func (ph *PublicHandler) GetSourceFile(ctx context.Context, req *artifactpb.GetS
}

// get the source file content from minIO using dest of source
content, err := ph.service.MinIO.GetFile(ctx, source.Dest)
content, err := ph.service.MinIO.GetFile(ctx, minio.KnowledgeBaseBucketName, source.Dest)
if err != nil {
log.Error("failed to get file from minio", zap.Error(err))
return nil, fmt.Errorf("failed to get file from minio. err: %w", err)
Expand Down
5 changes: 3 additions & 2 deletions pkg/handler/knowledgebasefiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/instill-ai/artifact-backend/pkg/constant"
"github.com/instill-ai/artifact-backend/pkg/customerror"
"github.com/instill-ai/artifact-backend/pkg/logger"
"github.com/instill-ai/artifact-backend/pkg/minio"
"github.com/instill-ai/artifact-backend/pkg/repository"
"github.com/instill-ai/artifact-backend/pkg/resource"
"github.com/instill-ai/artifact-backend/pkg/utils"
Expand Down Expand Up @@ -129,7 +130,7 @@ func (ph *PublicHandler) UploadCatalogFile(ctx context.Context, req *artifactpb.
// create catalog file in database
res, err = ph.service.Repository.CreateKnowledgeBaseFile(ctx, kbFile, func(FileUID string) error {
// upload file to minio
err := ph.service.MinIO.UploadBase64File(ctx, destination, req.File.Content, fileTypeConvertToMime(req.File.Type))
err := ph.service.MinIO.UploadBase64File(ctx, minio.KnowledgeBaseBucketName, destination, req.File.Content, fileTypeConvertToMime(req.File.Type))
if err != nil {
return err
}
Expand Down Expand Up @@ -410,7 +411,7 @@ func (ph *PublicHandler) DeleteCatalogFile(
}

// Delete the files in MinIO
errChan := ph.service.MinIO.DeleteFiles(ctx, objectPaths)
errChan := ph.service.MinIO.DeleteFiles(ctx, minio.KnowledgeBaseBucketName, objectPaths)
for err := range errChan {
if err != nil {
log.Error("failed to delete files in minio", zap.Error(err))
Expand Down
3 changes: 2 additions & 1 deletion pkg/handler/qa.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/instill-ai/artifact-backend/pkg/constant"
"github.com/instill-ai/artifact-backend/pkg/customerror"
"github.com/instill-ai/artifact-backend/pkg/logger"
"github.com/instill-ai/artifact-backend/pkg/minio"
"github.com/instill-ai/artifact-backend/pkg/repository"
"github.com/instill-ai/artifact-backend/pkg/resource"
"github.com/instill-ai/artifact-backend/pkg/service"
Expand Down Expand Up @@ -113,7 +114,7 @@ func (ph *PublicHandler) QuestionAnswering(
log.Info("get chunks by uids", zap.Duration("duration", time.Since(t)))
t = time.Now()
// fetch the chunks content from minio
chunkContents, err := ph.service.MinIO.GetFilesByPaths(ctx, chunkFilePaths)
chunkContents, err := ph.service.MinIO.GetFilesByPaths(ctx, minio.KnowledgeBaseBucketName, chunkFilePaths)
if err != nil {
log.Error("failed to get chunks content", zap.Error(err))
return nil, fmt.Errorf("failed to get chunks content. err: %w", err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/handler/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/instill-ai/artifact-backend/pkg/constant"
"github.com/instill-ai/artifact-backend/pkg/customerror"
"github.com/instill-ai/artifact-backend/pkg/logger"
"github.com/instill-ai/artifact-backend/pkg/minio"
"github.com/instill-ai/artifact-backend/pkg/repository"
"github.com/instill-ai/artifact-backend/pkg/resource"
"github.com/instill-ai/artifact-backend/pkg/service"
Expand Down Expand Up @@ -97,7 +98,7 @@ func (ph *PublicHandler) SimilarityChunksSearch(
log.Info("get chunks by uids", zap.Duration("duration", time.Since(t)))
t = time.Now()
// fetch the chunks content from minio
chunkContents, err := ph.service.MinIO.GetFilesByPaths(ctx, chunkFilePaths)
chunkContents, err := ph.service.MinIO.GetFilesByPaths(ctx, minio.KnowledgeBaseBucketName, chunkFilePaths)
if err != nil {
log.Error("failed to get chunks content", zap.Error(err))
return nil, fmt.Errorf("failed to get chunks content. err: %w", err)
Expand Down
12 changes: 6 additions & 6 deletions pkg/minio/knowledgebase.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (m *Minio) SaveConvertedFile(ctx context.Context, kbUID, convertedFileUID,
mimeType = "text/markdown"
}

err := m.UploadBase64File(ctx, filePathName, base64.StdEncoding.EncodeToString(content), mimeType)
err := m.UploadBase64File(ctx, KnowledgeBaseBucketName, filePathName, base64.StdEncoding.EncodeToString(content), mimeType)
if err != nil {
return err
}
Expand Down Expand Up @@ -79,7 +79,7 @@ func (m *Minio) SaveTextChunks(ctx context.Context, kbUID string, chunks map[Chu
defer wg.Done()
filePathName := m.GetChunkPathInKnowledgeBase(kbUID, string(chunkUID))

err := m.UploadBase64File(ctx, filePathName, base64.StdEncoding.EncodeToString(chunkContent), "text/plain")
err := m.UploadBase64File(ctx, KnowledgeBaseBucketName, filePathName, base64.StdEncoding.EncodeToString(chunkContent), "text/plain")
if err != nil {
logger.Error("Failed to upload chunk after retries", zap.String("chunkUID", string(chunkUID)), zap.Error(err))
errorUIDChan <- ChunkError{ChunkUID: string(chunkUID), ErrorMessage: err.Error()}
Expand Down Expand Up @@ -110,30 +110,30 @@ func (m *Minio) SaveTextChunks(ctx context.Context, kbUID string, chunks map[Chu
// Delete all files in the knowledge base
func (m *Minio) DeleteKnowledgeBase(ctx context.Context, kbUID string) chan error {
// List all objects in the knowledge base
err := m.DeleteFilesWithPrefix(ctx, kbUID)
err := m.DeleteFilesWithPrefix(ctx, KnowledgeBaseBucketName, kbUID)
return err
}

// Delete converted files in the knowledge base
func (m *Minio) DeleteAllConvertedFilesInKb(ctx context.Context, kbUID string) chan error {
// List all objects in the knowledge base
err := m.DeleteFilesWithPrefix(ctx, kbUID+convertedFilePrefix)
err := m.DeleteFilesWithPrefix(ctx, KnowledgeBaseBucketName, kbUID+convertedFilePrefix)

return err
}

// Delete uploaded files in the knowledge base
func (m *Minio) DeleteAllUploadedFilesInKb(ctx context.Context, kbUID string) chan error {
// List all objects in the knowledge base
err := m.DeleteFilesWithPrefix(ctx, kbUID+uploadedFilePrefix)
err := m.DeleteFilesWithPrefix(ctx, KnowledgeBaseBucketName, kbUID+uploadedFilePrefix)

return err
}

// Delete chunks in the knowledge base
func (m *Minio) DeleteAllChunksInKb(ctx context.Context, kbUID string) chan error {
// List all objects in the knowledge base
err := m.DeleteFilesWithPrefix(ctx, kbUID+chunkPrefix)
err := m.DeleteFilesWithPrefix(ctx, KnowledgeBaseBucketName, kbUID+chunkPrefix)

return err
}
Expand Down
Loading

0 comments on commit 4107ad1

Please sign in to comment.