From 79850fb470abba90bf1f80f6dadf73c4a3dfe4a8 Mon Sep 17 00:00:00 2001 From: Saza <49511480+saza-ku@users.noreply.github.com> Date: Thu, 29 Aug 2024 11:06:28 +0900 Subject: [PATCH] Implement syncer (#367) * add syncer Co-authored-by: Kensei Nakada * add syncer to the di container and invoke it from the entry point --------- Co-authored-by: Kensei Nakada --- simulator/cmd/simulator/simulator.go | 42 +- simulator/config.yaml | 15 +- simulator/config/config.go | 28 +- simulator/config/v1alpha1/types.go | 4 + simulator/docs/import-cluster-resources.md | 33 +- simulator/docs/simulator-server-config.md | 12 +- simulator/server/di/di.go | 17 + simulator/server/di/interface.go | 7 + simulator/syncer/resource.go | 123 +++++ simulator/syncer/syncer.go | 326 ++++++++++++++ simulator/syncer/syncer_test.go | 493 +++++++++++++++++++++ 11 files changed, 1079 insertions(+), 21 deletions(-) create mode 100644 simulator/syncer/resource.go create mode 100644 simulator/syncer/syncer.go create mode 100644 simulator/syncer/syncer_test.go diff --git a/simulator/cmd/simulator/simulator.go b/simulator/cmd/simulator/simulator.go index 77fde939..4b2ca159 100644 --- a/simulator/cmd/simulator/simulator.go +++ b/simulator/cmd/simulator/simulator.go @@ -11,13 +11,24 @@ import ( "golang.org/x/xerrors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/discovery" + "k8s.io/client-go/discovery/cached/memory" + "k8s.io/client-go/dynamic" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "k8s.io/client-go/restmapper" "k8s.io/klog/v2" "sigs.k8s.io/kube-scheduler-simulator/simulator/config" "sigs.k8s.io/kube-scheduler-simulator/simulator/server" "sigs.k8s.io/kube-scheduler-simulator/simulator/server/di" + "sigs.k8s.io/kube-scheduler-simulator/simulator/syncer" +) + +const ( + kubeAPIServerPollInterval = 5 * time.Second + kubeAPIServerReadyTimeout = 2 * time.Minute + importTimeout = 2 * time.Minute ) // entry point. 
@@ -40,13 +51,23 @@ func startSimulator() error {
 		Host:  cfg.KubeAPIServerURL,
 	}
 	client := clientset.NewForConfigOrDie(restCfg)
+	dynamicClient := dynamic.NewForConfigOrDie(restCfg)
+	discoveryClient := discovery.NewDiscoveryClient(client.RESTClient())
+	cachedDiscoveryClient := memory.NewMemCacheClient(discoveryClient)
+	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscoveryClient)
 	importClusterResourceClient := &clientset.Clientset{}
-	if cfg.ExternalImportEnabled {
+	var importClusterDynamicClient dynamic.Interface
+	if cfg.ExternalImportEnabled || cfg.ResourceSyncEnabled {
 		importClusterResourceClient, err = clientset.NewForConfig(cfg.ExternalKubeClientCfg)
 		if err != nil {
 			return xerrors.Errorf("creates a new Clientset for the ExternalKubeClientCfg: %w", err)
 		}
+
+		importClusterDynamicClient, err = dynamic.NewForConfig(cfg.ExternalKubeClientCfg)
+		if err != nil {
+			return xerrors.Errorf("creates a new dynamic Clientset for the ExternalKubeClientCfg: %w", err)
+		}
 	}
 
 	etcdclient, err := clientv3.New(clientv3.Config{
@@ -57,9 +78,10 @@ func startSimulator() error {
 		return xerrors.Errorf("create an etcd client: %w", err)
 	}
 
-	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
+	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	err = wait.PollUntilContextCancel(ctx, time.Second*5, true, func(ctx context.Context) (bool, error) {
+
+	err = wait.PollUntilContextTimeout(ctx, kubeAPIServerPollInterval, kubeAPIServerReadyTimeout, true, func(ctx context.Context) (bool, error) {
 		_, err := client.CoreV1().Namespaces().Get(context.Background(), "kube-system", metav1.GetOptions{})
 		if err != nil {
 			klog.Infof("waiting for kube-system namespace to be ready: %v", err)
@@ -72,7 +94,7 @@ func startSimulator() error {
 		return xerrors.Errorf("kubeapi-server is not ready: %w", err)
 	}
 
-	dic, err := di.NewDIContainer(client, etcdclient, restCfg, cfg.InitialSchedulerCfg, cfg.ExternalImportEnabled, importClusterResourceClient, cfg.ExternalSchedulerEnabled, cfg.Port)
+	dic, err := di.NewDIContainer(client, dynamicClient, restMapper, etcdclient, restCfg, cfg.InitialSchedulerCfg, cfg.ExternalImportEnabled, cfg.ResourceSyncEnabled, importClusterResourceClient, importClusterDynamicClient, cfg.ExternalSchedulerEnabled, cfg.Port, syncer.Options{})
 	if err != nil {
 		return xerrors.Errorf("create di container: %w", err)
 	}
@@ -86,13 +108,21 @@ func startSimulator() error {
 	// If ExternalImportEnabled is enabled, the simulator import resources
 	// from the target cluster that indicated by the `KUBECONFIG`.
 	if cfg.ExternalImportEnabled {
-		ctx := context.Background()
 		// This must be called after `StartScheduler`
-		if err := dic.OneshotClusterResourceImporter().ImportClusterResources(ctx); err != nil {
+		timeoutCtx, timeoutCancel := context.WithTimeout(ctx, importTimeout)
+		defer timeoutCancel()
+		if err := dic.OneshotClusterResourceImporter().ImportClusterResources(timeoutCtx); err != nil {
 			return xerrors.Errorf("import from the target cluster: %w", err)
 		}
 	}
 
+	if cfg.ResourceSyncEnabled {
+		// Start the resource syncer to sync resources from the target cluster.
+		if err = dic.ResourceSyncer().Run(ctx); err != nil {
+			return xerrors.Errorf("start syncing: %w", err)
+		}
+	}
+
 	// start simulator server
 	s := server.NewSimulatorServer(cfg, dic)
 	shutdownFn, err := s.Start(cfg.Port)
diff --git a/simulator/config.yaml b/simulator/config.yaml
index b2fb48f8..613e09b0 100644
--- a/simulator/config.yaml
+++ b/simulator/config.yaml
@@ -19,7 +19,8 @@ etcdURL: "http://127.0.0.1:2379"
 corsAllowedOriginList:
   - "http://localhost:3000"
 
-# This is for the beta feature "Importing cluster's resources".
+# This is for the beta features "One-shot importing cluster's resources"
+# and "Continuous syncing cluster's resources".
 # This variable is used to find Kubeconfig required to access your
 # cluster for importing resources to scheduler simulator.
 kubeConfig: "/kubeconfig.yaml"
@@ -35,10 +36,18 @@ kubeApiServerUrl: ""
 kubeSchedulerConfigPath: ""
 
 # This variable indicates whether the simulator will
-# import resources from an user cluster's or not.
-# Note, this is still a beta feature.
+# import resources from a user cluster specified by kubeConfig.
+# Note that it only imports the resources once when the simulator is started.
+# You cannot make both externalImportEnabled and resourceSyncEnabled true because those features would conflict.
+# This is still a beta feature.
 externalImportEnabled: false
 
+# This variable indicates whether the simulator will
+# keep syncing resources from a user cluster or not.
+# You cannot make both externalImportEnabled and resourceSyncEnabled true because those features would conflict.
+# Note, this is still a beta feature.
+resourceSyncEnabled: false
+
 # This variable indicates whether an external scheduler
 # is used.
 externalSchedulerEnabled: false
diff --git a/simulator/config/config.go b/simulator/config/config.go
index 85fc8936..6a09618c 100644
--- a/simulator/config/config.go
+++ b/simulator/config/config.go
@@ -31,10 +31,13 @@ type Config struct {
 	KubeAPIServerURL      string
 	EtcdURL               string
 	CorsAllowedOriginList []string
-	// ExternalImportEnabled indicates whether the simulator will import resources from an target cluster or not.
+	// ExternalImportEnabled indicates whether the simulator will import resources from a target cluster once
+	// when it's started.
 	ExternalImportEnabled bool
+	// ResourceSyncEnabled indicates whether the simulator will keep syncing resources from a target cluster.
+	ResourceSyncEnabled bool
 	// ExternalKubeClientCfg is KubeConfig to get resources from external cluster.
-	// This field is non-empty only when ExternalImportEnabled == true.
+	// This field should be set when ExternalImportEnabled == true or ResourceSyncEnabled == true.
 	ExternalKubeClientCfg *rest.Config
 	InitialSchedulerCfg   *configv1.KubeSchedulerConfiguration
 	// ExternalSchedulerEnabled indicates whether an external scheduler is enabled.
@@ -48,6 +51,8 @@ const (
 )
 
 // NewConfig gets some settings from config file or environment variables.
+//
+//nolint:cyclop
 func NewConfig() (*Config, error) {
 	if err := LoadYamlConfig(defaultFilePath); err != nil {
 		return nil, err
@@ -74,8 +79,12 @@ func NewConfig() (*Config, error) {
 	}
 
 	externalimportenabled := getExternalImportEnabled()
+	resourceSyncEnabled := getResourceSyncEnabled()
 	externalKubeClientCfg := &rest.Config{}
-	if externalimportenabled {
+	if externalimportenabled && resourceSyncEnabled {
+		return nil, xerrors.Errorf("externalImportEnabled and resourceSyncEnabled cannot be used simultaneously")
+	}
+	if externalimportenabled || resourceSyncEnabled {
 		externalKubeClientCfg, err = clientcmd.BuildConfigFromFlags("", configYaml.KubeConfig)
 		if err != nil {
 			return nil, xerrors.Errorf("get kube clientconfig: %w", err)
@@ -98,6 +107,7 @@ func NewConfig() (*Config, error) {
 		ExternalImportEnabled:    externalimportenabled,
 		ExternalKubeClientCfg:    externalKubeClientCfg,
 		ExternalSchedulerEnabled: externalSchedEnabled,
+		ResourceSyncEnabled:      resourceSyncEnabled,
 	}, nil
 }
 
@@ -257,6 +267,18 @@ func getExternalImportEnabled() bool {
 	return isExternalImportEnabled
 }
 
+// getResourceSyncEnabled reads the RESOURCE_SYNC_ENABLED environment variable
+// and converts it to bool, falling back to the value in the config file when
+// the environment variable is empty.
+// This function will return `true` if `RESOURCE_SYNC_ENABLED` is "1".
+func getResourceSyncEnabled() bool {
+	resourceSyncEnabledString := os.Getenv("RESOURCE_SYNC_ENABLED")
+	if resourceSyncEnabledString == "" {
+		resourceSyncEnabledString = strconv.FormatBool(configYaml.ResourceSyncEnabled)
+	}
+	resourceSyncEnabled, _ := strconv.ParseBool(resourceSyncEnabledString)
+	return resourceSyncEnabled
+}
+
 func decodeSchedulerCfg(buf []byte) (*configv1.KubeSchedulerConfiguration, error) {
 	decoder := scheme.Codecs.UniversalDeserializer()
 	obj, _, err := decoder.Decode(buf, nil, nil)
diff --git a/simulator/config/v1alpha1/types.go b/simulator/config/v1alpha1/types.go
index c1654d02..a44fb703 100644
--- a/simulator/config/v1alpha1/types.go
+++ b/simulator/config/v1alpha1/types.go
@@ -61,6 +61,10 @@ type SimulatorConfiguration struct {
 	// Note, this is still a beta feature.
 	ExternalImportEnabled bool `json:"externalImportEnabled,omitempty"`
 
+	// This variable indicates whether the simulator will
+	// sync resources from a user cluster or not.
+	ResourceSyncEnabled bool `json:"resourceSyncEnabled,omitempty"`
+
 	// This variable indicates whether an external scheduler
 	// is used.
 	ExternalSchedulerEnabled bool `json:"externalSchedulerEnabled,omitempty"`
diff --git a/simulator/docs/import-cluster-resources.md b/simulator/docs/import-cluster-resources.md
index 36e7caa0..008b4530 100644
--- a/simulator/docs/import-cluster-resources.md
+++ b/simulator/docs/import-cluster-resources.md
@@ -1,16 +1,35 @@
 ### [Beta] Import your real cluster's resources
 
-The simulator can import resources from your cluster.
+There are two ways to import resources from your cluster. These methods cannot be used simultaneously.
+- Import resources from your cluster once when initializing the simulator.
+- Keep importing resources from your cluster.
 
-To use this, you need to follow these two steps
-- Set to `true` the `externalImportEnabled` value in the simulator server configuration.
-- Set the path of the kubeconfig file of the your cluster to `KubeConfig` value in the Simulator Server Configuration.
+#### Import resources once when initializing the simulator
+
+To use this, you need to follow these two steps in the simulator configuration:
+- Set `externalImportEnabled` to `true`.
+- Set the path of the kubeconfig file for your cluster to `KubeConfig`.
+  - This feature requires only read permission for resources.
+
+```yaml
+externalImportEnabled: true
+kubeConfig: "/path/to/your-cluster-kubeconfig"
+```
+
+#### Keep importing resources
+
+To use this, you need to follow these two steps in the simulator configuration:
+- Set `resourceSyncEnabled` to `true`.
+- Set the path of the kubeconfig file for your cluster to `KubeConfig`.
+  - This feature requires only read permission for resources.
 
 ```yaml
-externalImportEnabled: false
+resourceSyncEnabled: true
 kubeConfig: "/path/to/your-cluster-kubeconfig"
 ```
 
-Then, the simulator imports resources from your cluster once when it's initialized.
+> [!NOTE]
+> When you enable `resourceSyncEnabled`, adding, updating, or deleting resources directly in the simulator cluster could break the sync.
+> You can still do so for debugging purposes, but make sure you reboot the simulator and the fake source cluster afterward.
 
-See also [simulator/docs/simulator-server-config.md](simulator-server-config.md).
+See [simulator/docs/simulator-server-config.md](simulator-server-config.md) for more information about the simulator configuration.
diff --git a/simulator/docs/simulator-server-config.md b/simulator/docs/simulator-server-config.md
index 218d6e57..22f959cf 100644
--- a/simulator/docs/simulator-server-config.md
+++ b/simulator/docs/simulator-server-config.md
@@ -40,10 +40,18 @@ kubeAPIServerURL: ""
 kubeSchedulerConfigPath: ""
 
 # This variable indicates whether the simulator will
-# import resources from an user cluster's or not.
-# Note, this is still a beta feature.
+# import resources from a user cluster specified by kubeConfig.
+# Note that it only imports the resources once when the simulator is started.
+# You cannot make both externalImportEnabled and resourceSyncEnabled true because those features would conflict.
+# This is still a beta feature.
 externalImportEnabled: false
 
+# This variable indicates whether the simulator will
+# keep syncing resources from a user cluster or not.
+# You cannot make both externalImportEnabled and resourceSyncEnabled true because those features would conflict.
+# Note, this is still a beta feature.
+resourceSyncEnabled: false
+
 # This variable indicates whether an external scheduler
 # is used.
 externalSchedulerEnabled: false
diff --git a/simulator/server/di/di.go b/simulator/server/di/di.go
index 543dc86e..6c6f043b 100644
--- a/simulator/server/di/di.go
+++ b/simulator/server/di/di.go
@@ -6,6 +6,8 @@ package di
 import (
 	clientv3 "go.etcd.io/etcd/client/v3"
 	"golang.org/x/xerrors"
+	"k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/client-go/dynamic"
 	clientset "k8s.io/client-go/kubernetes"
 	restclient "k8s.io/client-go/rest"
 	configv1 "k8s.io/kube-scheduler/config/v1"
@@ -15,6 +17,7 @@ import (
 	"sigs.k8s.io/kube-scheduler-simulator/simulator/resourcewatcher"
 	"sigs.k8s.io/kube-scheduler-simulator/simulator/scheduler"
 	"sigs.k8s.io/kube-scheduler-simulator/simulator/snapshot"
+	"sigs.k8s.io/kube-scheduler-simulator/simulator/syncer"
 )
 
 // Container saves and provides dependencies.
@@ -23,6 +26,7 @@ type Container struct {
 	snapshotService                SnapshotService
 	resetService                   ResetService
 	oneshotClusterResourceImporter OneShotClusterResourceImporter
+	resourceSyncer                 ResourceSyncer
 	resourceWatcherService         ResourceWatcherService
 }
 
@@ -31,13 +35,18 @@ type Container struct {
 // Only when externalImportEnabled is true, the simulator uses externalClient and creates ImportClusterResourceService.
 func NewDIContainer(
 	client clientset.Interface,
+	dynamicClient dynamic.Interface,
+	restMapper meta.RESTMapper,
 	etcdclient *clientv3.Client,
 	restclientCfg *restclient.Config,
 	initialSchedulerCfg *configv1.KubeSchedulerConfiguration,
 	externalImportEnabled bool,
+	resourceSyncEnabled bool,
 	externalClient clientset.Interface,
+	externalDynamicClient dynamic.Interface,
 	externalSchedulerEnabled bool,
 	simulatorPort int,
+	syncerOptions syncer.Options,
 ) (*Container, error) {
 	c := &Container{}
 
@@ -54,6 +63,9 @@ func NewDIContainer(
 		extSnapshotSvc := snapshot.NewService(externalClient, c.schedulerService)
 		c.oneshotClusterResourceImporter = oneshotimporter.NewService(snapshotSvc, extSnapshotSvc)
 	}
+	if resourceSyncEnabled {
+		c.resourceSyncer = syncer.New(externalDynamicClient, dynamicClient, restMapper, syncerOptions)
+	}
 	c.resourceWatcherService = resourcewatcher.NewService(client)
 
 	return c, nil
@@ -80,6 +92,11 @@ func (c *Container) OneshotClusterResourceImporter() OneShotClusterResourceImpor
 	return c.oneshotClusterResourceImporter
}
 
+// ResourceSyncer returns ResourceSyncer.
+func (c *Container) ResourceSyncer() ResourceSyncer {
+	return c.resourceSyncer
+}
+
 // ResourceWatcherService returns ResourceWatcherService.
 func (c *Container) ResourceWatcherService() ResourceWatcherService {
 	return c.resourceWatcherService
diff --git a/simulator/server/di/interface.go b/simulator/server/di/interface.go
index 43cca59d..7bd806ce 100644
--- a/simulator/server/di/interface.go
+++ b/simulator/server/di/interface.go
@@ -38,6 +38,13 @@ type OneShotClusterResourceImporter interface {
 	ImportClusterResources(ctx context.Context) error
 }
 
+// ResourceSyncer represents a service to constantly sync resources from a target cluster.
+type ResourceSyncer interface {
+	// Run starts the resource syncer.
+	// The started syncer keeps running until the context is canceled.
+	Run(ctx context.Context) error
+}
+
 // ResourceWatcherService represents service for watch k8s resources.
 type ResourceWatcherService interface {
 	ListWatch(ctx context.Context, stream streamwriter.ResponseStream, lrVersions *resourcewatcher.LastResourceVersions) error
diff --git a/simulator/syncer/resource.go b/simulator/syncer/resource.go
new file mode 100644
index 00000000..8e4cb5e7
--- /dev/null
+++ b/simulator/syncer/resource.go
@@ -0,0 +1,123 @@
+package syncer
+
+import (
+	"context"
+
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/klog/v2"
+)
+
+// DefaultGVRs is a list of GroupVersionResource that we sync by default (configurable with Options),
+// which is a suitable resource set for the vanilla scheduler.
+//
+// Note that this order matters: when first importing resources, we want to sync namespaces first, then priorityclasses, storageclasses...
+var DefaultGVRs = []schema.GroupVersionResource{
+	{Group: "", Version: "v1", Resource: "namespaces"},
+	{Group: "scheduling.k8s.io", Version: "v1", Resource: "priorityclasses"},
+	{Group: "storage.k8s.io", Version: "v1", Resource: "storageclasses"},
+	{Group: "", Version: "v1", Resource: "persistentvolumeclaims"},
+	{Group: "", Version: "v1", Resource: "nodes"},
+	{Group: "", Version: "v1", Resource: "persistentvolumes"},
+	{Group: "", Version: "v1", Resource: "pods"},
+}
+
+// Event is a type of event that occurs in the source cluster.
+type Event int
+
+const (
+	Add Event = iota
+	Update
+)
+
+// mandatoryMutatingFunctions is MutatingFunctions that we must register.
+// We don't allow users to opt out of them.
+var mandatoryMutatingFunctions = map[schema.GroupVersionResource]MutatingFunction{
+	{Group: "", Version: "v1", Resource: "persistentvolumes"}: mutatePV,
+	{Group: "", Version: "v1", Resource: "pods"}:              mutatePods,
+}
+
+// mandatoryFilteringFunctions is FilteringFunctions that we must register.
+// We don't allow users to opt out of them.
+var mandatoryFilteringFunctions = map[schema.GroupVersionResource]FilteringFunction{
+	{Group: "", Version: "v1", Resource: "pods"}: filterPods,
+}
+
+// FilteringFunction is a function that filters a resource.
+// If it returns false, the resource will not be imported.
+type FilteringFunction func(ctx context.Context, resource *unstructured.Unstructured, clients *Clients, event Event) (bool, error)
+
+// MutatingFunction is a function that mutates a resource before importing it.
+type MutatingFunction func(ctx context.Context, resource *unstructured.Unstructured, clients *Clients, event Event) (*unstructured.Unstructured, error)
+
+func mutatePV(ctx context.Context, resource *unstructured.Unstructured, clients *Clients, _ Event) (*unstructured.Unstructured, error) {
+	var pv v1.PersistentVolume
+	err := runtime.DefaultUnstructuredConverter.FromUnstructured(resource.UnstructuredContent(), &pv)
+	if err != nil {
+		return nil, err
+	}
+
+	if pv.Status.Phase == v1.VolumeBound {
+		// A PersistentVolumeClaim's UID changes in the destination cluster when it's imported from the source cluster,
+		// and thus we need to update the PVC UID recorded in the PersistentVolume.
+		// Get the PVC named pv.Spec.ClaimRef.Name from the destination cluster.
+		pvc, err := clients.DestDynamicClient.Resource(schema.GroupVersionResource{
+			Group:    "",
+			Version:  "v1",
+			Resource: "persistentvolumeclaims",
+		}).Namespace(pv.Spec.ClaimRef.Namespace).Get(ctx, pv.Spec.ClaimRef.Name, metav1.GetOptions{})
+		if err != nil {
+			return nil, err
+		}
+
+		pv.Spec.ClaimRef.UID = pvc.GetUID()
+	}
+
+	modifiedUnstructured, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&pv)
+	return &unstructured.Unstructured{Object: modifiedUnstructured}, err
+}
+
+func mutatePods(_ context.Context, resource *unstructured.Unstructured, _ *Clients, _ Event) (*unstructured.Unstructured, error) {
+	var pod v1.Pod
+	err := runtime.DefaultUnstructuredConverter.FromUnstructured(resource.UnstructuredContent(), &pod)
+	if err != nil {
+		return nil, err
+	}
+
+	// Pods must have the default ServiceAccount because ServiceAccount is not synced.
+	pod.Spec.ServiceAccountName = ""
+	pod.Spec.DeprecatedServiceAccount = ""
+
+	// If the Pod kept its owner references, it could be garbage-collected because owner resources such as ReplicaSets are not synced.
+	pod.OwnerReferences = nil
+
+	modifiedUnstructured, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&pod)
+	return &unstructured.Unstructured{Object: modifiedUnstructured}, err
+}
+
+// filterPods checks if a pod is already scheduled when it's updated.
+// We only want to update pods that are not yet scheduled.
+func filterPods(_ context.Context, resource *unstructured.Unstructured, _ *Clients, event Event) (bool, error) {
+	if event == Add {
+		// We always add a Pod, regardless of whether it's scheduled or not.
+		return true, nil
+	}
+
+	var pod v1.Pod
+	err := runtime.DefaultUnstructuredConverter.FromUnstructured(resource.UnstructuredContent(), &pod)
+	if err != nil {
+		return false, err
+	}
+
+	if pod.Spec.NodeName != "" {
+		// This Pod is already scheduled, and we skip updating it
+		// so that the scheduling result in the destination cluster is kept.
+		klog.InfoS("Skipped updating the Pod because it's already scheduled", "pod", klog.KObj(&pod))
+		return false, nil
+	}
+
+	// This Pod should be applied on the destination cluster.
+	return true, nil
+}
diff --git a/simulator/syncer/syncer.go b/simulator/syncer/syncer.go
new file mode 100644
index 00000000..de5beb5f
--- /dev/null
+++ b/simulator/syncer/syncer.go
@@ -0,0 +1,326 @@
+package syncer
+
+import (
+	"context"
+
+	"golang.org/x/xerrors"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/dynamic/dynamicinformer"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/klog/v2"
+)
+
+type Service struct {
+	clients *Clients
+
+	gvrs               []schema.GroupVersionResource
+	mutatingFunctions  map[schema.GroupVersionResource][]MutatingFunction
+	filteringFunctions map[schema.GroupVersionResource][]FilteringFunction
+}
+
+// Note: Clients and its fields are exposed intentionally so that users can use them in MutatingFunction and FilteringFunction.
+type Clients struct {
+	// SrcDynamicClient is the dynamic client for the source cluster, which the resource is supposed to be copied from.
+	SrcDynamicClient dynamic.Interface
+	// DestDynamicClient is the dynamic client for the destination cluster, which the resource is supposed to be copied to.
+	DestDynamicClient dynamic.Interface
+	RestMapper        meta.RESTMapper
+}
+
+type Options struct {
+	// GVRsToSync is a list of GroupVersionResource that will be synced.
+	// If GVRsToSync is nil, DefaultGVRs are used.
+	GVRsToSync []schema.GroupVersionResource
+	// AdditionalMutatingFunctions is a map of mutating functions that users add.
+	AdditionalMutatingFunctions map[schema.GroupVersionResource]MutatingFunction
+	// AdditionalFilteringFunctions is a map of filtering functions that users add.
+	AdditionalFilteringFunctions map[schema.GroupVersionResource]FilteringFunction
+}
+
+func New(srcDynamicClient, destDynamicClient dynamic.Interface, restMapper meta.RESTMapper, options Options) *Service {
+	s := &Service{
+		clients: &Clients{
+			SrcDynamicClient:  srcDynamicClient,
+			DestDynamicClient: destDynamicClient,
+			RestMapper:        restMapper,
+		},
+		gvrs:               DefaultGVRs,
+		mutatingFunctions:  map[schema.GroupVersionResource][]MutatingFunction{},
+		filteringFunctions: map[schema.GroupVersionResource][]FilteringFunction{},
+	}
+
+	if options.GVRsToSync != nil {
+		s.gvrs = options.GVRsToSync
+	}
+
+	s.addMutatingFunctions(mandatoryMutatingFunctions)
+	s.addMutatingFunctions(options.AdditionalMutatingFunctions)
+
+	s.addFilteringFunctions(mandatoryFilteringFunctions)
+	s.addFilteringFunctions(options.AdditionalFilteringFunctions)
+
+	return s
+}
+
+func (s *Service) Run(ctx context.Context) error {
+	klog.Info("Starting the cluster resource syncer")
+
+	infFact := dynamicinformer.NewFilteredDynamicSharedInformerFactory(s.clients.SrcDynamicClient, 0, metav1.NamespaceAll, nil)
+	for _, gvr := range s.gvrs {
+		inf := infFact.ForResource(gvr).Informer()
+		_, err := inf.AddEventHandler(cache.ResourceEventHandlerFuncs{
+			AddFunc:    s.addFunc,
+			UpdateFunc: s.updateFunc,
+			DeleteFunc: s.deleteFunc,
+		})
+		if err != nil {
+			return xerrors.Errorf("failed to add event handler: %w", err)
+		}
+		go inf.Run(ctx.Done())
+		infFact.WaitForCacheSync(ctx.Done())
+	}
+
+	klog.Info("Cluster resource syncer started")
+
+	return nil
+}
+
+// createResourceOnDestinationCluster creates the resource on the destination cluster.
+func (s *Service) createResourceOnDestinationCluster(
+	ctx context.Context,
+	resource *unstructured.Unstructured,
+) error {
+	// Extract the GroupVersionResource from the Unstructured object
+	gvk := resource.GroupVersionKind()
+	gvr, err := s.findGVRForGVK(gvk)
+	if err != nil {
+		return err
+	}
+
+	// Namespaced resources should be created within the namespace defined in the Unstructured object
+	namespace := resource.GetNamespace()
+
+	// Run the filtering function for the resource.
+	if ok, err := s.filterResource(ctx, gvr, resource, Add); !ok || err != nil {
+		return err
+	}
+
+	// When creating a resource on the destination cluster, we must remove metadata such as UID and Generation.
+	// It's done for all resources.
+	resource = removeUnnecessaryMetadata(resource)
+
+	// Run the mutating function for the resource.
+	resource, err = s.mutateResource(ctx, gvr, resource, Add)
+	if err != nil {
+		return xerrors.Errorf("failed to mutate resource: %w", err)
+	}
+
+	// Create the resource on the destination cluster using the dynamic client
+	_, err = s.clients.DestDynamicClient.Resource(gvr).Namespace(namespace).Create(
+		ctx,
+		resource,
+		metav1.CreateOptions{},
+	)
+	if err != nil {
+		return xerrors.Errorf("failed to create resource: %w", err)
+	}
+
+	return nil
+}
+
+func (s *Service) updateResourceOnDestinationCluster(
+	ctx context.Context,
+	resource *unstructured.Unstructured,
+) error {
+	// Extract the GroupVersionResource from the Unstructured object.
+	gvk := resource.GroupVersionKind()
+	gvr, err := s.findGVRForGVK(gvk)
+	if err != nil {
+		return err
+	}
+
+	// Namespaced resources should be updated within the namespace defined in the Unstructured object.
+	namespace := resource.GetNamespace()
+
+	// Run the filtering function for the resource.
+	if ok, err := s.filterResource(ctx, gvr, resource, Update); !ok || err != nil {
+		return err
+	}
+
+	// Run the mutating function for the resource.
+	resource, err = s.mutateResource(ctx, gvr, resource, Update)
+	if err != nil {
+		return xerrors.Errorf("failed to mutate resource: %w", err)
+	}
+
+	// Update the resource on the destination cluster using the dynamic client
+	_, err = s.clients.DestDynamicClient.Resource(gvr).Namespace(namespace).Update(
+		ctx,
+		resource,
+		metav1.UpdateOptions{},
+	)
+	if err != nil {
+		return xerrors.Errorf("failed to update resource: %w", err)
+	}
+
+	return nil
+}
+
+// removeUnnecessaryMetadata removes metadata, such as UID and ResourceVersion, that must not be set
+// when creating the resource on the destination cluster.
+func removeUnnecessaryMetadata(resource *unstructured.Unstructured) *unstructured.Unstructured {
+	resource.SetUID("")
+	resource.SetGeneration(0)
+	resource.SetResourceVersion("")
+
+	return resource
+}
+
+func (s *Service) deleteResourceOnDestinationCluster(
+	ctx context.Context,
+	resource *unstructured.Unstructured,
+) error {
+	// Extract the GroupVersionResource from the Unstructured object
+	gvk := resource.GroupVersionKind()
+	gvr, err := s.findGVRForGVK(gvk)
+	if err != nil {
+		return err
+	}
+
+	// Namespaced resources should be deleted within the namespace defined in the Unstructured object
+	namespace := resource.GetNamespace()
+
+	// Delete the resource on the destination cluster using the dynamic client
+	err = s.clients.DestDynamicClient.Resource(gvr).Namespace(namespace).Delete(
+		ctx,
+		resource.GetName(),
+		metav1.DeleteOptions{},
+	)
+	if err != nil {
+		return xerrors.Errorf("failed to delete resource: %w", err)
+	}
+
+	return nil
+}
+
+// findGVRForGVK uses the RESTMapper to get the GroupVersionResource for a given GroupVersionKind.
+func (s *Service) findGVRForGVK(gvk schema.GroupVersionKind) (schema.GroupVersionResource, error) {
+	m, err := s.clients.RestMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
+	if err != nil {
+		return schema.GroupVersionResource{}, err
+	}
+
+	return m.Resource, nil
+}
+
+func (s *Service) addFunc(obj interface{}) {
+	ctx := context.Background()
+	unstructObj, ok := obj.(*unstructured.Unstructured)
+	if !ok {
+		klog.Error("Failed to convert runtime.Object to *unstructured.Unstructured")
+		return
+	}
+
+	err := s.createResourceOnDestinationCluster(ctx, unstructObj)
+	if err != nil {
+		klog.ErrorS(err, "Failed to create resource on destination cluster")
+	}
+}
+
+func (s *Service) updateFunc(_, newObj interface{}) {
+	ctx := context.Background()
+	unstructObj, ok := newObj.(*unstructured.Unstructured)
+	if !ok {
+		klog.Error("Failed to convert runtime.Object to *unstructured.Unstructured")
+		return
+	}
+
+	err := s.updateResourceOnDestinationCluster(ctx, unstructObj)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			// We just ignore the not found error because the scheduler may preempt the Pods, or users may remove the resources for debugging.
+ klog.Info("Skipped to update resource on destination: ", err) + } else { + klog.ErrorS(err, "Failed to update resource on destination cluster") + } + } +} + +func (s *Service) deleteFunc(obj interface{}) { + ctx := context.Background() + unstructObj, ok := obj.(*unstructured.Unstructured) + if !ok { + klog.Error("Failed to convert runtime.Object to *unstructured.Unstructured") + return + } + + err := s.deleteResourceOnDestinationCluster(ctx, unstructObj) + if err != nil { + if errors.IsNotFound(err) { + // We just ignore the not found error because the scheduler may preempt the Pods, or users may remove the resources for debugging. + klog.Info("Skipped to delete resource on destination: ", err) + } else { + klog.ErrorS(err, "Failed to delete resource on destination cluster") + } + } +} + +func (s *Service) addMutatingFunctoins(m map[schema.GroupVersionResource]MutatingFunction) { + for k, v := range m { + if s.mutatingFunctions[k] == nil { + s.mutatingFunctions[k] = []MutatingFunction{v} + } else { + s.mutatingFunctions[k] = append(s.mutatingFunctions[k], v) + } + } +} + +func (s *Service) addFilteringFunctoins(m map[schema.GroupVersionResource]FilteringFunction) { + for k, v := range m { + if s.filteringFunctions[k] == nil { + s.filteringFunctions[k] = []FilteringFunction{v} + } else { + s.filteringFunctions[k] = append(s.filteringFunctions[k], v) + } + } +} + +func (s *Service) mutateResource(ctx context.Context, gvr schema.GroupVersionResource, resource *unstructured.Unstructured, event Event) (*unstructured.Unstructured, error) { + mutatingFns, ok := s.mutatingFunctions[gvr] + if !ok { + return resource, nil + } + + for _, mutatingFn := range mutatingFns { + var err error + resource, err = mutatingFn(ctx, resource, s.clients, event) + if err != nil { + return resource, err + } + } + + return resource, nil +} + +func (s *Service) filterResource(ctx context.Context, gvr schema.GroupVersionResource, resource *unstructured.Unstructured, event Event) (bool, error) { + filteringFns, ok := s.filteringFunctions[gvr] + if !ok { + return true, nil + } + + for _, filteringFn := range filteringFns { + ok, err := filteringFn(ctx, resource, s.clients, event) + if err != nil { + return false, err + } + if !ok { + return false, nil + } + } + + return true, nil +} diff --git a/simulator/syncer/syncer_test.go b/simulator/syncer/syncer_test.go new file mode 100644 index 00000000..dc7c0300 --- /dev/null +++ b/simulator/syncer/syncer_test.go @@ -0,0 +1,493 @@ +package syncer + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + dynamicFake "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/restmapper" + scheduling "k8s.io/kubernetes/pkg/apis/scheduling/v1" + storage "k8s.io/kubernetes/pkg/apis/storage/v1" +) + +//nolint:gocognit // it is because of huge test cases. 
+func TestSyncerWithPod(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + initialPodsInSrcCluster []*v1.Pod + podsCreatedInSrcCluster []*v1.Pod + podsUpdatedInSrcCluster []*v1.Pod + podsDeletedInSrcCluster []*v1.Pod + afterPodsInDestCluster []*v1.Pod + }{ + { + name: "unscheduled pod is created in src cluster", + initialPodsInSrcCluster: []*v1.Pod{ + { + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "default", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container-1", + }, + }, + }, + }, + }, + podsCreatedInSrcCluster: []*v1.Pod{ + { + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-2", + Namespace: "default", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container-2", + }, + }, + }, + }, + { + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-3", + Namespace: "default-3", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container-3", + }, + }, + }, + }, + }, + afterPodsInDestCluster: []*v1.Pod{ + { + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "default", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container-1", + }, + }, + }, + }, + { + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-2", + Namespace: "default", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container-2", + }, + }, + }, + }, + { + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-3", + Namespace: "default-3", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container-3", + }, + }, + }, + }, + }, + }, + { + name: "pod is created and deleted in src cluster", + podsCreatedInSrcCluster: []*v1.Pod{ + { + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "default", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container-1", + }, + }, + }, + }, + { + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-2", + Namespace: "default-2", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container-2", + }, + }, + }, + }, + }, + podsDeletedInSrcCluster: []*v1.Pod{ + { + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "default", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container-1", + }, + }, + }, + }, + }, + afterPodsInDestCluster: []*v1.Pod{ + { + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-2", + Namespace: "default-2", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container-2", + }, + }, + }, + }, + }, + }, + { + name: "unscheduled pod is updated in src cluster", + podsCreatedInSrcCluster: []*v1.Pod{ + { + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "default", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "container-1", + }, + }, + }, + }, + }, + podsUpdatedInSrcCluster: []*v1.Pod{ + { + 
TypeMeta: metav1.TypeMeta{
+						Kind:       "Pod",
+						APIVersion: "v1",
+					},
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "pod-1",
+						Namespace: "default",
+						Labels: map[string]string{
+							"foo": "bar",
+						},
+					},
+					Spec: v1.PodSpec{
+						Containers: []v1.Container{
+							{
+								Name: "container-1",
+							},
+						},
+					},
+				},
+			},
+			afterPodsInDestCluster: []*v1.Pod{
+				{
+					TypeMeta: metav1.TypeMeta{
+						Kind:       "Pod",
+						APIVersion: "v1",
+					},
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "pod-1",
+						Namespace: "default",
+						Labels: map[string]string{
+							"foo": "bar",
+						},
+					},
+					Spec: v1.PodSpec{
+						Containers: []v1.Container{
+							{
+								Name: "container-1",
+							},
+						},
+					},
+				},
+			},
+		},
+		{
+			name: "scheduled pod is NOT updated in src cluster",
+			podsCreatedInSrcCluster: []*v1.Pod{
+				{
+					TypeMeta: metav1.TypeMeta{
+						Kind:       "Pod",
+						APIVersion: "v1",
+					},
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "pod-1",
+						Namespace: "default",
+					},
+					Spec: v1.PodSpec{
+						Containers: []v1.Container{
+							{
+								Name: "container-1",
+							},
+						},
+					},
+				},
+			},
+			podsUpdatedInSrcCluster: []*v1.Pod{
+				{
+					TypeMeta: metav1.TypeMeta{
+						Kind:       "Pod",
+						APIVersion: "v1",
+					},
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "pod-1",
+						Namespace: "default",
+					},
+					Spec: v1.PodSpec{
+						Containers: []v1.Container{
+							{
+								Name: "container-1",
+							},
+						},
+
+						NodeName: "node-1", // Got NodeName, so this Pod is scheduled.
+					},
+				},
+			},
+			afterPodsInDestCluster: []*v1.Pod{
+				{
+					TypeMeta: metav1.TypeMeta{
+						Kind:       "Pod",
+						APIVersion: "v1",
+					},
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      "pod-1",
+						Namespace: "default",
+					},
+					Spec: v1.PodSpec{
+						Containers: []v1.Container{
+							{
+								Name: "container-1",
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		tt := tt
+		t.Run(tt.name, func(t *testing.T) {
+			t.Parallel()
+
+			s := runtime.NewScheme()
+			v1.AddToScheme(s)
+			scheduling.AddToScheme(s)
+			storage.AddToScheme(s)
+			src := dynamicFake.NewSimpleDynamicClient(s)
+			dest := dynamicFake.NewSimpleDynamicClient(s)
+			resources := []*restmapper.APIGroupResources{
+				{
+					Group: metav1.APIGroup{
+						Versions: []metav1.GroupVersionForDiscovery{
+							{Version: "v1"},
+						},
+					},
+					VersionedResources: map[string][]metav1.APIResource{
+						"v1": {
+							{Name: "pods", Namespaced: true, Kind: "Pod"},
+						},
+					},
+				},
+				{
+					Group: metav1.APIGroup{
+						Versions: []metav1.GroupVersionForDiscovery{
+							{Version: "v1"},
+						},
+					},
+					VersionedResources: map[string][]metav1.APIResource{
+						"v1": {
+							{Name: "nodes", Namespaced: false, Kind: "Node"},
+						},
+					},
+				},
+			}
+			mapper := restmapper.NewDiscoveryRESTMapper(resources)
+			service := New(src, dest, mapper, Options{})
+
+			ctx, cancel := context.WithCancel(context.Background())
+
+			createdPods := sets.New[podKey]()
+			for _, pod := range tt.initialPodsInSrcCluster {
+				p, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pod)
+				if err != nil {
+					t.Fatalf("failed to convert pod to unstructured: %v", err)
+				}
+				unstructedPod := &unstructured.Unstructured{Object: p}
+				_, err = src.Resource(v1.Resource("pods").WithVersion("v1")).Namespace(pod.Namespace).Create(ctx, unstructedPod, metav1.CreateOptions{})
+				if err != nil {
+					t.Fatalf("failed to create pod: %v", err)
+				}
+				createdPods.Insert(podKey{pod.Name, pod.Namespace})
+			}
+
+			go service.Run(ctx)
+			defer cancel()
+
+			for _, pod := range tt.podsCreatedInSrcCluster {
+				p, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pod)
+				if err != nil {
+					t.Fatalf("failed to convert pod to unstructured: %v", err)
+				}
+				unstructedPod := &unstructured.Unstructured{Object: p}
+				_, err = 
src.Resource(v1.Resource("pods").WithVersion("v1")).Namespace(pod.Namespace).Create(ctx, unstructedPod, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create pod: %v", err) + } + createdPods.Insert(podKey{pod.Name, pod.Namespace}) + } + + for _, pod := range tt.podsUpdatedInSrcCluster { + p, err := runtime.DefaultUnstructuredConverter.ToUnstructured(pod) + if err != nil { + t.Fatalf("failed to convert pod to unstructured: %v", err) + } + unstructedPod := &unstructured.Unstructured{Object: p} + _, err = src.Resource(v1.Resource("pods").WithVersion("v1")).Namespace(pod.Namespace).Update(ctx, unstructedPod, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("failed to update pod: %v", err) + } + } + + for _, pod := range tt.podsDeletedInSrcCluster { + err := src.Resource(v1.Resource("pods").WithVersion("v1")).Namespace(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) + if err != nil { + t.Fatalf("failed to delete pod: %v", err) + } + } + + errMessage := "" + err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, false, func(context.Context) (done bool, err error) { + checkedPods := sets.New[podKey]() + for _, pod := range tt.afterPodsInDestCluster { + // get Pod from dest cluster + p, err := dest.Resource(v1.Resource("pods").WithVersion("v1")).Namespace(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + errMessage = fmt.Sprintf("failed to get pod: %v", err) + return false, nil + } + + // convert Pod to v1.Pod + var got v1.Pod + err = runtime.DefaultUnstructuredConverter.FromUnstructured(p.Object, &got) + if err != nil { + errMessage = fmt.Sprintf("failed to convert pod to v1.Pod: %v", err) + return false, nil + } + + if diff := cmp.Diff(pod, &got, cmpopts.IgnoreTypes(metav1.Time{})); diff != "" { + errMessage = fmt.Sprintf("diff: %s", diff) + return false, nil + } + checkedPods.Insert(podKey{pod.Name, pod.Namespace}) + } + + for _, pod := range createdPods.Difference(checkedPods).UnsortedList() { + // get Pod from dest cluster + _, err := dest.Resource(v1.Resource("pods").WithVersion("v1")).Namespace(pod.namespace).Get(ctx, pod.name, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + errMessage = fmt.Sprintf("failed to get pod: %v", err) + return false, nil + } + if err == nil { + errMessage = fmt.Sprintf("pod %s/%s should be deleted", pod.namespace, pod.name) + return false, nil + } + } + + return true, nil + }) + if err != nil { + t.Fatal(errMessage) + } + }) + } +} + +type podKey struct{ name, namespace string }
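
As a usage sketch for the `Options` extension point introduced above: the snippet below shows how a caller could append one more GVR to `DefaultGVRs` and register an extra `FilteringFunction`. It is not part of this patch; the ConfigMap GVR, the `simulator.example/ignore` label, and the helper names are illustrative assumptions. Mandatory functions such as `filterPods` still run first, because `New` registers them before the additional ones.

```go
package example

import (
	"context"

	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"

	"sigs.k8s.io/kube-scheduler-simulator/simulator/syncer"
)

// newCustomSyncer builds a syncer that also syncs ConfigMaps and skips Pods
// carrying a (hypothetical) opt-out label.
func newCustomSyncer(src, dest dynamic.Interface, mapper meta.RESTMapper) *syncer.Service {
	opts := syncer.Options{
		// Keep the default ordering (namespaces first) and append one more GVR.
		GVRsToSync: append(syncer.DefaultGVRs,
			schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"}),
		AdditionalFilteringFunctions: map[schema.GroupVersionResource]syncer.FilteringFunction{
			{Group: "", Version: "v1", Resource: "pods"}: skipIgnoredPods,
		},
	}
	return syncer.New(src, dest, mapper, opts)
}

// skipIgnoredPods returns false for Pods labeled simulator.example/ignore=true,
// so the syncer neither creates nor updates them in the destination cluster.
func skipIgnoredPods(_ context.Context, resource *unstructured.Unstructured, _ *syncer.Clients, _ syncer.Event) (bool, error) {
	return resource.GetLabels()["simulator.example/ignore"] != "true", nil
}
```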