Skip to content

Commit

Permalink
Enable storing of Run Events as Record
Browse files Browse the repository at this point in the history
All Events related to taskrun are stored when we are done with Runs
and in a single List.
This can be controlled by flag passed to watcher "store-event". Put
it to false disable storing of eventlist.
Record Name of EventList is stored as `results.tekton.dev/eventlist`
in TaskRun and PipelineRun.
  • Loading branch information
khrm authored and tekton-robot committed Jul 3, 2024
1 parent 31db5c9 commit 1e174ec
Show file tree
Hide file tree
Showing 15 changed files with 312 additions and 13 deletions.
2 changes: 2 additions & 0 deletions cmd/watcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ var (
checkOwner = flag.Bool("check_owner", true, "If enabled, owner references will be checked while deleting objects")
updateLogTimeout = flag.Duration("update_log_timeout", 300*time.Second, "How log the Watcher waits for the UpdateLog operation for storing logs to complete before it aborts.")
dynamicReconcileTimeout = flag.Duration("dynamic_reconcile_timeout", 30*time.Second, "How long the Watcher waits for the dynamic reconciler to complete before it aborts.")
storeEvent = flag.Bool("store_event", false, "If enabled, events related to runs will also be stored")
)

func main() {
Expand Down Expand Up @@ -106,6 +107,7 @@ func main() {
CheckOwner: *checkOwner,
UpdateLogTimeout: updateLogTimeout,
DynamicReconcileTimeout: dynamicReconcileTimeout,
StoreEvent: *storeEvent,
}
log.Printf("dynamic reconcile timeout %s and update log timeout is %s", cfg.DynamicReconcileTimeout.String(), cfg.UpdateLogTimeout.String())

Expand Down
2 changes: 1 addition & 1 deletion config/base/100-watcher-serviceaccount.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ rules:
# Watcher currently get config from APISever, so will
# fail to start if it does not have this permission.
- apiGroups: [""]
resources: ["configmaps", "pods"]
resources: ["configmaps", "pods", "events"]
verbs: ["get", "list", "watch"]
# Required to read logs, when logs API is enabled
- apiGroups: [""]
Expand Down
2 changes: 2 additions & 0 deletions docs/api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ Possible values for `data_type` and `summary.type` (for Result) are:
- `tekton.dev/v1beta1.TaskRun` or `TASK_RUN`
- `tekton.dev/v1beta1.PipelineRun` or `PIPELINE_RUN`
- `results.tekton.dev/v1alpha2.Log`
- `results.tekton.dev/v1alpha3.Log`
- `results.tekton.dev/v1.EventList`

#### The `data` field in Record

Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/v1alpha3/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"k8s.io/apimachinery/pkg/types"
)

// EventListRecordType represents the API resource type for EventSet records.
const EventListRecordType = "results.tekton.dev/v1.EventList"

// LogRecordType represents the API resource type for Tekton log records.
const LogRecordType = "results.tekton.dev/v1alpha3.Log"

