Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(artifact): implement fast indexing for temporary catalog #134

Merged
merged 1 commit into from
Nov 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cmd/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,14 @@ func main() {
}),
)

// activate file-to-embeddings worker pool
wp := worker.NewFileToEmbWorkerPool(ctx, service, config.Config.FileToEmbeddingWorker.NumberOfWorkers)
// activate persistent catalog file-to-embeddings worker pool
wp := worker.NewPersistentCatalogFileToEmbWorkerPool(ctx, service, config.Config.FileToEmbeddingWorker.NumberOfWorkers, artifactPB.CatalogType_CATALOG_TYPE_PERSISTENT)
wp.Start()

// activate temporary (ephemeral) catalog file-to-embeddings worker pool
wpTemp := worker.NewTempCatalogFileToEmbWorkerPool(ctx, service, config.Config.FileToEmbeddingWorker.NumberOfWorkers, artifactPB.CatalogType_CATALOG_TYPE_EPHEMERAL)
wpTemp.Start()

// Start usage reporter
var usg usage.Usage
if config.Config.Server.Usage.Enabled {
Expand Down
50 changes: 28 additions & 22 deletions pkg/handler/knowledgebase.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,11 @@ func (ph *PublicHandler) CreateCatalog(ctx context.Context, req *artifactpb.Crea
service.NamespaceID + "/" + service.ConvertDocToMDPipelineID2 + "@" + service.DocToMDVersion2,
},
SplittingPipelines: []string{
service.NamespaceID + "/" + service.TextChunkPipelineID + "@" + service.TextSplitVersion,
service.NamespaceID + "/" + service.MdChunkPipelineID + "@" + service.MdSplitVersion,
service.NamespaceID + "/" + service.ChunkTextPipelineID + "@" + service.ChunkTextVersion,
service.NamespaceID + "/" + service.ChunkMdPipelineID + "@" + service.ChunkMdVersion,
},
EmbeddingPipelines: []string{
service.NamespaceID + "/" + service.TextEmbedPipelineID + "@" + service.TextEmbedVersion,
service.NamespaceID + "/" + service.EmbedTextPipelineID + "@" + service.EmbedTextVersion,
},
DownstreamApps: []string{},
TotalFiles: 0,
Expand Down Expand Up @@ -235,11 +235,11 @@ func (ph *PublicHandler) ListCatalogs(ctx context.Context, req *artifactpb.ListC
service.NamespaceID + "/" + service.ConvertDocToMDPipelineID2 + "@" + service.DocToMDVersion2,
},
SplittingPipelines: []string{
service.NamespaceID + "/" + service.TextChunkPipelineID + "@" + service.TextSplitVersion,
service.NamespaceID + "/" + service.MdChunkPipelineID + "@" + service.MdSplitVersion,
service.NamespaceID + "/" + service.ChunkTextPipelineID + "@" + service.ChunkTextVersion,
service.NamespaceID + "/" + service.ChunkMdPipelineID + "@" + service.ChunkMdVersion,
},
EmbeddingPipelines: []string{
service.NamespaceID + "/" + service.TextEmbedPipelineID + "@" + service.TextEmbedVersion,
service.NamespaceID + "/" + service.EmbedTextPipelineID + "@" + service.EmbedTextVersion,
},
DownstreamApps: []string{},
TotalFiles: uint32(fileCounts[kb.UID]),
Expand All @@ -262,7 +262,7 @@ func (ph *PublicHandler) UpdateCatalog(ctx context.Context, req *artifactpb.Upda
if req.CatalogId == "" {
log.Error("kb_id is empty", zap.Error(ErrCheckRequiredFields))
return nil, fmt.Errorf("kb_id is empty. err: %w", ErrCheckRequiredFields)
}
}

ns, err := ph.service.GetNamespaceByNsID(ctx, req.GetNamespaceId())
if err != nil {
Expand Down Expand Up @@ -318,22 +318,28 @@ func (ph *PublicHandler) UpdateCatalog(ctx context.Context, req *artifactpb.Upda
// populate response
return &artifactpb.UpdateCatalogResponse{
Catalog: &artifactpb.Catalog{
Name: kb.Name,
CatalogId: kb.KbID,
Description: kb.Description,
Tags: kb.Tags,
CreateTime: kb.CreateTime.String(),
UpdateTime: kb.UpdateTime.String(),
OwnerName: kb.Owner,
ConvertingPipelines: []string{service.NamespaceID + "/" + service.ConvertDocToMDPipelineID},
Name: kb.Name,
CatalogId: kb.KbID,
Description: kb.Description,
Tags: kb.Tags,
CreateTime: kb.CreateTime.String(),
UpdateTime: kb.UpdateTime.String(),
OwnerName: kb.Owner,
ConvertingPipelines: []string{
service.NamespaceID + "/" + service.ConvertDocToMDPipelineID,
service.NamespaceID + "/" + service.ConvertDocToMDPipelineID2,
},
SplittingPipelines: []string{
service.NamespaceID + "/" + service.TextChunkPipelineID,
service.NamespaceID + "/" + service.MdChunkPipelineID},
EmbeddingPipelines: []string{service.NamespaceID + "/" + service.TextEmbedPipelineID},
DownstreamApps: []string{},
TotalFiles: uint32(fileCounts[kb.UID]),
TotalTokens: uint32(tokenCounts[kb.UID]),
UsedStorage: uint64(kb.Usage),
service.NamespaceID + "/" + service.ChunkTextPipelineID,
service.NamespaceID + "/" + service.ChunkMdPipelineID,
},
EmbeddingPipelines: []string{
service.NamespaceID + "/" + service.EmbedTextPipelineID,
},
DownstreamApps: []string{},
TotalFiles: uint32(fileCounts[kb.UID]),
TotalTokens: uint32(tokenCounts[kb.UID]),
UsedStorage: uint64(kb.Usage),
},
}, nil
}
Expand Down
60 changes: 44 additions & 16 deletions pkg/mock/repository_i_mock.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 50 additions & 14 deletions pkg/repository/knowledgebasefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type KnowledgeBaseFileI interface {
// ProcessKnowledgeBaseFiles updates the process status of the files
ProcessKnowledgeBaseFiles(ctx context.Context, fileUIDs []string, requester uuid.UUID) ([]KnowledgeBaseFile, error)
// GetNeedProcessFiles returns the files that are not yet processed
GetNeedProcessFiles(ctx context.Context) []KnowledgeBaseFile
GetNeedProcessFiles(ctx context.Context, catalogType artifactpb.CatalogType) []KnowledgeBaseFile
// UpdateKnowledgeBaseFile updates the data and retrieves the latest data
UpdateKnowledgeBaseFile(ctx context.Context, fileUID string, updateMap map[string]interface{}) (*KnowledgeBaseFile, error)
// GetCountFilesByListKnowledgeBaseUID returns the number of files associated with the knowledge base UID
Expand Down Expand Up @@ -386,9 +386,11 @@ func (r *Repository) ProcessKnowledgeBaseFiles(
return files, nil
}

// GetNeedProcessFiles
func (r *Repository) GetNeedProcessFiles(ctx context.Context) []KnowledgeBaseFile {
// GetNeedProcessFiles returns the files that need to be processed in catalogs of the given catalog type
func (r *Repository) GetNeedProcessFiles(ctx context.Context, catalogType artifactpb.CatalogType) []KnowledgeBaseFile {
var files []KnowledgeBaseFile

// First get files that need processing
whereClause := fmt.Sprintf("%v IN ? AND %v is null", KnowledgeBaseFileColumn.ProcessStatus, KnowledgeBaseFileColumn.DeleteTime)
if err := r.db.WithContext(ctx).Where(
whereClause, []string{
Expand All @@ -400,7 +402,41 @@ func (r *Repository) GetNeedProcessFiles(ctx context.Context) []KnowledgeBaseFil
Find(&files).Error; err != nil {
return nil
}
return files

// Filter files to only include those from catalogs of the requested catalog type
var result []KnowledgeBaseFile
// Get all unique knowledge base UIDs
kbUIDs := make([]uuid.UUID, 0)
kbUIDMap := make(map[uuid.UUID]bool)
for _, file := range files {
if !kbUIDMap[file.KnowledgeBaseUID] {
kbUIDs = append(kbUIDs, file.KnowledgeBaseUID)
kbUIDMap[file.KnowledgeBaseUID] = true
}
}

// Get all knowledge bases in one query
kbs, err := r.GetKnowledgeBasesByUIDs(ctx, kbUIDs)
if err != nil {
return nil
}

// Create a set of the knowledge bases whose catalog type matches catalogType
persistentKBs := make(map[uuid.UUID]bool)
for _, kb := range kbs {
if kb.CatalogType == catalogType.String() {
persistentKBs[kb.UID] = true
}
}

// Keep only the files whose knowledge base matched the requested catalog type
for _, file := range files {
if persistentKBs[file.KnowledgeBaseUID] {
result = append(result, file)
}
}

return result
}

// UpdateKnowledgeBaseFile updates the data and retrieves the latest data
Expand Down Expand Up @@ -559,12 +595,12 @@ func (r *Repository) GetKnowledgebaseFileByKbUIDAndFileID(ctx context.Context, k
}

type SourceMeta struct {
OriginalFileUID uuid.UUID
OriginalFileUID uuid.UUID
OriginalFileName string
KbUID uuid.UUID
Dest string
CreateTime time.Time
UpdateTime time.Time
KbUID uuid.UUID
Dest string
CreateTime time.Time
UpdateTime time.Time
}

// GetTruthSourceByFileUID returns the truth source file destination of minIO by file UID
Expand Down Expand Up @@ -624,12 +660,12 @@ func (r *Repository) GetTruthSourceByFileUID(ctx context.Context, fileUID uuid.U
}

return &SourceMeta{
OriginalFileUID: originalFileUID,
OriginalFileUID: originalFileUID,
OriginalFileName: originalFileName,
Dest: dest,
CreateTime: createTime,
UpdateTime: updateTime,
KbUID: kbUID,
Dest: dest,
CreateTime: createTime,
UpdateTime: updateTime,
KbUID: kbUID,
}, nil
}

Expand Down
Loading
Loading