Skip to content

Commit

Permalink
The Job Controller Should Run (minio#1978)
Browse files Browse the repository at this point in the history
  • Loading branch information
cniackz authored Feb 12, 2024
1 parent a5bc5e1 commit 22d4612
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 78 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func StartOperator(kubeconfig string) {
kubeInformerFactory.Core().V1().Services(),
hostsTemplate,
pkg.Version,
minioInformerFactory,
minioInformerFactory.Job().V1alpha1().MinIOJobs(),
)

go kubeInformerFactory.Start(stopCh)
Expand Down
184 changes: 119 additions & 65 deletions pkg/controller/job-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,32 @@ package controller
import (
"context"
"fmt"
"time"

corelisters "k8s.io/client-go/listers/core/v1"
"github.com/minio/minio-go/v7/pkg/set"
"k8s.io/apimachinery/pkg/api/meta"

informers "github.com/minio/operator/pkg/client/informers/externalversions/job.min.io/v1alpha1"
listers "github.com/minio/operator/pkg/client/listers/job.min.io/v1alpha1"
"golang.org/x/time/rate"
"k8s.io/apimachinery/pkg/api/errors"
jobinformers "github.com/minio/operator/pkg/client/informers/externalversions/job.min.io/v1alpha1"
joblisters "github.com/minio/operator/pkg/client/listers/job.min.io/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
appslisters "k8s.io/client-go/listers/apps/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
queue "k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)

type jobController struct {
lister listers.MinIOJobLister
// JobController struct watches the Kubernetes API for changes to Tenant resources
type JobController struct {
namespacesToWatch set.StringSet
lister joblisters.MinIOJobLister
hasSynced cache.InformerSynced
workqueue workqueue.RateLimitingInterface
kubeClientSet kubernetes.Interface
statefulSetLister appslisters.StatefulSetLister
recorder record.EventRecorder
}

type controllerConfig struct {
serviceLister corelisters.ServiceLister
kubeClientSet kubernetes.Interface
statefulSetLister appslisters.StatefulSetLister
deploymentLister appslisters.DeploymentLister
recorder record.EventRecorder
workqueue queue.RateLimitingInterface
}

// JobControllerInterface is an interface for the controller with the methods supported by it.
Expand All @@ -50,54 +43,126 @@ type JobControllerInterface interface {
HandleObject(obj metav1.Object)
}

func enqueue(c JobControllerInterface, obj interface{}) {
var key string
var err error
if key, err = c.KeyFunc()(obj); err != nil {
utilruntime.HandleError(err)
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *JobController) runJobWorker() {
defer runtime.HandleCrash()
for c.processNextJobWorkItem() {
}
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *JobController) processNextJobWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}

// We wrap this block in a func so we can defer c.workqueue.Done.
processItem := func(obj interface{}) error {
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.workqueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// workqueue.
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workqueue.Forget(obj)
runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
klog.V(2).Infof("Key from workqueue: %s", key)

c.SyncHandler(key)
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
klog.V(4).Infof("Successfully synced '%s'", key)
return nil
}

if err := processItem(obj); err != nil {
runtime.HandleError(err)
return true
}
return true
}

func (c *JobController) enqueueJob(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
}
c.WorkQueue().Add(key)
if !c.namespacesToWatch.IsEmpty() {
meta, err := meta.Accessor(obj)
if err != nil {
runtime.HandleError(err)
return
}
if !c.namespacesToWatch.Contains(meta.GetNamespace()) {
klog.Infof("Ignoring tenant `%s` in namespace that is not watched by this controller.", key)
return
}
}
// key = default/mc-job-1
c.workqueue.AddRateLimited(key)
}

func newJobController(informer informers.MinIOJobInformer, config controllerConfig) *jobController {
rateLimiter := workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
)
jobController := &jobController{
lister: informer.Lister(),
hasSynced: informer.Informer().HasSynced,
workqueue: workqueue.NewRateLimitingQueue(rateLimiter),
kubeClientSet: config.kubeClientSet,
statefulSetLister: config.statefulSetLister,
recorder: config.recorder,
// NewJobController returns a new Operator Controller
func NewJobController(
jobinformer jobinformers.MinIOJobInformer,
namespacesToWatch set.StringSet,
joblister joblisters.MinIOJobLister,
hasSynced cache.InformerSynced,
kubeClientSet kubernetes.Interface,
statefulSetLister appslisters.StatefulSetLister,
recorder record.EventRecorder,
workqueue queue.RateLimitingInterface,
) *JobController {
controller := &JobController{
namespacesToWatch: namespacesToWatch,
lister: joblister,
hasSynced: hasSynced,
kubeClientSet: kubeClientSet,
statefulSetLister: statefulSetLister,
recorder: recorder,
workqueue: workqueue,
}

// Set up an event handler for when resources change
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
jobinformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
enqueue(jobController, obj)
controller.enqueueJob(obj)
},
UpdateFunc: func(old, new interface{}) {
enqueue(jobController, new)
controller.enqueueJob(new)
},
})
return jobController
}

func (c *jobController) WorkQueue() workqueue.RateLimitingInterface {
return c.workqueue
}

func (c *jobController) KeyFunc() cache.KeyFunc {
return cache.MetaNamespaceKeyFunc
return controller
}

