feat(kb): using preset's pipeline for file-to-embedding worker (#45)
Because

we want to use the preset's pipelines in the file-to-embedding worker

This commit

integrates the worker with the pipeline service
Yougigun authored Jul 23, 2024
1 parent e76aafa commit 8c57ad1
Showing 4 changed files with 36 additions and 23 deletions.
21 changes: 14 additions & 7 deletions pkg/client/grpc/pipeline_test.go
@@ -8,6 +8,7 @@ package grpcclient
// "os"
// "testing"

// "github.com/instill-ai/artifact-backend/pkg/service"
// pipelinev1beta "github.com/instill-ai/protogen-go/vdp/pipeline/v1beta"
// "google.golang.org/grpc/metadata"
// "google.golang.org/protobuf/types/known/structpb"
@@ -79,7 +80,7 @@ package grpcclient
// ctx := metadata.NewOutgoingContext(context.Background(), md)
// pipelinePublicServiceClient := pipelinev1beta.NewPipelinePublicServiceClient(pipelinePublicGrpcConn)
// req := &pipelinev1beta.TriggerOrganizationPipelineReleaseRequest{
// Name: "organizations/preset/pipelines/indexing-embed/releases/v1.0.0",
// Name: "organizations/preset/pipelines/indexing-embed/releases/v1.1.0",
// Data: []*pipelinev1beta.TriggerData{
// {
// Variable: &structpb.Struct{
@@ -102,8 +103,14 @@ package grpcclient
// if err != nil {
// t.Fatalf("failed to trigger pipeline: %v", err)
// }
// vector, err := service.GetVectorFromResponse(res)
// if err != nil {
// t.Fatalf("failed to trigger pipeline: %v", err)
// }
// t.Logf("pipeline triggered successfully")
// fmt.Println(res)
// fmt.Println("length of vector", len(vector[0]))
// // first 10 elements of the vector
// fmt.Println("vector", vector[0][:10])
// }

// func TestCEPresetConvertPDF2MdPipeReleaseRequest(t *testing.T) {
@@ -189,13 +196,13 @@ package grpcclient
// t.Fatalf("failed to read pdf file: %v", err)
// }
// req := &pipelinev1beta.TriggerOrganizationPipelineReleaseRequest{
// Name: "organizations/preset/pipelines/indexing-split-markdown/releases/v0.0.1",
// Name: "organizations/preset/pipelines/indexing-split-markdown/releases/v1.0.1",
// Inputs: []*structpb.Struct{
// {
// Fields: map[string]*structpb.Value{
// "text_input": {Kind: &structpb.Value_StringValue{StringValue: mdString}},
// "chunk_length": {Kind: &structpb.Value_NumberValue{NumberValue: 800}},
// "chunk_overlap": {Kind: &structpb.Value_NumberValue{NumberValue: 200}},
// "md_input": {Kind: &structpb.Value_StringValue{StringValue: mdString}},
// "max_chunk_length": {Kind: &structpb.Value_NumberValue{NumberValue: 800}},
// "chunk_overlap": {Kind: &structpb.Value_NumberValue{NumberValue: 200}},
// },
// },
// },
@@ -259,7 +266,7 @@ package grpcclient
// t.Fatalf("failed to read pdf file: %v", err)
// }
// req := &pipelinev1beta.TriggerOrganizationPipelineReleaseRequest{
// Name: "organizations/preset/pipelines/indexing-split-text/releases/v0.0.1",
// Name: "organizations/preset/pipelines/indexing-split-text/releases/v1.0.0",
// Inputs: []*structpb.Struct{
// {
// Fields: map[string]*structpb.Value{
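Note on the renamed split-markdown inputs above (`text_input` → `md_input`, `chunk_length` → `max_chunk_length`): the same trigger input can also be built with the `structpb` helper constructors instead of the verbose `Kind` literals used in the test. A minimal sketch, not part of this commit; the field names are taken from the diff and `buildMarkdownSplitInput` is a hypothetical helper:

```go
package main

import (
	"fmt"

	"google.golang.org/protobuf/types/known/structpb"
)

// buildMarkdownSplitInput assembles the trigger input for the
// indexing-split-markdown pipeline, using the field names shown in this diff.
func buildMarkdownSplitInput(markdown string, chunkLength, chunkOverlap int) *structpb.Struct {
	return &structpb.Struct{
		Fields: map[string]*structpb.Value{
			"md_input":         structpb.NewStringValue(markdown),
			"max_chunk_length": structpb.NewNumberValue(float64(chunkLength)),
			"chunk_overlap":    structpb.NewNumberValue(float64(chunkOverlap)),
		},
	}
}

func main() {
	in := buildMarkdownSplitInput("# Title\n\nSome markdown body.", 800, 200)
	fmt.Println(in.Fields["max_chunk_length"].GetNumberValue()) // 800
}
```

The helpers produce the same wire values as the explicit `Kind:` form used in the test above.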
13 changes: 8 additions & 5 deletions pkg/milvus/milvus.go
@@ -32,12 +32,17 @@ type MilvusClient struct {
}

const (
VectorDim = 3072
VectorDim = 1536
VectorType = entity.FieldTypeFloatVector
ScannNlist = 2048
ScannNlist = 1024
MetricType = entity.COSINE
WitRaw = true
)
// Search parameter
const (
Nprobe = 250
ReorderK = 250
)

type Embedding struct {
SourceTable string
@@ -334,9 +339,7 @@ func (m *MilvusClient) SearchSimilarEmbeddings(ctx context.Context, collectionNa
}

// Perform the search
nprobe := 500
reorderK := 500
sp, err := entity.NewIndexSCANNSearchParam(nprobe, reorderK)
sp, err := entity.NewIndexSCANNSearchParam(Nprobe, ReorderK)
if err != nil {
log.Error("failed to create search param", zap.Error(err))
return nil, fmt.Errorf("failed to create search param: %w", err)
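With VectorDim dropping from 3072 to 1536 (presumably to match the output dimension of the preset embedding pipeline), a mismatch between pipeline output and the Milvus collection schema would otherwise surface only at insert or search time. A hypothetical guard, not part of this commit, that callers could apply to the pipeline output first:

```go
package main

import "fmt"

// VectorDim mirrors the constant in pkg/milvus/milvus.go after this change.
const VectorDim = 1536

// checkEmbeddingDims is a hypothetical helper (not in the diff) that verifies
// every vector returned by the embedding pipeline matches the collection schema.
func checkEmbeddingDims(vectors [][]float32) error {
	for i, v := range vectors {
		if len(v) != VectorDim {
			return fmt.Errorf("vector %d has dimension %d, expected %d", i, len(v), VectorDim)
		}
	}
	return nil
}

func main() {
	ok := make([][]float32, 2)
	for i := range ok {
		ok[i] = make([]float32, VectorDim)
	}
	fmt.Println(checkEmbeddingDims(ok))                      // <nil>
	fmt.Println(checkEmbeddingDims([][]float32{{0.1, 0.2}})) // dimension error
}
```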
19 changes: 11 additions & 8 deletions pkg/service/pipeline.go
@@ -14,6 +14,10 @@ import (

const chunkLength = 800
const chunkOverlap = 200
const pdfToMDVersion = "v1.0.0"
const mdSplitVersion = "v1.0.1"
const textSplitVersion = "v1.0.0"
const textEmbedVersion = "v1.1.0"

// ConvertPDFToMD using converting pipeline to convert PDF to MD and consume caller's credits
func (s *Service) ConvertPDFToMD(ctx context.Context, caller uuid.UUID, pdfBase64 string) (string, error) {
@@ -22,7 +26,7 @@ func (s *Service) ConvertPDFToMD(ctx context.Context, caller uuid.UUID, pdfBase6
ctx = metadata.NewOutgoingContext(ctx, md)

req := &pipelinev1beta.TriggerOrganizationPipelineReleaseRequest{
Name: "organizations/preset/pipelines/indexing-convert-pdf/releases/v1.0.0",
Name: "organizations/preset/pipelines/indexing-convert-pdf/releases/" + pdfToMDVersion,
Inputs: []*structpb.Struct{
{
Fields: map[string]*structpb.Value{
@@ -73,14 +77,14 @@ func (s *Service) SplitMarkdown(ctx context.Context, caller uuid.UUID, markdown
md := metadata.New(map[string]string{"Instill-User-Uid": caller.String(), "Instill-Auth-Type": "user"})
ctx = metadata.NewOutgoingContext(ctx, md)
req := &pipelinev1beta.TriggerOrganizationPipelineReleaseRequest{
Name: "organizations/preset/pipelines/indexing-split-markdown/releases/v0.0.1",
Name: "organizations/preset/pipelines/indexing-split-markdown/releases/" + mdSplitVersion,

Inputs: []*structpb.Struct{
{
Fields: map[string]*structpb.Value{
"text_input": {Kind: &structpb.Value_StringValue{StringValue: markdown}},
"chunk_length": {Kind: &structpb.Value_NumberValue{NumberValue: chunkLength}},
"chunk_overlap": {Kind: &structpb.Value_NumberValue{NumberValue: chunkOverlap}},
"md_input": {Kind: &structpb.Value_StringValue{StringValue: markdown}},
"max_chunk_length": {Kind: &structpb.Value_NumberValue{NumberValue: chunkLength}},
"chunk_overlap": {Kind: &structpb.Value_NumberValue{NumberValue: chunkOverlap}},
},
},
},
@@ -129,7 +133,7 @@ func (s *Service) SplitText(ctx context.Context, caller uuid.UUID, text string)
md := metadata.New(map[string]string{"Instill-User-Uid": caller.String(), "Instill-Auth-Type": "user"})
ctx = metadata.NewOutgoingContext(ctx, md)
req := &pipelinev1beta.TriggerOrganizationPipelineReleaseRequest{
Name: "organizations/preset/pipelines/indexing-split-text/releases/v0.0.1",
Name: "organizations/preset/pipelines/indexing-split-text/releases/" + textSplitVersion,

Inputs: []*structpb.Struct{
{
@@ -152,7 +156,6 @@ func (s *Service) SplitText(ctx context.Context, caller uuid.UUID, text string)
return result, nil
}

// TODO VectorizeText - waiting for CE to implement the global secret and use it in file-to-embeddings worker
// VectorizeText using embedding pipeline to vectorize text and consume caller's credits
func (s *Service) VectorizeText(ctx context.Context, caller uuid.UUID, texts []string) ([][]float32, error) {
md := metadata.New(map[string]string{"Instill-User-Uid": caller.String(), "Instill-Auth-Type": "user"})
@@ -167,7 +170,7 @@ func (s *Service) VectorizeText(ctx context.Context, caller uuid.UUID, texts []s
}

req := &pipelinev1beta.TriggerOrganizationPipelineReleaseRequest{
Name: "organizations/preset/pipelines/indexing-embed-text/releases/v0.0.1",
Name: "organizations/preset/pipelines/indexing-embed-text/releases/" + textEmbedVersion,

Inputs: inputs,
}
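Taken together, the new version constants pin the preset releases that the file-to-embedding flow calls in order: PDF → markdown, markdown → chunks, chunks → embeddings. The sketch below illustrates that call order only; the interface, the `[]string` return type of `SplitMarkdown` (truncated in the hunk above), and the `gofrs/uuid` import path are assumptions, not code from this commit:

```go
package main

import (
	"context"
	"fmt"

	// Assumed import path; the repo's actual uuid module may differ.
	"github.com/gofrs/uuid"
)

// pipelineService captures the Service methods visible in this diff; the
// return type of SplitMarkdown is assumed to be []string.
type pipelineService interface {
	ConvertPDFToMD(ctx context.Context, caller uuid.UUID, pdfBase64 string) (string, error)
	SplitMarkdown(ctx context.Context, caller uuid.UUID, markdown string) ([]string, error)
	VectorizeText(ctx context.Context, caller uuid.UUID, texts []string) ([][]float32, error)
}

// fileToEmbeddings sketches the order in which the worker exercises the preset
// releases: indexing-convert-pdf -> indexing-split-markdown -> indexing-embed-text.
func fileToEmbeddings(ctx context.Context, svc pipelineService, caller uuid.UUID, pdfBase64 string) ([][]float32, error) {
	markdown, err := svc.ConvertPDFToMD(ctx, caller, pdfBase64)
	if err != nil {
		return nil, fmt.Errorf("convert pdf to md: %w", err)
	}
	chunks, err := svc.SplitMarkdown(ctx, caller, markdown)
	if err != nil {
		return nil, fmt.Errorf("split markdown: %w", err)
	}
	return svc.VectorizeText(ctx, caller, chunks)
}

// fakeService is a stand-in so the sketch runs without the real backend.
type fakeService struct{}

func (fakeService) ConvertPDFToMD(context.Context, uuid.UUID, string) (string, error) {
	return "# Title\n\nbody", nil
}
func (fakeService) SplitMarkdown(_ context.Context, _ uuid.UUID, md string) ([]string, error) {
	return []string{md}, nil
}
func (fakeService) VectorizeText(_ context.Context, _ uuid.UUID, texts []string) ([][]float32, error) {
	return make([][]float32, len(texts)), nil
}

func main() {
	vecs, err := fileToEmbeddings(context.Background(), fakeService{}, uuid.Nil, "JVBERi0=")
	fmt.Println(len(vecs), err) // 1 <nil>
}
```

In the actual worker (pkg/worker/worker.go below), only the final step is visible in this diff: the call to wp.svc.VectorizeText replacing mockVectorizeText.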
6 changes: 3 additions & 3 deletions pkg/worker/worker.go
@@ -567,9 +567,8 @@ func (wp *fileToEmbWorkerPool) processEmbeddingFile(ctx context.Context, file re
texts[i] = string(f.Content)
}
// call the embedding pipeline
// TODO replace with real embedding pipeline in service/pipeline.go
// vectors, err := wp.svc.VectorizeText(ctx, file.CreatorUID, texts)
vectors, err := mockVectorizeText(ctx, file.CreatorUID, texts)
vectors, err := wp.svc.VectorizeText(ctx, file.CreatorUID, texts)
// vectors, err := mockVectorizeText(ctx, file.CreatorUID, texts)
if err != nil {
logger.Error("Failed to get embeddings from chunks.", zap.String("SourceTable", sourceTable), zap.String("SourceUID", sourceUID.String()))
return nil, artifactpb.FileProcessStatus_FILE_PROCESS_STATUS_UNSPECIFIED, err
@@ -736,6 +735,7 @@ func getWorkerKey(fileUID string) string {
}

// mockVectorizeText is a mock implementation of the VectorizeText function.
//nolint:unused
func mockVectorizeText(_ context.Context, _ uuid.UUID, texts []string) ([][]float32, error) {
vectors := make([][]float32, len(texts))
for i := range texts {