Expand Down
3 changes: 3 additions & 0 deletions pkg/watcher/reconciler/annotation/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ const (
// Log identifier.
Log = "results.tekton.dev/log"

// EventList identifier.
EventList = "results.tekton.dev/eventlist"

// ResultAnnotations is an annotation that integrators should add to objects in order to store
// arbitrary keys/values into the Result.Annotations field.
ResultAnnotations = "results.tekton.dev/resultAnnotations"
Expand Down
2 changes: 2 additions & 0 deletions pkg/watcher/reconciler/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type Config struct {

// DynamicReconcileTimeout is the time we provide for the dynamic reconciler to process an event
DynamicReconcileTimeout *time.Duration
// Whether to Store Events related to Taskrun and Pipelineruns
StoreEvent bool
}

// GetDisableAnnotationUpdate returns whether annotation updates should be
Expand Down
129 changes: 128 additions & 1 deletion pkg/watcher/reconciler/dynamic/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package dynamic
import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"
"sync"
Expand All @@ -38,10 +39,12 @@ import (
"github.com/tektoncd/results/pkg/watcher/results"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"knative.dev/pkg/apis"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
Expand All @@ -54,6 +57,9 @@ var (
// Reconciler implements common reconciler behavior across different Tekton Run
// Object types.
type Reconciler struct {
// KubeClientSet allows us to talk to the k8s for core APIs
KubeClientSet kubernetes.Interface

resultsClient *results.Client
objectClient ObjectClient
cfg *reconciler.Config
Expand All @@ -79,9 +85,10 @@ type IsReadyForDeletion func(ctx context.Context, object results.Object) (bool,
type AfterDeletion func(ctx context.Context, object results.Object) error

// NewDynamicReconciler creates a new dynamic Reconciler.
func NewDynamicReconciler(rc pb.ResultsClient, lc pb.LogsClient, oc ObjectClient, cfg *reconciler.Config) *Reconciler {
func NewDynamicReconciler(kubeClientSet kubernetes.Interface, rc pb.ResultsClient, lc pb.LogsClient, oc ObjectClient, cfg *reconciler.Config) *Reconciler {
return &Reconciler{
resultsClient: results.NewClient(rc, lc),
KubeClientSet: kubeClientSet,
objectClient: oc,
cfg: cfg,
// Always true predicate.
Expand Down Expand Up @@ -227,6 +234,26 @@ func (r *Reconciler) Reconcile(ctx context.Context, o results.Object) error {
}
}

// CreateEvents if enabled
if r.cfg.StoreEvent {
if err := r.storeEvents(ctx, o); err != nil {
logger.Errorw("Error storing eventlist",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
zap.Error(err),
)
if ctxCancel != nil {
ctxCancel()
}
return err
}
logger.Debugw("Successfully store eventlist",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
)
}
logger = logger.With(zap.String("results.tekton.dev/result", res.Name),
zap.String("results.tekton.dev/record", rec.Name))
logger.Debugw("Record has been successfully upserted into API server", timeTakenField)
Expand Down Expand Up @@ -584,3 +611,103 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType,

return nil
}

// storeEvents streams logs to the API server
func (r *Reconciler) storeEvents(ctx context.Context, o results.Object) error {
logger := logging.FromContext(ctx)
condition := o.GetStatusCondition().GetCondition(apis.ConditionSucceeded)
GVK := o.GetObjectKind().GroupVersionKind()
if !GVK.Empty() &&
(GVK.Kind == "TaskRun" || GVK.Kind == "PipelineRun") &&
condition != nil &&
!condition.IsUnknown() {

rec, err := r.resultsClient.GetEventListRecord(ctx, o)
if err != nil {
return err
}

if rec != nil {
// It means we have already stored events
eventListName := rec.GetName()
// Update Events annotation if it doesn't exist
return r.addResultsAnnotations(ctx, o, annotation.Annotation{Name: annotation.EventList, Value: eventListName})
}

events, err := r.KubeClientSet.CoreV1().Events(o.GetNamespace()).List(ctx, metav1.ListOptions{
FieldSelector: "involvedObject.uid=" + string(o.GetUID()),
})
if err != nil {
logger.Errorf("Failed to store events - retrieve",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
zap.String("err", err.Error()),
)
return err
}

tr, ok := o.(*pipelinev1beta1.TaskRun)

if ok {
podName := tr.Status.PodName
podEvents, err := r.KubeClientSet.CoreV1().Events(o.GetNamespace()).List(ctx, metav1.ListOptions{
FieldSelector: "involvedObject.name=" + podName,
})
if err != nil {
logger.Errorf("Failed to fetch taskrun pod events",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
zap.String("podname", podName),
zap.String("err", err.Error()),
)
}
if podEvents != nil && len(podEvents.Items) > 0 {
events.Items = append(events.Items, podEvents.Items...)
}

}

data := filterEventList(events)
eventList, err := json.Marshal(data)
if err != nil {
logger.Errorf("Failed to store events - marshal",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
zap.String("err", err.Error()),
)
return err
}

rec, err = r.resultsClient.PutEventList(ctx, o, eventList)
if err != nil {
return err
}

if err := r.addResultsAnnotations(ctx, o, annotation.Annotation{Name: annotation.EventList, Value: rec.GetName()}); err != nil {
return err
}

}

return nil
}

func filterEventList(events *v1.EventList) *v1.EventList {
if events == nil || len(events.Items) == 0 {
return events
}

for i, event := range events.Items {
// Only taking Name, Namespace and CreationTimeStamp for ObjectMeta
events.Items[i].ObjectMeta = metav1.ObjectMeta{
Name: event.Name,
Namespace: event.Namespace,
CreationTimestamp: event.CreationTimestamp,
}
}

return events
}
37 changes: 34 additions & 3 deletions pkg/watcher/reconciler/dynamic/dynamic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sTest "k8s.io/client-go/kubernetes/fake"
"knative.dev/pkg/apis"
duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"
"knative.dev/pkg/controller"
Expand Down Expand Up @@ -138,9 +139,12 @@ func TestReconcile_TaskRun(t *testing.T) {
cfg := &reconciler.Config{
DisableAnnotationUpdate: true,
RequeueInterval: 1 * time.Second,
StoreEvent: true,
}

r := NewDynamicReconciler(resultsClient, logsClient, trclient, cfg)
client := k8sTest.NewSimpleClientset()

r := NewDynamicReconciler(client, resultsClient, logsClient, trclient, cfg)
if err := r.Reconcile(ctx, taskrun); err != nil {
t.Fatal(err)
}
Expand All @@ -164,10 +168,14 @@ func TestReconcile_TaskRun(t *testing.T) {
if err != nil {
t.Fatalf("Error parsing result uid: %v", err)
}
logRecordName := record.FormatName(resultName, uuid.NewMD5(uid, []byte(taskrun.GetUID())).String())
logRecordName := record.FormatName(resultName, uuid.NewMD5(uid, []byte(taskrun.GetUID()+"eventlist")).String())
if _, err := resultsClient.GetRecord(ctx, &pb.GetRecordRequest{Name: logRecordName}); err != nil {
t.Fatalf("Error getting log record: %v", err)
}
eventListName := watcherresults.FormatEventListName(resultName, uid, taskrun)
if _, err := resultsClient.GetRecord(ctx, &pb.GetRecordRequest{Name: eventListName}); err != nil {
t.Fatalf("Error getting eventlist %s: record: %v", eventListName, err)
}
})

// Enable Annotation Updates, re-reconcile
Expand Down Expand Up @@ -204,6 +212,14 @@ func TestReconcile_TaskRun(t *testing.T) {
if _, err := resultsClient.GetRecord(ctx, &pb.GetRecordRequest{Name: logRecordName}); err != nil {
t.Fatalf("Error getting log record '%s': %v", logRecordName, err)
}
eventListName := tr.GetAnnotations()[annotation.EventList]
if eventListName == "" {
t.Fatalf("Error parsing eventlist name '%s'", eventListName)
}
if _, err := resultsClient.GetRecord(ctx, &pb.GetRecordRequest{Name: eventListName}); err != nil {
t.Fatalf("Error getting eventlist record '%s': %v", eventListName, err)
}

})

t.Run("delete object once grace period elapses", func(t *testing.T) {
Expand Down Expand Up @@ -428,8 +444,13 @@ func TestReconcile_PipelineRun(t *testing.T) {
if _, err := prclient.Create(ctx, pipelinerun, metav1.CreateOptions{}); err != nil {
t.Fatal(err)
}
cfg := &reconciler.Config{
StoreEvent: true,
}

client := k8sTest.NewSimpleClientset()

r := NewDynamicReconciler(resultsClient, logsClient, prclient, nil)
r := NewDynamicReconciler(client, resultsClient, logsClient, prclient, cfg)
if err := r.Reconcile(ctx, pipelinerun); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -474,6 +495,16 @@ func TestReconcile_PipelineRun(t *testing.T) {
t.Fatalf("Error getting log record: %v", err)
}
})
t.Run("EventList", func(t *testing.T) {

eventListName := pr.GetAnnotations()[annotation.EventList]
if eventListName == "" {
t.Fatalf("Error parsing eventlist name '%s'", eventListName)
}
if _, err := resultsClient.GetRecord(ctx, &pb.GetRecordRequest{Name: eventListName}); err != nil {
t.Fatalf("Error getting eventlist record '%s': %v", eventListName, err)
}
})

// We don't do the same exhaustive feature testing as TaskRuns here -
// since everything is handled as a generic object testing TaskRuns should
Expand Down
2 changes: 2 additions & 0 deletions pkg/watcher/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/tektoncd/results/pkg/watcher/reconciler/leaderelection"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
)
Expand All @@ -48,6 +49,7 @@ func NewControllerWithConfig(ctx context.Context, resultsClient pb.ResultsClient

c := &Reconciler{
LeaderAwareFuncs: leaderelection.NewLeaderAwareFuncs(pipelineRunLister.List),
kubeClientSet: kubeclient.Get(ctx),
resultsClient: resultsClient,
logsClient: logs.Get(ctx),
pipelineRunLister: pipelineRunLister,
Expand Down
6 changes: 5 additions & 1 deletion pkg/watcher/reconciler/pipelinerun/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"go.uber.org/zap"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
Expand All @@ -43,6 +44,9 @@ type Reconciler struct {
// Inline LeaderAwareFuncs to support leader election.
knativereconciler.LeaderAwareFuncs

// kubeClientSet allows us to talk to the k8s for core APIs
kubeClientSet kubernetes.Interface

resultsClient pb.ResultsClient
logsClient pb.LogsClient
pipelineRunLister pipelinev1beta1listers.PipelineRunLister
Expand Down Expand Up @@ -86,7 +90,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error {
PipelineRunInterface: r.pipelineClient.TektonV1beta1().PipelineRuns(namespace),
}

dyn := dynamic.NewDynamicReconciler(r.resultsClient, r.logsClient, pipelineRunClient, r.cfg)
dyn := dynamic.NewDynamicReconciler(r.kubeClientSet, r.resultsClient, r.logsClient, pipelineRunClient, r.cfg)
// Tell the dynamic reconciler to wait until all underlying TaskRuns are
// ready for deletion before deleting the PipelineRun. This guarantees
// that the TaskRuns will not be deleted before their final state being
Expand Down
2 changes: 2 additions & 0 deletions pkg/watcher/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/tektoncd/results/pkg/watcher/reconciler/leaderelection"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
)
Expand All @@ -47,6 +48,7 @@ func NewControllerWithConfig(ctx context.Context, resultsClient pb.ResultsClient

c := &Reconciler{
LeaderAwareFuncs: leaderelection.NewLeaderAwareFuncs(lister.List),
kubeClientSet: kubeclient.Get(ctx),
resultsClient: resultsClient,
logsClient: logs.Get(ctx),
lister: lister,
Expand Down
Loading

0 comments on commit 1e174ec

Please sign in to comment.