Skip to content

Commit

Permalink
Implement private endpoint support for Event Hubs
Browse files Browse the repository at this point in the history
  • Loading branch information
swoehrl-mw committed Feb 7, 2024
1 parent a0707aa commit efbc600
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 13 deletions.
27 changes: 24 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Hybrid Cloud Operator for Kafka
# Hybrid Cloud Operator for Kafka

The Hybrid Cloud Operator for Kafka is a Kubernetes Operator that has been designed for hybrid cloud, multi-teams kubernetes platforms to allow users and teams to deploy and manage their own Kafka brokers via kubernetes without cloud provider specific provisioning.

Expand Down Expand Up @@ -56,6 +56,16 @@ backends:
default_capacity: 1 # Default capacity to use for the Eventhub namespaces. Range 1-20, optional
name_pattern: "{namespace}-{name}" # Name pattern to use for the Eventhub namespaces, optional
fake_delete: false # If set to true the operator will not actually delete the eventhub namespace when the object in kubernetes is deleted, optional
network:
public_network_access: true # Allow access to the eventhub namespace from the public internet, optional
allow_trusted_services: true # Allow access to the eventhub namespace from trusted azure services (relevant if public access is disabled), optional
create_private_endpoint: false # If set to true the operator will create private endpoints for each subnet listed unter allowed_subnets, optional
allowed_ips: # List of IP nets to allow access to the Eventhub namespace, optional
- cidr: 127.0.0.1/128 # IP net CIDR
allowed_subnets: # List of VNet subnets to allow access to the Eventhub namespace, optional
- vnet: # Name of the virtual network, required
subnet: # Name of the subnet, required
resource_group: # Name of the resource group to provision the private endpoint in, if not set group of eventhub will be used, optional
topic:
fake_delete: false # If set to true the operator will not actually delete the eventhub when the object in kubernetes is deleted, optional
name_pattern: "{namespace}-{name}" # Name pattern to use for the Eventhub, optional
Expand All @@ -73,7 +83,18 @@ backends:
kafka_config: # map of kafka config options to use for the broker, optional
```
For the operator to interact with Azure it needs credentials. For local testing it can pick up the token from the azure cli but for real deployments it needs a dedicated service principal. Supply the credentials for the service principal using the environment variables `AZURE_SUBSCRIPTION_ID`, `AZURE_TENANT_ID`, `AZURE_CLIENT_ID` and `AZURE_CLIENT_SECRET` (if you deploy via the helm chart use the use `envSecret` value). The service principal needs permissions to manage Azure EventHubs.
For the operator to interact with Azure it needs credentials. For local testing it can pick up the token from the azure cli but for real deployments it needs a dedicated service principal. Supply the credentials for the service principal using the environment variables `AZURE_SUBSCRIPTION_ID`, `AZURE_TENANT_ID`, `AZURE_CLIENT_ID` and `AZURE_CLIENT_SECRET` (if you deploy via the helm chart use the use `envSecret` value). The service principal needs permissions to manage Azure Event Hubs.

### Azure Event Hubs

If you configure the operator to create private endpoints, some rquirements must be met:

* The user/principal/identity the operator uses must have permissions to manage networks (to configure the private endpoints) and DNS zones.
* You must have at least one virtual network with a subnet in the resource group.
* The resource group must have an existing private DNS zone for the domain `privatelink.servicebus.windows.net`.
* The DNS zone must be linked to any virtual network that should access an Event Hub.

Right now the operator only supports one private endpoint per resource group due to the connection with the private DNS zone. Also if you change the list of subnets and remove one the operator will not remove the endpoints from existing Event Hub namespaces.

### Deployment

Expand Down Expand Up @@ -164,7 +185,7 @@ To run it locally follow these steps:
2. Install dependencies: `pip install -r requirements.txt`
3. Setup a local kubernetes cluster, e.g. with k3d: `k3d cluster create`
4. Apply the CRDs in your local cluster: `kubectl apply -f helm/hybrid-cloud-kafka-operator-crds/templates/`
5. If you want to use the Azure EventHub backend: Either have the azure cli installed and configured with an active login or export the following environment variables: `AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, `AZURE_CLIENT_SECRET`
5. If you want to use the Azure Event Hub backend: Either have the azure cli installed and configured with an active login or export the following environment variables: `AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, `AZURE_CLIENT_SECRET`
6. Create a `config.yaml` to suit your needs
7. Run `kopf run main.py -A`
8. In another window apply some objects to the cluster to trigger the operator (see the `examples` folder)
Expand Down
113 changes: 105 additions & 8 deletions hybridcloud/backends/azureeventhub.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import string
from azure.core.exceptions import ResourceNotFoundError
from azure.mgmt.eventhub.models import Eventhub, AuthorizationRule, AccessRights, RegenerateAccessKeyParameters, CheckNameAvailabilityParameter, EHNamespace, Sku
from azure.mgmt.eventhub.models import Eventhub, AuthorizationRule, AccessRights, RegenerateAccessKeyParameters, CheckNameAvailabilityParameter, \
EHNamespace, Sku, NetworkRuleSet, NWRuleSetIpRules, NWRuleSetVirtualNetworkRules, Subnet
from azure.mgmt.network.models import PrivateEndpoint, PrivateLinkServiceConnection
from hybridcloud_core.configuration import get_one_of
from hybridcloud_core.operator.reconcile_helpers import field_from_spec
from ..util.azure import eventhub_client
from ..util.azure import eventhub_client, network_client, privatedns_client


ALLOWED_NAMESPACE_NAME_CHARACTERS = string.ascii_lowercase + string.digits + "-"
Expand Down Expand Up @@ -31,6 +33,8 @@ class AzureEventHubBackend:
def __init__(self, logger):
self._logger = logger
self._eventhub_client = eventhub_client()
self._network_client = network_client()
self._dns_client = privatedns_client()
self._subscription_id = _backend_config("subscription_id", fail_if_missing=True)
self._location = _backend_config("location", fail_if_missing=True)
self._resource_group = _backend_config("resource_group", fail_if_missing=True)
Expand Down Expand Up @@ -68,25 +72,118 @@ async def create_or_update_broker(self, namespace, name, spec, extra_tags=None):
class_def = _backend_config(f"broker.classes.{size_class}", default={})
sku = class_def.get("sku", sku)
capacity = class_def.get("capacity", capacity)
parameters = EHNamespace(
location=self._location,
tags=_tags(namespace, name, extra_tags),
sku=Sku(name=sku, tier=sku, capacity=capacity)
)
result = self._eventhub_client.namespaces.begin_create_or_update(self._resource_group, broker_name, parameters).result()

existing = self.broker_exists(namespace, name)
if not existing or existing.sku.tier != sku or existing.sku.capacity != capacity:
parameters = EHNamespace(
location=self._location,
tags=_tags(namespace, name, extra_tags),
sku=Sku(name=sku, tier=sku, capacity=capacity)
)
self._logger.info("Creating eventhub namespace")
result = self._eventhub_client.namespaces.begin_create_or_update(self._resource_group, broker_name, parameters).result()
broker_id = result.id
else:
self._logger.info("Eventhub namespace already configured. Not updating it")
broker_id = existing.id

self._configure_networking(broker_name, broker_id)

return {
"azure_name": broker_name,
"name": name,
"namespace": namespace
}

def _get_private_endpoint(self, resource_group, name):
try:
return self._network_client.private_endpoints.get(resource_group, name)
except ResourceNotFoundError:
return None

def _configure_networking(self, broker_name, broker_id):
# Create NetworkRuleSet
default_action = "Allow" if _backend_config("broker.network.public_network_access", True) else "Deny"
ip_rules = []
for rule in _backend_config("broker.network.allowed_ips", []):
ip_rules.append(NWRuleSetIpRules(ip_mask=rule["cidr"], action="Allow"))
vnet_rules = []
for rule in _backend_config("broker.network.allowed_subnets", []):
subnet_id = f"/subscriptions/{self._subscription_id}/resourceGroups/{self._resource_group}/providers/Microsoft.Network/virtualNetworks/{rule['vnet']}/subnets/{rule['subnet']}"
vnet_rules.append(NWRuleSetVirtualNetworkRules(subnet=Subnet(id=subnet_id), ignore_missing_vnet_service_endpoint=True))
parameters = NetworkRuleSet(
trusted_service_access_enabled=_backend_config("broker.network.allow_trusted_services", True),
public_network_access="Enabled" if _backend_config("broker.network.public_network_access", True) else "Disabled",
default_action=default_action,
virtual_network_rules=vnet_rules,
ip_rules=ip_rules,
)
self._logger.info("Configuring networkruleset")
self._eventhub_client.namespaces.create_or_update_network_rule_set(self._resource_group, broker_name, parameters)

# Create private endpoints
self._logger.info("Configuring private endpoints")
for rule in _backend_config("broker.network.allowed_subnets", []):
rg = rule.get("resource_group", self._resource_group)
endpoint_name = f"{broker_name}-{rule['vnet']}-{rule['subnet']}"
existing_endpoint = self._get_private_endpoint(rg, endpoint_name)
if not existing_endpoint:
self._logger.info(f"Creating endpoint for {rg}/vnets/{rule['vnet']}/subnets/{rule['subnet']}")
private_endpoint = self._network_client.private_endpoints.begin_create_or_update(
rg,
endpoint_name,
parameters=PrivateEndpoint(
location=self._location,
subnet=Subnet(id=f"/subscriptions/{self._subscription_id}/resourceGroups/{rg}/providers/Microsoft.Network/virtualNetworks/{rule['vnet']}/subnets/{rule['subnet']}"),
private_link_service_connections=[PrivateLinkServiceConnection(
name=f"link-{broker_name}",
private_link_service_id=broker_id,
group_ids=["namespace"]
)],
)
).result()
else:
self._logger.info(f"Endpoint for {rg}/vnets/{rule['vnet']}/subnets/{rule['subnet']} already exists. Not creating it again")
private_endpoint = existing_endpoint

self._logger.info(f"Creating DNS record for {rg}/vnets/{rule['vnet']}/subnets/{rule['subnet']}")
self._dns_client.record_sets.create_or_update(
rg,
'privatelink.servicebus.windows.net',
'A',
broker_name,
{
"ttl": 30,
"arecords": [{"ipv4_address": private_endpoint.custom_dns_configs[0].ip_addresses[0]}]
}
)

async def delete_broker(self, namespace, name):
broker_name = _calc_namespace_name(namespace, name)
fake_delete = _backend_config("broker.fake_delete", default=False)
if fake_delete:
self.create_or_update_broker(namespace, name, None, {"marked-for-deletion": "yes"})
else:
self._eventhub_client.namespaces.begin_delete(self._resource_group, broker_name).result()
self._delete_networking(broker_name)

def _delete_networking(self, broker_name):
# Create private endpoints
self._logger.info("Deleting private endpoints")
for rule in _backend_config("broker.network.allowed_subnets", []):
rg = rule.get("resource_group", self._resource_group)
endpoint_name = f"{broker_name}-{rule['vnet']}-{rule['subnet']}"
# Create private endpoint with privateservicelink
self._logger.info(f"Deleting endpoint for {rg}/vnets/{rule['vnet']}/subnets/{rule['subnet']}")
self._network_client.private_endpoints.begin_delete(rg, endpoint_name).result()
# Create private DNS record
self._logger.info(f"Deleting DNS record for {rg}/vnets/{rule['vnet']}/subnets/{rule['subnet']}")
self._dns_client.record_sets.delete(
rg,
'privatelink.servicebus.windows.net',
'A',
broker_name
)

def topic_spec_valid(self, namespace, name, spec, broker_info):
topic_name = _calc_topic_name(namespace, name)
Expand Down
9 changes: 9 additions & 0 deletions hybridcloud/util/azure.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from azure.identity import DefaultAzureCredential
from azure.mgmt.eventhub import EventHubManagementClient
from azure.mgmt.network import NetworkManagementClient
from azure.mgmt.privatedns import PrivateDnsManagementClient
from hybridcloud_core.configuration import get_one_of


Expand All @@ -13,3 +15,10 @@ def _credentials():

def eventhub_client() -> EventHubManagementClient:
return EventHubManagementClient(_credentials(), _subscription_id())


def network_client():
return NetworkManagementClient(_credentials(), _subscription_id())

def privatedns_client():
return PrivateDnsManagementClient(_credentials(), _subscription_id())
6 changes: 4 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
azure-identity==1.13.0
azure-identity==1.15.0
azure-mgmt-resource==23.0.1
azure-mgmt-eventhub==11.0.0
azure-mgmt-privatedns==1.1.0
azure-mgmt-network==25.2.0
requests==2.31.0
git+https://github.com/MaibornWolff/hybrid-cloud-operator-library.git@068c411
git+https://github.com/MaibornWolff/hybrid-cloud-operator-library.git@19a8275

0 comments on commit efbc600

Please sign in to comment.