From 4d266b84670f3ebd148aa7e86b3d1ad6f3681c66 Mon Sep 17 00:00:00 2001 From: Kiichiro YUKAWA Date: Mon, 22 Apr 2024 16:21:41 +0900 Subject: [PATCH] Implement client metrics interceptor for continuous benchmark job (#2477) * :recycle: Mount benchmark-operator-config to vald-benchmark-job Pod Signed-off-by: vankichi * :chart_with_upwards_trend: add framework for send gRPC client metrics Signed-off-by: vankichi * style: format code with Gofumpt and Prettier This commit fixes the style issues introduced in a532b7d according to the output from Gofumpt and Prettier. Details: https://github.com/vdaas/vald/pull/2477 * :chart_with_upwards_trend: Add client metric interceptor Signed-off-by: vankichi * :recycle: Change vbor chart for apply general job configuration Signed-off-by: vankichi * Update pkg/tools/benchmark/job/usecase/benchmarkd.go Co-authored-by: Hiroto Funakoshi Signed-off-by: Kiichiro YUKAWA * :bug: Fix build error and add comment Signed-off-by: vankichi * :recycle: Fix format Signed-off-by: vankichi --------- Signed-off-by: vankichi Signed-off-by: Kiichiro YUKAWA Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com> Co-authored-by: Hiroto Funakoshi --- .../crds/valdbenchmarkoperatorrelease.yaml | 200 +++++++++++++- .../templates/configmap.yaml | 5 +- charts/vald-benchmark-operator/values.yaml | 246 +++++++++++++++++- cmd/tools/benchmark/job/sample.yaml | 16 +- internal/config/benchmark.go | 12 +- .../k8s/vald/benchmark/job/job_template.go | 51 +++- .../vald/benchmark/job/job_template_option.go | 15 +- .../grpc/interceptor/client/metric/metric.go | 93 +++++++ internal/net/grpc/option.go | 12 + .../10-vald-benchmark-operator.yaml | 8 +- k8s/tools/benchmark/operator/configmap.yaml | 86 +++++- .../crds/valdbenchmarkoperatorrelease.yaml | 200 +++++++++++++- pkg/tools/benchmark/job/config/config.go | 2 + pkg/tools/benchmark/job/usecase/benchmarkd.go | 36 ++- pkg/tools/benchmark/operator/config/config.go | 10 +- .../benchmark/operator/service/operator.go | 12 +- .../operator/service/operator_test.go | 106 ++++---- .../benchmark/operator/service/option.go | 35 ++- .../benchmark/operator/usecase/benchmarkd.go | 7 +- 19 files changed, 1016 insertions(+), 136 deletions(-) create mode 100644 internal/net/grpc/interceptor/client/metric/metric.go diff --git a/charts/vald-benchmark-operator/crds/valdbenchmarkoperatorrelease.yaml b/charts/vald-benchmark-operator/crds/valdbenchmarkoperatorrelease.yaml index 92458c639c..16a5c912b0 100644 --- a/charts/vald-benchmark-operator/crds/valdbenchmarkoperatorrelease.yaml +++ b/charts/vald-benchmark-operator/crds/valdbenchmarkoperatorrelease.yaml @@ -86,19 +86,197 @@ spec: type: string tag: type: string - job_image: + job: type: object properties: - pullPolicy: - type: string - enum: - - Always - - Never - - IfNotPresent - repository: - type: string - tag: - type: string + client_config: + type: object + properties: + addrs: + type: array + items: + type: string + backoff: + type: object + properties: + backoff_factor: + type: number + backoff_time_limit: + type: string + enable_error_log: + type: boolean + initial_duration: + type: string + jitter_limit: + type: string + maximum_duration: + type: string + retry_count: + type: integer + call_option: + type: object + x-kubernetes-preserve-unknown-fields: true + circuit_breaker: + type: object + properties: + closed_error_rate: + type: number + closed_refresh_timeout: + type: string + half_open_error_rate: + type: number + min_samples: + type: integer + open_timeout: + type: string + connection_pool: + type: object + properties: + enable_dns_resolver: + type: boolean + enable_rebalance: + type: boolean + old_conn_close_duration: + type: string + rebalance_duration: + type: string + size: + type: integer + dial_option: + type: object + properties: + backoff_base_delay: + type: string + backoff_jitter: + type: number + backoff_max_delay: + type: string + backoff_multiplier: + type: number + enable_backoff: + type: boolean + initial_connection_window_size: + type: integer + initial_window_size: + type: integer + insecure: + type: boolean + interceptors: + type: array + items: + type: string + enum: + - TraceInterceptor + keepalive: + type: object + properties: + permit_without_stream: + type: boolean + time: + type: string + timeout: + type: string + max_msg_size: + type: integer + min_connection_timeout: + type: string + net: + type: object + properties: + dialer: + type: object + properties: + dual_stack_enabled: + type: boolean + keepalive: + type: string + timeout: + type: string + dns: + type: object + properties: + cache_enabled: + type: boolean + cache_expiration: + type: string + refresh_duration: + type: string + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean + tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + tls: + type: object + properties: + ca: + type: string + cert: + type: string + enabled: + type: boolean + insecure_skip_verify: + type: boolean + key: + type: string + read_buffer_size: + type: integer + timeout: + type: string + write_buffer_size: + type: integer + health_check_duration: + type: string + max_recv_msg_size: + type: integer + max_retry_rpc_buffer_size: + type: integer + max_send_msg_size: + type: integer + tls: + type: object + properties: + ca: + type: string + cert: + type: string + enabled: + type: boolean + insecure_skip_verify: + type: boolean + key: + type: string + wait_for_ready: + type: boolean + image: + type: object + properties: + pullPolicy: + type: string + enum: + - Always + - Never + - IfNotPresent + repository: + type: string + tag: + type: string logging: type: object properties: diff --git a/charts/vald-benchmark-operator/templates/configmap.yaml b/charts/vald-benchmark-operator/templates/configmap.yaml index f2ddf91f85..2a92332b6b 100644 --- a/charts/vald-benchmark-operator/templates/configmap.yaml +++ b/charts/vald-benchmark-operator/templates/configmap.yaml @@ -38,6 +38,5 @@ data: observability: {{- $observability := dict "Values" .Values.observability}} {{- include "vald.observability" $observability | nindent 6 }} - job_image: - image: "{{ .Values.job_image.repository }}:{{ .Values.job_image.tag }}" - pullPolicy: {{ .Values.job_image.pullPolicy }} + job: + {{- toYaml .Values.job | nindent 6 }} diff --git a/charts/vald-benchmark-operator/values.yaml b/charts/vald-benchmark-operator/values.yaml index 3aebcabc36..59d78fd0f6 100644 --- a/charts/vald-benchmark-operator/values.yaml +++ b/charts/vald-benchmark-operator/values.yaml @@ -37,17 +37,241 @@ image: # @schema {"name": "image.pullPolicy", "type": "string", "enum": ["Always", "Never", "IfNotPresent"]} # image.pullPolicy -- image pull policy pullPolicy: Always -# @schema {"name": "job_image", "type": "object"} -job_image: - # @schema {"name": "job_image.repository", "type": "string"} - # image.repository -- job image repository - repository: vdaas/vald-benchmark-job - # @schema {"name": "job_image.tag", "type": "string"} - # image.tag -- image tag for job docker image - tag: v1.7.12 - # @schema {"name": "job_image.pullPolicy", "type": "string", "enum": ["Always", "Never", "IfNotPresent"]} - # image.pullPolicy -- image pull policy - pullPolicy: Always +# @schema {"name": "job", "type": "object"} +job: + # @schema {"name": "job.image", "type": "object"} + image: + # @schema {"name": "job.image.repository", "type": "string"} + # image.repository -- job image repository + repository: vdaas/vald-benchmark-job + # @schema {"name": "job.image.tag", "type": "string"} + # image.tag -- image tag for job docker image + tag: v1.7.12 + # @schema {"name": "job.image.pullPolicy", "type": "string", "enum": ["Always", "Never", "IfNotPresent"]} + # image.pullPolicy -- image pull policy + pullPolicy: Always + # @schema {"name": "job.client_config", "type": "object"} + # client_config -- gRPC client config for request to the Vald cluster + client_config: + # @schema {"name": "job.client_config.addrs", "type": "array", "items": {"type": "string"}} + # job.client_config.addrs -- gRPC client addresses + addrs: [] + # @schema {"name": "job.client_config.health_check_duration", "type": "string"} + # job.client_config.health_check_duration -- gRPC client health check duration + health_check_duration: "1s" + # @schema {"name": "job.client_config.connection_pool", "type": "object"} + connection_pool: + # @schema {"name": "job.client_config.connection_pool.enable_dns_resolver", "type": "boolean"} + # job.client_config.connection_pool.enable_dns_resolver -- enables gRPC client connection pool dns resolver, when enabled vald uses ip handshake exclude dns discovery which improves network performance + enable_dns_resolver: true + # @schema {"name": "job.client_config.connection_pool.enable_rebalance", "type": "boolean"} + # job.client_config.connection_pool.enable_rebalance -- enables gRPC client connection pool rebalance + enable_rebalance: true + # @schema {"name": "job.client_config.connection_pool.rebalance_duration", "type": "string"} + # job.client_config.connection_pool.rebalance_duration -- gRPC client connection pool rebalance duration + rebalance_duration: 30m + # @schema {"name": "job.client_config.connection_pool.size", "type": "integer"} + # job.client_config.connection_pool.size -- gRPC client connection pool size + size: 3 + # @schema {"name": "job.client_config.connection_pool.old_conn_close_duration", "type": "string"} + # job.client_config.connection_pool.old_conn_close_duration -- makes delay before gRPC client connection closing during connection pool rebalance + old_conn_close_duration: "2m" + # @schema {"name": "job.client_config.backoff", "type": "object", "anchor": "backoff"} + backoff: + # @schema {"name": "job.client_config.backoff.initial_duration", "type": "string"} + # job.client_config.backoff.initial_duration -- gRPC client backoff initial duration + initial_duration: 5ms + # @schema {"name": "job.client_config.backoff.backoff_time_limit", "type": "string"} + # job.client_config.backoff.backoff_time_limit -- gRPC client backoff time limit + backoff_time_limit: 5s + # @schema {"name": "job.client_config.backoff.maximum_duration", "type": "string"} + # job.client_config.backoff.maximum_duration -- gRPC client backoff maximum duration + maximum_duration: 5s + # @schema {"name": "job.client_config.backoff.jitter_limit", "type": "string"} + # job.client_config.backoff.jitter_limit -- gRPC client backoff jitter limit + jitter_limit: 100ms + # @schema {"name": "job.client_config.backoff.backoff_factor", "type": "number"} + # job.client_config.backoff.backoff_factor -- gRPC client backoff factor + backoff_factor: 1.1 + # @schema {"name": "job.client_config.backoff.retry_count", "type": "integer"} + # job.client_config.backoff.retry_count -- gRPC client backoff retry count + retry_count: 100 + # @schema {"name": "job.client_config.backoff.enable_error_log", "type": "boolean"} + # job.client_config.backoff.enable_error_log -- gRPC client backoff log enabled + enable_error_log: true + # @schema {"name": "job.client_config.circuit_breaker", "type": "object"} + circuit_breaker: + # @schema {"name": "job.client_config.circuit_breaker.closed_error_rate", "type": "number"} + # job.client_config.circuit_breaker.closed_error_rate -- gRPC client circuitbreaker closed error rate + closed_error_rate: 0.7 + # @schema {"name": "job.client_config.circuit_breaker.half_open_error_rate", "type": "number"} + # job.client_config.circuit_breaker.half_open_error_rate -- gRPC client circuitbreaker half-open error rate + half_open_error_rate: 0.5 + # @schema {"name": "job.client_config.circuit_breaker.min_samples", "type": "integer"} + # job.client_config.circuit_breaker.min_samples -- gRPC client circuitbreaker minimum sampling count + min_samples: 1000 + # @schema {"name": "job.client_config.circuit_breaker.open_timeout", "type": "string"} + # job.client_config.circuit_breaker.open_timeout -- gRPC client circuitbreaker open timeout + open_timeout: "1s" + # @schema {"name": "job.client_config.circuit_breaker.closed_refresh_timeout", "type": "string"} + # job.client_config.circuit_breaker.closed_refresh_timeout -- gRPC client circuitbreaker closed refresh timeout + closed_refresh_timeout: "10s" + # @schema {"name": "job.client_config.call_option", "type": "object"} + call_option: + # @schema {"name": "job.client_config.wait_for_ready", "type": "boolean"} + # job.client_config.call_option.wait_for_ready -- gRPC client call option wait for ready + wait_for_ready: true + # @schema {"name": "job.client_config.max_retry_rpc_buffer_size", "type": "integer"} + # job.client_config.call_option.max_retry_rpc_buffer_size -- gRPC client call option max retry rpc buffer size + max_retry_rpc_buffer_size: 0 + # @schema {"name": "job.client_config.max_recv_msg_size", "type": "integer"} + # job.client_config.call_option.max_recv_msg_size -- gRPC client call option max receive message size + max_recv_msg_size: 0 + # @schema {"name": "job.client_config.max_send_msg_size", "type": "integer"} + # job.client_config.call_option.max_send_msg_size -- gRPC client call option max send message size + max_send_msg_size: 0 + # @schema {"name": "job.client_config.dial_option", "type": "object"} + dial_option: + # @schema {"name": "job.client_config.dial_option.write_buffer_size", "type": "integer"} + # job.client_config.dial_option.write_buffer_size -- gRPC client dial option write buffer size + write_buffer_size: 0 + # @schema {"name": "job.client_config.dial_option.read_buffer_size", "type": "integer"} + # job.client_config.dial_option.read_buffer_size -- gRPC client dial option read buffer size + read_buffer_size: 0 + # @schema {"name": "job.client_config.dial_option.initial_window_size", "type": "integer"} + # job.client_config.dial_option.initial_window_size -- gRPC client dial option initial window size + initial_window_size: 0 + # @schema {"name": "job.client_config.dial_option.initial_connection_window_size", "type": "integer"} + # job.client_config.dial_option.initial_connection_window_size -- gRPC client dial option initial connection window size + initial_connection_window_size: 0 + # @schema {"name": "job.client_config.dial_option.max_msg_size", "type": "integer"} + # job.client_config.dial_option.max_msg_size -- gRPC client dial option max message size + max_msg_size: 0 + # @schema {"name": "job.client_config.dial_option.backoff_max_delay", "type": "string"} + # job.client_config.dial_option.backoff_max_delay -- gRPC client dial option max backoff delay + backoff_max_delay: "120s" + # @schema {"name": "job.client_config.dial_option.backoff_base_delay", "type": "string"} + # job.client_config.dial_option.backoff_base_delay -- gRPC client dial option base backoff delay + backoff_base_delay: "1s" + # @schema {"name": "job.client_config.dial_option.backoff_multiplier", "type": "number"} + # job.client_config.dial_option.backoff_multiplier -- gRPC client dial option base backoff delay + backoff_multiplier: 1.6 + # @schema {"name": "job.client_config.dial_option.backoff_jitter", "type": "number"} + # job.client_config.dial_option.backoff_jitter -- gRPC client dial option base backoff delay + backoff_jitter: 0.2 + # @schema {"name": "job.client_config.dial_option.min_connection_timeout", "type": "string"} + # job.client_config.dial_option.min_connection_timeout -- gRPC client dial option minimum connection timeout + min_connection_timeout: "20s" + # @schema {"name": "job.client_config.dial_option.enable_backoff", "type": "boolean"} + # job.client_config.dial_option.enable_backoff -- gRPC client dial option backoff enabled + enable_backoff: false + # @schema {"name": "job.client_config.dial_option.insecure", "type": "boolean"} + # job.client_config.dial_option.insecure -- gRPC client dial option insecure enabled + insecure: true + # @schema {"name": "job.client_config.dial_option.timeout", "type": "string"} + # job.client_config.dial_option.timeout -- gRPC client dial option timeout + timeout: "" + # @schema {"name": "job.client_config.dial_option.interceptors", "type": "array", "items": {"type": "string", "enum": ["TraceInterceptor"]}} + # job.client_config.dial_option.interceptors -- gRPC client interceptors + interceptors: [] + # @schema {"name": "job.client_config.dial_option.net", "type": "object", "anchor": "net"} + net: + # @schema {"name": "job.client_config.dial_option.net.dns", "type": "object"} + dns: + # @schema {"name": "job.client_config.dial_option.net.dns.cache_enabled", "type": "boolean"} + # job.client_config.dial_option.net.dns.cache_enabled -- gRPC client TCP DNS cache enabled + cache_enabled: true + # @schema {"name": "job.client_config.dial_option.net.dns.refresh_duration", "type": "string"} + # job.client_config.dial_option.net.dns.refresh_duration -- gRPC client TCP DNS cache refresh duration + refresh_duration: 30m + # @schema {"name": "job.client_config.dial_option.net.dns.cache_expiration", "type": "string"} + # job.client_config.dial_option.net.dns.cache_expiration -- gRPC client TCP DNS cache expiration + cache_expiration: 1h + # @schema {"name": "job.client_config.dial_option.net.dialer", "type": "object"} + dialer: + # @schema {"name": "job.client_config.dial_option.net.dialer.timeout", "type": "string"} + # job.client_config.dial_option.net.dialer.timeout -- gRPC client TCP dialer timeout + timeout: "" + # @schema {"name": "job.client_config.dial_option.net.dialer.keepalive", "type": "string"} + # job.client_config.dial_option.net.dialer.keepalive -- gRPC client TCP dialer keep alive + keepalive: "" + # @schema {"name": "job.client_config.dial_option.net.dialer.dual_stack_enabled", "type": "boolean"} + # job.client_config.dial_option.net.dialer.dual_stack_enabled -- gRPC client TCP dialer dual stack enabled + dual_stack_enabled: true + # @schema {"name": "job.client_config.dial_option.net.tls", "type": "object"} + tls: + # @schema {"name": "job.client_config.dial_option.net.tls.enabled", "type": "boolean"} + # job.client_config.tls.enabled -- TLS enabled + enabled: false + # @schema {"name": "job.client_config.dial_option.net.tls.cert", "type": "string"} + # job.client_config.tls.cert -- TLS cert path + cert: /path/to/cert + # @schema {"name": "job.client_config.dial_option.net.tls.key", "type": "string"} + # job.client_config.tls.key -- TLS key path + key: /path/to/key + # @schema {"name": "job.client_config.dial_option.net.tls.ca", "type": "string"} + # job.client_config.tls.ca -- TLS ca path + ca: /path/to/ca + # @schema {"name": "job.client_config.dial_option.net.tls.insecure_skip_verify", "type": "boolean"} + # job.client_config.tls.insecure_skip_verify -- enable/disable skip SSL certificate verification + insecure_skip_verify: false + # @schema {"name": "job.client_config.dial_option.net.socket_option", "type": "object"} + socket_option: + # @schema {"name": "job.client_config.dial_option.net.socket_option.reuse_port", "type": "boolean"} + # job.client_config.dial_option.net.socket_option.reuse_port -- server listen socket option for reuse_port functionality + reuse_port: true + # @schema {"name": "job.client_config.dial_option.net.socket_option.reuse_addr", "type": "boolean"} + # job.client_config.dial_option.net.socket_option.reuse_addr -- server listen socket option for reuse_addr functionality + reuse_addr: true + # @schema {"name": "job.client_config.dial_option.net.socket_option.tcp_fast_open", "type": "boolean"} + # job.client_config.dial_option.net.socket_option.tcp_fast_open -- server listen socket option for tcp_fast_open functionality + tcp_fast_open: true + # @schema {"name": "job.client_config.dial_option.net.socket_option.tcp_no_delay", "type": "boolean"} + # job.client_config.dial_option.net.socket_option.tcp_no_delay -- server listen socket option for tcp_no_delay functionality + tcp_no_delay: true + # @schema {"name": "job.client_config.dial_option.net.socket_option.tcp_cork", "type": "boolean"} + # job.client_config.dial_option.net.socket_option.tcp_cork -- server listen socket option for tcp_cork functionality + tcp_cork: false + # @schema {"name": "job.client_config.dial_option.net.socket_option.tcp_quick_ack", "type": "boolean"} + # job.client_config.dial_option.net.socket_option.tcp_quick_ack -- server listen socket option for tcp_quick_ack functionality + tcp_quick_ack: true + # @schema {"name": "job.client_config.dial_option.net.socket_option.tcp_defer_accept", "type": "boolean"} + # job.client_config.dial_option.net.socket_option.tcp_defer_accept -- server listen socket option for tcp_defer_accept functionality + tcp_defer_accept: true + # @schema {"name": "job.client_config.dial_option.net.socket_option.ip_transparent", "type": "boolean"} + # job.client_config.dial_option.net.socket_option.ip_transparent -- server listen socket option for ip_transparent functionality + ip_transparent: false + # @schema {"name": "job.client_config.dial_option.net.socket_option.ip_recover_destination_addr", "type": "boolean"} + # job.client_config.dial_option.net.socket_option.ip_recover_destination_addr -- server listen socket option for ip_recover_destination_addr functionality + ip_recover_destination_addr: false + # @schema {"name": "job.client_config.dial_option.keepalive", "type": "object"} + keepalive: + # @schema {"name": "job.client_config.dial_option.keepalive.time", "type": "string"} + # job.client_config.dial_option.keepalive.time -- gRPC client keep alive time + time: "120s" + # @schema {"name": "job.client_config.dial_option.keepalive.timeout", "type": "string"} + # job.client_config.dial_option.keepalive.timeout -- gRPC client keep alive timeout + timeout: "30s" + # @schema {"name": "job.client_config.dial_option.keepalive.permit_without_stream", "type": "boolean"} + # job.client_config.dial_option.keepalive.permit_without_stream -- gRPC client keep alive permit without stream + permit_without_stream: true + # @schema {"name": "job.client_config.tls", "type": "object"} + tls: + # @schema {"name": "job.client_config.tls.enabled", "type": "boolean"} + # job.client_config.tls.enabled -- TLS enabled + enabled: false + # @schema {"name": "job.client_config.tls.cert", "type": "string"} + # job.client_config.tls.cert -- TLS cert path + cert: /path/to/cert + # @schema {"name": "job.client_config.tls.key", "type": "string"} + # job.client_config.tls.key -- TLS key path + key: /path/to/key + # @schema {"name": "job.client_config.tls.ca", "type": "string"} + # job.client_config.tls.ca -- TLS ca path + ca: /path/to/ca + # @schema {"name": "job.client_config.tls.insecure_skip_verify", "type": "boolean"} + # job.client_config.tls.insecure_skip_verify -- enable/disable skip SSL certificate verification + insecure_skip_verify: false # @schema {"name": "rbac", "type": "object"} rbac: # @schema {"name": "rbac.create", "type": "boolean"} diff --git a/cmd/tools/benchmark/job/sample.yaml b/cmd/tools/benchmark/job/sample.yaml index c08b4ad603..ed71ba6f47 100644 --- a/cmd/tools/benchmark/job/sample.yaml +++ b/cmd/tools/benchmark/job/sample.yaml @@ -48,6 +48,8 @@ server_config: header_table_size: 0 interceptors: - "RecoverInterceptor" + - "TraceInterceptor" + - "MetricInterceptor" enable_reflection: true socket_option: reuse_port: true @@ -136,8 +138,8 @@ server_config: ip_recover_destination_addr: false startup_strategy: - liveness - - readiness - grpc + - readiness full_shutdown_duration: 30s tls: ca: /path/to/ca @@ -147,7 +149,7 @@ server_config: observability: enabled: false otlp: - collector_endpoint: "" + collector_endpoint: "opentelemetry-collector-collector.default.svc.cluster.local:4317" attribute: namespace: _MY_POD_NAMESPACE_ pod_name: _MY_POD_NAME_ @@ -157,6 +159,8 @@ observability: trace_export_timeout: "1m" trace_max_export_batch_size: 1024 trace_max_queue_size: 256 + metrics_export_interval: "1s" + metrics_export_timeout: "1m" metrics: enable_cgo: true enable_goroutine: true @@ -172,7 +176,7 @@ observability: - go_arch - algorithm_info trace: - enabled: false + enabled: true job: replica: 1 repetition: 1 @@ -181,7 +185,7 @@ job: rps: 200 concurrency_limit: 200 client_config: - health_check_duration: "1s" + health_check_duration: "" connection_pool: enable_dns_resolver: true enable_rebalance: true @@ -221,7 +225,9 @@ job: enable_backoff: false insecure: true timeout: "" - interceptors: [] + interceptors: + - MetricInterceptor + - TraceInterceptor net: dns: cache_enabled: true diff --git a/internal/config/benchmark.go b/internal/config/benchmark.go index fd45f4f293..7b8ab66b94 100644 --- a/internal/config/benchmark.go +++ b/internal/config/benchmark.go @@ -234,13 +234,21 @@ func (b *BenchmarkScenario) Bind() *BenchmarkScenario { // BenchmarkJobImageInfo represents the docker image information for benchmark job. type BenchmarkJobImageInfo struct { - Image string `info:"image" json:"image,omitempty" yaml:"image"` + Repository string `info:"repository" json:"repository,omitempty" yaml:"repository"` + Tag string `info:"tag" json:"tag,omitempty" yaml:"tag"` PullPolicy string `info:"pull_policy" json:"pull_policy,omitempty" yaml:"pull_policy"` } // Bind binds the actual data from the BenchmarkJobImageInfo receiver fields. func (b *BenchmarkJobImageInfo) Bind() *BenchmarkJobImageInfo { - b.Image = GetActualValue(b.Image) + b.Repository = GetActualValue(b.Repository) + b.Tag = GetActualValue(b.Tag) b.PullPolicy = GetActualValue(b.PullPolicy) return b } + +// OperatorJobConfig represents the general job configuration for operator. +type OperatorJobConfig struct { + Image *BenchmarkJobImageInfo `info:"image" json:"image,omitempty" yaml:"image"` + *BenchmarkJob +} diff --git a/internal/k8s/vald/benchmark/job/job_template.go b/internal/k8s/vald/benchmark/job/job_template.go index 486af7cd9e..fab264919c 100644 --- a/internal/k8s/vald/benchmark/job/job_template.go +++ b/internal/k8s/vald/benchmark/job/job_template.go @@ -35,12 +35,13 @@ const ( RestartPolicyAlways RestartPolicy = "Always" RestartPolicyOnFailure RestartPolicy = "OnFailure" RestartPolicyNever RestartPolicy = "Never" -) -const ( + volumeName = "vald-benchmark-job-config" svcAccount = "vald-benchmark-operator" ) +var mode = int32(420) + type BenchmarkJobTpl interface { CreateJobTpl(opts ...BenchmarkJobOption) (k8s.Job, error) } @@ -48,6 +49,7 @@ type BenchmarkJobTpl interface { type benchmarkJobTpl struct { containerName string containerImageName string + configMapName string imagePullPolicy ImagePullPolicy jobTpl k8s.Job } @@ -138,6 +140,51 @@ func (b *benchmarkJobTpl) CreateJobTpl(opts ...BenchmarkJobOption) (k8s.Job, err }, }, }, + { + Name: "MY_NODE_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "spec.nodeName", + }, + }, + }, + { + Name: "MY_POD_NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, + }, + }, + { + Name: "MY_POD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, + }, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: volumeName, + MountPath: "/etc/server", + }, + }, + }, + } + // mount benchmark operator config map. + // It is used for bind only observability config for each benchmark job + b.jobTpl.Spec.Template.Spec.Volumes = []corev1.Volume{ + { + Name: volumeName, + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: b.configMapName, + }, + DefaultMode: &mode, + }, }, }, } diff --git a/internal/k8s/vald/benchmark/job/job_template_option.go b/internal/k8s/vald/benchmark/job/job_template_option.go index 46574ea0c4..e664780671 100644 --- a/internal/k8s/vald/benchmark/job/job_template_option.go +++ b/internal/k8s/vald/benchmark/job/job_template_option.go @@ -52,10 +52,19 @@ func WithContainerImage(name string) BenchmarkJobTplOption { // WithImagePullPolicy sets the docker image pull policy for benchmark job. func WithImagePullPolicy(p ImagePullPolicy) BenchmarkJobTplOption { return func(b *benchmarkJobTpl) error { - if len(p) == 0 { - return nil + if len(p) > 0 { + b.imagePullPolicy = p + } + return nil + } +} + +// WithOperatorConfigMap sets the configMapName for mounting Job Pod. +func WithOperatorConfigMap(cm string) BenchmarkJobTplOption { + return func(b *benchmarkJobTpl) error { + if len(cm) > 0 { + b.configMapName = cm } - b.imagePullPolicy = p return nil } } diff --git a/internal/net/grpc/interceptor/client/metric/metric.go b/internal/net/grpc/interceptor/client/metric/metric.go new file mode 100644 index 0000000000..7bcec5833f --- /dev/null +++ b/internal/net/grpc/interceptor/client/metric/metric.go @@ -0,0 +1,93 @@ +// +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package metric provides gRPC client interceptors for client metric +package metric + +import ( + "context" + "time" + + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/net/grpc/codes" + "github.com/vdaas/vald/internal/net/grpc/status" + "github.com/vdaas/vald/internal/observability/attribute" + "github.com/vdaas/vald/internal/observability/metrics" + "google.golang.org/grpc" +) + +const ( + latencyMetricsName = "client_latency" + completedRPCsMetricsName = "client_completed_rpcs" + + gRPCMethodKeyName = "grpc_client_method" + gRPCStatus = "grpc_client_status" +) + +func ClientMetricInterceptors() (grpc.UnaryClientInterceptor, grpc.StreamClientInterceptor, error) { + meter := metrics.GetMeter() + + latencyHistgram, err := meter.Float64Histogram( + latencyMetricsName, + metrics.WithDescription("Client latency in milliseconds, by method"), + metrics.WithUnit(metrics.Milliseconds), + ) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to create latency metric") + } + + completedRPCCnt, err := meter.Int64Counter( + completedRPCsMetricsName, + metrics.WithDescription("Count of RPCs by method and status"), + metrics.WithUnit(metrics.Milliseconds), + ) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to create completedRPCs metric") + } + + record := func(ctx context.Context, method string, err error, latency float64) { + attrs := attributesFromError(method, err) + latencyHistgram.Record(ctx, latency, metrics.WithAttributes(attrs...)) + completedRPCCnt.Add(ctx, 1, metrics.WithAttributes(attrs...)) + } + return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + now := time.Now() + err := invoker(ctx, method, req, reply, cc, opts...) + elapsedTime := time.Since(now) + record(ctx, method, err, float64(elapsedTime)/float64(time.Millisecond)) + return err + }, func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + now := time.Now() + _, err := streamer(ctx, desc, cc, method, opts...) + elapsedTime := time.Since(now) + record(ctx, method, err, float64(elapsedTime)/float64(time.Millisecond)) + return nil, nil + }, nil +} + +func attributesFromError(method string, err error) []attribute.KeyValue { + code := codes.OK // default error is success when error is nil + if err != nil { + st, _ := status.FromError(err) + if st != nil { + code = st.Code() + } + } + return []attribute.KeyValue{ + attribute.String(gRPCMethodKeyName, method), + attribute.String(gRPCStatus, code.String()), + } +} diff --git a/internal/net/grpc/option.go b/internal/net/grpc/option.go index 3d896f48d8..4e5f3533d0 100644 --- a/internal/net/grpc/option.go +++ b/internal/net/grpc/option.go @@ -24,8 +24,10 @@ import ( "github.com/vdaas/vald/internal/backoff" "github.com/vdaas/vald/internal/circuitbreaker" + "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/net" + "github.com/vdaas/vald/internal/net/grpc/interceptor/client/metric" "github.com/vdaas/vald/internal/net/grpc/interceptor/client/trace" "github.com/vdaas/vald/internal/strings" "github.com/vdaas/vald/internal/sync/errgroup" @@ -353,6 +355,16 @@ func WithClientInterceptors(names ...string) Option { grpc.WithUnaryInterceptor(trace.UnaryClientInterceptor()), grpc.WithStreamInterceptor(trace.StreamClientInterceptor()), ) + case "metricinterceptor", "metric": + uci, sci, err := metric.ClientMetricInterceptors() + if err != nil { + lerr := errors.NewErrCriticalOption("gRPCInterceptors", "metric", errors.Wrap(err, "failed to create interceptor")) + log.Warn(lerr.Error()) + } + g.dopts = append(g.dopts, + grpc.WithUnaryInterceptor(uci), + grpc.WithStreamInterceptor(sci), + ) default: } } diff --git a/k8s/metrics/grafana/dashboards/10-vald-benchmark-operator.yaml b/k8s/metrics/grafana/dashboards/10-vald-benchmark-operator.yaml index 9ee9d26a62..e14dd51bbe 100644 --- a/k8s/metrics/grafana/dashboards/10-vald-benchmark-operator.yaml +++ b/k8s/metrics/grafana/dashboards/10-vald-benchmark-operator.yaml @@ -864,7 +864,7 @@ data: "calcs": [ "lastNotNull" ], - "fields": "/^image$/", + "fields": "repository", "values": false }, "showPercentChange": false, @@ -880,7 +880,7 @@ data: "uid": "prometheus" }, "editorMode": "code", - "expr": "label_replace(benchmark_operator_info{exported_kubernetes_namespace=\"$Namespace\", kubernetes_name=~\"$ReplicaSet\", target_pod=~\"$PodName\"}, \"image\", \"$1\", \"image\", \"(.*):.*\")", + "expr": "benchmark_operator_info{exported_kubernetes_namespace=\"$Namespace\", kubernetes_name=~\"$ReplicaSet\", target_pod=~\"$PodName\"}", "format": "table", "instant": true, "interval": "", @@ -944,7 +944,7 @@ data: "calcs": [ "lastNotNull" ], - "fields": "/^image$/", + "fields": "tag", "values": false }, "showPercentChange": false, @@ -960,7 +960,7 @@ data: "uid": "prometheus" }, "editorMode": "code", - "expr": "label_replace(benchmark_operator_info{exported_kubernetes_namespace=\"$Namespace\", kubernetes_name=~\"$ReplicaSet\", target_pod=~\"$PodName\"}, \"image\", \"$1\", \"image\", \".*:(.*)\")", + "expr": "benchmark_operator_info{exported_kubernetes_namespace=\"$Namespace\", kubernetes_name=~\"$ReplicaSet\", target_pod=~\"$PodName\"}", "format": "table", "instant": true, "interval": "", diff --git a/k8s/tools/benchmark/operator/configmap.yaml b/k8s/tools/benchmark/operator/configmap.yaml index 499c6053a8..6ee6d73dc0 100644 --- a/k8s/tools/benchmark/operator/configmap.yaml +++ b/k8s/tools/benchmark/operator/configmap.yaml @@ -137,6 +137,86 @@ data: trace: enabled: false sampling_rate: 1 - job_image: - image: "vdaas/vald-benchmark-job:v1.7.12" - pullPolicy: Always + job: + client_config: + addrs: [] + backoff: + backoff_factor: 1.1 + backoff_time_limit: 5s + enable_error_log: true + initial_duration: 5ms + jitter_limit: 100ms + maximum_duration: 5s + retry_count: 100 + call_option: + max_recv_msg_size: 0 + max_retry_rpc_buffer_size: 0 + max_send_msg_size: 0 + wait_for_ready: true + circuit_breaker: + closed_error_rate: 0.7 + closed_refresh_timeout: 10s + half_open_error_rate: 0.5 + min_samples: 1000 + open_timeout: 1s + connection_pool: + enable_dns_resolver: true + enable_rebalance: true + old_conn_close_duration: 2m + rebalance_duration: 30m + size: 3 + dial_option: + backoff_base_delay: 1s + backoff_jitter: 0.2 + backoff_max_delay: 120s + backoff_multiplier: 1.6 + enable_backoff: false + initial_connection_window_size: 0 + initial_window_size: 0 + insecure: true + interceptors: [] + keepalive: + permit_without_stream: true + time: 120s + timeout: 30s + max_msg_size: 0 + min_connection_timeout: 20s + net: + dialer: + dual_stack_enabled: true + keepalive: "" + timeout: "" + dns: + cache_enabled: true + cache_expiration: 1h + refresh_duration: 30m + socket_option: + ip_recover_destination_addr: false + ip_transparent: false + reuse_addr: true + reuse_port: true + tcp_cork: false + tcp_defer_accept: true + tcp_fast_open: true + tcp_no_delay: true + tcp_quick_ack: true + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + insecure_skip_verify: false + key: /path/to/key + read_buffer_size: 0 + timeout: "" + write_buffer_size: 0 + health_check_duration: 1s + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + insecure_skip_verify: false + key: /path/to/key + image: + pullPolicy: Always + repository: vdaas/vald-benchmark-job + tag: v1.7.12 diff --git a/k8s/tools/benchmark/operator/crds/valdbenchmarkoperatorrelease.yaml b/k8s/tools/benchmark/operator/crds/valdbenchmarkoperatorrelease.yaml index 92458c639c..16a5c912b0 100644 --- a/k8s/tools/benchmark/operator/crds/valdbenchmarkoperatorrelease.yaml +++ b/k8s/tools/benchmark/operator/crds/valdbenchmarkoperatorrelease.yaml @@ -86,19 +86,197 @@ spec: type: string tag: type: string - job_image: + job: type: object properties: - pullPolicy: - type: string - enum: - - Always - - Never - - IfNotPresent - repository: - type: string - tag: - type: string + client_config: + type: object + properties: + addrs: + type: array + items: + type: string + backoff: + type: object + properties: + backoff_factor: + type: number + backoff_time_limit: + type: string + enable_error_log: + type: boolean + initial_duration: + type: string + jitter_limit: + type: string + maximum_duration: + type: string + retry_count: + type: integer + call_option: + type: object + x-kubernetes-preserve-unknown-fields: true + circuit_breaker: + type: object + properties: + closed_error_rate: + type: number + closed_refresh_timeout: + type: string + half_open_error_rate: + type: number + min_samples: + type: integer + open_timeout: + type: string + connection_pool: + type: object + properties: + enable_dns_resolver: + type: boolean + enable_rebalance: + type: boolean + old_conn_close_duration: + type: string + rebalance_duration: + type: string + size: + type: integer + dial_option: + type: object + properties: + backoff_base_delay: + type: string + backoff_jitter: + type: number + backoff_max_delay: + type: string + backoff_multiplier: + type: number + enable_backoff: + type: boolean + initial_connection_window_size: + type: integer + initial_window_size: + type: integer + insecure: + type: boolean + interceptors: + type: array + items: + type: string + enum: + - TraceInterceptor + keepalive: + type: object + properties: + permit_without_stream: + type: boolean + time: + type: string + timeout: + type: string + max_msg_size: + type: integer + min_connection_timeout: + type: string + net: + type: object + properties: + dialer: + type: object + properties: + dual_stack_enabled: + type: boolean + keepalive: + type: string + timeout: + type: string + dns: + type: object + properties: + cache_enabled: + type: boolean + cache_expiration: + type: string + refresh_duration: + type: string + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean + tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + tls: + type: object + properties: + ca: + type: string + cert: + type: string + enabled: + type: boolean + insecure_skip_verify: + type: boolean + key: + type: string + read_buffer_size: + type: integer + timeout: + type: string + write_buffer_size: + type: integer + health_check_duration: + type: string + max_recv_msg_size: + type: integer + max_retry_rpc_buffer_size: + type: integer + max_send_msg_size: + type: integer + tls: + type: object + properties: + ca: + type: string + cert: + type: string + enabled: + type: boolean + insecure_skip_verify: + type: boolean + key: + type: string + wait_for_ready: + type: boolean + image: + type: object + properties: + pullPolicy: + type: string + enum: + - Always + - Never + - IfNotPresent + repository: + type: string + tag: + type: string logging: type: object properties: diff --git a/pkg/tools/benchmark/job/config/config.go b/pkg/tools/benchmark/job/config/config.go index adec58d118..a81c68d1c9 100644 --- a/pkg/tools/benchmark/job/config/config.go +++ b/pkg/tools/benchmark/job/config/config.go @@ -54,6 +54,7 @@ var ( NAME = os.Getenv("CRD_NAME") JOBNAME_ANNOTATION = "before-job-name" JOBNAMESPACE_ANNOTATION = "before-job-namespace" + SERVICE_NAME = "vald-benchmark-job" ) // NewConfig represents the set config from the given setting file path. @@ -74,6 +75,7 @@ func NewConfig(ctx context.Context, path string) (cfg *Config, err error) { if cfg.Observability != nil { cfg.Observability = cfg.Observability.Bind() + cfg.Observability.OTLP.Attribute.ServiceName = SERVICE_NAME } else { cfg.Observability = new(config.Observability) } diff --git a/pkg/tools/benchmark/job/usecase/benchmarkd.go b/pkg/tools/benchmark/job/usecase/benchmarkd.go index 1036125b2e..b78a353a18 100644 --- a/pkg/tools/benchmark/job/usecase/benchmarkd.go +++ b/pkg/tools/benchmark/job/usecase/benchmarkd.go @@ -64,12 +64,34 @@ func New(cfg *config.Config) (r runner.Runner, err error) { } } + // bind metrics interceptor + var clientInterceptors []string + var obs observability.Observability + if cfg.Observability.Enabled { + obs, err = observability.NewWithConfig( + cfg.Observability, + infometrics.New("vald_benchmark_job_info", "Benchmark Job info", *cfg.Job), + ) + if err != nil { + return nil, err + } + // Add interceptors regardless of whether it is set in config. + // Because it is the benchmark job and requires metrics for measure benchmark result. + clientInterceptors = append(clientInterceptors, "metric") + if cfg.Observability.Trace.Enabled { + clientInterceptors = append(clientInterceptors, "trace") + } + } + copts, err := cfg.Job.ClientConfig.Opts() if err != nil { return nil, err } if cfg.Job.ClientConfig.DialOption == nil { - copts = append(copts, grpc.WithInsecure(true)) + copts = append(copts, + grpc.WithInsecure(true), + grpc.WithClientInterceptors(clientInterceptors...), + ) } gcli := grpc.New(copts...) vcli, err := vald.New( @@ -131,17 +153,6 @@ func New(cfg *config.Config) (r runner.Runner, err error) { }), } - var obs observability.Observability - if cfg.Observability.Enabled { - obs, err = observability.NewWithConfig( - cfg.Observability, - infometrics.New("vald_benchmark_job_info", "Benchmark Job info", *cfg.Job), - ) - if err != nil { - return nil, err - } - } - srv, err := starter.New( starter.WithConfig(cfg.Server), starter.WithREST(func(sc *iconf.Server) []server.Option { @@ -197,6 +208,7 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) { if r.observability != nil { oech = r.observability.Start(ctx) } + dech, err = r.job.Start(ctx) if err != nil { ech <- err diff --git a/pkg/tools/benchmark/operator/config/config.go b/pkg/tools/benchmark/operator/config/config.go index f2c8f76b62..29c2ad522e 100644 --- a/pkg/tools/benchmark/operator/config/config.go +++ b/pkg/tools/benchmark/operator/config/config.go @@ -35,8 +35,8 @@ type Config struct { // Observability represent observability configurations Observability *config.Observability `json:"observability" yaml:"observability"` - // JobImage represents the location of Docker image for benchmark job and its ImagePullPolicy - JobImage *config.BenchmarkJobImageInfo `json:"job_image" yaml:"job_image"` + // Job represents the default benchmark job configuration and job images + Job *config.OperatorJobConfig `json:"job" yaml:"job"` } // NewConfig represents the set config from the given setting file path. @@ -58,10 +58,10 @@ func NewConfig(path string) (cfg *Config, err error) { cfg.Observability = cfg.Observability.Bind() } - if cfg.JobImage != nil { - cfg.JobImage = cfg.JobImage.Bind() + if cfg.Job != nil { + cfg.Job.Image = cfg.Job.Image.Bind() } else { - cfg.JobImage = new(config.BenchmarkJobImageInfo) + cfg.Job = new(config.OperatorJobConfig) } return cfg, nil } diff --git a/pkg/tools/benchmark/operator/service/operator.go b/pkg/tools/benchmark/operator/service/operator.go index f03c48b866..438967f1fa 100644 --- a/pkg/tools/benchmark/operator/service/operator.go +++ b/pkg/tools/benchmark/operator/service/operator.go @@ -31,6 +31,7 @@ import ( benchjob "github.com/vdaas/vald/internal/k8s/vald/benchmark/job" benchscenario "github.com/vdaas/vald/internal/k8s/vald/benchmark/scenario" "github.com/vdaas/vald/internal/log" + "github.com/vdaas/vald/internal/strings" "github.com/vdaas/vald/internal/sync/errgroup" ) @@ -50,6 +51,8 @@ type scenario struct { const ( Scenario = "scenario" ScenarioKind = "ValdBenchmarkScenario" + Name = "name" + ServerName = "vald-benchmark-job" BenchmarkName = "benchmark-name" BeforeJobName = "before-job-name" BeforeJobNamespace = "before-job-namespace" @@ -57,8 +60,10 @@ const ( type operator struct { jobNamespace string - jobImage string + jobImageRepository string + jobImageTag string jobImagePullPolicy string + configMapName string scenarios *atomic.Pointer[map[string]*scenario] benchjobs *atomic.Pointer[map[string]*v1.ValdBenchmarkJob] jobs *atomic.Pointer[map[string]string] @@ -455,7 +460,6 @@ func (o *operator) createBenchmarkJob(ctx context.Context, scenario v1.ValdBench } // set status bj.Status = v1.BenchmarkJobNotReady - // TODO: set metrics // create benchmark job resource c := o.ctrl.GetManager().GetClient() if err := c.Create(ctx, bj); err != nil { @@ -470,12 +474,14 @@ func (o *operator) createBenchmarkJob(ctx context.Context, scenario v1.ValdBench // createJob creates benchmark job from benchmark job resource. func (o *operator) createJob(ctx context.Context, bjr v1.ValdBenchmarkJob) error { label := map[string]string{ + Name: ServerName, BenchmarkName: bjr.GetName() + strconv.Itoa(int(bjr.GetGeneration())), } job, err := benchjob.NewBenchmarkJob( benchjob.WithContainerName(bjr.GetName()), - benchjob.WithContainerImage(o.jobImage), + benchjob.WithContainerImage(strings.Join([]string{o.jobImageRepository, o.jobImageTag}, ":")), benchjob.WithImagePullPolicy(benchjob.ImagePullPolicy(o.jobImagePullPolicy)), + benchjob.WithOperatorConfigMap(o.configMapName), ) if err != nil { return err diff --git a/pkg/tools/benchmark/operator/service/operator_test.go b/pkg/tools/benchmark/operator/service/operator_test.go index da896c6b3e..0510f078a2 100644 --- a/pkg/tools/benchmark/operator/service/operator_test.go +++ b/pkg/tools/benchmark/operator/service/operator_test.go @@ -520,7 +520,8 @@ func Test_operator_jobReconcile(t *testing.T) { } type fields struct { jobNamespace string - jobImage string + jobImageRepository string + jobImageTag string jobImagePullPolicy string scenarios *atomic.Pointer[map[string]*scenario] benchjobs *atomic.Pointer[map[string]*v1.ValdBenchmarkJob] @@ -556,7 +557,7 @@ func Test_operator_jobReconcile(t *testing.T) { }, fields: fields{ jobNamespace: "default", - jobImage: "vdaas/vald-benchmark-job", + jobImageRepository: "vdaas/vald-benchmark-job", jobImagePullPolicy: "Always", scenarios: &atomic.Pointer[map[string]*scenario]{}, benchjobs: &atomic.Pointer[map[string]*v1.ValdBenchmarkJob]{}, @@ -597,7 +598,7 @@ func Test_operator_jobReconcile(t *testing.T) { }, fields: fields{ jobNamespace: "default", - jobImage: "vdaas/vald-benchmark-job", + jobImageRepository: "vdaas/vald-benchmark-job", jobImagePullPolicy: "Always", scenarios: &atomic.Pointer[map[string]*scenario]{}, jobs: func() *atomic.Pointer[map[string]string] { @@ -694,7 +695,7 @@ func Test_operator_jobReconcile(t *testing.T) { }, fields: fields{ jobNamespace: "default", - jobImage: "vdaas/vald-benchmark-job", + jobImageRepository: "vdaas/vald-benchmark-job", jobImagePullPolicy: "Always", scenarios: &atomic.Pointer[map[string]*scenario]{}, jobs: func() *atomic.Pointer[map[string]string] { @@ -784,7 +785,7 @@ func Test_operator_jobReconcile(t *testing.T) { }, fields: fields{ jobNamespace: "default", - jobImage: "vdaas/vald-benchmark-job", + jobImageRepository: "vdaas/vald-benchmark-job", jobImagePullPolicy: "Always", scenarios: &atomic.Pointer[map[string]*scenario]{}, benchjobs: &atomic.Pointer[map[string]*v1.ValdBenchmarkJob]{}, @@ -823,7 +824,8 @@ func Test_operator_jobReconcile(t *testing.T) { } o := &operator{ jobNamespace: test.fields.jobNamespace, - jobImage: test.fields.jobImage, + jobImageRepository: test.fields.jobImageRepository, + jobImageTag: test.fields.jobImageTag, jobImagePullPolicy: test.fields.jobImagePullPolicy, benchjobs: test.fields.benchjobs, jobs: test.fields.jobs, @@ -847,7 +849,8 @@ func Test_operator_benchJobReconcile(t *testing.T) { } type fields struct { jobNamespace string - jobImage string + jobImageRepository string + jobImageTag string jobImagePullPolicy string scenarios *atomic.Pointer[map[string]*scenario] benchjobs *atomic.Pointer[map[string]*v1.ValdBenchmarkJob] @@ -887,7 +890,7 @@ func Test_operator_benchJobReconcile(t *testing.T) { }, fields: fields{ jobNamespace: "default", - jobImage: "vdaas/vald-benchmark-job", + jobImageRepository: "vdaas/vald-benchmark-job", jobImagePullPolicy: "Always", scenarios: &atomic.Pointer[map[string]*scenario]{}, benchjobs: &atomic.Pointer[map[string]*v1.ValdBenchmarkJob]{}, @@ -951,7 +954,7 @@ func Test_operator_benchJobReconcile(t *testing.T) { }, fields: fields{ jobNamespace: "default", - jobImage: "vdaas/vald-benchmark-job", + jobImageRepository: "vdaas/vald-benchmark-job", jobImagePullPolicy: "Always", scenarios: func() *atomic.Pointer[map[string]*scenario] { ap := atomic.Pointer[map[string]*scenario]{} @@ -1142,7 +1145,7 @@ func Test_operator_benchJobReconcile(t *testing.T) { }, fields: fields{ jobNamespace: "default", - jobImage: "vdaas/vald-benchmark-job", + jobImageRepository: "vdaas/vald-benchmark-job", jobImagePullPolicy: "Always", scenarios: func() *atomic.Pointer[map[string]*scenario] { ap := atomic.Pointer[map[string]*scenario]{} @@ -1376,7 +1379,7 @@ func Test_operator_benchJobReconcile(t *testing.T) { }, fields: fields{ jobNamespace: "default", - jobImage: "vdaas/vald-benchmark-job", + jobImageRepository: "vdaas/vald-benchmark-job", jobImagePullPolicy: "Always", scenarios: func() *atomic.Pointer[map[string]*scenario] { ap := atomic.Pointer[map[string]*scenario]{} @@ -1610,7 +1613,7 @@ func Test_operator_benchJobReconcile(t *testing.T) { }, fields: fields{ jobNamespace: "default", - jobImage: "vdaas/vald-benchmark-job", + jobImageRepository: "vdaas/vald-benchmark-job", jobImagePullPolicy: "Always", scenarios: func() *atomic.Pointer[map[string]*scenario] { ap := atomic.Pointer[map[string]*scenario]{} @@ -1852,7 +1855,8 @@ func Test_operator_benchJobReconcile(t *testing.T) { } o := &operator{ jobNamespace: test.fields.jobNamespace, - jobImage: test.fields.jobImage, + jobImageRepository: test.fields.jobImageRepository, + jobImageTag: test.fields.jobImageTag, jobImagePullPolicy: test.fields.jobImagePullPolicy, scenarios: test.fields.scenarios, benchjobs: test.fields.benchjobs, @@ -1878,7 +1882,8 @@ func Test_operator_benchScenarioReconcile(t *testing.T) { } type fields struct { jobNamespace string - jobImage string + jobImageRepository string + jobImageTag string jobImagePullPolicy string scenarios *atomic.Pointer[map[string]*scenario] benchjobs *atomic.Pointer[map[string]*v1.ValdBenchmarkJob] @@ -1935,7 +1940,7 @@ func Test_operator_benchScenarioReconcile(t *testing.T) { }, fields: fields{ jobNamespace: "default", - jobImage: "vdaas/vald-benchmark-job", + jobImageRepository: "vdaas/vald-benchmark-job", jobImagePullPolicy: "Always", scenarios: &atomic.Pointer[map[string]*scenario]{}, benchjobs: &atomic.Pointer[map[string]*v1.ValdBenchmarkJob]{}, @@ -2010,7 +2015,7 @@ func Test_operator_benchScenarioReconcile(t *testing.T) { }, fields: fields{ jobNamespace: "default", - jobImage: "vdaas/vald-benchmark-job", + jobImageRepository: "vdaas/vald-benchmark-job", jobImagePullPolicy: "Always", scenarios: &atomic.Pointer[map[string]*scenario]{}, benchjobs: &atomic.Pointer[map[string]*v1.ValdBenchmarkJob]{}, @@ -2146,7 +2151,7 @@ func Test_operator_benchScenarioReconcile(t *testing.T) { }, fields: fields{ jobNamespace: "default", - jobImage: "vdaas/vald-benchmark-job", + jobImageRepository: "vdaas/vald-benchmark-job", jobImagePullPolicy: "Always", scenarios: func() *atomic.Pointer[map[string]*scenario] { ap := atomic.Pointer[map[string]*scenario]{} @@ -2338,7 +2343,7 @@ func Test_operator_benchScenarioReconcile(t *testing.T) { }, fields: fields{ jobNamespace: "default", - jobImage: "vdaas/vald-benchmark-job", + jobImageRepository: "vdaas/vald-benchmark-job", jobImagePullPolicy: "Always", scenarios: func() *atomic.Pointer[map[string]*scenario] { ap := atomic.Pointer[map[string]*scenario]{} @@ -2530,7 +2535,7 @@ func Test_operator_benchScenarioReconcile(t *testing.T) { }, fields: fields{ jobNamespace: "default", - jobImage: "vdaas/vald-benchmark-job", + jobImageRepository: "vdaas/vald-benchmark-job", jobImagePullPolicy: "Always", scenarios: func() *atomic.Pointer[map[string]*scenario] { ap := atomic.Pointer[map[string]*scenario]{} @@ -2686,7 +2691,8 @@ func Test_operator_benchScenarioReconcile(t *testing.T) { } o := &operator{ jobNamespace: test.fields.jobNamespace, - jobImage: test.fields.jobImage, + jobImageRepository: test.fields.jobImageRepository, + jobImageTag: test.fields.jobImageTag, jobImagePullPolicy: test.fields.jobImagePullPolicy, scenarios: test.fields.scenarios, benchjobs: test.fields.benchjobs, @@ -3150,7 +3156,7 @@ func Test_operator_checkAtomics(t *testing.T) { // // want: func() Operator { // // o := &operator{ // // jobNamespace: "default", -// // jobImage: "vdaas/vald-benchmark-job", +// // jobImageRepository: "vdaas/vald-benchmark-job", // // jobImagePullPolicy: "Always", // // rcd: 10 * time.Second, // // } @@ -3255,7 +3261,7 @@ func Test_operator_checkAtomics(t *testing.T) { // }, // fields: fields { // jobNamespace:"", -// jobImage:"", +// jobImageRepository:"", // jobImagePullPolicy:"", // scenarios:nil, // benchjobs:nil, @@ -3285,7 +3291,7 @@ func Test_operator_checkAtomics(t *testing.T) { // }, // fields: fields { // jobNamespace:"", -// jobImage:"", +// jobImageRepository:"", // jobImagePullPolicy:"", // scenarios:nil, // benchjobs:nil, @@ -3324,7 +3330,7 @@ func Test_operator_checkAtomics(t *testing.T) { // } // o := &operator{ // jobNamespace: test.fields.jobNamespace, -// jobImage: test.fields.jobImage, +// jobImageRepository: test.fields.jobImage, // jobImagePullPolicy: test.fields.jobImagePullPolicy, // scenarios: test.fields.scenarios, // benchjobs: test.fields.benchjobs, @@ -3390,7 +3396,7 @@ func Test_operator_checkAtomics(t *testing.T) { // }, // fields: fields { // jobNamespace:"", -// jobImage:"", +// jobImageRepository:"", // jobImagePullPolicy:"", // scenarios:nil, // benchjobs:nil, @@ -3420,7 +3426,7 @@ func Test_operator_checkAtomics(t *testing.T) { // }, // fields: fields { // jobNamespace:"", -// jobImage:"", +// jobImageRepository:"", // jobImagePullPolicy:"", // scenarios:nil, // benchjobs:nil, @@ -3459,7 +3465,7 @@ func Test_operator_checkAtomics(t *testing.T) { // } // o := &operator{ // jobNamespace: test.fields.jobNamespace, -// jobImage: test.fields.jobImage, +// jobImageRepository: test.fields.jobImage, // jobImagePullPolicy: test.fields.jobImagePullPolicy, // scenarios: test.fields.scenarios, // benchjobs: test.fields.benchjobs, @@ -3513,7 +3519,7 @@ func Test_operator_checkAtomics(t *testing.T) { // // name: "test_case_1", // // fields: fields{ // // jobNamespace: "default", -// // jobImage: "vdaas/vald-benchmark-job", +// // jobImageRepository: "vdaas/vald-benchmark-job", // // jobImagePullPolicy: "Always", // // // scenarios:nil, // // // benchjobs:nil, @@ -3541,7 +3547,7 @@ func Test_operator_checkAtomics(t *testing.T) { // name: "test_case_2", // fields: fields { // jobNamespace:"", -// jobImage:"", +// jobImageRepository:"", // jobImagePullPolicy:"", // scenarios:nil, // benchjobs:nil, @@ -3580,7 +3586,7 @@ func Test_operator_checkAtomics(t *testing.T) { // } // o := &operator{ // jobNamespace: test.fields.jobNamespace, -// jobImage: test.fields.jobImage, +// jobImageRepository: test.fields.jobImage, // jobImagePullPolicy: test.fields.jobImagePullPolicy, // scenarios: test.fields.scenarios, // benchjobs: test.fields.benchjobs, @@ -3645,7 +3651,7 @@ func Test_operator_checkAtomics(t *testing.T) { // }, // fields: fields { // jobNamespace:"", -// jobImage:"", +// jobImageRepository:"", // jobImagePullPolicy:"", // scenarios:nil, // benchjobs:nil, @@ -3677,7 +3683,7 @@ func Test_operator_checkAtomics(t *testing.T) { // }, // fields: fields { // jobNamespace:"", -// jobImage:"", +// jobImageRepository:"", // jobImagePullPolicy:"", // scenarios:nil, // benchjobs:nil, @@ -3716,7 +3722,7 @@ func Test_operator_checkAtomics(t *testing.T) { // } // o := &operator{ // jobNamespace: test.fields.jobNamespace, -// jobImage: test.fields.jobImage, +// jobImageRepository: test.fields.jobImage, // jobImagePullPolicy: test.fields.jobImagePullPolicy, // scenarios: test.fields.scenarios, // benchjobs: test.fields.benchjobs, @@ -3781,7 +3787,7 @@ func Test_operator_checkAtomics(t *testing.T) { // }, // fields: fields { // jobNamespace:"", -// jobImage:"", +// jobImageRepository:"", // jobImagePullPolicy:"", // scenarios:nil, // benchjobs:nil, @@ -3813,7 +3819,7 @@ func Test_operator_checkAtomics(t *testing.T) { // }, // fields: fields { // jobNamespace:"", -// jobImage:"", +// jobImageRepository:"", // jobImagePullPolicy:"", // scenarios:nil, // benchjobs:nil, @@ -3852,7 +3858,7 @@ func Test_operator_checkAtomics(t *testing.T) { // } // o := &operator{ // jobNamespace: test.fields.jobNamespace, -// jobImage: test.fields.jobImage, +// jobImageRepository: test.fields.jobImage, // jobImagePullPolicy: test.fields.jobImagePullPolicy, // scenarios: test.fields.scenarios, // benchjobs: test.fields.benchjobs, @@ -3919,7 +3925,7 @@ func Test_operator_checkAtomics(t *testing.T) { // }, // fields: fields { // jobNamespace:"", -// jobImage:"", +// jobImageRepository:"", // jobImagePullPolicy:"", // scenarios:nil, // benchjobs:nil, @@ -3950,7 +3956,7 @@ func Test_operator_checkAtomics(t *testing.T) { // }, // fields: fields { // jobNamespace:"", -// jobImage:"", +// jobImageRepository:"", // jobImagePullPolicy:"", // scenarios:nil, // benchjobs:nil, @@ -3989,7 +3995,7 @@ func Test_operator_checkAtomics(t *testing.T) { // } // o := &operator{ // jobNamespace: test.fields.jobNamespace, -// jobImage: test.fields.jobImage, +// jobImageRepository: test.fields.jobImage, // jobImagePullPolicy: test.fields.jobImagePullPolicy, // scenarios: test.fields.scenarios, // benchjobs: test.fields.benchjobs, @@ -4052,7 +4058,7 @@ func Test_operator_checkAtomics(t *testing.T) { // }, // fields: fields { // jobNamespace:"", -// jobImage:"", +// jobImageRepository:"", // jobImagePullPolicy:"", // scenarios:nil, // benchjobs:nil, @@ -4083,7 +4089,7 @@ func Test_operator_checkAtomics(t *testing.T) { // }, // fields: fields { // jobNamespace:"", -// jobImage:"", +// jobImageRepository:"", // jobImagePullPolicy:"", // scenarios:nil, // benchjobs:nil, @@ -4122,7 +4128,7 @@ func Test_operator_checkAtomics(t *testing.T) { // } // o := &operator{ // jobNamespace: test.fields.jobNamespace, -// jobImage: test.fields.jobImage, +// jobImageRepository: test.fields.jobImage, // jobImagePullPolicy: test.fields.jobImagePullPolicy, // scenarios: test.fields.scenarios, // benchjobs: test.fields.benchjobs, @@ -4189,7 +4195,7 @@ func Test_operator_checkAtomics(t *testing.T) { // }, // fields: fields { // jobNamespace:"", -// jobImage:"", +// jobImageRepository:"", // jobImagePullPolicy:"", // scenarios:nil, // benchjobs:nil, @@ -4220,7 +4226,7 @@ func Test_operator_checkAtomics(t *testing.T) { // }, // fields: fields { // jobNamespace:"", -// jobImage:"", +// jobImageRepository:"", // jobImagePullPolicy:"", // scenarios:nil, // benchjobs:nil, @@ -4259,7 +4265,7 @@ func Test_operator_checkAtomics(t *testing.T) { // } // o := &operator{ // jobNamespace: test.fields.jobNamespace, -// jobImage: test.fields.jobImage, +// jobImageRepository: test.fields.jobImage, // jobImagePullPolicy: test.fields.jobImagePullPolicy, // scenarios: test.fields.scenarios, // benchjobs: test.fields.benchjobs, @@ -4326,7 +4332,7 @@ func Test_operator_checkAtomics(t *testing.T) { // }, // fields: fields { // jobNamespace:"", -// jobImage:"", +// jobImageRepository:"", // jobImagePullPolicy:"", // scenarios:nil, // benchjobs:nil, @@ -4357,7 +4363,7 @@ func Test_operator_checkAtomics(t *testing.T) { // }, // fields: fields { // jobNamespace:"", -// jobImage:"", +// jobImageRepository:"", // jobImagePullPolicy:"", // scenarios:nil, // benchjobs:nil, @@ -4396,7 +4402,7 @@ func Test_operator_checkAtomics(t *testing.T) { // } // o := &operator{ // jobNamespace: test.fields.jobNamespace, -// jobImage: test.fields.jobImage, +// jobImageRepository: test.fields.jobImage, // jobImagePullPolicy: test.fields.jobImagePullPolicy, // scenarios: test.fields.scenarios, // benchjobs: test.fields.benchjobs, @@ -4458,7 +4464,7 @@ func Test_operator_checkAtomics(t *testing.T) { // // }, // // fields: fields{ // // jobNamespace: "", -// // jobImage: "", +// // jobImageRepository: "", // // jobImagePullPolicy: "", // // scenarios: nil, // // benchjobs: nil, @@ -4496,7 +4502,7 @@ func Test_operator_checkAtomics(t *testing.T) { // } // o := &operator{ // jobNamespace: test.fields.jobNamespace, -// jobImage: test.fields.jobImage, +// jobImageRepository: test.fields.jobImage, // jobImagePullPolicy: test.fields.jobImagePullPolicy, // scenarios: test.fields.scenarios, // benchjobs: test.fields.benchjobs, diff --git a/pkg/tools/benchmark/operator/service/option.go b/pkg/tools/benchmark/operator/service/option.go index 11cb1a7a79..f464591761 100644 --- a/pkg/tools/benchmark/operator/service/option.go +++ b/pkg/tools/benchmark/operator/service/option.go @@ -28,10 +28,12 @@ import ( type Option func(o *operator) error var defaultOpts = []Option{ - WithJobImage("vdaas/vald-benchmark-job"), + WithJobImageRepository("vdaas/vald-benchmark-job"), + WithJobImageTag("latest"), WithJobImagePullPolicy("Always"), WithReconcileCheckDuration("10s"), WithJobNamespace("default"), + WithConfigMapName("vald-benchmark-operator-config"), } // WithErrGroup sets the error group to scenario. @@ -60,20 +62,28 @@ func WithReconcileCheckDuration(ts string) Option { // WithJobNamespace sets the namespace for running benchmark job. func WithJobNamespace(ns string) Option { return func(o *operator) error { - if ns == "" { - o.jobNamespace = "default" - } else { + if ns != "" { o.jobNamespace = ns } return nil } } -// WithJobImage sets the benchmark job docker image info. -func WithJobImage(image string) Option { +// WithJobImageRepository sets the benchmark job docker image info. +func WithJobImageRepository(repo string) Option { return func(o *operator) error { - if image != "" { - o.jobImage = image + if repo != "" { + o.jobImageRepository = repo + } + return nil + } +} + +// WithJobImageTag sets the benchmark job docker image tag. +func WithJobImageTag(tag string) Option { + return func(o *operator) error { + if tag != "" { + o.jobImageTag = tag } return nil } @@ -88,3 +98,12 @@ func WithJobImagePullPolicy(p string) Option { return nil } } + +func WithConfigMapName(cm string) Option { + return func(o *operator) error { + if cm != "" { + o.configMapName = cm + } + return nil + } +} diff --git a/pkg/tools/benchmark/operator/usecase/benchmarkd.go b/pkg/tools/benchmark/operator/usecase/benchmarkd.go index 5f4343f615..59bb0b077b 100644 --- a/pkg/tools/benchmark/operator/usecase/benchmarkd.go +++ b/pkg/tools/benchmark/operator/usecase/benchmarkd.go @@ -63,8 +63,9 @@ func New(cfg *config.Config) (r runner.Runner, err error) { operator, err := service.New( service.WithErrGroup(eg), service.WithJobNamespace(JOB_NAMESPACE), - service.WithJobImage(cfg.JobImage.Image), - service.WithJobImagePullPolicy(cfg.JobImage.PullPolicy), + service.WithJobImageRepository(cfg.Job.Image.Repository), + service.WithJobImageTag(cfg.Job.Image.Tag), + service.WithJobImagePullPolicy(cfg.Job.Image.PullPolicy), ) if err != nil { return nil, err @@ -98,7 +99,7 @@ func New(cfg *config.Config) (r runner.Runner, err error) { obs, err = observability.NewWithConfig( cfg.Observability, benchmarkmetrics.New(operator), - infometrics.New("benchmark_operator_info", "Benchmark Operator info", *cfg.JobImage), + infometrics.New("benchmark_operator_info", "Benchmark Operator info", *cfg.Job.Image), backoffmetrics.New(), ) if err != nil {