diff --git a/common/constants.go b/common/constants.go index a2c21c2a..d6150160 100644 --- a/common/constants.go +++ b/common/constants.go @@ -21,6 +21,7 @@ const ( MemoryTubeType = "memory" HttpTubeType = "http" EmptyTubeType = "empty" + NatsTubeType = "nats" WASMRuntime = "wasm" ExternalRuntime = "external" diff --git a/fs/contube/nats.go b/fs/contube/nats.go new file mode 100644 index 00000000..598a7592 --- /dev/null +++ b/fs/contube/nats.go @@ -0,0 +1,130 @@ +/* + * Copyright 2024 Function Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package contube + +import ( + "context" + "time" + + "github.com/functionstream/function-stream/common" + "github.com/nats-io/nats.go" + "github.com/pkg/errors" +) + +type NatsTubeFactoryConfig struct { + NatsURL string `json:"nats_url"` +} + +type NatsEventQueueFactory struct { + nc *nats.Conn +} + +type NatsSourceTubeConfig struct { + Subject string `json:"subject" validate:"required"` +} + +func (n NatsEventQueueFactory) NewSourceTube(ctx context.Context, configMap ConfigMap) (<-chan Record, error) { + config := &NatsSourceTubeConfig{} + if err := configMap.ToConfigStruct(config); err != nil { + return nil, err + } + c := make(chan Record) + sub, err := n.nc.SubscribeSync(config.Subject) + if err != nil { + return nil, err + } + log := common.NewDefaultLogger() + go func() { + for { + msg, err := sub.NextMsg(10 * time.Millisecond) + if err != nil { + if !errors.Is(err, nats.ErrTimeout) { + log.Error(err, "Failed to get next message", "subject", config.Subject) + } + continue + } + select { + case c <- NewRecordImpl(msg.Data, func() { + _ = msg.Ack() + }): // do nothing + case <-ctx.Done(): + return + } + } + }() + return c, nil +} + +type NatsSinkTubeConfig struct { + Subject string `json:"subject" validate:"required"` +} + +func (n NatsEventQueueFactory) NewSinkTube(ctx context.Context, configMap ConfigMap) (chan<- Record, error) { + config := &NatsSinkTubeConfig{} + if err := configMap.ToConfigStruct(config); err != nil { + return nil, err + } + c := make(chan Record) + log := common.NewDefaultLogger() + go func() { + for { + select { + case <-ctx.Done(): + return + case event, ok := <-c: + if !ok { + return + } + err := n.nc.Publish(config.Subject, event.GetPayload()) + log.Info("Published message", "subject", config.Subject, "err", err) + if err != nil { + log.Error(err, "Failed to publish message", "subject", config.Subject) + continue + } + event.Commit() + } + } + }() + return c, nil +} + +func NewNatsEventQueueFactory(ctx context.Context, configMap ConfigMap) (TubeFactory, error) { + config := &NatsTubeFactoryConfig{} + if err := configMap.ToConfigStruct(config); err != nil { + return nil, err + } + if config.NatsURL == "" { + config.NatsURL = "nats://localhost:4222" + } + nc, err := nats.Connect(config.NatsURL) + if err != nil { + return nil, err + } + log := common.NewDefaultLogger() + go func() { + <-ctx.Done() + // Close the nats queue factory + log.Info("Closing nats queue factory", "url", config.NatsURL) + err := nc.Drain() + if err != nil { + log.Error(err, "Failed to drain nats connection", "url", config.NatsURL) + } + }() + return &NatsEventQueueFactory{ + nc: nc, + }, nil +} diff --git a/go.mod b/go.mod index b0dedca6..8fe63292 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/go-logr/zapr v1.3.0 github.com/go-openapi/spec v0.21.0 github.com/go-playground/validator/v10 v10.11.1 + github.com/nats-io/nats.go v1.37.0 github.com/pkg/errors v0.9.1 github.com/spf13/cobra v1.8.0 github.com/spf13/viper v1.18.2 @@ -74,6 +75,8 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mtibben/percent v0.2.1 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect diff --git a/go.sum b/go.sum index ded485aa..b0601130 100644 --- a/go.sum +++ b/go.sum @@ -152,6 +152,12 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= diff --git a/server/server.go b/server/server.go index c316931a..ae36188b 100644 --- a/server/server.go +++ b/server/server.go @@ -189,6 +189,10 @@ func GetBuiltinTubeFactoryBuilder() map[string]func(configMap config.ConfigMap) common.EmptyTubeType: func(_ config.ConfigMap) (contube.TubeFactory, error) { return contube.NewEmptyTubeFactory(), nil }, + //nolint:unparam + common.NatsTubeType: func(configMap config.ConfigMap) (contube.TubeFactory, error) { + return contube.NewNatsEventQueueFactory(context.Background(), contube.ConfigMap(configMap)) + }, } } diff --git a/server/server_test.go b/server/server_test.go index 41ffc814..81197f21 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -23,6 +23,9 @@ import ( "net" "strconv" "testing" + "time" + + "github.com/nats-io/nats.go" "github.com/functionstream/function-stream/common/config" @@ -47,6 +50,13 @@ func getListener(t *testing.T) net.Listener { func startStandaloneSvr(t *testing.T, ctx context.Context, opts ...ServerOption) (*Server, string) { ln := getListener(t) defaultOpts := []ServerOption{ + WithConfig(&Config{ + TubeConfig: map[string]config.ConfigMap{ + common.NatsTubeType: { + "nats_url": "nats://localhost:4222", + }, + }, + }), WithHttpListener(ln), WithTubeFactoryBuilders(GetBuiltinTubeFactoryBuilder()), WithRuntimeFactoryBuilders(GetBuiltinRuntimeFactoryBuilder()), @@ -189,6 +199,66 @@ func TestHttpTube(t *testing.T) { } } +func TestNatsTube(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + s, _ := startStandaloneSvr(t, ctx, nil, nil) + + funcConf := &model.Function{ + Package: "../bin/example_basic.wasm", + Sources: []model.TubeConfig{{ + Type: common.NatsTubeType, + Config: map[string]interface{}{ + "subject": "input", + }, + }}, + Sink: model.TubeConfig{ + Type: common.NatsTubeType, + Config: map[string]interface{}{ + "subject": "output", + }, + }, + Name: "test-func", + Replicas: 1, + } + + err := s.Manager.StartFunction(funcConf) + assert.Nil(t, err) + + p := &tests.Person{ + Name: "rbt", + Money: 0, + } + jsonBytes, err := json.Marshal(p) + if err != nil { + t.Fatal(err) + } + + nc, err := nats.Connect("nats://localhost:4222") + assert.NoError(t, err) + + sub, err := nc.SubscribeSync("output") + assert.NoError(t, err) + + assert.NoError(t, nc.Publish("input", jsonBytes)) + + event, err := sub.NextMsg(3 * time.Second) + if err != nil { + t.Error(err) + return + } + var out tests.Person + err = json.Unmarshal(event.Data, &out) + if err != nil { + t.Error(err) + return + } + if out.Money != 1 { + t.Errorf("expected 1, got %d", out.Money) + return + } +} + type MockRuntimeFactory struct { } diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml index a07c692c..371fa003 100644 --- a/tests/docker-compose.yaml +++ b/tests/docker-compose.yaml @@ -20,3 +20,11 @@ services: ports: - "6650:6650" - "8080:8080" + nats: + image: nats:latest + container_name: nats-server + ports: + - "4222:4222" + - "8222:8222" + environment: + - NATS_ALLOW_NEW_USERS=true