Add service to expose DC nodes in the control plane (fixes #1382) #1390

Merged · 5 commits · Sep 11, 2024
Changes from all commits
1 change: 1 addition & 0 deletions CHANGELOG/CHANGELOG-1.20.md
@@ -16,3 +16,4 @@ When cutting a new release, update the `unreleased` heading to the tag being gen
## unreleased

* [BUGFIX] [#1399](https://github.com/k8ssandra/k8ssandra-operator/issues/1399) Fixed SecretSyncController to handle multiple namespaces
* [FEATURE] [#1382](https://github.com/k8ssandra/k8ssandra-operator/issues/1382) Add service to expose DC nodes in the control plane
4 changes: 4 additions & 0 deletions controllers/k8ssandra/cleanup.go
@@ -204,6 +204,10 @@ func (r *K8ssandraClusterReconciler) deleteDc(ctx context.Context, kc *api.K8ssa
}

if dc != nil {
if err := r.deleteContactPointsService(ctx, kc, dc, logger); err != nil {
return result.Error(err)
}

if dc.GetConditionStatus(cassdcapi.DatacenterDecommission) == corev1.ConditionTrue {
logger.Info("CassandraDatacenter decommissioning in progress", "CassandraDatacenter", utils.GetKey(dc))
// There is no need to requeue here. Reconciliation will be triggered by updates made by cass-operator.
254 changes: 254 additions & 0 deletions controllers/k8ssandra/contact_points_service.go
@@ -0,0 +1,254 @@
package k8ssandra

import (
"context"
"github.com/go-logr/logr"
cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
api "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1"
"github.com/k8ssandra/k8ssandra-operator/pkg/annotations"
"github.com/k8ssandra/k8ssandra-operator/pkg/result"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// reconcileContactPointsService ensures that the control plane contains a `*-contact-points-service` Service for each
// DC. This is useful for control plane components that need to connect to DCs.
// To get the node information, we rely on the `*-all-pods-service` that cass-operator creates in the data planes for
// each DC. k8ssandra-operator watches the Endpoints resource associated with this service, and keeps the local service
// in sync with it. Because K8ssandra already requires that pod IP addresses be routable across all Kubernetes clusters,
// we can use those IPs directly.
// See also:
// - NewDatacenter(), where we annotate the remote service via AdditionalServiceConfig.AllPodsService so that it can be
// watched.
// - K8ssandraClusterReconciler.SetupWithManager(), where we set up the watch.
func (r *K8ssandraClusterReconciler) reconcileContactPointsService(
ctx context.Context,
kc *api.K8ssandraCluster,
dc *cassdcapi.CassandraDatacenter,
remoteClient client.Client,
logger logr.Logger,
) result.ReconcileResult {
// First, try to fetch the Endpoints of the *-all-pods-service of the CassandraDatacenter; it contains the
// information we want to duplicate locally.
remoteEndpoints, err := r.loadAllPodsEndpoints(ctx, dc, remoteClient, logger)
if err != nil {
return result.Error(err)
}
if remoteEndpoints == nil {
// Not found. Assume things are not ready yet, another reconcile will be triggered later.
return result.Continue()
}

// Ensure the Service exists
if err = r.createContactPointsService(ctx, kc, dc, remoteEndpoints, logger); err != nil {
return result.Error(err)
}

// Ensure the Endpoints resource exists and is up to date
if err = r.reconcileContactPointsServiceEndpoints(ctx, kc, dc, remoteEndpoints, logger); err != nil {
return result.Error(err)
}
return result.Continue()
}

func (r *K8ssandraClusterReconciler) loadAllPodsEndpoints(
ctx context.Context, dc *cassdcapi.CassandraDatacenter, remoteClient client.Client, logger logr.Logger,
) (*corev1.Endpoints, error) {
key := types.NamespacedName{
Namespace: dc.Namespace,
Name: dc.GetAllPodsServiceName(),
}
endpoints := &corev1.Endpoints{}
if err := remoteClient.Get(ctx, key, endpoints); err != nil {
if errors.IsNotFound(err) {
logger.Info("Remote Endpoints not found", "key", key)
return nil, nil
}
logger.Error(err, "Failed to fetch remote Endpoints", "key", key)
return nil, err
}
if len(endpoints.Subsets) == 0 {
logger.Info("Remote Endpoints found but have no subsets", "key", key)
return nil, nil
}
logger.Info("Remote Endpoints found", "key", key)
return endpoints, nil
}

func contactPointsServiceKey(kc *api.K8ssandraCluster, dc *cassdcapi.CassandraDatacenter) client.ObjectKey {
return types.NamespacedName{
Namespace: kc.Namespace,
Name: kc.SanitizedName() + "-" + dc.SanitizedName() + "-contact-points-service",
}
}

func (r *K8ssandraClusterReconciler) createContactPointsService(
ctx context.Context,
kc *api.K8ssandraCluster,
dc *cassdcapi.CassandraDatacenter,
remoteEndpoints *corev1.Endpoints,
logger logr.Logger,
) error {
key := contactPointsServiceKey(kc, dc)
err := r.Client.Get(ctx, key, &corev1.Service{})
if err == nil {
// Service already exists, nothing to do.
// (note that we don't use a hash annotation here, because the contents are always the same)
return nil
}
if !errors.IsNotFound(err) {
logger.Error(err, "Failed to fetch Service", "key", key)
return err
}

// Else not found, create it
logger.Info("Creating Service", "key", key)
service, err := r.newContactPointsService(key, kc, dc, remoteEndpoints)
if err != nil {
logger.Error(err, "Failed to initialize Service", "key", key)
return err
}
if err = r.Client.Create(ctx, service); err != nil {
logger.Error(err, "Failed to create Service", "key", key)
return err
}
return nil
}

func (r *K8ssandraClusterReconciler) newContactPointsService(
key client.ObjectKey,
kc *api.K8ssandraCluster,
dc *cassdcapi.CassandraDatacenter,
remoteEndpoints *corev1.Endpoints,
) (*corev1.Service, error) {
var ports []corev1.ServicePort
for _, remotePort := range remoteEndpoints.Subsets[0].Ports {
ports = append(ports, corev1.ServicePort{
Name: remotePort.Name,
Port: remotePort.Port,
Protocol: remotePort.Protocol,
})
}
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: key.Namespace,
Name: key.Name,
Labels: map[string]string{
api.NameLabel: api.NameLabelValue,
api.PartOfLabel: api.PartOfLabelValue,
api.ComponentLabel: api.ComponentLabelValueCassandra,
api.K8ssandraClusterNameLabel: kc.Name,
api.K8ssandraClusterNamespaceLabel: kc.Namespace,
api.DatacenterLabel: dc.Name,
},
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
ClusterIP: corev1.ClusterIPNone,
Ports: ports,
// We don't provide a selector since the operator manages the Endpoints directly

Review comment (author):
I'm not sure if this service should be headless (currently it is not); it seems convenient to use the service name as the contact point if a component wants to connect to "any node" in the DC.

Review comment (author, follow-up):
Actually, I think the service must be headless if we want to be able to resolve individual pod addresses.

},
}
if err := controllerutil.SetControllerReference(kc, service, r.Scheme); err != nil {
return nil, err
}
return service, nil
}

func (r *K8ssandraClusterReconciler) reconcileContactPointsServiceEndpoints(
ctx context.Context,
kc *api.K8ssandraCluster,
dc *cassdcapi.CassandraDatacenter,
remoteEndpoints *corev1.Endpoints,
logger logr.Logger,
) error {
key := contactPointsServiceKey(kc, dc)
desiredEndpoints, err := r.newContactPointsServiceEndpoints(key, kc, dc, remoteEndpoints)
if err != nil {
logger.Error(err, "Failed to initialize Endpoints", "key", key)
return err
}
actualEndpoints := &corev1.Endpoints{}
if err = r.Client.Get(ctx, key, actualEndpoints); err != nil {
if errors.IsNotFound(err) {
logger.Info("Creating Endpoints", "key", key)
if err = r.Client.Create(ctx, desiredEndpoints); err != nil {
logger.Error(err, "Failed to create Endpoints", "key", key)
return err
}
return nil
}
logger.Error(err, "Failed to fetch Endpoints", "key", key)
return err
}
if !annotations.CompareHashAnnotations(actualEndpoints, desiredEndpoints) {
resourceVersion := actualEndpoints.GetResourceVersion()
desiredEndpoints.DeepCopyInto(actualEndpoints)
actualEndpoints.SetResourceVersion(resourceVersion)
logger.Info("Updating Endpoints", "key", key)
if err := r.Client.Update(ctx, actualEndpoints); err != nil {
logger.Error(err, "Failed to update Endpoints", "key", key)
return err
}
}
return nil
}

func (r *K8ssandraClusterReconciler) newContactPointsServiceEndpoints(
serviceKey client.ObjectKey,
kc *api.K8ssandraCluster,
dc *cassdcapi.CassandraDatacenter,
remoteEndpoints *corev1.Endpoints,
) (*corev1.Endpoints, error) {
var subsets []corev1.EndpointSubset
for _, remoteSubset := range remoteEndpoints.Subsets {
var addresses []corev1.EndpointAddress
for _, remoteAddress := range remoteSubset.Addresses {
// Only preserve the IP. Other fields such as HostName, NodeName, etc. are specific to the remote cluster,
// so keeping them would be at best misleading (or worse if some component relies on them).
addresses = append(addresses, corev1.EndpointAddress{IP: remoteAddress.IP})
}
subsets = append(subsets, corev1.EndpointSubset{
Addresses: addresses,
Ports: remoteSubset.Ports,
})
}
endpoints := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Namespace: serviceKey.Namespace,
// The Endpoints resource has the same name as the Service (that's how Kubernetes links them)
Name: serviceKey.Name,
Labels: map[string]string{
api.NameLabel: api.NameLabelValue,
api.PartOfLabel: api.PartOfLabelValue,
api.ComponentLabel: api.ComponentLabelValueCassandra,
api.K8ssandraClusterNameLabel: kc.Name,
api.K8ssandraClusterNamespaceLabel: kc.Namespace,
api.DatacenterLabel: dc.Name,
},
},
Subsets: subsets,
}
annotations.AddHashAnnotation(endpoints)
return endpoints, nil
}

func (r *K8ssandraClusterReconciler) deleteContactPointsService(
ctx context.Context,
kc *api.K8ssandraCluster,
dc *cassdcapi.CassandraDatacenter,
logger logr.Logger,
) error {
key := contactPointsServiceKey(kc, dc)
// We only need to delete the Service; Kubernetes automatically cleans up the associated Endpoints.
service := corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: key.Namespace, Name: key.Name}}
if err := r.Client.Delete(ctx, &service); err != nil && !errors.IsNotFound(err) {
logger.Error(err, "Failed to delete Service", "key", key)
return err
}
return nil
}
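
For illustration, here is a minimal sketch (not part of the PR) of how a control-plane component could consume this Service. Because the Service is headless (`ClusterIP: None`), cluster DNS returns one A record per mirrored pod IP instead of a single virtual IP. The function name, the in-cluster DNS form, and passing the sanitized cluster/DC names as plain strings are assumptions for the example:

package main

import (
	"fmt"
	"net"
)

// contactPoints resolves the headless contact-points Service for one DC and
// returns the mirrored Cassandra pod IPs.
func contactPoints(kcName, dcName, namespace string) ([]string, error) {
	// Name pattern per contactPointsServiceKey above; assumes the sanitized
	// cluster and DC names are passed in directly.
	host := fmt.Sprintf("%s-%s-contact-points-service.%s.svc.cluster.local", kcName, dcName, namespace)
	ips, err := net.LookupIP(host)
	if err != nil {
		return nil, err
	}
	addrs := make([]string, 0, len(ips))
	for _, ip := range ips {
		addrs = append(addrs, ip.String())
	}
	return addrs, nil
}

func main() {
	addrs, err := contactPoints("demo", "dc1", "k8ssandra-operator")
	if err != nil {
		fmt.Println("lookup failed:", err)
		return
	}
	fmt.Println("contact points:", addrs)
}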
2 changes: 2 additions & 0 deletions controllers/k8ssandra/datacenters.go
@@ -150,6 +150,8 @@ func (r *K8ssandraClusterReconciler) reconcileDatacenters(ctx context.Context, k

r.setStatusForDatacenter(kc, actualDc)

r.reconcileContactPointsService(ctx, kc, actualDc, remoteClient, dcLogger)

if !annotations.CompareHashAnnotations(actualDc, desiredDc) && !AllowUpdate(kc) {
logger.Info("Datacenter requires an update, but we're not allowed to do it", "CassandraDatacenter", dcKey)
// We're not allowed to update, but need to
18 changes: 18 additions & 0 deletions controllers/k8ssandra/k8ssandracluster_controller.go
@@ -18,6 +18,7 @@ package k8ssandra

import (
"context"
"strings"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
@@ -223,6 +224,19 @@ func (r *K8ssandraClusterReconciler) SetupWithManager(mgr ctrl.Manager, clusters
return requests
}

// Use a more specific filter for Endpoints because we are only interested in one particular service.
endpointsFilter := func(ctx context.Context, mapObj client.Object) []reconcile.Request {
requests := make([]reconcile.Request, 0)

kcName := labels.GetLabel(mapObj, api.K8ssandraClusterNameLabel)
kcNamespace := labels.GetLabel(mapObj, api.K8ssandraClusterNamespaceLabel)

if kcName != "" && kcNamespace != "" && strings.HasSuffix(mapObj.GetName(), "all-pods-service") {
requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: kcNamespace, Name: kcName}})
}
return requests
}

cb = cb.Watches(&cassdcapi.CassandraDatacenter{},
handler.EnqueueRequestsFromMapFunc(clusterLabelFilter))
cb = cb.Watches(&stargateapi.Stargate{},
@@ -231,6 +245,8 @@ func (r *K8ssandraClusterReconciler) SetupWithManager(mgr ctrl.Manager, clusters
handler.EnqueueRequestsFromMapFunc(clusterLabelFilter))
cb = cb.Watches(&v1.ConfigMap{},
handler.EnqueueRequestsFromMapFunc(clusterLabelFilter))
cb = cb.Watches(&v1.Endpoints{},
handler.EnqueueRequestsFromMapFunc(endpointsFilter))

for _, c := range clusters {
cb = cb.WatchesRawSource(source.Kind(c.GetCache(), &cassdcapi.CassandraDatacenter{}),
@@ -241,6 +257,8 @@ func (r *K8ssandraClusterReconciler) SetupWithManager(mgr ctrl.Manager, clusters
handler.EnqueueRequestsFromMapFunc(clusterLabelFilter))
cb = cb.WatchesRawSource(source.Kind(c.GetCache(), &v1.ConfigMap{}),
handler.EnqueueRequestsFromMapFunc(clusterLabelFilter))
cb = cb.WatchesRawSource(source.Kind(c.GetCache(), &v1.Endpoints{}),
handler.EnqueueRequestsFromMapFunc(endpointsFilter))
}

return cb.Complete(r)
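
To make the watch logic above concrete, here is a small standalone sketch (not from the PR) of the same mapping rule. The label keys are spelled out as illustrative stand-ins for api.K8ssandraClusterNameLabel and api.K8ssandraClusterNamespaceLabel:

package main

import (
	"fmt"
	"strings"
)

// mapEndpointsToCluster mirrors endpointsFilter as a plain function: an
// Endpoints object maps to a reconcile request only when it carries both
// cluster labels and is named like the all-pods service.
func mapEndpointsToCluster(name string, labels map[string]string) (string, bool) {
	kcName := labels["k8ssandra.io/cluster-name"]
	kcNamespace := labels["k8ssandra.io/cluster-namespace"]
	if kcName != "" && kcNamespace != "" && strings.HasSuffix(name, "all-pods-service") {
		return kcNamespace + "/" + kcName, true
	}
	return "", false
}

func main() {
	labels := map[string]string{
		"k8ssandra.io/cluster-name":      "demo",
		"k8ssandra.io/cluster-namespace": "k8ssandra-operator",
	}
	// An all-pods Endpoints from a data plane enqueues a reconcile request...
	fmt.Println(mapEndpointsToCluster("demo-dc1-all-pods-service", labels)) // k8ssandra-operator/demo true
	// ...while any other labeled Endpoints is ignored by this watch.
	fmt.Println(mapEndpointsToCluster("demo-dc1-service", labels)) // "" false
}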
48 changes: 48 additions & 0 deletions controllers/k8ssandra/k8ssandracluster_controller_test.go
@@ -831,6 +831,22 @@ func createMultiDcCluster(t *testing.T, ctx context.Context, f *framework.Framew
return condition != nil && condition.Status != corev1.ConditionFalse
}, timeout, interval, "timed out waiting for K8ssandraCluster status update")

t.Log("simulate the creation of the all-pods Endpoints for dc1")
err = f.Create(ctx, dc1Key, f.NewAllPodsEndpoints(kcKey, kc, dc1Key, "10.0.0.1"))
require.NoError(err, "failed to create Endpoints")

t.Log("check that the contact-points Service for dc1 was created in the control plane")
require.Eventually(func() bool {
_, endpoints, err := f.GetContactPointsService(ctx, kcKey, kc, dc1Key)
if err != nil {
t.Logf("failed to get contact-points Service: %v", err)
return false
}
return len(endpoints.Subsets) == 1 &&
endpoints.Subsets[0].Addresses[0].IP == "10.0.0.1" &&
endpoints.Subsets[0].Ports[0].Port == 9042
}, timeout, interval, "timed out waiting for creation of contact-points Service for dc1")

t.Log("check that dc2 has not been created yet")
dc2Key := framework.ClusterKey{NamespacedName: types.NamespacedName{Namespace: namespace, Name: "dc2"}, K8sContext: f.DataPlaneContexts[1]}
dc2 := &cassdcapi.CassandraDatacenter{}
@@ -892,6 +908,38 @@ func createMultiDcCluster(t *testing.T, ctx context.Context, f *framework.Framew
return condition != nil && condition.Status == corev1.ConditionTrue
}, timeout, interval, "timed out waiting for K8ssandraCluster status update")

t.Log("simulate the creation of the all-pods Endpoints for dc2")
err = f.Create(ctx, dc2Key, f.NewAllPodsEndpoints(kcKey, kc, dc2Key, "10.0.0.2"))
require.NoError(err, "failed to create Endpoints")

t.Log("check that the contact-points Service for dc2 was created in the control plane")
require.Eventually(func() bool {
_, endpoints, err := f.GetContactPointsService(ctx, kcKey, kc, dc2Key)
if err != nil {
t.Logf("failed to get contact-points Service: %v", err)
return false
}
return len(endpoints.Subsets) == 1 &&
endpoints.Subsets[0].Addresses[0].IP == "10.0.0.2" &&
endpoints.Subsets[0].Ports[0].Port == 9042
}, timeout, interval, "timed out waiting for creation of contact-points Service for dc2")

t.Log("simulate a change of Endpoints in dc2")
err = f.Update(ctx, dc2Key, f.NewAllPodsEndpoints(kcKey, kc, dc2Key, "10.0.0.3"))
require.NoError(err, "failed to update Endpoints")

t.Log("check that the contact-points Service for dc2 was updated")
require.Eventually(func() bool {
_, endpoints, err := f.GetContactPointsService(ctx, kcKey, kc, dc2Key)
if err != nil {
t.Logf("failed to get contact-points Service: %v", err)
return false
}
return len(endpoints.Subsets) == 1 &&
endpoints.Subsets[0].Addresses[0].IP == "10.0.0.3" &&
endpoints.Subsets[0].Ports[0].Port == 9042
}, timeout, interval, "timed out waiting for update of contact-points Service for dc2")

t.Log("deleting K8ssandraCluster")
err = f.DeleteK8ssandraCluster(ctx, client.ObjectKey{Namespace: kc.Namespace, Name: kc.Name}, timeout, interval)
require.NoError(err, "failed to delete K8ssandraCluster")
Expand Down