From 4dbf320e2ab7bbdc1ad44022171ce1fd1e7f5e67 Mon Sep 17 00:00:00 2001 From: Sam Dowell Date: Mon, 23 Oct 2023 16:56:07 -0700 Subject: [PATCH] refactor: build resource-group image in repo (#959) This changes the resource-group-controller image to be built alongside the other first party images rather than built separately and imported as a third party image. The ResourceGroup controller is a core component of Config Sync, and this is intended to simplify the maintenance of this component. --- Makefile | 4 +- Makefile.build | 2 + build/all/Dockerfile | 13 +- cmd/resource-group/main.go | 25 + go.mod | 2 +- go.sum | 13 +- manifests/base/kustomization.yaml | 2 +- .../resourcegroup-manifest.yaml | 4 +- .../controllers/handler/crd_event_handler.go | 85 ++++ .../controllers/handler/event_handler.go | 102 ++++ .../controllers/handler/throttler.go | 104 ++++ .../resourcegroup/controllers/log/setup.go | 41 ++ .../controllers/metrics/metrics.go | 90 ++++ .../controllers/metrics/record.go | 125 +++++ .../controllers/metrics/register.go | 48 ++ .../controllers/metrics/tagkeys.go | 47 ++ .../controllers/metrics/views.go | 103 ++++ .../controllers/profiler/profile.go | 48 ++ .../controllers/resourcegroup/condition.go | 85 ++++ .../resourcegroup/resourcegroup_controller.go | 481 ++++++++++++++++++ .../controllers/resourcemap/resourcemap.go | 330 ++++++++++++ .../controllers/root/root_controller.go | 297 +++++++++++ .../resourcegroup/controllers/root/util.go | 40 ++ .../resourcegroup/controllers/runner/run.go | 141 +++++ .../controllers/status/status.go | 147 ++++++ .../controllers/typeresolver/fakeresolver.go | 58 +++ .../controllers/typeresolver/typeresolver.go | 105 ++++ .../controllers/watch/filteredwatcher.go | 352 +++++++++++++ .../controllers/watch/manager.go | 223 ++++++++ .../controllers/watch/watcher.go | 67 +++ .../third_party/k8s.io/client-go/LICENSE | 202 ++++++++ .../k8s.io/client-go/tools/cache/reflector.go | 32 ++ vendor/modules.txt | 16 +- 33 files changed, 3424 insertions(+), 10 deletions(-) create mode 100644 cmd/resource-group/main.go rename manifests/{third_party => templates}/resourcegroup-manifest.yaml (99%) create mode 100644 vendor/kpt.dev/resourcegroup/controllers/handler/crd_event_handler.go create mode 100644 vendor/kpt.dev/resourcegroup/controllers/handler/event_handler.go create mode 100644 vendor/kpt.dev/resourcegroup/controllers/handler/throttler.go create mode 100644 vendor/kpt.dev/resourcegroup/controllers/log/setup.go create mode 100644 vendor/kpt.dev/resourcegroup/controllers/metrics/metrics.go create mode 100644 vendor/kpt.dev/resourcegroup/controllers/metrics/record.go create mode 100644 vendor/kpt.dev/resourcegroup/controllers/metrics/register.go create mode 100644 vendor/kpt.dev/resourcegroup/controllers/metrics/tagkeys.go create mode 100644 vendor/kpt.dev/resourcegroup/controllers/metrics/views.go create mode 100644 vendor/kpt.dev/resourcegroup/controllers/profiler/profile.go create mode 100644 vendor/kpt.dev/resourcegroup/controllers/resourcegroup/condition.go create mode 100644 vendor/kpt.dev/resourcegroup/controllers/resourcegroup/resourcegroup_controller.go create mode 100644 vendor/kpt.dev/resourcegroup/controllers/resourcemap/resourcemap.go create mode 100644 vendor/kpt.dev/resourcegroup/controllers/root/root_controller.go create mode 100644 vendor/kpt.dev/resourcegroup/controllers/root/util.go create mode 100644 vendor/kpt.dev/resourcegroup/controllers/runner/run.go create mode 100644 vendor/kpt.dev/resourcegroup/controllers/status/status.go create mode 
100644 vendor/kpt.dev/resourcegroup/controllers/typeresolver/fakeresolver.go create mode 100644 vendor/kpt.dev/resourcegroup/controllers/typeresolver/typeresolver.go create mode 100644 vendor/kpt.dev/resourcegroup/controllers/watch/filteredwatcher.go create mode 100644 vendor/kpt.dev/resourcegroup/controllers/watch/manager.go create mode 100644 vendor/kpt.dev/resourcegroup/controllers/watch/watcher.go create mode 100644 vendor/kpt.dev/resourcegroup/third_party/k8s.io/client-go/LICENSE create mode 100644 vendor/kpt.dev/resourcegroup/third_party/k8s.io/client-go/tools/cache/reflector.go diff --git a/Makefile b/Makefile index 779a32da75..2de4814f87 100644 --- a/Makefile +++ b/Makefile @@ -125,6 +125,7 @@ OCI_SYNC_IMAGE := oci-sync HELM_SYNC_IMAGE := helm-sync NOMOS_IMAGE := nomos ASKPASS_IMAGE := gcenode-askpass-sidecar +RESOURCE_GROUP_IMAGE := resource-group-controller # List of Config Sync images. Used to generate image-related variables/targets. IMAGES := \ $(RECONCILER_IMAGE) \ @@ -135,7 +136,8 @@ IMAGES := \ $(OCI_SYNC_IMAGE) \ $(HELM_SYNC_IMAGE) \ $(NOMOS_IMAGE) \ - $(ASKPASS_IMAGE) + $(ASKPASS_IMAGE) \ + $(RESOURCE_GROUP_IMAGE) # nomos binary for local run. NOMOS_LOCAL := $(BIN_DIR)/linux_amd64/nomos diff --git a/Makefile.build b/Makefile.build index a1211600f9..d16c302411 100644 --- a/Makefile.build +++ b/Makefile.build @@ -151,6 +151,7 @@ build-manifests-oss: "$(GOBIN)/addlicense" "$(BIN_DIR)/kustomize" $(OUTPUT_DIR) -e "s|HYDRATION_CONTROLLER_IMAGE_NAME|$(call gen_image_tag,$(HYDRATION_CONTROLLER_IMAGE))|g" \ -e "s|RECONCILER_MANAGER_IMAGE_NAME|$(call gen_image_tag,$(RECONCILER_MANAGER_IMAGE))|g" \ -e "s|ASKPASS_IMAGE_NAME|$(call gen_image_tag,$(ASKPASS_IMAGE))|g" \ + -e "s|RESOURCE_GROUP_CONTROLLER_IMAGE_NAME|$(call gen_image_tag,$(RESOURCE_GROUP_IMAGE))|g" \ > $(OSS_MANIFEST_STAGING_DIR)/config-sync-manifest.yaml @ "$(GOBIN)/addlicense" $(OSS_MANIFEST_STAGING_DIR)/config-sync-manifest.yaml @@ -176,6 +177,7 @@ build-manifests-operator: "$(GOBIN)/addlicense" "$(BIN_DIR)/kustomize" $(OUTPUT_ -e "s|RECONCILER_MANAGER_IMAGE_NAME|$(call gen_image_tag,$(RECONCILER_MANAGER_IMAGE))|g" \ -e "s|WEBHOOK_IMAGE_NAME|$(call gen_image_tag,$(ADMISSION_WEBHOOK_IMAGE))|g" \ -e "s|ASKPASS_IMAGE_NAME|$(call gen_image_tag,$(ASKPASS_IMAGE))|g" \ + -e "s|RESOURCE_GROUP_CONTROLLER_IMAGE_NAME|$(call gen_image_tag,$(RESOURCE_GROUP_IMAGE))|g" \ > $(NOMOS_MANIFEST_STAGING_DIR)/config-sync-manifest.yaml @ "$(GOBIN)/addlicense" $(NOMOS_MANIFEST_STAGING_DIR)/config-sync-manifest.yaml diff --git a/build/all/Dockerfile b/build/all/Dockerfile index cb822adc30..7dd016ec1b 100644 --- a/build/all/Dockerfile +++ b/build/all/Dockerfile @@ -37,7 +37,9 @@ RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on \ ./cmd/admission-webhook \ ./cmd/oci-sync \ ./cmd/helm-sync \ - ./cmd/gcenode-askpass-sidecar + ./cmd/gcenode-askpass-sidecar \ + ./cmd/resource-group + # Concatenate vendored licenses into LICENSES.txt # Built in the container to include binary licenses (helm & kustomize) @@ -142,6 +144,15 @@ COPY --from=bins /workspace/LICENSES.txt LICENSES.txt USER nonroot:nonroot ENTRYPOINT ["/gcenode-askpass-sidecar"] +# Resource group controller image +FROM gcr.io/distroless/static:nonroot as resource-group-controller +WORKDIR / +COPY --from=bins /go/bin/resource-group resource-group +COPY --from=bins /workspace/LICENSE LICENSE +COPY --from=bins /workspace/LICENSES.txt LICENSES.txt +USER nonroot:nonroot +ENTRYPOINT ["/resource-group"] + # Nomos image # Not used by Config Sync backend components. 
Intended for use cases with the # nomos CLI (e.g. containerized CI/CD) diff --git a/cmd/resource-group/main.go b/cmd/resource-group/main.go new file mode 100644 index 0000000000..70172b7ed5 --- /dev/null +++ b/cmd/resource-group/main.go @@ -0,0 +1,25 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "os" + + "kpt.dev/resourcegroup/controllers/runner" +) + +func main() { + os.Exit(runner.Run()) +} diff --git a/go.mod b/go.mod index 3058a7d6dd..aecaffe9a0 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,7 @@ require ( k8s.io/kubectl v0.26.9 k8s.io/kubernetes v1.26.9 k8s.io/utils v0.0.0-20230115233650-391b47cb4029 - kpt.dev/resourcegroup v0.0.0-20221109031828-db4c3d2c630f + kpt.dev/resourcegroup v0.0.0-20231023223236-7ca71815022b sigs.k8s.io/cli-utils v0.35.0 sigs.k8s.io/controller-runtime v0.14.1 sigs.k8s.io/kind v0.20.0 diff --git a/go.sum b/go.sum index a306b96aa2..3d62d36a2d 100644 --- a/go.sum +++ b/go.sum @@ -135,6 +135,8 @@ github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3Bop github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= +github.com/go-logr/glogr v1.2.2 h1:Jpc1ppf1yWnU+aRGFjIZeE2KfUnc1W4LV9XmndmP0QY= +github.com/go-logr/glogr v1.2.2/go.mod h1:u/37V9lMYDEmbMcbNNpRKnAB5Nof5FgtxhteHXbD3xY= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -149,6 +151,8 @@ github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= +github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -290,7 +294,10 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= github.com/mwitkow/go-conntrack 
v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/ginkgo/v2 v2.7.0 h1:/XxtEV3I3Eif/HobnVx9YmJgk8ENdRsuUmM+fLCFNow= github.com/onsi/ginkgo/v2 v2.7.0/go.mod h1:yjiuMwPokqY1XauOgju45q3sJt6VzQ/Fict1LFVcsAo= github.com/onsi/gomega v1.24.2 h1:J/tulyYK6JwBldPViHJReihxxZ+22FHs0piGjQAvoUE= @@ -644,6 +651,8 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -694,8 +703,8 @@ k8s.io/kubernetes v1.26.9 h1:vdTix+Rh3wbNvbXk/efOeDLX3lng12t1xdsG4rSksmk= k8s.io/kubernetes v1.26.9/go.mod h1:gvP7bsbtu0/cA0ZBJqayLm9lS1PP3WCwrhQOAbpqsK8= k8s.io/utils v0.0.0-20230115233650-391b47cb4029 h1:L8zDtT4jrxj+TaQYD0k8KNlr556WaVQylDXswKmX+dE= k8s.io/utils v0.0.0-20230115233650-391b47cb4029/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -kpt.dev/resourcegroup v0.0.0-20221109031828-db4c3d2c630f h1:uHulkud4a4Z/Wbge4XuDGuywGHrl4l/fSpN8tVYJgBM= -kpt.dev/resourcegroup v0.0.0-20221109031828-db4c3d2c630f/go.mod h1:NUdYG1GBrhTRACE/TOH5Nv/Rf/rZ9ve3IvSJNhcJNfM= +kpt.dev/resourcegroup v0.0.0-20231023223236-7ca71815022b h1:+x8u8PkA7+VUZSw7adSLSzi9uCpsQOv7sUfgUCbsYA0= +kpt.dev/resourcegroup v0.0.0-20231023223236-7ca71815022b/go.mod h1:gRSeoBV3k0ecPbKU8Z8cWQ3JagHSXpNobLqV8h1SrD8= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/manifests/base/kustomization.yaml b/manifests/base/kustomization.yaml index 0f9617917e..fb24a5d783 100644 --- a/manifests/base/kustomization.yaml +++ b/manifests/base/kustomization.yaml @@ -25,4 +25,4 @@ resources: - ../templates/otel-collector.yaml - ../templates/reconciler-manager.yaml - ../templates/reconciler-manager-configmap.yaml -- ../third_party/resourcegroup-manifest.yaml +- ../templates/resourcegroup-manifest.yaml diff --git a/manifests/third_party/resourcegroup-manifest.yaml b/manifests/templates/resourcegroup-manifest.yaml similarity index 99% rename from manifests/third_party/resourcegroup-manifest.yaml rename to manifests/templates/resourcegroup-manifest.yaml index 0f9a3fc455..e3deeeb4f0 100644 --- a/manifests/third_party/resourcegroup-manifest.yaml +++ b/manifests/templates/resourcegroup-manifest.yaml @@ -495,11 +495,11 @@ spec: - --metrics-addr=127.0.0.1:8080 - --enable-leader-election command: - - 
/manager + - /resource-group env: - name: OC_RESOURCE_LABELS value: k8s.container.name="manager" - image: gcr.io/config-management-release/resource-group-controller:v1.0.20 + image: RESOURCE_GROUP_CONTROLLER_IMAGE_NAME name: manager resources: requests: diff --git a/vendor/kpt.dev/resourcegroup/controllers/handler/crd_event_handler.go b/vendor/kpt.dev/resourcegroup/controllers/handler/crd_event_handler.go new file mode 100644 index 0000000000..1d70e0bea0 --- /dev/null +++ b/vendor/kpt.dev/resourcegroup/controllers/handler/crd_event_handler.go @@ -0,0 +1,85 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handler + +import ( + "github.com/go-logr/logr" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/util/workqueue" + "kpt.dev/resourcegroup/apis/kpt.dev/v1alpha1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" +) + +// CRDEventHandler pushes an event to ResourceGroup event channel +// when the CRD or its CRs are contained in some ResourceGroup CRs. +type CRDEventHandler struct { + Mapping resourceMap + Channel chan event.GenericEvent + Log logr.Logger +} + +var _ handler.EventHandler = &CRDEventHandler{} + +// Create implements EventHandler +func (h *CRDEventHandler) Create(e event.CreateEvent, _ workqueue.RateLimitingInterface) { + h.Log.V(5).Info("received a create event") + h.enqueueEvent(e.Object) +} + +// Update implements EventHandler +func (h *CRDEventHandler) Update(e event.UpdateEvent, _ workqueue.RateLimitingInterface) { + h.Log.V(5).Info("received an update event") + h.enqueueEvent(e.ObjectNew) +} + +// Delete implements EventHandler +func (h *CRDEventHandler) Delete(e event.DeleteEvent, _ workqueue.RateLimitingInterface) { + h.Log.V(5).Info("received a delete event") + h.enqueueEvent(e.Object) +} + +// Generic implements EventHandler +func (h *CRDEventHandler) Generic(e event.GenericEvent, _ workqueue.RateLimitingInterface) { + h.Log.V(5).Info("received a generic event") + h.enqueueEvent(e.Object) +} + +func (h *CRDEventHandler) enqueueEvent(obj client.Object) { + crd, ok := obj.(*apiextensionsv1.CustomResourceDefinition) + if !ok { + h.Log.Info("failed to derive a CRD from the event object", "name", obj.GetName()) + return + } + gk := schema.GroupKind{ + Group: crd.Spec.Group, + Kind: crd.Spec.Names.Kind, + } + for _, gknn := range h.Mapping.GetResources(gk) { + // clear the cached status for gknn since the CR status could change + // due to the change of CRD. 
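+		// Passing a nil CachedStatus evicts the cache entry; computeStatus in
+		// resourcegroup_controller.go (later in this patch) treats a nil cached
+		// status as a miss and recomputes the status from the live object on
+		// the next reconcile.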
+ h.Log.V(5).Info("reset the cached resource status", "resource", gknn) + h.Mapping.SetStatus(gknn, nil) + for _, r := range h.Mapping.Get(gknn) { + var resgroup = &v1alpha1.ResourceGroup{} + resgroup.SetNamespace(r.Namespace) + resgroup.SetName(r.Name) + h.Log.V(5).Info("send a generic event for", "resourcegroup", resgroup.GetObjectMeta()) + h.Channel <- event.GenericEvent{Object: resgroup} + } + } +} diff --git a/vendor/kpt.dev/resourcegroup/controllers/handler/event_handler.go b/vendor/kpt.dev/resourcegroup/controllers/handler/event_handler.go new file mode 100644 index 0000000000..56250d7e5b --- /dev/null +++ b/vendor/kpt.dev/resourcegroup/controllers/handler/event_handler.go @@ -0,0 +1,102 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handler + +import ( + "fmt" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/event" + + "kpt.dev/resourcegroup/apis/kpt.dev/v1alpha1" + "kpt.dev/resourcegroup/controllers/resourcemap" +) + +// resourceMap provides the interface for access the cached resources. +type resourceMap interface { + // Get maps from GKNN -> []RG. It gets the identifiers of ResourceGroup CRs + // that a GKNN is in. + Get(gknn v1alpha1.ObjMetadata) []types.NamespacedName + // GetResources map from GK -> []GKNN. It gets the list of GKNN for + // a given group kind. + GetResources(gk schema.GroupKind) []v1alpha1.ObjMetadata + // SetStatus sets the cached status for the given resource. + SetStatus(res v1alpha1.ObjMetadata, resStatus *resourcemap.CachedStatus) +} + +// EnqueueEventToChannel pushes an event to ResourceGroup event channel +// instead of enqueue a Reqeust for ResourceGroup. 
+type EnqueueEventToChannel struct {
+	Mapping resourceMap
+	Channel chan event.GenericEvent
+	Log     logr.Logger
+	GVK     schema.GroupVersionKind
+}
+
+var _ cache.ResourceEventHandler = &EnqueueEventToChannel{}
+
+// OnAdd implements cache.ResourceEventHandler
+func (e *EnqueueEventToChannel) OnAdd(obj interface{}) {
+	e.Log.V(5).Info("received an add event")
+	e.enqueueEvent(obj)
+}
+
+// OnUpdate implements cache.ResourceEventHandler
+func (e *EnqueueEventToChannel) OnUpdate(_, newObj interface{}) {
+	e.Log.V(5).Info("received an update event")
+	e.enqueueEvent(newObj)
+}
+
+// OnDelete implements cache.ResourceEventHandler
+func (e *EnqueueEventToChannel) OnDelete(obj interface{}) {
+	e.Log.V(5).Info("received a delete event")
+	e.enqueueEvent(obj)
+}
+
+func (e *EnqueueEventToChannel) enqueueEvent(obj interface{}) {
+	gknn, err := e.toGKNN(obj)
+	if err != nil {
+		e.Log.Error(err, "failed to get GKNN from the received event", "object", obj)
+		return
+	}
+	for _, r := range e.Mapping.Get(gknn) {
+		var resgroup = &v1alpha1.ResourceGroup{}
+		resgroup.SetNamespace(r.Namespace)
+		resgroup.SetName(r.Name)
+		e.Log.V(5).Info("send a generic event for", "resourcegroup", resgroup.GetObjectMeta())
+		e.Channel <- event.GenericEvent{Object: resgroup}
+	}
+}
+
+func (e EnqueueEventToChannel) toGKNN(obj interface{}) (v1alpha1.ObjMetadata, error) {
+	metadata, err := meta.Accessor(obj)
+	if err != nil {
+		e.Log.Error(err, "missing object meta")
+		return v1alpha1.ObjMetadata{}, fmt.Errorf("missing object meta: %v", err)
+	}
+	gknn := v1alpha1.ObjMetadata{
+		Namespace: metadata.GetNamespace(),
+		Name:      metadata.GetName(),
+		GroupKind: v1alpha1.GroupKind{
+			Group: e.GVK.Group,
+			Kind:  e.GVK.Kind,
+		},
+	}
+	return gknn, nil
+}
diff --git a/vendor/kpt.dev/resourcegroup/controllers/handler/throttler.go b/vendor/kpt.dev/resourcegroup/controllers/handler/throttler.go
new file mode 100644
index 0000000000..1a9e7a9640
--- /dev/null
+++ b/vendor/kpt.dev/resourcegroup/controllers/handler/throttler.go
@@ -0,0 +1,104 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package handler
+
+import (
+	"sync"
+	"time"
+
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
+	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+)
+
+// Throttler only pushes a request to the ResourceGroup work queue
+// when the same event is not already in the queue.
+// It only handles GenericEvents and is a no-op for other events.
+type Throttler struct {
+	// lock is for thread-safe read/write on mapping
+	lock sync.RWMutex
+
+	// mapping records which events are to be pushed to the queue.
+	// If an incoming event can be found in mapping, it is ignored.
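+	// Entries are kept for `duration` and deleted once the delayed enqueue
+	// fires, so each object produces at most one queued Request per window
+	// (see Generic below).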
+	mapping map[types.NamespacedName]struct{}
+
+	// duration is the time duration that the event
+	// is kept in mapping
+	duration time.Duration
+}
+
+// NewThrottler returns an instance of Throttler
+func NewThrottler(d time.Duration) *Throttler {
+	return &Throttler{
+		lock:     sync.RWMutex{},
+		mapping:  make(map[types.NamespacedName]struct{}),
+		duration: d,
+	}
+}
+
+// Create implements EventHandler. All create events are ignored.
+func (e *Throttler) Create(event.CreateEvent, workqueue.RateLimitingInterface) {
+}
+
+// Update implements EventHandler. All update events are ignored.
+func (e *Throttler) Update(event.UpdateEvent, workqueue.RateLimitingInterface) {
+}
+
+// Delete implements EventHandler. All delete events are ignored.
+func (e *Throttler) Delete(event.DeleteEvent, workqueue.RateLimitingInterface) {
+}
+
+// Generic implements EventHandler.
+// It pushes at most one event for the same object to the queue during duration.
+func (e *Throttler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
+	if evt.Object == nil {
+		return
+	}
+	r := types.NamespacedName{
+		Namespace: evt.Object.GetNamespace(),
+		Name:      evt.Object.GetName(),
+	}
+
+	e.lock.RLock()
+	_, found := e.mapping[r]
+	e.lock.RUnlock()
+	klog.V(4).Info("received a generic event in the throttler")
+	// Skip the event if the same event is already in the queue
+	if found {
+		klog.V(4).Info("skip it since there is an event in the throttler")
+		return
+	}
+
+	// The following code:
+	// - marks the event as pending in mapping;
+	// - pushes the event to the queue after duration;
+	// - after duration, removes the event from mapping.
+	// As a result, at most one event is pushed to the queue
+	// for an object during duration.
+	e.lock.Lock()
+	e.mapping[r] = struct{}{}
+	e.lock.Unlock()
+
+	go func() {
+		time.Sleep(e.duration)
+		q.Add(reconcile.Request{NamespacedName: r})
+		klog.V(4).Infof("add the request to the queue %v", r)
+		e.lock.Lock()
+		delete(e.mapping, r)
+		e.lock.Unlock()
+	}()
+}
diff --git a/vendor/kpt.dev/resourcegroup/controllers/log/setup.go b/vendor/kpt.dev/resourcegroup/controllers/log/setup.go
new file mode 100644
index 0000000000..74c895837b
--- /dev/null
+++ b/vendor/kpt.dev/resourcegroup/controllers/log/setup.go
@@ -0,0 +1,41 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package log
+
+import (
+	"flag"
+
+	"k8s.io/klog/v2"
+	"k8s.io/klog/v2/klogr"
+	ctrl "sigs.k8s.io/controller-runtime"
+)
+
+// InitFlags registers the klog command flags with new defaults.
+// Call flag.Parse() afterwards to parse input from the command line.
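+// Note that flag.Set below only changes the programmatic defaults; values
+// passed on the command line are applied later by flag.Parse() and take
+// precedence.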
+func InitFlags() { + // Register klog flags + klog.InitFlags(nil) + + // Override klog default values + if err := flag.Set("v", "1"); err != nil { + klog.Fatal(err) + } + if err := flag.Set("logtostderr", "true"); err != nil { + klog.Fatal(err) + } + + // Configure controller-runtime to use klog, via the klogr library + ctrl.SetLogger(klogr.New()) +} diff --git a/vendor/kpt.dev/resourcegroup/controllers/metrics/metrics.go b/vendor/kpt.dev/resourcegroup/controllers/metrics/metrics.go new file mode 100644 index 0000000000..455348de72 --- /dev/null +++ b/vendor/kpt.dev/resourcegroup/controllers/metrics/metrics.go @@ -0,0 +1,90 @@ +/* +Copyright 2020 Google LLC. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "go.opencensus.io/stats" +) + +const namespace = "resourcegroup" + +var ( + // ReconcileDuration tracks the time duration in seconds of reconciling + // a ResourceGroup CR by the ResourceGroup controller. + // label `reason`: the `Reason` field of the `Stalled` condition in a ResourceGroup CR. + // reason can be: StartReconciling, FinishReconciling, ComponentFailed, ExceedTimeout. + // This metric should be updated in the ResourceGroup controller. + ReconcileDuration = stats.Float64( + "rg_reconcile_duration_seconds", + "Time duration in seconds of reconciling a ResourceGroup CR by the ResourceGroup controller", + stats.UnitSeconds) + + // ResourceGroupTotal tracks the total number of ResourceGroup CRs in a cluster. + // This metric should be updated in the Root controller. + ResourceGroupTotal = stats.Int64( + "resource_group_total", + "Total number of ResourceGroup CRs in a cluster", + stats.UnitDimensionless) + + // ResourceCount tracks the number of resources in a ResourceGroup CR. + // This metric should be updated in the Root controller. + ResourceCount = stats.Int64( + "resource_count", + "The number of resources in a ResourceGroup CR", + stats.UnitDimensionless) + + // ReadyResourceCount tracks the number of resources with Current status in a ResourceGroup CR. + // This metric should be updated in the ResourceGroup controller. + ReadyResourceCount = stats.Int64( + "ready_resource_count", + "The number of resources with Current status in a ResourceGroup CR", + stats.UnitDimensionless) + + // KCCResourceCount tracks the number of KCC resources in a ResourceGroup CR. + // This metric should be updated in the ResourceGroup controller. + KCCResourceCount = stats.Int64( + "kcc_resource_count", + "The number of KCC resources in a ResourceGroup CR", + stats.UnitDimensionless) + + // NamespaceCount tracks the number of resource namespaces in a ResourceGroup CR. + // This metric should be updated in the Root controller. + NamespaceCount = stats.Int64( + "resource_ns_count", + "The number of resource namespaces in a ResourceGroup CR", + stats.UnitDimensionless) + + // ClusterScopedResourceCount tracks the number of cluster-scoped resources in a ResourceGroup CR. + // This metric should be updated in the Root controller. 
+	ClusterScopedResourceCount = stats.Int64(
+		"cluster_scoped_resource_count",
+		"The number of cluster-scoped resources in a ResourceGroup CR",
+		stats.UnitDimensionless)
+
+	// CRDCount tracks the number of CRDs in a ResourceGroup CR.
+	// This metric should be updated in the Root controller.
+	CRDCount = stats.Int64(
+		"crd_count",
+		"The number of CRDs in a ResourceGroup CR",
+		stats.UnitDimensionless)
+
+	// PipelineError tracks whether an error happened when syncing a commit
+	PipelineError = stats.Int64(
+		"pipeline_error_observed",
+		"A boolean value indicating whether an error happened at the readiness stage when syncing a commit",
+		stats.UnitDimensionless)
+)
diff --git a/vendor/kpt.dev/resourcegroup/controllers/metrics/record.go b/vendor/kpt.dev/resourcegroup/controllers/metrics/record.go
new file mode 100644
index 0000000000..77d75b99b1
--- /dev/null
+++ b/vendor/kpt.dev/resourcegroup/controllers/metrics/record.go
@@ -0,0 +1,125 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"go.opencensus.io/stats"
+	"go.opencensus.io/tag"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/klog/v2"
+)
+
+const (
+	// NsReconcilerPrefix is the prefix used for all Namespace reconcilers.
+	NsReconcilerPrefix = "ns-reconciler"
+	// RootReconcilerPrefix is the prefix used for all Root reconcilers.
+	RootReconcilerPrefix = "root-reconciler"
+	// RepoSyncName is the expected name of any RepoSync CR.
+	RepoSyncName = "repo-sync"
+	// RootSyncName is the expected name of any RootSync CR.
+	RootSyncName = "root-sync"
+
+	//nolint:gosec // ignore the false-positive alert for G101: Potential hardcoded credentials
+	// CMSNamespace is the name of the Config Sync controller's namespace
+	CMSNamespace = "config-management-system"
+)
+
+// RecordReconcileDuration produces a measurement for the ReconcileDuration view.
+func RecordReconcileDuration(ctx context.Context, stallStatus string, startTime time.Time) {
+	tagCtx, _ := tag.New(ctx, tag.Upsert(KeyStallReason, stallStatus))
+	measurement := ReconcileDuration.M(time.Since(startTime).Seconds())
+	stats.Record(tagCtx, measurement)
+}
+
+// RecordReadyResourceCount produces a measurement for the ReadyResourceCount view.
+func RecordReadyResourceCount(ctx context.Context, nn types.NamespacedName, count int64) {
+	tagCtx, _ := tag.New(ctx, tag.Upsert(KeyResourceGroup, nn.String()))
+	measurement := ReadyResourceCount.M(count)
+	stats.Record(tagCtx, measurement)
+}
+
+// RecordKCCResourceCount produces a measurement for the KCCResourceCount view.
+func RecordKCCResourceCount(ctx context.Context, nn types.NamespacedName, count int64) {
+	tagCtx, _ := tag.New(ctx, tag.Upsert(KeyResourceGroup, nn.String()))
+	measurement := KCCResourceCount.M(count)
+	stats.Record(tagCtx, measurement)
+}
+
+// RecordResourceCount produces a measurement for the ResourceCount view.
+func RecordResourceCount(ctx context.Context, nn types.NamespacedName, count int64) {
+	tagCtx, _ := tag.New(ctx, tag.Upsert(KeyResourceGroup, nn.String()))
+	measurement := ResourceCount.M(count)
+	stats.Record(tagCtx, measurement)
+}
+
+// RecordResourceGroupTotal produces a measurement for the ResourceGroupTotalView
+func RecordResourceGroupTotal(ctx context.Context, count int64) {
+	stats.Record(ctx, ResourceGroupTotal.M(count))
+}
+
+// RecordNamespaceCount produces a measurement for the NamespaceCount view.
+func RecordNamespaceCount(ctx context.Context, nn types.NamespacedName, count int64) {
+	tagCtx, _ := tag.New(ctx, tag.Upsert(KeyResourceGroup, nn.String()))
+	measurement := NamespaceCount.M(count)
+	stats.Record(tagCtx, measurement)
+}
+
+// RecordClusterScopedResourceCount produces a measurement for the ClusterScopedResourceCount view
+func RecordClusterScopedResourceCount(ctx context.Context, nn types.NamespacedName, count int64) {
+	tagCtx, _ := tag.New(ctx, tag.Upsert(KeyResourceGroup, nn.String()))
+	measurement := ClusterScopedResourceCount.M(count)
+	stats.Record(tagCtx, measurement)
+}
+
+// RecordCRDCount produces a measurement for the CRDCount view
+func RecordCRDCount(ctx context.Context, nn types.NamespacedName, count int64) {
+	tagCtx, _ := tag.New(ctx, tag.Upsert(KeyResourceGroup, nn.String()))
+	measurement := CRDCount.M(count)
+	stats.Record(tagCtx, measurement)
+}
+
+// RecordPipelineError produces a measurement for the PipelineError view
+func RecordPipelineError(ctx context.Context, nn types.NamespacedName, component string, hasErr bool) {
+	reconcilerName, reconcilerType := ComputeReconcilerNameType(nn)
+	tagCtx, _ := tag.New(ctx, tag.Upsert(KeyComponent, component), tag.Upsert(KeyName, reconcilerName),
+		tag.Upsert(KeyType, reconcilerType))
+	var metricVal int64
+	if hasErr {
+		metricVal = 1
+	} else {
+		metricVal = 0
+	}
+	stats.Record(tagCtx, PipelineError.M(metricVal))
+	klog.Infof("Recording %s metric at component: %s, namespace: %s, reconciler: %s, sync type: %s with value %v",
+		PipelineErrorView.Name, component, nn.Namespace, reconcilerName, nn.Name, metricVal)
+}
+
+// ComputeReconcilerNameType computes the reconciler name and type from the ResourceGroup CR name
+func ComputeReconcilerNameType(nn types.NamespacedName) (reconcilerName, reconcilerType string) {
+	if nn.Namespace == CMSNamespace {
+		if nn.Name == RootSyncName {
+			return RootReconcilerPrefix, RootSyncName
+		}
+		return fmt.Sprintf("%s-%s", RootReconcilerPrefix, nn.Name), RootSyncName
+	}
+	if nn.Name == RepoSyncName {
+		return fmt.Sprintf("%s-%s", NsReconcilerPrefix, namespace), RepoSyncName
+	}
+	return fmt.Sprintf("%s-%s-%s-%d", NsReconcilerPrefix, namespace, nn.Name, len(nn.Name)), RepoSyncName
}
diff --git a/vendor/kpt.dev/resourcegroup/controllers/metrics/register.go b/vendor/kpt.dev/resourcegroup/controllers/metrics/register.go
new file mode 100644
index 0000000000..71c8fe306e
--- /dev/null
+++ b/vendor/kpt.dev/resourcegroup/controllers/metrics/register.go
@@ -0,0 +1,48 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
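+// (Editor's sketch, not part of the upstream patch: a hypothetical caller of
+// the Record* helpers above. The context and the ResourceGroup's
+// NamespacedName come from the reconcile loop; each helper attaches its own
+// tags before recording, so calls are fire-and-forget.)
+//
+//	nn := types.NamespacedName{Namespace: "config-management-system", Name: "root-sync"}
+//	metrics.RecordResourceCount(ctx, nn, int64(len(statuses)))
+//	metrics.RecordPipelineError(ctx, nn, "readiness", hasErr)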
+// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "contrib.go.opencensus.io/exporter/ocagent" + "go.opencensus.io/stats/view" +) + +// RegisterOCAgentExporter creates the OC Agent metrics exporter. +func RegisterOCAgentExporter() (*ocagent.Exporter, error) { + oce, err := ocagent.NewExporter( + ocagent.WithInsecure(), + ) + if err != nil { + return nil, err + } + + view.RegisterExporter(oce) + return oce, nil +} + +// RegisterReconcilerMetricsViews registers the views so that recorded metrics can be exported in the reconcilers. +func RegisterReconcilerMetricsViews() error { + return view.Register( + ReconcileDurationView, + ResourceGroupTotalView, + ResourceCountView, + ReadyResourceCountView, + NamespaceCountView, + ClusterScopedResourceCountView, + CRDCountView, + KCCResourceCountView, + PipelineErrorView, + ) +} diff --git a/vendor/kpt.dev/resourcegroup/controllers/metrics/tagkeys.go b/vendor/kpt.dev/resourcegroup/controllers/metrics/tagkeys.go new file mode 100644 index 0000000000..18f8298e6f --- /dev/null +++ b/vendor/kpt.dev/resourcegroup/controllers/metrics/tagkeys.go @@ -0,0 +1,47 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "go.opencensus.io/tag" +) + +var ( + // KeyStallReason groups metrics by the stall condition reason field + KeyStallReason, _ = tag.NewKey("stallreason") + + // KeyOperation groups metrics by their operation. Possible values: create, patch, update, delete. + KeyOperation, _ = tag.NewKey("operation") + + // KeyErrorCode groups metrics by their error code. + KeyErrorCode, _ = tag.NewKey("errorcode") + + // KeyType groups metrics by their resource reconciler type. Possible values: root-sync, repo-sync + KeyType, _ = tag.NewKey("reconciler") + + // KeyResourceGroup groups metrics by their resource group + KeyResourceGroup, _ = tag.NewKey("resourcegroup") + + // KeyName groups metrics by their name of reconciler. + KeyName, _ = tag.NewKey("name") + + // KeyComponent groups metrics by their component. Possible value: readiness + KeyComponent, _ = tag.NewKey("component") + + // ResourceKeyDeploymentName groups metrics by k8s deployment name. + // This metric tag is populated from the k8s.deployment.name resource + // attribute for Prometheus using the resource_to_telemetry_conversion feature. + ResourceKeyDeploymentName, _ = tag.NewKey("k8s_deployment_name") +) diff --git a/vendor/kpt.dev/resourcegroup/controllers/metrics/views.go b/vendor/kpt.dev/resourcegroup/controllers/metrics/views.go new file mode 100644 index 0000000000..7ef4dfc379 --- /dev/null +++ b/vendor/kpt.dev/resourcegroup/controllers/metrics/views.go @@ -0,0 +1,103 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
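+// (Editor's sketch, hedged: typical startup wiring for the two registration
+// helpers above. Whether the runner package does exactly this is an
+// assumption; oce.Stop() on shutdown and the error handling are
+// illustrative.)
+//
+//	oce, err := metrics.RegisterOCAgentExporter()
+//	if err != nil {
+//		return err
+//	}
+//	defer func() { _ = oce.Stop() }()
+//	if err := metrics.RegisterReconcilerMetricsViews(); err != nil {
+//		return err
+//	}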
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+	"go.opencensus.io/stats/view"
+	"go.opencensus.io/tag"
+)
+
+var (
+	// ReconcileDurationView aggregates the ReconcileDuration metric measurements.
+	ReconcileDurationView = &view.View{
+		Name:        ReconcileDuration.Name(),
+		Measure:     ReconcileDuration,
+		Description: "The distribution of time taken to reconcile a ResourceGroup CR",
+		TagKeys:     []tag.Key{KeyStallReason},
+		Aggregation: view.Distribution(0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10),
+	}
+
+	// ResourceGroupTotalView aggregates the ResourceGroupTotal metric measurements.
+	ResourceGroupTotalView = &view.View{
+		Name:        ResourceGroupTotal.Name(),
+		Measure:     ResourceGroupTotal,
+		Description: "The current number of ResourceGroup CRs",
+		Aggregation: view.LastValue(),
+	}
+
+	// ResourceCountView aggregates the ResourceCount metric measurements.
+	ResourceCountView = &view.View{
+		Name:        ResourceCount.Name(),
+		Measure:     ResourceCount,
+		Description: "The total number of resources tracked by a ResourceGroup",
+		TagKeys:     []tag.Key{KeyResourceGroup},
+		Aggregation: view.LastValue(),
+	}
+
+	// ReadyResourceCountView aggregates the resources ready in a ResourceGroup
+	ReadyResourceCountView = &view.View{
+		Name:        ReadyResourceCount.Name(),
+		Measure:     ReadyResourceCount,
+		Description: "The total number of ready resources in a ResourceGroup",
+		TagKeys:     []tag.Key{KeyResourceGroup},
+		Aggregation: view.LastValue(),
+	}
+
+	// NamespaceCountView counts the number of namespaces in a ResourceGroup
+	NamespaceCountView = &view.View{
+		Name:        NamespaceCount.Name(),
+		Measure:     NamespaceCount,
+		Description: "The number of namespaces used by resources in a ResourceGroup",
+		TagKeys:     []tag.Key{KeyResourceGroup},
+		Aggregation: view.LastValue(),
+	}
+
+	// ClusterScopedResourceCountView counts the number of cluster-scoped resources in a ResourceGroup
+	ClusterScopedResourceCountView = &view.View{
+		Name:        ClusterScopedResourceCount.Name(),
+		Measure:     ClusterScopedResourceCount,
+		Description: "The number of cluster scoped resources in a ResourceGroup",
+		TagKeys:     []tag.Key{KeyResourceGroup},
+		Aggregation: view.LastValue(),
+	}
+
+	// CRDCountView counts the number of CRDs in a ResourceGroup
+	CRDCountView = &view.View{
+		Name:        CRDCount.Name(),
+		Measure:     CRDCount,
+		Description: "The number of CRDs in a ResourceGroup",
+		TagKeys:     []tag.Key{KeyResourceGroup},
+		Aggregation: view.LastValue(),
+	}
+
+	// KCCResourceCountView aggregates the KCC resources in a ResourceGroup
+	KCCResourceCountView = &view.View{
+		Name:        KCCResourceCount.Name(),
+		Measure:     KCCResourceCount,
+		Description: "The total number of KCC resources in a ResourceGroup",
+		TagKeys:     []tag.Key{KeyResourceGroup},
+		Aggregation: view.LastValue(),
+	}
+
+	// PipelineErrorView aggregates the PipelineError measurements by component
+	// TODO: add link to same metric in Config Sync under pkg/metrics/views.go
+	PipelineErrorView = &view.View{
+		Name:        PipelineError.Name(),
+		Measure:     PipelineError,
+		Description: "A boolean value indicating whether an error happened at any stage when syncing a commit",
+		TagKeys:     []tag.Key{KeyName, KeyComponent, KeyType},
+		Aggregation: view.LastValue(),
+ } +) diff --git a/vendor/kpt.dev/resourcegroup/controllers/profiler/profile.go b/vendor/kpt.dev/resourcegroup/controllers/profiler/profile.go new file mode 100644 index 0000000000..4e14c5b845 --- /dev/null +++ b/vendor/kpt.dev/resourcegroup/controllers/profiler/profile.go @@ -0,0 +1,48 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package profiler + +import ( + "flag" + "fmt" + "net/http" + + //nolint:gosec // pprof init() registers handlers, which we serve with ListenAndServe + _ "net/http/pprof" + "time" + + "k8s.io/klog/v2" +) + +var enableProfiler = flag.Bool("enable-pprof", false, "enable pprof profiling") +var profilerPort = flag.Int("pprof-port", 6060, "port for pprof profiling. defaulted to 6060 if unspecified") + +// Service starts the profiler http endpoint if --enable-pprof flag is passed +func Service() { + if *enableProfiler { + go func() { + klog.Infof("Starting profiling on port %d", *profilerPort) + addr := fmt.Sprintf(":%d", *profilerPort) + server := &http.Server{ + Addr: addr, + ReadHeaderTimeout: 30 * time.Second, + } + err := server.ListenAndServe() + if err != nil { + klog.Fatalf("Profiler server failed to start: %+v", err) + } + }() + } +} diff --git a/vendor/kpt.dev/resourcegroup/controllers/resourcegroup/condition.go b/vendor/kpt.dev/resourcegroup/controllers/resourcegroup/condition.go new file mode 100644 index 0000000000..b0a97e0f8e --- /dev/null +++ b/vendor/kpt.dev/resourcegroup/controllers/resourcegroup/condition.go @@ -0,0 +1,85 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package resourcegroup
+
+import (
+	"sort"
+	"time"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"kpt.dev/resourcegroup/apis/kpt.dev/v1alpha1"
+)
+
+func newReconcilingCondition(status v1alpha1.ConditionStatus, reason, message string) v1alpha1.Condition {
+	return v1alpha1.Condition{
+		Type:               v1alpha1.Reconciling,
+		Status:             status,
+		Reason:             reason,
+		Message:            message,
+		LastTransitionTime: metav1.Time{Time: time.Now().UTC()},
+	}
+}
+
+func newStalledCondition(status v1alpha1.ConditionStatus, reason, message string) v1alpha1.Condition {
+	return v1alpha1.Condition{
+		Type:               v1alpha1.Stalled,
+		Status:             status,
+		Reason:             reason,
+		Message:            message,
+		LastTransitionTime: metav1.Time{Time: time.Now().UTC()},
+	}
+}
+
+// adjustConditionOrder adjusts the order of the conditions to make sure that
+// the first condition in the slice is Reconciling;
+// the second condition in the slice is Stalled;
+// the remaining conditions are sorted alphabetically according to their types.
+//
+// Returns:
+// - a new slice of conditions including the ordered conditions.
+//
+// The +kubebuilder:printcolumn markers on the ResourceGroup struct expect the type of the first
+// Condition in the slice to be Reconciling, and the type of the second Condition to be Stalled.
+func adjustConditionOrder(conditions []v1alpha1.Condition) []v1alpha1.Condition {
+	var reconciling, stalled v1alpha1.Condition
+	var others []v1alpha1.Condition
+	for _, cond := range conditions {
+		switch cond.Type {
+		case v1alpha1.Reconciling:
+			reconciling = cond
+		case v1alpha1.Stalled:
+			stalled = cond
+		default:
+			others = append(others, cond)
+		}
+	}
+
+	// sort the conditions in `others`
+	sort.Slice(others, func(i, j int) bool {
+		return others[i].Type < others[j].Type
+	})
+
+	if reconciling.IsEmpty() {
+		reconciling = newReconcilingCondition(v1alpha1.UnknownConditionStatus, "", "")
+	}
+	if stalled.IsEmpty() {
+		stalled = newStalledCondition(v1alpha1.UnknownConditionStatus, "", "")
+	}
+
+	var result []v1alpha1.Condition
+	result = append(result, reconciling, stalled)
+	result = append(result, others...)
+	return result
+}
diff --git a/vendor/kpt.dev/resourcegroup/controllers/resourcegroup/resourcegroup_controller.go b/vendor/kpt.dev/resourcegroup/controllers/resourcegroup/resourcegroup_controller.go
new file mode 100644
index 0000000000..a9b01322d0
--- /dev/null
+++ b/vendor/kpt.dev/resourcegroup/controllers/resourcegroup/resourcegroup_controller.go
@@ -0,0 +1,481 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
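+// (Editor's example for adjustConditionOrder above, not part of the upstream
+// patch: Reconciling and Stalled are pinned to fixed indices, with
+// Unknown-status placeholders filled in when either is absent.)
+//
+//	out := adjustConditionOrder([]v1alpha1.Condition{
+//		{Type: v1alpha1.Ownership},
+//		{Type: v1alpha1.Stalled, Status: v1alpha1.TrueConditionStatus},
+//	})
+//	// out[0].Type == v1alpha1.Reconciling (Unknown placeholder)
+//	// out[1].Type == v1alpha1.Stalled
+//	// out[2].Type == v1alpha1.Ownership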
+ +package resourcegroup + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/go-logr/logr" + apiequality "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" + "sigs.k8s.io/cli-utils/pkg/common" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + "kpt.dev/resourcegroup/apis/kpt.dev/v1alpha1" + "kpt.dev/resourcegroup/controllers/handler" + "kpt.dev/resourcegroup/controllers/metrics" + "kpt.dev/resourcegroup/controllers/resourcemap" + controllerstatus "kpt.dev/resourcegroup/controllers/status" + "kpt.dev/resourcegroup/controllers/typeresolver" +) + +const ( + StartReconciling = "StartReconciling" + startReconcilingMsg = "start reconciling" + FinishReconciling = "FinishReconciling" + finishReconcilingMsg = "finish reconciling" + ComponentFailed = "ComponentFailed" + componentFailedMsgPrefix = "The following components failed:" + ExceedTimeout = "ExceedTimeout" + exceedTimeoutMsg = "Exceed timeout, the .status.observedGeneration and .status.resourceStatuses fields are old." + owningInventoryKey = "config.k8s.io/owning-inventory" + SourceHashAnnotationKey = "configmanagement.gke.io/token" + readinessComponent = "readiness" +) + +// contextKey is a custom type for wrapping context values to make them unique +// to this package +type contextKey string + +const contextLoggerKey = contextKey("logger") + +var DefaultDuration = 30 * time.Second + +// reconciler reconciles a ResourceGroup object +type reconciler struct { + // Client is to get and update ResourceGroup object. + client.Client + + // log is the logger of the reconciler. + log logr.Logger + + // TODO: check if scheme is needed + scheme *runtime.Scheme + + // resolver is the type resolver to find the server preferred + // GVK for a GK + resolver *typeresolver.TypeResolver + + // resMap is the resourcemap for storing the resource status and conditions. 
+ resMap *resourcemap.ResourceMap +} + +// +kubebuilder:rbac:groups=kpt.dev,resources=resourcegroups,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=kpt.dev,resources=resourcegroups/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=*,resources=*,verbs=get;list;watch + +func (r *reconciler) Reconcile(c context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := r.log + ctx := context.WithValue(c, contextLoggerKey, logger) + logger.Info("starts reconciling") + return r.reconcileKpt(ctx, req, logger) +} + +func (r *reconciler) reconcileKpt(ctx context.Context, req ctrl.Request, logger logr.Logger) (ctrl.Result, error) { + // obtain the ResourceGroup CR + resgroup := &v1alpha1.ResourceGroup{} + err := r.Get(ctx, req.NamespacedName, resgroup) + if err != nil { + if errors.IsNotFound(err) { + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + // ResourceGroup is in the process of being deleted + if resgroup.DeletionTimestamp != nil { + return ctrl.Result{}, nil + } + + newStatus := r.startReconcilingStatus(resgroup.Status) + if err := r.updateStatusKptGroup(ctx, resgroup, newStatus); err != nil { + logger.Error(err, "failed to update") + return ctrl.Result{Requeue: true}, err + } + + id := getInventoryID(resgroup.Labels) + newStatus = r.endReconcilingStatus(ctx, id, req.NamespacedName, resgroup.Spec, resgroup.Status, resgroup.Generation) + if err := r.updateStatusKptGroup(ctx, resgroup, newStatus); err != nil { + logger.Error(err, "failed to update") + return ctrl.Result{Requeue: true}, err + } + + logger.Info("finished reconciling") + return ctrl.Result{}, nil +} + +// currentStatusCount counts the number of `Current` statuses. +func currentStatusCount(statuses []v1alpha1.ResourceStatus) int { + count := 0 + for _, status := range statuses { + if status.Status == v1alpha1.Current { + count++ + } + } + return count +} + +// updateResourceMetrics updates the resource metrics. +func updateResourceMetrics(ctx context.Context, nn types.NamespacedName, statuses []v1alpha1.ResourceStatus) { + metrics.RecordReadyResourceCount(ctx, nn, int64(currentStatusCount(statuses))) + metrics.RecordResourceCount(ctx, nn, int64(len(statuses))) + namespaces := make(map[string]bool) + clusterScopedCount := 0 + kccCount := 0 + crds := make(map[string]bool) + for _, res := range statuses { + namespaces[res.Namespace] = true + if res.Namespace == "" { + clusterScopedCount++ + } + if res.Kind == "CustomResourceDefinition" { + crds[res.Name] = true + } + if strings.Contains(res.Group, "cnrm.cloud.google.com") { + kccCount++ + } + } + metrics.RecordNamespaceCount(ctx, nn, int64(len(namespaces))) + metrics.RecordClusterScopedResourceCount(ctx, nn, int64(clusterScopedCount)) + metrics.RecordCRDCount(ctx, nn, int64(len(crds))) + metrics.RecordKCCResourceCount(ctx, nn, int64(kccCount)) +} + +func (r *reconciler) updateStatusKptGroup(ctx context.Context, resgroup *v1alpha1.ResourceGroup, newStatus v1alpha1.ResourceGroupStatus) error { + newStatus.Conditions = adjustConditionOrder(newStatus.Conditions) + return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + if apiequality.Semantic.DeepEqual(resgroup.Status, newStatus) { + return nil + } + resgroup.Status = newStatus + // Use `r.Status().Update()` here instead of `r.Update()` to update only resgroup.Status. 
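+		// retry.RetryOnConflict reruns this closure with backoff when the API
+		// server rejects the write with a resourceVersion conflict, and the
+		// DeepEqual check above skips the write entirely when the status is
+		// unchanged.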
+ return r.Status().Update(ctx, resgroup) + }) +} + +func (r *reconciler) startReconcilingStatus(status v1alpha1.ResourceGroupStatus) v1alpha1.ResourceGroupStatus { + newStatus := v1alpha1.ResourceGroupStatus{ + ObservedGeneration: status.ObservedGeneration, + ResourceStatuses: status.ResourceStatuses, + SubgroupStatuses: status.SubgroupStatuses, + Conditions: []v1alpha1.Condition{ + newReconcilingCondition(v1alpha1.TrueConditionStatus, StartReconciling, startReconcilingMsg), + newStalledCondition(v1alpha1.FalseConditionStatus, "", ""), + }, + } + return newStatus +} + +func (r *reconciler) endReconcilingStatus( + ctx context.Context, + id string, + namespacedName types.NamespacedName, + spec v1alpha1.ResourceGroupSpec, + status v1alpha1.ResourceGroupStatus, + generation int64, +) v1alpha1.ResourceGroupStatus { + // reset newStatus to make sure the former setting of newStatus does not carry over + newStatus := v1alpha1.ResourceGroupStatus{} + startTime := time.Now() + reconcileTimeout := getReconcileTimeOut(len(spec.Subgroups) + len(spec.Resources)) + + finish := make(chan struct{}) + go func() { + newStatus.ResourceStatuses = r.computeResourceStatuses(ctx, id, status, spec.Resources, namespacedName) + newStatus.SubgroupStatuses = r.computeSubGroupStatuses(ctx, id, status, spec.Subgroups, namespacedName) + close(finish) + }() + select { + case <-finish: + newStatus.ObservedGeneration = generation + newStatus.Conditions = []v1alpha1.Condition{ + newReconcilingCondition(v1alpha1.FalseConditionStatus, FinishReconciling, finishReconcilingMsg), + aggregateResourceStatuses(newStatus.ResourceStatuses), + } + case <-time.After(reconcileTimeout): + newStatus.ObservedGeneration = status.ObservedGeneration + newStatus.ResourceStatuses = status.ResourceStatuses + newStatus.SubgroupStatuses = status.SubgroupStatuses + newStatus.Conditions = []v1alpha1.Condition{ + newReconcilingCondition(v1alpha1.FalseConditionStatus, ExceedTimeout, exceedTimeoutMsg), + newStalledCondition(v1alpha1.TrueConditionStatus, ExceedTimeout, exceedTimeoutMsg), + } + } + + metrics.RecordReconcileDuration(ctx, newStatus.Conditions[1].Reason, startTime) + updateResourceMetrics(ctx, namespacedName, newStatus.ResourceStatuses) + return newStatus +} + +func (r *reconciler) computeResourceStatuses( + ctx context.Context, + id string, + existingStatus v1alpha1.ResourceGroupStatus, + metas []v1alpha1.ObjMetadata, + nn types.NamespacedName, +) []v1alpha1.ResourceStatus { + return r.computeStatus(ctx, id, existingStatus, metas, nn, true) +} + +func (r *reconciler) computeSubGroupStatuses( + ctx context.Context, + id string, + existingStatus v1alpha1.ResourceGroupStatus, + groupMetas []v1alpha1.GroupMetadata, + nn types.NamespacedName, +) []v1alpha1.GroupStatus { + objMetadata := v1alpha1.ToObjMetadata(groupMetas) + statuses := r.computeStatus(ctx, id, existingStatus, objMetadata, nn, false) + return v1alpha1.ToGroupStatuses(statuses) +} + +// getResourceStatus returns a map of resource statuses indexed on the resource's object meta. +func (r *reconciler) getResourceStatus(status v1alpha1.ResourceGroupStatus) map[v1alpha1.ObjMetadata]v1alpha1.ResourceStatus { + statuses := make(map[v1alpha1.ObjMetadata]v1alpha1.ResourceStatus, len(status.ResourceStatuses)) + + for _, res := range status.ResourceStatuses { + statuses[res.ObjMetadata] = res + } + + return statuses +} + +// isResource flag indicates if the compute is for resource when true. If false, the compute is for SubGroup (or others). 
+func (r *reconciler) computeStatus( + ctx context.Context, + id string, + existingStatus v1alpha1.ResourceGroupStatus, + metas []v1alpha1.ObjMetadata, + nn types.NamespacedName, + isResource bool, +) []v1alpha1.ResourceStatus { + actuationStatuses := r.getResourceStatus(existingStatus) + statuses := []v1alpha1.ResourceStatus{} + hasErr := false + for _, res := range metas { + resStatus := v1alpha1.ResourceStatus{ + ObjMetadata: res, + } + + cachedStatus := r.resMap.GetStatus(res) + + // Add status to cache, if not present. + switch { + case cachedStatus != nil: + r.log.V(4).Info("found the cached resource status for", "namespace", res.Namespace, "name", res.Name) + setResStatus(id, &resStatus, cachedStatus) + default: + resObj := new(unstructured.Unstructured) + gvk, gvkFound := r.resolver.Resolve(schema.GroupKind(res.GroupKind)) + if !gvkFound { + // If the resolver cache does not contain the server preferred GVK, then GVK returned + // will be empty resulting in a GET error. An instance of this occurring is when the + // resource type (CRD) does not exist. + r.log.V(4).Info("unable to get object from API server to compute status as resource does not exist", "namespace", res.Namespace, "name", res.Name) + resStatus.Status = v1alpha1.NotFound + break + } + resObj.SetGroupVersionKind(gvk) + r.log.Info("get the object from API server to compute status for", "namespace", res.Namespace, "name", res.Name) + err := r.Get(ctx, types.NamespacedName{ + Namespace: res.Namespace, + Name: res.Name, + }, resObj) + if err != nil { + if errors.IsNotFound(err) || meta.IsNoMatchError(err) { + resStatus.Status = v1alpha1.NotFound + } else { + resStatus.Status = v1alpha1.Unknown + } + r.log.V(4).Error(err, "unable to get object from API server to compute status", "namespace", res.Namespace, "name", res.Name) + + break // Breaks out of switch statement, not the loop. + } + // get the resource status using the kstatus library + cachedStatus = controllerstatus.ComputeStatus(resObj) + // save the computed status and condition in memory. + r.resMap.SetStatus(res, cachedStatus) + // Update the new resource status. + setResStatus(id, &resStatus, cachedStatus) + } + + if resStatus.Status == v1alpha1.Failed || controllerstatus.IsCNRMResource(resStatus.Group) && resStatus.Status != v1alpha1.Current { + hasErr = true + } + + // Update the legacy status field based on the actuation, strategy and reconcile + // statuses set by cli-utils. If the actuation is not successful, update the legacy + // status field to be of unknown status. + aStatus, exists := actuationStatuses[resStatus.ObjMetadata] + if exists { + resStatus.Actuation = aStatus.Actuation + resStatus.Strategy = aStatus.Strategy + resStatus.Reconcile = aStatus.Reconcile + + resStatus.Status = ActuationStatusToLegacy(resStatus) + } + + // add the resource status into resgroup + statuses = append(statuses, resStatus) + } + // Only record pipeline_error_observed when computing status for resources + // Since the subgroup is always 0 and can overwrite the resource status + if isResource { + metrics.RecordPipelineError(ctx, nn, readinessComponent, hasErr) + } + return statuses +} + +// ActuationStatusToLegacy contains the logic/rules to convert from the actuation statuses +// to the legacy status field. If conversion is not needed, the original status field is returned +// instead. 
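+//
+// The rules below collapse to (editor's summary):
+//
+//	Status == NotFound                              -> NotFound
+//	Actuation set and != ActuationSucceeded         -> Unknown
+//	ActuationSucceeded && ReconcileSucceeded        -> Current
+//	otherwise                                       -> s.Status unchanged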
+func ActuationStatusToLegacy(s v1alpha1.ResourceStatus) v1alpha1.Status { + if s.Status == v1alpha1.NotFound { + return v1alpha1.NotFound + } + + if s.Actuation != "" && + s.Actuation != v1alpha1.ActuationSucceeded { + return v1alpha1.Unknown + } + + if s.Actuation == v1alpha1.ActuationSucceeded && s.Reconcile == v1alpha1.ReconcileSucceeded { + return v1alpha1.Current + } + return s.Status +} + +// setResStatus updates a resource status struct using values within the cached status struct. +func setResStatus(id string, resStatus *v1alpha1.ResourceStatus, cachedStatus *resourcemap.CachedStatus) { + resStatus.Status = cachedStatus.Status + resStatus.Conditions = make([]v1alpha1.Condition, len(cachedStatus.Conditions)) + copy(resStatus.Conditions, cachedStatus.Conditions) + resStatus.SourceHash = cachedStatus.SourceHash + cond := ownershipCondition(id, cachedStatus.InventoryID) + if cond != nil { + resStatus.Conditions = append(resStatus.Conditions, *cond) + } +} + +func aggregateResourceStatuses(statuses []v1alpha1.ResourceStatus) v1alpha1.Condition { + failedResources := []string{} + for _, status := range statuses { + if status.Status == v1alpha1.Failed { + res := status.ObjMetadata + resStr := fmt.Sprintf("%s/%s/%s/%s", res.Group, res.Kind, res.Namespace, res.Name) + failedResources = append(failedResources, resStr) + } + } + if len(failedResources) > 0 { + return newStalledCondition(v1alpha1.TrueConditionStatus, ComponentFailed, + componentFailedMsgPrefix+strings.Join(failedResources, ", ")) + } + return newStalledCondition(v1alpha1.FalseConditionStatus, FinishReconciling, finishReconcilingMsg) +} + +func NewRGController(mgr ctrl.Manager, channel chan event.GenericEvent, logger logr.Logger, + resolver *typeresolver.TypeResolver, resMap *resourcemap.ResourceMap, duration time.Duration) error { + r := &reconciler{ + Client: mgr.GetClient(), + log: logger, + scheme: mgr.GetScheme(), + resolver: resolver, + resMap: resMap, + } + + c, err := controller.New(v1alpha1.ResourceGroupKind, mgr, controller.Options{ + Reconciler: reconcile.Func(r.Reconcile), + }) + + if err != nil { + return err + } + + err = c.Watch(&source.Channel{Source: channel}, handler.NewThrottler(duration)) + if err != nil { + return err + } + return nil +} + +func getInventoryID(labels map[string]string) string { + if len(labels) == 0 { + return "" + } + return labels[common.InventoryLabel] +} + +func getSourceHash(annotations map[string]string) string { + if len(annotations) == 0 { + return "" + } + return annotations[SourceHashAnnotationKey] +} + +func ownershipCondition(id, inv string) *v1alpha1.Condition { + if id == inv { + return nil + } + c := &v1alpha1.Condition{ + Type: v1alpha1.Ownership, + LastTransitionTime: metav1.Now(), + } + if inv != "" { + c.Status = v1alpha1.TrueConditionStatus + c.Reason = v1alpha1.OwnershipUnmatch + c.Message = fmt.Sprintf("This resource is owned by another ResourceGroup %s. "+ + "The status only reflects the specification for the current object in ResourceGroup %s.", inv, inv) + } else { + c.Status = v1alpha1.UnknownConditionStatus + c.Reason = v1alpha1.OwnershipEmpty + c.Message = "This object is not owned by any inventory object. " + + "The status for the current object may not reflect the specification for it in current ResourceGroup." + } + return c +} + +// maxTimeout is the largest timeout it may take +// to reconcile a ResourceGroup CR with the assumption +// that there are at most 5000 resources GKNN inside the +// ResourceGroup CR. 
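+//
+// Worked example (editor's note): getReconcileTimeOut below allots 30s per
+// started block of 500 objects, so 1200 objects get (1200/500+1)*30s = 90s,
+// while 5000 objects would get 330s and are therefore capped at this value.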
+var maxTimeout = 5 * time.Minute + +// getReconcileTimeOut returns the timeout based on how many resources +// that are listed in the ResourceGroup spec. +// The rule for setting the timeout is that every 500 resources +// get 30 seconds. +func getReconcileTimeOut(count int) time.Duration { + q := count/500 + 1 + timeout := time.Duration(q) * 30 * time.Second + if timeout > maxTimeout { + return maxTimeout + } + return timeout +} diff --git a/vendor/kpt.dev/resourcegroup/controllers/resourcemap/resourcemap.go b/vendor/kpt.dev/resourcegroup/controllers/resourcemap/resourcemap.go new file mode 100644 index 0000000000..1ad164812d --- /dev/null +++ b/vendor/kpt.dev/resourcegroup/controllers/resourcemap/resourcemap.go @@ -0,0 +1,330 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package resourcemap + +import ( + "context" + "sync" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + + "kpt.dev/resourcegroup/apis/kpt.dev/v1alpha1" + "kpt.dev/resourcegroup/controllers/metrics" +) + +type resource = v1alpha1.ObjMetadata + +// resourceSet includes a set of resources +type resourceSet struct { + // Define this as a set to make it efficient to check whether a given resource is included + data map[resource]struct{} +} + +// Add adds res into resourceSet +func (s *resourceSet) Add(res resource) { + s.data[res] = struct{}{} +} + +// Remove removes res from resourceSet +func (s *resourceSet) Remove(res resource) { + delete(s.data, res) +} + +// Has checks whether res is in resourceSet +func (s *resourceSet) Has(res resource) bool { + _, ok := s.data[res] + return ok +} + +// Len returns the length of resourceSet +func (s *resourceSet) Len() int { + return len(s.data) +} + +// ToSlice converts a resourceSet into a slice +func (s *resourceSet) toSlice() []resource { + result := []resource{} + for k := range s.data { + result = append(result, k) + } + return result +} + +// newresourceSet initializes a resourceSet with a list of resources +func newresourceSet(resources []resource) *resourceSet { + result := &resourceSet{data: make(map[resource]struct{})} + for _, res := range resources { + result.Add(res) + } + return result +} + +// resourceGroupSet includes a set of resource group names +type resourceGroupSet struct { + // Define this as a set to make it efficient to check whether a given resource group is included + data map[types.NamespacedName]struct{} +} + +// Add adds a group into the resourceGroupSet +func (s *resourceGroupSet) Add(group types.NamespacedName) { + s.data[group] = struct{}{} +} + +// Remove removes a group from the resourceGroupSet +func (s *resourceGroupSet) Remove(group types.NamespacedName) { + delete(s.data, group) +} + +// Has checks whether a group is in the resourceGroupSet +func (s *resourceGroupSet) Has(group types.NamespacedName) bool { + _, ok := s.data[group] + return ok +} + +// Len returns the length of the resourceGroupSet +func (s *resourceGroupSet) Len() int { + return len(s.data) +} + +func (s 
*resourceGroupSet) toSlice() []types.NamespacedName { + result := []types.NamespacedName{} + for group := range s.data { + result = append(result, group) + } + return result +} + +// newresourceGroupSet initializes a resourceGroupSet with a list of resource groups +func newresourceGroupSet(groups []types.NamespacedName) *resourceGroupSet { + result := &resourceGroupSet{data: make(map[types.NamespacedName]struct{})} + for _, group := range groups { + result.Add(group) + } + return result +} + +// CachedStatus stores the status and condition for one resource. +type CachedStatus struct { + Status v1alpha1.Status + Conditions []v1alpha1.Condition + SourceHash string + InventoryID string +} + +// ResourceMap maintains the following maps: +// 1) resToResgroups maps a resource to all the resource groups including it +// 2) resgroupToResources maps a resource group to its resource set +// 3) resToStatus maps a resource to its cached status +// 4) gkToResources maps a GroupKind to its resource set +// During the reconciliation of a RG in the root controller, the updates to these two maps should be atomic. +type ResourceMap struct { + // use a lock to make sure that updating resToResgroups and resgroupToResources is atomic + lock sync.RWMutex + // resToResgroups maps a resource to all the resource groups including it. + resToResgroups map[resource]*resourceGroupSet + // resToStatus maps a resource to its reconciliation status and conditions. + resToStatus map[resource]*CachedStatus + // resgroupToResources maps a resource group to its resource set + resgroupToResources map[types.NamespacedName]*resourceSet + // gkToResources maps a GroupKind to its resource set + gkToResources map[schema.GroupKind]*resourceSet +} + +// Reconcile takes a resourcegroup name and all the resources belonging to it, and +// updates the resToResgroups map and resgroupToResources map atomically. The updates include: +// 1) calculate the diff between resgroupToResources[`group`] and `resources`; +// 2) update resToResgroups and gkToResources using the diff; +// - for resources only in resgroupToResources[`group`], remove from resToResgroups and gkToResources; +// - for resources only in `resources`, add into resToResgroups and gkToResources; +// - for resources in both resgroupToResources[`group`] and `resources`, do nothing. +// +// 3) set resgroupToResources[group] to resources, or delete group from resgroupToResources if `resources` is empty. +// +// Returns: +// +// a slice of GroupKinds that should be watched +// +// To set the resources managed by a RG to be empty, call ResourceMap.Reconcile(group, []resource{}, false). +// +// To delete a RG, call ResourceMap.Reconcile(group, []resource{}, true). 
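+//
+// Worked example (editor's illustration): if the group currently maps to
+// {a, b} and `resources` is {b, c}, then toDelete={a} and toAdd={c}; `a` is
+// removed from resToResgroups (and from gkToResources, when no other group
+// still uses its GroupKind), `c` is added, and resgroupToResources[group]
+// ends up as {b, c}.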
+func (m *ResourceMap) Reconcile(ctx context.Context, group types.NamespacedName, resources []resource, deleteRG bool) []schema.GroupKind { + if deleteRG { + resources = []resource{} + } + // calculate the diff between resgroupToResources[`group`] and `resources` + oldResources := []resource{} + if v, ok := m.resgroupToResources[group]; ok { + oldResources = v.toSlice() + } + toAdd, toDelete := diffResources(oldResources, resources) + + // use a lock to make sure that updating resToResgroups and resgroupToResources is atomic + m.lock.Lock() + defer m.lock.Unlock() + + for _, res := range toDelete { + if groups, ok := m.resToResgroups[res]; ok { + groups.Remove(group) + if groups.Len() == 0 { + // delete res from m.resToResgroups if no resource group includes it any more + delete(m.resToResgroups, res) + delete(m.resToStatus, res) + + // delete res from m.gkToResources[gk] if no resource group includes it any more + gk := res.GK() + m.gkToResources[gk].Remove(res) + + if m.gkToResources[gk].Len() == 0 { + // delete gk from m.gkToResources if there is no resource group includes a gk resource + delete(m.gkToResources, gk) + } + } + } + } + + for _, res := range toAdd { + if groups, ok := m.resToResgroups[res]; ok { + groups.Add(group) + } else { + m.resToResgroups[res] = newresourceGroupSet([]types.NamespacedName{group}) + + gk := res.GK() + // add res to m.gkToResources[gk] + if gkResources, ok := m.gkToResources[gk]; !ok { + m.gkToResources[gk] = newresourceSet([]resource{res}) + } else { + gkResources.Add(res) + } + } + } + + gks := map[schema.GroupKind]bool{} + for res := range m.resToResgroups { + gk := res.GK() + gks[gk] = true + } + + if len(resources) == 0 && deleteRG { + delete(m.resgroupToResources, group) + } else { + m.resgroupToResources[group] = newresourceSet(resources) + } + + metrics.RecordResourceGroupTotal(ctx, int64(len(m.resgroupToResources))) + var gkSlice []schema.GroupKind + for gk := range gks { + gkSlice = append(gkSlice, gk) + } + return gkSlice +} + +// diffResources calculate the diff between two sets of resources. +func diffResources(oldResources, newResources []resource) (toAdd, toDelete []resource) { + newSet := newresourceSet(newResources) + for _, res := range oldResources { + if !newSet.Has(res) { + toDelete = append(toDelete, res) + } + } + + oldSet := newresourceSet(oldResources) + for _, res := range newResources { + if !oldSet.Has(res) { + toAdd = append(toAdd, res) + } + } + return toAdd, toDelete +} + +// HasResource checks whether a resource is in the ResourceMap +func (m *ResourceMap) HasResource(res resource) bool { + m.lock.RLock() + defer m.lock.RUnlock() + _, ok := m.resToResgroups[res] + return ok +} + +// HasResgroup checks whether a resourcegroup is in the ResourceMap +func (m *ResourceMap) HasResgroup(group types.NamespacedName) bool { + m.lock.RLock() + defer m.lock.RUnlock() + _, ok := m.resgroupToResources[group] + return ok +} + +// Get returns the resourceGroupSet for res +func (m *ResourceMap) Get(res resource) []types.NamespacedName { + m.lock.RLock() + defer m.lock.RUnlock() + if v, ok := m.resToResgroups[res]; ok { + return v.toSlice() + } + return []types.NamespacedName{} +} + +// GetStatus returns the cached status for the input resource. +func (m *ResourceMap) GetStatus(res resource) *CachedStatus { + m.lock.RLock() + defer m.lock.RUnlock() + if v, ok := m.resToStatus[res]; ok { + return v + } + return nil +} + +// GetStatusMap returns the map from resources to their status. 
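+// Note (editor's): unlike the accessors above, this returns the internal map
+// without taking the read lock; callers must treat it as read-only.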
+func (m *ResourceMap) GetStatusMap() map[resource]*CachedStatus {
+	return m.resToStatus
+}
+
+// SetStatus sets the status and conditions for a resource.
+func (m *ResourceMap) SetStatus(res resource, resStatus *CachedStatus) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	m.resToStatus[res] = resStatus
+}
+
+// GetResources gets the set of resources for the given group kind.
+func (m *ResourceMap) GetResources(gk schema.GroupKind) []resource {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	set := m.gkToResources[gk]
+	if set == nil {
+		return nil
+	}
+	resources := make([]resource, len(set.data))
+	i := 0
+	for r := range set.data {
+		resources[i] = r
+		i++
+	}
+	return resources
+}
+
+// IsEmpty checks whether the ResourceMap is empty
+func (m *ResourceMap) IsEmpty() bool {
+	return len(m.resgroupToResources) == 0 && len(m.resToResgroups) == 0
+}
+
+// NewResourceMap initializes an empty ResourceMap
+func NewResourceMap() *ResourceMap {
+	return &ResourceMap{
+		resToResgroups:      make(map[resource]*resourceGroupSet),
+		resToStatus:         make(map[resource]*CachedStatus),
+		resgroupToResources: make(map[types.NamespacedName]*resourceSet),
+		gkToResources:       make(map[schema.GroupKind]*resourceSet),
+	}
+}
diff --git a/vendor/kpt.dev/resourcegroup/controllers/root/root_controller.go b/vendor/kpt.dev/resourcegroup/controllers/root/root_controller.go
new file mode 100644
index 0000000000..b2e3d22c47
--- /dev/null
+++ b/vendor/kpt.dev/resourcegroup/controllers/root/root_controller.go
@@ -0,0 +1,297 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package root
+
+import (
+	"context"
+
+	"github.com/go-logr/logr"
+	apiequality "k8s.io/apimachinery/pkg/api/equality"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/util/retry"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+	"sigs.k8s.io/controller-runtime/pkg/predicate"
+	"sigs.k8s.io/controller-runtime/pkg/source"
+
+	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+	"kpt.dev/resourcegroup/apis/kpt.dev/v1alpha1"
+	"kpt.dev/resourcegroup/controllers/handler"
+	"kpt.dev/resourcegroup/controllers/resourcegroup"
+	"kpt.dev/resourcegroup/controllers/resourcemap"
+	"kpt.dev/resourcegroup/controllers/typeresolver"
+	"kpt.dev/resourcegroup/controllers/watch"
+)
+
+const (
+	ConfigSyncGroup    = "configsync"
+	KptGroup           = "kpt"
+	DisableStatusKey   = "configsync.gke.io/status"
+	DisableStatusValue = "disabled"
+)
+
+// contextKey is a custom type for wrapping context values to make them unique
+// to this package
+type contextKey string
+
+const contextLoggerKey = contextKey("logger")
+
+// Reconciler reconciles a ResourceGroup object.
+// It only accepts the Create, Update, Delete events of ResourceGroup objects.
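+// After a successful reconcile it pushes a GenericEvent for the group onto
+// `channel`, from which the ResourceGroup controller recomputes the status
+// (editor's note; see reconcileKptGroup below).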
+type Reconciler struct {
+	// cfg is the rest config associated with the reconciler
+	cfg *rest.Config
+
+	// Client is to get and update ResourceGroup object.
+	client.Client
+
+	// log is the logger of the reconciler.
+	log logr.Logger
+
+	// TODO: check if scheme is needed
+	scheme *runtime.Scheme
+
+	// resolver is the type resolver to find the server preferred
+	// GVK for a GK
+	resolver *typeresolver.TypeResolver
+
+	// resMap is an instance of resourcemap which contains
+	// the mapping from the existing ResourceGroups to their underlying resources
+	// and reverse mapping.
+	resMap *resourcemap.ResourceMap
+
+	// channel accepts the events that are from
+	// different watchers for GVKs.
+	channel chan event.GenericEvent
+
+	// watches contains the mapping from GVK to their watchers.
+	watches *watch.Manager
+}
+
+// +kubebuilder:rbac:groups=kpt.dev,resources=resourcegroups,verbs=get;list;watch;create;update;patch;delete
+// +kubebuilder:rbac:groups=kpt.dev,resources=resourcegroups/status,verbs=get;update;patch
+// +kubebuilder:rbac:groups=*,resources=*,verbs=get;list;watch
+
+func (r *Reconciler) Reconcile(rootCtx context.Context, req ctrl.Request) (ctrl.Result, error) {
+	logger := r.log
+	ctx := context.WithValue(rootCtx, contextLoggerKey, logger)
+	logger.Info("starts reconciling")
+	return r.reconcileKptGroup(ctx, logger, req)
+}
+
+func (r *Reconciler) reconcileKptGroup(ctx context.Context, logger logr.Logger, req ctrl.Request) (ctrl.Result, error) {
+	var resgroup = &v1alpha1.ResourceGroup{}
+	err := r.Get(ctx, req.NamespacedName, resgroup)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			// If the ResourceGroup has been deleted, update the resMap
+			return r.reconcile(ctx, req.NamespacedName, []v1alpha1.ObjMetadata{}, true)
+		}
+		return ctrl.Result{}, err
+	}
+
+	// The ResourceGroup CR was created by Config Sync with status reporting disabled
+	if isStatusDisabled(resgroup) {
+		return r.reconcileDisabledResourceGroup(ctx, req, resgroup)
+	}
+
+	// ResourceGroup is in the process of being deleted, clean up the cache for this ResourceGroup
+	if resgroup.DeletionTimestamp != nil {
+		return r.reconcile(ctx, req.NamespacedName, []v1alpha1.ObjMetadata{}, true)
+	}
+
+	resources := make([]v1alpha1.ObjMetadata, 0, len(resgroup.Spec.Resources)+len(resgroup.Spec.Subgroups))
+	resources = append(resources, resgroup.Spec.Resources...)
+	resources = append(resources, v1alpha1.ToObjMetadata(resgroup.Spec.Subgroups)...)
+	if result, err := r.reconcile(ctx, req.NamespacedName, resources, false); err != nil {
+		return result, err
+	}
+
+	// Push an event to the ResourceGroup event channel
+	r.channel <- event.GenericEvent{Object: resgroup}
+	logger.Info("finished reconciling")
+
+	return ctrl.Result{}, nil
+}
+
+func (r *Reconciler) reconcile(ctx context.Context, name types.NamespacedName,
+	resources []v1alpha1.ObjMetadata, deleteRG bool) (ctrl.Result, error) {
+	gks := r.resMap.Reconcile(ctx, name, resources, deleteRG)
+	if err := r.updateWatches(ctx, gks); err != nil {
+		return ctrl.Result{}, err
+	}
+	return ctrl.Result{}, nil
+}
+
+// updateWatches adds new watches for GVKs when a resgroup includes the first resource(s) of a GVK,
+// and deletes watches for GVKs when no resource group includes resources of that GVK anymore.
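+// For example (editor's note), GroupKind {apps, Deployment} resolves to the
+// server preferred {apps/v1, Deployment}; a GroupKind whose CRD is absent
+// fails to resolve and is simply left out of the desired watch set.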
+func (r *Reconciler) updateWatches(ctx context.Context, gks []schema.GroupKind) error {
+	gvkMap := map[schema.GroupVersionKind]struct{}{}
+	for _, gk := range gks {
+		gvk, found := r.resolver.Resolve(gk)
+		if found {
+			gvkMap[gvk] = struct{}{}
+		}
+	}
+	return r.watches.UpdateWatches(ctx, gvkMap)
+}
+
+func (r *Reconciler) reconcileDisabledResourceGroup(ctx context.Context, req ctrl.Request, resgroup *v1alpha1.ResourceGroup) (ctrl.Result, error) {
+	// clean the existing .status field
+	emptyStatus := v1alpha1.ResourceGroupStatus{}
+	err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+		if apiequality.Semantic.DeepEqual(resgroup.Status, emptyStatus) {
+			return nil
+		}
+		resgroup.Status = emptyStatus
+		// Use `r.Status().Update()` here instead of `r.Update()` to update only resgroup.Status.
+		return r.Status().Update(ctx, resgroup)
+	})
+	if err != nil {
+		return ctrl.Result{}, err
+	}
+	// update the resMap
+	return r.reconcile(ctx, req.NamespacedName, []v1alpha1.ObjMetadata{}, true)
+}
+
+func isStatusDisabled(resgroup *v1alpha1.ResourceGroup) bool {
+	annotations := resgroup.GetAnnotations()
+	if annotations == nil {
+		return false
+	}
+	val, found := annotations[DisableStatusKey]
+	return found && val == DisableStatusValue
+}
+
+func NewController(mgr manager.Manager, channel chan event.GenericEvent,
+	logger logr.Logger, resolver *typeresolver.TypeResolver, group string, resMap *resourcemap.ResourceMap) error {
+	cfg := mgr.GetConfig()
+	watchOption, err := watch.DefaultOptions(cfg)
+	if err != nil {
+		return err
+	}
+	watchManager, err := watch.NewManager(cfg, resMap, channel, watchOption)
+	if err != nil {
+		return err
+	}
+	// Create the reconciler
+	reconciler := &Reconciler{
+		Client:   mgr.GetClient(),
+		cfg:      cfg,
+		log:      logger,
+		scheme:   mgr.GetScheme(),
+		resolver: resolver,
+		resMap:   resMap,
+		channel:  channel,
+		watches:  watchManager,
+	}
+
+	_, err = ctrl.NewControllerManagedBy(mgr).
+		For(&v1alpha1.ResourceGroup{}).
+		Named(group+"Root").
+		WithEventFilter(ResourceGroupPredicate{}).
+		// skip the Generic events
+		WithEventFilter(NoGenericEventPredicate{}).
+		Watches(&source.Kind{Type: &apiextensionsv1.CustomResourceDefinition{}}, &handler.CRDEventHandler{
+			Mapping: resMap,
+			Channel: channel,
+			Log:     logger,
+		}).
+		Build(reconciler)
+
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// NoGenericEventPredicate skips all the generic events
+type NoGenericEventPredicate struct {
+	predicate.Funcs
+}
+
+// Generic skips all generic events
+func (NoGenericEventPredicate) Generic(event.GenericEvent) bool {
+	return false
+}
+
+// ResourceGroupPredicate skips update events that do not need reconciliation,
+// such as when the new status is unchanged from the old one.
+type ResourceGroupPredicate struct {
+	predicate.Funcs
+}
+
+// Update ensures that only select ResourceGroup updates cause a reconciliation loop. This prevents
+// the controller from generating an infinite loop of reconciliations.
+func (ResourceGroupPredicate) Update(e event.UpdateEvent) bool {
+	// Only allow ResourceGroup CR events.
+	rgNew, ok := e.ObjectNew.(*v1alpha1.ResourceGroup)
+	if !ok {
+		return false
+	}
+	rgOld, ok := e.ObjectOld.(*v1alpha1.ResourceGroup)
+	if !ok {
+		return false
+	}
+
+	// Reconcile if the generation (spec) is updated, or the previous reconcile stalled and needs to be reconciled.
+	if rgNew.Generation != rgOld.Generation || isConditionTrue(v1alpha1.Stalled, rgNew.Status.Conditions) {
+		return true
+	}
+
+	// If a ResourceGroup has the status disabled annotation and its status field
+	// is not empty, it should trigger a reconcile to reset the status.
+	if isStatusDisabled(rgNew) {
+		return rgNew.Status.Conditions != nil
+	}
+
+	// If a current reconcile loop is already acting on the ResourceGroup CR, it
+	// should not trigger another reconcile.
+	if isConditionTrue(v1alpha1.Reconciling, rgNew.Status.Conditions) {
+		return false
+	}
+
+	// Check if the status field needs to be updated since the actuation field was externally updated.
+	return statusNeedsUpdate(rgNew.Status.ResourceStatuses)
+}
+
+// statusNeedsUpdate checks each resource status to ensure the legacy status field
+// aligns with the new actuation/reconcile status fields.
+func statusNeedsUpdate(statuses []v1alpha1.ResourceStatus) bool {
+	for _, s := range statuses {
+		if resourcegroup.ActuationStatusToLegacy(s) != s.Status {
+			return true
+		}
+	}
+
+	return false
+}
+
+// isConditionTrue scans through a slice of conditions and returns whether the wanted condition
+// type is true or false. Defaults to false if the condition type is not found.
+func isConditionTrue(cType v1alpha1.ConditionType, conditions []v1alpha1.Condition) bool {
+	for _, c := range conditions {
+		if c.Type == cType {
+			return c.Status == v1alpha1.TrueConditionStatus
+		}
+	}
+
+	return false
+}
diff --git a/vendor/kpt.dev/resourcegroup/controllers/root/util.go b/vendor/kpt.dev/resourcegroup/controllers/root/util.go
new file mode 100644
index 0000000000..7bb115a4a9
--- /dev/null
+++ b/vendor/kpt.dev/resourcegroup/controllers/root/util.go
@@ -0,0 +1,40 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package root
+
+import (
+	"k8s.io/apimachinery/pkg/runtime/schema"
+
+	"kpt.dev/resourcegroup/apis/kpt.dev/v1alpha1"
+)
+
+// getGroupKinds returns the GroupKinds from a ResourceGroup's spec
+func getGroupKinds(spec v1alpha1.ResourceGroupSpec) map[schema.GroupKind]struct{} {
+	if len(spec.Resources) == 0 {
+		return nil
+	}
+	gkSet := make(map[schema.GroupKind]struct{})
+
+	for _, res := range spec.Resources {
+		gk := schema.GroupKind{
+			Group: res.Group,
+			Kind:  res.Kind,
+		}
+		if _, ok := gkSet[gk]; !ok {
+			gkSet[gk] = struct{}{}
+		}
+	}
+	return gkSet
+}
diff --git a/vendor/kpt.dev/resourcegroup/controllers/runner/run.go b/vendor/kpt.dev/resourcegroup/controllers/runner/run.go
new file mode 100644
index 0000000000..bdaa3e75f6
--- /dev/null
+++ b/vendor/kpt.dev/resourcegroup/controllers/runner/run.go
@@ -0,0 +1,141 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package runner + +import ( + "flag" + "fmt" + + "github.com/go-logr/logr" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" // register gcp auth provider plugin + "kpt.dev/resourcegroup/apis/kpt.dev/v1alpha1" + "kpt.dev/resourcegroup/controllers/log" + ocmetrics "kpt.dev/resourcegroup/controllers/metrics" + "kpt.dev/resourcegroup/controllers/profiler" + "kpt.dev/resourcegroup/controllers/resourcegroup" + "kpt.dev/resourcegroup/controllers/resourcemap" + "kpt.dev/resourcegroup/controllers/root" + "kpt.dev/resourcegroup/controllers/typeresolver" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/event" + // +kubebuilder:scaffold:imports +) + +var ( + scheme = runtime.NewScheme() + setupLog = ctrl.Log.WithName("setup") +) + +// Run starts all controllers and returns an integer exit code. +func Run() int { + if err := run(); err != nil { + setupLog.Error(err, "exiting") + return 1 + } + return 0 +} + +func run() error { + _ = clientgoscheme.AddToScheme(scheme) + + _ = v1alpha1.AddToScheme(scheme) + // +kubebuilder:scaffold:scheme + + _ = apiextensionsv1.AddToScheme(scheme) + + log.InitFlags() + + var metricsAddr string + var enableLeaderElection bool + flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.") + flag.BoolVar(&enableLeaderElection, "enable-leader-election", false, + "Enable leader election for controller manager. "+ + "Enabling this will ensure there is only one active controller manager.") + flag.Parse() + + profiler.Service() + + // Register the OpenCensus views + if err := ocmetrics.RegisterReconcilerMetricsViews(); err != nil { + return fmt.Errorf("failed to register OpenCensus views: %w", err) + } + + // Register the OC Agent exporter + oce, err := ocmetrics.RegisterOCAgentExporter() + if err != nil { + return fmt.Errorf("failed to register the OC Agent exporter: %w", err) + } + + defer func() { + if err := oce.Stop(); err != nil { + setupLog.Error(err, "Unable to stop the OC Agent exporter") + } + }() + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ + Scheme: scheme, + MetricsBindAddress: metricsAddr, + Port: 9443, + LeaderElection: enableLeaderElection, + LeaderElectionID: "413d8c8e.gke.io", + }) + if err != nil { + return fmt.Errorf("failed to build manager: %w", err) + } + + logger := ctrl.Log.WithName("controllers") + + for _, group := range []string{root.KptGroup} { + if err := registerControllersForGroup(mgr, logger, group); err != nil { + return fmt.Errorf("failed to register controllers for group %s: %w", group, err) + } + } + + // +kubebuilder:scaffold:builder + + setupLog.Info("starting manager") + if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { + return fmt.Errorf("failed to start controller-manager: %w", err) + } + return nil +} + +func registerControllersForGroup(mgr ctrl.Manager, logger logr.Logger, group string) error { + // channel is watched by ResourceGroup controller. 
+	// The Root controller pushes events to it and
+	// the ResourceGroup controller consumes events.
+	channel := make(chan event.GenericEvent)
+
+	setupLog.Info("adding the type resolver")
+	resolver, err := typeresolver.NewTypeResolver(mgr, logger.WithName("TypeResolver"))
+	if err != nil {
+		return fmt.Errorf("unable to set up the type resolver: %w", err)
+	}
+	resolver.Refresh()
+
+	setupLog.Info("adding the Root controller for group " + group)
+	resMap := resourcemap.NewResourceMap()
+	if err := root.NewController(mgr, channel, logger.WithName("Root"), resolver, group, resMap); err != nil {
+		return fmt.Errorf("unable to create the root controller for group %s: %w", group, err)
+	}
+
+	setupLog.Info("adding the ResourceGroup controller for group " + group)
+	if err := resourcegroup.NewRGController(mgr, channel, logger.WithName(v1alpha1.ResourceGroupKind), resolver, resMap, resourcegroup.DefaultDuration); err != nil {
+		return fmt.Errorf("unable to create the ResourceGroup controller %s: %w", group, err)
+	}
+	return nil
+}
diff --git a/vendor/kpt.dev/resourcegroup/controllers/status/status.go b/vendor/kpt.dev/resourcegroup/controllers/status/status.go
new file mode 100644
index 0000000000..54207949f1
--- /dev/null
+++ b/vendor/kpt.dev/resourcegroup/controllers/status/status.go
@@ -0,0 +1,147 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package status
+
+import (
+	"fmt"
+	"strings"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/klog/v2"
+	"kpt.dev/resourcegroup/controllers/resourcemap"
+	kstatus "sigs.k8s.io/cli-utils/pkg/kstatus/status"
+	"sigs.k8s.io/yaml"
+
+	"kpt.dev/resourcegroup/apis/kpt.dev/v1alpha1"
+)
+
+const (
+	owningInventoryKey      = "config.k8s.io/owning-inventory"
+	SourceHashAnnotationKey = "configmanagement.gke.io/token"
+)
+
+// ComputeStatus computes the status and conditions that should be
+// saved in the memory.
+func ComputeStatus(obj *unstructured.Unstructured) *resourcemap.CachedStatus {
+	resStatus := &resourcemap.CachedStatus{}
+
+	// get the resource status using the kstatus library
+	result, err := kstatus.Compute(obj)
+	if err != nil {
+		klog.Errorf("kstatus.Compute for %v failed: %v", obj, err)
+	}
+	if err != nil || result == nil {
+		resStatus.Status = v1alpha1.Unknown
+		return resStatus
+	}
+
+	resStatus.Status = v1alpha1.Status(result.Status)
+	if resStatus.Status == v1alpha1.Failed {
+		resStatus.Conditions = ConvertKstatusConditions(result.Conditions)
+	} else if IsCNRMResource(obj.GroupVersionKind().Group) && resStatus.Status != v1alpha1.Current {
+		// Special handling for KCC resources.
+		// It should be removed after KCC resources implement the stalled conditions.
+		conditions, cErr := ReadKCCResourceConditions(obj)
+		if cErr != nil {
+			klog.Error(cErr.Error())
+			// fallback to use the kstatus conditions for this resource.
+			resStatus.Conditions = ConvertKstatusConditions(result.Conditions)
+		} else {
+			resStatus.Conditions = conditions
+		}
+	}
+
+	hash := GetSourceHash(obj.GetAnnotations())
+	if hash != "" {
+		resStatus.SourceHash = hash
+	}
+	// get the inventory ID.
+	inv := getOwningInventory(obj.GetAnnotations())
+	resStatus.InventoryID = inv
+	return resStatus
+}
+
+// ConvertKstatusConditions converts the status from kstatus library to the conditions
+// defined in ResourceGroup apis.
+func ConvertKstatusConditions(kstatusConds []kstatus.Condition) []v1alpha1.Condition {
+	var result []v1alpha1.Condition
+	for _, cond := range kstatusConds {
+		result = append(result, convertKstatusCondition(cond))
+	}
+	return result
+}
+
+func convertKstatusCondition(kstatusCond kstatus.Condition) v1alpha1.Condition {
+	return v1alpha1.Condition{
+		Type:    v1alpha1.ConditionType(kstatusCond.Type),
+		Status:  v1alpha1.ConditionStatus(kstatusCond.Status),
+		Reason:  kstatusCond.Reason,
+		Message: kstatusCond.Message,
+		// When kstatus adds support for accepting an existing list of conditions and
+		// computing `LastTransitionTime`, we can set LastTransitionTime to:
+		//   LastTransitionTime: kstatusCond.LastTransitionTime,
+		// Leaving LastTransitionTime unset, or setting it to `metav1.Time{}` or `metav1.Time{Time: time.Time{}}`, will cause a serialization error:
+		//   status.resourceStatuses.conditions.lastTransitionTime: Invalid value: \"null\":
+		//   status.resourceStatuses.conditions.lastTransitionTime in body must be of type string: \"null\""
+		LastTransitionTime: metav1.Now(),
+	}
+}
+
+// IsCNRMResource checks if a group is for a CNRM resource.
+func IsCNRMResource(group string) bool {
+	return strings.HasSuffix(group, "cnrm.cloud.google.com")
+}
+
+// ReadKCCResourceConditions reads the status.conditions from a KCC object.
+func ReadKCCResourceConditions(obj *unstructured.Unstructured) ([]v1alpha1.Condition, error) {
+	conditions, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
+	if err != nil {
+		return nil, fmt.Errorf("failed to find .status.conditions for %s/%s: %v", obj.GetNamespace(), obj.GetName(), err)
+	}
+	if !found {
+		return nil, fmt.Errorf("failed to find .status.conditions for %s/%s", obj.GetNamespace(), obj.GetName())
+	}
+	data, err := yaml.Marshal(conditions)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal conditions for %s/%s", obj.GetNamespace(), obj.GetName())
+	}
+	results := make([]v1alpha1.Condition, len(conditions))
+	err = yaml.Unmarshal(data, &results)
+	return results, err
+}
+
+// GetSourceHash returns the source hash that is defined in the
+// source hash annotation.
+func GetSourceHash(annotations map[string]string) string {
+	if len(annotations) == 0 {
+		return ""
+	}
+	sourceHash := annotations[SourceHashAnnotationKey]
+	if len(sourceHash) > 7 {
+		return sourceHash[0:7]
+	}
+	return sourceHash
+}
+
+func getOwningInventory(annotations map[string]string) string {
+	if len(annotations) == 0 {
+		return ""
+	}
+	return annotations[owningInventoryKey]
+}
diff --git a/vendor/kpt.dev/resourcegroup/controllers/typeresolver/fakeresolver.go b/vendor/kpt.dev/resourcegroup/controllers/typeresolver/fakeresolver.go
new file mode 100644
index 0000000000..7c500e4a62
--- /dev/null
+++ b/vendor/kpt.dev/resourcegroup/controllers/typeresolver/fakeresolver.go
@@ -0,0 +1,58 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package typeresolver + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func FakeResolver() *TypeResolver { + return &TypeResolver{ + typeMapping: map[schema.GroupKind]schema.GroupVersionKind{ + { + Group: "apps", + Kind: "Deployment", + }: { + Group: "apps", + Version: "v1", + Kind: "Deployment", + }, + { + Group: "apps", + Kind: "StatefulSet", + }: { + Group: "apps", + Version: "v1", + Kind: "StatefulSet", + }, + { + Group: "apps", + Kind: "DaemonSet", + }: { + Group: "apps", + Version: "v1", + Kind: "DaemonSet", + }, + { + Group: "", + Kind: "ConfigMap", + }: { + Group: "", + Version: "v1", + Kind: "ConfigMap", + }, + }, + } +} diff --git a/vendor/kpt.dev/resourcegroup/controllers/typeresolver/typeresolver.go b/vendor/kpt.dev/resourcegroup/controllers/typeresolver/typeresolver.go new file mode 100644 index 0000000000..0622dc6f3b --- /dev/null +++ b/vendor/kpt.dev/resourcegroup/controllers/typeresolver/typeresolver.go @@ -0,0 +1,105 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package typeresolver + +import ( + "context" + "sync" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/discovery" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +// TypeResolver keeps the preferred GroupVersionKind for all the +// types in the cluster. 
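+// For example (editor's note), Resolve(schema.GroupKind{Group: "apps", Kind:
+// "Deployment"}) yields apps/v1 Deployment on a typical cluster, matching the
+// fixed mapping that FakeResolver above serves in tests.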
+type TypeResolver struct { + log logr.Logger + mu sync.Mutex + dc *discovery.DiscoveryClient + typeMapping map[schema.GroupKind]schema.GroupVersionKind +} + +func (r *TypeResolver) Refresh() { + mapping := make(map[schema.GroupKind]schema.GroupVersionKind) + apiResourcesList, err := discovery.ServerPreferredResources(r.dc) + if err != nil { + r.log.Error(err, "Unable to fetch api resources list by dynamic client") + return + } + for _, resources := range apiResourcesList { + groupversion := resources.GroupVersion + gv, err := schema.ParseGroupVersion(groupversion) + if err != nil { + continue + } + for _, resource := range resources.APIResources { + gk := schema.GroupKind{ + Group: gv.Group, + Kind: resource.Kind, + } + mapping[gk] = gk.WithVersion(gv.Version) + } + } + r.mu.Lock() + defer r.mu.Unlock() + r.typeMapping = mapping +} + +func (r *TypeResolver) Resolve(gk schema.GroupKind) (schema.GroupVersionKind, bool) { + r.mu.Lock() + defer r.mu.Unlock() + item, found := r.typeMapping[gk] + return item, found +} + +// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get;list;watch + +func (r *TypeResolver) Reconcile(context.Context, ctrl.Request) (ctrl.Result, error) { + logger := r.log + logger.Info("refreshing type resolver") + r.Refresh() + return ctrl.Result{}, nil +} + +func NewTypeResolver(mgr ctrl.Manager, logger logr.Logger) (*TypeResolver, error) { + dc := discovery.NewDiscoveryClientForConfigOrDie(mgr.GetConfig()) + r := &TypeResolver{ + log: logger, + dc: dc, + } + + c, err := controller.New("TypeResolver", mgr, controller.Options{ + Reconciler: reconcile.Func(r.Reconcile), + }) + + if err != nil { + return nil, err + } + + u := &unstructured.Unstructured{} + u.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "apiextensions.k8s.io", + Version: "v1", + Kind: "CustomResourceDefinition", + }) + return r, c.Watch(&source.Kind{Type: u}, &handler.EnqueueRequestForObject{}) +} diff --git a/vendor/kpt.dev/resourcegroup/controllers/watch/filteredwatcher.go b/vendor/kpt.dev/resourcegroup/controllers/watch/filteredwatcher.go new file mode 100644 index 0000000000..8eb8ba48bd --- /dev/null +++ b/vendor/kpt.dev/resourcegroup/controllers/watch/filteredwatcher.go @@ -0,0 +1,352 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package watch + +import ( + "context" + "fmt" + "math" + "math/rand" + "reflect" + "sync" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + k8sevent "sigs.k8s.io/controller-runtime/pkg/event" + + "kpt.dev/resourcegroup/apis/kpt.dev/v1alpha1" + "kpt.dev/resourcegroup/controllers/resourcemap" + "kpt.dev/resourcegroup/controllers/status" + "kpt.dev/resourcegroup/third_party/k8s.io/client-go/tools/cache" +) + +const ( + // Copying strategy from k8s.io/client-go/tools/cache/reflector.go + // We try to spread the load on apiserver by setting timeouts for + // watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout]. + minWatchTimeout = 5 * time.Minute +) + +// maxWatchRetryFactor is used to determine when the next retry should happen. +// 2^^18 * time.Millisecond = 262,144 ms, which is about 4.36 minutes. +const maxWatchRetryFactor = 18 + +// Runnable defines the custom watch interface. +type Runnable interface { + Stop() + Run(ctx context.Context) error +} + +const ( + watchEventBookmarkType = "Bookmark" + watchEventErrorType = "Error" + watchEventUnsupportedType = "Unsupported" +) + +// errorLoggingInterval specifies the minimal time interval two errors related to the same object +// and having the same errorType should be logged. +const errorLoggingInterval = time.Second + +// filteredWatcher is wrapper around a watch interface. +// It only keeps the events for objects that are +// listed in a ResourceGroup CR. +type filteredWatcher struct { + gvk string + startWatch startWatchFunc + resources *resourcemap.ResourceMap + // errorTracker maps an error to the time when the same error happened last time. + errorTracker map[string]time.Time + + // channel is the channel for ResourceGroup generic events. + channel chan k8sevent.GenericEvent + + // The following fields are guarded by the mutex. + mux sync.Mutex + base watch.Interface + stopped bool +} + +// filteredWatcher implements the Runnable interface. +var _ Runnable = &filteredWatcher{} + +// NewFiltered returns a new filtered watch initialized with the given options. +func NewFiltered(_ context.Context, cfg watcherConfig) Runnable { + return &filteredWatcher{ + gvk: cfg.gvk.String(), + startWatch: cfg.startWatch, + resources: cfg.resources, + base: watch.NewEmptyWatch(), + errorTracker: make(map[string]time.Time), + channel: cfg.channel, + } +} + +// pruneErrors removes the errors happened before errorLoggingInterval from w.errorTracker. +// This is to save the memory usage for tracking errors. +func (w *filteredWatcher) pruneErrors() { + for errName, lastErrorTime := range w.errorTracker { + if time.Since(lastErrorTime) >= errorLoggingInterval { + delete(w.errorTracker, errName) + } + } +} + +// addError checks whether an error identified by the errorID has been tracked, +// and handles it in one of the following ways: +// - tracks it if it has not yet been tracked; +// - updates the time for this error to time.Now() if `errorLoggingInterval` has passed +// since the same error happened last time; +// - ignore the error if `errorLoggingInterval` has NOT passed since it happened last time. +// +// addError returns false if the error is ignored, and true if it is not ignored. 
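+//
+// For example (editor's note): the first occurrence of an errorID returns true
+// (log it), an identical error 200ms later returns false (suppress it), and
+// another one 1.5s later returns true again, since errorLoggingInterval is 1s.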
+func (w *filteredWatcher) addError(errorID string) bool { + lastErrorTime, ok := w.errorTracker[errorID] + if !ok || time.Since(lastErrorTime) >= errorLoggingInterval { + w.errorTracker[errorID] = time.Now() + return true + } + return false +} + +// Stop fully stops the filteredWatcher in a threadsafe manner. This means that +// it stops the underlying base watch and prevents the filteredWatcher from +// restarting it (like it does if the API server disconnects the base watch). +func (w *filteredWatcher) Stop() { + w.mux.Lock() + defer w.mux.Unlock() + + w.base.Stop() + w.stopped = true +} + +func waitUntilNextRetry(retries int) { + if retries > maxWatchRetryFactor { + retries = maxWatchRetryFactor + } + milliseconds := int64(math.Pow(2, float64(retries))) + duration := time.Duration(milliseconds) * time.Millisecond + time.Sleep(duration) +} + +// Run reads the event from the base watch interface, +// filters the event and pushes the object contained +// in the event to the controller work queue. +func (w *filteredWatcher) Run(context.Context) error { + klog.Infof("Watch started for %s", w.gvk) + var resourceVersion string + var retriesForWatchError int + + for { + // There are three ways this function can return: + // 1. false, error -> We were unable to start the watch, so exit Run(). + // 2. false, nil -> We have been stopped via Stop(), so exit Run(). + // 3. true, nil -> We have not been stopped and we started a new watch. + started, err := w.start(resourceVersion) + if err != nil { + return err + } + if !started { + break + } + + eventCount := 0 + ignoredEventCount := 0 + klog.Infof("(Re)starting watch for %s at resource version %q", w.gvk, resourceVersion) + for event := range w.base.ResultChan() { + w.pruneErrors() + newVersion, ignoreEvent, err := w.handle(event) + eventCount++ + if ignoreEvent { + ignoredEventCount++ + } + if err != nil { + if cache.IsExpiredError(err) { + klog.Infof("Watch for %s at resource version %q closed with: %v", w.gvk, resourceVersion, err) + // `w.handle` may fail because we try to watch an old resource version, setting + // a watch on an old resource version will always fail. + // Reset `resourceVersion` to an empty string here so that we can start a new + // watch at the most recent resource version. + resourceVersion = "" + } else if w.addError(watchEventErrorType + errorID(err)) { + klog.Errorf("Watch for %s at resource version %q ended with: %v", w.gvk, resourceVersion, err) + } + retriesForWatchError++ + waitUntilNextRetry(retriesForWatchError) + // Call `break` to restart the watch. + break + } + retriesForWatchError = 0 + if newVersion != "" { + resourceVersion = newVersion + } + } + klog.Infof("Ending watch for %s at resource version %q (total events: %d, ignored events: %d)", + w.gvk, resourceVersion, eventCount, ignoredEventCount) + } + klog.Infof("Watch stopped for %s", w.gvk) + return nil +} + +// start initiates a new base watch at the given resource version in a +// threadsafe manner and returns true if the new base watch was created. Returns +// false if the filteredWatcher is already stopped and returns error if the base +// watch could not be started. +func (w *filteredWatcher) start(resourceVersion string) (bool, error) { + w.mux.Lock() + defer w.mux.Unlock() + + if w.stopped { + return false, nil + } + w.base.Stop() + + // We want to avoid situations of hanging watchers. Stop any watchers that + // do not receive any events within the timeout window. 
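+	// With minWatchTimeout at 5 minutes, the jittered timeout computed below
+	// lands in [5m, 10m), per the reflector strategy noted above (editor's note).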
+ //nolint:gosec // don't need cryptographic randomness here + timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) + options := metav1.ListOptions{ + AllowWatchBookmarks: true, + ResourceVersion: resourceVersion, + TimeoutSeconds: &timeoutSeconds, + Watch: true, + } + + base, err := w.startWatch(options) + if err != nil { + return false, fmt.Errorf("failed to start watch for %s: %v", w.gvk, err) + } + w.base = base + return true, nil +} + +func errorID(err error) string { + errTypeName := reflect.TypeOf(err).String() + + var s string + switch t := err.(type) { + case *apierrors.StatusError: + if t == nil { + break + } + if t.ErrStatus.Details != nil { + s = t.ErrStatus.Details.Name + } + if s == "" { + s = fmt.Sprintf("%s-%s-%d", t.ErrStatus.Status, t.ErrStatus.Reason, t.ErrStatus.Code) + } + } + return errTypeName + s +} + +// handle reads the event from the base watch interface, +// filters the event and pushes the object contained +// in the event to the controller work queue. +// +// handle returns the new resource version, whether the event should be ignored, +// and an error indicating that a watch.Error event type was encountered and the +// watch should be restarted. +func (w *filteredWatcher) handle(event watch.Event) (string, bool, error) { + var deleted bool + switch event.Type { + case watch.Added, watch.Modified: + deleted = false + case watch.Deleted: + deleted = true + case watch.Bookmark: + m, err := meta.Accessor(event.Object) + if err != nil { + // For watch.Bookmark, only the ResourceVersion field of event.Object is set. + // Therefore, set the second argument of w.addError to watchEventBookmarkType. + if w.addError(watchEventBookmarkType) { + klog.Errorf("Unable to access metadata of Bookmark event: %v", event) + } + return "", false, nil + } + return m.GetResourceVersion(), false, nil + case watch.Error: + return "", false, apierrors.FromObject(event.Object) + // Keep the default case to catch any new watch event types added in the future. + default: + if w.addError(watchEventUnsupportedType) { + klog.Errorf("Unsupported watch event: %#v", event) + } + return "", false, nil + } + + // get client.Object from the runtime object. + object, ok := event.Object.(*unstructured.Unstructured) + if !ok { + klog.Infof("Received non unstructured object in watch event: %T", object) + return "", false, nil + } + // filter objects. + id := getID(object) + if !w.shouldProcess(object) { + klog.V(4).Infof("Ignoring event for object: %v", id) + return object.GetResourceVersion(), true, nil + } + + if deleted { + klog.Infof("updating the reconciliation status: %v: %v", id, v1alpha1.NotFound) + w.resources.SetStatus(id, &resourcemap.CachedStatus{Status: v1alpha1.NotFound}) + } else { + klog.Infof("Received watch event for created/updated object %q", id) + resStatus := status.ComputeStatus(object) + if resStatus != nil { + klog.Infof("updating the reconciliation status: %v: %v", id, resStatus.Status) + w.resources.SetStatus(id, resStatus) + } + } + + for _, r := range w.resources.Get(id) { + resgroup := &v1alpha1.ResourceGroup{} + resgroup.SetNamespace(r.Namespace) + resgroup.SetName(r.Name) + + klog.Infof("sending a generic event from watcher for %v", resgroup.GetObjectMeta()) + w.channel <- k8sevent.GenericEvent{Object: resgroup} + } + + return object.GetResourceVersion(), false, nil +} + +// shouldProcess returns true if the given object should be enqueued by the +// watcher for processing. 
+func (w *filteredWatcher) shouldProcess(object client.Object) bool {
+	if w.resources == nil {
+		klog.V(4).Infof("The resources are empty")
+	}
+	id := getID(object)
+	return w.resources.HasResource(id)
+}
+
+func getID(object client.Object) v1alpha1.ObjMetadata {
+	gvk := object.GetObjectKind().GroupVersionKind()
+	id := v1alpha1.ObjMetadata{
+		Name:      object.GetName(),
+		Namespace: object.GetNamespace(),
+		GroupKind: v1alpha1.GroupKind{
+			Group: gvk.Group,
+			Kind:  gvk.Kind,
+		},
+	}
+	return id
+}
diff --git a/vendor/kpt.dev/resourcegroup/controllers/watch/manager.go b/vendor/kpt.dev/resourcegroup/controllers/watch/manager.go
new file mode 100644
index 0000000000..6360cd9236
--- /dev/null
+++ b/vendor/kpt.dev/resourcegroup/controllers/watch/manager.go
@@ -0,0 +1,223 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package watch
+
+import (
+	"context"
+	"sync"
+
+	"k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/rest"
+	"k8s.io/klog/v2"
+	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
+	"sigs.k8s.io/controller-runtime/pkg/event"
+
+	"kpt.dev/resourcegroup/controllers/resourcemap"
+)
+
+// Manager records which GVKs are watched.
+// When a new GVK needs to be watched, it adds a watch
+// to the associated controller.
+type Manager struct {
+	// cfg is the rest config used to talk to apiserver.
+	cfg *rest.Config
+
+	// mapper is the RESTMapper to use for mapping GroupVersionKinds to Resources.
+	mapper meta.RESTMapper
+
+	// resources holds the declared resources that are parsed from Git.
+	resources *resourcemap.ResourceMap
+
+	// createWatcherFunc is the function to create a watcher.
+	createWatcherFunc createWatcherFunc
+
+	// channel is the channel for ResourceGroup generic events.
+	channel chan event.GenericEvent
+
+	// The following fields are guarded by the mutex.
+	mux sync.Mutex
+	// watcherMap maps GVKs to their associated watchers
+	watcherMap map[schema.GroupVersionKind]Runnable
+	// needsUpdate indicates if the Manager's watches need to be updated.
+	needsUpdate bool
+}
+
+// Options contains options for creating a watch manager.
+type Options struct {
+	// Mapper is the RESTMapper to use for mapping GroupVersionKinds to Resources.
+	Mapper meta.RESTMapper
+
+	watcherFunc createWatcherFunc
+}
+
+// DefaultOptions returns the default options:
+//   - create a dynamic RESTMapper from the passed rest.Config
+//   - use createWatcher to create watchers
+func DefaultOptions(cfg *rest.Config) (*Options, error) {
+	mapper, err := apiutil.NewDynamicRESTMapper(cfg)
+	if err != nil {
+		return nil, err
+	}
+
+	return &Options{
+		Mapper:      mapper,
+		watcherFunc: createWatcher,
+	}, nil
+}
+
+// NewManager creates a new watch manager.
+func NewManager(cfg *rest.Config, decls *resourcemap.ResourceMap, channel chan event.GenericEvent, options *Options) (*Manager, error) {
+	if options == nil {
+		var err error
+		options, err = DefaultOptions(cfg)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return &Manager{
+		cfg:               cfg,
+		resources:         decls,
+		watcherMap:        make(map[schema.GroupVersionKind]Runnable),
+		createWatcherFunc: options.watcherFunc,
+		mapper:            options.Mapper,
+		channel:           channel,
+		mux:               sync.Mutex{},
+	}, nil
+}
+
+// NeedsUpdate returns true if the Manager's watches need to be updated. This function is threadsafe.
+func (m *Manager) NeedsUpdate() bool {
+	m.mux.Lock()
+	defer m.mux.Unlock()
+	return m.needsUpdate
+}
+
+// UpdateWatches accepts a map of GVKs that should be watched and takes the
+// following actions:
+//   - stop watchers for any GroupVersionKind that is not present in the given map.
+//   - start watchers for any GroupVersionKind that is present in the given map and not present in the current watch map.
+//
+// This function is threadsafe.
+func (m *Manager) UpdateWatches(ctx context.Context, gvkMap map[schema.GroupVersionKind]struct{}) error {
+	m.mux.Lock()
+	defer m.mux.Unlock()
+
+	m.needsUpdate = false
+
+	var startedWatches, stoppedWatches uint64
+	// Stop obsolete watchers.
+	for gvk := range m.watcherMap {
+		if _, keepWatching := gvkMap[gvk]; !keepWatching {
+			// We were watching the type, but no longer have declarations for it.
+			// It is safe to stop the watcher.
+			m.stopWatcher(gvk)
+			stoppedWatches++
+		}
+	}
+
+	// Start new watchers.
+	var errs []error
+	for gvk := range gvkMap {
+		if _, isWatched := m.watcherMap[gvk]; !isWatched {
+			// We don't have a watcher for this type, so add a watcher for it.
+			if err := m.startWatcher(ctx, gvk); err != nil {
+				errs = append(errs, err)
+				continue
+			}
+			startedWatches++
+		}
+	}
+
+	if startedWatches > 0 || stoppedWatches > 0 {
+		klog.Infof("The watch manager made new progress: started %d new watches and stopped %d watches", startedWatches, stoppedWatches)
+	} else {
+		klog.V(4).Infof("The watch manager made no new progress")
+	}
+	if len(errs) == 0 {
+		return nil
+	}
+	return errs[0]
+}
+
+// watchedGVKs returns a list of all GroupVersionKinds currently being watched.
+func (m *Manager) watchedGVKs() []schema.GroupVersionKind {
+	var gvks []schema.GroupVersionKind
+	for gvk := range m.watcherMap {
+		gvks = append(gvks, gvk)
+	}
+	return gvks
+}
+
+// startWatcher starts a watcher for a GVK. This function is NOT threadsafe;
+// the caller must hold a lock on m.mux.
+func (m *Manager) startWatcher(ctx context.Context, gvk schema.GroupVersionKind) error {
+	_, found := m.watcherMap[gvk]
+	if found {
+		// The watcher is already started.
+		return nil
+	}
+	cfg := watcherConfig{
+		gvk:       gvk,
+		mapper:    m.mapper,
+		config:    m.cfg,
+		channel:   m.channel,
+		resources: m.resources,
+	}
+	w, err := m.createWatcherFunc(ctx, cfg)
+	if err != nil {
+		return err
+	}
+
+	m.watcherMap[gvk] = w
+	go m.runWatcher(ctx, w, gvk)
+	return nil
+}
+
+// runWatcher blocks until the given watcher finishes running. This function is
+// threadsafe.
+func (m *Manager) runWatcher(ctx context.Context, r Runnable, gvk schema.GroupVersionKind) {
+	if err := r.Run(ctx); err != nil {
+		klog.Warningf("Error running watcher for %s: %v", gvk.String(), err)
+		m.mux.Lock()
+		delete(m.watcherMap, gvk)
+		m.needsUpdate = true
+		m.mux.Unlock()
+	}
+}
+
+// stopWatcher stops a watcher for a GVK. This function is NOT threadsafe;
+// the caller must hold a lock on m.mux.
+func (m *Manager) stopWatcher(gvk schema.GroupVersionKind) {
+	w, found := m.watcherMap[gvk]
+	if !found {
+		// The watcher is already stopped.
+		return
+	}
+
+	// Stop the watcher.
+	w.Stop()
+	delete(m.watcherMap, gvk)
+}
+
+// Len returns the number of types that are currently watched.
+func (m *Manager) Len() int {
+	return len(m.watcherMap)
+}
+
+// IsWatched returns true if the given GVK is currently being watched.
+func (m *Manager) IsWatched(gvk schema.GroupVersionKind) bool {
+	_, found := m.watcherMap[gvk]
+	return found
+}
diff --git a/vendor/kpt.dev/resourcegroup/controllers/watch/watcher.go b/vendor/kpt.dev/resourcegroup/controllers/watch/watcher.go
new file mode 100644
index 0000000000..2c8dca2d0a
--- /dev/null
+++ b/vendor/kpt.dev/resourcegroup/controllers/watch/watcher.go
@@ -0,0 +1,70 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package watch
+
+import (
+	"context"
+	"fmt"
+
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/rest"
+	"sigs.k8s.io/controller-runtime/pkg/event"
+
+	"kpt.dev/resourcegroup/controllers/resourcemap"
+)
+
+// startWatchFunc is the type of the function used to start a watch
+// with the given list options.
+type startWatchFunc func(metav1.ListOptions) (watch.Interface, error)
+
+// watcherConfig contains the options needed
+// to create a watcher.
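+// If startWatch is nil, createWatcher builds one from the dynamic client.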
+type watcherConfig struct { + gvk schema.GroupVersionKind + mapper meta.RESTMapper + config *rest.Config + resources *resourcemap.ResourceMap + startWatch startWatchFunc + channel chan event.GenericEvent +} + +// createWatcherFunc is the type of functions to create watchers +type createWatcherFunc func(ctx context.Context, cfg watcherConfig) (Runnable, error) + +// createWatcher creates a watcher for a given GVK +func createWatcher(ctx context.Context, cfg watcherConfig) (Runnable, error) { + if cfg.startWatch == nil { + mapping, err := cfg.mapper.RESTMapping(cfg.gvk.GroupKind(), cfg.gvk.Version) + if err != nil { + return nil, fmt.Errorf("watcher failed to get REST mapping for %s: %v", cfg.gvk.String(), err) + } + + dynamicClient, err := dynamic.NewForConfig(cfg.config) + if err != nil { + return nil, fmt.Errorf("watcher failed to get dynamic client for %s: %v", cfg.gvk.String(), err) + } + + cfg.startWatch = func(options metav1.ListOptions) (watch.Interface, error) { + return dynamicClient.Resource(mapping.Resource).Watch(ctx, options) + } + } + + return NewFiltered(ctx, cfg), nil +} diff --git a/vendor/kpt.dev/resourcegroup/third_party/k8s.io/client-go/LICENSE b/vendor/kpt.dev/resourcegroup/third_party/k8s.io/client-go/LICENSE new file mode 100644 index 0000000000..7a4a3ea242 --- /dev/null +++ b/vendor/kpt.dev/resourcegroup/third_party/k8s.io/client-go/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/vendor/kpt.dev/resourcegroup/third_party/k8s.io/client-go/tools/cache/reflector.go b/vendor/kpt.dev/resourcegroup/third_party/k8s.io/client-go/tools/cache/reflector.go new file mode 100644 index 0000000000..9c9df70c1f --- /dev/null +++ b/vendor/kpt.dev/resourcegroup/third_party/k8s.io/client-go/tools/cache/reflector.go @@ -0,0 +1,32 @@ +/* +Copyright 2014 The Kubernetes Authors. +Copyright 2022 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package cache + +import ( + apierrors "k8s.io/apimachinery/pkg/api/errors" +) + +// IsExpiredError is copied from https://github.com/kubernetes/client-go/blob/b28e5dc3384c55127345f266bd285d31091aa745/tools/cache/reflector.go#L585 +// and updated to be public. +func IsExpiredError(err error) bool { + // In Kubernetes 1.17 and earlier, the api server returns both apierrors.StatusReasonExpired and + // apierrors.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube server is more consistent + // and always returns apierrors.StatusReasonExpired. For backward compatibility we can only remove the apierrors.IsGone + // check when we fully drop support for Kubernetes 1.17 servers from reflectors. + return apierrors.IsResourceExpired(err) || apierrors.IsGone(err) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 6e64145e68..365a4ac27f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1080,9 +1080,21 @@ k8s.io/utils/net k8s.io/utils/pointer k8s.io/utils/strings/slices k8s.io/utils/trace -# kpt.dev/resourcegroup v0.0.0-20221109031828-db4c3d2c630f -## explicit; go 1.17 +# kpt.dev/resourcegroup v0.0.0-20231023223236-7ca71815022b +## explicit; go 1.20 kpt.dev/resourcegroup/apis/kpt.dev/v1alpha1 +kpt.dev/resourcegroup/controllers/handler +kpt.dev/resourcegroup/controllers/log +kpt.dev/resourcegroup/controllers/metrics +kpt.dev/resourcegroup/controllers/profiler +kpt.dev/resourcegroup/controllers/resourcegroup +kpt.dev/resourcegroup/controllers/resourcemap +kpt.dev/resourcegroup/controllers/root +kpt.dev/resourcegroup/controllers/runner +kpt.dev/resourcegroup/controllers/status +kpt.dev/resourcegroup/controllers/typeresolver +kpt.dev/resourcegroup/controllers/watch +kpt.dev/resourcegroup/third_party/k8s.io/client-go/tools/cache # sigs.k8s.io/cli-utils v0.35.0 ## explicit; go 1.18 sigs.k8s.io/cli-utils/pkg/apis/actuation