From 8e8cec54479dbbd5992256baac1fd1248b4a0bcb Mon Sep 17 00:00:00 2001 From: Dmitry Ratushnyy Date: Tue, 12 Dec 2023 11:24:10 +0000 Subject: [PATCH] Fix HA tests --- tests/integration/ha_tests/helpers.py | 44 ++++++++--- tests/integration/ha_tests/test_ha.py | 15 +++- tests/integration/helpers.py | 102 +++++++++++++++++++++++++- 3 files changed, 142 insertions(+), 19 deletions(-) diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index 6f1bee3a4..fb9256dd5 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -319,10 +319,15 @@ async def mongod_ready(ops_test: OpsTest, unit: int) -> bool: async def get_replica_set_primary( - ops_test: OpsTest, excluded: List[str] = [], application_name=APP_NAME + ops_test: OpsTest, + excluded: List[str] = [], + application_name=APP_NAME, + use_subprocess_to_get_password=False, ) -> Optional[Unit]: """Returns the primary unit name based no the replica set host.""" - with await get_mongo_client(ops_test, excluded) as client: + with await get_mongo_client( + ops_test, excluded, use_subprocess_to_get_password=use_subprocess_to_get_password + ) as client: data = client.admin.command("replSetGetStatus") unit_name = host_to_unit(primary_host(data)) @@ -347,35 +352,50 @@ async def count_primaries(ops_test: OpsTest) -> int: return len([member for member in data["members"] if member["stateStr"] == "PRIMARY"]) -async def fetch_replica_set_members(ops_test: OpsTest) -> List[str]: +async def fetch_replica_set_members( + ops_test: OpsTest, use_subprocess_to_get_password=False +) -> List[str]: """Fetches the hosts listed as replica set members in the MongoDB replica set configuration. Args: ops_test: reference to deployment. + use_subprocess_to_get_password: whether to use subprocess to get password. """ # connect to replica set uri # get ips from MongoDB replica set configuration - with await get_mongo_client(ops_test) as client: + with await get_mongo_client( + ops_test, use_subprocess_to_get_password=use_subprocess_to_get_password + ) as client: data = client.admin.command("replSetGetConfig") return [member["host"].split(":")[0] for member in data["config"]["members"]] -async def get_direct_mongo_client(ops_test: OpsTest, unit: str) -> MongoClient: +async def get_direct_mongo_client( + ops_test: OpsTest, unit: str, use_subprocess_to_get_password=False +) -> MongoClient: """Returns a direct mongodb client to specific unit.""" - return MongoClient( - await mongodb_uri(ops_test, [int(unit.split("/")[1])]), directConnection=True + url = await mongodb_uri( + ops_test, + [int(unit.split("/")[1])], + use_subprocess_to_get_password=use_subprocess_to_get_password, ) + return MongoClient(url, directConnection=True) -async def get_mongo_client(ops_test: OpsTest, excluded: List[str] = []) -> MongoClient: +async def get_mongo_client( + ops_test: OpsTest, excluded: List[str] = [], use_subprocess_to_get_password=False +) -> MongoClient: """Returns a direct mongodb client potentially passing over some of the units.""" mongodb_name = await get_application_name(ops_test, APP_NAME) for unit in ops_test.model.applications[mongodb_name].units: if unit.name not in excluded and unit.workload_status == "active": - return MongoClient( - await mongodb_uri(ops_test, [int(unit.name.split("/")[1])]), directConnection=True + url = await mongodb_uri( + ops_test, + [int(unit.name.split("/")[1])], + use_subprocess_to_get_password=use_subprocess_to_get_password, ) + return MongoClient(url, directConnection=True) assert False, "No fitting unit could be found" @@ -657,7 +677,9 @@ async def wait_until_unit_in_status( ops_test: OpsTest, unit_to_check: Unit, online_unit: Unit, status: str ) -> None: """Waits until a replica is in the provided status as reported by MongoDB or timeout occurs.""" - with await get_direct_mongo_client(ops_test, online_unit.name) as client: + with await get_direct_mongo_client( + ops_test, online_unit.name, use_subprocess_to_get_password=True + ) as client: data = client.admin.command("replSetGetStatus") for member in data["members"]: diff --git a/tests/integration/ha_tests/test_ha.py b/tests/integration/ha_tests/test_ha.py index f6b103f70..7f7ae847e 100644 --- a/tests/integration/ha_tests/test_ha.py +++ b/tests/integration/ha_tests/test_ha.py @@ -559,28 +559,35 @@ async def test_network_cut(ops_test: OpsTest, continuous_writes, chaos_mesh): # Wait until Mongodb actually detects isolated instance logger.info(f"Waiting until Mongodb detects primary instance {primary.name} is not reachable") + await wait_until_unit_in_status(ops_test, primary, active_unit, "(not reachable/healthy)") # verify new writes are continuing by counting the number of writes before and after a 5 second # wait logger.info("Validating writes are continuing to DB") - with await get_mongo_client(ops_test, excluded=[primary.name]) as client: + with await get_mongo_client( + ops_test, excluded=[primary.name], use_subprocess_to_get_password=True + ) as client: writes = client[TEST_DB][TEST_COLLECTION].count_documents({}) time.sleep(5) more_writes = client[TEST_DB][TEST_COLLECTION].count_documents({}) assert more_writes > writes, "writes not continuing to DB" # verify that a new primary got elected, old primary is still cut off - new_primary = await get_replica_set_primary(ops_test, excluded=[primary.name]) + new_primary = await get_replica_set_primary( + ops_test, excluded=[primary.name], use_subprocess_to_get_password=True + ) assert new_primary.name != primary.name # Remove networkchaos policy isolating instance from cluster remove_instance_isolation(ops_test) - + + time.sleep(MEDIAN_REELECTION_TIME * 3) + await wait_until_unit_in_status(ops_test, primary, active_unit, "SECONDARY") # verify presence of primary, replica set member configuration, and number of primaries - member_hosts = await fetch_replica_set_members(ops_test) + member_hosts = await fetch_replica_set_members(ops_test, use_subprocess_to_get_password=True) assert set(member_hosts) == set(hostnames) assert ( await count_primaries(ops_test) == 1 diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 1b39fe426..e30529602 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -4,6 +4,7 @@ import json import logging import math +import subprocess from datetime import datetime from pathlib import Path from random import choices @@ -94,14 +95,17 @@ async def get_mongo_cmd(ops_test: OpsTest, unit_name: str): return mongo_cmd -async def mongodb_uri(ops_test: OpsTest, unit_ids: List[int] = None) -> str: +async def mongodb_uri( + ops_test: OpsTest, unit_ids: List[int] = None, use_subprocess_to_get_password=False +) -> str: if unit_ids is None: unit_ids = UNIT_IDS - addresses = [await get_address_of_unit(ops_test, unit_id) for unit_id in unit_ids] hosts = ",".join(addresses) - password = await get_password(ops_test, unit_id=0) - + if use_subprocess_to_get_password: + password = get_password_using_subprocess(ops_test) + else: + password = await get_password(ops_test, 0) return f"mongodb://operator:{password}@{hosts}/admin" @@ -344,3 +348,93 @@ async def get_secret_content(ops_test, secret_id) -> Dict[str, str]: _, stdout, _ = await ops_test.juju(*complete_command.split()) data = json.loads(stdout) return data[secret_id]["content"]["Data"] + + +def create_pod_if_not_exists(namespace, pod_name, container_name, image_name): + """Create a pod if not already exists.""" + logger.info("Checking or creating helper mongo pod ...") + get_pod_cmd = f"kubectl get pod {pod_name} -n {namespace} -o json" + result = subprocess.run(get_pod_cmd, shell=True, capture_output=True, text=True) + + if result.returncode == 0: + logger.info(f"pod '{pod_name}' in namespace '{namespace}' already exists.") + return + + if "NotFound" in result.stderr: + pod_manifest = { + "apiVersion": "v1", + "kind": "Pod", + "metadata": {"name": pod_name, "namespace": namespace}, + "spec": { + "restartPolicy": "Never", + "containers": [ + { + "name": container_name, + "image": image_name, + "command": ["/bin/bash"], + "stdin": True, + "tty": True, + } + ], + }, + } + + pod_manifest_json = json.dumps(pod_manifest) + + create_pod_cmd = f"echo '{pod_manifest_json}' | kubectl apply -f -" + create_result = subprocess.run(create_pod_cmd, shell=True, capture_output=True, text=True) + + if create_result.returncode == 0: + logger.info(f"pod '{pod_name}' created in namespace '{namespace}'.") + else: + logger.error(f"Failed to create pod: {create_result.stderr}") + else: + logger.error(f"Failed to check pod existence: {result.stderr}") + + +def is_pod_ready(namespace, pod_name): + """Checks that the pod is ready.""" + get_pod_cmd = f"kubectl get pod {pod_name} -n {namespace} -o json" + result = subprocess.run(get_pod_cmd, shell=True, capture_output=True, text=True) + logger.info(f"Checking pod {pod_name} is ready...") + if result.returncode != 0: + return False + + pod_info = json.loads(result.stdout) + for condition in pod_info["status"].get("conditions", []): + if condition["type"] == "Ready" and condition["status"] == "True": + return True + return False + + +@retry( + stop=stop_after_attempt(5), + wait=wait_fixed(30), + reraise=True, +) +def get_password_using_subprocess(ops_test: OpsTest, username="operator") -> str: + """Use the charm action to retrieve the password from provided unit. + + Returns: + String with the password stored on the peer relation databag. + """ + cmd = ["juju", "switch", ops_test.model_name] + result = subprocess.run(cmd, capture_output=True) + if result.returncode != 0: + logger.error( + "Failed to get password. Can't switch to juju model: '%s'. Error '%s'", + ops_test.model_name, + result.stderr, + ) + raise Exception(f"Failed to get password: {result.stderr}") + cmd = ["juju", "run", f"{APP_NAME}/leader", "get-password", f"username={username}"] + result = subprocess.run(cmd, capture_output=True) + if result.returncode != 0: + logger.error("get-password command returned non 0 exit code: %s", result.stderr) + raise Exception(f"get-password command returned non 0 exit code: {result.stderr}") + try: + password = result.stdout.decode("utf-8").split("password:")[-1].strip() + except Exception as e: + logger.error("Failed to get password: %s", e) + raise Exception(f"Failed to get password: {e}") + return password