diff --git a/pkg/client/grpc/pipeline_test.go b/pkg/client/grpc/pipeline_test.go index 1fa2a22..29a3e42 100644 --- a/pkg/client/grpc/pipeline_test.go +++ b/pkg/client/grpc/pipeline_test.go @@ -8,6 +8,7 @@ package grpcclient // "os" // "testing" +// "github.com/instill-ai/artifact-backend/pkg/service" // pipelinev1beta "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta" // "google.golang.org/grpc/metadata" // "google.golang.org/protobuf/types/known/structpb" @@ -79,7 +80,7 @@ package grpcclient // ctx := metadata.NewOutgoingContext(context.Background(), md) // pipelinePublicServiceClient := pipelinev1beta.NewPipelinePublicServiceClient(pipelinePublicGrpcConn) // req := &pipelinev1beta.TriggerOrganizationPipelineReleaseRequest{ -// Name: "organizations/preset/pipelines/indexing-embed/releases/v1.0.0", +// Name: "organizations/preset/pipelines/indexing-embed/releases/v1.1.0", // Data: []*pipelinev1beta.TriggerData{ // { // Variable: &structpb.Struct{ @@ -102,8 +103,14 @@ package grpcclient // if err != nil { // t.Fatalf("failed to trigger pipeline: %v", err) // } +// vector, err := service.GetVectorFromResponse(res) +// if err != nil { +// t.Fatalf("failed to trigger pipeline: %v", err) +// } // t.Logf("pipeline triggered successfully") -// fmt.Println(res) +// fmt.Println("length of vector", len(vector[0])) +// // first 10 elements of the vector +// fmt.Println("vector", vector[0][:10]) // } // func TestCEPresetConvertPDF2MdPipeReleaseRequest(t *testing.T) { @@ -189,13 +196,13 @@ package grpcclient // t.Fatalf("failed to read pdf file: %v", err) // } // req := &pipelinev1beta.TriggerOrganizationPipelineReleaseRequest{ -// Name: "organizations/preset/pipelines/indexing-split-markdown/releases/v0.0.1", +// Name: "organizations/preset/pipelines/indexing-split-markdown/releases/v1.0.1", // Inputs: []*structpb.Struct{ // { // Fields: map[string]*structpb.Value{ -// "text_input": {Kind: &structpb.Value_StringValue{StringValue: mdString}}, -// "chunk_length": {Kind: &structpb.Value_NumberValue{NumberValue: 800}}, -// "chunk_overlap": {Kind: &structpb.Value_NumberValue{NumberValue: 200}}, +// "md_input": {Kind: &structpb.Value_StringValue{StringValue: mdString}}, +// "max_chunk_length": {Kind: &structpb.Value_NumberValue{NumberValue: 800}}, +// "chunk_overlap": {Kind: &structpb.Value_NumberValue{NumberValue: 200}}, // }, // }, // }, @@ -259,7 +266,7 @@ package grpcclient // t.Fatalf("failed to read pdf file: %v", err) // } // req := &pipelinev1beta.TriggerOrganizationPipelineReleaseRequest{ -// Name: "organizations/preset/pipelines/indexing-split-text/releases/v0.0.1", +// Name: "organizations/preset/pipelines/indexing-split-text/releases/v1.0.0", // Inputs: []*structpb.Struct{ // { // Fields: map[string]*structpb.Value{ diff --git a/pkg/milvus/milvus.go b/pkg/milvus/milvus.go index 91fe298..4716a26 100644 --- a/pkg/milvus/milvus.go +++ b/pkg/milvus/milvus.go @@ -32,12 +32,17 @@ type MilvusClient struct { } const ( - VectorDim = 3072 + VectorDim = 1536 VectorType = entity.FieldTypeFloatVector - ScannNlist = 2048 + ScannNlist = 1024 MetricType = entity.COSINE WitRaw = true ) +// Search parameter +const ( + Nprobe = 250 + ReorderK = 250 +) type Embedding struct { SourceTable string @@ -334,9 +339,7 @@ func (m *MilvusClient) SearchSimilarEmbeddings(ctx context.Context, collectionNa } // Perform the search - nprobe := 500 - reorderK := 500 - sp, err := entity.NewIndexSCANNSearchParam(nprobe, reorderK) + sp, err := entity.NewIndexSCANNSearchParam(Nprobe, ReorderK) if err != nil { log.Error("failed to create search param", zap.Error(err)) return nil, fmt.Errorf("failed to create search param: %w", err) diff --git a/pkg/service/pipeline.go b/pkg/service/pipeline.go index 36629af..93df61e 100644 --- a/pkg/service/pipeline.go +++ b/pkg/service/pipeline.go @@ -14,6 +14,10 @@ import ( const chunkLength = 800 const chunkOverlap = 200 +const pdfToMDVersion = "v1.0.0" +const mdSplitVersion = "v1.0.1" +const textSplitVersion = "v1.0.0" +const textEmbedVersion = "v1.1.0" // ConvertPDFToMD using converting pipeline to convert PDF to MD and consume caller's credits func (s *Service) ConvertPDFToMD(ctx context.Context, caller uuid.UUID, pdfBase64 string) (string, error) { @@ -22,7 +26,7 @@ func (s *Service) ConvertPDFToMD(ctx context.Context, caller uuid.UUID, pdfBase6 ctx = metadata.NewOutgoingContext(ctx, md) req := &pipelinev1beta.TriggerOrganizationPipelineReleaseRequest{ - Name: "organizations/preset/pipelines/indexing-convert-pdf/releases/v1.0.0", + Name: "organizations/preset/pipelines/indexing-convert-pdf/releases/" + pdfToMDVersion, Inputs: []*structpb.Struct{ { Fields: map[string]*structpb.Value{ @@ -73,14 +77,14 @@ func (s *Service) SplitMarkdown(ctx context.Context, caller uuid.UUID, markdown md := metadata.New(map[string]string{"Instill-User-Uid": caller.String(), "Instill-Auth-Type": "user"}) ctx = metadata.NewOutgoingContext(ctx, md) req := &pipelinev1beta.TriggerOrganizationPipelineReleaseRequest{ - Name: "organizations/preset/pipelines/indexing-split-markdown/releases/v0.0.1", + Name: "organizations/preset/pipelines/indexing-split-markdown/releases/" + mdSplitVersion, Inputs: []*structpb.Struct{ { Fields: map[string]*structpb.Value{ - "text_input": {Kind: &structpb.Value_StringValue{StringValue: markdown}}, - "chunk_length": {Kind: &structpb.Value_NumberValue{NumberValue: chunkLength}}, - "chunk_overlap": {Kind: &structpb.Value_NumberValue{NumberValue: chunkOverlap}}, + "md_input": {Kind: &structpb.Value_StringValue{StringValue: markdown}}, + "max_chunk_length": {Kind: &structpb.Value_NumberValue{NumberValue: chunkLength}}, + "chunk_overlap": {Kind: &structpb.Value_NumberValue{NumberValue: chunkOverlap}}, }, }, }, @@ -129,7 +133,7 @@ func (s *Service) SplitText(ctx context.Context, caller uuid.UUID, text string) md := metadata.New(map[string]string{"Instill-User-Uid": caller.String(), "Instill-Auth-Type": "user"}) ctx = metadata.NewOutgoingContext(ctx, md) req := &pipelinev1beta.TriggerOrganizationPipelineReleaseRequest{ - Name: "organizations/preset/pipelines/indexing-split-text/releases/v0.0.1", + Name: "organizations/preset/pipelines/indexing-split-text/releases/" + textSplitVersion, Inputs: []*structpb.Struct{ { @@ -152,7 +156,6 @@ func (s *Service) SplitText(ctx context.Context, caller uuid.UUID, text string) return result, nil } -// TODO VectorizeText - waiting for CE to implement the global secret and use it in file-to-embeddings worker // VectorizeText using embedding pipeline to vectorize text and consume caller's credits func (s *Service) VectorizeText(ctx context.Context, caller uuid.UUID, texts []string) ([][]float32, error) { md := metadata.New(map[string]string{"Instill-User-Uid": caller.String(), "Instill-Auth-Type": "user"}) @@ -167,7 +170,7 @@ func (s *Service) VectorizeText(ctx context.Context, caller uuid.UUID, texts []s } req := &pipelinev1beta.TriggerOrganizationPipelineReleaseRequest{ - Name: "organizations/preset/pipelines/indexing-embed-text/releases/v0.0.1", + Name: "organizations/preset/pipelines/indexing-embed-text/releases/" + textEmbedVersion, Inputs: inputs, } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index aaa0c5c..2e3fe83 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -567,9 +567,8 @@ func (wp *fileToEmbWorkerPool) processEmbeddingFile(ctx context.Context, file re texts[i] = string(f.Content) } // call the embedding pipeline - // TODO replace with real embedding pipeline in service/pipeline.go - // vectors, err := wp.svc.VectorizeText(ctx, file.CreatorUID, texts) - vectors, err := mockVectorizeText(ctx, file.CreatorUID, texts) + vectors, err := wp.svc.VectorizeText(ctx, file.CreatorUID, texts) + // vectors, err := mockVectorizeText(ctx, file.CreatorUID, texts) if err != nil { logger.Error("Failed to get embeddings from chunks.", zap.String("SourceTable", sourceTable), zap.String("SourceUID", sourceUID.String())) return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err @@ -736,6 +735,7 @@ func getWorkerKey(fileUID string) string { } // mockVectorizeText is a mock implementation of the VectorizeText function. +//nolint:unused func mockVectorizeText(_ context.Context, _ uuid.UUID, texts []string) ([][]float32, error) { vectors := make([][]float32, len(texts)) for i := range texts {