Skip to content

Commit

Permalink
Added integration test and minor changes
Browse files Browse the repository at this point in the history
Signed-off-by: Alok Kumar Singh <dev.alok.singh123@gmail.com>
  • Loading branch information
akstron committed Nov 9, 2024
1 parent 32485af commit ed3e172
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 45 deletions.
28 changes: 26 additions & 2 deletions pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,13 @@ type BasicAuthenticator struct {
func DefaultConfiguration() Configuration {
return Configuration{
Schema: Schema{
Keyspace: "jaeger_v1_test",
Keyspace: "jaeger_v1_dc1",
Datacenter: "test",
TraceTTL: 172800,
DependenciesTTL: 172800,
ReplicationFactor: 1,
CasVersion: 4,
CompactionWindow: "1m",
CompactionWindow: "",
},
Connection: Connection{
Servers: []string{"127.0.0.1"},
Expand Down Expand Up @@ -152,8 +152,32 @@ type SessionBuilder interface {
NewSession() (cassandra.Session, error)
}

func (c *Configuration) newSessionPrerequisites() error {
cluster, err := c.NewCluster()
if err != nil {
return err
}

cluster.Keyspace = ""

session, err := cluster.CreateSession()
if err != nil {
return err
}

wSession := gocqlw.WrapCQLSession(session)
defer wSession.Close()

return GenerateSchemaIfNotPresent(wSession, &c.Schema)
}

// NewSession creates a new Cassandra session
func (c *Configuration) NewSession() (cassandra.Session, error) {
err := c.newSessionPrerequisites()
if err != nil {
return nil, err
}

cluster, err := c.NewCluster()
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package schema
package config

import (
"bytes"
"embed"
"fmt"
"regexp"
"strconv"
"text/template"

"github.com/jaegertracing/jaeger/pkg/cassandra"
"github.com/jaegertracing/jaeger/pkg/cassandra/config"
)

//go:embed v004-go-tmpl.cql.tmpl
//go:embed v004-go-tmpl-test.cql.tmpl
var schemaFile embed.FS

type TemplateParams struct {
config.Schema
Schema
// Replication is the replication strategy used
Replication string `mapstructure:"replication" valid:"optional"`
// CompactionWindowSize is the numberical part of CompactionWindow. Extracted from CompactionWindow
Expand All @@ -31,8 +29,8 @@ type TemplateParams struct {

func DefaultParams() TemplateParams {
return TemplateParams{
Schema: config.Schema{
Keyspace: "jaeger_v2_test",
Schema: Schema{
Keyspace: "jaeger_v1_dc1",
Datacenter: "test",
TraceTTL: 172800,
DependenciesTTL: 0,
Expand Down Expand Up @@ -67,30 +65,31 @@ func applyDefaults(params *TemplateParams) {
}

// Applies defaults for the configs and contructs other optional parameters from it
func constructTemplateParams(cfg config.Schema) (TemplateParams, error) {
func constructTemplateParams(cfg Schema) (TemplateParams, error) {
params := TemplateParams{
Schema: config.Schema{},
Schema: Schema{},
}
applyDefaults(&params)

params.Replication = fmt.Sprintf("{'class': 'NetworkTopologyStrategy', '%s': '%v' }", params.Datacenter, params.ReplicationFactor)
params.Replication = fmt.Sprintf("{'class': 'NetworkTopologyStrategy', 'replication_factor': '%v' }", params.ReplicationFactor)

if cfg.CompactionWindow != "" {
isMatch, err := regexp.MatchString("^[0-9]+[mhd]$", params.CompactionWindow)
if err != nil {
return TemplateParams{}, err
}
var err error
// isMatch, err := regexp.MatchString("[0-9]+[mhd]", params.CompactionWindow)
// if err != nil {
// return TemplateParams{}, err
// }

if !isMatch {
return TemplateParams{}, fmt.Errorf("Invalid compaction window size format. Please use numeric value followed by 'm' for minutes, 'h' for hours, or 'd' for days")
}
// if !isMatch {
// return TemplateParams{}, fmt.Errorf("invalid compaction window size format: %s. Please use numeric value followed by 'm' for minutes, 'h' for hours, or 'd' for days", cfg.CompactionWindow)
// }

params.CompactionWindowSize, err = strconv.Atoi(params.CompactionWindow[:len(params.CompactionWindow)-1])
params.CompactionWindowSize, err = strconv.Atoi(params.CompactionWindow[0 : len(params.CompactionWindow)-1])
if err != nil {
return TemplateParams{}, err
}

params.CompactionWindowUnit = params.CompactionWindow[len(params.CompactionWindow)-1:]
params.CompactionWindowUnit = params.CompactionWindow[len(params.CompactionWindow)-1 : len(params.CompactionWindow)]
} else {
traceTTLMinutes := cfg.TraceTTL / 60

Expand Down Expand Up @@ -176,7 +175,7 @@ func getCassandraQueriesFromQueryStrings(session cassandra.Session, queries []st
return casQueries
}

func contructSchemaQueries(session cassandra.Session, cfg *config.Schema) ([]cassandra.Query, error) {
func contructSchemaQueries(session cassandra.Session, cfg *Schema) ([]cassandra.Query, error) {
params, err := constructTemplateParams(*cfg)
if err != nil {
return nil, err
Expand All @@ -197,7 +196,7 @@ func contructSchemaQueries(session cassandra.Session, cfg *config.Schema) ([]cas
return casQueries, nil
}

func GenerateSchemaIfNotPresent(session cassandra.Session, cfg *config.Schema) error {
func GenerateSchemaIfNotPresent(session cassandra.Session, cfg *Schema) error {
casQueries, err := contructSchemaQueries(session, cfg)
if err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package schema
package config

import (
"testing"
Expand All @@ -19,7 +19,7 @@ func TestQueryCreationFromTemplate(t *testing.T) {
require.NoError(t, err)

expOutputQueries := []string{
`CREATE TYPE IF NOT EXISTS jaeger_v2_test.keyvalue (
`CREATE TYPE IF NOT EXISTS jaeger_v1_dc1.keyvalue (
key text,
value_type text,
value_string text,
Expand All @@ -29,12 +29,12 @@ value_double double,
value_binary blob
);
`,
`CREATE TYPE IF NOT EXISTS jaeger_v2_test.log (
`CREATE TYPE IF NOT EXISTS jaeger_v1_dc1.log (
ts bigint,
fields frozen<list<frozen<jaeger_v2_test.keyvalue>>>
fields frozen<list<frozen<jaeger_v1_dc1.keyvalue>>>
);
`,
`CREATE TABLE IF NOT EXISTS jaeger_v2_test.service_names (
`CREATE TABLE IF NOT EXISTS jaeger_v1_dc1.service_names (
service_name text,
PRIMARY KEY (service_name)
)
Expand All @@ -47,7 +47,7 @@ AND default_time_to_live = 172800
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800;
`,
`CREATE TABLE IF NOT EXISTS jaeger_v2_test.dependencies_v2 (
`CREATE TABLE IF NOT EXISTS jaeger_v1_dc1.dependencies_v2 (
ts_bucket timestamp,
ts timestamp,
dependencies list<frozen<dependency>>,
Expand Down
File renamed without changes.
14 changes: 0 additions & 14 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
cLock "github.com/jaegertracing/jaeger/plugin/pkg/distributedlock/cassandra"
cDepStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/dependencystore"
cSamplingStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/samplingstore"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra/schema"
cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/dbmodel"
"github.com/jaegertracing/jaeger/storage"
Expand Down Expand Up @@ -132,10 +131,6 @@ func (f *Factory) configureFromOptions(o *Options) {
}
}

func (f *Factory) initializeDB(session cassandra.Session, cfg *config.Schema) error {
return schema.GenerateSchemaIfNotPresent(session, cfg)
}

// Initialize implements storage.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.primaryMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra", Tags: nil})
Expand All @@ -148,21 +143,12 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
}
f.primarySession = primarySession

// After creating a session, execute commands to initialize the setup if not already present
if err := f.initializeDB(primarySession, &f.Options.Primary.Schema); err != nil {
return err
}

if f.archiveConfig != nil {
archiveSession, err := f.archiveConfig.NewSession()
if err != nil {
return err
}
f.archiveSession = archiveSession

if err := f.initializeDB(archiveSession, &f.Options.Primary.Schema); err != nil {
return err
}
} else {
logger.Info("Cassandra archive storage configuration is empty, skipping")
}
Expand Down
18 changes: 15 additions & 3 deletions scripts/cassandra-integration-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,23 @@ export CASSANDRA_USERNAME="cassandra"
export CASSANDRA_PASSWORD="cassandra"
success="false"

SKIP_APPLY_SCHEMA=false

while getopts "s" opt; do
case "${opt}" in
s)
SKIP_APPLY_SCHEMA=true
;;
esac
done

usage() {
echo $"Usage: $0 <cassandra_version> <schema_version>"
exit 1
}

check_arg() {
if [ ! $# -eq 3 ]; then
if [ ! $# -ge 3 ]; then
echo "ERROR: need exactly three arguments, <cassandra_version> <schema_version> <jaeger_version>"
usage
fi
Expand Down Expand Up @@ -74,8 +84,10 @@ run_integration_test() {
# shellcheck disable=SC2064
trap "teardown_cassandra ${compose_file}" EXIT

apply_schema "$schema_version" "$primaryKeyspace"
apply_schema "$schema_version" "$archiveKeyspace"
if [[ ! SKIP_APPLY_SCHEMA ]]; then
apply_schema "$schema_version" "$primaryKeyspace"
apply_schema "$schema_version" "$archiveKeyspace"
fi

if [ "${jaegerVersion}" = "v1" ]; then
STORAGE=cassandra make storage-integration-test
Expand Down

0 comments on commit ed3e172

Please sign in to comment.