Skip to content

Commit

Permalink
feat: update atm endpoints when handling trafficmanagerbackend (#216)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiying-lin authored Nov 15, 2024
1 parent ecf2290 commit aef3b54
Show file tree
Hide file tree
Showing 10 changed files with 1,136 additions and 157 deletions.
161 changes: 150 additions & 11 deletions pkg/controllers/hub/trafficmanagerbackend/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/trafficmanager/armtrafficmanager"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -50,10 +51,13 @@ const (
AzureResourceEndpointNamePrefix = "fleet-%s#"

// AzureResourceEndpointNameFormat is the name format of the Azure Traffic Manager Endpoint created by the fleet controller.
// The naming convention of a Traffic Manager Endpoint is fleet-{TrafficManagerBackendUUID}#{ServiceImportName}#{ClusterName}.
// All the object name length should be restricted to <= 63 characters.
// The naming convention of a Traffic Manager Endpoint is {AzureResourceEndpointNamePrefix}{ServiceImportName}#{ClusterName}.
// which is fleet-{TrafficManagerBackendUUID}#{ServiceImportName}#{ClusterName}.
// ServiceImportName will be the same as the Service name, which is up to 63 characters (RFC 1035).
// https://github.com/kubernetes/kubernetes/pull/29523
// The cluster name length should be restricted to <= 63 characters.
// The endpoint name must contain no more than 260 characters, excluding the following characters "< > * % $ : \ ? + /".
AzureResourceEndpointNameFormat = AzureResourceEndpointNamePrefix + "%s#%s"
AzureResourceEndpointNameFormat = "%s%s#%s"
)

