Skip to content

Commit

Permalink
Fix HA tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitry-ratushnyy committed Dec 12, 2023
1 parent 87b9afd commit 8e8cec5
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 19 deletions.
44 changes: 33 additions & 11 deletions tests/integration/ha_tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,10 +319,15 @@ async def mongod_ready(ops_test: OpsTest, unit: int) -> bool:


async def get_replica_set_primary(
ops_test: OpsTest, excluded: List[str] = [], application_name=APP_NAME
ops_test: OpsTest,
excluded: List[str] = [],
application_name=APP_NAME,
use_subprocess_to_get_password=False,
) -> Optional[Unit]:
"""Returns the primary unit name based no the replica set host."""
with await get_mongo_client(ops_test, excluded) as client:
with await get_mongo_client(
ops_test, excluded, use_subprocess_to_get_password=use_subprocess_to_get_password
) as client:
data = client.admin.command("replSetGetStatus")
unit_name = host_to_unit(primary_host(data))

Expand All @@ -347,35 +352,50 @@ async def count_primaries(ops_test: OpsTest) -> int:
return len([member for member in data["members"] if member["stateStr"] == "PRIMARY"])


async def fetch_replica_set_members(
    ops_test: OpsTest, use_subprocess_to_get_password: bool = False
) -> List[str]:
    """Fetch the hosts listed as members in the MongoDB replica set configuration.

    Args:
        ops_test: reference to deployment.
        use_subprocess_to_get_password: forwarded to get_mongo_client; whether to use
            subprocess to get the password for the connection.

    Returns:
        One entry per member of the "replSetGetConfig" result, each being the part of
        the member's "host" value that precedes the first ":".
    """
    # Connect to one of the units and read the replica set configuration from it.
    with await get_mongo_client(
        ops_test, use_subprocess_to_get_password=use_subprocess_to_get_password
    ) as client:
        data = client.admin.command("replSetGetConfig")

    # Keep only the text before the first ":" (drops a port suffix, if any).
    return [member["host"].split(":")[0] for member in data["config"]["members"]]


async def get_direct_mongo_client(
    ops_test: OpsTest, unit: str, use_subprocess_to_get_password: bool = False
) -> MongoClient:
    """Return a MongoDB client connected directly to one specific unit.

    Args:
        ops_test: reference to deployment.
        unit: unit name containing a "/"; the integer after the slash is the unit id
            passed to mongodb_uri.
        use_subprocess_to_get_password: forwarded to mongodb_uri; whether to use
            subprocess to get the password.

    Returns:
        A MongoClient created with directConnection=True for that unit's URI.
    """
    unit_id = int(unit.split("/")[1])
    url = await mongodb_uri(
        ops_test,
        [unit_id],
        use_subprocess_to_get_password=use_subprocess_to_get_password,
    )
    return MongoClient(url, directConnection=True)


async def get_mongo_client(
    ops_test: OpsTest,
    excluded: Optional[List[str]] = None,
    use_subprocess_to_get_password: bool = False,
) -> MongoClient:
    """Return a direct MongoDB client to the first eligible unit of the application.

    A unit is eligible when its name is not in ``excluded`` and its workload status
    is "active".

    Args:
        ops_test: reference to deployment.
        excluded: unit names that must not be connected to; ``None`` (the default)
            excludes nothing.
        use_subprocess_to_get_password: forwarded to mongodb_uri; whether to use
            subprocess to get the password.

    Returns:
        A MongoClient created with directConnection=True for the chosen unit.

    Raises:
        AssertionError: if no unit is eligible.
    """
    # A fresh list per call avoids sharing one mutable default between calls.
    excluded = [] if excluded is None else excluded

    mongodb_name = await get_application_name(ops_test, APP_NAME)
    for unit in ops_test.model.applications[mongodb_name].units:
        if unit.name in excluded or unit.workload_status != "active":
            continue

        unit_id = int(unit.name.split("/")[1])
        url = await mongodb_uri(
            ops_test,
            [unit_id],
            use_subprocess_to_get_password=use_subprocess_to_get_password,
        )
        return MongoClient(url, directConnection=True)

    # Raised explicitly (rather than `assert False`) so the failure is still
    # reported when Python runs with assertions disabled (-O).
    raise AssertionError("No fitting unit could be found")


Expand Down Expand Up @@ -657,7 +677,9 @@ async def wait_until_unit_in_status(
ops_test: OpsTest, unit_to_check: Unit, online_unit: Unit, status: str
) -> None:
"""Waits until a replica is in the provided status as reported by MongoDB or timeout occurs."""
with await get_direct_mongo_client(ops_test, online_unit.name) as client:
with await get_direct_mongo_client(
ops_test, online_unit.name, use_subprocess_to_get_password=True
) as client:
data = client.admin.command("replSetGetStatus")

