From 5617a1fa618ea64f8ca7bca139f5742c6c7cc62e Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Thu, 29 Jun 2023 14:00:53 +0100 Subject: [PATCH 1/2] Refactor transaction handler. --- src/appservice/Appservice.ts | 405 ++++++++++++++++++----------------- 1 file changed, 206 insertions(+), 199 deletions(-) diff --git a/src/appservice/Appservice.ts b/src/appservice/Appservice.ts index 36f9356f..bfddc20d 100644 --- a/src/appservice/Appservice.ts +++ b/src/appservice/Appservice.ts @@ -236,7 +236,7 @@ export class Appservice extends EventEmitter { private appServer: any; private intentsCache: LRU; private eventProcessors: { [eventType: string]: IPreprocessor[] } = {}; - private pendingTransactions: { [txnId: string]: Promise } = {}; + private pendingTransactions = new Map>(); /** * Creates a new application service. @@ -655,248 +655,255 @@ export class Appservice extends EventEmitter { return providedToken === this.registration.hs_token; } - private async onTransaction(req: express.Request, res: express.Response): Promise { - if (!this.isAuthed(req)) { - res.status(401).json({ errcode: "AUTH_FAILED", error: "Authentication failed" }); - return; - } - - if (typeof (req.body) !== "object") { - res.status(400).json({ errcode: "BAD_REQUEST", error: "Expected JSON" }); - return; - } - - if (!req.body["events"] || !Array.isArray(req.body["events"])) { - res.status(400).json({ errcode: "BAD_REQUEST", error: "Invalid JSON: expected events" }); - return; - } - - const txnId = req.params["txnId"]; + private async handleTransaction(body: Record) { + // Process all the crypto stuff first to ensure that future transactions (if not this one) + // will decrypt successfully. We start with EDUs because we need structures to put counts + // and such into in a later stage, and EDUs are independent of crypto. - if (await Promise.resolve(this.storage.isTransactionCompleted(txnId))) { - res.status(200).json({}); - return; + const byUserId: { + [userId: string]: { + counts: Record; + toDevice: any[]; + unusedFallbacks: OTKAlgorithm[]; + }; + } = {}; + + const orderedEdus = []; + if (Array.isArray(body["de.sorunome.msc2409.to_device"])) { + orderedEdus.push(...body["de.sorunome.msc2409.to_device"].map(e => ({ + ...e, + unsigned: { + ...e['unsigned'], + [EDU_ANNOTATION_KEY]: EduAnnotation.ToDevice, + }, + }))); } - - if (this.pendingTransactions[txnId]) { - try { - await this.pendingTransactions[txnId]; - res.status(200).json({}); - } catch (e) { - LogService.error("Appservice", e); - res.status(500).json({}); - } - return; + if (Array.isArray(body["de.sorunome.msc2409.ephemeral"])) { + orderedEdus.push(...body["de.sorunome.msc2409.ephemeral"].map(e => ({ + ...e, + unsigned: { + ...e['unsigned'], + [EDU_ANNOTATION_KEY]: EduAnnotation.Ephemeral, + }, + }))); } + for (let event of orderedEdus) { + if (event['edu_type']) event['type'] = event['edu_type']; // handle property change during MSC2409's course - LogService.info("Appservice", "Processing transaction " + txnId); - // eslint-disable-next-line no-async-promise-executor - this.pendingTransactions[txnId] = new Promise(async (resolve) => { - // Process all the crypto stuff first to ensure that future transactions (if not this one) - // will decrypt successfully. We start with EDUs because we need structures to put counts - // and such into in a later stage, and EDUs are independent of crypto. - - const byUserId: { - [userId: string]: { - counts: Record; - toDevice: any[]; - unusedFallbacks: OTKAlgorithm[]; - }; - } = {}; - - const orderedEdus = []; - if (Array.isArray(req.body["de.sorunome.msc2409.to_device"])) { - orderedEdus.push(...req.body["de.sorunome.msc2409.to_device"].map(e => ({ - ...e, - unsigned: { - ...e['unsigned'], - [EDU_ANNOTATION_KEY]: EduAnnotation.ToDevice, - }, - }))); - } - if (Array.isArray(req.body["de.sorunome.msc2409.ephemeral"])) { - orderedEdus.push(...req.body["de.sorunome.msc2409.ephemeral"].map(e => ({ - ...e, - unsigned: { - ...e['unsigned'], - [EDU_ANNOTATION_KEY]: EduAnnotation.Ephemeral, - }, - }))); - } - for (let event of orderedEdus) { - if (event['edu_type']) event['type'] = event['edu_type']; // handle property change during MSC2409's course + LogService.info("Appservice", `Processing ${event['unsigned'][EDU_ANNOTATION_KEY]} event of type ${event["type"]}`); + event = await this.processEphemeralEvent(event); - LogService.info("Appservice", `Processing ${event['unsigned'][EDU_ANNOTATION_KEY]} event of type ${event["type"]}`); - event = await this.processEphemeralEvent(event); + // These events aren't tied to rooms, so just emit them generically + this.emit("ephemeral.event", event); - // These events aren't tied to rooms, so just emit them generically - this.emit("ephemeral.event", event); + if (this.cryptoStorage && (event["type"] === "m.room.encrypted" || event.unsigned?.[EDU_ANNOTATION_KEY] === EduAnnotation.ToDevice)) { + const toUser = event["to_user_id"]; + const intent = this.getIntentForUserId(toUser); + await intent.enableEncryption(); - if (this.cryptoStorage && (event["type"] === "m.room.encrypted" || event.unsigned?.[EDU_ANNOTATION_KEY] === EduAnnotation.ToDevice)) { - const toUser = event["to_user_id"]; - const intent = this.getIntentForUserId(toUser); - await intent.enableEncryption(); - - if (!byUserId[toUser]) byUserId[toUser] = { counts: null, toDevice: null, unusedFallbacks: null }; - if (!byUserId[toUser].toDevice) byUserId[toUser].toDevice = []; - byUserId[toUser].toDevice.push(event); - } + if (!byUserId[toUser]) byUserId[toUser] = { counts: null, toDevice: null, unusedFallbacks: null }; + if (!byUserId[toUser].toDevice) byUserId[toUser].toDevice = []; + byUserId[toUser].toDevice.push(event); } + } - const deviceLists: { changed: string[], removed: string[] } = req.body["org.matrix.msc3202.device_lists"] ?? { - changed: [], - removed: [], - }; + const deviceLists = body["org.matrix.msc3202.device_lists"] as { changed: string[], removed: string[] } ?? { + changed: [], + removed: [], + }; - if (!deviceLists.changed) deviceLists.changed = []; - if (!deviceLists.removed) deviceLists.removed = []; + if (!deviceLists.changed) deviceLists.changed = []; + if (!deviceLists.removed) deviceLists.removed = []; - if (deviceLists.changed.length || deviceLists.removed.length) { - this.emit("device_lists", deviceLists); - } + if (deviceLists.changed.length || deviceLists.removed.length) { + this.emit("device_lists", deviceLists); + } - let otks = req.body["org.matrix.msc3202.device_one_time_keys_count"]; - const otks2 = req.body["org.matrix.msc3202.device_one_time_key_counts"]; - if (otks2 && !otks) { - LogService.warn( - "Appservice", - "Your homeserver is using an outdated field (device_one_time_key_counts) to talk to this appservice. " + - "If you're using Synapse, please upgrade to 1.73.0 or higher.", - ); - otks = otks2; - } - if (otks) { - this.emit("otk.counts", otks); - } - if (otks && this.cryptoStorage) { - for (const userId of Object.keys(otks)) { - const intent = this.getIntentForUserId(userId); - await intent.enableEncryption(); - const otksForUser = otks[userId][intent.underlyingClient.crypto.clientDeviceId]; - if (otksForUser) { - if (!byUserId[userId]) { - byUserId[userId] = { - counts: null, - toDevice: null, - unusedFallbacks: null, - }; - } - byUserId[userId].counts = otksForUser; + let otks = body["org.matrix.msc3202.device_one_time_keys_count"]; + const otks2 = body["org.matrix.msc3202.device_one_time_key_counts"]; + if (otks2 && !otks) { + LogService.warn( + "Appservice", + "Your homeserver is using an outdated field (device_one_time_key_counts) to talk to this appservice. " + + "If you're using Synapse, please upgrade to 1.73.0 or higher.", + ); + otks = otks2; + } + if (otks) { + this.emit("otk.counts", otks); + } + if (otks && this.cryptoStorage) { + for (const userId of Object.keys(otks)) { + const intent = this.getIntentForUserId(userId); + await intent.enableEncryption(); + const otksForUser = otks[userId][intent.underlyingClient.crypto.clientDeviceId]; + if (otksForUser) { + if (!byUserId[userId]) { + byUserId[userId] = { + counts: null, + toDevice: null, + unusedFallbacks: null, + }; } + byUserId[userId].counts = otksForUser; } } + } - const fallbacks = req.body["org.matrix.msc3202.device_unused_fallback_key_types"]; - if (fallbacks) { - this.emit("otk.unused_fallback_keys", fallbacks); - } - if (fallbacks && this.cryptoStorage) { - for (const userId of Object.keys(fallbacks)) { - const intent = this.getIntentForUserId(userId); - await intent.enableEncryption(); - const fallbacksForUser = fallbacks[userId][intent.underlyingClient.crypto.clientDeviceId]; - if (Array.isArray(fallbacksForUser) && !fallbacksForUser.includes(OTKAlgorithm.Signed)) { - if (!byUserId[userId]) { - byUserId[userId] = { - counts: null, - toDevice: null, - unusedFallbacks: null, - }; - } - byUserId[userId].unusedFallbacks = fallbacksForUser; + const fallbacks = body["org.matrix.msc3202.device_unused_fallback_key_types"]; + if (fallbacks) { + this.emit("otk.unused_fallback_keys", fallbacks); + } + if (fallbacks && this.cryptoStorage) { + for (const userId of Object.keys(fallbacks)) { + const intent = this.getIntentForUserId(userId); + await intent.enableEncryption(); + const fallbacksForUser = fallbacks[userId][intent.underlyingClient.crypto.clientDeviceId]; + if (Array.isArray(fallbacksForUser) && !fallbacksForUser.includes(OTKAlgorithm.Signed)) { + if (!byUserId[userId]) { + byUserId[userId] = { + counts: null, + toDevice: null, + unusedFallbacks: null, + }; } + byUserId[userId].unusedFallbacks = fallbacksForUser; } } + } - if (this.cryptoStorage) { - for (const userId of Object.keys(byUserId)) { - const intent = this.getIntentForUserId(userId); - await intent.enableEncryption(); - const info = byUserId[userId]; - const userStorage = this.storage.storageForUser(userId); + if (this.cryptoStorage) { + for (const userId of Object.keys(byUserId)) { + const intent = this.getIntentForUserId(userId); + await intent.enableEncryption(); + const info = byUserId[userId]; + const userStorage = this.storage.storageForUser(userId); - if (!info.toDevice) info.toDevice = []; - if (!info.unusedFallbacks) info.unusedFallbacks = JSON.parse(await userStorage.readValue("last_unused_fallbacks") || "[]"); - if (!info.counts) info.counts = JSON.parse(await userStorage.readValue("last_counts") || "{}"); + if (!info.toDevice) info.toDevice = []; + if (!info.unusedFallbacks) info.unusedFallbacks = JSON.parse(await userStorage.readValue("last_unused_fallbacks") || "[]"); + if (!info.counts) info.counts = JSON.parse(await userStorage.readValue("last_counts") || "{}"); - LogService.info("Appservice", `Updating crypto state for ${userId}`); - await intent.underlyingClient.crypto.updateSyncData(info.toDevice, info.counts, info.unusedFallbacks, deviceLists.changed, deviceLists.removed); - } + LogService.info("Appservice", `Updating crypto state for ${userId}`); + await intent.underlyingClient.crypto.updateSyncData(info.toDevice, info.counts, info.unusedFallbacks, deviceLists.changed, deviceLists.removed); } + } - for (let event of req.body["events"]) { - LogService.info("Appservice", `Processing event of type ${event["type"]}`); - event = await this.processEvent(event); - if (event['type'] === 'm.room.encrypted') { - this.emit("room.encrypted_event", event["room_id"], event); - if (this.cryptoStorage) { + for (let event of body.events as any[]) { + LogService.info("Appservice", `Processing event of type ${event["type"]}`); + event = await this.processEvent(event); + if (event['type'] === 'm.room.encrypted') { + this.emit("room.encrypted_event", event["room_id"], event); + if (this.cryptoStorage) { + try { + const encrypted = new EncryptedRoomEvent(event); + const roomId = event['room_id']; try { - const encrypted = new EncryptedRoomEvent(event); - const roomId = event['room_id']; + event = (await this.botClient.crypto.decryptRoomEvent(encrypted, roomId)).raw; + event = await this.processEvent(event); + this.emit("room.decrypted_event", roomId, event); + + // For logging purposes: show that the event was decrypted + LogService.info("Appservice", `Processing decrypted event of type ${event["type"]}`); + } catch (e1) { + LogService.warn("Appservice", `Bot client was not able to decrypt ${roomId} ${event['event_id']} - trying other intents`); + + let tryUserId: string; try { - event = (await this.botClient.crypto.decryptRoomEvent(encrypted, roomId)).raw; + // TODO: This could be more efficient + const userIdsInRoom = await this.botClient.getJoinedRoomMembers(roomId); + tryUserId = userIdsInRoom.find(u => this.isNamespacedUser(u)); + } catch (e) { + LogService.error("Appservice", "Failed to get members of room - cannot decrypt message"); + } + + if (tryUserId) { + const intent = this.getIntentForUserId(tryUserId); + + event = (await intent.underlyingClient.crypto.decryptRoomEvent(encrypted, roomId)).raw; event = await this.processEvent(event); this.emit("room.decrypted_event", roomId, event); // For logging purposes: show that the event was decrypted LogService.info("Appservice", `Processing decrypted event of type ${event["type"]}`); - } catch (e1) { - LogService.warn("Appservice", `Bot client was not able to decrypt ${roomId} ${event['event_id']} - trying other intents`); - - let tryUserId: string; - try { - // TODO: This could be more efficient - const userIdsInRoom = await this.botClient.getJoinedRoomMembers(roomId); - tryUserId = userIdsInRoom.find(u => this.isNamespacedUser(u)); - } catch (e) { - LogService.error("Appservice", "Failed to get members of room - cannot decrypt message"); - } - - if (tryUserId) { - const intent = this.getIntentForUserId(tryUserId); - - event = (await intent.underlyingClient.crypto.decryptRoomEvent(encrypted, roomId)).raw; - event = await this.processEvent(event); - this.emit("room.decrypted_event", roomId, event); - - // For logging purposes: show that the event was decrypted - LogService.info("Appservice", `Processing decrypted event of type ${event["type"]}`); - } else { - // noinspection ExceptionCaughtLocallyJS - throw e1; - } + } else { + // noinspection ExceptionCaughtLocallyJS + throw e1; } - } catch (e) { - LogService.error("Appservice", `Decryption error on ${event['room_id']} ${event['event_id']}`, e); - this.emit("room.failed_decryption", event['room_id'], event, e); } + } catch (e) { + LogService.error("Appservice", `Decryption error on ${event['room_id']} ${event['event_id']}`, e); + this.emit("room.failed_decryption", event['room_id'], event, e); } } - this.emit("room.event", event["room_id"], event); - if (event['type'] === 'm.room.message') { - this.emit("room.message", event["room_id"], event); - } - if (event['type'] === 'm.room.member' && this.isNamespacedUser(event['state_key'])) { - await this.processMembershipEvent(event); - } - if (event['type'] === 'm.room.tombstone' && event['state_key'] === '') { - this.emit("room.archived", event['room_id'], event); - } - if (event['type'] === 'm.room.create' && event['state_key'] === '' && event['content'] && event['content']['predecessor']) { - this.emit("room.upgraded", event['room_id'], event); - } } + this.emit("room.event", event["room_id"], event); + if (event['type'] === 'm.room.message') { + this.emit("room.message", event["room_id"], event); + } + if (event['type'] === 'm.room.member' && this.isNamespacedUser(event['state_key'])) { + await this.processMembershipEvent(event); + } + if (event['type'] === 'm.room.tombstone' && event['state_key'] === '') { + this.emit("room.archived", event['room_id'], event); + } + if (event['type'] === 'm.room.create' && event['state_key'] === '' && event['content'] && event['content']['predecessor']) { + this.emit("room.upgraded", event['room_id'], event); + } + } + } - resolve(); - }); + private async onTransaction(req: express.Request, res: express.Response): Promise { + if (!this.isAuthed(req)) { + res.status(401).json({ errcode: "AUTH_FAILED", error: "Authentication failed" }); + return; + } + + if (typeof (req.body) !== "object") { + res.status(400).json({ errcode: "BAD_REQUEST", error: "Expected JSON" }); + return; + } + + if (!req.body["events"] || !Array.isArray(req.body["events"])) { + res.status(400).json({ errcode: "BAD_REQUEST", error: "Invalid JSON: expected events" }); + return; + } + + const txnId = req.params["txnId"]; + + try { + if (await this.storage.isTransactionCompleted(txnId)) { + res.status(200).json({}); + } + } catch (e) { + LogService.error("Appservice", e); + res.status(500).json({}); + } + + if (this.pendingTransactions.has(txnId)) { + // The homeserver has retried a transaction while we're still handling it. + try { + await this.pendingTransactions.get(txnId); + res.status(200).json({}); + } catch (e) { + LogService.error("Appservice", e); + res.status(500).json({}); + } + return; + } + + LogService.info("Appservice", `Processing transaction ${txnId}`); + const txnHandler = this.handleTransaction(req.body); + this.pendingTransactions.set(txnId, txnHandler); try { - await this.pendingTransactions[txnId]; + await txnHandler; await Promise.resolve(this.storage.setTransactionCompleted(txnId)); res.status(200).json({}); } catch (e) { LogService.error("Appservice", e); res.status(500).json({}); + } finally { + this.pendingTransactions.delete(txnId); } } From a0c209ee78a43d5f783cf2a623ee9cb8e432a21c Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Mon, 3 Jul 2023 09:47:40 +0100 Subject: [PATCH 2/2] Fix races --- src/appservice/Appservice.ts | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/src/appservice/Appservice.ts b/src/appservice/Appservice.ts index bfddc20d..d9dd8c0c 100644 --- a/src/appservice/Appservice.ts +++ b/src/appservice/Appservice.ts @@ -655,10 +655,14 @@ export class Appservice extends EventEmitter { return providedToken === this.registration.hs_token; } - private async handleTransaction(body: Record) { + private async handleTransaction(txnId: string, body: Record) { // Process all the crypto stuff first to ensure that future transactions (if not this one) // will decrypt successfully. We start with EDUs because we need structures to put counts // and such into in a later stage, and EDUs are independent of crypto. + if (await this.storage.isTransactionCompleted(txnId)) { + // Duplicate. + return; + } const byUserId: { [userId: string]: { @@ -868,16 +872,7 @@ export class Appservice extends EventEmitter { return; } - const txnId = req.params["txnId"]; - - try { - if (await this.storage.isTransactionCompleted(txnId)) { - res.status(200).json({}); - } - } catch (e) { - LogService.error("Appservice", e); - res.status(500).json({}); - } + const { txnId } = req.params; if (this.pendingTransactions.has(txnId)) { // The homeserver has retried a transaction while we're still handling it. @@ -892,12 +887,18 @@ export class Appservice extends EventEmitter { } LogService.info("Appservice", `Processing transaction ${txnId}`); - const txnHandler = this.handleTransaction(req.body); + const txnHandler = this.handleTransaction(txnId, req.body); this.pendingTransactions.set(txnId, txnHandler); try { await txnHandler; - await Promise.resolve(this.storage.setTransactionCompleted(txnId)); + try { + await this.storage.setTransactionCompleted(txnId); + } catch (ex) { + // Not fatal for the transaction since we *did* process it, but we should + // warn loudly. + LogService.warn("Appservice", "Failed to store completed transaction", ex); + } res.status(200).json({}); } catch (e) { LogService.error("Appservice", e);