[DPE-4236] Nodeport upgrade (#342)
* Add data integrator upgrade test

* Repatch service

* Create separate service

* Enable nodeport tests on juju2

* Hack upgrade hook

* Resume data-integrator upgrade

* Fix test

* Lock file maintenance (#341)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>

* Add unit tests

* Add label

* Increase coverage

---------

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
dragomirp and renovate[bot] authored Jul 19, 2024
1 parent 92a01e7 commit 8c12894
Showing 9 changed files with 359 additions and 66 deletions.
76 changes: 44 additions & 32 deletions src/charm.py
@@ -196,16 +196,17 @@ def _node_ip(self) -> Optional[str]:

def _node_port(self, port_type: str) -> int:
"""Return node port."""
service_name = f"{self.app.name}-nodeport"
try:
service = lightkube.Client().get(
lightkube.resources.core_v1.Service, self.app.name, namespace=self._namespace
lightkube.resources.core_v1.Service, service_name, namespace=self._namespace
)
except lightkube.ApiError as e:
if e.status.code == 403:
self.on_deployed_without_trust()
return
if not service or not service.spec.type == "NodePort":
return -1
if e.status.code == 404:
return -1
# svc.spec.ports
# [ServicePort(port=6432, appProtocol=None, name=None, nodePort=31438, protocol='TCP', targetPort=6432)]
# Keeping this distinction, so we have similarity with mysql-router-k8s
@@ -257,59 +258,70 @@ def get_node_ports(self) -> Dict[str, Any]:
# =======================

def patch_port(self, use_node_port: bool = False) -> None:
"""Patch Juju-created k8s service.
The k8s service will be tied to pod-0 so that the service is auto cleaned by
k8s when the last pod is scaled down.
"""
"""Create or delete a nodeport service for external node connectivity."""
if not self.unit.is_leader():
return

try:
logger.debug("Patching k8s service")
client = lightkube.Client()
client = lightkube.Client()
service_name = f"{self.app.name}-nodeport"
if use_node_port:
logger.debug("Creating nodeport service")
pod0 = client.get(
res=lightkube.resources.core_v1.Pod,
name=self.app.name + "-0",
namespace=self.model.name,
namespace=self._namespace,
)
service = lightkube.resources.core_v1.Service(
apiVersion="v1",
kind="Service",
metadata=lightkube.models.meta_v1.ObjectMeta(
name=self.app.name,
namespace=self.model.name,
name=service_name,
namespace=self._namespace,
ownerReferences=pod0.metadata.ownerReferences,
labels={
"app.kubernetes.io/name": self.app.name,
},
),
spec=lightkube.models.core_v1.ServiceSpec(
selector={"app.kubernetes.io/name": self.app.name},
ports=[
lightkube.models.core_v1.ServicePort(
name="pgbouncer",
port=self.config["listen_port"],
targetPort=self.config["listen_port"],
)
],
type=("NodePort" if use_node_port else "ClusterIP"),
selector={"app.kubernetes.io/name": self.app.name},
type="NodePort",
),
)
client.patch(
res=lightkube.resources.core_v1.Service,
obj=service,
name=service.metadata.name,
namespace=service.metadata.namespace,
force=True,
field_manager=self.app.name,
patch_type=lightkube.types.PatchType.MERGE,
)
logger.debug("Patched k8s service")
except lightkube.ApiError as e:
if e.status.code == 403:
self.on_deployed_without_trust()
return
logger.exception("Failed to patch k8s service")
raise

try:
client.create(service)
logger.info(f"Kubernetes service {service_name} created")
except lightkube.ApiError as e:
if e.status.code == 403:
self.on_deployed_without_trust()
return
if e.status.code == 409:
logger.info("Nodeport service already exists")
return
logger.exception("Failed to create k8s service")
raise
else:
logger.debug("Deleting nodeport service")
try:
client.delete(
lightkube.resources.core_v1.Service, service_name, namespace=self.model.name
)
except lightkube.ApiError as e:
if e.status.code == 403:
self.on_deployed_without_trust()
return
if e.status.code == 404:
logger.info("No nodeport service to clean up")
return
logger.exception("Failed to delete k8s service")
raise

@property
def version(self) -> str:
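
For reference, the Service object that the reworked patch_port() creates can be inspected directly with lightkube. The snippet below is a minimal verification sketch, not part of this commit; the application name "pgbouncer-k8s" and the namespace "test-model" are illustrative assumptions.

    # Sketch only: inspect the separately created nodeport Service.
    # Assumes the charm is deployed as "pgbouncer-k8s" in namespace "test-model".
    import lightkube
    from lightkube.resources.core_v1 import Service

    client = lightkube.Client()
    svc = client.get(Service, "pgbouncer-k8s-nodeport", namespace="test-model")

    assert svc.spec.type == "NodePort"
    for port in svc.spec.ports:
        # port.port is the ClusterIP-side port; port.nodePort is opened on every node
        print(f"{port.port} -> node port {port.nodePort}")
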
13 changes: 8 additions & 5 deletions src/relations/backend_database.py
@@ -344,10 +344,11 @@ def _on_relation_departed(self, event: RelationDepartedEvent):

if event.departing_unit == self.charm.unit:
# This should only occur when the relation is being removed, not on scale-down
self.charm.peers.unit_databag.update({
f"{BACKEND_RELATION_NAME}_{event.relation.id}_departing": "true"
})
logger.warning("added relation-departing flag to peer databag")
if self.charm.peers.unit_databag:
self.charm.peers.unit_databag.update({
f"{BACKEND_RELATION_NAME}_{event.relation.id}_departing": "true"
})
logger.warning("added relation-departing flag to peer databag")
return

if not self.charm.unit.is_leader() or event.departing_unit.app != self.charm.app:
@@ -382,7 +383,9 @@ def _on_relation_broken(self, event: RelationBrokenEvent):
Removes all traces of this relation from pgbouncer config.
"""
depart_flag = f"{BACKEND_RELATION_NAME}_{event.relation.id}_departing"
if self.charm.peers.unit_databag.get(depart_flag, False):
if not self.charm.peers.unit_databag or self.charm.peers.unit_databag.get(
depart_flag, False
):
logging.info("exiting relation-broken hook - nothing to do")
return

9 changes: 8 additions & 1 deletion src/upgrade.py
@@ -20,6 +20,8 @@
from pydantic import BaseModel
from typing_extensions import override

from constants import CLIENT_RELATION_NAME

DEFAULT_MESSAGE = "Pre-upgrade check failed and cannot safely upgrade"

logger = logging.getLogger(__name__)
@@ -86,9 +88,14 @@ def _on_pgbouncer_pebble_ready(self, event: WorkloadEvent) -> None:
event.defer()
return

if self.peer_relation.data[self.charm.unit].get("state") != "upgrading":
if self.state not in ["upgrading", "recovery"]:
return

if self.charm.unit.is_leader() and self.charm.client_relation.external_connectivity():
self.charm.patch_port(True)
for relation in self.model.relations.get(CLIENT_RELATION_NAME, []):
self.charm.client_relation.update_connection_info(relation)

try:
self._cluster_checks()
except ClusterNotReadyError:
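
The commit message also mentions a data-integrator upgrade test exercising this hook. Roughly, the scenario looks like the sketch below; it is illustrative rather than the exact test added here, the application names and the charm_path fixture are assumptions, and it reuses the helpers introduced in the next file.

    # Illustrative upgrade scenario (a sketch, not the test added in this commit):
    # refresh pgbouncer-k8s while it is exposed through the nodeport service,
    # then confirm an exposed client can still connect.
    import pytest

    from .relations.pgbouncer_provider.helpers import (
        check_exposed_connection,
        fetch_action_get_credentials,
    )


    @pytest.mark.abort_on_fail
    async def test_upgrade_keeps_nodeport(ops_test, charm_path):
        # charm_path: locally built charm file, assumed to be provided by a fixture
        await ops_test.model.applications["pgbouncer-k8s"].refresh(path=charm_path)
        await ops_test.model.wait_for_idle(apps=["pgbouncer-k8s"], status="active", timeout=1000)

        credentials = await fetch_action_get_credentials(
            ops_test.model.applications["data-integrator"].units[0]
        )
        check_exposed_connection(credentials, tls=False)
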
45 changes: 44 additions & 1 deletion tests/integration/relations/pgbouncer_provider/helpers.py
@@ -4,9 +4,12 @@
import asyncio
import json
import logging
from typing import Optional
from typing import Dict, Optional
from uuid import uuid4

import psycopg2
import yaml
from juju.unit import Unit
from lightkube import AsyncClient
from lightkube.resources.core_v1 import Pod
from pytest_operator.plugin import OpsTest
@@ -249,3 +252,43 @@ async def delete_pod(ops_test: OpsTest, unit_name: str) -> None:
model = ops_test.model.info
client = AsyncClient(namespace=model.name)
await client.delete(Pod, name=unit_name.replace("/", "-"))


async def fetch_action_get_credentials(unit: Unit) -> Dict:
"""Helper to run an action to fetch connection info.
Args:
unit: The juju unit on which to run the get_credentials action for credentials
Returns:
A dictionary with the username, password and access info for the service
"""
action = await unit.run_action(action_name="get-credentials")
result = await action.wait()
return result.results


def check_exposed_connection(credentials, tls):
table_name = "expose_test"
smoke_val = str(uuid4())

host, port = credentials["postgresql"]["endpoints"].split(":")
user = credentials["postgresql"]["username"]
password = credentials["postgresql"]["password"]
database = credentials["postgresql"]["database"]
if tls:
sslmode = "require"
else:
sslmode = "disable"
connstr = f"dbname='{database}' user='{user}' host='{host}' port='{port}' password='{password}' connect_timeout=1 sslmode={sslmode}"
connection = psycopg2.connect(connstr)
connection.autocommit = True
smoke_query = (
f"DROP TABLE IF EXISTS {table_name};"
f"CREATE TABLE {table_name}(data TEXT);"
f"INSERT INTO {table_name}(data) VALUES('{smoke_val}');"
f"SELECT data FROM {table_name} WHERE data = '{smoke_val}';"
)
cursor = connection.cursor()
cursor.execute(smoke_query)

assert smoke_val == cursor.fetchone()[0]
29 changes: 8 additions & 21 deletions tests/integration/test_node_port.py
@@ -5,12 +5,11 @@
import logging
from pathlib import Path

import psycopg2
import pytest
import yaml
from pytest_operator.plugin import OpsTest

from . import architecture, markers
from . import architecture
from .helpers.ha_helpers import (
start_continuous_writes,
stop_continuous_writes,
@@ -23,10 +22,12 @@
POSTGRESQL_APP_NAME,
app_name,
get_endpoint_info,
get_juju_secret,
get_unit_info,
)
from .juju_ import juju_major_version
from .relations.pgbouncer_provider.helpers import (
check_exposed_connection,
fetch_action_get_credentials,
)

logger = logging.getLogger(__name__)

@@ -147,23 +148,9 @@ async def test_node_port_and_clusterip_setup(ops_test: OpsTest):

@pytest.mark.group(1)
@pytest.mark.abort_on_fail
@markers.juju_secrets
async def test_data_integrator(ops_test: OpsTest):
"""Test the connection."""
endpoint = "postgresql"
info = (await get_unit_info(ops_test, f"{DATA_INTEGRATOR}/0"))["relation-info"]
info = list(filter(lambda x: x["endpoint"] == endpoint, info))[0]["application-data"]
userpass = await get_juju_secret(ops_test, info["secret-user"])
host, nodeport = info["endpoints"].split(":")

connection_string = (
f"dbname='{info['database']}' user='{userpass['username']}'"
f" host='{host}' port='{nodeport}' "
f"password='{userpass['password']}' connect_timeout=10"
credentials = await fetch_action_get_credentials(
ops_test.model.applications[DATA_INTEGRATOR].units[0]
)

with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor:
cursor.execute("select * from information_schema.tables;")
results = cursor.fetchone()
assert info["database"] in results
connection.close()
check_exposed_connection(credentials, True)