Skip to content

Commit

Permalink
[WIP] feat: Implenet vald index export
Browse files Browse the repository at this point in the history
  • Loading branch information
highpon committed Dec 15, 2024
1 parent 6d0bc44 commit 26e2322
Show file tree
Hide file tree
Showing 12 changed files with 1,222 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ HELM_OPERATOR_IMAGE = $(NAME)-helm-operator
INDEX_CORRECTION_IMAGE = $(NAME)-index-correction
INDEX_CREATION_IMAGE = $(NAME)-index-creation
INDEX_DELETION_IMAGE = $(NAME)-index-deletion
INDEX_EXPORTATION_IMAGE = $(NAME)-index-exportion
INDEX_EXPORTATION_IMAGE = $(NAME)-index-exportation
INDEX_IMPORTATION_IMAGE = $(NAME)-index-importation
INDEX_OPERATOR_IMAGE = $(NAME)-index-operator
INDEX_SAVE_IMAGE = $(NAME)-index-save
Expand Down
37 changes: 35 additions & 2 deletions cmd/index/job/exportation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,17 @@
// limitations under the License.
package main

import "fmt"
import (
"context"
"log"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/info"
"github.com/vdaas/vald/internal/runner"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/pkg/index/job/exportation/config"
"github.com/vdaas/vald/pkg/index/job/exportation/usecase"
)

