Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Admin API delete room v2 actions to be run on worker #17904

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17904.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow Admin API delete room v2 actions to be run on workers.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Complement test for this is at matrix-org/complement@main...H-Shay:complement:shay/admin_tests. I didn't open a PR with this as it is unclear to me if this test belongs in the Complement repo - opinions welcome. FWIW since I have to push to a fork the test here isn't run against this PR anyway, although I can say it runs locally.

If we were to stick with Complement, probably belongs in the Synapse repo but would require some boilerplate to get it working.

But since it's a Synapse specific thing, we probably prefer to just write a Twisted test instead (I'm no dictator on this so feel free to ask in the Synapse rooms what people think). If we want it at the end-to-end level, probably in tests/rest/admin with some worker override config

3 changes: 2 additions & 1 deletion docker/complement/conf/start_for_complement.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then
client_reader, \
appservice, \
pusher, \
stream_writers=account_data+presence+receipts+to_device+typing"
stream_writers=account_data+presence+receipts+to_device+typing, \
admin"

fi
log "Workers requested: $SYNAPSE_WORKER_TYPES"
Expand Down
9 changes: 9 additions & 0 deletions docker/configure_workers_and_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@
},
"worker_extra_conf": "enable_media_repo: true",
},
"admin": {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we need a comment above for WORKERS_CONFIG alongside the others

Watching /_synapse/admin needs a "admin" listener

"app": "synapse.app.generic_worker",
"listener_resources": ["replication", "admin"],
Copy link
Contributor

@MadLittleMods MadLittleMods Nov 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one question I had was whether a worker handling these admin requests needs a replication listener - since the code requires invalidating the cache and streaming, I guessed that the answer is yes, but would like to know if that is correct.

I'm not confident but I think it may not need "replication" 🤔

The admin API's probably use the various handlers which have the logic built-in to make the write or send it off over replication to the appropriate worker.

For example, the /_synapse/admin/v1/join/<room_id_or_alias> endpoint uses the normal event creation flow that hands it off to the event_persisters which has "replication".

When it invalidates caches, it will _send_invalidation_to_replication(...) and the stores mixin CacheInvalidationWorkerStore to bust their cache locally when they receive those messages.


I'm not sure if we're completely studious about this as looking at the device creation code, it seems a bit sketchy (check_device_registered(...)) but must be fine because /_matrix/client/v3/register is normally handled on a cleint_reader worker without "replication".

In any case, I think the bottom line is any code that isn't doing the "right" thing and asking the appropriate worker to make the "write" should be updated to do so. I haven't deep-dived into whether we have any of those problems now.

"endpoint_patterns": ["^/_synapse/admin/v2/rooms/.*$"],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"appservice": {
"app": "synapse.app.generic_worker",
"listener_resources": [],
Expand Down Expand Up @@ -574,6 +581,7 @@ def is_sharding_allowed_for_worker_type(worker_type: str) -> bool:
"receipts",
"typing",
"to_device",
"admin",
]


Expand Down Expand Up @@ -1076,6 +1084,7 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
# Split type names by comma, ignoring whitespace.
worker_types = split_and_strip_string(worker_types_env, ",")
requested_worker_types = parse_worker_types(worker_types)
log(f"requested_worker_types {requested_worker_types}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove debug log?


# Always regenerate all other config files
log("Generating worker config files")
Expand Down
2 changes: 2 additions & 0 deletions docs/usage/configuration/config_documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,8 @@ Valid resource names are:
for [workers](../../workers.md) and containers without listener e.g.
[application services](../../workers.md#notifying-application-services).

* `admin`: the admin API (/_synapse/admin)

Example configuration #1:
```yaml
listeners:
Expand Down
10 changes: 10 additions & 0 deletions docs/workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,16 @@ with `client` and `federation` `resources` must be configured in the
[`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners)
option in the worker config.

The following admin APIs are now available to be handled by workers, with more forthcoming:

# Admin APIs
"^/_synapse/admin/v2/rooms/.*$"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This regex currently matches anything after /_synapse/admin/v2/rooms/

But I think we want the following explicit list based on what synapse/rest/admin/__init__.py -> register_servlets looks like:

  • /rooms/<room_id>
  • /rooms/delete_status/<delete_id>
  • /rooms/<room_id>/delete_status

For example, we don't support the BlockRoomRestServlet at /rooms/<room_id>/block yet.


I guess this also applies to the endpoint_patterns in docker/configure_workers_and_start.py but perhaps we can re-use the PATTERNS of each servlet?


Note that a [HTTP listener](usage/configuration/config_documentation.md#listeners)
with `admin` and `replication `resources` must be configured in the
[`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners)
option in the worker config.

#### Load balancing

It is possible to run multiple instances of this worker app, with incoming requests
Expand Down
1 change: 1 addition & 0 deletions synapse/api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
MEDIA_R0_PREFIX = "/_matrix/media/r0"
MEDIA_V3_PREFIX = "/_matrix/media/v3"
LEGACY_MEDIA_PREFIX = "/_matrix/media/v1"
ADMIN_PREFIX = "/_synapse/admin"


class ConsentURIBuilder:
Expand Down
8 changes: 6 additions & 2 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import synapse
import synapse.events
from synapse.api.urls import (
ADMIN_PREFIX,
CLIENT_API_PREFIX,
FEDERATION_PREFIX,
LEGACY_MEDIA_PREFIX,
Expand All @@ -52,7 +53,7 @@
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.rest import ClientRestResource
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.admin import AdminRestResource, register_servlets_for_media_repo
from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyResource
from synapse.rest.synapse.client import build_synapse_client_resource_tree
Expand Down Expand Up @@ -207,7 +208,7 @@ def _listen_http(self, listener_config: ListenerConfig) -> None:
MEDIA_R0_PREFIX: media_repo,
MEDIA_V3_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo,
"/_synapse/admin": admin_resource,
ADMIN_PREFIX: admin_resource,
}
)

Expand Down Expand Up @@ -248,6 +249,9 @@ def _listen_http(self, listener_config: ListenerConfig) -> None:
if name == "replication":
resources[REPLICATION_PREFIX] = ReplicationRestResource(self)

if name == "admin":
resources[ADMIN_PREFIX] = AdminRestResource(self)

# Attach additional resources registered by modules.
resources.update(self._module_web_resources)
self._module_web_resources_consumed = True
Expand Down
1 change: 1 addition & 0 deletions synapse/config/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ def generate_ip_set(
"openid",
"replication",
"static",
"admin",
}


Expand Down
5 changes: 4 additions & 1 deletion synapse/rest/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,11 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
"""
Register all the admin servlets.
"""
# Admin servlets aren't registered on workers.
# Only handle certain endpoints on workers.
if hs.config.worker.worker_app is not None:
RoomRestV2Servlet(hs).register(http_server)
DeleteRoomStatusByDeleteIdRestServlet(hs).register(http_server)
DeleteRoomStatusByRoomIdRestServlet(hs).register(http_server)
return

register_servlets_for_client_rest_resource(hs, http_server)
Expand Down
52 changes: 26 additions & 26 deletions synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,32 @@ def process_replication_position(
self._un_partial_stated_rooms_stream_id_gen.advance(instance_name, token)
return super().process_replication_position(stream_name, instance_name, token)

async def block_room(self, room_id: str, user_id: str) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this was moved but BlockRoomRestServlet isn't used on a worker yet. Just some leftover work from trying it out before? (fine to keep, just clarifying)

"""Marks the room as blocked.

Can be called multiple times (though we'll only track the last user to
block this room).

Can be called on a room unknown to this homeserver.

Args:
room_id: Room to block
user_id: Who blocked it
"""
await self.db_pool.simple_upsert(
table="blocked_rooms",
keyvalues={"room_id": room_id},
values={},
insertion_values={"user_id": user_id},
desc="block_room",
)
await self.db_pool.runInteraction(
"block_room_invalidation",
self._invalidate_cache_and_stream,
self.is_room_blocked,
(room_id,),
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not super clear from the diff but this was moved from the RoomStore to the RoomWorkerStore

async def store_room(
self,
room_id: str,
Expand Down Expand Up @@ -2493,32 +2519,6 @@ async def add_room_report(
)
return next_id

async def block_room(self, room_id: str, user_id: str) -> None:
"""Marks the room as blocked.

Can be called multiple times (though we'll only track the last user to
block this room).

Can be called on a room unknown to this homeserver.

Args:
room_id: Room to block
user_id: Who blocked it
"""
await self.db_pool.simple_upsert(
table="blocked_rooms",
keyvalues={"room_id": room_id},
values={},
insertion_values={"user_id": user_id},
desc="block_room",
)
await self.db_pool.runInteraction(
"block_room_invalidation",
self._invalidate_cache_and_stream,
self.is_room_blocked,
(room_id,),
)

async def unblock_room(self, room_id: str) -> None:
"""Remove the room from blocking list.

Expand Down
Loading