Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-3777] Add node port support #264

Merged
merged 8 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

97 changes: 84 additions & 13 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import os
import socket
from configparser import ConfigParser
from typing import Dict, Literal, Optional, Union, get_args
from typing import Any, Dict, Literal, Optional, Tuple, Union, get_args

import lightkube
from charms.data_platform_libs.v0.data_interfaces import DataPeer, DataPeerUnit
Expand Down Expand Up @@ -69,6 +69,7 @@ class PgBouncerK8sCharm(CharmBase):
def __init__(self, *args):
super().__init__(*args)

self._namespace = self.model.name
self.peer_relation_app = DataPeer(
self,
relation_name=PEER_RELATION_NAME,
Expand Down Expand Up @@ -137,11 +138,89 @@ def __init__(self, *args):
substrate="k8s",
)

@property
def _node_name(self) -> str:
"""Return the node name for this unit's pod ip."""
pod = lightkube.Client().get(
lightkube.resources.core_v1.Pod,
name=self.unit.name.replace("/", "-"),
namespace=self._namespace,
)
return pod.spec.nodeName

@property
def _node_ip(self) -> Optional[str]:
"""Return node IP."""
node = lightkube.Client().get(
lightkube.resources.core_v1.Node,
name=self._node_name,
namespace=self._namespace,
)
# [
# NodeAddress(address='192.168.0.228', type='InternalIP'),
# NodeAddress(address='example.com', type='Hostname')
# ]
# Remember that OpenStack, for example, will return an internal hostname, which is not
# accessible from the outside. Give preference to ExternalIP, then InternalIP first
# Separated, as we want to give preference to ExternalIP, InternalIP and then Hostname
for typ in ["ExternalIP", "InternalIP", "Hostname"]:
for a in node.status.addresses:
if a.type == typ:
return a.address

def _node_port(self, port_type: str) -> int:
"""Return node port."""
service = lightkube.Client().get(
lightkube.resources.core_v1.Service, self.app.name, namespace=self._namespace
)
if not service or not service.spec.type == "NodePort":
return -1
# svc.spec.ports
# [ServicePort(port=6432, appProtocol=None, name=None, nodePort=31438, protocol='TCP', targetPort=6432)]
# Keeping this distinction, so we have similarity with mysql-router-k8s
if port_type == "rw":
port = self.config["listen_port"]
elif port_type == "ro":
port = self.config["listen_port"]
else:
raise ValueError(f"Invalid {port_type=}")
logger.debug(f"Looking for NodePort for {port_type} in {service.spec.ports}")
for svc_port in service.spec.ports:
if svc_port.port == port:
return svc_port.nodePort
raise Exception(f"NodePort not found for {port_type}")

def get_all_k8s_node_hostnames_and_ips(
self,
) -> Tuple[list[str], list[str]]:
"""Return all node hostnames and IPs registered in k8s."""
node = lightkube.Client().get(
lightkube.resources.core_v1.Node,
name=self._node_name,
namespace=self._namespace,
)
hostnames = []
ips = []
for a in node.status.addresses:
if a.type in ["ExternalIP", "InternalIP"]:
ips.append(a.address)
elif a.type == "Hostname":
hostnames.append(a.address)
return hostnames, ips

@property
def get_node_ports(self) -> Dict[str, Any]:
"""Returns the service's nodes IPs and ports for both rw and ro types."""
return {
"rw": f"{self._node_ip}:{self._node_port('rw')}",
"ro": f"{self._node_ip}:{self._node_port('ro')}",
}

# =======================
# Charm Lifecycle Hooks
# =======================

