Skip to content

Commit

Permalink
feat: support join & leave for member controllers
Browse files Browse the repository at this point in the history
  • Loading branch information
Zhiying Lin committed Jul 23, 2024
1 parent eb1c94e commit 0f3b9e9
Show file tree
Hide file tree
Showing 23 changed files with 348 additions and 42 deletions.
7 changes: 5 additions & 2 deletions cmd/mcs-controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
//+kubebuilder:scaffold:imports
clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1"
fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
"go.goms.io/fleet/pkg/utils/controller"

fleetnetv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1"
"go.goms.io/fleet-networking/pkg/common/hubconfig"
Expand Down Expand Up @@ -241,12 +242,13 @@ func setupControllersWithManager(_ context.Context, hubMgr, memberMgr manager.Ma
hubClient := hubMgr.GetClient()

klog.V(1).InfoS("Create multiclusterservice reconciler")
if err := (&multiclusterservice.Reconciler{
mcs := &multiclusterservice.Reconciler{
Client: memberClient,
Scheme: memberMgr.GetScheme(),
FleetSystemNamespace: *fleetSystemNamespace,
Recorder: memberMgr.GetEventRecorderFor(multiclusterservice.ControllerName),
}).SetupWithManager(memberMgr); err != nil {
}
if err := mcs.SetupWithManager(memberMgr); err != nil {
klog.ErrorS(err, "Unable to create multiclusterservice reconciler")
return err
}
Expand All @@ -269,6 +271,7 @@ func setupControllersWithManager(_ context.Context, hubMgr, memberMgr manager.Ma
MemberClient: memberClient,
HubClient: hubClient,
AgentType: clusterv1beta1.MultiClusterServiceAgent,
Controllers: []controller.MemberController{mcs},
}).SetupWithManager(hubMgr); err != nil {
klog.ErrorS(err, "Unable to create internalmembercluster (v1beta1 API) reconciler")
return err
Expand Down
45 changes: 31 additions & 14 deletions cmd/member-net-controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
//+kubebuilder:scaffold:imports
clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1"
fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
"go.goms.io/fleet/pkg/utils/controller"

fleetnetv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1"
"go.goms.io/fleet-networking/pkg/common/env"
Expand Down Expand Up @@ -259,79 +260,94 @@ func setupControllersWithManager(ctx context.Context, hubMgr, memberMgr manager.
memberClient := memberMgr.GetClient()
hubClient := hubMgr.GetClient()

var controllers []controller.MemberController
klog.V(1).InfoS("Create endpointslice controller")
if err := (&endpointslice.Reconciler{
endpointSliceController := &endpointslice.Reconciler{
MemberClusterID: mcName,
MemberClient: memberClient,
HubClient: hubClient,
HubNamespace: mcHubNamespace,
}).SetupWithManager(ctx, memberMgr); err != nil {
}
if err := endpointSliceController.SetupWithManager(ctx, memberMgr); err != nil {
klog.ErrorS(err, "Unable to create endpointslice controller")
return err
}
controllers = append(controllers, endpointSliceController)

klog.V(1).InfoS("Create endpointsliceexport controller")
if err := (&endpointsliceexport.Reconciler{
endpointSliceExportController := &endpointsliceexport.Reconciler{
MemberClient: memberClient,
HubClient: hubClient,
}).SetupWithManager(hubMgr); err != nil {
}
if err := endpointSliceExportController.SetupWithManager(hubMgr); err != nil {
klog.ErrorS(err, "Unable to create endpointsliceexport controller")
return err
}
controllers = append(controllers, endpointSliceExportController)

klog.V(1).InfoS("Create endpointsliceimport controller")
if err := (&endpointsliceimport.Reconciler{
endpointSliceImportController := &endpointsliceimport.Reconciler{
MemberClusterID: mcName,
MemberClient: memberClient,
HubClient: hubClient,
FleetSystemNamespace: *fleetSystemNamespace,
}).SetupWithManager(ctx, memberMgr, hubMgr); err != nil {
}
if err := endpointSliceImportController.SetupWithManager(ctx, memberMgr, hubMgr); err != nil {
klog.ErrorS(err, "Unable to create endpointsliceimport controller")
return err
}
controllers = append(controllers, endpointSliceImportController)

klog.V(1).InfoS("Create internalserviceexport controller")
if err := (&internalserviceexport.Reconciler{
internalServiceExportController := &internalserviceexport.Reconciler{
MemberClusterID: mcName,
MemberClient: memberClient,
HubClient: hubClient,
Recorder: memberMgr.GetEventRecorderFor(internalserviceexport.ControllerName),
}).SetupWithManager(hubMgr); err != nil {
}
if err := internalServiceExportController.SetupWithManager(hubMgr); err != nil {
klog.ErrorS(err, "Unable to create internalserviceexport controller")
return err
}
controllers = append(controllers, internalServiceExportController)

klog.V(1).InfoS("Create internalserviceimport controller")
if err := (&internalserviceimport.Reconciler{
internalServiceImportController := &internalserviceimport.Reconciler{
MemberClient: memberClient,
HubClient: hubClient,
}).SetupWithManager(hubMgr); err != nil {
}
if err := internalServiceImportController.SetupWithManager(hubMgr); err != nil {
klog.ErrorS(err, "Unable to create internalserviceimport controller")
return err
}
controllers = append(controllers, internalServiceImportController)

klog.V(1).InfoS("Create serviceexport reconciler")
if err := (&serviceexport.Reconciler{
serviceExportController := &serviceexport.Reconciler{
MemberClient: memberClient,
HubClient: hubClient,
MemberClusterID: mcName,
HubNamespace: mcHubNamespace,
Recorder: memberMgr.GetEventRecorderFor(serviceexport.ControllerName),
}).SetupWithManager(memberMgr); err != nil {
}
if err := serviceExportController.SetupWithManager(memberMgr); err != nil {
klog.ErrorS(err, "Unable to create serviceexport reconciler")
return err
}
controllers = append(controllers, serviceExportController)

klog.V(1).InfoS("Create serviceimport reconciler")
if err := (&serviceimport.Reconciler{
serviceImportController := &serviceimport.Reconciler{
MemberClient: memberClient,
HubClient: hubClient,
MemberClusterID: mcName,
HubNamespace: mcHubNamespace,
}).SetupWithManager(memberMgr); err != nil {
}
if err := serviceImportController.SetupWithManager(memberMgr); err != nil {
klog.ErrorS(err, "Unable to create serviceimport reconciler")
return err
}
controllers = append(controllers, serviceImportController)

if *isV1Alpha1APIEnabled {
klog.V(1).InfoS("Create internalmembercluster (v1alpha1 API) reconciler")
Expand All @@ -351,6 +367,7 @@ func setupControllersWithManager(ctx context.Context, hubMgr, memberMgr manager.
MemberClient: memberClient,
HubClient: hubClient,
AgentType: clusterv1beta1.ServiceExportImportAgent,
Controllers: controllers,
}).SetupWithManager(hubMgr); err != nil {
klog.ErrorS(err, "Unable to create internalmembercluster (v1beta1 API) reconciler")
return err
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ require (

require (
github.com/stretchr/testify v1.9.0
go.goms.io/fleet v0.10.5
go.goms.io/fleet v0.10.8
golang.org/x/sync v0.7.0
)

require (
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.goms.io/fleet v0.10.5 h1:Zc+pLk77zWv0hAqBbFZEMMd05MVw9P8jp8YHTy7WPdI=
go.goms.io/fleet v0.10.5/go.mod h1:FpVP3YsiewmyGH77Yx6sLngHbZKgepnmJDIibz2pjZo=
go.goms.io/fleet v0.10.7 h1:kVPcH+XPO894chIoHlMK0cNIi7xDAqy771yIAk4bQIQ=
go.goms.io/fleet v0.10.7/go.mod h1:2MaaOUGGespUMwgy64MBIMXELv8lDJq+0/NyS3OGzTw=
go.goms.io/fleet v0.10.8 h1:AAK4wr4uKB8ATMhC4cpCKYAq9lMr9XLYE5QE+vkBf5M=
go.goms.io/fleet v0.10.8/go.mod h1:2MaaOUGGespUMwgy64MBIMXELv8lDJq+0/NyS3OGzTw=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
Expand All @@ -120,6 +122,8 @@ golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbht
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
29 changes: 29 additions & 0 deletions pkg/controllers/member/endpointslice/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"context"
"fmt"
"strconv"
"sync/atomic"
"time"

discoveryv1 "k8s.io/api/discovery/v1"
Expand Down Expand Up @@ -53,6 +54,8 @@ type Reconciler struct {
HubClient client.Client
// The namespace reserved for the current member cluster in the hub cluster.
HubNamespace string
// whether to start exporting an EndpointSlice
joined atomic.Bool
}

//+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=endpointsliceexports,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -110,6 +113,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
return ctrl.Result{}, nil
}

if !r.joined.Load() {
klog.V(2).InfoS("EndpointSlice controller is not started yet, requeue the request", "endpointSlice", endpointSliceRef)
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
}

// Retrieve the unique name assigned; if none has been assigned, or the one assigned is not valid, possibly due
// to user tampering with the annotation, assign a new unique name.
fleetUniqueName, ok := endpointSlice.Annotations[objectmeta.ExportedObjectAnnotationUniqueName]
Expand Down Expand Up @@ -439,3 +447,24 @@ func (r *Reconciler) annotateLastSeenGenerationAndTimestamp(ctx context.Context,
endpointSlice.Annotations[metrics.MetricsAnnotationLastSeenTimestamp] = startTime.Format(metrics.MetricsLastSeenTimestampFormat)
return r.MemberClient.Update(ctx, endpointSlice)
}

// Join marks the joined status as true.
func (r *Reconciler) Join(_ context.Context) error {
if r.joined.Load() {
return nil
}
klog.InfoS("Mark the endpointSlice controller joined")
r.joined.Store(true)
return nil
}

// Leave marks the joined status as false.
// When the controller is in the leave state, it will only handle the delete events.
func (r *Reconciler) Leave(_ context.Context) error {
if !r.joined.Load() {
return nil
}
klog.InfoS("Mark the endpointSlice controller left")
r.joined.Store(false)
return nil
}
8 changes: 6 additions & 2 deletions pkg/controllers/member/endpointslice/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
hubClient client.Client
ctx context.Context
cancel context.CancelFunc
reconciler *Reconciler
)

// setUpResources help set up resources in the test environment.
Expand Down Expand Up @@ -99,13 +100,15 @@ var _ = BeforeSuite(func() {
ctrlMgr, err := ctrl.NewManager(memberCfg, ctrl.Options{Scheme: scheme.Scheme})
Expect(err).NotTo(HaveOccurred())

err = (&Reconciler{
reconciler = &Reconciler{
MemberClusterID: memberClusterID,
MemberClient: memberClient,
HubClient: hubClient,
HubNamespace: hubNSForMember,
}).SetupWithManager(ctx, ctrlMgr)
}
err = reconciler.SetupWithManager(ctx, ctrlMgr)
Expect(err).NotTo(HaveOccurred())
Expect(reconciler.Join(ctx)).Should(Succeed())

go func() {
defer GinkgoRecover()
Expand All @@ -116,6 +119,7 @@ var _ = BeforeSuite(func() {

var _ = AfterSuite(func() {
defer klog.Flush()
Expect(reconciler.Leave(ctx)).Should(Succeed())
cancel()

By("tearing down the test environment")
Expand Down
14 changes: 14 additions & 0 deletions pkg/controllers/member/endpointsliceexport/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,17 @@ func isEndpointSliceExportLinkedWithEndpointSlice(endpointSliceExport *fleetnetv
}
return true
}

// Join does nothing.
// There is no need to start or stop the controller as this controller is designed to clean up any invalid
// EndpointSliceExport in the hub cluster.
func (r *Reconciler) Join(_ context.Context) error {
// do nothing
return nil
}

// Leave does nothing.
func (r *Reconciler) Leave(_ context.Context) error {
// do nothing
return nil
}
29 changes: 29 additions & 0 deletions pkg/controllers/member/endpointsliceimport/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package endpointsliceimport
import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -82,6 +83,8 @@ type Reconciler struct {
HubClient client.Client
// The namespace reserved for fleet resources in the member cluster.
FleetSystemNamespace string
// whether to start exporting an EndpointSlice
joined atomic.Bool
}

//+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=endpointsliceimports,verbs=get;list;watch;update;patch
Expand Down Expand Up @@ -130,6 +133,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
return ctrl.Result{}, nil
}

if !r.joined.Load() {
klog.V(2).InfoS("EndpointSliceImport controller is not started yet, requeue the request", "endpointSliceImport", endpointSliceImportRef)
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
}

// Import the EndpointSlice, or update an imported EndpointSlice.

// Inquire the corresponding MCS to find out which Service the imported EndpointSlice should associate with.
Expand Down Expand Up @@ -428,3 +436,24 @@ func (r *Reconciler) observeMetrics(ctx context.Context, endpointSliceImport *fl
"isFirstImport", isFirstImport)
return nil
}

// Join marks the joined status as true.
func (r *Reconciler) Join(_ context.Context) error {
if r.joined.Load() {
return nil
}
klog.InfoS("Mark the endpointSliceImport controller joined")
r.joined.Store(true)
return nil
}

// Leave marks the joined status as false.
// When the controller is in the leave state, it will only handle the delete events.
func (r *Reconciler) Leave(_ context.Context) error {
if !r.joined.Load() {
return nil
}
klog.InfoS("Mark the endpointSliceImport controller left")
r.joined.Store(false)
return nil
}
8 changes: 6 additions & 2 deletions pkg/controllers/member/endpointsliceimport/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
hubClient client.Client
ctx context.Context
cancel context.CancelFunc
reconciler *Reconciler
)

// setUpResources help set up resources in the test environment.
Expand Down Expand Up @@ -116,12 +117,14 @@ var _ = BeforeSuite(func() {
hubClient = hubCtrlMgr.GetClient()
Expect(hubClient).NotTo(BeNil())

err = (&Reconciler{
reconciler = &Reconciler{
MemberClient: memberClient,
HubClient: hubClient,
FleetSystemNamespace: fleetSystemNS,
}).SetupWithManager(ctx, memberCtrlMgr, hubCtrlMgr)
}
err = reconciler.SetupWithManager(ctx, memberCtrlMgr, hubCtrlMgr)
Expect(err).NotTo(HaveOccurred())
Expect(reconciler.Join(ctx)).Should(Succeed())

go func() {
defer GinkgoRecover()
Expand All @@ -141,6 +144,7 @@ var _ = BeforeSuite(func() {

var _ = AfterSuite(func() {
defer klog.Flush()
Expect(reconciler.Leave(ctx)).Should(Succeed())
cancel()

By("tearing down the test environment")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ var _ = BeforeSuite(func() {
filepath.Join("../../../../../", "config", "crd", "bases"),
// need to make sure the version matches the one in the go.mod
// workaround mentioned in https://github.com/kubernetes-sigs/controller-runtime/issues/1191
filepath.Join(build.Default.GOPATH, "pkg", "mod", "go.goms.io", "fleet@v0.10.5", "config", "crd", "bases"),
filepath.Join(build.Default.GOPATH, "pkg", "mod", "go.goms.io", "fleet@v0.10.8", "config", "crd", "bases"),
},
ErrorIfCRDPathMissing: true,
}
Expand Down
Loading

0 comments on commit 0f3b9e9

Please sign in to comment.