const (
maxVersion = "v0.0.10"
Expand All @@ -22,5 +32,28 @@ const (
)

func main() {
fmt.Println("hello world")
if err := safety.RecoverFunc(func() error {
return runner.Do(
context.Background(),
runner.WithName(name),
runner.WithVersion(info.Version, maxVersion, minVersion),
runner.WithConfigLoader(func(path string) (any, *config.GlobalConfig, error) {
cfg, err := config.NewConfig(path)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to load "+name+"'s configuration")
}
return cfg, &cfg.GlobalConfig, nil
}),
runner.WithDaemonInitializer(func(cfg any) (runner.Runner, error) {
c, ok := cfg.(*config.Data)
if !ok {
return nil, errors.ErrInvalidConfig
}
return usecase.New(c)
}),
)
})(); err != nil {
log.Fatal(err, info.Get())
return
}
}
193 changes: 193 additions & 0 deletions cmd/index/job/exportation/sample.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
#
# Copyright (C) 2019-2024 vdaas.org vald team <vald@vdaas.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
version: v0.0.0
time_zone: JST
logging:
format: raw
level: info
logger: glg
server_config:
servers:
- name: grpc
host: 0.0.0.0
port: 8081
grpc:
bidirectional_stream_concurrency: 20
connection_timeout: ""
header_table_size: 0
initial_conn_window_size: 0
initial_window_size: 0
interceptors: []
keepalive:
max_conn_age: ""
max_conn_age_grace: ""
max_conn_idle: ""
time: ""
timeout: ""
max_header_list_size: 0
max_receive_message_size: 0
max_send_message_size: 0
read_buffer_size: 0
write_buffer_size: 0
mode: GRPC
probe_wait_time: 3s
restart: true
health_check_servers:
- name: readiness
host: 0.0.0.0
port: 3001
http:
handler_timeout: ""
idle_timeout: ""
read_header_timeout: ""
read_timeout: ""
shutdown_duration: 0s
write_timeout: ""
mode: ""
probe_wait_time: 3s
metrics_servers:
startup_strategy:
- grpc
- readiness
full_shutdown_duration: 600s
tls:
ca: /path/to/ca
cert: /path/to/cert
enabled: false
key: /path/to/key
exporter:
concurrency: 1
kvs_background_sync_interval: 5s
kvs_background_compaction_interval: 5s
index_path: "/var/export/index"
gateway:
addrs:
- vald-lb-gateway.default.svc.cluster.local:8081
health_check_duration: "1s"
connection_pool:
enable_dns_resolver: true
enable_rebalance: true
old_conn_close_duration: 2m
rebalance_duration: 30m
size: 3
backoff:
backoff_factor: 1.1
backoff_time_limit: 5s
enable_error_log: true
initial_duration: 5ms
jitter_limit: 100ms
maximum_duration: 5s
retry_count: 100
circuit_breaker:
closed_error_rate: 0.7
closed_refresh_timeout: 10s
half_open_error_rate: 0.5
min_samples: 1000
open_timeout: 1s
call_option:
content_subtype: ""
max_recv_msg_size: 0
max_retry_rpc_buffer_size: 0
max_send_msg_size: 0
wait_for_ready: true
dial_option:
authority: ""
backoff_base_delay: 1s
backoff_jitter: 0.2
backoff_max_delay: 120s
backoff_multiplier: 1.6
disable_retry: false
enable_backoff: false
idle_timeout: 1h
initial_connection_window_size: 2097152
initial_window_size: 1048576
insecure: true
interceptors: []
keepalive:
permit_without_stream: false
time: ""
timeout: 30s
max_call_attempts: 0
max_header_list_size: 0
max_msg_size: 0
min_connection_timeout: 20s
net:
dialer:
dual_stack_enabled: true
keepalive: ""
timeout: ""
dns:
cache_enabled: true
cache_expiration: 1h
refresh_duration: 30m
socket_option:
ip_recover_destination_addr: false
ip_transparent: false
reuse_addr: true
reuse_port: true
tcp_cork: false
tcp_defer_accept: false
tcp_fast_open: false
tcp_no_delay: false
tcp_quick_ack: false
tls:
ca: /path/to/ca
cert: /path/to/cert
enabled: false
insecure_skip_verify: false
key: /path/to/key
read_buffer_size: 0
shared_write_buffer: false
timeout: ""
user_agent: Vald-gRPC
write_buffer_size: 0
tls:
ca: /path/to/ca
cert: /path/to/cert
enabled: false
insecure_skip_verify: false
key: /path/to/key
observability:
enabled: false
otlp:
collector_endpoint: "otel-collector.monitoring.svc.cluster.local:4317"
trace_batch_timeout: "1s"
trace_export_timeout: "1m"
trace_max_export_batch_size: 1024
trace_max_queue_size: 256
metrics_export_interval: "1s"
metrics_export_timeout: "1m"
attribute:
namespace: "_MY_POD_NAMESPACE_"
pod_name: "_MY_POD_NAME_"
node_name: "_MY_NODE_NAME_"
service_name: "vald-index-deletion"
metrics:
enable_cgo: true
enable_goroutine: true
enable_memory: true
enable_version_info: true
version_info_labels:
- vald_version
- server_name
- git_commit
- build_time
- go_version
- go_os
- go_arch
- algorithm_info
trace:
enabled: true
1 change: 1 addition & 0 deletions dockers/index/job/exportation/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ RUN --mount=type=bind,target=.,rw \
FROM gcr.io/distroless/static:nonroot
LABEL maintainer="vdaas.org vald team <vald@vdaas.org>"
COPY --from=builder /usr/bin/index-exportation /usr/bin/index-exportation
COPY cmd/index/job/exportation/sample.yaml /etc/server/config.yaml
# skipcq: DOK-DL3002
USER nonroot:nonroot
ENTRYPOINT ["/usr/bin/index-exportation"]
43 changes: 43 additions & 0 deletions internal/config/index_exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (C) 2019-2024 vdaas.org vald team <vald@vdaas.org>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package config

// IndexExporter represents the configurations for index exportation.
type IndexExporter struct {
// Concurrency represents indexing concurrency.
Concurrency int `json:"concurrency" yaml:"concurrency"`

// KVSBackgroundSyncInterval represents interval for checked id list kvs sync duration
KVSBackgroundSyncInterval string `json:"kvs_background_sync_interval" yaml:"kvs_background_sync_interval"`

// KVSBackgroundCompactionInterval represents interval for checked id list kvs compaction duration
KVSBackgroundCompactionInterval string `json:"kvs_background_compaction_interval" yaml:"kvs_background_compaction_interval"`

// IndexPath represents the export index file path
IndexPath string `json:"index_path,omitempty" yaml:"index_path"`

// Gateway represent gateway service configuration
Gateway *GRPCClient `json:"gateway" yaml:"gateway"`
}

func (e *IndexExporter) Bind() *IndexExporter {
e.KVSBackgroundCompactionInterval = GetActualValue(e.KVSBackgroundCompactionInterval)
e.KVSBackgroundSyncInterval = GetActualValue(e.KVSBackgroundSyncInterval)
e.IndexPath = GetActualValue(e.IndexPath)

if e.Gateway != nil {
e.Gateway = e.Gateway.Bind()
}
return e
}
Loading

0 comments on commit 26e2322

Please sign in to comment.