Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Expose new behaviour in admin API
Browse files Browse the repository at this point in the history
  • Loading branch information
David Robertson committed Nov 14, 2023
1 parent b9f0daf commit cdaecc0
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 2 deletions.
37 changes: 37 additions & 0 deletions docs/admin_api/user_admin_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,43 @@ Note: The token will expire if the *admin* user calls `/logout/all` from any
of their devices, but the token will *not* expire if the target user does the
same.

## Allow replacing master cross-signing key without User-Interactive Auth

This endpoint is not intended for server administrator usage;
we describe it here for completeness.

This API temporarily permits a user to replace their master cross-signing key
without going through
[user-interactive authentication](https://spec.matrix.org/v1.8/client-server-api/#user-interactive-authentication-api) (UIA).
This is useful when Synapse has delegated its authentication to the
[Matrix Authentication Service](https://github.com/matrix-org/matrix-authentication-service/);
as Synapse cannot perform UIA is not possible in these circumstances.

The API is

```http request
POST /_synapse/admin/v1/users/<user_id>/_allow_cross_signing_replacement_without_uia
{}
```

If the user does not exist, or does exist but has no master cross-signing key,
this will return with status code `404 Not Found`.

Otherwise, a response body like the following is returned, with status `200 OK`:

```json
{
"updatable_without_uia_before_ms": 1234567890
}
```

The response body is a JSON object with a single field:

- `updatable_without_uia_before_ms`: integer. The timestamp in milliseconds
before which the user is permitted to replace their cross-signing key without
going through UIA.

_Added in Synapse 1.97.0._

## User devices

Expand Down
2 changes: 2 additions & 0 deletions synapse/rest/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
UsersRestServletV2,
UserTokenRestServlet,
WhoisRestServlet,
UserReplaceMasterCrossSigningKeyRestServlet,
)
from synapse.types import JsonDict, RoomStreamToken, TaskStatus
from synapse.util import SYNAPSE_VERSION
Expand Down Expand Up @@ -292,6 +293,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ListDestinationsRestServlet(hs).register(http_server)
RoomMessagesRestServlet(hs).register(http_server)
RoomTimestampToEventRestServlet(hs).register(http_server)
UserReplaceMasterCrossSigningKeyRestServlet(hs).register(http_server)
UserByExternalId(hs).register(http_server)
UserByThreePid(hs).register(http_server)

Expand Down
40 changes: 40 additions & 0 deletions synapse/rest/admin/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -1270,6 +1270,46 @@ async def on_GET(
}


class UserReplaceMasterCrossSigningKeyRestServlet(RestServlet):
"""Allow a given user to replace their master cross-signing key without UIA.
This replacement is permitted for a limited period (currently 10 minutes).
While this is exposed via the admin API, this is intended for use by the
Matrix Authentication Service rather than server admins.
"""

PATTERNS = admin_patterns(
"/users/(?P<user_id>[^/]*)/_allow_cross_signing_replacement_without_uia"
)
REPLACEMENT_PERIOD_MS = 10 * 60 * 1000 # 10 minutes

def __init__(self, hs: "HomeServer"):
self._auth = hs.get_auth()
self._store = hs.get_datastores().main

async def on_POST(
self,
request: SynapseRequest,
user_id: str,
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self._auth, request)

if user_id is None:
raise NotFoundError("User not found")

timestamp = (
await self._store.allow_master_cross_signing_key_replacement_without_uia(
user_id, self.REPLACEMENT_PERIOD_MS
)
)

if timestamp is None:
raise NotFoundError("User has no master cross-signing key")

return HTTPStatus.OK, {"updatable_without_uia_before_ms": timestamp}


class UserByExternalId(RestServlet):
"""Find a user based on an external ID from an auth provider"""

Expand Down
57 changes: 55 additions & 2 deletions tests/rest/admin/test_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from unittest.mock import AsyncMock, Mock, patch

from parameterized import parameterized, parameterized_class

from twisted.test.proto_helpers import MemoryReactor
from twisted.web.resource import Resource

Expand All @@ -44,7 +43,6 @@
from synapse.storage.databases.main.client_ips import LAST_SEEN_GRANULARITY
from synapse.types import JsonDict, UserID, create_requester
from synapse.util import Clock

from tests import unittest
from tests.test_utils import SMALL_PNG
from tests.unittest import override_config
Expand Down Expand Up @@ -4854,3 +4852,58 @@ def test_success(self) -> None:
{"user_id": self.other_user},
channel.json_body,
)


class AllowCrossSigningReplacementTestCase(unittest.HomeserverTestCase):
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
]

@staticmethod
def url(user: str) -> str:
template = (
"/_synapse/admin/v1/users/{}/_allow_cross_signing_replacement_without_uia"
)
return template.format(urllib.parse.quote(user))

def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main

self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")

self.other_user = self.register_user("user", "pass")

def test_error_cases(self) -> None:
fake_user = "@bums:other"
channel = self.make_request(
"POST", self.url(fake_user), access_token=self.admin_user_tok
)
# Fail: user doesn't exist
self.assertEqual(404, channel.code, msg=channel.json_body)

channel = self.make_request(
"POST", self.url(self.other_user), access_token=self.admin_user_tok
)
# Fail: user exists, but has no master cross-signing key
self.assertEqual(404, channel.code, msg=channel.json_body)

def test_success(self) -> None:
# Upload a master key.
dummy_key = {"keys": {"a": "b"}}
self.get_success(
self.store.set_e2e_cross_signing_key(self.other_user, "master", dummy_key)
)

channel = self.make_request(
"POST", self.url(self.other_user), access_token=self.admin_user_tok
)
# Success!
self.assertEqual(200, channel.code, msg=channel.json_body)

# Should now find that the key exists.
_, timestamp = self.get_success(
self.store.get_master_cross_signing_key_updatable_before(self.other_user)
)
self.assertGreater(timestamp, self.clock.time_msec())

0 comments on commit cdaecc0

Please sign in to comment.