Skip to content

Commit

Permalink
feat(artifact): adopt the advanced converting pipeline (#127)
Browse files Browse the repository at this point in the history
Because 

our original converting pipeline is not versatile enough to handle some
complex documents.

This commit 

adopts the advanced converting pipeline, which uses a VLM (vision-language model).
  • Loading branch information
Yougigun authored Nov 13, 2024
1 parent c630c10 commit b5be01b
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 20 deletions.
26 changes: 14 additions & 12 deletions pkg/service/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ const chunkLength = 1024
const chunkOverlap = 200
const NamespaceID = "preset"

// Note: this pipeline is for the old indexing pipeline
// Note: this pipeline is for the old indexing pipeline having convert_result
const ConvertDocToMDPipelineID = "indexing-convert-pdf"
const DocToMDVersion = "v1.1.1"

// TODO: we revert to the old pipeline. it will change to the new pipeline later.
const ConvertDocToMDPipelineID2 = "indexing-convert-pdf"

// TODO: we need to update the version after the new pipeline is ready
const DocToMDVersion2 = "v1.1.1"
// Note: this pipeline is for the new indexing pipeline having convert_result or convert_result2
const ConvertDocToMDPipelineID2 = "indexing-advanced-convert-doc"
const DocToMDVersion2 = "v1.2.0"

const MdChunkPipelineID = "indexing-split-markdown"
const MdSplitVersion = "v2.0.0"
Expand Down Expand Up @@ -144,8 +142,9 @@ func getFileTypePrefix(fileType artifactPb.FileType) string {
}
}

// Helper function to safely extract the "convert_result" from the response.
// It checks if the index and key are available to avoid nil pointer issues.
// getConvertResult extracts the conversion result from the pipeline response.
// It first checks for a non-empty "convert_result" field, then falls back to "convert_result2".
// Returns an error if neither field contains valid data or if the response structure is invalid.
func getConvertResult(resp *pipelinePb.TriggerNamespacePipelineReleaseResponse) (string, error) {
if resp == nil || len(resp.Outputs) == 0 {
return "", fmt.Errorf("response is nil or has no outputs. resp: %v", resp)
Expand All @@ -155,10 +154,14 @@ func getConvertResult(resp *pipelinePb.TriggerNamespacePipelineReleaseResponse)
return "", fmt.Errorf("fields in the output are nil. resp: %v", resp)
}
convertResult, ok := fields["convert_result"]
if !ok {
return "", fmt.Errorf("convert_result not found in the output fields. resp: %v", resp)
if ok && convertResult.GetStringValue() != "" {
return convertResult.GetStringValue(), nil
}
return convertResult.GetStringValue(), nil
convertResult2, ok2 := fields["convert_result2"]
if ok2 && convertResult2.GetStringValue() != "" {
return convertResult2.GetStringValue(), nil
}
return "", fmt.Errorf("convert_result or convert_result2 not found in the output fields. resp: %v", resp)
}

type Chunk = struct {
Expand Down Expand Up @@ -365,7 +368,6 @@ func (s *Service) EmbeddingTextPipe(ctx context.Context, caller uuid.UUID, reque
batch := texts[i:end]
batchIndex := i / maxBatchSize


// Acquire semaphore before starting goroutine
sem <- struct{}{}
wg.Add(1)
Expand Down
55 changes: 47 additions & 8 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,11 +490,37 @@ func (wp *fileToEmbWorkerPool) processConvertingFile(ctx context.Context, file r
return updatedFile, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_CHUNKING, nil
}

// Processes a file with the status "chunking".
// If the file is a PDF or other document type, it retrieves the converted file from MinIO and calls the markdown chunking pipeline.
// If the file is a text or markdown file, it retrieves the file from MinIO and calls the respective chunking pipeline.
// The resulting chunks are saved into object storage and metadata is updated in the database.
// Finally, the file status is updated to "embedding" in the database.
// Processes a file with the status "chunking" by splitting it into text chunks.
// The processing varies by file type:
//
// For PDF, DOC, DOCX, PPT, PPTX, HTML, XLSX, XLS, CSV:
// - Retrieves converted file from MinIO
// - For spreadsheet files (XLSX, XLS, CSV): Uses markdown chunking pipeline
// - For other document types: Uses text chunking pipeline
//
// For TEXT files:
// - Retrieves original file from MinIO
// - Uses text chunking pipeline
//
// For MARKDOWN files:
// - Retrieves original file from MinIO
// - Uses markdown chunking pipeline
//
// For all file types:
// - Saves chunks to object storage
// - Updates metadata in database with chunking pipeline info
// - Updates file status to "embedding"
//
// Parameters:
// - ctx: Context for the operation
// - file: KnowledgeBaseFile struct containing file metadata
//
// Returns:
// - updatedFile: Updated KnowledgeBaseFile after processing
// - nextStatus: Next file process status (EMBEDDING if successful)
// - err: Error if any step fails
//
// The function handles errors at each step and returns appropriate status codes.
func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file repository.KnowledgeBaseFile) (*repository.KnowledgeBaseFile, artifactpb.FileProcessStatus, error) {
logger, _ := logger.GetZapLogger(ctx)
logger.Info("Processing chunking status file.", zap.String("File uid", file.UID.String()))
Expand Down Expand Up @@ -527,10 +553,23 @@ func (wp *fileToEmbWorkerPool) processChunkingFile(ctx context.Context, file rep
return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err
}

// TODO: some file use splitTextPipe and some use splitMarkdownPipe
// call the markdown chunking pipeline
requesterUID := file.RequesterUID
chunks, err := wp.svc.SplitMarkdownPipe(ctx, file.CreatorUID, requesterUID, string(convertedFileData))
chunks := []service.Chunk{}
switch file.Type {
case artifactpb.FileType_FILE_TYPE_XLSX.String(),
artifactpb.FileType_FILE_TYPE_XLS.String(),
artifactpb.FileType_FILE_TYPE_CSV.String():
requesterUID := file.RequesterUID
chunks, err = wp.svc.SplitMarkdownPipe(ctx, file.CreatorUID, requesterUID, string(convertedFileData))
case artifactpb.FileType_FILE_TYPE_PDF.String(),
artifactpb.FileType_FILE_TYPE_DOCX.String(),
artifactpb.FileType_FILE_TYPE_DOC.String(),
artifactpb.FileType_FILE_TYPE_PPTX.String(),
artifactpb.FileType_FILE_TYPE_PPT.String(),
artifactpb.FileType_FILE_TYPE_HTML.String():
requesterUID := file.RequesterUID
chunks, err = wp.svc.SplitTextPipe(ctx, file.CreatorUID, requesterUID, string(convertedFileData))
}
if err != nil {
logger.Error("Failed to get chunks from converted file using markdown chunking pipeline.", zap.String("Converted file uid", convertedFile.UID.String()))
return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err
Expand Down

0 comments on commit b5be01b

Please sign in to comment.