for member in data["members"]:
Expand Down
15 changes: 11 additions & 4 deletions tests/integration/ha_tests/test_ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,28 +559,35 @@ async def test_network_cut(ops_test: OpsTest, continuous_writes, chaos_mesh):

# Wait until Mongodb actually detects isolated instance
logger.info(f"Waiting until Mongodb detects primary instance {primary.name} is not reachable")

await wait_until_unit_in_status(ops_test, primary, active_unit, "(not reachable/healthy)")

# verify new writes are continuing by counting the number of writes before and after a 5 second
# wait
logger.info("Validating writes are continuing to DB")
with await get_mongo_client(ops_test, excluded=[primary.name]) as client:
with await get_mongo_client(
ops_test, excluded=[primary.name], use_subprocess_to_get_password=True
) as client:
writes = client[TEST_DB][TEST_COLLECTION].count_documents({})
time.sleep(5)
more_writes = client[TEST_DB][TEST_COLLECTION].count_documents({})
assert more_writes > writes, "writes not continuing to DB"

# verify that a new primary got elected, old primary is still cut off
new_primary = await get_replica_set_primary(ops_test, excluded=[primary.name])
new_primary = await get_replica_set_primary(
ops_test, excluded=[primary.name], use_subprocess_to_get_password=True
)
assert new_primary.name != primary.name

# Remove networkchaos policy isolating instance from cluster
remove_instance_isolation(ops_test)


time.sleep(MEDIAN_REELECTION_TIME * 3)

await wait_until_unit_in_status(ops_test, primary, active_unit, "SECONDARY")

# verify presence of primary, replica set member configuration, and number of primaries
member_hosts = await fetch_replica_set_members(ops_test)
member_hosts = await fetch_replica_set_members(ops_test, use_subprocess_to_get_password=True)
assert set(member_hosts) == set(hostnames)
assert (
await count_primaries(ops_test) == 1
Expand Down
102 changes: 98 additions & 4 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import json
import logging
import math
import subprocess
from datetime import datetime
from pathlib import Path
from random import choices
Expand Down Expand Up @@ -94,14 +95,17 @@ async def get_mongo_cmd(ops_test: OpsTest, unit_name: str):
return mongo_cmd


async def mongodb_uri(
    ops_test: OpsTest, unit_ids: List[int] = None, use_subprocess_to_get_password=False
) -> str:
    """Build a MongoDB URI for the "operator" user against the "admin" database.

    Args:
        ops_test: reference to deployment.
        unit_ids: ids of the units whose addresses go into the host list; defaults to
            UNIT_IDS when None.
        use_subprocess_to_get_password: when true the password is read with
            get_password_using_subprocess, otherwise with the awaited get_password.

    Returns:
        A URI of the form "mongodb://operator:<password>@<host1>,<host2>,.../admin".
    """
    from urllib.parse import quote_plus

    if unit_ids is None:
        unit_ids = UNIT_IDS

    addresses = [await get_address_of_unit(ops_test, unit_id) for unit_id in unit_ids]
    hosts = ",".join(addresses)

    if use_subprocess_to_get_password:
        password = get_password_using_subprocess(ops_test)
    else:
        password = await get_password(ops_test, 0)

    # Credentials embedded in a MongoDB URI must be percent-escaped; this is a no-op
    # for purely alphanumeric passwords.
    escaped_password = quote_plus(str(password))
    return f"mongodb://operator:{escaped_password}@{hosts}/admin"


Expand Down Expand Up @@ -344,3 +348,93 @@ async def get_secret_content(ops_test, secret_id) -> Dict[str, str]:
_, stdout, _ = await ops_test.juju(*complete_command.split())
data = json.loads(stdout)
return data[secret_id]["content"]["Data"]


def create_pod_if_not_exists(namespace, pod_name, container_name, image_name):
    """Create a single-container pod with kubectl unless the pod already exists.

    Existence is checked with ``kubectl get pod``. The pod is only created when that
    check fails with "NotFound" in its stderr. All failures are logged and the
    function returns normally; nothing is raised for a failed kubectl call.

    Args:
        namespace: namespace the pod is looked up and created in.
        pod_name: name of the pod.
        container_name: name of the pod's only container.
        image_name: image used for that container.
    """
    logger.info("Checking or creating helper mongo pod ...")

    # Commands are passed as argument lists (no shell), so the names are handed to
    # kubectl verbatim and cannot be interpreted as shell syntax.
    get_pod_cmd = ["kubectl", "get", "pod", pod_name, "-n", namespace, "-o", "json"]
    try:
        result = subprocess.run(get_pod_cmd, capture_output=True, text=True)
    except OSError as err:
        # e.g. kubectl is not installed; keep the log-and-continue behaviour.
        logger.error("Failed to check pod existence: %s", err)
        return

    if result.returncode == 0:
        logger.info("pod '%s' in namespace '%s' already exists.", pod_name, namespace)
        return

    if "NotFound" not in result.stderr:
        logger.error("Failed to check pod existence: %s", result.stderr)
        return

    pod_manifest = {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"name": pod_name, "namespace": namespace},
        "spec": {
            "restartPolicy": "Never",
            "containers": [
                {
                    "name": container_name,
                    "image": image_name,
                    "command": ["/bin/bash"],
                    "stdin": True,
                    "tty": True,
                }
            ],
        },
    }

    # Feed the manifest to `kubectl apply -f -` on stdin instead of through
    # `echo '...' |`, which would break on any single quote in the JSON.
    try:
        create_result = subprocess.run(
            ["kubectl", "apply", "-f", "-"],
            input=json.dumps(pod_manifest),
            capture_output=True,
            text=True,
        )
    except OSError as err:
        logger.error("Failed to create pod: %s", err)
        return

    if create_result.returncode == 0:
        logger.info("pod '%s' created in namespace '%s'.", pod_name, namespace)
    else:
        logger.error("Failed to create pod: %s", create_result.stderr)


