diff --git a/pkg/service/pipeline.go b/pkg/service/pipeline.go index e21bd97..df09f58 100644 --- a/pkg/service/pipeline.go +++ b/pkg/service/pipeline.go @@ -20,15 +20,13 @@ const chunkLength = 1024 const chunkOverlap = 200 const NamespaceID = "preset" -// Note: this pipeline is for the old indexing pipeline +// Note: this pipeline is for the old indexing pipeline having convert_result const ConvertDocToMDPipelineID = "indexing-convert-pdf" const DocToMDVersion = "v1.1.1" -// TODO: we revert to the old pipeline. it will change to the new pipeline later. -const ConvertDocToMDPipelineID2 = "indexing-convert-pdf" - -// TODO: we need to update the version after the new pipeline is ready -const DocToMDVersion2 = "v1.1.1" +// Note: this pipeline is for the new indexing pipeline having convert_result or convert_result2 +const ConvertDocToMDPipelineID2 = "indexing-advanced-convert-doc" +const DocToMDVersion2 = "v1.2.0" const MdChunkPipelineID = "indexing-split-markdown" const MdSplitVersion = "v2.0.0" @@ -144,8 +142,9 @@ func getFileTypePrefix(fileType artifactPb.FileType) string { } } -// Helper function to safely extract the "convert_result" from the response. -// It checks if the index and key are available to avoid nil pointer issues. +// getConvertResult extracts the conversion result from the pipeline response. +// It first checks for a non-empty "convert_result" field, then falls back to "convert_result2". +// Returns an error if neither field contains valid data or if the response structure is invalid. func getConvertResult(resp *pipelinePb.TriggerNamespacePipelineReleaseResponse) (string, error) { if resp == nil || len(resp.Outputs) == 0 { return "", fmt.Errorf("response is nil or has no outputs. resp: %v", resp) @@ -155,10 +154,14 @@ func getConvertResult(resp *pipelinePb.TriggerNamespacePipelineReleaseResponse) return "", fmt.Errorf("fields in the output are nil. resp: %v", resp) } convertResult, ok := fields["convert_result"] - if !ok { - return "", fmt.Errorf("convert_result not found in the output fields. resp: %v", resp) + if ok && convertResult.GetStringValue() != "" { + return convertResult.GetStringValue(), nil } - return convertResult.GetStringValue(), nil + convertResult2, ok2 := fields["convert_result2"] + if ok2 && convertResult2.GetStringValue() != "" { + return convertResult2.GetStringValue(), nil + } + return "", fmt.Errorf("convert_result or convert_result2 not found in the output fields. resp: %v", resp) } type Chunk = struct { @@ -365,7 +368,6 @@ func (s *Service) EmbeddingTextPipe(ctx context.Context, caller uuid.UUID, reque batch := texts[i:end] batchIndex := i / maxBatchSize - // Acquire semaphore before starting goroutine sem <- struct{}{} wg.Add(1) diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index a3d9cf7..094c0e7 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -490,11 +490,37 @@ func (wp *fileToEmbWorkerPool) processConvertingFile(ctx context.Context, file r return updatedFile, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_CHUNKING, nil } -// Processes a file with the status "chunking". -// If the file is a PDF or other document type, it retrieves the converted file from MinIO and calls the markdown chunking pipeline. -// If the file is a text or markdown file, it retrieves the file from MinIO and calls the respective chunking pipeline. -// The resulting chunks are saved into object storage and metadata is updated in the database. -// Finally, the file status is updated to "embedding" in the database. +// Processes a file with the status "chunking" by splitting it into text chunks. +// The processing varies by file type: +// +// For PDF, DOC, DOCX, PPT, PPTX, HTML, XLSX, XLS, CSV: +// - Retrieves converted file from MinIO +// - For spreadsheet files (XLSX, XLS, CSV): Uses markdown chunking pipeline +// - For other document types: Uses text chunking pipeline +// +// For TEXT files: +// - Retrieves original file from MinIO +// - Uses text chunking pipeline +// +// For MARKDOWN files: +// - Retrieves original file from MinIO +// - Uses markdown chunking pipeline +// +// For all file types: +// - Saves chunks to object storage +// - Updates metadata in database with chunking pipeline info +// - Updates file status to "embedding" +// +// Parameters: +// - ctx: Context for the operation +// - file: KnowledgeBaseFile struct containing file metadata +// +// Returns: +// - updatedFile: Updated KnowledgeBaseFile after processing +// - nextStatus: Next file process status (EMBEDDING if successful) +// - err: Error if any step fails +// +// The function handles errors at each step and returns appropriate status codes. func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file repository.KnowledgeBaseFile) (*repository.KnowledgeBaseFile, artifactpb.FileProcessStatus, error) { logger, _ := logger.GetZapLogger(ctx) logger.Info("Processing chunking status file.", zap.String("File uid", file.UID.String())) @@ -527,10 +553,23 @@ func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file rep return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err } - // TODO: some file use splitTextPipe and some use splitMarkdownPipe // call the markdown chunking pipeline - requesterUID := file.RequesterUID - chunks, err := wp.svc.SplitMarkdownPipe(ctx, file.CreatorUID, requesterUID, string(convertedFileData)) + chunks := []service.Chunk{} + switch file.Type { + case artifactpb.FileType_FILE_TYPE_XLSX.String(), + artifactpb.FileType_FILE_TYPE_XLS.String(), + artifactpb.FileType_FILE_TYPE_CSV.String(): + requesterUID := file.RequesterUID + chunks, err = wp.svc.SplitMarkdownPipe(ctx, file.CreatorUID, requesterUID, string(convertedFileData)) + case artifactpb.FileType_FILE_TYPE_PDF.String(), + artifactpb.FileType_FILE_TYPE_DOCX.String(), + artifactpb.FileType_FILE_TYPE_DOC.String(), + artifactpb.FileType_FILE_TYPE_PPTX.String(), + artifactpb.FileType_FILE_TYPE_PPT.String(), + artifactpb.FileType_FILE_TYPE_HTML.String(): + requesterUID := file.RequesterUID + chunks, err = wp.svc.SplitTextPipe(ctx, file.CreatorUID, requesterUID, string(convertedFileData)) + } if err != nil { logger.Error("Failed to get chunks from converted file using markdown chunking pipeline.", zap.String("Converted file uid", convertedFile.UID.String())) return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err