Skip to content

Commit

Permalink
Merge pull request #101 from eljeko/main
Browse files Browse the repository at this point in the history
Closes #75
  • Loading branch information
ugol authored Jul 27, 2023
2 parents 6c17661 + a10aa5d commit 77f9200
Show file tree
Hide file tree
Showing 12 changed files with 318 additions and 4 deletions.
9 changes: 7 additions & 2 deletions pkg/cmd/templateRun.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ jr template run --template "{{name}}"
seed, _ := cmd.Flags().GetInt64("seed")
topic, _ := cmd.Flags().GetString("topic")
preload, _ := cmd.Flags().GetInt("preload")


csv, _ := cmd.Flags().GetString("csv")

if kcat {
oneline = true
output = "stdout"
Expand Down Expand Up @@ -112,7 +114,7 @@ jr template run --template "{{name}}"

e := emitter.Emitter{
Name: name,
Locale: locale,
Locale: locale,
Num: num,
Frequency: frequency,
Duration: duration,
Expand All @@ -125,6 +127,7 @@ jr template run --template "{{name}}"
Topic: topic,
Kcat: kcat,
Oneline: oneline,
Csv: csv,
}

functions.SetSeed(seed)
Expand All @@ -144,6 +147,8 @@ func init() {

templateRunCmd.Flags().Int64("seed", time.Now().UTC().UnixNano(), "Seed to init pseudorandom generator")

templateRunCmd.Flags().String("csv", "", "Path to csv file to use")

templateRunCmd.Flags().StringP("kafkaConfig", "F", "", "Kafka configuration")
templateRunCmd.Flags().String("registryConfig", "", "Kafka configuration")
templateRunCmd.Flags().Bool("embedded", false, "If enabled, [template] must be a string containing a template, to be embedded directly in the script")
Expand Down
5 changes: 5 additions & 0 deletions pkg/ctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ type Context struct {
CtxLock sync.RWMutex
CtxList map[string][]string
CtxListLock sync.RWMutex
CtxCSV map[int]map[string]string
CtxCSVLock sync.RWMutex
LastIndex int
CountryIndex int
CityIndex int
CurrentIterationLoopIndex int
}

func init() {
Expand All @@ -57,6 +60,8 @@ func init() {
CtxLock: sync.RWMutex{},
CtxList: make(map[string][]string),
CtxListLock: sync.RWMutex{},
CtxCSV: make(map[int]map[string]string),
CtxCSVLock: sync.RWMutex{},
LastIndex: -1,
CountryIndex: 232,
CityIndex: -1,
Expand Down
4 changes: 4 additions & 0 deletions pkg/emitter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,16 @@ type Emitter struct {
Topic string `mapstructure:"topic"`
Kcat bool `mapstructure:"kcat"`
Oneline bool `mapstructure:"oneline"`
Csv string `mapstructure:"csv"`
Producer Producer
KTpl tpl.Tpl
VTpl tpl.Tpl
}

func (e *Emitter) Initialize(conf configuration.GlobalConfiguration) {

functions.InitCSV(e.Csv)

templateName := e.ValueTemplate
if e.EmbeddedTemplate == "" {
path := os.ExpandEnv(fmt.Sprintf("%s/%s", constants.JRhome, "templates"))
Expand All @@ -71,6 +74,7 @@ func (e *Emitter) Initialize(conf configuration.GlobalConfiguration) {
log.Printf("Template '%s' not found in %s\n", templateName, path)
}
}


keyTpl, err := tpl.NewTpl("key", e.KeyTemplate, functions.FunctionsMap(), &ctx.JrContext)
if err != nil {
Expand Down
10 changes: 8 additions & 2 deletions pkg/emitter/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,15 @@ func DoLoop(es []Emitter) {
defer stop()

for i := 0; i < numTimers; i++ {


index := i

stopChannels[i] = make(chan struct{})

go func(timerIndex int) {
defer wg.Done()

frequency := es[timerIndex].Frequency
if frequency > 0 {
ticker := time.NewTicker(es[timerIndex].Frequency)
Expand Down Expand Up @@ -113,8 +115,12 @@ func doTemplate(emitter Emitter) {
ctx.JrContext.Locale = emitter.Locale
ctx.JrContext.CountryIndex = functions.IndexOf(strings.ToUpper(emitter.Locale), "country")

for i := 0; i < emitter.Num; i++ {



for i := 0; i < emitter.Num; i++ {
ctx.JrContext.CurrentIterationLoopIndex++

k := emitter.KTpl.Execute()
v := emitter.VTpl.Execute()
if emitter.Oneline {
Expand Down
60 changes: 60 additions & 0 deletions pkg/functions/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package functions

import (
"bufio"
"encoding/csv"
"fmt"
"github.com/google/uuid"
"github.com/ugol/jr/pkg/constants"
Expand Down Expand Up @@ -192,6 +193,7 @@ var fmap = map[string]interface{}{
"random_n_v_from_list": RandomNValuesFromList,
"get_v": GetV,
"set_v": SetV,
"fromcsv": fromcsv,
}

// Seed sets seeds and can be used in a template
Expand Down Expand Up @@ -448,3 +450,61 @@ func initialize(filename string) []string {

return words
}

func InitCSV(csvpath string) {
//Loads the csv file in the context
if len(csvpath) > 0 {

var csvHeaders = make(map[int]string)
csvValues := make(map[int]map[string]string)

if _, err := os.Stat(csvpath); err != nil {
println("File does not exist: ", csvpath)
os.Exit(1)
}

file, err := os.Open(csvpath)

if err != nil {
println("Error opening file:", csvpath, "error:", err)
os.Exit(1)
} else {
reader := csv.NewReader(file)

lines, err := reader.ReadAll()
if err != nil {
println("Error reading CSV file:", csvpath, "error:", err)
os.Exit(1)
} else {

for row := 0; row < len(lines); row++ {
var aRow = lines[row]
for col := 0; col < len(aRow); col++ {
var value = aRow[col]
//print(" ROW -> ",row)
if row == 0 {
//print(" H: ", value)
csvHeaders[col] = strings.Trim(value, " ")
} else {

val, exists := csvValues[row-1]
if exists {
val[csvHeaders[col]] = strings.Trim(value, " ")
csvValues[row-1] = val
} else {
var localmap = make(map[string]string)
localmap[csvHeaders[col]] = strings.Trim(value, " ")
csvValues[row-1] = localmap
}
// print(" V: ", value)
}
}
//println()
}
ctx.JrContext.CtxCSV = csvValues
}
}
defer file.Close()
}

}
13 changes: 13 additions & 0 deletions pkg/functions/utilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,16 @@ func Contains(s []string, str string) bool {
}
return false
}

// get the label value from csv file
func fromcsv(c string) string {
ctx.JrContext.CtxCSVLock.Lock()
defer ctx.JrContext.CtxCSVLock.Unlock()

if len(ctx.JrContext.CtxCSV) > 0 {
return ctx.JrContext.CtxCSV[(ctx.JrContext.CurrentIterationLoopIndex-1)%len(ctx.JrContext.CtxCSV)][c]
} else {
return ""
}

}
99 changes: 99 additions & 0 deletions pkg/functions_test/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package functions_test
import (
"bytes"
"fmt"
"github.com/ugol/jr/pkg/ctx"
"github.com/ugol/jr/pkg/functions"
"testing"
"text/template"
Expand Down Expand Up @@ -160,3 +161,101 @@ func runtv(tpl, expect string, vars interface{}) error {
}
return nil
}


func TestParamFromCSV_odd(t *testing.T) {
functions.InitCSV("../../testfiles/test3.csv")

tpl := `{{fromcsv "NAME"}} {{fromcsv "SURNAME"}}`

ctx.JrContext.CurrentIterationLoopIndex++

if err := runt(tpl, "Jhon Brown"); err != nil {
t.Error(err)
}

ctx.JrContext.CurrentIterationLoopIndex++

if err := runt(tpl, "Mary White"); err != nil {
t.Error(err)
}

ctx.JrContext.CurrentIterationLoopIndex++

if err := runt(tpl, "Anna Green"); err != nil {
t.Error(err)
}

ctx.JrContext.CurrentIterationLoopIndex++

if err := runt(tpl, "Jhon Brown"); err != nil {
t.Error(err)
}

ctx.JrContext.CurrentIterationLoopIndex++

if err := runt(tpl, "Mary White"); err != nil {
t.Error(err)
}

ctx.JrContext.CurrentIterationLoopIndex++

if err := runt(tpl, "Anna Green"); err != nil {
t.Error(err)
}

ctx.JrContext.CurrentIterationLoopIndex++

if err := runt(tpl, "Jhon Brown"); err != nil {
t.Error(err)
}
}

func TestParamFromCSV_not_initialized(t *testing.T) {

ctx.JrContext.CtxCSV = make(map[int]map[string]string)

tpl := `{{fromcsv "NAME"}} {{fromcsv "SURNAME"}}`

ctx.JrContext.CurrentIterationLoopIndex++

if err := runt(tpl, " "); err != nil {
t.Error(err)
}
}

func TestParamFromCSV_even(t *testing.T) {
functions.InitCSV("../../testfiles/test2.csv")

tpl := `{{fromcsv "NAME"}} {{fromcsv "SURNAME"}}`

ctx.JrContext.CurrentIterationLoopIndex++

if err := runt(tpl, "Jhon Brown"); err != nil {
t.Error(err)
}

ctx.JrContext.CurrentIterationLoopIndex++

if err := runt(tpl, "Mary White"); err != nil {
t.Error(err)
}

ctx.JrContext.CurrentIterationLoopIndex++

if err := runt(tpl, "Jhon Brown"); err != nil {
t.Error(err)
}

ctx.JrContext.CurrentIterationLoopIndex++

if err := runt(tpl, "Mary White"); err != nil {
t.Error(err)
}

ctx.JrContext.CurrentIterationLoopIndex++

if err := runt(tpl, "Jhon Brown"); err != nil {
t.Error(err)
}
}
6 changes: 6 additions & 0 deletions templates/csv_product.tpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"product_id": "{{fromcsv "id"}}",
"name": "{{fromcsv "name"}}",
"brand": "{{fromcsv "brand"}}",
"page_url": "https://www.acme.com/product/{{random_string 4 5}}"
}
8 changes: 8 additions & 0 deletions templates/csv_user.tpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"age": {{integer 20 60}},
"eyeColor": "{{randoms "blue|brown|green"}}",
"name": "{{fromcsv "NAME"}} {{fromcsv "SURNAME"}}",
"gender": "{{gender}}",
"company": "{{company}}",
"email": "{{fromcsv "NAME"}}.{{fromcsv "SURNAME"}}@thelandoof.oz"
}
Loading

0 comments on commit 77f9200

Please sign in to comment.