Skip to content

Commit

Permalink
Implement syncer (#367)
Browse files Browse the repository at this point in the history
* add syncer

Co-authored-by: Kensei Nakada <handbomusic@gmail.com>

* add syncer to the di container and invoke it from the entry point

---------

Co-authored-by: Kensei Nakada <handbomusic@gmail.com>
  • Loading branch information
saza-ku and sanposhiho committed Aug 29, 2024
1 parent f041ac5 commit 79850fb
Show file tree
Hide file tree
Showing 11 changed files with 1,079 additions and 21 deletions.
42 changes: 36 additions & 6 deletions simulator/cmd/simulator/simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,24 @@ import (
"golang.org/x/xerrors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/klog/v2"

"sigs.k8s.io/kube-scheduler-simulator/simulator/config"
"sigs.k8s.io/kube-scheduler-simulator/simulator/server"
"sigs.k8s.io/kube-scheduler-simulator/simulator/server/di"
"sigs.k8s.io/kube-scheduler-simulator/simulator/syncer"
)

const (
kubeAPIServerPollInterval = 5 * time.Second
kubeAPIServerReadyTimeout = 2 * time.Minute
importTimeout = 2 * time.Minute
)

// entry point.
Expand All @@ -40,13 +51,23 @@ func startSimulator() error {
Host: cfg.KubeAPIServerURL,
}
client := clientset.NewForConfigOrDie(restCfg)
dynamicClient := dynamic.NewForConfigOrDie(restCfg)
discoverClient := discovery.NewDiscoveryClient(client.RESTClient())
cachedDiscoveryClient := memory.NewMemCacheClient(discoverClient)
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscoveryClient)

