Merge pull request #192 from Aiven-Open/khatskevich-materialized-views
Properly support materialized views restoration
kmichel-aiven authored Mar 21, 2024
2 parents 2712e4f + a8ebc86 commit 7a4b1ae
Showing 7 changed files with 295 additions and 47 deletions.
49 changes: 35 additions & 14 deletions astacus/coordinator/plugins/clickhouse/dependencies.py
@@ -4,12 +4,37 @@
"""
from astacus.coordinator.plugins.clickhouse.manifest import AccessEntity, Table
from collections.abc import Sequence
from typing import Callable, Hashable, TypeVar

# noinspection PyCompatibility
import graphlib
import re
import uuid

Node = TypeVar("Node")
NodeKey = TypeVar("NodeKey", bound=Hashable)


def sort_topologically(
    nodes: Sequence[Node],
    get_key: Callable[[Node], NodeKey],
    get_dependencies: Callable[[Node], Sequence[NodeKey]] = lambda x: [],
    get_dependants: Callable[[Node], Sequence[NodeKey]] = lambda x: [],
) -> list[Node]:
    """
    Sort elements topologically based on their dependencies.
    """
    sorter = graphlib.TopologicalSorter() # type: ignore
    for element in nodes:
        element_key = get_key(element)
        sorter.add(element_key)
        for dependency in get_dependencies(element):
            sorter.add(element_key, dependency)
        for dependency in get_dependants(element):
            sorter.add(dependency, element_key)
    sort_order = list(sorter.static_order())
    return sorted(nodes, key=lambda element: sort_order.index(get_key(element)))
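
An illustrative use of the new helper (not part of the commit); the tuples and the dependency map below are made up for the example:

    from astacus.coordinator.plugins.clickhouse.dependencies import sort_topologically

    # Fabricated example: the "view" node depends on the "source" node.
    nodes = [("db", "view"), ("db", "source")]
    dependencies = {("db", "view"): [("db", "source")]}
    ordered = sort_topologically(
        nodes,
        get_key=lambda node: node,
        get_dependencies=lambda node: dependencies.get(node, []),
    )
    # Dependencies are ordered before the nodes that need them.
    assert ordered == [("db", "source"), ("db", "view")]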


def tables_sorted_by_dependencies(tables: Sequence[Table]) -> Sequence[Table]:
"""
@@ -22,13 +22,47 @@ def tables_sorted_by_dependencies(tables: Sequence[Table]) -> Sequence[Table]:
    The `dependencies` attribute of each table must contain the list of
    `(database_name: str, table_name: str)` pairs identifying the tables that
    depend on this table.
    """
    sorter = graphlib.TopologicalSorter() # type: ignore
    for table in tables:
        sorter.add((table.database, table.name))
        for dependency in table.dependencies:
            sorter.add(dependency, (table.database, table.name))
    sort_order = list(sorter.static_order())
    return sorted(tables, key=lambda t: sort_order.index((t.database, t.name)))
    return sort_topologically(
        tables, get_key=lambda table: (table.database, table.name), get_dependants=lambda table: table.dependencies
    )
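
The same helper exercised in the dependants direction used above; `FakeTable` is a stand-in with made-up fields, not the real `Table` model from the manifest:

    import dataclasses

    from astacus.coordinator.plugins.clickhouse.dependencies import tables_sorted_by_dependencies

    @dataclasses.dataclass(frozen=True)
    class FakeTable:  # illustration only; the real Table model has more fields
        database: bytes
        name: bytes
        dependencies: tuple

    source = FakeTable(b"db", b"source", dependencies=((b"db", b"mat_view"),))
    view = FakeTable(b"db", b"mat_view", dependencies=())
    # The source table lists the view as a dependant, so it is restored first.
    assert tables_sorted_by_dependencies([view, source]) == [source, view]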


def access_entities_sorted_by_dependencies(access_entities: Sequence[AccessEntity]) -> Sequence[AccessEntity]:
@@ -42,15 +63,15 @@ def access_entities_sorted_by_dependencies(access_entities: Sequence[AccessEntity]) -> Sequence[AccessEntity]:
    roles can depend on other roles. This forces us to use a real topological sort to
    determine the creation order.
    """
    sorter = graphlib.TopologicalSorter() # type: ignore
    # Unlike tables, ClickHouse does not provide a list of dependencies between entities.
    # This means we need to parse the `attach_query` of the entity to find the uuid of
    # other entities. This is unpleasant, but the quoting format of entity names and entity
    # uuids is different enough to not risk false matches.
    clickhouse_id = re.compile(rb"ID\('([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})'\)")
    for entity in access_entities:
        sorter.add(entity.uuid)
        for uuid_bytes in clickhouse_id.findall(entity.attach_query):
            sorter.add(entity.uuid, uuid.UUID(uuid_bytes.decode()))
    sort_order = list(sorter.static_order())
    return sorted(access_entities, key=lambda e: sort_order.index(e.uuid))
    return sort_topologically(
        access_entities,
        get_key=lambda entity: entity.uuid,
        get_dependencies=lambda entity: [
            uuid.UUID(uuid_bytes.decode()) for uuid_bytes in clickhouse_id.findall(entity.attach_query)
        ],
    )
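
A small sketch of the UUID extraction described in the comment above; the attach query text is fabricated purely to exercise the regex, real queries come from ClickHouse itself:

    import re
    import uuid

    clickhouse_id = re.compile(rb"ID\('([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})'\)")
    attach_query = b"ATTACH ROLE backups; GRANT SELECT ON db.* TO ID('12345678-abcd-4def-8123-0123456789ab')"
    found = [uuid.UUID(match.decode()) for match in clickhouse_id.findall(attach_query)]
    assert found == [uuid.UUID("12345678-abcd-4def-8123-0123456789ab")]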
13 changes: 13 additions & 0 deletions astacus/coordinator/plugins/clickhouse/engines.py
@@ -0,0 +1,13 @@
"""
Copyright (c) 2024 Aiven Ltd
See LICENSE for details
"""

import enum


# This enum deliberately contains only the constants that are actually used.
class TableEngine(enum.Enum):
    MySQL = "MySQL"
    PostgreSQL = "PostgreSQL"
    S3 = "S3"
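
One hedged sketch of how such an enum might be consulted; the helper name and the assumption that engine names arrive as plain strings are illustrative, not something this diff shows:

    from astacus.coordinator.plugins.clickhouse.engines import TableEngine

    def is_listed_engine(engine_name: str) -> bool:
        # Hypothetical helper: checks a table engine name against the enum values.
        return engine_name in {engine.value for engine in TableEngine}

    assert is_listed_engine("S3")
    assert not is_listed_engine("ReplicatedMergeTree")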
16 changes: 10 additions & 6 deletions astacus/coordinator/plugins/clickhouse/steps.py
@@ -46,6 +46,7 @@
import dataclasses
import logging
import msgspec
import re
import secrets
import uuid

@@ -432,6 +433,12 @@ async def run_on_every_node(
    await asyncio.gather(*[gather_limited(per_node_concurrency_limit, fn(client)) for client in clients])


def get_restore_table_query(table: Table) -> bytes:
    # Use `ATTACH` instead of `CREATE` for materialized views so the restore
    # still works when the table referenced in the view's `SELECT` is absent.
    return re.sub(b"^CREATE MATERIALIZED VIEW", b"ATTACH MATERIALIZED VIEW", table.create_query)
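
A quick illustration of the rewrite (not part of the diff); the DDL below is a shortened, made-up materialized view definition:

    import re

    create_query = b"CREATE MATERIALIZED VIEW db.mv TO db.data AS SELECT * FROM db.source"
    restore_query = re.sub(b"^CREATE MATERIALIZED VIEW", b"ATTACH MATERIALIZED VIEW", create_query)
    assert restore_query == b"ATTACH MATERIALIZED VIEW db.mv TO db.data AS SELECT * FROM db.source"
    # Statements that are not materialized views are returned unchanged,
    # because the anchored pattern does not match them.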


@dataclasses.dataclass
class RestoreReplicatedDatabasesStep(Step[None]):
"""
@@ -534,12 +541,9 @@ def _create_dbs(client: ClickHouseClient) -> Iterator[Awaitable[None]]:
        # If any known table depends on an unknown table that was inside a non-replicated
        # database engine, then this will crash. See comment in `RetrieveReplicatedDatabasesStep`.
        for table in tables_sorted_by_dependencies(manifest.tables):
            # Materialized views creates both a table for the view itself and a table
            # with the .inner_id. prefix to store the data, we don't need to recreate
            # them manually. We will need to restore their data parts however.
            if not table.name.startswith(b".inner_id."):
                # Create on the first client and let replication do its thing
                await self.clients[0].execute(table.create_query, session_id=session_id)
            restore_table_query = get_restore_table_query(table)
            # Create on the first client and let replication do its thing
            await self.clients[0].execute(restore_table_query, session_id=session_id)


DatabasesReplicas = Mapping[bytes, Sequence[DatabaseReplica]]
36 changes: 33 additions & 3 deletions tests/integration/coordinator/plugins/clickhouse/conftest.py
@@ -204,6 +204,12 @@ async def fixture_minio_bucket(minio: MinioService) -> AsyncIterator[MinioBucket]:
        yield bucket


@pytest.fixture(scope="function", name="function_minio_bucket")
async def fixture_function_minio_bucket(minio: MinioService) -> AsyncIterator[MinioBucket]:
    with minio.bucket(bucket_name="function-clickhouse-bucket") as bucket:
        yield bucket


@contextlib.asynccontextmanager
async def create_minio_service(ports: Ports) -> AsyncIterator[MinioService]:
    server_port = ports.allocate()
@@ -337,6 +343,10 @@ def create_clickhouse_configs(
    minio_bucket: MinioBucket | None = None,
    object_storage_prefix: str = "/",
):
    # Helper for emitting XML configuration: avoid very long lines and deduplicate
    def setting(name: str, value: int | float | str):
        return f"<{name}>{value}</{name}>"

    replicas = "\n".join(
        f"""
<replica>
@@ -409,9 +419,29 @@ def create_clickhouse_configs(
<port>{zookeeper.port}</port>
</node>
</zookeeper>
<mark_cache_size>5368709120</mark_cache_size>
<max_server_memory_usage_to_ram_ratio>0.5</max_server_memory_usage_to_ram_ratio>
<enable_system_unfreeze>true</enable_system_unfreeze>
<merge_tree>
{setting("number_of_free_entries_in_pool_to_execute_mutation", 2)}
{setting("number_of_free_entries_in_pool_to_execute_optimize_entire_partition", 2)}
</merge_tree>
{setting("background_pool_size", 4)}
{setting("background_move_pool_size", 2)}
{setting("background_fetches_pool_size", 2)}
{setting("background_common_pool_size", 4)}
{setting("background_buffer_flush_schedule_pool_size", 2)}
{setting("background_schedule_pool_size", 2)}
{setting("background_message_broker_schedule_pool_size", 2)}
{setting("background_distributed_schedule_pool_size", 2)}
{setting("tables_loader_foreground_pool_size", 2)}
{setting("tables_loader_background_pool_size", 2)}
{setting("restore_threads", 2)}
{setting("backup_threads", 2)}
{setting("backups_io_thread_pool_queue_size", 2)}
{setting("max_parts_cleaning_thread_pool_size", 2)}
{setting("max_active_parts_loading_thread_pool_size", 2)}
{setting("max_outdated_parts_loading_thread_pool_size", 2)}
{setting("mark_cache_size", 5368709120)}
{setting("max_server_memory_usage_to_ram_ratio", 0.5)}
{setting("enable_system_unfreeze", "true")}
<user_directories>
<users_xml>
<path>{str(data_dir / "users.xml")}</path>
Expand Down
