Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix local echo in embedded mode #4498

Merged
merged 9 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions spec/unit/embedded.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
ITurnServer,
IRoomEvent,
IOpenIDCredentials,
ISendEventFromWidgetResponseData,
} from "matrix-widget-api";

import { createRoomWidgetClient, MsgType, UpdateDelayedEventAction } from "../../src/matrix";
Expand Down Expand Up @@ -187,6 +188,28 @@
.map((e) => e.getEffectiveEvent()),
).toEqual([event]);
});
it("updates local echo", async () => {
await makeClient({
receiveEvent: ["org.matrix.rageshake_request"],
sendEvent: ["org.matrix.rageshake_request"],
});
expect(widgetApi.requestCapabilityForRoomTimeline).toHaveBeenCalledWith("!1:example.org");
expect(widgetApi.requestCapabilityToReceiveEvent).toHaveBeenCalledWith("org.matrix.rageshake_request");
// const sendSpy = jest.spyOn(widgetApi.transport, "send");
const sendMock = jest.fn();
new Promise((resolve, _) => {
widgetApi.sendRoomEvent.mockImplementation(
async (eType, content, roomId): Promise<ISendEventFromWidgetResponseData> => {},

Check failure on line 202 in spec/unit/embedded.spec.ts

View workflow job for this annotation

GitHub Actions / Typescript Syntax Check

A function whose declared type is neither 'undefined', 'void', nor 'any' must return a value.
);
});
// widgetApi.transport.o;
client.sendEvent("!1:example.org", "org.matrix.rageshake_request", { request_id: 12 }, "widgetTxId");
expect(sendMock).toHaveBeenCalledWith("abc");
widgetApi.emit(
`action:${WidgetApiToWidgetAction.SendEvent}`,
new CustomEvent(`action:${WidgetApiToWidgetAction.SendEvent}`, { detail: { data: event } }),
);
});
});

describe("delayed events", () => {
Expand Down
67 changes: 66 additions & 1 deletion src/embedded.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import { ToDeviceBatch, ToDevicePayload } from "./models/ToDeviceMessage.ts";
import { DeviceInfo } from "./crypto/deviceinfo.ts";
import { IOlmDevice } from "./crypto/algorithms/megolm.ts";
import { MapWithDefault, recursiveMapToObject } from "./utils.ts";
import { TypedEventEmitter } from "./matrix.ts";

interface IStateEventRequest {
eventType: string;
Expand Down Expand Up @@ -117,6 +118,10 @@ export interface ICapabilities {
updateDelayedEvents?: boolean;
}

export enum PendingEvent {
PendingEventsChanged = "PendingEvent.pendingEventsChanged",
}
export type PendingEventHandlerMap = { [PendingEvent.PendingEventsChanged]: () => void };
/**
* A MatrixClient that routes its requests through the widget API instead of the
* real CS API.
Expand All @@ -128,6 +133,9 @@ export class RoomWidgetClient extends MatrixClient {
private lifecycle?: AbortController;
private syncState: SyncState | null = null;

private pendingSendingEventsTxId: { type: string; id: string | undefined; txId: string }[] = [];
private pendingEventsEmitter = new TypedEventEmitter<PendingEvent.PendingEventsChanged, PendingEventHandlerMap>();
toger5 marked this conversation as resolved.
Show resolved Hide resolved

/**
*
* @param widgetApi - The widget api to use for communication.
Expand Down Expand Up @@ -297,6 +305,8 @@ export class RoomWidgetClient extends MatrixClient {
const content = event.event.redacts
? { ...event.getContent(), redacts: event.event.redacts }
: event.getContent();

// Delayed event special case.
if (delayOpts) {
// TODO: updatePendingEvent for delayed events?
const response = await this.widgetApi.sendRoomEvent(
Expand All @@ -309,16 +319,26 @@ export class RoomWidgetClient extends MatrixClient {
return this.validateSendDelayedEventResponse(response);
}

const txId = event.getTxnId();
// Add the txnId to the pending list (still with unknown evID)
if (txId) this.pendingSendingEventsTxId.push({ type: event.getType(), id: undefined, txId });

let response: ISendEventFromWidgetResponseData;
try {
response = await this.widgetApi.sendRoomEvent(event.getType(), content, room.roomId);
} catch (e) {
this.updatePendingEventStatus(room, event, EventStatus.NOT_SENT);
throw e;
}

// This also checks for an event id on the response
room.updatePendingEvent(event, EventStatus.SENT, response.event_id);

// Update the pending events list with the eventId
this.pendingSendingEventsTxId.forEach((p) => {
if (p.txId === txId) p.id = response.event_id;
});
this.pendingEventsEmitter.emit(PendingEvent.PendingEventsChanged);

return { event_id: response.event_id! };
}

Expand Down Expand Up @@ -462,13 +482,58 @@ export class RoomWidgetClient extends MatrixClient {
await this.widgetApi.transport.reply<IWidgetApiAcknowledgeResponseData>(ev.detail, {});
}

private updateTxId = async (event: MatrixEvent): Promise<void> => {
// We update the txId for remote echos that originate from this client.
// This happens with the help of `pendingSendingEventsTxId` where we store all events that are currently sending
// with their widget txId and once ready the final evId.
if (
// This could theoretically be an event send by this device
// In that case we need to update the txId of the event because the embedded client/widget
// knows this event with a different transaction Id than what was used by the host client.
event.getSender() === this.getUserId() &&
// We optimize by not blocking events from types that we have not send
// with this client.
this.pendingSendingEventsTxId.some((p) => event.getType() === p.type)
) {
// Compare by event Id if we have a matching pending event where we know the txId.
let matchingTxId = this.pendingSendingEventsTxId.find((p) => p.id === event.getId())?.txId;
// Block any further processing of this event until we have received the sending response.
// -> until we know the event id.
// -> until we have not pending events anymore.
while (!matchingTxId && this.pendingSendingEventsTxId.length > 0) {
// Recheck whenever the PendingEventsChanged
await new Promise<void>((resolve) =>
this.pendingEventsEmitter.once(PendingEvent.PendingEventsChanged, () => resolve()),
);
matchingTxId = this.pendingSendingEventsTxId.find((p) => p.id === event.getId())?.txId;
}

// We found the correct txId: we update the event and delete the entry of the pending events.
if (matchingTxId) {
event.setTxnId(matchingTxId);
event.setUnsigned({ ...event.getUnsigned(), transaction_id: matchingTxId });
}
this.pendingSendingEventsTxId = this.pendingSendingEventsTxId.filter((p) => p.id !== event.getId());

// Emit once there are no pending events anymore to release all other events that got
// awaited in the `while (!matchingTxId && this.pendingSendingEventsTxId.length > 0)` loop
// but are not send by this client.
if (this.pendingSendingEventsTxId.length === 0) {
this.pendingEventsEmitter.emit(PendingEvent.PendingEventsChanged);
}
}
};

private onEvent = async (ev: CustomEvent<ISendEventToWidgetActionRequest>): Promise<void> => {
ev.preventDefault();

// Verify the room ID matches, since it's possible for the client to
// send us events from other rooms if this widget is always on screen
if (ev.detail.data.room_id === this.roomId) {
const event = new MatrixEvent(ev.detail.data as Partial<IEvent>);

// Only inject once we have update the txId
await this.updateTxId(event);
await this.syncApi!.injectRoomEvents(this.room!, [], [event]);
this.emit(ClientEvent.Event, event);
this.setSyncState(SyncState.Syncing);
Expand Down
Loading