Skip to content

Commit

Permalink
fix: cluster controller to watch pod (#3458)
Browse files Browse the repository at this point in the history
  • Loading branch information
leon-inf authored May 26, 2023
1 parent a817096 commit 86a6bc0
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 7 deletions.
26 changes: 25 additions & 1 deletion controllers/apps/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ import (
"context"
"time"

"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

snapshotv1beta1 "github.com/kubernetes-csi/external-snapshotter/client/v3/apis/volumesnapshot/v1beta1"
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
"github.com/spf13/viper"
Expand Down Expand Up @@ -213,7 +218,8 @@ func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&corev1.PersistentVolumeClaim{}).
Owns(&policyv1.PodDisruptionBudget{}).
Owns(&dataprotectionv1alpha1.BackupPolicy{}).
Owns(&dataprotectionv1alpha1.Backup{})
Owns(&dataprotectionv1alpha1.Backup{}).
Watches(&source.Kind{Type: &corev1.Pod{}}, handler.EnqueueRequestsFromMapFunc(r.filterClusterPods))
if viper.GetBool("VOLUMESNAPSHOT") {
if intctrlutil.InVolumeSnapshotV1Beta1() {
b.Owns(&snapshotv1beta1.VolumeSnapshot{}, builder.OnlyMetadata, builder.Predicates{})
Expand All @@ -223,3 +229,21 @@ func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
}
return b.Complete(r)
}

func (r *ClusterReconciler) filterClusterPods(obj client.Object) []reconcile.Request {
labels := obj.GetLabels()
if v, ok := labels[constant.AppManagedByLabelKey]; !ok || v != constant.AppName {
return []reconcile.Request{}
}
if _, ok := labels[constant.AppInstanceLabelKey]; !ok {
return []reconcile.Request{}
}
return []reconcile.Request{
{
NamespacedName: types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: labels[constant.AppInstanceLabelKey],
},
},
}
}
8 changes: 4 additions & 4 deletions controllers/apps/components/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ func NewComponent(reqCtx intctrlutil.RequestCtx,

switch compDef.WorkloadType {
case appsv1alpha1.Replication:
return replication.NewReplicationComponent(cli, cluster, version, synthesizedComp, dag), nil
return replication.NewReplicationComponent(cli, reqCtx.Recorder, cluster, version, synthesizedComp, dag), nil
case appsv1alpha1.Consensus:
return consensus.NewConsensusComponent(cli, cluster, version, synthesizedComp, dag), nil
return consensus.NewConsensusComponent(cli, reqCtx.Recorder, cluster, version, synthesizedComp, dag), nil
case appsv1alpha1.Stateful:
return stateful.NewStatefulComponent(cli, cluster, version, synthesizedComp, dag), nil
return stateful.NewStatefulComponent(cli, reqCtx.Recorder, cluster, version, synthesizedComp, dag), nil
case appsv1alpha1.Stateless:
return stateless.NewStatelessComponent(cli, cluster, version, synthesizedComp, dag), nil
return stateless.NewStatelessComponent(cli, reqCtx.Recorder, cluster, version, synthesizedComp, dag), nil
}
panic(fmt.Sprintf("unknown workload type: %s, cluster: %s, component: %s, component definition ref: %s",
compDef.WorkloadType, cluster.Name, compSpec.Name, compSpec.ComponentDefRef))
Expand Down
3 changes: 3 additions & 0 deletions controllers/apps/components/consensus/component_consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package consensus

import (
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
Expand All @@ -33,6 +34,7 @@ import (
)

func NewConsensusComponent(cli client.Client,
recorder record.EventRecorder,
cluster *appsv1alpha1.Cluster,
clusterVersion *appsv1alpha1.ClusterVersion,
synthesizedComponent *component.SynthesizedComponent,
Expand All @@ -41,6 +43,7 @@ func NewConsensusComponent(cli client.Client,
StatefulComponentBase: internal.StatefulComponentBase{
ComponentBase: internal.ComponentBase{
Client: cli,
Recorder: recorder,
Cluster: cluster,
ClusterVersion: clusterVersion,
Component: synthesizedComponent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package replication

import (
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
Expand All @@ -33,6 +34,7 @@ import (
)

func NewReplicationComponent(cli client.Client,
recorder record.EventRecorder,
cluster *appsv1alpha1.Cluster,
clusterVersion *appsv1alpha1.ClusterVersion,
synthesizedComponent *component.SynthesizedComponent,
Expand All @@ -41,6 +43,7 @@ func NewReplicationComponent(cli client.Client,
StatefulComponentBase: internal.StatefulComponentBase{
ComponentBase: internal.ComponentBase{
Client: cli,
Recorder: recorder,
Cluster: cluster,
ClusterVersion: clusterVersion,
Component: synthesizedComponent,
Expand Down
3 changes: 3 additions & 0 deletions controllers/apps/components/stateful/component_stateful.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package stateful

import (
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
Expand All @@ -32,6 +33,7 @@ import (
)

func NewStatefulComponent(cli client.Client,
recorder record.EventRecorder,
cluster *appsv1alpha1.Cluster,
clusterVersion *appsv1alpha1.ClusterVersion,
synthesizedComponent *component.SynthesizedComponent,
Expand All @@ -40,6 +42,7 @@ func NewStatefulComponent(cli client.Client,
StatefulComponentBase: internal.StatefulComponentBase{
ComponentBase: internal.ComponentBase{
Client: cli,
Recorder: recorder,
Cluster: cluster,
ClusterVersion: clusterVersion,
Component: synthesizedComponent,
Expand Down
3 changes: 3 additions & 0 deletions controllers/apps/components/stateless/component_stateless.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
Expand All @@ -38,13 +39,15 @@ import (
)

func NewStatelessComponent(cli client.Client,
recorder record.EventRecorder,
cluster *appsv1alpha1.Cluster,
clusterVersion *appsv1alpha1.ClusterVersion,
synthesizedComponent *component.SynthesizedComponent,
dag *graph.DAG) *statelessComponent {
comp := &statelessComponent{
ComponentBase: internal.ComponentBase{
Client: cli,
Recorder: recorder,
Cluster: cluster,
ClusterVersion: clusterVersion,
Component: synthesizedComponent,
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/lifecycle/cluster_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,9 @@ func (c *clusterPlanBuilder) defaultWalkFuncWithLogging(vertex graph.Vertex) err
c.transCtx.Logger.Error(err, "")
} else {
if node.Action == nil {
c.transCtx.Logger.Error(err, "%T", node)
c.transCtx.Logger.Error(err, fmt.Sprintf("%T", node))
} else {
c.transCtx.Logger.Error(err, "%s %T error", *node.Action, node.Obj)
c.transCtx.Logger.Error(err, fmt.Sprintf("%s %T error", *node.Action, node.Obj))
}
}
}
Expand Down

0 comments on commit 86a6bc0

Please sign in to comment.