Skip to content

Commit

Permalink
Merge pull request #199 from OS-Mind/main
Browse files Browse the repository at this point in the history
Add WAMP Producer
  • Loading branch information
vmarchese authored Nov 11, 2024
2 parents a67c27f + 20d2655 commit 659dfd6
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 0 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ require (
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.5 // indirect
github.com/gorilla/securecookie v1.1.2 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/hamba/avro/v2 v2.20.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
Expand Down Expand Up @@ -133,6 +134,7 @@ require (
github.com/tink-crypto/tink-go-gcpkms/v2 v2.1.0 // indirect
github.com/tink-crypto/tink-go-hcvault/v2 v2.1.0 // indirect
github.com/tink-crypto/tink-go/v2 v2.1.0 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
Expand Down Expand Up @@ -168,6 +170,7 @@ require (
github.com/cnkei/gospline v0.0.0-20191204052713-d67fac29a294
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gammazero/nexus/v3 v3.2.2
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/invopop/jsonschema v0.12.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/fvbommel/sortorder v1.0.2 h1:mV4o8B2hKboCdkJm+a7uX/SIpZob4JzUpc5GGnM45eo=
github.com/fvbommel/sortorder v1.0.2/go.mod h1:uk88iVf1ovNn1iLfgUVU2F9o5eO30ui720w+kxuqRs0=
github.com/gammazero/nexus/v3 v3.2.2 h1:uEBe4rKIcbBcbdP6XuyKUhnWBXxT0BnJrecG9+yZSTs=
github.com/gammazero/nexus/v3 v3.2.2/go.mod h1:55oZwPZFgRFCEjpMj1kdzffiPORKKmRsipSY8BeKRvY=
github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=
github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-jose/go-jose/v3 v3.0.3 h1:fFKWeig/irsp7XD2zBxvnmA/XaRWp5V3CBsZXJF7G7k=
Expand Down Expand Up @@ -661,6 +663,8 @@ github.com/tonistiigi/units v0.0.0-20180711220420-6950e57a87ea h1:SXhTLE6pb6eld/
github.com/tonistiigi/units v0.0.0-20180711220420-6950e57a87ea/go.mod h1:WPnis/6cRcDZSUvVmezrxJPkiO87ThFYsoUiMwWNDJk=
github.com/tonistiigi/vt100 v0.0.0-20230623042737-f9a4f7ef6531 h1:Y/M5lygoNPKwVNLMPXgVfsRT40CSFKXCxuU8LoHySjs=
github.com/tonistiigi/vt100 v0.0.0-20230623042737-f9a4f7ef6531/go.mod h1:ulncasL3N9uLrVann0m+CDlJKWsIAP34MPcOJF6VRvc=
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/vadv/gopher-lua-libs v0.5.0 h1:m0hhWia1A1U3PIRmtdHWBj88ogzuIjm6HUBmtUa0Tz4=
github.com/vadv/gopher-lua-libs v0.5.0/go.mod h1:mlSOxmrjug7DwisiH7xBFnBellHobPbvAIhVeI/4SYY=
github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc=
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/producerList.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var producerListCmd = &cobra.Command{
fmt.Printf("%sLUA Script%s (--output = luascript)\n", Green, Reset)
fmt.Printf("%sWASM Function%s (--output = wasm)\n", Green, Reset)
fmt.Printf("%sAWS DynamoDB%s (--output = awsdynamodb)\n", Green, Reset)
fmt.Printf("%sWAMP Topic%s (--output = wamp)\n", Green, Reset)
fmt.Println()

},
Expand Down
3 changes: 3 additions & 0 deletions pkg/cmd/templateRun.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ jr template run --template "{{name}}"
configuration.GlobalCfg.LUAScriptConfig, _ = cmd.Flags().GetString(f.Name)
case "wasmConfig":
configuration.GlobalCfg.WASMConfig, _ = cmd.Flags().GetString(f.Name)
case "wampConfig":
configuration.GlobalCfg.WAMPConfig, _ = cmd.Flags().GetString(f.Name)
}
}
})
Expand Down Expand Up @@ -198,5 +200,6 @@ func init() {
templateRunCmd.Flags().String("cassandraConfig", "", "Cassandra configuration")
templateRunCmd.Flags().String("luascriptConfig", "", "LUA Script configuration")
templateRunCmd.Flags().String("wasmConfig", "", "WASM configuration")
templateRunCmd.Flags().String("wampConfig", "", "WAMP configuration")

}
1 change: 1 addition & 0 deletions pkg/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type GlobalConfiguration struct {
AWSDynamoDBConfig string
LUAScriptConfig string
WASMConfig string
WAMPConfig string
Url string
EmbeddedTemplate bool
FileNameTemplate bool
Expand Down
12 changes: 12 additions & 0 deletions pkg/emitter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/jrnd-io/jr/pkg/producers/redis"
"github.com/jrnd-io/jr/pkg/producers/s3"
"github.com/jrnd-io/jr/pkg/producers/server"
"github.com/jrnd-io/jr/pkg/producers/wamp"
"github.com/jrnd-io/jr/pkg/tpl"
"github.com/rs/zerolog/log"
)
Expand Down Expand Up @@ -177,6 +178,10 @@ func (e *Emitter) Initialize(ctx context.Context, conf configuration.GlobalConfi
e.Producer = createWASMProducer(ctx, conf.LUAScriptConfig)
return
}
if e.Output == "wamp" {
e.Producer = createWAMPProducer(ctx, conf.WAMPConfig)
return
}

}

