diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go
index 96f9ebe72..53141068b 100644
--- a/pkg/cache/cluster.go
+++ b/pkg/cache/cluster.go
@@ -120,6 +120,9 @@ type ClusterCache interface {
 	// IterateHierarchy iterates resource tree starting from the specified top level resource and executes callback for each resource in the tree.
 	// The action callback returns true if iteration should continue and false otherwise.
 	IterateHierarchy(key kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool)
+	// IterateHierarchyV2 iterates resource tree starting from the specified top level resources and executes callback for each resource in the tree.
+	// The action callback returns true if iteration should continue and false otherwise.
+	IterateHierarchyV2(keys []kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool)
 	// IsNamespaced answers if specified group/kind is a namespaced resource API or not
 	IsNamespaced(gk schema.GroupKind) (bool, error)
 	// GetManagedLiveObjs helps finding matching live K8S resources for a given resources list.
@@ -1004,6 +1007,96 @@ func (c *clusterCache) IterateHierarchy(key kube.ResourceKey, action func(resour
 	}
 }
 
+// IterateHierarchyV2 iterates resource tree starting from the specified top level resources and executes callback for each resource in the tree
+func (c *clusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) {
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+	keysPerNamespace := make(map[string][]kube.ResourceKey)
+	for _, key := range keys {
+		_, ok := c.resources[key]
+		if !ok {
+			continue
+		}
+		keysPerNamespace[key.Namespace] = append(keysPerNamespace[key.Namespace], key)
+	}
+	for namespace, namespaceKeys := range keysPerNamespace {
+		nsNodes := c.nsIndex[namespace]
+		// Prepare to construct a graph
+		nodesByUID := make(map[types.UID][]*Resource)
+		nodeByGraphKey := make(map[string]*Resource)
+		for _, node := range nsNodes {
+			nodesByUID[node.Ref.UID] = append(nodesByUID[node.Ref.UID], node)
+			// Based on what's used by isParentOf
+			graphKey := fmt.Sprintf("%s/%s/%s", node.Ref.Kind, node.Ref.APIVersion, node.Ref.Name)
+			nodeByGraphKey[graphKey] = node
+		}
+		// Construct a graph using logic similar to isParentOf, but more efficient
+		graph := make(map[kube.ResourceKey][]kube.ResourceKey)
+		childrenByUID := make(map[kube.ResourceKey]map[types.UID][]*Resource)
+		for _, node := range nsNodes {
+			childrenByUID[node.ResourceKey()] = make(map[types.UID][]*Resource)
+		}
+		for _, node := range nsNodes {
+			for i, ownerRef := range node.OwnerRefs {
+				// Backfill a missing owner reference UID by looking up the owner by kind/apiVersion/name
+				if ownerRef.UID == "" {
+					graphKey := fmt.Sprintf("%s/%s/%s", ownerRef.Kind, ownerRef.APIVersion, ownerRef.Name)
+					graphKeyNode, ok := nodeByGraphKey[graphKey]
+					if ok {
+						ownerRef.UID = graphKeyNode.Ref.UID
+						node.OwnerRefs[i] = ownerRef
+					} else {
+						continue
+					}
+				}
+
+				uidNodes, ok := nodesByUID[ownerRef.UID]
+				if ok {
+					for _, uidNode := range uidNodes {
+						graph[uidNode.ResourceKey()] = append(graph[uidNode.ResourceKey()], node.ResourceKey())
+						childrenByUID[uidNode.ResourceKey()][node.Ref.UID] = append(childrenByUID[uidNode.ResourceKey()][node.Ref.UID], node)
+					}
+				}
+			}
+		}
+		visited := make(map[kube.ResourceKey]int)
+		for _, key := range namespaceKeys {
+			visited[key] = 0
+		}
+		for _, key := range namespaceKeys {
+			// The check for existence of key is done above.
+			res := c.resources[key]
+			if visited[key] == 2 || !action(res, nsNodes) {
+				continue
+			}
+			visited[key] = 1
+			// make sure children have no duplicates
+			for _, children := range childrenByUID[key] {
+				if len(children) > 0 {
+					// The object might have multiple children with the same UID (e.g. replicaset from apps and extensions group). It is ok to pick any object but we need to make sure
+					// we pick the same child after every refresh.
+					sort.Slice(children, func(i, j int) bool {
+						key1 := children[i].ResourceKey()
+						key2 := children[j].ResourceKey()
+						return strings.Compare(key1.String(), key2.String()) < 0
+					})
+					child := children[0]
+					if visited[child.ResourceKey()] == 0 && action(child, nsNodes) {
+						child.iterateChildrenV2(graph, nsNodes, visited, func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool {
+							if err != nil {
+								c.log.V(2).Info(err.Error())
+								return false
+							}
+							return action(child, namespaceResources)
+						})
+					}
+				}
+			}
+			visited[key] = 2
+		}
+	}
+}
+
 // IsNamespaced answers if specified group/kind is a namespaced resource API or not
 func (c *clusterCache) IsNamespaced(gk schema.GroupKind) (bool, error) {
 	if isNamespaced, ok := c.namespacedResources[gk]; ok {
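
For context, a minimal sketch of how a caller might use the new IterateHierarchyV2 entry point, mirroring the tests further down in this patch. The kubeconfig loading and the root key are illustrative assumptions and not part of this change; the cache and kube calls are the existing public API touched here. Unlike IterateHierarchy, which takes a single key, the V2 method accepts all top-level keys in one call.

package main

import (
	"fmt"

	clustercache "github.com/argoproj/gitops-engine/pkg/cache"
	"github.com/argoproj/gitops-engine/pkg/utils/kube"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Illustrative setup: build a rest.Config from the default kubeconfig location.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clusterCache := clustercache.NewClusterCache(config)
	if err := clusterCache.EnsureSynced(); err != nil {
		panic(err)
	}

	// Hypothetical root resources; with IterateHierarchy each of these would need its own call.
	roots := []kube.ResourceKey{
		kube.NewResourceKey("apps", "Deployment", "default", "helm-guestbook"),
	}
	tree := []kube.ResourceKey{}
	clusterCache.IterateHierarchyV2(roots, func(resource *clustercache.Resource, _ map[kube.ResourceKey]*clustercache.Resource) bool {
		tree = append(tree, resource.ResourceKey())
		return true // returning false stops descending below this resource
	})
	fmt.Println(tree)
}
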
diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go
index f5e61a065..6ffbbf4fe 100644
--- a/pkg/cache/cluster_test.go
+++ b/pkg/cache/cluster_test.go
@@ -112,6 +112,10 @@ func newClusterWithOptions(t *testing.T, opts []UpdateSettingsFunc, objs ...runt
 		GroupKind:            schema.GroupKind{Group: "apps", Kind: "StatefulSet"},
 		GroupVersionResource: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"},
 		Meta:                 metav1.APIResource{Namespaced: true},
+	}, {
+		GroupKind:            schema.GroupKind{Group: "extensions", Kind: "ReplicaSet"},
+		GroupVersionResource: schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicasets"},
+		Meta:                 metav1.APIResource{Namespaced: true},
 	}}
 
 	opts = append([]UpdateSettingsFunc{
@@ -289,7 +293,7 @@ func TestEnsureSyncedSingleNamespace(t *testing.T) {
 }
 
 func TestGetChildren(t *testing.T) {
-	cluster := newCluster(t, testPod(), testRS(), testDeploy())
+	cluster := newCluster(t, testPod1(), testRS(), testDeploy())
 	err := cluster.EnsureSynced()
 	require.NoError(t, err)
 
@@ -298,7 +302,7 @@ func TestGetChildren(t *testing.T) {
 		Ref: corev1.ObjectReference{
 			Kind:       "Pod",
 			Namespace:  "default",
-			Name:       "helm-guestbook-pod",
+			Name:       "helm-guestbook-pod-1",
 			APIVersion: "v1",
 			UID:        "1",
 		},
@@ -332,7 +336,7 @@ func TestGetChildren(t *testing.T) {
 }
 
 func TestGetManagedLiveObjs(t *testing.T) {
-	cluster := newCluster(t, testPod(), testRS(), testDeploy())
+	cluster := newCluster(t, testPod1(), testRS(), testDeploy())
 	cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) {
 		return nil, true
 	}))
@@ -358,7 +362,7 @@ metadata:
 }
 
 func TestGetManagedLiveObjsNamespacedModeClusterLevelResource(t *testing.T) {
-	cluster := newCluster(t, testPod(), testRS(), testDeploy())
+	cluster := newCluster(t, testPod1(), testRS(), testDeploy())
 	cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) {
 		return nil, true
 	}))
@@ -383,7 +387,7 @@ metadata:
 }
 
 func TestGetManagedLiveObjsNamespacedModeClusterLevelResource_ClusterResourceEnabled(t *testing.T) {
-	cluster := newCluster(t, testPod(), testRS(), testDeploy())
+	cluster := newCluster(t, testPod1(), testRS(), testDeploy())
 	cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) {
 		return nil, true
 	}))
@@ -424,7 +428,7 @@ metadata:
 }
 
 func TestGetManagedLiveObjsAllNamespaces(t *testing.T) {
-	cluster := newCluster(t, testPod(), testRS(), testDeploy())
+	cluster := newCluster(t, testPod1(), testRS(), testDeploy())
 	cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) {
 		return nil, true
 	}))
@@ -452,7 +456,7 @@ metadata:
 }
 
 func TestGetManagedLiveObjsValidNamespace(t *testing.T) {
-	cluster := newCluster(t, testPod(), testRS(), testDeploy())
+	cluster := newCluster(t, testPod1(), testRS(), testDeploy())
 	cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) {
 		return nil, true
 	}))
@@ -480,7 +484,7 @@ metadata:
 }
 
 func TestGetManagedLiveObjsInvalidNamespace(t *testing.T) {
-	cluster := newCluster(t, testPod(), testRS(), testDeploy())
+	cluster := newCluster(t, testPod1(), testRS(), testDeploy())
 	cluster.Invalidate(SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) {
 		return nil, true
 	}))
@@ -587,26 +591,26 @@ metadata:
 }
 
 func TestChildDeletedEvent(t *testing.T) {
-	cluster := newCluster(t, testPod(), testRS(), testDeploy())
+	cluster := newCluster(t, testPod1(), testRS(), testDeploy())
 	err := cluster.EnsureSynced()
 	require.NoError(t, err)
 
-	cluster.processEvent(watch.Deleted, mustToUnstructured(testPod()))
+	cluster.processEvent(watch.Deleted, mustToUnstructured(testPod1()))
 
 	rsChildren := getChildren(cluster, mustToUnstructured(testRS()))
 	assert.Equal(t, []*Resource{}, rsChildren)
 }
 
 func TestProcessNewChildEvent(t *testing.T) {
-	cluster := newCluster(t, testPod(), testRS(), testDeploy())
+	cluster := newCluster(t, testPod1(), testRS(), testDeploy())
 	err := cluster.EnsureSynced()
 	require.NoError(t, err)
 	newPod := strToUnstructured(`
   apiVersion: v1
   kind: Pod
   metadata:
-    uid: "4"
-    name: helm-guestbook-pod2
+    uid: "5"
+    name: helm-guestbook-pod-1-new
     namespace: default
     ownerReferences:
     - apiVersion: apps/v1
@@ -625,7 +629,7 @@ func TestProcessNewChildEvent(t *testing.T) {
 		Ref: corev1.ObjectReference{
 			Kind:       "Pod",
 			Namespace:  "default",
-			Name:       "helm-guestbook-pod",
+			Name:       "helm-guestbook-pod-1",
 			APIVersion: "v1",
 			UID:        "1",
 		},
@@ -643,9 +647,9 @@ func TestProcessNewChildEvent(t *testing.T) {
 		Ref: corev1.ObjectReference{
 			Kind:       "Pod",
 			Namespace:  "default",
-			Name:       "helm-guestbook-pod2",
+			Name:       "helm-guestbook-pod-1-new",
 			APIVersion: "v1",
-			UID:        "4",
+			UID:        "5",
 		},
 		OwnerRefs: []metav1.OwnerReference{{
 			APIVersion: "apps/v1",
@@ -658,10 +662,10 @@
 }
 
 func TestWatchCacheUpdated(t *testing.T) {
-	removed := testPod()
+	removed := testPod1()
 	removed.SetName(removed.GetName() + "-removed-pod")
 
-	updated := testPod()
+	updated := testPod1()
 	updated.SetName(updated.GetName() + "-updated-pod")
 	updated.SetResourceVersion("updated-pod-version")
 
@@ -670,10 +674,10 @@ func TestWatchCacheUpdated(t *testing.T) {
 
 	require.NoError(t, err)
 
-	added := testPod()
+	added := testPod1()
 	added.SetName(added.GetName() + "-new-pod")
 
-	podGroupKind := testPod().GroupVersionKind().GroupKind()
+	podGroupKind := testPod1().GroupVersionKind().GroupKind()
 
 	cluster.lock.Lock()
 	defer cluster.lock.Unlock()
@@ -684,13 +688,13 @@
 }
 
 func TestNamespaceModeReplace(t *testing.T) {
-	ns1Pod := testPod()
+	ns1Pod := testPod1()
 	ns1Pod.SetNamespace("ns1")
 	ns1Pod.SetName("pod1")
 
-	ns2Pod := testPod()
+	ns2Pod := testPod1()
 	ns2Pod.SetNamespace("ns2")
 
-	podGroupKind := testPod().GroupVersionKind().GroupKind()
+	podGroupKind := testPod1().GroupVersionKind().GroupKind()
 
 	cluster := newCluster(t, ns1Pod, ns2Pod)
 	err := cluster.EnsureSynced()
@@ -805,14 +809,14 @@ func getResourceKey(t *testing.T, obj runtime.Object) kube.ResourceKey {
 	return kube.NewResourceKey(gvk.Group, gvk.Kind, m.GetNamespace(), m.GetName())
 }
 
-func testPod() *corev1.Pod {
+func testPod1() *corev1.Pod {
 	return &corev1.Pod{
 		TypeMeta: metav1.TypeMeta{
 			APIVersion: "v1",
 			Kind:       "Pod",
 		},
 		ObjectMeta: metav1.ObjectMeta{
-			Name:              "helm-guestbook-pod",
+			Name:              "helm-guestbook-pod-1",
 			Namespace:         "default",
 			UID:               "1",
 			ResourceVersion:   "123",
@@ -829,6 +833,30 @@ func testPod() *corev1.Pod {
 	}
 }
 
+// Similar to pod1, but owner reference lacks uid
+func testPod2() *corev1.Pod {
+	return &corev1.Pod{
+		TypeMeta: metav1.TypeMeta{
+			APIVersion: "v1",
+			Kind:       "Pod",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:              "helm-guestbook-pod-2",
+			Namespace:         "default",
+			UID:               "4",
+			ResourceVersion:   "123",
+			CreationTimestamp: metav1.NewTime(testCreationTime),
+			OwnerReferences: []metav1.OwnerReference{
+				{
+					APIVersion: "apps/v1",
+					Kind:       "ReplicaSet",
+					Name:       "helm-guestbook-rs",
+				},
+			},
+		},
+	}
+}
+
 func testCRD() *apiextensions.CustomResourceDefinition {
 	return &apiextensions.CustomResourceDefinition{
 		TypeMeta: metav1.TypeMeta{
@@ -958,7 +986,7 @@ func testDeploy() *appsv1.Deployment {
 }
 
 func TestIterateHierachy(t *testing.T) {
-	cluster := newCluster(t, testPod(), testRS(), testDeploy())
+	cluster := newCluster(t, testPod1(), testPod2(), testRS(), testExtensionsRS(), testDeploy())
 	err := cluster.EnsureSynced()
 	require.NoError(t, err)
 
@@ -971,7 +999,8 @@ func TestIterateHierachy(t *testing.T) {
 
 		assert.ElementsMatch(t,
 			[]kube.ResourceKey{
-				kube.GetResourceKey(mustToUnstructured(testPod())),
+				kube.GetResourceKey(mustToUnstructured(testPod1())),
+				kube.GetResourceKey(mustToUnstructured(testPod2())),
 				kube.GetResourceKey(mustToUnstructured(testRS())),
 				kube.GetResourceKey(mustToUnstructured(testDeploy()))},
 			keys)
@@ -1016,10 +1045,135 @@ func TestIterateHierachy(t *testing.T) {
 			[]kube.ResourceKey{
 				kube.GetResourceKey(mustToUnstructured(testDeploy())),
 				kube.GetResourceKey(mustToUnstructured(testRS())),
-				kube.GetResourceKey(mustToUnstructured(testPod())),
+				kube.GetResourceKey(mustToUnstructured(testPod1())),
+				kube.GetResourceKey(mustToUnstructured(testPod2())),
 			},
 			keys)
 	})
+
+	// After uid is backfilled for owner of pod2, it should appear in results here as well.
+	t.Run("IterateStartFromExtensionsRS", func(t *testing.T) {
+		keys := []kube.ResourceKey{}
+		cluster.IterateHierarchy(kube.GetResourceKey(mustToUnstructured(testExtensionsRS())), func(child *Resource, _ map[kube.ResourceKey]*Resource) bool {
+			keys = append(keys, child.ResourceKey())
+			return true
+		})
+
+		assert.ElementsMatch(t,
+			[]kube.ResourceKey{
+				kube.GetResourceKey(mustToUnstructured(testPod1())),
+				kube.GetResourceKey(mustToUnstructured(testPod2())),
+				kube.GetResourceKey(mustToUnstructured(testExtensionsRS()))},
+			keys)
+	})
+}
+
+func TestIterateHierachyV2(t *testing.T) {
+	cluster := newCluster(t, testPod1(), testPod2(), testRS(), testExtensionsRS(), testDeploy())
+	err := cluster.EnsureSynced()
+	require.NoError(t, err)
+
+	t.Run("IterateAll", func(t *testing.T) {
+		startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testDeploy()))}
+		keys := []kube.ResourceKey{}
+		cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool {
+			keys = append(keys, child.ResourceKey())
+			return true
+		})
+
+		assert.ElementsMatch(t,
+			[]kube.ResourceKey{
+				kube.GetResourceKey(mustToUnstructured(testPod1())),
+				kube.GetResourceKey(mustToUnstructured(testPod2())),
+				kube.GetResourceKey(mustToUnstructured(testRS())),
+				kube.GetResourceKey(mustToUnstructured(testDeploy()))},
+			keys)
+	})
+
+	t.Run("ExitAtRoot", func(t *testing.T) {
+		startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testDeploy()))}
+		keys := []kube.ResourceKey{}
+		cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool {
+			keys = append(keys, child.ResourceKey())
+			return false
+		})
+
+		assert.ElementsMatch(t,
+			[]kube.ResourceKey{
+				kube.GetResourceKey(mustToUnstructured(testDeploy()))},
+			keys)
+	})
+
+	t.Run("ExitAtSecondLevelChild", func(t *testing.T) {
+		startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testDeploy()))}
+		keys := []kube.ResourceKey{}
+		cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool {
+			keys = append(keys, child.ResourceKey())
+			return child.ResourceKey().Kind != kube.ReplicaSetKind
+		})
+
+		assert.ElementsMatch(t,
+			[]kube.ResourceKey{
+				kube.GetResourceKey(mustToUnstructured(testDeploy())),
+				kube.GetResourceKey(mustToUnstructured(testRS())),
+			},
+			keys)
+	})
+
+	t.Run("ExitAtThirdLevelChild", func(t *testing.T) {
+		startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testDeploy()))}
+		keys := []kube.ResourceKey{}
+		cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool {
+			keys = append(keys, child.ResourceKey())
+			return child.ResourceKey().Kind != kube.PodKind
+		})
+
+		assert.ElementsMatch(t,
+			[]kube.ResourceKey{
+				kube.GetResourceKey(mustToUnstructured(testDeploy())),
+				kube.GetResourceKey(mustToUnstructured(testRS())),
+				kube.GetResourceKey(mustToUnstructured(testPod1())),
+				kube.GetResourceKey(mustToUnstructured(testPod2())),
+			},
+			keys)
+	})
+
+	t.Run("IterateAllStartFromMultiple", func(t *testing.T) {
+		startKeys := []kube.ResourceKey{
+			kube.GetResourceKey(mustToUnstructured(testRS())),
+			kube.GetResourceKey(mustToUnstructured(testDeploy())),
+		}
+		keys := []kube.ResourceKey{}
+		cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool {
+			keys = append(keys, child.ResourceKey())
+			return true
+		})
+
+		assert.ElementsMatch(t,
+			[]kube.ResourceKey{
+				kube.GetResourceKey(mustToUnstructured(testPod1())),
+				kube.GetResourceKey(mustToUnstructured(testPod2())),
+				kube.GetResourceKey(mustToUnstructured(testRS())),
+				kube.GetResourceKey(mustToUnstructured(testDeploy()))},
+			keys)
+	})
+
+	// After uid is backfilled for owner of pod2, it should appear in results here as well.
+	t.Run("IterateStartFromExtensionsRS", func(t *testing.T) {
+		startKeys := []kube.ResourceKey{kube.GetResourceKey(mustToUnstructured(testExtensionsRS()))}
+		keys := []kube.ResourceKey{}
+		cluster.IterateHierarchyV2(startKeys, func(child *Resource, _ map[kube.ResourceKey]*Resource) bool {
+			keys = append(keys, child.ResourceKey())
+			return true
+		})
+
+		assert.ElementsMatch(t,
+			[]kube.ResourceKey{
+				kube.GetResourceKey(mustToUnstructured(testPod1())),
+				kube.GetResourceKey(mustToUnstructured(testPod2())),
+				kube.GetResourceKey(mustToUnstructured(testExtensionsRS()))},
+			keys)
+	})
 }
 
 // Test_watchEvents_Deadlock validates that starting watches will not create a deadlock
@@ -1031,7 +1185,7 @@ func Test_watchEvents_Deadlock(t *testing.T) {
 	deadlock := sync.RWMutex{}
 	hasDeadlock := false
 
-	res1 := testPod()
+	res1 := testPod1()
 	res2 := testRS()
 
 	cluster := newClusterWithOptions(t, []UpdateSettingsFunc{
diff --git a/pkg/cache/mocks/ClusterCache.go b/pkg/cache/mocks/ClusterCache.go
index 7a1be7324..c9fbc8f9c 100644
--- a/pkg/cache/mocks/ClusterCache.go
+++ b/pkg/cache/mocks/ClusterCache.go
@@ -1,4 +1,4 @@
-// Code generated by mockery v2.40.2. DO NOT EDIT.
+// Code generated by mockery v2.43.2. DO NOT EDIT.
 
 package mocks
 
@@ -237,6 +237,11 @@ func (_m *ClusterCache) IterateHierarchy(key kube.ResourceKey, action func(*cach
 	_m.Called(key, action)
 }
 
+// IterateHierarchyV2 provides a mock function with given fields: keys, action
+func (_m *ClusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(*cache.Resource, map[kube.ResourceKey]*cache.Resource) bool) {
+	_m.Called(keys, action)
+}
+
 // OnEvent provides a mock function with given fields: handler
 func (_m *ClusterCache) OnEvent(handler cache.OnEventHandler) cache.Unsubscribe {
 	ret := _m.Called(handler)
diff --git a/pkg/cache/resource.go b/pkg/cache/resource.go
index 4097f4dca..95ef8b1cf 100644
--- a/pkg/cache/resource.go
+++ b/pkg/cache/resource.go
@@ -99,3 +99,28 @@ func (r *Resource) iterateChildren(ns map[kube.ResourceKey]*Resource, parents ma
 		}
 	}
 }
+
+func (r *Resource) iterateChildrenV2(graph map[kube.ResourceKey][]kube.ResourceKey, ns map[kube.ResourceKey]*Resource, visited map[kube.ResourceKey]int, action func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) {
+	key := r.ResourceKey()
+	if visited[key] == 2 {
+		return
+	}
+	visited[key] = 1
+	defer func() {
+		visited[key] = 2
+	}()
+	childKeys, ok := graph[key]
+	if !ok || childKeys == nil {
+		return
+	}
+	for _, childKey := range childKeys {
+		child := ns[childKey]
+		if visited[childKey] == 1 {
+			_ = action(fmt.Errorf("circular dependency detected. %s is child and parent of %s", childKey.String(), key.String()), child, ns)
+		} else if visited[childKey] == 0 {
+			if action(nil, child, ns) {
+				child.iterateChildrenV2(graph, ns, visited, action)
+			}
+		}
+	}
+}
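
The iterateChildrenV2 helper above is a depth-first walk over the precomputed child graph; visited acts as a three-state marker (0 = not seen, 1 = on the current path, 2 = done), so every resource is reported once and an ownership cycle surfaces as an error instead of unbounded recursion. A standalone toy sketch of the same marker scheme, with a made-up string graph purely for illustration:

package main

import "fmt"

// walk mirrors the three-state traversal used by iterateChildrenV2: 0 = not visited,
// 1 = currently on the stack, 2 = done. Reaching a child that is already on the stack
// means the ownership edges form a cycle.
func walk(graph map[string][]string, node string, visited map[string]int) {
	if visited[node] == 2 {
		return
	}
	visited[node] = 1
	defer func() { visited[node] = 2 }()
	for _, child := range graph[node] {
		switch visited[child] {
		case 1:
			// Same wording as the error produced by iterateChildrenV2.
			fmt.Printf("circular dependency detected. %s is child and parent of %s\n", child, node)
		case 0:
			fmt.Printf("visiting %s\n", child)
			walk(graph, child, visited)
		}
	}
}

func main() {
	// Toy graph: a Deployment owns a ReplicaSet, which owns two Pods; one bogus edge
	// from the ReplicaSet back to the Deployment creates a cycle on purpose.
	graph := map[string][]string{
		"deploy": {"rs"},
		"rs":     {"pod-1", "pod-2", "deploy"},
	}
	visited := map[string]int{}
	walk(graph, "deploy", visited)
}
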
diff --git a/pkg/cache/resource_test.go b/pkg/cache/resource_test.go
index 45e597341..a3b06a6cc 100644
--- a/pkg/cache/resource_test.go
+++ b/pkg/cache/resource_test.go
@@ -10,7 +10,7 @@ import (
 
 var cacheTest = NewClusterCache(&rest.Config{})
 
 func TestIsParentOf(t *testing.T) {
-	child := cacheTest.newResource(mustToUnstructured(testPod()))
+	child := cacheTest.newResource(mustToUnstructured(testPod1()))
 	parent := cacheTest.newResource(mustToUnstructured(testRS()))
 	grandParent := cacheTest.newResource(mustToUnstructured(testDeploy()))
@@ -22,7 +22,7 @@ func TestIsParentOfSameKindDifferentGroupAndUID(t *testing.T) {
 	rs := testRS()
 	rs.APIVersion = "somecrd.io/v1"
 	rs.SetUID("123")
-	child := cacheTest.newResource(mustToUnstructured(testPod()))
+	child := cacheTest.newResource(mustToUnstructured(testPod1()))
 	invalidParent := cacheTest.newResource(mustToUnstructured(rs))
 
 	assert.False(t, invalidParent.isParentOf(child))