Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add flagd-proxy HA configuration #712

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 139 additions & 66 deletions common/flagdproxy/flagdproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@ import (
"golang.org/x/exp/maps"
appsV1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
FlagdProxyDeploymentName = "flagd-proxy"
FlagdProxyServiceAccountName = "open-feature-operator-flagd-proxy"
FlagdProxyServiceName = "flagd-proxy-svc"
FlagdProxyDeploymentName = "flagd-proxy"
FlagdProxyServiceAccountName = "open-feature-operator-flagd-proxy"
FlagdProxyServiceName = "flagd-proxy-svc"
FlagdProxyPodDisruptionBudgetName = "flagd-proxy-pdb"
)

type FlagdProxyHandler struct {
Expand All @@ -37,6 +40,7 @@ type FlagdProxyConfiguration struct {
DebugLogging bool
Image string
Tag string
Replicas int
Namespace string
OperatorDeploymentName string
ImagePullSecrets []string
Expand All @@ -53,6 +57,7 @@ func NewFlagdProxyConfiguration(env types.EnvConfig, imagePullSecrets []string,
Port: env.FlagdProxyPort,
ManagementPort: env.FlagdProxyManagementPort,
DebugLogging: env.FlagdProxyDebugLogging,
Replicas: env.FlagdProxyReplicaCount,
ImagePullSecrets: imagePullSecrets,
Labels: labels,
Annotations: annotations,
Expand All @@ -71,58 +76,99 @@ func (f *FlagdProxyHandler) Config() *FlagdProxyConfiguration {
return f.config
}

func (f *FlagdProxyHandler) createObject(ctx context.Context, obj client.Object) error {
return f.Client.Create(ctx, obj)
func specDiffers(a, b client.Object) (bool, error) {
if a == nil || b == nil {
return false, fmt.Errorf("object is nil")
}

// Compare only spec based on the object type
switch a.(type) {
case *corev1.Service:
return !reflect.DeepEqual(a.(*corev1.Service).Spec, b.(*corev1.Service).Spec), nil
case *appsV1.Deployment:
return !reflect.DeepEqual(a.(*appsV1.Deployment).Spec, b.(*appsV1.Deployment).Spec), nil
case *policyv1.PodDisruptionBudget:
return !reflect.DeepEqual(a.(*policyv1.PodDisruptionBudget).Spec, b.(*policyv1.PodDisruptionBudget).Spec), nil
default:
return false, fmt.Errorf("unsupported object type")
}
}

func (f *FlagdProxyHandler) updateObject(ctx context.Context, obj client.Object) error {
return f.Client.Update(ctx, obj)
// ensureFlagdProxyResource ensures that the given object is reconciled in the cluster. If the object does not exist, it will be created.
func (f *FlagdProxyHandler) ensureFlagdProxyResource(ctx context.Context, obj client.Object) error {
if obj == nil {
return fmt.Errorf("object is nil")
}

return retry.RetryOnConflict(retry.DefaultRetry, func() error {
var old = obj.DeepCopyObject().(client.Object)
f.Log.Info("Ensuring object exists", "name", obj.GetName(), "namespace", obj.GetNamespace())

xvzf marked this conversation as resolved.
Show resolved Hide resolved
// Try to get the existing object
err := f.Client.Get(ctx, client.ObjectKey{Name: old.GetName(), Namespace: old.GetNamespace()}, old)
notFound := errors.IsNotFound(err)
if err != nil && !notFound {
return err
}

// If the object is not found, we will create it
if notFound {
return f.Client.Create(ctx, obj)
}
// If the object exists but is not managed by OFO, return an error
if !common.IsManagedByOFO(old) {
return fmt.Errorf("%s not managed by OFO", obj.GetName())
}

// If the object is found, update if necessary
needsUpdate, err := specDiffers(obj, old)
if err != nil {
return err
}

if needsUpdate {
obj.SetResourceVersion(old.GetResourceVersion())
return f.Client.Update(ctx, obj)
}

return nil
})
}

// HandleFlagdProxy ensures flagd-proxy kubernetes components are configured properly
func (f *FlagdProxyHandler) HandleFlagdProxy(ctx context.Context) error {
exists, deployment, err := f.doesFlagdProxyExist(ctx)
if err != nil {
return err
}
var err error

ownerReference, err := f.getOwnerReference(ctx)
ownerRef, err := f.getOwnerReference(ctx)
if err != nil {
return err
}
newDeployment := f.newFlagdProxyManifest(ownerReference)
newService := f.newFlagdProxyServiceManifest(ownerReference)

if !exists {
f.Log.Info("flagd-proxy Deployment does not exist, creating")
return f.deployFlagdProxy(ctx, f.createObject, newDeployment, newService)
}
// flagd-proxy exists, need to check if we should update it
if f.shouldUpdateFlagdProxy(deployment, newDeployment) {
f.Log.Info("flagd-proxy Deployment out of sync, updating")
return f.deployFlagdProxy(ctx, f.updateObject, newDeployment, newService)
}
f.Log.Info("flagd-proxy Deployment up-to-date")
return nil
}

func (f *FlagdProxyHandler) deployFlagdProxy(ctx context.Context, createUpdateFunc CreateUpdateFunc, deployment *appsV1.Deployment, service *corev1.Service) error {
f.Log.Info("deploying the flagd-proxy")
if err := createUpdateFunc(ctx, deployment); err != nil && !errors.IsAlreadyExists(err) {
if err = f.ensureFlagdProxyResource(ctx, f.newFlagdProxyDeployment(ownerRef)); err != nil {
return err
}
f.Log.Info("deploying the flagd-proxy service")
if err := createUpdateFunc(ctx, service); err != nil && !errors.IsAlreadyExists(err) {

if err = f.ensureFlagdProxyResource(ctx, f.newFlagdProxyService(ownerRef)); err != nil {
return err
}
return nil

err = f.ensureFlagdProxyResource(ctx, f.newFlagdProxyPodDisruptionBudget(ownerRef))
return err
}

func (f *FlagdProxyHandler) newFlagdProxyServiceManifest(ownerReference *metav1.OwnerReference) *corev1.Service {
func (f *FlagdProxyHandler) newFlagdProxyService(ownerReference *metav1.OwnerReference) *corev1.Service {
return &corev1.Service{
TypeMeta: metav1.TypeMeta{
Kind: "Service",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: FlagdProxyServiceName,
Namespace: f.config.Namespace,
OwnerReferences: []metav1.OwnerReference{*ownerReference},
Labels: map[string]string{
common.ManagedByAnnotationKey: common.ManagedByAnnotationValue,
},
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
Expand All @@ -140,8 +186,41 @@ func (f *FlagdProxyHandler) newFlagdProxyServiceManifest(ownerReference *metav1.
}
}

func (f *FlagdProxyHandler) newFlagdProxyManifest(ownerReference *metav1.OwnerReference) *appsV1.Deployment {
replicas := int32(1)
func (f *FlagdProxyHandler) newFlagdProxyPodDisruptionBudget(ownerReference *metav1.OwnerReference) *policyv1.PodDisruptionBudget {

// Only require pods to be available if there is >1 replica configured (HA setup)
minReplicas := intstr.FromInt(0)
if f.config.Replicas > 1 {
minReplicas = intstr.FromInt(f.config.Replicas / 2)
}

return &policyv1.PodDisruptionBudget{
TypeMeta: metav1.TypeMeta{
Kind: "PodDisruptionBudget",
APIVersion: "policy/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: FlagdProxyPodDisruptionBudgetName,
Namespace: f.config.Namespace,
OwnerReferences: []metav1.OwnerReference{*ownerReference},
Labels: map[string]string{
common.ManagedByAnnotationKey: common.ManagedByAnnotationValue,
},
},
Spec: policyv1.PodDisruptionBudgetSpec{
MinAvailable: &minReplicas,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app.kubernetes.io/name": FlagdProxyDeploymentName,
common.ManagedByAnnotationKey: common.ManagedByAnnotationValue,
},
},
},
}
}

func (f *FlagdProxyHandler) newFlagdProxyDeployment(ownerReference *metav1.OwnerReference) *appsV1.Deployment {
replicas := int32(f.config.Replicas)
args := []string{
"start",
"--management-port",
Expand All @@ -157,10 +236,10 @@ func (f *FlagdProxyHandler) newFlagdProxyManifest(ownerReference *metav1.OwnerRe
})
}
flagdLabels := map[string]string{
"app": FlagdProxyDeploymentName,
"app.kubernetes.io/name": FlagdProxyDeploymentName,
"app.kubernetes.io/managed-by": common.ManagedByAnnotationValue,
"app.kubernetes.io/version": f.config.Tag,
"app": FlagdProxyDeploymentName,
"app.kubernetes.io/name": FlagdProxyDeploymentName,
common.ManagedByAnnotationKey: common.ManagedByAnnotationValue,
"app.kubernetes.io/version": f.config.Tag,
}
if len(f.config.Labels) > 0 {
maps.Copy(flagdLabels, f.config.Labels)
Expand All @@ -173,13 +252,17 @@ func (f *FlagdProxyHandler) newFlagdProxyManifest(ownerReference *metav1.OwnerRe
}

return &appsV1.Deployment{
TypeMeta: metav1.TypeMeta{
Kind: "Deployment",
APIVersion: "apps/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: FlagdProxyDeploymentName,
Namespace: f.config.Namespace,
Labels: map[string]string{
"app": FlagdProxyDeploymentName,
"app.kubernetes.io/managed-by": common.ManagedByAnnotationValue,
"app.kubernetes.io/version": f.config.Tag,
"app": FlagdProxyDeploymentName,
common.ManagedByAnnotationKey: common.ManagedByAnnotationValue,
"app.kubernetes.io/version": f.config.Tag,
},
OwnerReferences: []metav1.OwnerReference{*ownerReference},
},
Expand Down Expand Up @@ -215,41 +298,31 @@ func (f *FlagdProxyHandler) newFlagdProxyManifest(ownerReference *metav1.OwnerRe
Args: args,
},
},
TopologySpreadConstraints: []corev1.TopologySpreadConstraint{
{
MaxSkew: 1,
TopologyKey: "kubernetes.io/hostname",
WhenUnsatisfiable: corev1.DoNotSchedule,
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app.kubernetes.io/name": FlagdProxyDeploymentName,
common.ManagedByAnnotationKey: common.ManagedByAnnotationValue,
},
},
},
},
},
},
},
}
}

func (f *FlagdProxyHandler) doesFlagdProxyExist(ctx context.Context) (bool, *appsV1.Deployment, error) {
d := &appsV1.Deployment{}
err := f.Client.Get(ctx, client.ObjectKey{Name: FlagdProxyDeploymentName, Namespace: f.config.Namespace}, d)
if err != nil {
if errors.IsNotFound(err) {
// does not exist, is not ready, no error
return false, nil, nil
}
// does not exist, is not ready, is in error
return false, nil, err
}
return true, d, nil
}

func (f *FlagdProxyHandler) shouldUpdateFlagdProxy(old, new *appsV1.Deployment) bool {
if !common.IsManagedByOFO(old) {
f.Log.Info("flagd-proxy Deployment not managed by OFO")
return false
}
return !reflect.DeepEqual(old.Spec, new.Spec)
}

func (f *FlagdProxyHandler) getOperatorDeployment(ctx context.Context) (*appsV1.Deployment, error) {
d := &appsV1.Deployment{}
if err := f.Client.Get(ctx, client.ObjectKey{Name: f.config.OperatorDeploymentName, Namespace: f.config.Namespace}, d); err != nil {
return nil, fmt.Errorf("unable to fetch operator deployment: %w", err)
}
return d, nil

}

func (f *FlagdProxyHandler) getOwnerReference(ctx context.Context) (*metav1.OwnerReference, error) {
Expand Down
Loading
Loading