Skip to content

Commit

Permalink
fix: correctly handle ack cache cb
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-pousette committed Dec 5, 2023
1 parent e7ac7c4 commit ab1f8ce
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ describe("index", () => {

await l0c.trustGraph.log.waitForReplicator(
session.peers[0].identity.publicKey,
session.peers[1].identity.publicKey
session.peers[1].identity.publicKey,
session.peers[3].identity.publicKey
);

// Try query with trusted
Expand Down
60 changes: 31 additions & 29 deletions packages/transport/stream/src/__tests__/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -445,9 +445,9 @@ describe("streams", function () {
});

// since redundancy is set to 2 by default we wil receive 2 acks
await waitForResolved(() => expect(streams[0].ack).toHaveLength(2));
await waitForResolved(() => expect(streams[0].ack).toHaveLength(3));
await delay(2000);
await waitForResolved(() => expect(streams[0].ack).toHaveLength(2));
await waitForResolved(() => expect(streams[0].ack).toHaveLength(3));

streams[1].messages = [];
streams[3].received = [];
Expand All @@ -459,7 +459,10 @@ describe("streams", function () {
streams[3].stream.publicKeyHash
)
?.list?.map((x) => x.hash)
).toEqual([streams[2].stream.publicKeyHash]); // "2" is fastest route
).toEqual([
streams[2].stream.publicKeyHash,
streams[1].stream.publicKeyHash
]); // "2" is fastest route

await streams[0].stream.publish(crypto.randomBytes(1e2), {
to: [streams[3].stream.components.peerId]
Expand Down Expand Up @@ -668,7 +671,6 @@ describe("streams", function () {
let totalWrites = 10;
expect(streams[0].ack).toHaveLength(0);

await delay(3000);
// push one message to ensure paths are found
await streams[0].stream.publish(data, {
to: [
Expand All @@ -694,21 +696,21 @@ describe("streams", function () {
)
).toBeTrue();

streams[0].stream.routeSeekInterval = Number.MAX_VALUE; // disable seek so that we can check that the right amount of messages are sent below
streams[1].received = [];
streams[2].received = [];
const allWrites = streams.map((x) => collectDataWrites(x.stream));

await waitForResolved(() =>
expect(streams[0].stream.routes.countAll()).toEqual(4)
);
await waitForResolved(() =>
expect(streams[1].stream.routes.countAll()).toEqual(3)
expect(streams[1].stream.routes.countAll()).toEqual(4)
);
await waitForResolved(() =>
expect(streams[2].stream.routes.countAll()).toEqual(3)
expect(streams[2].stream.routes.countAll()).toEqual(4)
);

streams[0].stream.routeSeekInterval = Number.MAX_VALUE; // disable seek so that we can check that the right amount of messages are sent below
streams[1].received = [];
streams[2].received = [];
const allWrites = streams.map((x) => collectDataWrites(x.stream));

// expect the data to be sent smartly
for (let i = 0; i < totalWrites; i++) {
await streams[0].stream.publish(data, {
Expand Down Expand Up @@ -737,22 +739,22 @@ describe("streams", function () {

describe("1->2->2", () => {
/**
┌─────┐
│0 │
└┬───┬┘
┌▽─┐┌▽─┐
│2 ││1 │
└┬┬┘└┬┬┘
││ ││
││ ││
││ └│┐
└│──┐││
┌│──│┘│
││ │┌┘
┌▽▽┐┌▽▽┐
│3 ││4 │ // 3 and 4 are connected also
└──┘└──┘
*/
┌─────┐
│0 │
└┬───┬┘
┌▽─┐┌▽─┐
│2 ││1 │
└┬┬┘└┬┬┘
││ ││
││ ││
││ └│┐
└│──┐││
┌│──│┘│
││ │┌┘
┌▽▽┐┌▽▽┐
│3 ││4 │ // 3 and 4 are connected also
└──┘└──┘
*/

let session: TestSessionStream;
let streams: ReturnType<typeof createMetrics>[];
Expand Down Expand Up @@ -844,11 +846,11 @@ describe("streams", function () {
expect(streams[2].stream.routes.countAll()).toEqual(5)
);
await waitForResolved(() =>
expect(streams[3].stream.routes.countAll()).toEqual(4)
expect(streams[3].stream.routes.countAll()).toEqual(5)
);

await waitForResolved(() =>
expect(streams[4].stream.routes.countAll()).toEqual(4)
expect(streams[4].stream.routes.countAll()).toEqual(5)
);

const allWrites = streams.map((x) => collectDataWrites(x.stream));
Expand Down
6 changes: 3 additions & 3 deletions packages/transport/stream/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1118,9 +1118,9 @@ export abstract class DirectStream<
ackCache.push(async (ack) => {
ack.seenCounter += 1;
await ack.sign(this.sign);
const trace = this.traces.get(messageId);
const stream = trace ? this.peers.get(trace) : undefined;
stream && this.publishMessage(this.publicKey, ack, [stream]);
/* const trace = this.traces.get(messageId);
const stream = trace ? this.peers.get(trace) : undefined; */
peerStream && this.publishMessage(this.publicKey, ack, [peerStream]);
});
this.ackCache.add(messageId, ackCache);
return true;
Expand Down

0 comments on commit ab1f8ce

Please sign in to comment.