diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 3dcc2585..f270a105 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -145,4 +145,10 @@ updates: labels: - "๐Ÿค– Dependencies" schedule: - interval: "daily" \ No newline at end of file + interval: "daily" + - package-ecosystem: "gomod" + directory: "/nats/" # Location of package manifests + labels: + - "๐Ÿค– Dependencies" + schedule: + interval: "daily" diff --git a/.github/release-drafter-nats.yml b/.github/release-drafter-nats.yml new file mode 100644 index 00000000..2f947eaf --- /dev/null +++ b/.github/release-drafter-nats.yml @@ -0,0 +1,50 @@ +name-template: 'Nats - v$RESOLVED_VERSION' +tag-template: 'nats/v$RESOLVED_VERSION' +tag-prefix: nats/v +include-paths: + - nats +categories: + - title: 'โ— Breaking Changes' + labels: + - 'โ— BreakingChange' + - title: '๐Ÿš€ New' + labels: + - 'โœ๏ธ Feature' + - title: '๐Ÿงน Updates' + labels: + - '๐Ÿงน Updates' + - '๐Ÿค– Dependencies' + - title: '๐Ÿ› Fixes' + labels: + - 'โ˜ข๏ธ Bug' + - title: '๐Ÿ“š Documentation' + labels: + - '๐Ÿ“’ Documentation' +change-template: '- $TITLE (#$NUMBER)' +change-title-escapes: '\<*_&' # You can add # and @ to disable mentions, and add ` to disable code blocks. +exclude-contributors: + - dependabot + - dependabot[bot] +version-resolver: + major: + labels: + - 'major' + - 'โ— BreakingChange' + minor: + labels: + - 'minor' + - 'โœ๏ธ Feature' + patch: + labels: + - 'patch' + - '๐Ÿ“’ Documentation' + - 'โ˜ข๏ธ Bug' + - '๐Ÿค– Dependencies' + - '๐Ÿงน Updates' + default: patch +template: | + $CHANGES + + **Full Changelog**: https://github.com/$OWNER/$REPOSITORY/compare/$PREVIOUS_TAG...nats/v$RESOLVED_VERSION + + Thank you $CONTRIBUTORS for making this update possible. diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index f2c9d8a9..4dc1ef15 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -77,6 +77,7 @@ jobs: options: >- --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 + steps: - name: Fetch Repository uses: actions/checkout@v4 @@ -132,6 +133,12 @@ jobs: run: | redis-server --port 6379 & + - name: Run NATS + run: | + ./.github/scripts/gen-test-certs.sh + docker run -d --name nats-jetstream -p 4443:4443 -v ./nats/testdata:/testdata -v ./tls:/tls nats:latest --jetstream -c /testdata/nats-tls.conf + sleep 2 + - name: Run Benchmarks run: | set -o pipefail diff --git a/.github/workflows/gosec.yml b/.github/workflows/gosec.yml index 2520ecd4..6db5253a 100644 --- a/.github/workflows/gosec.yml +++ b/.github/workflows/gosec.yml @@ -38,6 +38,7 @@ jobs: json: true escape_json: false dir_names: true + dir_names_max_depth: '1' dir_names_exclude_current_dir: true gosec-scan: diff --git a/.github/workflows/release-drafter-nats.yml b/.github/workflows/release-drafter-nats.yml new file mode 100644 index 00000000..42ced3ea --- /dev/null +++ b/.github/workflows/release-drafter-nats.yml @@ -0,0 +1,19 @@ +name: Release Drafter Nats +on: + push: + # branches to consider in the event; optional, defaults to all + branches: + - master + - main + paths: + - 'nats/**' +jobs: + draft_release_nats: + runs-on: ubuntu-latest + timeout-minutes: 30 + steps: + - uses: release-drafter/release-drafter@v5 + with: + config-name: release-drafter-nats.yml + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/test-nats.yml b/.github/workflows/test-nats.yml new file mode 100644 index 00000000..1712bf9a --- /dev/null +++ b/.github/workflows/test-nats.yml @@ -0,0 +1,34 @@ +on: + push: + branches: + - master + - main + paths: + - 'nats/**' + pull_request: + paths: + - 'nats/**' +name: "Tests Nats Driver" +jobs: + Tests: + strategy: + matrix: + go-version: + - 1.20.x + - 1.21.x + runs-on: ubuntu-latest + steps: + - name: Fetch Repository + uses: actions/checkout@v4 + - name: Install Go + uses: actions/setup-go@v5 + with: + go-version: '${{ matrix.go-version }}' + - name: Generate config + run: ./.github/scripts/gen-test-certs.sh + - name: Run NATS + run: | + docker run -d --name nats-jetstream -p 4443:4443 -v ./nats/testdata:/testdata -v ./tls:/tls nats:latest --jetstream -c /testdata/nats-tls.conf + sleep 2 + - name: Test Nats + run: cd ./nats && go test ./... -v -race diff --git a/.gitignore b/.gitignore index 3d503277..bc107497 100644 --- a/.gitignore +++ b/.gitignore @@ -28,3 +28,6 @@ vendor/ vendor /Godeps/ + +# Go specific +go.work* diff --git a/README.md b/README.md index 0e35501c..573df7fc 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,7 @@ type Storage interface { - [MongoDB](./mongodb/README.md) - [MSSQL](./mssql/README.md) - [MySQL](./mysql/README.md) +- [NATS](./nats/README.md) - [Pebble](./pebble/README.md) - [Postgres](./postgres/README.md) - [Redis](./redis/README.md) @@ -71,4 +72,3 @@ type Storage interface { - [S3](./s3/README.md) - [ScyllaDB](./scylladb/README.md) - [SQLite3](./sqlite3/README.md) - diff --git a/nats/README.md b/nats/README.md new file mode 100644 index 00000000..8955fd56 --- /dev/null +++ b/nats/README.md @@ -0,0 +1,116 @@ +--- +id: nats +title: Nats +--- + + +![Release](https://img.shields.io/github/v/tag/gofiber/storage?filter=nats*) +[![Discord](https://img.shields.io/discord/704680098577514527?style=flat&label=%F0%9F%92%AC%20discord&color=00ACD7)](https://gofiber.io/discord) +![Test](https://img.shields.io/github/actions/workflow/status/gofiber/storage/test-nats.yml?label=Tests) +![Security](https://img.shields.io/github/actions/workflow/status/gofiber/storage/gosec.yml?label=Security) +![Linter](https://img.shields.io/github/actions/workflow/status/gofiber/storage/linter.yml?label=Linter) + +A NATS Key/Value storage driver. + +**Note: Requires Go 1.20 and above** + +### Table of Contents + +- [Signatures](#signatures) +- [Installation](#installation) +- [Examples](#examples) +- [Config](#config) +- [Default Config](#default-config) + +### Signatures + +```go +func New(config ...Config) Storage +func (s *Storage) Get(key string) ([]byte, error) +func (s *Storage) Set(key string, val []byte, exp time.Duration) error +func (s *Storage) Delete(key string) error +func (s *Storage) Reset() error +func (s *Storage) Close() error +func (s *Storage) Conn() (*nats.Conn, jetstream.KeyValue) +func (s *Storage) Keys() ([]string, error) +``` + +### Installation + +[NATS Key/Value Store](https://docs.nats.io/nats-concepts/jetstream/key-value-store) driver is tested on the 2 last [Go versions](https://golang.org/dl/) with support for modules. So make sure to initialize one first if you didn't do that yet: + +```bash +go mod init github.com// +``` + +And then install the nats implementation: + +```bash +go get github.com/gofiber/storage/nats +``` + +### Examples + +Import the storage package. + +```go +import "github.com/gofiber/storage/nats" +``` + +You can use the following possibilities to create a storage: + +```go +// Initialize default config +store := nats.New() + +// Initialize custom config +store := nats.New(Config{ + URLs: "nats://127.0.0.1:4443", + NatsOptions: []nats.Option{ + nats.MaxReconnects(2), + // Enable TLS by specifying RootCAs + nats.RootCAs("./testdata/certs/ca.pem"), + }, + KeyValueConfig: jetstream.KeyValueConfig{ + Bucket: "test", + Storage: jetstream.MemoryStorage, + }, +}) +``` + +### Config + +```go +type Config struct { + // Nats URLs, default "nats://127.0.0.1:4222". Can be comma separated list for multiple servers + URLs string + // Nats connection options. See nats_test.go for an example of how to use this. + NatsOptions []nats.Option + // Nats connection name + ClientName string + // Nats context + Context context.Context + // Nats key value config + KeyValueConfig jetstream.KeyValueConfig + // Logger. Using Fiber AllLogger interface for adapting the various log libraries. + Logger log.AllLogger + // Use the Logger for nats events, default: false + Verbose bool + // Wait for connection to be established, default: 100ms + WaitForConnection time.Duration +} +``` + +### Default Config + +```go +var ConfigDefault = Config{ + URLs: nats.DefaultURL, + Context: context.Background(), + ClientName: "fiber_storage", + KeyValueConfig: jetstream.KeyValueConfig{ + Bucket: "fiber_storage", + }, + WaitForConnection: 100 * time.Millisecond, +} +``` diff --git a/nats/config.go b/nats/config.go new file mode 100644 index 00000000..7f43c79e --- /dev/null +++ b/nats/config.go @@ -0,0 +1,78 @@ +package nats + +import ( + "context" + "time" + + "github.com/gofiber/fiber/v2/log" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +// Config defines the config for storage. +type Config struct { + // Nats URLs, default "nats://127.0.0.1:4222". Can be comma separated list for multiple servers + URLs string + // Nats connection options. See nats_test.go for an example of how to use this. + NatsOptions []nats.Option + // Nats connection name + ClientName string + // Nats context + Context context.Context + // Nats key value config + KeyValueConfig jetstream.KeyValueConfig + // Logger. Using Fiber AllLogger interface for adapting the various log libraries. + Logger log.AllLogger + // Use the Logger for nats events, default: false + Verbose bool + // Wait for connection to be established, default: 100ms + WaitForConnection time.Duration +} + +// ConfigDefault is the default config +var ConfigDefault = Config{ + URLs: nats.DefaultURL, + Context: context.Background(), + ClientName: "fiber_storage", + KeyValueConfig: jetstream.KeyValueConfig{ + Bucket: "fiber_storage", + }, + WaitForConnection: 100 * time.Millisecond, +} + +// Helper function to set default values +func configDefault(config ...Config) Config { + // Return default config if nothing provided + if len(config) < 1 { + return ConfigDefault + } + + // Override default config + cfg := config[0] + + // Set default values + if cfg.URLs == "" { + cfg.URLs = ConfigDefault.URLs + } + if cfg.Context == nil { + cfg.Context = ConfigDefault.Context + } + if len(cfg.KeyValueConfig.Bucket) == 0 { + cfg.KeyValueConfig.Bucket = ConfigDefault.KeyValueConfig.Bucket + } + if cfg.Verbose { + if cfg.Logger == nil { + cfg.Logger = log.DefaultLogger() + } + } else { + cfg.Logger = nil + } + if cfg.ClientName == "" { + cfg.ClientName = ConfigDefault.ClientName + } + if cfg.WaitForConnection == 0 { + cfg.WaitForConnection = ConfigDefault.WaitForConnection + } + + return cfg +} diff --git a/nats/go.mod b/nats/go.mod new file mode 100644 index 00000000..2b19fad0 --- /dev/null +++ b/nats/go.mod @@ -0,0 +1,22 @@ +module github.com/gofiber/storage/nats + +go 1.20 + +require ( + github.com/gofiber/fiber/v2 v2.52.0 + github.com/nats-io/nats.go v1.32.0 + github.com/stretchr/testify v1.8.4 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/klauspost/compress v1.17.2 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + golang.org/x/crypto v0.18.0 // indirect + golang.org/x/sys v0.16.0 // indirect + golang.org/x/text v0.14.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/nats/go.sum b/nats/go.sum new file mode 100644 index 00000000..c0bd6e5c --- /dev/null +++ b/nats/go.sum @@ -0,0 +1,29 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gofiber/fiber/v2 v2.52.0 h1:S+qXi7y+/Pgvqq4DrSmREGiFwtB7Bu6+QFLuIHYw/UE= +github.com/gofiber/fiber/v2 v2.52.0/go.mod h1:KEOE+cXMhXG0zHc9d8+E38hoX+ZN7bhOtgeF2oT6jrQ= +github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0= +github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/nats/nats.go b/nats/nats.go new file mode 100644 index 00000000..4d8c3a60 --- /dev/null +++ b/nats/nats.go @@ -0,0 +1,366 @@ +package nats + +import ( + "bytes" + "context" + "encoding/gob" + "errors" + "fmt" + "net" + "strings" + "sync" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +// Storage interface that is implemented by storage providers +type Storage struct { + nc *nats.Conn + kv jetstream.KeyValue + err error + ctx context.Context + cfg Config + mu sync.RWMutex +} + +type entry struct { + Data []byte + Expiry int64 +} + +func init() { + gob.Register(entry{}) +} + +// logErrorw is a helper function to log error messages +func (s *Storage) logErrorw(msg string, keysAndValues ...interface{}) { + if s.cfg.Verbose { + s.cfg.Logger.Errorw(msg, keysAndValues...) + } +} + +// logInfow is a helper function to log error messages +func (s *Storage) logInfow(msg string, keysAndValues ...interface{}) { + if s.cfg.Verbose { + s.cfg.Logger.Infow(msg, keysAndValues...) + } +} + +// connectHandler is a helper function to set the initial connect handler +func (s *Storage) connectHandler(nc *nats.Conn) { + s.logInfow("connected", + "diver", "nats", + "url", nc.ConnectedUrlRedacted(), + ) + + var err error + s.mu.Lock() + defer s.mu.Unlock() + s.kv, err = newNatsKV( + nc, + s.ctx, + s.cfg.KeyValueConfig, + ) + if err != nil { + s.logErrorw("kv not initialized", + "diver", "nats", + "error", err.Error(), + ) + s.err = errors.Join(s.err, err) + } +} + +// disconnectErrHandler is a helper function to set the disconnect error handler +func (s *Storage) disconnectErrHandler(nc *nats.Conn, err error) { + if err != nil { + s.logErrorw("disconnected", + "diver", "nats", + "error", err.Error(), + ) + } else { + s.logInfow("disconnected", + "diver", "nats", + ) + } + s.mu.Lock() + defer s.mu.Unlock() + nc.Opts.RetryOnFailedConnect = true + if err != nil { + s.err = errors.Join(s.err, err) + } +} + +// reconnectHandler is a helper function to set the reconnect handler +func (s *Storage) reconnectHandler(nc *nats.Conn) { + s.connectHandler(nc) +} + +// errorHandler is a helper function to set the error handler +func (s *Storage) errorHandler(nc *nats.Conn, sub *nats.Subscription, err error) { + s.logErrorw("error handler", + "diver", "nats", + "sub", sub.Subject, + "error", err.Error(), + ) + s.mu.Lock() + defer s.mu.Unlock() + if err != nil { + s.err = errors.Join(s.err, fmt.Errorf("subject %q: %w", sub.Subject, err)) + } +} + +// closedHandler is a helper function to set the closed handler +func (s *Storage) closedHandler(nc *nats.Conn) { + s.logInfow("closed", + "diver", "nats", + ) +} + +func newNatsKV(nc *nats.Conn, ctx context.Context, keyValueConfig jetstream.KeyValueConfig) (jetstream.KeyValue, error) { + js, err := jetstream.New(nc) + if err != nil { + return nil, fmt.Errorf("get jetstream: %w", err) + } + jskv, err := js.KeyValue(ctx, keyValueConfig.Bucket) + if err != nil { + if errors.Is(err, jetstream.ErrBucketNotFound) { + jskv, err = js.CreateKeyValue(ctx, keyValueConfig) + if err != nil { + return nil, fmt.Errorf("jetstream: create kv: %w", err) + } + } else { + return nil, fmt.Errorf("jetstream: get kv: %w", err) + } + } + + return jskv, nil +} + +// Process the url string argument to Connect. +// Return an array of urls, even if only one. +func processUrlString(url string) []string { + urls := strings.Split(url, ",") + var j int + for _, s := range urls { + u := strings.TrimSpace(s) + if len(u) > 0 { + urls[j] = u + j++ + } + } + return urls[:j] +} + +// New creates a new nats kv storage +func New(config ...Config) *Storage { + // Set default config + cfg := configDefault(config...) + + storage := &Storage{ + ctx: cfg.Context, + cfg: cfg, + } + + // Set the nats options with default custom handlers + cfg.NatsOptions = append( + []nats.Option{ + nats.ConnectHandler(storage.connectHandler), + nats.DisconnectErrHandler(storage.disconnectErrHandler), + nats.ReconnectHandler(storage.reconnectHandler), + nats.ErrorHandler(storage.errorHandler), + nats.ClosedHandler(storage.closedHandler), + }, + cfg.NatsOptions..., + ) + natsOpts := nats.GetDefaultOptions() + natsOpts.Servers = processUrlString(cfg.URLs) + for _, opt := range cfg.NatsOptions { + if opt != nil { + if err := opt(&natsOpts); err != nil { + panic(err) + } + } + } + // Connect to NATS + var err error + storage.nc, err = natsOpts.Connect() + + if opErr, ok := err.(*net.OpError); ok && natsOpts.RetryOnFailedConnect { + if opErr.Op != "dial" { + panic(err) + } + } else if err != nil { + panic(err) + } + + // TODO improve this crude way to wait for the connection to be established + time.Sleep(cfg.WaitForConnection) + + return storage +} + +// Get value by key +func (s *Storage) Get(key string) ([]byte, error) { + if len(key) <= 0 { + return nil, nil + } + + s.mu.RLock() + kv := s.kv + s.mu.RUnlock() + if kv == nil { + return nil, fmt.Errorf("kv not initialized: %v", s.err) + } + + v, err := kv.Get(s.ctx, key) + if err != nil { + if errors.Is(err, jetstream.ErrKeyNotFound) { + return nil, nil + } + return nil, fmt.Errorf("get: %w", err) + } + + e := entry{} + err = gob.NewDecoder( + bytes.NewBuffer(v.Value())). + Decode(&e) + if err != nil || e.Expiry <= time.Now().Unix() { + _ = kv.Delete(s.ctx, key) + return nil, nil + } + + return e.Data, nil +} + +// Set key with value +func (s *Storage) Set(key string, val []byte, exp time.Duration) error { + if len(key) <= 0 || len(val) <= 0 { + return nil + } + + s.mu.RLock() + kv := s.kv + s.mu.RUnlock() + if kv == nil { + return fmt.Errorf("kv not initialized: %v", s.err) + } + + // expiry + var expSeconds int64 + if exp != 0 { + expSeconds = time.Now().Add(exp).Unix() + } + // encode + e := new(bytes.Buffer) + err := gob.NewEncoder(e).Encode(entry{ + Data: val, + Expiry: expSeconds, + }) + if err != nil { + return fmt.Errorf("encode: %w", err) + } + + // set + _, err = kv.Put(s.ctx, key, e.Bytes()) + if errors.Is(err, jetstream.ErrKeyNotFound) { + _, err := kv.Create(s.ctx, key, e.Bytes()) + if err != nil { + return fmt.Errorf("create: %w", err) + } + } + + return err +} + +// Delete key by key +func (s *Storage) Delete(key string) error { + if len(key) <= 0 { + return nil + } + + s.mu.RLock() + kv := s.kv + s.mu.RUnlock() + + if kv == nil { + return fmt.Errorf("kv not initialized: %v", s.err) + } + + return kv.Delete(s.ctx, key) +} + +// Reset all keys +func (s *Storage) Reset() error { + js, err := jetstream.New(s.nc) + if err != nil { + return fmt.Errorf("get jetstream: %w", err) + } + + // Delete the bucket + err = js.DeleteKeyValue(s.ctx, s.cfg.KeyValueConfig.Bucket) + if err != nil { + return fmt.Errorf("delete kv: %w", err) + } + + // Create the bucket + s.mu.Lock() + defer s.mu.Unlock() + s.kv, err = newNatsKV( + s.nc, + s.ctx, + s.cfg.KeyValueConfig, + ) + if err != nil { + s.err = errors.Join(err) + return err + } + + s.err = nil + return nil +} + +// Close the nats connection +func (s *Storage) Close() error { + s.mu.RLock() + s.nc.Close() + s.mu.RUnlock() + return nil +} + +// Return database client +func (s *Storage) Conn() (*nats.Conn, jetstream.KeyValue) { + s.mu.RLock() + defer s.mu.RUnlock() + return s.nc, s.kv +} + +// Return all the keys +func (s *Storage) Keys() ([]string, error) { + s.mu.RLock() + kv := s.kv + s.mu.RUnlock() + if kv == nil { + return nil, fmt.Errorf("kv not initialized: %v", s.err) + } + + keyLister, err := kv.ListKeys(s.ctx) + + if err != nil { + return nil, fmt.Errorf("keys: %w", err) + } + + var keys []string + for key := range keyLister.Keys() { + keys = append(keys, key) + } + _ = keyLister.Stop() + + // Double check if no valid keys were found + if len(keys) == 0 { + return nil, nil + } + + return keys, nil +} diff --git a/nats/nats_test.go b/nats/nats_test.go new file mode 100644 index 00000000..dbe042d7 --- /dev/null +++ b/nats/nats_test.go @@ -0,0 +1,270 @@ +package nats + +import ( + "testing" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/require" +) + +var config = Config{ + URLs: "nats://localhost:4443", + NatsOptions: []nats.Option{ + nats.MaxReconnects(2), + // Enable TLS with client certificate authentication + nats.ClientCert("../tls/client.crt", "../tls/client.key"), + nats.RootCAs("../tls/ca.crt"), + }, + KeyValueConfig: jetstream.KeyValueConfig{ + Bucket: "test", + Storage: jetstream.MemoryStorage, + }, +} + +func Test_Storage_Nats_Set(t *testing.T) { + var ( + testStore = New(config) + key = "john" + val = []byte("doe") + ) + defer testStore.Close() + + err := testStore.Set(key, val, 0) + require.NoError(t, err) + + keys, err := testStore.Keys() + require.NoError(t, err) + require.Len(t, keys, 1) +} + +func Test_Storage_Nats_Set_Overwrite(t *testing.T) { + var ( + testStore = New(config) + key = "john" + val1 = []byte("doe") + val2 = []byte("overwritten") + ) + defer testStore.Close() + + err := testStore.Set(key, val1, 0) + require.NoError(t, err) + + err = testStore.Set(key, val2, 30*time.Second) + require.NoError(t, err) + v, err := testStore.Get(key) + require.NoError(t, err) + require.Equal(t, val2, v) + + keys, err := testStore.Keys() + require.NoError(t, err) + require.Len(t, keys, 1) +} + +func Test_Storage_Nats_Get(t *testing.T) { + var ( + testStore = New(config) + key = "john" + val = []byte("doe") + ) + defer testStore.Close() + + err := testStore.Set(key, val, 30*time.Second) + require.NoError(t, err) + + result, err := testStore.Get(key) + require.NoError(t, err) + require.Equal(t, val, result) + + keys, err := testStore.Keys() + require.NoError(t, err) + require.Len(t, keys, 1) +} + +func Test_Storage_Nats_Set_Expiration(t *testing.T) { + var ( + testStore = New(config) + key = "john" + val = []byte("doe") + exp = 1 * time.Second + ) + defer testStore.Close() + + err := testStore.Set(key, val, exp) + require.NoError(t, err) + + time.Sleep(1100 * time.Millisecond) + + result, err := testStore.Get(key) + require.NoError(t, err) + require.Zero(t, len(result)) + + keys, err := testStore.Keys() + require.NoError(t, err) + require.Nil(t, keys) +} + +func Test_Storage_Nats_Set_Long_Expiration_with_Keys(t *testing.T) { + var ( + testStore = New(config) + key = "john" + val = []byte("doe") + exp = 5 * time.Second + ) + defer testStore.Close() + + keys, err := testStore.Keys() + require.NoError(t, err) + require.Nil(t, keys) + + err = testStore.Set(key, val, exp) + require.NoError(t, err) + + time.Sleep(1100 * time.Millisecond) + + keys, err = testStore.Keys() + require.NoError(t, err) + require.Len(t, keys, 1) + + time.Sleep(4000 * time.Millisecond) + result, err := testStore.Get(key) + require.NoError(t, err) + require.Zero(t, len(result)) + + keys, err = testStore.Keys() + require.NoError(t, err) + require.Nil(t, keys) +} + +func Test_Storage_Nats_Get_NotExist(t *testing.T) { + testStore := New(config) + defer testStore.Close() + + result, err := testStore.Get("notexist") + require.NoError(t, err) + require.Zero(t, len(result)) + + keys, err := testStore.Keys() + require.NoError(t, err) + require.Nil(t, keys) +} + +func Test_Storage_Nats_Delete(t *testing.T) { + var ( + testStore = New(config) + key = "john" + val = []byte("doe") + ) + defer testStore.Close() + + err := testStore.Set(key, val, 0) + require.NoError(t, err) + + keys, err := testStore.Keys() + require.NoError(t, err) + require.Len(t, keys, 1) + + err = testStore.Delete(key) + require.NoError(t, err) + + result, err := testStore.Get(key) + require.NoError(t, err) + require.Zero(t, len(result)) + + keys, err = testStore.Keys() + require.NoError(t, err) + require.Nil(t, keys) +} + +func Test_Storage_Nats_Reset(t *testing.T) { + testStore := New(config) + defer testStore.Close() + val := []byte("doe") + + err := testStore.Set("john1", val, 0) + require.NoError(t, err) + + err = testStore.Set("john2", val, 0) + require.NoError(t, err) + + keys, err := testStore.Keys() + require.NoError(t, err) + require.Len(t, keys, 2) + + err = testStore.Reset() + require.NoError(t, err) + + result, err := testStore.Get("john1") + require.NoError(t, err) + require.Zero(t, len(result)) + + result, err = testStore.Get("john2") + require.NoError(t, err) + require.Zero(t, len(result)) + + keys, err = testStore.Keys() + require.NoError(t, err) + require.Nil(t, keys) +} + +func Test_Storage_Nats_Close(t *testing.T) { + testStore := New(config) + require.Nil(t, testStore.Close()) +} + +func Test_Storage_Nats_Conn(t *testing.T) { + testStore := New(config) + defer testStore.Close() + n, k := testStore.Conn() + require.NotNil(t, n) + require.NotNil(t, k) +} + +func Benchmark_Nats_Set(b *testing.B) { + testStore := New(config) + defer testStore.Close() + + b.ReportAllocs() + b.ResetTimer() + + var err error + for i := 0; i < b.N; i++ { + err = testStore.Set("john", []byte("doe"), 0) + } + + require.NoError(b, err) +} + +func Benchmark_Nats_Get(b *testing.B) { + testStore := New(config) + defer testStore.Close() + + err := testStore.Set("john", []byte("doe"), 0) + require.NoError(b, err) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _, err = testStore.Get("john") + } + + require.NoError(b, err) +} + +func Benchmark_Nats_SetAndDelete(b *testing.B) { + testStore := New(config) + defer testStore.Close() + + b.ReportAllocs() + b.ResetTimer() + + var err error + for i := 0; i < b.N; i++ { + _ = testStore.Set("john", []byte("doe"), 0) + err = testStore.Delete("john") + } + + require.NoError(b, err) +} diff --git a/nats/testdata/nats-tls.conf b/nats/testdata/nats-tls.conf new file mode 100644 index 00000000..657c1c16 --- /dev/null +++ b/nats/testdata/nats-tls.conf @@ -0,0 +1,11 @@ +# Simple TLS config file + +port: 4443 +net: 0.0.0.0 # net interface + +tls { + cert_file: "/tls/redis.crt" + key_file: "/tls/redis.key" + ca_file: "/tls/ca.crt" + timeout: 2 +}