From 47c0a63e0b5c624a6a7878b73315d116ec959c4d Mon Sep 17 00:00:00 2001 From: Jared Tan Date: Mon, 2 Sep 2024 19:58:33 +0800 Subject: [PATCH 1/6] Add configurable index data layout and rollover frequency Signed-off-by: Jared Tan --- cmd/es-rollover/app/init/action.go | 16 +- cmd/es-rollover/app/init/flags.go | 28 +- cmd/es-rollover/app/init/flags_test.go | 12 +- cmd/esmapping-generator/app/flags.go | 8 +- cmd/esmapping-generator/app/flags_test.go | 9 +- .../app/renderer/render.go | 21 +- cmd/jaeger/config-elasticsearch.yaml | 21 ++ pkg/es/config/config.go | 152 ++++++----- pkg/es/config/config_test.go | 250 ++++++++---------- plugin/storage/es/factory.go | 50 ++-- .../es/mappings/jaeger-dependencies-7.json | 4 +- .../es/mappings/jaeger-dependencies-8.json | 8 +- .../es/mappings/jaeger-sampling-7.json | 2 +- .../es/mappings/jaeger-sampling-8.json | 8 +- .../storage/es/mappings/jaeger-service-7.json | 8 +- .../storage/es/mappings/jaeger-service-8.json | 10 +- plugin/storage/es/mappings/jaeger-span-7.json | 6 +- plugin/storage/es/mappings/jaeger-span-8.json | 8 +- plugin/storage/es/mappings/mapping.go | 60 +++-- plugin/storage/es/mappings/mapping_test.go | 92 ++++--- plugin/storage/es/options.go | 101 ++++--- plugin/storage/es/options_test.go | 39 ++- plugin/storage/es/spanstore/reader.go | 136 +++++----- plugin/storage/es/spanstore/reader_test.go | 118 +++++---- plugin/storage/es/spanstore/writer.go | 4 +- plugin/storage/es/spanstore/writer_test.go | 18 +- 26 files changed, 655 insertions(+), 534 deletions(-) diff --git a/cmd/es-rollover/app/init/action.go b/cmd/es-rollover/app/init/action.go index dcb7ad4735c..73e04487f15 100644 --- a/cmd/es-rollover/app/init/action.go +++ b/cmd/es-rollover/app/init/action.go @@ -29,17 +29,11 @@ type Action struct { func (c Action) getMapping(version uint, templateName string) (string, error) { mappingBuilder := mappings.MappingBuilder{ - TemplateBuilder: es.TextTemplateBuilder{}, - PrioritySpanTemplate: int64(c.Config.PrioritySpanTemplate), - PriorityServiceTemplate: int64(c.Config.PriorityServiceTemplate), - PriorityDependenciesTemplate: int64(c.Config.PriorityDependenciesTemplate), - PrioritySamplingTemplate: int64(c.Config.PrioritySamplingTemplate), - Shards: int64(c.Config.Shards), - Replicas: int64(c.Config.Replicas), - IndexPrefix: c.Config.IndexPrefix, - UseILM: c.Config.UseILM, - ILMPolicyName: c.Config.ILMPolicyName, - EsVersion: version, + TemplateBuilder: es.TextTemplateBuilder{}, + Indices: c.Config.Indices, + UseILM: c.Config.UseILM, + ILMPolicyName: c.Config.ILMPolicyName, + EsVersion: version, } return mappingBuilder.GetMapping(templateName) } diff --git a/cmd/es-rollover/app/init/flags.go b/cmd/es-rollover/app/init/flags.go index dbe969fbe3d..12dab457670 100644 --- a/cmd/es-rollover/app/init/flags.go +++ b/cmd/es-rollover/app/init/flags.go @@ -9,6 +9,7 @@ import ( "github.com/spf13/viper" "github.com/jaegertracing/jaeger/cmd/es-rollover/app" + cfg "github.com/jaegertracing/jaeger/pkg/es/config" ) const ( @@ -23,12 +24,7 @@ const ( // Config holds configuration for index cleaner binary. type Config struct { app.Config - Shards int - Replicas int - PrioritySpanTemplate int - PriorityServiceTemplate int - PriorityDependenciesTemplate int - PrioritySamplingTemplate int + cfg.Indices } // AddFlags adds flags for TLS to the FlagSet. @@ -43,10 +39,18 @@ func (*Config) AddFlags(flags *flag.FlagSet) { // InitFromViper initializes config from viper.Viper. func (c *Config) InitFromViper(v *viper.Viper) { - c.Shards = v.GetInt(shards) - c.Replicas = v.GetInt(replicas) - c.PrioritySpanTemplate = v.GetInt(prioritySpanTemplate) - c.PriorityServiceTemplate = v.GetInt(priorityServiceTemplate) - c.PriorityDependenciesTemplate = v.GetInt(priorityDependenciesTemplate) - c.PrioritySamplingTemplate = v.GetInt(prioritySamplingTemplate) + c.Indices.Spans.Shards = v.GetInt(shards) + c.Indices.Services.Shards = v.GetInt(shards) + c.Indices.Dependencies.Shards = v.GetInt(shards) + c.Indices.Sampling.Shards = v.GetInt(shards) + + c.Indices.Spans.Replicas = v.GetInt(replicas) + c.Indices.Services.Replicas = v.GetInt(replicas) + c.Indices.Dependencies.Replicas = v.GetInt(replicas) + c.Indices.Sampling.Replicas = v.GetInt(replicas) + + c.Indices.Spans.Priority = v.GetInt(prioritySpanTemplate) + c.Indices.Services.Priority = v.GetInt(priorityServiceTemplate) + c.Indices.Dependencies.Priority = v.GetInt(priorityDependenciesTemplate) + c.Indices.Sampling.Priority = v.GetInt(prioritySamplingTemplate) } diff --git a/cmd/es-rollover/app/init/flags_test.go b/cmd/es-rollover/app/init/flags_test.go index e3dbe54c94f..5fdf46174b6 100644 --- a/cmd/es-rollover/app/init/flags_test.go +++ b/cmd/es-rollover/app/init/flags_test.go @@ -33,10 +33,10 @@ func TestBindFlags(t *testing.T) { require.NoError(t, err) c.InitFromViper(v) - assert.Equal(t, 8, c.Shards) - assert.Equal(t, 16, c.Replicas) - assert.Equal(t, 300, c.PrioritySpanTemplate) - assert.Equal(t, 301, c.PriorityServiceTemplate) - assert.Equal(t, 302, c.PriorityDependenciesTemplate) - assert.Equal(t, 303, c.PrioritySamplingTemplate) + assert.Equal(t, 8, c.Indices.Spans.Shards) + assert.Equal(t, 16, c.Indices.Spans.Replicas) + assert.Equal(t, 300, c.Indices.Spans.Priority) + assert.Equal(t, 301, c.Indices.Services.Priority) + assert.Equal(t, 302, c.Indices.Dependencies.Priority) + assert.Equal(t, 303, c.Indices.Sampling.Priority) } diff --git a/cmd/esmapping-generator/app/flags.go b/cmd/esmapping-generator/app/flags.go index 4c684716044..74863005cf4 100644 --- a/cmd/esmapping-generator/app/flags.go +++ b/cmd/esmapping-generator/app/flags.go @@ -11,8 +11,8 @@ import ( type Options struct { Mapping string EsVersion uint - Shards int64 - Replicas int64 + Shards int + Replicas int IndexPrefix string UseILM string // using string as util is being used in python and using bool leads to type issues. ILMPolicyName string @@ -40,12 +40,12 @@ func (o *Options) AddFlags(command *cobra.Command) { esVersionFlag, 7, "The major Elasticsearch version") - command.Flags().Int64Var( + command.Flags().IntVar( &o.Shards, shardsFlag, 5, "The number of shards per index in Elasticsearch") - command.Flags().Int64Var( + command.Flags().IntVar( &o.Replicas, replicasFlag, 1, diff --git a/cmd/esmapping-generator/app/flags_test.go b/cmd/esmapping-generator/app/flags_test.go index 5189685b4d6..726120883b9 100644 --- a/cmd/esmapping-generator/app/flags_test.go +++ b/cmd/esmapping-generator/app/flags_test.go @@ -20,8 +20,9 @@ func TestOptionsWithDefaultFlags(t *testing.T) { assert.Equal(t, "", o.Mapping) assert.Equal(t, uint(7), o.EsVersion) - assert.Equal(t, int64(5), o.Shards) - assert.Equal(t, int64(1), o.Replicas) + assert.Equal(t, 5, o.Shards) + assert.Equal(t, 1, o.Replicas) + assert.Equal(t, "", o.IndexPrefix) assert.Equal(t, "false", o.UseILM) assert.Equal(t, "jaeger-ilm-policy", o.ILMPolicyName) @@ -44,8 +45,8 @@ func TestOptionsWithFlags(t *testing.T) { require.NoError(t, err) assert.Equal(t, "jaeger-span", o.Mapping) assert.Equal(t, uint(7), o.EsVersion) - assert.Equal(t, int64(5), o.Shards) - assert.Equal(t, int64(1), o.Replicas) + assert.Equal(t, 5, o.Shards) + assert.Equal(t, 1, o.Replicas) assert.Equal(t, "test", o.IndexPrefix) assert.Equal(t, "true", o.UseILM) assert.Equal(t, "jaeger-test-policy", o.ILMPolicyName) diff --git a/cmd/esmapping-generator/app/renderer/render.go b/cmd/esmapping-generator/app/renderer/render.go index 7cb3c89f49a..15cd0534f01 100644 --- a/cmd/esmapping-generator/app/renderer/render.go +++ b/cmd/esmapping-generator/app/renderer/render.go @@ -8,6 +8,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/esmapping-generator/app" "github.com/jaegertracing/jaeger/pkg/es" + cfg "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/plugin/storage/es/mappings" ) @@ -25,14 +26,22 @@ func GetMappingAsString(builder es.TemplateBuilder, opt *app.Options) (string, e return "", err } + indexOpts := cfg.IndexOptions{ + Priority: 0, + Shards: opt.Shards, + Replicas: opt.Shards, + } mappingBuilder := mappings.MappingBuilder{ TemplateBuilder: builder, - Shards: opt.Shards, - Replicas: opt.Replicas, - EsVersion: opt.EsVersion, - IndexPrefix: opt.IndexPrefix, - UseILM: enableILM, - ILMPolicyName: opt.ILMPolicyName, + Indices: cfg.Indices{ + Spans: indexOpts, + Services: indexOpts, + Dependencies: indexOpts, + Sampling: indexOpts, + }, + EsVersion: opt.EsVersion, + UseILM: enableILM, + ILMPolicyName: opt.ILMPolicyName, } return mappingBuilder.GetMapping(opt.Mapping) } diff --git a/cmd/jaeger/config-elasticsearch.yaml b/cmd/jaeger/config-elasticsearch.yaml index 42cf78571c6..308b7d11c5f 100644 --- a/cmd/jaeger/config-elasticsearch.yaml +++ b/cmd/jaeger/config-elasticsearch.yaml @@ -21,6 +21,27 @@ extensions: some_storage: elasticsearch: index_prefix: "jaeger-main" + indices: + spans: + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 + services: + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 + dependencies: + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 + sampling: + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 another_storage: elasticsearch: index_prefix: "jaeger-archive" diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index d5fcde09668..5a2b9a2d18a 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -33,48 +33,55 @@ import ( storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics" ) +// IndexOptions describes the index format and rollover frequency +type IndexOptions struct { + Prefix string `mapstructure:"prefix"` + Priority int `mapstructure:"priority"` + DateLayout string `mapstructure:"date_layout"` + Shards int `mapstructure:"shards"` + Replicas int `mapstructure:"replicas"` + RolloverFrequency string `mapstructure:"rollover_frequency"` // "hour" or "day" +} + +// Indices describes different configuration options for each index type +type Indices struct { + Spans IndexOptions `mapstructure:"spans"` + Services IndexOptions `mapstructure:"services"` + Dependencies IndexOptions `mapstructure:"dependencies"` + Sampling IndexOptions `mapstructure:"sampling"` +} + // Configuration describes the configuration properties needed to connect to an ElasticSearch cluster type Configuration struct { - Servers []string `mapstructure:"server_urls" valid:"required,url"` - RemoteReadClusters []string `mapstructure:"remote_read_clusters"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password" json:"-"` - TokenFilePath string `mapstructure:"token_file"` - PasswordFilePath string `mapstructure:"password_file"` - AllowTokenFromContext bool `mapstructure:"-"` - Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing - SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"` - MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query - MaxSpanAge time.Duration `mapstructure:"-"` // configures the maximum lookback on span reads - NumShards int64 `mapstructure:"num_shards"` - NumReplicas int64 `mapstructure:"num_replicas"` - PrioritySpanTemplate int64 `mapstructure:"priority_span_template"` - PriorityServiceTemplate int64 `mapstructure:"priority_service_template"` - PriorityDependenciesTemplate int64 `mapstructure:"priority_dependencies_template"` - Timeout time.Duration `mapstructure:"-"` - BulkSize int `mapstructure:"-"` - BulkWorkers int `mapstructure:"-"` - BulkActions int `mapstructure:"-"` - BulkFlushInterval time.Duration `mapstructure:"-"` - IndexPrefix string `mapstructure:"index_prefix"` - IndexDateLayoutSpans string `mapstructure:"-"` - IndexDateLayoutServices string `mapstructure:"-"` - IndexDateLayoutSampling string `mapstructure:"-"` - IndexDateLayoutDependencies string `mapstructure:"-"` - IndexRolloverFrequencySpans string `mapstructure:"-"` - IndexRolloverFrequencyServices string `mapstructure:"-"` - IndexRolloverFrequencySampling string `mapstructure:"-"` - ServiceCacheTTL time.Duration `mapstructure:"service_cache_ttl"` - AdaptiveSamplingLookback time.Duration `mapstructure:"-"` - Tags TagsAsFields `mapstructure:"tags_as_fields"` - Enabled bool `mapstructure:"-"` - TLS tlscfg.Options `mapstructure:"tls"` - UseReadWriteAliases bool `mapstructure:"use_aliases"` - CreateIndexTemplates bool `mapstructure:"create_mappings"` - UseILM bool `mapstructure:"use_ilm"` - Version uint `mapstructure:"version"` - LogLevel string `mapstructure:"log_level"` - SendGetBodyAs string `mapstructure:"send_get_body_as"` + Servers []string `mapstructure:"server_urls" valid:"required,url"` + RemoteReadClusters []string `mapstructure:"remote_read_clusters"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password" json:"-"` + TokenFilePath string `mapstructure:"token_file"` + PasswordFilePath string `mapstructure:"password_file"` + AllowTokenFromContext bool `mapstructure:"-"` + Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing + SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"` + MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query + MaxSpanAge time.Duration `mapstructure:"-"` // configures the maximum lookback on span reads + Timeout time.Duration `mapstructure:"-"` + BulkSize int `mapstructure:"-"` + BulkWorkers int `mapstructure:"-"` + BulkActions int `mapstructure:"-"` + BulkFlushInterval time.Duration `mapstructure:"-"` + IndexPrefix string `mapstructure:"index_prefix"` + Indices Indices `mapstructure:"indices"` + ServiceCacheTTL time.Duration `mapstructure:"service_cache_ttl"` + AdaptiveSamplingLookback time.Duration `mapstructure:"-"` + Tags TagsAsFields `mapstructure:"tags_as_fields"` + Enabled bool `mapstructure:"-"` + TLS tlscfg.Options `mapstructure:"tls"` + UseReadWriteAliases bool `mapstructure:"use_aliases"` + CreateIndexTemplates bool `mapstructure:"create_mappings"` + UseILM bool `mapstructure:"use_ilm"` + Version uint `mapstructure:"version"` + LogLevel string `mapstructure:"log_level"` + SendGetBodyAs string `mapstructure:"send_get_body_as"` } // TagsAsFields holds configuration for tag schema. @@ -207,6 +214,32 @@ func newElasticsearchV8(c *Configuration, logger *zap.Logger) (*esV8.Client, err return esV8.NewClient(options) } +func setDefaultIndexOptions(cfg, source *IndexOptions) { + if cfg.Shards == 0 { + cfg.Shards = source.Shards + } + + if cfg.Replicas == 0 { + cfg.Replicas = source.Replicas + } + + if cfg.Priority == 0 { + cfg.Priority = source.Priority + } + + if cfg.DateLayout == "" { + cfg.DateLayout = source.DateLayout + } + + if cfg.RolloverFrequency == "" { + cfg.RolloverFrequency = source.RolloverFrequency + } + + if cfg.Prefix == "" { + cfg.Prefix = source.Prefix + } +} + // ApplyDefaults copies settings from source unless its own value is non-zero. func (c *Configuration) ApplyDefaults(source *Configuration) { if len(c.RemoteReadClusters) == 0 { @@ -227,21 +260,11 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { if c.AdaptiveSamplingLookback == 0 { c.AdaptiveSamplingLookback = source.AdaptiveSamplingLookback } - if c.NumShards == 0 { - c.NumShards = source.NumShards - } - if c.NumReplicas == 0 { - c.NumReplicas = source.NumReplicas - } - if c.PrioritySpanTemplate == 0 { - c.PrioritySpanTemplate = source.PrioritySpanTemplate - } - if c.PriorityServiceTemplate == 0 { - c.PriorityServiceTemplate = source.PriorityServiceTemplate - } - if c.PrioritySpanTemplate == 0 { - c.PriorityDependenciesTemplate = source.PriorityDependenciesTemplate - } + + setDefaultIndexOptions(&c.Indices.Spans, &source.Indices.Spans) + setDefaultIndexOptions(&c.Indices.Services, &source.Indices.Services) + setDefaultIndexOptions(&c.Indices.Dependencies, &source.Indices.Dependencies) + if c.BulkSize == 0 { c.BulkSize = source.BulkSize } @@ -280,23 +303,8 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { } } -// GetIndexRolloverFrequencySpansDuration returns jaeger-span index rollover frequency duration -func (c *Configuration) GetIndexRolloverFrequencySpansDuration() time.Duration { - return getIndexRolloverFrequencyDuration(c.IndexRolloverFrequencySpans) -} - -// GetIndexRolloverFrequencyServicesDuration returns jaeger-service index rollover frequency duration -func (c *Configuration) GetIndexRolloverFrequencyServicesDuration() time.Duration { - return getIndexRolloverFrequencyDuration(c.IndexRolloverFrequencyServices) -} - -// GetIndexRolloverFrequencySamplingDuration returns jaeger-sampling index rollover frequency duration -func (c *Configuration) GetIndexRolloverFrequencySamplingDuration() time.Duration { - return getIndexRolloverFrequencyDuration(c.IndexRolloverFrequencySampling) -} - -// GetIndexRolloverFrequencyDuration returns the index rollover frequency duration for the given frequency string -func getIndexRolloverFrequencyDuration(frequency string) time.Duration { +// RolloverFrequencyAsNegativeDuration returns the index rollover frequency duration for the given frequency string +func RolloverFrequencyAsNegativeDuration(frequency string) time.Duration { if frequency == "hour" { return -1 * time.Hour } diff --git a/pkg/es/config/config_test.go b/pkg/es/config/config_test.go index 7ba124bfd9a..fb3a77c410d 100644 --- a/pkg/es/config/config_test.go +++ b/pkg/es/config/config_test.go @@ -315,26 +315,39 @@ func TestNewClient(t *testing.T) { func TestApplyDefaults(t *testing.T) { source := &Configuration{ - RemoteReadClusters: []string{"cluster1", "cluster2"}, - Username: "sourceUser", - Password: "sourcePass", - Sniffer: true, - MaxSpanAge: 100, - AdaptiveSamplingLookback: 50, - NumShards: 5, - NumReplicas: 1, - PrioritySpanTemplate: 10, - PriorityServiceTemplate: 20, - PriorityDependenciesTemplate: 30, - BulkSize: 1000, - BulkWorkers: 10, - BulkActions: 100, - BulkFlushInterval: 30, - SnifferTLSEnabled: true, - Tags: TagsAsFields{AllAsFields: true, DotReplacement: "dot", Include: "include", File: "file"}, - MaxDocCount: 10000, - LogLevel: "info", - SendGetBodyAs: "json", + RemoteReadClusters: []string{"cluster1", "cluster2"}, + Username: "sourceUser", + Password: "sourcePass", + Sniffer: true, + MaxSpanAge: 100, + AdaptiveSamplingLookback: 50, + Indices: Indices{ + Spans: IndexOptions{ + Shards: 5, + Replicas: 1, + Priority: 10, + }, + Services: IndexOptions{ + Shards: 5, + Replicas: 1, + Priority: 20, + }, + Dependencies: IndexOptions{ + Shards: 5, + Replicas: 1, + Priority: 30, + }, + Sampling: IndexOptions{}, + }, + BulkSize: 1000, + BulkWorkers: 10, + BulkActions: 100, + BulkFlushInterval: 30, + SnifferTLSEnabled: true, + Tags: TagsAsFields{AllAsFields: true, DotReplacement: "dot", Include: "include", File: "file"}, + MaxDocCount: 10000, + LogLevel: "info", + SendGetBodyAs: "json", } tests := []struct { @@ -345,66 +358,102 @@ func TestApplyDefaults(t *testing.T) { { name: "All Defaults Applied except PriorityDependenciesTemplate", target: &Configuration{ - PriorityDependenciesTemplate: 30, + Indices: Indices{ + Dependencies: IndexOptions{ + Priority: 30, + }, + }, }, // All fields are empty expected: source, }, { name: "Some Defaults Applied", target: &Configuration{ - RemoteReadClusters: []string{"customCluster"}, - Username: "customUser", - PrioritySpanTemplate: 10, - PriorityServiceTemplate: 20, - PriorityDependenciesTemplate: 30, + RemoteReadClusters: []string{"customCluster"}, + Username: "customUser", + Indices: Indices{ + Spans: IndexOptions{ + Priority: 10, + }, + Services: IndexOptions{ + Priority: 20, + }, + Dependencies: IndexOptions{ + Priority: 30, + }, + }, // Other fields left default }, expected: &Configuration{ - RemoteReadClusters: []string{"customCluster"}, - Username: "customUser", - Password: "sourcePass", - Sniffer: true, - SnifferTLSEnabled: true, - MaxSpanAge: 100, - AdaptiveSamplingLookback: 50, - NumShards: 5, - NumReplicas: 1, - PrioritySpanTemplate: 10, - PriorityServiceTemplate: 20, - PriorityDependenciesTemplate: 30, - BulkSize: 1000, - BulkWorkers: 10, - BulkActions: 100, - BulkFlushInterval: 30, - Tags: TagsAsFields{AllAsFields: true, DotReplacement: "dot", Include: "include", File: "file"}, - MaxDocCount: 10000, - LogLevel: "info", - SendGetBodyAs: "json", + RemoteReadClusters: []string{"customCluster"}, + Username: "customUser", + Password: "sourcePass", + Sniffer: true, + SnifferTLSEnabled: true, + MaxSpanAge: 100, + AdaptiveSamplingLookback: 50, + Indices: Indices{ + Spans: IndexOptions{ + Shards: 5, + Replicas: 1, + Priority: 10, + }, + Services: IndexOptions{ + Shards: 5, + Replicas: 1, + Priority: 20, + }, + Dependencies: IndexOptions{ + Shards: 5, + Replicas: 1, + Priority: 30, + }, + }, + BulkSize: 1000, + BulkWorkers: 10, + BulkActions: 100, + BulkFlushInterval: 30, + Tags: TagsAsFields{AllAsFields: true, DotReplacement: "dot", Include: "include", File: "file"}, + MaxDocCount: 10000, + LogLevel: "info", + SendGetBodyAs: "json", }, }, { name: "No Defaults Applied", target: &Configuration{ - RemoteReadClusters: []string{"cluster1", "cluster2"}, - Username: "sourceUser", - Password: "sourcePass", - Sniffer: true, - MaxSpanAge: 100, - AdaptiveSamplingLookback: 50, - NumShards: 5, - NumReplicas: 1, - PrioritySpanTemplate: 10, - PriorityServiceTemplate: 20, - PriorityDependenciesTemplate: 30, - BulkSize: 1000, - BulkWorkers: 10, - BulkActions: 100, - BulkFlushInterval: 30, - SnifferTLSEnabled: true, - Tags: TagsAsFields{AllAsFields: true, DotReplacement: "dot", Include: "include", File: "file"}, - MaxDocCount: 10000, - LogLevel: "info", - SendGetBodyAs: "json", + RemoteReadClusters: []string{"cluster1", "cluster2"}, + Username: "sourceUser", + Password: "sourcePass", + Sniffer: true, + MaxSpanAge: 100, + AdaptiveSamplingLookback: 50, + Indices: Indices{ + Spans: IndexOptions{ + Shards: 5, + Replicas: 1, + Priority: 10, + }, + Services: IndexOptions{ + Shards: 5, + Replicas: 1, + Priority: 20, + }, + Dependencies: IndexOptions{ + Shards: 5, + Replicas: 1, + Priority: 30, + }, + }, + BulkSize: 1000, + BulkWorkers: 10, + BulkActions: 100, + BulkFlushInterval: 30, + SnifferTLSEnabled: true, + Tags: TagsAsFields{AllAsFields: true, DotReplacement: "dot", Include: "include", File: "file"}, + MaxDocCount: 10000, + LogLevel: "info", + SendGetBodyAs: "json", }, expected: source, }, @@ -503,7 +552,7 @@ func TestTagKeysAsFields(t *testing.T) { } } -func TestGetIndexRolloverFrequencySpansDuration(t *testing.T) { +func TestRolloverFrequencyAsNegativeDuration(t *testing.T) { tests := []struct { name string indexFrequency string @@ -528,72 +577,7 @@ func TestGetIndexRolloverFrequencySpansDuration(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - c := &Configuration{IndexRolloverFrequencySpans: test.indexFrequency} - got := c.GetIndexRolloverFrequencySpansDuration() - require.Equal(t, test.expected, got) - }) - } -} - -func TestGetIndexRolloverFrequencyServicesDuration(t *testing.T) { - tests := []struct { - name string - indexFrequency string - expected time.Duration - }{ - { - name: "hourly jaeger-service", - indexFrequency: "hour", - expected: -1 * time.Hour, - }, - { - name: "daily jaeger-service", - indexFrequency: "daily", - expected: -24 * time.Hour, - }, - { - name: "empty jaeger-service", - indexFrequency: "", - expected: -24 * time.Hour, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - c := &Configuration{IndexRolloverFrequencyServices: test.indexFrequency} - got := c.GetIndexRolloverFrequencyServicesDuration() - require.Equal(t, test.expected, got) - }) - } -} - -func TestGetIndexRolloverFrequencySamplingDuration(t *testing.T) { - tests := []struct { - name string - indexFrequency string - expected time.Duration - }{ - { - name: "hourly jaeger-sampling", - indexFrequency: "hour", - expected: -1 * time.Hour, - }, - { - name: "daily jaeger-sampling", - indexFrequency: "daily", - expected: -24 * time.Hour, - }, - { - name: "empty jaeger-sampling", - indexFrequency: "", - expected: -24 * time.Hour, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - c := &Configuration{IndexRolloverFrequencySampling: test.indexFrequency} - got := c.GetIndexRolloverFrequencySamplingDuration() + got := RolloverFrequencyAsNegativeDuration(test.indexFrequency) require.Equal(t, test.expected, got) }) } diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 1a9874dc0c6..af35be59ef1 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -221,21 +221,18 @@ func createSpanReader( return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping") } return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{ - Client: clientFn, - MaxDocCount: cfg.MaxDocCount, - MaxSpanAge: cfg.MaxSpanAge, - IndexPrefix: cfg.IndexPrefix, - SpanIndexDateLayout: cfg.IndexDateLayoutSpans, - ServiceIndexDateLayout: cfg.IndexDateLayoutServices, - SpanIndexRolloverFrequency: cfg.GetIndexRolloverFrequencySpansDuration(), - ServiceIndexRolloverFrequency: cfg.GetIndexRolloverFrequencyServicesDuration(), - TagDotReplacement: cfg.Tags.DotReplacement, - UseReadWriteAliases: cfg.UseReadWriteAliases, - Archive: archive, - RemoteReadClusters: cfg.RemoteReadClusters, - Logger: logger, - MetricsFactory: mFactory, - Tracer: tp.Tracer("esSpanStore.SpanReader"), + Client: clientFn, + MaxDocCount: cfg.MaxDocCount, + MaxSpanAge: cfg.MaxSpanAge, + SpanIndex: cfg.Indices.Spans, + ServiceIndex: cfg.Indices.Services, + TagDotReplacement: cfg.Tags.DotReplacement, + UseReadWriteAliases: cfg.UseReadWriteAliases, + Archive: archive, + RemoteReadClusters: cfg.RemoteReadClusters, + Logger: logger, + MetricsFactory: mFactory, + Tracer: tp.Tracer("esSpanStore.SpanReader"), }), nil } @@ -259,8 +256,8 @@ func createSpanWriter( writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{ Client: clientFn, IndexPrefix: cfg.IndexPrefix, - SpanIndexDateLayout: cfg.IndexDateLayoutSpans, - ServiceIndexDateLayout: cfg.IndexDateLayoutServices, + SpanIndexDateLayout: cfg.Indices.Spans.DateLayout, + ServiceIndexDateLayout: cfg.Indices.Services.DateLayout, AllTagsAsFields: cfg.Tags.AllAsFields, TagKeysAsFields: tags, TagDotReplacement: cfg.Tags.DotReplacement, @@ -290,8 +287,8 @@ func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store Client: f.getPrimaryClient, Logger: f.logger, IndexPrefix: f.primaryConfig.IndexPrefix, - IndexDateLayout: f.primaryConfig.IndexDateLayoutSampling, - IndexRolloverFrequency: f.primaryConfig.GetIndexRolloverFrequencySamplingDuration(), + IndexDateLayout: f.primaryConfig.Indices.Sampling.DateLayout, + IndexRolloverFrequency: config.RolloverFrequencyAsNegativeDuration(f.primaryConfig.Indices.Sampling.RolloverFrequency), Lookback: f.primaryConfig.AdaptiveSamplingLookback, MaxDocCount: f.primaryConfig.MaxDocCount, } @@ -313,15 +310,10 @@ func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store func mappingBuilderFromConfig(cfg *config.Configuration) mappings.MappingBuilder { return mappings.MappingBuilder{ - TemplateBuilder: es.TextTemplateBuilder{}, - Shards: cfg.NumShards, - Replicas: cfg.NumReplicas, - EsVersion: cfg.Version, - IndexPrefix: cfg.IndexPrefix, - UseILM: cfg.UseILM, - PrioritySpanTemplate: cfg.PrioritySpanTemplate, - PriorityServiceTemplate: cfg.PriorityServiceTemplate, - PriorityDependenciesTemplate: cfg.PriorityDependenciesTemplate, + TemplateBuilder: es.TextTemplateBuilder{}, + Indices: cfg.Indices, + EsVersion: cfg.Version, + UseILM: cfg.UseILM, } } @@ -334,7 +326,7 @@ func createDependencyReader( Client: clientFn, Logger: logger, IndexPrefix: cfg.IndexPrefix, - IndexDateLayout: cfg.IndexDateLayoutDependencies, + IndexDateLayout: cfg.Indices.Dependencies.RolloverFrequency, MaxDocCount: cfg.MaxDocCount, UseReadWriteAliases: cfg.UseReadWriteAliases, }) diff --git a/plugin/storage/es/mappings/jaeger-dependencies-7.json b/plugin/storage/es/mappings/jaeger-dependencies-7.json index 18afe1b056e..e183efb7f69 100644 --- a/plugin/storage/es/mappings/jaeger-dependencies-7.json +++ b/plugin/storage/es/mappings/jaeger-dependencies-7.json @@ -2,7 +2,7 @@ "index_patterns": "*jaeger-dependencies-*", {{- if .UseILM }} "aliases": { - "{{ .IndexPrefix }}jaeger-dependencies-read" : {} + "{{ .Prefix }}jaeger-dependencies-read" : {} }, {{- end }} "settings":{ @@ -13,7 +13,7 @@ {{- if .UseILM }} ,"lifecycle": { "name": "{{ .ILMPolicyName }}", - "rollover_alias": "{{ .IndexPrefix }}jaeger-dependencies-write" + "rollover_alias": "{{ .Prefix }}jaeger-dependencies-write" } {{- end }} }, diff --git a/plugin/storage/es/mappings/jaeger-dependencies-8.json b/plugin/storage/es/mappings/jaeger-dependencies-8.json index 57767866284..c76cc777e42 100644 --- a/plugin/storage/es/mappings/jaeger-dependencies-8.json +++ b/plugin/storage/es/mappings/jaeger-dependencies-8.json @@ -1,10 +1,10 @@ { - "priority": {{ .PriorityDependenciesTemplate }}, - "index_patterns": "{{ .IndexPrefix }}jaeger-dependencies-*", + "priority": {{ .Priority }}, + "index_patterns": "{{ .Prefix }}jaeger-dependencies-*", "template": { {{- if .UseILM }} "aliases": { - "{{ .IndexPrefix }}jaeger-dependencies-read": {} + "{{ .Prefix }}jaeger-dependencies-read": {} }, {{- end }} "settings": { @@ -15,7 +15,7 @@ {{- if .UseILM }}, "lifecycle": { "name": "{{ .ILMPolicyName }}", - "rollover_alias": "{{ .IndexPrefix }}jaeger-dependencies-write" + "rollover_alias": "{{ .Prefix }}jaeger-dependencies-write" } {{- end }} }, diff --git a/plugin/storage/es/mappings/jaeger-sampling-7.json b/plugin/storage/es/mappings/jaeger-sampling-7.json index 167c1d47928..9092cfa885d 100644 --- a/plugin/storage/es/mappings/jaeger-sampling-7.json +++ b/plugin/storage/es/mappings/jaeger-sampling-7.json @@ -13,7 +13,7 @@ {{- if .UseILM }} ,"lifecycle": { "name": "{{ .ILMPolicyName }}", - "rollover_alias": "{{ .IndexPrefix }}jaeger-sampling-write" + "rollover_alias": "{{ .Prefix }}jaeger-sampling-write" } {{- end }} }, diff --git a/plugin/storage/es/mappings/jaeger-sampling-8.json b/plugin/storage/es/mappings/jaeger-sampling-8.json index 0667520803a..5c69bc31142 100644 --- a/plugin/storage/es/mappings/jaeger-sampling-8.json +++ b/plugin/storage/es/mappings/jaeger-sampling-8.json @@ -1,10 +1,10 @@ { - "priority": {{ .PrioritySamplingTemplate }}, - "index_patterns": "{{ .IndexPrefix }}jaeger-sampling-*", + "priority": {{ .Priority }}, + "index_patterns": "{{ .Prefix }}jaeger-sampling-*", "template": { {{- if .UseILM }} "aliases": { - "{{ .IndexPrefix }}jaeger-sampling-read": {} + "{{ .Prefix }}jaeger-sampling-read": {} }, {{- end }} "settings": { @@ -15,7 +15,7 @@ {{- if .UseILM }}, "lifecycle": { "name": "{{ .ILMPolicyName }}", - "rollover_alias": "{{ .IndexPrefix }}jaeger-sampling-write" + "rollover_alias": "{{ .Prefix }}jaeger-sampling-write" } {{- end }} }, diff --git a/plugin/storage/es/mappings/jaeger-service-7.json b/plugin/storage/es/mappings/jaeger-service-7.json index 0ca2d186319..4b1f95cf69d 100644 --- a/plugin/storage/es/mappings/jaeger-service-7.json +++ b/plugin/storage/es/mappings/jaeger-service-7.json @@ -1,19 +1,19 @@ { - "index_patterns": "*{{ .IndexPrefix }}jaeger-service-*", + "index_patterns": "*{{ .Prefix }}jaeger-service-*", {{- if .UseILM }} "aliases": { - "{{ .IndexPrefix }}jaeger-service-read" : {} + "{{ .Prefix }}jaeger-service-read" : {} }, {{- end }} "settings":{ "index.number_of_shards": {{ .Shards }}, - "index.number_of_replicas": {{ .Replicas }}, + "index.number_of_replicas": {{ .Replicas }}, "index.mapping.nested_fields.limit":50, "index.requests.cache.enable":true {{- if .UseILM }} ,"lifecycle": { "name": "{{ .ILMPolicyName }}", - "rollover_alias": "{{ .IndexPrefix }}jaeger-service-write" + "rollover_alias": "{{ .Prefix }}jaeger-service-write" } {{- end }} }, diff --git a/plugin/storage/es/mappings/jaeger-service-8.json b/plugin/storage/es/mappings/jaeger-service-8.json index 97ab02f573d..ae0a1ae5cff 100644 --- a/plugin/storage/es/mappings/jaeger-service-8.json +++ b/plugin/storage/es/mappings/jaeger-service-8.json @@ -1,21 +1,21 @@ { - "priority": {{ .PriorityServiceTemplate}}, - "index_patterns": "{{ .IndexPrefix }}jaeger-service-*", + "priority": {{ .Priority }}, + "index_patterns": "{{ .Prefix }}jaeger-service-*", "template": { {{- if .UseILM }} "aliases": { - "{{ .IndexPrefix }}jaeger-service-read": {} + "{{ .Prefix }}jaeger-service-read": {} }, {{- end }} "settings": { "index.number_of_shards": {{ .Shards }}, - "index.number_of_replicas": {{ .Replicas }}, + "index.number_of_replicas": {{ .Replicas }}, "index.mapping.nested_fields.limit": 50, "index.requests.cache.enable": true {{- if .UseILM }}, "lifecycle": { "name": "{{ .ILMPolicyName }}", - "rollover_alias": "{{ .IndexPrefix }}jaeger-service-write" + "rollover_alias": "{{ .Prefix }}jaeger-service-write" } {{- end }} }, diff --git a/plugin/storage/es/mappings/jaeger-span-7.json b/plugin/storage/es/mappings/jaeger-span-7.json index 3d8bbae95dc..1393b7a96b4 100644 --- a/plugin/storage/es/mappings/jaeger-span-7.json +++ b/plugin/storage/es/mappings/jaeger-span-7.json @@ -1,8 +1,8 @@ { - "index_patterns": "*{{ .IndexPrefix }}jaeger-span-*", + "index_patterns": "*{{ .Prefix }}jaeger-span-*", {{- if .UseILM }} "aliases": { - "{{ .IndexPrefix }}jaeger-span-read": {} + "{{ .Prefix }}jaeger-span-read": {} }, {{- end }} "settings":{ @@ -13,7 +13,7 @@ {{- if .UseILM }} ,"lifecycle": { "name": "{{ .ILMPolicyName }}", - "rollover_alias": "{{ .IndexPrefix }}jaeger-span-write" + "rollover_alias": "{{ .Prefix }}jaeger-span-write" } {{- end }} }, diff --git a/plugin/storage/es/mappings/jaeger-span-8.json b/plugin/storage/es/mappings/jaeger-span-8.json index 60bc5eaa910..7dec754e3e9 100644 --- a/plugin/storage/es/mappings/jaeger-span-8.json +++ b/plugin/storage/es/mappings/jaeger-span-8.json @@ -1,11 +1,11 @@ { - "priority": {{ .PrioritySpanTemplate}}, - "index_patterns": "{{ .IndexPrefix }}jaeger-span-*", + "priority": {{ .Priority }}, + "index_patterns": "{{ .Prefix }}jaeger-span-*", "template": { {{- if .UseILM}} "aliases": { - "{{ .IndexPrefix }}jaeger-span-read": {} + "{{ .Prefix }}jaeger-span-read": {} }, {{- end}} "settings": { @@ -16,7 +16,7 @@ {{- if .UseILM }}, "lifecycle": { "name": "{{ .ILMPolicyName }}", - "rollover_alias": "{{ .IndexPrefix }}jaeger-span-write" + "rollover_alias": "{{ .Prefix }}jaeger-span-write" } {{- end }} }, diff --git a/plugin/storage/es/mappings/mapping.go b/plugin/storage/es/mappings/mapping.go index 1962f85100d..93c8682224f 100644 --- a/plugin/storage/es/mappings/mapping.go +++ b/plugin/storage/es/mappings/mapping.go @@ -9,6 +9,7 @@ import ( "strings" "github.com/jaegertracing/jaeger/pkg/es" + "github.com/jaegertracing/jaeger/pkg/es/config" ) // MAPPINGS contains embedded index templates. @@ -16,29 +17,50 @@ import ( //go:embed *.json var MAPPINGS embed.FS -// MappingBuilder holds parameters required to render an elasticsearch index template +// MappingBuilder holds common parameters required to render an elasticsearch index template type MappingBuilder struct { - TemplateBuilder es.TemplateBuilder - Shards int64 - Replicas int64 - PrioritySpanTemplate int64 - PriorityServiceTemplate int64 - PriorityDependenciesTemplate int64 - PrioritySamplingTemplate int64 - EsVersion uint - IndexPrefix string - UseILM bool - ILMPolicyName string + TemplateBuilder es.TemplateBuilder + Indices config.Indices + EsVersion uint + UseILM bool + ILMPolicyName string +} + +// IndexTemplateOptions holds parameters required to render an elasticsearch index template +type IndexTemplateOptions struct { + UseILM bool + ILMPolicyName string + config.IndexOptions +} + +func (mb *MappingBuilder) getMappingTemplateOptions(mapping string) *IndexTemplateOptions { + mappingOpts := &IndexTemplateOptions{} + mappingOpts.UseILM = mb.UseILM + mappingOpts.ILMPolicyName = mb.ILMPolicyName + + switch { + case strings.Contains(mapping, "span"): + mappingOpts.IndexOptions = mb.Indices.Spans + case strings.Contains(mapping, "service"): + mappingOpts.IndexOptions = mb.Indices.Services + case strings.Contains(mapping, "dependencies"): + mappingOpts.IndexOptions = mb.Indices.Dependencies + case strings.Contains(mapping, "sampling"): + mappingOpts.IndexOptions = mb.Indices.Sampling + } + + return mappingOpts } // GetMapping returns the rendered mapping based on elasticsearch version func (mb *MappingBuilder) GetMapping(mapping string) (string, error) { + templateOpts := mb.getMappingTemplateOptions(mapping) if mb.EsVersion == 8 { - return mb.fixMapping(mapping + "-8.json") + return mb.fixMapping(mapping+"-8.json", templateOpts) } else if mb.EsVersion == 7 { - return mb.fixMapping(mapping + "-7.json") + return mb.fixMapping(mapping+"-7.json", templateOpts) } - return mb.fixMapping(mapping + "-6.json") + return mb.fixMapping(mapping+"-6.json", templateOpts) } // GetSpanServiceMappings returns span and service mappings @@ -69,17 +91,17 @@ func loadMapping(name string) string { return string(s) } -func (mb *MappingBuilder) fixMapping(mapping string) (string, error) { +func (mb *MappingBuilder) fixMapping(mapping string, options *IndexTemplateOptions) (string, error) { tmpl, err := mb.TemplateBuilder.Parse(loadMapping(mapping)) if err != nil { return "", err } writer := new(bytes.Buffer) - if mb.IndexPrefix != "" && !strings.HasSuffix(mb.IndexPrefix, "-") { - mb.IndexPrefix += "-" + if options.Prefix != "" && !strings.HasSuffix(options.Prefix, "-") { + options.Prefix += "-" } - if err := tmpl.Execute(writer, mb); err != nil { + if err := tmpl.Execute(writer, options); err != nil { return "", err } diff --git a/plugin/storage/es/mappings/mapping_test.go b/plugin/storage/es/mappings/mapping_test.go index 194a15b2c24..6ad66dc2c95 100644 --- a/plugin/storage/es/mappings/mapping_test.go +++ b/plugin/storage/es/mappings/mapping_test.go @@ -17,6 +17,7 @@ import ( "github.com/stretchr/testify/require" "github.com/jaegertracing/jaeger/pkg/es" + "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/pkg/es/mocks" "github.com/jaegertracing/jaeger/pkg/testutils" ) @@ -46,17 +47,30 @@ func TestMappingBuilderGetMapping(t *testing.T) { } for _, tt := range tests { t.Run(tt.mapping, func(t *testing.T) { + indexTemOps := config.IndexOptions{ + Shards: 3, + Replicas: 3, + Priority: 500, + Prefix: "test-", + } + serviceOps := indexTemOps + serviceOps.Priority = 501 + dependenciesOps := indexTemOps + dependenciesOps.Priority = 502 + samplingOps := indexTemOps + samplingOps.Priority = 503 + mb := &MappingBuilder{ - TemplateBuilder: es.TextTemplateBuilder{}, - Shards: 3, - Replicas: 3, - PrioritySpanTemplate: 500, - PriorityServiceTemplate: 501, - PriorityDependenciesTemplate: 502, - EsVersion: tt.esVersion, - IndexPrefix: "test-", - UseILM: true, - ILMPolicyName: "jaeger-test-policy", + TemplateBuilder: es.TextTemplateBuilder{}, + Indices: config.Indices{ + Spans: indexTemOps, + Services: serviceOps, + Dependencies: dependenciesOps, + Sampling: samplingOps, + }, + EsVersion: tt.esVersion, + UseILM: true, + ILMPolicyName: "jaeger-test-policy", } got, err := mb.GetMapping(tt.mapping) require.NoError(t, err) @@ -137,16 +151,25 @@ func TestMappingBuilderFixMapping(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + indexTemOps := config.IndexOptions{ + Shards: 3, + Replicas: 5, + Priority: 500, + Prefix: "test", + } mappingBuilder := MappingBuilder{ TemplateBuilder: test.templateBuilderMockFunc(), - Shards: 3, - Replicas: 5, - EsVersion: 7, - IndexPrefix: "test", - UseILM: true, - ILMPolicyName: "jaeger-test-policy", + Indices: config.Indices{ + Spans: indexTemOps, + Services: indexTemOps, + Dependencies: indexTemOps, + Sampling: indexTemOps, + }, + EsVersion: 7, + UseILM: true, + ILMPolicyName: "jaeger-test-policy", } - _, err := mappingBuilder.fixMapping("test") + _, err := mappingBuilder.fixMapping("test", mappingBuilder.getMappingTemplateOptions("test")) if test.err != "" { require.EqualError(t, err, test.err) } else { @@ -158,8 +181,6 @@ func TestMappingBuilderFixMapping(t *testing.T) { func TestMappingBuilderGetSpanServiceMappings(t *testing.T) { type args struct { - shards int64 - replicas int64 esVersion uint indexPrefix string useILM bool @@ -174,8 +195,6 @@ func TestMappingBuilderGetSpanServiceMappings(t *testing.T) { { name: "ES Version 7", args: args{ - shards: 3, - replicas: 3, esVersion: 7, indexPrefix: "test", useILM: true, @@ -193,8 +212,6 @@ func TestMappingBuilderGetSpanServiceMappings(t *testing.T) { { name: "ES Version 7 Service Error", args: args{ - shards: 3, - replicas: 3, esVersion: 7, indexPrefix: "test", useILM: true, @@ -214,8 +231,6 @@ func TestMappingBuilderGetSpanServiceMappings(t *testing.T) { { name: "ES Version < 7", args: args{ - shards: 3, - replicas: 3, esVersion: 6, indexPrefix: "test", useILM: true, @@ -233,8 +248,6 @@ func TestMappingBuilderGetSpanServiceMappings(t *testing.T) { { name: "ES Version < 7 Service Error", args: args{ - shards: 3, - replicas: 3, esVersion: 6, indexPrefix: "test", useILM: true, @@ -253,8 +266,6 @@ func TestMappingBuilderGetSpanServiceMappings(t *testing.T) { { name: "ES Version < 7 Span Error", args: args{ - shards: 3, - replicas: 3, esVersion: 6, indexPrefix: "test", useILM: true, @@ -272,8 +283,6 @@ func TestMappingBuilderGetSpanServiceMappings(t *testing.T) { { name: "ES Version 7 Span Error", args: args{ - shards: 3, - replicas: 3, esVersion: 7, indexPrefix: "test", useILM: true, @@ -291,14 +300,23 @@ func TestMappingBuilderGetSpanServiceMappings(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + indexTemOps := config.IndexOptions{ + Shards: 3, + Replicas: 3, + Prefix: test.args.indexPrefix, + } + mappingBuilder := MappingBuilder{ TemplateBuilder: test.mockNewTextTemplateBuilder(), - Shards: test.args.shards, - Replicas: test.args.replicas, - EsVersion: test.args.esVersion, - IndexPrefix: test.args.indexPrefix, - UseILM: test.args.useILM, - ILMPolicyName: test.args.ilmPolicyName, + Indices: config.Indices{ + Spans: indexTemOps, + Services: indexTemOps, + Dependencies: indexTemOps, + Sampling: indexTemOps, + }, + EsVersion: test.args.esVersion, + UseILM: test.args.useILM, + ILMPolicyName: test.args.ilmPolicyName, } _, _, err := mappingBuilder.GetSpanServiceMappings() if test.err != "" { diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 7aa3000405f..9afa5da6771 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -69,6 +69,14 @@ const ( defaultSendGetBodyAs = "" ) +var defaultIndexOptions = config.IndexOptions{ + DateLayout: initDateLayout(defaultIndexRolloverFrequency, defaultIndexDateSeparator), + RolloverFrequency: defaultIndexRolloverFrequency, + Shards: 5, + Replicas: 1, + Priority: 0, +} + // TODO this should be moved next to config.Configuration struct (maybe ./flags package) // Options contains various type of Elasticsearch configs and provides the ability @@ -157,30 +165,30 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixTimeout, nsConfig.Timeout, "Timeout used for queries. A Timeout of zero means no timeout") - flagSet.Int64( + flagSet.Int( nsConfig.namespace+suffixNumShards, - nsConfig.NumShards, + defaultIndexOptions.Shards, "The number of shards per index in Elasticsearch") flagSet.Duration( nsConfig.namespace+suffixServiceCacheTTL, nsConfig.ServiceCacheTTL, "The TTL for the cache of known service names", ) - flagSet.Int64( + flagSet.Int( nsConfig.namespace+suffixNumReplicas, - nsConfig.NumReplicas, + defaultIndexOptions.Replicas, "The number of replicas per index in Elasticsearch") - flagSet.Int64( + flagSet.Int( nsConfig.namespace+suffixPrioritySpanTemplate, - nsConfig.PrioritySpanTemplate, + defaultIndexOptions.Priority, "Priority of jaeger-span index template (ESv8 only)") - flagSet.Int64( + flagSet.Int( nsConfig.namespace+suffixPriorityServiceTemplate, - nsConfig.PriorityServiceTemplate, + defaultIndexOptions.Priority, "Priority of jaeger-service index template (ESv8 only)") - flagSet.Int64( + flagSet.Int( nsConfig.namespace+suffixPriorityDependenciesTemplate, - nsConfig.PriorityDependenciesTemplate, + defaultIndexOptions.Priority, "Priority of jaeger-dependecies index template (ESv8 only)") flagSet.Int( nsConfig.namespace+suffixBulkSize, @@ -311,11 +319,22 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.Servers = strings.Split(stripWhiteSpace(v.GetString(cfg.namespace+suffixServerURLs)), ",") cfg.MaxSpanAge = v.GetDuration(cfg.namespace + suffixMaxSpanAge) cfg.AdaptiveSamplingLookback = v.GetDuration(cfg.namespace + suffixAdaptiveSamplingLookback) - cfg.NumShards = v.GetInt64(cfg.namespace + suffixNumShards) - cfg.NumReplicas = v.GetInt64(cfg.namespace + suffixNumReplicas) - cfg.PrioritySpanTemplate = v.GetInt64(cfg.namespace + suffixPrioritySpanTemplate) - cfg.PriorityServiceTemplate = v.GetInt64(cfg.namespace + suffixPriorityServiceTemplate) - cfg.PriorityDependenciesTemplate = v.GetInt64(cfg.namespace + suffixPriorityDependenciesTemplate) + + cfg.Indices.Spans.Shards = v.GetInt(cfg.namespace + suffixNumShards) + cfg.Indices.Services.Shards = v.GetInt(cfg.namespace + suffixNumShards) + cfg.Indices.Sampling.Shards = v.GetInt(cfg.namespace + suffixNumShards) + cfg.Indices.Dependencies.Shards = v.GetInt(cfg.namespace + suffixNumShards) + + cfg.Indices.Spans.Replicas = v.GetInt(cfg.namespace + suffixNumReplicas) + cfg.Indices.Services.Replicas = v.GetInt(cfg.namespace + suffixNumReplicas) + cfg.Indices.Sampling.Replicas = v.GetInt(cfg.namespace + suffixNumReplicas) + cfg.Indices.Dependencies.Replicas = v.GetInt(cfg.namespace + suffixNumReplicas) + + cfg.Indices.Spans.Priority = v.GetInt(cfg.namespace + suffixPrioritySpanTemplate) + cfg.Indices.Services.Priority = v.GetInt(cfg.namespace + suffixPriorityServiceTemplate) + // cfg.Indices.Sampling does not have a separate flag + cfg.Indices.Dependencies.Priority = v.GetInt(cfg.namespace + suffixPriorityDependenciesTemplate) + cfg.BulkSize = v.GetInt(cfg.namespace + suffixBulkSize) cfg.BulkWorkers = v.GetInt(cfg.namespace + suffixBulkWorkers) cfg.BulkActions = v.GetInt(cfg.namespace + suffixBulkActions) @@ -323,6 +342,13 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout) cfg.ServiceCacheTTL = v.GetDuration(cfg.namespace + suffixServiceCacheTTL) cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix) + + // TODO cfg.Indices does not have a separate flag + cfg.Indices.Spans.Prefix = cfg.IndexPrefix + cfg.Indices.Services.Prefix = cfg.IndexPrefix + cfg.Indices.Sampling.Prefix = cfg.IndexPrefix + cfg.Indices.Dependencies.Prefix = cfg.IndexPrefix + cfg.Tags.AllAsFields = v.GetBool(cfg.namespace + suffixTagsAsFieldsAll) cfg.Tags.Include = v.GetString(cfg.namespace + suffixTagsAsFieldsInclude) cfg.Tags.File = v.GetString(cfg.namespace + suffixTagsFile) @@ -345,17 +371,17 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.RemoteReadClusters = strings.Split(remoteReadClusters, ",") } - cfg.IndexRolloverFrequencySpans = strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequencySpans)) - cfg.IndexRolloverFrequencyServices = strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequencyServices)) - cfg.IndexRolloverFrequencySampling = strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequencySampling)) + cfg.Indices.Spans.RolloverFrequency = strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequencySpans)) + cfg.Indices.Services.RolloverFrequency = strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequencyServices)) + cfg.Indices.Sampling.RolloverFrequency = strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequencySampling)) separator := v.GetString(cfg.namespace + suffixIndexDateSeparator) - cfg.IndexDateLayoutSpans = initDateLayout(cfg.IndexRolloverFrequencySpans, separator) - cfg.IndexDateLayoutServices = initDateLayout(cfg.IndexRolloverFrequencyServices, separator) - cfg.IndexDateLayoutSampling = initDateLayout(cfg.IndexRolloverFrequencySampling, separator) + cfg.Indices.Spans.DateLayout = initDateLayout(cfg.Indices.Spans.RolloverFrequency, separator) + cfg.Indices.Services.DateLayout = initDateLayout(cfg.Indices.Services.RolloverFrequency, separator) + cfg.Indices.Sampling.DateLayout = initDateLayout(cfg.Indices.Sampling.RolloverFrequency, separator) - // Dependencies calculation should be daily, and this index size is very small - cfg.IndexDateLayoutDependencies = initDateLayout(defaultIndexRolloverFrequency, separator) + // Daily is recommended for dependencies calculation, and this index size is very small + cfg.Indices.Dependencies.DateLayout = initDateLayout(cfg.Indices.Dependencies.RolloverFrequency, separator) var err error cfg.TLS, err = cfg.getTLSFlagsConfig().InitFromViper(v) if err != nil { @@ -399,20 +425,15 @@ func initDateLayout(rolloverFreq, sep string) string { func DefaultConfig() config.Configuration { return config.Configuration{ - Username: "", - Password: "", - Sniffer: false, - MaxSpanAge: 72 * time.Hour, - AdaptiveSamplingLookback: 72 * time.Hour, - NumShards: 5, - NumReplicas: 1, - PrioritySpanTemplate: 0, - PriorityServiceTemplate: 0, - PriorityDependenciesTemplate: 0, - BulkSize: 5 * 1000 * 1000, - BulkWorkers: 1, - BulkActions: 1000, - BulkFlushInterval: time.Millisecond * 200, + Username: "", + Password: "", + Sniffer: false, + MaxSpanAge: 72 * time.Hour, + AdaptiveSamplingLookback: 72 * time.Hour, + BulkSize: 5 * 1000 * 1000, + BulkWorkers: 1, + BulkActions: 1000, + BulkFlushInterval: time.Millisecond * 200, Tags: config.TagsAsFields{ DotReplacement: "@", }, @@ -426,5 +447,11 @@ func DefaultConfig() config.Configuration { MaxDocCount: defaultMaxDocCount, LogLevel: "error", SendGetBodyAs: defaultSendGetBodyAs, + Indices: config.Indices{ + Spans: defaultIndexOptions, + Services: defaultIndexOptions, + Dependencies: defaultIndexOptions, + Sampling: defaultIndexOptions, + }, } } diff --git a/plugin/storage/es/options_test.go b/plugin/storage/es/options_test.go index 8e08c74a6f1..18d851e70ab 100644 --- a/plugin/storage/es/options_test.go +++ b/plugin/storage/es/options_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "github.com/jaegertracing/jaeger/pkg/config" + escfg "github.com/jaegertracing/jaeger/pkg/es/config" ) func TestOptions(t *testing.T) { @@ -22,8 +23,14 @@ func TestOptions(t *testing.T) { assert.Empty(t, primary.PasswordFilePath) assert.NotEmpty(t, primary.Servers) assert.Empty(t, primary.RemoteReadClusters) - assert.Equal(t, int64(5), primary.NumShards) - assert.Equal(t, int64(1), primary.NumReplicas) + assert.Equal(t, 5, primary.Indices.Spans.Shards) + assert.Equal(t, 5, primary.Indices.Services.Shards) + assert.Equal(t, 5, primary.Indices.Sampling.Shards) + assert.Equal(t, 5, primary.Indices.Dependencies.Shards) + assert.Equal(t, 1, primary.Indices.Spans.Replicas) + assert.Equal(t, 1, primary.Indices.Services.Replicas) + assert.Equal(t, 1, primary.Indices.Sampling.Replicas) + assert.Equal(t, 1, primary.Indices.Dependencies.Replicas) assert.Equal(t, 72*time.Hour, primary.MaxSpanAge) assert.False(t, primary.Sniffer) assert.False(t, primary.SnifferTLSEnabled) @@ -87,22 +94,28 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "!", primary.Tags.DotReplacement) assert.Equal(t, "./file.txt", primary.Tags.File) assert.Equal(t, "test,tags", primary.Tags.Include) - assert.Equal(t, "20060102", primary.IndexDateLayoutServices) - assert.Equal(t, "2006010215", primary.IndexDateLayoutSpans) + assert.Equal(t, "20060102", primary.Indices.Services.DateLayout) + assert.Equal(t, "2006010215", primary.Indices.Spans.DateLayout) aux := opts.Get("es.aux") assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Servers) assert.Equal(t, "hello", aux.Username) assert.Equal(t, "world", aux.Password) - assert.Equal(t, int64(5), aux.NumShards) - assert.Equal(t, int64(10), aux.NumReplicas) + assert.Equal(t, 5, aux.Indices.Spans.Shards) + assert.Equal(t, 5, aux.Indices.Services.Shards) + assert.Equal(t, 5, aux.Indices.Sampling.Shards) + assert.Equal(t, 5, aux.Indices.Dependencies.Shards) + assert.Equal(t, 10, aux.Indices.Spans.Replicas) + assert.Equal(t, 10, aux.Indices.Services.Replicas) + assert.Equal(t, 10, aux.Indices.Sampling.Replicas) + assert.Equal(t, 10, aux.Indices.Dependencies.Replicas) assert.Equal(t, 24*time.Hour, aux.MaxSpanAge) assert.True(t, aux.Sniffer) assert.True(t, aux.Tags.AllAsFields) assert.Equal(t, "@", aux.Tags.DotReplacement) assert.Equal(t, "./file.txt", aux.Tags.File) assert.Equal(t, "test,tags", aux.Tags.Include) - assert.Equal(t, "2006.01.02", aux.IndexDateLayoutServices) - assert.Equal(t, "2006.01.02.15", aux.IndexDateLayoutSpans) + assert.Equal(t, "2006.01.02", aux.Indices.Services.DateLayout) + assert.Equal(t, "2006.01.02.15", aux.Indices.Spans.DateLayout) assert.True(t, primary.UseILM) assert.Equal(t, "POST", aux.SendGetBodyAs) } @@ -171,7 +184,7 @@ func TestIndexDateSeparator(t *testing.T) { opts.InitFromViper(v) primary := opts.GetPrimary() - assert.Equal(t, tc.wantDateLayout, primary.IndexDateLayoutSpans) + assert.Equal(t, tc.wantDateLayout, primary.Indices.Spans.DateLayout) }) } } @@ -225,10 +238,10 @@ func TestIndexRollover(t *testing.T) { command.ParseFlags(tc.flags) opts.InitFromViper(v) primary := opts.GetPrimary() - assert.Equal(t, tc.wantSpanDateLayout, primary.IndexDateLayoutSpans) - assert.Equal(t, tc.wantServiceDateLayout, primary.IndexDateLayoutServices) - assert.Equal(t, tc.wantSpanIndexRolloverFrequency, primary.GetIndexRolloverFrequencySpansDuration()) - assert.Equal(t, tc.wantServiceIndexRolloverFrequency, primary.GetIndexRolloverFrequencyServicesDuration()) + assert.Equal(t, tc.wantSpanDateLayout, primary.Indices.Spans.DateLayout) + assert.Equal(t, tc.wantServiceDateLayout, primary.Indices.Services.DateLayout) + assert.Equal(t, tc.wantSpanIndexRolloverFrequency, escfg.RolloverFrequencyAsNegativeDuration(primary.Indices.Spans.RolloverFrequency)) + assert.Equal(t, tc.wantServiceIndexRolloverFrequency, escfg.RolloverFrequencyAsNegativeDuration(primary.Indices.Services.RolloverFrequency)) }) } } diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 2d856c19614..37f6563190e 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -20,14 +20,15 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/es" + cfg "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" "github.com/jaegertracing/jaeger/storage/spanstore" ) const ( - spanIndex = "jaeger-span-" - serviceIndex = "jaeger-service-" + spanIndexBaseName = "jaeger-span-" + serviceIndexBaseName = "jaeger-service-" archiveIndexSuffix = "archive" archiveReadIndexSuffix = archiveIndexSuffix + "-read" archiveWriteIndexSuffix = archiveIndexSuffix + "-write" @@ -84,40 +85,33 @@ type SpanReader struct { client func() es.Client // The age of the oldest service/operation we will look for. Because indices in ElasticSearch are by day, // this will be rounded down to UTC 00:00 of that day. - maxSpanAge time.Duration - serviceOperationStorage *ServiceOperationStorage - spanIndexPrefix string - serviceIndexPrefix string - spanIndexDateLayout string - serviceIndexDateLayout string - spanIndexRolloverFrequency time.Duration - serviceIndexRolloverFrequency time.Duration - spanConverter dbmodel.ToDomain - timeRangeIndices timeRangeIndexFn - sourceFn sourceFn - maxDocCount int - useReadWriteAliases bool - logger *zap.Logger - tracer trace.Tracer + maxSpanAge time.Duration + serviceOperationStorage *ServiceOperationStorage + spanIndex cfg.IndexOptions + serviceIndex cfg.IndexOptions + spanConverter dbmodel.ToDomain + timeRangeIndices timeRangeIndexFn + sourceFn sourceFn + maxDocCount int + useReadWriteAliases bool + logger *zap.Logger + tracer trace.Tracer } // SpanReaderParams holds constructor params for NewSpanReader type SpanReaderParams struct { - Client func() es.Client - MaxSpanAge time.Duration - MaxDocCount int - IndexPrefix string - SpanIndexDateLayout string - ServiceIndexDateLayout string - SpanIndexRolloverFrequency time.Duration - ServiceIndexRolloverFrequency time.Duration - TagDotReplacement string - Archive bool - UseReadWriteAliases bool - RemoteReadClusters []string - MetricsFactory metrics.Factory - Logger *zap.Logger - Tracer trace.Tracer + Client func() es.Client + MaxSpanAge time.Duration + MaxDocCount int + SpanIndex cfg.IndexOptions + ServiceIndex cfg.IndexOptions + TagDotReplacement string + Archive bool + UseReadWriteAliases bool + RemoteReadClusters []string + MetricsFactory metrics.Factory + Logger *zap.Logger + Tracer trace.Tracer } // NewSpanReader returns a new SpanReader with a metrics. @@ -128,23 +122,30 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { if p.UseReadWriteAliases { maxSpanAge = rolloverMaxSpanAge } + + makeName := func(prefix, index string) string { + if prefix != "" { + return prefix + indexPrefixSeparator + index + } + return index + } + + p.SpanIndex.Prefix = makeName(p.SpanIndex.Prefix, spanIndexBaseName) + p.ServiceIndex.Prefix = makeName(p.ServiceIndex.Prefix, serviceIndexBaseName) + return &SpanReader{ - client: p.Client, - maxSpanAge: maxSpanAge, - serviceOperationStorage: NewServiceOperationStorage(p.Client, p.Logger, 0), // the decorator takes care of metrics - spanIndexPrefix: indexNames(p.IndexPrefix, spanIndex), - serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex), - spanIndexDateLayout: p.SpanIndexDateLayout, - serviceIndexDateLayout: p.ServiceIndexDateLayout, - spanIndexRolloverFrequency: p.SpanIndexRolloverFrequency, - serviceIndexRolloverFrequency: p.SpanIndexRolloverFrequency, - spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), - timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases, p.RemoteReadClusters), - sourceFn: getSourceFn(p.Archive, p.MaxDocCount), - maxDocCount: p.MaxDocCount, - useReadWriteAliases: p.UseReadWriteAliases, - logger: p.Logger, - tracer: p.Tracer, + client: p.Client, + maxSpanAge: maxSpanAge, + serviceOperationStorage: NewServiceOperationStorage(p.Client, p.Logger, 0), // the decorator takes care of metrics + spanIndex: p.SpanIndex, + serviceIndex: p.ServiceIndex, + spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), + timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases, p.RemoteReadClusters), + sourceFn: getSourceFn(p.Archive, p.MaxDocCount), + maxDocCount: p.MaxDocCount, + useReadWriteAliases: p.UseReadWriteAliases, + logger: p.Logger, + tracer: p.Tracer, } } @@ -221,13 +222,6 @@ func timeRangeIndices(indexName, indexDateLayout string, startTime time.Time, en return indices } -func indexNames(prefix, index string) string { - if prefix != "" { - return prefix + indexPrefixSeparator + index - } - return index -} - // GetTrace takes a traceID and returns a Trace associated with that traceID func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { ctx, span := s.tracer.Start(ctx, "GetTrace") @@ -278,7 +272,13 @@ func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) { ctx, span := s.tracer.Start(ctx, "GetService") defer span.End() currentTime := time.Now() - jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.serviceIndexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime, s.serviceIndexRolloverFrequency) + jaegerIndices := s.timeRangeIndices( + s.serviceIndex.Prefix, + s.serviceIndex.DateLayout, + currentTime.Add(-s.maxSpanAge), + currentTime, + cfg.RolloverFrequencyAsNegativeDuration(s.serviceIndex.RolloverFrequency), + ) return s.serviceOperationStorage.getServices(ctx, jaegerIndices, s.maxDocCount) } @@ -290,7 +290,13 @@ func (s *SpanReader) GetOperations( ctx, span := s.tracer.Start(ctx, "GetOperations") defer span.End() currentTime := time.Now() - jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.serviceIndexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime, s.serviceIndexRolloverFrequency) + jaegerIndices := s.timeRangeIndices( + s.serviceIndex.Prefix, + s.serviceIndex.DateLayout, + currentTime.Add(-s.maxSpanAge), + currentTime, + cfg.RolloverFrequencyAsNegativeDuration(s.serviceIndex.RolloverFrequency), + ) operations, err := s.serviceOperationStorage.getOperations(ctx, jaegerIndices, query.ServiceName, s.maxDocCount) if err != nil { return nil, err @@ -369,7 +375,13 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st // Add an hour in both directions so that traces that straddle two indexes are retrieved. // i.e starts in one and ends in another. - indices := s.timeRangeIndices(s.spanIndexPrefix, s.spanIndexDateLayout, startTime.Add(-time.Hour), endTime.Add(time.Hour), s.spanIndexRolloverFrequency) + indices := s.timeRangeIndices( + s.spanIndex.Prefix, + s.spanIndex.DateLayout, + startTime.Add(-time.Hour), + endTime.Add(time.Hour), + cfg.RolloverFrequencyAsNegativeDuration(s.spanIndex.RolloverFrequency), + ) nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour)) searchAfterTime := make(map[model.TraceID]uint64) totalDocumentsFetched := make(map[model.TraceID]int) @@ -561,7 +573,13 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra // } aggregation := s.buildTraceIDAggregation(traceQuery.NumTraces) boolQuery := s.buildFindTraceIDsQuery(traceQuery) - jaegerIndices := s.timeRangeIndices(s.spanIndexPrefix, s.spanIndexDateLayout, traceQuery.StartTimeMin, traceQuery.StartTimeMax, s.spanIndexRolloverFrequency) + jaegerIndices := s.timeRangeIndices( + s.spanIndex.Prefix, + s.spanIndex.DateLayout, + traceQuery.StartTimeMin, + traceQuery.StartTimeMax, + cfg.RolloverFrequencyAsNegativeDuration(s.spanIndex.RolloverFrequency), + ) searchService := s.client().Search(jaegerIndices...). Size(0). // set to 0 because we don't want actual documents. diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index c96c7858d8b..17f90144fcc 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -26,6 +26,7 @@ import ( "github.com/jaegertracing/jaeger/internal/metricstest" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/es" + "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/pkg/es/mocks" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" @@ -110,7 +111,6 @@ func withSpanReader(t *testing.T, fn func(r *spanReaderTest)) { Logger: zap.NewNop(), Tracer: tracer.Tracer("test"), MaxSpanAge: 0, - IndexPrefix: "", TagDotReplacement: "@", MaxDocCount: defaultMaxDocCount, }), @@ -133,7 +133,6 @@ func withArchiveSpanReader(t *testing.T, readAlias bool, fn func(r *spanReaderTe Logger: zap.NewNop(), Tracer: tracer.Tracer("test"), MaxSpanAge: 0, - IndexPrefix: "", TagDotReplacement: "@", Archive: true, UseReadWriteAliases: readAlias, @@ -179,111 +178,122 @@ func TestSpanReaderIndices(t *testing.T) { client := &mocks.Client{} clientFn := func() es.Client { return client } date := time.Date(2019, 10, 10, 5, 0, 0, 0, time.UTC) + spanDataLayout := "2006-01-02-15" serviceDataLayout := "2006-01-02" spanDataLayoutFormat := date.UTC().Format(spanDataLayout) serviceDataLayoutFormat := date.UTC().Format(serviceDataLayout) + metricsFactory := metricstest.NewFactory(0) logger, _ := testutils.NewLogger() tracer, _, closer := tracerProvider(t) defer closer() + spanIndexOpts := config.IndexOptions{DateLayout: spanDataLayout} + serviceIndexOpts := config.IndexOptions{DateLayout: serviceDataLayout} + + serviceIndexOptsWithFoo := serviceIndexOpts + serviceIndexOptsWithFoo.Prefix = "foo:" + + spanIndexOptsWithFoo := spanIndexOpts + spanIndexOptsWithFoo.Prefix = "foo:" + testCases := []struct { indices []string params SpanReaderParams }{ { params: SpanReaderParams{ - IndexPrefix: "", Archive: false, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, + Archive: false, SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, }, - indices: []string{spanIndex + spanDataLayoutFormat, serviceIndex + serviceDataLayoutFormat}, + indices: []string{spanIndexBaseName + spanDataLayoutFormat, serviceIndexBaseName + serviceDataLayoutFormat}, }, { params: SpanReaderParams{ - IndexPrefix: "", UseReadWriteAliases: true, + UseReadWriteAliases: true, }, - indices: []string{spanIndex + "read", serviceIndex + "read"}, + indices: []string{spanIndexBaseName + "read", serviceIndexBaseName + "read"}, }, { params: SpanReaderParams{ - IndexPrefix: "foo:", Archive: false, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, + Archive: false, SpanIndex: spanIndexOptsWithFoo, ServiceIndex: serviceIndexOptsWithFoo, }, - indices: []string{"foo:" + indexPrefixSeparator + spanIndex + spanDataLayoutFormat, "foo:" + indexPrefixSeparator + serviceIndex + serviceDataLayoutFormat}, + indices: []string{"foo:" + indexPrefixSeparator + spanIndexBaseName + spanDataLayoutFormat, "foo:" + indexPrefixSeparator + serviceIndexBaseName + serviceDataLayoutFormat}, }, { params: SpanReaderParams{ - IndexPrefix: "foo:", UseReadWriteAliases: true, + SpanIndex: spanIndexOptsWithFoo, ServiceIndex: serviceIndexOptsWithFoo, UseReadWriteAliases: true, }, - indices: []string{"foo:-" + spanIndex + "read", "foo:-" + serviceIndex + "read"}, + indices: []string{"foo:-" + spanIndexBaseName + "read", "foo:-" + serviceIndexBaseName + "read"}, }, { params: SpanReaderParams{ - IndexPrefix: "", Archive: true, + Archive: true, }, - indices: []string{spanIndex + archiveIndexSuffix, serviceIndex + archiveIndexSuffix}, + indices: []string{spanIndexBaseName + archiveIndexSuffix, serviceIndexBaseName + archiveIndexSuffix}, }, { params: SpanReaderParams{ - IndexPrefix: "foo:", Archive: true, + SpanIndex: spanIndexOptsWithFoo, ServiceIndex: serviceIndexOptsWithFoo, Archive: true, }, - indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix, "foo:" + indexPrefixSeparator + serviceIndex + archiveIndexSuffix}, + indices: []string{"foo:" + indexPrefixSeparator + spanIndexBaseName + archiveIndexSuffix, "foo:" + indexPrefixSeparator + serviceIndexBaseName + archiveIndexSuffix}, }, { params: SpanReaderParams{ - IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true, + SpanIndex: spanIndexOptsWithFoo, ServiceIndex: serviceIndexOptsWithFoo, Archive: true, UseReadWriteAliases: true, }, - indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix, "foo:" + indexPrefixSeparator + serviceIndex + archiveReadIndexSuffix}, + indices: []string{"foo:" + indexPrefixSeparator + spanIndexBaseName + archiveReadIndexSuffix, "foo:" + indexPrefixSeparator + serviceIndexBaseName + archiveReadIndexSuffix}, }, { params: SpanReaderParams{ - IndexPrefix: "", Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ - spanIndex + spanDataLayoutFormat, - "cluster_one:" + spanIndex + spanDataLayoutFormat, - "cluster_two:" + spanIndex + spanDataLayoutFormat, - serviceIndex + serviceDataLayoutFormat, - "cluster_one:" + serviceIndex + serviceDataLayoutFormat, - "cluster_two:" + serviceIndex + serviceDataLayoutFormat, + spanIndexBaseName + spanDataLayoutFormat, + "cluster_one:" + spanIndexBaseName + spanDataLayoutFormat, + "cluster_two:" + spanIndexBaseName + spanDataLayoutFormat, + serviceIndexBaseName + serviceDataLayoutFormat, + "cluster_one:" + serviceIndexBaseName + serviceDataLayoutFormat, + "cluster_two:" + serviceIndexBaseName + serviceDataLayoutFormat, }, }, { params: SpanReaderParams{ - IndexPrefix: "", Archive: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, + Archive: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ - spanIndex + archiveIndexSuffix, - "cluster_one:" + spanIndex + archiveIndexSuffix, - "cluster_two:" + spanIndex + archiveIndexSuffix, - serviceIndex + archiveIndexSuffix, - "cluster_one:" + serviceIndex + archiveIndexSuffix, - "cluster_two:" + serviceIndex + archiveIndexSuffix, + spanIndexBaseName + archiveIndexSuffix, + "cluster_one:" + spanIndexBaseName + archiveIndexSuffix, + "cluster_two:" + spanIndexBaseName + archiveIndexSuffix, + serviceIndexBaseName + archiveIndexSuffix, + "cluster_one:" + serviceIndexBaseName + archiveIndexSuffix, + "cluster_two:" + serviceIndexBaseName + archiveIndexSuffix, }, }, { params: SpanReaderParams{ - IndexPrefix: "", Archive: false, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, + Archive: false, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ - spanIndex + "read", - "cluster_one:" + spanIndex + "read", - "cluster_two:" + spanIndex + "read", - serviceIndex + "read", - "cluster_one:" + serviceIndex + "read", - "cluster_two:" + serviceIndex + "read", + spanIndexBaseName + "read", + "cluster_one:" + spanIndexBaseName + "read", + "cluster_two:" + spanIndexBaseName + "read", + serviceIndexBaseName + "read", + "cluster_one:" + serviceIndexBaseName + "read", + "cluster_two:" + serviceIndexBaseName + "read", }, }, { params: SpanReaderParams{ - IndexPrefix: "", Archive: true, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, + Archive: true, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ - spanIndex + archiveReadIndexSuffix, - "cluster_one:" + spanIndex + archiveReadIndexSuffix, - "cluster_two:" + spanIndex + archiveReadIndexSuffix, - serviceIndex + archiveReadIndexSuffix, - "cluster_one:" + serviceIndex + archiveReadIndexSuffix, - "cluster_two:" + serviceIndex + archiveReadIndexSuffix, + spanIndexBaseName + archiveReadIndexSuffix, + "cluster_one:" + spanIndexBaseName + archiveReadIndexSuffix, + "cluster_two:" + spanIndexBaseName + archiveReadIndexSuffix, + serviceIndexBaseName + archiveReadIndexSuffix, + "cluster_one:" + serviceIndexBaseName + archiveReadIndexSuffix, + "cluster_two:" + serviceIndexBaseName + archiveReadIndexSuffix, }, }, } @@ -294,8 +304,8 @@ func TestSpanReaderIndices(t *testing.T) { testCase.params.Tracer = tracer.Tracer("test") r := NewSpanReader(testCase.params) - actualSpan := r.timeRangeIndices(r.spanIndexPrefix, r.spanIndexDateLayout, date, date, -1*time.Hour) - actualService := r.timeRangeIndices(r.serviceIndexPrefix, r.serviceIndexDateLayout, date, date, -24*time.Hour) + actualSpan := r.timeRangeIndices(r.spanIndex.Prefix, r.spanIndex.DateLayout, date, date, -1*time.Hour) + actualService := r.timeRangeIndices(r.serviceIndex.Prefix, r.serviceIndex.DateLayout, date, date, -24*time.Hour) assert.Equal(t, testCase.indices, append(actualSpan, actualService...)) } } @@ -574,30 +584,30 @@ func TestSpanReaderFindIndices(t *testing.T) { startTime: today.Add(-time.Millisecond), endTime: today, expected: []string{ - indexWithDate(spanIndex, dateLayout, today), + indexWithDate(spanIndexBaseName, dateLayout, today), }, }, { startTime: today.Add(-13 * time.Hour), endTime: today, expected: []string{ - indexWithDate(spanIndex, dateLayout, today), - indexWithDate(spanIndex, dateLayout, yesterday), + indexWithDate(spanIndexBaseName, dateLayout, today), + indexWithDate(spanIndexBaseName, dateLayout, yesterday), }, }, { startTime: today.Add(-48 * time.Hour), endTime: today, expected: []string{ - indexWithDate(spanIndex, dateLayout, today), - indexWithDate(spanIndex, dateLayout, yesterday), - indexWithDate(spanIndex, dateLayout, twoDaysAgo), + indexWithDate(spanIndexBaseName, dateLayout, today), + indexWithDate(spanIndexBaseName, dateLayout, yesterday), + indexWithDate(spanIndexBaseName, dateLayout, twoDaysAgo), }, }, } withSpanReader(t, func(r *spanReaderTest) { for _, testCase := range testCases { - actual := r.reader.timeRangeIndices(spanIndex, dateLayout, testCase.startTime, testCase.endTime, -24*time.Hour) + actual := r.reader.timeRangeIndices(spanIndexBaseName, dateLayout, testCase.startTime, testCase.endTime, -24*time.Hour) assert.EqualValues(t, testCase.expected, actual) } }) @@ -605,7 +615,7 @@ func TestSpanReaderFindIndices(t *testing.T) { func TestSpanReader_indexWithDate(t *testing.T) { withSpanReader(t, func(_ *spanReaderTest) { - actual := indexWithDate(spanIndex, "2006-01-02", time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC)) + actual := indexWithDate(spanIndexBaseName, "2006-01-02", time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC)) assert.Equal(t, "jaeger-span-1995-04-21", actual) }) } diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index 9a18677a258..74fd361171d 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -103,8 +103,8 @@ func getSpanAndServiceIndexFn(archive, useReadWriteAliases bool, prefix, spanDat if prefix != "" { prefix += indexPrefixSeparator } - spanIndexPrefix := prefix + spanIndex - serviceIndexPrefix := prefix + serviceIndex + spanIndexPrefix := prefix + spanIndexBaseName + serviceIndexPrefix := prefix + serviceIndexBaseName if archive { return func(_ time.Time) (string, string) { if useReadWriteAliases { diff --git a/plugin/storage/es/spanstore/writer_test.go b/plugin/storage/es/spanstore/writer_test.go index 33314c726ce..17749dd30bb 100644 --- a/plugin/storage/es/spanstore/writer_test.go +++ b/plugin/storage/es/spanstore/writer_test.go @@ -66,49 +66,49 @@ func TestSpanWriterIndices(t *testing.T) { Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: false, }, - indices: []string{spanIndex + spanDataLayoutFormat, serviceIndex + serviceDataLayoutFormat}, + indices: []string{spanIndexBaseName + spanDataLayoutFormat, serviceIndexBaseName + serviceDataLayoutFormat}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, UseReadWriteAliases: true, }, - indices: []string{spanIndex + "write", serviceIndex + "write"}, + indices: []string{spanIndexBaseName + "write", serviceIndexBaseName + "write"}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: false, }, - indices: []string{"foo:" + indexPrefixSeparator + spanIndex + spanDataLayoutFormat, "foo:" + indexPrefixSeparator + serviceIndex + serviceDataLayoutFormat}, + indices: []string{"foo:" + indexPrefixSeparator + spanIndexBaseName + spanDataLayoutFormat, "foo:" + indexPrefixSeparator + serviceIndexBaseName + serviceDataLayoutFormat}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, UseReadWriteAliases: true, }, - indices: []string{"foo:-" + spanIndex + "write", "foo:-" + serviceIndex + "write"}, + indices: []string{"foo:-" + spanIndexBaseName + "write", "foo:-" + serviceIndexBaseName + "write"}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: true, }, - indices: []string{spanIndex + archiveIndexSuffix, ""}, + indices: []string{spanIndexBaseName + archiveIndexSuffix, ""}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: true, }, - indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix, ""}, + indices: []string{"foo:" + indexPrefixSeparator + spanIndexBaseName + archiveIndexSuffix, ""}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: true, UseReadWriteAliases: true, }, - indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveWriteIndexSuffix, ""}, + indices: []string{"foo:" + indexPrefixSeparator + spanIndexBaseName + archiveWriteIndexSuffix, ""}, }, } for _, testCase := range testCases { @@ -296,8 +296,8 @@ func TestSpanIndexName(t *testing.T) { span := &model.Span{ StartTime: date, } - spanIndexName := indexWithDate(spanIndex, "2006-01-02", span.StartTime) - serviceIndexName := indexWithDate(serviceIndex, "2006-01-02", span.StartTime) + spanIndexName := indexWithDate(spanIndexBaseName, "2006-01-02", span.StartTime) + serviceIndexName := indexWithDate(serviceIndexBaseName, "2006-01-02", span.StartTime) assert.Equal(t, "jaeger-span-1995-04-21", spanIndexName) assert.Equal(t, "jaeger-service-1995-04-21", serviceIndexName) } From 4ce1b974cd38bc4b4dbb445faf200344cb462168 Mon Sep 17 00:00:00 2001 From: Jared Tan Date: Tue, 3 Sep 2024 23:22:04 +0800 Subject: [PATCH 2/6] remove indexprefix Signed-off-by: Jared Tan --- cmd/jaeger/config-elasticsearch.yaml | 8 +++- pkg/es/config/config.go | 1 - plugin/storage/es/factory.go | 29 +++++++------- plugin/storage/es/options.go | 13 ++++--- plugin/storage/es/spanstore/writer.go | 44 ++++++++++++---------- plugin/storage/es/spanstore/writer_test.go | 27 +++++++++---- 6 files changed, 71 insertions(+), 51 deletions(-) diff --git a/cmd/jaeger/config-elasticsearch.yaml b/cmd/jaeger/config-elasticsearch.yaml index 308b7d11c5f..46a8f6a07b4 100644 --- a/cmd/jaeger/config-elasticsearch.yaml +++ b/cmd/jaeger/config-elasticsearch.yaml @@ -20,19 +20,21 @@ extensions: backends: some_storage: elasticsearch: - index_prefix: "jaeger-main" indices: spans: + index_prefix: "jaeger-main" date_layout: "2006-01-02" rollover_frequency: "day" shards: 5 replicas: 1 services: + index_prefix: "jaeger-main" date_layout: "2006-01-02" rollover_frequency: "day" shards: 5 replicas: 1 dependencies: + index_prefix: "jaeger-main" date_layout: "2006-01-02" rollover_frequency: "day" shards: 5 @@ -44,7 +46,9 @@ extensions: replicas: 1 another_storage: elasticsearch: - index_prefix: "jaeger-archive" + indices: + spans: + index_prefix: "jaeger-archive" receivers: otlp: diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 5a2b9a2d18a..03097da32aa 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -69,7 +69,6 @@ type Configuration struct { BulkWorkers int `mapstructure:"-"` BulkActions int `mapstructure:"-"` BulkFlushInterval time.Duration `mapstructure:"-"` - IndexPrefix string `mapstructure:"index_prefix"` Indices Indices `mapstructure:"indices"` ServiceCacheTTL time.Duration `mapstructure:"service_cache_ttl"` AdaptiveSamplingLookback time.Duration `mapstructure:"-"` diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index af35be59ef1..2ca216c065b 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -254,18 +254,17 @@ func createSpanWriter( } writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{ - Client: clientFn, - IndexPrefix: cfg.IndexPrefix, - SpanIndexDateLayout: cfg.Indices.Spans.DateLayout, - ServiceIndexDateLayout: cfg.Indices.Services.DateLayout, - AllTagsAsFields: cfg.Tags.AllAsFields, - TagKeysAsFields: tags, - TagDotReplacement: cfg.Tags.DotReplacement, - Archive: archive, - UseReadWriteAliases: cfg.UseReadWriteAliases, - Logger: logger, - MetricsFactory: mFactory, - ServiceCacheTTL: cfg.ServiceCacheTTL, + Client: clientFn, + SpanIndex: cfg.Indices.Spans, + ServiceIndex: cfg.Indices.Services, + AllTagsAsFields: cfg.Tags.AllAsFields, + TagKeysAsFields: tags, + TagDotReplacement: cfg.Tags.DotReplacement, + Archive: archive, + UseReadWriteAliases: cfg.UseReadWriteAliases, + Logger: logger, + MetricsFactory: mFactory, + ServiceCacheTTL: cfg.ServiceCacheTTL, }) // Creating a template here would conflict with the one created for ILM resulting to no index rollover @@ -275,7 +274,7 @@ func createSpanWriter( if err != nil { return nil, err } - if err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.IndexPrefix); err != nil { + if err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.Indices.Spans.Prefix); err != nil { return nil, err } } @@ -286,7 +285,7 @@ func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store params := esSampleStore.Params{ Client: f.getPrimaryClient, Logger: f.logger, - IndexPrefix: f.primaryConfig.IndexPrefix, + IndexPrefix: f.primaryConfig.Indices.Sampling.Prefix, IndexDateLayout: f.primaryConfig.Indices.Sampling.DateLayout, IndexRolloverFrequency: config.RolloverFrequencyAsNegativeDuration(f.primaryConfig.Indices.Sampling.RolloverFrequency), Lookback: f.primaryConfig.AdaptiveSamplingLookback, @@ -325,7 +324,7 @@ func createDependencyReader( reader := esDepStore.NewDependencyStore(esDepStore.Params{ Client: clientFn, Logger: logger, - IndexPrefix: cfg.IndexPrefix, + IndexPrefix: cfg.Indices.Dependencies.Prefix, IndexDateLayout: cfg.Indices.Dependencies.RolloverFrequency, MaxDocCount: cfg.MaxDocCount, UseReadWriteAliases: cfg.UseReadWriteAliases, diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 9afa5da6771..854c190e017 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -67,6 +67,7 @@ const ( defaultIndexRolloverFrequency = "day" defaultSendGetBodyAs = "" + defaultIndexPrefix = "" ) var defaultIndexOptions = config.IndexOptions{ @@ -208,7 +209,7 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { "A time.Duration after which bulk requests are committed, regardless of other thresholds. Set to zero to disable. By default, this is disabled.") flagSet.String( nsConfig.namespace+suffixIndexPrefix, - nsConfig.IndexPrefix, + defaultIndexPrefix, "Optional prefix of Jaeger indices. For example \"production\" creates \"production-jaeger-*\".") flagSet.String( nsConfig.namespace+suffixIndexDateSeparator, @@ -341,13 +342,13 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval) cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout) cfg.ServiceCacheTTL = v.GetDuration(cfg.namespace + suffixServiceCacheTTL) - cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix) + indexPrefix := v.GetString(cfg.namespace + suffixIndexPrefix) // TODO cfg.Indices does not have a separate flag - cfg.Indices.Spans.Prefix = cfg.IndexPrefix - cfg.Indices.Services.Prefix = cfg.IndexPrefix - cfg.Indices.Sampling.Prefix = cfg.IndexPrefix - cfg.Indices.Dependencies.Prefix = cfg.IndexPrefix + cfg.Indices.Spans.Prefix = indexPrefix + cfg.Indices.Services.Prefix = indexPrefix + cfg.Indices.Sampling.Prefix = indexPrefix + cfg.Indices.Dependencies.Prefix = indexPrefix cfg.Tags.AllAsFields = v.GetBool(cfg.namespace + suffixTagsAsFieldsAll) cfg.Tags.Include = v.GetString(cfg.namespace + suffixTagsAsFieldsInclude) diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index 74fd361171d..036a4ad9ff9 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -15,6 +15,7 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/cache" "github.com/jaegertracing/jaeger/pkg/es" + cfg "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics" @@ -46,18 +47,20 @@ type SpanWriter struct { // SpanWriterParams holds constructor parameters for NewSpanWriter type SpanWriterParams struct { - Client func() es.Client - Logger *zap.Logger - MetricsFactory metrics.Factory - IndexPrefix string - SpanIndexDateLayout string - ServiceIndexDateLayout string - AllTagsAsFields bool - TagKeysAsFields []string - TagDotReplacement string - Archive bool - UseReadWriteAliases bool - ServiceCacheTTL time.Duration + Client func() es.Client + Logger *zap.Logger + MetricsFactory metrics.Factory + SpanIndex cfg.IndexOptions + ServiceIndex cfg.IndexOptions + // IndexPrefix string + // SpanIndexDateLayout string + // ServiceIndexDateLayout string + AllTagsAsFields bool + TagKeysAsFields []string + TagDotReplacement string + Archive bool + UseReadWriteAliases bool + ServiceCacheTTL time.Duration } // NewSpanWriter creates a new SpanWriter for use @@ -76,7 +79,7 @@ func NewSpanWriter(p SpanWriterParams) *SpanWriter { }, serviceWriter: serviceOperationStorage.Write, spanConverter: dbmodel.NewFromDomain(p.AllTagsAsFields, p.TagKeysAsFields, p.TagDotReplacement), - spanServiceIndex: getSpanAndServiceIndexFn(p.Archive, p.UseReadWriteAliases, p.IndexPrefix, p.SpanIndexDateLayout, p.ServiceIndexDateLayout), + spanServiceIndex: getSpanAndServiceIndexFn(p.Archive, p.UseReadWriteAliases, p.SpanIndex, p.ServiceIndex), } } @@ -99,12 +102,15 @@ func (s *SpanWriter) CreateTemplates(spanTemplate, serviceTemplate, indexPrefix // spanAndServiceIndexFn returns names of span and service indices type spanAndServiceIndexFn func(spanTime time.Time) (string, string) -func getSpanAndServiceIndexFn(archive, useReadWriteAliases bool, prefix, spanDateLayout string, serviceDateLayout string) spanAndServiceIndexFn { - if prefix != "" { - prefix += indexPrefixSeparator +func getSpanAndServiceIndexFn(archive, useReadWriteAliases bool, spanIndexOpts, serviceIndexOpts cfg.IndexOptions) spanAndServiceIndexFn { + if spanIndexOpts.Prefix != "" { + spanIndexOpts.Prefix += indexPrefixSeparator } - spanIndexPrefix := prefix + spanIndexBaseName - serviceIndexPrefix := prefix + serviceIndexBaseName + if serviceIndexOpts.Prefix != "" { + serviceIndexOpts.Prefix += indexPrefixSeparator + } + spanIndexPrefix := spanIndexOpts.Prefix + spanIndexBaseName + serviceIndexPrefix := serviceIndexOpts.Prefix + serviceIndexBaseName if archive { return func(_ time.Time) (string, string) { if useReadWriteAliases { @@ -120,7 +126,7 @@ func getSpanAndServiceIndexFn(archive, useReadWriteAliases bool, prefix, spanDat } } return func(date time.Time) (string, string) { - return indexWithDate(spanIndexPrefix, spanDateLayout, date), indexWithDate(serviceIndexPrefix, serviceDateLayout, date) + return indexWithDate(spanIndexPrefix, spanIndexOpts.DateLayout, date), indexWithDate(serviceIndexPrefix, serviceIndexOpts.DateLayout, date) } } diff --git a/plugin/storage/es/spanstore/writer_test.go b/plugin/storage/es/spanstore/writer_test.go index 17749dd30bb..aaaec3c0bf0 100644 --- a/plugin/storage/es/spanstore/writer_test.go +++ b/plugin/storage/es/spanstore/writer_test.go @@ -19,6 +19,7 @@ import ( "github.com/jaegertracing/jaeger/internal/metricstest" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/es" + "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/pkg/es/mocks" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" @@ -40,7 +41,7 @@ func withSpanWriter(fn func(w *spanWriterTest)) { client: client, logger: logger, logBuffer: logBuffer, - writer: NewSpanWriter(SpanWriterParams{Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, SpanIndexDateLayout: "2006-01-02", ServiceIndexDateLayout: "2006-01-02"}), + writer: NewSpanWriter(SpanWriterParams{Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, SpanIndex: config.IndexOptions{DateLayout: "2006-01-02"}, ServiceIndex: config.IndexOptions{DateLayout: "2006-01-02"}}), } fn(w) } @@ -57,6 +58,16 @@ func TestSpanWriterIndices(t *testing.T) { serviceDataLayout := "2006-01-02" spanDataLayoutFormat := date.UTC().Format(spanDataLayout) serviceDataLayoutFormat := date.UTC().Format(serviceDataLayout) + + spanIndexOpts := config.IndexOptions{DateLayout: spanDataLayout} + serviceIndexOpts := config.IndexOptions{DateLayout: serviceDataLayout} + + serviceIndexOptsWithFoo := serviceIndexOpts + serviceIndexOptsWithFoo.Prefix = "foo:" + + spanIndexOptsWithFoo := spanIndexOpts + spanIndexOptsWithFoo.Prefix = "foo:" + testCases := []struct { indices []string params SpanWriterParams @@ -64,49 +75,49 @@ func TestSpanWriterIndices(t *testing.T) { { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: false, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, Archive: false, }, indices: []string{spanIndexBaseName + spanDataLayoutFormat, serviceIndexBaseName + serviceDataLayoutFormat}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, UseReadWriteAliases: true, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, UseReadWriteAliases: true, }, indices: []string{spanIndexBaseName + "write", serviceIndexBaseName + "write"}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: false, + SpanIndex: spanIndexOptsWithFoo, ServiceIndex: serviceIndexOptsWithFoo, Archive: false, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndexBaseName + spanDataLayoutFormat, "foo:" + indexPrefixSeparator + serviceIndexBaseName + serviceDataLayoutFormat}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, UseReadWriteAliases: true, + SpanIndex: spanIndexOptsWithFoo, ServiceIndex: serviceIndexOptsWithFoo, UseReadWriteAliases: true, }, indices: []string{"foo:-" + spanIndexBaseName + "write", "foo:-" + serviceIndexBaseName + "write"}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: true, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, Archive: true, }, indices: []string{spanIndexBaseName + archiveIndexSuffix, ""}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: true, + SpanIndex: spanIndexOptsWithFoo, ServiceIndex: serviceIndexOptsWithFoo, Archive: true, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndexBaseName + archiveIndexSuffix, ""}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: true, UseReadWriteAliases: true, + SpanIndex: spanIndexOptsWithFoo, ServiceIndex: serviceIndexOptsWithFoo, Archive: true, UseReadWriteAliases: true, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndexBaseName + archiveWriteIndexSuffix, ""}, }, From cf818b8038526a660f8ce79c10223937515f5f94 Mon Sep 17 00:00:00 2001 From: Jared Tan Date: Sat, 7 Sep 2024 12:41:49 +0800 Subject: [PATCH 3/6] fix prefix Signed-off-by: Jared Tan --- cmd/jaeger/config-opensearch.yaml | 29 +++++++++++++++++-- .../es/mappings/jaeger-sampling-7.json | 2 +- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/cmd/jaeger/config-opensearch.yaml b/cmd/jaeger/config-opensearch.yaml index 19be6e941c1..cbe370bfc8f 100644 --- a/cmd/jaeger/config-opensearch.yaml +++ b/cmd/jaeger/config-opensearch.yaml @@ -20,11 +20,34 @@ extensions: backends: some_storage: opensearch: - index_prefix: "jaeger-main" - + indices: + spans: + prefix: "jaeger-main" + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 + services: + prefix: "jaeger-main" + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 + dependencies: + prefix: "jaeger-main" + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 + sampling: + date_layout: "2006-01-02" + rollover_frequency: "day" + shards: 5 + replicas: 1 another_storage: opensearch: - index_prefix: "jaeger-main" + spans: + prefix: "jaeger-archive" receivers: otlp: diff --git a/plugin/storage/es/mappings/jaeger-sampling-7.json b/plugin/storage/es/mappings/jaeger-sampling-7.json index 9092cfa885d..e9db50f83b2 100644 --- a/plugin/storage/es/mappings/jaeger-sampling-7.json +++ b/plugin/storage/es/mappings/jaeger-sampling-7.json @@ -2,7 +2,7 @@ "index_patterns": "*jaeger-sampling-*", {{- if .UseILM }} "aliases": { - "{{ .IndexPrefix }}jaeger-sampling-read" : {} + "{{ .Prefix }}jaeger-sampling-read" : {} }, {{- end }} "settings":{ From 890daa3aa7286bd12f9e2ee5a63b639a09579547 Mon Sep 17 00:00:00 2001 From: Jared Tan Date: Sun, 8 Sep 2024 14:58:19 +0800 Subject: [PATCH 4/6] fix prefix Signed-off-by: Jared Tan --- cmd/es-rollover/app/index_options.go | 14 ++++++++------ cmd/es-rollover/app/index_options_test.go | 11 ++++++++++- cmd/es-rollover/app/init/action.go | 10 +++++++++- cmd/esmapping-generator/app/renderer/render.go | 2 +- 4 files changed, 28 insertions(+), 9 deletions(-) diff --git a/cmd/es-rollover/app/index_options.go b/cmd/es-rollover/app/index_options.go index 9acb4aab752..26b0de9d906 100644 --- a/cmd/es-rollover/app/index_options.go +++ b/cmd/es-rollover/app/index_options.go @@ -6,6 +6,8 @@ package app import ( "fmt" "strings" + + cfg "github.com/jaegertracing/jaeger/pkg/es/config" ) const ( @@ -22,11 +24,11 @@ type IndexOption struct { } // RolloverIndices return an array of indices to rollover -func RolloverIndices(archive bool, skipDependencies bool, adaptiveSampling bool, prefix string) []IndexOption { +func RolloverIndices(archive bool, skipDependencies bool, adaptiveSampling bool, indices cfg.Indices) []IndexOption { if archive { return []IndexOption{ { - prefix: prefix, + prefix: indices.Spans.Prefix, indexType: "jaeger-span-archive", Mapping: "jaeger-span", }, @@ -35,12 +37,12 @@ func RolloverIndices(archive bool, skipDependencies bool, adaptiveSampling bool, indexOptions := []IndexOption{ { - prefix: prefix, + prefix: indices.Spans.Prefix, Mapping: "jaeger-span", indexType: "jaeger-span", }, { - prefix: prefix, + prefix: indices.Services.Prefix, Mapping: "jaeger-service", indexType: "jaeger-service", }, @@ -48,7 +50,7 @@ func RolloverIndices(archive bool, skipDependencies bool, adaptiveSampling bool, if !skipDependencies { indexOptions = append(indexOptions, IndexOption{ - prefix: prefix, + prefix: indices.Dependencies.Prefix, Mapping: "jaeger-dependencies", indexType: "jaeger-dependencies", }) @@ -56,7 +58,7 @@ func RolloverIndices(archive bool, skipDependencies bool, adaptiveSampling bool, if adaptiveSampling { indexOptions = append(indexOptions, IndexOption{ - prefix: prefix, + prefix: indices.Sampling.Prefix, Mapping: "jaeger-sampling", indexType: "jaeger-sampling", }) diff --git a/cmd/es-rollover/app/index_options_test.go b/cmd/es-rollover/app/index_options_test.go index 0d8c9caf327..66c0fae1aaa 100644 --- a/cmd/es-rollover/app/index_options_test.go +++ b/cmd/es-rollover/app/index_options_test.go @@ -7,6 +7,8 @@ import ( "testing" "github.com/stretchr/testify/assert" + + cfg "github.com/jaegertracing/jaeger/pkg/es/config" ) func TestRolloverIndices(t *testing.T) { @@ -171,7 +173,14 @@ func TestRolloverIndices(t *testing.T) { if test.prefix != "" { test.prefix += "-" } - result := RolloverIndices(test.archive, test.skipDependencies, test.adaptiveSampling, test.prefix) + opts := cfg.IndexOptions{Prefix: test.prefix} + indices := cfg.Indices{ + Spans: opts, + Services: opts, + Dependencies: opts, + Sampling: opts, + } + result := RolloverIndices(test.archive, test.skipDependencies, test.adaptiveSampling, indices) assert.Equal(t, len(test.expected), len(result)) for i, r := range result { assert.Equal(t, test.expected[i].templateName, r.TemplateName()) diff --git a/cmd/es-rollover/app/init/action.go b/cmd/es-rollover/app/init/action.go index 73e04487f15..7e572fb059b 100644 --- a/cmd/es-rollover/app/init/action.go +++ b/cmd/es-rollover/app/init/action.go @@ -35,6 +35,14 @@ func (c Action) getMapping(version uint, templateName string) (string, error) { ILMPolicyName: c.Config.ILMPolicyName, EsVersion: version, } + + if c.Config.IndexPrefix != "" { + mappingBuilder.Indices.Spans.Prefix = c.Config.IndexPrefix + mappingBuilder.Indices.Services.Prefix = c.Config.IndexPrefix + mappingBuilder.Indices.Dependencies.Prefix = c.Config.IndexPrefix + mappingBuilder.Indices.Sampling.Prefix = c.Config.IndexPrefix + } + return mappingBuilder.GetMapping(templateName) } @@ -56,7 +64,7 @@ func (c Action) Do() error { return fmt.Errorf("ILM policy %s doesn't exist in Elasticsearch. Please create it and re-run init", c.Config.ILMPolicyName) } } - rolloverIndices := app.RolloverIndices(c.Config.Archive, c.Config.SkipDependencies, c.Config.AdaptiveSampling, c.Config.IndexPrefix) + rolloverIndices := app.RolloverIndices(c.Config.Archive, c.Config.SkipDependencies, c.Config.AdaptiveSampling, c.Config.Indices) for _, indexName := range rolloverIndices { if err := c.init(version, indexName); err != nil { return err diff --git a/cmd/esmapping-generator/app/renderer/render.go b/cmd/esmapping-generator/app/renderer/render.go index 15cd0534f01..67d4a2ff834 100644 --- a/cmd/esmapping-generator/app/renderer/render.go +++ b/cmd/esmapping-generator/app/renderer/render.go @@ -27,7 +27,7 @@ func GetMappingAsString(builder es.TemplateBuilder, opt *app.Options) (string, e } indexOpts := cfg.IndexOptions{ - Priority: 0, + Prefix: opt.IndexPrefix, Shards: opt.Shards, Replicas: opt.Shards, } From f99515eb99a3d8b745287b262209e20ed6124092 Mon Sep 17 00:00:00 2001 From: Jared Tan Date: Sun, 8 Sep 2024 15:41:35 +0800 Subject: [PATCH 5/6] fix prefix Signed-off-by: Jared Tan --- cmd/es-rollover/app/index_options.go | 14 ++++++-------- cmd/es-rollover/app/index_options_test.go | 11 +---------- cmd/es-rollover/app/init/action.go | 13 ++++++------- 3 files changed, 13 insertions(+), 25 deletions(-) diff --git a/cmd/es-rollover/app/index_options.go b/cmd/es-rollover/app/index_options.go index 26b0de9d906..9acb4aab752 100644 --- a/cmd/es-rollover/app/index_options.go +++ b/cmd/es-rollover/app/index_options.go @@ -6,8 +6,6 @@ package app import ( "fmt" "strings" - - cfg "github.com/jaegertracing/jaeger/pkg/es/config" ) const ( @@ -24,11 +22,11 @@ type IndexOption struct { } // RolloverIndices return an array of indices to rollover -func RolloverIndices(archive bool, skipDependencies bool, adaptiveSampling bool, indices cfg.Indices) []IndexOption { +func RolloverIndices(archive bool, skipDependencies bool, adaptiveSampling bool, prefix string) []IndexOption { if archive { return []IndexOption{ { - prefix: indices.Spans.Prefix, + prefix: prefix, indexType: "jaeger-span-archive", Mapping: "jaeger-span", }, @@ -37,12 +35,12 @@ func RolloverIndices(archive bool, skipDependencies bool, adaptiveSampling bool, indexOptions := []IndexOption{ { - prefix: indices.Spans.Prefix, + prefix: prefix, Mapping: "jaeger-span", indexType: "jaeger-span", }, { - prefix: indices.Services.Prefix, + prefix: prefix, Mapping: "jaeger-service", indexType: "jaeger-service", }, @@ -50,7 +48,7 @@ func RolloverIndices(archive bool, skipDependencies bool, adaptiveSampling bool, if !skipDependencies { indexOptions = append(indexOptions, IndexOption{ - prefix: indices.Dependencies.Prefix, + prefix: prefix, Mapping: "jaeger-dependencies", indexType: "jaeger-dependencies", }) @@ -58,7 +56,7 @@ func RolloverIndices(archive bool, skipDependencies bool, adaptiveSampling bool, if adaptiveSampling { indexOptions = append(indexOptions, IndexOption{ - prefix: indices.Sampling.Prefix, + prefix: prefix, Mapping: "jaeger-sampling", indexType: "jaeger-sampling", }) diff --git a/cmd/es-rollover/app/index_options_test.go b/cmd/es-rollover/app/index_options_test.go index 66c0fae1aaa..0d8c9caf327 100644 --- a/cmd/es-rollover/app/index_options_test.go +++ b/cmd/es-rollover/app/index_options_test.go @@ -7,8 +7,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - - cfg "github.com/jaegertracing/jaeger/pkg/es/config" ) func TestRolloverIndices(t *testing.T) { @@ -173,14 +171,7 @@ func TestRolloverIndices(t *testing.T) { if test.prefix != "" { test.prefix += "-" } - opts := cfg.IndexOptions{Prefix: test.prefix} - indices := cfg.Indices{ - Spans: opts, - Services: opts, - Dependencies: opts, - Sampling: opts, - } - result := RolloverIndices(test.archive, test.skipDependencies, test.adaptiveSampling, indices) + result := RolloverIndices(test.archive, test.skipDependencies, test.adaptiveSampling, test.prefix) assert.Equal(t, len(test.expected), len(result)) for i, r := range result { assert.Equal(t, test.expected[i].templateName, r.TemplateName()) diff --git a/cmd/es-rollover/app/init/action.go b/cmd/es-rollover/app/init/action.go index 7e572fb059b..b57dc6be16d 100644 --- a/cmd/es-rollover/app/init/action.go +++ b/cmd/es-rollover/app/init/action.go @@ -36,12 +36,11 @@ func (c Action) getMapping(version uint, templateName string) (string, error) { EsVersion: version, } - if c.Config.IndexPrefix != "" { - mappingBuilder.Indices.Spans.Prefix = c.Config.IndexPrefix - mappingBuilder.Indices.Services.Prefix = c.Config.IndexPrefix - mappingBuilder.Indices.Dependencies.Prefix = c.Config.IndexPrefix - mappingBuilder.Indices.Sampling.Prefix = c.Config.IndexPrefix - } + // TODO: does not have a separate flag for Indices + mappingBuilder.Indices.Spans.Prefix = c.Config.IndexPrefix + mappingBuilder.Indices.Services.Prefix = c.Config.IndexPrefix + mappingBuilder.Indices.Dependencies.Prefix = c.Config.IndexPrefix + mappingBuilder.Indices.Sampling.Prefix = c.Config.IndexPrefix return mappingBuilder.GetMapping(templateName) } @@ -64,7 +63,7 @@ func (c Action) Do() error { return fmt.Errorf("ILM policy %s doesn't exist in Elasticsearch. Please create it and re-run init", c.Config.ILMPolicyName) } } - rolloverIndices := app.RolloverIndices(c.Config.Archive, c.Config.SkipDependencies, c.Config.AdaptiveSampling, c.Config.Indices) + rolloverIndices := app.RolloverIndices(c.Config.Archive, c.Config.SkipDependencies, c.Config.AdaptiveSampling, c.Config.IndexPrefix) for _, indexName := range rolloverIndices { if err := c.init(version, indexName); err != nil { return err From b9cebd6076dc2ca8069183711f57a45d0dabead4 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sun, 8 Sep 2024 12:49:23 -0400 Subject: [PATCH 6/6] fix-opensearch-config Signed-off-by: Yuri Shkuro --- cmd/jaeger/config-opensearch.yaml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/jaeger/config-opensearch.yaml b/cmd/jaeger/config-opensearch.yaml index cbe370bfc8f..a3c86f36edd 100644 --- a/cmd/jaeger/config-opensearch.yaml +++ b/cmd/jaeger/config-opensearch.yaml @@ -46,8 +46,9 @@ extensions: replicas: 1 another_storage: opensearch: - spans: - prefix: "jaeger-archive" + indices: + spans: + prefix: "jaeger-archive" receivers: otlp: