From a6d5398a539de849c68f1e532ba596df26b9d276 Mon Sep 17 00:00:00 2001 From: Dusan Borovcanin Date: Fri, 12 Jul 2024 11:17:08 +0200 Subject: [PATCH] Update Twins Signed-off-by: Dusan Borovcanin --- cmd/smpp-notifier/main.go | 2 +- consumers/notifiers/api/doc.go | 6 - consumers/notifiers/api/endpoint.go | 103 ---- consumers/notifiers/api/endpoint_test.go | 548 ------------------ consumers/notifiers/api/logging.go | 131 ----- consumers/notifiers/api/metrics.go | 81 --- consumers/notifiers/api/requests.go | 55 -- consumers/notifiers/api/responses.go | 88 --- consumers/notifiers/api/transport.go | 132 ----- consumers/notifiers/doc.go | 6 - consumers/notifiers/mocks/doc.go | 5 - consumers/notifiers/mocks/notifier.go | 47 -- consumers/notifiers/mocks/repository.go | 133 ----- consumers/notifiers/mocks/service.go | 151 ----- consumers/notifiers/notifier.go | 22 - consumers/notifiers/postgres/database.go | 74 --- consumers/notifiers/postgres/doc.go | 6 - consumers/notifiers/postgres/init.go | 28 - consumers/notifiers/postgres/setup_test.go | 89 --- consumers/notifiers/postgres/subscriptions.go | 164 ------ .../notifiers/postgres/subscriptions_test.go | 263 --------- consumers/notifiers/service.go | 174 ------ consumers/notifiers/service_test.go | 377 ------------ consumers/notifiers/subscriptions.go | 48 -- consumers/notifiers/tracing/doc.go | 12 - consumers/notifiers/tracing/subscriptions.go | 73 --- twins/api/http/endpoint_states_test.go | 42 +- twins/api/http/endpoint_twins_test.go | 35 +- twins/mongodb/states_test.go | 4 +- twins/mongodb/twins_test.go | 2 +- twins/service.go | 26 +- twins/service_test.go | 33 +- 32 files changed, 72 insertions(+), 2888 deletions(-) delete mode 100644 consumers/notifiers/api/doc.go delete mode 100644 consumers/notifiers/api/endpoint.go delete mode 100644 consumers/notifiers/api/endpoint_test.go delete mode 100644 consumers/notifiers/api/logging.go delete mode 100644 consumers/notifiers/api/metrics.go delete mode 100644 consumers/notifiers/api/requests.go delete mode 100644 consumers/notifiers/api/responses.go delete mode 100644 consumers/notifiers/api/transport.go delete mode 100644 consumers/notifiers/doc.go delete mode 100644 consumers/notifiers/mocks/doc.go delete mode 100644 consumers/notifiers/mocks/notifier.go delete mode 100644 consumers/notifiers/mocks/repository.go delete mode 100644 consumers/notifiers/mocks/service.go delete mode 100644 consumers/notifiers/notifier.go delete mode 100644 consumers/notifiers/postgres/database.go delete mode 100644 consumers/notifiers/postgres/doc.go delete mode 100644 consumers/notifiers/postgres/init.go delete mode 100644 consumers/notifiers/postgres/setup_test.go delete mode 100644 consumers/notifiers/postgres/subscriptions.go delete mode 100644 consumers/notifiers/postgres/subscriptions_test.go delete mode 100644 consumers/notifiers/service.go delete mode 100644 consumers/notifiers/service_test.go delete mode 100644 consumers/notifiers/subscriptions.go delete mode 100644 consumers/notifiers/tracing/doc.go delete mode 100644 consumers/notifiers/tracing/subscriptions.go diff --git a/cmd/smpp-notifier/main.go b/cmd/smpp-notifier/main.go index 7b6eed1..c76f9e6 100644 --- a/cmd/smpp-notifier/main.go +++ b/cmd/smpp-notifier/main.go @@ -18,7 +18,6 @@ import ( "github.com/absmach/magistrala/consumers/notifiers" "github.com/absmach/magistrala/consumers/notifiers/api" notifierpg "github.com/absmach/magistrala/consumers/notifiers/postgres" - mgsmpp "github.com/absmach/magistrala/consumers/notifiers/smpp" "github.com/absmach/magistrala/consumers/notifiers/tracing" mglog "github.com/absmach/magistrala/logger" "github.com/absmach/magistrala/pkg/auth" @@ -31,6 +30,7 @@ import ( httpserver "github.com/absmach/magistrala/pkg/server/http" "github.com/absmach/magistrala/pkg/ulid" "github.com/absmach/magistrala/pkg/uuid" + mgsmpp "github.com/absmach/mg-contrib/consumers/notifiers/smpp" "github.com/caarlos0/env/v10" "github.com/jmoiron/sqlx" "go.opentelemetry.io/otel/trace" diff --git a/consumers/notifiers/api/doc.go b/consumers/notifiers/api/doc.go deleted file mode 100644 index 2424852..0000000 --- a/consumers/notifiers/api/doc.go +++ /dev/null @@ -1,6 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -// Package api contains API-related concerns: endpoint definitions, middlewares -// and all resource representations. -package api diff --git a/consumers/notifiers/api/endpoint.go b/consumers/notifiers/api/endpoint.go deleted file mode 100644 index 4b411ea..0000000 --- a/consumers/notifiers/api/endpoint.go +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package api - -import ( - "context" - - notifiers "github.com/absmach/magistrala/consumers/notifiers" - "github.com/absmach/magistrala/pkg/apiutil" - "github.com/absmach/magistrala/pkg/errors" - "github.com/go-kit/kit/endpoint" -) - -func createSubscriptionEndpoint(svc notifiers.Service) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (interface{}, error) { - req := request.(createSubReq) - if err := req.validate(); err != nil { - return createSubRes{}, errors.Wrap(apiutil.ErrValidation, err) - } - sub := notifiers.Subscription{ - Contact: req.Contact, - Topic: req.Topic, - } - id, err := svc.CreateSubscription(ctx, req.token, sub) - if err != nil { - return createSubRes{}, err - } - ucr := createSubRes{ - ID: id, - } - - return ucr, nil - } -} - -func viewSubscriptionEndpint(svc notifiers.Service) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (interface{}, error) { - req := request.(subReq) - if err := req.validate(); err != nil { - return viewSubRes{}, errors.Wrap(apiutil.ErrValidation, err) - } - sub, err := svc.ViewSubscription(ctx, req.token, req.id) - if err != nil { - return viewSubRes{}, err - } - res := viewSubRes{ - ID: sub.ID, - OwnerID: sub.OwnerID, - Contact: sub.Contact, - Topic: sub.Topic, - } - return res, nil - } -} - -func listSubscriptionsEndpoint(svc notifiers.Service) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (interface{}, error) { - req := request.(listSubsReq) - if err := req.validate(); err != nil { - return listSubsRes{}, errors.Wrap(apiutil.ErrValidation, err) - } - pm := notifiers.PageMetadata{ - Topic: req.topic, - Contact: req.contact, - Offset: req.offset, - Limit: int(req.limit), - } - page, err := svc.ListSubscriptions(ctx, req.token, pm) - if err != nil { - return listSubsRes{}, err - } - res := listSubsRes{ - Offset: page.Offset, - Limit: page.Limit, - Total: page.Total, - } - for _, sub := range page.Subscriptions { - r := viewSubRes{ - ID: sub.ID, - OwnerID: sub.OwnerID, - Contact: sub.Contact, - Topic: sub.Topic, - } - res.Subscriptions = append(res.Subscriptions, r) - } - - return res, nil - } -} - -func deleteSubscriptionEndpint(svc notifiers.Service) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (interface{}, error) { - req := request.(subReq) - if err := req.validate(); err != nil { - return nil, errors.Wrap(apiutil.ErrValidation, err) - } - if err := svc.RemoveSubscription(ctx, req.token, req.id); err != nil { - return nil, err - } - return removeSubRes{}, nil - } -} diff --git a/consumers/notifiers/api/endpoint_test.go b/consumers/notifiers/api/endpoint_test.go deleted file mode 100644 index e1148d6..0000000 --- a/consumers/notifiers/api/endpoint_test.go +++ /dev/null @@ -1,548 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package api_test - -import ( - "encoding/json" - "fmt" - "io" - "net/http" - "net/http/httptest" - "path" - "strings" - "testing" - - authmocks "github.com/absmach/magistrala/auth/mocks" - "github.com/absmach/magistrala/consumers/notifiers" - httpapi "github.com/absmach/magistrala/consumers/notifiers/api" - "github.com/absmach/magistrala/consumers/notifiers/mocks" - mglog "github.com/absmach/magistrala/logger" - "github.com/absmach/magistrala/pkg/apiutil" - svcerr "github.com/absmach/magistrala/pkg/errors/service" - "github.com/absmach/magistrala/pkg/uuid" - "github.com/absmach/mg-contrib/pkg/testsutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" -) - -const ( - contentType = "application/json" - email = "user@example.com" - contact1 = "email1@example.com" - contact2 = "email2@example.com" - token = "token" - topic = "topic" - instanceID = "5de9b29a-feb9-11ed-be56-0242ac120002" - validID = "d4ebb847-5d0e-4e46-bdd9-b6aceaaa3a22" -) - -var ( - notFoundRes = toJSON(apiutil.ErrorRes{Msg: svcerr.ErrNotFound.Error()}) - unauthRes = toJSON(apiutil.ErrorRes{Msg: svcerr.ErrAuthentication.Error()}) - invalidRes = toJSON(apiutil.ErrorRes{Err: apiutil.ErrInvalidQueryParams.Error(), Msg: apiutil.ErrValidation.Error()}) - missingTokRes = toJSON(apiutil.ErrorRes{Err: apiutil.ErrBearerToken.Error(), Msg: apiutil.ErrValidation.Error()}) -) - -type testRequest struct { - client *http.Client - method string - url string - contentType string - token string - body io.Reader -} - -func (tr testRequest) make() (*http.Response, error) { - req, err := http.NewRequest(tr.method, tr.url, tr.body) - if err != nil { - return nil, err - } - if tr.token != "" { - req.Header.Set("Authorization", apiutil.BearerPrefix+tr.token) - } - if tr.contentType != "" { - req.Header.Set("Content-Type", tr.contentType) - } - return tr.client.Do(req) -} - -func newServer() (*httptest.Server, *mocks.Service) { - logger := mglog.NewMock() - svc := new(mocks.Service) - mux := httpapi.MakeHandler(svc, logger, instanceID) - return httptest.NewServer(mux), svc -} - -func toJSON(data interface{}) string { - jsonData, err := json.Marshal(data) - if err != nil { - return "" - } - return string(jsonData) -} - -func TestCreate(t *testing.T) { - ss, svc := newServer() - defer ss.Close() - - sub := notifiers.Subscription{ - Topic: topic, - Contact: contact1, - } - - data := toJSON(sub) - - emptyTopic := toJSON(notifiers.Subscription{Contact: contact1}) - emptyContact := toJSON(notifiers.Subscription{Topic: "topic123"}) - - cases := []struct { - desc string - req string - contentType string - auth string - status int - location string - err error - }{ - { - desc: "add successfully", - req: data, - contentType: contentType, - auth: token, - status: http.StatusCreated, - location: fmt.Sprintf("/subscriptions/%s%012d", uuid.Prefix, 1), - err: nil, - }, - { - desc: "add an existing subscription", - req: data, - contentType: contentType, - auth: token, - status: http.StatusConflict, - location: "", - err: svcerr.ErrConflict, - }, - { - desc: "add with empty topic", - req: emptyTopic, - contentType: contentType, - auth: token, - status: http.StatusBadRequest, - location: "", - err: svcerr.ErrMalformedEntity, - }, - { - desc: "add with empty contact", - req: emptyContact, - contentType: contentType, - auth: token, - status: http.StatusBadRequest, - location: "", - err: svcerr.ErrMalformedEntity, - }, - { - desc: "add with invalid auth token", - req: data, - contentType: contentType, - auth: authmocks.InvalidValue, - status: http.StatusUnauthorized, - location: "", - err: svcerr.ErrAuthentication, - }, - { - desc: "add with empty auth token", - req: data, - contentType: contentType, - auth: "", - status: http.StatusUnauthorized, - location: "", - err: svcerr.ErrAuthentication, - }, - { - desc: "add with invalid request format", - req: "}", - contentType: contentType, - auth: token, - status: http.StatusBadRequest, - location: "", - err: svcerr.ErrMalformedEntity, - }, - { - desc: "add without content type", - req: data, - contentType: "", - auth: token, - status: http.StatusUnsupportedMediaType, - location: "", - err: apiutil.ErrUnsupportedContentType, - }, - } - - for _, tc := range cases { - svcCall := svc.On("CreateSubscription", mock.Anything, tc.auth, sub).Return(path.Base(tc.location), tc.err) - - req := testRequest{ - client: ss.Client(), - method: http.MethodPost, - url: fmt.Sprintf("%s/subscriptions", ss.URL), - contentType: tc.contentType, - token: tc.auth, - body: strings.NewReader(tc.req), - } - res, err := req.make() - assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", tc.desc, err)) - - location := res.Header.Get("Location") - assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected status code %d got %d", tc.desc, tc.status, res.StatusCode)) - assert.Equal(t, tc.location, location, fmt.Sprintf("%s: expected location %s got %s", tc.desc, tc.location, location)) - - svcCall.Unset() - } -} - -func TestView(t *testing.T) { - ss, svc := newServer() - defer ss.Close() - - sub := notifiers.Subscription{ - Topic: topic, - Contact: contact1, - ID: testsutil.GenerateUUID(t), - OwnerID: validID, - } - - sr := subRes{ - ID: sub.ID, - OwnerID: validID, - Contact: sub.Contact, - Topic: sub.Topic, - } - data := toJSON(sr) - - cases := []struct { - desc string - id string - auth string - status int - res string - err error - Sub notifiers.Subscription - }{ - { - desc: "view successfully", - id: sub.ID, - auth: token, - status: http.StatusOK, - res: data, - err: nil, - Sub: sub, - }, - { - desc: "view not existing", - id: "not existing", - auth: token, - status: http.StatusNotFound, - res: notFoundRes, - err: svcerr.ErrNotFound, - }, - { - desc: "view with invalid auth token", - id: sub.ID, - auth: authmocks.InvalidValue, - status: http.StatusUnauthorized, - res: unauthRes, - err: svcerr.ErrAuthentication, - }, - { - desc: "view with empty auth token", - id: sub.ID, - auth: "", - status: http.StatusUnauthorized, - res: missingTokRes, - err: svcerr.ErrAuthentication, - }, - } - - for _, tc := range cases { - svcCall := svc.On("ViewSubscription", mock.Anything, tc.auth, tc.id).Return(tc.Sub, tc.err) - - req := testRequest{ - client: ss.Client(), - method: http.MethodGet, - url: fmt.Sprintf("%s/subscriptions/%s", ss.URL, tc.id), - token: tc.auth, - } - res, err := req.make() - assert.Nil(t, err, fmt.Sprintf("%s: unexpected request error %s", tc.desc, err)) - body, err := io.ReadAll(res.Body) - assert.Nil(t, err, fmt.Sprintf("%s: unexpected read error %s", tc.desc, err)) - data := strings.Trim(string(body), "\n") - assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected status code %d got %d", tc.desc, tc.status, res.StatusCode)) - assert.Equal(t, tc.res, data, fmt.Sprintf("%s: expected body %s got %s", tc.desc, tc.res, data)) - - svcCall.Unset() - } -} - -func TestList(t *testing.T) { - ss, svc := newServer() - defer ss.Close() - - const numSubs = 100 - var subs []subRes - var sub notifiers.Subscription - - for i := 0; i < numSubs; i++ { - sub = notifiers.Subscription{ - Topic: fmt.Sprintf("topic.subtopic.%d", i), - Contact: contact1, - ID: testsutil.GenerateUUID(t), - } - if i%2 == 0 { - sub.Contact = contact2 - } - sr := subRes{ - ID: sub.ID, - OwnerID: validID, - Contact: sub.Contact, - Topic: sub.Topic, - } - subs = append(subs, sr) - } - noLimit := toJSON(page{Offset: 5, Limit: 20, Total: numSubs, Subscriptions: subs[5:25]}) - one := toJSON(page{Offset: 0, Limit: 20, Total: 1, Subscriptions: subs[10:11]}) - - var contact2Subs []subRes - for i := 20; i < 40; i += 2 { - contact2Subs = append(contact2Subs, subs[i]) - } - contactList := toJSON(page{Offset: 10, Limit: 10, Total: 50, Subscriptions: contact2Subs}) - - cases := []struct { - desc string - query map[string]string - auth string - status int - res string - err error - page notifiers.Page - }{ - { - desc: "list default limit", - query: map[string]string{ - "offset": "5", - }, - auth: token, - status: http.StatusOK, - res: noLimit, - err: nil, - page: notifiers.Page{ - PageMetadata: notifiers.PageMetadata{ - Offset: 5, - Limit: 20, - }, - Total: numSubs, - Subscriptions: subscriptionsSlice(subs, 5, 25), - }, - }, - { - desc: "list not existing", - query: map[string]string{ - "topic": "not-found-topic", - }, - auth: token, - status: http.StatusNotFound, - res: notFoundRes, - err: svcerr.ErrNotFound, - }, - { - desc: "list one with topic", - query: map[string]string{ - "topic": "topic.subtopic.10", - }, - auth: token, - status: http.StatusOK, - res: one, - err: nil, - page: notifiers.Page{ - PageMetadata: notifiers.PageMetadata{ - Offset: 0, - Limit: 20, - }, - Total: 1, - Subscriptions: subscriptionsSlice(subs, 10, 11), - }, - }, - { - desc: "list with contact", - query: map[string]string{ - "contact": contact2, - "offset": "10", - "limit": "10", - }, - auth: token, - status: http.StatusOK, - res: contactList, - err: nil, - page: notifiers.Page{ - PageMetadata: notifiers.PageMetadata{ - Offset: 10, - Limit: 10, - }, - Total: 50, - Subscriptions: subscriptionsSlice(contact2Subs, 0, 10), - }, - }, - { - desc: "list with invalid query", - query: map[string]string{ - "offset": "two", - }, - auth: token, - status: http.StatusBadRequest, - res: invalidRes, - err: svcerr.ErrMalformedEntity, - }, - { - desc: "list with invalid auth token", - auth: authmocks.InvalidValue, - status: http.StatusUnauthorized, - res: unauthRes, - err: svcerr.ErrAuthentication, - }, - { - desc: "list with empty auth token", - auth: "", - status: http.StatusUnauthorized, - res: missingTokRes, - err: svcerr.ErrAuthentication, - }, - } - - for _, tc := range cases { - svcCall := svc.On("ListSubscriptions", mock.Anything, tc.auth, mock.Anything).Return(tc.page, tc.err) - req := testRequest{ - client: ss.Client(), - method: http.MethodGet, - url: fmt.Sprintf("%s/subscriptions%s", ss.URL, makeQuery(tc.query)), - token: tc.auth, - } - res, err := req.make() - assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", tc.desc, err)) - body, err := io.ReadAll(res.Body) - assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", tc.desc, err)) - data := strings.Trim(string(body), "\n") - assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected status code %d got %d", tc.desc, tc.status, res.StatusCode)) - assert.Equal(t, tc.res, data, fmt.Sprintf("%s: got unexpected body\n", tc.desc)) - - svcCall.Unset() - } -} - -func TestRemove(t *testing.T) { - ss, svc := newServer() - defer ss.Close() - id := testsutil.GenerateUUID(t) - - cases := []struct { - desc string - id string - auth string - status int - res string - err error - }{ - { - desc: "remove successfully", - id: id, - auth: token, - status: http.StatusNoContent, - err: nil, - }, - { - desc: "remove not existing", - id: "not existing", - auth: token, - status: http.StatusNotFound, - err: svcerr.ErrNotFound, - }, - { - desc: "remove empty id", - id: "", - auth: token, - status: http.StatusBadRequest, - err: svcerr.ErrMalformedEntity, - }, - { - desc: "view with invalid auth token", - id: id, - auth: authmocks.InvalidValue, - status: http.StatusUnauthorized, - res: unauthRes, - err: svcerr.ErrAuthentication, - }, - { - desc: "view with empty auth token", - id: id, - auth: "", - status: http.StatusUnauthorized, - res: missingTokRes, - err: svcerr.ErrAuthentication, - }, - } - - for _, tc := range cases { - svcCall := svc.On("RemoveSubscription", mock.Anything, tc.auth, tc.id).Return(tc.err) - - req := testRequest{ - client: ss.Client(), - method: http.MethodDelete, - url: fmt.Sprintf("%s/subscriptions/%s", ss.URL, tc.id), - token: tc.auth, - } - res, err := req.make() - assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", tc.desc, err)) - assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected status code %d got %d", tc.desc, tc.status, res.StatusCode)) - - svcCall.Unset() - } -} - -func makeQuery(m map[string]string) string { - var ret string - for k, v := range m { - ret += fmt.Sprintf("&%s=%s", k, v) - } - if ret != "" { - return fmt.Sprintf("?%s", ret[1:]) - } - return "" -} - -type subRes struct { - ID string `json:"id"` - OwnerID string `json:"owner_id"` - Contact string `json:"contact"` - Topic string `json:"topic"` -} -type page struct { - Offset uint `json:"offset"` - Limit int `json:"limit"` - Total uint `json:"total,omitempty"` - Subscriptions []subRes `json:"subscriptions,omitempty"` -} - -func subscriptionsSlice(subs []subRes, start, end int) []notifiers.Subscription { - var res []notifiers.Subscription - for i := start; i < end; i++ { - sub := subs[i] - res = append(res, notifiers.Subscription{ - ID: sub.ID, - OwnerID: sub.OwnerID, - Contact: sub.Contact, - Topic: sub.Topic, - }) - } - return res -} diff --git a/consumers/notifiers/api/logging.go b/consumers/notifiers/api/logging.go deleted file mode 100644 index e327d92..0000000 --- a/consumers/notifiers/api/logging.go +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -//go:build !test - -package api - -import ( - "context" - "log/slog" - "time" - - "github.com/absmach/magistrala/consumers/notifiers" -) - -var _ notifiers.Service = (*loggingMiddleware)(nil) - -type loggingMiddleware struct { - logger *slog.Logger - svc notifiers.Service -} - -// LoggingMiddleware adds logging facilities to the core service. -func LoggingMiddleware(svc notifiers.Service, logger *slog.Logger) notifiers.Service { - return &loggingMiddleware{logger, svc} -} - -// CreateSubscription logs the create_subscription request. It logs subscription ID and topic and the time it took to complete the request. -// If the request fails, it logs the error. -func (lm *loggingMiddleware) CreateSubscription(ctx context.Context, token string, sub notifiers.Subscription) (id string, err error) { - defer func(begin time.Time) { - args := []any{ - slog.String("duration", time.Since(begin).String()), - slog.Group("subscription", - slog.String("topic", sub.Topic), - slog.String("id", id), - ), - } - if err != nil { - args = append(args, slog.Any("error", err)) - lm.logger.Warn("Create subscription failed", args...) - return - } - lm.logger.Info("Create subscription completed successfully", args...) - }(time.Now()) - - return lm.svc.CreateSubscription(ctx, token, sub) -} - -// ViewSubscription logs the view_subscription request. It logs subscription topic and id and the time it took to complete the request. -// If the request fails, it logs the error. -func (lm *loggingMiddleware) ViewSubscription(ctx context.Context, token, topic string) (sub notifiers.Subscription, err error) { - defer func(begin time.Time) { - args := []any{ - slog.String("duration", time.Since(begin).String()), - slog.Group("subscription", - slog.String("topic", topic), - slog.String("id", sub.ID), - ), - } - if err != nil { - args = append(args, slog.Any("error", err)) - lm.logger.Warn("View subscription failed", args...) - return - } - lm.logger.Info("View subscription completed successfully", args...) - }(time.Now()) - - return lm.svc.ViewSubscription(ctx, token, topic) -} - -// ListSubscriptions logs the list_subscriptions request. It logs page metadata and subscription topic and the time it took to complete the request. -// If the request fails, it logs the error. -func (lm *loggingMiddleware) ListSubscriptions(ctx context.Context, token string, pm notifiers.PageMetadata) (res notifiers.Page, err error) { - defer func(begin time.Time) { - args := []any{ - slog.String("duration", time.Since(begin).String()), - slog.Group("page", - slog.String("topic", pm.Topic), - slog.Int("limit", pm.Limit), - slog.Uint64("offset", uint64(pm.Offset)), - slog.Uint64("total", uint64(res.Total)), - ), - } - if err != nil { - args = append(args, slog.Any("error", err)) - lm.logger.Warn("List subscriptions failed", args...) - return - } - lm.logger.Info("List subscriptions completed successfully", args...) - }(time.Now()) - - return lm.svc.ListSubscriptions(ctx, token, pm) -} - -// RemoveSubscription logs the remove_subscription request. It logs subscription ID and the time it took to complete the request. -// If the request fails, it logs the error. -func (lm *loggingMiddleware) RemoveSubscription(ctx context.Context, token, id string) (err error) { - defer func(begin time.Time) { - args := []any{ - slog.String("duration", time.Since(begin).String()), - slog.String("subscription_id", id), - } - if err != nil { - args = append(args, slog.Any("error", err)) - lm.logger.Warn("Remove subscription failed", args...) - return - } - lm.logger.Info("Remove subscription completed successfully", args...) - }(time.Now()) - - return lm.svc.RemoveSubscription(ctx, token, id) -} - -// ConsumeBlocking logs the consume_blocking request. It logs the time it took to complete the request. -// If the request fails, it logs the error. -func (lm *loggingMiddleware) ConsumeBlocking(ctx context.Context, msg interface{}) (err error) { - defer func(begin time.Time) { - args := []any{ - slog.String("duration", time.Since(begin).String()), - } - if err != nil { - args = append(args, slog.Any("error", err)) - lm.logger.Warn("Blocking consumer failed to consume messages successfully", args...) - return - } - lm.logger.Info("Blocking consumer consumed messages successfully", args...) - }(time.Now()) - - return lm.svc.ConsumeBlocking(ctx, msg) -} diff --git a/consumers/notifiers/api/metrics.go b/consumers/notifiers/api/metrics.go deleted file mode 100644 index 2097302..0000000 --- a/consumers/notifiers/api/metrics.go +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -//go:build !test - -package api - -import ( - "context" - "time" - - "github.com/absmach/magistrala/consumers/notifiers" - "github.com/go-kit/kit/metrics" -) - -var _ notifiers.Service = (*metricsMiddleware)(nil) - -type metricsMiddleware struct { - counter metrics.Counter - latency metrics.Histogram - svc notifiers.Service -} - -// MetricsMiddleware instruments core service by tracking request count and latency. -func MetricsMiddleware(svc notifiers.Service, counter metrics.Counter, latency metrics.Histogram) notifiers.Service { - return &metricsMiddleware{ - counter: counter, - latency: latency, - svc: svc, - } -} - -// CreateSubscription instruments CreateSubscription method with metrics. -func (ms *metricsMiddleware) CreateSubscription(ctx context.Context, token string, sub notifiers.Subscription) (string, error) { - defer func(begin time.Time) { - ms.counter.With("method", "create_subscription").Add(1) - ms.latency.With("method", "create_subscription").Observe(time.Since(begin).Seconds()) - }(time.Now()) - - return ms.svc.CreateSubscription(ctx, token, sub) -} - -// ViewSubscription instruments ViewSubscription method with metrics. -func (ms *metricsMiddleware) ViewSubscription(ctx context.Context, token, topic string) (notifiers.Subscription, error) { - defer func(begin time.Time) { - ms.counter.With("method", "view_subscription").Add(1) - ms.latency.With("method", "view_subscription").Observe(time.Since(begin).Seconds()) - }(time.Now()) - - return ms.svc.ViewSubscription(ctx, token, topic) -} - -// ListSubscriptions instruments ListSubscriptions method with metrics. -func (ms *metricsMiddleware) ListSubscriptions(ctx context.Context, token string, pm notifiers.PageMetadata) (notifiers.Page, error) { - defer func(begin time.Time) { - ms.counter.With("method", "list_subscriptions").Add(1) - ms.latency.With("method", "list_subscriptions").Observe(time.Since(begin).Seconds()) - }(time.Now()) - - return ms.svc.ListSubscriptions(ctx, token, pm) -} - -// RemoveSubscription instruments RemoveSubscription method with metrics. -func (ms *metricsMiddleware) RemoveSubscription(ctx context.Context, token, id string) error { - defer func(begin time.Time) { - ms.counter.With("method", "remove_subscription").Add(1) - ms.latency.With("method", "remove_subscription").Observe(time.Since(begin).Seconds()) - }(time.Now()) - - return ms.svc.RemoveSubscription(ctx, token, id) -} - -// ConsumeBlocking instruments ConsumeBlocking method with metrics. -func (ms *metricsMiddleware) ConsumeBlocking(ctx context.Context, msg interface{}) error { - defer func(begin time.Time) { - ms.counter.With("method", "consume").Add(1) - ms.latency.With("method", "consume").Observe(time.Since(begin).Seconds()) - }(time.Now()) - - return ms.svc.ConsumeBlocking(ctx, msg) -} diff --git a/consumers/notifiers/api/requests.go b/consumers/notifiers/api/requests.go deleted file mode 100644 index 9285f4d..0000000 --- a/consumers/notifiers/api/requests.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package api - -import "github.com/absmach/magistrala/pkg/apiutil" - -type createSubReq struct { - token string - Topic string `json:"topic,omitempty"` - Contact string `json:"contact,omitempty"` -} - -func (req createSubReq) validate() error { - if req.token == "" { - return apiutil.ErrBearerToken - } - if req.Topic == "" { - return apiutil.ErrInvalidTopic - } - if req.Contact == "" { - return apiutil.ErrInvalidContact - } - return nil -} - -type subReq struct { - token string - id string -} - -func (req subReq) validate() error { - if req.token == "" { - return apiutil.ErrBearerToken - } - if req.id == "" { - return apiutil.ErrMissingID - } - return nil -} - -type listSubsReq struct { - token string - topic string - contact string - offset uint - limit uint -} - -func (req listSubsReq) validate() error { - if req.token == "" { - return apiutil.ErrBearerToken - } - return nil -} diff --git a/consumers/notifiers/api/responses.go b/consumers/notifiers/api/responses.go deleted file mode 100644 index 7d31006..0000000 --- a/consumers/notifiers/api/responses.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package api - -import ( - "fmt" - "net/http" - - "github.com/absmach/magistrala" -) - -var ( - _ magistrala.Response = (*createSubRes)(nil) - _ magistrala.Response = (*viewSubRes)(nil) - _ magistrala.Response = (*listSubsRes)(nil) - _ magistrala.Response = (*removeSubRes)(nil) -) - -type createSubRes struct { - ID string -} - -func (res createSubRes) Code() int { - return http.StatusCreated -} - -func (res createSubRes) Headers() map[string]string { - return map[string]string{ - "Location": fmt.Sprintf("/subscriptions/%s", res.ID), - } -} - -func (res createSubRes) Empty() bool { - return true -} - -type viewSubRes struct { - ID string `json:"id"` - OwnerID string `json:"owner_id"` - Contact string `json:"contact"` - Topic string `json:"topic"` -} - -func (res viewSubRes) Code() int { - return http.StatusOK -} - -func (res viewSubRes) Headers() map[string]string { - return map[string]string{} -} - -func (res viewSubRes) Empty() bool { - return false -} - -type listSubsRes struct { - Offset uint `json:"offset"` - Limit int `json:"limit"` - Total uint `json:"total,omitempty"` - Subscriptions []viewSubRes `json:"subscriptions,omitempty"` -} - -func (res listSubsRes) Code() int { - return http.StatusOK -} - -func (res listSubsRes) Headers() map[string]string { - return map[string]string{} -} - -func (res listSubsRes) Empty() bool { - return false -} - -type removeSubRes struct{} - -func (res removeSubRes) Code() int { - return http.StatusNoContent -} - -func (res removeSubRes) Headers() map[string]string { - return map[string]string{} -} - -func (res removeSubRes) Empty() bool { - return true -} diff --git a/consumers/notifiers/api/transport.go b/consumers/notifiers/api/transport.go deleted file mode 100644 index 693819e..0000000 --- a/consumers/notifiers/api/transport.go +++ /dev/null @@ -1,132 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package api - -import ( - "context" - "encoding/json" - "log/slog" - "net/http" - "strings" - - "github.com/absmach/magistrala" - "github.com/absmach/magistrala/consumers/notifiers" - "github.com/absmach/magistrala/pkg/apiutil" - "github.com/absmach/magistrala/pkg/errors" - "github.com/absmach/mg-contrib/pkg/api" - "github.com/go-chi/chi/v5" - kithttp "github.com/go-kit/kit/transport/http" - "github.com/prometheus/client_golang/prometheus/promhttp" - "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" -) - -const ( - contentType = "application/json" - offsetKey = "offset" - limitKey = "limit" - topicKey = "topic" - contactKey = "contact" - defOffset = 0 - defLimit = 20 -) - -// MakeHandler returns a HTTP handler for API endpoints. -func MakeHandler(svc notifiers.Service, logger *slog.Logger, instanceID string) http.Handler { - opts := []kithttp.ServerOption{ - kithttp.ServerErrorEncoder(apiutil.LoggingErrorEncoder(logger, api.EncodeError)), - } - - mux := chi.NewRouter() - - mux.Route("/subscriptions", func(r chi.Router) { - r.Post("/", otelhttp.NewHandler(kithttp.NewServer( - createSubscriptionEndpoint(svc), - decodeCreate, - api.EncodeResponse, - opts..., - ), "create").ServeHTTP) - - r.Get("/", otelhttp.NewHandler(kithttp.NewServer( - listSubscriptionsEndpoint(svc), - decodeList, - api.EncodeResponse, - opts..., - ), "list").ServeHTTP) - - r.Delete("/", otelhttp.NewHandler(kithttp.NewServer( - deleteSubscriptionEndpint(svc), - decodeSubscription, - api.EncodeResponse, - opts..., - ), "delete").ServeHTTP) - - r.Get("/{subID}", otelhttp.NewHandler(kithttp.NewServer( - viewSubscriptionEndpint(svc), - decodeSubscription, - api.EncodeResponse, - opts..., - ), "view").ServeHTTP) - - r.Delete("/{subID}", otelhttp.NewHandler(kithttp.NewServer( - deleteSubscriptionEndpint(svc), - decodeSubscription, - api.EncodeResponse, - opts..., - ), "delete").ServeHTTP) - }) - - mux.Get("/health", magistrala.Health("notifier", instanceID)) - mux.Handle("/metrics", promhttp.Handler()) - - return mux -} - -func decodeCreate(_ context.Context, r *http.Request) (interface{}, error) { - if !strings.Contains(r.Header.Get("Content-Type"), contentType) { - return nil, errors.Wrap(apiutil.ErrValidation, apiutil.ErrUnsupportedContentType) - } - - req := createSubReq{token: apiutil.ExtractBearerToken(r)} - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - return nil, errors.Wrap(apiutil.ErrValidation, errors.Wrap(err, errors.ErrMalformedEntity)) - } - - return req, nil -} - -func decodeSubscription(_ context.Context, r *http.Request) (interface{}, error) { - req := subReq{ - id: chi.URLParam(r, "subID"), - token: apiutil.ExtractBearerToken(r), - } - - return req, nil -} - -func decodeList(_ context.Context, r *http.Request) (interface{}, error) { - req := listSubsReq{token: apiutil.ExtractBearerToken(r)} - vals := r.URL.Query()[topicKey] - if len(vals) > 0 { - req.topic = vals[0] - } - - vals = r.URL.Query()[contactKey] - if len(vals) > 0 { - req.contact = vals[0] - } - - offset, err := apiutil.ReadNumQuery[uint64](r, offsetKey, defOffset) - if err != nil { - return listSubsReq{}, errors.Wrap(apiutil.ErrValidation, err) - } - req.offset = uint(offset) - - limit, err := apiutil.ReadNumQuery[uint64](r, limitKey, defLimit) - if err != nil { - return listSubsReq{}, errors.Wrap(apiutil.ErrValidation, err) - } - req.limit = uint(limit) - - return req, nil -} diff --git a/consumers/notifiers/doc.go b/consumers/notifiers/doc.go deleted file mode 100644 index e90c58c..0000000 --- a/consumers/notifiers/doc.go +++ /dev/null @@ -1,6 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -// Package notifiers contain the domain concept definitions needed to -// support Magistrala notifications functionality. -package notifiers diff --git a/consumers/notifiers/mocks/doc.go b/consumers/notifiers/mocks/doc.go deleted file mode 100644 index 16ed198..0000000 --- a/consumers/notifiers/mocks/doc.go +++ /dev/null @@ -1,5 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -// Package mocks contains mocks for testing purposes. -package mocks diff --git a/consumers/notifiers/mocks/notifier.go b/consumers/notifiers/mocks/notifier.go deleted file mode 100644 index a3dcc56..0000000 --- a/consumers/notifiers/mocks/notifier.go +++ /dev/null @@ -1,47 +0,0 @@ -// Code generated by mockery v2.43.2. DO NOT EDIT. - -// Copyright (c) Abstract Machines - -package mocks - -import ( - messaging "github.com/absmach/magistrala/pkg/messaging" - mock "github.com/stretchr/testify/mock" -) - -// Notifier is an autogenerated mock type for the Notifier type -type Notifier struct { - mock.Mock -} - -// Notify provides a mock function with given fields: from, to, msg -func (_m *Notifier) Notify(from string, to []string, msg *messaging.Message) error { - ret := _m.Called(from, to, msg) - - if len(ret) == 0 { - panic("no return value specified for Notify") - } - - var r0 error - if rf, ok := ret.Get(0).(func(string, []string, *messaging.Message) error); ok { - r0 = rf(from, to, msg) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// NewNotifier creates a new instance of Notifier. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewNotifier(t interface { - mock.TestingT - Cleanup(func()) -}) *Notifier { - mock := &Notifier{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/consumers/notifiers/mocks/repository.go b/consumers/notifiers/mocks/repository.go deleted file mode 100644 index 49e5727..0000000 --- a/consumers/notifiers/mocks/repository.go +++ /dev/null @@ -1,133 +0,0 @@ -// Code generated by mockery v2.43.2. DO NOT EDIT. - -// Copyright (c) Abstract Machines - -package mocks - -import ( - context "context" - - notifiers "github.com/absmach/magistrala/consumers/notifiers" - mock "github.com/stretchr/testify/mock" -) - -// SubscriptionsRepository is an autogenerated mock type for the SubscriptionsRepository type -type SubscriptionsRepository struct { - mock.Mock -} - -// Remove provides a mock function with given fields: ctx, id -func (_m *SubscriptionsRepository) Remove(ctx context.Context, id string) error { - ret := _m.Called(ctx, id) - - if len(ret) == 0 { - panic("no return value specified for Remove") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { - r0 = rf(ctx, id) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// Retrieve provides a mock function with given fields: ctx, id -func (_m *SubscriptionsRepository) Retrieve(ctx context.Context, id string) (notifiers.Subscription, error) { - ret := _m.Called(ctx, id) - - if len(ret) == 0 { - panic("no return value specified for Retrieve") - } - - var r0 notifiers.Subscription - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string) (notifiers.Subscription, error)); ok { - return rf(ctx, id) - } - if rf, ok := ret.Get(0).(func(context.Context, string) notifiers.Subscription); ok { - r0 = rf(ctx, id) - } else { - r0 = ret.Get(0).(notifiers.Subscription) - } - - if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { - r1 = rf(ctx, id) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// RetrieveAll provides a mock function with given fields: ctx, pm -func (_m *SubscriptionsRepository) RetrieveAll(ctx context.Context, pm notifiers.PageMetadata) (notifiers.Page, error) { - ret := _m.Called(ctx, pm) - - if len(ret) == 0 { - panic("no return value specified for RetrieveAll") - } - - var r0 notifiers.Page - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, notifiers.PageMetadata) (notifiers.Page, error)); ok { - return rf(ctx, pm) - } - if rf, ok := ret.Get(0).(func(context.Context, notifiers.PageMetadata) notifiers.Page); ok { - r0 = rf(ctx, pm) - } else { - r0 = ret.Get(0).(notifiers.Page) - } - - if rf, ok := ret.Get(1).(func(context.Context, notifiers.PageMetadata) error); ok { - r1 = rf(ctx, pm) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// Save provides a mock function with given fields: ctx, sub -func (_m *SubscriptionsRepository) Save(ctx context.Context, sub notifiers.Subscription) (string, error) { - ret := _m.Called(ctx, sub) - - if len(ret) == 0 { - panic("no return value specified for Save") - } - - var r0 string - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, notifiers.Subscription) (string, error)); ok { - return rf(ctx, sub) - } - if rf, ok := ret.Get(0).(func(context.Context, notifiers.Subscription) string); ok { - r0 = rf(ctx, sub) - } else { - r0 = ret.Get(0).(string) - } - - if rf, ok := ret.Get(1).(func(context.Context, notifiers.Subscription) error); ok { - r1 = rf(ctx, sub) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// NewSubscriptionsRepository creates a new instance of SubscriptionsRepository. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewSubscriptionsRepository(t interface { - mock.TestingT - Cleanup(func()) -}) *SubscriptionsRepository { - mock := &SubscriptionsRepository{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/consumers/notifiers/mocks/service.go b/consumers/notifiers/mocks/service.go deleted file mode 100644 index 9fe9494..0000000 --- a/consumers/notifiers/mocks/service.go +++ /dev/null @@ -1,151 +0,0 @@ -// Code generated by mockery v2.43.2. DO NOT EDIT. - -// Copyright (c) Abstract Machines - -package mocks - -import ( - context "context" - - notifiers "github.com/absmach/magistrala/consumers/notifiers" - mock "github.com/stretchr/testify/mock" -) - -// Service is an autogenerated mock type for the Service type -type Service struct { - mock.Mock -} - -// ConsumeBlocking provides a mock function with given fields: ctx, messages -func (_m *Service) ConsumeBlocking(ctx context.Context, messages interface{}) error { - ret := _m.Called(ctx, messages) - - if len(ret) == 0 { - panic("no return value specified for ConsumeBlocking") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, interface{}) error); ok { - r0 = rf(ctx, messages) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// CreateSubscription provides a mock function with given fields: ctx, token, sub -func (_m *Service) CreateSubscription(ctx context.Context, token string, sub notifiers.Subscription) (string, error) { - ret := _m.Called(ctx, token, sub) - - if len(ret) == 0 { - panic("no return value specified for CreateSubscription") - } - - var r0 string - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, notifiers.Subscription) (string, error)); ok { - return rf(ctx, token, sub) - } - if rf, ok := ret.Get(0).(func(context.Context, string, notifiers.Subscription) string); ok { - r0 = rf(ctx, token, sub) - } else { - r0 = ret.Get(0).(string) - } - - if rf, ok := ret.Get(1).(func(context.Context, string, notifiers.Subscription) error); ok { - r1 = rf(ctx, token, sub) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ListSubscriptions provides a mock function with given fields: ctx, token, pm -func (_m *Service) ListSubscriptions(ctx context.Context, token string, pm notifiers.PageMetadata) (notifiers.Page, error) { - ret := _m.Called(ctx, token, pm) - - if len(ret) == 0 { - panic("no return value specified for ListSubscriptions") - } - - var r0 notifiers.Page - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, notifiers.PageMetadata) (notifiers.Page, error)); ok { - return rf(ctx, token, pm) - } - if rf, ok := ret.Get(0).(func(context.Context, string, notifiers.PageMetadata) notifiers.Page); ok { - r0 = rf(ctx, token, pm) - } else { - r0 = ret.Get(0).(notifiers.Page) - } - - if rf, ok := ret.Get(1).(func(context.Context, string, notifiers.PageMetadata) error); ok { - r1 = rf(ctx, token, pm) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// RemoveSubscription provides a mock function with given fields: ctx, token, id -func (_m *Service) RemoveSubscription(ctx context.Context, token string, id string) error { - ret := _m.Called(ctx, token, id) - - if len(ret) == 0 { - panic("no return value specified for RemoveSubscription") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { - r0 = rf(ctx, token, id) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// ViewSubscription provides a mock function with given fields: ctx, token, id -func (_m *Service) ViewSubscription(ctx context.Context, token string, id string) (notifiers.Subscription, error) { - ret := _m.Called(ctx, token, id) - - if len(ret) == 0 { - panic("no return value specified for ViewSubscription") - } - - var r0 notifiers.Subscription - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, string) (notifiers.Subscription, error)); ok { - return rf(ctx, token, id) - } - if rf, ok := ret.Get(0).(func(context.Context, string, string) notifiers.Subscription); ok { - r0 = rf(ctx, token, id) - } else { - r0 = ret.Get(0).(notifiers.Subscription) - } - - if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { - r1 = rf(ctx, token, id) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// NewService creates a new instance of Service. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewService(t interface { - mock.TestingT - Cleanup(func()) -}) *Service { - mock := &Service{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/consumers/notifiers/notifier.go b/consumers/notifiers/notifier.go deleted file mode 100644 index 2c23bc9..0000000 --- a/consumers/notifiers/notifier.go +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package notifiers - -import ( - "errors" - - "github.com/absmach/magistrala/pkg/messaging" -) - -// ErrNotify wraps sending notification errors. -var ErrNotify = errors.New("error sending notification") - -// Notifier represents an API for sending notification. -// -//go:generate mockery --name Notifier --output=./mocks --filename notifier.go --quiet --note "Copyright (c) Abstract Machines" -type Notifier interface { - // Notify method is used to send notification for the - // received message to the provided list of receivers. - Notify(from string, to []string, msg *messaging.Message) error -} diff --git a/consumers/notifiers/postgres/database.go b/consumers/notifiers/postgres/database.go deleted file mode 100644 index 2e7ee74..0000000 --- a/consumers/notifiers/postgres/database.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package postgres - -import ( - "context" - "database/sql" - "fmt" - - "github.com/jmoiron/sqlx" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" -) - -var _ Database = (*database)(nil) - -type database struct { - db *sqlx.DB - tracer trace.Tracer -} - -// Database provides a database interface. -type Database interface { - NamedExecContext(context.Context, string, interface{}) (sql.Result, error) - QueryRowxContext(context.Context, string, ...interface{}) *sqlx.Row - NamedQueryContext(context.Context, string, interface{}) (*sqlx.Rows, error) - GetContext(context.Context, interface{}, string, ...interface{}) error -} - -// NewDatabase creates a SubscriptionsDatabase instance. -func NewDatabase(db *sqlx.DB, tracer trace.Tracer) Database { - return &database{ - db: db, - tracer: tracer, - } -} - -func (dm database) NamedExecContext(ctx context.Context, query string, args interface{}) (sql.Result, error) { - ctx, span := dm.addSpanTags(ctx, "NamedExecContext", query) - defer span.End() - return dm.db.NamedExecContext(ctx, query, args) -} - -func (dm database) QueryRowxContext(ctx context.Context, query string, args ...interface{}) *sqlx.Row { - ctx, span := dm.addSpanTags(ctx, "QueryRowxContext", query) - defer span.End() - return dm.db.QueryRowxContext(ctx, query, args...) -} - -func (dm database) NamedQueryContext(ctx context.Context, query string, args interface{}) (*sqlx.Rows, error) { - ctx, span := dm.addSpanTags(ctx, "NamedQueryContext", query) - defer span.End() - return dm.db.NamedQueryContext(ctx, query, args) -} - -func (dm database) GetContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error { - ctx, span := dm.addSpanTags(ctx, "GetContext", query) - defer span.End() - return dm.db.GetContext(ctx, dest, query, args...) -} - -func (dm database) addSpanTags(ctx context.Context, method, query string) (context.Context, trace.Span) { - ctx, span := dm.tracer.Start(ctx, - fmt.Sprintf("sql_%s", method), - trace.WithAttributes( - attribute.String("sql.statement", query), - attribute.String("span.kind", "client"), - attribute.String("peer.service", "postgres"), - attribute.String("db.type", "sql"), - ), - ) - return ctx, span -} diff --git a/consumers/notifiers/postgres/doc.go b/consumers/notifiers/postgres/doc.go deleted file mode 100644 index 73a6784..0000000 --- a/consumers/notifiers/postgres/doc.go +++ /dev/null @@ -1,6 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -// Package postgres contains repository implementations using PostgreSQL as -// the underlying database. -package postgres diff --git a/consumers/notifiers/postgres/init.go b/consumers/notifiers/postgres/init.go deleted file mode 100644 index ac74c3c..0000000 --- a/consumers/notifiers/postgres/init.go +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package postgres - -import migrate "github.com/rubenv/sql-migrate" - -func Migration() *migrate.MemoryMigrationSource { - return &migrate.MemoryMigrationSource{ - Migrations: []*migrate.Migration{ - { - Id: "subscriptions_1", - Up: []string{ - `CREATE TABLE IF NOT EXISTS subscriptions ( - id VARCHAR(254) PRIMARY KEY, - owner_id VARCHAR(254) NOT NULL, - contact VARCHAR(254), - topic TEXT, - UNIQUE(topic, contact) - )`, - }, - Down: []string{ - "DROP TABLE IF EXISTS subscriptions", - }, - }, - }, - } -} diff --git a/consumers/notifiers/postgres/setup_test.go b/consumers/notifiers/postgres/setup_test.go deleted file mode 100644 index b603378..0000000 --- a/consumers/notifiers/postgres/setup_test.go +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -// Package postgres_test contains tests for PostgreSQL repository -// implementations. -package postgres_test - -import ( - "fmt" - "log" - "os" - "testing" - - "github.com/absmach/magistrala/consumers/notifiers/postgres" - pgclient "github.com/absmach/magistrala/pkg/postgres" - "github.com/absmach/magistrala/pkg/ulid" - _ "github.com/jackc/pgx/v5/stdlib" // required for SQL access - "github.com/jmoiron/sqlx" - "github.com/ory/dockertest/v3" - "github.com/ory/dockertest/v3/docker" -) - -var ( - idProvider = ulid.New() - db *sqlx.DB -) - -func TestMain(m *testing.M) { - pool, err := dockertest.NewPool("") - if err != nil { - log.Fatalf("Could not connect to docker: %s", err) - } - - container, err := pool.RunWithOptions(&dockertest.RunOptions{ - Repository: "postgres", - Tag: "16.2-alpine", - Env: []string{ - "POSTGRES_USER=test", - "POSTGRES_PASSWORD=test", - "POSTGRES_DB=test", - "listen_addresses = '*'", - }, - }, func(config *docker.HostConfig) { - config.AutoRemove = true - config.RestartPolicy = docker.RestartPolicy{Name: "no"} - }) - if err != nil { - log.Fatalf("Could not start container: %s", err) - } - - port := container.GetPort("5432/tcp") - - url := fmt.Sprintf("host=localhost port=%s user=test dbname=test password=test sslmode=disable", port) - if err := pool.Retry(func() error { - db, err = sqlx.Open("pgx", url) - if err != nil { - return err - } - return db.Ping() - }); err != nil { - log.Fatalf("Could not connect to docker: %s", err) - } - - dbConfig := pgclient.Config{ - Host: "localhost", - Port: port, - User: "test", - Pass: "test", - Name: "test", - SSLMode: "disable", - SSLCert: "", - SSLKey: "", - SSLRootCert: "", - } - - if db, err = pgclient.Setup(dbConfig, *postgres.Migration()); err != nil { - log.Fatalf("Could not setup test DB connection: %s", err) - } - - code := m.Run() - - // Defers will not be run when using os.Exit - db.Close() - if err := pool.Purge(container); err != nil { - log.Fatalf("Could not purge container: %s", err) - } - - os.Exit(code) -} diff --git a/consumers/notifiers/postgres/subscriptions.go b/consumers/notifiers/postgres/subscriptions.go deleted file mode 100644 index 1d445d9..0000000 --- a/consumers/notifiers/postgres/subscriptions.go +++ /dev/null @@ -1,164 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package postgres - -import ( - "context" - "database/sql" - "fmt" - "strings" - - "github.com/absmach/magistrala/consumers/notifiers" - "github.com/absmach/magistrala/pkg/errors" - repoerr "github.com/absmach/magistrala/pkg/errors/repository" - "github.com/jackc/pgerrcode" - "github.com/jackc/pgx/v5/pgconn" -) - -var _ notifiers.SubscriptionsRepository = (*subscriptionsRepo)(nil) - -type subscriptionsRepo struct { - db Database -} - -// New instantiates a PostgreSQL implementation of Subscriptions repository. -func New(db Database) notifiers.SubscriptionsRepository { - return &subscriptionsRepo{ - db: db, - } -} - -func (repo subscriptionsRepo) Save(ctx context.Context, sub notifiers.Subscription) (string, error) { - q := `INSERT INTO subscriptions (id, owner_id, contact, topic) VALUES (:id, :owner_id, :contact, :topic) RETURNING id` - - dbSub := dbSubscription{ - ID: sub.ID, - OwnerID: sub.OwnerID, - Contact: sub.Contact, - Topic: sub.Topic, - } - - row, err := repo.db.NamedQueryContext(ctx, q, dbSub) - if err != nil { - if pqErr, ok := err.(*pgconn.PgError); ok && pqErr.Code == pgerrcode.UniqueViolation { - return "", errors.Wrap(repoerr.ErrConflict, err) - } - return "", errors.Wrap(repoerr.ErrCreateEntity, err) - } - defer row.Close() - - return sub.ID, nil -} - -func (repo subscriptionsRepo) Retrieve(ctx context.Context, id string) (notifiers.Subscription, error) { - q := `SELECT id, owner_id, contact, topic FROM subscriptions WHERE id = $1` - sub := dbSubscription{} - if err := repo.db.QueryRowxContext(ctx, q, id).StructScan(&sub); err != nil { - if err == sql.ErrNoRows { - return notifiers.Subscription{}, errors.Wrap(repoerr.ErrNotFound, err) - } - return notifiers.Subscription{}, errors.Wrap(repoerr.ErrViewEntity, err) - } - - return fromDBSub(sub), nil -} - -func (repo subscriptionsRepo) RetrieveAll(ctx context.Context, pm notifiers.PageMetadata) (notifiers.Page, error) { - q := `SELECT id, owner_id, contact, topic FROM subscriptions` - args := make(map[string]interface{}) - if pm.Topic != "" { - args["topic"] = pm.Topic - } - if pm.Contact != "" { - args["contact"] = pm.Contact - } - var condition string - if len(args) > 0 { - var cond []string - for k := range args { - cond = append(cond, fmt.Sprintf("%s = :%s", k, k)) - } - condition = fmt.Sprintf(" WHERE %s", strings.Join(cond, " AND ")) - q = fmt.Sprintf("%s%s", q, condition) - } - args["offset"] = pm.Offset - q = fmt.Sprintf("%s OFFSET :offset", q) - if pm.Limit > 0 { - q = fmt.Sprintf("%s LIMIT :limit", q) - args["limit"] = pm.Limit - } - - rows, err := repo.db.NamedQueryContext(ctx, q, args) - if err != nil { - return notifiers.Page{}, errors.Wrap(repoerr.ErrViewEntity, err) - } - defer rows.Close() - - var subs []notifiers.Subscription - for rows.Next() { - sub := dbSubscription{} - if err := rows.StructScan(&sub); err != nil { - return notifiers.Page{}, errors.Wrap(repoerr.ErrViewEntity, err) - } - subs = append(subs, fromDBSub(sub)) - } - - if len(subs) == 0 { - return notifiers.Page{}, repoerr.ErrNotFound - } - - cq := fmt.Sprintf(`SELECT COUNT(*) FROM subscriptions %s`, condition) - total, err := total(ctx, repo.db, cq, args) - if err != nil { - return notifiers.Page{}, errors.Wrap(repoerr.ErrViewEntity, err) - } - - ret := notifiers.Page{ - PageMetadata: pm, - Total: total, - Subscriptions: subs, - } - - return ret, nil -} - -func (repo subscriptionsRepo) Remove(ctx context.Context, id string) error { - q := `DELETE from subscriptions WHERE id = $1` - - if r := repo.db.QueryRowxContext(ctx, q, id); r.Err() != nil { - return errors.Wrap(repoerr.ErrRemoveEntity, r.Err()) - } - return nil -} - -func total(ctx context.Context, db Database, query string, params interface{}) (uint, error) { - rows, err := db.NamedQueryContext(ctx, query, params) - if err != nil { - return 0, err - } - defer rows.Close() - var total uint - if rows.Next() { - if err := rows.Scan(&total); err != nil { - return 0, err - } - } - return total, nil -} - -type dbSubscription struct { - ID string `db:"id"` - OwnerID string `db:"owner_id"` - Contact string `db:"contact"` - Topic string `db:"topic"` -} - -func fromDBSub(sub dbSubscription) notifiers.Subscription { - return notifiers.Subscription{ - ID: sub.ID, - OwnerID: sub.OwnerID, - Contact: sub.Contact, - Topic: sub.Topic, - } -} diff --git a/consumers/notifiers/postgres/subscriptions_test.go b/consumers/notifiers/postgres/subscriptions_test.go deleted file mode 100644 index 507de04..0000000 --- a/consumers/notifiers/postgres/subscriptions_test.go +++ /dev/null @@ -1,263 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package postgres_test - -import ( - "context" - "fmt" - "testing" - - "github.com/absmach/magistrala/consumers/notifiers" - "github.com/absmach/magistrala/consumers/notifiers/postgres" - "github.com/absmach/magistrala/pkg/errors" - repoerr "github.com/absmach/magistrala/pkg/errors/repository" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel" -) - -const ( - owner = "owner@example.com" - numSubs = 100 -) - -var tracer = otel.Tracer("tests") - -func TestSave(t *testing.T) { - dbMiddleware := postgres.NewDatabase(db, tracer) - repo := postgres.New(dbMiddleware) - - id1, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - - id2, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - - sub1 := notifiers.Subscription{ - OwnerID: id1, - ID: id1, - Contact: owner, - Topic: "topic.subtopic", - } - - sub2 := sub1 - sub2.ID = id2 - - cases := []struct { - desc string - sub notifiers.Subscription - id string - err error - }{ - { - desc: "save successfully", - sub: sub1, - id: id1, - err: nil, - }, - { - desc: "save duplicate", - sub: sub2, - id: "", - err: repoerr.ErrConflict, - }, - } - - for _, tc := range cases { - id, err := repo.Save(context.Background(), tc.sub) - assert.Equal(t, tc.id, id, fmt.Sprintf("%s: expected id %s got %s\n", tc.desc, tc.id, id)) - assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) - } -} - -func TestView(t *testing.T) { - dbMiddleware := postgres.NewDatabase(db, tracer) - repo := postgres.New(dbMiddleware) - - id, err := idProvider.ID() - require.Nil(t, err, fmt.Sprintf("got an error creating id: %s", err)) - - sub := notifiers.Subscription{ - OwnerID: id, - ID: id, - Contact: owner, - Topic: "view.subtopic", - } - - ret, err := repo.Save(context.Background(), sub) - require.Nil(t, err, fmt.Sprintf("creating subscription must not fail: %s", err)) - require.Equal(t, id, ret, fmt.Sprintf("provided id %s must be the same as the returned id %s", id, ret)) - - cases := []struct { - desc string - sub notifiers.Subscription - id string - err error - }{ - { - desc: "retrieve successfully", - sub: sub, - id: id, - err: nil, - }, - { - desc: "retrieve not existing", - sub: notifiers.Subscription{}, - id: "non-existing", - err: repoerr.ErrNotFound, - }, - } - - for _, tc := range cases { - sub, err := repo.Retrieve(context.Background(), tc.id) - assert.Equal(t, tc.sub, sub, fmt.Sprintf("%s: expected sub %v got %v\n", tc.desc, tc.sub, sub)) - assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) - } -} - -func TestRetrieveAll(t *testing.T) { - _, err := db.Exec("DELETE FROM subscriptions") - require.Nil(t, err, fmt.Sprintf("cleanup must not fail: %s", err)) - - dbMiddleware := postgres.NewDatabase(db, tracer) - repo := postgres.New(dbMiddleware) - - var subs []notifiers.Subscription - - for i := 0; i < numSubs; i++ { - id, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - sub := notifiers.Subscription{ - OwnerID: "owner", - ID: id, - Contact: owner, - Topic: fmt.Sprintf("list.subtopic.%d", i), - } - - ret, err := repo.Save(context.Background(), sub) - require.Nil(t, err, fmt.Sprintf("creating subscription must not fail: %s", err)) - require.Equal(t, id, ret, fmt.Sprintf("provided id %s must be the same as the returned id %s", id, ret)) - subs = append(subs, sub) - } - - cases := []struct { - desc string - pageMeta notifiers.PageMetadata - page notifiers.Page - err error - }{ - { - desc: "retrieve successfully", - pageMeta: notifiers.PageMetadata{ - Offset: 10, - Limit: 2, - }, - page: notifiers.Page{ - Total: numSubs, - PageMetadata: notifiers.PageMetadata{ - Offset: 10, - Limit: 2, - }, - Subscriptions: subs[10:12], - }, - err: nil, - }, - { - desc: "retrieve with contact", - pageMeta: notifiers.PageMetadata{ - Offset: 10, - Limit: 2, - Contact: owner, - }, - page: notifiers.Page{ - Total: numSubs, - PageMetadata: notifiers.PageMetadata{ - Offset: 10, - Limit: 2, - Contact: owner, - }, - Subscriptions: subs[10:12], - }, - err: nil, - }, - { - desc: "retrieve with topic", - pageMeta: notifiers.PageMetadata{ - Offset: 0, - Limit: 2, - Topic: "list.subtopic.11", - }, - page: notifiers.Page{ - Total: 1, - PageMetadata: notifiers.PageMetadata{ - Offset: 0, - Limit: 2, - Topic: "list.subtopic.11", - }, - Subscriptions: subs[11:12], - }, - err: nil, - }, - { - desc: "retrieve with no limit", - pageMeta: notifiers.PageMetadata{ - Offset: 0, - Limit: -1, - }, - page: notifiers.Page{ - Total: numSubs, - PageMetadata: notifiers.PageMetadata{ - Limit: -1, - }, - Subscriptions: subs, - }, - err: nil, - }, - } - - for _, tc := range cases { - page, err := repo.RetrieveAll(context.Background(), tc.pageMeta) - assert.Equal(t, tc.page, page, fmt.Sprintf("%s: got unexpected page\n", tc.desc)) - assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) - } -} - -func TestRemove(t *testing.T) { - dbMiddleware := postgres.NewDatabase(db, tracer) - repo := postgres.New(dbMiddleware) - id, err := idProvider.ID() - require.Nil(t, err, fmt.Sprintf("got an error creating id: %s", err)) - sub := notifiers.Subscription{ - OwnerID: id, - ID: id, - Contact: owner, - Topic: "remove.subtopic.%d", - } - - ret, err := repo.Save(context.Background(), sub) - require.Nil(t, err, fmt.Sprintf("creating subscription must not fail: %s", err)) - require.Equal(t, id, ret, fmt.Sprintf("provided id %s must be the same as the returned id %s", id, ret)) - - cases := []struct { - desc string - id string - err error - }{ - { - desc: "remove successfully", - id: id, - err: nil, - }, - { - desc: "remove not existing", - id: "empty", - err: nil, - }, - } - - for _, tc := range cases { - err := repo.Remove(context.Background(), tc.id) - assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) - } -} diff --git a/consumers/notifiers/service.go b/consumers/notifiers/service.go deleted file mode 100644 index 563fd59..0000000 --- a/consumers/notifiers/service.go +++ /dev/null @@ -1,174 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package notifiers - -import ( - "context" - "fmt" - - "github.com/absmach/magistrala" - "github.com/absmach/magistrala/consumers" - "github.com/absmach/magistrala/pkg/errors" - svcerr "github.com/absmach/magistrala/pkg/errors/service" - "github.com/absmach/magistrala/pkg/messaging" -) - -// ErrMessage indicates an error converting a message to Magistrala message. -var ErrMessage = errors.New("failed to convert to Magistrala message") - -var _ consumers.AsyncConsumer = (*notifierService)(nil) - -// Service reprents a notification service. -// -//go:generate mockery --name Service --output=./mocks --filename service.go --quiet --note "Copyright (c) Abstract Machines" -type Service interface { - // CreateSubscription persists a subscription. - // Successful operation is indicated by non-nil error response. - CreateSubscription(ctx context.Context, token string, sub Subscription) (string, error) - - // ViewSubscription retrieves the subscription for the given user and id. - ViewSubscription(ctx context.Context, token, id string) (Subscription, error) - - // ListSubscriptions lists subscriptions having the provided user token and search params. - ListSubscriptions(ctx context.Context, token string, pm PageMetadata) (Page, error) - - // RemoveSubscription removes the subscription having the provided identifier. - RemoveSubscription(ctx context.Context, token, id string) error - - consumers.BlockingConsumer -} - -var _ Service = (*notifierService)(nil) - -type notifierService struct { - auth magistrala.AuthServiceClient - subs SubscriptionsRepository - idp magistrala.IDProvider - notifier Notifier - errCh chan error - from string -} - -// New instantiates the subscriptions service implementation. -func New(auth magistrala.AuthServiceClient, subs SubscriptionsRepository, idp magistrala.IDProvider, notifier Notifier, from string) Service { - return ¬ifierService{ - auth: auth, - subs: subs, - idp: idp, - notifier: notifier, - errCh: make(chan error, 1), - from: from, - } -} - -func (ns *notifierService) CreateSubscription(ctx context.Context, token string, sub Subscription) (string, error) { - res, err := ns.auth.Identify(ctx, &magistrala.IdentityReq{Token: token}) - if err != nil { - return "", err - } - sub.ID, err = ns.idp.ID() - if err != nil { - return "", err - } - - sub.OwnerID = res.GetId() - id, err := ns.subs.Save(ctx, sub) - if err != nil { - return "", errors.Wrap(svcerr.ErrCreateEntity, err) - } - return id, nil -} - -func (ns *notifierService) ViewSubscription(ctx context.Context, token, id string) (Subscription, error) { - if _, err := ns.auth.Identify(ctx, &magistrala.IdentityReq{Token: token}); err != nil { - return Subscription{}, err - } - - return ns.subs.Retrieve(ctx, id) -} - -func (ns *notifierService) ListSubscriptions(ctx context.Context, token string, pm PageMetadata) (Page, error) { - if _, err := ns.auth.Identify(ctx, &magistrala.IdentityReq{Token: token}); err != nil { - return Page{}, err - } - - return ns.subs.RetrieveAll(ctx, pm) -} - -func (ns *notifierService) RemoveSubscription(ctx context.Context, token, id string) error { - if _, err := ns.auth.Identify(ctx, &magistrala.IdentityReq{Token: token}); err != nil { - return err - } - - return ns.subs.Remove(ctx, id) -} - -func (ns *notifierService) ConsumeBlocking(ctx context.Context, message interface{}) error { - msg, ok := message.(*messaging.Message) - if !ok { - return ErrMessage - } - topic := msg.GetChannel() - if msg.GetSubtopic() != "" { - topic = fmt.Sprintf("%s.%s", msg.GetChannel(), msg.GetSubtopic()) - } - pm := PageMetadata{ - Topic: topic, - Offset: 0, - Limit: -1, - } - page, err := ns.subs.RetrieveAll(ctx, pm) - if err != nil { - return err - } - - var to []string - for _, sub := range page.Subscriptions { - to = append(to, sub.Contact) - } - if len(to) > 0 { - err := ns.notifier.Notify(ns.from, to, msg) - if err != nil { - return errors.Wrap(ErrNotify, err) - } - } - - return nil -} - -func (ns *notifierService) ConsumeAsync(ctx context.Context, message interface{}) { - msg, ok := message.(*messaging.Message) - if !ok { - ns.errCh <- ErrMessage - return - } - topic := msg.GetChannel() - if msg.GetSubtopic() != "" { - topic = fmt.Sprintf("%s.%s", msg.GetChannel(), msg.GetSubtopic()) - } - pm := PageMetadata{ - Topic: topic, - Offset: 0, - Limit: -1, - } - page, err := ns.subs.RetrieveAll(ctx, pm) - if err != nil { - ns.errCh <- err - return - } - - var to []string - for _, sub := range page.Subscriptions { - to = append(to, sub.Contact) - } - if len(to) > 0 { - if err := ns.notifier.Notify(ns.from, to, msg); err != nil { - ns.errCh <- errors.Wrap(ErrNotify, err) - } - } -} - -func (ns *notifierService) Errors() <-chan error { - return ns.errCh -} diff --git a/consumers/notifiers/service_test.go b/consumers/notifiers/service_test.go deleted file mode 100644 index fe3ed7e..0000000 --- a/consumers/notifiers/service_test.go +++ /dev/null @@ -1,377 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package notifiers_test - -import ( - "context" - "fmt" - "testing" - - "github.com/absmach/magistrala" - authmocks "github.com/absmach/magistrala/auth/mocks" - "github.com/absmach/magistrala/consumers/notifiers" - "github.com/absmach/magistrala/consumers/notifiers/mocks" - "github.com/absmach/magistrala/pkg/errors" - repoerr "github.com/absmach/magistrala/pkg/errors/repository" - svcerr "github.com/absmach/magistrala/pkg/errors/service" - "github.com/absmach/magistrala/pkg/messaging" - "github.com/absmach/magistrala/pkg/uuid" - "github.com/absmach/mg-contrib/pkg/testsutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" -) - -const ( - total = 100 - exampleUser1 = "token1" - exampleUser2 = "token2" - validID = "d4ebb847-5d0e-4e46-bdd9-b6aceaaa3a22" -) - -func newService() (notifiers.Service, *authmocks.AuthClient, *mocks.SubscriptionsRepository) { - repo := new(mocks.SubscriptionsRepository) - auth := new(authmocks.AuthClient) - notifier := new(mocks.Notifier) - idp := uuid.NewMock() - from := "exampleFrom" - return notifiers.New(auth, repo, idp, notifier, from), auth, repo -} - -func TestCreateSubscription(t *testing.T) { - svc, auth, repo := newService() - - cases := []struct { - desc string - token string - sub notifiers.Subscription - id string - err error - identifyErr error - userID string - }{ - { - desc: "test success", - token: exampleUser1, - sub: notifiers.Subscription{Contact: exampleUser1, Topic: "valid.topic"}, - id: uuid.Prefix + fmt.Sprintf("%012d", 1), - err: nil, - identifyErr: nil, - userID: validID, - }, - { - desc: "test already existing", - token: exampleUser1, - sub: notifiers.Subscription{Contact: exampleUser1, Topic: "valid.topic"}, - id: "", - err: repoerr.ErrConflict, - identifyErr: nil, - userID: validID, - }, - { - desc: "test with empty token", - token: "", - sub: notifiers.Subscription{Contact: exampleUser1, Topic: "valid.topic"}, - id: "", - err: svcerr.ErrAuthentication, - identifyErr: svcerr.ErrAuthentication, - }, - } - - for _, tc := range cases { - repoCall := auth.On("Identify", context.Background(), &magistrala.IdentityReq{Token: tc.token}).Return(&magistrala.IdentityRes{Id: tc.userID}, tc.identifyErr) - repoCall1 := repo.On("Save", context.Background(), mock.Anything).Return(tc.id, tc.err) - id, err := svc.CreateSubscription(context.Background(), tc.token, tc.sub) - assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) - assert.Equal(t, tc.id, id, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.id, id)) - repoCall.Unset() - repoCall1.Unset() - } -} - -func TestViewSubscription(t *testing.T) { - svc, auth, repo := newService() - sub := notifiers.Subscription{ - Contact: exampleUser1, - Topic: "valid.topic", - ID: testsutil.GenerateUUID(t), - OwnerID: validID, - } - - cases := []struct { - desc string - token string - id string - sub notifiers.Subscription - err error - identifyErr error - userID string - }{ - { - desc: "test success", - token: exampleUser1, - id: validID, - sub: sub, - err: nil, - identifyErr: nil, - userID: validID, - }, - { - desc: "test not existing", - token: exampleUser1, - id: "not_exist", - sub: notifiers.Subscription{}, - err: svcerr.ErrNotFound, - identifyErr: nil, - userID: validID, - }, - { - desc: "test with empty token", - token: "", - id: validID, - sub: notifiers.Subscription{}, - err: svcerr.ErrAuthentication, - identifyErr: svcerr.ErrAuthentication, - }, - } - - for _, tc := range cases { - repoCall := auth.On("Identify", context.Background(), &magistrala.IdentityReq{Token: tc.token}).Return(&magistrala.IdentityRes{Id: tc.userID}, tc.identifyErr) - repoCall1 := repo.On("Retrieve", context.Background(), tc.id).Return(tc.sub, tc.err) - sub, err := svc.ViewSubscription(context.Background(), tc.token, tc.id) - assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) - assert.Equal(t, tc.sub, sub, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.sub, sub)) - repoCall.Unset() - repoCall1.Unset() - } -} - -func TestListSubscriptions(t *testing.T) { - svc, auth, repo := newService() - sub := notifiers.Subscription{Contact: exampleUser1, OwnerID: exampleUser1} - topic := "topic.subtopic" - var subs []notifiers.Subscription - for i := 0; i < total; i++ { - tmp := sub - if i%2 == 0 { - tmp.Contact = exampleUser2 - tmp.OwnerID = exampleUser2 - } - tmp.Topic = fmt.Sprintf("%s.%d", topic, i) - tmp.ID = testsutil.GenerateUUID(t) - tmp.OwnerID = validID - subs = append(subs, tmp) - } - - var offsetSubs []notifiers.Subscription - for i := 20; i < 40; i += 2 { - offsetSubs = append(offsetSubs, subs[i]) - } - - cases := []struct { - desc string - token string - pageMeta notifiers.PageMetadata - page notifiers.Page - err error - identifyErr error - userID string - }{ - { - desc: "test success", - token: exampleUser1, - pageMeta: notifiers.PageMetadata{ - Offset: 0, - Limit: 3, - }, - err: nil, - page: notifiers.Page{ - PageMetadata: notifiers.PageMetadata{ - Offset: 0, - Limit: 3, - }, - Subscriptions: subs[:3], - Total: total, - }, - identifyErr: nil, - userID: validID, - }, - { - desc: "test not existing", - token: exampleUser1, - pageMeta: notifiers.PageMetadata{ - Limit: 10, - Contact: "empty@example.com", - }, - page: notifiers.Page{}, - err: svcerr.ErrNotFound, - identifyErr: nil, - userID: validID, - }, - { - desc: "test with empty token", - token: "", - pageMeta: notifiers.PageMetadata{ - Offset: 2, - Limit: 12, - Topic: "topic.subtopic.13", - }, - page: notifiers.Page{}, - err: svcerr.ErrAuthentication, - identifyErr: svcerr.ErrAuthentication, - }, - { - desc: "test with topic", - token: exampleUser1, - pageMeta: notifiers.PageMetadata{ - Limit: 10, - Topic: fmt.Sprintf("%s.%d", topic, 4), - }, - page: notifiers.Page{ - PageMetadata: notifiers.PageMetadata{ - Limit: 10, - Topic: fmt.Sprintf("%s.%d", topic, 4), - }, - Subscriptions: subs[4:5], - Total: 1, - }, - err: nil, - identifyErr: nil, - userID: validID, - }, - { - desc: "test with contact and offset", - token: exampleUser1, - pageMeta: notifiers.PageMetadata{ - Offset: 10, - Limit: 10, - Contact: exampleUser2, - }, - page: notifiers.Page{ - PageMetadata: notifiers.PageMetadata{ - Offset: 10, - Limit: 10, - Contact: exampleUser2, - }, - Subscriptions: offsetSubs, - Total: uint(total / 2), - }, - err: nil, - identifyErr: nil, - userID: validID, - }, - } - - for _, tc := range cases { - repoCall := auth.On("Identify", context.Background(), &magistrala.IdentityReq{Token: tc.token}).Return(&magistrala.IdentityRes{Id: tc.userID}, tc.identifyErr) - repoCall1 := repo.On("RetrieveAll", context.Background(), tc.pageMeta).Return(tc.page, tc.err) - page, err := svc.ListSubscriptions(context.Background(), tc.token, tc.pageMeta) - assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) - assert.Equal(t, tc.page, page, fmt.Sprintf("%s: got unexpected page\n", tc.desc)) - repoCall.Unset() - repoCall1.Unset() - } -} - -func TestRemoveSubscription(t *testing.T) { - svc, auth, repo := newService() - sub := notifiers.Subscription{ - Contact: exampleUser1, - Topic: "valid.topic", - ID: testsutil.GenerateUUID(t), - OwnerID: validID, - } - - cases := []struct { - desc string - token string - id string - err error - identifyErr error - userID string - }{ - { - desc: "test success", - token: exampleUser1, - id: sub.ID, - err: nil, - identifyErr: nil, - userID: validID, - }, - { - desc: "test not existing", - token: exampleUser1, - id: "not_exist", - err: svcerr.ErrNotFound, - identifyErr: nil, - userID: validID, - }, - { - desc: "test with empty token", - token: "", - id: sub.ID, - err: svcerr.ErrAuthentication, - identifyErr: svcerr.ErrAuthentication, - }, - } - - for _, tc := range cases { - repoCall := auth.On("Identify", context.Background(), &magistrala.IdentityReq{Token: tc.token}).Return(&magistrala.IdentityRes{Id: tc.userID}, tc.identifyErr) - repoCall1 := repo.On("Remove", context.Background(), tc.id).Return(tc.err) - err := svc.RemoveSubscription(context.Background(), tc.token, tc.id) - assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) - repoCall.Unset() - repoCall1.Unset() - } -} - -func TestConsume(t *testing.T) { - svc, _, repo := newService() - sub := notifiers.Subscription{ - Contact: exampleUser1, - OwnerID: validID, - Topic: "topic.subtopic", - } - for i := 0; i < total; i++ { - tmp := sub - tmp.Contact = fmt.Sprintf("contact%d@example.com", i) - if i%2 == 0 { - tmp.Topic = fmt.Sprintf("%s-2", sub.Topic) - } - } - sub.Contact = "invalid@example.com" - sub.Topic = fmt.Sprintf("%s-2", sub.Topic) - - msg := messaging.Message{ - Channel: "topic", - Subtopic: "subtopic", - } - errMsg := messaging.Message{ - Channel: "topic", - Subtopic: "subtopic-2", - } - - cases := []struct { - desc string - msg *messaging.Message - err error - }{ - { - desc: "test success", - msg: &msg, - err: nil, - }, - { - desc: "test fail", - msg: &errMsg, - err: notifiers.ErrNotify, - }, - } - - for _, tc := range cases { - repoCall := repo.On("RetrieveAll", context.TODO(), mock.Anything).Return(notifiers.Page{}, tc.err) - err := svc.ConsumeBlocking(context.TODO(), tc.msg) - assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) - repoCall.Unset() - } -} diff --git a/consumers/notifiers/subscriptions.go b/consumers/notifiers/subscriptions.go deleted file mode 100644 index dcaf4eb..0000000 --- a/consumers/notifiers/subscriptions.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package notifiers - -import "context" - -// Subscription represents a user Subscription. -type Subscription struct { - ID string - OwnerID string - Contact string - Topic string -} - -// Page represents page metadata with content. -type Page struct { - PageMetadata - Total uint - Subscriptions []Subscription -} - -// PageMetadata contains page metadata that helps navigation. -type PageMetadata struct { - Offset uint - // Limit values less than 0 indicate no limit. - Limit int - Topic string - Contact string -} - -// SubscriptionsRepository specifies a Subscription persistence API. -// -//go:generate mockery --name SubscriptionsRepository --output=./mocks --filename repository.go --quiet --note "Copyright (c) Abstract Machines" -type SubscriptionsRepository interface { - // Save persists a subscription. Successful operation is indicated by non-nil - // error response. - Save(ctx context.Context, sub Subscription) (string, error) - - // Retrieve retrieves the subscription for the given id. - Retrieve(ctx context.Context, id string) (Subscription, error) - - // RetrieveAll retrieves all the subscriptions for the given page metadata. - RetrieveAll(ctx context.Context, pm PageMetadata) (Page, error) - - // Remove removes the subscription for the given ID. - Remove(ctx context.Context, id string) error -} diff --git a/consumers/notifiers/tracing/doc.go b/consumers/notifiers/tracing/doc.go deleted file mode 100644 index 2d65dbe..0000000 --- a/consumers/notifiers/tracing/doc.go +++ /dev/null @@ -1,12 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -// Package tracing provides tracing instrumentation for Magistrala WebSocket adapter service. -// -// This package provides tracing middleware for Magistrala WebSocket adapter service. -// It can be used to trace incoming requests and add tracing capabilities to -// Magistrala WebSocket adapter service. -// -// For more details about tracing instrumentation for Magistrala messaging refer -// to the documentation at https://docs.magistrala.abstractmachines.fr/tracing/. -package tracing diff --git a/consumers/notifiers/tracing/subscriptions.go b/consumers/notifiers/tracing/subscriptions.go deleted file mode 100644 index c8c2920..0000000 --- a/consumers/notifiers/tracing/subscriptions.go +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -// Package tracing contains middlewares that will add spans -// to existing traces. -package tracing - -import ( - "context" - - "github.com/absmach/magistrala/consumers/notifiers" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" -) - -const ( - saveOp = "save_op" - retrieveOp = "retrieve_op" - retrieveAllOp = "retrieve_all_op" - removeOp = "remove_op" -) - -var _ notifiers.SubscriptionsRepository = (*subRepositoryMiddleware)(nil) - -type subRepositoryMiddleware struct { - tracer trace.Tracer - repo notifiers.SubscriptionsRepository -} - -// New instantiates a new Subscriptions repository that -// tracks request and their latency, and adds spans to context. -func New(tracer trace.Tracer, repo notifiers.SubscriptionsRepository) notifiers.SubscriptionsRepository { - return subRepositoryMiddleware{ - tracer: tracer, - repo: repo, - } -} - -// Save traces the "Save" operation of the wrapped Subscriptions repository. -func (urm subRepositoryMiddleware) Save(ctx context.Context, sub notifiers.Subscription) (string, error) { - ctx, span := urm.tracer.Start(ctx, saveOp, trace.WithAttributes( - attribute.String("id", sub.ID), - attribute.String("contact", sub.Contact), - attribute.String("topic", sub.Topic), - )) - defer span.End() - - return urm.repo.Save(ctx, sub) -} - -// Retrieve traces the "Retrieve" operation of the wrapped Subscriptions repository. -func (urm subRepositoryMiddleware) Retrieve(ctx context.Context, id string) (notifiers.Subscription, error) { - ctx, span := urm.tracer.Start(ctx, retrieveOp, trace.WithAttributes(attribute.String("id", id))) - defer span.End() - - return urm.repo.Retrieve(ctx, id) -} - -// RetrieveAll traces the "RetrieveAll" operation of the wrapped Subscriptions repository. -func (urm subRepositoryMiddleware) RetrieveAll(ctx context.Context, pm notifiers.PageMetadata) (notifiers.Page, error) { - ctx, span := urm.tracer.Start(ctx, retrieveAllOp) - defer span.End() - - return urm.repo.RetrieveAll(ctx, pm) -} - -// Remove traces the "Remove" operation of the wrapped Subscriptions repository. -func (urm subRepositoryMiddleware) Remove(ctx context.Context, id string) error { - ctx, span := urm.tracer.Start(ctx, removeOp, trace.WithAttributes(attribute.String("id", id))) - defer span.End() - - return urm.repo.Remove(ctx, id) -} diff --git a/twins/api/http/endpoint_states_test.go b/twins/api/http/endpoint_states_test.go index 36ae0ef..a63b23b 100644 --- a/twins/api/http/endpoint_states_test.go +++ b/twins/api/http/endpoint_states_test.go @@ -24,8 +24,10 @@ import ( ) const ( - numRecs = 100 - publisher = "twins" + numRecs = 100 + publisher = "twins" + validToken = "validToken" + invalidToken = "invalidToken" ) var ( @@ -81,7 +83,7 @@ func TestListStates(t *testing.T) { queryFmt := "%s?offset=%d&limit=%d" cases := []struct { desc string - auth string + token string status int url string res []stateRes @@ -92,7 +94,7 @@ func TestListStates(t *testing.T) { }{ { desc: "get a list of states", - auth: token, + token: validToken, status: http.StatusOK, url: baseURL, res: data[0:10], @@ -105,7 +107,7 @@ func TestListStates(t *testing.T) { }, { desc: "get a list of states with valid offset and limit", - auth: token, + token: validToken, status: http.StatusOK, url: fmt.Sprintf(queryFmt, baseURL, 20, 15), res: data[20:35], @@ -118,7 +120,7 @@ func TestListStates(t *testing.T) { }, { desc: "get a list of states with invalid token", - auth: authmocks.InvalidValue, + token: invalidToken, status: http.StatusUnauthorized, url: fmt.Sprintf(queryFmt, baseURL, 0, 5), res: nil, @@ -127,7 +129,7 @@ func TestListStates(t *testing.T) { }, { desc: "get a list of states with empty token", - auth: "", + token: "", status: http.StatusUnauthorized, url: fmt.Sprintf(queryFmt, baseURL, 0, 5), res: nil, @@ -136,7 +138,7 @@ func TestListStates(t *testing.T) { }, { desc: "get a list of states with + limit > total", - auth: token, + token: validToken, status: http.StatusOK, url: fmt.Sprintf(queryFmt, baseURL, 91, 20), res: data[91:], @@ -149,7 +151,7 @@ func TestListStates(t *testing.T) { }, { desc: "get a list of states with negative offset", - auth: token, + token: validToken, status: http.StatusBadRequest, url: fmt.Sprintf(queryFmt, baseURL, -1, 5), res: nil, @@ -159,7 +161,7 @@ func TestListStates(t *testing.T) { }, { desc: "get a list of states with negative limit", - auth: token, + token: validToken, status: http.StatusBadRequest, url: fmt.Sprintf(queryFmt, baseURL, 0, -5), res: nil, @@ -169,7 +171,7 @@ func TestListStates(t *testing.T) { }, { desc: "get a list of states with zero limit", - auth: token, + token: validToken, status: http.StatusBadRequest, url: fmt.Sprintf(queryFmt, baseURL, 0, 0), res: nil, @@ -179,7 +181,7 @@ func TestListStates(t *testing.T) { }, { desc: "get a list of states with limit greater than max", - auth: token, + token: validToken, status: http.StatusBadRequest, url: fmt.Sprintf(queryFmt, baseURL, 0, 110), res: nil, @@ -189,7 +191,7 @@ func TestListStates(t *testing.T) { }, { desc: "get a list of states with invalid offset", - auth: token, + token: validToken, status: http.StatusBadRequest, url: fmt.Sprintf("%s?offset=invalid&limit=%d", baseURL, 15), res: nil, @@ -199,7 +201,7 @@ func TestListStates(t *testing.T) { }, { desc: "get a list of states with invalid limit", - auth: token, + token: validToken, status: http.StatusBadRequest, url: fmt.Sprintf("%s?offset=%d&limit=invalid", baseURL, 0), res: nil, @@ -209,7 +211,7 @@ func TestListStates(t *testing.T) { }, { desc: "get a list of states without offset", - auth: token, + token: validToken, status: http.StatusOK, url: fmt.Sprintf("%s?limit=%d", baseURL, 15), res: data[0:15], @@ -222,7 +224,7 @@ func TestListStates(t *testing.T) { }, { desc: "get a list of states without limit", - auth: token, + token: validToken, status: http.StatusOK, url: fmt.Sprintf("%s?offset=%d", baseURL, 14), res: data[14:24], @@ -235,7 +237,7 @@ func TestListStates(t *testing.T) { }, { desc: "get a list of states with invalid number of parameters", - auth: token, + token: validToken, status: http.StatusBadRequest, url: fmt.Sprintf("%s%s", baseURL, "?offset=4&limit=4&limit=5&offset=5"), res: nil, @@ -245,7 +247,7 @@ func TestListStates(t *testing.T) { }, { desc: "get a list of states with redundant query parameters", - auth: token, + token: validToken, status: http.StatusOK, url: fmt.Sprintf("%s?offset=%d&limit=%d&value=something", baseURL, 0, 5), res: data[0:5], @@ -259,13 +261,13 @@ func TestListStates(t *testing.T) { } for _, tc := range cases { - authCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: tc.auth}).Return(&magistrala.IdentityRes{Id: tc.userID}, tc.identifyErr) + authCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: tc.token}).Return(&magistrala.IdentityRes{Id: tc.userID}, tc.identifyErr) repoCall := stateRepo.On("RetrieveAll", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.page, tc.err) req := testRequest{ client: ts.Client(), method: http.MethodGet, url: tc.url, - token: tc.auth, + token: tc.token, } res, err := req.make() assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", tc.desc, err)) diff --git a/twins/api/http/endpoint_twins_test.go b/twins/api/http/endpoint_twins_test.go index 59f2609..6f401aa 100644 --- a/twins/api/http/endpoint_twins_test.go +++ b/twins/api/http/endpoint_twins_test.go @@ -14,7 +14,6 @@ import ( "testing" "github.com/absmach/magistrala" - authmocks "github.com/absmach/magistrala/auth/mocks" mglog "github.com/absmach/magistrala/logger" "github.com/absmach/magistrala/pkg/apiutil" svcerr "github.com/absmach/magistrala/pkg/errors/service" @@ -26,15 +25,16 @@ import ( ) const ( - twinName = "name" - contentType = "application/json" - email = "user@example.com" - token = "token" - wrongID = 0 - maxNameSize = 1024 - instanceID = "5de9b29a-feb9-11ed-be56-0242ac120002" - retained = "saved" - validID = "123e4567-e89b-12d3-a456-426614174000" + twinName = "name" + contentType = "application/json" + email = "user@example.com" + token = "token" + invalidtoken = "invalid" + wrongID = 0 + maxNameSize = 1024 + instanceID = "5de9b29a-feb9-11ed-be56-0242ac120002" + retained = "saved" + validID = "123e4567-e89b-12d3-a456-426614174000" ) var invalidName = strings.Repeat("m", maxNameSize+1) @@ -153,7 +153,7 @@ func TestAddTwin(t *testing.T) { desc: "add twin with invalid auth token", req: data, contentType: contentType, - auth: authmocks.InvalidValue, + auth: invalidtoken, status: http.StatusUnauthorized, location: "", err: svcerr.ErrAuthentication, @@ -318,7 +318,7 @@ func TestUpdateTwin(t *testing.T) { req: data, id: twin.ID, contentType: contentType, - auth: authmocks.InvalidValue, + auth: invalidtoken, status: http.StatusUnauthorized, err: svcerr.ErrAuthentication, retrieveErr: svcerr.ErrNotFound, @@ -468,7 +468,7 @@ func TestViewTwin(t *testing.T) { { desc: "view twin by passing invalid token", id: twin.ID, - auth: authmocks.InvalidValue, + auth: invalidtoken, status: http.StatusForbidden, res: twinRes{}, err: svcerr.ErrAuthentication, @@ -560,7 +560,7 @@ func TestListTwins(t *testing.T) { }, { desc: "get a list of twins with invalid token", - auth: authmocks.InvalidValue, + auth: invalidtoken, status: http.StatusUnauthorized, url: fmt.Sprintf(queryFmt, baseURL, 0, 1), res: nil, @@ -767,10 +767,7 @@ func TestRemoveTwin(t *testing.T) { defer ts.Close() twin := twins.Twin{ - Owner: email, - ID: testsutil.GenerateUUID(t), - Name: twinName, - Revision: 50, + ID: testsutil.GenerateUUID(t), } cases := []struct { @@ -816,7 +813,7 @@ func TestRemoveTwin(t *testing.T) { { desc: "delete twin with invalid token", id: twin.ID, - auth: authmocks.InvalidValue, + auth: invalidtoken, status: http.StatusUnauthorized, err: svcerr.ErrAuthentication, removeErr: svcerr.ErrRemoveEntity, diff --git a/twins/mongodb/states_test.go b/twins/mongodb/states_test.go index 95184c3..b22f704 100644 --- a/twins/mongodb/states_test.go +++ b/twins/mongodb/states_test.go @@ -9,8 +9,8 @@ import ( "testing" "time" - "github.com/absmach/mg-contrib/twins" - "github.com/absmach/mg-contrib/twins/mongodb" + "github.com/absmach/magistrala/twins" + "github.com/absmach/magistrala/twins/mongodb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.mongodb.org/mongo-driver/bson" diff --git a/twins/mongodb/twins_test.go b/twins/mongodb/twins_test.go index e13b958..fbbbe90 100644 --- a/twins/mongodb/twins_test.go +++ b/twins/mongodb/twins_test.go @@ -40,7 +40,7 @@ var ( invalidName = strings.Repeat("m", maxNameSize+1) ) -func TestTwinsSave(t *testing.T) { +func Testtwinsave(t *testing.T) { client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr)) require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err)) diff --git a/twins/service.go b/twins/service.go index 25a4a49..1783264 100644 --- a/twins/service.go +++ b/twins/service.go @@ -74,7 +74,7 @@ var crudOp = map[string]string{ "stateFail": "save.failure", } -type twinsService struct { +type twinservice struct { publisher messaging.Publisher auth magistrala.AuthServiceClient twins TwinRepository @@ -85,11 +85,11 @@ type twinsService struct { logger *slog.Logger } -var _ Service = (*twinsService)(nil) +var _ Service = (*twinservice)(nil) // New instantiates the twins service implementation. func New(publisher messaging.Publisher, auth magistrala.AuthServiceClient, twins TwinRepository, tcache TwinCache, sr StateRepository, idp magistrala.IDProvider, chann string, logger *slog.Logger) Service { - return &twinsService{ + return &twinservice{ publisher: publisher, auth: auth, twins: twins, @@ -101,7 +101,7 @@ func New(publisher messaging.Publisher, auth magistrala.AuthServiceClient, twins } } -func (ts *twinsService) AddTwin(ctx context.Context, token string, twin Twin, def Definition) (tw Twin, err error) { +func (ts *twinservice) AddTwin(ctx context.Context, token string, twin Twin, def Definition) (tw Twin, err error) { var id string var b []byte defer ts.publish(ctx, &id, &err, crudOp["createSucc"], crudOp["createFail"], &b) @@ -143,7 +143,7 @@ func (ts *twinsService) AddTwin(ctx context.Context, token string, twin Twin, de return twin, ts.twinCache.Save(ctx, twin) } -func (ts *twinsService) UpdateTwin(ctx context.Context, token string, twin Twin, def Definition) (err error) { +func (ts *twinservice) UpdateTwin(ctx context.Context, token string, twin Twin, def Definition) (err error) { var b []byte var id string defer ts.publish(ctx, &id, &err, crudOp["updateSucc"], crudOp["updateFail"], &b) @@ -194,7 +194,7 @@ func (ts *twinsService) UpdateTwin(ctx context.Context, token string, twin Twin, return ts.twinCache.Update(ctx, twin) } -func (ts *twinsService) ViewTwin(ctx context.Context, token, twinID string) (tw Twin, err error) { +func (ts *twinservice) ViewTwin(ctx context.Context, token, twinID string) (tw Twin, err error) { var b []byte defer ts.publish(ctx, &twinID, &err, crudOp["getSucc"], crudOp["getFail"], &b) @@ -213,7 +213,7 @@ func (ts *twinsService) ViewTwin(ctx context.Context, token, twinID string) (tw return twin, nil } -func (ts *twinsService) RemoveTwin(ctx context.Context, token, twinID string) (err error) { +func (ts *twinservice) RemoveTwin(ctx context.Context, token, twinID string) (err error) { var b []byte defer ts.publish(ctx, &twinID, &err, crudOp["removeSucc"], crudOp["removeFail"], &b) @@ -229,7 +229,7 @@ func (ts *twinsService) RemoveTwin(ctx context.Context, token, twinID string) (e return ts.twinCache.Remove(ctx, twinID) } -func (ts *twinsService) ListTwins(ctx context.Context, token string, offset, limit uint64, name string, metadata Metadata) (Page, error) { +func (ts *twinservice) ListTwins(ctx context.Context, token string, offset, limit uint64, name string, metadata Metadata) (Page, error) { res, err := ts.auth.Identify(ctx, &magistrala.IdentityReq{Token: token}) if err != nil { return Page{}, errors.Wrap(svcerr.ErrAuthentication, err) @@ -238,7 +238,7 @@ func (ts *twinsService) ListTwins(ctx context.Context, token string, offset, lim return ts.twins.RetrieveAll(ctx, res.GetId(), offset, limit, name, metadata) } -func (ts *twinsService) ListStates(ctx context.Context, token string, offset, limit uint64, twinID string) (StatesPage, error) { +func (ts *twinservice) ListStates(ctx context.Context, token string, offset, limit uint64, twinID string) (StatesPage, error) { _, err := ts.auth.Identify(ctx, &magistrala.IdentityReq{Token: token}) if err != nil { return StatesPage{}, svcerr.ErrAuthentication @@ -247,7 +247,7 @@ func (ts *twinsService) ListStates(ctx context.Context, token string, offset, li return ts.states.RetrieveAll(ctx, offset, limit, twinID) } -func (ts *twinsService) SaveStates(ctx context.Context, msg *messaging.Message) error { +func (ts *twinservice) SaveStates(ctx context.Context, msg *messaging.Message) error { var ids []string channel, subtopic := msg.GetChannel(), msg.GetSubtopic() @@ -277,7 +277,7 @@ func (ts *twinsService) SaveStates(ctx context.Context, msg *messaging.Message) return nil } -func (ts *twinsService) saveState(ctx context.Context, msg *messaging.Message, twinID string) error { +func (ts *twinservice) saveState(ctx context.Context, msg *messaging.Message, twinID string) error { var b []byte var err error @@ -320,7 +320,7 @@ func (ts *twinsService) saveState(ctx context.Context, msg *messaging.Message, t return nil } -func (ts *twinsService) prepareState(st *State, tw *Twin, rec senml.Record, msg *messaging.Message) int { +func (ts *twinservice) prepareState(st *State, tw *Twin, rec senml.Record, msg *messaging.Message) int { def := tw.Definitions[len(tw.Definitions)-1] st.TwinID = tw.ID st.Definition = def.ID @@ -396,7 +396,7 @@ func findAttribute(name string, attrs []Attribute) (idx int) { return -1 } -func (ts *twinsService) publish(ctx context.Context, twinID *string, err *error, succOp, failOp string, payload *[]byte) { +func (ts *twinservice) publish(ctx context.Context, twinID *string, err *error, succOp, failOp string, payload *[]byte) { if ts.channelID == "" { return } diff --git a/twins/service_test.go b/twins/service_test.go index 36ab960..cd5658d 100644 --- a/twins/service_test.go +++ b/twins/service_test.go @@ -10,26 +10,27 @@ import ( "github.com/absmach/magistrala" authmocks "github.com/absmach/magistrala/auth/mocks" + "github.com/absmach/magistrala/internal/testsutil" mglog "github.com/absmach/magistrala/logger" "github.com/absmach/magistrala/pkg/errors" svcerr "github.com/absmach/magistrala/pkg/errors/service" "github.com/absmach/magistrala/pkg/uuid" - "github.com/absmach/mg-contrib/pkg/testsutil" - "github.com/absmach/mg-contrib/twins" - "github.com/absmach/mg-contrib/twins/mocks" + "github.com/absmach/magistrala/twins" + "github.com/absmach/magistrala/twins/mocks" "github.com/absmach/senml" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) const ( - twinName = "name" - wrongID = "" - token = "token" - email = "user@example.com" - numRecs = 100 - retained = "saved" - validID = "123e4567-e89b-12d3-a456-426614174000" + twinName = "name" + wrongID = "" + token = "token" + invalidToken = "invalidToken" + email = "user@example.com" + numRecs = 100 + retained = "saved" + validID = "123e4567-e89b-12d3-a456-426614174000" ) var ( @@ -75,7 +76,7 @@ func TestAddTwin(t *testing.T) { { desc: "add twin with wrong credentials", twin: twin, - token: authmocks.InvalidValue, + token: invalidToken, err: svcerr.ErrAuthentication, saveErr: svcerr.ErrCreateEntity, identifyErr: svcerr.ErrAuthentication, @@ -128,7 +129,7 @@ func TestUpdateTwin(t *testing.T) { { desc: "update twin with wrong credentials", twin: twin, - token: authmocks.InvalidValue, + token: invalidToken, err: svcerr.ErrAuthentication, retrieveErr: svcerr.ErrNotFound, updateErr: svcerr.ErrUpdateEntity, @@ -188,7 +189,7 @@ func TestViewTwin(t *testing.T) { { desc: "view twin with wrong credentials", id: twin.ID, - token: authmocks.InvalidValue, + token: invalidToken, err: svcerr.ErrAuthentication, identifyErr: svcerr.ErrAuthentication, }, @@ -265,7 +266,7 @@ func TestListTwins(t *testing.T) { }, { desc: "list with wrong credentials", - token: authmocks.InvalidValue, + token: invalidToken, limit: 0, offset: n, err: svcerr.ErrAuthentication, @@ -303,7 +304,7 @@ func TestRemoveTwin(t *testing.T) { { desc: "remove twin with wrong credentials", id: twin.ID, - token: authmocks.InvalidValue, + token: invalidToken, err: svcerr.ErrAuthentication, removeErr: svcerr.ErrRemoveEntity, identifyErr: svcerr.ErrAuthentication, @@ -545,7 +546,7 @@ func TestListStates(t *testing.T) { { desc: "get a list with wrong user token", id: twin.ID, - token: authmocks.InvalidValue, + token: invalidToken, offset: 0, limit: 10, size: 0,