Skip to content

Commit

Permalink
chore: split service creation (#8016)
Browse files Browse the repository at this point in the history
  • Loading branch information
loomts committed Sep 25, 2024
1 parent f76cc82 commit a382a84
Show file tree
Hide file tree
Showing 5 changed files with 306 additions and 73 deletions.
73 changes: 11 additions & 62 deletions controllers/apps/transformer_cluster_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (t *clusterServiceTransformer) Transform(ctx graph.TransformContext, dag *g
if err != nil {
return err
}
if err = createOrUpdateService(ctx, dag, graphCli, service, nil); err != nil {
if err = t.createOrUpdateService(ctx, dag, graphCli, service); err != nil {
return err
}
delete(services, service.Name)
Expand Down Expand Up @@ -224,83 +224,32 @@ func (t *clusterServiceTransformer) listOwnedClusterServices(transCtx *clusterTr
return services, nil
}

func createOrUpdateService(ctx graph.TransformContext, dag *graph.DAG, graphCli model.GraphClient, service *corev1.Service, owner client.Object) error {
func (t *clusterServiceTransformer) createOrUpdateService(ctx graph.TransformContext, dag *graph.DAG, graphCli model.GraphClient, service *corev1.Service) error {
key := types.NamespacedName{
Namespace: service.Namespace,
Name: service.Name,
}
obj := &corev1.Service{}
if err := ctx.GetClient().Get(ctx.GetContext(), key, obj, inDataContext4C()); err != nil {
originSvc := &corev1.Service{}
if err := ctx.GetClient().Get(ctx.GetContext(), key, originSvc, inDataContext4C()); err != nil {
if apierrors.IsNotFound(err) {
graphCli.Create(dag, service, inDataContext4G())
return nil
}
return err
}

// don't update service not owned by the owner, to keep compatible with existed cluster
if owner != nil && !model.IsOwnerOf(owner, obj) {
return nil
}

objCopy := obj.DeepCopy()
objCopy.Spec = service.Spec
newSvc := originSvc.DeepCopy()
newSvc.Spec = service.Spec
ctrlutil.MergeMetadataMapInplace(service.Labels, &newSvc.Labels)
ctrlutil.MergeMetadataMapInplace(service.Annotations, &newSvc.Annotations)
resolveServiceDefaultFields(&originSvc.Spec, &newSvc.Spec)

ctrlutil.MergeMetadataMapInplace(service.Labels, &objCopy.Labels)
ctrlutil.MergeMetadataMapInplace(service.Annotations, &objCopy.Annotations)

resolveServiceDefaultFields(&obj.Spec, &objCopy.Spec)

if !reflect.DeepEqual(obj, objCopy) {
graphCli.Update(dag, obj, objCopy, inDataContext4G())
if !reflect.DeepEqual(originSvc, newSvc) {
graphCli.Update(dag, originSvc, newSvc, inDataContext4G())
}
return nil
}

func resolveServiceDefaultFields(obj, objCopy *corev1.ServiceSpec) {
// TODO: how about the order changed?
for i, port := range objCopy.Ports {
if i == len(obj.Ports) {
break
}
// if the service type is NodePort or LoadBalancer, and the nodeport is not set, we should use the nodeport of the exist service
if (objCopy.Type == corev1.ServiceTypeNodePort || objCopy.Type == corev1.ServiceTypeLoadBalancer) && port.NodePort == 0 && obj.Ports[i].NodePort != 0 {
objCopy.Ports[i].NodePort = obj.Ports[i].NodePort
}
if port.TargetPort.IntVal != 0 {
continue
}
port.TargetPort = obj.Ports[i].TargetPort
if reflect.DeepEqual(port, obj.Ports[i]) {
objCopy.Ports[i].TargetPort = obj.Ports[i].TargetPort
}
}
if len(objCopy.ClusterIP) == 0 {
objCopy.ClusterIP = obj.ClusterIP
}
if len(objCopy.ClusterIPs) == 0 {
objCopy.ClusterIPs = obj.ClusterIPs
}
if len(objCopy.Type) == 0 {
objCopy.Type = obj.Type
}
if len(objCopy.SessionAffinity) == 0 {
objCopy.SessionAffinity = obj.SessionAffinity
}
if len(objCopy.IPFamilies) == 0 || (len(objCopy.IPFamilies) == 1 && *objCopy.IPFamilyPolicy != corev1.IPFamilyPolicySingleStack) {
objCopy.IPFamilies = obj.IPFamilies
}
if objCopy.IPFamilyPolicy == nil {
objCopy.IPFamilyPolicy = obj.IPFamilyPolicy
}
if objCopy.InternalTrafficPolicy == nil {
objCopy.InternalTrafficPolicy = obj.InternalTrafficPolicy
}
if objCopy.ExternalTrafficPolicy == "" && obj.ExternalTrafficPolicy != "" {
objCopy.ExternalTrafficPolicy = obj.ExternalTrafficPolicy
}
}

// func checkLegacyServiceExist(ctx graph.TransformContext, serviceName, namespace string) (bool, error) {
// key := types.NamespacedName{
// Namespace: namespace,
Expand Down
159 changes: 159 additions & 0 deletions controllers/apps/transformer_cluster_service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
Copyright (C) 2022-2024 ApeCloud Co., Ltd
This file is part of KubeBlocks project
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package apps

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"golang.org/x/exp/slices"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/model"
"github.com/apecloud/kubeblocks/pkg/controllerutil"
testapps "github.com/apecloud/kubeblocks/pkg/testutil/apps"
)

var _ = Describe("cluster service transformer test", func() {
const (
clusterName = "test-cluster"
clusterDefName = "test-clusterdef"
)

var (
reader *mockReader
dag *graph.DAG
transCtx *clusterTransformContext
)

newDAG := func(graphCli model.GraphClient, cluster *appsv1.Cluster) *graph.DAG {
d := graph.NewDAG()
graphCli.Root(d, cluster, cluster, model.ActionStatusPtr())
return d
}

BeforeEach(func() {
reader = &mockReader{}
graphCli := model.NewGraphClient(reader)
cluster := testapps.NewClusterFactory(testCtx.DefaultNamespace, clusterName, clusterDefName).
SetReplicas(1).
AddService(appsv1.ClusterService{
Service: appsv1.Service{
Name: testapps.ServiceNodePortName,
ServiceName: testapps.ServiceNodePortName,
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeNodePort,
},
},
}).
GetObject()

dag = newDAG(graphCli, cluster)
transCtx = &clusterTransformContext{
Context: ctx,
Client: graphCli,
Logger: logger,
Cluster: cluster,
OrigCluster: cluster.DeepCopy(),
}
})

clusterServiceName := func(clusterName, svcName string) string {
return clusterName + "-" + svcName
}

clusterNodePortService := func() *corev1.Service {
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: testCtx.DefaultNamespace,
Name: clusterServiceName(clusterName, testapps.ServiceNodePortName),
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeNodePort,
},
}
err := controllerutil.SetOwnerReference(transCtx.Cluster, svc)
Expect(err).Should(BeNil())
return svc
}

Context("cluster service", func() {

It("deletion", func() {
reader.objs = append(reader.objs, clusterNodePortService())
// remove cluster services
transCtx.Cluster.Spec.Services = nil
transformer := &clusterServiceTransformer{}
err := transformer.Transform(transCtx, dag)
Expect(err).Should(BeNil())

// check services to delete
graphCli := transCtx.Client.(model.GraphClient)
objs := graphCli.FindAll(dag, &corev1.Service{})
Expect(len(objs)).Should(Equal(len(reader.objs)))
slices.SortFunc(objs, func(a, b client.Object) bool {
return a.GetName() < b.GetName()
})
for i := 0; i < len(reader.objs); i++ {
svc := objs[i].(*corev1.Service)
Expect(svc.Name).Should(Equal(clusterServiceName(clusterName, testapps.ServiceNodePortName)))
Expect(graphCli.IsAction(dag, svc, model.ActionDeletePtr())).Should(BeTrue())
}
})
})

It("ports changed", func() {
port := corev1.ServicePort{Name: "client", Port: 2379, TargetPort: intstr.FromInt32(2379), NodePort: 30001}
newPorts := []corev1.ServicePort{
{Name: "server", Port: 2380, NodePort: 30002},
{Name: port.Name, Port: port.Port, TargetPort: intstr.FromInt32(0), NodePort: 0},
}
expectedPorts := []corev1.ServicePort{
{Name: "client", Port: 2379, TargetPort: intstr.FromInt32(2379), NodePort: 30001},
{Name: "server", Port: 2380, NodePort: 30002},
}
svc := clusterNodePortService()
svc.Spec.Ports = []corev1.ServicePort{port}
reader.objs = append(reader.objs, svc)
transCtx.Cluster.Spec.Services[0].Spec.Ports = newPorts
transformer := &clusterServiceTransformer{}
err := transformer.Transform(transCtx, dag)
Expect(err).Should(BeNil())

// check services to make change
graphCli := transCtx.Client.(model.GraphClient)
objs := graphCli.FindAll(dag, &corev1.Service{})
Expect(len(objs)).Should(Equal(len(transCtx.Cluster.Spec.Services)))

for i := 0; i < len(transCtx.Cluster.Spec.Services); i++ {
svc := objs[i].(*corev1.Service)
slices.SortFunc(svc.Spec.Ports, func(a, b corev1.ServicePort) bool { return a.Name < b.Name })
slices.SortFunc(expectedPorts, func(a, b corev1.ServicePort) bool { return a.Name < b.Name })
Expect(svc.Spec.Ports).Should(Equal(expectedPorts))
Expect(graphCli.IsAction(dag, svc, model.ActionUpdatePtr())).Should(BeTrue())
}
})

})
80 changes: 69 additions & 11 deletions controllers/apps/transformer_component_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ package apps
import (
"context"
"fmt"
"reflect"
"regexp"
"strconv"
"strings"

"golang.org/x/exp/maps"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
Expand Down Expand Up @@ -270,21 +273,76 @@ func (t *componentServiceTransformer) createOrUpdateService(ctx graph.TransformC
}

if podService && kind == multiClusterServicePlacementInUnique {
return t.createOrUpdateServiceInUnique(ctx, dag, graphCli, service, owner)
// create or update service in unique, by hacking the pod placement strategy.
ordinal := func() int {
subs := strings.Split(service.GetName(), "-")
o, _ := strconv.Atoi(subs[len(subs)-1])
return o
}
multicluster.Assign(ctx.GetContext(), service, ordinal)
}

createOrUpdateService := func(service *corev1.Service) error {
key := types.NamespacedName{
Namespace: service.Namespace,
Name: service.Name,
}
originSvc := &corev1.Service{}
if err := ctx.GetClient().Get(ctx.GetContext(), key, originSvc, inDataContext4C()); err != nil {
if apierrors.IsNotFound(err) {
graphCli.Create(dag, service, inDataContext4G())
return nil
}
return err
}

// don't update service not owned by the owner, to keep compatible with existed cluster
if !model.IsOwnerOf(owner, originSvc) {
return nil
}

newSvc := originSvc.DeepCopy()
newSvc.Spec = service.Spec

// if skip immutable check, update the service directly
if skipImmutableCheckForComponentService(originSvc) {
resolveServiceDefaultFields(&originSvc.Spec, &newSvc.Spec)
if !reflect.DeepEqual(originSvc, newSvc) {
graphCli.Update(dag, originSvc, newSvc, inDataContext4G())
}
return nil
}
// otherwise only support to update the override params defined in cluster.spec.componentSpec[].services

overrideMutableParams := func(originSvc, newSvc *corev1.Service) {
newSvc.Spec.Type = originSvc.Spec.Type
newSvc.Name = originSvc.Name
newSvc.Spec.Selector = originSvc.Spec.Selector
newSvc.Annotations = originSvc.Annotations
}

// modify mutable field of newSvc to check if it is overridable
overrideMutableParams(originSvc, newSvc)
if !reflect.DeepEqual(originSvc, newSvc) {
// other fields are immutable, we can't update the service
return nil
}

overrideMutableParams(service, newSvc)
if !reflect.DeepEqual(originSvc, newSvc) {
graphCli.Update(dag, originSvc, newSvc, inDataContext4G())
}
return nil
}
return createOrUpdateService(ctx, dag, graphCli, service, owner)
return createOrUpdateService(service)
}

func (t *componentServiceTransformer) createOrUpdateServiceInUnique(ctx graph.TransformContext, dag *graph.DAG,
graphCli model.GraphClient, service *corev1.Service, owner client.Object) error {
// hack the pod placement strategy.
ordinal := func() int {
subs := strings.Split(service.GetName(), "-")
o, _ := strconv.Atoi(subs[len(subs)-1])
return o
func skipImmutableCheckForComponentService(svc *corev1.Service) bool {
if svc.Annotations == nil {
return false
}
multicluster.Assign(ctx.GetContext(), service, ordinal)
return createOrUpdateService(ctx, dag, graphCli, service, owner)
skip, ok := svc.Annotations[constant.SkipImmutableCheckAnnotationKey]
return ok && strings.ToLower(skip) == "true"
}

func generatePodNames(synthesizeComp *component.SynthesizedComponent) ([]string, error) {
Expand Down
Loading

0 comments on commit a382a84

Please sign in to comment.