From 8c128944a89b796be2e1ea3c4dab60ae33097db3 Mon Sep 17 00:00:00 2001
From: Dragomir Penev <6687393+dragomirp@users.noreply.github.com>
Date: Fri, 19 Jul 2024 11:45:27 +0300
Subject: [PATCH] [DPE-4236] Nodeport upgrade (#342)

* Add data integrator upgrade test

* Repatch service

* Create separate service

* Enable nodeport tests on juju2

* Hack upgrade hook

* Resume data-integrator upgrade

* Fix test

* Lock file maintenance (#341)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>

* Add unit tests

* Add label

* Increase coverage

---------

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
---
 src/charm.py                                  |  76 +++++-----
 src/relations/backend_database.py             |  13 +-
 src/upgrade.py                                |   9 +-
 .../relations/pgbouncer_provider/helpers.py   |  45 +++++-
 tests/integration/test_node_port.py           |  29 ++--
 .../test_upgrade_data_integrator.py           | 130 ++++++++++++++++++
 .../unit/relations/test_pgbouncer_provider.py |   5 +-
 tests/unit/test_charm.py                      | 101 +++++++++++++-
 tests/unit/test_upgrade.py                    |  17 ++-
 9 files changed, 359 insertions(+), 66 deletions(-)
 create mode 100644 tests/integration/test_upgrade_data_integrator.py

diff --git a/src/charm.py b/src/charm.py
index 6eee4ba1b..6fa70e36d 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -196,16 +196,17 @@ def _node_ip(self) -> Optional[str]:
 
     def _node_port(self, port_type: str) -> int:
         """Return node port."""
+        service_name = f"{self.app.name}-nodeport"
         try:
             service = lightkube.Client().get(
-                lightkube.resources.core_v1.Service, self.app.name, namespace=self._namespace
+                lightkube.resources.core_v1.Service, service_name, namespace=self._namespace
             )
         except lightkube.ApiError as e:
             if e.status.code == 403:
                 self.on_deployed_without_trust()
                 return
-        if not service or not service.spec.type == "NodePort":
-            return -1
+            if e.status.code == 404:
+                return -1
         # svc.spec.ports
         # [ServicePort(port=6432, appProtocol=None, name=None, nodePort=31438, protocol='TCP', targetPort=6432)]
         # Keeping this distinction, so we have similarity with mysql-router-k8s
@@ -257,32 +258,32 @@ def get_node_ports(self) -> Dict[str, Any]:
     # =======================
 
     def patch_port(self, use_node_port: bool = False) -> None:
-        """Patch Juju-created k8s service.
-
-        The k8s service will be tied to pod-0 so that the service is auto cleaned by
-        k8s when the last pod is scaled down.
-        """
+        """Create or delete a nodeport service for external node connectivity."""
         if not self.unit.is_leader():
             return
 
-        try:
-            logger.debug("Patching k8s service")
-            client = lightkube.Client()
+        client = lightkube.Client()
+        service_name = f"{self.app.name}-nodeport"
+        if use_node_port:
+            logger.debug("Creating nodeport service")
             pod0 = client.get(
                 res=lightkube.resources.core_v1.Pod,
                 name=self.app.name + "-0",
-                namespace=self.model.name,
+                namespace=self._namespace,
             )
             service = lightkube.resources.core_v1.Service(
+                apiVersion="v1",
+                kind="Service",
                 metadata=lightkube.models.meta_v1.ObjectMeta(
-                    name=self.app.name,
-                    namespace=self.model.name,
+                    name=service_name,
+                    namespace=self._namespace,
                     ownerReferences=pod0.metadata.ownerReferences,
                     labels={
                         "app.kubernetes.io/name": self.app.name,
                     },
                 ),
                 spec=lightkube.models.core_v1.ServiceSpec(
+                    selector={"app.kubernetes.io/name": self.app.name},
                     ports=[
                         lightkube.models.core_v1.ServicePort(
                             name="pgbouncer",
@@ -290,26 +291,37 @@ def patch_port(self, use_node_port: bool = False) -> None:
                             targetPort=self.config["listen_port"],
                         )
                     ],
-                    type=("NodePort" if use_node_port else "ClusterIP"),
-                    selector={"app.kubernetes.io/name": self.app.name},
+                    type="NodePort",
                 ),
             )
-            client.patch(
-                res=lightkube.resources.core_v1.Service,
-                obj=service,
-                name=service.metadata.name,
-                namespace=service.metadata.namespace,
-                force=True,
-                field_manager=self.app.name,
-                patch_type=lightkube.types.PatchType.MERGE,
-            )
-            logger.debug("Patched k8s service")
-        except lightkube.ApiError as e:
-            if e.status.code == 403:
-                self.on_deployed_without_trust()
-                return
-            logger.exception("Failed to patch k8s service")
-            raise
+
+            try:
+                client.create(service)
+                logger.info(f"Kubernetes service {service_name} created")
+            except lightkube.ApiError as e:
+                if e.status.code == 403:
+                    self.on_deployed_without_trust()
+                    return
+                if e.status.code == 409:
+                    logger.info("Nodeport service already exists")
+                    return
+                logger.exception("Failed to create k8s service")
+                raise
+        else:
+            logger.debug("Deleting nodeport service")
+            try:
+                client.delete(
+                    lightkube.resources.core_v1.Service, service_name, namespace=self.model.name
+                )
+            except lightkube.ApiError as e:
+                if e.status.code == 403:
+                    self.on_deployed_without_trust()
+                    return
+                if e.status.code == 404:
+                    logger.info("No nodeport service to clean up")
+                    return
+                logger.exception("Failed to delete k8s service")
+                raise
 
     @property
     def version(self) -> str:
diff --git a/src/relations/backend_database.py b/src/relations/backend_database.py
index 426fb178a..9d2c3f417 100644
--- a/src/relations/backend_database.py
+++ b/src/relations/backend_database.py
@@ -344,10 +344,11 @@ def _on_relation_departed(self, event: RelationDepartedEvent):
         if event.departing_unit == self.charm.unit:
             # This should only occur when the relation is being removed, not on scale-down
-            self.charm.peers.unit_databag.update({
-                f"{BACKEND_RELATION_NAME}_{event.relation.id}_departing": "true"
-            })
-            logger.warning("added relation-departing flag to peer databag")
+            if self.charm.peers.unit_databag:
+                self.charm.peers.unit_databag.update({
+                    f"{BACKEND_RELATION_NAME}_{event.relation.id}_departing": "true"
+                })
+                logger.warning("added relation-departing flag to peer databag")
             return
 
         if not self.charm.unit.is_leader() or event.departing_unit.app != self.charm.app:
             return
@@ -382,7 +383,9 @@ def _on_relation_broken(self, event: RelationBrokenEvent):
 
         Removes all traces of this relation from pgbouncer config.
         """
         depart_flag = f"{BACKEND_RELATION_NAME}_{event.relation.id}_departing"
-        if self.charm.peers.unit_databag.get(depart_flag, False):
+        if not self.charm.peers.unit_databag or self.charm.peers.unit_databag.get(
+            depart_flag, False
+        ):
             logging.info("exiting relation-broken hook - nothing to do")
             return
diff --git a/src/upgrade.py b/src/upgrade.py
index 9d24f15a5..56a5e3e20 100644
--- a/src/upgrade.py
+++ b/src/upgrade.py
@@ -20,6 +20,8 @@
 from pydantic import BaseModel
 from typing_extensions import override
 
+from constants import CLIENT_RELATION_NAME
+
 DEFAULT_MESSAGE = "Pre-upgrade check failed and cannot safely upgrade"
 
 logger = logging.getLogger(__name__)
@@ -86,9 +88,14 @@ def _on_pgbouncer_pebble_ready(self, event: WorkloadEvent) -> None:
             event.defer()
             return
 
-        if self.peer_relation.data[self.charm.unit].get("state") != "upgrading":
+        if self.state not in ["upgrading", "recovery"]:
             return
 
+        if self.charm.unit.is_leader() and self.charm.client_relation.external_connectivity():
+            self.charm.patch_port(True)
+            for relation in self.model.relations.get(CLIENT_RELATION_NAME, []):
+                self.charm.client_relation.update_connection_info(relation)
+
         try:
             self._cluster_checks()
         except ClusterNotReadyError:
diff --git a/tests/integration/relations/pgbouncer_provider/helpers.py b/tests/integration/relations/pgbouncer_provider/helpers.py
index 700fa3d61..a8c037cdd 100644
--- a/tests/integration/relations/pgbouncer_provider/helpers.py
+++ b/tests/integration/relations/pgbouncer_provider/helpers.py
@@ -4,9 +4,12 @@
 import asyncio
 import json
 import logging
-from typing import Optional
+from typing import Dict, Optional
+from uuid import uuid4
 
+import psycopg2
 import yaml
+from juju.unit import Unit
 from lightkube import AsyncClient
 from lightkube.resources.core_v1 import Pod
 from pytest_operator.plugin import OpsTest
@@ -249,3 +252,43 @@ async def delete_pod(ops_test: OpsTest, unit_name: str) -> None:
     model = ops_test.model.info
     client = AsyncClient(namespace=model.name)
     await client.delete(Pod, name=unit_name.replace("/", "-"))
+
+
+async def fetch_action_get_credentials(unit: Unit) -> Dict:
+    """Helper to run an action to fetch connection info.
+
+    Args:
+        unit: The juju unit on which to run the get_credentials action for credentials
+    Returns:
+        A dictionary with the username, password and access info for the service
+    """
+    action = await unit.run_action(action_name="get-credentials")
+    result = await action.wait()
+    return result.results
+
+
+def check_exposed_connection(credentials, tls):
+    table_name = "expose_test"
+    smoke_val = str(uuid4())
+
+    host, port = credentials["postgresql"]["endpoints"].split(":")
+    user = credentials["postgresql"]["username"]
+    password = credentials["postgresql"]["password"]
+    database = credentials["postgresql"]["database"]
+    if tls:
+        sslmode = "require"
+    else:
+        sslmode = "disable"
+    connstr = f"dbname='{database}' user='{user}' host='{host}' port='{port}' password='{password}' connect_timeout=1 sslmode={sslmode}"
+    connection = psycopg2.connect(connstr)
+    connection.autocommit = True
+    smoke_query = (
+        f"DROP TABLE IF EXISTS {table_name};"
+        f"CREATE TABLE {table_name}(data TEXT);"
+        f"INSERT INTO {table_name}(data) VALUES('{smoke_val}');"
+        f"SELECT data FROM {table_name} WHERE data = '{smoke_val}';"
+    )
+    cursor = connection.cursor()
+    cursor.execute(smoke_query)
+
+    assert smoke_val == cursor.fetchone()[0]
diff --git a/tests/integration/test_node_port.py b/tests/integration/test_node_port.py
index 7e7021474..118066a40 100644
--- a/tests/integration/test_node_port.py
+++ b/tests/integration/test_node_port.py
@@ -5,12 +5,11 @@
 import logging
 from pathlib import Path
 
-import psycopg2
 import pytest
 import yaml
 from pytest_operator.plugin import OpsTest
 
-from . import architecture, markers
+from . import architecture
 from .helpers.ha_helpers import (
     start_continuous_writes,
     stop_continuous_writes,
@@ -23,10 +22,12 @@
     POSTGRESQL_APP_NAME,
     app_name,
     get_endpoint_info,
-    get_juju_secret,
-    get_unit_info,
 )
 from .juju_ import juju_major_version
+from .relations.pgbouncer_provider.helpers import (
+    check_exposed_connection,
+    fetch_action_get_credentials,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -147,23 +148,9 @@ async def test_node_port_and_clusterip_setup(ops_test: OpsTest):
 
 @pytest.mark.group(1)
 @pytest.mark.abort_on_fail
-@markers.juju_secrets
 async def test_data_integrator(ops_test: OpsTest):
     """Test the connection."""
-    endpoint = "postgresql"
-    info = (await get_unit_info(ops_test, f"{DATA_INTEGRATOR}/0"))["relation-info"]
-    info = list(filter(lambda x: x["endpoint"] == endpoint, info))[0]["application-data"]
-    userpass = await get_juju_secret(ops_test, info["secret-user"])
-    host, nodeport = info["endpoints"].split(":")
-
-    connection_string = (
-        f"dbname='{info['database']}' user='{userpass['username']}'"
-        f" host='{host}' port='{nodeport}' "
-        f"password='{userpass['password']}' connect_timeout=10"
+    credentials = await fetch_action_get_credentials(
+        ops_test.model.applications[DATA_INTEGRATOR].units[0]
     )
-
-    with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor:
-        cursor.execute("select * from information_schema.tables;")
-        results = cursor.fetchone()
-        assert info["database"] in results
-        connection.close()
+    check_exposed_connection(credentials, True)
diff --git a/tests/integration/test_upgrade_data_integrator.py b/tests/integration/test_upgrade_data_integrator.py
new file mode 100644
index 000000000..0134457d7
--- /dev/null
+++ b/tests/integration/test_upgrade_data_integrator.py
@@ -0,0 +1,130 @@
+#!/usr/bin/env python3
+# Copyright 2024 Canonical Ltd.
+# See LICENSE file for licensing details.
+import asyncio
+import logging
+
+import pytest
+from pytest_operator.plugin import OpsTest
+
+from constants import BACKEND_RELATION_NAME
+
+from .helpers.helpers import (
+    PG,
+    PGB,
+)
+from .helpers.postgresql_helpers import (
+    get_leader_unit,
+    get_unit_by_index,
+)
+from .relations.pgbouncer_provider.helpers import (
+    check_exposed_connection,
+    fetch_action_get_credentials,
+)
+
+logger = logging.getLogger(__name__)
+
+TIMEOUT = 600
+DATA_INTEGRATOR_APP_NAME = "data-integrator"
+
+
+@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
+async def test_deploy_stable(ops_test: OpsTest, pgb_charm) -> None:
+    """Simple test to ensure that the PostgreSQL and application charms get deployed."""
+    await asyncio.gather(
+        ops_test.model.deploy(
+            PG,
+            num_units=3,
+            channel="14/edge",
+            trust=True,
+            config={"profile": "testing"},
+        ),
+        ops_test.model.deploy(
+            PGB,
+            channel="1/stable",
+            trust=True,
+            num_units=2,
+        ),
+        ops_test.model.deploy(
+            DATA_INTEGRATOR_APP_NAME,
+            num_units=2,
+            channel="latest/edge",
+            config={"database-name": "test-database"},
+        ),
+    )
+    logger.info("Wait for applications to become active")
+
+    await ops_test.model.add_relation(f"{PGB}:{BACKEND_RELATION_NAME}", f"{PG}:database")
+    await ops_test.model.add_relation(DATA_INTEGRATOR_APP_NAME, PGB)
+    async with ops_test.fast_forward():
+        await ops_test.model.wait_for_idle(
+            apps=[PG, PGB, DATA_INTEGRATOR_APP_NAME], status="active", timeout=1200
+        )
+        assert len(ops_test.model.applications[PG].units) == 3
+        assert len(ops_test.model.applications[PGB].units) == 2
+
+
+@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
+async def test_pre_upgrade_check(ops_test: OpsTest) -> None:
+    """Test that the pre-upgrade-check action runs successfully."""
+    logger.info("Get leader unit")
+    leader_unit = await get_leader_unit(ops_test, PGB)
+    assert leader_unit is not None, "No leader unit found"
+
+    logger.info("Run pre-upgrade-check action")
+    action = await leader_unit.run_action("pre-upgrade-check")
+    await action.wait()
+
+
+@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
+async def test_upgrade_from_stable(ops_test: OpsTest, pgb_charm):
+    """Test updating from stable channel."""
+    credentials = await fetch_action_get_credentials(
+        ops_test.model.applications[DATA_INTEGRATOR_APP_NAME].units[0]
+    )
+    check_exposed_connection(credentials, False)
+    global initial_credentials
+    initial_credentials = credentials
+
+    application = ops_test.model.applications[PGB]
+    actions = await application.get_actions()
+
+    logger.info("Refresh the charm")
+    await application.refresh(path=pgb_charm)
+
+    logger.info("Wait for upgrade to start")
+    await ops_test.model.block_until(
+        lambda: ("waiting" if "pre-upgrade-check" in actions else "maintenance")
+        in {unit.workload_status for unit in application.units},
+        timeout=TIMEOUT,
+    )
+
+    logger.info("Wait for upgrade to complete on first upgrading unit")
+    # highest ordinal unit always the first to upgrade
+    unit = get_unit_by_index(PGB, application.units, 1)
+
+    async with ops_test.fast_forward("60s"):
+        await ops_test.model.block_until(lambda: unit.workload_status == "active", timeout=TIMEOUT)
+        await ops_test.model.wait_for_idle(apps=[PGB], idle_period=30, timeout=TIMEOUT)
+
+    logger.info("Resume upgrade")
+    leader_unit = await get_leader_unit(ops_test, PGB)
+    action = await leader_unit.run_action("resume-upgrade")
+    await action.wait()
+
+    logger.info("Wait for upgrade to complete")
+    async with ops_test.fast_forward("60s"):
+        await ops_test.model.wait_for_idle(
+            apps=[PGB], status="active", idle_period=30, timeout=TIMEOUT
+        )
+
+    credentials = await fetch_action_get_credentials(
+        ops_test.model.applications[DATA_INTEGRATOR_APP_NAME].units[0]
+    )
+    check_exposed_connection(credentials, False)
+
+    # TODO Enable when we have persistent service
+    # assert credentials["postgresql"]["endpoints"] == initial_credentials["postgresql"]["endpoints"]
diff --git a/tests/unit/relations/test_pgbouncer_provider.py b/tests/unit/relations/test_pgbouncer_provider.py
index 4355fcde1..1d2fe2f46 100644
--- a/tests/unit/relations/test_pgbouncer_provider.py
+++ b/tests/unit/relations/test_pgbouncer_provider.py
@@ -125,6 +125,7 @@ def test_on_database_requested(
         })
         _render_pgb_config.assert_called_once_with(reload_pgbouncer=True)
 
+    @patch("charm.PgBouncerK8sCharm._node_port")
     @patch("relations.backend_database.BackendDatabaseRequires.check_backend", return_value=True)
     @patch(
         "relations.backend_database.BackendDatabaseRequires.postgres", new_callable=PropertyMock
@@ -132,7 +133,9 @@ def test_on_database_requested(
     @patch("charm.PgBouncerK8sCharm.set_relation_databases")
     @patch("charm.PgBouncerK8sCharm.generate_relation_databases")
     @patch("charm.lightkube")
-    def test_on_relation_broken(self, _lightkube, _gen_rel_dbs, _set_rel_dbs, _pg, _check_backend):
+    def test_on_relation_broken(
+        self, _lightkube, _gen_rel_dbs, _set_rel_dbs, _pg, _check_backend, _
+    ):
         _pg.return_value.get_postgresql_version.return_value = "10"
         _gen_rel_dbs.return_value = {"1": {"name": "test_db", "legacy": False}}
         self.harness.set_leader()
diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py
index 1b33c89d1..516ac8036 100644
--- a/tests/unit/test_charm.py
+++ b/tests/unit/test_charm.py
@@ -9,6 +9,7 @@
 from signal import SIGHUP
 from unittest.mock import Mock, PropertyMock, call, patch
 
+import lightkube
 import psycopg2
 import pytest
 from jinja2 import Template
@@ -470,14 +471,106 @@ def test_raise_untrusted_error(self, _, __, _deployed_without_trust):
         self.charm._node_name
         self.charm._node_ip
         self.charm._node_port("port")
-        self.charm.patch_port()
         self.charm.get_all_k8s_node_hostnames_and_ips()
 
         self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus)
-        # 7 total calls:
-        # 5 (1 per method)
+        # 6 total calls:
+        # 4 (1 per method)
         # 2 extra _node_name calls from inside _node_ip and get_all_k8s_node_hostnames_and_ips
-        assert _deployed_without_trust.call_count == 7
+        assert _deployed_without_trust.call_count == 6
+
+    @patch("lightkube.Client")
+    def test_patch_port(self, _client):
+        # not run unless leader
+        self.charm.patch_port()
+
+        assert not _client.called
+
+        self.harness.set_leader(True)
+        self.charm.patch_port(True)
+        service = lightkube.resources.core_v1.Service(
+            apiVersion="v1",
+            kind="Service",
+            metadata=lightkube.models.meta_v1.ObjectMeta(
+                name="pgbouncer-k8s-nodeport",
+                namespace=self.charm._namespace,
+                ownerReferences=_client.return_value.get.return_value.metadata.ownerReferences,
+                labels={"app.kubernetes.io/name": "pgbouncer-k8s"},
+            ),
+            spec=lightkube.models.core_v1.ServiceSpec(
+                selector={"app.kubernetes.io/name": self.charm.app.name},
+                ports=[
+                    lightkube.models.core_v1.ServicePort(
+                        name="pgbouncer",
+                        port=6432,
+                        targetPort=6432,
+                    )
+                ],
+                type="NodePort",
+            ),
+        )
+
+        _client.return_value.create.assert_called_once_with(service)
+
+        self.charm.patch_port(False)
+
+        _client.return_value.delete.assert_called_once_with(
+            lightkube.resources.core_v1.Service,
+            "pgbouncer-k8s-nodeport",
+            namespace=self.charm.model.name,
+        )
+
+    @patch("charm.logger.info")
+    @patch("charm.PgBouncerK8sCharm.on_deployed_without_trust")
+    @patch("lightkube.Client")
+    def test_patch_port_create_errors(self, _client, _on_deployed_without_trust, _logger_info):
+        self.harness.set_leader(True)
+
+        # No permissions
+        _client.return_value.create.side_effect = _FakeApiError(403)
+
+        self.charm.patch_port(True)
+
+        _on_deployed_without_trust.assert_called_once_with()
+        _on_deployed_without_trust.reset_mock()
+
+        # Svc already exists
+        _client.return_value.create.side_effect = _FakeApiError(409)
+
+        self.charm.patch_port(True)
+
+        _logger_info.assert_called_once_with("Nodeport service already exists")
+
+        # General error
+        _client.return_value.create.side_effect = _FakeApiError(500)
+        with self.assertRaises(_FakeApiError):
+            self.charm.patch_port(True)
+
+    @patch("charm.logger.info")
+    @patch("charm.PgBouncerK8sCharm.on_deployed_without_trust")
+    @patch("lightkube.Client")
+    def test_patch_port_delete_errors(self, _client, _on_deployed_without_trust, _logger_info):
+        self.harness.set_leader(True)
+
+        # No permissions
+        _client.return_value.delete.side_effect = _FakeApiError(403)
+
+        self.charm.patch_port(False)
+
+        _on_deployed_without_trust.assert_called_once_with()
+        _on_deployed_without_trust.reset_mock()
+
+        # Svc already deleted
+        _client.return_value.delete.side_effect = _FakeApiError(404)
+
+        self.charm.patch_port(False)
+
+        _logger_info.assert_called_once_with("No nodeport service to clean up")
+
+        # General error
+        _client.return_value.delete.side_effect = _FakeApiError(500)
+        with self.assertRaises(_FakeApiError):
+            self.charm.patch_port(False)
 
     #
     # Secrets
diff --git a/tests/unit/test_upgrade.py b/tests/unit/test_upgrade.py
index 17a50458b..6c5f12d13 100644
--- a/tests/unit/test_upgrade.py
+++ b/tests/unit/test_upgrade.py
@@ -64,11 +64,18 @@ def test_pre_upgrade_check_cluster_not_ready(
 
         _set_partition.assert_called_once_with(2)
 
+    @patch("charm.PgBouncerProvider.external_connectivity", return_value=True)
+    @patch("charm.PgBouncerK8sCharm.patch_port")
     @patch("charm.PgbouncerUpgrade.set_unit_completed")
     @patch("charm.PgbouncerUpgrade._cluster_checks")
     @patch("charm.PgbouncerUpgrade.peer_relation", new_callable=PropertyMock, return_value=None)
     def test_on_pgbouncer_pebble_ready(
-        self, _peer_relation: Mock, _cluster_checks: Mock, _set_unit_completed: Mock
+        self,
+        _peer_relation: Mock,
+        _cluster_checks: Mock,
+        _set_unit_completed: Mock,
+        _patch_port: Mock,
+        _,
     ):
         event = Mock()
 
@@ -102,6 +109,14 @@ def test_on_pgbouncer_pebble_ready(
 
         event.defer.assert_called_once_with()
 
+        # Will try to repatch the nodeport service
+        self.harness.set_leader(True)
+        _cluster_checks.side_effect = None
+
+        self.charm.upgrade._on_pgbouncer_pebble_ready(event)
+
+        _patch_port.assert_called_once_with(True)
+
     @patch("charm.PgBouncerK8sCharm.check_pgb_running", return_value=True)
     @patch("charm.PgBouncerK8sCharm.update_config")
     @patch("charm.PgbouncerUpgrade.peer_relation", new_callable=PropertyMock, return_value=None)