def is_pod_ready(namespace, pod_name):
    """Check whether kubectl reports the pod's "Ready" condition as "True".

    Args:
        namespace: namespace the pod lives in.
        pod_name: name of the pod to inspect.

    Returns:
        True when the pod JSON contains a condition with type "Ready" and status
        "True"; False otherwise, including when the kubectl call fails.
    """
    logger.info("Checking pod %s is ready...", pod_name)

    # Argument list instead of a shell string, so the names reach kubectl verbatim.
    get_pod_cmd = ["kubectl", "get", "pod", pod_name, "-n", namespace, "-o", "json"]
    try:
        result = subprocess.run(get_pod_cmd, capture_output=True, text=True)
    except OSError:
        # kubectl could not be started at all; treat it like a failed lookup.
        return False

    if result.returncode != 0:
        return False

    pod_info = json.loads(result.stdout)
    # A pod JSON without "status" or with null "conditions" simply has no
    # Ready condition yet.
    conditions = pod_info.get("status", {}).get("conditions") or []
    return any(
        condition.get("type") == "Ready" and condition.get("status") == "True"
        for condition in conditions
    )


@retry(
    stop=stop_after_attempt(5),
    wait=wait_fixed(30),
    reraise=True,
)
def get_password_using_subprocess(ops_test: OpsTest, username="operator") -> str:
    """Retrieve a user's password by running the get-password action through the juju CLI.

    The function switches the juju client to ``ops_test.model_name``, runs
    ``get-password`` on the ``APP_NAME`` leader unit and extracts the value that
    follows the last "password:" marker in the command's stdout. The retry decorator
    re-runs the whole function (up to 5 attempts, 30 seconds apart) when it raises.

    Args:
        ops_test: reference to deployment; only its ``model_name`` is used.
        username: user whose password is requested.

    Returns:
        The text on the line that follows "password:" in the action output.

    Raises:
        Exception: if either juju command exits non-zero, or the output contains no
            non-empty "password:" value.
    """
    # NOTE: `juju switch` changes the juju client's current model for whatever runs
    # after this call as well, not only for the command below.
    cmd = ["juju", "switch", ops_test.model_name]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        logger.error(
            "Failed to get password. Can't switch to juju model: '%s'. Error '%s'",
            ops_test.model_name,
            result.stderr,
        )
        raise Exception(f"Failed to get password: {result.stderr}")

    cmd = ["juju", "run", f"{APP_NAME}/leader", "get-password", f"username={username}"]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        logger.error("get-password command returned non 0 exit code: %s", result.stderr)
        raise Exception(f"get-password command returned non 0 exit code: {result.stderr}")

    # Without the marker the old parsing returned the entire stdout as the
    # "password"; fail instead so the retry gets a chance. stdout is deliberately not
    # logged because it may contain the secret.
    _, marker, remainder = result.stdout.rpartition("password:")
    value_lines = remainder.strip().splitlines()
    password = value_lines[0].strip() if value_lines else ""
    if not marker or not password:
        logger.error("Failed to get password: %s", "no 'password:' value in action output")
        raise Exception("Failed to get password: no 'password:' value in action output")
    return password

0 comments on commit 8e8cec5

Please sign in to comment.