Skip to content

Commit

Permalink
Rework the events controller cache
Browse files Browse the repository at this point in the history
Switch the cache implementation from a simple LRU to the bigcache
library which provides better performance and reduces the need
for garbage collection.

Further improve the cache by reducing the size of keys, by
adopting hashing of keys and thus storing smaller data in the cache.
Hashing also allows for detecting changes in a larger surface of
the data. While todays keys are based on the event type and
resource metadata, we can now include conditions in the equation
as well, to provide and enhanced logic for event sending.

Storing a resource metadata and its condition in the hashed keys
means we can detect immediately if no relevant change was made
in a resource, and quickly finish the reconcile cycle.
Additionally storing the event type along with the resource
metadata, means that we can detect if a specific message was
already sent for a specific resource.

To be able to rely on the cache to decide which event to send,
we need to ensure that reconcile cycles for the same resource
are never executed concurrently. To achieve this, the thread
number (or concurrency) is set to 1 in the controller.

Scalability needs can be addressed by scaling the events
controller horizontally. The sharding of resources across
instances of the reconciler is based on the resource key, so
the same resource will always be directed to the same controller
thus avoiding any cache miss.

Signed-off-by: Andrea Frittoli <andrea.frittoli@uk.ibm.com>
  • Loading branch information
afrittoli committed Jul 10, 2023
1 parent b9ac5d8 commit ded9367
Show file tree
Hide file tree
Showing 16 changed files with 483 additions and 155 deletions.
2 changes: 1 addition & 1 deletion config/config-events.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ data:
# this example block and unindented to be in the data block
# to actually change the configuration.
# formats contains a comma seperated list of event formats to be used
# formats contains a comma separated list of event formats to be used
# the only format supported today is "tektonv1". An empty string is not
# a valid configuration. To disable events, do not specify the sink.
formats: "tektonv1"
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.19
require (
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/ahmetb/gen-crd-api-reference-docs v0.3.1-0.20220720053627-e327d0730470 // Waiting for https://github.com/ahmetb/gen-crd-api-reference-docs/pull/43/files to merge
github.com/allegro/bigcache/v3 v3.1.0
github.com/cloudevents/sdk-go/v2 v2.14.0
github.com/containerd/containerd v1.6.19
github.com/go-git/go-git/v5 v5.6.1
Expand All @@ -13,7 +14,6 @@ require (
github.com/google/uuid v1.3.0
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/golang-lru v0.5.4
github.com/jenkins-x/go-scm v1.13.13
github.com/mitchellh/go-homedir v1.1.0
github.com/opencontainers/image-spec v1.1.0-rc4
Expand Down Expand Up @@ -52,6 +52,7 @@ require (
code.gitea.io/sdk/gitea v0.15.1
github.com/goccy/kpoward v0.1.0
github.com/google/go-containerregistry/pkg/authn/k8schain v0.0.0-20230625233257-b8504803389b
github.com/hashicorp/golang-lru v0.5.4
github.com/sigstore/sigstore/pkg/signature/kms/aws v1.7.1
github.com/sigstore/sigstore/pkg/signature/kms/azure v1.7.1
github.com/sigstore/sigstore/pkg/signature/kms/gcp v1.7.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/pipeline/v1beta1/run_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type RunObject interface {
// ObjectMetaAccessor requires a GetObjectMeta that returns the ObjectMeta
metav1.ObjectMetaAccessor

// GetStatusCondition returns a ConditionAccessor for the status of the objectWithCondition
// GetStatusCondition returns a ConditionAccessor
GetStatusCondition() apis.ConditionAccessor

IsSuccessful() bool
Expand Down
133 changes: 106 additions & 27 deletions pkg/reconciler/events/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,52 +17,131 @@ limitations under the License.
package cache

import (
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"strconv"
"time"

bc "github.com/allegro/bigcache/v3"
cloudevents "github.com/cloudevents/sdk-go/v2"
lru "github.com/hashicorp/golang-lru"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"knative.dev/pkg/apis"
)

// Struct to unmarshal the event data
type eventData struct {
CustomRun *v1beta1.CustomRun `json:"customRun,omitempty"`
// ContainsOrAddKey checks if the key exists in the cache
// - it returns true if they key was found in the cache
// - it returns false if it wasn't and adds the key
// Either way, the current timestamp is appended as value
func ContainsOrAddKey(cacheClient *bc.BigCache, key string) (bool, error) {
if cacheClient == nil {
return false, errors.New("cache client is nil")
}
// Set the new key
timeNow, err := hash(time.Now().Format(time.RFC3339Nano))
if err != nil {
return false, err
}
err = cacheClient.Append(key, []byte(timeNow))
if err != nil {
return false, err
}
// Get the value - if it matches what we added, the key was new
value, err := cacheClient.Get(key)
if err != nil {
return false, err
}
return (string(value) != timeNow), nil
}

// ContainsOrAddCloudEvent checks if the event exists in the cache
func ContainsOrAddCloudEvent(cacheClient *lru.Cache, event *cloudevents.Event) (bool, error) {
// - it returns true if they key was found in the cache
// - it returns false if it wasn't and adds the key
// Either way, the current timestamp is appended as value
// The key is calculated via EventKey
func ContainsOrAddCloudEvent(cacheClient *bc.BigCache, event *cloudevents.Event, object v1beta1.RunObject) (bool, error) {
if cacheClient == nil {
return false, errors.New("cache client is nil")
}
eventKey, err := EventKey(event)
eventKey, err := EventKey(event, object)
if err != nil {
return false, err
}
isPresent, _ := cacheClient.ContainsOrAdd(eventKey, nil)
return isPresent, nil
return ContainsOrAddKey(cacheClient, eventKey)
}

// EventKey defines whether an event is considered different from another
// in future we might want to let specific event types override this
func EventKey(event *cloudevents.Event) (string, error) {
var (
data eventData
resourceName string
resourceNamespace string
resourceKind string
)
err := json.Unmarshal(event.Data(), &data)
// ContainsOrAddObject checks if the object exists in the cache
// - it returns true if they key was found in the cache
// - it returns false if it wasn't and adds the key
// Either way, the current timestamp is appended as value
// The key is calculated via ObjectKey
func ContainsOrAddObject(cacheClient *bc.BigCache, object v1beta1.RunObject) (bool, error) {
if cacheClient == nil {
return false, errors.New("cache client is nil")
}
eventKey, err := ObjectKey(object)
if err != nil {
return "", err
return false, err
}
if data.CustomRun == nil {
return "", fmt.Errorf("Invalid CustomRun data in %v", event)
return ContainsOrAddKey(cacheClient, eventKey)
}

// EventKey is the event cache key which combines event type and object kind, namespace and name
func EventKey(event *cloudevents.Event, object v1beta1.RunObject) (string, error) {
if object == nil || event == nil {
return "", fmt.Errorf("both object (%v) and event (%v) must be not nil", object, event)
}
if object.GetObjectKind() == nil {
return "", fmt.Errorf("object %v has nil object kind", object)
}
if object.GetObjectMeta() == nil {
return "", fmt.Errorf("object %v has nil object meta", object)
}
return hash(fmt.Sprintf("%s/%s/%s/%s/%s/%s",
event.Type(),
object.GetObjectKind().GroupVersionKind().Group,
object.GetObjectKind().GroupVersionKind().Version,
object.GetObjectKind().GroupVersionKind().Kind,
object.GetObjectMeta().GetNamespace(),
object.GetObjectMeta().GetName()))
}

// ObjectKey is the object condition cache key which combines condition and object kind, namespace and name
func ObjectKey(object v1beta1.RunObject) (string, error) {
var condition *apis.Condition
if object == nil {
return "", errors.New("object must be not nil")
}
// The condition may not be set, in that case return an empty condition
if object.GetStatusCondition() != nil {
condition = object.GetStatusCondition().GetCondition(apis.ConditionSucceeded)
}
if condition == nil {
condition = &apis.Condition{}
}
if object.GetObjectKind() == nil {
return "", fmt.Errorf("object %v has nil object kind", object)
}
if object.GetObjectMeta() == nil {
return "", fmt.Errorf("object %v has nil object meta", object)
}
return hash(fmt.Sprintf("%s/%s/%s/%s/%s/%s/%s/%s",
condition.Status,
condition.Reason,
condition.Message,
object.GetObjectKind().GroupVersionKind().Group,
object.GetObjectKind().GroupVersionKind().Version,
object.GetObjectKind().GroupVersionKind().Kind,
object.GetObjectMeta().GetNamespace(),
object.GetObjectMeta().GetName()))
}

// hash provide fnv64 hash converted to string in base36
func hash(input string) (string, error) {
hasher := fnv.New64a()
_, err := hasher.Write([]byte(input))
if err != nil {
return "", err
}
resourceName = data.CustomRun.Name
resourceNamespace = data.CustomRun.Namespace
resourceKind = "customrun"
eventType := event.Type()
return fmt.Sprintf("%s/%s/%s/%s", eventType, resourceKind, resourceNamespace, resourceName), nil
return strconv.FormatUint(hasher.Sum64(), 36), nil
}
Loading

0 comments on commit ded9367

Please sign in to comment.