Skip to content

Commit

Permalink
refactor: ReplicatedStateMachine (#3733)
Browse files Browse the repository at this point in the history
1. rename the combined component(ConsensusSet&ReplicationSet) to RSM(Replicated State Machine)
2. make package coverage greater than 80.0%
  • Loading branch information
free6om authored Jun 25, 2023
1 parent 70295af commit a0ec155
Show file tree
Hide file tree
Showing 65 changed files with 3,786 additions and 744 deletions.
2 changes: 1 addition & 1 deletion PROJECT
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ resources:
controller: true
domain: kubeblocks.io
group: workloads
kind: ConsensusSet
kind: ReplicatedStateMachine
path: github.com/apecloud/kubeblocks/apis/workloads/v1alpha1
version: v1alpha1
webhooks:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

// ConsensusSetSpec defines the desired state of ConsensusSet
type ConsensusSetSpec struct {
// ReplicatedStateMachineSpec defines the desired state of ReplicatedStateMachine
type ReplicatedStateMachineSpec struct {
// Replicas defines number of Pods
// +kubebuilder:default=1
// +kubebuilder:validation:Minimum=0
Expand All @@ -46,17 +46,17 @@ type ConsensusSetSpec struct {
Template corev1.PodTemplateSpec `json:"template"`

// volumeClaimTemplates is a list of claims that pods are allowed to reference.
// The ConsensusSet controller is responsible for mapping network identities to
// The ReplicatedStateMachine controller is responsible for mapping network identities to
// claims in a way that maintains the identity of a pod. Every claim in
// this list must have at least one matching (by name) volumeMount in one
// container in the template. A claim in this list takes precedence over
// any volumes in the template, with the same name.
// +optional
VolumeClaimTemplates []corev1.PersistentVolumeClaim `json:"volumeClaimTemplates,omitempty"`

// Roles, a list of roles defined in this consensus system.
// Roles, a list of roles defined in the system.
// +kubebuilder:validation:Required
Roles []ConsensusRole `json:"roles"`
Roles []ReplicaRole `json:"roles"`

// RoleObservation provides method to observe role.
// +kubebuilder:validation:Required
Expand All @@ -82,8 +82,8 @@ type ConsensusSetSpec struct {
Credential *Credential `json:"credential,omitempty"`
}

// ConsensusSetStatus defines the observed state of ConsensusSet
type ConsensusSetStatus struct {
// ReplicatedStateMachineStatus defines the observed state of ReplicatedStateMachine
type ReplicatedStateMachineStatus struct {
appsv1.StatefulSetStatus `json:",inline"`

// InitReplicas is the number of pods(members) when cluster first initialized
Expand All @@ -97,36 +97,36 @@ type ConsensusSetStatus struct {

// members' status.
// +optional
MembersStatus []ConsensusMemberStatus `json:"membersStatus,omitempty"`
MembersStatus []MemberStatus `json:"membersStatus,omitempty"`
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:resource:categories={kubeblocks,all},shortName=csset
// +kubebuilder:resource:categories={kubeblocks,all},shortName=rsm
// +kubebuilder:printcolumn:name="LEADER",type="string",JSONPath=".status.membersStatus[?(@.role.isLeader==true)].podName",description="leader pod name."
// +kubebuilder:printcolumn:name="READY",type="string",JSONPath=".status.readyReplicas",description="ready replicas."
// +kubebuilder:printcolumn:name="REPLICAS",type="string",JSONPath=".status.replicas",description="total replicas."
// +kubebuilder:printcolumn:name="AGE",type="date",JSONPath=".metadata.creationTimestamp"

// ConsensusSet is the Schema for the consensussets API
type ConsensusSet struct {
// ReplicatedStateMachine is the Schema for the replicatedstatemachines API
type ReplicatedStateMachine struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec ConsensusSetSpec `json:"spec,omitempty"`
Status ConsensusSetStatus `json:"status,omitempty"`
Spec ReplicatedStateMachineSpec `json:"spec,omitempty"`
Status ReplicatedStateMachineStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// ConsensusSetList contains a list of ConsensusSet
type ConsensusSetList struct {
// ReplicatedStateMachineList contains a list of ReplicatedStateMachine
type ReplicatedStateMachineList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []ConsensusSet `json:"items"`
Items []ReplicatedStateMachine `json:"items"`
}

type ConsensusRole struct {
type ReplicaRole struct {
// Name, role name.
// +kubebuilder:validation:Required
// +kubebuilder:default=leader
Expand Down Expand Up @@ -175,9 +175,9 @@ type RoleObservation struct {
// after all actions done, the final output should be a single string of the role name defined in spec.Roles
// latest [BusyBox](https://busybox.net/) image will be used if Image not configured
// Environment variables can be used in Command:
// - v_KB_CONSENSUS_SET_LAST_STDOUT stdout from last action, watch 'v_' prefixed
// - KB_CONSENSUS_SET_USERNAME username part of credential
// - KB_CONSENSUS_SET_PASSWORD password part of credential
// - v_KB_RSM_LAST_STDOUT stdout from last action, watch 'v_' prefixed
// - KB_RSM_USERNAME username part of credential
// - KB_RSM_PASSWORD password part of credential
// +kubebuilder:validation:Required
ObservationActions []Action `json:"observationActions"`

Expand All @@ -201,6 +201,7 @@ type RoleObservation struct {
// +optional
PeriodSeconds int32 `json:"periodSeconds,omitempty"`

// Minimum consecutive successes for the observation to be considered successful after having failed.
// Minimum consecutive successes for the observation to be considered successful after having failed.
// Defaults to 1. Minimum value is 1.
// +kubebuilder:default=1
Expand All @@ -218,12 +219,12 @@ type RoleObservation struct {

type Credential struct {
// Username
// variable name will be KB_CONSENSUS_SET_USERNAME
// variable name will be KB_RSM_USERNAME
// +kubebuilder:validation:Required
Username CredentialVar `json:"username"`

// Password
// variable name will be KB_CONSENSUS_SET_PASSWORD
// variable name will be KB_RSM_PASSWORD
// +kubebuilder:validation:Required
Password CredentialVar `json:"password"`
}
Expand All @@ -250,11 +251,11 @@ type CredentialVar struct {

type MembershipReconfiguration struct {
// Environment variables can be used in all following Actions:
// - KB_CONSENSUS_SET_USERNAME username part of credential
// - KB_CONSENSUS_SET_PASSWORD password part of credential
// - KB_CONSENSUS_SET_LEADER_HOST leader host
// - KB_CONSENSUS_SET_TARGET_HOST target host
// - KB_CONSENSUS_SET_SERVICE_PORT port
// - KB_RSM_USERNAME username part of credential
// - KB_RSM_PASSWORD password part of credential
// - KB_RSM_LEADER_HOST leader host
// - KB_RSM_TARGET_HOST target host
// - KB_RSM_SERVICE_PORT port

// SwitchoverAction specifies how to do switchover
// latest [BusyBox](https://busybox.net/) image will be used if Image not configured
Expand Down Expand Up @@ -292,15 +293,15 @@ type Action struct {
Command []string `json:"command"`
}

type ConsensusMemberStatus struct {
type MemberStatus struct {
// PodName pod name.
// +kubebuilder:validation:Required
// +kubebuilder:default=Unknown
PodName string `json:"podName"`

ConsensusRole `json:"role"`
ReplicaRole `json:"role"`
}

func init() {
SchemeBuilder.Register(&ConsensusSet{}, &ConsensusSetList{})
SchemeBuilder.Register(&ReplicatedStateMachine{}, &ReplicatedStateMachineList{})
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,54 +30,54 @@ import (
)

// log is for logging in this package.
var consensussetlog = logf.Log.WithName("consensusset-resource")
var replicatedstatemachinelog = logf.Log.WithName("replicatedstatemachine-resource")

func (r *ConsensusSet) SetupWebhookWithManager(mgr ctrl.Manager) error {
func (r *ReplicatedStateMachine) SetupWebhookWithManager(mgr ctrl.Manager) error {
return ctrl.NewWebhookManagedBy(mgr).
For(r).
Complete()
}

// TODO(user): EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!

//+kubebuilder:webhook:path=/mutate-workloads-kubeblocks-io-v1alpha1-consensusset,mutating=true,failurePolicy=fail,sideEffects=None,groups=workloads.kubeblocks.io,resources=consensussets,verbs=create;update,versions=v1alpha1,name=mconsensusset.kb.io,admissionReviewVersions=v1
//+kubebuilder:webhook:path=/mutate-workloads-kubeblocks-io-v1alpha1-replicatedstatemachine,mutating=true,failurePolicy=fail,sideEffects=None,groups=workloads.kubeblocks.io,resources=replicatedstatemachines,verbs=create;update,versions=v1alpha1,name=mreplicatedstatemachine.kb.io,admissionReviewVersions=v1

var _ webhook.Defaulter = &ConsensusSet{}
var _ webhook.Defaulter = &ReplicatedStateMachine{}

// Default implements webhook.Defaulter so a webhook will be registered for the type
func (r *ConsensusSet) Default() {
consensussetlog.Info("default", "name", r.Name)
func (r *ReplicatedStateMachine) Default() {
replicatedstatemachinelog.Info("default", "name", r.Name)

// TODO(user): fill in your defaulting logic.
}

// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation.
//+kubebuilder:webhook:path=/validate-workloads-kubeblocks-io-v1alpha1-consensusset,mutating=false,failurePolicy=fail,sideEffects=None,groups=workloads.kubeblocks.io,resources=consensussets,verbs=create;update,versions=v1alpha1,name=vconsensusset.kb.io,admissionReviewVersions=v1
//+kubebuilder:webhook:path=/validate-workloads-kubeblocks-io-v1alpha1-replicatedstatemachine,mutating=false,failurePolicy=fail,sideEffects=None,groups=workloads.kubeblocks.io,resources=replicatedstatemachines,verbs=create;update,versions=v1alpha1,name=vreplicatedstatemachine.kb.io,admissionReviewVersions=v1

var _ webhook.Validator = &ConsensusSet{}
var _ webhook.Validator = &ReplicatedStateMachine{}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
func (r *ConsensusSet) ValidateCreate() error {
consensussetlog.Info("validate create", "name", r.Name)
func (r *ReplicatedStateMachine) ValidateCreate() error {
replicatedstatemachinelog.Info("validate create", "name", r.Name)

return r.validate()
}

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type
func (r *ConsensusSet) ValidateUpdate(old runtime.Object) error {
consensussetlog.Info("validate update", "name", r.Name)
func (r *ReplicatedStateMachine) ValidateUpdate(old runtime.Object) error {
replicatedstatemachinelog.Info("validate update", "name", r.Name)

return r.validate()
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type
func (r *ConsensusSet) ValidateDelete() error {
consensussetlog.Info("validate delete", "name", r.Name)
func (r *ReplicatedStateMachine) ValidateDelete() error {
replicatedstatemachinelog.Info("validate delete", "name", r.Name)

return r.validate()
}

func (r *ConsensusSet) validate() error {
func (r *ReplicatedStateMachine) validate() error {
var allErrs field.ErrorList

// Leader is required
Expand All @@ -104,7 +104,7 @@ func (r *ConsensusSet) validate() error {
return apierrors.NewInvalid(
schema.GroupKind{
Group: "workloads.kubeblocks.io/v1alpha1",
Kind: "ConsensusSet",
Kind: "ReplicatedStateMachine",
},
r.Name, allErrs)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,18 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var _ = Describe("ConsensusSet Webhook", func() {
var _ = Describe("ReplicatedStateMachine Webhook", func() {
Context("spec validation", func() {
const name = "test-consensus-set"
var csSet *ConsensusSet
const name = "test-replicated-state-machine"
var rsm *ReplicatedStateMachine

BeforeEach(func() {
csSet = &ConsensusSet{
rsm = &ReplicatedStateMachine{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: testCtx.DefaultNamespace,
},
Spec: ConsensusSetSpec{
Spec: ReplicatedStateMachineSpec{
Replicas: 1,
RoleObservation: RoleObservation{
ObservationActions: []Action{
Expand All @@ -64,48 +64,48 @@ var _ = Describe("ConsensusSet Webhook", func() {
})

It("should return an error if no leader set", func() {
csSet.Spec.Roles = []ConsensusRole{
rsm.Spec.Roles = []ReplicaRole{
{
Name: "leader",
IsLeader: false,
AccessMode: ReadWriteMode,
},
}
err := k8sClient.Create(ctx, csSet)
err := k8sClient.Create(ctx, rsm)
Expect(err).ShouldNot(BeNil())
Expect(err.Error()).Should(ContainSubstring("leader is required"))
})

It("should return an error if servicePort not provided", func() {
csSet.Spec.Roles = []ConsensusRole{
rsm.Spec.Roles = []ReplicaRole{
{
Name: "leader",
IsLeader: true,
AccessMode: ReadWriteMode,
},
}
err := k8sClient.Create(ctx, csSet)
err := k8sClient.Create(ctx, rsm)
Expect(err).ShouldNot(BeNil())
Expect(err.Error()).Should(ContainSubstring("servicePort must provide"))
})

It("should succeed if spec is well defined", func() {
csSet.Spec.Roles = []ConsensusRole{
rsm.Spec.Roles = []ReplicaRole{
{
Name: "leader",
IsLeader: true,
AccessMode: ReadWriteMode,
},
}
csSet.Spec.Service.Ports = []corev1.ServicePort{
rsm.Spec.Service.Ports = []corev1.ServicePort{
{
Name: "foo",
Protocol: "tcp",
Port: 12345,
},
}
Expect(k8sClient.Create(ctx, csSet)).Should(Succeed())
Expect(k8sClient.Delete(ctx, csSet)).Should(Succeed())
Expect(k8sClient.Create(ctx, rsm)).Should(Succeed())
Expect(k8sClient.Delete(ctx, rsm)).Should(Succeed())
})
})
})
2 changes: 1 addition & 1 deletion apis/workloads/v1alpha1/webhook_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ var _ = BeforeSuite(func() {
})
Expect(err).NotTo(HaveOccurred())

err = (&ConsensusSet{}).SetupWebhookWithManager(mgr)
err = (&ReplicatedStateMachine{}).SetupWebhookWithManager(mgr)
Expect(err).NotTo(HaveOccurred())

testCtx = testutil.NewDefaultTestContext(ctx, k8sClient, testEnv)
Expand Down
20 changes: 9 additions & 11 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,15 +357,13 @@ func main() {
}
}

if viper.GetBool("enable_consensus_set") {
if err = (&workloadscontrollers.ConsensusSetReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("consensus-set-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ConsensusSet")
os.Exit(1)
}
if err = (&workloadscontrollers.ReplicatedStateMachineReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("replicated-state-machine-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ReplicatedStateMachine")
os.Exit(1)
}
// +kubebuilder:scaffold:builder

Expand Down Expand Up @@ -448,8 +446,8 @@ func main() {
os.Exit(1)
}

if err = (&workloadsv1alpha1.ConsensusSet{}).SetupWebhookWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create webhook", "webhook", "ConsensusSet")
if err = (&workloadsv1alpha1.ReplicatedStateMachine{}).SetupWebhookWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create webhook", "webhook", "ReplicatedStateMachine")
os.Exit(1)
}

Expand Down
Loading

0 comments on commit a0ec155

Please sign in to comment.