Skip to content

Commit

Permalink
Add WASM producer
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Aug 23, 2024
1 parent b873d99 commit 5c0d18c
Show file tree
Hide file tree
Showing 11 changed files with 300 additions and 1 deletion.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ require (
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/technoweenie/multipartstreamer v1.0.1 // indirect
github.com/tetratelabs/wazero v1.8.0 // indirect
github.com/tink-crypto/tink-go-gcpkms/v2 v2.1.0 // indirect
github.com/tink-crypto/tink-go-hcvault/v2 v2.1.0 // indirect
github.com/tink-crypto/tink-go/v2 v2.1.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,8 @@ github.com/testcontainers/testcontainers-go v0.31.0 h1:W0VwIhcEVhRflwL9as3dhY6jX
github.com/testcontainers/testcontainers-go v0.31.0/go.mod h1:D2lAoA0zUFiSY+eAflqK5mcUx/A5hrrORaEQrd0SefI=
github.com/testcontainers/testcontainers-go/modules/compose v0.31.0 h1:H74o3HisnApIDQx7sWibGzOl/Oo0By8DjyVeUf3qd6I=
github.com/testcontainers/testcontainers-go/modules/compose v0.31.0/go.mod h1:z1JAsvL2/pNFy40yJX0VX9Yk+hzOCIO5DydxBJHBbCY=
github.com/tetratelabs/wazero v1.8.0 h1:iEKu0d4c2Pd+QSRieYbnQC9yiFlMS9D+Jr0LsRmcF4g=
github.com/tetratelabs/wazero v1.8.0/go.mod h1:yAI0XTsMBhREkM/YDAK/zNou3GoiAce1P6+rp/wQhjs=
github.com/theupdateframework/notary v0.7.0 h1:QyagRZ7wlSpjT5N2qQAh/pN+DVqgekv4DzbAiAiEL3c=
github.com/theupdateframework/notary v0.7.0/go.mod h1:c9DRxcmhHmVLDay4/2fUYdISnHqbFDGRSlXPO0AhYWw=
github.com/tilt-dev/fsnotify v1.4.8-0.20220602155310-fff9c274a375 h1:QB54BJwA6x8QU9nHY3xJSZR2kX9bgpZekRKGkLTmEXA=
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/producerList.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ var producerListCmd = &cobra.Command{
fmt.Printf("%sAZCosmosDB%s (--output = azcosmosdb)\n", Green, Reset)
fmt.Printf("%sCassandra%s (--output = cassandra)\n", Green, Reset)
fmt.Printf("%sLUA Script%s (--output = luascript)\n", Green, Reset)
fmt.Printf("%sWASM Function%s (--output = wasm)\n", Green, Reset)
fmt.Printf("%sAWS DynamoDB%s (--output = awsdynamodb)\n", Green, Reset)
fmt.Println()

Expand Down
5 changes: 4 additions & 1 deletion pkg/cmd/templateRun.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ jr template run --template "{{name}}"
configuration.GlobalCfg.CassandraConfig, _ = cmd.Flags().GetString(f.Name)
case "luascriptConfig":
configuration.GlobalCfg.LUAScriptConfig, _ = cmd.Flags().GetString(f.Name)
case "wasmConfig":
configuration.GlobalCfg.WASMConfig, _ = cmd.Flags().GetString(f.Name)
}
}
})
Expand Down Expand Up @@ -171,7 +173,7 @@ func init() {
templateRunCmd.Flags().StringP("topic", "t", constants.DEFAULT_TOPIC, "Kafka topic")

templateRunCmd.Flags().Bool("kcat", false, "If you want to pipe jr with kcat, use this flag: it is equivalent to --output stdout --outputTemplate '{{key}},{{value}}' --oneline")
templateRunCmd.Flags().StringP("output", "o", constants.DEFAULT_OUTPUT, "can be one of stdout, kafka, http, redis, mongo, elastic, s3, gcs, azblobstorage, azcosmosdb, cassandra, luascript, awsdynamodb")
templateRunCmd.Flags().StringP("output", "o", constants.DEFAULT_OUTPUT, "can be one of stdout, kafka, http, redis, mongo, elastic, s3, gcs, azblobstorage, azcosmosdb, cassandra, luascript, wasm, awsdynamodb")
templateRunCmd.Flags().String("outputTemplate", constants.DEFAULT_OUTPUT_TEMPLATE, "Formatting of K,V on standard output")
templateRunCmd.Flags().BoolP("oneline", "l", false, "strips /n from output, for example to be pipelined to tools like kcat")
templateRunCmd.Flags().BoolP("autocreate", "a", false, "if enabled, autocreate topics")
Expand All @@ -191,5 +193,6 @@ func init() {
templateRunCmd.Flags().String("azCosmosDBConfig", "", "Azure CosmosDB configuration")
templateRunCmd.Flags().String("cassandraConfig", "", "Cassandra configuration")
templateRunCmd.Flags().String("luascriptConfig", "", "LUA Script configuration")
templateRunCmd.Flags().String("wasmConfig", "", "WASM configuration")

}
1 change: 1 addition & 0 deletions pkg/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type GlobalConfiguration struct {
CassandraConfig string
AWSDynamoDBConfig string
LUAScriptConfig string
WASMConfig string
Url string
EmbeddedTemplate bool
FileNameTemplate bool
Expand Down
12 changes: 12 additions & 0 deletions pkg/emitter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package emitter
import (
"context"
"fmt"
"github.com/jrnd-io/jr/pkg/producers/wasm"
"os"
"time"

Expand Down Expand Up @@ -168,6 +169,10 @@ func (e *Emitter) Initialize(ctx context.Context, conf configuration.GlobalConfi
e.Producer = createLUAScriptProducer(ctx, conf.LUAScriptConfig)
return
}
if e.Output == "wasm" {
e.Producer = createWASMProducer(ctx, conf.LUAScriptConfig)
return
}

}

Expand Down Expand Up @@ -269,6 +274,13 @@ func createLUAScriptProducer(_ context.Context, config string) Producer {
return producer
}

func createWASMProducer(ctx context.Context, config string) Producer {
producer := &wasm.Producer{}
producer.Initialize(ctx, config)

return producer
}

func createKafkaProducer(_ context.Context, conf configuration.GlobalConfiguration, topic string, templateType string) *kafka.KafkaManager {

kManager := &kafka.KafkaManager{
Expand Down
25 changes: 25 additions & 0 deletions pkg/producers/wasm/wasm_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright © 2024 JR team
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package wasm

type Config struct {
ModulePath string `json:"module_path"`
BindStdout bool `json:"bind_stdout"`
}
150 changes: 150 additions & 0 deletions pkg/producers/wasm/wasm_producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright © 2024 JR team
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package wasm

import (
"bytes"
"context"
"encoding/json"
"errors"
"os"
"sync"

"github.com/rs/zerolog/log"
"github.com/tetratelabs/wazero"

wazapi "github.com/tetratelabs/wazero/api"
wasi "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
)

type Producer struct {
configuration Config
lock sync.Mutex

r wazero.Runtime
m wazapi.Module
f wazapi.Function

stdin *bytes.Buffer
stderr *bytes.Buffer
}

func (p *Producer) Initialize(ctx context.Context, configFile string) {
cfgBytes, err := os.ReadFile(configFile)
if err != nil {
log.Fatal().Err(err).Msg("Failed to read config file")
}

config := Config{}
if err := json.Unmarshal(cfgBytes, &config); err != nil {
log.Fatal().Err(err).Msg("Failed to unmarshal config")
}

p.InitializeFromConfig(ctx, config)
}

func (p *Producer) InitializeFromConfig(ctx context.Context, config Config) {
p.lock = sync.Mutex{}
p.r = wazero.NewRuntime(ctx)

// initialize WASI for stdin/out
if _, err := wasi.NewBuilder(p.r).Instantiate(ctx); err != nil {
log.Fatal().Err(err).Msg("Failed to configure WASI")
}

moduleBytes, err := os.ReadFile(config.ModulePath)
if err != nil {
log.Fatal().Err(err).Msg("Failed to read WASM module file")
}

p.stdin = bytes.NewBuffer(nil)
p.stderr = bytes.NewBuffer(nil)

mCfg := wazero.NewModuleConfig()
mCfg = mCfg.WithStdin(p.stdin)
mCfg = mCfg.WithStderr(p.stderr)

if config.BindStdout {
mCfg = mCfg.WithStdout(os.Stdout)
}

m, err := p.r.InstantiateWithConfig(ctx, moduleBytes, mCfg)
if err != nil {
log.Fatal().Err(err).Msg("unable to create WASM module")
}

p.m = m
p.f = p.m.ExportedFunction("produce")
}

func (p *Producer) Produce(ctx context.Context, k []byte, v []byte, _ any) {
p.lock.Lock()
defer p.lock.Unlock()

p.stdin.Reset()
p.stderr.Reset()

data, err := json.Marshal(map[string][]byte{
"k": k,
"v": v,
})

if err != nil {
log.Fatal().Err(err).Msg("Failed to serialize WASM request")
}

p.stdin.Write(data)
ret, err := p.f.Call(ctx, uint64(len(data)))

if err != nil {
log.Fatal().Err(err).Msg("Failed to invoke WASM function")
}

if len(ret) == 1 && ret[0] > 0 {
err = p.extractError(ret[0])
if err != nil {
log.Fatal().Err(err).Msg("Failed to execute WASM function")
}
}
}

func (p *Producer) Close(ctx context.Context) error {
if p.r == nil {
if err := p.r.Close(ctx); err != nil {
return err
}
}

return nil
}

func (p *Producer) extractError(len uint64) error {
if len == 0 {
return nil
}

out := p.stderr.Bytes()
if out == nil {
return nil
}

out = out[0:len]

return errors.New(string(out))
}
54 changes: 54 additions & 0 deletions pkg/producers/wasm/wasm_producer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright © 2024 JR team
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package wasm_test

import (
"context"
"github.com/jrnd-io/jr/pkg/producers/wasm"
"testing"
)

func TestWASMProducer(t *testing.T) {

testCases := []struct {
name string
config wasm.Config
}{
{
name: "testprint",
config: wasm.Config{
ModulePath: "wasm_producer_test_function.wasm",
BindStdout: true,
},
},
}

for _, tc := range testCases {
tc := tc

t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()

p := &wasm.Producer{}
p.InitializeFromConfig(ctx, tc.config)
p.Produce(ctx, []byte("somekey"), []byte("someval"), nil)
})

}
}
50 changes: 50 additions & 0 deletions pkg/producers/wasm/wasm_producer_test_function.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//go:build tinygo.wasm

// tinygo build -o pkg/producers/wasm/wasm_producer_test_function.wasm -target=wasi pkg/producers/wasm/wasm_producer_test_function.go
package main

import (
"bytes"
"encoding/json"
"io"
"os"
)

const NoError = 0

//export produce
func _produce(size uint32) uint64 {
b := make([]byte, size)

_, err := io.ReadAtLeast(os.Stdin, b, int(size))
if err != nil {
return e(err)
}

in := make(map[string][]byte)

err = json.Unmarshal(b, &in)
if err != nil {
return e(err)
}

out := bytes.ToUpper(in["v"])

_, err = os.Stdout.Write(out)
if err != nil {
return e(err)
}

return NoError
}

func e(err error) uint64 {
if err == nil {
return NoError
}

_, _ = os.Stderr.WriteString(err.Error())
return uint64(len(err.Error()))
}

func main() {}
Binary file not shown.

0 comments on commit 5c0d18c

Please sign in to comment.