def _patch_port(self) -> None:
def patch_port(self, use_node_port: bool = False) -> None:
"""Patch Juju-created k8s service.

The k8s service will be tied to pod-0 so that the service is auto cleaned by
Expand Down Expand Up @@ -175,6 +254,7 @@ def _patch_port(self) -> None:
targetPort=self.config["listen_port"],
)
],
type=("NodePort" if use_node_port else "ClusterIP"),
selector={"app.kubernetes.io/name": self.app.name},
),
)
Expand Down Expand Up @@ -268,10 +348,7 @@ def _on_pgbouncer_pebble_ready(self, event: PebbleReadyEvent) -> None:
self.unit.set_workload_version(self.version)

self.peers.unit_databag["container_initialised"] = "True"
if JujuVersion.from_environ().supports_open_port_on_k8s:
self.unit.open_port("tcp", self.config["listen_port"])
else:
self._patch_port()
self.patch_port()

@property
def is_container_ready(self) -> bool:
Expand Down Expand Up @@ -299,13 +376,7 @@ def _on_config_changed(self, event: ConfigChangedEvent) -> None:
# This emits relation-changed events to every client relation, so only do it when
# necessary
self.update_client_connection_info(self.config["listen_port"])

if JujuVersion.from_environ().supports_open_port_on_k8s:
if old_port:
self.unit.close_port("tcp", int(old_port))
self.unit.open_port("tcp", self.config["listen_port"])
else:
self._patch_port()
self.patch_port()

self.render_pgb_config()
try:
Expand Down
4 changes: 2 additions & 2 deletions src/relations/peers.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def _on_created(self, event: RelationCreatedEvent):
def _on_joined(self, event: HookEvent):
self._on_changed(event)
if self.charm.unit.is_leader():
self.charm.client_relation.update_read_only_endpoints()
self.charm.client_relation.update_endpoints()

def _on_changed(self, event: HookEvent):
"""If the current unit is a follower, write updated config and auth files to filesystem.
Expand Down Expand Up @@ -170,7 +170,7 @@ def _on_departed(self, event):
"Leader unit removed - waiting for leader_elected event"
)
if self.charm.unit.is_leader():
self.charm.client_relation.update_read_only_endpoints()
self.charm.client_relation.update_endpoints()

def _on_leader_elected(self, _):
self.update_leader()
Expand Down
77 changes: 51 additions & 26 deletions src/relations/pgbouncer_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
from ops.model import (
Application,
BlockedStatus,
MaintenanceStatus,
)

from constants import CLIENT_RELATION_NAME
Expand Down Expand Up @@ -87,6 +86,27 @@ def __init__(self, charm: CharmBase, relation_name: str = CLIENT_RELATION_NAME)
charm.on[self.relation_name].relation_broken, self._on_relation_broken
)

def external_connectivity(self, event=None) -> bool:
"""Whether any of the relations are marked as external."""
# New list to avoid modifying the original
relations = list(self.model.relations[self.relation_name])
if (
event
and type(event) in [RelationBrokenEvent, RelationDepartedEvent]
and event.relation in relations
):
# Disregard of what has been requested by the client, this relation is being removed
relations.remove(event.relation)

# Now, we will return true if any of the relations are marked as external OR if the event
# itself is requesting for an external_node_connectivity.
# Not all events received have external-node-connectivity
external_conn = getattr(event, "external_node_connectivity", lambda: False)()
return (event and external_conn) or any(
relation.data[relation.app].get("external-node-connectivity", "false") == "true"
for relation in relations
)

def _depart_flag(self, relation):
return f"{self.relation_name}_{relation.id}_departing"

Expand Down Expand Up @@ -161,6 +181,9 @@ def _on_relation_departed(self, event: RelationDepartedEvent) -> None:
if event.departing_unit == self.charm.unit:
self.charm.peers.unit_databag.update({self._depart_flag(event.relation): "true"})

# Check if we need to close the node port
self.charm.patch_port(self.external_connectivity(event))

phvalguima marked this conversation as resolved.
Show resolved Hide resolved
def _on_relation_broken(self, event: RelationBrokenEvent) -> None:
"""Remove the user created for this relation, and revoke connection permissions."""
self.update_connection_info(event.relation)
Expand All @@ -186,47 +209,49 @@ def _on_relation_broken(self, event: RelationBrokenEvent) -> None:
f"Failed to delete user during {self.relation_name} relation broken event"
)
raise
# Check if we need to close the node port
self.charm.patch_port(self.external_connectivity(event))

