Skip to content

Commit

Permalink
Merge pull request #188 from lburgazzoli/fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ugol authored Aug 22, 2024
2 parents 14616a3 + 937433a commit b873d99
Show file tree
Hide file tree
Showing 24 changed files with 131 additions and 126 deletions.
9 changes: 5 additions & 4 deletions pkg/cmd/emitterRun.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package cmd

import (
"context"
"github.com/jrnd-io/jr/pkg/emitter"
"github.com/spf13/cobra"
)
Expand All @@ -32,15 +33,15 @@ var emitterRunCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {

dryrun, _ := cmd.Flags().GetBool("dryrun")
RunEmitters(args, emitters2, dryrun)
RunEmitters(cmd.Context(), args, emitters2, dryrun)

},
}

func RunEmitters(emitterNames []string, ems map[string][]emitter.Emitter, dryrun bool) {
func RunEmitters(ctx context.Context, emitterNames []string, ems map[string][]emitter.Emitter, dryrun bool) {
defer emitter.WriteStats()
defer emitter.CloseProducers(ems)
emittersToRun := emitter.Initialize(emitterNames, ems, dryrun)
defer emitter.CloseProducers(ctx, ems)
emittersToRun := emitter.Initialize(ctx, emitterNames, ems, dryrun)
emitter.DoLoop(emittersToRun)
}

Expand Down
20 changes: 10 additions & 10 deletions pkg/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ var serverCmd = &cobra.Command{
router.Use(middleware.Timeout(60 * time.Second))
router.Use(SessionMiddleware)

//comment for local dev
// comment for local dev
embeddedFileRoutes(router)

//Uncomment for local dev
//localDevServerSetup(router)
// Uncomment for local dev
// localDevServerSetup(router)

router.Route("/emitters", func(r chi.Router) {
r.Get("/", listEmitters)
Expand Down Expand Up @@ -298,15 +298,15 @@ func addEmitter(w http.ResponseWriter, r *http.Request) {
}

func updateEmitter(w http.ResponseWriter, r *http.Request) {
//@TODO update emitter by name
// @TODO update emitter by name
}

func deleteEmitter(w http.ResponseWriter, r *http.Request) {
//@TODO delete emitter by name
// @TODO delete emitter by name
}

func startEmitter(w http.ResponseWriter, r *http.Request) {
//@TODO start emitter by name
// @TODO start emitter by name
w.Header().Set("Content-Type", "application/json")
url := chi.URLParam(r, "emitter")

Expand All @@ -317,7 +317,7 @@ func startEmitter(w http.ResponseWriter, r *http.Request) {
}

func stopEmitter(w http.ResponseWriter, r *http.Request) {
//@TODO stop emitter by name
// @TODO stop emitter by name
w.Header().Set("Content-Type", "application/json")
url := chi.URLParam(r, "emitter")

Expand All @@ -328,7 +328,7 @@ func stopEmitter(w http.ResponseWriter, r *http.Request) {
}

func pauseEmitter(w http.ResponseWriter, r *http.Request) {
//@TODO pause emitter by name
// @TODO pause emitter by name
w.Header().Set("Content-Type", "application/json")
url := chi.URLParam(r, "emitter")

Expand All @@ -345,7 +345,7 @@ func runEmitter(w http.ResponseWriter, r *http.Request) {
if firstRun[url] == false {
for i := 0; i < len(emitters); i++ {
if functions.Contains([]string{url}, emitters[i].Name) {
emitters[i].Initialize(configuration.GlobalCfg)
emitters[i].Initialize(r.Context(), configuration.GlobalCfg)
emitterToRun[url] = append(emitterToRun[url], emitters[i])
if emitters[i].Preload > 0 {
emitters[i].Run(emitters[i].Preload, w)
Expand All @@ -364,7 +364,7 @@ func runEmitter(w http.ResponseWriter, r *http.Request) {
}

func statusEmitter(w http.ResponseWriter, r *http.Request) {
//@TODO status emitter by name
// @TODO status emitter by name
w.Header().Set("Content-Type", "application/json")
url := chi.URLParam(r, "emitter")

Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/templateRun.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ jr template run --template "{{name}}"

functions.SetSeed(seed)
es := map[string][]emitter.Emitter{constants.DEFAULT_EMITTER_NAME: {e}}
RunEmitters([]string{e.Name}, es, false)
RunEmitters(cmd.Context(), []string{e.Name}, es, false)
},
}

Expand Down
75 changes: 38 additions & 37 deletions pkg/emitter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
package emitter

import (
"context"
"fmt"
"os"
"time"

"github.com/jrnd-io/jr/pkg/configuration"
"github.com/jrnd-io/jr/pkg/constants"
"github.com/jrnd-io/jr/pkg/ctx"
jtctx "github.com/jrnd-io/jr/pkg/ctx"
"github.com/jrnd-io/jr/pkg/functions"
"github.com/jrnd-io/jr/pkg/producers/awsdynamodb"
"github.com/jrnd-io/jr/pkg/producers/azblobstorage"
Expand Down Expand Up @@ -68,7 +69,7 @@ type Emitter struct {
VTpl tpl.Tpl
}

func (e *Emitter) Initialize(conf configuration.GlobalConfiguration) {
func (e *Emitter) Initialize(ctx context.Context, conf configuration.GlobalConfiguration) {

functions.InitCSV(e.Csv)

Expand All @@ -83,11 +84,11 @@ func (e *Emitter) Initialize(conf configuration.GlobalConfiguration) {
}
}

keyTpl, err := tpl.NewTpl("key", e.KeyTemplate, functions.FunctionsMap(), &ctx.JrContext)
keyTpl, err := tpl.NewTpl("key", e.KeyTemplate, functions.FunctionsMap(), &jtctx.JrContext)
if err != nil {
log.Fatal().Err(err).Msg("Failed to create key template")
}
valueTpl, err := tpl.NewTpl("value", e.EmbeddedTemplate, functions.FunctionsMap(), &ctx.JrContext)
valueTpl, err := tpl.NewTpl("value", e.EmbeddedTemplate, functions.FunctionsMap(), &jtctx.JrContext)
if err != nil {
log.Fatal().Err(err).Msg("Failed to create value template")
}
Expand All @@ -102,7 +103,7 @@ func (e *Emitter) Initialize(conf configuration.GlobalConfiguration) {
}

if e.Output == "kafka" {
e.Producer = createKafkaProducer(conf, e.Topic, templateName)
e.Producer = createKafkaProducer(ctx, conf, e.Topic, templateName)
return
} else {
if conf.SchemaRegistry {
Expand All @@ -111,41 +112,41 @@ func (e *Emitter) Initialize(conf configuration.GlobalConfiguration) {
}

if e.Output == "redis" {
e.Producer = createRedisProducer(conf.RedisTtl, conf.RedisConfig)
e.Producer = createRedisProducer(ctx, conf.RedisTtl, conf.RedisConfig)
return
}

if e.Output == "mongo" || e.Output == "mongodb" {
e.Producer = createMongoProducer(conf.MongoConfig)
e.Producer = createMongoProducer(ctx, conf.MongoConfig)
return
}

if e.Output == "elastic" {
e.Producer = createElasticProducer(conf.ElasticConfig)
e.Producer = createElasticProducer(ctx, conf.ElasticConfig)
return
}

if e.Output == "s3" {
e.Producer = createS3Producer(conf.S3Config)
e.Producer = createS3Producer(ctx, conf.S3Config)
return
}

if e.Output == "awsdynamodb" {
e.Producer = createAWSDynamoDB(conf.AWSDynamoDBConfig)
e.Producer = createAWSDynamoDB(ctx, conf.AWSDynamoDBConfig)
return
}

if e.Output == "gcs" {
e.Producer = createGCSProducer(conf.GCSConfig)
e.Producer = createGCSProducer(ctx, conf.GCSConfig)
return
}

if e.Output == "azblobstorage" {
e.Producer = createAZBlobStorageProducer(conf.AzBlobStorageConfig)
e.Producer = createAZBlobStorageProducer(ctx, conf.AzBlobStorageConfig)
return
}
if e.Output == "azcosmosdb" {
e.Producer = createAZCosmosDBProducer(conf.AzCosmosDBConfig)
e.Producer = createAZCosmosDBProducer(ctx, conf.AzCosmosDBConfig)
return
}

Expand All @@ -155,16 +156,16 @@ func (e *Emitter) Initialize(conf configuration.GlobalConfiguration) {
}

if e.Output == "http" {
e.Producer = createHTTPProducer(conf.HTTPConfig)
e.Producer = createHTTPProducer(ctx, conf.HTTPConfig)
return
}

if e.Output == "cassandra" {
e.Producer = createCassandraProducer(conf.CassandraConfig)
e.Producer = createCassandraProducer(ctx, conf.CassandraConfig)
return
}
if e.Output == "luascript" {
e.Producer = createLUAScriptProducer(conf.LUAScriptConfig)
e.Producer = createLUAScriptProducer(ctx, conf.LUAScriptConfig)
return
}

Expand All @@ -179,96 +180,96 @@ func (e *Emitter) Run(num int, o any) {
kInValue := functions.GetV("KEY")

if kInValue != "" {
e.Producer.Produce([]byte(kInValue), []byte(v), o)
e.Producer.Produce(context.TODO(), []byte(kInValue), []byte(v), o)
} else {
e.Producer.Produce([]byte(k), []byte(v), o)
e.Producer.Produce(context.TODO(), []byte(k), []byte(v), o)
}
ctx.JrContext.GeneratedObjects++
ctx.JrContext.GeneratedBytes += int64(len(v))
jtctx.JrContext.GeneratedObjects++
jtctx.JrContext.GeneratedBytes += int64(len(v))

}

}

func createRedisProducer(ttl time.Duration, redisConfig string) Producer {
func createRedisProducer(_ context.Context, ttl time.Duration, redisConfig string) Producer {
rProducer := &redis.RedisProducer{
Ttl: ttl,
}
rProducer.Initialize(redisConfig)
return rProducer
}

func createMongoProducer(mongoConfig string) Producer {
func createMongoProducer(ctx context.Context, mongoConfig string) Producer {
mProducer := &mongoDB.MongoProducer{}
mProducer.Initialize(mongoConfig)
mProducer.Initialize(ctx, mongoConfig)

return mProducer
}

func createElasticProducer(elasticConfig string) Producer {
func createElasticProducer(_ context.Context, elasticConfig string) Producer {
eProducer := &elastic.ElasticProducer{}
eProducer.Initialize(elasticConfig)

return eProducer
}

func createS3Producer(s3Config string) Producer {
func createS3Producer(ctx context.Context, s3Config string) Producer {
sProducer := &s3.S3Producer{}
sProducer.Initialize(s3Config)
sProducer.Initialize(ctx, s3Config)

return sProducer
}

func createAWSDynamoDB(config string) Producer {
func createAWSDynamoDB(ctx context.Context, config string) Producer {
producer := &awsdynamodb.Producer{}
producer.Initialize(config)
producer.Initialize(ctx, config)

return producer
}

func createAZBlobStorageProducer(azConfig string) Producer {
func createAZBlobStorageProducer(ctx context.Context, azConfig string) Producer {
producer := &azblobstorage.Producer{}
producer.Initialize(azConfig)
producer.Initialize(ctx, azConfig)

return producer
}

func createAZCosmosDBProducer(azConfig string) Producer {
func createAZCosmosDBProducer(_ context.Context, azConfig string) Producer {
producer := &azcosmosdb.Producer{}
producer.Initialize(azConfig)

return producer
}

func createGCSProducer(gcsConfig string) Producer {
func createGCSProducer(ctx context.Context, gcsConfig string) Producer {
gProducer := &gcs.GCSProducer{}
gProducer.Initialize(gcsConfig)
gProducer.Initialize(ctx, gcsConfig)

return gProducer
}

func createHTTPProducer(httpConfig string) Producer {
func createHTTPProducer(_ context.Context, httpConfig string) Producer {
httpProducer := &http.Producer{}
httpProducer.Initialize(httpConfig)

return httpProducer
}

func createCassandraProducer(config string) Producer {
func createCassandraProducer(_ context.Context, config string) Producer {
producer := &cassandra.Producer{}
producer.Initialize(config)

return producer
}

func createLUAScriptProducer(config string) Producer {
func createLUAScriptProducer(_ context.Context, config string) Producer {
producer := &luascript.Producer{}
producer.Initialize(config)

return producer
}

func createKafkaProducer(conf configuration.GlobalConfiguration, topic string, templateType string) *kafka.KafkaManager {
func createKafkaProducer(_ context.Context, conf configuration.GlobalConfiguration, topic string, templateType string) *kafka.KafkaManager {

kManager := &kafka.KafkaManager{
Serializer: conf.Serializer,
Expand Down
Loading

0 comments on commit b873d99

Please sign in to comment.