From ac88c1b145ed5c68d4fe4c8162546de2d6c62f34 Mon Sep 17 00:00:00 2001 From: "gary.y" Date: Sat, 30 Nov 2024 16:23:47 +0800 Subject: [PATCH] feat(artifact): implement fast indexing for temporary catalog --- cmd/main/main.go | 8 +- pkg/handler/knowledgebase.go | 50 +- pkg/mock/repository_i_mock.gen.go | 60 +- pkg/repository/knowledgebasefile.go | 64 +- pkg/service/pipeline.go | 189 +++++- pkg/worker/common.go | 297 +++++++++ .../{worker.go => persistentcatalogworker.go} | 371 ++--------- pkg/worker/tempcatalogworker.go | 605 ++++++++++++++++++ 8 files changed, 1230 insertions(+), 414 deletions(-) create mode 100644 pkg/worker/common.go rename pkg/worker/{worker.go => persistentcatalogworker.go} (67%) create mode 100644 pkg/worker/tempcatalogworker.go diff --git a/cmd/main/main.go b/cmd/main/main.go index 0dbae65..33c6a84 100644 --- a/cmd/main/main.go +++ b/cmd/main/main.go @@ -182,10 +182,14 @@ func main() { }), ) - // activate file-to-embeddings worker pool - wp := worker.NewFileToEmbWorkerPool(ctx, service, config.Config.FileToEmbeddingWorker.NumberOfWorkers) + // activate persistent catalog file-to-embeddings worker pool + wp := worker.NewPersistentCatalogFileToEmbWorkerPool(ctx, service, config.Config.FileToEmbeddingWorker.NumberOfWorkers, artifactPB.CatalogType_CATALOG_TYPE_PERSISTENT) wp.Start() + // activate temp(ephemeral) catalog file-to-embeddings worker pool + wpTemp := worker.NewTempCatalogFileToEmbWorkerPool(ctx, service, config.Config.FileToEmbeddingWorker.NumberOfWorkers, artifactPB.CatalogType_CATALOG_TYPE_EPHEMERAL) + wpTemp.Start() + // Start usage reporter var usg usage.Usage if config.Config.Server.Usage.Enabled { diff --git a/pkg/handler/knowledgebase.go b/pkg/handler/knowledgebase.go index acc89f6..ec43c49 100644 --- a/pkg/handler/knowledgebase.go +++ b/pkg/handler/knowledgebase.go @@ -154,11 +154,11 @@ func (ph *PublicHandler) CreateCatalog(ctx context.Context, req *artifactpb.Crea service.NamespaceID + "/" + service.ConvertDocToMDPipelineID2 + "@" + service.DocToMDVersion2, }, SplittingPipelines: []string{ - service.NamespaceID + "/" + service.TextChunkPipelineID + "@" + service.TextSplitVersion, - service.NamespaceID + "/" + service.MdChunkPipelineID + "@" + service.MdSplitVersion, + service.NamespaceID + "/" + service.ChunkTextPipelineID + "@" + service.ChunkTextVersion, + service.NamespaceID + "/" + service.ChunkMdPipelineID + "@" + service.ChunkMdVersion, }, EmbeddingPipelines: []string{ - service.NamespaceID + "/" + service.TextEmbedPipelineID + "@" + service.TextEmbedVersion, + service.NamespaceID + "/" + service.EmbedTextPipelineID + "@" + service.EmbedTextVersion, }, DownstreamApps: []string{}, TotalFiles: 0, @@ -235,11 +235,11 @@ func (ph *PublicHandler) ListCatalogs(ctx context.Context, req *artifactpb.ListC service.NamespaceID + "/" + service.ConvertDocToMDPipelineID2 + "@" + service.DocToMDVersion2, }, SplittingPipelines: []string{ - service.NamespaceID + "/" + service.TextChunkPipelineID + "@" + service.TextSplitVersion, - service.NamespaceID + "/" + service.MdChunkPipelineID + "@" + service.MdSplitVersion, + service.NamespaceID + "/" + service.ChunkTextPipelineID + "@" + service.ChunkTextVersion, + service.NamespaceID + "/" + service.ChunkMdPipelineID + "@" + service.ChunkMdVersion, }, EmbeddingPipelines: []string{ - service.NamespaceID + "/" + service.TextEmbedPipelineID + "@" + service.TextEmbedVersion, + service.NamespaceID + "/" + service.EmbedTextPipelineID + "@" + service.EmbedTextVersion, }, DownstreamApps: []string{}, TotalFiles: uint32(fileCounts[kb.UID]), @@ -262,7 +262,7 @@ func (ph *PublicHandler) UpdateCatalog(ctx context.Context, req *artifactpb.Upda if req.CatalogId == "" { log.Error("kb_id is empty", zap.Error(ErrCheckRequiredFields)) return nil, fmt.Errorf("kb_id is empty. err: %w", ErrCheckRequiredFields) -} + } ns, err := ph.service.GetNamespaceByNsID(ctx, req.GetNamespaceId()) if err != nil { @@ -318,22 +318,28 @@ func (ph *PublicHandler) UpdateCatalog(ctx context.Context, req *artifactpb.Upda // populate response return &artifactpb.UpdateCatalogResponse{ Catalog: &artifactpb.Catalog{ - Name: kb.Name, - CatalogId: kb.KbID, - Description: kb.Description, - Tags: kb.Tags, - CreateTime: kb.CreateTime.String(), - UpdateTime: kb.UpdateTime.String(), - OwnerName: kb.Owner, - ConvertingPipelines: []string{service.NamespaceID + "/" + service.ConvertDocToMDPipelineID}, + Name: kb.Name, + CatalogId: kb.KbID, + Description: kb.Description, + Tags: kb.Tags, + CreateTime: kb.CreateTime.String(), + UpdateTime: kb.UpdateTime.String(), + OwnerName: kb.Owner, + ConvertingPipelines: []string{ + service.NamespaceID + "/" + service.ConvertDocToMDPipelineID, + service.NamespaceID + "/" + service.ConvertDocToMDPipelineID2, + }, SplittingPipelines: []string{ - service.NamespaceID + "/" + service.TextChunkPipelineID, - service.NamespaceID + "/" + service.MdChunkPipelineID}, - EmbeddingPipelines: []string{service.NamespaceID + "/" + service.TextEmbedPipelineID}, - DownstreamApps: []string{}, - TotalFiles: uint32(fileCounts[kb.UID]), - TotalTokens: uint32(tokenCounts[kb.UID]), - UsedStorage: uint64(kb.Usage), + service.NamespaceID + "/" + service.ChunkTextPipelineID, + service.NamespaceID + "/" + service.ChunkMdPipelineID, + }, + EmbeddingPipelines: []string{ + service.NamespaceID + "/" + service.EmbedTextPipelineID, + }, + DownstreamApps: []string{}, + TotalFiles: uint32(fileCounts[kb.UID]), + TotalTokens: uint32(tokenCounts[kb.UID]), + UsedStorage: uint64(kb.Usage), }, }, nil } diff --git a/pkg/mock/repository_i_mock.gen.go b/pkg/mock/repository_i_mock.gen.go index 87a44ae..85e736b 100644 --- a/pkg/mock/repository_i_mock.gen.go +++ b/pkg/mock/repository_i_mock.gen.go @@ -208,8 +208,8 @@ type RepositoryIMock struct { beforeGetKnowledgebaseFileByKbUIDAndFileIDCounter uint64 GetKnowledgebaseFileByKbUIDAndFileIDMock mRepositoryIMockGetKnowledgebaseFileByKbUIDAndFileID - funcGetNeedProcessFiles func(ctx context.Context) (ka1 []mm_repository.KnowledgeBaseFile) - inspectFuncGetNeedProcessFiles func(ctx context.Context) + funcGetNeedProcessFiles func(ctx context.Context, catalogType artifactpb.CatalogType) (ka1 []mm_repository.KnowledgeBaseFile) + inspectFuncGetNeedProcessFiles func(ctx context.Context, catalogType artifactpb.CatalogType) afterGetNeedProcessFilesCounter uint64 beforeGetNeedProcessFilesCounter uint64 GetNeedProcessFilesMock mRepositoryIMockGetNeedProcessFiles @@ -10066,12 +10066,14 @@ type RepositoryIMockGetNeedProcessFilesExpectation struct { // RepositoryIMockGetNeedProcessFilesParams contains parameters of the RepositoryI.GetNeedProcessFiles type RepositoryIMockGetNeedProcessFilesParams struct { - ctx context.Context + ctx context.Context + catalogType artifactpb.CatalogType } // RepositoryIMockGetNeedProcessFilesParamPtrs contains pointers to parameters of the RepositoryI.GetNeedProcessFiles type RepositoryIMockGetNeedProcessFilesParamPtrs struct { - ctx *context.Context + ctx *context.Context + catalogType *artifactpb.CatalogType } // RepositoryIMockGetNeedProcessFilesResults contains results of the RepositoryI.GetNeedProcessFiles @@ -10080,7 +10082,7 @@ type RepositoryIMockGetNeedProcessFilesResults struct { } // Expect sets up expected params for RepositoryI.GetNeedProcessFiles -func (mmGetNeedProcessFiles *mRepositoryIMockGetNeedProcessFiles) Expect(ctx context.Context) *mRepositoryIMockGetNeedProcessFiles { +func (mmGetNeedProcessFiles *mRepositoryIMockGetNeedProcessFiles) Expect(ctx context.Context, catalogType artifactpb.CatalogType) *mRepositoryIMockGetNeedProcessFiles { if mmGetNeedProcessFiles.mock.funcGetNeedProcessFiles != nil { mmGetNeedProcessFiles.mock.t.Fatalf("RepositoryIMock.GetNeedProcessFiles mock is already set by Set") } @@ -10093,7 +10095,7 @@ func (mmGetNeedProcessFiles *mRepositoryIMockGetNeedProcessFiles) Expect(ctx con mmGetNeedProcessFiles.mock.t.Fatalf("RepositoryIMock.GetNeedProcessFiles mock is already set by ExpectParams functions") } - mmGetNeedProcessFiles.defaultExpectation.params = &RepositoryIMockGetNeedProcessFilesParams{ctx} + mmGetNeedProcessFiles.defaultExpectation.params = &RepositoryIMockGetNeedProcessFilesParams{ctx, catalogType} for _, e := range mmGetNeedProcessFiles.expectations { if minimock.Equal(e.params, mmGetNeedProcessFiles.defaultExpectation.params) { mmGetNeedProcessFiles.mock.t.Fatalf("Expectation set by When has same params: %#v", *mmGetNeedProcessFiles.defaultExpectation.params) @@ -10125,8 +10127,30 @@ func (mmGetNeedProcessFiles *mRepositoryIMockGetNeedProcessFiles) ExpectCtxParam return mmGetNeedProcessFiles } +// ExpectCatalogTypeParam2 sets up expected param catalogType for RepositoryI.GetNeedProcessFiles +func (mmGetNeedProcessFiles *mRepositoryIMockGetNeedProcessFiles) ExpectCatalogTypeParam2(catalogType artifactpb.CatalogType) *mRepositoryIMockGetNeedProcessFiles { + if mmGetNeedProcessFiles.mock.funcGetNeedProcessFiles != nil { + mmGetNeedProcessFiles.mock.t.Fatalf("RepositoryIMock.GetNeedProcessFiles mock is already set by Set") + } + + if mmGetNeedProcessFiles.defaultExpectation == nil { + mmGetNeedProcessFiles.defaultExpectation = &RepositoryIMockGetNeedProcessFilesExpectation{} + } + + if mmGetNeedProcessFiles.defaultExpectation.params != nil { + mmGetNeedProcessFiles.mock.t.Fatalf("RepositoryIMock.GetNeedProcessFiles mock is already set by Expect") + } + + if mmGetNeedProcessFiles.defaultExpectation.paramPtrs == nil { + mmGetNeedProcessFiles.defaultExpectation.paramPtrs = &RepositoryIMockGetNeedProcessFilesParamPtrs{} + } + mmGetNeedProcessFiles.defaultExpectation.paramPtrs.catalogType = &catalogType + + return mmGetNeedProcessFiles +} + // Inspect accepts an inspector function that has same arguments as the RepositoryI.GetNeedProcessFiles -func (mmGetNeedProcessFiles *mRepositoryIMockGetNeedProcessFiles) Inspect(f func(ctx context.Context)) *mRepositoryIMockGetNeedProcessFiles { +func (mmGetNeedProcessFiles *mRepositoryIMockGetNeedProcessFiles) Inspect(f func(ctx context.Context, catalogType artifactpb.CatalogType)) *mRepositoryIMockGetNeedProcessFiles { if mmGetNeedProcessFiles.mock.inspectFuncGetNeedProcessFiles != nil { mmGetNeedProcessFiles.mock.t.Fatalf("Inspect function is already set for RepositoryIMock.GetNeedProcessFiles") } @@ -10150,7 +10174,7 @@ func (mmGetNeedProcessFiles *mRepositoryIMockGetNeedProcessFiles) Return(ka1 []m } // Set uses given function f to mock the RepositoryI.GetNeedProcessFiles method -func (mmGetNeedProcessFiles *mRepositoryIMockGetNeedProcessFiles) Set(f func(ctx context.Context) (ka1 []mm_repository.KnowledgeBaseFile)) *RepositoryIMock { +func (mmGetNeedProcessFiles *mRepositoryIMockGetNeedProcessFiles) Set(f func(ctx context.Context, catalogType artifactpb.CatalogType) (ka1 []mm_repository.KnowledgeBaseFile)) *RepositoryIMock { if mmGetNeedProcessFiles.defaultExpectation != nil { mmGetNeedProcessFiles.mock.t.Fatalf("Default expectation is already set for the RepositoryI.GetNeedProcessFiles method") } @@ -10165,14 +10189,14 @@ func (mmGetNeedProcessFiles *mRepositoryIMockGetNeedProcessFiles) Set(f func(ctx // When sets expectation for the RepositoryI.GetNeedProcessFiles which will trigger the result defined by the following // Then helper -func (mmGetNeedProcessFiles *mRepositoryIMockGetNeedProcessFiles) When(ctx context.Context) *RepositoryIMockGetNeedProcessFilesExpectation { +func (mmGetNeedProcessFiles *mRepositoryIMockGetNeedProcessFiles) When(ctx context.Context, catalogType artifactpb.CatalogType) *RepositoryIMockGetNeedProcessFilesExpectation { if mmGetNeedProcessFiles.mock.funcGetNeedProcessFiles != nil { mmGetNeedProcessFiles.mock.t.Fatalf("RepositoryIMock.GetNeedProcessFiles mock is already set by Set") } expectation := &RepositoryIMockGetNeedProcessFilesExpectation{ mock: mmGetNeedProcessFiles.mock, - params: &RepositoryIMockGetNeedProcessFilesParams{ctx}, + params: &RepositoryIMockGetNeedProcessFilesParams{ctx, catalogType}, } mmGetNeedProcessFiles.expectations = append(mmGetNeedProcessFiles.expectations, expectation) return expectation @@ -10205,15 +10229,15 @@ func (mmGetNeedProcessFiles *mRepositoryIMockGetNeedProcessFiles) invocationsDon } // GetNeedProcessFiles implements repository.RepositoryI -func (mmGetNeedProcessFiles *RepositoryIMock) GetNeedProcessFiles(ctx context.Context) (ka1 []mm_repository.KnowledgeBaseFile) { +func (mmGetNeedProcessFiles *RepositoryIMock) GetNeedProcessFiles(ctx context.Context, catalogType artifactpb.CatalogType) (ka1 []mm_repository.KnowledgeBaseFile) { mm_atomic.AddUint64(&mmGetNeedProcessFiles.beforeGetNeedProcessFilesCounter, 1) defer mm_atomic.AddUint64(&mmGetNeedProcessFiles.afterGetNeedProcessFilesCounter, 1) if mmGetNeedProcessFiles.inspectFuncGetNeedProcessFiles != nil { - mmGetNeedProcessFiles.inspectFuncGetNeedProcessFiles(ctx) + mmGetNeedProcessFiles.inspectFuncGetNeedProcessFiles(ctx, catalogType) } - mm_params := RepositoryIMockGetNeedProcessFilesParams{ctx} + mm_params := RepositoryIMockGetNeedProcessFilesParams{ctx, catalogType} // Record call args mmGetNeedProcessFiles.GetNeedProcessFilesMock.mutex.Lock() @@ -10232,7 +10256,7 @@ func (mmGetNeedProcessFiles *RepositoryIMock) GetNeedProcessFiles(ctx context.Co mm_want := mmGetNeedProcessFiles.GetNeedProcessFilesMock.defaultExpectation.params mm_want_ptrs := mmGetNeedProcessFiles.GetNeedProcessFilesMock.defaultExpectation.paramPtrs - mm_got := RepositoryIMockGetNeedProcessFilesParams{ctx} + mm_got := RepositoryIMockGetNeedProcessFilesParams{ctx, catalogType} if mm_want_ptrs != nil { @@ -10240,6 +10264,10 @@ func (mmGetNeedProcessFiles *RepositoryIMock) GetNeedProcessFiles(ctx context.Co mmGetNeedProcessFiles.t.Errorf("RepositoryIMock.GetNeedProcessFiles got unexpected parameter ctx, want: %#v, got: %#v%s\n", *mm_want_ptrs.ctx, mm_got.ctx, minimock.Diff(*mm_want_ptrs.ctx, mm_got.ctx)) } + if mm_want_ptrs.catalogType != nil && !minimock.Equal(*mm_want_ptrs.catalogType, mm_got.catalogType) { + mmGetNeedProcessFiles.t.Errorf("RepositoryIMock.GetNeedProcessFiles got unexpected parameter catalogType, want: %#v, got: %#v%s\n", *mm_want_ptrs.catalogType, mm_got.catalogType, minimock.Diff(*mm_want_ptrs.catalogType, mm_got.catalogType)) + } + } else if mm_want != nil && !minimock.Equal(*mm_want, mm_got) { mmGetNeedProcessFiles.t.Errorf("RepositoryIMock.GetNeedProcessFiles got unexpected parameters, want: %#v, got: %#v%s\n", *mm_want, mm_got, minimock.Diff(*mm_want, mm_got)) } @@ -10251,9 +10279,9 @@ func (mmGetNeedProcessFiles *RepositoryIMock) GetNeedProcessFiles(ctx context.Co return (*mm_results).ka1 } if mmGetNeedProcessFiles.funcGetNeedProcessFiles != nil { - return mmGetNeedProcessFiles.funcGetNeedProcessFiles(ctx) + return mmGetNeedProcessFiles.funcGetNeedProcessFiles(ctx, catalogType) } - mmGetNeedProcessFiles.t.Fatalf("Unexpected call to RepositoryIMock.GetNeedProcessFiles. %v", ctx) + mmGetNeedProcessFiles.t.Fatalf("Unexpected call to RepositoryIMock.GetNeedProcessFiles. %v %v", ctx, catalogType) return } diff --git a/pkg/repository/knowledgebasefile.go b/pkg/repository/knowledgebasefile.go index fe77392..e5727d2 100644 --- a/pkg/repository/knowledgebasefile.go +++ b/pkg/repository/knowledgebasefile.go @@ -35,7 +35,7 @@ type KnowledgeBaseFileI interface { // ProcessKnowledgeBaseFiles updates the process status of the files ProcessKnowledgeBaseFiles(ctx context.Context, fileUIDs []string, requester uuid.UUID) ([]KnowledgeBaseFile, error) // GetNeedProcessFiles returns the files that are not yet processed - GetNeedProcessFiles(ctx context.Context) []KnowledgeBaseFile + GetNeedProcessFiles(ctx context.Context, catalogType artifactpb.CatalogType) []KnowledgeBaseFile // UpdateKnowledgeBaseFile updates the data and retrieves the latest data UpdateKnowledgeBaseFile(ctx context.Context, fileUID string, updateMap map[string]interface{}) (*KnowledgeBaseFile, error) // GetCountFilesByListKnowledgeBaseUID returns the number of files associated with the knowledge base UID @@ -386,9 +386,11 @@ func (r *Repository) ProcessKnowledgeBaseFiles( return files, nil } -// GetNeedProcessFiles -func (r *Repository) GetNeedProcessFiles(ctx context.Context) []KnowledgeBaseFile { +// GetNeedProcessFiles returns the files that need to be processed in persistent catalogs +func (r *Repository) GetNeedProcessFiles(ctx context.Context, catalogType artifactpb.CatalogType) []KnowledgeBaseFile { var files []KnowledgeBaseFile + + // First get files that need processing whereClause := fmt.Sprintf("%v IN ? AND %v is null", KnowledgeBaseFileColumn.ProcessStatus, KnowledgeBaseFileColumn.DeleteTime) if err := r.db.WithContext(ctx).Where( whereClause, []string{ @@ -400,7 +402,41 @@ func (r *Repository) GetNeedProcessFiles(ctx context.Context) []KnowledgeBaseFil Find(&files).Error; err != nil { return nil } - return files + + // Filter files to only include those from persistent catalogs + var result []KnowledgeBaseFile + // Get all unique knowledge base UIDs + kbUIDs := make([]uuid.UUID, 0) + kbUIDMap := make(map[uuid.UUID]bool) + for _, file := range files { + if !kbUIDMap[file.KnowledgeBaseUID] { + kbUIDs = append(kbUIDs, file.KnowledgeBaseUID) + kbUIDMap[file.KnowledgeBaseUID] = true + } + } + + // Get all knowledge bases in one query + kbs, err := r.GetKnowledgeBasesByUIDs(ctx, kbUIDs) + if err != nil { + return nil + } + + // Create map of persistent knowledge bases + persistentKBs := make(map[uuid.UUID]bool) + for _, kb := range kbs { + if kb.CatalogType == catalogType.String() { + persistentKBs[kb.UID] = true + } + } + + // Filter files to only include those from persistent catalogs + for _, file := range files { + if persistentKBs[file.KnowledgeBaseUID] { + result = append(result, file) + } + } + + return result } // UpdateKnowledgeBaseFile updates the data and retrieves the latest data @@ -559,12 +595,12 @@ func (r *Repository) GetKnowledgebaseFileByKbUIDAndFileID(ctx context.Context, k } type SourceMeta struct { - OriginalFileUID uuid.UUID + OriginalFileUID uuid.UUID OriginalFileName string - KbUID uuid.UUID - Dest string - CreateTime time.Time - UpdateTime time.Time + KbUID uuid.UUID + Dest string + CreateTime time.Time + UpdateTime time.Time } // GetTruthSourceByFileUID returns the truth source file destination of minIO by file UID @@ -624,12 +660,12 @@ func (r *Repository) GetTruthSourceByFileUID(ctx context.Context, fileUID uuid.U } return &SourceMeta{ - OriginalFileUID: originalFileUID, + OriginalFileUID: originalFileUID, OriginalFileName: originalFileName, - Dest: dest, - CreateTime: createTime, - UpdateTime: updateTime, - KbUID: kbUID, + Dest: dest, + CreateTime: createTime, + UpdateTime: updateTime, + KbUID: kbUID, }, nil } diff --git a/pkg/service/pipeline.go b/pkg/service/pipeline.go index 9887a46..f304ddf 100644 --- a/pkg/service/pipeline.go +++ b/pkg/service/pipeline.go @@ -16,8 +16,12 @@ import ( "google.golang.org/protobuf/types/known/structpb" ) -const chunkLength = 1024 -const chunkOverlap = 200 +const maxChunkLengthForPersistentCatalog = 1024 +const maxChunkLengthForTempCatalog = 7000 + +const chunkOverlapForPersistentCatalog = 200 +const chunkOverlapForTempCatalog = 700 + const NamespaceID = "preset" // Note: this pipeline is for the old indexing pipeline having convert_result @@ -26,22 +30,22 @@ const DocToMDVersion = "v1.1.1" // Note: this pipeline is for the new indexing pipeline having convert_result or convert_result2 const ConvertDocToMDPipelineID2 = "indexing-advanced-convert-doc" -const DocToMDVersion2 = "v1.2.0" +const DocToMDVersion2 = "v1.3.0" -const MdChunkPipelineID = "indexing-split-markdown" -const MdSplitVersion = "v2.0.0" +const ChunkMdPipelineID = "indexing-split-markdown" +const ChunkMdVersion = "v2.0.0" -const TextChunkPipelineID = "indexing-split-text" -const TextSplitVersion = "v2.0.0" +const ChunkTextPipelineID = "indexing-split-text" +const ChunkTextVersion = "v2.0.0" -const TextEmbedPipelineID = "indexing-embed" -const TextEmbedVersion = "v1.1.0" +const EmbedTextPipelineID = "indexing-embed" +const EmbedTextVersion = "v1.1.0" const QAPipelineID = "retrieving-qna" const QAVersion = "v1.2.0" -// ConvertToMDPipe using converting pipeline to convert some file type to MD and consume caller's credits -func (s *Service) ConvertToMDPipe(ctx context.Context, fileUID uuid.UUID, caller uuid.UUID, requester uuid.UUID, fileBase64 string, fileType artifactPb.FileType) (string, error) { +// ConvertToMDPipeForFilesInPersistentCatalog using converting pipeline to convert some file type to MD and consume caller's credits +func (s *Service) ConvertToMDPipeForFilesInPersistentCatalog(ctx context.Context, fileUID uuid.UUID, caller uuid.UUID, requester uuid.UUID, fileBase64 string, fileType artifactPb.FileType) (string, error) { logger, _ := logger.GetZapLogger(ctx) var md metadata.MD if requester != uuid.Nil { @@ -122,6 +126,84 @@ func (s *Service) ConvertToMDPipe(ctx context.Context, fileUID uuid.UUID, caller return result, nil } +// ConvertToMDPipeForFilesInTempCatalog using converting pipeline to convert some file type to MD and consume caller's credits +func (s *Service) ConvertToMDPipeForFilesInTempCatalog(ctx context.Context, fileUID uuid.UUID, caller uuid.UUID, requester uuid.UUID, fileBase64 string, fileType artifactPb.FileType) (string, error) { + logger, _ := logger.GetZapLogger(ctx) + var md metadata.MD + if requester != uuid.Nil { + md = metadata.New(map[string]string{ + constant.HeaderUserUIDKey: caller.String(), + constant.HeaderAuthTypeKey: "user", + constant.HeaderRequesterUIDKey: requester.String(), + }) + } else { + md = metadata.New(map[string]string{ + constant.HeaderUserUIDKey: caller.String(), + constant.HeaderAuthTypeKey: "user", + }) + } + ctx = metadata.NewOutgoingContext(ctx, md) + + // Get the appropriate prefix for the file type + prefix := getFileTypePrefix(fileType) + + // Determine which pipeline and version to use based on file type + var pipelineID string + var version string + + switch fileType { + // Document types use the new pipeline + case artifactPb.FileType_FILE_TYPE_PDF, + artifactPb.FileType_FILE_TYPE_DOCX, + artifactPb.FileType_FILE_TYPE_DOC, + artifactPb.FileType_FILE_TYPE_PPT, + artifactPb.FileType_FILE_TYPE_PPTX, + artifactPb.FileType_FILE_TYPE_XLSX, + artifactPb.FileType_FILE_TYPE_XLS, + artifactPb.FileType_FILE_TYPE_CSV, + artifactPb.FileType_FILE_TYPE_HTML: + pipelineID = ConvertDocToMDPipelineID + version = DocToMDVersion + + default: + return "", fmt.Errorf("unsupported file type: %v", fileType) + } + + // save the converting pipeline metadata into database + convertingPipelineMetadata := NamespaceID + "/" + pipelineID + "@" + version + err := s.Repository.UpdateKbFileExtraMetaData(ctx, fileUID, "", convertingPipelineMetadata, "", "", nil, nil, nil, nil) + if err != nil { + logger.Error("Failed to save converting pipeline metadata.", zap.String("File uid:", fileUID.String())) + return "", fmt.Errorf("failed to save converting pipeline metadata: %w", err) + } + + req := &pipelinePb.TriggerNamespacePipelineReleaseRequest{ + NamespaceId: NamespaceID, + PipelineId: pipelineID, + ReleaseId: version, + Inputs: []*structpb.Struct{ + { + Fields: map[string]*structpb.Value{ + "document_input": {Kind: &structpb.Value_StringValue{StringValue: prefix + fileBase64}}, + }, + }, + }, + } + + resp, err := s.PipelinePub.TriggerNamespacePipelineRelease(ctx, req) + if err != nil { + logger.Error("failed to trigger pipeline", zap.Error(err)) + return "", fmt.Errorf("failed to trigger %s pipeline: %w", pipelineID, err) + } + + result, err := getConvertResult(resp) + if err != nil { + logger.Error("failed to get convert result", zap.Error(err)) + return "", fmt.Errorf("failed to get convert result: %w", err) + } + return result, nil +} + // getFileTypePrefix returns the appropriate prefix for the given file type func getFileTypePrefix(fileType artifactPb.FileType) string { switch fileType { @@ -179,9 +261,9 @@ type Chunk = struct { Tokens int } -// SplitMarkdownPipe triggers the markdown splitting pipeline, processes the markdown text, and deducts credits from the caller's account. +// ChunkMarkdownPipe triggers the markdown splitting pipeline, processes the markdown text, and deducts credits from the caller's account. // It sets up the necessary metadata, triggers the pipeline, and processes the response to return the non-empty chunks. -func (s *Service) SplitMarkdownPipe(ctx context.Context, caller uuid.UUID, requester uuid.UUID, markdown string) ([]Chunk, error) { +func (s *Service) ChunkMarkdownPipe(ctx context.Context, caller uuid.UUID, requester uuid.UUID, markdown string) ([]Chunk, error) { var md metadata.MD if requester != uuid.Nil { md = metadata.New(map[string]string{ @@ -198,21 +280,21 @@ func (s *Service) SplitMarkdownPipe(ctx context.Context, caller uuid.UUID, reque ctx = metadata.NewOutgoingContext(ctx, md) req := &pipelinePb.TriggerNamespacePipelineReleaseRequest{ NamespaceId: NamespaceID, - PipelineId: MdChunkPipelineID, - ReleaseId: MdSplitVersion, + PipelineId: ChunkMdPipelineID, + ReleaseId: ChunkMdVersion, Inputs: []*structpb.Struct{ { Fields: map[string]*structpb.Value{ "md_input": {Kind: &structpb.Value_StringValue{StringValue: markdown}}, - "max_chunk_length": {Kind: &structpb.Value_NumberValue{NumberValue: chunkLength}}, - "chunk_overlap": {Kind: &structpb.Value_NumberValue{NumberValue: chunkOverlap}}, + "max_chunk_length": {Kind: &structpb.Value_NumberValue{NumberValue: maxChunkLengthForPersistentCatalog}}, + "chunk_overlap": {Kind: &structpb.Value_NumberValue{NumberValue: chunkOverlapForPersistentCatalog}}, }, }, }, } res, err := s.PipelinePub.TriggerNamespacePipelineRelease(ctx, req) if err != nil { - return nil, fmt.Errorf("failed to trigger %s pipeline. err:%w", MdChunkPipelineID, err) + return nil, fmt.Errorf("failed to trigger %s pipeline. err:%w", ChunkMdPipelineID, err) } result, err := GetChunksFromResponse(res) if err != nil { @@ -260,9 +342,60 @@ func GetChunksFromResponse(resp *pipelinePb.TriggerNamespacePipelineReleaseRespo return chunks, nil } -// SplitTextPipe splits the input text into chunks using the splitting pipeline and consumes the caller's credits. +// ChunkTextPipeForPersistentCatalog splits the input text into chunks using the splitting pipeline and consumes the caller's credits. +// It sets up the necessary metadata, triggers the pipeline, and processes the response to return the non-empty chunks. +func (s *Service) ChunkTextPipeForPersistentCatalog(ctx context.Context, caller uuid.UUID, requester uuid.UUID, text string) ([]Chunk, error) { + var md metadata.MD + if requester != uuid.Nil { + md = metadata.New(map[string]string{ + constant.HeaderUserUIDKey: caller.String(), + constant.HeaderAuthTypeKey: "user", + constant.HeaderRequesterUIDKey: requester.String(), + }) + } else { + md = metadata.New(map[string]string{ + constant.HeaderUserUIDKey: caller.String(), + constant.HeaderAuthTypeKey: "user", + }) + } + ctx = metadata.NewOutgoingContext(ctx, md) + req := &pipelinePb.TriggerNamespacePipelineReleaseRequest{ + NamespaceId: NamespaceID, + PipelineId: ChunkTextPipelineID, + ReleaseId: ChunkTextVersion, + + Inputs: []*structpb.Struct{ + { + Fields: map[string]*structpb.Value{ + "text_input": {Kind: &structpb.Value_StringValue{StringValue: text}}, + "max_chunk_length": {Kind: &structpb.Value_NumberValue{NumberValue: maxChunkLengthForPersistentCatalog}}, + "chunk_overlap": {Kind: &structpb.Value_NumberValue{NumberValue: chunkOverlapForPersistentCatalog}}, + }, + }, + }, + } + res, err := s.PipelinePub.TriggerNamespacePipelineRelease(ctx, req) + if err != nil { + return nil, fmt.Errorf("failed to trigger %s pipeline. err:%w", ChunkTextPipelineID, err) + } + result, err := GetChunksFromResponse(res) + if err != nil { + return nil, fmt.Errorf("failed to get chunks from response: %w", err) + } + // remove the empty chunk. + // note: this is a workaround for the pipeline bug that sometimes returns empty chunks. + var filteredResult []Chunk + for _, chunk := range result { + if chunk.Text != "" { + filteredResult = append(filteredResult, chunk) + } + } + return filteredResult, nil +} + +// ChunkTextPipeForTempCatalog splits the input text into chunks using the splitting pipeline and consumes the caller's credits. // It sets up the necessary metadata, triggers the pipeline, and processes the response to return the non-empty chunks. -func (s *Service) SplitTextPipe(ctx context.Context, caller uuid.UUID, requester uuid.UUID, text string) ([]Chunk, error) { +func (s *Service) ChunkTextPipeForTempCatalog(ctx context.Context, caller uuid.UUID, requester uuid.UUID, text string) ([]Chunk, error) { var md metadata.MD if requester != uuid.Nil { md = metadata.New(map[string]string{ @@ -279,22 +412,22 @@ func (s *Service) SplitTextPipe(ctx context.Context, caller uuid.UUID, requester ctx = metadata.NewOutgoingContext(ctx, md) req := &pipelinePb.TriggerNamespacePipelineReleaseRequest{ NamespaceId: NamespaceID, - PipelineId: TextChunkPipelineID, - ReleaseId: TextSplitVersion, + PipelineId: ChunkTextPipelineID, + ReleaseId: ChunkTextVersion, Inputs: []*structpb.Struct{ { Fields: map[string]*structpb.Value{ "text_input": {Kind: &structpb.Value_StringValue{StringValue: text}}, - "max_chunk_length": {Kind: &structpb.Value_NumberValue{NumberValue: chunkLength}}, - "chunk_overlap": {Kind: &structpb.Value_NumberValue{NumberValue: chunkOverlap}}, + "max_chunk_length": {Kind: &structpb.Value_NumberValue{NumberValue: maxChunkLengthForTempCatalog}}, + "chunk_overlap": {Kind: &structpb.Value_NumberValue{NumberValue: chunkOverlapForTempCatalog}}, }, }, }, } res, err := s.PipelinePub.TriggerNamespacePipelineRelease(ctx, req) if err != nil { - return nil, fmt.Errorf("failed to trigger %s pipeline. err:%w", TextChunkPipelineID, err) + return nil, fmt.Errorf("failed to trigger %s pipeline. err:%w", ChunkTextPipelineID, err) } result, err := GetChunksFromResponse(res) if err != nil { @@ -398,13 +531,13 @@ func (s *Service) EmbeddingTextPipe(ctx context.Context, caller uuid.UUID, reque req := &pipelinePb.TriggerNamespacePipelineReleaseRequest{ NamespaceId: NamespaceID, - PipelineId: TextEmbedPipelineID, - ReleaseId: TextEmbedVersion, + PipelineId: EmbedTextPipelineID, + ReleaseId: EmbedTextVersion, Inputs: inputs, } res, err := s.PipelinePub.TriggerNamespacePipelineRelease(ctx_, req) if err != nil { - errChan <- fmt.Errorf("failed to trigger %s pipeline. err:%w", TextEmbedPipelineID, err) + errChan <- fmt.Errorf("failed to trigger %s pipeline. err:%w", EmbedTextPipelineID, err) ctxCancel() return } diff --git a/pkg/worker/common.go b/pkg/worker/common.go new file mode 100644 index 0000000..4e3b8f1 --- /dev/null +++ b/pkg/worker/common.go @@ -0,0 +1,297 @@ +package worker + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/gofrs/uuid" + "github.com/instill-ai/artifact-backend/pkg/logger" + "github.com/instill-ai/artifact-backend/pkg/milvus" + "github.com/instill-ai/artifact-backend/pkg/minio" + "github.com/instill-ai/artifact-backend/pkg/repository" + "github.com/instill-ai/artifact-backend/pkg/service" + "github.com/redis/go-redis/v9" + "go.uber.org/zap" +) + +const periodOfDispatcher = 5 * time.Second +const extensionHelperPeriod = 5 * time.Second +const workerLifetime = 45 * time.Second +const workerPrefix = "worker-processing-file-" + +var ErrFileStatusNotMatch = errors.New("file status not match") + +func getWorkerKey(fileUID string) string { + return workerPrefix + fileUID +} + +// checkFileStatus checks if the file status from argument is the same as the file in database +func checkFileStatus(ctx context.Context, svc *service.Service, file repository.KnowledgeBaseFile) error { + dbFiles, err := svc.Repository.GetKnowledgeBaseFilesByFileUIDs(ctx, []uuid.UUID{file.UID}) + if err != nil { + return err + } + if len(dbFiles) == 0 { + return fmt.Errorf("file uid not found in database. file uid: %s", file.UID) + } + // if the file's status from argument is not the same as the file in database, skip the processing + // because the file in argument is not the latest file in database. Instead, it is from the queue. + if dbFiles[0].ProcessStatus != file.ProcessStatus { + err := fmt.Errorf("%w - file uid: %s, database file status: %v, file status in argument: %v", ErrFileStatusNotMatch, file.UID, dbFiles[0].ProcessStatus, file.ProcessStatus) + return err + } + return nil +} + +// registerFileWorker registers a file worker in the worker pool and sets a worker key in Redis with the given fileUID and workerLifetime. +// It periodically extends the worker's lifetime in Redis until the worker is done processing. +// It returns a boolean indicating success and a stopRegisterWorkerFunc that can be used to cancel the worker's lifetime extension and remove the worker key from Redis. +// period: duration between lifetime extensions +// workerLifetime: total duration the worker key should be kept in Redis +func registerFileWorker(ctx context.Context, svc *service.Service, fileUID string, period time.Duration, workerLifetime time.Duration) (ok bool, stopRegisterWorker stopRegisterWorkerFunc) { + logger, _ := logger.GetZapLogger(ctx) + stopRegisterWorker = func() { + logger.Warn("stopRegisterWorkerFunc is not implemented yet") + } + ok, err := svc.RedisClient.SetNX(ctx, getWorkerKey(fileUID), "1", workerLifetime).Result() + if err != nil { + logger.Error("Error when setting worker key in redis", zap.Error(err)) + return + } + if !ok { + logger.Warn("Key exists in redis, file is already being processed by worker", zap.String("fileUID", fileUID)) + return + } + ctx, lifetimeHelperCancel := context.WithCancel(ctx) + + // lifetimeExtHelper is a helper function that extends the lifetime of the worker by periodically updating the worker key's expiration time in Redis. + lifetimeExtHelper := func(ctx context.Context) { + ticker := time.NewTicker(period) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + // Context is done, exit the worker + logger.Debug("Finish worker lifetime extend helper received termination signal", zap.String("worker", getWorkerKey(fileUID))) + return + case <-ticker.C: + // extend the lifetime of the worker + logger.Debug("Extending worker lifetime", zap.String("worker", getWorkerKey(fileUID)), zap.Duration("lifetime", workerLifetime)) + err := svc.RedisClient.Expire(ctx, getWorkerKey(fileUID), workerLifetime).Err() + if err != nil { + logger.Error("Error when extending worker lifetime in redis", zap.Error(err), zap.String("worker", getWorkerKey(fileUID))) + return + } + } + } + } + go lifetimeExtHelper(ctx) + + // stopRegisterWorker function will cancel the lifetimeExtHelper and remove the worker key in redis + stopRegisterWorker = func() { + lifetimeHelperCancel() + svc.RedisClient.Del(ctx, getWorkerKey(fileUID)) + } + + return true, stopRegisterWorker +} + +// checkFileWorker checks if any of the provided fileUIDs have active workers +func checkRegisteredFilesWorker(ctx context.Context, svc *service.Service, fileUIDs []string) map[string]struct{} { + logger, _ := logger.GetZapLogger(ctx) + pipe := svc.RedisClient.Pipeline() + + // Create a map to hold the results + results := make(map[string]*redis.IntCmd) + + // Add EXISTS commands to the pipeline for each fileUID + for _, fileUID := range fileUIDs { + key := getWorkerKey(fileUID) + results[fileUID] = pipe.Exists(ctx, key) + } + + // Execute the pipeline + _, err := pipe.Exec(ctx) + if err != nil { + logger.Error("Error executing redis pipeline", zap.Error(err)) + return nil + } + + // Collect keys that do not exist + nonExistentKeys := make(map[string]struct{}) + for fileUID, result := range results { + exists, err := result.Result() + if err != nil { + logger.Error("Error getting result for %s", zap.String("fileUID", fileUID), zap.Error(err)) + return nil + } + if exists == 0 { + nonExistentKeys[fileUID] = struct{}{} + } + } + return nonExistentKeys +} + +// saveConvertedFile saves a converted file into object storage and updates the metadata in the database. +func saveConvertedFile(ctx context.Context, svc *service.Service, kbUID, fileUID uuid.UUID, name string, convertedFile []byte) error { + logger, _ := logger.GetZapLogger(ctx) + _, err := svc.Repository.CreateConvertedFile( + ctx, + repository.ConvertedFile{KbUID: kbUID, FileUID: fileUID, Name: name, Type: "text/markdown", Destination: "destination"}, + func(convertedFileUID uuid.UUID) (map[string]any, error) { + // save the converted file into object storage + err := svc.MinIO.SaveConvertedFile(ctx, kbUID.String(), convertedFileUID.String(), "md", convertedFile) + if err != nil { + return nil, err + } + output := make(map[string]any) + output[repository.ConvertedFileColumn.Destination] = svc.MinIO.GetConvertedFilePathInKnowledgeBase(kbUID.String(), convertedFileUID.String(), "md") + return output, nil + }) + if err != nil { + logger.Error("Failed to save converted file into object storage and metadata into database.", zap.String("FileUID", fileUID.String())) + return err + } + + return nil +} + + +type chunk = struct { + End int + Start int + Text string + Tokens int +} + +// saveChunks saves chunks into object storage and updates the metadata in the database. +func saveChunks(ctx context.Context, svc *service.Service, kbUID string, kbFileUID uuid.UUID, sourceTable string, sourceUID uuid.UUID, chunks []chunk) error { + logger, _ := logger.GetZapLogger(ctx) + textChunks := make([]*repository.TextChunk, len(chunks)) + + // turn kbUid to uuid no must parse + kbUIDuuid, err := uuid.FromString(kbUID) + if err != nil { + logger.Error("Failed to parse kbUID to uuid.", zap.String("KbUID", kbUID)) + return err + } + for i, c := range chunks { + textChunks[i] = &repository.TextChunk{ + SourceUID: sourceUID, + SourceTable: sourceTable, + StartPos: c.Start, + EndPos: c.End, + ContentDest: "not set yet because we need to save the chunks in db to get the uid", + Tokens: c.Tokens, + Retrievable: true, + InOrder: i, + KbUID: kbUIDuuid, + KbFileUID: kbFileUID, + } + } + _, err = svc.Repository.DeleteAndCreateChunks(ctx, sourceTable, sourceUID, textChunks, + func(chunkUIDs []string) (map[string]any, error) { + // save the chunksForMinIO into object storage + chunksForMinIO := make(map[minio.ChunkUIDType]minio.ChunkContentType, len(textChunks)) + for i, uid := range chunkUIDs { + chunksForMinIO[minio.ChunkUIDType(uid)] = minio.ChunkContentType([]byte(chunks[i].Text)) + } + err := svc.MinIO.SaveTextChunks(ctx, kbUID, chunksForMinIO) + if err != nil { + logger.Error("Failed to save chunks into object storage.", zap.String("SourceUID", sourceUID.String())) + return nil, err + } + chunkDestMap := make(map[string]any, len(chunkUIDs)) + for _, chunkUID := range chunkUIDs { + chunkDestMap[chunkUID] = svc.MinIO.GetChunkPathInKnowledgeBase(kbUID, string(chunkUID)) + } + return chunkDestMap, nil + }, + ) + if err != nil { + logger.Error("Failed to save chunks into object storage and metadata into database.", zap.String("SourceUID", sourceUID.String())) + return err + } + return nil +} + +type MilvusEmbedding struct { + SourceTable string + SourceUID string + EmbeddingUID string + Vector []float32 +} + +// saveEmbeddings saves embeddings into the vector database and updates the metadata in the database. +// Processes embeddings in batches of 50 to avoid timeout issues. +const batchSize = 50 + +func saveEmbeddings(ctx context.Context, svc *service.Service, kbUID string, embeddings []repository.Embedding) error { + logger, _ := logger.GetZapLogger(ctx) + if len(embeddings) == 0 { + logger.Debug("No embeddings to save") + return nil + } + + totalEmbeddings := len(embeddings) + + // Process embeddings in batches + for i := 0; i < totalEmbeddings; i += batchSize { + // Add context check + if err := ctx.Err(); err != nil { + return fmt.Errorf("context cancelled while processing embeddings: %w", err) + } + + end := i + batchSize + if end > totalEmbeddings { + end = totalEmbeddings + } + + currentBatch := embeddings[i:end] + + externalServiceCall := func(_ []string) error { + // save the embeddings into vector database + milvusEmbeddings := make([]milvus.Embedding, len(currentBatch)) + for j, emb := range currentBatch { + milvusEmbeddings[j] = milvus.Embedding{ + SourceTable: emb.SourceTable, + SourceUID: emb.SourceUID.String(), + EmbeddingUID: emb.UID.String(), + Vector: emb.Vector, + } + } + err := svc.MilvusClient.InsertVectorsToKnowledgeBaseCollection(ctx, kbUID, milvusEmbeddings) + if err != nil { + logger.Error("Failed to save embeddings batch into vector database.", + zap.String("KbUID", kbUID), + zap.Int("batch", i/batchSize+1), + zap.Int("batchSize", len(currentBatch))) + return err + } + return nil + } + + _, err := svc.Repository.UpsertEmbeddings(ctx, currentBatch, externalServiceCall) + if err != nil { + logger.Error("Failed to save embeddings batch into vector database and metadata into database.", + zap.String("KbUID", kbUID), + zap.Int("batch", i/batchSize+1), + zap.Int("batchSize", len(currentBatch))) + return err + } + + logger.Info("Embeddings batch saved successfully", + zap.String("KbUID", kbUID), + zap.Int("batch", i/batchSize+1), + zap.Int("batchSize", len(currentBatch)), + zap.Int("progress", end), + zap.Int("total", totalEmbeddings)) + } + + logger.Info("All embeddings saved into vector database and metadata into database.", + zap.String("KbUID", kbUID), + zap.Int("total embeddings", totalEmbeddings)) + return nil +} diff --git a/pkg/worker/worker.go b/pkg/worker/persistentcatalogworker.go similarity index 67% rename from pkg/worker/worker.go rename to pkg/worker/persistentcatalogworker.go index 5db5c42..76fc6e1 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/persistentcatalogworker.go @@ -3,54 +3,45 @@ package worker import ( "context" "encoding/base64" - "errors" "fmt" "runtime/debug" "sync" "time" - "github.com/gofrs/uuid" "github.com/instill-ai/artifact-backend/pkg/logger" - "github.com/instill-ai/artifact-backend/pkg/milvus" "github.com/instill-ai/artifact-backend/pkg/minio" "github.com/instill-ai/artifact-backend/pkg/repository" "github.com/instill-ai/artifact-backend/pkg/service" "github.com/instill-ai/artifact-backend/pkg/utils" artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha" - "github.com/redis/go-redis/v9" "go.uber.org/zap" ) -const periodOfDispatcher = 5 * time.Second -const extensionHelperPeriod = 5 * time.Second -const workerLifetime = 45 * time.Second -const workerPrefix = "worker-processing-file-" - -var ErrFileStatusNotMatch = errors.New("file status not match") - -type fileToEmbWorkerPool struct { +type persistentCatalogFileToEmbWorkerPool struct { numberOfWorkers int svc *service.Service channel chan repository.KnowledgeBaseFile wg sync.WaitGroup ctx context.Context cancel context.CancelFunc + catalogType artifactpb.CatalogType } -func NewFileToEmbWorkerPool(ctx context.Context, svc *service.Service, nums int) *fileToEmbWorkerPool { +func NewPersistentCatalogFileToEmbWorkerPool(ctx context.Context, svc *service.Service, nums int, catalogType artifactpb.CatalogType) *persistentCatalogFileToEmbWorkerPool { ctx, cancel := context.WithCancel(ctx) - return &fileToEmbWorkerPool{ + return &persistentCatalogFileToEmbWorkerPool{ numberOfWorkers: nums, svc: svc, - // channel is un-buffered because we dont want the out of date file to be processed - channel: make(chan repository.KnowledgeBaseFile), - wg: sync.WaitGroup{}, - ctx: ctx, - cancel: cancel, + // channel is un-buffered because we dont want the out of date and duplicate file in queue + channel: make(chan repository.KnowledgeBaseFile), + wg: sync.WaitGroup{}, + ctx: ctx, + cancel: cancel, + catalogType: catalogType, } } -func (wp *fileToEmbWorkerPool) Start() { +func (wp *persistentCatalogFileToEmbWorkerPool) Start() { logger, _ := logger.GetZapLogger(wp.ctx) for i := 0; i < wp.numberOfWorkers; i++ { wp.wg.Add(1) @@ -67,7 +58,7 @@ func (wp *fileToEmbWorkerPool) Start() { } // dispatcher is responsible for dispatching the incomplete file to the worker -func (wp *fileToEmbWorkerPool) startDispatcher() { +func (wp *persistentCatalogFileToEmbWorkerPool) startDispatcher() { logger, _ := logger.GetZapLogger(wp.ctx) defer wp.wg.Done() ticker := time.NewTicker(periodOfDispatcher) @@ -82,13 +73,13 @@ func (wp *fileToEmbWorkerPool) startDispatcher() { return case <-ticker.C: // Periodically check for incomplete files - incompleteFiles := wp.svc.Repository.GetNeedProcessFiles(wp.ctx) + incompleteFiles := wp.svc.Repository.GetNeedProcessFiles(wp.ctx, wp.catalogType) // Check if any of the incomplete files have active workers fileUIDs := make([]string, len(incompleteFiles)) for i, file := range incompleteFiles { fileUIDs[i] = file.UID.String() } - nonExistentKeys := wp.checkRegisteredFilesWorker(wp.ctx, fileUIDs) + nonExistentKeys := checkRegisteredFilesWorker(wp.ctx, wp.svc, fileUIDs) // Dispatch the files that do not have active workers incompleteAndNonRegisteredFiles := make([]repository.KnowledgeBaseFile, 0) @@ -124,7 +115,7 @@ func (wp *fileToEmbWorkerPool) startDispatcher() { // check the status of triggered process and extend the lifetime in redis... // pros: less connection to pipeline service and less resource consumption -func (wp *fileToEmbWorkerPool) startWorker(ctx context.Context, workerID int) { +func (wp *persistentCatalogFileToEmbWorkerPool) startWorker(ctx context.Context, workerID int) { logger, _ := logger.GetZapLogger(ctx) logger.Info("Worker started", zap.Int("WorkerID", workerID)) defer wp.wg.Done() @@ -155,7 +146,7 @@ func (wp *fileToEmbWorkerPool) startWorker(ctx context.Context, workerID int) { } // register file process worker in redis and extend the lifetime - ok, stopRegisterFunc := wp.registerFileWorker(ctx, file.UID.String(), extensionHelperPeriod, workerLifetime) + ok, stopRegisterFunc := registerFileWorker(ctx, wp.svc, file.UID.String(), extensionHelperPeriod, workerLifetime) if !ok { if stopRegisterFunc != nil { stopRegisterFunc() @@ -166,7 +157,7 @@ func (wp *fileToEmbWorkerPool) startWorker(ctx context.Context, workerID int) { // Because the file is from the dispatcher, the file status is guaranteed to be incomplete // but when the worker wakes up and tries to process the file, the file status might have been updated by other workers. // So we need to check the file status again to ensure the file is still same as when the worker wakes up - err := wp.checkFileStatus(ctx, file) + err := checkFileStatus(ctx, wp.svc, file) if err != nil { logger.Warn("File status not match. skip processing", zap.String("file uid", file.UID.String()), zap.Error(err)) if stopRegisterFunc != nil { @@ -208,7 +199,7 @@ func (wp *fileToEmbWorkerPool) startWorker(ctx context.Context, workerID int) { } // stop -func (wp *fileToEmbWorkerPool) GraceFulStop() { +func (wp *persistentCatalogFileToEmbWorkerPool) GraceFulStop() { logger, _ := logger.GetZapLogger(wp.ctx) logger.Info("Worker pool received termination signal") close(wp.channel) @@ -219,97 +210,8 @@ func (wp *fileToEmbWorkerPool) GraceFulStop() { type stopRegisterWorkerFunc func() -// registerFileWorker registers a file worker in the worker pool and sets a worker key in Redis with the given fileUID and workerLifetime. -// It periodically extends the worker's lifetime in Redis until the worker is done processing. -// It returns a boolean indicating success and a stopRegisterWorkerFunc that can be used to cancel the worker's lifetime extension and remove the worker key from Redis. -// period: duration between lifetime extensions -// workerLifetime: total duration the worker key should be kept in Redis -func (wp *fileToEmbWorkerPool) registerFileWorker(ctx context.Context, fileUID string, period time.Duration, workerLifetime time.Duration) (ok bool, stopRegisterWorker stopRegisterWorkerFunc) { - logger, _ := logger.GetZapLogger(ctx) - stopRegisterWorker = func() { - logger.Warn("stopRegisterWorkerFunc is not implemented yet") - } - ok, err := wp.svc.RedisClient.SetNX(ctx, getWorkerKey(fileUID), "1", workerLifetime).Result() - if err != nil { - logger.Error("Error when setting worker key in redis", zap.Error(err)) - return - } - if !ok { - logger.Warn("Key exists in redis, file is already being processed by worker", zap.String("fileUID", fileUID)) - return - } - ctx, lifetimeHelperCancel := context.WithCancel(ctx) - - // lifetimeExtHelper is a helper function that extends the lifetime of the worker by periodically updating the worker key's expiration time in Redis. - lifetimeExtHelper := func(ctx context.Context) { - ticker := time.NewTicker(period) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - // Context is done, exit the worker - logger.Debug("Finish worker lifetime extend helper received termination signal", zap.String("worker", getWorkerKey(fileUID))) - return - case <-ticker.C: - // extend the lifetime of the worker - logger.Debug("Extending worker lifetime", zap.String("worker", getWorkerKey(fileUID)), zap.Duration("lifetime", workerLifetime)) - err := wp.svc.RedisClient.Expire(ctx, getWorkerKey(fileUID), workerLifetime).Err() - if err != nil { - logger.Error("Error when extending worker lifetime in redis", zap.Error(err), zap.String("worker", getWorkerKey(fileUID))) - return - } - } - } - } - go lifetimeExtHelper(ctx) - - // stopRegisterWorker function will cancel the lifetimeExtHelper and remove the worker key in redis - stopRegisterWorker = func() { - lifetimeHelperCancel() - wp.svc.RedisClient.Del(ctx, getWorkerKey(fileUID)) - } - - return true, stopRegisterWorker -} - -// checkFileWorker checks if any of the provided fileUIDs have active workers -func (wp *fileToEmbWorkerPool) checkRegisteredFilesWorker(ctx context.Context, fileUIDs []string) map[string]struct{} { - logger, _ := logger.GetZapLogger(ctx) - pipe := wp.svc.RedisClient.Pipeline() - - // Create a map to hold the results - results := make(map[string]*redis.IntCmd) - - // Add EXISTS commands to the pipeline for each fileUID - for _, fileUID := range fileUIDs { - key := getWorkerKey(fileUID) - results[fileUID] = pipe.Exists(ctx, key) - } - - // Execute the pipeline - _, err := pipe.Exec(ctx) - if err != nil { - logger.Error("Error executing redis pipeline", zap.Error(err)) - return nil - } - - // Collect keys that do not exist - nonExistentKeys := make(map[string]struct{}) - for fileUID, result := range results { - exists, err := result.Result() - if err != nil { - logger.Error("Error getting result for %s", zap.String("fileUID", fileUID), zap.Error(err)) - return nil - } - if exists == 0 { - nonExistentKeys[fileUID] = struct{}{} - } - } - return nonExistentKeys -} - // processFile handles the processing of a file through various stages using a state machine. -func (wp *fileToEmbWorkerPool) processFile(ctx context.Context, file repository.KnowledgeBaseFile) error { +func (wp *persistentCatalogFileToEmbWorkerPool) processFile(ctx context.Context, file repository.KnowledgeBaseFile) error { logger, _ := logger.GetZapLogger(ctx) var status artifactpb.FileProcessStatus if statusInt, ok := artifactpb.FileProcessStatus_value[file.ProcessStatus]; !ok { @@ -319,7 +221,7 @@ func (wp *fileToEmbWorkerPool) processFile(ctx context.Context, file repository. } // check if the file is already processed - err := wp.checkFileStatus(ctx, file) + err := checkFileStatus(ctx, wp.svc, file) if err != nil { return err } @@ -389,7 +291,7 @@ func (wp *fileToEmbWorkerPool) processFile(ctx context.Context, file repository. // For pdf, doc, docx, ppt, pptx, html, and xlsx files, it transitions to the converting status. // For text and markdown files, it transitions to the chunking status. // For unsupported file types, it returns an error. -func (wp *fileToEmbWorkerPool) processWaitingFile(ctx context.Context, file repository.KnowledgeBaseFile) (updatedFile *repository.KnowledgeBaseFile, nextStatus artifactpb.FileProcessStatus, err error) { +func (wp *persistentCatalogFileToEmbWorkerPool) processWaitingFile(ctx context.Context, file repository.KnowledgeBaseFile) (updatedFile *repository.KnowledgeBaseFile, nextStatus artifactpb.FileProcessStatus, err error) { // check if file process status is waiting if file.ProcessStatus != artifactpb.FileProcessStatus_name[int32(artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_WAITING)] { return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, fmt.Errorf("file process status should be waiting. status: %v", file.ProcessStatus) @@ -440,7 +342,7 @@ func (wp *fileToEmbWorkerPool) processWaitingFile(ctx context.Context, file repo // The converted file is saved into object storage and the metadata is updated in the database. // Finally, the file status is updated to chunking in the database. // If the file is not a PDF, it returns an error. -func (wp *fileToEmbWorkerPool) processConvertingFile(ctx context.Context, file repository.KnowledgeBaseFile) (updatedFile *repository.KnowledgeBaseFile, nextStatus artifactpb.FileProcessStatus, err error) { +func (wp *persistentCatalogFileToEmbWorkerPool) processConvertingFile(ctx context.Context, file repository.KnowledgeBaseFile) (updatedFile *repository.KnowledgeBaseFile, nextStatus artifactpb.FileProcessStatus, err error) { logger, _ := logger.GetZapLogger(ctx) fileInMinIOPath := file.Destination @@ -455,14 +357,14 @@ func (wp *fileToEmbWorkerPool) processConvertingFile(ctx context.Context, file r // convert the pdf file to md requesterUID := file.RequesterUID - convertedMD, err := wp.svc.ConvertToMDPipe(ctx, file.UID, file.CreatorUID, requesterUID, base64Data, artifactpb.FileType(artifactpb.FileType_value[file.Type])) + convertedMD, err := wp.svc.ConvertToMDPipeForFilesInPersistentCatalog(ctx, file.UID, file.CreatorUID, requesterUID, base64Data, artifactpb.FileType(artifactpb.FileType_value[file.Type])) if err != nil { logger.Error("Failed to convert pdf to md using pdf-to-md pipeline.", zap.String("File path", fileInMinIOPath)) return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err } // save the converted file into object storage and metadata into database - err = wp.saveConvertedFile(ctx, file.KnowledgeBaseUID, file.UID, "converted_"+file.Name, []byte(convertedMD)) + err = saveConvertedFile(ctx, wp.svc, file.KnowledgeBaseUID, file.UID, "converted_"+file.Name, []byte(convertedMD)) if err != nil { logger.Error("Failed to save converted data.", zap.String("File path", fileInMinIOPath)) return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err @@ -511,7 +413,7 @@ func (wp *fileToEmbWorkerPool) processConvertingFile(ctx context.Context, file r // - err: Error if any step fails // // The function handles errors at each step and returns appropriate status codes. -func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file repository.KnowledgeBaseFile) (*repository.KnowledgeBaseFile, artifactpb.FileProcessStatus, error) { +func (wp *persistentCatalogFileToEmbWorkerPool) processChunkingFile(ctx context.Context, file repository.KnowledgeBaseFile) (*repository.KnowledgeBaseFile, artifactpb.FileProcessStatus, error) { logger, _ := logger.GetZapLogger(ctx) logger.Info("Processing chunking status file.", zap.String("File uid", file.UID.String())) // check the file status is chunking @@ -551,14 +453,14 @@ func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file rep artifactpb.FileType_FILE_TYPE_CSV.String(), artifactpb.FileType_FILE_TYPE_HTML.String(): requesterUID := file.RequesterUID - chunks, err = wp.svc.SplitMarkdownPipe(ctx, file.CreatorUID, requesterUID, string(convertedFileData)) + chunks, err = wp.svc.ChunkMarkdownPipe(ctx, file.CreatorUID, requesterUID, string(convertedFileData)) case artifactpb.FileType_FILE_TYPE_PDF.String(), artifactpb.FileType_FILE_TYPE_DOCX.String(), artifactpb.FileType_FILE_TYPE_DOC.String(), artifactpb.FileType_FILE_TYPE_PPTX.String(), artifactpb.FileType_FILE_TYPE_PPT.String(): requesterUID := file.RequesterUID - chunks, err = wp.svc.SplitTextPipe(ctx, file.CreatorUID, requesterUID, string(convertedFileData)) + chunks, err = wp.svc.ChunkTextPipeForPersistentCatalog(ctx, file.CreatorUID, requesterUID, string(convertedFileData)) } if err != nil { logger.Error("Failed to get chunks from converted file using markdown chunking pipeline.", zap.String("Converted file uid", convertedFile.UID.String())) @@ -566,13 +468,13 @@ func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file rep } // Save the chunks into object storage(minIO) and metadata into database - err = wp.saveChunks(ctx, file.KnowledgeBaseUID.String(), file.UID, wp.svc.Repository.ConvertedFileTableName(), convertedFile.UID, chunks) + err = saveChunks(ctx, wp.svc, file.KnowledgeBaseUID.String(), file.UID, wp.svc.Repository.ConvertedFileTableName(), convertedFile.UID, chunks) if err != nil { logger.Error("Failed to save chunks into object storage and metadata into database.", zap.String("File uid", file.UID.String())) return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err } // save chunking pipeline metadata into file's extra metadata - chunkingPipelineMetadata := service.NamespaceID + "/" + service.MdChunkPipelineID + "@" + service.MdSplitVersion + chunkingPipelineMetadata := service.NamespaceID + "/" + service.ChunkMdPipelineID + "@" + service.ChunkMdVersion err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", "", chunkingPipelineMetadata, "", nil, nil, nil, nil) if err != nil { logger.Error("Failed to save chunking pipeline metadata.", zap.String("File uid:", file.UID.String())) @@ -602,19 +504,19 @@ func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file rep // Call the text chunking pipeline requesterUID := file.RequesterUID - chunks, err := wp.svc.SplitTextPipe(ctx, file.CreatorUID, requesterUID, string(originalFile)) + chunks, err := wp.svc.ChunkTextPipeForPersistentCatalog(ctx, file.CreatorUID, requesterUID, string(originalFile)) if err != nil { logger.Error("Failed to get chunks from original file.", zap.String("File uid", file.UID.String())) return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err } // Save the chunks into object storage(minIO) and metadata into database - err = wp.saveChunks(ctx, file.KnowledgeBaseUID.String(), file.UID, wp.svc.Repository.KnowledgeBaseFileTableName(), file.UID, chunks) + err = saveChunks(ctx, wp.svc, file.KnowledgeBaseUID.String(), file.UID, wp.svc.Repository.KnowledgeBaseFileTableName(), file.UID, chunks) if err != nil { logger.Error("Failed to save chunks into object storage and metadata into database.", zap.String("File uid", file.UID.String())) return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err } // save chunking pipeline metadata into file's extra metadata - chunkingPipelineMetadata := service.NamespaceID + "/" + service.TextChunkPipelineID + "@" + service.TextSplitVersion + chunkingPipelineMetadata := service.NamespaceID + "/" + service.ChunkTextPipelineID + "@" + service.ChunkTextVersion err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", "", chunkingPipelineMetadata, "", nil, nil, nil, nil) if err != nil { logger.Error("Failed to save chunking pipeline metadata.", zap.String("File uid:", file.UID.String())) @@ -641,7 +543,7 @@ func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file rep } // save chunking pipeline metadata into file's extra metadata - chunkingPipelineMetadata := service.NamespaceID + "/" + service.MdChunkPipelineID + "@" + service.MdSplitVersion + chunkingPipelineMetadata := service.NamespaceID + "/" + service.ChunkMdPipelineID + "@" + service.ChunkMdVersion err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", "", chunkingPipelineMetadata, "", nil, nil, nil, nil) if err != nil { logger.Error("Failed to save chunking pipeline metadata.", zap.String("File uid:", file.UID.String())) @@ -652,14 +554,14 @@ func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file rep // Call the text chunking pipeline requesterUID := file.RequesterUID - chunks, err := wp.svc.SplitMarkdownPipe(ctx, file.CreatorUID, requesterUID, string(originalFile)) + chunks, err := wp.svc.ChunkMarkdownPipe(ctx, file.CreatorUID, requesterUID, string(originalFile)) if err != nil { logger.Error("Failed to get chunks from original file.", zap.String("File uid", file.UID.String())) return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err } // Save the chunks into object storage(minIO) and metadata into database - err = wp.saveChunks(ctx, file.KnowledgeBaseUID.String(), file.UID, wp.svc.Repository.KnowledgeBaseFileTableName(), file.UID, chunks) + err = saveChunks(ctx, wp.svc, file.KnowledgeBaseUID.String(), file.UID, wp.svc.Repository.KnowledgeBaseFileTableName(), file.UID, chunks) if err != nil { logger.Error("Failed to save chunks into object storage and metadata into database.", zap.String("File uid", file.UID.String())) return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err @@ -709,7 +611,7 @@ func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file rep // // The function handles errors at each step and returns appropriate status codes. // If chunk retrieval fails initially, it will retry once after a 1 second delay. -func (wp *fileToEmbWorkerPool) processEmbeddingFile(ctx context.Context, file repository.KnowledgeBaseFile) (updatedFile *repository.KnowledgeBaseFile, nextStatus artifactpb.FileProcessStatus, err error) { +func (wp *persistentCatalogFileToEmbWorkerPool) processEmbeddingFile(ctx context.Context, file repository.KnowledgeBaseFile) (updatedFile *repository.KnowledgeBaseFile, nextStatus artifactpb.FileProcessStatus, err error) { logger, _ := logger.GetZapLogger(ctx) // check the file status is embedding if file.ProcessStatus != artifactpb.FileProcessStatus_name[int32(artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_EMBEDDING)] { @@ -731,7 +633,7 @@ func (wp *fileToEmbWorkerPool) processEmbeddingFile(ctx context.Context, file re } // save embedding pipeline metadata into file's extra metadata - embeddingPipelineMetadata := service.NamespaceID + "/" + service.TextEmbedPipelineID + "@" + service.TextEmbedVersion + embeddingPipelineMetadata := service.NamespaceID + "/" + service.EmbedTextPipelineID + "@" + service.EmbedTextVersion err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", "", "", embeddingPipelineMetadata, nil, nil, nil, nil) if err != nil { logger.Error("Failed to save embedding pipeline metadata.", zap.String("File uid:", file.UID.String())) @@ -760,7 +662,7 @@ func (wp *fileToEmbWorkerPool) processEmbeddingFile(ctx context.Context, file re KbFileUID: file.UID, } } - err = wp.saveEmbeddings(ctx, file.KnowledgeBaseUID.String(), embeddings) + err = saveEmbeddings(ctx, wp.svc, file.KnowledgeBaseUID.String(), embeddings) if err != nil { logger.Error("Failed to save embeddings into vector database and metadata into database.", zap.String("SourceUID", sourceUID.String())) return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err @@ -780,203 +682,8 @@ func (wp *fileToEmbWorkerPool) processEmbeddingFile(ctx context.Context, file re // processCompletedFile logs the completion of the file-to-embeddings process. // It checks if the file status is completed and logs the information. -func (wp *fileToEmbWorkerPool) processCompletedFile(ctx context.Context, file repository.KnowledgeBaseFile) error { +func (wp *persistentCatalogFileToEmbWorkerPool) processCompletedFile(ctx context.Context, file repository.KnowledgeBaseFile) error { logger, _ := logger.GetZapLogger(ctx) logger.Info("File to embeddings process completed.", zap.String("File uid", file.UID.String())) return nil } - -// saveConvertedFile saves a converted file into object storage and updates the metadata in the database. -func (wp *fileToEmbWorkerPool) saveConvertedFile(ctx context.Context, kbUID, fileUID uuid.UUID, name string, convertedFile []byte) error { - logger, _ := logger.GetZapLogger(ctx) - _, err := wp.svc.Repository.CreateConvertedFile( - ctx, - repository.ConvertedFile{KbUID: kbUID, FileUID: fileUID, Name: name, Type: "text/markdown", Destination: "destination"}, - func(convertedFileUID uuid.UUID) (map[string]any, error) { - // save the converted file into object storage - err := wp.svc.MinIO.SaveConvertedFile(ctx, kbUID.String(), convertedFileUID.String(), "md", convertedFile) - if err != nil { - return nil, err - } - output := make(map[string]any) - output[repository.ConvertedFileColumn.Destination] = wp.svc.MinIO.GetConvertedFilePathInKnowledgeBase(kbUID.String(), convertedFileUID.String(), "md") - return output, nil - }) - if err != nil { - logger.Error("Failed to save converted file into object storage and metadata into database.", zap.String("FileUID", fileUID.String())) - return err - } - - return nil -} - -type chunk = struct { - End int - Start int - Text string - Tokens int -} - -// saveChunks saves chunks into object storage and updates the metadata in the database. -func (wp *fileToEmbWorkerPool) saveChunks(ctx context.Context, kbUID string, kbFileUID uuid.UUID, sourceTable string, sourceUID uuid.UUID, chunks []chunk) error { - logger, _ := logger.GetZapLogger(ctx) - textChunks := make([]*repository.TextChunk, len(chunks)) - - // turn kbUid to uuid no must parse - kbUIDuuid, err := uuid.FromString(kbUID) - if err != nil { - logger.Error("Failed to parse kbUID to uuid.", zap.String("KbUID", kbUID)) - return err - } - for i, c := range chunks { - textChunks[i] = &repository.TextChunk{ - SourceUID: sourceUID, - SourceTable: sourceTable, - StartPos: c.Start, - EndPos: c.End, - ContentDest: "not set yet because we need to save the chunks in db to get the uid", - Tokens: c.Tokens, - Retrievable: true, - InOrder: i, - KbUID: kbUIDuuid, - KbFileUID: kbFileUID, - } - } - _, err = wp.svc.Repository.DeleteAndCreateChunks(ctx, sourceTable, sourceUID, textChunks, - func(chunkUIDs []string) (map[string]any, error) { - // save the chunksForMinIO into object storage - chunksForMinIO := make(map[minio.ChunkUIDType]minio.ChunkContentType, len(textChunks)) - for i, uid := range chunkUIDs { - chunksForMinIO[minio.ChunkUIDType(uid)] = minio.ChunkContentType([]byte(chunks[i].Text)) - } - err := wp.svc.MinIO.SaveTextChunks(ctx, kbUID, chunksForMinIO) - if err != nil { - logger.Error("Failed to save chunks into object storage.", zap.String("SourceUID", sourceUID.String())) - return nil, err - } - chunkDestMap := make(map[string]any, len(chunkUIDs)) - for _, chunkUID := range chunkUIDs { - chunkDestMap[chunkUID] = wp.svc.MinIO.GetChunkPathInKnowledgeBase(kbUID, string(chunkUID)) - } - return chunkDestMap, nil - }, - ) - if err != nil { - logger.Error("Failed to save chunks into object storage and metadata into database.", zap.String("SourceUID", sourceUID.String())) - return err - } - return nil -} - -type MilvusEmbedding struct { - SourceTable string - SourceUID string - EmbeddingUID string - Vector []float32 -} - -// saveEmbeddings saves embeddings into the vector database and updates the metadata in the database. -// Processes embeddings in batches of 50 to avoid timeout issues. -const batchSize = 50 - -func (wp *fileToEmbWorkerPool) saveEmbeddings(ctx context.Context, kbUID string, embeddings []repository.Embedding) error { - logger, _ := logger.GetZapLogger(ctx) - if len(embeddings) == 0 { - logger.Debug("No embeddings to save") - return nil - } - - totalEmbeddings := len(embeddings) - - // Process embeddings in batches - for i := 0; i < totalEmbeddings; i += batchSize { - // Add context check - if err := ctx.Err(); err != nil { - return fmt.Errorf("context cancelled while processing embeddings: %w", err) - } - - end := i + batchSize - if end > totalEmbeddings { - end = totalEmbeddings - } - - currentBatch := embeddings[i:end] - - externalServiceCall := func(_ []string) error { - // save the embeddings into vector database - milvusEmbeddings := make([]milvus.Embedding, len(currentBatch)) - for j, emb := range currentBatch { - milvusEmbeddings[j] = milvus.Embedding{ - SourceTable: emb.SourceTable, - SourceUID: emb.SourceUID.String(), - EmbeddingUID: emb.UID.String(), - Vector: emb.Vector, - } - } - err := wp.svc.MilvusClient.InsertVectorsToKnowledgeBaseCollection(ctx, kbUID, milvusEmbeddings) - if err != nil { - logger.Error("Failed to save embeddings batch into vector database.", - zap.String("KbUID", kbUID), - zap.Int("batch", i/batchSize+1), - zap.Int("batchSize", len(currentBatch))) - return err - } - return nil - } - - _, err := wp.svc.Repository.UpsertEmbeddings(ctx, currentBatch, externalServiceCall) - if err != nil { - logger.Error("Failed to save embeddings batch into vector database and metadata into database.", - zap.String("KbUID", kbUID), - zap.Int("batch", i/batchSize+1), - zap.Int("batchSize", len(currentBatch))) - return err - } - - logger.Info("Embeddings batch saved successfully", - zap.String("KbUID", kbUID), - zap.Int("batch", i/batchSize+1), - zap.Int("batchSize", len(currentBatch)), - zap.Int("progress", end), - zap.Int("total", totalEmbeddings)) - } - - logger.Info("All embeddings saved into vector database and metadata into database.", - zap.String("KbUID", kbUID), - zap.Int("total embeddings", totalEmbeddings)) - return nil -} - -// checkFileStatus checks if the file status from argument is the same as the file in database -func (wp *fileToEmbWorkerPool) checkFileStatus(ctx context.Context, file repository.KnowledgeBaseFile) error { - dbFiles, err := wp.svc.Repository.GetKnowledgeBaseFilesByFileUIDs(ctx, []uuid.UUID{file.UID}) - if err != nil { - return err - } - if len(dbFiles) == 0 { - return fmt.Errorf("file uid not found in database. file uid: %s", file.UID) - } - // if the file's status from argument is not the same as the file in database, skip the processing - // because the file in argument is not the latest file in database. Instead, it is from the queue. - if dbFiles[0].ProcessStatus != file.ProcessStatus { - err := fmt.Errorf("%w - file uid: %s, database file status: %v, file status in argument: %v", ErrFileStatusNotMatch, file.UID, dbFiles[0].ProcessStatus, file.ProcessStatus) - return err - } - return nil -} - -func getWorkerKey(fileUID string) string { - return workerPrefix + fileUID -} - -// mockVectorizeText is a mock implementation of the VectorizeText function. -// It generates mock vectors for the given texts. -// -//nolint:unused -func mockVectorizeText(_ context.Context, _ uuid.UUID, texts []string) ([][]float32, error) { - vectors := make([][]float32, len(texts)) - for i := range texts { - vectors[i] = make([]float32, milvus.VectorDim) - } - return vectors, nil -} diff --git a/pkg/worker/tempcatalogworker.go b/pkg/worker/tempcatalogworker.go new file mode 100644 index 0000000..ec8ed70 --- /dev/null +++ b/pkg/worker/tempcatalogworker.go @@ -0,0 +1,605 @@ +package worker + +import ( + "context" + "encoding/base64" + "fmt" + "runtime/debug" + "sync" + "time" + + "github.com/gofrs/uuid" + "github.com/instill-ai/artifact-backend/pkg/logger" + "github.com/instill-ai/artifact-backend/pkg/minio" + "github.com/instill-ai/artifact-backend/pkg/repository" + "github.com/instill-ai/artifact-backend/pkg/service" + "github.com/instill-ai/artifact-backend/pkg/utils" + artifactpb "github.com/instill-ai/protogen-go/artifact/artifact/v1alpha" + "go.uber.org/zap" +) + +type tempCatalogFileToEmbWorkerPool struct { + numberOfWorkers int + svc *service.Service + channel chan repository.KnowledgeBaseFile + wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + catalogType artifactpb.CatalogType +} + +func NewTempCatalogFileToEmbWorkerPool(ctx context.Context, svc *service.Service, nums int, catalogType artifactpb.CatalogType) *tempCatalogFileToEmbWorkerPool { + ctx, cancel := context.WithCancel(ctx) + return &tempCatalogFileToEmbWorkerPool{ + numberOfWorkers: nums, + svc: svc, + // channel is un-buffered because we dont want the out of date and duplicate file in queue + channel: make(chan repository.KnowledgeBaseFile), + wg: sync.WaitGroup{}, + ctx: ctx, + cancel: cancel, + catalogType: catalogType, + } +} + +func (wp *tempCatalogFileToEmbWorkerPool) Start() { + logger, _ := logger.GetZapLogger(wp.ctx) + for i := 0; i < wp.numberOfWorkers; i++ { + wp.wg.Add(1) + go utils.GoRecover(func() { + wp.startWorker(wp.ctx, i+1) + }, fmt.Sprintf("Worker %d", i+1)) + } + // start dispatcher + wp.wg.Add(1) + go utils.GoRecover(func() { + wp.startDispatcher() + }, "Dispatcher") + logger.Info("Worker pool started") +} + +// dispatcher is responsible for dispatching the incomplete file to the worker +func (wp *tempCatalogFileToEmbWorkerPool) startDispatcher() { + logger, _ := logger.GetZapLogger(wp.ctx) + defer wp.wg.Done() + ticker := time.NewTicker(periodOfDispatcher) + defer ticker.Stop() + + logger.Info("Worker dispatcher started") + for { + select { + case <-wp.ctx.Done(): + // Context is done, exit the dispatcher + fmt.Println("Dispatcher received termination signal") + return + case <-ticker.C: + // Periodically check for incomplete files + incompleteFiles := wp.svc.Repository.GetNeedProcessFiles(wp.ctx, wp.catalogType) + // Check if any of the incomplete files have active workers + fileUIDs := make([]string, len(incompleteFiles)) + for i, file := range incompleteFiles { + fileUIDs[i] = file.UID.String() + } + nonExistentKeys := checkRegisteredFilesWorker(wp.ctx, wp.svc, fileUIDs) + + // Dispatch the files that do not have active workers + incompleteAndNonRegisteredFiles := make([]repository.KnowledgeBaseFile, 0) + for _, file := range incompleteFiles { + if _, ok := nonExistentKeys[file.UID.String()]; ok { + incompleteAndNonRegisteredFiles = append(incompleteAndNonRegisteredFiles, file) + } + } + + dispatchLoop: + for _, file := range incompleteAndNonRegisteredFiles { + select { + case <-wp.ctx.Done(): + fmt.Println("Dispatcher received termination signal while dispatching") + return + default: + select { + case wp.channel <- file: + logger.Info("Dispatcher dispatched file.", zap.String("fileUID", file.UID.String())) + default: + logger.Debug("channel is full, skip dispatching remaining files.", zap.String("fileUID", file.UID.String())) + break dispatchLoop + } + } + } + + } + } +} + +// REFACTOR : in the future, we can use async process +// so that we just need one worker to trigger the process and one worker to +// check the status of triggered process and extend the lifetime in redis... +// pros: less connection to pipeline service and less resource consumption + +func (wp *tempCatalogFileToEmbWorkerPool) startWorker(ctx context.Context, workerID int) { + logger, _ := logger.GetZapLogger(ctx) + logger.Info("Worker started", zap.Int("WorkerID", workerID)) + defer wp.wg.Done() + // Defer a function to catch panics + defer func() { + if r := recover(); r != nil { + logger.Error("Panic recovered in worker", + zap.Int("WorkerID", workerID), + zap.Any("panic", r), + zap.String("stack", string(debug.Stack()))) + // Start a new worker + logger.Info("Restarting worker after panic", zap.Int("WorkerID", workerID)) + wp.wg.Add(1) + go wp.startWorker(ctx, workerID) + } + }() + for { + select { + case <-ctx.Done(): + // Context is done, exit the worker + fmt.Printf("Worker %d received termination signal\n", workerID) + return + case file, ok := <-wp.channel: + if !ok { + // Job channel is closed, exit the worker + fmt.Printf("Job channel closed, worker %d exiting\n", workerID) + return + } + + // register file process worker in redis and extend the lifetime + ok, stopRegisterFunc := registerFileWorker(ctx, wp.svc, file.UID.String(), extensionHelperPeriod, workerLifetime) + if !ok { + if stopRegisterFunc != nil { + stopRegisterFunc() + } + continue + } + // check if the file is already processed + // Because the file is from the dispatcher, the file status is guaranteed to be incomplete + // but when the worker wakes up and tries to process the file, the file status might have been updated by other workers. + // So we need to check the file status again to ensure the file is still same as when the worker wakes up + err := checkFileStatus(ctx, wp.svc, file) + if err != nil { + logger.Warn("File status not match. skip processing", zap.String("file uid", file.UID.String()), zap.Error(err)) + if stopRegisterFunc != nil { + stopRegisterFunc() + } + continue + } + // start file processing tracing + fmt.Printf("Worker %d processing file: %s\n", workerID, file.UID.String()) + + // process + t0 := time.Now() + err = wp.processFile(ctx, file) + + if err != nil { + logger.Error("Error processing file", zap.String("file uid", file.UID.String()), zap.Error(err)) + err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, err.Error(), "", "", "", nil, nil, nil, nil) + if err != nil { + fmt.Printf("Error marshaling extra metadata: %v\n", err) + } + _, err := wp.svc.Repository.UpdateKnowledgeBaseFile(ctx, file.UID.String(), map[string]interface{}{ + repository.KnowledgeBaseFileColumn.ProcessStatus: artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_FAILED.String(), + }) + if err != nil { + fmt.Printf("Error updating file status: %v\n", err) + } + } else { + fmt.Printf("Worker %d finished processing fileUID: %s\n", workerID, file.UID.String()) + } + processingTime := int64(time.Since(t0).Seconds()) + err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", "", "", "", &processingTime, nil, nil, nil) + if err != nil { + fmt.Printf("Error updating file extra metadata: %v\n", err) + } + // cancel the lifetime extend helper when the file is processed + stopRegisterFunc() + } + } +} + +// stop +func (wp *tempCatalogFileToEmbWorkerPool) GraceFulStop() { + logger, _ := logger.GetZapLogger(wp.ctx) + logger.Info("Worker pool received termination signal") + close(wp.channel) + wp.cancel() + wp.wg.Wait() + logger.Info("Worker pool exited") +} + +// processFile handles the processing of a file through various stages using a state machine. +func (wp *tempCatalogFileToEmbWorkerPool) processFile(ctx context.Context, file repository.KnowledgeBaseFile) error { + logger, _ := logger.GetZapLogger(ctx) + var status artifactpb.FileProcessStatus + if statusInt, ok := artifactpb.FileProcessStatus_value[file.ProcessStatus]; !ok { + return fmt.Errorf("invalid process status: %v", file.ProcessStatus) + } else { + status = artifactpb.FileProcessStatus(statusInt) + } + + // check if the file is already processed + err := checkFileStatus(ctx, wp.svc, file) + if err != nil { + return err + } + + for { + switch status { + case artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_WAITING: + updatedFile, nextStatus, err := wp.processWaitingFile(ctx, file) + if err != nil { + return fmt.Errorf("error processing waiting file: %w", err) + } + status = nextStatus + file = *updatedFile + case artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_CONVERTING: + t0 := time.Now() + updatedFile, nextStatus, err := wp.processConvertingFile(ctx, file) + if err != nil { + return fmt.Errorf("error processing converting file: %w", err) + } + status = nextStatus + file = *updatedFile + convertingTime := int64(time.Since(t0).Seconds()) + err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", "", "", "", nil, &convertingTime, nil, nil) + if err != nil { + logger.Error("Error updating file extra metadata", zap.Error(err)) + } + + case artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_CHUNKING: + t0 := time.Now() + updatedFile, nextStatus, err := wp.processChunkingFilesInTempCatalog(ctx, file) + if err != nil { + return fmt.Errorf("error processing chunking file: %w", err) + } + status = nextStatus + file = *updatedFile + chunkingTime := int64(time.Since(t0).Seconds()) + err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", "", "", "", nil, nil, &chunkingTime, nil) + if err != nil { + logger.Error("Error updating file extra metadata", zap.Error(err)) + } + + case artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_EMBEDDING: + t0 := time.Now() + updatedFile, nextStatus, err := wp.processEmbeddingFile(ctx, file) + if err != nil { + return fmt.Errorf("error processing embedding file: %w", err) + } + status = nextStatus + file = *updatedFile + embeddingTime := int64(time.Since(t0).Seconds()) + err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", "", "", "", nil, nil, nil, &embeddingTime) + if err != nil { + logger.Error("Error updating file extra metadata", zap.Error(err)) + } + + case artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_COMPLETED: + err := wp.processCompletedFile(ctx, file) + if err != nil { + return err + } + return nil + } + } +} + +// processWaitingFile determines the next status based on the file type. +// For pdf, doc, docx, ppt, pptx, html, and xlsx files, it transitions to the converting status. +// For text and markdown files, it transitions to the chunking status. +// For unsupported file types, it returns an error. +func (wp *tempCatalogFileToEmbWorkerPool) processWaitingFile(ctx context.Context, file repository.KnowledgeBaseFile) (updatedFile *repository.KnowledgeBaseFile, nextStatus artifactpb.FileProcessStatus, err error) { + // check if file process status is waiting + if file.ProcessStatus != artifactpb.FileProcessStatus_name[int32(artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_WAITING)] { + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, fmt.Errorf("file process status should be waiting. status: %v", file.ProcessStatus) + } + + // Determine the next status based on the file type. + switch file.Type { + // For pdf, doc, docx, ppt, pptx, html, and xlsx files, it transitions to the converting status. + case artifactpb.FileType_FILE_TYPE_PDF.String(), + artifactpb.FileType_FILE_TYPE_DOC.String(), + artifactpb.FileType_FILE_TYPE_DOCX.String(), + artifactpb.FileType_FILE_TYPE_PPT.String(), + artifactpb.FileType_FILE_TYPE_PPTX.String(), + artifactpb.FileType_FILE_TYPE_HTML.String(), + artifactpb.FileType_FILE_TYPE_XLSX.String(), + artifactpb.FileType_FILE_TYPE_XLS.String(), + artifactpb.FileType_FILE_TYPE_CSV.String(): + // update the file status to converting status in database + updateMap := map[string]interface{}{ + repository.KnowledgeBaseFileColumn.ProcessStatus: artifactpb.FileProcessStatus_name[int32(artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_CONVERTING)], + } + updatedFile, err := wp.svc.Repository.UpdateKnowledgeBaseFile(ctx, file.UID.String(), updateMap) + if err != nil { + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err + } + return updatedFile, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_CONVERTING, nil + + // For text and markdown files, it transitions to the chunking status. + case artifactpb.FileType_name[int32(artifactpb.FileType_FILE_TYPE_TEXT)], + artifactpb.FileType_name[int32(artifactpb.FileType_FILE_TYPE_MARKDOWN)]: + + updateMap := map[string]interface{}{ + repository.KnowledgeBaseFileColumn.ProcessStatus: artifactpb.FileProcessStatus_name[int32(artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_CHUNKING)], + } + updatedFile, err := wp.svc.Repository.UpdateKnowledgeBaseFile(ctx, file.UID.String(), updateMap) + if err != nil { + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err + } + return updatedFile, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_CHUNKING, nil + + default: + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, fmt.Errorf("unsupported file type in processWaitingFile: %v", file.Type) + } +} + +// processConvertingFile processes a file with converting status. +// If the file is a PDF, it retrieves the file from MinIO, converts it to Markdown using the PDF-to-Markdown pipeline, and then transitions to chunking status. +// The converted file is saved into object storage and the metadata is updated in the database. +// Finally, the file status is updated to chunking in the database. +// If the file is not a PDF, it returns an error. +func (wp *tempCatalogFileToEmbWorkerPool) processConvertingFile(ctx context.Context, file repository.KnowledgeBaseFile) (updatedFile *repository.KnowledgeBaseFile, nextStatus artifactpb.FileProcessStatus, err error) { + logger, _ := logger.GetZapLogger(ctx) + + fileInMinIOPath := file.Destination + data, err := wp.svc.MinIO.GetFile(ctx, minio.KnowledgeBaseBucketName, fileInMinIOPath) + if err != nil { + logger.Error("Failed to get file from minIO.", zap.String("File path", fileInMinIOPath)) + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err + } + + // encode data to base64 + base64Data := base64.StdEncoding.EncodeToString(data) + + // convert the pdf file to md + requesterUID := file.RequesterUID + convertedMD, err := wp.svc.ConvertToMDPipeForFilesInTempCatalog(ctx, file.UID, file.CreatorUID, requesterUID, base64Data, artifactpb.FileType(artifactpb.FileType_value[file.Type])) + if err != nil { + logger.Error("Failed to convert pdf to md using pdf-to-md pipeline.", zap.String("File path", fileInMinIOPath)) + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err + } + + // save the converted file into object storage and metadata into database + err = saveConvertedFile(ctx, wp.svc, file.KnowledgeBaseUID, file.UID, "converted_"+file.Name, []byte(convertedMD)) + if err != nil { + logger.Error("Failed to save converted data.", zap.String("File path", fileInMinIOPath)) + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err + } + // update the file status to chunking status in database + updateMap := map[string]interface{}{ + repository.KnowledgeBaseFileColumn.ProcessStatus: artifactpb.FileProcessStatus_name[int32(artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_CHUNKING)], + } + updatedFile, err = wp.svc.Repository.UpdateKnowledgeBaseFile(ctx, file.UID.String(), updateMap) + if err != nil { + logger.Error("Failed to update file status.", zap.String("File uid", file.UID.String())) + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err + } + + return updatedFile, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_CHUNKING, nil +} + +// processChunkingFilesInTempCatalog processes a file in "chunking" status by splitting it into text chunks. +// The processing varies by file type: +// +// For document files (PDF, DOC, DOCX, PPT, PPTX, HTML, XLSX, XLS, CSV): +// - Retrieves the converted markdown file from MinIO +// - Uses text chunking pipeline to split content +// +// For TEXT files: +// - Retrieves original file from MinIO +// - Uses text chunking pipeline +// +// For MARKDOWN files: +// - Retrieves original file from MinIO +// - Uses text chunking pipeline +// +// For all file types: +// - Saves chunks to MinIO object storage +// - Updates metadata in database with chunking pipeline info +// - Updates file status to "embedding" +// +// Parameters: +// - ctx: Context for the operation +// - file: KnowledgeBaseFile struct containing file metadata +// +// Returns: +// - updatedFile: Updated KnowledgeBaseFile after processing +// - nextStatus: Next file process status (EMBEDDING if successful) +// - err: Error if any step fails +func (wp *tempCatalogFileToEmbWorkerPool) processChunkingFilesInTempCatalog(ctx context.Context, file repository.KnowledgeBaseFile) (*repository.KnowledgeBaseFile, artifactpb.FileProcessStatus, error) { + logger, _ := logger.GetZapLogger(ctx) + logger.Info("Processing chunking status file.", zap.String("File uid", file.UID.String())) + + // check the file status is chunking + if file.ProcessStatus != artifactpb.FileProcessStatus_name[int32(artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_CHUNKING)] { + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, fmt.Errorf("file process status should be chunking. status: %v", file.ProcessStatus) + } + + var fileData []byte + var sourceTable string + var sourceUID uuid.UUID + var err error + + // Get file data based on type + switch file.Type { + case artifactpb.FileType_FILE_TYPE_PDF.String(), + artifactpb.FileType_FILE_TYPE_DOC.String(), + artifactpb.FileType_FILE_TYPE_DOCX.String(), + artifactpb.FileType_FILE_TYPE_PPT.String(), + artifactpb.FileType_FILE_TYPE_PPTX.String(), + artifactpb.FileType_FILE_TYPE_HTML.String(), + artifactpb.FileType_FILE_TYPE_XLSX.String(), + artifactpb.FileType_FILE_TYPE_XLS.String(), + artifactpb.FileType_FILE_TYPE_CSV.String(): + // Get converted file for document types + convertedFile, err := wp.svc.Repository.GetConvertedFileByFileUID(ctx, file.UID) + if err != nil { + logger.Error("Failed to get converted file metadata.", zap.String("File uid", file.UID.String())) + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err + } + fileData, err = wp.svc.MinIO.GetFile(ctx, minio.KnowledgeBaseBucketName, convertedFile.Destination) + if err != nil { + logger.Error("Failed to get converted file from minIO.", zap.String("Converted file uid", convertedFile.UID.String())) + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err + } + sourceTable = wp.svc.Repository.ConvertedFileTableName() + sourceUID = convertedFile.UID + + case artifactpb.FileType_FILE_TYPE_TEXT.String(), + artifactpb.FileType_FILE_TYPE_MARKDOWN.String(): + // Get original file for text/markdown types + fileData, err = wp.svc.MinIO.GetFile(ctx, minio.KnowledgeBaseBucketName, file.Destination) + if err != nil { + logger.Error("Failed to get file from minIO.", zap.String("File uid", file.UID.String())) + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err + } + sourceTable = wp.svc.Repository.KnowledgeBaseFileTableName() + sourceUID = file.UID + + default: + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, fmt.Errorf("unsupported file type in processChunkingFile: %v", file.Type) + } + + // Common processing for all file types + requesterUID := file.RequesterUID + chunks, err := wp.svc.ChunkTextPipeForTempCatalog(ctx, file.CreatorUID, requesterUID, string(fileData)) + if err != nil { + logger.Error("Failed to get chunks from file.", zap.String("File uid", file.UID.String())) + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err + } + + // Save chunks and update metadata + err = saveChunks(ctx, wp.svc, file.KnowledgeBaseUID.String(), file.UID, sourceTable, sourceUID, chunks) + if err != nil { + logger.Error("Failed to save chunks into object storage and metadata into database.", zap.String("File uid", file.UID.String())) + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err + } + + // Update file's chunking pipeline metadata + chunkingPipelineMetadata := service.NamespaceID + "/" + service.ChunkTextPipelineID + "@" + service.ChunkTextVersion + err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", "", chunkingPipelineMetadata, "", nil, nil, nil, nil) + if err != nil { + logger.Error("Failed to save chunking pipeline metadata.", zap.String("File uid:", file.UID.String())) + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, fmt.Errorf("failed to save chunking pipeline metadata: %w", err) + } + + // Update file status + updateMap := map[string]interface{}{ + repository.KnowledgeBaseFileColumn.ProcessStatus: artifactpb.FileProcessStatus_name[int32(artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_EMBEDDING)], + } + updatedFile, err := wp.svc.Repository.UpdateKnowledgeBaseFile(ctx, file.UID.String(), updateMap) + if err != nil { + logger.Error("Failed to update file status.", zap.String("File uid", file.UID.String())) + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err + } + + return updatedFile, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_EMBEDDING, nil +} + +// processEmbeddingFile processes a file that is ready for embedding by: +// 1. Validating the file's process status is "EMBEDDING" +// 2. Retrieving text chunks from MinIO storage and database metadata +// - Will retry once if initial chunk retrieval fails +// +// 3. Updating file metadata with embedding pipeline version info +// - Uses TextEmbedPipelineID and TextEmbedVersion from service config +// +// 4. Calling the embedding pipeline to generate vectors from text chunks +// - Uses file creator and requester UIDs for pipeline execution +// +// 5. Saving embeddings to vector database (Milvus) and metadata to SQL database +// - Creates embeddings collection named after knowledge base UID +// - Links embeddings to source text chunks and file metadata +// +// 6. Updating file status to "COMPLETED" in database +// +// Parameters: +// - ctx: Context for the operation +// - file: KnowledgeBaseFile struct containing file metadata +// +// Returns: +// - updatedFile: Updated KnowledgeBaseFile after processing +// - nextStatus: Next file process status (COMPLETED if successful) +// - err: Error if any step fails +// +// The function handles errors at each step and returns appropriate status codes. +// If chunk retrieval fails initially, it will retry once after a 1 second delay. +func (wp *tempCatalogFileToEmbWorkerPool) processEmbeddingFile(ctx context.Context, file repository.KnowledgeBaseFile) (updatedFile *repository.KnowledgeBaseFile, nextStatus artifactpb.FileProcessStatus, err error) { + logger, _ := logger.GetZapLogger(ctx) + // check the file status is embedding + if file.ProcessStatus != artifactpb.FileProcessStatus_name[int32(artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_EMBEDDING)] { + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, fmt.Errorf("file process status should be embedding. status: %v", file.ProcessStatus) + } + + sourceTable, sourceUID, chunks, _, texts, err := wp.svc.GetChunksByFile(ctx, &file) + if err != nil { + logger.Error("Failed to get chunks from database first time.", zap.String("SourceUID", sourceUID.String())) + // TODO: investigate minIO failure. Ref: Last-Modified time format not recognized. Please report this issue at https://github.com/minio/minio-go/issues. + // retry once when get chunks failed + time.Sleep(1 * time.Second) + logger.Info("Retrying to get chunks from database.", zap.String("SourceUID", sourceUID.String())) + sourceTable, sourceUID, chunks, _, texts, err = wp.svc.GetChunksByFile(ctx, &file) + if err != nil { + logger.Error("Failed to get chunks from database second time.", zap.String("SourceUID", sourceUID.String())) + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err + } + } + + // save embedding pipeline metadata into file's extra metadata + embeddingPipelineMetadata := service.NamespaceID + "/" + service.EmbedTextPipelineID + "@" + service.EmbedTextVersion + err = wp.svc.Repository.UpdateKbFileExtraMetaData(ctx, file.UID, "", "", "", embeddingPipelineMetadata, nil, nil, nil, nil) + if err != nil { + logger.Error("Failed to save embedding pipeline metadata.", zap.String("File uid:", file.UID.String())) + return nil, + artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, + fmt.Errorf("failed to save embedding pipeline metadata: %w", err) + } + + // call the embedding pipeline + requesterUID := file.RequesterUID + vectors, err := wp.svc.EmbeddingTextPipe(ctx, file.CreatorUID, requesterUID, texts) + if err != nil { + logger.Error("Failed to get embeddings from chunks. using embedding pipeline", zap.String("SourceTable", sourceTable), zap.String("SourceUID", sourceUID.String())) + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err + } + // save the embeddings into milvus and metadata into database + collection := wp.svc.MilvusClient.GetKnowledgeBaseCollectionName(file.KnowledgeBaseUID.String()) + embeddings := make([]repository.Embedding, len(vectors)) + for i, v := range vectors { + embeddings[i] = repository.Embedding{ + SourceTable: wp.svc.Repository.TextChunkTableName(), + SourceUID: chunks[i].UID, + Vector: v, + Collection: collection, + KbUID: file.KnowledgeBaseUID, + KbFileUID: file.UID, + } + } + err = saveEmbeddings(ctx, wp.svc, file.KnowledgeBaseUID.String(), embeddings) + if err != nil { + logger.Error("Failed to save embeddings into vector database and metadata into database.", zap.String("SourceUID", sourceUID.String())) + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err + } + + // update the file status to complete status in database + updateMap := map[string]interface{}{ + repository.KnowledgeBaseFileColumn.ProcessStatus: artifactpb.FileProcessStatus_name[int32(artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_COMPLETED)], + } + updatedFile, err = wp.svc.Repository.UpdateKnowledgeBaseFile(ctx, file.UID.String(), updateMap) + if err != nil { + logger.Error("Failed to update file status.", zap.String("File uid", file.UID.String())) + return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err + } + return updatedFile, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_COMPLETED, nil +} + +// processCompletedFile logs the completion of the file-to-embeddings process. +// It checks if the file status is completed and logs the information. +func (wp *tempCatalogFileToEmbWorkerPool) processCompletedFile(ctx context.Context, file repository.KnowledgeBaseFile) error { + logger, _ := logger.GetZapLogger(ctx) + logger.Info("File to embeddings process completed.", zap.String("File uid", file.UID.String())) + return nil +}