Skip to content

Commit

Permalink
fix: wait until timeout for relayed ACKs
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-pousette committed Dec 14, 2023
1 parent 250cbb2 commit 398105e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 15 deletions.
15 changes: 14 additions & 1 deletion packages/transport/stream/src/__tests__/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -512,11 +512,24 @@ describe("streams", function () {
streams[1].stream.publicKeyHash
]); // "2" is fastest route

await waitForResolved(() =>
expect(
streams[2].stream.routes
.findNeighbor(
streams[0].stream.publicKeyHash,
streams[3].stream.publicKeyHash
)
?.list?.map((x) => x.hash)
).toEqual([streams[3].stream.publicKeyHash])
);

await streams[0].stream.publish(crypto.randomBytes(1e2), {
to: [streams[3].stream.components.peerId]
});

await waitFor(() => streams[3].received.length === 1);
await waitForResolved(() =>
expect(streams[3].received).toHaveLength(1)
);

expect(streams[1].messages).toHaveLength(0); // Because shortest route is 0 -> 2 -> 3
expect(streams[1].stream.routes.count()).toEqual(2);
Expand Down
26 changes: 12 additions & 14 deletions packages/transport/stream/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,7 @@ export abstract class DirectStream<
{
promise: Promise<void>;
callback: (
messageTarget: PublicSignKey,
seenCounter: number,
ack: ACK,
messageThrough: PeerStreams,
messageFrom?: PeerStreams
) => void;
Expand Down Expand Up @@ -1063,6 +1062,7 @@ export abstract class DirectStream<
this._onDataMessage(from, peerStream, msg, message).catch(logError);
} else {
if (message instanceof ACK) {
/* await delay(3000 * Math.random()) */
this.onAck(from, peerStream, msg, message).catch(logError);
} else if (message instanceof Goodbye) {
this.onGoodBye(from, peerStream, msg, message).catch(logError);
Expand Down Expand Up @@ -1250,12 +1250,7 @@ export abstract class DirectStream<

this._ackCallbacks
.get(messageIdString)
?.callback(
message.header.signatures!.publicKeys[0],
message.seenCounter,
peerStream,
nextStream
);
?.callback(message, peerStream, nextStream);

// relay ACK ?
// send exactly backwards same route we got this message
Expand Down Expand Up @@ -1524,8 +1519,11 @@ export abstract class DirectStream<

this._ackCallbacks.set(idString, {
promise: deliveryDeferredPromise.promise,
callback: (messageTarget, seenCounter, messageThrough, messageFrom) => {
callback: (ack: ACK, messageThrough, messageFrom) => {
const messageTarget = ack.header.signatures!.publicKeys[0];
const messageTargetHash = messageTarget.hashcode();
const seenCounter = ack.seenCounter;

// remove the automatic removal of route timeout since we have observed lifesigns of a peer
const timer = this.healthChecks.get(messageTargetHash);
clearTimeout(timer);
Expand Down Expand Up @@ -1564,18 +1562,18 @@ export abstract class DirectStream<
}
}

// This if clause should never enter for relayed connections, since we don't
// know how many ACKs we will get
if (
filterMessageForSeenCounter != null
? uniqueAcks.size >= messageToSet.size * filterMessageForSeenCounter
: messageToSet.size === fastestNodesReached.size
filterMessageForSeenCounter != null &&
uniqueAcks.size >= messageToSet.size * filterMessageForSeenCounter
) {
if (messageToSet.size > 0) {
// this statement exist beacuse if we do SEEK and have to = [], then it means we try to reach as many as possible hence we never want to delete this ACK callback
// only remove callback function if we actually expected a expected amount of responses
clearTimeout(timeout);
finalize();
// only remove callback function if we actually expected a finite amount of responses
}

deliveryDeferredPromise.resolve();
}
},
Expand Down

0 comments on commit 398105e

Please sign in to comment.