var (
Expand Down Expand Up @@ -181,7 +185,7 @@ func (r *Reconciler) cleanupEndpoints(ctx context.Context, backend *fleetnetv1al
endpoint := atmProfile.Properties.Endpoints[i]
if endpoint.Name == nil {
err := controller.NewUnexpectedBehaviorError(errors.New("azure Traffic Manager endpoint name is nil"))
klog.ErrorS(err, "Invalid Traffic Manager endpoint", "azureEndpoint", endpoint)
klog.ErrorS(err, "Invalid Traffic Manager endpoint", "atmEndpoint", endpoint)
continue
}
// Traffic manager endpoint name is case-insensitive.
Expand All @@ -191,21 +195,21 @@ func (r *Reconciler) cleanupEndpoints(ctx context.Context, backend *fleetnetv1al
errs.Go(func() error {
if _, err := r.EndpointsClient.Delete(cctx, r.ResourceGroupName, atmProfileName, armtrafficmanager.EndpointTypeAzureEndpoints, *endpoint.Name, nil); err != nil {
if azureerrors.IsNotFound(err) {
klog.V(2).InfoS("Ignoring NotFound Azure Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfileName", atmProfileName, "azureEndpointName", *endpoint.Name)
klog.V(2).InfoS("Ignoring NotFound Azure Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfileName", atmProfileName, "atmEndpoint", *endpoint.Name)
return nil
}
klog.ErrorS(err, "Failed to delete the endpoint", "trafficManagerBackend", backendKObj, "atmProfileName", atmProfileName, "azureEndpointName", *endpoint.Name)
klog.ErrorS(err, "Failed to delete the endpoint", "trafficManagerBackend", backendKObj, "atmProfileName", atmProfileName, "atmEndpoint", *endpoint.Name)
return err
}
klog.V(2).InfoS("Deleted Azure Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfileName", atmProfileName, "azureEndpointName", *endpoint.Name)
klog.V(2).InfoS("Deleted Azure Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfileName", atmProfileName, "atmEndpoint", *endpoint.Name)
return nil
})
}
return errs.Wait()
}

func isEndpointOwnedByBackend(backend *fleetnetv1alpha1.TrafficManagerBackend, endpoint string) bool {
return strings.HasPrefix(strings.ToLower(endpoint), generateAzureTrafficManagerEndpointNamePrefixFunc(backend))
return strings.HasPrefix(endpoint, generateAzureTrafficManagerEndpointNamePrefixFunc(backend))
}

func (r *Reconciler) handleUpdate(ctx context.Context, backend *fleetnetv1alpha1.TrafficManagerBackend) (ctrl.Result, error) {
Expand Down Expand Up @@ -240,11 +244,37 @@ func (r *Reconciler) handleUpdate(ctx context.Context, backend *fleetnetv1alpha1
klog.V(2).InfoS("Found the serviceImport", "trafficManagerBackend", backendKObj, "serviceImport", klog.KObj(serviceImport), "clusters", serviceImport.Status.Clusters)

desiredEndpointsMaps, invalidServicesMaps, err := r.validateExportedServiceForServiceImport(ctx, backend, serviceImport)
if err != nil {
if err != nil || (desiredEndpointsMaps == nil && invalidServicesMaps == nil) {
// We don't need to requeue not found internalServiceExport(err == nil and desiredEndpointsMaps == nil && invalidServicesMaps == nil)
// as when the serviceImport is updated, the controller will be re-triggered again.
// The controller will retry when err is not nil.
return ctrl.Result{}, err
}
klog.V(2).InfoS("Found the exported services behind the serviceImport", "trafficManagerBackend", backendKObj, "serviceImport", klog.KObj(serviceImport), "numberOfDesiredEndpoints", len(desiredEndpointsMaps), "numberOfInvalidServices", len(invalidServicesMaps))
return ctrl.Result{}, nil

acceptedEndpoints, badEndpointsErr, err := r.updateTrafficManagerEndpointsAndUpdateStatusIfUnknown(ctx, backend, atmProfile, desiredEndpointsMaps)
if err != nil {
return ctrl.Result{}, err
}
if len(invalidServicesMaps) == 0 && len(badEndpointsErr) == 0 {
setTrueCondition(backend, acceptedEndpoints)
} else {
var invalidEndpointErrMessage string
if len(badEndpointsErr) > 0 {
invalidEndpointErrMessage = fmt.Sprintf("%v endpoint(s) failed to be created/updated in the Azure Traffic Manager, for example, %v; ", len(badEndpointsErr), badEndpointsErr[0])
}
if len(invalidServicesMaps) > 0 {
for clusterID, invalidServiceErr := range invalidServicesMaps {
invalidEndpointErrMessage = invalidEndpointErrMessage + fmt.Sprintf("%v service(s) exported from clusters cannot be exposed as the Azure Traffic Manager, for example, service exported from %v is invalid: %v", len(invalidServicesMaps), clusterID, invalidServiceErr)
// Here we only populate the message with the first invalid exported service.
// Note, the loop of the invalidServicesMaps is not deterministic.
break
}
}
setFalseCondition(backend, acceptedEndpoints, invalidEndpointErrMessage)
}
klog.V(2).InfoS("Updated Traffic Manager endpoints for the serviceImport and updating the condition", "trafficManagerBackend", backendKObj, "status", backend.Status)
return ctrl.Result{}, r.updateTrafficManagerBackendStatus(ctx, backend)
}

// validateTrafficManagerProfile returns not nil profile when the profile is valid.
Expand Down Expand Up @@ -367,6 +397,18 @@ func setUnknownCondition(backend *fleetnetv1alpha1.TrafficManagerBackend, messag
meta.SetStatusCondition(&backend.Status.Conditions, cond)
}

func setTrueCondition(backend *fleetnetv1alpha1.TrafficManagerBackend, acceptedEndpoints []fleetnetv1alpha1.TrafficManagerEndpointStatus) {
cond := metav1.Condition{
Type: string(fleetnetv1alpha1.TrafficManagerBackendConditionAccepted),
Status: metav1.ConditionTrue,
ObservedGeneration: backend.Generation,
Reason: string(fleetnetv1alpha1.TrafficManagerBackendReasonAccepted),
Message: fmt.Sprintf("%v service(s) exported from clusters have been accepted as Traffic Manager endpoints", len(acceptedEndpoints)),
}
backend.Status.Endpoints = acceptedEndpoints
meta.SetStatusCondition(&backend.Status.Conditions, cond)
}

func (r *Reconciler) updateTrafficManagerBackendStatus(ctx context.Context, backend *fleetnetv1alpha1.TrafficManagerBackend) error {
backendKObj := klog.KObj(backend)
if err := r.Client.Status().Update(ctx, backend); err != nil {
Expand Down Expand Up @@ -460,7 +502,7 @@ func isValidTrafficManagerEndpoint(export *fleetnetv1alpha1.InternalServiceExpor
}

func generateAzureTrafficManagerEndpoint(backend *fleetnetv1alpha1.TrafficManagerBackend, service *fleetnetv1alpha1.InternalServiceExport) armtrafficmanager.Endpoint {
endpointName := fmt.Sprintf(AzureResourceEndpointNameFormat, backend.UID, backend.Spec.Backend, service.Spec.ServiceReference.ClusterID)
endpointName := fmt.Sprintf(AzureResourceEndpointNameFormat, generateAzureTrafficManagerEndpointNamePrefixFunc(backend), backend.Spec.Backend.Name, service.Spec.ServiceReference.ClusterID)
return armtrafficmanager.Endpoint{
Name: &endpointName,
Type: ptr.To(string(armtrafficmanager.EndpointTypeAzureEndpoints)),
Expand All @@ -471,6 +513,103 @@ func generateAzureTrafficManagerEndpoint(backend *fleetnetv1alpha1.TrafficManage
}
}

func buildAcceptedEndpointStatus(endpoint *armtrafficmanager.Endpoint, cluster *fleetnetv1alpha1.ClusterStatus) fleetnetv1alpha1.TrafficManagerEndpointStatus {
return fleetnetv1alpha1.TrafficManagerEndpointStatus{
Name: strings.ToLower(*endpoint.Name), // name is case-insensitive
Target: endpoint.Properties.Target,
Weight: endpoint.Properties.Weight,
Cluster: cluster,
}
}

// equalAzureTrafficManagerEndpoint compares only few fields of the current and desired Azure Traffic Manager endpoints
// by ignoring others.
// The desired endpoint is built by the controllers and all the required fields should not be nil.
func equalAzureTrafficManagerEndpoint(current, desired armtrafficmanager.Endpoint) bool {
if current.Type == nil || *current.Type != *desired.Type {
return false
}
if current.Properties == nil || current.Properties.TargetResourceID == nil || current.Properties.Weight == nil || current.Properties.EndpointStatus == nil {
return false
}
return strings.EqualFold(*current.Properties.TargetResourceID, *desired.Properties.TargetResourceID) &&
*current.Properties.Weight == *desired.Properties.Weight &&
*current.Properties.EndpointStatus == *desired.Properties.EndpointStatus
}

// updateTrafficManagerEndpointsAndUpdateStatusIfUnknown updates the Azure Traffic Manager endpoints.
// Returns the accepted endpoints and a list of bad endpoints error when it fails to create/update endpoint or not because of bad request.
func (r *Reconciler) updateTrafficManagerEndpointsAndUpdateStatusIfUnknown(ctx context.Context, backend *fleetnetv1alpha1.TrafficManagerBackend, profile *armtrafficmanager.Profile, desiredEndpoints map[string]desiredEndpoint) ([]fleetnetv1alpha1.TrafficManagerEndpointStatus, []error, error) {
backendKObj := klog.KObj(backend)
acceptedEndpoints := make([]fleetnetv1alpha1.TrafficManagerEndpointStatus, 0, len(desiredEndpoints))
for _, endpoint := range profile.Properties.Endpoints {
if endpoint.Name == nil {
err := controller.NewUnexpectedBehaviorError(errors.New("azure Traffic Manager endpoint name is nil"))
klog.ErrorS(err, "Invalid Traffic Manager endpoint", "atmEndpoint", endpoint)
continue
}

endpointName := strings.ToLower(*endpoint.Name) // resource name are case-insensitive
if !isEndpointOwnedByBackend(backend, endpointName) {
continue // skipping the endpoint which is not owned by this backend
}

desired, ok := desiredEndpoints[endpointName]
if !ok {
klog.V(2).InfoS("Deleting the Azure Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfile", profile.Name, "atmEndpoint", endpointName)
if _, deleteErr := r.EndpointsClient.Delete(ctx, r.ResourceGroupName, *profile.Name, armtrafficmanager.EndpointTypeAzureEndpoints, *endpoint.Name, nil); deleteErr != nil {
if azureerrors.IsNotFound(deleteErr) {
klog.V(2).InfoS("Ignoring NotFound Azure Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfile", profile.Name, "atmEndpoint", endpointName)
continue
}
klog.ErrorS(deleteErr, "Failed to delete the Azure Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfile", profile.Name, "atmEndpoint", endpointName)
setUnknownCondition(backend, fmt.Sprintf("Failed to cleanup the existing %q for %q: %v", endpointName, *profile.Name, deleteErr))
if err := r.updateTrafficManagerBackendStatus(ctx, backend); err != nil {
return nil, nil, err
}
return nil, nil, deleteErr
}
klog.V(2).InfoS("Deleted the Azure Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfile", profile.Name, "atmEndpoint", endpointName)
continue
}
if equalAzureTrafficManagerEndpoint(*endpoint, desired.Endpoint) {
klog.V(2).InfoS("Skipping updating the existing Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfile", profile.Name, "atmEndpoint", endpointName)
delete(desiredEndpoints, endpointName) // no need to update the existing endpoint
acceptedEndpoints = append(acceptedEndpoints, buildAcceptedEndpointStatus(endpoint, &desired.Cluster))
continue
} // no need to update the endpoint if it's the same
}
badEndpointsError := make([]error, 0, len(desiredEndpoints))
// The remaining endpoints in the desiredEndpoints should be created or updated.
for _, endpoint := range desiredEndpoints {
klog.V(2).InfoS("Creating new Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfile", profile.Name, "atmEndpoint", endpoint)
var responseError *azcore.ResponseError
endpointName := *endpoint.Endpoint.Name
res, updateErr := r.EndpointsClient.CreateOrUpdate(ctx, r.ResourceGroupName, *profile.Name, armtrafficmanager.EndpointTypeAzureEndpoints, endpointName, endpoint.Endpoint, nil)
if updateErr != nil {
if !errors.As(updateErr, &responseError) {
klog.ErrorS(updateErr, "Failed to send the createOrUpdate request", "trafficManagerBackend", backendKObj, "atmProfile", *profile.Name, "atmEndpoint", endpointName)
return nil, nil, updateErr
}
klog.ErrorS(updateErr, "Failed to create or update the Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfile", *profile.Name, "atmEndpoint", endpointName)
if azureerrors.IsClientError(updateErr) && !azureerrors.IsThrottled(updateErr) {
// When the failure is caused by the client error, will continue to process others.
badEndpointsError = append(badEndpointsError, updateErr)
continue
}
setUnknownCondition(backend, fmt.Sprintf("Failed to create or update %q for %q: %v", *endpoint.Endpoint.Name, *profile.Name, updateErr))
if err := r.updateTrafficManagerBackendStatus(ctx, backend); err != nil {
return nil, nil, err
}
return nil, nil, updateErr
}
klog.V(2).InfoS("Created or updated Traffic Manager endpoint", "trafficManagerBackend", backendKObj, "atmProfile", profile.Name, "atmEndpoint", endpointName)
acceptedEndpoints = append(acceptedEndpoints, buildAcceptedEndpointStatus(&res.Endpoint, &endpoint.Cluster))
}
klog.V(2).InfoS("Successfully updated the Traffic Manager endpoints", "trafficManagerBackend", backendKObj, "atmProfile", profile.Name, "numberOfAcceptedEndpoints", len(acceptedEndpoints), "numberOfBadEndpoints", len(badEndpointsError))
return acceptedEndpoints, badEndpointsError, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
// set up an index for efficient trafficManagerBackend lookup
Expand Down
Loading

0 comments on commit aef3b54

Please sign in to comment.