Skip to content

Commit

Permalink
Sliding sync: use new DB tables (#17630)
Browse files Browse the repository at this point in the history
Based on #17629

Utilizing the new sliding sync tables added in
#17512 for fast acquisition of
rooms for the user and filtering/sorting.

---------

Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
  • Loading branch information
erikjohnston and MadLittleMods authored Sep 1, 2024
1 parent 560b43a commit 709b736
Show file tree
Hide file tree
Showing 21 changed files with 877 additions and 18 deletions.
1 change: 1 addition & 0 deletions changelog.d/17630.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Use new database tables for sliding sync.
562 changes: 549 additions & 13 deletions synapse/handlers/sliding_sync/room_lists.py

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ def _invalidate_state_caches(
self._attempt_to_invalidate_cache(
"_get_rooms_for_local_user_where_membership_is_inner", (user_id,)
)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user", (user_id,)
)

# Purge other caches based on room state.
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
Expand Down Expand Up @@ -160,6 +163,7 @@ def _invalidate_state_caches_all(self, room_id: str) -> None:
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
self._attempt_to_invalidate_cache("get_room_type", (room_id,))
self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)

def _attempt_to_invalidate_cache(
self, cache_name: str, key: Optional[Collection[Any]]
Expand Down
21 changes: 20 additions & 1 deletion synapse/storage/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Connection, Cursor
from synapse.types import JsonDict
from synapse.types import JsonDict, StrCollection
from synapse.util import Clock, json_encoder

from . import engines
Expand Down Expand Up @@ -487,6 +487,25 @@ async def has_completed_background_update(self, update_name: str) -> bool:

return not update_exists

async def have_completed_background_updates(
self, update_names: StrCollection
) -> bool:
"""Return the name of background updates that have not yet been
completed"""
if self._all_done:
return True

rows = await self.db_pool.simple_select_many_batch(
table="background_updates",
column="update_name",
iterable=update_names,
retcols=("update_name",),
desc="get_uncompleted_background_updates",
)

# If we find any rows then we've not completed the update.
return not bool(rows)

async def do_next_background_update(self, sleep: bool = True) -> bool:
"""Does some amount of work on the next queued background update
Expand Down
4 changes: 4 additions & 0 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,9 @@ def _invalidate_caches_for_event(
self._attempt_to_invalidate_cache(
"_get_rooms_for_local_user_where_membership_is_inner", (state_key,)
)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user", (state_key,)
)

self._attempt_to_invalidate_cache(
"did_forget",
Expand Down Expand Up @@ -417,6 +420,7 @@ def _invalidate_caches_for_room_events(self, room_id: str) -> None:
self._attempt_to_invalidate_cache(
"_get_rooms_for_local_user_where_membership_is_inner", None
)
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
self._attempt_to_invalidate_cache("did_forget", None)
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_references_for_event", None)
Expand Down
11 changes: 11 additions & 0 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -2342,6 +2342,17 @@ def _fill_table_txn(txn: LoggingTransaction) -> None:

return len(memberships_to_update_rows)

async def have_finished_sliding_sync_background_jobs(self) -> bool:
"""Return if its safe to use the sliding sync membership tables."""

return await self.db_pool.updates.have_completed_background_updates(
(
_BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
_BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
)
)


def _resolve_stale_data_in_sliding_sync_tables(
txn: LoggingTransaction,
Expand Down
55 changes: 54 additions & 1 deletion synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import Sqlite3Engine
from synapse.storage.roommember import MemberSummary, ProfileInfo, RoomsForUser
from synapse.storage.roommember import (
MemberSummary,
ProfileInfo,
RoomsForUser,
RoomsForUserSlidingSync,
)
from synapse.types import (
JsonDict,
PersistedEventPosition,
Expand Down Expand Up @@ -1377,6 +1382,54 @@ async def update_room_forgetter_stream_pos(self, stream_id: int) -> None:
desc="room_forgetter_stream_pos",
)

@cached(iterable=True, max_entries=10000)
async def get_sliding_sync_rooms_for_user(
self,
user_id: str,
) -> Mapping[str, RoomsForUserSlidingSync]:
"""Get all the rooms for a user to handle a sliding sync request.
Ignores forgotten rooms and rooms that the user has been kicked from.
Returns:
Map from room ID to membership info
"""

def get_sliding_sync_rooms_for_user_txn(
txn: LoggingTransaction,
) -> Dict[str, RoomsForUserSlidingSync]:
sql = """
SELECT m.room_id, m.sender, m.membership, m.membership_event_id,
r.room_version,
m.event_instance_name, m.event_stream_ordering,
COALESCE(j.room_type, m.room_type),
COALESCE(j.is_encrypted, m.is_encrypted)
FROM sliding_sync_membership_snapshots AS m
INNER JOIN rooms AS r USING (room_id)
LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join')
WHERE user_id = ?
AND m.forgotten = 0
"""
txn.execute(sql, (user_id,))
return {
row[0]: RoomsForUserSlidingSync(
room_id=row[0],
sender=row[1],
membership=row[2],
event_id=row[3],
room_version_id=row[4],
event_pos=PersistedEventPosition(row[5], row[6]),
room_type=row[7],
is_encrypted=row[8],
)
for row in txn
}

return await self.db_pool.runInteraction(
"get_sliding_sync_rooms_for_user",
get_sliding_sync_rooms_for_user_txn,
)


class RoomMemberBackgroundUpdateStore(SQLBaseStore):
def __init__(
Expand Down
13 changes: 13 additions & 0 deletions synapse/storage/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ class RoomsForUser:
room_version_id: str


@attr.s(slots=True, frozen=True, weakref_slot=False, auto_attribs=True)
class RoomsForUserSlidingSync:
room_id: str
sender: Optional[str]
membership: str
event_id: Optional[str]
event_pos: PersistedEventPosition
room_version_id: str

room_type: Optional[str]
is_encrypted: bool


@attr.s(slots=True, frozen=True, weakref_slot=False, auto_attribs=True)
class GetRoomsForUserWithStreamOrdering:
room_id: str
Expand Down
16 changes: 15 additions & 1 deletion tests/rest/client/sliding_sync/test_connection_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#
import logging

from parameterized import parameterized
from parameterized import parameterized, parameterized_class

from twisted.test.proto_helpers import MemoryReactor

Expand All @@ -28,6 +28,18 @@
logger = logging.getLogger(__name__)


# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
# foreground update for
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
# https://github.com/element-hq/synapse/issues/17623)
@parameterized_class(
("use_new_tables",),
[
(True,),
(False,),
],
class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}",
)
class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase):
"""
Test connection tracking in the Sliding Sync API.
Expand All @@ -44,6 +56,8 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.storage_controllers = hs.get_storage_controllers()

super().prepare(reactor, clock, hs)

def test_rooms_required_state_incremental_sync_LIVE(self) -> None:
"""Test that we only get state updates in incremental sync for rooms
we've already seen (LIVE).
Expand Down
16 changes: 16 additions & 0 deletions tests/rest/client/sliding_sync/test_extension_account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#
import logging

from parameterized import parameterized_class

from twisted.test.proto_helpers import MemoryReactor

import synapse.rest.admin
Expand All @@ -28,6 +30,18 @@
logger = logging.getLogger(__name__)


# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
# foreground update for
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
# https://github.com/element-hq/synapse/issues/17623)
@parameterized_class(
("use_new_tables",),
[
(True,),
(False,),
],
class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}",
)
class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
"""Tests for the account_data sliding sync extension"""

Expand All @@ -43,6 +57,8 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.account_data_handler = hs.get_account_data_handler()

super().prepare(reactor, clock, hs)

def test_no_data_initial_sync(self) -> None:
"""
Test that enabling the account_data extension works during an intitial sync,
Expand Down
16 changes: 16 additions & 0 deletions tests/rest/client/sliding_sync/test_extension_e2ee.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#
import logging

from parameterized import parameterized_class

from twisted.test.proto_helpers import MemoryReactor

import synapse.rest.admin
Expand All @@ -27,6 +29,18 @@
logger = logging.getLogger(__name__)


# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
# foreground update for
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
# https://github.com/element-hq/synapse/issues/17623)
@parameterized_class(
("use_new_tables",),
[
(True,),
(False,),
],
class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}",
)
class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase):
"""Tests for the e2ee sliding sync extension"""