Expand Down Expand Up @@ -285,6 +290,13 @@ func createWASMProducer(ctx context.Context, config string) Producer {
return producer
}

func createWAMPProducer(ctx context.Context, config string) Producer {
producer := &wamp.Producer{}
producer.Initialize(ctx, config)

return producer
}

func createKafkaProducer(ctx context.Context, conf configuration.GlobalConfiguration, topic string, templateType string) *kafka.Manager {

kManager := &kafka.Manager{
Expand Down
10 changes: 10 additions & 0 deletions pkg/producers/wamp/config.json.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"wamp_uri": "ws://localhost:9009/ws",
"username": "admin",
"password": "password",
"realm": "realm1",
"topic": "example.hello",
"serType": "json",
"compress": true,
"authid": "clientJR"
}
100 changes: 100 additions & 0 deletions pkg/producers/wamp/wampProducer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package wamp

import (
"context"
"encoding/json"
"os"

"github.com/gammazero/nexus/v3/client"
"github.com/gammazero/nexus/v3/wamp"
"github.com/rs/zerolog/log"
)

type Config struct {
WampURI string `json:"wamp_uri"`
Username string `json:"username"`
Password string `json:"password"`
Realm string `json:"realm"`
Topic string `json:"topic"`
SerType string `json:"serType"`
Compress bool `json:"compress"`
Authid string `json:"authid"`
}

type Producer struct {
client client.Client
realm string
topic string
authid string
}

func (p *Producer) Initialize(ctx context.Context, configFile string) {
var config Config
file, err := os.ReadFile(configFile)
if err != nil {
log.Fatal().Err(err).Msg("Failed to read configuration file")
}
err = json.Unmarshal(file, &config)
if err != nil {
log.Fatal().Err(err).Msg("Failed to parse configuration parameters")
}
var wampclient *client.Client

// Get requested serialization.
serialization := client.JSON
switch config.SerType {
case "json":
case "msgpack":
serialization = client.MSGPACK
case "cbor":
serialization = client.CBOR
default:
log.Fatal().Err(err).Msg("Invalid serialization, muse be one of: json, msgpack, cbor")
}

cfg := client.Config{
Realm: config.Realm,
Serialization: serialization,
HelloDetails: wamp.Dict{
"authid": config.Authid,
},
}

if config.Compress {
cfg.WsCfg.EnableCompression = true
}

addr := config.WampURI

wampclient, err = client.ConnectNet(context.Background(), addr, cfg)
if err != nil {
log.Fatal().Err(err).Msg("Can't connect to WAMP Router")
}
// defer wampclient.Close()

p.realm = config.Realm
p.topic = config.Topic
p.authid = config.Authid

p.client = *wampclient
}

func (p *Producer) Produce(ctx context.Context, k []byte, v []byte, _ any) {
data := string(v)
args := wamp.List{data}
opts := wamp.Dict{
"authid": p.authid,
}
err := p.client.Publish(p.topic, opts, args, nil)
if err != nil {
log.Fatal().Err(err).Msgf("publish error: %s", err)
}
}

func (p *Producer) Close(ctx context.Context) error {
err := p.client.Close()
if err != nil {
log.Warn().Err(err).Msg("Failed to close WAMP connection")
}
return err
}
58 changes: 58 additions & 0 deletions pkg/producers/wamp/wampProducer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
//go:build exclude

package wamp

import (
"context"
"testing"

"github.com/jrnd-io/jr/pkg/producers/wamp"
)

func TestProducer_Initialize(t *testing.T) {
configFile := "config.json.example"

producer, err := wamp.ProducerFactory("wamp")
if err != nil {
t.Fatalf("Error reading configuration file: %v", err)
}
err = producer.Initialize(configFile)
if err != nil {
t.Fatalf("Error reading configuration file: %v", err)
}
}

func TestProducer_Close(t *testing.T) {
configFile := "config.json.example"

producer, err := wamp.ProducerFactory("wamp")
if err != nil {
t.Fatalf("Error reading configuration file: %v", err)
}
err = producer.Initialize(configFile)
if err != nil {
t.Fatalf("Error reading configuration file: %v", err)
}

producer.Close()
}

func TestProducer_Produce(t *testing.T) {
configFile := "config.json.example"

producer, err := wamp.ProducerFactory("wamp")
if err != nil {
t.Fatalf("Error reading configuration file: %v", err)
}
err = producer.Initialize(configFile)
if err != nil {
t.Fatalf("Error initializing producer: %v", err)
}

ctx := context.Background()
key := "loo"
val := "foo"
exp := 0
producer.Produce(ctx, key, val, exp)

}

0 comments on commit 659dfd6

Please sign in to comment.