importClusterResourceClient := &clientset.Clientset{}
if cfg.ExternalImportEnabled {
var importClusterDynamicClient dynamic.Interface
if cfg.ExternalImportEnabled || cfg.ResourceSyncEnabled {
importClusterResourceClient, err = clientset.NewForConfig(cfg.ExternalKubeClientCfg)
if err != nil {
return xerrors.Errorf("creates a new Clientset for the ExternalKubeClientCfg: %w", err)
}

importClusterDynamicClient, err = dynamic.NewForConfig(cfg.ExternalKubeClientCfg)
if err != nil {
return xerrors.Errorf("creates a new dynamic Clientset for the ExternalKubeClientCfg: %w", err)
}
}

etcdclient, err := clientv3.New(clientv3.Config{
Expand All @@ -57,9 +78,10 @@ func startSimulator() error {
return xerrors.Errorf("create an etcd client: %w", err)
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err = wait.PollUntilContextCancel(ctx, time.Second*5, true, func(ctx context.Context) (bool, error) {

err = wait.PollUntilContextTimeout(ctx, kubeAPIServerPollInterval, kubeAPIServerReadyTimeout, true, func(ctx context.Context) (bool, error) {
_, err := client.CoreV1().Namespaces().Get(context.Background(), "kube-system", metav1.GetOptions{})
if err != nil {
klog.Infof("waiting for kube-system namespace to be ready: %v", err)
Expand All @@ -72,7 +94,7 @@ func startSimulator() error {
return xerrors.Errorf("kubeapi-server is not ready: %w", err)
}

dic, err := di.NewDIContainer(client, etcdclient, restCfg, cfg.InitialSchedulerCfg, cfg.ExternalImportEnabled, importClusterResourceClient, cfg.ExternalSchedulerEnabled, cfg.Port)
dic, err := di.NewDIContainer(client, dynamicClient, restMapper, etcdclient, restCfg, cfg.InitialSchedulerCfg, cfg.ExternalImportEnabled, cfg.ResourceSyncEnabled, importClusterResourceClient, importClusterDynamicClient, cfg.ExternalSchedulerEnabled, cfg.Port, syncer.Options{})
if err != nil {
return xerrors.Errorf("create di container: %w", err)
}
Expand All @@ -86,13 +108,21 @@ func startSimulator() error {
// If ExternalImportEnabled is enabled, the simulator import resources
// from the target cluster that indicated by the `KUBECONFIG`.
if cfg.ExternalImportEnabled {
ctx := context.Background()
// This must be called after `StartScheduler`
if err := dic.OneshotClusterResourceImporter().ImportClusterResources(ctx); err != nil {
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, importTimeout)
defer timeoutCancel()
if err := dic.OneshotClusterResourceImporter().ImportClusterResources(timeoutCtx); err != nil {
return xerrors.Errorf("import from the target cluster: %w", err)
}
}

if cfg.ResourceSyncEnabled {
// Start the resource syncer to sync resources from the target cluster.
if err = dic.ResourceSyncer().Run(ctx); err != nil {
return xerrors.Errorf("start syncing: %w", err)
}
}

// start simulator server
s := server.NewSimulatorServer(cfg, dic)
shutdownFn, err := s.Start(cfg.Port)
Expand Down
15 changes: 12 additions & 3 deletions simulator/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ etcdURL: "http://127.0.0.1:2379"
corsAllowedOriginList:
- "http://localhost:3000"

# This is for the beta feature "Importing cluster's resources".
# This is for the beta feature "One-shot importing cluster's resources"
# and "Continuous syncing cluster's resources".
# This variable is used to find Kubeconfig required to access your
# cluster for importing resources to scheduler simulator.
kubeConfig: "/kubeconfig.yaml"
Expand All @@ -35,10 +36,18 @@ kubeApiServerUrl: ""
kubeSchedulerConfigPath: ""

# This variable indicates whether the simulator will
# import resources from an user cluster's or not.
# Note, this is still a beta feature.
# import resources from a user cluster specified by kubeConfig.
# Note that it only imports the resources once when the simulator is started.
# You cannot make both externalImportEnabled and resourceSyncEnabled true because those features would be conflicted.
# This is still a beta feature.
externalImportEnabled: false

# This variable indicates whether the simulator will
# keep syncing resources from an user cluster's or not.
# You cannot make both externalImportEnabled and resourceSyncEnabled true because those features would be conflicted.
# Note, this is still a beta feature.
resourceSyncEnabled: false

# This variable indicates whether an external scheduler
# is used.
externalSchedulerEnabled: false
28 changes: 25 additions & 3 deletions simulator/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ type Config struct {
KubeAPIServerURL string
EtcdURL string
CorsAllowedOriginList []string
// ExternalImportEnabled indicates whether the simulator will import resources from an target cluster or not.
// ExternalImportEnabled indicates whether the simulator will import resources from a target cluster once
// when it's started.
ExternalImportEnabled bool
// ExternalImportEnabled indicates whether the simulator will keep syncing resources from a target cluster.
ResourceSyncEnabled bool
// ExternalKubeClientCfg is KubeConfig to get resources from external cluster.
// This field is non-empty only when ExternalImportEnabled == true.
// This field should be set when ExternalImportEnabled == true or ResourceSyncEnabled == true.
ExternalKubeClientCfg *rest.Config
InitialSchedulerCfg *configv1.KubeSchedulerConfiguration
// ExternalSchedulerEnabled indicates whether an external scheduler is enabled.
Expand All @@ -48,6 +51,8 @@ const (
)

// NewConfig gets some settings from config file or environment variables.
//
//nolint:cyclop
func NewConfig() (*Config, error) {
if err := LoadYamlConfig(defaultFilePath); err != nil {
return nil, err
Expand All @@ -74,8 +79,12 @@ func NewConfig() (*Config, error) {
}

externalimportenabled := getExternalImportEnabled()
resourceSyncEnabled := getResourceSyncEnabled()
externalKubeClientCfg := &rest.Config{}
if externalimportenabled {
if externalimportenabled && resourceSyncEnabled {
return nil, xerrors.Errorf("externalImportEnabled and resourceSyncEnabled cannot be used simultaneously.")
}
if externalimportenabled || resourceSyncEnabled {
externalKubeClientCfg, err = clientcmd.BuildConfigFromFlags("", configYaml.KubeConfig)
if err != nil {
return nil, xerrors.Errorf("get kube clientconfig: %w", err)
Expand All @@ -98,6 +107,7 @@ func NewConfig() (*Config, error) {
ExternalImportEnabled: externalimportenabled,
ExternalKubeClientCfg: externalKubeClientCfg,
ExternalSchedulerEnabled: externalSchedEnabled,
ResourceSyncEnabled: resourceSyncEnabled,
}, nil
}

Expand Down Expand Up @@ -257,6 +267,18 @@ func getExternalImportEnabled() bool {
return isExternalImportEnabled
}

// getResourceSyncEnabled reads RESOURCE_SYNC_ENABLED and converts it to bool
// if empty from the config file.
// This function will return `true` if `RESOURCE_SYNC_ENABLED` is "1".
func getResourceSyncEnabled() bool {
resourceSyncEnabledString := os.Getenv("RESOURCE_SYNC_ENABLED")
if resourceSyncEnabledString == "" {
resourceSyncEnabledString = strconv.FormatBool(configYaml.ResourceSyncEnabled)
}
resourceSyncEnabled, _ := strconv.ParseBool(resourceSyncEnabledString)
return resourceSyncEnabled
}

func decodeSchedulerCfg(buf []byte) (*configv1.KubeSchedulerConfiguration, error) {
decoder := scheme.Codecs.UniversalDeserializer()
obj, _, err := decoder.Decode(buf, nil, nil)
Expand Down
4 changes: 4 additions & 0 deletions simulator/config/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ type SimulatorConfiguration struct {
// Note, this is still a beta feature.
ExternalImportEnabled bool `json:"externalImportEnabled,omitempty"`

// This variable indicates whether the simulator will
// sync resources from an user cluster's or not.
ResourceSyncEnabled bool `json:"resourceSyncEnabled,omitempty"`

// This variable indicates whether an external scheduler
// is used.
ExternalSchedulerEnabled bool `json:"externalSchedulerEnabled,omitempty"`
Expand Down
33 changes: 26 additions & 7 deletions simulator/docs/import-cluster-resources.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,35 @@
### [Beta] Import your real cluster's resources

The simulator can import resources from your cluster.
There are two ways to import resources from your cluster. These methods cannot be used simultaneously.
- Import resources from your cluster once when initializing the simulator.
- Keep importing resources from your cluster.

To use this, you need to follow these two steps
- Set to `true` the `externalImportEnabled` value in the simulator server configuration.
- Set the path of the kubeconfig file of the your cluster to `KubeConfig` value in the Simulator Server Configuration.
#### Import resources once when initializing the simulator

To use this, you need to follow these two steps in the simulator configuration:
- Set `true` to `externalImportEnabled`.
- Set the path of the kubeconfig file for the your cluster to `KubeConfig`.
- This feature only requires the read permission for resources.

```yaml
externalImportEnabled: true
kubeConfig: "/path/to/your-cluster-kubeconfig"
```
#### Keep importing resources
To use this, you need to follow these two steps in the scheduler configuration:
- Set `true` to `resourceSyncEnabled`.
- Set the path of the kubeconfig file for the your cluster to `KubeConfig`.
- This feature only requires the read permission for resources.

```yaml
externalImportEnabled: false
resourceSyncEnabled: true
kubeConfig: "/path/to/your-cluster-kubeconfig"
```

Then, the simulator imports resources from your cluster once when it's initialized.
> [!NOTE]
> When you enable `resourceSyncEnabled`, adding/updating/deleting resources directly in the simulator cluster could cause a problem of syncing.
> You can do them for debugging etc purposes though, make sure you reboot the simulator and the fake source cluster afterward.

See also [simulator/docs/simulator-server-config.md](simulator-server-config.md).
See [simulator/docs/simulator-server-config.md](simulator/docs/simulator-server-config.md) for more information about the simulator configuration.
12 changes: 10 additions & 2 deletions simulator/docs/simulator-server-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,18 @@ kubeAPIServerURL: ""
kubeSchedulerConfigPath: ""
# This variable indicates whether the simulator will
# import resources from an user cluster's or not.
# Note, this is still a beta feature.
# import resources from a user cluster specified by kubeConfig.
# Note that it only imports the resources once when the simulator is started.
# You cannot make both externalImportEnabled and resourceSyncEnabled true because those features would be conflicted.
# This is still a beta feature.
externalImportEnabled: false
# This variable indicates whether the simulator will
# keep syncing resources from an user cluster's or not.
# You cannot make both externalImportEnabled and resourceSyncEnabled true because those features would be conflicted.
# Note, this is still a beta feature.
resourceSyncEnabled: false
# This variable indicates whether an external scheduler
# is used.
externalSchedulerEnabled: false
Expand Down
17 changes: 17 additions & 0 deletions simulator/server/di/di.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package di
import (
clientv3 "go.etcd.io/etcd/client/v3"
"golang.org/x/xerrors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
configv1 "k8s.io/kube-scheduler/config/v1"
Expand All @@ -15,6 +17,7 @@ import (
"sigs.k8s.io/kube-scheduler-simulator/simulator/resourcewatcher"
"sigs.k8s.io/kube-scheduler-simulator/simulator/scheduler"
"sigs.k8s.io/kube-scheduler-simulator/simulator/snapshot"
"sigs.k8s.io/kube-scheduler-simulator/simulator/syncer"
)

// Container saves and provides dependencies.
Expand All @@ -23,6 +26,7 @@ type Container struct {
snapshotService SnapshotService
resetService ResetService
oneshotClusterResourceImporter OneShotClusterResourceImporter
resourceSyncer ResourceSyncer
resourceWatcherService ResourceWatcherService
}

Expand All @@ -31,13 +35,18 @@ type Container struct {
// Only when externalImportEnabled is true, the simulator uses externalClient and creates ImportClusterResourceService.
func NewDIContainer(
client clientset.Interface,
dynamicClient dynamic.Interface,
restMapper meta.RESTMapper,
etcdclient *clientv3.Client,
restclientCfg *restclient.Config,
initialSchedulerCfg *configv1.KubeSchedulerConfiguration,
externalImportEnabled bool,
resourceSyncEnabled bool,
externalClient clientset.Interface,
externalDynamicClient dynamic.Interface,
externalSchedulerEnabled bool,
simulatorPort int,
syncerOptions syncer.Options,
) (*Container, error) {
c := &Container{}

Expand All @@ -54,6 +63,9 @@ func NewDIContainer(
extSnapshotSvc := snapshot.NewService(externalClient, c.schedulerService)
c.oneshotClusterResourceImporter = oneshotimporter.NewService(snapshotSvc, extSnapshotSvc)
}
if resourceSyncEnabled {
c.resourceSyncer = syncer.New(externalDynamicClient, dynamicClient, restMapper, syncerOptions)
}
c.resourceWatcherService = resourcewatcher.NewService(client)

return c, nil
Expand All @@ -80,6 +92,11 @@ func (c *Container) OneshotClusterResourceImporter() OneShotClusterResourceImpor
return c.oneshotClusterResourceImporter
}

// ResourceSyncer returns ResourceSyncer.
func (c *Container) ResourceSyncer() ResourceSyncer {
return c.resourceSyncer
}

// ResourceWatcherService returns ResourceWatcherService.
func (c *Container) ResourceWatcherService() ResourceWatcherService {
return c.resourceWatcherService
Expand Down
7 changes: 7 additions & 0 deletions simulator/server/di/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ type OneShotClusterResourceImporter interface {
ImportClusterResources(ctx context.Context) error
}

// ResourceSyncer represents a service to constantly sync resources from an target cluster.
type ResourceSyncer interface {
// Run starts the resource syncer.
// It should be run until the context is canceled.
Run(ctx context.Context) error
}

// ResourceWatcherService represents service for watch k8s resources.
type ResourceWatcherService interface {
ListWatch(ctx context.Context, stream streamwriter.ResponseStream, lrVersions *resourcewatcher.LastResourceVersions) error
Expand Down
Loading

0 comments on commit 79850fb

Please sign in to comment.