def update_connection_info(self, relation):
"""Updates client-facing relation information."""
# Set the read/write endpoint.
if not self.charm.unit.is_leader():
return
initial_status = self.charm.unit.status
self.charm.unit.status = MaintenanceStatus(
f"Updating {self.relation_name} relation connection information"
)
endpoint = f"{self.charm.leader_hostname}:{self.charm.config['listen_port']}"
self.database_provides.set_endpoints(relation.id, endpoint)

self.update_read_only_endpoints()
if relation.data[relation.app].get("external-node-connectivity", "false") == "true":
# Make sure we have a node port exposed
self.charm.patch_port(True)

self.update_endpoints(relation)

# Set the database version.
if self.charm.backend.check_backend():
self.database_provides.set_version(
relation.id, self.charm.backend.postgres.get_postgresql_version()
)
self.charm.unit.status = initial_status

self.charm.update_status()
def update_endpoints(self, relation=None) -> None:
"""Set the endpoints for the relation."""
nodeports = self.charm.get_node_ports
internal_port = self.charm.config["listen_port"]
internal_hostnames = sorted(set(self.charm.peers.units_hostnames))
if self.charm.peers.leader_hostname in internal_hostnames:
internal_hostnames.remove(self.charm.peers.leader_hostname)
internal_rw = f"{self.charm.leader_hostname}:{self.charm.config['listen_port']}"

def update_read_only_endpoints(self, event: DatabaseRequestedEvent = None) -> None:
"""Set the read-only endpoint only if there are replicas."""
if not self.charm.unit.is_leader():
return
relations = [relation] if relation else self.model.relations[self.relation_name]

# Get the current relation or all the relations if this is triggered by another type of
# event.
relations = [event.relation] if event else self.model.relations[self.relation_name]

port = self.charm.config["listen_port"]
hostnames = set(self.charm.peers.units_hostnames)
hostnames.discard(self.charm.peers.leader_hostname)
# Set the endpoints for each relation.
for relation in relations:
self.database_provides.set_read_only_endpoints(
relation.id,
",".join([f"{host}:{port}" for host in hostnames]),
)
# Read-write endpoint
if relation.data[relation.app].get("external-node-connectivity", "false") == "true":
self.database_provides.set_endpoints(relation.id, nodeports["rw"])
self.database_provides.set_read_only_endpoints(relation.id, nodeports["ro"])
else:
self.database_provides.set_endpoints(relation.id, internal_rw)
self.database_provides.set_read_only_endpoints(
relation.id,
",".join([f"{host}:{internal_port}" for host in internal_hostnames]),
)

def get_database(self, relation):
"""Gets database name from relation."""
Expand Down
22 changes: 22 additions & 0 deletions tests/integration/helpers/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,28 @@ async def get_unit_info(ops_test: OpsTest, unit_name: str) -> Dict:
return json.loads(get_databag[1])[unit_name]


async def get_endpoint_info(ops_test: OpsTest, unit_name: str, endpoint: str) -> str:
"""Gets the endpoint information from the given unit.

Args:
ops_test: ops_test testing instance
unit_name: name of the unit
endpoint: name of the endpoint

Returns:
A str containing endpoint information available to juju
"""
get_databag = await ops_test.juju(
"show-unit",
unit_name,
"--format=json",
)
relation_info = json.loads(get_databag[1])[unit_name]["relation-info"]
return list(filter(lambda x: x["endpoint"] == endpoint, relation_info))[0]["application-data"][
"endpoints"
]


async def get_app_relation_databag(ops_test: OpsTest, unit_name: str, relation_id: int) -> Dict:
"""Gets the app relation databag from the given relation.

Expand Down
Loading
Loading