Expand All @@ -42,6 +56,8 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.e2e_keys_handler = hs.get_e2e_keys_handler()

super().prepare(reactor, clock, hs)

def test_no_data_initial_sync(self) -> None:
"""
Test that enabling e2ee extension works during an intitial sync, even if there
Expand Down
16 changes: 16 additions & 0 deletions tests/rest/client/sliding_sync/test_extension_receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#
import logging

from parameterized import parameterized_class

from twisted.test.proto_helpers import MemoryReactor

import synapse.rest.admin
Expand All @@ -28,6 +30,18 @@
logger = logging.getLogger(__name__)


# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
# foreground update for
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
# https://github.com/element-hq/synapse/issues/17623)
@parameterized_class(
("use_new_tables",),
[
(True,),
(False,),
],
class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}",
)
class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
"""Tests for the receipts sliding sync extension"""

Expand All @@ -42,6 +56,8 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main

super().prepare(reactor, clock, hs)

def test_no_data_initial_sync(self) -> None:
"""
Test that enabling the receipts extension works during an intitial sync,
Expand Down
15 changes: 15 additions & 0 deletions tests/rest/client/sliding_sync/test_extension_to_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import logging
from typing import List

from parameterized import parameterized_class

from twisted.test.proto_helpers import MemoryReactor

import synapse.rest.admin
Expand All @@ -28,6 +30,18 @@
logger = logging.getLogger(__name__)


# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
# foreground update for
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
# https://github.com/element-hq/synapse/issues/17623)
@parameterized_class(
("use_new_tables",),
[
(True,),
(False,),
],
class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}",
)
class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
"""Tests for the to-device sliding sync extension"""

Expand All @@ -40,6 +54,7 @@ class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):

def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
super().prepare(reactor, clock, hs)

def _assert_to_device_response(
self, response_body: JsonDict, expected_messages: List[JsonDict]
Expand Down
Loading

0 comments on commit 709b736

Please sign in to comment.