diff --git a/.go-version b/.go-version index 71f7f51d..a1b6e17d 100644 --- a/.go-version +++ b/.go-version @@ -1 +1 @@ -1.22 +1.23 diff --git a/Dockerfile b/Dockerfile index 264b1fa4..4f658854 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,7 @@ FROM golang AS builder MAINTAINER Ugo Landini -ARG VERSION=0.3.9 +ARG VERSION=0.4.0 ARG GOVERSION=$(go version) ARG USER=$(id -u -n) ARG TIME=$(date) diff --git a/Dockerfile.alpine b/Dockerfile.alpine index d438a050..eae71f54 100644 --- a/Dockerfile.alpine +++ b/Dockerfile.alpine @@ -1,7 +1,7 @@ FROM golang:1.22-alpine AS builder MAINTAINER Ugo Landini -ARG VERSION=0.3.9 +ARG VERSION=0.4.0 ARG GOVERSION=$(go version) ARG USER=$(id -u -n) ARG TIME=$(date) diff --git a/Dockerfile.scratch b/Dockerfile.scratch index a23e18d0..f246bb8a 100644 --- a/Dockerfile.scratch +++ b/Dockerfile.scratch @@ -1,7 +1,7 @@ FROM golang:1.22-alpine AS builder MAINTAINER Ugo Landini -ARG VERSION=0.3.9 +ARG VERSION=0.4.0 ARG GOVERSION=$(go version) ARG USER=$(id -u -n) ARG TIME=$(date) diff --git a/Makefile b/Makefile index 7d3f1435..970c3c12 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -VERSION=0.3.9 +VERSION=0.4.0 GOVERSION=$(shell go version) USER=$(shell id -u -n) TIME=$(shell date) @@ -89,7 +89,7 @@ vet: go vet lint: - golangci-lint run --enable-all + golangci-lint run --config .localci/lint/golangci.yml --out-format tab help: hello @echo '' diff --git a/go.mod b/go.mod index 81162d01..4cd51774 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/spf13/cobra v1.8.1 github.com/spf13/viper v1.19.0 github.com/squeeze69/generacodicefiscale v1.0.5 + github.com/stretchr/testify v1.9.0 github.com/vadv/gopher-lua-libs v0.5.0 github.com/yuin/gopher-lua v1.1.1 go.mongodb.org/mongo-driver v1.16.0 @@ -73,6 +74,7 @@ require ( github.com/cbroglie/mustache v1.0.1 // indirect github.com/cenkalti/backoff/v3 v3.0.0 // indirect github.com/cheggaaa/pb/v3 v3.0.5 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect github.com/fatih/color v1.16.0 // indirect @@ -116,6 +118,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/montanaflynn/stats v0.7.1 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.17.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.44.0 // indirect diff --git a/pkg/cmd/createTopic.go b/pkg/cmd/createTopic.go index e4f023b4..56a3b351 100644 --- a/pkg/cmd/createTopic.go +++ b/pkg/cmd/createTopic.go @@ -37,11 +37,11 @@ jr createTopic newDefaultTopic Run: func(cmd *cobra.Command, args []string) { kafkaConfig, _ := cmd.Flags().GetString("kafkaConfig") - kManager := &kafka.KafkaManager{} + kManager := &kafka.Manager{} kManager.Initialize(kafkaConfig) partitions, _ := cmd.Flags().GetInt("partitions") replica, _ := cmd.Flags().GetInt("replica") - kManager.CreateTopicFull(args[0], partitions, replica) + kManager.CreateTopicFull(cmd.Context(), args[0], partitions, replica) }, } diff --git a/pkg/cmd/emitterRun.go b/pkg/cmd/emitterRun.go index cc4a84a2..44a59a0f 100644 --- a/pkg/cmd/emitterRun.go +++ b/pkg/cmd/emitterRun.go @@ -42,7 +42,7 @@ func RunEmitters(ctx context.Context, emitterNames []string, ems map[string][]em defer emitter.WriteStats() defer emitter.CloseProducers(ctx, ems) emittersToRun := emitter.Initialize(ctx, emitterNames, ems, dryrun) - emitter.DoLoop(emittersToRun) + emitter.DoLoop(ctx, emittersToRun) } func init() { diff --git a/pkg/cmd/functionList.go b/pkg/cmd/functionList.go index acd30b7e..77ab5bcb 100644 --- a/pkg/cmd/functionList.go +++ b/pkg/cmd/functionList.go @@ -57,7 +57,8 @@ func doList(cmd *cobra.Command, args []string) { isMarkdown, _ := cmd.Flags().GetBool("markdown") noColor, _ := cmd.Flags().GetBool("nocolor") - if category && len(args) > 0 { + switch { + case category && len(args) > 0: var functionNames []string for k, v := range functions.DescriptionMap() { if v.Category == args[0] { @@ -65,7 +66,7 @@ func doList(cmd *cobra.Command, args []string) { } } sortAndPrint(functionNames, isMarkdown, noColor) - } else if find && len(args) > 0 { + case find && len(args) > 0: var functionNames []string for k, v := range functions.DescriptionMap() { if strings.Contains(v.Description, args[0]) || strings.Contains(v.Name, args[0]) { @@ -73,8 +74,7 @@ func doList(cmd *cobra.Command, args []string) { } } sortAndPrint(functionNames, isMarkdown, noColor) - } else if len(args) == 1 { - + case len(args) == 1: if run { f, found := printFunction(args[0], isMarkdown, noColor) if found { @@ -87,9 +87,7 @@ func doList(cmd *cobra.Command, args []string) { } else { printFunction(args[0], isMarkdown, noColor) } - } else { - - //l := len(functions.FunctionsMap()) + default: l := len(functions.DescriptionMap()) functionNames := make([]string, l) @@ -101,6 +99,7 @@ func doList(cmd *cobra.Command, args []string) { sortAndPrint(functionNames, isMarkdown, noColor) } + fmt.Println() } @@ -108,7 +107,7 @@ func sortAndPrint(functionNames []string, isMarkdown bool, noColor bool) { slices.Sort(functionNames) for _, k := range functionNames { printFunction(k, isMarkdown, noColor) - //fmt.Println(k) + // fmt.Println(k) } fmt.Println() fmt.Printf("Total functions: %d\n", len(functionNames)) @@ -124,34 +123,37 @@ func printFunction(name string, isMarkdown bool, noColor bool) (functions.Functi Reset = "\033[0m" } - if found { - if isMarkdown { - fmt.Println() - fmt.Printf("### %s \n", f.Name) - fmt.Printf("**Category:** %s\\\n", f.Category) - fmt.Printf("**Description:** %s\\\n", f.Description) - - if len(f.Parameters) > 0 { - fmt.Printf("**Parameters:** `%s`\\\n", f.Parameters) - } else { - fmt.Printf("**Parameters:** %s \\\n", f.Parameters) - } - fmt.Printf("**Localizable:** `%v`\\\n", f.Localizable) - fmt.Printf("**Return:** `%s`\\\n", f.Return) - fmt.Printf("**Example:** `%s`\\\n", f.Example) - fmt.Printf("**Output:** `%s`\n", f.Output) + if !found { + return f, found + } + + if isMarkdown { + fmt.Println() + fmt.Printf("### %s \n", f.Name) + fmt.Printf("**Category:** %s\\\n", f.Category) + fmt.Printf("**Description:** %s\\\n", f.Description) + + if len(f.Parameters) > 0 { + fmt.Printf("**Parameters:** `%s`\\\n", f.Parameters) } else { - fmt.Println() - fmt.Printf("%sName: %s%s\n", Cyan, Reset, f.Name) - fmt.Printf("%sCategory: %s%s\n", Cyan, Reset, f.Category) - fmt.Printf("%sDescription: %s%s\n", Cyan, Reset, f.Description) - fmt.Printf("%sParameters: %s%s\n", Cyan, Reset, f.Parameters) - fmt.Printf("%sLocalizable: %s%v\n", Cyan, Reset, f.Localizable) - fmt.Printf("%sReturn: %s%s\n", Cyan, Reset, f.Return) - fmt.Printf("%sExample: %s%s\n", Cyan, Reset, f.Example) - fmt.Printf("%sOutput: %s%s\n", Cyan, Reset, f.Output) + fmt.Printf("**Parameters:** %s \\\n", f.Parameters) } + fmt.Printf("**Localizable:** `%v`\\\n", f.Localizable) + fmt.Printf("**Return:** `%s`\\\n", f.Return) + fmt.Printf("**Example:** `%s`\\\n", f.Example) + fmt.Printf("**Output:** `%s`\n", f.Output) + } else { + fmt.Println() + fmt.Printf("%sName: %s%s\n", Cyan, Reset, f.Name) + fmt.Printf("%sCategory: %s%s\n", Cyan, Reset, f.Category) + fmt.Printf("%sDescription: %s%s\n", Cyan, Reset, f.Description) + fmt.Printf("%sParameters: %s%s\n", Cyan, Reset, f.Parameters) + fmt.Printf("%sLocalizable: %s%v\n", Cyan, Reset, f.Localizable) + fmt.Printf("%sReturn: %s%s\n", Cyan, Reset, f.Return) + fmt.Printf("%sExample: %s%s\n", Cyan, Reset, f.Example) + fmt.Printf("%sOutput: %s%s\n", Cyan, Reset, f.Output) } + return f, found } diff --git a/pkg/cmd/server.go b/pkg/cmd/server.go index ee08959e..13a0ba3e 100644 --- a/pkg/cmd/server.go +++ b/pkg/cmd/server.go @@ -90,6 +90,12 @@ var emitterToRun = make(map[string][]emitter.Emitter) var store = sessions.NewCookieStore([]byte("templates")) +type serverKey string + +const ( + sessionKey serverKey = "session" +) + var serverCmd = &cobra.Command{ Use: "server", Short: "Starts the jr http server", @@ -153,14 +159,24 @@ var serverCmd = &cobra.Command{ addr := fmt.Sprintf(":%d", port) log.Info().Int("port", port).Msg("Starting HTTP server") - log.Fatal().Err(http.ListenAndServe(addr, router)) + + // TODO: must validate values + s := &http.Server{ + Addr: addr, + ReadHeaderTimeout: 20 * time.Second, + ReadTimeout: 1 * time.Minute, + WriteTimeout: 2 * time.Minute, + Handler: router, + } + + log.Fatal().Err(s.ListenAndServe()) }, } func SessionMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { session, _ := store.Get(r, "session-name") - r = r.WithContext(context.WithValue(r.Context(), "session", session)) + r = r.WithContext(context.WithValue(r.Context(), sessionKey, session)) next.ServeHTTP(w, r) }) } @@ -244,7 +260,7 @@ func FileServer(r chi.Router, path string, root http.FileSystem) { } if path != "/" && path[len(path)-1] != '/' { - r.Get(path, http.RedirectHandler(path+"/", 301).ServeHTTP) + r.Get(path, http.RedirectHandler(path+"/", http.StatusMovedPermanently).ServeHTTP) path += "/" } path += "*" @@ -260,7 +276,7 @@ func FileServer(r chi.Router, path string, root http.FileSystem) { func listEmitters(w http.ResponseWriter, r *http.Request) { emitters_json, _ := json.Marshal(emitters) - _, err := w.Write([]byte(emitters_json)) + _, err := w.Write(emitters_json) if err != nil { log.Error().Err(err).Msg("Error writing response") } @@ -342,22 +358,25 @@ func runEmitter(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") url := chi.URLParam(r, "emitter") - if firstRun[url] == false { - for i := 0; i < len(emitters); i++ { - if functions.Contains([]string{url}, emitters[i].Name) { - emitters[i].Initialize(r.Context(), configuration.GlobalCfg) - emitterToRun[url] = append(emitterToRun[url], emitters[i]) - if emitters[i].Preload > 0 { - emitters[i].Run(emitters[i].Preload, w) - } else { - emitters[i].Run(emitters[i].Num, w) - } - firstRun[url] = true - } - } - } else { + + if firstRun[url] { for _, e := range emitterToRun[url] { - e.Run(e.Num, w) + e.Run(r.Context(), e.Num, w) + } + + return + } + + for i := 0; i < len(emitters); i++ { + if functions.Contains([]string{url}, emitters[i].Name) { + emitters[i].Initialize(r.Context(), configuration.GlobalCfg) + emitterToRun[url] = append(emitterToRun[url], emitters[i]) + if emitters[i].Preload > 0 { + emitters[i].Run(r.Context(), emitters[i].Preload, w) + } else { + emitters[i].Run(r.Context(), emitters[i].Num, w) + } + firstRun[url] = true } } @@ -380,12 +399,12 @@ func loadLastStatus(w http.ResponseWriter, r *http.Request) { response.WriteString("{") - session := r.Context().Value("session").(*sessions.Session) - lastTemplateSubmittedValue_without_type, _ := session.Values["lastTemplateSubmittedValue"] - lastTemplateSubmittedValue, _ := lastTemplateSubmittedValue_without_type.(string) + session := r.Context().Value(sessionKey).(*sessions.Session) + lastTemplateSubmittedValue_without_type := session.Values["lastTemplateSubmittedValue"] + lastTemplateSubmittedValue := lastTemplateSubmittedValue_without_type.(string) - lastTemplateSubmittedisJsonOutputValue_without_type, _ := session.Values["lastTemplateSubmittedisJsonOutputValue"] - lastTemplateSubmittedisJsonOutputValue, _ := lastTemplateSubmittedisJsonOutputValue_without_type.(string) + lastTemplateSubmittedisJsonOutputValue_without_type := session.Values["lastTemplateSubmittedisJsonOutputValue"] + lastTemplateSubmittedisJsonOutputValue := lastTemplateSubmittedisJsonOutputValue_without_type.(string) lastTemplateSubmittedValueB64 := base64.StdEncoding.EncodeToString([]byte(lastTemplateSubmittedValue)) @@ -412,7 +431,7 @@ func executeTemplate(w http.ResponseWriter, r *http.Request) { var lastTemplateSubmittedValue = r.Form.Get("template") var lastTemplateSubmittedisJsonOutputValue = r.Form.Get("isJsonOutput") - session := r.Context().Value("session").(*sessions.Session) + session := r.Context().Value(sessionKey).(*sessions.Session) session.Values["lastTemplateSubmittedValue"] = lastTemplateSubmittedValue session.Values["lastTemplateSubmittedisJsonOutputValue"] = lastTemplateSubmittedisJsonOutputValue session.Save(r, w) @@ -434,7 +453,7 @@ func executeTemplate(w http.ResponseWriter, r *http.Request) { return } - _, err := w.Write([]byte(b.String())) + _, err := w.Write(b.Bytes()) if err != nil { log.Error().Err(err).Msg("Error writing response") } diff --git a/pkg/cmd/templateShow.go b/pkg/cmd/templateShow.go index 58ef2dc1..308c2f48 100644 --- a/pkg/cmd/templateShow.go +++ b/pkg/cmd/templateShow.go @@ -49,7 +49,7 @@ var templateShowCmd = &cobra.Command{ if err != nil { log.Fatal().Err(err).Msg("Failed to ReadFile") } - valid, err := isValidTemplate([]byte(templateScript)) + valid, err := isValidTemplate(templateScript) if err != nil { log.Fatal().Err(err).Msg("Failed to read a template") } diff --git a/pkg/ctx/context.go b/pkg/ctx/context.go index 91da7937..7b83c5e1 100644 --- a/pkg/ctx/context.go +++ b/pkg/ctx/context.go @@ -25,7 +25,7 @@ import ( "time" ) -var JrContext Context +var JrContext *Context // Context is the object passed on the templates which contains all the needed details. type Context struct { @@ -50,7 +50,7 @@ type Context struct { func init() { - JrContext = Context{ + JrContext = &Context{ StartTime: time.Now(), GeneratedBytes: 0, GeneratedObjects: 0, diff --git a/pkg/emitter/emitter.go b/pkg/emitter/emitter.go index 895e74ef..bfd4acdb 100644 --- a/pkg/emitter/emitter.go +++ b/pkg/emitter/emitter.go @@ -41,7 +41,7 @@ import ( "github.com/jrnd-io/jr/pkg/producers/http" "github.com/jrnd-io/jr/pkg/producers/kafka" "github.com/jrnd-io/jr/pkg/producers/luascript" - "github.com/jrnd-io/jr/pkg/producers/mongoDB" + "github.com/jrnd-io/jr/pkg/producers/mongodb" "github.com/jrnd-io/jr/pkg/producers/redis" "github.com/jrnd-io/jr/pkg/producers/s3" "github.com/jrnd-io/jr/pkg/producers/server" @@ -99,17 +99,17 @@ func (e *Emitter) Initialize(ctx context.Context, conf configuration.GlobalConfi o, _ := tpl.NewTpl("out", e.OutputTemplate, functions.FunctionsMap(), nil) if e.Output == "stdout" { - e.Producer = &console.ConsoleProducer{OutputTpl: &o} + e.Producer = &console.Producer{OutputTpl: &o} return } if e.Output == "kafka" { e.Producer = createKafkaProducer(ctx, conf, e.Topic, templateName) return - } else { - if conf.SchemaRegistry { - log.Warn().Msg("Ignoring schemaRegistry and/or serializer when output not set to kafka") - } + } + + if conf.SchemaRegistry { + log.Warn().Msg("Ignoring schemaRegistry and/or serializer when output not set to kafka") } if e.Output == "redis" { @@ -176,7 +176,7 @@ func (e *Emitter) Initialize(ctx context.Context, conf configuration.GlobalConfi } -func (e *Emitter) Run(num int, o any) { +func (e *Emitter) Run(ctx context.Context, num int, o any) { for i := 0; i < num; i++ { @@ -185,9 +185,9 @@ func (e *Emitter) Run(num int, o any) { kInValue := functions.GetV("KEY") if kInValue != "" { - e.Producer.Produce(context.TODO(), []byte(kInValue), []byte(v), o) + e.Producer.Produce(ctx, []byte(kInValue), []byte(v), o) } else { - e.Producer.Produce(context.TODO(), []byte(k), []byte(v), o) + e.Producer.Produce(ctx, []byte(k), []byte(v), o) } jtctx.JrContext.GeneratedObjects++ jtctx.JrContext.GeneratedBytes += int64(len(v)) @@ -197,7 +197,7 @@ func (e *Emitter) Run(num int, o any) { } func createRedisProducer(_ context.Context, ttl time.Duration, redisConfig string) Producer { - rProducer := &redis.RedisProducer{ + rProducer := &redis.Producer{ Ttl: ttl, } rProducer.Initialize(redisConfig) @@ -205,21 +205,21 @@ func createRedisProducer(_ context.Context, ttl time.Duration, redisConfig strin } func createMongoProducer(ctx context.Context, mongoConfig string) Producer { - mProducer := &mongoDB.MongoProducer{} + mProducer := &mongodb.MongoProducer{} mProducer.Initialize(ctx, mongoConfig) return mProducer } func createElasticProducer(_ context.Context, elasticConfig string) Producer { - eProducer := &elastic.ElasticProducer{} + eProducer := &elastic.Producer{} eProducer.Initialize(elasticConfig) return eProducer } func createS3Producer(ctx context.Context, s3Config string) Producer { - sProducer := &s3.S3Producer{} + sProducer := &s3.Producer{} sProducer.Initialize(ctx, s3Config) return sProducer @@ -247,7 +247,7 @@ func createAZCosmosDBProducer(_ context.Context, azConfig string) Producer { } func createGCSProducer(ctx context.Context, gcsConfig string) Producer { - gProducer := &gcs.GCSProducer{} + gProducer := &gcs.Producer{} gProducer.Initialize(ctx, gcsConfig) return gProducer @@ -281,9 +281,10 @@ func createWASMProducer(ctx context.Context, config string) Producer { return producer } -func createKafkaProducer(_ context.Context, conf configuration.GlobalConfiguration, topic string, templateType string) *kafka.KafkaManager { +func createKafkaProducer(ctx context.Context, conf configuration.GlobalConfiguration, topic string, templateType string) *kafka.Manager { + - kManager := &kafka.KafkaManager{ + kManager := &kafka.Manager{ Serializer: conf.Serializer, Topic: topic, TemplateType: templateType, @@ -295,7 +296,7 @@ func createKafkaProducer(_ context.Context, conf configuration.GlobalConfigurati kManager.InitializeSchemaRegistry(conf.RegistryConfig) } if conf.AutoCreate { - kManager.CreateTopic(topic) + kManager.CreateTopic(ctx, topic) } return kManager } diff --git a/pkg/emitter/loop.go b/pkg/emitter/loop.go index 3b9003d1..065aad7f 100644 --- a/pkg/emitter/loop.go +++ b/pkg/emitter/loop.go @@ -24,7 +24,7 @@ import ( "context" "fmt" "github.com/jrnd-io/jr/pkg/configuration" - "github.com/jrnd-io/jr/pkg/ctx" + jrctx "github.com/jrnd-io/jr/pkg/ctx" "github.com/jrnd-io/jr/pkg/functions" "os" "os/signal" @@ -66,12 +66,12 @@ func InitializeEmitters(ctx context.Context, emitters []Emitter, dryrun bool, em } emitters[i].Initialize(ctx, configuration.GlobalCfg) emittersToRun = append(emittersToRun, emitters[i]) - emitters[i].Run(emitters[i].Preload, nil) + emitters[i].Run(ctx, emitters[i].Preload, nil) } return emittersToRun } -func DoLoop(es []Emitter) { +func DoLoop(ctx context.Context, es []Emitter) { for _, e := range es { addEmitterToExpectedObjects(e) @@ -105,14 +105,14 @@ func DoLoop(es []Emitter) { stop() return case <-ticker.C: - doTemplate(es[index]) + doTemplate(ctx, es[index]) case <-stopChannels[timerIndex]: return } } } else { - doTemplate(es[index]) + doTemplate(ctx, es[index]) } }(index) @@ -124,12 +124,12 @@ func DoLoop(es []Emitter) { wg.Wait() } -func doTemplate(emitter Emitter) { - ctx.JrContext.Locale = emitter.Locale - ctx.JrContext.CountryIndex = functions.IndexOf(strings.ToUpper(emitter.Locale), "country") +func doTemplate(ctx context.Context, emitter Emitter) { + jrctx.JrContext.Locale = emitter.Locale + jrctx.JrContext.CountryIndex = functions.IndexOf(strings.ToUpper(emitter.Locale), "country") for i := 0; i < emitter.Num; i++ { - ctx.JrContext.CurrentIterationLoopIndex++ + jrctx.JrContext.CurrentIterationLoopIndex++ k := emitter.KTpl.Execute() v := emitter.VTpl.Execute() @@ -139,13 +139,13 @@ func doTemplate(emitter Emitter) { kInValue := functions.GetV("KEY") if (kInValue) != "" { - emitter.Producer.Produce(context.TODO(), []byte(kInValue), []byte(v), nil) + emitter.Producer.Produce(ctx, []byte(kInValue), []byte(v), nil) } else { - emitter.Producer.Produce(context.TODO(), []byte(k), []byte(v), nil) + emitter.Producer.Produce(ctx, []byte(k), []byte(v), nil) } - ctx.JrContext.GeneratedObjects++ - ctx.JrContext.GeneratedBytes += int64(len(v)) + jrctx.JrContext.GeneratedObjects++ + jrctx.JrContext.GeneratedBytes += int64(len(v)) } } @@ -172,21 +172,21 @@ func addEmitterToExpectedObjects(e Emitter) { if d > 0 && f > 0 && n > 0 { expected := (d / f) * int64(n) - ctx.JrContext.ExpectedObjects += expected + jrctx.JrContext.ExpectedObjects += expected } } func WriteStats() { _, _ = fmt.Fprintln(os.Stderr) - elapsed := time.Since(ctx.JrContext.StartTime) + elapsed := time.Since(jrctx.JrContext.StartTime) _, _ = fmt.Fprintf(os.Stderr, "Elapsed time: %v\n", elapsed.Round(1*time.Second)) - _, _ = fmt.Fprintf(os.Stderr, "Data Generated (Objects): %d\n", ctx.JrContext.GeneratedObjects) + _, _ = fmt.Fprintf(os.Stderr, "Data Generated (Objects): %d\n", jrctx.JrContext.GeneratedObjects) - ungenerated := ctx.JrContext.ExpectedObjects - ctx.JrContext.GeneratedObjects + ungenerated := jrctx.JrContext.ExpectedObjects - jrctx.JrContext.GeneratedObjects if ungenerated > 0 { _, _ = fmt.Fprintf(os.Stderr, "Data NOT Generated (Objects): %d\n", ungenerated) } - _, _ = fmt.Fprintf(os.Stderr, "Data Generated (bytes): %d\n", ctx.JrContext.GeneratedBytes) - _, _ = fmt.Fprintf(os.Stderr, "Throughput (bytes per second): %9.f\n", float64(ctx.JrContext.GeneratedBytes)/elapsed.Seconds()) + _, _ = fmt.Fprintf(os.Stderr, "Data Generated (bytes): %d\n", jrctx.JrContext.GeneratedBytes) + _, _ = fmt.Fprintf(os.Stderr, "Throughput (bytes per second): %9.f\n", float64(jrctx.JrContext.GeneratedBytes)/elapsed.Seconds()) _, _ = fmt.Fprintln(os.Stderr) } diff --git a/pkg/functions/address.go b/pkg/functions/address.go index fdd746b2..b4a930f1 100644 --- a/pkg/functions/address.go +++ b/pkg/functions/address.go @@ -55,10 +55,10 @@ func Cardinal(short bool) string { if short { directions := []string{"N", "S", "E", "O", "NE", "NO", "SE", "SO"} return directions[Random.Intn(len(directions))] - } else { - directions := []string{"North", "South", "East", "Ovest", "North-East", "North-Ovest", "South-East", "South-Ovest"} - return directions[Random.Intn(len(directions))] } + + directions := []string{"North", "South", "East", "Ovest", "North-East", "North-Ovest", "South-East", "South-Ovest"} + return directions[Random.Intn(len(directions))] } // City returns a random City @@ -79,9 +79,9 @@ func Country() string { countryIndex := ctx.JrContext.CountryIndex if countryIndex == -1 { return Word("country") - } else { - return WordAt("country", countryIndex) } + + return WordAt("country", countryIndex) } // CountryRandom returns a random ISO 3166 Country @@ -163,9 +163,9 @@ func Zip() string { z := Word("zip") zip, _ := Regex(z) return zip - } else { - return ZipAt(cityIndex) } + + return ZipAt(cityIndex) } // ZipAt returns Zip code at given index diff --git a/pkg/functions/finance.go b/pkg/functions/finance.go index 8f291b30..8621addf 100644 --- a/pkg/functions/finance.go +++ b/pkg/functions/finance.go @@ -121,7 +121,7 @@ func Swift() string { location := rand.Intn(100) branch := rand.Intn(1000) - return string(bankCode) + string(country) + fmt.Sprintf("%02d", location) + fmt.Sprintf("%03d", branch) + return string(bankCode) + country + fmt.Sprintf("%02d", location) + fmt.Sprintf("%03d", branch) } diff --git a/pkg/functions/functions.go b/pkg/functions/functions.go index 07089277..be219d93 100644 --- a/pkg/functions/functions.go +++ b/pkg/functions/functions.go @@ -145,7 +145,7 @@ var fmap = map[string]interface{}{ "zip": Zip, "zip_at": ZipAt, - //finance + // finance "account": Account, "amount": Amount, "bitcoin": Bitcoin, @@ -160,7 +160,7 @@ var fmap = map[string]interface{}{ "valor": Valor, "wkn": Wkn, - //time and dates + // time and dates "birthdate": BirthDate, "date_between": DateBetween, "dates_between": DatesBetween, @@ -170,7 +170,7 @@ var fmap = map[string]interface{}{ "soon": Soon, "unix_time_stamp": UnixTimeStamp, - //phone + // phone "country_code": CountryCode, "country_code_at": CountryCodeAt, "imei": Imei, @@ -204,10 +204,10 @@ var fmap = map[string]interface{}{ func Atoi(s string) int { if len(s) == 0 { return 0 - } else { - i, _ := strconv.Atoi(s) - return i } + + i, _ := strconv.Atoi(s) + return i } // Seed sets seeds and can be used in a template @@ -256,9 +256,9 @@ func IndexOf(s string, name string) int { if index < len(words) && words[index] == s { return index - } else { - return -1 } + + return -1 } // Len returns number of words (lines) in a word file @@ -290,9 +290,9 @@ func RandomValueFromList(s string) string { l := len(list) if l != 0 { return list[Random.Intn(l)] - } else { - return "" } + + return "" } // GetValuesFromList returns a value from Context list l at index @@ -304,9 +304,9 @@ func GetValueFromListAtIndex(s string, index int) string { l := len(list) if l != 0 && index < l { return list[index] - } else { - return "" } + + return "" } // RandomNValuesFromList returns a random value from Context list l @@ -322,9 +322,9 @@ func RandomNValuesFromList(s string, n int) []string { results[i] = list[i] } return results - } else { - return []string{""} } + + return []string{""} } // Word returns a random string from a list of strings in a file. @@ -394,28 +394,29 @@ func Cache(name string) (bool, error) { templateDir := fmt.Sprintf("%s/%s", constants.JR_SYSTEM_DIR, "templates") v := data[name] - if v == nil { - locale := strings.ToLower(ctx.JrContext.Locale) - filename := fmt.Sprintf("%s/data/%s/%s", os.ExpandEnv(templateDir), locale, name) - if locale != "us" && !(fileExists(filename)) { - filename = fmt.Sprintf("%s/data/%s/%s", os.ExpandEnv(templateDir), "us", name) - } - data[name] = initialize(filename) - if len(data[name]) == 0 { - return false, fmt.Errorf("no words found in %s", filename) - } else { - return true, nil - } + if v != nil { + return false, nil + } + + locale := strings.ToLower(ctx.JrContext.Locale) + filename := fmt.Sprintf("%s/data/%s/%s", os.ExpandEnv(templateDir), locale, name) + if locale != "us" && !(fileExists(filename)) { + filename = fmt.Sprintf("%s/data/%s/%s", os.ExpandEnv(templateDir), "us", name) + } + data[name] = initialize(filename) + if len(data[name]) == 0 { + return false, fmt.Errorf("no words found in %s", filename) } - return false, nil + + return true, nil } func fileExists(filename string) bool { if _, err := os.Stat(filename); err == nil { return true - } else { - return false } + + return false } // Helper function to generate n different integers from 0 to length @@ -469,59 +470,60 @@ func initialize(filename string) []string { } func InitCSV(csvpath string) { - //Loads the csv file in the context - if len(csvpath) > 0 { + // Loads the csv file in the context + if len(csvpath) == 0 { + return + } - var csvHeaders = make(map[int]string) - csvValues := make(map[int]map[string]string) + var csvHeaders = make(map[int]string) + csvValues := make(map[int]map[string]string) - if _, err := os.Stat(csvpath); err != nil { - println("File does not exist: ", csvpath) - os.Exit(1) - } + if _, err := os.Stat(csvpath); err != nil { + println("File does not exist: ", csvpath) + os.Exit(1) + } - file, err := os.Open(csvpath) + file, err := os.Open(csvpath) - if err != nil { - println("Error opening file:", csvpath, "error:", err) - os.Exit(1) - } else { - reader := csv.NewReader(file) - - lines, err := reader.ReadAll() - if err != nil { - println("Error reading CSV file:", csvpath, "error:", err) - os.Exit(1) + if err != nil { + println("Error opening file:", csvpath, "error:", err) + os.Exit(1) + } + + defer file.Close() + + reader := csv.NewReader(file) + + lines, err := reader.ReadAll() + if err != nil { + println("Error reading CSV file:", csvpath, "error:", err) + os.Exit(1) + } + + for row := 0; row < len(lines); row++ { + var aRow = lines[row] + for col := 0; col < len(aRow); col++ { + var value = aRow[col] + // print(" ROW -> ",row) + if row == 0 { + // print(" H: ", value) + csvHeaders[col] = strings.Trim(value, " ") } else { - for row := 0; row < len(lines); row++ { - var aRow = lines[row] - for col := 0; col < len(aRow); col++ { - var value = aRow[col] - //print(" ROW -> ",row) - if row == 0 { - //print(" H: ", value) - csvHeaders[col] = strings.Trim(value, " ") - } else { - - val, exists := csvValues[row-1] - if exists { - val[csvHeaders[col]] = strings.Trim(value, " ") - csvValues[row-1] = val - } else { - var localmap = make(map[string]string) - localmap[csvHeaders[col]] = strings.Trim(value, " ") - csvValues[row-1] = localmap - } - // print(" V: ", value) - } - } - //println() + val, exists := csvValues[row-1] + if exists { + val[csvHeaders[col]] = strings.Trim(value, " ") + csvValues[row-1] = val + } else { + var localmap = make(map[string]string) + localmap[csvHeaders[col]] = strings.Trim(value, " ") + csvValues[row-1] = localmap } - ctx.JrContext.CtxCSV = csvValues + // print(" V: ", value) } } - defer file.Close() + // println() } + ctx.JrContext.CtxCSV = csvValues } diff --git a/pkg/functions/networking.go b/pkg/functions/networking.go index 1dd5fb3d..8f060b25 100644 --- a/pkg/functions/networking.go +++ b/pkg/functions/networking.go @@ -36,7 +36,7 @@ func Ip(cidr string) string { GENERATE: - ip, ipnet, err := net.ParseCIDR(cidr) + _, ipnet, err := net.ParseCIDR(cidr) if err != nil { return "0.0.0.0" } @@ -56,7 +56,7 @@ GENERATE: r[i] = ipnet.IP[i] } } - ip = net.IPv4(r[0], r[1], r[2], r[3]) + ip := net.IPv4(r[0], r[1], r[2], r[3]) if ip.Equal(ipnet.IP) /*|| Ip.Equal(broadcast) */ { goto GENERATE diff --git a/pkg/functions/people.go b/pkg/functions/people.go index 86963292..7d6c053d 100644 --- a/pkg/functions/people.go +++ b/pkg/functions/people.go @@ -146,9 +146,9 @@ func Name() string { s := Random.Intn(2) if s == 0 { return NameM() - } else { - return NameF() } + + return NameF() } // NameM returns a random male Name diff --git a/pkg/functions/phone.go b/pkg/functions/phone.go index a3c352bd..59f92fa5 100644 --- a/pkg/functions/phone.go +++ b/pkg/functions/phone.go @@ -27,9 +27,9 @@ func CountryCode() string { countryIndex := ctx.JrContext.CountryIndex if countryIndex == -1 { return Word("country_code") - } else { - return WordAt("country_code", countryIndex) } + + return WordAt("country_code", countryIndex) } // CountryCodeAt returns a Country Code prefix at a given index @@ -54,10 +54,9 @@ func Phone() string { l := Word("phone") lp, _ := Regex(l) return lp - } else { - return PhoneAt(cityIndex) } + return PhoneAt(cityIndex) } // PhoneAt returns a land prefix at a given index @@ -74,9 +73,9 @@ func MobilePhone() string { m := Word("mobile_phone") mp, _ := Regex(m) return mp - } else { - return MobilePhoneAt(countryIndex) } + + return MobilePhoneAt(countryIndex) } // MobilePhoneAt returns a mobile phone at a given index diff --git a/pkg/functions/regex.go b/pkg/functions/regex.go index 0cf22bf7..4314705e 100644 --- a/pkg/functions/regex.go +++ b/pkg/functions/regex.go @@ -40,7 +40,7 @@ type regexState struct { //gocyclo:ignore func generate(s *regexState, re *syntax.Regexp) string { - //fmt.Println("re:", re, "sub:", re.Sub) + // fmt.Println("re:", re, "sub:", re.Sub) op := re.Op switch op { case syntax.OpNoMatch: @@ -57,7 +57,7 @@ func generate(s *regexState, re *syntax.Regexp) string { sum := 0 for i := 0; i < len(re.Rune); i += 2 { - //fmt.Printf("Range: %#U-%#U\n", re.Rune[i], re.Rune[i+1]) + // fmt.Printf("Range: %#U-%#U\n", re.Rune[i], re.Rune[i+1]) sum += int(re.Rune[i+1]-re.Rune[i]) + 1 if re.Rune[i+1] == runeRangeEnd { @@ -70,7 +70,7 @@ func generate(s *regexState, re *syntax.Regexp) string { possibleChars := []uint8{} for j := 0; j < len(printableChars); j++ { c := printableChars[j] - //fmt.Printf("Char %c %d\n", c, c) + // fmt.Printf("Char %c %d\n", c, c) // Check c in range for i := 0; i < len(re.Rune); i += 2 { if rune(c) >= re.Rune[i] && rune(c) <= re.Rune[i+1] { @@ -79,16 +79,16 @@ func generate(s *regexState, re *syntax.Regexp) string { } } } - //fmt.Println("Possible chars: ", possibleChars) + // fmt.Println("Possible chars: ", possibleChars) if len(possibleChars) > 0 { c := possibleChars[Random.Intn(len(possibleChars))] - //fmt.Printf("Generated rune %c for inverse range %v\n", c, re) + // fmt.Printf("Generated rune %c for inverse range %v\n", c, re) return string([]byte{c}) } } - //fmt.Println("Char range: ", sum) - r := Random.Intn(int(sum)) + // fmt.Println("Char range: ", sum) + r := Random.Intn(sum) var ru rune sum = 0 for i := 0; i < len(re.Rune); i += 2 { @@ -100,7 +100,7 @@ func generate(s *regexState, re *syntax.Regexp) string { sum += gap } - //fmt.Printf("Generated rune %c for range %v\n", ru, re) + // fmt.Printf("Generated rune %c for range %v\n", ru, re) return string(ru) case syntax.OpAnyCharNotNL, syntax.OpAnyChar: @@ -117,7 +117,7 @@ func generate(s *regexState, re *syntax.Regexp) string { case syntax.OpWordBoundary: case syntax.OpNoWordBoundary: case syntax.OpCapture: - //fmt.Println("OpCapture", re.Sub, len(re.Sub)) + // fmt.Println("OpCapture", re.Sub, len(re.Sub)) return generate(s, re.Sub0[0]) case syntax.OpStar: // Repeat zero or more times @@ -143,7 +143,7 @@ func generate(s *regexState, re *syntax.Regexp) string { // Zero or one instances res := "" count := Random.Intn(2) - //fmt.Println("Quest", count) + // fmt.Println("Quest", count) for i := 0; i < count; i++ { for _, r := range re.Sub { res += generate(s, r) @@ -152,7 +152,7 @@ func generate(s *regexState, re *syntax.Regexp) string { return res case syntax.OpRepeat: // Repeat one or more times - //fmt.Println("OpRepeat", re.Min, re.Max) + // fmt.Println("OpRepeat", re.Min, re.Max) res := "" count := 0 @@ -160,7 +160,7 @@ func generate(s *regexState, re *syntax.Regexp) string { if re.Max > re.Min { count = Random.Intn(re.Max - re.Min + 1) } - //fmt.Println(re.Max, count) + // fmt.Println(re.Max, count) for i := 0; i < re.Min || i < (re.Min+count); i++ { for _, r := range re.Sub { @@ -176,7 +176,7 @@ func generate(s *regexState, re *syntax.Regexp) string { } return res case syntax.OpAlternate: - //fmt.Println("OpAlternative", re.Sub, len(re.Sub)) + // fmt.Println("OpAlternative", re.Sub, len(re.Sub)) i := Random.Intn(len(re.Sub)) return generate(s, re.Sub[i]) diff --git a/pkg/functions/utilities.go b/pkg/functions/utilities.go index de17693b..a8f90d79 100644 --- a/pkg/functions/utilities.go +++ b/pkg/functions/utilities.go @@ -34,10 +34,10 @@ func Counter(c string, start, step int) int { if exists { ctx.JrContext.CtxCounters[c] = val + step return ctx.JrContext.CtxCounters[c] - } else { - ctx.JrContext.CtxCounters[c] = start - return start } + + ctx.JrContext.CtxCounters[c] = start + return start } // Image generates a random Image url of given width, height and type @@ -60,9 +60,9 @@ func RandomBool() string { b := Random.Intn(2) if b == 0 { return "false" - } else { - return "true" } + + return "true" } // UniqueId returns a random uuid @@ -75,9 +75,9 @@ func YesOrNo() string { b := Random.Intn(2) if b == 0 { return "no" - } else { - return "yes" } + + return "yes" } // Contains checks if the str string is present in an array of string []string @@ -105,8 +105,7 @@ func FromCsv(c string) string { if len(ctx.JrContext.CtxCSV) > 0 { return ctx.JrContext.CtxCSV[(ctx.JrContext.CurrentIterationLoopIndex-1)%len(ctx.JrContext.CtxCSV)][c] - } else { - return "" } + return "" } diff --git a/pkg/functions_test/templates_test.go b/pkg/functions_test/templates_test.go index e4d87272..acc315c7 100644 --- a/pkg/functions_test/templates_test.go +++ b/pkg/functions_test/templates_test.go @@ -22,6 +22,7 @@ package functions_test import ( "bytes" + "github.com/stretchr/testify/require" "strconv" "testing" "text/template" @@ -131,11 +132,11 @@ func TestNestedPassingContext(t *testing.T) { var b bytes.Buffer - //* + // * data := struct { C string }{"10"} - //*/ + // */ err = aggregate.Execute(&b, data) if err != nil { @@ -281,7 +282,7 @@ func TestTemplatesWithValueFromListAtIndex(t *testing.T) { } var expectedOne bytes.Buffer - tOne.Execute(&expectedOne, nil) + require.NoError(t, tOne.Execute(&expectedOne, nil)) tTwo, err := v.New("user").Parse(templateTwo) if err != nil { @@ -289,7 +290,7 @@ func TestTemplatesWithValueFromListAtIndex(t *testing.T) { } var expectedTwo bytes.Buffer - tTwo.Execute(&expectedTwo, nil) + require.NoError(t, tTwo.Execute(&expectedTwo, nil)) check, err := v.New("order").Parse(checkTemplate) if err != nil { @@ -321,7 +322,7 @@ func TestTemplatesWithValueFromListAtIndex_greater_than_length(t *testing.T) { } var expectedOne bytes.Buffer - tOne.Execute(&expectedOne, nil) + require.NoError(t, tOne.Execute(&expectedOne, nil)) tTwo, err := v.New("user").Parse(templateTwo) if err != nil { @@ -329,7 +330,7 @@ func TestTemplatesWithValueFromListAtIndex_greater_than_length(t *testing.T) { } var expectedTwo bytes.Buffer - tTwo.Execute(&expectedTwo, nil) + require.NoError(t, tTwo.Execute(&expectedTwo, nil)) check, err := v.New("order").Parse(checkTemplate) if err != nil { diff --git a/pkg/producers/console/ConsoleProducer.go b/pkg/producers/console/ConsoleProducer.go index 8fcdbe9b..3b46e8b9 100644 --- a/pkg/producers/console/ConsoleProducer.go +++ b/pkg/producers/console/ConsoleProducer.go @@ -26,16 +26,16 @@ import ( "github.com/jrnd-io/jr/pkg/tpl" ) -type ConsoleProducer struct { +type Producer struct { OutputTpl *tpl.Tpl } -func (c *ConsoleProducer) Close(_ context.Context) error { +func (c *Producer) Close(_ context.Context) error { // no need to close return nil } -func (c *ConsoleProducer) Produce(_ context.Context, key []byte, value []byte, _ any) { +func (c *Producer) Produce(_ context.Context, key []byte, value []byte, _ any) { data := struct { K string diff --git a/pkg/producers/elastic/elasticProducer.go b/pkg/producers/elastic/elasticProducer.go index b18b2851..0077487b 100644 --- a/pkg/producers/elastic/elasticProducer.go +++ b/pkg/producers/elastic/elasticProducer.go @@ -43,12 +43,12 @@ type Config struct { ElasticPassword string `json:"password"` } -type ElasticProducer struct { - client elasticsearch.Client +type Producer struct { + client *elasticsearch.Client index string } -func (p *ElasticProducer) Initialize(configFile string) { +func (p *Producer) Initialize(configFile string) { var config Config file, err := os.ReadFile(configFile) if err != nil { @@ -83,14 +83,14 @@ func (p *ElasticProducer) Initialize(configFile string) { } p.index = config.ElasticIndex - p.client = *client + p.client = client } -func (p *ElasticProducer) Produce(ctx context.Context, k []byte, v []byte, _ any) { +func (p *Producer) Produce(ctx context.Context, k []byte, v []byte, _ any) { var req esapi.IndexRequest - if k == nil || len(k) == 0 { + if len(k) == 0 { // generate a UUID as index id := uuid.New() @@ -109,7 +109,7 @@ func (p *ElasticProducer) Produce(ctx context.Context, k []byte, v []byte, _ any } } - res, err := req.Do(ctx, &p.client) + res, err := req.Do(ctx, p.client) if err != nil { log.Fatal().Err(err).Msg("Failed to write data in Elastic") } @@ -120,7 +120,7 @@ func (p *ElasticProducer) Produce(ctx context.Context, k []byte, v []byte, _ any } } -func (p *ElasticProducer) Close(_ context.Context) error { +func (p *Producer) Close(_ context.Context) error { log.Warn().Msg("elasticsearch Client doesn't provide a close method!") return nil } diff --git a/pkg/producers/gcs/gcsProducer.go b/pkg/producers/gcs/gcsProducer.go index 53d0c013..bf46ab2b 100644 --- a/pkg/producers/gcs/gcsProducer.go +++ b/pkg/producers/gcs/gcsProducer.go @@ -36,12 +36,12 @@ type Config struct { Bucket string `json:"bucket_name"` } -type GCSProducer struct { +type Producer struct { client storage.Client bucket string } -func (p *GCSProducer) Initialize(ctx context.Context, configFile string) { +func (p *Producer) Initialize(ctx context.Context, configFile string) { var config Config file, err := os.ReadFile(configFile) if err != nil { @@ -65,7 +65,7 @@ func (p *GCSProducer) Initialize(ctx context.Context, configFile string) { p.bucket = config.Bucket } -func (p *GCSProducer) Produce(ctx context.Context, k []byte, v []byte, _ any) { +func (p *Producer) Produce(ctx context.Context, k []byte, v []byte, _ any) { bucket := p.bucket var key string @@ -89,7 +89,7 @@ func (p *GCSProducer) Produce(ctx context.Context, k []byte, v []byte, _ any) { } -func (p *GCSProducer) Close(_ context.Context) error { +func (p *Producer) Close(_ context.Context) error { p.client.Close() return nil } diff --git a/pkg/producers/http/producer.go b/pkg/producers/http/producer.go index 6cdab83c..570dd883 100644 --- a/pkg/producers/http/producer.go +++ b/pkg/producers/http/producer.go @@ -142,7 +142,8 @@ func (p *Producer) Produce(_ context.Context, _ []byte, v []byte, _ any) { req := p.client.R(). SetBody(v) - resp := &resty.Response{} + var resp *resty.Response + switch p.configuration.Endpoint.Method { case POST: resp, err = req.Post(p.configuration.Endpoint.URL) diff --git a/pkg/producers/http/producer_test.go b/pkg/producers/http/producer_test.go index 1a9efb32..06df0d39 100644 --- a/pkg/producers/http/producer_test.go +++ b/pkg/producers/http/producer_test.go @@ -232,8 +232,6 @@ func TestProducer(t *testing.T) { } for _, tc := range testCases { - tc := tc - t.Run(tc.name, func(t *testing.T) { producer := phttp.Producer{} producer.InitializeFromConfig(tc.config) diff --git a/pkg/producers/kafka/kafkaProducer.go b/pkg/producers/kafka/kafkaProducer.go index 8f3ed546..6f2e7387 100644 --- a/pkg/producers/kafka/kafkaProducer.go +++ b/pkg/producers/kafka/kafkaProducer.go @@ -49,7 +49,7 @@ import ( "github.com/rs/zerolog/log" ) -type KafkaManager struct { +type Manager struct { producer *kafka.Producer admin *kafka.AdminClient schema schemaregistry.Client @@ -60,7 +60,7 @@ type KafkaManager struct { fleEnabled bool } -func (k *KafkaManager) Initialize(configFile string) { +func (k *Manager) Initialize(configFile string) { var err error conf := convertInKafkaConfig(readConfig(configFile)) @@ -74,7 +74,7 @@ func (k *KafkaManager) Initialize(configFile string) { } } -func (k *KafkaManager) InitializeSchemaRegistry(configFile string) { +func (k *Manager) InitializeSchemaRegistry(configFile string) { var err error conf := readConfig(configFile) @@ -94,80 +94,82 @@ func (k *KafkaManager) InitializeSchemaRegistry(configFile string) { k.schemaRegistry = true } -func verifyCSFLE(conf map[string]string, k *KafkaManager) { - if conf["kekName"] != "" && conf["kmsType"] != "" && conf["kmsKeyID"] != "" { - registerProviders() +func verifyCSFLE(conf map[string]string, k *Manager) { + if conf["kekName"] == "" || conf["kmsType"] == "" || conf["kmsKeyID"] == "" { + return + } - // load avro schema file: CSFLE requires schema registration - _, currentFilePath, _, _ := runtime.Caller(0) - currentDir := filepath.Dir(currentFilePath) - filePath := filepath.Join(currentDir, "../../types/"+k.TemplateType+".avsc") - file, err := os.Open(filePath) - if err != nil { - log.Fatal().Err(err).Msg("Failed to open file") - } - defer file.Close() + registerProviders() - content, err := io.ReadAll(file) - if err != nil { - log.Fatal().Err(err).Msg("Failed to read file") - } - contentString := string(content) - - // check presence of PII in schema - substring := `"confluent:tags": [ "PII" ]` - normalizedJSON := normalizeWhitespace(contentString) - normalizedSubstring := normalizeWhitespace(substring) - - if strings.Contains(normalizedJSON, normalizedSubstring) { - // upper-casing the first letter of the fields --> name - required by https://pkg.go.dev/github.com/actgardner/gogen-avro#readme-naming - re := regexp.MustCompile(`"name"\s*:\s*"([^"]+)"`) - result := re.ReplaceAllStringFunc(contentString, func(match string) string { - // extract the name part after "name: " - parts := re.FindStringSubmatch(match) - fmt.Print(len(parts)) - if len(parts) > 1 { - name := parts[1] - // capitalize the first letter of the name - capitalized := capitalizeFirstLetter(name) - // replace the original match with the new capitalized version - return "\"name\":" + "\"" + capitalized + "\"" - } - return match - }) - - // register the avro schema adding rule set: PII - schema := schemaregistry.SchemaInfo{ - Schema: result, - SchemaType: "AVRO", - RuleSet: &schemaregistry.RuleSet{ - DomainRules: []schemaregistry.Rule{ - { - Name: "encryptPII", - Kind: "TRANSFORM", - Mode: "WRITEREAD", - Type: "ENCRYPT", - Tags: []string{"PII"}, - Params: map[string]string{ - "encrypt.kek.name": conf["kekName"], - "encrypt.kms.type": conf["kmsType"], - "encrypt.kms.key.id": conf["kmsKeyID"], - }, - OnFailure: "ERROR,NONE", - }, - }, - }, - } - _, err = k.schema.Register(k.Topic+"-value", schema, true) - if err != nil { - log.Fatal().Err(err).Msg("Failed to register schema") - } + // load avro schema file: CSFLE requires schema registration + _, currentFilePath, _, _ := runtime.Caller(0) + currentDir := filepath.Dir(currentFilePath) + filePath := filepath.Join(currentDir, "../../types/"+k.TemplateType+".avsc") + file, err := os.Open(filePath) + if err != nil { + log.Fatal().Err(err).Msg("Failed to open file") + } + defer file.Close() - k.fleEnabled = true + content, err := io.ReadAll(file) + if err != nil { + log.Fatal().Err(err).Msg("Failed to read file") + } + contentString := string(content) - } + // check presence of PII in schema + substring := `"confluent:tags": [ "PII" ]` + normalizedJSON := normalizeWhitespace(contentString) + normalizedSubstring := normalizeWhitespace(substring) + + if !strings.Contains(normalizedJSON, normalizedSubstring) { + return + } + // upper-casing the first letter of the fields --> name - required by https://pkg.go.dev/github.com/actgardner/gogen-avro#readme-naming + re := regexp.MustCompile(`"name"\s*:\s*"([^"]+)"`) + result := re.ReplaceAllStringFunc(contentString, func(match string) string { + // extract the name part after "name: " + parts := re.FindStringSubmatch(match) + fmt.Print(len(parts)) + if len(parts) > 1 { + name := parts[1] + // capitalize the first letter of the name + capitalized := capitalizeFirstLetter(name) + // replace the original match with the new capitalized version + return "\"name\":" + "\"" + capitalized + "\"" + } + return match + }) + + // register the avro schema adding rule set: PII + schema := schemaregistry.SchemaInfo{ + Schema: result, + SchemaType: "AVRO", + RuleSet: &schemaregistry.RuleSet{ + DomainRules: []schemaregistry.Rule{ + { + Name: "encryptPII", + Kind: "TRANSFORM", + Mode: "WRITEREAD", + Type: "ENCRYPT", + Tags: []string{"PII"}, + Params: map[string]string{ + "encrypt.kek.name": conf["kekName"], + "encrypt.kms.type": conf["kmsType"], + "encrypt.kms.key.id": conf["kmsKeyID"], + }, + OnFailure: "ERROR,NONE", + }, + }, + }, } + _, err = k.schema.Register(k.Topic+"-value", schema, true) + if err != nil { + log.Fatal().Err(err).Msg("Failed to register schema") + } + + k.fleEnabled = true } func registerProviders() { @@ -178,14 +180,14 @@ func registerProviders() { encryption.Register() } -func (k *KafkaManager) Close(_ context.Context) error { +func (k *Manager) Close(_ context.Context) error { k.admin.Close() k.producer.Flush(15 * 1000) k.producer.Close() return nil } -func (k *KafkaManager) Produce(_ context.Context, key []byte, data []byte, _ any) { +func (k *Manager) Produce(_ context.Context, key []byte, data []byte, _ any) { go listenToEventsFrom(k.producer, k.Topic) @@ -254,15 +256,15 @@ func (k *KafkaManager) Produce(_ context.Context, key []byte, data []byte, _ any } -func (k *KafkaManager) CreateTopic(topic string) { - k.CreateTopicFull(topic, 6, 3) +func (k *Manager) CreateTopic(ctx context.Context, topic string) { + k.CreateTopicFull(ctx, topic, 6, 3) } -func (k *KafkaManager) CreateTopicFull(topic string, partitions int, rf int) { +func (k *Manager) CreateTopicFull(ctx context.Context, topic string, partitions int, rf int) { // Contexts are used to abort or limit the amount of time // the Admin call blocks waiting for a result. - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) defer cancel() // Create topics on cluster. @@ -287,7 +289,7 @@ func (k *KafkaManager) CreateTopicFull(topic string, partitions int, rf int) { log.Fatal(). Str("topic", result.Topic). Str("code", result.Error.Code().String()). - Err(fmt.Errorf(result.Error.String())). + Err(result.Error). Msg("Topic creation failed") } } @@ -365,8 +367,7 @@ func readConfig(configFile string) map[string]string { } func convertInKafkaConfig(m map[string]string) kafka.ConfigMap { - var conf kafka.ConfigMap - conf = make(map[string]kafka.ConfigValue) + conf := make(map[string]kafka.ConfigValue) for k, v := range m { conf[k] = v } diff --git a/pkg/producers/luascript/producer.go b/pkg/producers/luascript/producer.go index 2987354f..70e68f5e 100644 --- a/pkg/producers/luascript/producer.go +++ b/pkg/producers/luascript/producer.go @@ -100,6 +100,6 @@ func (p *Producer) Produce(_ context.Context, k []byte, v []byte, _ any) { } -func (p *Producer) Close(ctx context.Context) error { +func (p *Producer) Close(_ context.Context) error { return nil } diff --git a/pkg/producers/luascript/producer_test.go b/pkg/producers/luascript/producer_test.go index 90e8343a..fc6148d8 100644 --- a/pkg/producers/luascript/producer_test.go +++ b/pkg/producers/luascript/producer_test.go @@ -41,10 +41,9 @@ func TestProducer(t *testing.T) { }, } for _, tc := range testCases { - tc := tc someJson := `{"key": "value"}` - t.Run(tc.name, func(t *testing.T) { + t.Run(tc.name, func(_ *testing.T) { p := &luascript.Producer{} p.InitializeFromConfig(tc.config) p.Produce(context.TODO(), []byte("somekey"), []byte(someJson), nil) diff --git a/pkg/producers/mongoDB/config-atlas.json.example b/pkg/producers/mongodb/config-atlas.json.example similarity index 100% rename from pkg/producers/mongoDB/config-atlas.json.example rename to pkg/producers/mongodb/config-atlas.json.example diff --git a/pkg/producers/mongoDB/config.json.example b/pkg/producers/mongodb/config.json.example similarity index 100% rename from pkg/producers/mongoDB/config.json.example rename to pkg/producers/mongodb/config.json.example diff --git a/pkg/producers/mongoDB/mongoProducer.go b/pkg/producers/mongodb/mongoProducer.go similarity index 98% rename from pkg/producers/mongoDB/mongoProducer.go rename to pkg/producers/mongodb/mongoProducer.go index a9cf0a6e..cd1ef318 100644 --- a/pkg/producers/mongoDB/mongoProducer.go +++ b/pkg/producers/mongodb/mongoProducer.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package mongoDB +package mongodb import ( "context" @@ -85,7 +85,7 @@ func (p *MongoProducer) Produce(ctx context.Context, k []byte, v []byte, _ any) log.Fatal().Err(err).Msg("Failed to unmarshal json") } - if k == nil || len(k) == 0 { + if len(k) == 0 { dev["_id"] = string(k) } diff --git a/pkg/producers/mongoDB/mongoProducer_test.go b/pkg/producers/mongodb/mongoProducer_test.go similarity index 99% rename from pkg/producers/mongoDB/mongoProducer_test.go rename to pkg/producers/mongodb/mongoProducer_test.go index d9e50c9e..a7ef5622 100644 --- a/pkg/producers/mongoDB/mongoProducer_test.go +++ b/pkg/producers/mongodb/mongoProducer_test.go @@ -20,7 +20,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package mongoDB +package mongodb import ( "context" diff --git a/pkg/producers/redis/jsonWriter.go b/pkg/producers/redis/jsonWriter.go index 8aa63cd2..0fb780be 100644 --- a/pkg/producers/redis/jsonWriter.go +++ b/pkg/producers/redis/jsonWriter.go @@ -23,6 +23,7 @@ package redis import ( "bufio" "encoding/json" + "errors" "fmt" "net" "os" @@ -38,20 +39,20 @@ const STAR = "*" const NEW_LINE = '\n' const PLUS = "+" -type RedisConfig struct { +type Config struct { Host string `json:"host"` Port string `json:"port"` Username string `json:"username"` Password string `json:"password"` } -type RedisClient struct { - config RedisConfig +type Client struct { + config Config conn net.Conn } -func InitializeJsonWriter(filename string) (RedisConfig, error) { - var config RedisConfig +func InitializeJsonWriter(filename string) (Config, error) { + var config Config data, err := os.ReadFile(filename) if err != nil { return config, err @@ -65,7 +66,7 @@ func InitializeJsonWriter(filename string) (RedisConfig, error) { return config, nil } -func (r *RedisClient) Connect() error { +func (r *Client) Connect() error { addr := fmt.Sprintf("%s:%s", r.config.Host, r.config.Port) conn, err := net.Dial("tcp", addr) if err != nil { @@ -92,7 +93,7 @@ func (r *RedisClient) Connect() error { if strings.HasPrefix(authResponse, "-") { msg := fmt.Sprint("Authentication failed:", strings.TrimSpace(authResponse[1:])) fmt.Println(msg) - return fmt.Errorf(msg) + return errors.New(msg) } } @@ -100,11 +101,11 @@ func (r *RedisClient) Connect() error { return nil } -func (r *RedisClient) Disconnect() { +func (r *Client) Disconnect() { r.conn.Close() } -func (r *RedisClient) Set(key, value, path string) error { +func (r *Client) Set(key, value, path string) error { args := make([]string, 4) args[0] = JSON_SET args[1] = key @@ -133,19 +134,19 @@ func (r *RedisClient) Set(key, value, path string) error { if !strings.HasPrefix(response, PLUS) { msg := fmt.Sprintf("Unexpected response from Redis: %s", response) - return fmt.Errorf(msg) + return errors.New(msg) } return nil } -func (r *RedisClient) Get(key, path string) (string, error) { +func (r *Client) Get(key, path string) (string, error) { args := make([]string, 2, 3) args[0] = JSON_GET args[1] = key if path != "" { - args[2] = path + args = append(args, path) } setCmd := fmt.Sprint(STAR, len(args), CRLF) @@ -166,7 +167,7 @@ func (r *RedisClient) Get(key, path string) (string, error) { if !strings.HasPrefix(response, DOLLAR) { msg := fmt.Sprintf("Unexpected response from Redis: %s", response) - return msg, fmt.Errorf(msg) + return msg, errors.New(msg) } valueLen, err := strconv.Atoi(response[1 : len(response)-2]) diff --git a/pkg/producers/redis/jsonWriter_test.go b/pkg/producers/redis/jsonWriter_test.go index 29d09f70..2b08c4c0 100644 --- a/pkg/producers/redis/jsonWriter_test.go +++ b/pkg/producers/redis/jsonWriter_test.go @@ -32,7 +32,7 @@ func TestConnectAndClose(t *testing.T) { t.Fatalf("Error reading configuration file: %v", err) } - client := RedisClient{config: config} + client := Client{config: config} err = client.Connect() if err != nil { @@ -49,7 +49,7 @@ func TestSetAndGet(t *testing.T) { t.Fatalf("Error reading configuration file: %v", err) } - client := RedisClient{config: options} + client := Client{config: options} err = client.Connect() if err != nil { diff --git a/pkg/producers/redis/redisProducer.go b/pkg/producers/redis/redisProducer.go index 4f16240f..bdd171d6 100644 --- a/pkg/producers/redis/redisProducer.go +++ b/pkg/producers/redis/redisProducer.go @@ -30,12 +30,12 @@ import ( "github.com/rs/zerolog/log" ) -type RedisProducer struct { +type Producer struct { client redis.Client Ttl time.Duration } -func (p *RedisProducer) Initialize(configFile string) { +func (p *Producer) Initialize(configFile string) { var options redis.Options data, err := os.ReadFile(configFile) @@ -51,7 +51,7 @@ func (p *RedisProducer) Initialize(configFile string) { p.client = *redis.NewClient(&options) } -func (p *RedisProducer) Close(_ context.Context) error { +func (p *Producer) Close(_ context.Context) error { err := p.client.Close() if err != nil { log.Warn().Err(err).Msg("Failed to close Redis connection") @@ -59,7 +59,7 @@ func (p *RedisProducer) Close(_ context.Context) error { return err } -func (p *RedisProducer) Produce(ctx context.Context, k []byte, v []byte, _ any) { +func (p *Producer) Produce(ctx context.Context, k []byte, v []byte, _ any) { err := p.client.Set(ctx, string(k), string(v), p.Ttl).Err() if err != nil { log.Fatal().Err(err).Msg("Failed to write data in Redis") diff --git a/pkg/producers/s3/s3Producer.go b/pkg/producers/s3/s3Producer.go index a4ed2279..06b40d52 100644 --- a/pkg/producers/s3/s3Producer.go +++ b/pkg/producers/s3/s3Producer.go @@ -38,12 +38,12 @@ type Config struct { Bucket string `json:"bucket"` } -type S3Producer struct { +type Producer struct { client *s3.Client bucket string } -func (p *S3Producer) Initialize(ctx context.Context, configFile string) { +func (p *Producer) Initialize(ctx context.Context, configFile string) { var config Config file, err := os.ReadFile(configFile) if err != nil { @@ -65,7 +65,7 @@ func (p *S3Producer) Initialize(ctx context.Context, configFile string) { p.bucket = config.Bucket } -func (p *S3Producer) Produce(ctx context.Context, k []byte, v []byte, _ any) { +func (p *Producer) Produce(ctx context.Context, k []byte, v []byte, _ any) { bucket := p.bucket var key string @@ -89,7 +89,7 @@ func (p *S3Producer) Produce(ctx context.Context, k []byte, v []byte, _ any) { } } -func (p *S3Producer) Close(_ context.Context) error { +func (p *Producer) Close(_ context.Context) error { log.Warn().Msg("S3 Client doesn't provide a close method!") return nil } diff --git a/pkg/producers/server/JsonProducer.go b/pkg/producers/server/JsonProducer.go index a7832635..a4df9f07 100644 --- a/pkg/producers/server/JsonProducer.go +++ b/pkg/producers/server/JsonProducer.go @@ -39,24 +39,25 @@ func (c *JsonProducer) Close(_ context.Context) error { func (c *JsonProducer) Produce(_ context.Context, key []byte, value []byte, o any) { - if o != nil { - respWriter := o.(http.ResponseWriter) - if string(key) != "null" { - _, err := (respWriter).Write(key) - if err != nil { - log.Error().Err(err).Msg("Error writing key") - } - _, err = (respWriter).Write([]byte(",")) - if err != nil { - log.Error().Err(err).Msg("Error writing comma") - } + if o == nil { + log.Warn().Interface("o", o).Msg("Server producer must produce to a http.ResponseWriter") + return + } + + respWriter := o.(http.ResponseWriter) + if string(key) != "null" { + _, err := (respWriter).Write(key) + if err != nil { + log.Error().Err(err).Msg("Error writing key") } - _, err := (respWriter).Write(value) + _, err = (respWriter).Write([]byte(",")) if err != nil { - log.Error().Err(err).Msg("Error writing value") + log.Error().Err(err).Msg("Error writing comma") } - } else { - log.Warn().Interface("o", o).Msg("Server producer must produce to a http.ResponseWriter") + } + _, err := (respWriter).Write(value) + if err != nil { + log.Error().Err(err).Msg("Error writing value") } } diff --git a/pkg/producers/test/TestProducer.go b/pkg/producers/test/TestProducer.go index 70544069..23d3e46b 100644 --- a/pkg/producers/test/TestProducer.go +++ b/pkg/producers/test/TestProducer.go @@ -28,35 +28,36 @@ import ( "github.com/rs/zerolog/log" ) -type TestProducer struct { +type Producer struct { OutputTpl *tpl.Tpl } -func (c *TestProducer) Close(ctx context.Context) error { +func (c *Producer) Close(_ context.Context) error { // no need to close return nil } -func (c *TestProducer) Produce(_ context.Context, key []byte, value []byte, o any) { - - if o != nil { - respWriter := o.(*bytes.Buffer) - if string(key) != "null" { - _, err := (respWriter).Write(key) - if err != nil { - log.Error().Err(err).Msg("Error writing key") - } - _, err = (respWriter).Write([]byte(",")) - if err != nil { - log.Error().Err(err).Msg("Error writing comma") - } +func (c *Producer) Produce(_ context.Context, key []byte, value []byte, o any) { + + if o == nil { + log.Warn().Interface("o", o).Msg("Test producer must produce to a bytes.Buffer") + return + } + + respWriter := o.(*bytes.Buffer) + if string(key) != "null" { + _, err := (respWriter).Write(key) + if err != nil { + log.Error().Err(err).Msg("Error writing key") } - _, err := (respWriter).Write(value) + _, err = (respWriter).Write([]byte(",")) if err != nil { - log.Error().Err(err).Msg("Error writing value") + log.Error().Err(err).Msg("Error writing comma") } - } else { - log.Warn().Interface("o", o).Msg("Test producer must produce to a bytes.Buffer") + } + _, err := (respWriter).Write(value) + if err != nil { + log.Error().Err(err).Msg("Error writing value") } } diff --git a/rpm/jr.spec b/rpm/jr.spec new file mode 100644 index 00000000..9bc4cab2 --- /dev/null +++ b/rpm/jr.spec @@ -0,0 +1,45 @@ +Name: jr +Version: 0.4.0 +Release: 1%{?dist} +Summary: JR: streaming quality random data from the command line + +License: MIT +URL: https://jrnd.io/ +Source0: https://github.com/jrnd-io/%{name}/archive/refs/tags/v%{version}.tar.gz + +BuildRequires: golang >= 1.23.0 +BuildRequires: make + +%description +JR is a CLI program that helps you to stream quality random data for your applications. + +%global debug_package %{nil} + +%prep +%setup -q + +%build +make all %{?_smp_mflags} + +%install +mkdir -p %{buildroot}/usr/bin +install -m 0755 %{_builddir}/%{name}-%{version}/build/jr %{buildroot}/usr/bin/jr + +# Copy templates section +mkdir -p %{buildroot}%{_datadir}/jr/ +cp -rf %{_builddir}/%{name}-%{version}/templates %{buildroot}%{_datadir}/jr/ + +# Copy config section +cp -rf %{_builddir}/%{name}-%{version}/config %{buildroot}%{_datadir}/jr/ + +%files +%license LICENSE +%{_bindir}/%{name} +%{_datadir}/jr/ + +%changelog +* Thu Aug 22 2024 Gianni Salinetti - v0.3.9 +- v0.3.9 release, includes jr default config files in /usr/share/jr +* Fri Aug 16 2024 Gianni Salinetti - v0.3.8 +- First jr package, templates included in /usr/share/jr/templates but still not seen by the program + diff --git a/snap/snapcraft.yaml b/snap/snapcraft.yaml index a86d69b2..b4f4a10b 100644 --- a/snap/snapcraft.yaml +++ b/snap/snapcraft.yaml @@ -12,7 +12,7 @@ apps: jr: command: bin/jr environment: - JR_HOME: $SNAP + JR_SYSTEM_DIR: $SNAP plugs: - home