From 04fb4ccde9fc8ff0c43effe801cff392189e9c89 Mon Sep 17 00:00:00 2001 From: Martin Gencur Date: Wed, 4 Sep 2024 13:58:48 +0200 Subject: [PATCH] Fix perf.patch - some files were not included --- openshift/performance/patches/perf.patch | 535 +++++++++++++++++++++++ 1 file changed, 535 insertions(+) diff --git a/openshift/performance/patches/perf.patch b/openshift/performance/patches/perf.patch index 37d1264a78cc..7daa69ba3756 100644 --- a/openshift/performance/patches/perf.patch +++ b/openshift/performance/patches/perf.patch @@ -1230,3 +1230,538 @@ index 356049086..13457d914 100644 - name: JOB_NAME valueFrom: secretKeyRef: +diff --git a/test/performance/performance/es.go b/test/performance/performance/es.go +new file mode 100644 +index 000000000..6a7794e46 +--- /dev/null ++++ b/test/performance/performance/es.go +@@ -0,0 +1,109 @@ ++package performance ++ ++import ( ++ "log" ++ "os" ++ "strings" ++ "time" ++ ++ vegeta "github.com/tsenart/vegeta/v12/lib" ++ indexers2 "knative.dev/serving/test/performance/performance/indexers" ++) ++ ++const ( ++ ESServerURLSEnv = "ES_URL" ++ UseOpenSearcnEnv = "USE_OPEN_SEARCH" ++ UseESEnv = "USE_ES" ++) ++ ++// ESReporter wraps an ES based indexer ++type ESReporter struct { ++ access *indexers2.Indexer ++ tags map[string]string ++} ++ ++func sanitizeIndex(index string) string { ++ indexRaw := strings.ToLower(index) ++ return strings.Replace(indexRaw, " ", "-", -1) ++} ++ ++func splitServers(envURLS string) []string { ++ var addrs []string ++ list := strings.Split(envURLS, ",") ++ for _, u := range list { ++ addrs = append(addrs, strings.TrimSpace(u)) ++ } ++ return addrs ++} ++ ++func NewESReporter(tags map[string]string, indexerType indexers2.IndexerType, index string) (*ESReporter, error) { ++ var servers []string ++ ++ if v, b := os.LookupEnv(ESServerURLSEnv); b { ++ servers = splitServers(v) ++ } ++ indexer, err := indexers2.NewIndexer(indexers2.IndexerConfig{ ++ Type: indexerType, ++ Index: sanitizeIndex(index), ++ Servers: servers, ++ InsecureSkipVerify: true, ++ }) ++ if err != nil { ++ return nil, err ++ } ++ ++ buildID, found := os.LookupEnv(buildIDKey) ++ if found { ++ tags[buildIDKey] = buildID ++ } ++ jobName, found := os.LookupEnv(jobNameKey) ++ if found { ++ tags[jobNameKey] = jobName ++ } ++ ++ return &ESReporter{ ++ access: indexer, ++ tags: tags, ++ }, nil ++} ++ ++func (esr *ESReporter) AddDataPointsForMetrics(m *vegeta.Metrics, benchmarkName string) { ++ metrics := []map[string]interface{}{ ++ { ++ "requests": float64(m.Requests), ++ "rate": m.Rate, ++ "throughput": m.Throughput, ++ "duration": float64(m.Duration), ++ "latency-mean": float64(m.Latencies.Mean), ++ "latency-min": float64(m.Latencies.Min), ++ "latency-max": float64(m.Latencies.Max), ++ "latency-p95": float64(m.Latencies.P95), ++ "success": m.Success, ++ "errors": float64(len(m.Errors)), ++ "bytes-in": float64(m.BytesIn.Total), ++ "bytes-out": float64(m.BytesOut.Total), ++ }, ++ } ++ ++ for _, m := range metrics { ++ esr.AddDataPoint(benchmarkName, m) ++ } ++} ++ ++func (esr *ESReporter) AddDataPoint(measurement string, fields map[string]interface{}) { ++ p := fields ++ p["_measurement"] = measurement ++ p["tags"] = esr.tags ++ // Use the same format as in influxdb ++ p["@timestamp"] = time.Now().Format(time.RFC3339Nano) ++ docs := []interface{}{p} ++ msg, err := (*esr.access).Index(docs, indexers2.IndexingOpts{}) ++ if err != nil { ++ log.Printf("Indexing failed: %s", err.Error()) ++ } ++ log.Printf("%s\n", msg) ++} ++ ++func (esr *ESReporter) FlushAndShutdown() { ++ ++} +diff --git a/test/performance/performance/indexers/elastic.go b/test/performance/performance/indexers/elastic.go +new file mode 100644 +index 000000000..c7b48726e +--- /dev/null ++++ b/test/performance/performance/indexers/elastic.go +@@ -0,0 +1,132 @@ ++// Copyright 2023 The go-commons Authors. ++// ++// Licensed under the Apache License, Version 2.0 (the "License"); ++// you may not use this file except in compliance with the License. ++// You may obtain a copy of the License at ++// ++// http://www.apache.org/licenses/LICENSE-2.0 ++// ++// Unless required by applicable law or agreed to in writing, software ++// distributed under the License is distributed on an "AS IS" BASIS, ++// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++// See the License for the specific language governing permissions and ++// limitations under the License. ++ ++package indexers ++ ++import ( ++ "bytes" ++ "context" ++ "crypto/sha256" ++ "crypto/tls" ++ "encoding/hex" ++ "encoding/json" ++ "fmt" ++ "net/http" ++ "runtime" ++ "strings" ++ "sync" ++ "time" ++ ++ elasticsearch "github.com/elastic/go-elasticsearch/v7" ++ "github.com/elastic/go-elasticsearch/v7/esutil" ++) ++ ++const elastic = "elastic" ++ ++// Elastic ElasticSearch instance ++type Elastic struct { ++ index string ++} ++ ++// ESClient elasticsearch client instance ++var ESClient *elasticsearch.Client ++ ++// Init function ++func init() { ++ indexerMap[elastic] = &Elastic{} ++} ++ ++// Returns new indexer for elastic search ++func (esIndexer *Elastic) new(indexerConfig IndexerConfig) error { ++ var err error ++ if indexerConfig.Index == "" { ++ return fmt.Errorf("index name not specified") ++ } ++ esIndex := strings.ToLower(indexerConfig.Index) ++ cfg := elasticsearch.Config{ ++ Addresses: indexerConfig.Servers, ++ Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: indexerConfig.InsecureSkipVerify}}, ++ } ++ ESClient, err = elasticsearch.NewClient(cfg) ++ if err != nil { ++ return fmt.Errorf("error creating the ES client: %s", err) ++ } ++ r, err := ESClient.Cluster.Health() ++ if err != nil { ++ return fmt.Errorf("ES health check failed: %s", err) ++ } ++ if r.StatusCode != 200 { ++ return fmt.Errorf("unexpected ES status code: %d", r.StatusCode) ++ } ++ esIndexer.index = esIndex ++ r, _ = ESClient.Indices.Exists([]string{esIndex}) ++ if r.IsError() { ++ r, _ = ESClient.Indices.Create(esIndex) ++ if r.IsError() { ++ return fmt.Errorf("error creating index %s on ES: %s", esIndex, r.String()) ++ } ++ } ++ return nil ++} ++ ++// Index uses bulkIndexer to index the documents in the given index ++func (esIndexer *Elastic) Index(documents []interface{}, opts IndexingOpts) (string, error) { ++ var statString string ++ var indexerStatsLock sync.Mutex ++ indexerStats := make(map[string]int) ++ hasher := sha256.New() ++ bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ ++ Client: ESClient, ++ Index: esIndexer.index, ++ FlushBytes: 5e+6, ++ NumWorkers: runtime.NumCPU(), ++ Timeout: 10 * time.Minute, // TODO: hardcoded ++ }) ++ if err != nil { ++ return "", fmt.Errorf("Error creating the indexer: %s", err) ++ } ++ start := time.Now().UTC() ++ for _, document := range documents { ++ j, err := json.Marshal(document) ++ if err != nil { ++ return "", fmt.Errorf("Cannot encode document %s: %s", document, err) ++ } ++ hasher.Write(j) ++ err = bi.Add( ++ context.Background(), ++ esutil.BulkIndexerItem{ ++ Action: "index", ++ Body: bytes.NewReader(j), ++ DocumentID: hex.EncodeToString(hasher.Sum(nil)), ++ OnSuccess: func(c context.Context, bii esutil.BulkIndexerItem, biri esutil.BulkIndexerResponseItem) { ++ indexerStatsLock.Lock() ++ defer indexerStatsLock.Unlock() ++ indexerStats[biri.Result]++ ++ }, ++ }, ++ ) ++ if err != nil { ++ return "", fmt.Errorf("Unexpected ES indexing error: %s", err) ++ } ++ hasher.Reset() ++ } ++ if err := bi.Close(context.Background()); err != nil { ++ return "", fmt.Errorf("Unexpected ES error: %s", err) ++ } ++ dur := time.Since(start) ++ for stat, val := range indexerStats { ++ statString += fmt.Sprintf(" %s=%d", stat, val) ++ } ++ return fmt.Sprintf("Indexing finished in %v:%v", dur.Truncate(time.Millisecond), statString), nil ++} +diff --git a/test/performance/performance/indexers/opensearch.go b/test/performance/performance/indexers/opensearch.go +new file mode 100644 +index 000000000..d8e17bda9 +--- /dev/null ++++ b/test/performance/performance/indexers/opensearch.go +@@ -0,0 +1,132 @@ ++// Copyright 2023 The go-commons Authors. ++// ++// Licensed under the Apache License, Version 2.0 (the "License"); ++// you may not use this file except in compliance with the License. ++// You may obtain a copy of the License at ++// ++// http://www.apache.org/licenses/LICENSE-2.0 ++// ++// Unless required by applicable law or agreed to in writing, software ++// distributed under the License is distributed on an "AS IS" BASIS, ++// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++// See the License for the specific language governing permissions and ++// limitations under the License. ++ ++package indexers ++ ++import ( ++ "bytes" ++ "context" ++ "crypto/sha256" ++ "crypto/tls" ++ "encoding/hex" ++ "encoding/json" ++ "fmt" ++ "net/http" ++ "runtime" ++ "strings" ++ "sync" ++ "time" ++ ++ opensearch "github.com/opensearch-project/opensearch-go" ++ opensearchutil "github.com/opensearch-project/opensearch-go/opensearchutil" ++) ++ ++const indexer = "opensearch" ++ ++// OSClient OpenSearch client instance ++var OSClient *opensearch.Client ++ ++// OpenSearch OpenSearch instance ++type OpenSearch struct { ++ index string ++} ++ ++// Init function ++func init() { ++ indexerMap[indexer] = &OpenSearch{} ++} ++ ++// Returns new indexer for OpenSearch ++func (OpenSearchIndexer *OpenSearch) new(indexerConfig IndexerConfig) error { ++ var err error ++ if indexerConfig.Index == "" { ++ return fmt.Errorf("index name not specified") ++ } ++ OpenSearchIndex := strings.ToLower(indexerConfig.Index) ++ cfg := opensearch.Config{ ++ Addresses: indexerConfig.Servers, ++ Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: indexerConfig.InsecureSkipVerify}}, ++ } ++ OSClient, err = opensearch.NewClient(cfg) ++ if err != nil { ++ return fmt.Errorf("error creating the OpenSearch client: %s", err) ++ } ++ r, err := OSClient.Cluster.Health() ++ if err != nil { ++ return fmt.Errorf("OpenSearch health check failed: %s", err) ++ } ++ if r.StatusCode != 200 { ++ return fmt.Errorf("unexpected OpenSearch status code: %d", r.StatusCode) ++ } ++ OpenSearchIndexer.index = OpenSearchIndex ++ r, _ = OSClient.Indices.Exists([]string{OpenSearchIndex}) ++ if r.IsError() { ++ r, _ = OSClient.Indices.Create(OpenSearchIndex) ++ if r.IsError() { ++ return fmt.Errorf("error creating index %s on OpenSearch: %s", OpenSearchIndex, r.String()) ++ } ++ } ++ return nil ++} ++ ++// Index uses bulkIndexer to index the documents in the given index ++func (OpenSearchIndexer *OpenSearch) Index(documents []interface{}, opts IndexingOpts) (string, error) { ++ var statString string ++ var indexerStatsLock sync.Mutex ++ indexerStats := make(map[string]int) ++ hasher := sha256.New() ++ bi, err := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{ ++ Client: OSClient, ++ Index: OpenSearchIndexer.index, ++ FlushBytes: 5e+6, ++ NumWorkers: runtime.NumCPU(), ++ Timeout: 10 * time.Minute, // TODO: hardcoded ++ }) ++ if err != nil { ++ return "", fmt.Errorf("Error creating the indexer: %s", err) ++ } ++ start := time.Now().UTC() ++ for _, document := range documents { ++ j, err := json.Marshal(document) ++ if err != nil { ++ return "", fmt.Errorf("Cannot encode document %s: %s", document, err) ++ } ++ hasher.Write(j) ++ err = bi.Add( ++ context.Background(), ++ opensearchutil.BulkIndexerItem{ ++ Action: "index", ++ Body: bytes.NewReader(j), ++ DocumentID: hex.EncodeToString(hasher.Sum(nil)), ++ OnSuccess: func(c context.Context, bii opensearchutil.BulkIndexerItem, biri opensearchutil.BulkIndexerResponseItem) { ++ indexerStatsLock.Lock() ++ defer indexerStatsLock.Unlock() ++ indexerStats[biri.Result]++ ++ }, ++ }, ++ ) ++ if err != nil { ++ return "", fmt.Errorf("Unexpected OpenSearch indexing error: %s", err) ++ } ++ hasher.Reset() ++ } ++ if err := bi.Close(context.Background()); err != nil { ++ return "", fmt.Errorf("Unexpected OpenSearch error: %s", err) ++ } ++ dur := time.Since(start) ++ for stat, val := range indexerStats { ++ statString += fmt.Sprintf(" %s=%d", stat, val) ++ } ++ return fmt.Sprintf("Indexing finished in %v:%v", dur.Truncate(time.Millisecond), statString), nil ++} +diff --git a/test/performance/performance/indexers/types.go b/test/performance/performance/indexers/types.go +new file mode 100644 +index 000000000..2317c746a +--- /dev/null ++++ b/test/performance/performance/indexers/types.go +@@ -0,0 +1,77 @@ ++// Copyright 2023 The go-commons Authors. ++// ++// Licensed under the Apache License, Version 2.0 (the "License"); ++// you may not use this file except in compliance with the License. ++// You may obtain a copy of the License at ++// ++// http://www.apache.org/licenses/LICENSE-2.0 ++// ++// Unless required by applicable law or agreed to in writing, software ++// distributed under the License is distributed on an "AS IS" BASIS, ++// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++// See the License for the specific language governing permissions and ++// limitations under the License. ++ ++package indexers ++ ++import "fmt" ++ ++// Types of indexers ++const ( ++ // Elastic indexer that sends metrics to the configured ES instance ++ ElasticIndexer IndexerType = "elastic" ++ // OpenSearch indexer that sends metrics to the configured Search Instance ++ OpenSearchIndexer IndexerType = "opensearch" ++ // Local indexer that writes metrics to local directory ++ LocalIndexer IndexerType = "local" ++) ++ ++var indexerMap = make(map[IndexerType]Indexer) ++ ++// Indexer interface ++type Indexer interface { ++ Index([]interface{}, IndexingOpts) (string, error) ++ new(IndexerConfig) error ++} ++ ++// Indexing options ++type IndexingOpts struct { ++ MetricName string // MetricName, required for local indexer ++} ++ ++// IndexerType type of indexer ++type IndexerType string ++ ++// IndexerConfig holds the indexer configuration ++type IndexerConfig struct { ++ // Type type of indexer ++ Type IndexerType `yaml:"type"` ++ // Servers List of ElasticSearch instances ++ Servers []string `yaml:"esServers"` ++ // Index index to send documents to server ++ Index string `yaml:"defaultIndex"` ++ // InsecureSkipVerify disable TLS ceriticate verification ++ InsecureSkipVerify bool `yaml:"insecureSkipVerify"` ++ // Directory to save metrics files in ++ MetricsDirectory string `yaml:"metricsDirectory"` ++ // Create tarball ++ CreateTarball bool `yaml:"createTarball"` ++ // TarBall name ++ TarballName string `yaml:"tarballName"` ++} ++ ++// NewIndexer creates a new Indexer with the specified IndexerConfig ++func NewIndexer(indexerConfig IndexerConfig) (*Indexer, error) { ++ var indexer Indexer ++ var exists bool ++ cfg := indexerConfig ++ if indexer, exists = indexerMap[cfg.Type]; exists { ++ err := indexer.new(indexerConfig) ++ if err != nil { ++ return &indexer, err ++ } ++ } else { ++ return &indexer, fmt.Errorf("Indexer not found: %s", cfg.Type) ++ } ++ return &indexer, nil ++} +diff --git a/test/performance/performance/reporter.go b/test/performance/performance/reporter.go +new file mode 100644 +index 000000000..6a3c41539 +--- /dev/null ++++ b/test/performance/performance/reporter.go +@@ -0,0 +1,55 @@ ++package performance ++ ++import ( ++ "os" ++ "strconv" ++ ++ vegeta "github.com/tsenart/vegeta/v12/lib" ++ "knative.dev/serving/test/performance/performance/indexers" ++) ++ ++type DataPointReporter interface { ++ AddDataPoint(measurement string, fields map[string]interface{}) ++ AddDataPointsForMetrics(m *vegeta.Metrics, benchmarkName string) ++ FlushAndShutdown() ++} ++ ++func NewDataPointReporterFactory(tags map[string]string, index string) (DataPointReporter, error) { ++ var reporter DataPointReporter ++ var err error ++ useDefaultReporter := true ++ ++ if v, b := os.LookupEnv(UseESEnv); b { ++ if b, err = strconv.ParseBool(v); err == nil { ++ if b { ++ useDefaultReporter = false ++ reporter, err = NewESReporter(tags, indexers.ElasticIndexer, index) ++ if err != nil { ++ return nil, err ++ } ++ } ++ } ++ } ++ ++ if v, b := os.LookupEnv(UseOpenSearcnEnv); b { ++ if b, err = strconv.ParseBool(v); err == nil { ++ if b { ++ useDefaultReporter = false ++ reporter, err = NewESReporter(tags, indexers.OpenSearchIndexer, index) ++ if err != nil { ++ return nil, err ++ } ++ } ++ } ++ } ++ ++ if useDefaultReporter { ++ reporter, err = NewInfluxReporter(tags) ++ if err != nil { ++ return nil, err ++ } ++ } ++ ++ rep := interface{}(reporter).(DataPointReporter) ++ return rep, nil ++}