From 6bbea5c3f1492eb65c105391040c9ae4a00ea4f2 Mon Sep 17 00:00:00 2001 From: Waleed Malik Date: Mon, 19 Aug 2024 18:57:09 +0500 Subject: [PATCH] Fix status propagation and global topology (#54) * Fix status propagation and global topology Signed-off-by: Waleed Malik * Update docs Signed-off-by: Waleed Malik --------- Signed-off-by: Waleed Malik --- .../v1alpha1/sync_secret_types.go | 2 + .../v1alpha1/zz_generated.deepcopy.go | 5 + .../crds/kubelb.k8c.io_syncsecrets.yaml | 2 + charts/kubelb-ccm/templates/clusterrole.yaml | 1 + .../crds/kubelb.k8c.io_syncsecrets.yaml | 2 + .../kubelb-manager/templates/clusterrole.yaml | 12 ++ cmd/ccm/main.go | 8 +- cmd/kubelb/main.go | 13 ++ .../crd/bases/kubelb.k8c.io_syncsecrets.yaml | 2 + config/kubelb/rbac/role.yaml | 12 ++ docs/api-reference.md | 3 +- .../controllers/ccm/gateway_controller.go | 14 +- .../ccm/gateway_grpcroute_controller.go | 13 +- .../ccm/gateway_httproute_controller.go | 13 +- .../controllers/ccm/ingress_controller.go | 13 +- .../ccm/secret_conversion_controller.go | 1 + .../controllers/ccm/service_controller.go | 57 +++--- internal/controllers/ccm/shared.go | 43 +++++ .../kubelb/bridgeservice_controller.go | 169 ++++++++++++++++++ .../controllers/kubelb/envoy_cp_controller.go | 19 +- .../controllers/kubelb/route_controller.go | 93 +++++++--- .../kubelb/sync_secret_controller.go | 7 - internal/kubelb/utils.go | 2 + .../resources/gatewayapi/gateway/gateway.go | 17 +- internal/resources/ingress/ingress.go | 2 +- internal/resources/service/service.go | 54 +++++- internal/util/kubernetes/secret.go | 4 +- 27 files changed, 469 insertions(+), 114 deletions(-) create mode 100644 internal/controllers/kubelb/bridgeservice_controller.go diff --git a/api/kubelb.k8c.io/v1alpha1/sync_secret_types.go b/api/kubelb.k8c.io/v1alpha1/sync_secret_types.go index 80b1a4a..84b77b7 100644 --- a/api/kubelb.k8c.io/v1alpha1/sync_secret_types.go +++ b/api/kubelb.k8c.io/v1alpha1/sync_secret_types.go @@ -31,6 +31,8 @@ type SyncSecret struct { // Source: https://pkg.go.dev/k8s.io/api/core/v1#Secret + // +optional + Immutable *bool `json:"immutable,omitempty" protobuf:"varint,5,opt,name=immutable"` // +optional Data map[string][]byte `json:"data,omitempty" protobuf:"bytes,2,rep,name=data"` diff --git a/api/kubelb.k8c.io/v1alpha1/zz_generated.deepcopy.go b/api/kubelb.k8c.io/v1alpha1/zz_generated.deepcopy.go index 961b120..2f64aa0 100644 --- a/api/kubelb.k8c.io/v1alpha1/zz_generated.deepcopy.go +++ b/api/kubelb.k8c.io/v1alpha1/zz_generated.deepcopy.go @@ -759,6 +759,11 @@ func (in *SyncSecret) DeepCopyInto(out *SyncSecret) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + if in.Immutable != nil { + in, out := &in.Immutable, &out.Immutable + *out = new(bool) + **out = **in + } if in.Data != nil { in, out := &in.Data, &out.Data *out = make(map[string][]byte, len(*in)) diff --git a/charts/kubelb-ccm/crds/kubelb.k8c.io_syncsecrets.yaml b/charts/kubelb-ccm/crds/kubelb.k8c.io_syncsecrets.yaml index 9867a3d..ea040f9 100644 --- a/charts/kubelb-ccm/crds/kubelb.k8c.io_syncsecrets.yaml +++ b/charts/kubelb-ccm/crds/kubelb.k8c.io_syncsecrets.yaml @@ -33,6 +33,8 @@ spec: format: byte type: string type: object + immutable: + type: boolean kind: description: |- Kind is a string value representing the REST resource this object represents. 
diff --git a/charts/kubelb-ccm/templates/clusterrole.yaml b/charts/kubelb-ccm/templates/clusterrole.yaml index bc597fc..555816c 100644 --- a/charts/kubelb-ccm/templates/clusterrole.yaml +++ b/charts/kubelb-ccm/templates/clusterrole.yaml @@ -25,6 +25,7 @@ rules: - patch - update - watch + - delete - apiGroups: - "" resources: diff --git a/charts/kubelb-manager/crds/kubelb.k8c.io_syncsecrets.yaml b/charts/kubelb-manager/crds/kubelb.k8c.io_syncsecrets.yaml index 9867a3d..ea040f9 100644 --- a/charts/kubelb-manager/crds/kubelb.k8c.io_syncsecrets.yaml +++ b/charts/kubelb-manager/crds/kubelb.k8c.io_syncsecrets.yaml @@ -33,6 +33,8 @@ spec: format: byte type: string type: object + immutable: + type: boolean kind: description: |- Kind is a string value representing the REST resource this object represents. diff --git a/charts/kubelb-manager/templates/clusterrole.yaml b/charts/kubelb-manager/templates/clusterrole.yaml index 8904839..ca70df4 100644 --- a/charts/kubelb-manager/templates/clusterrole.yaml +++ b/charts/kubelb-manager/templates/clusterrole.yaml @@ -53,6 +53,18 @@ rules: - patch - update - watch +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - gateway.networking.k8s.io resources: diff --git a/cmd/ccm/main.go b/cmd/ccm/main.go index 0691c4f..be5330b 100644 --- a/cmd/ccm/main.go +++ b/cmd/ccm/main.go @@ -232,7 +232,7 @@ func main() { if !disableIngressController { if err = (&ccm.IngressReconciler{ Client: mgr.GetClient(), - LBClient: kubeLBMgr.GetClient(), + LBManager: kubeLBMgr, ClusterName: clusterName, Log: ctrl.Log.WithName("controllers").WithName(ccm.IngressControllerName), Scheme: mgr.GetScheme(), @@ -247,7 +247,7 @@ func main() { if !disableGatewayController && !disableGatewayAPI { if err = (&ccm.GatewayReconciler{ Client: mgr.GetClient(), - LBClient: kubeLBMgr.GetClient(), + LBManager: kubeLBMgr, ClusterName: clusterName, Log: ctrl.Log.WithName("controllers").WithName(ccm.GatewayControllerName), Scheme: mgr.GetScheme(), @@ -262,7 +262,7 @@ func main() { if !disableHTTPRouteController && !disableGatewayAPI { if err = (&ccm.HTTPRouteReconciler{ Client: mgr.GetClient(), - LBClient: kubeLBMgr.GetClient(), + LBManager: kubeLBMgr, ClusterName: clusterName, Log: ctrl.Log.WithName("controllers").WithName(ccm.GatewayHTTPRouteControllerName), Scheme: mgr.GetScheme(), @@ -276,7 +276,7 @@ func main() { if !disableGRPCRouteController && !disableGatewayAPI { if err = (&ccm.GRPCRouteReconciler{ Client: mgr.GetClient(), - LBClient: kubeLBMgr.GetClient(), + LBManager: kubeLBMgr, ClusterName: clusterName, Log: ctrl.Log.WithName("controllers").WithName(ccm.GatewayGRPCRouteControllerName), Scheme: mgr.GetScheme(), diff --git a/cmd/kubelb/main.go b/cmd/kubelb/main.go index d9eb143..922bcf4 100644 --- a/cmd/kubelb/main.go +++ b/cmd/kubelb/main.go @@ -240,6 +240,19 @@ func main() { } } + // This is only required when using global topology. 
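+	// In the global topology the Envoy proxy runs in the controller namespace; tenant traffic
+	// reaches it through bridge services, whose tenant-side EndpointSlices this controller maintains.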
+ if conf.IsGlobalTopology() { + if err = (&kubelb.BridgeServiceReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Log: ctrl.Log.WithName("controllers").WithName(kubelb.BridgeServiceControllerName), + Recorder: mgr.GetEventRecorderFor(kubelb.BridgeServiceControllerName), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", kubelb.BridgeServiceControllerName) + os.Exit(1) + } + } + go func() { setupLog.Info("starting kubelb envoy manager") diff --git a/config/crd/bases/kubelb.k8c.io_syncsecrets.yaml b/config/crd/bases/kubelb.k8c.io_syncsecrets.yaml index 9867a3d..ea040f9 100644 --- a/config/crd/bases/kubelb.k8c.io_syncsecrets.yaml +++ b/config/crd/bases/kubelb.k8c.io_syncsecrets.yaml @@ -33,6 +33,8 @@ spec: format: byte type: string type: object + immutable: + type: boolean kind: description: |- Kind is a string value representing the REST resource this object represents. diff --git a/config/kubelb/rbac/role.yaml b/config/kubelb/rbac/role.yaml index 3bc4617..e595640 100644 --- a/config/kubelb/rbac/role.yaml +++ b/config/kubelb/rbac/role.yaml @@ -84,6 +84,18 @@ rules: - patch - update - watch +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - gateway.networking.k8s.io resources: diff --git a/docs/api-reference.md b/docs/api-reference.md index ebfec56..f746def 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -453,7 +453,7 @@ _Appears in:_ | --- | --- | --- | --- | | `name` _string_ | The name of this port within the service. This must be a DNS_LABEL.
All ports within a ServiceSpec must have unique names. When considering<br />the endpoints for a Service, this must match the 'name' field in the<br />EndpointPort.<br />Optional if only one ServicePort is defined on this service. | | |
| `protocol` _[Protocol](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#protocol-v1-core)_ | The IP protocol for this port. Supports "TCP", "UDP", and "SCTP".<br />Default is TCP. | | |
-| `appProtocol` _string_ | The application protocol for this port.<br />This is used as a hint for implementations to offer richer behavior for protocols that they understand.<br />This field follows standard Kubernetes label syntax.<br />Valid values are either:<br /><br />*Un-prefixed protocol names - reserved for IANA standard service names (as per<br />RFC-6335 and ).<br /><br />* Kubernetes-defined prefixed names:<br />*'kubernetes.io/h2c' - HTTP/2 prior knowledge over cleartext as described in<br />* 'kubernetes.io/ws' - WebSocket over cleartext as described in<br />*'kubernetes.io/wss' - WebSocket over TLS as described in<br /><br />* Other protocols should use implementation-defined prefixed names such as<br />mycompany.com/my-custom-protocol. | | |
+| `appProtocol` _string_ | The application protocol for this port.<br />This is used as a hint for implementations to offer richer behavior for protocols that they understand.<br />This field follows standard Kubernetes label syntax.<br />Valid values are either:<br /><br />_Un-prefixed protocol names - reserved for IANA standard service names (as per<br />RFC-6335 and ).<br /><br />_ Kubernetes-defined prefixed names:<br />_'kubernetes.io/h2c' - HTTP/2 prior knowledge over cleartext as described in<br />_ 'kubernetes.io/ws' - WebSocket over cleartext as described in<br />_'kubernetes.io/wss' - WebSocket over TLS as described in<br /><br />_ Other protocols should use implementation-defined prefixed names such as<br />mycompany.com/my-custom-protocol. | | |
| `port` _integer_ | The port that will be exposed by this service. | | |
| `targetPort` _[IntOrString](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#intorstring-intstr-util)_ | Number or name of the port to access on the pods targeted by the service.<br />Number must be in the range 1 to 65535. Name must be an IANA_SVC_NAME.<br />If this is a string, it will be looked up as a named port in the<br />target Pod's container ports. If this is not specified, the value<br />of the 'port' field is used (an identity map).<br />This field is ignored for services with clusterIP=None, and should be<br />omitted or set equal to the 'port' field.<br />More info: | | |
| `nodePort` _integer_ | The port on each node on which this service is exposed when type is<br />NodePort or LoadBalancer. Usually assigned by the system. If a value is<br />specified, in-range, and not in use it will be used, otherwise the<br />operation will fail. If not specified, a port will be allocated if this<br />Service requires one. If this field is specified when creating a<br />Service which does not need it, creation will fail. This field will be<br />wiped when updating a Service to no longer need it (e.g. changing type<br />from NodePort to ClusterIP).
More info: | | | @@ -482,6 +482,7 @@ _Appears in:_ | `apiVersion` _string_ | `kubelb.k8c.io/v1alpha1` | | | | `kind` _string_ | `SyncSecret` | | | | `metadata` _[ObjectMeta](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#objectmeta-v1-meta)_ | Refer to Kubernetes API documentation for fields of `metadata`. | | | +| `immutable` _boolean_ | | | | | `data` _object (keys:string, values:integer array)_ | | | | | `stringData` _object (keys:string, values:string)_ | | | | | `type` _[SecretType](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#secrettype-v1-core)_ | | | | diff --git a/internal/controllers/ccm/gateway_controller.go b/internal/controllers/ccm/gateway_controller.go index 9c02df4..b41e323 100644 --- a/internal/controllers/ccm/gateway_controller.go +++ b/internal/controllers/ccm/gateway_controller.go @@ -37,8 +37,10 @@ import ( ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" gwapiv1 "sigs.k8s.io/gateway-api/apis/v1" "sigs.k8s.io/yaml" ) @@ -53,7 +55,7 @@ const ( type GatewayReconciler struct { ctrlclient.Client - LBClient ctrlclient.Client + LBManager ctrl.Manager ClusterName string UseGatewayClass bool @@ -117,14 +119,14 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct func (r *GatewayReconciler) reconcile(ctx context.Context, log logr.Logger, gateway *gwapiv1.Gateway) error { // Create/update the corresponding Route in LB cluster. - err := reconcileSourceForRoute(ctx, log, r.Client, r.LBClient, gateway, nil, nil, r.ClusterName) + err := reconcileSourceForRoute(ctx, log, r.Client, r.LBManager.GetClient(), gateway, nil, nil, r.ClusterName) if err != nil { return fmt.Errorf("failed to reconcile source for route: %w", err) } // Route was reconciled successfully, now we need to update the status of the Resource. route := kubelbv1alpha1.Route{} - err = r.LBClient.Get(ctx, types.NamespacedName{Name: string(gateway.UID), Namespace: r.ClusterName}, &route) + err = r.LBManager.GetClient().Get(ctx, types.NamespacedName{Name: string(gateway.UID), Namespace: r.ClusterName}, &route) if err != nil { return fmt.Errorf("failed to get Route from LB cluster: %w", err) } @@ -164,7 +166,7 @@ func (r *GatewayReconciler) reconcile(ctx context.Context, log logr.Logger, gate func (r *GatewayReconciler) cleanup(ctx context.Context, gateway *gwapiv1.Gateway) (ctrl.Result, error) { // Find the Route in LB cluster and delete it - err := cleanupRoute(ctx, r.LBClient, string(gateway.UID), r.ClusterName) + err := cleanupRoute(ctx, r.LBManager.GetClient(), string(gateway.UID), r.ClusterName) if err != nil { return reconcile.Result{}, fmt.Errorf("failed to cleanup route: %w", err) } @@ -223,5 +225,9 @@ func (r *GatewayReconciler) shouldReconcile(gateway *gwapiv1.Gateway) bool { func (r *GatewayReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&gwapiv1.Gateway{}, builder.WithPredicates(r.resourceFilter())). + WatchesRawSource( + source.Kind(r.LBManager.GetCache(), &kubelbv1alpha1.Route{}, + handler.TypedEnqueueRequestsFromMapFunc[*kubelbv1alpha1.Route](enqueueRoutes("Gateway.gateway.networking.k8s.io", r.ClusterName))), + ). 
Complete(r) } diff --git a/internal/controllers/ccm/gateway_grpcroute_controller.go b/internal/controllers/ccm/gateway_grpcroute_controller.go index 13a0b65..4a2ec96 100644 --- a/internal/controllers/ccm/gateway_grpcroute_controller.go +++ b/internal/controllers/ccm/gateway_grpcroute_controller.go @@ -43,6 +43,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" gwapiv1 "sigs.k8s.io/gateway-api/apis/v1" "sigs.k8s.io/yaml" ) @@ -55,7 +56,7 @@ const ( type GRPCRouteReconciler struct { ctrlclient.Client - LBClient ctrlclient.Client + LBManager ctrl.Manager ClusterName string Log logr.Logger @@ -115,14 +116,14 @@ func (r *GRPCRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( func (r *GRPCRouteReconciler) reconcile(ctx context.Context, log logr.Logger, grpcRoute *gwapiv1.GRPCRoute) error { // We need to traverse the GRPCRoute, find all the services associated with it, create/update the corresponding Route in LB cluster. originalServices := grpcrouteHelpers.GetServicesFromGRPCRoute(grpcRoute) - err := reconcileSourceForRoute(ctx, log, r.Client, r.LBClient, grpcRoute, originalServices, nil, r.ClusterName) + err := reconcileSourceForRoute(ctx, log, r.Client, r.LBManager.GetClient(), grpcRoute, originalServices, nil, r.ClusterName) if err != nil { return fmt.Errorf("failed to reconcile source for route: %w", err) } // Route was reconciled successfully, now we need to update the status of the Resource. route := kubelbv1alpha1.Route{} - err = r.LBClient.Get(ctx, types.NamespacedName{Name: string(grpcRoute.UID), Namespace: r.ClusterName}, &route) + err = r.LBManager.GetClient().Get(ctx, types.NamespacedName{Name: string(grpcRoute.UID), Namespace: r.ClusterName}, &route) if err != nil { return fmt.Errorf("failed to get Route from LB cluster: %w", err) } @@ -186,7 +187,7 @@ func (r *GRPCRouteReconciler) cleanup(ctx context.Context, grpcRoute *gwapiv1.GR } // Find the Route in LB cluster and delete it - err = cleanupRoute(ctx, r.LBClient, string(grpcRoute.UID), r.ClusterName) + err = cleanupRoute(ctx, r.LBManager.GetClient(), string(grpcRoute.UID), r.ClusterName) if err != nil { return reconcile.Result{}, fmt.Errorf("failed to cleanup route: %w", err) } @@ -278,5 +279,9 @@ func (r *GRPCRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { &corev1.Service{}, handler.EnqueueRequestsFromMapFunc(r.enqueueResources()), ). + WatchesRawSource( + source.Kind(r.LBManager.GetCache(), &kubelbv1alpha1.Route{}, + handler.TypedEnqueueRequestsFromMapFunc[*kubelbv1alpha1.Route](enqueueRoutes("GRPCRoute.gateway.networking.k8s.io", r.ClusterName))), + ). 
Complete(r) } diff --git a/internal/controllers/ccm/gateway_httproute_controller.go b/internal/controllers/ccm/gateway_httproute_controller.go index 1097c02..d0a58f1 100644 --- a/internal/controllers/ccm/gateway_httproute_controller.go +++ b/internal/controllers/ccm/gateway_httproute_controller.go @@ -43,6 +43,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" gwapiv1 "sigs.k8s.io/gateway-api/apis/v1" "sigs.k8s.io/yaml" ) @@ -55,7 +56,7 @@ const ( type HTTPRouteReconciler struct { ctrlclient.Client - LBClient ctrlclient.Client + LBManager ctrl.Manager ClusterName string Log logr.Logger @@ -119,14 +120,14 @@ func (r *HTTPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( func (r *HTTPRouteReconciler) reconcile(ctx context.Context, log logr.Logger, httpRoute *gwapiv1.HTTPRoute) error { // We need to traverse the HTTPRoute, find all the services associated with it, create/update the corresponding Route in LB cluster. originalServices := httprouteHelpers.GetServicesFromHTTPRoute(httpRoute) - err := reconcileSourceForRoute(ctx, log, r.Client, r.LBClient, httpRoute, originalServices, nil, r.ClusterName) + err := reconcileSourceForRoute(ctx, log, r.Client, r.LBManager.GetClient(), httpRoute, originalServices, nil, r.ClusterName) if err != nil { return fmt.Errorf("failed to reconcile source for route: %w", err) } // Route was reconciled successfully, now we need to update the status of the Resource. route := kubelbv1alpha1.Route{} - err = r.LBClient.Get(ctx, types.NamespacedName{Name: string(httpRoute.UID), Namespace: r.ClusterName}, &route) + err = r.LBManager.GetClient().Get(ctx, types.NamespacedName{Name: string(httpRoute.UID), Namespace: r.ClusterName}, &route) if err != nil { return fmt.Errorf("failed to get Route from LB cluster: %w", err) } @@ -190,7 +191,7 @@ func (r *HTTPRouteReconciler) cleanup(ctx context.Context, httpRoute *gwapiv1.HT } // Find the Route in LB cluster and delete it - err = cleanupRoute(ctx, r.LBClient, string(httpRoute.UID), r.ClusterName) + err = cleanupRoute(ctx, r.LBManager.GetClient(), string(httpRoute.UID), r.ClusterName) if err != nil { return reconcile.Result{}, fmt.Errorf("failed to cleanup route: %w", err) } @@ -282,5 +283,9 @@ func (r *HTTPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { &corev1.Service{}, handler.EnqueueRequestsFromMapFunc(r.enqueueResources()), ). + WatchesRawSource( + source.Kind(r.LBManager.GetCache(), &kubelbv1alpha1.Route{}, + handler.TypedEnqueueRequestsFromMapFunc[*kubelbv1alpha1.Route](enqueueRoutes("HTTPRoute.gateway.networking.k8s.io", r.ClusterName))), + ). 
Complete(r) } diff --git a/internal/controllers/ccm/ingress_controller.go b/internal/controllers/ccm/ingress_controller.go index c2c84fc..054b812 100644 --- a/internal/controllers/ccm/ingress_controller.go +++ b/internal/controllers/ccm/ingress_controller.go @@ -44,6 +44,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" "sigs.k8s.io/yaml" ) @@ -56,7 +57,7 @@ const ( type IngressReconciler struct { ctrlclient.Client - LBClient ctrlclient.Client + LBManager ctrl.Manager ClusterName string UseIngressClass bool @@ -121,14 +122,14 @@ func (r *IngressReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct func (r *IngressReconciler) reconcile(ctx context.Context, log logr.Logger, ingress *networkingv1.Ingress) error { // We need to traverse the Ingress, find all the services associated with it, create/update the corresponding Route in LB cluster. originalServices := ingressHelpers.GetServicesFromIngress(*ingress) - err := reconcileSourceForRoute(ctx, log, r.Client, r.LBClient, ingress, originalServices, nil, r.ClusterName) + err := reconcileSourceForRoute(ctx, log, r.Client, r.LBManager.GetClient(), ingress, originalServices, nil, r.ClusterName) if err != nil { return fmt.Errorf("failed to reconcile source for route: %w", err) } // Route was reconciled successfully, now we need to update the status of the Ingress. route := kubelbv1alpha1.Route{} - err = r.LBClient.Get(ctx, types.NamespacedName{Name: string(ingress.UID), Namespace: r.ClusterName}, &route) + err = r.LBManager.GetClient().Get(ctx, types.NamespacedName{Name: string(ingress.UID), Namespace: r.ClusterName}, &route) if err != nil { return fmt.Errorf("failed to get Route from LB cluster: %w", err) } @@ -192,7 +193,7 @@ func (r *IngressReconciler) cleanup(ctx context.Context, ingress *networkingv1.I } // Find the Route in LB cluster and delete it - err = cleanupRoute(ctx, r.LBClient, string(ingress.UID), r.ClusterName) + err = cleanupRoute(ctx, r.LBManager.GetClient(), string(ingress.UID), r.ClusterName) if err != nil { return reconcile.Result{}, fmt.Errorf("failed to cleanup route: %w", err) } @@ -279,5 +280,9 @@ func (r *IngressReconciler) SetupWithManager(mgr ctrl.Manager) error { &corev1.Service{}, handler.EnqueueRequestsFromMapFunc(r.enqueueResources()), ). + WatchesRawSource( + source.Kind(r.LBManager.GetCache(), &kubelbv1alpha1.Route{}, + handler.TypedEnqueueRequestsFromMapFunc[*kubelbv1alpha1.Route](enqueueRoutes("Ingress.networking.k8s.io", r.ClusterName))), + ). 
Complete(r) } diff --git a/internal/controllers/ccm/secret_conversion_controller.go b/internal/controllers/ccm/secret_conversion_controller.go index 72c3c9a..e93c3e3 100644 --- a/internal/controllers/ccm/secret_conversion_controller.go +++ b/internal/controllers/ccm/secret_conversion_controller.go @@ -108,6 +108,7 @@ func (r *SecretConversionReconciler) reconcile(ctx context.Context, _ logr.Logge syncSecret.Labels[kubelb.LabelOriginName] = secret.Name syncSecret.Data = secret.Data syncSecret.StringData = secret.StringData + syncSecret.Immutable = secret.Immutable syncSecret.Type = secret.Type return CreateOrUpdateSyncSecret(ctx, r.Client, syncSecret) } diff --git a/internal/controllers/ccm/service_controller.go b/internal/controllers/ccm/service_controller.go index 76e6118..aa90483 100644 --- a/internal/controllers/ccm/service_controller.go +++ b/internal/controllers/ccm/service_controller.go @@ -24,17 +24,18 @@ import ( "github.com/go-logr/logr" kubelbv1alpha1 "k8c.io/kubelb/api/kubelb.k8c.io/v1alpha1" - utils "k8c.io/kubelb/internal/controllers" "k8c.io/kubelb/internal/kubelb" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" ctrl "sigs.k8s.io/controller-runtime" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" @@ -76,34 +77,35 @@ func (r *KubeLBServiceReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, nil } + // Resource is marked for deletion + if service.DeletionTimestamp != nil { + if controllerutil.ContainsFinalizer(&service, CleanupFinalizer) || controllerutil.ContainsFinalizer(&service, LBFinalizerName) { + return r.cleanupService(ctx, log, &service) + } + // Finalizer doesn't exist so clean up is already done + return reconcile.Result{}, nil + } + if !r.shouldReconcile(service) { return ctrl.Result{}, nil } - clusterEndpoints, useAddressesReference := r.getEndpoints(&service) - - // examine DeletionTimestamp to determine if object is under deletion - if !service.ObjectMeta.DeletionTimestamp.IsZero() { - // The object is being deleted - // our finalizer is present, so lets handle any external dependency - // if fail to delete the external dependency here, return with error - // so that it can be retried - // remove our finalizer from the list and update it. - // Stop reconciliation as the item is being deleted - return r.cleanupService(ctx, log, &service) - } + // Add finalizer if it doesn't exist + if !controllerutil.ContainsFinalizer(&service, CleanupFinalizer) { + if ok := controllerutil.AddFinalizer(&service, CleanupFinalizer); !ok { + log.Error(nil, "Failed to add finalizer for the LB Service") + return ctrl.Result{Requeue: true}, nil + } - // If it does not have our finalizer, then lets add the finalizer and update the object. This is equivalent - // registering our finalizer. - if !utils.ContainsString(service.ObjectMeta.Finalizers, LBFinalizerName) { - service.ObjectMeta.Finalizers = append(service.ObjectMeta.Finalizers, LBFinalizerName) - log.V(4).Info("setting finalizer") + // Remove old finalizer since it is not used anymore. 
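+		// Services created before this change may still carry the legacy LBFinalizerName,
+		// so drop it in the same update that registers CleanupFinalizer.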
+		controllerutil.RemoveFinalizer(&service, LBFinalizerName)
 		if err := r.Update(ctx, &service); err != nil {
-			return ctrl.Result{}, err
+			return reconcile.Result{}, fmt.Errorf("failed to add finalizer: %w", err)
 		}
 	}
 
+	clusterEndpoints, useAddressesReference := r.getEndpoints(&service)
 	log.V(5).Info("proceeding with", "endpoints", clusterEndpoints)
 
 	desiredLB := kubelb.MapLoadBalancer(&service, clusterEndpoints, useAddressesReference, r.ClusterName)
@@ -166,10 +168,6 @@ func (r *KubeLBServiceReconciler) Reconcile(ctx context.Context, req ctrl.Reques
 }
 
 func (r *KubeLBServiceReconciler) cleanupService(ctx context.Context, log logr.Logger, service *corev1.Service) (reconcile.Result, error) {
-	if !utils.ContainsString(service.ObjectMeta.Finalizers, LBFinalizerName) {
-		return ctrl.Result{}, nil
-	}
-
 	lb := &kubelbv1alpha1.LoadBalancer{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      string(service.UID),
@@ -177,22 +175,17 @@ func (r *KubeLBServiceReconciler) cleanupService(ctx context.Context, log logr.L
 		},
 	}
 	err := r.KubeLBManager.GetClient().Delete(ctx, lb)
-	switch {
-	case apierrors.IsNotFound(err):
-		return ctrl.Result{}, nil
-	case err != nil:
+	if err != nil && !kerrors.IsNotFound(err) {
 		return ctrl.Result{}, fmt.Errorf("deleting LoadBalancer: %w", err)
-	default:
-		// proceed
 	}
 
 	log.V(1).Info("deleting Service LoadBalancer finalizer", "name", lb.Name)
 
-	service.ObjectMeta.Finalizers = utils.RemoveString(service.ObjectMeta.Finalizers, LBFinalizerName)
+	controllerutil.RemoveFinalizer(service, LBFinalizerName)
+	controllerutil.RemoveFinalizer(service, CleanupFinalizer)
 	if err := r.Update(ctx, service); err != nil {
-		return ctrl.Result{}, err
+		return reconcile.Result{}, fmt.Errorf("failed to remove finalizer: %w", err)
 	}
-
 	log.V(4).Info("removed finalizer")
 
 	return ctrl.Result{}, nil

diff --git a/internal/controllers/ccm/shared.go b/internal/controllers/ccm/shared.go
index 6fcb9c2..c788194 100644
--- a/internal/controllers/ccm/shared.go
+++ b/internal/controllers/ccm/shared.go
@@ -23,6 +23,7 @@ import (
 
 	"github.com/go-logr/logr"
 	kubelbv1alpha1 "k8c.io/kubelb/api/kubelb.k8c.io/v1alpha1"
+	"k8c.io/kubelb/internal/kubelb"
 	"k8c.io/kubelb/internal/resources/route"
 	serviceHelpers "k8c.io/kubelb/internal/resources/service"
 	"k8c.io/kubelb/internal/resources/unstructured"
@@ -30,6 +31,8 @@ import (
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/types"
 	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/handler"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 	gwapiv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
 )
 
@@ -79,3 +82,43 @@ func cleanupRoute(ctx context.Context, client ctrlclient.Client, resourceUID str
 	}
 	return nil
 }
+
+func enqueueRoutes(gvk, clusterNamespace string) handler.TypedMapFunc[*kubelbv1alpha1.Route] {
+	return handler.TypedMapFunc[*kubelbv1alpha1.Route](func(_ context.Context, route *kubelbv1alpha1.Route) []reconcile.Request {
+		if route.GetNamespace() != clusterNamespace {
+			return []reconcile.Request{}
+		}
+
+		originalNamespace, ok := route.GetLabels()[kubelb.LabelOriginNamespace]
+		if !ok || originalNamespace == "" {
+			// Can't process further
+			return []reconcile.Request{}
+		}
+
+		originalName, ok := route.GetLabels()[kubelb.LabelOriginName]
+		if !ok || originalName == "" {
+			// Can't process further
+			return []reconcile.Request{}
+		}
+
+		resourceGVK, ok := route.GetLabels()[kubelb.LabelOriginResourceKind]
+		if !ok || resourceGVK == "" {
+			// Can't process further
+			return []reconcile.Request{}
+		}
+
+		if gvk != resourceGVK {
+			
// Can't process further + return []reconcile.Request{} + } + + return []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Name: originalName, + Namespace: originalNamespace, + }, + }, + } + }) +} diff --git a/internal/controllers/kubelb/bridgeservice_controller.go b/internal/controllers/kubelb/bridgeservice_controller.go new file mode 100644 index 0000000..bcac33f --- /dev/null +++ b/internal/controllers/kubelb/bridgeservice_controller.go @@ -0,0 +1,169 @@ +/* +Copyright 2024 The KubeLB Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelb + +import ( + "context" + "fmt" + + "github.com/go-logr/logr" + + "k8c.io/kubelb/internal/kubelb" + "k8c.io/kubelb/internal/util/predicate" + + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + "k8s.io/apimachinery/pkg/api/equality" + kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +const ( + BridgeServiceControllerName = "bridge-service-controller" +) + +// BridgeServiceReconciler reconciles the "bridge" service. This service is used to forward traffic from tenant namespace to the controller namespace when global +// topology is used. +type BridgeServiceReconciler struct { + ctrlclient.Client + Log logr.Logger + Scheme *runtime.Scheme + Recorder record.EventRecorder +} + +// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups="discovery.k8s.io",resources=endpointslices,verbs=get;list;watch;create;update;patch;delete + +func (r *BridgeServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := r.Log.WithValues("name", req.NamespacedName) + + log.Info("Reconciling Service") + + resource := &corev1.Service{} + if err := r.Get(ctx, req.NamespacedName, resource); err != nil { + if kerrors.IsNotFound(err) { + return reconcile.Result{}, nil + } + return reconcile.Result{}, err + } + + // Resource is marked for deletion + if resource.DeletionTimestamp != nil { + // Ignore since no cleanup is required. Clean up is done automatically using owner references. 
+		return reconcile.Result{}, nil
+	}
+
+	err := r.reconcile(ctx, log, resource)
+	if err != nil {
+		log.Error(err, "reconciling failed")
+	}
+
+	return reconcile.Result{}, err
+}
+
+func (r *BridgeServiceReconciler) reconcile(ctx context.Context, _ logr.Logger, object *corev1.Service) error {
+	// Find the original service that this bridge service was created for.
+	name := object.Labels[kubelb.LabelOriginName]
+	namespace := object.Labels[kubelb.LabelOriginNamespace]
+
+	originalService := &corev1.Service{}
+	err := r.Get(ctx, ctrlclient.ObjectKey{Name: name, Namespace: namespace}, originalService)
+	if err != nil {
+		return fmt.Errorf("failed to get original service: %w", err)
+	}
+
+	endpointSlice := &discoveryv1.EndpointSlice{
+		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
+	}
+	endpointSlice.Labels = make(map[string]string)
+	endpointSlice.Labels = kubelb.AddKubeLBLabels(endpointSlice.Labels, name, namespace, "Service")
+	endpointSlice.Labels[discoveryv1.LabelServiceName] = name
+	endpointSlice.AddressType = discoveryv1.AddressTypeIPv4
+	for _, port := range originalService.Spec.Ports {
+		endpointSlice.Ports = append(endpointSlice.Ports, discoveryv1.EndpointPort{
+			Name:     ptr.To(port.Name),
+			Port:     ptr.To(port.TargetPort.IntVal),
+			Protocol: ptr.To(port.Protocol),
+		})
+	}
+	endpointSlice.Endpoints = []discoveryv1.Endpoint{
+		{
+			Addresses: object.Spec.ClusterIPs,
+			Conditions: discoveryv1.EndpointConditions{
+				Ready: ptr.To(true),
+			},
+		},
+	}
+
+	// TypeMeta is not populated on objects returned by the client, so set the API
+	// version and kind for the owner reference explicitly.
+	ownerReference := metav1.OwnerReference{
+		APIVersion: "v1",
+		Kind:       "Service",
+		Name:       originalService.Name,
+		UID:        originalService.UID,
+		Controller: ptr.To(true),
+	}
+
+	// Set owner reference for the resource.
+	endpointSlice.SetOwnerReferences([]metav1.OwnerReference{ownerReference})
+
+	return CreateOrUpdateEndpointSlice(ctx, r.Client, endpointSlice)
+}
+
+func CreateOrUpdateEndpointSlice(ctx context.Context, client ctrlclient.Client, obj *discoveryv1.EndpointSlice) error {
+	key := ctrlclient.ObjectKey{Namespace: obj.Namespace, Name: obj.Name}
+	existing := &discoveryv1.EndpointSlice{}
+	if err := client.Get(ctx, key, existing); err != nil {
+		if !kerrors.IsNotFound(err) {
+			return fmt.Errorf("failed to get EndpointSlice: %w", err)
+		}
+		err := client.Create(ctx, obj)
+		if err != nil {
+			return fmt.Errorf("failed to create EndpointSlice: %w", err)
+		}
+		return nil
+	}
+
+	// Nothing to do if the object is unchanged.
+	if equality.Semantic.DeepEqual(existing.Ports, obj.Ports) &&
+		equality.Semantic.DeepEqual(existing.Endpoints, obj.Endpoints) &&
+		equality.Semantic.DeepEqual(existing.Labels, obj.Labels) {
+		return nil
+	}
+
+	// Required to update the object.
+	obj.ResourceVersion = existing.ResourceVersion
+	obj.UID = existing.UID
+
+	if err := client.Update(ctx, obj); err != nil {
+		return fmt.Errorf("failed to update EndpointSlice: %w", err)
+	}
+	return nil
+}
+
+func (r *BridgeServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	return ctrl.NewControllerManagedBy(mgr).
+		// Watch services with label app.kubernetes.io/type=bridge-service
+		For(&corev1.Service{}, builder.WithPredicates(predicate.ByLabel(kubelb.LabelAppKubernetesType, kubelb.LabelBridgeService))).
+ Complete(r) +} diff --git a/internal/controllers/kubelb/envoy_cp_controller.go b/internal/controllers/kubelb/envoy_cp_controller.go index 59610f7..7cbb2dd 100644 --- a/internal/controllers/kubelb/envoy_cp_controller.go +++ b/internal/controllers/kubelb/envoy_cp_controller.go @@ -37,10 +37,13 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -84,12 +87,17 @@ func (r *EnvoyCPReconciler) reconcile(ctx context.Context, req ctrl.Request) err return fmt.Errorf("failed to list LoadBalancers and Routes: %w", err) } + namespace := req.Namespace + if r.EnvoyProxyTopology.IsGlobalTopology() { + namespace = r.Namespace + } + if len(lbs) == 0 && len(routes) == 0 { r.EnvoyCache.ClearSnapshot(snapshotName) - return r.cleanupEnvoyProxy(ctx, appName, req.Namespace) + return r.cleanupEnvoyProxy(ctx, appName, namespace) } - if err := r.ensureEnvoyProxy(ctx, req.Namespace, appName, snapshotName); err != nil { + if err := r.ensureEnvoyProxy(ctx, namespace, appName, snapshotName); err != nil { return fmt.Errorf("failed to update Envoy proxy: %w", err) } @@ -192,10 +200,6 @@ func (r *EnvoyCPReconciler) cleanupEnvoyProxy(ctx context.Context, appName strin log := ctrl.LoggerFrom(ctx).WithValues("reconcile", "envoy-proxy") log.V(2).Info("cleanup envoy-proxy") - if r.EnvoyProxyTopology == EnvoyProxyTopologyGlobal { - namespace = r.Namespace - } - objMeta := v1.ObjectMeta{ Name: fmt.Sprintf(envoyResourcePattern, appName), Namespace: namespace, @@ -371,10 +375,13 @@ func (r *EnvoyCPReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag // find an alternative for this since it is more of a "hack". return ctrl.NewControllerManagedBy(mgr). For(&kubelbv1alpha1.LoadBalancer{}). + // Disable concurrency to ensure that only one snapshot is created at a time. + WithOptions(controller.Options{MaxConcurrentReconciles: 1}). WithEventFilter(utils.ByLabelExistsOnNamespace(ctx, mgr.GetClient())). Watches( &kubelbv1alpha1.Route{}, handler.EnqueueRequestsFromMapFunc(r.enqueueLoadBalancers()), + builder.WithPredicates(predicate.GenerationChangedPredicate{}), ). 
Watches( &kubelbv1alpha1.Addresses{}, diff --git a/internal/controllers/kubelb/route_controller.go b/internal/controllers/kubelb/route_controller.go index 695ad98..27f36cd 100644 --- a/internal/controllers/kubelb/route_controller.go +++ b/internal/controllers/kubelb/route_controller.go @@ -43,7 +43,9 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -103,15 +105,10 @@ func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl return reconcile.Result{}, err } - resourceNamespace := resource.Namespace - if config.Spec.EnvoyProxy.Topology == kubelbv1alpha1.EnvoyProxyTopologyGlobal { - resourceNamespace = r.Namespace - } - // Resource is marked for deletion if resource.DeletionTimestamp != nil { if controllerutil.ContainsFinalizer(resource, CleanupFinalizer) { - return r.cleanup(ctx, resource, resourceNamespace) + return r.cleanup(ctx, resource) } // Finalizer doesn't exist so clean up is already done return reconcile.Result{}, nil @@ -125,7 +122,7 @@ func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl // If the resource is disabled, we need to clean up the resources if controllerutil.ContainsFinalizer(resource, CleanupFinalizer) && disabled { - return r.cleanup(ctx, resource, resourceNamespace) + return r.cleanup(ctx, resource) } if !shouldReconcile { @@ -143,7 +140,7 @@ func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl } } - err = r.reconcile(ctx, log, resource, resourceNamespace, config, tenant) + err = r.reconcile(ctx, log, resource, config, tenant) if err != nil { log.Error(err, "reconciling failed") } @@ -151,17 +148,17 @@ func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl return reconcile.Result{}, err } -func (r *RouteReconciler) reconcile(ctx context.Context, log logr.Logger, route *kubelbv1alpha1.Route, resourceNamespace string, config *kubelbv1alpha1.Config, tenant *kubelbv1alpha1.Tenant) error { +func (r *RouteReconciler) reconcile(ctx context.Context, log logr.Logger, route *kubelbv1alpha1.Route, config *kubelbv1alpha1.Config, tenant *kubelbv1alpha1.Tenant) error { annotations := GetAnnotations(tenant, config) // Create or update services based on the route. - err := r.manageServices(ctx, log, route, resourceNamespace, annotations) + err := r.manageServices(ctx, log, route, annotations) if err != nil { return fmt.Errorf("failed to create or update services: %w", err) } // Create or update the route object. - err = r.manageRoutes(ctx, log, route, resourceNamespace, config, tenant, annotations) + err = r.manageRoutes(ctx, log, route, config, tenant, annotations) if err != nil { return fmt.Errorf("failed to create or update route: %w", err) } @@ -169,18 +166,26 @@ func (r *RouteReconciler) reconcile(ctx context.Context, log logr.Logger, route return nil } -func (r *RouteReconciler) cleanup(ctx context.Context, route *kubelbv1alpha1.Route, ns string) (ctrl.Result, error) { +func (r *RouteReconciler) cleanup(ctx context.Context, route *kubelbv1alpha1.Route) (ctrl.Result, error) { // Route will be removed automatically because of owner reference. 
We need to take care of removing // the services while ensuring that the services are not being used by other routes. for _, value := range route.Status.Resources.Services { log := r.Log.WithValues("name", value.Name, "namespace", value.Namespace) - log.V(1).Info("Deleting service", "name", value.GeneratedName, "namespace", ns) svc := corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: value.GeneratedName, - Namespace: ns, + Namespace: route.Namespace, }, } + + if r.EnvoyProxyTopology.IsGlobalTopology() { + // Bridge services exist in the controller namespace. + if value.Namespace == r.Namespace { + svc.Namespace = r.Namespace + } + } + log.V(1).Info("Deleting service", "name", value.GeneratedName, "namespace", value.Namespace) + if err := r.Client.Delete(ctx, &svc); err != nil { if !kerrors.IsNotFound(err) { return reconcile.Result{}, fmt.Errorf("failed to delete service: %w", err) @@ -201,13 +206,13 @@ func (r *RouteReconciler) cleanup(ctx context.Context, route *kubelbv1alpha1.Rou return reconcile.Result{}, nil } -func (r *RouteReconciler) manageServices(ctx context.Context, log logr.Logger, route *kubelbv1alpha1.Route, resourceNamespace string, annotations kubelbv1alpha1.AnnotationSettings) error { +func (r *RouteReconciler) manageServices(ctx context.Context, log logr.Logger, route *kubelbv1alpha1.Route, annotations kubelbv1alpha1.AnnotationSettings) error { if route.Spec.Source.Kubernetes == nil { return nil } // Before creating/updating services, ensure that the orphaned services are cleaned up. - err := r.cleanupOrphanedServices(ctx, log, route, resourceNamespace) + err := r.cleanupOrphanedServices(ctx, log, route, r.EnvoyProxyTopology.IsGlobalTopology()) if err != nil { return fmt.Errorf("failed to cleanup orphaned services: %w", err) } @@ -221,7 +226,14 @@ func (r *RouteReconciler) manageServices(ctx context.Context, log logr.Logger, r services := []corev1.Service{} for _, service := range route.Spec.Source.Kubernetes.Services { // Transform the service into desired state. - svc := serviceHelpers.GenerateServiceForLBCluster(service.Service, appName, route.Namespace, resourceNamespace, r.PortAllocator, r.EnvoyProxyTopology.IsGlobalTopology(), annotations) + svc := serviceHelpers.GenerateServiceForLBCluster(service.Service, appName, route.Namespace, r.PortAllocator, r.EnvoyProxyTopology.IsGlobalTopology(), annotations) + + // We need a bridge service in the controller namespace. + if r.EnvoyProxyTopology.IsGlobalTopology() { + bridgeService := serviceHelpers.GenerateBridgeService(svc, appName, r.Namespace) + services = append(services, bridgeService) + } + services = append(services, svc) } @@ -240,7 +252,7 @@ func (r *RouteReconciler) manageServices(ctx context.Context, log logr.Logger, r return r.UpdateRouteStatus(ctx, route, *routeStatus) } -func (r *RouteReconciler) cleanupOrphanedServices(ctx context.Context, log logr.Logger, route *kubelbv1alpha1.Route, resourceNamespace string) error { +func (r *RouteReconciler) cleanupOrphanedServices(ctx context.Context, log logr.Logger, route *kubelbv1alpha1.Route, globalTopology bool) error { // Get all the services based on route. desiredServices := map[string]bool{} for _, service := range route.Spec.Source.Kubernetes.Services { @@ -254,13 +266,30 @@ func (r *RouteReconciler) cleanupOrphanedServices(ctx context.Context, log logr. 
} for key, value := range route.Status.Resources.Services { + ns := route.Namespace if _, ok := desiredServices[key]; !ok { + found := false + if globalTopology && value.Namespace == r.Namespace { + // Bridged services don't have any references in the spec. + for _, statusService := range route.Status.Resources.Services { + if statusService.GeneratedName == value.GeneratedName && statusService.Namespace != value.Namespace { + found = true + break + } + } + if found { + continue + } + + ns = r.Namespace + } + // Service is not desired, so delete it. - log.V(4).Info("Deleting orphaned service", "name", value.GeneratedName, "namespace", resourceNamespace) + log.V(4).Info("Deleting orphaned service", "name", value.GeneratedName, "namespace", route.Namespace) svc := corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: value.GeneratedName, - Namespace: resourceNamespace, + Namespace: ns, }, } if err := r.Client.Delete(ctx, &svc); err != nil { @@ -300,7 +329,7 @@ func (r *RouteReconciler) UpdateRouteStatus(ctx context.Context, route *kubelbv1 }) } -func (r *RouteReconciler) manageRoutes(ctx context.Context, log logr.Logger, route *kubelbv1alpha1.Route, resourceNamespace string, config *kubelbv1alpha1.Config, tenant *kubelbv1alpha1.Tenant, annotations kubelbv1alpha1.AnnotationSettings) error { +func (r *RouteReconciler) manageRoutes(ctx context.Context, log logr.Logger, route *kubelbv1alpha1.Route, config *kubelbv1alpha1.Config, tenant *kubelbv1alpha1.Tenant, annotations kubelbv1alpha1.AnnotationSettings) error { if route.Spec.Source.Kubernetes == nil { return nil } @@ -315,6 +344,7 @@ func (r *RouteReconciler) manageRoutes(ctx context.Context, log logr.Logger, rou Kind: route.Kind, Name: route.Name, UID: route.UID, + Controller: ptr.To(true), } // Set owner reference for the resource. @@ -337,7 +367,7 @@ func (r *RouteReconciler) manageRoutes(ctx context.Context, log logr.Logger, rou // Determine the type of the resource and call the appropriate method switch v := resource.(type) { case *v1.Ingress: // v1 "k8s.io/api/networking/v1" - err = ingressHelpers.CreateOrUpdateIngress(ctx, log, r.Client, v, referencedServices, resourceNamespace, config, tenant, annotations) + err = ingressHelpers.CreateOrUpdateIngress(ctx, log, r.Client, v, referencedServices, route.Namespace, config, tenant, annotations) if err == nil { // Retrieve updated object to get the status. key := client.ObjectKey{Namespace: v.Namespace, Name: v.Name} @@ -351,7 +381,7 @@ func (r *RouteReconciler) manageRoutes(ctx context.Context, log logr.Logger, rou } case *gwapiv1.Gateway: // v1 "sigs.k8s.io/gateway-api/apis/v1" - err = gatewayHelpers.CreateOrUpdateGateway(ctx, log, r.Client, v, resourceNamespace, config, tenant, annotations, config.IsGlobalTopology()) + err = gatewayHelpers.CreateOrUpdateGateway(ctx, log, r.Client, v, route.Namespace, config, tenant, annotations, config.IsGlobalTopology()) if err == nil { // Retrieve updated object to get the status. 
key := client.ObjectKey{Namespace: v.Namespace, Name: v.Name} @@ -365,7 +395,7 @@ func (r *RouteReconciler) manageRoutes(ctx context.Context, log logr.Logger, rou } case *gwapiv1.HTTPRoute: // v1 "sigs.k8s.io/gateway-api/apis/v1" - err = httprouteHelpers.CreateOrUpdateHTTPRoute(ctx, log, r.Client, v, referencedServices, resourceNamespace, tenant, annotations, config.IsGlobalTopology()) + err = httprouteHelpers.CreateOrUpdateHTTPRoute(ctx, log, r.Client, v, referencedServices, route.Namespace, tenant, annotations, config.IsGlobalTopology()) if err == nil { // Retrieve updated object to get the status. key := client.ObjectKey{Namespace: v.Namespace, Name: v.Name} @@ -379,7 +409,7 @@ func (r *RouteReconciler) manageRoutes(ctx context.Context, log logr.Logger, rou } case *gwapiv1.GRPCRoute: // v1 "sigs.k8s.io/gateway-api/apis/v1" - err = grpcrouteHelpers.CreateOrUpdateGRPCRoute(ctx, log, r.Client, v, referencedServices, resourceNamespace, tenant, annotations, config.IsGlobalTopology()) + err = grpcrouteHelpers.CreateOrUpdateGRPCRoute(ctx, log, r.Client, v, referencedServices, route.Namespace, tenant, annotations, config.IsGlobalTopology()) if err == nil { // Retrieve updated object to get the status. key := client.ObjectKey{Namespace: v.Namespace, Name: v.Name} @@ -472,6 +502,11 @@ func isGatewayAPIDisabled(log logr.Logger, disableGatewayAPI bool, config kubelb func updateServiceStatus(routeStatus *kubelbv1alpha1.RouteStatus, svc *corev1.Service, err error) { originalName := serviceHelpers.GetServiceName(*svc) originalNamespace := serviceHelpers.GetServiceNamespace(*svc) + + if val, ok := svc.Labels[kubelb.LabelAppKubernetesType]; ok && val == kubelb.LabelBridgeService { + originalNamespace = svc.Namespace + } + status := kubelbv1alpha1.RouteServiceStatus{ ResourceState: kubelbv1alpha1.ResourceState{ GeneratedName: svc.GetName(), @@ -571,8 +606,7 @@ func (r *RouteReconciler) SetupWithManager(mgr ctrl.Manager) error { // 1. Watch for changes in Route object. // 2. Skip reconciliation if generation is not changed; only status/metadata changed. return ctrl.NewControllerManagedBy(mgr). - For(&kubelbv1alpha1.Route{}). - WithEventFilter(predicate.GenerationChangedPredicate{}). + For(&kubelbv1alpha1.Route{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). Watches( &kubelbv1alpha1.Config{}, handler.EnqueueRequestsFromMapFunc(r.enqueueRoutesForConfig()), @@ -581,6 +615,10 @@ func (r *RouteReconciler) SetupWithManager(mgr ctrl.Manager) error { &kubelbv1alpha1.Tenant{}, handler.EnqueueRequestsFromMapFunc(r.enqueueRoutesForTenant()), ). + Owns(&v1.Ingress{}). + Owns(&gwapiv1.Gateway{}). + Owns(&gwapiv1.HTTPRoute{}). + Owns(&gwapiv1.GRPCRoute{}). Complete(r) } @@ -632,7 +670,6 @@ func (r *RouteReconciler) enqueueRoutesForTenant() handler.MapFunc { }, }) } - return result } } diff --git a/internal/controllers/kubelb/sync_secret_controller.go b/internal/controllers/kubelb/sync_secret_controller.go index f53e998..03c21d6 100644 --- a/internal/controllers/kubelb/sync_secret_controller.go +++ b/internal/controllers/kubelb/sync_secret_controller.go @@ -104,9 +104,6 @@ func (r *SyncSecretReconciler) reconcile(ctx context.Context, _ logr.Logger, obj secret.Labels = object.Labels secret.Annotations = object.Annotations secret.Namespace = object.Namespace - if r.EnvoyProxyTopology.IsGlobalTopology() { - secret.Namespace = r.Namespace - } // Name needs to be randomized so using the UID of the SyncSecret. 
secret.Name = string(object.UID) @@ -162,10 +159,6 @@ func (r *SyncSecretReconciler) cleanup(ctx context.Context, object *kubelbv1alph ObjectMeta: metav1.ObjectMeta{Name: string(object.UID), Namespace: object.Namespace}, } - if r.EnvoyProxyTopology.IsGlobalTopology() { - resource.Namespace = r.Namespace - } - err := r.Delete(ctx, resource) if err != nil && !kerrors.IsNotFound(err) { return reconcile.Result{}, fmt.Errorf("failed to delete secret %s from LB cluster: %w", resource.Name, err) diff --git a/internal/kubelb/utils.go b/internal/kubelb/utils.go index 91762ee..191928b 100644 --- a/internal/kubelb/utils.go +++ b/internal/kubelb/utils.go @@ -33,8 +33,10 @@ const LabelLoadBalancerName = "kubelb.k8c.io/lb-name" const LabelTenantName = "kubelb.k8c.io/tenant" const LabelManagedBy = "kubelb.k8c.io/managed-by" const LabelControllerName = "kubelb" +const LabelBridgeService = "bridge-service" const LabelAppKubernetesName = "app.kubernetes.io/name" // mysql +const LabelAppKubernetesType = "app.kubernetes.io/type" // mysql const LabelAppKubernetesInstance = "app.kubernetes.io/instance" // mysql-abcxzy" const LabelAppKubernetesVersion = "app.kubernetes.io/version" // 5.7.21 const LabelAppKubernetesComponent = "app.kubernetes.io/component" // database diff --git a/internal/resources/gatewayapi/gateway/gateway.go b/internal/resources/gatewayapi/gateway/gateway.go index 26a35c1..19ba7b4 100644 --- a/internal/resources/gatewayapi/gateway/gateway.go +++ b/internal/resources/gatewayapi/gateway/gateway.go @@ -28,7 +28,6 @@ import ( "k8s.io/apimachinery/pkg/api/equality" kerrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/utils/ptr" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" gwapiv1 "sigs.k8s.io/gateway-api/apis/v1" ) @@ -63,25 +62,19 @@ func CreateOrUpdateGateway(ctx context.Context, log logr.Logger, client ctrlclie return fmt.Errorf("multiple Gateway objects are not supported") } - // We scope the listeners for the gateway down to the same namespace - for i, listener := range object.Spec.Listeners { - if listener.AllowedRoutes != nil && listener.AllowedRoutes.Namespaces != nil { - object.Spec.Listeners[i].AllowedRoutes.Namespaces.Selector = nil - object.Spec.Listeners[i].AllowedRoutes.Namespaces.From = ptr.To(gwapiv1.NamespacesFromSame) - } - } - // Process annotations. object.Annotations = kubelb.PropagateAnnotations(object.Annotations, annotations) // Process secrets. for i, listener := range object.Spec.Listeners { if listener.TLS != nil { - for _, reference := range listener.TLS.CertificateRefs { - secretName := util.GetSecretNameIfExists(ctx, client, string(reference.Name), object.Namespace) + for j, reference := range listener.TLS.CertificateRefs { + secretName := util.GetSecretNameIfExists(ctx, client, string(reference.Name), object.Namespace, namespace) if secretName != "" { - object.Spec.Listeners[i].TLS.CertificateRefs[0].Name = gwapiv1.ObjectName(secretName) + object.Spec.Listeners[i].TLS.CertificateRefs[j].Name = gwapiv1.ObjectName(secretName) } + // cross namespace references are not allowed + object.Spec.Listeners[i].TLS.CertificateRefs[j].Namespace = nil } } } diff --git a/internal/resources/ingress/ingress.go b/internal/resources/ingress/ingress.go index 935b76c..aef351c 100644 --- a/internal/resources/ingress/ingress.go +++ b/internal/resources/ingress/ingress.go @@ -75,7 +75,7 @@ func CreateOrUpdateIngress(ctx context.Context, log logr.Logger, client ctrlclie // Process secrets. 
 	if object.Spec.TLS != nil {
 		for i := range object.Spec.TLS {
-			secretName := util.GetSecretNameIfExists(ctx, client, object.Spec.TLS[i].SecretName, object.Namespace)
+			secretName := util.GetSecretNameIfExists(ctx, client, object.Spec.TLS[i].SecretName, object.Namespace, namespace)
 			if secretName != "" {
 				object.Spec.TLS[i].SecretName = secretName
 			}
diff --git a/internal/resources/service/service.go b/internal/resources/service/service.go
index e7a33fe..913ac7f 100644
--- a/internal/resources/service/service.go
+++ b/internal/resources/service/service.go
@@ -108,11 +108,11 @@ func NormalizeAndReplicateServices(ctx context.Context, log logr.Logger, client
 	return services, nil
 }
 
-func GenerateServiceForLBCluster(service corev1.Service, appName, namespace, resourceNamespace string, portAllocator *portlookup.PortAllocator, globalTopology bool, annotations kubelbv1alpha1.AnnotationSettings) corev1.Service {
+func GenerateServiceForLBCluster(service corev1.Service, appName, namespace string, portAllocator *portlookup.PortAllocator, globalTopology bool, annotations kubelbv1alpha1.AnnotationSettings) corev1.Service {
 	endpointKey := fmt.Sprintf(kubelb.EnvoyEndpointRoutePattern, namespace, service.Namespace, service.Name)
 
 	service.Name = kubelb.GenerateName(globalTopology, string(service.UID), GetServiceName(service), service.Namespace)
-	service.Namespace = resourceNamespace
+	service.Namespace = namespace
 	service.UID = ""
 	if service.Spec.Type == corev1.ServiceTypeNodePort {
 		service.Spec.Type = corev1.ServiceTypeClusterIP
@@ -132,14 +132,58 @@ func GenerateServiceForLBCluster(service corev1.Service, appName, namespace, res
 	}
 
 	// Replace the selector with the envoy proxy selector.
-	service.Spec.Selector = map[string]string{
-		kubelb.LabelAppKubernetesName: appName,
+	if globalTopology {
+		service.Spec.Selector = nil
+	} else {
+		// Replace the selector with the envoy proxy selector.
+		service.Spec.Selector = map[string]string{
+			kubelb.LabelAppKubernetesName: appName,
+		}
 	}
 	service.Annotations = kubelb.PropagateAnnotations(service.Annotations, annotations)
-
 	return service
 }
 
+// GenerateBridgeService creates a service that forwards traffic from the tenant namespace to the controller namespace, which hosts
+// the Envoy Proxy instance when the Global topology is used. Other options that were assessed:
+// 1. ExternalName service that forwards traffic to `service-name.controller-namespace.svc.cluster.local`. Dropped since ExternalName services are not supported in Gateway API.
+// 2. EndpointSlice with FQDN `service-name.controller-namespace.svc.cluster.local` as the endpoint. The FQDN address type is deprecated: https://github.com/kubernetes/kubernetes/pull/114677
+// 3. Place all resources in the controller namespace. Possible, but poor from a security and isolation standpoint since we lose the 1-1 mapping of resources to namespaces. Also,
+// resources are no longer unique and someone might end up attaching their services/httproutes to the Ingresses/Gateways of another tenant. A complete no-go for Layer 7.
+// Decision: use a bridge service in the controller namespace that simply forwards traffic to envoy on the correct target port, together with a selectorless Service and EndpointSlices that
+func GenerateBridgeService(service corev1.Service, appName, controllerNamespace string) corev1.Service { + bridgeService := corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: service.Name, + Namespace: controllerNamespace, + }, + } + + for _, port := range service.Spec.Ports { + bridgePort := corev1.ServicePort{ + Name: port.Name, + Protocol: port.Protocol, + Port: port.TargetPort.IntVal, + TargetPort: port.TargetPort, + } + bridgeService.Spec.Ports = append(bridgeService.Spec.Ports, bridgePort) + } + + if bridgeService.Labels == nil { + bridgeService.Labels = make(map[string]string) + } + + bridgeService.Labels = kubelb.AddKubeLBLabels(bridgeService.Labels, service.Name, service.Namespace, "") + bridgeService.Labels[kubelb.LabelAppKubernetesType] = kubelb.LabelBridgeService + + bridgeService.Spec.Selector = map[string]string{ + kubelb.LabelAppKubernetesName: appName, + } + bridgeService.Spec.Type = corev1.ServiceTypeClusterIP + return bridgeService +} + func CreateOrUpdateService(ctx context.Context, client ctrlclient.Client, obj *corev1.Service) error { key := ctrlclient.ObjectKey{Namespace: obj.Namespace, Name: obj.Name} existingObj := &corev1.Service{} diff --git a/internal/util/kubernetes/secret.go b/internal/util/kubernetes/secret.go index 9408688..e46c3be 100644 --- a/internal/util/kubernetes/secret.go +++ b/internal/util/kubernetes/secret.go @@ -25,9 +25,9 @@ import ( ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" ) -func GetSecretNameIfExists(ctx context.Context, client ctrlclient.Client, name, namespace string) string { +func GetSecretNameIfExists(ctx context.Context, client ctrlclient.Client, name, namespace string, tenantNamespace string) string { secrets := corev1.SecretList{} - err := client.List(ctx, &secrets, ctrlclient.MatchingLabels{kubelb.LabelOriginName: name, kubelb.LabelOriginNamespace: namespace}) + err := client.List(ctx, &secrets, ctrlclient.InNamespace(tenantNamespace), ctrlclient.MatchingLabels{kubelb.LabelOriginName: name, kubelb.LabelOriginNamespace: namespace}) if err != nil { return "" }