Optimize test isolation and minimize resets
- Update reset fixtures to session scope
- Implement worker-specific schemas and indices
- Add worker-specific naming for Slack resources
- Fix type handling in reset functions
- Ensure proper cleanup between test runs

Co-Authored-By: Chris Weaver <chris@onyx.app>
devin-ai-integration[bot] and Chris Weaver committed Jan 21, 2025
1 parent 9a7380d commit f19196a
Showing 8 changed files with 148 additions and 50 deletions.
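For orientation before the per-file diffs: every change below leans on the same per-worker namespacing idea, keyed off pytest-xdist's PYTEST_XDIST_WORKER environment variable. A minimal sketch of the pattern (the helper name here is illustrative, not from the commit):

```python
import os

def worker_namespace(base: str) -> str:
    # pytest-xdist sets PYTEST_XDIST_WORKER to "gw0", "gw1", ... inside each
    # worker process; it is unset in a plain single-process run, so the
    # commit's code falls back to "0".
    worker_id = os.environ.get("PYTEST_XDIST_WORKER", "0")
    return f"{base}_{worker_id}"

# e.g. on worker gw1: worker_namespace("test_schema") -> "test_schema_gw1"
```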
9 changes: 6 additions & 3 deletions .github/workflows/pr-integration-tests.yml
@@ -162,8 +162,9 @@ jobs:
-e TEST_WEB_HOSTNAME=test-runner \
-e AUTH_TYPE=cloud \
-e MULTI_TENANT=true \
-e PYTEST_ADDOPTS="-n auto --dist=loadscope" \
onyxdotapp/onyx-integration:test \
/app/tests/integration/multitenant_tests
pytest /app/tests/integration/multitenant_tests
id: run_multitenant_tests

- name: Save Docker logs
@@ -239,8 +240,9 @@ jobs:
-e CONFLUENCE_USER_NAME=${CONFLUENCE_USER_NAME} \
-e CONFLUENCE_ACCESS_TOKEN=${CONFLUENCE_ACCESS_TOKEN} \
-e TEST_WEB_HOSTNAME=test-runner \
-e PYTEST_ADDOPTS="-n auto --dist=loadscope" \
onyxdotapp/onyx-integration:test \
/app/tests/integration/tests
pytest /app/tests/integration/tests
id: run_standard_tests

- name: Save Docker logs
@@ -316,8 +318,9 @@ jobs:
-e CONFLUENCE_USER_NAME=${CONFLUENCE_USER_NAME} \
-e CONFLUENCE_ACCESS_TOKEN=${CONFLUENCE_ACCESS_TOKEN} \
-e TEST_WEB_HOSTNAME=test-runner \
-e PYTEST_ADDOPTS="-n auto --dist=loadscope" \
onyxdotapp/onyx-integration:test \
/app/tests/integration/connector_job_tests
pytest /app/tests/integration/connector_job_tests
id: run_connector_tests

- name: Save Docker logs
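pytest reads PYTEST_ADDOPTS from the environment and appends it to the command line, so inside the container the invocation effectively becomes `pytest -n auto --dist=loadscope <test path>`. A rough local equivalent (the test path is illustrative):

```python
# Rough local equivalent of the CI invocation above.
import os
import subprocess

env = {**os.environ, "PYTEST_ADDOPTS": "-n auto --dist=loadscope"}
# -n auto          -> one pytest-xdist worker per available CPU
# --dist=loadscope -> tests in the same module (or class) run on the same
#                     worker, keeping scoped fixtures coherent per worker
subprocess.run(["pytest", "tests/integration"], env=env, check=True)
```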
1 change: 1 addition & 0 deletions backend/requirements/dev.txt
@@ -11,6 +11,7 @@ pandas==2.2.3
posthog==3.7.4
pre-commit==3.2.2
pytest-asyncio==0.22.0
pytest-xdist==3.5.0
pytest==7.4.4
reorder-python-imports==3.9.0
ruff==0.0.286
5 changes: 3 additions & 2 deletions backend/tests/integration/Dockerfile
@@ -78,10 +78,11 @@ COPY supervisord.conf /usr/etc/supervisord.conf
# Integration test stuff
COPY ./requirements/dev.txt /tmp/dev-requirements.txt
RUN pip install --no-cache-dir --upgrade \
-r /tmp/dev-requirements.txt
-r /tmp/dev-requirements.txt \
pytest-xdist==3.5.0 # Ensure pytest-xdist is installed for parallel test execution
COPY ./tests/integration /app/tests/integration

ENV PYTHONPATH=/app

ENTRYPOINT ["pytest", "-s"]
CMD ["/app/tests/integration", "--ignore=/app/tests/integration/multitenant_tests"]
CMD ["/app/tests/integration", "--ignore=/app/tests/integration/multitenant_tests"]
7 changes: 7 additions & 0 deletions backend/tests/integration/common_utils/managers/user.py
@@ -1,3 +1,4 @@
import os
from copy import deepcopy
from urllib.parse import urlencode
from uuid import uuid4
@@ -28,8 +29,14 @@ def create(
email: str | None = None,
is_first_user: bool = False,
) -> DATestUser:
# Get worker ID for parallel execution
worker_id = os.environ.get("PYTEST_XDIST_WORKER", "0")

if name is None:
name = f"test{str(uuid4())}"
else:
# Make usernames unique per worker to avoid collisions
name = f"{name}_{worker_id}"

if email is None:
email = build_email(name)
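With the suffixing above, two workers that both ask for a user named "admin" get distinct accounts. A hypothetical usage sketch (the import path and the email shape produced by build_email are assumptions):

```python
# Hypothetical test: each xdist worker creates its own admin user,
# e.g. admin_gw0 vs admin_gw1, so signups no longer collide.
from tests.integration.common_utils.managers.user import UserManager

def test_admin_creation_is_worker_scoped() -> None:
    user = UserManager.create(name="admin")
    # name was rewritten to "admin_<worker_id>"; build_email derives the
    # address from that name (exact domain assumed).
    assert user.email.startswith("admin_")
```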
94 changes: 63 additions & 31 deletions backend/tests/integration/common_utils/reset.py
@@ -165,15 +165,20 @@ def reset_postgres(
setup_postgres(db_session)


def reset_vespa() -> None:
"""Wipe all data from the Vespa index."""

def reset_vespa(index_name: str | None = None) -> None:
"""Wipe all data from the Vespa index.
Args:
index_name: Optional index name for parallel test execution.
"""
with get_session_context_manager() as db_session:
# swap to the correct default model
check_index_swap(db_session)

search_settings = get_current_search_settings(db_session)
index_name = search_settings.index_name
# Ensure we have a valid index name
if not index_name:
search_settings = get_current_search_settings(db_session)
index_name = str(search_settings.index_name)

success = setup_vespa(
document_index=VespaIndex(index_name=index_name, secondary_index_name=None),
@@ -207,9 +212,12 @@ def reset_vespa() -> None:
time.sleep(5)


def reset_postgres_multitenant() -> None:
"""Reset the Postgres database for all tenants in a multitenant setup."""

def reset_postgres_multitenant(worker_id: str = "0") -> None:
"""Reset the Postgres database for all tenants in a multitenant setup.
Args:
worker_id: The worker ID for parallel test execution. Used to namespace schemas.
"""
conn = psycopg2.connect(
dbname="postgres",
user=POSTGRES_USER,
@@ -220,17 +228,18 @@ def reset_postgres_multitenant() -> None:
conn.autocommit = True
cur = conn.cursor()

# Get all tenant schemas
# Get all tenant schemas for this worker
cur.execute(
"""
SELECT schema_name
FROM information_schema.schemata
WHERE schema_name LIKE 'tenant_%'
"""
WHERE schema_name LIKE %s
""",
[f"tenant_%_{worker_id}"]
)
tenant_schemas = cur.fetchall()

# Drop all tenant schemas
# Drop all tenant schemas for this worker
for schema in tenant_schemas:
schema_name = schema[0]
cur.execute(f'DROP SCHEMA "{schema_name}" CASCADE')
@@ -241,19 +250,26 @@ def reset_postgres_multitenant() -> None:
reset_postgres(config_name="schema_private", setup_onyx=False)


def reset_vespa_multitenant() -> None:
"""Wipe all data from the Vespa index for all tenants."""

def reset_vespa_multitenant(worker_id: str = "0") -> None:
"""Wipe all data from the Vespa index for all tenants.
Args:
worker_id: The worker ID for parallel test execution. Used to namespace indices.
"""
for tenant_id in get_all_tenant_ids():
with get_session_with_tenant(tenant_id=tenant_id) as db_session:
# Make tenant IDs unique per worker
worker_tenant_id = f"{tenant_id}_{worker_id}"
with get_session_with_tenant(tenant_id=worker_tenant_id) as db_session:
# swap to the correct default model for each tenant
check_index_swap(db_session)

search_settings = get_current_search_settings(db_session)
index_name = search_settings.index_name
# Make index name worker-specific
base_index_name = search_settings.index_name
worker_index_name = f"{base_index_name}_{worker_id}"

success = setup_vespa(
document_index=VespaIndex(index_name=index_name, secondary_index_name=None),
document_index=VespaIndex(index_name=str(worker_index_name), secondary_index_name=None),
index_setting=IndexingSetting.from_db_model(search_settings),
secondary_index_setting=None,
)
@@ -272,7 +288,7 @@ def reset_vespa_multitenant() -> None:
if continuation:
params = {**params, "continuation": continuation}
response = requests.delete(
DOCUMENT_ID_ENDPOINT.format(index_name=index_name),
DOCUMENT_ID_ENDPOINT.format(index_name=worker_index_name),
params=params,
)
response.raise_for_status()
@@ -288,17 +304,33 @@ def reset_vespa_multitenant() -> None:
time.sleep(5)


def reset_all() -> None:
logger.info("Resetting Postgres...")
reset_postgres()
def reset_all(schema_name: str | None = None) -> None:
"""Reset both Postgres and Vespa.
Args:
schema_name: Optional schema name for parallel test execution.
"""
logger.info(f"Resetting Postgres{f' for schema {schema_name}' if schema_name else ''}...")
reset_postgres(database="postgres", config_name="alembic", setup_onyx=True)
logger.info("Resetting Vespa...")
reset_vespa()


def reset_all_multitenant() -> None:
"""Reset both Postgres and Vespa for all tenants."""
logger.info("Resetting Postgres for all tenants...")
reset_postgres_multitenant()
logger.info("Resetting Vespa for all tenants...")
reset_vespa_multitenant()
# Use schema_name as index suffix if provided
if schema_name:
with get_session_context_manager() as db_session:
search_settings = get_current_search_settings(db_session)
index_name = f"{search_settings.index_name}_{schema_name}"
reset_vespa(index_name=index_name)
else:
reset_vespa()


def reset_all_multitenant(worker_id: str = "0") -> None:
"""Reset both Postgres and Vespa for all tenants.
Args:
worker_id: The worker ID for parallel test execution. Used to namespace schemas and indices.
"""
logger.info(f"Resetting Postgres for all tenants (worker {worker_id})...")
reset_postgres_multitenant(worker_id=worker_id)
logger.info(f"Resetting Vespa for all tenants (worker {worker_id})...")
reset_vespa_multitenant(worker_id=worker_id)
logger.info("Finished resetting all.")
55 changes: 46 additions & 9 deletions backend/tests/integration/conftest.py
@@ -1,9 +1,14 @@
import os
from collections.abc import Generator
from typing import Optional

import pytest
from sqlalchemy import text
from sqlalchemy.orm import Session

from onyx.db.engine import get_session_with_tenant
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR

from onyx.auth.schemas import UserRole
from onyx.db.engine import get_session_context_manager
from onyx.db.search_settings import get_current_search_settings
@@ -32,25 +37,54 @@ def load_env_vars(env_file: str = ".env") -> None:
print(f"File {env_file} not found")


# Load environment variables at the module level
load_env_vars()
# Load environment variables as a session-scoped fixture to ensure consistent state
@pytest.fixture(scope="session", autouse=True)
def load_test_env() -> None:
"""Load environment variables at session start.
Session scope ensures variables are loaded once per test process."""
load_env_vars()


@pytest.fixture
def db_session() -> Generator[Session, None, None]:
with get_session_context_manager() as session:
yield session
# Get worker ID from pytest-xdist, default to '0' for single-process runs
worker_id = os.environ.get("PYTEST_XDIST_WORKER", "0")
schema_name = f"test_schema_{worker_id}"

# Set the schema name as the tenant ID for this session
CURRENT_TENANT_ID_CONTEXTVAR.set(schema_name)

# Use existing tenant-aware session management
with get_session_with_tenant(tenant_id=schema_name) as session:
try:
yield session
finally:
# Clean up schema after tests
session.execute(text('DROP SCHEMA IF EXISTS "%s" CASCADE' % schema_name))
session.commit()


@pytest.fixture
def vespa_client(db_session: Session) -> vespa_fixture:
# Get worker ID for parallel execution
worker_id = os.environ.get("PYTEST_XDIST_WORKER", "0")

# Get base index name from search settings
search_settings = get_current_search_settings(db_session)
return vespa_fixture(index_name=search_settings.index_name)

# Create worker-specific index name
index_name = f"{search_settings.index_name}_{worker_id}"

return vespa_fixture(index_name=index_name)


@pytest.fixture
@pytest.fixture(scope="session")
def reset() -> None:
reset_all()
"""Reset database and search index once per test session.
Each worker gets its own schema and index, so we only need to reset once per worker."""
worker_id = os.environ.get("PYTEST_XDIST_WORKER", "0")
schema_name = f"test_schema_{worker_id}"
reset_all(schema_name=schema_name)


@pytest.fixture
@@ -85,6 +119,9 @@ def admin_user() -> DATestUser | None:
return None


@pytest.fixture
@pytest.fixture(scope="session")
def reset_multitenant() -> None:
reset_all_multitenant()
"""Reset multitenant database and search indices once per test session.
Each worker gets its own schemas and indices, so we only need to reset once per worker."""
worker_id = os.environ.get("PYTEST_XDIST_WORKER", "0")
reset_all_multitenant(worker_id=worker_id)
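Taken together, these fixtures make isolation come from namespacing rather than repeated resets. A hypothetical test module showing the intended usage:

```python
import pytest
from sqlalchemy.orm import Session

@pytest.mark.usefixtures("reset")  # session-scoped: runs once per worker
def test_isolated_write(db_session: Session) -> None:
    # db_session is bound to this worker's test_schema_<worker_id>, so
    # writes here are invisible to tests running on other workers.
    assert db_session.bind is not None
```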
15 changes: 11 additions & 4 deletions backend/tests/integration/connector_job_tests/slack/conftest.py
@@ -13,17 +13,24 @@

@pytest.fixture()
def slack_test_setup() -> Generator[tuple[dict[str, Any], dict[str, Any]], None, None]:
# Get worker ID for parallel execution
worker_id = os.environ.get("PYTEST_XDIST_WORKER", "0")

slack_client = SlackManager.get_slack_client(os.environ["SLACK_BOT_TOKEN"])
admin_user_id = SlackManager.build_slack_user_email_id_map(slack_client)[
"admin@onyx-test.com"
]
admin_email = f"admin_{worker_id}@test.com" # Match the email format from UserManager
email_id_map = SlackManager.build_slack_user_email_id_map(slack_client)
if admin_email not in email_id_map:
raise ValueError(f"Admin user with email {admin_email} not found in Slack. Available emails: {list(email_id_map.keys())}")
admin_user_id = email_id_map[admin_email]

(
public_channel,
private_channel,
run_id,
) = SlackManager.get_and_provision_available_slack_channels(
slack_client=slack_client, admin_user_id=admin_user_id
slack_client=slack_client,
admin_user_id=admin_user_id,
channel_prefix=f"test_{worker_id}" # Make channels unique per worker
)

yield public_channel, private_channel
@@ -176,9 +176,13 @@ def get_slack_client(token: str) -> WebClient:

@staticmethod
def get_and_provision_available_slack_channels(
slack_client: WebClient, admin_user_id: str
slack_client: WebClient,
admin_user_id: str,
channel_prefix: str | None = None
) -> tuple[dict[str, Any], dict[str, Any], str]:
run_id = str(uuid4())
if channel_prefix:
run_id = f"{channel_prefix}_{run_id}"
public_channels = _get_non_general_channels(
slack_client, get_private=False, get_public=True, only_get_done=True
)
@@ -271,6 +275,12 @@ def cleanup_after_test(
slack_client: WebClient,
test_id: str,
) -> None:
"""Clean up test channels.
Args:
slack_client: Slack client to use
test_id: The test run ID. For parallel tests, this includes the worker prefix.
"""
channel_types = ["private_channel", "public_channel"]
channels: list[dict[str, Any]] = []
for result in make_paginated_slack_api_call_w_retries(
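Since run_id now embeds the test_<worker_id> prefix, cleanup can target only the channels provisioned by the current worker's run. A sketch of that filtering step (the channel-name convention is an assumption based on the provisioning code above):

```python
from typing import Any

def channels_owned_by_run(
    channels: list[dict[str, Any]], test_id: str
) -> list[dict[str, Any]]:
    # Under xdist, test_id looks like "test_gw1_<uuid>", so matching on it
    # selects only channels created by this worker's run.
    return [c for c in channels if test_id in c.get("name", "")]
```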
