Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TEP-0137] Restructure customrun event controller #6889

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/events/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ func main() {
}()

// start the events controller
sharedmain.Main(eventsControllerName, customrun.NewController())
sharedmain.Main(eventsControllerName,
customrun.NewController())
}

func handler(w http.ResponseWriter, r *http.Request) {
Expand Down
7 changes: 6 additions & 1 deletion docs/pipeline-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -11445,7 +11445,12 @@ this ResultsType.</p>
<h3 id="tekton.dev/v1beta1.RunObject">RunObject
</h3>
<div>
<p>RunObject is implemented by CustomRun and Run</p>
<p>RunObject is implemented by Run, CustomRun, TaskRun and PipelineRun</p>
</div>
<h3 id="tekton.dev/v1beta1.RunObjectWithRetries">RunObjectWithRetries
</h3>
<div>
<p>RunObject is implemented by Run and CustomRun</p>
</div>
<h3 id="tekton.dev/v1beta1.Sidecar">Sidecar
</h3>
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/config/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type Store struct {
func NewStore(logger configmap.Logger, onAfterStore ...func(name string, value interface{})) *Store {
store := &Store{
UntypedStore: configmap.NewUntypedStore(
"defaults/features/artifacts",
"defaults/features/metrics/spire/events",
logger,
configmap.Constructors{
GetDefaultsConfigName(): NewDefaultsFromConfigMap,
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/pipeline/v1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ func (pr *PipelineRun) HasStarted() bool {
return pr.Status.StartTime != nil && !pr.Status.StartTime.IsZero()
}

// IsSuccessful returns true if the TaskRun's status indicates that it has succeeded.
func (tr *PipelineRun) IsSuccessful() bool {
return tr != nil && tr.Status.GetCondition(apis.ConditionSucceeded).IsTrue()
}

Comment on lines +86 to +90
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed so that PipelineRun may implement the RunObject interface

// IsCancelled returns true if the PipelineRun's spec status is set to Cancelled state
func (pr *PipelineRun) IsCancelled() bool {
return pr.Spec.Status == PipelineRunSpecStatusCancelled
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ func (pr *PipelineRun) HasStarted() bool {
return pr.Status.StartTime != nil && !pr.Status.StartTime.IsZero()
}

// IsSuccessful returns true if the TaskRun's status indicates that it has succeeded.
func (tr *PipelineRun) IsSuccessful() bool {
return tr != nil && tr.Status.GetCondition(apis.ConditionSucceeded).IsTrue()
}

// IsCancelled returns true if the PipelineRun's spec status is set to Cancelled state
func (pr *PipelineRun) IsCancelled() bool {
return pr.Spec.Status == PipelineRunSpecStatusCancelled
Expand Down
7 changes: 6 additions & 1 deletion pkg/apis/pipeline/v1beta1/run_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"knative.dev/pkg/apis"
)

// RunObject is implemented by CustomRun and Run
// RunObject is implemented by Run, CustomRun, TaskRun and PipelineRun
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RunObject was introduced for remote resolution but it's not used anymore anywhere.
It is still very useful for the events controller though, except for the GetRetryCount, which I moved to a different interface.

type RunObject interface {
// Object requires GetObjectKind() and DeepCopyObject()
runtime.Object
Expand All @@ -38,6 +38,11 @@ type RunObject interface {
IsCancelled() bool
HasStarted() bool
IsDone() bool
}

// RunObject is implemented by Run and CustomRun
type RunObjectWithRetries interface {
RunObject

GetRetryCount() int
Comment on lines +41 to 47
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PipelineRuns do not have retries, so splitting GetRetryCount out to a separate interface

}
53 changes: 53 additions & 0 deletions pkg/reconciler/notifications/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
Copyright 2023 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package notifications

import (
"context"

"github.com/tektoncd/pipeline/pkg/apis/config"
cacheclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cache"
cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
)

// ConfigStoreFromContext initialise the config store from the context
func ConfigStoreFromContext(ctx context.Context, cmw configmap.Watcher) *config.Store {
logger := logging.FromContext(ctx)
configStore := config.NewStore(logger.Named("config-store"))
configStore.WatchConfigs(cmw)
return configStore
}

// ReconcilerFromContext initialises a Reconciler from the context
func ReconcilerFromContext(ctx context.Context, c Reconciler) {
c.SetCloudEventsClient(cloudeventclient.Get(ctx))
c.SetCacheClient(cacheclient.Get(ctx))
}

// ControllerOptions returns a function that returns options for a controller implementation
func ControllerOptions(name string, store *config.Store) func(impl *controller.Impl) controller.Options {
return func(impl *controller.Impl) controller.Options {
return controller.Options{
AgentName: name,
ConfigStore: store,
SkipStatusUpdates: true,
}
}
}
30 changes: 9 additions & 21 deletions pkg/reconciler/notifications/customrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,27 @@ package customrun
import (
"context"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
customruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/customrun"
customrunreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/customrun"
cacheclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cache"
cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/notifications"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
)

const ControllerName = "CustomRunEvents"

// NewController instantiates a new controller.Impl from knative.dev/pkg/controller
// This is a read-only controller, hence the SkipStatusUpdates set to true
func NewController() func(context.Context, configmap.Watcher) *controller.Impl {
return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
logger := logging.FromContext(ctx)
customRunInformer := customruninformer.Get(ctx)
configStore := notifications.ConfigStoreFromContext(ctx, cmw)

configStore := config.NewStore(logger.Named("config-store"))
configStore.WatchConfigs(cmw)

c := &Reconciler{
cloudEventClient: cloudeventclient.Get(ctx),
cacheClient: cacheclient.Get(ctx),
}
impl := customrunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options {
return controller.Options{
AgentName: pipeline.CustomRunControllerName,
ConfigStore: configStore,
SkipStatusUpdates: true,
}
})
c := &Reconciler{}
notifications.ReconcilerFromContext(ctx, c)

impl := customrunreconciler.NewImpl(ctx, c, notifications.ControllerOptions(ControllerName, configStore))

customRunInformer := customruninformer.Get(ctx)
customRunInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))

return impl
Expand Down
141 changes: 141 additions & 0 deletions pkg/reconciler/notifications/customrun/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
Copyright 2019 The Tekton Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package customrun_test

import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/notifications/customrun"
ntesting "github.com/tektoncd/pipeline/pkg/reconciler/notifications/testing"
"github.com/tektoncd/pipeline/test"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
cminformer "knative.dev/pkg/configmap/informer"
pkgreconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/system"
_ "knative.dev/pkg/system/testing" // Setup system.Namespace()
)

func InitializeTestController(t *testing.T, d test.Data, a test.Assets) test.Assets {
t.Helper()
configMapWatcher := cminformer.NewInformedWatcher(a.Clients.Kube, system.Namespace())
ctl := customrun.NewController()(a.Ctx, configMapWatcher)
if err := configMapWatcher.Start(a.Ctx.Done()); err != nil {
t.Fatalf("error starting configmap watcher: %v", err)
}

if la, ok := ctl.Reconciler.(pkgreconciler.LeaderAware); ok {
la.Promote(pkgreconciler.UniversalBucket(), func(pkgreconciler.Bucket, types.NamespacedName) {})
}
a.Controller = ctl
return a
}

// TestReconcileNewController runs reconcile with a cloud event sink configured
// to ensure that events are sent in different cases
func TestReconcileNewController(t *testing.T) {
ignoreResourceVersion := cmpopts.IgnoreFields(v1beta1.CustomRun{}, "ObjectMeta.ResourceVersion")

cms := []*corev1.ConfigMap{
{
ObjectMeta: metav1.ObjectMeta{Name: config.GetEventsConfigName(), Namespace: system.Namespace()},
Data: map[string]string{
"sink": "http://synk:8080",
},
}, {
ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()},
Data: map[string]string{
"send-cloudevents-for-runs": "true",
},
},
}

condition := &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionTrue,
Reason: v1beta1.CustomRunReasonSuccessful.String(),
}
objectStatus := duckv1.Status{
Conditions: []apis.Condition{},
}
crStatusFields := v1beta1.CustomRunStatusFields{}
objectStatus.Conditions = append(objectStatus.Conditions, *condition)
customRun := v1beta1.CustomRun{
ObjectMeta: metav1.ObjectMeta{
Name: "test-customRun",
Namespace: "foo",
},
Spec: v1beta1.CustomRunSpec{},
Status: v1beta1.CustomRunStatus{
Status: objectStatus,
CustomRunStatusFields: crStatusFields,
},
}
customRuns := []*v1beta1.CustomRun{&customRun}
wantCloudEvents := []string{`(?s)dev.tekton.event.customrun.successful.v1.*test-customRun`}

d := test.Data{
CustomRuns: customRuns,
ConfigMaps: cms,
ExpectedCloudEventCount: len(wantCloudEvents),
}
testAssets, cancel := ntesting.InitializeTestAssets(t, &d)
defer cancel()
clients := testAssets.Clients

// Initialise the controller.
// Verify that the config map watcher and reconciler setup works well
testAssets = InitializeTestController(t, d, testAssets)
c := testAssets.Controller

if err := c.Reconciler.Reconcile(testAssets.Ctx, ntesting.GetTestResourceName(&customRun)); err != nil {
t.Errorf("didn't expect an error, but got one: %v", err)
}

for _, a := range clients.Kube.Actions() {
aVerb := a.GetVerb()
if aVerb != "get" && aVerb != "list" && aVerb != "watch" {
t.Errorf("Expected only read actions to be logged in the kubeclient, got %s", aVerb)
}
}

crAfter, err := clients.Pipeline.TektonV1beta1().CustomRuns(customRun.Namespace).Get(testAssets.Ctx, customRun.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("getting updated customRun: %v", err)
}

if d := cmp.Diff(&customRun, crAfter, ignoreResourceVersion); d != "" {
t.Fatalf("CustomRun should not have changed, got %v instead", d)
}

ceClient := clients.CloudEvents.(cloudevent.FakeClient)
ceClient.CheckCloudEventsUnordered(t, "controller test", wantCloudEvents)

// Try and reconcile again - expect no event
if err := c.Reconciler.Reconcile(testAssets.Ctx, ntesting.GetTestResourceName(&customRun)); err != nil {
t.Errorf("didn't expect an error, but got one: %v", err)
}
ceClient.CheckCloudEventsUnordered(t, "controller test", []string{})
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,8 @@ import (
"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
customrunreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/customrun"
"github.com/tektoncd/pipeline/pkg/reconciler/events"
"github.com/tektoncd/pipeline/pkg/reconciler/events/cache"
"github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"knative.dev/pkg/apis"
"knative.dev/pkg/logging"
"github.com/tektoncd/pipeline/pkg/reconciler/notifications"
pkgreconciler "knative.dev/pkg/reconciler"
)

Expand All @@ -37,35 +34,36 @@ type Reconciler struct {
cacheClient *lru.Cache
}

func (c *Reconciler) GetCloudEventsClient() cloudevent.CEClient {
return c.cloudEventClient
}

func (c *Reconciler) GetCacheClient() *lru.Cache {
return c.cacheClient
}

func (c *Reconciler) SetCloudEventsClient(client cloudevent.CEClient) {
c.cloudEventClient = client
}

func (c *Reconciler) SetCacheClient(client *lru.Cache) {
c.cacheClient = client
}

// Check that our Reconciler implements customrunreconciler.Interface
var (
_ customrunreconciler.Interface = (*Reconciler)(nil)
)

// ReconcileKind compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the CustomRun
// resource with the current status of the resource.
// ReconcileKind oberves the resource conditions and triggers notifications accordingly
func (c *Reconciler) ReconcileKind(ctx context.Context, customRun *v1beta1.CustomRun) pkgreconciler.Event {
logger := logging.FromContext(ctx)
configs := config.FromContextOrDefaults(ctx)
ctx = cloudevent.ToContext(ctx, c.cloudEventClient)
ctx = cache.ToContext(ctx, c.cacheClient)
logger.Infof("Reconciling %s", customRun.Name)

// Create a copy of the CustomRun object, just in case, to avoid sync'ing changes
customRunEvents := *customRun.DeepCopy()

if configs.FeatureFlags.SendCloudEventsForRuns {
// Custom task controllers may be sending events for "CustomRuns" associated
// to the custom tasks they control. To avoid sending duplicate events,
// CloudEvents for "CustomRuns" are only sent when enabled

// Read and log the condition
condition := customRunEvents.Status.GetCondition(apis.ConditionSucceeded)
logger.Debugf("Emitting cloudevent for %s, condition: %s", customRunEvents.Name, condition)

events.EmitCloudEvents(ctx, &customRunEvents)
return notifications.ReconcileRuntimeObject(ctx, c, customRun)
}

return nil
}
Loading