diff --git a/go.mod b/go.mod index a3db7dee..81162d01 100644 --- a/go.mod +++ b/go.mod @@ -126,6 +126,7 @@ require ( github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect github.com/technoweenie/multipartstreamer v1.0.1 // indirect + github.com/tetratelabs/wazero v1.8.0 // indirect github.com/tink-crypto/tink-go-gcpkms/v2 v2.1.0 // indirect github.com/tink-crypto/tink-go-hcvault/v2 v2.1.0 // indirect github.com/tink-crypto/tink-go/v2 v2.1.0 // indirect diff --git a/go.sum b/go.sum index fb8b528d..b44b5be6 100644 --- a/go.sum +++ b/go.sum @@ -623,6 +623,8 @@ github.com/testcontainers/testcontainers-go v0.31.0 h1:W0VwIhcEVhRflwL9as3dhY6jX github.com/testcontainers/testcontainers-go v0.31.0/go.mod h1:D2lAoA0zUFiSY+eAflqK5mcUx/A5hrrORaEQrd0SefI= github.com/testcontainers/testcontainers-go/modules/compose v0.31.0 h1:H74o3HisnApIDQx7sWibGzOl/Oo0By8DjyVeUf3qd6I= github.com/testcontainers/testcontainers-go/modules/compose v0.31.0/go.mod h1:z1JAsvL2/pNFy40yJX0VX9Yk+hzOCIO5DydxBJHBbCY= +github.com/tetratelabs/wazero v1.8.0 h1:iEKu0d4c2Pd+QSRieYbnQC9yiFlMS9D+Jr0LsRmcF4g= +github.com/tetratelabs/wazero v1.8.0/go.mod h1:yAI0XTsMBhREkM/YDAK/zNou3GoiAce1P6+rp/wQhjs= github.com/theupdateframework/notary v0.7.0 h1:QyagRZ7wlSpjT5N2qQAh/pN+DVqgekv4DzbAiAiEL3c= github.com/theupdateframework/notary v0.7.0/go.mod h1:c9DRxcmhHmVLDay4/2fUYdISnHqbFDGRSlXPO0AhYWw= github.com/tilt-dev/fsnotify v1.4.8-0.20220602155310-fff9c274a375 h1:QB54BJwA6x8QU9nHY3xJSZR2kX9bgpZekRKGkLTmEXA= diff --git a/pkg/cmd/producerList.go b/pkg/cmd/producerList.go index 47c0d477..d8d6d74a 100644 --- a/pkg/cmd/producerList.go +++ b/pkg/cmd/producerList.go @@ -58,6 +58,7 @@ var producerListCmd = &cobra.Command{ fmt.Printf("%sAZCosmosDB%s (--output = azcosmosdb)\n", Green, Reset) fmt.Printf("%sCassandra%s (--output = cassandra)\n", Green, Reset) fmt.Printf("%sLUA Script%s (--output = luascript)\n", Green, Reset) + fmt.Printf("%sWASM Function%s (--output = wasm)\n", Green, Reset) fmt.Printf("%sAWS DynamoDB%s (--output = awsdynamodb)\n", Green, Reset) fmt.Println() diff --git a/pkg/cmd/templateRun.go b/pkg/cmd/templateRun.go index 9d5075d3..3b980403 100644 --- a/pkg/cmd/templateRun.go +++ b/pkg/cmd/templateRun.go @@ -123,6 +123,8 @@ jr template run --template "{{name}}" configuration.GlobalCfg.CassandraConfig, _ = cmd.Flags().GetString(f.Name) case "luascriptConfig": configuration.GlobalCfg.LUAScriptConfig, _ = cmd.Flags().GetString(f.Name) + case "wasmConfig": + configuration.GlobalCfg.WASMConfig, _ = cmd.Flags().GetString(f.Name) } } }) @@ -171,7 +173,7 @@ func init() { templateRunCmd.Flags().StringP("topic", "t", constants.DEFAULT_TOPIC, "Kafka topic") templateRunCmd.Flags().Bool("kcat", false, "If you want to pipe jr with kcat, use this flag: it is equivalent to --output stdout --outputTemplate '{{key}},{{value}}' --oneline") - templateRunCmd.Flags().StringP("output", "o", constants.DEFAULT_OUTPUT, "can be one of stdout, kafka, http, redis, mongo, elastic, s3, gcs, azblobstorage, azcosmosdb, cassandra, luascript, awsdynamodb") + templateRunCmd.Flags().StringP("output", "o", constants.DEFAULT_OUTPUT, "can be one of stdout, kafka, http, redis, mongo, elastic, s3, gcs, azblobstorage, azcosmosdb, cassandra, luascript, wasm, awsdynamodb") templateRunCmd.Flags().String("outputTemplate", constants.DEFAULT_OUTPUT_TEMPLATE, "Formatting of K,V on standard output") templateRunCmd.Flags().BoolP("oneline", "l", false, "strips /n from output, for example to be pipelined to tools like kcat") templateRunCmd.Flags().BoolP("autocreate", "a", false, "if enabled, autocreate topics") @@ -191,5 +193,6 @@ func init() { templateRunCmd.Flags().String("azCosmosDBConfig", "", "Azure CosmosDB configuration") templateRunCmd.Flags().String("cassandraConfig", "", "Cassandra configuration") templateRunCmd.Flags().String("luascriptConfig", "", "LUA Script configuration") + templateRunCmd.Flags().String("wasmConfig", "", "WASM configuration") } diff --git a/pkg/configuration/configuration.go b/pkg/configuration/configuration.go index dcf73080..ebf6ebdb 100644 --- a/pkg/configuration/configuration.go +++ b/pkg/configuration/configuration.go @@ -43,6 +43,7 @@ type GlobalConfiguration struct { CassandraConfig string AWSDynamoDBConfig string LUAScriptConfig string + WASMConfig string Url string EmbeddedTemplate bool FileNameTemplate bool diff --git a/pkg/emitter/emitter.go b/pkg/emitter/emitter.go index 69694de1..895e74ef 100644 --- a/pkg/emitter/emitter.go +++ b/pkg/emitter/emitter.go @@ -23,6 +23,7 @@ package emitter import ( "context" "fmt" + "github.com/jrnd-io/jr/pkg/producers/wasm" "os" "time" @@ -168,6 +169,10 @@ func (e *Emitter) Initialize(ctx context.Context, conf configuration.GlobalConfi e.Producer = createLUAScriptProducer(ctx, conf.LUAScriptConfig) return } + if e.Output == "wasm" { + e.Producer = createWASMProducer(ctx, conf.LUAScriptConfig) + return + } } @@ -269,6 +274,13 @@ func createLUAScriptProducer(_ context.Context, config string) Producer { return producer } +func createWASMProducer(ctx context.Context, config string) Producer { + producer := &wasm.Producer{} + producer.Initialize(ctx, config) + + return producer +} + func createKafkaProducer(_ context.Context, conf configuration.GlobalConfiguration, topic string, templateType string) *kafka.KafkaManager { kManager := &kafka.KafkaManager{ diff --git a/pkg/producers/wasm/wasm_config.go b/pkg/producers/wasm/wasm_config.go new file mode 100644 index 00000000..61b52a1d --- /dev/null +++ b/pkg/producers/wasm/wasm_config.go @@ -0,0 +1,25 @@ +// Copyright © 2024 JR team +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. +package wasm + +type Config struct { + ModulePath string `json:"module_path"` + BindStdout bool `json:"bind_stdout"` +} diff --git a/pkg/producers/wasm/wasm_producer.go b/pkg/producers/wasm/wasm_producer.go new file mode 100644 index 00000000..71421bb1 --- /dev/null +++ b/pkg/producers/wasm/wasm_producer.go @@ -0,0 +1,150 @@ +// Copyright © 2024 JR team +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. +package wasm + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "os" + "sync" + + "github.com/rs/zerolog/log" + "github.com/tetratelabs/wazero" + + wazapi "github.com/tetratelabs/wazero/api" + wasi "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" +) + +type Producer struct { + configuration Config + lock sync.Mutex + + r wazero.Runtime + m wazapi.Module + f wazapi.Function + + stdin *bytes.Buffer + stderr *bytes.Buffer +} + +func (p *Producer) Initialize(ctx context.Context, configFile string) { + cfgBytes, err := os.ReadFile(configFile) + if err != nil { + log.Fatal().Err(err).Msg("Failed to read config file") + } + + config := Config{} + if err := json.Unmarshal(cfgBytes, &config); err != nil { + log.Fatal().Err(err).Msg("Failed to unmarshal config") + } + + p.InitializeFromConfig(ctx, config) +} + +func (p *Producer) InitializeFromConfig(ctx context.Context, config Config) { + p.lock = sync.Mutex{} + p.r = wazero.NewRuntime(ctx) + + // initialize WASI for stdin/out + if _, err := wasi.NewBuilder(p.r).Instantiate(ctx); err != nil { + log.Fatal().Err(err).Msg("Failed to configure WASI") + } + + moduleBytes, err := os.ReadFile(config.ModulePath) + if err != nil { + log.Fatal().Err(err).Msg("Failed to read WASM module file") + } + + p.stdin = bytes.NewBuffer(nil) + p.stderr = bytes.NewBuffer(nil) + + mCfg := wazero.NewModuleConfig() + mCfg = mCfg.WithStdin(p.stdin) + mCfg = mCfg.WithStderr(p.stderr) + + if config.BindStdout { + mCfg = mCfg.WithStdout(os.Stdout) + } + + m, err := p.r.InstantiateWithConfig(ctx, moduleBytes, mCfg) + if err != nil { + log.Fatal().Err(err).Msg("unable to create WASM module") + } + + p.m = m + p.f = p.m.ExportedFunction("produce") +} + +func (p *Producer) Produce(ctx context.Context, k []byte, v []byte, _ any) { + p.lock.Lock() + defer p.lock.Unlock() + + p.stdin.Reset() + p.stderr.Reset() + + data, err := json.Marshal(map[string][]byte{ + "k": k, + "v": v, + }) + + if err != nil { + log.Fatal().Err(err).Msg("Failed to serialize WASM request") + } + + p.stdin.Write(data) + ret, err := p.f.Call(ctx, uint64(len(data))) + + if err != nil { + log.Fatal().Err(err).Msg("Failed to invoke WASM function") + } + + if len(ret) == 1 && ret[0] > 0 { + err = p.extractError(ret[0]) + if err != nil { + log.Fatal().Err(err).Msg("Failed to execute WASM function") + } + } +} + +func (p *Producer) Close(ctx context.Context) error { + if p.r == nil { + if err := p.r.Close(ctx); err != nil { + return err + } + } + + return nil +} + +func (p *Producer) extractError(len uint64) error { + if len == 0 { + return nil + } + + out := p.stderr.Bytes() + if out == nil { + return nil + } + + out = out[0:len] + + return errors.New(string(out)) +} diff --git a/pkg/producers/wasm/wasm_producer_test.go b/pkg/producers/wasm/wasm_producer_test.go new file mode 100644 index 00000000..c1498106 --- /dev/null +++ b/pkg/producers/wasm/wasm_producer_test.go @@ -0,0 +1,54 @@ +// Copyright © 2024 JR team +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. +package wasm_test + +import ( + "context" + "github.com/jrnd-io/jr/pkg/producers/wasm" + "testing" +) + +func TestWASMProducer(t *testing.T) { + + testCases := []struct { + name string + config wasm.Config + }{ + { + name: "testprint", + config: wasm.Config{ + ModulePath: "wasm_producer_test_function.wasm", + BindStdout: true, + }, + }, + } + + for _, tc := range testCases { + tc := tc + + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + + p := &wasm.Producer{} + p.InitializeFromConfig(ctx, tc.config) + p.Produce(ctx, []byte("somekey"), []byte("someval"), nil) + }) + + } +} diff --git a/pkg/producers/wasm/wasm_producer_test_function.go b/pkg/producers/wasm/wasm_producer_test_function.go new file mode 100644 index 00000000..09b98d2c --- /dev/null +++ b/pkg/producers/wasm/wasm_producer_test_function.go @@ -0,0 +1,50 @@ +//go:build tinygo.wasm + +// tinygo build -o pkg/producers/wasm/wasm_producer_test_function.wasm -target=wasi pkg/producers/wasm/wasm_producer_test_function.go +package main + +import ( + "bytes" + "encoding/json" + "io" + "os" +) + +const NoError = 0 + +//export produce +func _produce(size uint32) uint64 { + b := make([]byte, size) + + _, err := io.ReadAtLeast(os.Stdin, b, int(size)) + if err != nil { + return e(err) + } + + in := make(map[string][]byte) + + err = json.Unmarshal(b, &in) + if err != nil { + return e(err) + } + + out := bytes.ToUpper(in["v"]) + + _, err = os.Stdout.Write(out) + if err != nil { + return e(err) + } + + return NoError +} + +func e(err error) uint64 { + if err == nil { + return NoError + } + + _, _ = os.Stderr.WriteString(err.Error()) + return uint64(len(err.Error())) +} + +func main() {} diff --git a/pkg/producers/wasm/wasm_producer_test_function.wasm b/pkg/producers/wasm/wasm_producer_test_function.wasm new file mode 100755 index 00000000..c70f40f3 Binary files /dev/null and b/pkg/producers/wasm/wasm_producer_test_function.wasm differ