func (c *jobController) HasSynced() cache.InformerSynced {
// HasSynced is to determine if obj is synced
func (c *JobController) HasSynced() cache.InformerSynced {
return c.hasSynced
}

func (c *jobController) HandleObject(obj metav1.Object) {
// HandleObject will take any resource implementing metav1.Object and attempt
// to find the CRD resource that 'owns' it.
func (c *JobController) HandleObject(obj metav1.Object) {
JobCRDResourceKind := "MinIOJob"
if ownerRef := metav1.GetControllerOf(obj); ownerRef != nil {
switch ownerRef.Kind {
Expand All @@ -107,30 +172,19 @@ func (c *jobController) HandleObject(obj metav1.Object) {
klog.V(4).Info("Ignore orphaned object", "object", klog.KObj(job), JobCRDResourceKind, ownerRef.Name)
return
}
enqueue(c, job)
c.enqueueJob(job)
default:
return
}
return
}
}

// syncJobHandler compares the current Job state with the desired, and attempts to
// SyncHandler compares the current Job state with the desired, and attempts to
// converge the two. It then updates the Status block of the Job resource
// with the current status of the resource.
func (c *jobController) SyncHandler(ctx context.Context, name, namespace string) error {
// Get the Job resource with this namespace/name
_, err := c.lister.MinIOJobs(namespace).Get(name)
if err != nil {
// The Job resource may no longer exist, in which case we stop
// processing.
if errors.IsNotFound(err) {
utilruntime.HandleError(fmt.Errorf("job '%s' in work queue no longer exists: %+v", name, err))
return nil
}

return err
}
func (c *JobController) SyncHandler(key string) error {
klog.Info("Job Controller Loop!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")

return nil
}
43 changes: 31 additions & 12 deletions pkg/controller/main-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ import (
miniov2 "github.com/minio/operator/pkg/apis/minio.min.io/v2"
clientset "github.com/minio/operator/pkg/client/clientset/versioned"
minioscheme "github.com/minio/operator/pkg/client/clientset/versioned/scheme"
minioinformers "github.com/minio/operator/pkg/client/informers/externalversions"
jobinformers "github.com/minio/operator/pkg/client/informers/externalversions/job.min.io/v1alpha1"
informers "github.com/minio/operator/pkg/client/informers/externalversions/minio.min.io/v2"
stsInformers "github.com/minio/operator/pkg/client/informers/externalversions/sts.min.io/v1alpha1"
"github.com/minio/operator/pkg/resources/statefulsets"
Expand Down Expand Up @@ -201,7 +201,7 @@ type Controller struct {
// controllers denotes the list of components controlled
// by the controller. Each component is itself
// a controller. This handle is for supporting the abstraction.
controllers []JobControllerInterface
controllers []*JobController
}

// EventType is Event type to handle
Expand Down Expand Up @@ -237,7 +237,7 @@ func NewController(
serviceInformer coreinformers.ServiceInformer,
hostsTemplate,
operatorVersion string,
minioInformerFactory minioinformers.SharedInformerFactory,
jobinformer jobinformers.MinIOJobInformer,
) *Controller {
// Create event broadcaster
// Add minio-controller types to the default Kubernetes Scheme so Events can be
Expand Down Expand Up @@ -278,13 +278,13 @@ func NewController(

oprImg = env.Get(DefaultOperatorImageEnv, oprImg)

controllerConfig := controllerConfig{
serviceLister: serviceInformer.Lister(),
kubeClientSet: kubeClientSet,
statefulSetLister: statefulSetInformer.Lister(),
deploymentLister: deploymentInformer.Lister(),
recorder: recorder,
}
//controllerConfig := controllerConfig{
// serviceLister: serviceInformer.Lister(),
// kubeClientSet: kubeClientSet,
// statefulSetLister: statefulSetInformer.Lister(),
// deploymentLister: deploymentInformer.Lister(),
// recorder: recorder,
//}

controller := &Controller{
podName: podName,
Expand All @@ -308,8 +308,17 @@ func NewController(
operatorVersion: operatorVersion,
policyBindingListerSynced: policyBindingInformer.Informer().HasSynced,
operatorImage: oprImg,
controllers: []JobControllerInterface{
newJobController(minioInformerFactory.Job().V1alpha1().MinIOJobs(), controllerConfig),
controllers: []*JobController{
NewJobController(
jobinformer,
namespacesToWatch,
jobinformer.Lister(),
jobinformer.Informer().HasSynced,
kubeClientSet,
statefulSetInformer.Lister(),
recorder,
queue.NewNamedRateLimitingQueue(MinIOControllerRateLimiter(), "Tenants"),
),
},
}

Expand Down Expand Up @@ -456,6 +465,15 @@ func leaderRun(ctx context.Context, c *Controller, threadiness int, stopCh <-cha
go wait.Until(c.runWorker, time.Second, stopCh)
}

klog.Info("Starting Job workers")
JobController := c.controllers[0]
// fmt.Println(controller.SyncHandler())
// Launch two workers to process Job resources
for i := 0; i < threadiness; i++ {
go wait.Until(JobController.runJobWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, stopCh)
}

// Launch a single worker for Health Check reacting to Pod Changes
go wait.Until(c.runHealthCheckWorker, time.Second, stopCh)

Expand Down Expand Up @@ -762,6 +780,7 @@ func key2NamespaceName(key string) (namespace, name string) {
// converge the two. It then updates the Status block of the Tenant resource
// with the current status of the resource.
func (c *Controller) syncHandler(key string) (Result, error) {
klog.Info("MinIO Tenant Main loop!!!!")
ctx := context.Background()
cOpts := metav1.CreateOptions{}
uOpts := metav1.UpdateOptions{}
Expand Down

0 comments on commit 22d4612

Please sign in to comment.