Skip to content

Commit

Permalink
Add support for AS websockets
Browse files Browse the repository at this point in the history
  • Loading branch information
hifi committed Feb 7, 2023
1 parent e0af58d commit 2ef90b3
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 1 deletion.
18 changes: 17 additions & 1 deletion heisenbridge/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from heisenbridge.room import Room
from heisenbridge.room import RoomInvalidError
from heisenbridge.space_room import SpaceRoom
from heisenbridge.websocket import AppserviceWebsocket


class MemoryBridgeStateStore(ASStateStore, MemoryStateStore):
Expand Down Expand Up @@ -484,6 +485,14 @@ async def run(self, listen_address, listen_port, homeserver_url, owner, safe_mod
if safe_mode:
print("Safe mode is enabled.", flush=True)

url = urllib.parse.urlparse(homeserver_url)
ws = None
if url.scheme in ["ws", "wss"]:
print("Using websockets to receive transactions. Listening is still enabled.")
ws = AppserviceWebsocket(homeserver_url, self.registration["as_token"], self._on_mx_event)
homeserver_url = url._replace(scheme=("https" if url.scheme == "wss" else "http")).geturl()
print(f"Connecting to HS at {homeserver_url}")

self.api = HTTPAPI(base_url=homeserver_url, token=self.registration["as_token"])

# conduit requires that the appservice user is registered before whoami
Expand Down Expand Up @@ -748,6 +757,10 @@ def sync_connect(room):

await self.push_bridge_state(BridgeStateEvent.UNCONFIGURED)

# late start WS to avoid getting transactions too early
if ws:
await ws.start()

if self.config["owner"] and not owner_control_open:
print(f"Opening control room for owner {self.config['owner']}")
try:
Expand Down Expand Up @@ -934,4 +947,7 @@ def main():


if __name__ == "__main__":
main()
try:
main()
except KeyboardInterrupt:
pass
66 changes: 66 additions & 0 deletions heisenbridge/websocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import asyncio
import json
import logging

import aiohttp
from mautrix.types.event import Event


class AppserviceWebsocket:
def __init__(self, url, token, callback):
self.url = url + "/_matrix/client/unstable/fi.mau.as_sync"
self.headers = {
"Authorization": f"Bearer {token}",
"X-Mautrix-Websocket-Version": "3",
}
self.callback = callback

async def start(self):
asyncio.create_task(self._loop())

async def _loop(self):
while True:
try:
logging.info(f"Connecting to {self.url}...")

async with aiohttp.ClientSession(headers=self.headers) as sess:
async with sess.ws_connect(self.url) as ws:
logging.info("Websocket connected.")

async for msg in ws:
if msg.type != aiohttp.WSMsgType.TEXT:
logging.debug("Unhandled WS message: %s", msg)
continue

data = msg.json()
if data["status"] == "ok" and data["command"] == "transaction":
logging.debug(f"Websocket transaction {data['txn_id']}")
for event in data["events"]:
try:
await self.callback(Event.deserialize(event))
except Exception as e:
logging.error(e)

await ws.send_str(
json.dumps(
{
"command": "response",
"id": data["id"],
"data": {},
}
)
)
else:
logging.warn("Unhandled WS command: %s", data)

logging.info("Websocket disconnected.")
except asyncio.CancelledError:
logging.info("Websocket was cancelled.")
return
except Exception as e:
logging.error(e)

try:
await asyncio.sleep(5)
except asyncio.CancelledError:
return
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ install_requires =
ruamel.yaml >=0.15.35, <0.18
mautrix >=0.15.0, <0.17
python-socks[asyncio] >= 1.2.4
aiohttp >=3.8.0, <4.0.0

python_requires = >=3.8

Expand Down

0 comments on commit 2ef90b3

Please sign in to comment.