Skip to content

Commit

Permalink
fix: force messages to be provessed slowly to ensure topology
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-pousette committed Dec 14, 2023
1 parent 12a203a commit b80c203
Showing 1 changed file with 20 additions and 12 deletions.
32 changes: 20 additions & 12 deletions packages/transport/stream/src/__tests__/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ describe("streams", function () {
let session: TestSessionStream;
let streams: ReturnType<typeof createMetrics>[];

beforeAll(async () => {});
beforeAll(async () => { });

beforeEach(async () => {
// 0 and 2 not connected
Expand Down Expand Up @@ -248,7 +248,7 @@ describe("streams", function () {
let session: TestSessionStream;
let streams: ReturnType<typeof createMetrics>[];

beforeAll(async () => {});
beforeAll(async () => { });

beforeEach(async () => {
// 0 and 2 not connected
Expand Down Expand Up @@ -565,7 +565,7 @@ describe("streams", function () {
let session: TestSessionStream;
let streams: ReturnType<typeof createMetrics>[];

beforeAll(async () => {});
beforeAll(async () => { });

beforeEach(async () => {
session = await connected(3, {
Expand Down Expand Up @@ -717,7 +717,7 @@ describe("streams", function () {
let streams: ReturnType<typeof createMetrics>[];
const data = new Uint8Array([1, 2, 3]);

beforeAll(async () => {});
beforeAll(async () => { });

beforeEach(async () => {
session = await connected(3, {
Expand All @@ -742,6 +742,14 @@ describe("streams", function () {
});

it("messages are only sent once to each peer", async () => {
streams.forEach((stream) => {
const processFn = stream.stream.processMessage.bind(stream.stream);
stream.stream.processMessage = async (a, b, c) => {
await delay(200);
return processFn(a, b, c);
};
});

let totalWrites = 10;
expect(streams[0].ack).toHaveLength(0);

Expand Down Expand Up @@ -837,7 +845,7 @@ describe("streams", function () {
let streams: ReturnType<typeof createMetrics>[];
const data = new Uint8Array([1, 2, 3]);

beforeAll(async () => {});
beforeAll(async () => { });

beforeEach(async () => {
session = await disconnected(5, {
Expand Down Expand Up @@ -1202,7 +1210,7 @@ describe("streams", function () {
let streams: ReturnType<typeof createMetrics>[];
let timer: ReturnType<typeof setTimeout>;

beforeAll(async () => {});
beforeAll(async () => { });

beforeEach(async () => {
session = await connected(3, {
Expand Down Expand Up @@ -1937,13 +1945,13 @@ describe("join/leave", () => {
{
mode: seekDelivery[i]
? new SeekDelivery({
redundancy: 1,
to: [slow.publicKey, fast.publicKey]
})
redundancy: 1,
to: [slow.publicKey, fast.publicKey]
})
: new SilentDelivery({
redundancy: 1,
to: [slow.publicKey, fast.publicKey]
}) // undefined ?
redundancy: 1,
to: [slow.publicKey, fast.publicKey]
}) // undefined ?
}
);

Expand Down

0 comments on commit b80c203

Please sign in to comment.