From 59aef637373086a2beee70c7ad1a5f578b7c12cd Mon Sep 17 00:00:00 2001 From: Jakub Hrozek Date: Thu, 19 Sep 2024 17:05:45 +0200 Subject: [PATCH] Add new watermill handlers that get or refresh entities by properties and call another handler These will be used in webhook handlers instead of calling the property service directly. The handlers follow the same base logic, just with "pluggable" ways of retrieving the entity, converting the entity to properties and which handler to call next. Related: #4327 --- database/mock/fixtures/store.go | 41 ++ internal/entities/handlers/handler.go | 191 ++++++++ internal/entities/handlers/handler_test.go | 412 ++++++++++++++++++ internal/entities/handlers/message/message.go | 98 +++++ .../entities/handlers/message/message_test.go | 109 +++++ .../entity/add_originating_entity.go | 173 ++++++++ .../handlers/strategies/entity/common.go | 62 +++ .../entity/del_originating_entity.go | 140 ++++++ .../entity/get_by_upstream_props.go | 53 +++ .../entity/refresh_by_upstream_props.go | 78 ++++ .../handlers/strategies/message/empty.go | 41 ++ .../strategies/message/entity_info_wrapper.go | 91 ++++ .../strategies/message/minder_entity.go | 57 +++ .../handlers/strategies/strategies.go | 41 ++ internal/entities/models/models.go | 27 +- .../service/mock/fixtures/fixtures.go | 127 ++++++ internal/events/constants.go | 8 + internal/events/stubs/eventer.go | 9 +- internal/service/service.go | 5 + 19 files changed, 1751 insertions(+), 12 deletions(-) create mode 100644 internal/entities/handlers/handler.go create mode 100644 internal/entities/handlers/handler_test.go create mode 100644 internal/entities/handlers/message/message.go create mode 100644 internal/entities/handlers/message/message_test.go create mode 100644 internal/entities/handlers/strategies/entity/add_originating_entity.go create mode 100644 internal/entities/handlers/strategies/entity/common.go create mode 100644 internal/entities/handlers/strategies/entity/del_originating_entity.go create mode 100644 internal/entities/handlers/strategies/entity/get_by_upstream_props.go create mode 100644 internal/entities/handlers/strategies/entity/refresh_by_upstream_props.go create mode 100644 internal/entities/handlers/strategies/message/empty.go create mode 100644 internal/entities/handlers/strategies/message/entity_info_wrapper.go create mode 100644 internal/entities/handlers/strategies/message/minder_entity.go create mode 100644 internal/entities/handlers/strategies/strategies.go create mode 100644 internal/entities/properties/service/mock/fixtures/fixtures.go diff --git a/database/mock/fixtures/store.go b/database/mock/fixtures/store.go index bc73d3495d..0b8977381f 100644 --- a/database/mock/fixtures/store.go +++ b/database/mock/fixtures/store.go @@ -130,6 +130,22 @@ func WithSuccessfulUpsertPullRequest( } } +func WithSuccessfulUpsertPullRequestWithParams( + pullRequest db.PullRequest, + instance db.EntityInstance, + params db.UpsertPullRequestParams, + entParams db.CreateOrEnsureEntityByIDParams, +) func(*mockdb.MockStore) { + return func(mockStore *mockdb.MockStore) { + mockStore.EXPECT(). + UpsertPullRequest(gomock.Any(), params). + Return(pullRequest, nil) + mockStore.EXPECT(). + CreateOrEnsureEntityByID(gomock.Any(), entParams). + Return(instance, nil) + } +} + func WithSuccessfulDeletePullRequest() func(*mockdb.MockStore) { return func(mockStore *mockdb.MockStore) { mockStore.EXPECT(). @@ -180,3 +196,28 @@ func WithTransaction() func(*mockdb.MockStore) { Return(nil) } } + +func WithRollbackTransaction() func(*mockdb.MockStore) { + return func(mockStore *mockdb.MockStore) { + mockStore.EXPECT(). + BeginTransaction(). + Return(nil, nil) + mockStore.EXPECT(). + GetQuerierWithTransaction(gomock.Any()). + Return(mockStore) + mockStore.EXPECT(). + Rollback(gomock.Any()). + Return(nil) + } +} + +func WithSuccessfullGetEntityByID( + expID uuid.UUID, + entity db.EntityInstance, +) func(*mockdb.MockStore) { + return func(mockStore *mockdb.MockStore) { + mockStore.EXPECT(). + GetEntityByID(gomock.Any(), expID). + Return(entity, nil) + } +} diff --git a/internal/entities/handlers/handler.go b/internal/entities/handlers/handler.go new file mode 100644 index 0000000000..0146c61d3c --- /dev/null +++ b/internal/entities/handlers/handler.go @@ -0,0 +1,191 @@ +// +// Copyright 2024 Stacklok, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package handlers contains the message handlers for entities. +package handlers + +import ( + watermill "github.com/ThreeDotsLabs/watermill/message" + "github.com/rs/zerolog" + + "github.com/stacklok/minder/internal/db" + "github.com/stacklok/minder/internal/entities/handlers/message" + "github.com/stacklok/minder/internal/entities/handlers/strategies" + entStrategies "github.com/stacklok/minder/internal/entities/handlers/strategies/entity" + msgStrategies "github.com/stacklok/minder/internal/entities/handlers/strategies/message" + propertyService "github.com/stacklok/minder/internal/entities/properties/service" + "github.com/stacklok/minder/internal/events" + "github.com/stacklok/minder/internal/providers/manager" +) + +type handleEntityAndDoBase struct { + evt events.Publisher + + refreshEntity strategies.GetEntityStrategy + createMessage strategies.MessageCreateStrategy + + handlerName string + forwardHandlerName string + + handlerMiddleware []watermill.HandlerMiddleware +} + +// Register satisfies the events.Consumer interface. +func (b *handleEntityAndDoBase) Register(r events.Registrar) { + r.Register(b.handlerName, b.handleRefreshEntityAndDo, b.handlerMiddleware...) +} + +// handleRefreshEntityAndDo handles the refresh entity and forwarding a new message to the +// next handler. Creating the message and the way the entity is refreshed is determined by the +// strategies passed in. +// +// The handler doesn't retry on errors, it just logs them. We've had issues with retrying +// recently and it's unclear if there are any errors we /can/ retry on. We should identify +// errors to retry on and implement that in the future. +func (b *handleEntityAndDoBase) handleRefreshEntityAndDo(msg *watermill.Message) error { + ctx := msg.Context() + + l := zerolog.Ctx(ctx).With(). + Str("messageStrategy", b.createMessage.GetName()). + Str("refreshStrategy", b.refreshEntity.GetName()). + Logger() + + // unmarshal the message + entMsg, err := message.ToEntityRefreshAndDo(msg) + if err != nil { + l.Error().Err(err).Msg("error unpacking message") + return nil + } + l.Debug().Msg("message unpacked") + + // call refreshEntity + ewp, err := b.refreshEntity.GetEntity(ctx, entMsg) + if err != nil { + l.Error().Err(err).Msg("error refreshing entity") + // do not return error in the handler, just log it + // we might want to special-case retrying /some/ errors specifically those from the + // provider, but for now, just log it + return nil + } + + if ewp != nil { + l.Debug(). + Str("entityID", ewp.Entity.ID.String()). + Str("providerID", ewp.Entity.ProviderID.String()). + Msg("entity refreshed") + } else { + l.Debug().Msg("entity not retrieved") + } + + nextMsg, err := b.createMessage.CreateMessage(ctx, ewp) + if err != nil { + l.Error().Err(err).Msg("error creating message") + return nil + } + + // If nextMsg is nil, it means we don't need to publish anything (entity not found) + if nextMsg != nil { + l.Debug().Msg("publishing message") + if err := b.evt.Publish(b.forwardHandlerName, nextMsg); err != nil { + l.Error().Err(err).Msg("error publishing message") + return nil + } + } else { + l.Info().Msg("no message to publish") + } + + return nil +} + +// NewRefreshEntityAndEvaluateHandler creates a new handler that refreshes an entity and evaluates it. +func NewRefreshEntityAndEvaluateHandler( + evt events.Publisher, + store db.Store, + propSvc propertyService.PropertiesService, + provMgr manager.ProviderManager, + handlerMiddleware ...watermill.HandlerMiddleware, +) events.Consumer { + return &handleEntityAndDoBase{ + evt: evt, + + refreshEntity: entStrategies.NewRefreshEntityByUpstreamPropsStrategy(propSvc, provMgr, store), + createMessage: msgStrategies.NewToEntityInfoWrapper(store, propSvc, provMgr), + + handlerName: events.TopicQueueRefreshEntityAndEvaluate, + forwardHandlerName: events.TopicQueueEntityEvaluate, + + handlerMiddleware: handlerMiddleware, + } +} + +// NewGetEntityAndDeleteHandler creates a new handler that gets an entity and deletes it. +func NewGetEntityAndDeleteHandler( + evt events.Publisher, + propSvc propertyService.PropertiesService, + handlerMiddleware ...watermill.HandlerMiddleware, +) events.Consumer { + return &handleEntityAndDoBase{ + evt: evt, + + refreshEntity: entStrategies.NewGetEntityByUpstreamIDStrategy(propSvc), + createMessage: msgStrategies.NewToMinderEntity(), + + handlerName: events.TopicQueueGetEntityAndDelete, + forwardHandlerName: events.TopicQueueReconcileEntityDelete, + + handlerMiddleware: handlerMiddleware, + } +} + +// NewAddOriginatingEntityHandler creates a new handler that adds an originating entity. +func NewAddOriginatingEntityHandler( + evt events.Publisher, + store db.Store, + propSvc propertyService.PropertiesService, + provMgr manager.ProviderManager, + handlerMiddleware ...watermill.HandlerMiddleware, +) events.Consumer { + return &handleEntityAndDoBase{ + evt: evt, + + refreshEntity: entStrategies.NewAddOriginatingEntityStrategy(propSvc, provMgr, store), + createMessage: msgStrategies.NewToEntityInfoWrapper(store, propSvc, provMgr), + + handlerName: events.TopicQueueOriginatingEntityAdd, + forwardHandlerName: events.TopicQueueEntityEvaluate, + + handlerMiddleware: handlerMiddleware, + } +} + +// NewRemoveOriginatingEntityHandler creates a new handler that removes an originating entity. +func NewRemoveOriginatingEntityHandler( + evt events.Publisher, + store db.Store, + propSvc propertyService.PropertiesService, + provMgr manager.ProviderManager, + handlerMiddleware ...watermill.HandlerMiddleware, +) events.Consumer { + return &handleEntityAndDoBase{ + evt: evt, + + refreshEntity: entStrategies.NewDelOriginatingEntityStrategy(propSvc, provMgr, store), + createMessage: msgStrategies.NewCreateEmpty(), + + handlerName: events.TopicQueueOriginatingEntityDelete, + + handlerMiddleware: handlerMiddleware, + } +} diff --git a/internal/entities/handlers/handler_test.go b/internal/entities/handlers/handler_test.go new file mode 100644 index 0000000000..e26002dd31 --- /dev/null +++ b/internal/entities/handlers/handler_test.go @@ -0,0 +1,412 @@ +// +// Copyright 2024 Stacklok, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handlers + +import ( + "errors" + "testing" + + watermill "github.com/ThreeDotsLabs/watermill/message" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + + df "github.com/stacklok/minder/database/mock/fixtures" + "github.com/stacklok/minder/internal/db" + "github.com/stacklok/minder/internal/engine/entities" + "github.com/stacklok/minder/internal/entities/handlers/message" + "github.com/stacklok/minder/internal/entities/models" + "github.com/stacklok/minder/internal/entities/properties" + "github.com/stacklok/minder/internal/entities/properties/service" + "github.com/stacklok/minder/internal/entities/properties/service/mock/fixtures" + "github.com/stacklok/minder/internal/events" + stubeventer "github.com/stacklok/minder/internal/events/stubs" + mockgithub "github.com/stacklok/minder/internal/providers/github/mock" + ghprops "github.com/stacklok/minder/internal/providers/github/properties" + "github.com/stacklok/minder/internal/providers/manager" + mock_manager "github.com/stacklok/minder/internal/providers/manager/mock" + provManFixtures "github.com/stacklok/minder/internal/providers/manager/mock/fixtures" + minderv1 "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" + provifv1 "github.com/stacklok/minder/pkg/providers/v1" +) + +var ( + projectID = uuid.New() + providerID = uuid.New() + repoID = uuid.New() + pullRequestID = uuid.New() + + repoName = "testorg/testrepo" + pullName = "testorg/testrepo/789" + + repoEwp = &models.EntityWithProperties{ + Entity: models.EntityInstance{ + ID: repoID, + Type: minderv1.Entity_ENTITY_REPOSITORIES, + Name: repoName, + ProviderID: providerID, + ProjectID: projectID, + }, + } + repoPropMap = map[string]any{ + properties.PropertyName: repoName, + ghprops.RepoPropertyName: "testrepo", + ghprops.RepoPropertyOwner: "testorg", + ghprops.RepoPropertyId: int64(123), + properties.RepoPropertyIsPrivate: false, + properties.RepoPropertyIsFork: false, + } + + pullRequestEwp = &models.EntityWithProperties{ + Entity: models.EntityInstance{ + ID: pullRequestID, + Type: minderv1.Entity_ENTITY_PULL_REQUESTS, + Name: pullName, + ProviderID: providerID, + ProjectID: projectID, + }, + } + pullRequestPropMap = map[string]any{ + properties.PropertyName: pullName, + ghprops.PullPropertyNumber: int64(789), + } + + githubHint = service.ByUpstreamHint{ + ProviderImplements: db.NullProviderType{ + ProviderType: db.ProviderTypeGithub, + Valid: true, + }, + } +) + +type ( + providerMock = *mockgithub.MockGitHub + providerMockBuilder = func(controller *gomock.Controller) providerMock +) + +func newProviderMock(opts ...func(providerMock)) providerMockBuilder { + return func(ctrl *gomock.Controller) providerMock { + mock := mockgithub.NewMockGitHub(ctrl) + for _, opt := range opts { + opt(mock) + } + return mock + } +} + +func withSuccessfulGetEntityName(name string) func(providerMock) { + return func(mock providerMock) { + mock.EXPECT(). + GetEntityName(gomock.Any(), gomock.Any()). + Return(name, nil) + } +} + +func buildEwp(t *testing.T, ewp *models.EntityWithProperties, propMap map[string]any) *models.EntityWithProperties { + t.Helper() + + entProps, err := properties.NewProperties(propMap) + require.NoError(t, err) + ewp.Properties = entProps + + return ewp +} + +func checkRepoMessage(t *testing.T, msg *watermill.Message) { + t.Helper() + + eiw, err := entities.ParseEntityEvent(msg) + require.NoError(t, err) + require.NotNil(t, eiw) + + pbrepo, ok := eiw.Entity.(*minderv1.Repository) + require.True(t, ok) + assert.Equal(t, repoPropMap[ghprops.RepoPropertyName].(string), pbrepo.Name) + assert.Equal(t, repoPropMap[ghprops.RepoPropertyOwner].(string), pbrepo.Owner) + assert.Equal(t, repoPropMap[ghprops.RepoPropertyId].(int64), pbrepo.RepoId) + assert.Equal(t, repoPropMap[properties.RepoPropertyIsPrivate].(bool), pbrepo.IsPrivate) + assert.Equal(t, repoPropMap[properties.RepoPropertyIsFork].(bool), pbrepo.IsFork) +} + +func checkPullRequestMessage(t *testing.T, msg *watermill.Message) { + t.Helper() + + eiw, err := entities.ParseEntityEvent(msg) + require.NoError(t, err) + require.NotNil(t, eiw) + + pbpr, ok := eiw.Entity.(*minderv1.PullRequest) + require.True(t, ok) + assert.Equal(t, pullRequestPropMap[ghprops.PullPropertyNumber].(int64), pbpr.Number) +} + +type handlerBuilder func( + evt events.Publisher, + store db.Store, + propSvc service.PropertiesService, + provMgr manager.ProviderManager, +) events.Consumer + +func refreshEntityHandlerBuilder( + evt events.Publisher, + store db.Store, + propSvc service.PropertiesService, + provMgr manager.ProviderManager, +) events.Consumer { + return NewRefreshEntityAndEvaluateHandler(evt, store, propSvc, provMgr) +} + +func addOriginatingEntityHandlerBuilder( + evt events.Publisher, + store db.Store, + propSvc service.PropertiesService, + provMgr manager.ProviderManager, +) events.Consumer { + return NewAddOriginatingEntityHandler(evt, store, propSvc, provMgr) +} + +func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + lookupPropMap map[string]any + lookupType minderv1.Entity + ownerPropMap map[string]any + ownerType minderv1.Entity + providerHint string + setupPropSvcMocks func() fixtures.MockPropertyServiceBuilder + mockStoreFunc df.MockStoreBuilder + providerManagerSetup func(prov provifv1.Provider) provManFixtures.ProviderManagerMockBuilder + providerSetup providerMockBuilder + expectedPublish bool + topic string + checkWmMsg func(t *testing.T, msg *watermill.Message) + handlerBuilderFn handlerBuilder + }{ + { + name: "NewRefreshEntityAndEvaluateHandler: successful refresh and publish of a repo", + handlerBuilderFn: refreshEntityHandlerBuilder, + lookupPropMap: map[string]any{ + properties.PropertyUpstreamID: "123", + }, + lookupType: minderv1.Entity_ENTITY_REPOSITORIES, + providerHint: "github", + setupPropSvcMocks: func() fixtures.MockPropertyServiceBuilder { + ewp := buildEwp(t, repoEwp, repoPropMap) + protoEnt, err := ghprops.RepoV1FromProperties(ewp.Properties) + require.NoError(t, err) + + return fixtures.NewMockPropertiesService( + fixtures.WithSuccessfulEntityByUpstreamHint(ewp, githubHint), + fixtures.WithSuccessfulRetrieveAllPropertiesForEntity(), + fixtures.WithSuccessfulEntityWithPropertiesAsProto(protoEnt), + ) + }, + mockStoreFunc: df.NewMockStore( + df.WithTransaction(), + ), + expectedPublish: true, + topic: events.TopicQueueEntityEvaluate, + checkWmMsg: checkRepoMessage, + }, + { + name: "NewRefreshEntityAndEvaluateHandler: Failure to get an entity doesn't publish", + handlerBuilderFn: refreshEntityHandlerBuilder, + lookupType: minderv1.Entity_ENTITY_REPOSITORIES, + setupPropSvcMocks: func() fixtures.MockPropertyServiceBuilder { + return fixtures.NewMockPropertiesService( + fixtures.WithFailedEntityByUpstreamHint(service.ErrEntityNotFound), + ) + }, + mockStoreFunc: df.NewMockStore( + df.WithRollbackTransaction(), + ), + expectedPublish: false, + }, + { + name: "NewRefreshEntityAndEvaluateHandler: Failure to retrieve all properties doesn't publish", + handlerBuilderFn: refreshEntityHandlerBuilder, + lookupType: minderv1.Entity_ENTITY_REPOSITORIES, + providerHint: "github", + setupPropSvcMocks: func() fixtures.MockPropertyServiceBuilder { + return fixtures.NewMockPropertiesService( + fixtures.WithSuccessfulEntityByUpstreamHint(repoEwp, githubHint), + fixtures.WithFailedRetrieveAllPropertiesForEntity(service.ErrEntityNotFound), + ) + }, + mockStoreFunc: df.NewMockStore( + df.WithRollbackTransaction(), + ), + expectedPublish: false, + }, + { + name: "NewRefreshEntityAndEvaluateHandler: Failure to convert entity to proto doesn't publish", + handlerBuilderFn: refreshEntityHandlerBuilder, + providerHint: "github", + lookupType: minderv1.Entity_ENTITY_REPOSITORIES, + setupPropSvcMocks: func() fixtures.MockPropertyServiceBuilder { + return fixtures.NewMockPropertiesService( + fixtures.WithSuccessfulEntityByUpstreamHint(repoEwp, githubHint), + fixtures.WithSuccessfulRetrieveAllPropertiesForEntity(), + fixtures.WithFailedEntityWithPropertiesAsProto(errors.New("fart")), + ) + }, + mockStoreFunc: df.NewMockStore( + df.WithTransaction(), + ), + expectedPublish: false, + }, + { + name: "NewAddOriginatingEntityHandler: Adding a pull request originating entity publishes", + handlerBuilderFn: addOriginatingEntityHandlerBuilder, + lookupPropMap: map[string]any{ + properties.PropertyUpstreamID: "789", + ghprops.PullPropertyNumber: int64(789), + }, + lookupType: minderv1.Entity_ENTITY_PULL_REQUESTS, + ownerPropMap: map[string]any{ + properties.PropertyUpstreamID: "123", + }, + ownerType: minderv1.Entity_ENTITY_REPOSITORIES, + providerHint: "github", + setupPropSvcMocks: func() fixtures.MockPropertyServiceBuilder { + pullEwp := buildEwp(t, pullRequestEwp, pullRequestPropMap) + pullProtoEnt, err := ghprops.PullRequestV1FromProperties(pullEwp.Properties) + require.NoError(t, err) + + repoPropsEwp := buildEwp(t, repoEwp, pullRequestPropMap) + + return fixtures.NewMockPropertiesService( + fixtures.WithSuccessfulEntityByUpstreamHint(repoPropsEwp, githubHint), + fixtures.WithSuccessfulRetrieveAllProperties( + projectID, + providerID, + minderv1.Entity_ENTITY_PULL_REQUESTS, + pullEwp.Properties, + ), + fixtures.WithSuccessfulEntityWithPropertiesAsProto(pullProtoEnt), + ) + }, + mockStoreFunc: df.NewMockStore( + df.WithTransaction(), + df.WithSuccessfulUpsertPullRequestWithParams( + db.PullRequest{ID: pullRequestID}, + db.EntityInstance{ + ID: uuid.UUID{}, + EntityType: db.EntitiesPullRequest, + Name: "", + ProjectID: projectID, + ProviderID: providerID, + OriginatedFrom: uuid.NullUUID{ + UUID: repoID, + Valid: true, + }, + }, + db.UpsertPullRequestParams{ + PrNumber: 789, + RepositoryID: repoID, + }, + db.CreateOrEnsureEntityByIDParams{ + ID: pullRequestID, + EntityType: db.EntitiesPullRequest, + Name: pullName, + ProjectID: projectID, + ProviderID: providerID, + OriginatedFrom: uuid.NullUUID{ + UUID: repoID, + Valid: true, + }, + }, + ), + df.WithSuccessfullGetEntityByID( + repoID, + db.EntityInstance{ + ID: repoID, + EntityType: db.EntitiesRepository, + }), + ), + providerSetup: newProviderMock(withSuccessfulGetEntityName(pullName)), + providerManagerSetup: func(prov provifv1.Provider) provManFixtures.ProviderManagerMockBuilder { + return provManFixtures.NewProviderManagerMock( + provManFixtures.WithSuccessfulInstantiateFromID(prov), + ) + }, + expectedPublish: true, + topic: events.TopicQueueEntityEvaluate, + checkWmMsg: checkPullRequestMessage, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + getByProps, err := properties.NewProperties(tt.lookupPropMap) + require.NoError(t, err) + + entityMsg := message.NewEntityRefreshAndDoMessage(). + WithEntity(tt.lookupType, getByProps). + WithProviderImplementsHint(tt.providerHint) + + if tt.ownerPropMap != nil { + ownerProps, err := properties.NewProperties(tt.ownerPropMap) + require.NoError(t, err) + entityMsg = entityMsg.WithOwner(tt.ownerType, ownerProps) + } + + handlerMsg := watermill.NewMessage(uuid.New().String(), nil) + err = entityMsg.ToMessage(handlerMsg) + require.NoError(t, err) + + mockPropSvc := tt.setupPropSvcMocks()(ctrl) + mockStore := tt.mockStoreFunc(ctrl) + stubEventer := &stubeventer.StubEventer{} + + var prov provifv1.Provider + if tt.providerSetup != nil { + prov = tt.providerSetup(ctrl) + } + + var provMgr manager.ProviderManager + if tt.providerManagerSetup != nil { + provMgr = tt.providerManagerSetup(prov)(ctrl) + } else { + provMgr = mock_manager.NewMockProviderManager(ctrl) + } + + handler := tt.handlerBuilderFn(stubEventer, mockStore, mockPropSvc, provMgr) + refreshHandlerStruct, ok := handler.(*handleEntityAndDoBase) + require.True(t, ok) + err = refreshHandlerStruct.handleRefreshEntityAndDo(handlerMsg) + assert.NoError(t, err) + + if !tt.expectedPublish { + assert.Equal(t, 0, len(stubEventer.Sent), "Expected no publish calls") + return + } + + assert.Equal(t, 1, len(stubEventer.Topics), "Expected one topic") + assert.Equal(t, tt.topic, stubEventer.Topics[0], "Expected topic to be %s", tt.topic) + assert.Equal(t, 1, len(stubEventer.Sent), "Expected one publish call") + tt.checkWmMsg(t, stubEventer.Sent[0]) + }) + } +} diff --git a/internal/entities/handlers/message/message.go b/internal/entities/handlers/message/message.go new file mode 100644 index 0000000000..4f2fa7b391 --- /dev/null +++ b/internal/entities/handlers/message/message.go @@ -0,0 +1,98 @@ +// +// Copyright 2024 Stacklok, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package message contains the message creation strategies +package message + +import ( + "encoding/json" + "fmt" + + "github.com/ThreeDotsLabs/watermill/message" + + "github.com/stacklok/minder/internal/entities/properties" + v1 "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" +) + +// TypedProps is a struct that contains the type of entity and its properties. +// it is used for either the entity or the owner entity. +type TypedProps struct { + Type v1.Entity `json:"type"` + GetByProps map[string]any `json:"get_by_props"` +} + +// EntityHint is a hint that is used to help the entity handler find the entity. +type EntityHint struct { + ProviderImplementsHint string `json:"provider_implements_hint"` +} + +// HandleEntityAndDoMessage is a message that is sent to the entity handler to refresh an entity and perform an action. +type HandleEntityAndDoMessage struct { + Entity TypedProps `json:"entity"` + Owner TypedProps `json:"owner"` + Hint EntityHint `json:"hint"` +} + +// NewEntityRefreshAndDoMessage creates a new HandleEntityAndDoMessage struct. +func NewEntityRefreshAndDoMessage() *HandleEntityAndDoMessage { + return &HandleEntityAndDoMessage{} +} + +// WithEntity sets the entity and its properties. +func (e *HandleEntityAndDoMessage) WithEntity(entType v1.Entity, getByProps *properties.Properties) *HandleEntityAndDoMessage { + e.Entity = TypedProps{ + Type: entType, + GetByProps: getByProps.ToProtoStruct().AsMap(), + } + return e +} + +// WithOwner sets the owner entity and its properties. +func (e *HandleEntityAndDoMessage) WithOwner(ownerType v1.Entity, ownerProps *properties.Properties) *HandleEntityAndDoMessage { + e.Owner = TypedProps{ + Type: ownerType, + GetByProps: ownerProps.ToProtoStruct().AsMap(), + } + return e +} + +// WithProviderImplementsHint sets the provider hint for the entity that will be used when looking up the entity. +func (e *HandleEntityAndDoMessage) WithProviderImplementsHint(providerHint string) *HandleEntityAndDoMessage { + e.Hint.ProviderImplementsHint = providerHint + return e +} + +// ToEntityRefreshAndDo converts a Watermill message to a HandleEntityAndDoMessage struct. +func ToEntityRefreshAndDo(msg *message.Message) (*HandleEntityAndDoMessage, error) { + entMsg := &HandleEntityAndDoMessage{} + + err := json.Unmarshal(msg.Payload, entMsg) + if err != nil { + return nil, fmt.Errorf("error unmarshalling entity: %w", err) + } + + return entMsg, nil +} + +// ToMessage converts the HandleEntityAndDoMessage struct to a Watermill message. +func (e *HandleEntityAndDoMessage) ToMessage(msg *message.Message) error { + payloadBytes, err := json.Marshal(e) + if err != nil { + return fmt.Errorf("error marshalling entity: %w", err) + } + + msg.Payload = payloadBytes + return nil +} diff --git a/internal/entities/handlers/message/message_test.go b/internal/entities/handlers/message/message_test.go new file mode 100644 index 0000000000..717c6490d2 --- /dev/null +++ b/internal/entities/handlers/message/message_test.go @@ -0,0 +1,109 @@ +// +// Copyright 2024 Stacklok, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package message + +import ( + "testing" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/stacklok/minder/internal/entities/properties" + v1 "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" +) + +func TestEntityRefreshAndDoMessageRoundTrip(t *testing.T) { + t.Parallel() + + scenarios := []struct { + name string + props map[string]any + entType v1.Entity + ownerProps map[string]any + ownerType v1.Entity + providerHint string + }{ + { + name: "Valid repository entity", + props: map[string]any{ + "id": "123", + "name": "test-repo", + }, + entType: v1.Entity_ENTITY_REPOSITORIES, + providerHint: "github", + }, + { + name: "Valid artifact entity", + props: map[string]any{ + "id": "456", + "version": "1.0.0", + }, + entType: v1.Entity_ENTITY_ARTIFACTS, + ownerProps: map[string]any{ + "id": "123", + }, + ownerType: v1.Entity_ENTITY_REPOSITORIES, + providerHint: "docker", + }, + { + name: "Valid pull request entity", + props: map[string]any{ + "id": "789", + }, + entType: v1.Entity_ENTITY_PULL_REQUESTS, + ownerProps: map[string]any{ + "id": "123", + }, + ownerType: v1.Entity_ENTITY_REPOSITORIES, + providerHint: "github", + }, + } + + for _, sc := range scenarios { + t.Run(sc.name, func(t *testing.T) { + t.Parallel() + + props, err := properties.NewProperties(sc.props) + require.NoError(t, err) + + original := NewEntityRefreshAndDoMessage(). + WithEntity(sc.entType, props). + WithProviderImplementsHint(sc.providerHint) + + if sc.ownerProps != nil { + ownerProps, err := properties.NewProperties(sc.ownerProps) + require.NoError(t, err) + original.WithOwner(sc.ownerType, ownerProps) + } + + handlerMsg := message.NewMessage(uuid.New().String(), nil) + err = original.ToMessage(handlerMsg) + require.NoError(t, err) + + roundTrip, err := ToEntityRefreshAndDo(handlerMsg) + assert.NoError(t, err) + assert.Equal(t, original.Entity.GetByProps, roundTrip.Entity.GetByProps) + assert.Equal(t, original.Entity.Type, roundTrip.Entity.Type) + assert.Equal(t, original.Hint.ProviderImplementsHint, roundTrip.Hint.ProviderImplementsHint) + if original.Owner.Type != v1.Entity_ENTITY_UNSPECIFIED { + assert.Equal(t, original.Owner.GetByProps, roundTrip.Owner.GetByProps) + assert.Equal(t, original.Owner.Type, roundTrip.Owner.Type) + } + }) + } +} diff --git a/internal/entities/handlers/strategies/entity/add_originating_entity.go b/internal/entities/handlers/strategies/entity/add_originating_entity.go new file mode 100644 index 0000000000..d9239ab572 --- /dev/null +++ b/internal/entities/handlers/strategies/entity/add_originating_entity.go @@ -0,0 +1,173 @@ +// Copyright 2024 Stacklok, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package entity contains the entity creation strategies +package entity + +import ( + "context" + "fmt" + + "github.com/google/uuid" + + "github.com/stacklok/minder/internal/db" + "github.com/stacklok/minder/internal/engine/entities" + "github.com/stacklok/minder/internal/entities/handlers/message" + "github.com/stacklok/minder/internal/entities/handlers/strategies" + "github.com/stacklok/minder/internal/entities/models" + "github.com/stacklok/minder/internal/entities/properties" + propertyService "github.com/stacklok/minder/internal/entities/properties/service" + ghprop "github.com/stacklok/minder/internal/providers/github/properties" + "github.com/stacklok/minder/internal/providers/manager" + minderv1 "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" +) + +type addOriginatingEntityStrategy struct { + propSvc propertyService.PropertiesService + provMgr manager.ProviderManager + store db.Store +} + +// NewAddOriginatingEntityStrategy creates a new addOriginatingEntityStrategy. +func NewAddOriginatingEntityStrategy( + propSvc propertyService.PropertiesService, + provMgr manager.ProviderManager, + store db.Store, +) strategies.GetEntityStrategy { + return &addOriginatingEntityStrategy{ + propSvc: propSvc, + provMgr: provMgr, + store: store, + } +} + +// GetEntity adds an originating entity. +func (a *addOriginatingEntityStrategy) GetEntity( + ctx context.Context, entMsg *message.HandleEntityAndDoMessage, +) (*models.EntityWithProperties, error) { + childProps, err := properties.NewProperties(entMsg.Entity.GetByProps) + if err != nil { + return nil, fmt.Errorf("error creating properties: %w", err) + } + + // store the originating entity + childEwp, err := db.WithTransaction(a.store, func(t db.ExtendQuerier) (*models.EntityWithProperties, error) { + parentEwp, err := getEntityInner( + ctx, + entMsg.Owner.Type, entMsg.Owner.GetByProps, entMsg.Hint, + a.propSvc, + propertyService.CallBuilder().WithStoreOrTransaction(t)) + if err != nil { + return nil, fmt.Errorf("error getting parent entity: %w", err) + } + + legacyId, err := a.upsertLegacyEntity(ctx, entMsg.Entity.Type, parentEwp, childProps, t) + if err != nil { + return nil, fmt.Errorf("error upserting legacy entity: %w", err) + } + + prov, err := a.provMgr.InstantiateFromID(ctx, parentEwp.Entity.ProviderID) + if err != nil { + return nil, fmt.Errorf("error getting provider: %w", err) + } + + childEntName, err := prov.GetEntityName(entMsg.Entity.Type, childProps) + if err != nil { + return nil, fmt.Errorf("error getting child entity name: %w", err) + } + + childEnt, err := t.CreateOrEnsureEntityByID(ctx, db.CreateOrEnsureEntityByIDParams{ + ID: legacyId, + EntityType: entities.EntityTypeToDB(entMsg.Entity.Type), + Name: childEntName, + ProjectID: parentEwp.Entity.ProjectID, + ProviderID: parentEwp.Entity.ProviderID, + OriginatedFrom: uuid.NullUUID{ + UUID: parentEwp.Entity.ID, + Valid: true, + }, + }) + if err != nil { + return nil, err + } + + upstreamProps, err := a.propSvc.RetrieveAllProperties(ctx, prov, + parentEwp.Entity.ProjectID, parentEwp.Entity.ProviderID, + childProps, entMsg.Entity.Type, + propertyService.ReadBuilder().WithStoreOrTransaction(t), + ) + if err != nil { + return nil, fmt.Errorf("error retrieving properties: %w", err) + } + + return models.NewEntityWithProperties(childEnt, upstreamProps), nil + + }) + + if err != nil { + return nil, fmt.Errorf("error storing originating entity: %w", err) + } + return childEwp, nil +} + +// GetName returns the name of the strategy. Used for debugging +func (_ *addOriginatingEntityStrategy) GetName() string { + return "addOriginatingEntityStrategy" +} + +func (_ *addOriginatingEntityStrategy) upsertLegacyEntity( + ctx context.Context, + entType minderv1.Entity, + parentEwp *models.EntityWithProperties, childProps *properties.Properties, + t db.ExtendQuerier, +) (uuid.UUID, error) { + var legacyId uuid.UUID + + switch entType { // nolint:exhaustive + case minderv1.Entity_ENTITY_PULL_REQUESTS: + dbPr, err := t.UpsertPullRequest(ctx, db.UpsertPullRequestParams{ + RepositoryID: parentEwp.Entity.ID, + PrNumber: childProps.GetProperty(ghprop.PullPropertyNumber).GetInt64(), + }) + if err != nil { + return uuid.Nil, fmt.Errorf("error upserting pull request: %w", err) + } + legacyId = dbPr.ID + case minderv1.Entity_ENTITY_ARTIFACTS: + // TODO: remove this once we migrate artifacts to entities. We should get rid of the provider name. + dbProv, err := t.GetProviderByID(ctx, parentEwp.Entity.ProviderID) + if err != nil { + return uuid.Nil, fmt.Errorf("error getting provider: %w", err) + } + + dbArtifact, err := t.UpsertArtifact(ctx, db.UpsertArtifactParams{ + RepositoryID: uuid.NullUUID{ + UUID: parentEwp.Entity.ID, + Valid: true, + }, + ArtifactName: childProps.GetProperty(ghprop.ArtifactPropertyName).GetString(), + ArtifactType: childProps.GetProperty(ghprop.ArtifactPropertyType).GetString(), + ArtifactVisibility: childProps.GetProperty(ghprop.ArtifactPropertyVisibility).GetString(), + ProjectID: parentEwp.Entity.ProjectID, + ProviderID: parentEwp.Entity.ProviderID, + ProviderName: dbProv.Name, + }) + if err != nil { + return uuid.Nil, fmt.Errorf("error upserting artifact: %w", err) + } + legacyId = dbArtifact.ID + } + + return legacyId, nil +} diff --git a/internal/entities/handlers/strategies/entity/common.go b/internal/entities/handlers/strategies/entity/common.go new file mode 100644 index 0000000000..b9999acd68 --- /dev/null +++ b/internal/entities/handlers/strategies/entity/common.go @@ -0,0 +1,62 @@ +// Copyright 2024 Stacklok, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package entity contains the entity creation strategies +package entity + +import ( + "context" + "fmt" + + "github.com/stacklok/minder/internal/entities/handlers/message" + "github.com/stacklok/minder/internal/entities/models" + "github.com/stacklok/minder/internal/entities/properties" + propertyService "github.com/stacklok/minder/internal/entities/properties/service" + minderv1 "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" +) + +func getEntityInner( + ctx context.Context, + entType minderv1.Entity, + entPropMap map[string]any, + hint message.EntityHint, + propSvc propertyService.PropertiesService, + getEntityOpts *propertyService.CallOptions, +) (*models.EntityWithProperties, error) { + svcHint := propertyService.ByUpstreamHint{} + if hint.ProviderImplementsHint != "" { + svcHint.ProviderImplements.Valid = true + if err := svcHint.ProviderImplements.ProviderType.Scan(hint.ProviderImplementsHint); err != nil { + return nil, fmt.Errorf("error scanning provider type: %w", err) + } + } + + lookupProperties, err := properties.NewProperties(entPropMap) + if err != nil { + return nil, fmt.Errorf("error creating properties: %w", err) + } + + ewp, err := propSvc.EntityWithPropertiesByUpstreamHint( + ctx, + entType, + lookupProperties, + svcHint, + getEntityOpts, + ) + if err != nil { + return nil, fmt.Errorf("error searching entity by ID: %w", err) + } + + return ewp, nil +} diff --git a/internal/entities/handlers/strategies/entity/del_originating_entity.go b/internal/entities/handlers/strategies/entity/del_originating_entity.go new file mode 100644 index 0000000000..a6e7e28d17 --- /dev/null +++ b/internal/entities/handlers/strategies/entity/del_originating_entity.go @@ -0,0 +1,140 @@ +// Copyright 2024 Stacklok, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package entity contains the entity creation strategies +package entity + +import ( + "context" + "database/sql" + "errors" + "fmt" + + "github.com/stacklok/minder/internal/db" + "github.com/stacklok/minder/internal/entities/handlers/message" + "github.com/stacklok/minder/internal/entities/handlers/strategies" + "github.com/stacklok/minder/internal/entities/models" + "github.com/stacklok/minder/internal/entities/properties" + propertyService "github.com/stacklok/minder/internal/entities/properties/service" + ghprop "github.com/stacklok/minder/internal/providers/github/properties" + "github.com/stacklok/minder/internal/providers/manager" + minderv1 "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" +) + +type delOriginatingEntityStrategy struct { + propSvc propertyService.PropertiesService + provMgr manager.ProviderManager + store db.Store +} + +// NewDelOriginatingEntityStrategy creates a new delOriginatingEntityStrategy. +func NewDelOriginatingEntityStrategy( + propSvc propertyService.PropertiesService, + provMgr manager.ProviderManager, + store db.Store, +) strategies.GetEntityStrategy { + return &delOriginatingEntityStrategy{ + propSvc: propSvc, + provMgr: provMgr, + store: store, + } +} + +// GetEntity deletes the originating entity. +func (d *delOriginatingEntityStrategy) GetEntity( + ctx context.Context, entMsg *message.HandleEntityAndDoMessage, +) (*models.EntityWithProperties, error) { + childProps, err := properties.NewProperties(entMsg.Entity.GetByProps) + if err != nil { + return nil, fmt.Errorf("error creating properties: %w", err) + } + + tx, err := d.store.BeginTransaction() + if err != nil { + return nil, fmt.Errorf("error starting transaction: %w", err) + } + defer func() { + _ = d.store.Rollback(tx) + }() + + txq := d.store.GetQuerierWithTransaction(tx) + if txq == nil { + return nil, fmt.Errorf("error getting querier") + } + + parentEwp, err := getEntityInner( + ctx, + entMsg.Owner.Type, entMsg.Owner.GetByProps, entMsg.Hint, + d.propSvc, + propertyService.CallBuilder().WithStoreOrTransaction(txq)) + if err != nil { + return nil, fmt.Errorf("error getting parent entity: %w", err) + } + + prov, err := d.provMgr.InstantiateFromID(ctx, parentEwp.Entity.ProviderID) + if err != nil { + return nil, fmt.Errorf("error getting provider: %w", err) + } + + childEntName, err := prov.GetEntityName(entMsg.Entity.Type, childProps) + if err != nil { + return nil, fmt.Errorf("error getting child entity name: %w", err) + } + + err = txq.DeleteEntityByName(ctx, db.DeleteEntityByNameParams{ + Name: childEntName, + ProjectID: parentEwp.Entity.ProjectID, + }) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return nil, err + } + + err = d.deleteLegacyEntity(ctx, entMsg.Entity.Type, parentEwp, childProps, txq) + if err != nil { + return nil, fmt.Errorf("error deleting legacy entity: %w", err) + } + + if err := d.store.Commit(tx); err != nil { + return nil, fmt.Errorf("error committing transaction: %w", err) + } + + return nil, nil +} + +func (_ *delOriginatingEntityStrategy) deleteLegacyEntity( + ctx context.Context, + entType minderv1.Entity, + parentEwp *models.EntityWithProperties, + childProps *properties.Properties, + t db.ExtendQuerier, +) error { + if entType == minderv1.Entity_ENTITY_PULL_REQUESTS { + err := t.DeletePullRequest(ctx, db.DeletePullRequestParams{ + RepositoryID: parentEwp.Entity.ID, + PrNumber: childProps.GetProperty(ghprop.PullPropertyNumber).GetInt64(), + }) + if err != nil { + return fmt.Errorf("error deleting pull request: %w", err) + } + } else { + return fmt.Errorf("unsupported entity type: %v", entType) + } + + return nil +} + +// GetName returns the name of the strategy. Used for debugging +func (_ *delOriginatingEntityStrategy) GetName() string { + return "delOriginatingEntityStrategy" +} diff --git a/internal/entities/handlers/strategies/entity/get_by_upstream_props.go b/internal/entities/handlers/strategies/entity/get_by_upstream_props.go new file mode 100644 index 0000000000..a3ef87c493 --- /dev/null +++ b/internal/entities/handlers/strategies/entity/get_by_upstream_props.go @@ -0,0 +1,53 @@ +// Copyright 2024 Stacklok, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package entity contains the entity creation strategies +package entity + +import ( + "context" + + "github.com/stacklok/minder/internal/entities/handlers/message" + "github.com/stacklok/minder/internal/entities/handlers/strategies" + "github.com/stacklok/minder/internal/entities/models" + propertyService "github.com/stacklok/minder/internal/entities/properties/service" +) + +type getEntityByUpstreamIDStrategy struct { + propSvc propertyService.PropertiesService +} + +// NewGetEntityByUpstreamIDStrategy creates a new getEntityByUpstreamIDStrategy. +func NewGetEntityByUpstreamIDStrategy( + propSvc propertyService.PropertiesService, +) strategies.GetEntityStrategy { + return &getEntityByUpstreamIDStrategy{ + propSvc: propSvc, + } +} + +// GetEntity gets an entity by its upstream ID. +func (g *getEntityByUpstreamIDStrategy) GetEntity( + ctx context.Context, entMsg *message.HandleEntityAndDoMessage, +) (*models.EntityWithProperties, error) { + return getEntityInner(ctx, + entMsg.Entity.Type, entMsg.Entity.GetByProps, entMsg.Hint, + g.propSvc, + propertyService.CallBuilder()) +} + +// GetName returns the name of the strategy. Used for debugging +func (_ *getEntityByUpstreamIDStrategy) GetName() string { + return "getEntityByUpstreamID" +} diff --git a/internal/entities/handlers/strategies/entity/refresh_by_upstream_props.go b/internal/entities/handlers/strategies/entity/refresh_by_upstream_props.go new file mode 100644 index 0000000000..80dd10a29c --- /dev/null +++ b/internal/entities/handlers/strategies/entity/refresh_by_upstream_props.go @@ -0,0 +1,78 @@ +// Copyright 2024 Stacklok, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package entity contains the entity creation strategies +package entity + +import ( + "context" + "fmt" + + "github.com/stacklok/minder/internal/db" + "github.com/stacklok/minder/internal/entities/handlers/message" + "github.com/stacklok/minder/internal/entities/handlers/strategies" + "github.com/stacklok/minder/internal/entities/models" + propertyService "github.com/stacklok/minder/internal/entities/properties/service" + "github.com/stacklok/minder/internal/providers/manager" +) + +type refreshEntityByUpstreamIDStrategy struct { + propSvc propertyService.PropertiesService + provMgr manager.ProviderManager + store db.Store +} + +// NewRefreshEntityByUpstreamPropsStrategy creates a new refreshEntityByUpstreamIDStrategy. +func NewRefreshEntityByUpstreamPropsStrategy( + propSvc propertyService.PropertiesService, + provMgr manager.ProviderManager, + store db.Store, +) strategies.GetEntityStrategy { + return &refreshEntityByUpstreamIDStrategy{ + propSvc: propSvc, + provMgr: provMgr, + store: store, + } +} + +// GetEntity refreshes an entity by its upstream ID. +func (r *refreshEntityByUpstreamIDStrategy) GetEntity( + ctx context.Context, entMsg *message.HandleEntityAndDoMessage, +) (*models.EntityWithProperties, error) { + getEnt, err := db.WithTransaction(r.store, func(t db.ExtendQuerier) (*models.EntityWithProperties, error) { + ewp, err := getEntityInner( + ctx, + entMsg.Entity.Type, entMsg.Entity.GetByProps, entMsg.Hint, + r.propSvc, propertyService.CallBuilder().WithStoreOrTransaction(t)) + if err != nil { + return nil, fmt.Errorf("error getting entity: %w", err) + } + + err = r.propSvc.RetrieveAllPropertiesForEntity(ctx, ewp, r.provMgr, propertyService.ReadBuilder()) + if err != nil { + return nil, fmt.Errorf("error fetching entity: %w", err) + } + return ewp, nil + }) + if err != nil { + return nil, fmt.Errorf("error refreshing entity: %w", err) + } + + return getEnt, nil +} + +// GetName returns the name of the strategy. Used for debugging +func (_ *refreshEntityByUpstreamIDStrategy) GetName() string { + return "refreshEntityByUpstreamIDStrategy" +} diff --git a/internal/entities/handlers/strategies/message/empty.go b/internal/entities/handlers/strategies/message/empty.go new file mode 100644 index 0000000000..a8128f41af --- /dev/null +++ b/internal/entities/handlers/strategies/message/empty.go @@ -0,0 +1,41 @@ +// +// Copyright 2024 Stacklok, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package message contains the message creation strategies +package message + +import ( + "context" + + "github.com/ThreeDotsLabs/watermill/message" + + "github.com/stacklok/minder/internal/entities/handlers/strategies" + "github.com/stacklok/minder/internal/entities/models" +) + +type createEmpty struct{} + +// NewCreateEmpty creates a new createEmpty strategy +func NewCreateEmpty() strategies.MessageCreateStrategy { + return &createEmpty{} +} + +func (_ *createEmpty) CreateMessage(_ context.Context, _ *models.EntityWithProperties) (*message.Message, error) { + return nil, nil +} + +func (_ *createEmpty) GetName() string { + return "empty" +} diff --git a/internal/entities/handlers/strategies/message/entity_info_wrapper.go b/internal/entities/handlers/strategies/message/entity_info_wrapper.go new file mode 100644 index 0000000000..cbbfe3abbb --- /dev/null +++ b/internal/entities/handlers/strategies/message/entity_info_wrapper.go @@ -0,0 +1,91 @@ +// +// Copyright 2024 Stacklok, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package message contains the message creation strategies +package message + +import ( + "context" + "fmt" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/google/uuid" + + "github.com/stacklok/minder/internal/db" + "github.com/stacklok/minder/internal/engine/entities" + "github.com/stacklok/minder/internal/entities/handlers/strategies" + "github.com/stacklok/minder/internal/entities/models" + propertyService "github.com/stacklok/minder/internal/entities/properties/service" + "github.com/stacklok/minder/internal/providers/manager" +) + +type toEntityInfoWrapper struct { + store db.Store + propSvc propertyService.PropertiesService + provMgr manager.ProviderManager +} + +// NewToEntityInfoWrapper creates a new toEntityInfoWrapper. +func NewToEntityInfoWrapper( + store db.Store, + propSvc propertyService.PropertiesService, + provMgr manager.ProviderManager, +) strategies.MessageCreateStrategy { + return &toEntityInfoWrapper{ + store: store, + propSvc: propSvc, + provMgr: provMgr, + } +} + +func (c *toEntityInfoWrapper) CreateMessage( + ctx context.Context, ewp *models.EntityWithProperties, +) (*message.Message, error) { + pbEnt, err := c.propSvc.EntityWithPropertiesAsProto(ctx, ewp, c.provMgr) + if err != nil { + return nil, fmt.Errorf("error converting entity to protobuf: %w", err) + } + + m := message.NewMessage(uuid.New().String(), nil) + + eiw := entities.NewEntityInfoWrapper(). + WithProjectID(ewp.Entity.ProjectID). + WithProviderID(ewp.Entity.ProviderID). + WithProtoMessage(ewp.Entity.Type, pbEnt). + WithID(ewp.Entity.Type, ewp.Entity.ID) + + // in case the entity originated from another entity, add that information as well. + // the property service does not provide this information (should it?) so we need to fetch it from the store. + // for now we could have hardcoded the entity type as everything originates from a repository, + // but this is more flexible. + if ewp.Entity.OriginatedFrom != uuid.Nil { + dbEnt, err := c.store.GetEntityByID(ctx, ewp.Entity.OriginatedFrom) + if err != nil { + return nil, fmt.Errorf("error getting originating entity: %w", err) + } + eiw.WithID(entities.EntityTypeFromDB(dbEnt.EntityType), dbEnt.ID) + } + + err = eiw.ToMessage(m) + if err != nil { + return nil, fmt.Errorf("error converting entity to message: %w", err) + } + + return m, nil +} + +func (_ *toEntityInfoWrapper) GetName() string { + return "toEntityInfoWrapper" +} diff --git a/internal/entities/handlers/strategies/message/minder_entity.go b/internal/entities/handlers/strategies/message/minder_entity.go new file mode 100644 index 0000000000..301fbd6222 --- /dev/null +++ b/internal/entities/handlers/strategies/message/minder_entity.go @@ -0,0 +1,57 @@ +// +// Copyright 2024 Stacklok, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package message contains the message creation strategies +package message + +import ( + "context" + "fmt" + + watermill "github.com/ThreeDotsLabs/watermill/message" + "github.com/google/uuid" + + "github.com/stacklok/minder/internal/entities/handlers/strategies" + "github.com/stacklok/minder/internal/entities/models" + "github.com/stacklok/minder/internal/reconcilers/messages" +) + +type toMinderEntityStrategy struct{} + +// NewToMinderEntity creates a new toMinderEntityStrategy. +func NewToMinderEntity() strategies.MessageCreateStrategy { + return &toMinderEntityStrategy{} +} + +func (_ *toMinderEntityStrategy) CreateMessage(_ context.Context, ewp *models.EntityWithProperties) (*watermill.Message, error) { + m := watermill.NewMessage(uuid.New().String(), nil) + + entEvent := messages.NewMinderEvent(). + WithProjectID(ewp.Entity.ProjectID). + WithProviderID(ewp.Entity.ProviderID). + WithEntityType(ewp.Entity.Type.String()). + WithEntityID(ewp.Entity.ID) + + err := entEvent.ToMessage(m) + if err != nil { + return nil, fmt.Errorf("error converting entity to message: %w", err) + } + + return m, nil +} + +func (_ *toMinderEntityStrategy) GetName() string { + return "toMinderv1Entity" +} diff --git a/internal/entities/handlers/strategies/strategies.go b/internal/entities/handlers/strategies/strategies.go new file mode 100644 index 0000000000..4acdb895cb --- /dev/null +++ b/internal/entities/handlers/strategies/strategies.go @@ -0,0 +1,41 @@ +// Copyright 2024 Stacklok, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package strategies contains the message creation strategies for entities and messages +package strategies + +import ( + "context" + + watermill "github.com/ThreeDotsLabs/watermill/message" + + "github.com/stacklok/minder/internal/entities/handlers/message" + "github.com/stacklok/minder/internal/entities/models" +) + +// MessageCreateStrategy is the interface for creating messages +type MessageCreateStrategy interface { + CreateMessage( + ctx context.Context, ewp *models.EntityWithProperties, + ) (*watermill.Message, error) + GetName() string +} + +// GetEntityStrategy is the interface for getting entities +type GetEntityStrategy interface { + GetEntity( + ctx context.Context, entMsg *message.HandleEntityAndDoMessage, + ) (*models.EntityWithProperties, error) + GetName() string +} diff --git a/internal/entities/models/models.go b/internal/entities/models/models.go index 8586074126..f13bc46e43 100644 --- a/internal/entities/models/models.go +++ b/internal/entities/models/models.go @@ -26,11 +26,12 @@ import ( // EntityInstance represents an entity instance type EntityInstance struct { - ID uuid.UUID - Type minderv1.Entity - Name string - ProviderID uuid.UUID - ProjectID uuid.UUID + ID uuid.UUID + Type minderv1.Entity + Name string + ProviderID uuid.UUID + ProjectID uuid.UUID + OriginatedFrom uuid.UUID } // EntityWithProperties represents an entity instance with properties @@ -41,13 +42,19 @@ type EntityWithProperties struct { // NewEntityWithProperties creates a new EntityWithProperties instance func NewEntityWithProperties(dbEntity db.EntityInstance, props *properties.Properties) *EntityWithProperties { + var originatedFrom uuid.UUID + if dbEntity.OriginatedFrom.Valid { + originatedFrom = dbEntity.OriginatedFrom.UUID + } + return &EntityWithProperties{ Entity: EntityInstance{ - ID: dbEntity.ID, - Type: entities.EntityTypeFromDB(dbEntity.EntityType), - Name: dbEntity.Name, - ProviderID: dbEntity.ProviderID, - ProjectID: dbEntity.ProjectID, + ID: dbEntity.ID, + Type: entities.EntityTypeFromDB(dbEntity.EntityType), + Name: dbEntity.Name, + ProviderID: dbEntity.ProviderID, + ProjectID: dbEntity.ProjectID, + OriginatedFrom: originatedFrom, }, Properties: props, } diff --git a/internal/entities/properties/service/mock/fixtures/fixtures.go b/internal/entities/properties/service/mock/fixtures/fixtures.go new file mode 100644 index 0000000000..6c33efa052 --- /dev/null +++ b/internal/entities/properties/service/mock/fixtures/fixtures.go @@ -0,0 +1,127 @@ +// Copyright 2024 Stacklok, Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance cf.With the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package fixtures contains code for creating RepositoryService +// fixtures and is used in various parts of the code. For testing use +// only. +// +//nolint:all +package fixtures + +import ( + "github.com/google/uuid" + "github.com/stacklok/minder/internal/entities/properties" + minder "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" + "go.uber.org/mock/gomock" + "google.golang.org/protobuf/reflect/protoreflect" + + "github.com/stacklok/minder/internal/entities/models" + "github.com/stacklok/minder/internal/entities/properties/service" + mockSvc "github.com/stacklok/minder/internal/entities/properties/service/mock" +) + +type ( + MockPropertyServiceBuilder = func(*gomock.Controller) *mockSvc.MockPropertiesService + MockPropertyServiceOption = func(*mockSvc.MockPropertiesService) +) + +func NewMockPropertiesService( + funcs ...MockPropertyServiceOption, +) MockPropertyServiceBuilder { + return func(ctrl *gomock.Controller) *mockSvc.MockPropertiesService { + mockPropSvc := mockSvc.NewMockPropertiesService(ctrl) + + for _, fn := range funcs { + fn(mockPropSvc) + } + + return mockPropSvc + } +} + +func WithSuccessfulEntityByUpstreamHint( + ewp *models.EntityWithProperties, + hint service.ByUpstreamHint, +) MockPropertyServiceOption { + return func(mockPropSvc *mockSvc.MockPropertiesService) { + mockPropSvc.EXPECT().EntityWithPropertiesByUpstreamHint(gomock.Any(), ewp.Entity.Type, gomock.Any(), hint, gomock.Any()). + Return(ewp, nil) + } +} + +func WithFailedEntityByUpstreamHint( + err error, +) MockPropertyServiceOption { + return func(mockPropSvc *mockSvc.MockPropertiesService) { + mockPropSvc.EXPECT(). + EntityWithPropertiesByUpstreamHint(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, err) + } +} + +func WithSuccessfulRetrieveAllPropertiesForEntity() MockPropertyServiceOption { + return func(mockPropSvc *mockSvc.MockPropertiesService) { + mockPropSvc.EXPECT(). + RetrieveAllPropertiesForEntity(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil) + } +} + +func WithFailedRetrieveAllPropertiesForEntity( + err error, +) MockPropertyServiceOption { + return func(mockPropSvc *mockSvc.MockPropertiesService) { + mockPropSvc.EXPECT(). + RetrieveAllPropertiesForEntity(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(err) + } +} + +func WithSuccessfulEntityWithPropertiesAsProto( + message protoreflect.ProtoMessage, +) MockPropertyServiceOption { + return func(mockPropSvc *mockSvc.MockPropertiesService) { + mockPropSvc.EXPECT(). + EntityWithPropertiesAsProto(gomock.Any(), gomock.Any(), gomock.Any()). + Return(message, nil) + } +} + +func WithFailedEntityWithPropertiesAsProto( + err error, +) MockPropertyServiceOption { + return func(mockPropSvc *mockSvc.MockPropertiesService) { + mockPropSvc.EXPECT(). + EntityWithPropertiesAsProto(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, err) + } +} + +func WithSuccessfulRetrieveAllProperties( + expProject uuid.UUID, + expProvider uuid.UUID, + expEntityType minder.Entity, + retProps *properties.Properties, +) MockPropertyServiceOption { + return func(mockPropSvc *mockSvc.MockPropertiesService) { + mockPropSvc.EXPECT(). + RetrieveAllProperties( + gomock.Any(), gomock.Any(), + expProject, expProvider, + gomock.Any(), + expEntityType, + gomock.Any()). + Return(retProps, nil) + } +} diff --git a/internal/events/constants.go b/internal/events/constants.go index 2ee1f1034c..d06e9d8af4 100644 --- a/internal/events/constants.go +++ b/internal/events/constants.go @@ -36,6 +36,14 @@ const ( ) const ( + // TopicQueueOriginatingEntityAdd adds an entity originating from another entity to the database + TopicQueueOriginatingEntityAdd = "originating.entity.add.event" + // TopicQueueOriginatingEntityDelete deletes an entity originating from another entity from the database + TopicQueueOriginatingEntityDelete = "originating.entity.delete.event" + // TopicQueueGetEntityAndDelete retrieves an entity from the database and schedules it for deletion + TopicQueueGetEntityAndDelete = "get.entity.delete.event" + // TopicQueueRefreshEntityAndEvaluate makes sure that entity properties are up-to-date and schedules an evaluation + TopicQueueRefreshEntityAndEvaluate = "refresh.entity.evaluate.event" // TopicQueueEntityEvaluate is the topic for entity evaluation events from webhooks TopicQueueEntityEvaluate = "execute.entity.event" // TopicQueueEntityFlush is the topic for flushing internal webhook events diff --git a/internal/events/stubs/eventer.go b/internal/events/stubs/eventer.go index d5691bb711..937c13b53c 100644 --- a/internal/events/stubs/eventer.go +++ b/internal/events/stubs/eventer.go @@ -17,6 +17,7 @@ package stubs import ( "context" + "slices" "github.com/ThreeDotsLabs/watermill/message" @@ -29,7 +30,8 @@ var _ events.Publisher = (*StubEventer)(nil) // StubEventer is an eventer that's useful for testing. type StubEventer struct { - Sent []*message.Message + Topics []string + Sent []*message.Message } // Close implements events.Interface. @@ -43,7 +45,10 @@ func (*StubEventer) ConsumeEvents(...events.Consumer) { } // Publish implements events.Interface. -func (s *StubEventer) Publish(_ string, messages ...*message.Message) error { +func (s *StubEventer) Publish(topic string, messages ...*message.Message) error { + if !slices.Contains(s.Topics, topic) { + s.Topics = append(s.Topics, topic) + } s.Sent = append(s.Sent, messages...) return nil } diff --git a/internal/service/service.go b/internal/service/service.go index 7ea9ed03ac..646824ea1f 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -37,6 +37,7 @@ import ( "github.com/stacklok/minder/internal/email/noop" "github.com/stacklok/minder/internal/engine" "github.com/stacklok/minder/internal/engine/selectors" + "github.com/stacklok/minder/internal/entities/handlers" propService "github.com/stacklok/minder/internal/entities/properties/service" "github.com/stacklok/minder/internal/events" "github.com/stacklok/minder/internal/flags" @@ -257,6 +258,10 @@ func AllInOneServerService( im := installations.NewInstallationManager(ghProviders) evt.ConsumeEvents(im) + // Register the entity refresh manager to handle entity refresh events + refresh := handlers.NewRefreshEntityAndEvaluateHandler(evt, store, propSvc, providerManager) + evt.ConsumeEvents(refresh) + // Register the email manager to handle email invitations var mailClient events.Consumer if cfg.Email.AWSSES.Region != "" && cfg.Email.AWSSES.Sender != "" {